Commit 14c00d3c authored by unknown's avatar unknown

WL#3337 (Event scheduler new architecture)

More small fixes to the API : use LEX_STRING instead of LEX_STRING* and if error
then return bool(true) instead of error code.
Merged functions. Reduced usage of sp_name.
Fixed a lot of function documentation errors.
Added function documentation wherever needed.
Removed some unused defines and error codes.

Next to come is batch rename of Event_scheduler_ng to Event_scheduler.


mysql-test/r/events.result:
  update result
mysql-test/r/events_logs_tests.result:
  update result
mysql-test/t/events.test:
  more test coverage
mysql-test/t/events_logs_tests.test:
  fix test
sql/event_data_objects.cc:
  Cosmetics.
  Fix function documentation whenever needed.
  Move Event_job_data::compile() next to Event_job_data::execute()
sql/event_data_objects.h:
  Remove unneeded error codes and defines
  Move function declarations at the end of the header
sql/event_db_repository.cc:
  Fix function documentation.
  Event_db_repository::update_event() now uses LEX_STRING *-s instead of
  sp_name . Lower coupling.
sql/event_db_repository.h:
  Event_db_repository::update_event() now uses LEX_STRING *-s instead of
  sp_name . Lower coupling.
  find_event -> find_named_event
  find_event_by_name is not used externally, merge with load_named_event()
sql/event_queue.cc:
  LEX_STRING* to LEX_STRING
  Fix comments.
  Fix and add function documentation.
  Remove Event_queue::events_count() as it is unused
  Change get_top_for_execution_if_time() to return status code as return value
  and the object is in out parameter.
sql/event_queue.h:
  LEX_STRING* to LEX_STRING
  Fix comments.
  Fix and add function documentation.
  Remove Event_queue::events_count() as it is unused
  Change get_top_for_execution_if_time() to return status code as return value
  and the object is in out parameter.
  Try to detect also lock attemptions for deadlocks.
sql/event_scheduler_ng.cc:
  Always execute on thd->mem_root
  Fix according to changed API of Event_queue::get_top_for_execution_if_time()
sql/events.cc:
  Fix function documentation.
  Fix code after API changes of internal Event module classes.
sql/events.h:
  sp_name -> LEX_STRINGs
sql/sql_parse.cc:
  Fix according to changed API of Events::show_create_event()
sql/sql_yacc.yy:
  Don't pass NULL as third parameter to sp_head::init_strings()
parent 3b6894af
......@@ -111,7 +111,15 @@ a
800219
drop event non_qualif_ev;
drop table non_qualif;
alter event non_existant rename to non_existant_too;
ERROR HY000: Unknown event 'non_existant'
set global event_scheduler = 2;
create event existant on schedule at now() + interval 1 year do select 12;
alter event non_existant rename to existant;
ERROR HY000: Event 'existant' already exists
alter event existant rename to events_test.existant;
ERROR HY000: Same old and new event name
drop event existant;
create table t_event3 (a int, b float);
drop event if exists event3;
Warnings:
......
create database if not exists events_test;
use events_test;
CREATE DATABASE IF NOT EXISTS events_test;
USE events_test;
"We use procedure here because its statements won't be logged into the general log"
"If we had used normal select that are logged in different ways depending on whether"
"the test suite is run in normal mode or with --ps-protocol"
......@@ -8,18 +8,21 @@ BEGIN
SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%';
END|
"Check General Query Log"
SET GLOBAL event_scheduler=2;
create event log_general on schedule every 1 minute do SELect 'alabala', sleep(1) from dual;
TRUNCATE mysql.general_log;
"1 row, the current statement!"
call select_general_log();
CALL select_general_log();
user_host argument
USER_HOST CREATE procedure select_general_log()
BEGIN
SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%';
END
SET GLOBAL event_scheduler=1;
TRUNCATE mysql.general_log;
CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL;
"Wait the scheduler to start"
"Should see 3 rows - the 'SELect' is in the middle. The other two are selects from general_log"
call select_general_log();
"Should see 2 rows - the 'SELECT' is in the middle. The other two are selects from general_log"
CALL select_general_log();
user_host argument
USER_HOST SELect 'alabala', sleep(1) from dual
USER_HOST CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL
USER_HOST SELECT 'alabala', SLEEP(1) FROM DUAL
DROP PROCEDURE select_general_log;
DROP EVENT log_general;
SET GLOBAL event_scheduler=2;
......@@ -90,4 +93,4 @@ TRUNCATE mysql.slow_log;
DROP TABLE slow_event_test;
SET GLOBAL long_query_time =@old_global_long_query_time;
SET SESSION long_query_time =@old_session_long_query_time;
drop database events_test;
DROP DATABASE events_test;
......@@ -105,7 +105,18 @@ create event non_qualif_ev on schedule every 10 minute do insert into non_qualif
select * from non_qualif;
drop event non_qualif_ev;
drop table non_qualif;
--error ER_EVENT_DOES_NOT_EXIST
alter event non_existant rename to non_existant_too;
set global event_scheduler = 2;
create event existant on schedule at now() + interval 1 year do select 12;
--error ER_EVENT_ALREADY_EXISTS
alter event non_existant rename to existant;
--error ER_EVENT_SAME_NAME
alter event existant rename to events_test.existant;
drop event existant;
create table t_event3 (a int, b float);
drop event if exists event3;
......
# Can't test with embedded server that doesn't support grants
-- source include/not_embedded.inc
create database if not exists events_test;
use events_test;
CREATE DATABASE IF NOT EXISTS events_test;
USE events_test;
--echo "We use procedure here because its statements won't be logged into the general log"
--echo "If we had used normal select that are logged in different ways depending on whether"
--echo "the test suite is run in normal mode or with --ps-protocol"
......@@ -13,18 +13,16 @@ BEGIN
END|
delimiter ;|
--echo "Check General Query Log"
SET GLOBAL event_scheduler=2;
create event log_general on schedule every 1 minute do SELect 'alabala', sleep(1) from dual;
TRUNCATE mysql.general_log;
--echo "1 row, the current statement!"
--replace_column 1 USER_HOST
call select_general_log();
CALL select_general_log();
SET GLOBAL event_scheduler=1;
TRUNCATE mysql.general_log;
CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL;
--echo "Wait the scheduler to start"
--echo "Should see 3 rows - the 'SELect' is in the middle. The other two are selects from general_log"
--sleep 0.7
--sleep 1.5
--echo "Should see 2 rows - the 'SELECT' is in the middle. The other two are selects from general_log"
--replace_column 1 USER_HOST
call select_general_log();
CALL select_general_log();
DROP PROCEDURE select_general_log;
DROP EVENT log_general;
SET GLOBAL event_scheduler=2;
......@@ -102,4 +100,4 @@ DROP TABLE slow_event_test;
SET GLOBAL long_query_time =@old_global_long_query_time;
SET SESSION long_query_time =@old_session_long_query_time;
drop database events_test;
DROP DATABASE events_test;
......@@ -182,12 +182,10 @@ Event_parse_data::init_body(THD *thd)
SYNOPSIS
Event_parse_data::init_definer()
RETURN VALUE
0 OK
thd Thread
*/
int
void
Event_parse_data::init_definer(THD *thd)
{
int definer_user_len;
......@@ -216,22 +214,20 @@ Event_parse_data::init_definer(THD *thd)
definer.str[definer.length]= '\0';
DBUG_PRINT("info",("definer [%s] initted", definer.str));
DBUG_RETURN(0);
DBUG_VOID_RETURN;
}
/*
Set time for execution for one time events.
Sets time for execution for one-time event.
SYNOPSIS
Event_parse_data::init_execute_at()
expr when (datetime)
thd Thread
RETURN VALUE
0 OK
EVEX_PARSE_ERROR fix_fields failed
EVEX_BAD_PARAMS datetime is in the past
ER_WRONG_VALUE wrong value for execute at
0 OK
ER_WRONG_VALUE Wrong value for execute at (reported)
*/
int
......@@ -293,18 +289,16 @@ Event_parse_data::init_execute_at(THD *thd)
/*
Set time for execution for transient events.
Sets time for execution of multi-time event.s
SYNOPSIS
Event_parse_data::init_interval()
expr how much?
new_interval what is the interval
thd Thread
RETURN VALUE
0 OK
EVEX_PARSE_ERROR fix_fields failed (reported)
EVEX_BAD_PARAMS Interval is not positive (reported)
EVEX_MICROSECOND_UNSUP Microseconds are not supported (reported)
0 OK
EVEX_BAD_PARAMS Interval is not positive or MICROSECOND (reported)
ER_WRONG_VALUE Wrong value for interval (reported)
*/
int
......@@ -402,12 +396,11 @@ Event_parse_data::init_interval(THD *thd)
/*
Sets activation time.
Sets STARTS.
SYNOPSIS
Event_parse_data::init_starts()
expr how much?
interval what is the interval
NOTES
Note that activation time is not execution time.
......@@ -418,9 +411,8 @@ Event_parse_data::init_interval(THD *thd)
same time.
RETURN VALUE
0 OK
EVEX_PARSE_ERROR fix_fields failed
EVEX_BAD_PARAMS starts before now
0 OK
ER_WRONG_VALUE Starts before now
*/
int
......@@ -471,12 +463,11 @@ Event_parse_data::init_starts(THD *thd)
/*
Sets deactivation time.
Sets ENDS (deactivation time).
SYNOPSIS
Event_parse_data::init_ends()
thd THD
new_ends When?
NOTES
Note that activation time is not execution time.
......@@ -566,7 +557,7 @@ Event_parse_data::report_bad_value(const char *item_name, Item *bad_item)
/*
Performs checking of the data gathered during the parsing phase.
Checks for validity the data gathered during the parsing phase.
SYNOPSIS
Event_parse_data::check_parse_data()
......@@ -594,6 +585,7 @@ Event_parse_data::check_parse_data(THD *thd)
DBUG_RETURN(ret);
}
/*
Constructor
......@@ -769,11 +761,8 @@ Event_timed::init()
{
DBUG_ENTER("Event_timed::init");
body.str= comment.str= NULL;
body.length= comment.length= 0;
definer_user.str= definer_host.str= 0;
definer_user.length= definer_host.length= 0;
definer_user.str= definer_host.str= body.str= comment.str= NULL;
definer_user.length= definer_host.length= body.length= comment.length= 0;
sql_mode= 0;
......@@ -880,7 +869,7 @@ Event_queue_element::load_from_row(TABLE *table)
expression= 0;
/*
If res1 and res2 are TRUE then both fields are empty.
Hence if ET_FIELD_EXECUTE_AT is empty there is an error.
Hence, if ET_FIELD_EXECUTE_AT is empty there is an error.
*/
execute_at_null= table->field[ET_FIELD_EXECUTE_AT]->is_null();
DBUG_ASSERT(!(starts_null && ends_null && !expression && execute_at_null));
......@@ -1440,8 +1429,8 @@ Event_queue_element::drop(THD *thd)
uint tmp= 0;
DBUG_ENTER("Event_queue_element::drop");
DBUG_RETURN(Events::get_instance()->drop_event(thd, dbname, name, FALSE,
&tmp, TRUE));
DBUG_RETURN(Events::get_instance()->
drop_event(thd, dbname, name, FALSE, &tmp, TRUE));
}
......@@ -1453,20 +1442,17 @@ Event_queue_element::drop(THD *thd)
thd - thread context
RETURN VALUE
0 OK
EVEX_OPEN_TABLE_FAILED Error while opening mysql.event for writing
EVEX_WRITE_ROW_FAILED On error to write to disk
others return code from SE in case deletion of the event
row failed.
FALSE OK
TRUE Error while opening mysql.event for writing or during write on disk
*/
bool
Event_queue_element::update_timing_fields(THD *thd)
{
TABLE *table;
Field **fields;
Open_tables_state backup;
int ret;
int ret= FALSE;
DBUG_ENTER("Event_queue_element::update_timing_fields");
......@@ -1480,12 +1466,12 @@ Event_queue_element::update_timing_fields(THD *thd)
if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table))
{
ret= EVEX_OPEN_TABLE_FAILED;
ret= TRUE;
goto done;
}
fields= table->field;
if ((ret= Events::get_instance()->db_repository->
find_event_by_name(thd, dbname, name, table)))
find_named_event(thd, dbname, name, table)))
goto done;
store_record(table,record[1]);
......@@ -1494,20 +1480,20 @@ Event_queue_element::update_timing_fields(THD *thd)
if (last_executed_changed)
{
table->field[ET_FIELD_LAST_EXECUTED]->set_notnull();
table->field[ET_FIELD_LAST_EXECUTED]->store_time(&last_executed,
fields[ET_FIELD_LAST_EXECUTED]->set_notnull();
fields[ET_FIELD_LAST_EXECUTED]->store_time(&last_executed,
MYSQL_TIMESTAMP_DATETIME);
last_executed_changed= FALSE;
}
if (status_changed)
{
table->field[ET_FIELD_STATUS]->set_notnull();
table->field[ET_FIELD_STATUS]->store((longlong)status, TRUE);
fields[ET_FIELD_STATUS]->set_notnull();
fields[ET_FIELD_STATUS]->store((longlong)status, TRUE);
status_changed= FALSE;
}
if ((table->file->ha_update_row(table->record[1],table->record[0])))
ret= EVEX_WRITE_ROW_FAILED;
if ((table->file->ha_update_row(table->record[1], table->record[0])))
ret= TRUE;
done:
close_thread_tables(thd);
......@@ -1550,10 +1536,9 @@ Event_timed::get_create_event(THD *thd, String *buf)
buf->append(STRING_WITH_LEN("CREATE EVENT "));
append_identifier(thd, buf, name.str, name.length);
buf->append(STRING_WITH_LEN(" ON SCHEDULE "));
if (expression)
{
buf->append(STRING_WITH_LEN("EVERY "));
buf->append(STRING_WITH_LEN(" ON SCHEDULE EVERY "));
buf->append(expr_buf);
buf->append(' ');
LEX_STRING *ival= &interval_type_to_name[interval];
......@@ -1562,7 +1547,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
else
{
char dtime_buff[20*2+32];/* +32 to make my_snprintf_{8bit|ucs2} happy */
buf->append(STRING_WITH_LEN("AT '"));
buf->append(STRING_WITH_LEN(" ON SCHEDULE AT '"));
/*
Pass the buffer and the second param tells fills the buffer and
returns the number of chars to copy.
......@@ -1612,7 +1597,7 @@ int
Event_job_data::get_fake_create_event(THD *thd, String *buf)
{
DBUG_ENTER("Event_job_data::get_create_event");
buf->append(STRING_WITH_LEN("CREATE EVENT test.anonymous ON SCHEDULE "
buf->append(STRING_WITH_LEN("CREATE EVENT anonymous ON SCHEDULE "
"EVERY 3337 HOUR DO "));
buf->append(body.str, body.length);
......@@ -1620,81 +1605,6 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf)
}
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
mem_root If != NULL use it to compile the event on it
RETURN VALUE
0 success
-99 No rights on this.dbname.str
-100 event in execution (parallel execution is impossible)
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd, MEM_ROOT *mem_root)
{
Security_context *save_ctx;
/* this one is local and not needed after exec */
Security_context security_ctx;
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
thd->change_security_context(definer_user, definer_host, dbname,
&security_ctx, &save_ctx);
if (!sphead && (ret= compile(thd, mem_root)))
goto done;
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
/* Will compile every time a new sp_head on different root */
free_sp();
done:
thd->restore_security_context(save_ctx);
/*
1. Don't cache sphead if allocated on another mem_root
2. Don't call security_ctx.destroy() because this will free our dbname.str
name.str and definer.str
*/
if (mem_root && sphead)
{
delete sphead;
sphead= 0;
}
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/*
Frees the memory of the sp_head object we hold
SYNOPSIS
......@@ -1816,7 +1726,6 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
DBUG_PRINT("note", ("success compiling %s.%s", dbname.str, name.str));
sphead= lex.sphead;
sphead->m_db= dbname;
sphead->set_definer(definer.str, definer.length);
sphead->set_info(0, 0, &lex.sp_chistics, sql_mode);
......@@ -1847,11 +1756,77 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
}
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
RETURN VALUE
0 success
-99 No rights on this.dbname.str
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd)
{
Security_context *save_ctx;
/* this one is local and not needed after exec */
Security_context security_ctx;
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
if ((ret= compile(thd, NULL)))
goto done;
thd->change_security_context(definer_user, definer_host, dbname,
&security_ctx, &save_ctx);
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
thd->restore_security_context(save_ctx);
done:
free_sp();
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/*
Checks whether two events are in the same schema
SYNOPSIS
event_basic_db_equal()
db Schema
et Compare et->dbname to `db`
RETURN VALUE
TRUE Equal
......@@ -1859,9 +1834,9 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
*/
bool
event_basic_db_equal(LEX_STRING *db, Event_basic *et)
event_basic_db_equal(LEX_STRING db, Event_basic *et)
{
return !sortcmp_lex_string(et->dbname, *db, system_charset_info);
return !sortcmp_lex_string(et->dbname, db, system_charset_info);
}
......
......@@ -17,45 +17,16 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define EVEX_OK 0
#define EVEX_KEY_NOT_FOUND -1
#define EVEX_OPEN_TABLE_FAILED -2
#define EVEX_WRITE_ROW_FAILED -3
#define EVEX_DELETE_ROW_FAILED -4
#define EVEX_GET_FIELD_FAILED -5
#define EVEX_PARSE_ERROR -6
#define EVEX_INTERNAL_ERROR -7
#define EVEX_NO_DB_ERROR -8
#define EVEX_COMPILE_ERROR -19
#define EVEX_GENERAL_ERROR -20
#define EVEX_BAD_IDENTIFIER -21
#define EVEX_BODY_TOO_LONG -22
#define EVEX_BAD_PARAMS -23
#define EVEX_NOT_RUNNING -24
#define EVEX_MICROSECOND_UNSUP -25
#define EVEX_CANT_KILL -26
#define EVENT_EXEC_NO_MORE (1L << 0)
#define EVENT_NOT_USED (1L << 1)
#define EVENT_FREE_WHEN_FINISHED (1L << 2)
#define EVENT_EXEC_STARTED 0
#define EVENT_EXEC_ALREADY_EXEC 1
#define EVENT_EXEC_CANT_FORK 2
#define EVEX_GET_FIELD_FAILED -2
#define EVEX_COMPILE_ERROR -3
#define EVEX_GENERAL_ERROR -4
#define EVEX_BAD_PARAMS -5
#define EVEX_MICROSECOND_UNSUP -6
class sp_head;
class Sql_alloc;
class Event_basic;
/* Compares only the schema part of the identifier */
bool
event_basic_db_equal( LEX_STRING *db, Event_basic *et);
/* Compares the whole identifier*/
bool
event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b);
class Event_basic
{
......@@ -206,7 +177,7 @@ class Event_job_data : public Event_basic
load_from_row(TABLE *table);
int
execute(THD *thd, MEM_ROOT *mem_root);
execute(THD *thd);
private:
int
get_fake_create_event(THD *thd, String *buf);
......@@ -274,7 +245,7 @@ class Event_parse_data : public Sql_alloc
private:
int
void
init_definer(THD *thd);
void
......@@ -303,4 +274,13 @@ class Event_parse_data : public Sql_alloc
};
/* Compares only the schema part of the identifier */
bool
event_basic_db_equal(LEX_STRING db, Event_basic *et);
/* Compares the whole identifier*/
bool
event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b);
#endif /* _EVENT_DATA_OBJECTS_H_ */
......@@ -17,10 +17,10 @@
#include "mysql_priv.h"
#include "event_db_repository.h"
#include "event_data_objects.h"
#include "sp_head.h"
#include "sp.h"
#include "events.h"
#include "sql_show.h"
#include "sp.h"
#include "sp_head.h"
#define EVEX_DB_FIELD_LEN 64
#define EVEX_NAME_FIELD_LEN 64
......@@ -509,8 +509,8 @@ check_parse_params(THD *thd, Event_parse_data *parse_data)
rows_affected [out] How many rows were affected
RETURN VALUE
0 - OK
EVEX_GENERAL_ERROR - Failure
0 OK
EVEX_GENERAL_ERROR Failure
DESCRIPTION
Creates an event. Relies on mysql_event_fill_row which is shared with
......@@ -545,7 +545,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
parse_data->name.str));
DBUG_PRINT("info", ("check existance of an event with the same name"));
if (!find_event_by_name(thd, parse_data->dbname, parse_data->name, table))
if (!find_named_event(thd, parse_data->dbname, parse_data->name, table))
{
if (create_if_not)
{
......@@ -623,7 +623,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
(void) mysql_change_db(thd, old_db.str, 1);
if (table)
close_thread_tables(thd);
DBUG_RETURN(EVEX_OK);
DBUG_RETURN(0);
err:
if (dbchanged)
......@@ -652,13 +652,13 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
alter in case of RENAME TO.
*/
int
bool
Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
sp_name *new_name)
LEX_STRING *new_dbname, LEX_STRING *new_name)
{
CHARSET_INFO *scs= system_charset_info;
TABLE *table= NULL;
int ret= EVEX_OPEN_TABLE_FAILED;
int ret;
DBUG_ENTER("Event_db_repository::update_event");
if (open_event_table(thd, TL_WRITE, &table))
......@@ -673,22 +673,22 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
DBUG_PRINT("info", ("dbname: %s", parse_data->dbname.str));
DBUG_PRINT("info", ("name: %s", parse_data->name.str));
DBUG_PRINT("info", ("user: %s", parse_data->definer.str));
if (new_name)
DBUG_PRINT("info", ("rename to: %s", new_name->m_name.str));
if (new_dbname)
DBUG_PRINT("info", ("rename to: %s@%s", new_dbname->str, new_name->str));
/* first look whether we overwrite */
if (new_name)
if (new_dbname)
{
if (!sortcmp_lex_string(parse_data->name, new_name->m_name, scs) &&
!sortcmp_lex_string(parse_data->dbname, new_name->m_db, scs))
if (!sortcmp_lex_string(parse_data->name, *new_name, scs) &&
!sortcmp_lex_string(parse_data->dbname, *new_dbname, scs))
{
my_error(ER_EVENT_SAME_NAME, MYF(0), parse_data->name.str);
goto err;
}
if (!find_event_by_name(thd, new_name->m_db, new_name->m_name, table))
if (!find_named_event(thd, *new_dbname, *new_name, table))
{
my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->m_name.str);
my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->str);
goto err;
}
}
......@@ -698,8 +698,7 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
overwrite the key and SE will tell us that it cannot find the already found
row (copied into record[1] later
*/
if (EVEX_KEY_NOT_FOUND == find_event_by_name(thd, parse_data->dbname,
parse_data->name, table))
if (find_named_event(thd, parse_data->dbname, parse_data->name, table))
{
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), parse_data->name.str);
goto err;
......@@ -714,22 +713,20 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
mysql_event_fill_row() calls my_error() in case of error so no need to
handle it here
*/
if ((ret= mysql_event_fill_row(thd, table, parse_data, TRUE)))
if (mysql_event_fill_row(thd, table, parse_data, TRUE))
goto err;
if (new_name)
if (new_dbname)
{
table->field[ET_FIELD_DB]->
store(new_name->m_db.str, new_name->m_db.length, scs);
table->field[ET_FIELD_NAME]->
store(new_name->m_name.str, new_name->m_name.length, scs);
table->field[ET_FIELD_DB]->store(new_dbname->str, new_dbname->length, scs);
table->field[ET_FIELD_NAME]->store(new_name->str, new_name->length, scs);
}
/* Close active transaction only if We are going to modify disk */
if (end_active_trans(thd))
goto err;
if ((ret= table->file->ha_update_row(table->record[1], table->record[0])))
if (table->file->ha_update_row(table->record[1], table->record[0]))
{
my_error(ER_EVENT_STORE_FAILED, MYF(0), parse_data->name.str, ret);
goto err;
......@@ -737,12 +734,12 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
/* close mysql.event or we crash later when loading the event from disk */
close_thread_tables(thd);
DBUG_RETURN(0);
DBUG_RETURN(FALSE);
err:
if (table)
close_thread_tables(thd);
DBUG_RETURN(EVEX_GENERAL_ERROR);
DBUG_RETURN(TRUE);
}
......@@ -759,11 +756,11 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
rows_affected [out] Affected number of rows is returned heres
RETURN VALUE
0 OK
!0 Error (my_error() called)
FALSE OK
TRUE Error (reported)
*/
int
bool
Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
bool drop_if_exists, uint *rows_affected)
{
......@@ -772,26 +769,24 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
int ret;
DBUG_ENTER("Event_db_repository::drop_event");
DBUG_PRINT("enter", ("db=%s name=%s", db.str, name.str));
ret= EVEX_OPEN_TABLE_FAILED;
DBUG_PRINT("enter", ("%s@%s", db.str, name.str));
thd->reset_n_backup_open_tables_state(&backup);
if (open_event_table(thd, TL_WRITE, &table))
if ((ret= open_event_table(thd, TL_WRITE, &table)))
{
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
goto done;
}
switch ((ret= find_event_by_name(thd, db, name, table))) {
case 0:
if (!(ret= find_named_event(thd, db, name, table)))
{
/* Close active transaction only if we are actually going to modify disk */
if ((ret= end_active_trans(thd)))
break;
if ((ret= table->file->ha_delete_row(table->record[0])))
if (!(ret= end_active_trans(thd)) &&
(ret= table->file->ha_delete_row(table->record[0])))
my_error(ER_EVENT_CANNOT_DELETE, MYF(0));
break;
case EVEX_KEY_NOT_FOUND:
}
else
{
if (drop_if_exists)
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
......@@ -800,15 +795,13 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
ret= 0;
} else
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
break;
default:
;
}
done:
if (table)
close_thread_tables(thd);
thd->restore_backup_open_tables_state(&backup);
DBUG_RETURN(ret);
}
......@@ -818,23 +811,23 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
is stored.
SYNOPSIS
Event_db_repository::find_event_by_name()
Event_db_repository::find_named_event()
thd Thread
db Schema
name Event name
table Opened mysql.event
RETURN VALUE
0 OK
EVEX_KEY_NOT_FOUND No such event
FALSE OK
TRUE No such event
*/
int
Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
LEX_STRING name, TABLE *table)
bool
Event_db_repository::find_named_event(THD *thd, LEX_STRING db, LEX_STRING name,
TABLE *table)
{
byte key[MAX_KEY_LENGTH];
DBUG_ENTER("Event_db_repository::find_event_by_name");
DBUG_ENTER("Event_db_repository::find_named_event");
DBUG_PRINT("enter", ("name: %.*s", name.length, name.str));
/*
......@@ -846,7 +839,7 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
*/
if (db.length > table->field[ET_FIELD_DB]->field_length ||
name.length > table->field[ET_FIELD_NAME]->field_length)
DBUG_RETURN(EVEX_KEY_NOT_FOUND);
DBUG_RETURN(TRUE);
table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin);
table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin);
......@@ -858,11 +851,11 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
HA_READ_KEY_EXACT))
{
DBUG_PRINT("info", ("Row not found"));
DBUG_RETURN(EVEX_KEY_NOT_FOUND);
DBUG_RETURN(TRUE);
}
DBUG_PRINT("info", ("Row found!"));
DBUG_RETURN(0);
DBUG_RETURN(FALSE);
}
......@@ -944,67 +937,6 @@ Event_db_repository::drop_events_by_field(THD *thd,
}
/*
Looks for a named event in mysql.event and in case of success returns
an object will data loaded from the table.
SYNOPSIS
Event_db_repository::find_event()
thd [in] THD
name [in] The name of the event to find
ett [out] Event's data if event is found
tbl [in] TABLE object to use when not NULL
NOTES
1) Use sp_name for look up, return in **ett if found
2) tbl is not closed at exit
RETURN VALUE
0 ok In this case *ett is set to the event
# error *ett == 0
*/
int
Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_basic *et)
{
TABLE *table= NULL;
int ret;
DBUG_ENTER("Event_db_repository::find_event");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
if (open_event_table(thd, TL_READ, &table))
{
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
ret= EVEX_GENERAL_ERROR;
goto done;
}
if ((ret= find_event_by_name(thd, dbname, name, table)))
{
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
goto done;
}
/*
1)The table should not be closed beforehand. ::load_from_row() only loads
and does not compile
2)::load_from_row() is silent on error therefore we emit error msg here
*/
if ((ret= et->load_from_row(table)))
{
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
goto done;
}
done:
if (table)
close_thread_tables(thd);
DBUG_RETURN(ret);
}
/*
Looks for a named event in mysql.event and then loads it from
the table, compiles and inserts it into the cache.
......@@ -1017,14 +949,15 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
etn_new [out] The loaded event
RETURN VALUE
OP_OK OK
OP_LOAD_ERROR Error during loading from disk
FALSE OK
TRUE Error (reported)
*/
int
bool
Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname,
LEX_STRING name, Event_basic *etn)
{
TABLE *table= NULL;
int ret= 0;
Open_tables_state backup;
......@@ -1032,12 +965,19 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname,
DBUG_PRINT("enter",("thd=0x%lx name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because find_event() has done it */
ret= find_event(thd, dbname, name, etn);
if ((ret= open_event_table(thd, TL_READ, &table)))
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
else if ((ret= find_named_event(thd, dbname, name, table)))
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
else if ((ret= etn->load_from_row(table)))
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
if (table)
close_thread_tables(thd);
thd->restore_backup_open_tables_state(&backup);
/* In this case no memory was allocated so we don't need to clean */
if (ret)
DBUG_RETURN(OP_LOAD_ERROR);
DBUG_RETURN(OP_OK);
DBUG_RETURN(ret);
}
......@@ -16,6 +16,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define EVEX_OPEN_TABLE_FAILED -1
enum enum_events_table_field
{
ET_FIELD_DB = 0,
......@@ -60,24 +62,23 @@ class Event_db_repository
create_event(THD *thd, Event_parse_data *parse_data, my_bool create_if_not,
uint *rows_affected);
int
update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name);
bool
update_event(THD *thd, Event_parse_data *parse_data, LEX_STRING *new_dbname,
LEX_STRING *new_name);
int
bool
drop_event(THD *thd, LEX_STRING db, LEX_STRING name, bool drop_if_exists,
uint *rows_affected);
int
drop_schema_events(THD *thd, LEX_STRING schema);
int
find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et);
bool
find_named_event(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
int
bool
load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et);
int
find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
int
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
......
......@@ -15,13 +15,14 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
#include "events.h"
#include "event_scheduler_ng.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
#include "sp_head.h"
#include "event_scheduler_ng.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
#define EVENT_QUEUE_EXTENT 30
#ifdef __GNUC__
#if __GNUC__ >= 2
......@@ -36,21 +37,20 @@
/*
Compares the execute_at members of 2 Event_queue_element instances.
Compares the execute_at members of two Event_queue_element instances.
Used as callback for the prioritized queue when shifting
elements inside.
SYNOPSIS
event_queue_element_data_compare_q()
vptr - not used (set it to NULL)
a - first Event_queue_element object
b - second Event_queue_element object
vptr Not used (set it to NULL)
a First Event_queue_element object
b Second Event_queue_element object
RETURN VALUE
-1 - a->execute_at < b->execute_at
0 - a->execute_at == b->execute_at
1 - a->execute_at > b->execute_at
-1 a->execute_at < b->execute_at
0 a->execute_at == b->execute_at
1 a->execute_at > b->execute_at
NOTES
execute_at.second_part is not considered during comparison
......@@ -73,9 +73,13 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
Event_queue::Event_queue()
{
mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
mutex_queue_data_locked= FALSE;
mutex_last_unlocked_at_line= mutex_last_locked_at_line=
mutex_last_attempted_lock_at_line= 0;
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
}
......@@ -107,24 +111,6 @@ Event_queue::deinit_mutexes()
}
/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
Event_queue::notify_observers()
*/
void
Event_queue::notify_observers()
{
DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
}
/*
Inits the queue
......@@ -148,8 +134,9 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
db_repository= db_repo;
scheduler= sched;
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_queue_element_compare_q, NULL, 30 /*auto_extent*/))
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
0 /*smallest_on_top*/, event_queue_element_compare_q,
NULL, EVENT_QUEUE_EXTENT))
{
sql_print_error("SCHEDULER: Can't initialize the execution queue");
ret= TRUE;
......@@ -172,7 +159,8 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
/*
Deinits the queue
Deinits the queue. Remove all elements from it and destroys them
too.
SYNOPSIS
Event_queue::deinit_queue()
......@@ -193,12 +181,12 @@ Event_queue::deinit_queue()
/*
Creates an event in the scheduler queue
Adds an event to the queue.
SYNOPSIS
Event_queue::create_event()
et The event to add
check_existence Whether to check if already loaded.
dbname The schema of the new event
name The name of the new event
RETURN VALUE
OP_OK OK or scheduler not working
......@@ -209,21 +197,21 @@ int
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
Event_queue_element *element_new;
Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
element_new= new Event_queue_element();
res= db_repository->load_named_event(thd, dbname, name, element_new);
if (res || element_new->status == Event_queue_element::DISABLED)
delete element_new;
new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, dbname, name, new_element);
if (res || new_element->status == Event_queue_element::DISABLED)
delete new_element;
else
{
element_new->compute_next_execution_time();
new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new));
queue_insert_safe(&queue, (byte *) element_new);
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
UNLOCK_QUEUE_DATA();
notify_observers();
......@@ -254,53 +242,54 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
{
int res;
Event_queue_element *element_old= NULL,
*element_new;
Event_queue_element *old_element= NULL,
*new_element;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
element_new= new Event_queue_element();
new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
new_name? *new_name:name, element_new);
new_name? *new_name:name, new_element);
if (res)
{
delete element_new;
delete new_element;
goto end;
}
else if (element_new->status == Event_queue_element::DISABLED)
else if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
Destroy the object but don't skip to end: because we may have to remove
object from the cache.
*/
delete element_new;
element_new= NULL;
delete new_element;
new_element= NULL;
}
else
element_new->compute_next_execution_time();
new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
if (!(element_old= find_event(dbname, name, TRUE)))
if (!(old_element= find_n_remove_event(dbname, name)))
{
DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
dbname.str, name.str));
}
/* If not disabled event */
if (element_new)
if (new_element)
{
DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
element_new, element_old));
queue_insert_safe(&queue, (byte *) element_new);
new_element, old_element));
queue_insert_safe(&queue, (byte *) new_element);
}
UNLOCK_QUEUE_DATA();
notify_observers();
if (new_element)
notify_observers();
if (element_old)
delete element_old;
if (old_element)
delete old_element;
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
......@@ -326,7 +315,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA();
element= find_event(dbname, name, TRUE);
element= find_n_remove_event(dbname, name);
UNLOCK_QUEUE_DATA();
if (element)
......@@ -343,48 +332,6 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
}
/*
Searches for an event in the queue
SYNOPSIS
Event_queue::find_event()
db The schema of the event to find
name The event to find
remove_from_q If found whether to remove from the Q
RETURN VALUE
NULL Not found
otherwise Address
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue
(signalling COND_new_work for instance).
*/
Event_queue_element *
Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
{
uint i;
DBUG_ENTER("Event_queue::find_event");
for (i= 0; i < queue.elements; ++i)
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
et->dbname.str, et->name.str));
if (event_basic_identifier_equal(db, name, et))
{
if (remove_from_q)
queue_remove(&queue, i);
DBUG_RETURN(et);
}
}
DBUG_RETURN(NULL);
}
/*
Drops all events from the in-memory queue and disk that match
certain pattern evaluated by a comparator function
......@@ -404,7 +351,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*comparator)(LEX_STRING *, Event_basic *))
bool (*comparator)(LEX_STRING, Event_basic *))
{
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
......@@ -414,7 +361,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
if (comparator(&pattern, et))
if (comparator(pattern, et))
{
/*
The queue is ordered. If we remove an element, then all elements after
......@@ -468,25 +415,59 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
/*
Returns the number of elements in the queue
Signals the observers (the main scheduler thread) that the
state of the queue has been changed.
SYNOPSIS
Event_queue::events_count()
Event_queue::notify_observers()
*/
void
Event_queue::notify_observers()
{
DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
}
/*
Searches for an event in the queue
SYNOPSIS
Event_queue::find_n_remove_event()
db The schema of the event to find
name The event to find
RETURN VALUE
Number of Event_queue_element objects in the queue
NULL Not found
otherwise Address
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue.
*/
uint
Event_queue::events_count()
Event_queue_element *
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
{
uint n;
DBUG_ENTER("Event_scheduler::events_count");
LOCK_QUEUE_DATA();
n= queue.elements;
UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("n=%u", n));
DBUG_RETURN(n);
uint i;
DBUG_ENTER("Event_queue::find_n_remove_event");
for (i= 0; i < queue.elements; ++i)
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
et->dbname.str, et->name.str));
if (event_basic_identifier_equal(db, name, et))
{
queue_remove(&queue, i);
DBUG_RETURN(et);
}
}
DBUG_RETURN(NULL);
}
......@@ -620,6 +601,11 @@ Event_queue::load_events_from_db(THD *thd)
SYNOPSIS
Event_queue::check_system_tables()
thd Thread
RETURN VALUE
FALSE OK
TRUE Error
*/
bool
......@@ -738,6 +724,14 @@ Event_queue::empty_queue()
}
/*
Dumps the queue to the trace log.
SYNOPSIS
Event_queue::dbug_dump_queue()
now Current timestamp
*/
inline void
Event_queue::dbug_dump_queue(time_t now)
{
......@@ -761,12 +755,37 @@ Event_queue::dbug_dump_queue(time_t now)
#endif
}
Event_job_data *
/*
Checks whether the top of the queue is elligible for execution and
returns an Event_job_data instance in case it should be executed.
`now` is compared against `execute_at` of the top element in the queue.
SYNOPSIS
Event_queue::dbug_dump_queue()
thd [in] Thread
now [in] Current timestamp
job_data [out] The object to execute
abstime [out] Time to sleep
RETURN VALUE
FALSE No error. If *job_data==NULL then top not elligible for execution.
Could be that there is no top. If abstime->tv_sec is set to value
greater than zero then use abstime with pthread_cond_timedwait().
If abstime->tv_sec is zero then sleep with pthread_cond_wait().
abstime->tv_nsec is always zero.
TRUE Error
*/
bool
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
Event_job_data **job_data,
struct timespec *abstime)
{
bool ret= FALSE;
struct timespec top_time;
Event_job_data *et_new= NULL;
*job_data= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
abstime->tv_nsec= 0;
......@@ -780,56 +799,58 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
}
dbug_dump_queue(now);
Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
if (top_time.tv_sec <= now)
{
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
et_new= new Event_job_data();
if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
et_new)))
{
delete et_new;
et_new= NULL;
DBUG_ASSERT(0);
break;
}
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
et->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event's status is %d", et->status));
et->update_timing_fields(thd);
if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
(et->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (et->dropped)
et->drop(thd);
delete et;
queue_remove(&queue, 0);
}
else
queue_replaced(&queue);
}
else
top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
if (top_time.tv_sec > now)
{
abstime->tv_sec= top_time.tv_sec;
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
abstime->tv_sec));
break;
}
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
*job_data= new Event_job_data();
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
*job_data)))
{
delete *job_data;
*job_data= NULL;
ret= TRUE;
break;
}
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event's status is %d", top->status));
top->update_timing_fields(thd);
if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
(top->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (top->dropped)
top->drop(thd);
delete top;
queue_remove(&queue, 0);
}
else
queue_replaced(&queue);
} while (0);
UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new,
abstime->tv_sec));
if (et_new)
DBUG_PRINT("info", ("db=%s name=%s definer=%s",
et_new->dbname.str, et_new->name.str, et_new->definer.str));
DBUG_RETURN(et_new);
DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
ret, *job_data, abstime->tv_sec));
if (*job_data)
DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
(*job_data)->name.str, (*job_data)->definer.str));
DBUG_RETURN(ret);
}
......@@ -848,10 +869,18 @@ Event_queue::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_queue::lock_data");
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_attempted_lock_in_func= func;
mutex_last_attempted_lock_at_line= line;
mutex_queue_data_attempting_lock= TRUE;
pthread_mutex_lock(&LOCK_event_queue);
mutex_last_attempted_lock_in_func= "";
mutex_last_attempted_lock_at_line= 0;
mutex_queue_data_attempting_lock= FALSE;
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
mutex_queue_data_locked= TRUE;
DBUG_VOID_RETURN;
}
......@@ -921,6 +950,13 @@ Event_queue::dump_internal_status(THD *thd)
protocol->store(&int_string);
ret= protocol->write();
/* queue_data_attempting_lock */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue data attempting lock"), scs);
int_string.set((longlong) mutex_queue_data_attempting_lock, scs);
protocol->store(&int_string);
ret= protocol->write();
/* last locked at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue last locked at"), scs);
......@@ -940,6 +976,17 @@ Event_queue::dump_internal_status(THD *thd)
mutex_last_unlocked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
/* last attempted lock at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue last attempted lock at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
mutex_last_attempted_lock_in_func,
mutex_last_attempted_lock_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
#endif
DBUG_RETURN(FALSE);
}
......@@ -16,7 +16,6 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class sp_name;
class Event_basic;
class Event_db_repository;
class Event_job_data;
......@@ -57,31 +56,28 @@ class Event_queue
void
drop_schema_events(THD *thd, LEX_STRING schema);
uint
events_count();
static bool
check_system_tables(THD *thd);
void
recalculate_activation_times(THD *thd);
Event_job_data *
get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime);
bool
get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data,
struct timespec *abstime);
bool
dump_internal_status(THD *thd);
protected:
Event_queue_element *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
find_n_remove_event(LEX_STRING db, LEX_STRING name);
int
load_events_from_db(THD *thd);
void
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING *, Event_basic *));
bool (*)(LEX_STRING, Event_basic *));
void
empty_queue();
......@@ -93,9 +89,12 @@ class Event_queue
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
uint mutex_last_attempted_lock_at_line;
const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func;
const char* mutex_last_attempted_lock_in_func;
bool mutex_queue_data_locked;
bool mutex_queue_data_attempting_lock;
/* helper functions for working with mutexes & conditionals */
void
......
......@@ -176,6 +176,7 @@ deinit_event_thread(THD *thd)
my_thread_end();
}
/*
Function that executes the scheduler,
......@@ -271,7 +272,7 @@ event_worker_ng_thread(void *arg)
thd->enable_slow_log= TRUE;
ret= event->execute(thd, thd->mem_root);
ret= event->execute(thd);
evex_print_warnings(thd, event);
......@@ -506,8 +507,13 @@ Event_scheduler_ng::run(THD *thd)
{
thd->end_time();
/* Gets a minimized version */
job_data= queue->
get_top_for_execution_if_time(thd, thd->query_start(), &abstime);
if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
&job_data, &abstime))
{
sql_print_information("SCHEDULER: Serious error during getting next"
" event to execute. Stopping.");
break;
}
DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
"abs_time.tv_sec=%d",
......
......@@ -41,10 +41,6 @@
- Add logging to file
Warning:
- For now parallel execution is not possible because the same sp_head cannot
be executed few times!!! There is still no lock attached to particular
event.
*/
......@@ -84,18 +80,14 @@ ulong Events::opt_event_scheduler= 2;
SYNOPSIS
sortcmp_lex_string()
s - first LEX_STRING
t - second LEX_STRING
cs - charset
s First LEX_STRING
t Second LEX_STRING
cs Charset
RETURN VALUE
-1 - s < t
0 - s == t
1 - s > t
Notes
TIME.second_part is not considered during comparison
-1 s < t
0 s == t
1 s > t
*/
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
......@@ -104,6 +96,7 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
(uchar *) t.str,t.length, 0);
}
/*
Accessor for the singleton instance.
......@@ -131,13 +124,13 @@ Events::get_instance()
SYNOPSIS
Events::reconstruct_interval_expression()
buf - preallocated String buffer to add the value to
interval - the interval type (for instance YEAR_MONTH)
expression - the value in the lowest entity
buf Preallocated String buffer to add the value to
interval The interval type (for instance YEAR_MONTH)
expression The value in the lowest entity
RETURN VALUE
0 - OK
1 - Error
0 OK
1 Error
*/
int
......@@ -256,7 +249,7 @@ Events::reconstruct_interval_expression(String *buf, interval_type interval,
/*
Open mysql.event table for read
Opens mysql.event table with specified lock
SYNOPSIS
Events::open_event_table()
......@@ -283,11 +276,10 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
SYNOPSIS
Events::create_event()
thd THD
et event's data
create_options Options specified when in the query. We are
interested whether there is IF NOT EXISTS
rows_affected How many rows were affected
thd [in] THD
et [in] Event's data from parsing stage
if_not_exists [in] Whether IF NOT EXISTS was specified in the DDL
rows_affected [out] How many rows were affected
RETURN VALUE
0 OK
......@@ -328,9 +320,10 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
SYNOPSIS
Events::update_event()
thd THD
et Event's data from parsing stage
new_name Set in case of RENAME TO.
thd [in] THD
et [in] Event's data from parsing stage
rename_to [in] Set in case of RENAME TO.
rows_affected [out] How many rows were affected.
RETURN VALUE
0 OK
......@@ -338,26 +331,25 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
NOTES
et contains data about dbname and event name.
new_name is the new name of the event, if not null (this means
that RENAME TO was specified in the query)
new_name is the new name of the event, if not null this means
that RENAME TO was specified in the query
*/
int
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to? &rename_to->m_db: NULL;
LEX_STRING *new_name= rename_to? &rename_to->m_name: NULL;
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_name)))
if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{
if ((ret= event_queue->update_event(thd,
parse_data->dbname,
parse_data->name,
new_name? &new_name->m_db: NULL,
new_name? &new_name->m_name: NULL)))
if ((ret= event_queue->update_event(thd, parse_data->dbname,
parse_data->name, new_dbname, new_name)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
......@@ -374,16 +366,16 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
SYNOPSIS
Events::drop_event()
thd THD
dbname Event's schema
name Event's name
if_exists When set and the event does not exist => warning onto
the stack
rows_affected Affected number of rows is returned heres
only_from_disk Whether to remove the event from the queue too. In case
of Event_job_data::drop() it's needed to do only disk
drop because Event_queue will handle removal from memory
queue.
thd [in] THD
dbname [in] Event's schema
name [in] Event's name
if_exists [in] When set and the event does not exist =>
warning onto the stack
rows_affected [out] Affected number of rows is returned here
only_from_disk [in] Whether to remove the event from the queue too.
In case of Event_job_data::drop() it's needed to
do only disk drop because Event_queue will handle
removal from memory queue.
RETURN VALUE
0 OK
......@@ -429,7 +421,7 @@ Events::drop_schema_events(THD *thd, char *db)
int ret= 0;
LEX_STRING db_lex= {db, strlen(db)};
DBUG_ENTER("evex_drop_db_events");
DBUG_ENTER("Events::drop_schema_events");
DBUG_PRINT("enter", ("dropping events from %s", db));
pthread_mutex_lock(&LOCK_event_metadata);
......@@ -455,24 +447,22 @@ Events::drop_schema_events(THD *thd, char *db)
*/
int
Events::show_create_event(THD *thd, sp_name *spn)
Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
CHARSET_INFO *scs= system_charset_info;
int ret;
Event_timed *et= new Event_timed();
Open_tables_state backup;
DBUG_ENTER("Events::show_create_event");
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str));
DBUG_PRINT("enter", ("name: %s@%s", dbname.str, name.str));
thd->reset_n_backup_open_tables_state(&backup);
ret= db_repository->find_event(thd, spn->m_db, spn->m_name, et);
thd->restore_backup_open_tables_state(&backup);
ret= db_repository->load_named_event(thd, dbname, name, et);
if (!ret)
{
Protocol *protocol= thd->protocol;
char show_str_buf[768];
String show_str(show_str_buf, sizeof(show_str_buf), system_charset_info);
char show_str_buf[10 * STRING_BUFFER_USUAL_SIZE];
String show_str(show_str_buf, sizeof(show_str_buf), scs);
List<Item> field_list;
byte *sql_mode_str;
ulong sql_mode_len=0;
......@@ -491,18 +481,19 @@ Events::show_create_event(THD *thd, sp_name *spn)
field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len));
field_list.push_back(new Item_empty_string("Create Event",
show_str.length()));
field_list.
push_back(new Item_empty_string("Create Event", show_str.length()));
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
goto err;
protocol->prepare_for_resend();
protocol->store(et->name.str, et->name.length, system_charset_info);
protocol->store(et->name.str, et->name.length, scs);
protocol->store((char*) sql_mode_str, sql_mode_len, system_charset_info);
protocol->store((char*) sql_mode_str, sql_mode_len, scs);
protocol->store(show_str.c_ptr(), show_str.length(), system_charset_info);
protocol->store(show_str.c_ptr(), show_str.length(), scs);
ret= protocol->write();
send_eof(thd);
}
......@@ -546,7 +537,8 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
DBUG_RETURN(1);
db= thd->lex->select_lex.db;
}
DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db));
DBUG_RETURN(get_instance()->db_repository->
fill_schema_events(thd, tables, db));
}
......@@ -561,14 +553,12 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
RETURN VALUE
0 OK
1 Error
1 Error in case the scheduler can't start
*/
int
Events::init()
{
int ret= 0;
Event_db_repository *db_repo;
DBUG_ENTER("Events::init");
event_queue->init_queue(db_repository, scheduler_ng);
scheduler_ng->init_scheduler(event_queue);
......@@ -653,7 +643,10 @@ Events::destroy_mutexes()
/*
Proxy for Event_scheduler::dump_internal_status
Dumps the internal status of the scheduler and the memory cache
into a table with two columns - Name & Value. Different properties
which could be useful for debugging for instance deadlocks are
returned.
SYNOPSIS
Events::dump_internal_status()
......@@ -733,8 +726,8 @@ Events::stop_execution_of_events()
Events::is_started()
RETURN VALUE
TRUE Yes
FALSE No
TRUE Yes
FALSE No
*/
bool
......
......@@ -81,7 +81,7 @@ class Events
uint *rows_affected);
int
update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected);
int
......@@ -95,7 +95,7 @@ class Events
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
int
show_create_event(THD *thd, sp_name *spn);
show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
/* Needed for both SHOW CREATE EVENT and INFORMATION_SCHEMA */
static int
......
......@@ -3926,7 +3926,8 @@ mysql_execute_command(THD *thd)
}
if (lex->sql_command == SQLCOM_SHOW_CREATE_EVENT)
res= Events::get_instance()->show_create_event(thd, lex->spname);
res= Events::get_instance()->show_create_event(thd, lex->spname->m_db,
lex->spname->m_name);
else
{
uint affected= 1;
......
......@@ -1447,7 +1447,8 @@ ev_sql_stmt:
LEX *lex=Lex;
// return back to the original memory root ASAP
lex->sphead->init_strings(YYTHD, lex, NULL);
lex->sphead->init_strings(YYTHD, lex,
Lex->event_parse_data->identifier);
lex->sphead->restore_thd_mem_root(YYTHD);
lex->sp_chistics.suid= SP_IS_SUID;//always the definer!
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment