Commit 0d517461 authored by andrey@lmy004's avatar andrey@lmy004

WL#3337 (Event scheduler new architecture)

More small fixes to the API : use LEX_STRING instead of LEX_STRING* and if error
then return bool(true) instead of error code.
Merged functions. Reduced usage of sp_name.
Fixed a lot of function documentation errors.
Added function documentation wherever needed.
Removed some unused defines and error codes.

Next to come is batch rename of Event_scheduler_ng to Event_scheduler.
parent 2f7555b3
...@@ -111,7 +111,15 @@ a ...@@ -111,7 +111,15 @@ a
800219 800219
drop event non_qualif_ev; drop event non_qualif_ev;
drop table non_qualif; drop table non_qualif;
alter event non_existant rename to non_existant_too;
ERROR HY000: Unknown event 'non_existant'
set global event_scheduler = 2; set global event_scheduler = 2;
create event existant on schedule at now() + interval 1 year do select 12;
alter event non_existant rename to existant;
ERROR HY000: Event 'existant' already exists
alter event existant rename to events_test.existant;
ERROR HY000: Same old and new event name
drop event existant;
create table t_event3 (a int, b float); create table t_event3 (a int, b float);
drop event if exists event3; drop event if exists event3;
Warnings: Warnings:
......
create database if not exists events_test; CREATE DATABASE IF NOT EXISTS events_test;
use events_test; USE events_test;
"We use procedure here because its statements won't be logged into the general log" "We use procedure here because its statements won't be logged into the general log"
"If we had used normal select that are logged in different ways depending on whether" "If we had used normal select that are logged in different ways depending on whether"
"the test suite is run in normal mode or with --ps-protocol" "the test suite is run in normal mode or with --ps-protocol"
...@@ -8,18 +8,21 @@ BEGIN ...@@ -8,18 +8,21 @@ BEGIN
SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%'; SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%';
END| END|
"Check General Query Log" "Check General Query Log"
SET GLOBAL event_scheduler=2; CALL select_general_log();
create event log_general on schedule every 1 minute do SELect 'alabala', sleep(1) from dual;
TRUNCATE mysql.general_log;
"1 row, the current statement!"
call select_general_log();
user_host argument user_host argument
USER_HOST CREATE procedure select_general_log()
BEGIN
SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%';
END
SET GLOBAL event_scheduler=1; SET GLOBAL event_scheduler=1;
TRUNCATE mysql.general_log;
CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL;
"Wait the scheduler to start" "Wait the scheduler to start"
"Should see 3 rows - the 'SELect' is in the middle. The other two are selects from general_log" "Should see 2 rows - the 'SELECT' is in the middle. The other two are selects from general_log"
call select_general_log(); CALL select_general_log();
user_host argument user_host argument
USER_HOST SELect 'alabala', sleep(1) from dual USER_HOST CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL
USER_HOST SELECT 'alabala', SLEEP(1) FROM DUAL
DROP PROCEDURE select_general_log; DROP PROCEDURE select_general_log;
DROP EVENT log_general; DROP EVENT log_general;
SET GLOBAL event_scheduler=2; SET GLOBAL event_scheduler=2;
...@@ -90,4 +93,4 @@ TRUNCATE mysql.slow_log; ...@@ -90,4 +93,4 @@ TRUNCATE mysql.slow_log;
DROP TABLE slow_event_test; DROP TABLE slow_event_test;
SET GLOBAL long_query_time =@old_global_long_query_time; SET GLOBAL long_query_time =@old_global_long_query_time;
SET SESSION long_query_time =@old_session_long_query_time; SET SESSION long_query_time =@old_session_long_query_time;
drop database events_test; DROP DATABASE events_test;
...@@ -105,7 +105,18 @@ create event non_qualif_ev on schedule every 10 minute do insert into non_qualif ...@@ -105,7 +105,18 @@ create event non_qualif_ev on schedule every 10 minute do insert into non_qualif
select * from non_qualif; select * from non_qualif;
drop event non_qualif_ev; drop event non_qualif_ev;
drop table non_qualif; drop table non_qualif;
--error ER_EVENT_DOES_NOT_EXIST
alter event non_existant rename to non_existant_too;
set global event_scheduler = 2; set global event_scheduler = 2;
create event existant on schedule at now() + interval 1 year do select 12;
--error ER_EVENT_ALREADY_EXISTS
alter event non_existant rename to existant;
--error ER_EVENT_SAME_NAME
alter event existant rename to events_test.existant;
drop event existant;
create table t_event3 (a int, b float); create table t_event3 (a int, b float);
drop event if exists event3; drop event if exists event3;
......
# Can't test with embedded server that doesn't support grants # Can't test with embedded server that doesn't support grants
-- source include/not_embedded.inc -- source include/not_embedded.inc
create database if not exists events_test; CREATE DATABASE IF NOT EXISTS events_test;
use events_test; USE events_test;
--echo "We use procedure here because its statements won't be logged into the general log" --echo "We use procedure here because its statements won't be logged into the general log"
--echo "If we had used normal select that are logged in different ways depending on whether" --echo "If we had used normal select that are logged in different ways depending on whether"
--echo "the test suite is run in normal mode or with --ps-protocol" --echo "the test suite is run in normal mode or with --ps-protocol"
...@@ -13,18 +13,16 @@ BEGIN ...@@ -13,18 +13,16 @@ BEGIN
END| END|
delimiter ;| delimiter ;|
--echo "Check General Query Log" --echo "Check General Query Log"
SET GLOBAL event_scheduler=2;
create event log_general on schedule every 1 minute do SELect 'alabala', sleep(1) from dual;
TRUNCATE mysql.general_log;
--echo "1 row, the current statement!"
--replace_column 1 USER_HOST --replace_column 1 USER_HOST
call select_general_log(); CALL select_general_log();
SET GLOBAL event_scheduler=1; SET GLOBAL event_scheduler=1;
TRUNCATE mysql.general_log;
CREATE EVENT log_general ON SCHEDULE EVERY 1 MINUTE DO SELECT 'alabala', SLEEP(1) FROM DUAL;
--echo "Wait the scheduler to start" --echo "Wait the scheduler to start"
--echo "Should see 3 rows - the 'SELect' is in the middle. The other two are selects from general_log" --sleep 1.5
--sleep 0.7 --echo "Should see 2 rows - the 'SELECT' is in the middle. The other two are selects from general_log"
--replace_column 1 USER_HOST --replace_column 1 USER_HOST
call select_general_log(); CALL select_general_log();
DROP PROCEDURE select_general_log; DROP PROCEDURE select_general_log;
DROP EVENT log_general; DROP EVENT log_general;
SET GLOBAL event_scheduler=2; SET GLOBAL event_scheduler=2;
...@@ -102,4 +100,4 @@ DROP TABLE slow_event_test; ...@@ -102,4 +100,4 @@ DROP TABLE slow_event_test;
SET GLOBAL long_query_time =@old_global_long_query_time; SET GLOBAL long_query_time =@old_global_long_query_time;
SET SESSION long_query_time =@old_session_long_query_time; SET SESSION long_query_time =@old_session_long_query_time;
drop database events_test; DROP DATABASE events_test;
...@@ -182,12 +182,10 @@ Event_parse_data::init_body(THD *thd) ...@@ -182,12 +182,10 @@ Event_parse_data::init_body(THD *thd)
SYNOPSIS SYNOPSIS
Event_parse_data::init_definer() Event_parse_data::init_definer()
thd Thread
RETURN VALUE
0 OK
*/ */
int void
Event_parse_data::init_definer(THD *thd) Event_parse_data::init_definer(THD *thd)
{ {
int definer_user_len; int definer_user_len;
...@@ -216,22 +214,20 @@ Event_parse_data::init_definer(THD *thd) ...@@ -216,22 +214,20 @@ Event_parse_data::init_definer(THD *thd)
definer.str[definer.length]= '\0'; definer.str[definer.length]= '\0';
DBUG_PRINT("info",("definer [%s] initted", definer.str)); DBUG_PRINT("info",("definer [%s] initted", definer.str));
DBUG_RETURN(0); DBUG_VOID_RETURN;
} }
/* /*
Set time for execution for one time events. Sets time for execution for one-time event.
SYNOPSIS SYNOPSIS
Event_parse_data::init_execute_at() Event_parse_data::init_execute_at()
expr when (datetime) thd Thread
RETURN VALUE RETURN VALUE
0 OK 0 OK
EVEX_PARSE_ERROR fix_fields failed ER_WRONG_VALUE Wrong value for execute at (reported)
EVEX_BAD_PARAMS datetime is in the past
ER_WRONG_VALUE wrong value for execute at
*/ */
int int
...@@ -293,18 +289,16 @@ wrong_value: ...@@ -293,18 +289,16 @@ wrong_value:
/* /*
Set time for execution for transient events. Sets time for execution of multi-time event.s
SYNOPSIS SYNOPSIS
Event_parse_data::init_interval() Event_parse_data::init_interval()
expr how much? thd Thread
new_interval what is the interval
RETURN VALUE RETURN VALUE
0 OK 0 OK
EVEX_PARSE_ERROR fix_fields failed (reported) EVEX_BAD_PARAMS Interval is not positive or MICROSECOND (reported)
EVEX_BAD_PARAMS Interval is not positive (reported) ER_WRONG_VALUE Wrong value for interval (reported)
EVEX_MICROSECOND_UNSUP Microseconds are not supported (reported)
*/ */
int int
...@@ -402,12 +396,11 @@ wrong_value: ...@@ -402,12 +396,11 @@ wrong_value:
/* /*
Sets activation time. Sets STARTS.
SYNOPSIS SYNOPSIS
Event_parse_data::init_starts() Event_parse_data::init_starts()
expr how much? expr how much?
interval what is the interval
NOTES NOTES
Note that activation time is not execution time. Note that activation time is not execution time.
...@@ -418,9 +411,8 @@ wrong_value: ...@@ -418,9 +411,8 @@ wrong_value:
same time. same time.
RETURN VALUE RETURN VALUE
0 OK 0 OK
EVEX_PARSE_ERROR fix_fields failed ER_WRONG_VALUE Starts before now
EVEX_BAD_PARAMS starts before now
*/ */
int int
...@@ -471,12 +463,11 @@ wrong_value: ...@@ -471,12 +463,11 @@ wrong_value:
/* /*
Sets deactivation time. Sets ENDS (deactivation time).
SYNOPSIS SYNOPSIS
Event_parse_data::init_ends() Event_parse_data::init_ends()
thd THD thd THD
new_ends When?
NOTES NOTES
Note that activation time is not execution time. Note that activation time is not execution time.
...@@ -566,7 +557,7 @@ Event_parse_data::report_bad_value(const char *item_name, Item *bad_item) ...@@ -566,7 +557,7 @@ Event_parse_data::report_bad_value(const char *item_name, Item *bad_item)
/* /*
Performs checking of the data gathered during the parsing phase. Checks for validity the data gathered during the parsing phase.
SYNOPSIS SYNOPSIS
Event_parse_data::check_parse_data() Event_parse_data::check_parse_data()
...@@ -594,6 +585,7 @@ Event_parse_data::check_parse_data(THD *thd) ...@@ -594,6 +585,7 @@ Event_parse_data::check_parse_data(THD *thd)
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
/* /*
Constructor Constructor
...@@ -769,11 +761,8 @@ Event_timed::init() ...@@ -769,11 +761,8 @@ Event_timed::init()
{ {
DBUG_ENTER("Event_timed::init"); DBUG_ENTER("Event_timed::init");
body.str= comment.str= NULL; definer_user.str= definer_host.str= body.str= comment.str= NULL;
body.length= comment.length= 0; definer_user.length= definer_host.length= body.length= comment.length= 0;
definer_user.str= definer_host.str= 0;
definer_user.length= definer_host.length= 0;
sql_mode= 0; sql_mode= 0;
...@@ -880,7 +869,7 @@ Event_queue_element::load_from_row(TABLE *table) ...@@ -880,7 +869,7 @@ Event_queue_element::load_from_row(TABLE *table)
expression= 0; expression= 0;
/* /*
If res1 and res2 are TRUE then both fields are empty. If res1 and res2 are TRUE then both fields are empty.
Hence if ET_FIELD_EXECUTE_AT is empty there is an error. Hence, if ET_FIELD_EXECUTE_AT is empty there is an error.
*/ */
execute_at_null= table->field[ET_FIELD_EXECUTE_AT]->is_null(); execute_at_null= table->field[ET_FIELD_EXECUTE_AT]->is_null();
DBUG_ASSERT(!(starts_null && ends_null && !expression && execute_at_null)); DBUG_ASSERT(!(starts_null && ends_null && !expression && execute_at_null));
...@@ -1440,8 +1429,8 @@ Event_queue_element::drop(THD *thd) ...@@ -1440,8 +1429,8 @@ Event_queue_element::drop(THD *thd)
uint tmp= 0; uint tmp= 0;
DBUG_ENTER("Event_queue_element::drop"); DBUG_ENTER("Event_queue_element::drop");
DBUG_RETURN(Events::get_instance()->drop_event(thd, dbname, name, FALSE, DBUG_RETURN(Events::get_instance()->
&tmp, TRUE)); drop_event(thd, dbname, name, FALSE, &tmp, TRUE));
} }
...@@ -1453,20 +1442,17 @@ Event_queue_element::drop(THD *thd) ...@@ -1453,20 +1442,17 @@ Event_queue_element::drop(THD *thd)
thd - thread context thd - thread context
RETURN VALUE RETURN VALUE
0 OK FALSE OK
EVEX_OPEN_TABLE_FAILED Error while opening mysql.event for writing TRUE Error while opening mysql.event for writing or during write on disk
EVEX_WRITE_ROW_FAILED On error to write to disk
others return code from SE in case deletion of the event
row failed.
*/ */
bool bool
Event_queue_element::update_timing_fields(THD *thd) Event_queue_element::update_timing_fields(THD *thd)
{ {
TABLE *table; TABLE *table;
Field **fields;
Open_tables_state backup; Open_tables_state backup;
int ret; int ret= FALSE;
DBUG_ENTER("Event_queue_element::update_timing_fields"); DBUG_ENTER("Event_queue_element::update_timing_fields");
...@@ -1480,12 +1466,12 @@ Event_queue_element::update_timing_fields(THD *thd) ...@@ -1480,12 +1466,12 @@ Event_queue_element::update_timing_fields(THD *thd)
if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table)) if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table))
{ {
ret= EVEX_OPEN_TABLE_FAILED; ret= TRUE;
goto done; goto done;
} }
fields= table->field;
if ((ret= Events::get_instance()->db_repository-> if ((ret= Events::get_instance()->db_repository->
find_event_by_name(thd, dbname, name, table))) find_named_event(thd, dbname, name, table)))
goto done; goto done;
store_record(table,record[1]); store_record(table,record[1]);
...@@ -1494,20 +1480,20 @@ Event_queue_element::update_timing_fields(THD *thd) ...@@ -1494,20 +1480,20 @@ Event_queue_element::update_timing_fields(THD *thd)
if (last_executed_changed) if (last_executed_changed)
{ {
table->field[ET_FIELD_LAST_EXECUTED]->set_notnull(); fields[ET_FIELD_LAST_EXECUTED]->set_notnull();
table->field[ET_FIELD_LAST_EXECUTED]->store_time(&last_executed, fields[ET_FIELD_LAST_EXECUTED]->store_time(&last_executed,
MYSQL_TIMESTAMP_DATETIME); MYSQL_TIMESTAMP_DATETIME);
last_executed_changed= FALSE; last_executed_changed= FALSE;
} }
if (status_changed) if (status_changed)
{ {
table->field[ET_FIELD_STATUS]->set_notnull(); fields[ET_FIELD_STATUS]->set_notnull();
table->field[ET_FIELD_STATUS]->store((longlong)status, TRUE); fields[ET_FIELD_STATUS]->store((longlong)status, TRUE);
status_changed= FALSE; status_changed= FALSE;
} }
if ((table->file->ha_update_row(table->record[1],table->record[0]))) if ((table->file->ha_update_row(table->record[1], table->record[0])))
ret= EVEX_WRITE_ROW_FAILED; ret= TRUE;
done: done:
close_thread_tables(thd); close_thread_tables(thd);
...@@ -1550,10 +1536,9 @@ Event_timed::get_create_event(THD *thd, String *buf) ...@@ -1550,10 +1536,9 @@ Event_timed::get_create_event(THD *thd, String *buf)
buf->append(STRING_WITH_LEN("CREATE EVENT ")); buf->append(STRING_WITH_LEN("CREATE EVENT "));
append_identifier(thd, buf, name.str, name.length); append_identifier(thd, buf, name.str, name.length);
buf->append(STRING_WITH_LEN(" ON SCHEDULE "));
if (expression) if (expression)
{ {
buf->append(STRING_WITH_LEN("EVERY ")); buf->append(STRING_WITH_LEN(" ON SCHEDULE EVERY "));
buf->append(expr_buf); buf->append(expr_buf);
buf->append(' '); buf->append(' ');
LEX_STRING *ival= &interval_type_to_name[interval]; LEX_STRING *ival= &interval_type_to_name[interval];
...@@ -1562,7 +1547,7 @@ Event_timed::get_create_event(THD *thd, String *buf) ...@@ -1562,7 +1547,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
else else
{ {
char dtime_buff[20*2+32];/* +32 to make my_snprintf_{8bit|ucs2} happy */ char dtime_buff[20*2+32];/* +32 to make my_snprintf_{8bit|ucs2} happy */
buf->append(STRING_WITH_LEN("AT '")); buf->append(STRING_WITH_LEN(" ON SCHEDULE AT '"));
/* /*
Pass the buffer and the second param tells fills the buffer and Pass the buffer and the second param tells fills the buffer and
returns the number of chars to copy. returns the number of chars to copy.
...@@ -1612,7 +1597,7 @@ int ...@@ -1612,7 +1597,7 @@ int
Event_job_data::get_fake_create_event(THD *thd, String *buf) Event_job_data::get_fake_create_event(THD *thd, String *buf)
{ {
DBUG_ENTER("Event_job_data::get_create_event"); DBUG_ENTER("Event_job_data::get_create_event");
buf->append(STRING_WITH_LEN("CREATE EVENT test.anonymous ON SCHEDULE " buf->append(STRING_WITH_LEN("CREATE EVENT anonymous ON SCHEDULE "
"EVERY 3337 HOUR DO ")); "EVERY 3337 HOUR DO "));
buf->append(body.str, body.length); buf->append(body.str, body.length);
...@@ -1620,81 +1605,6 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf) ...@@ -1620,81 +1605,6 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf)
} }
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
mem_root If != NULL use it to compile the event on it
RETURN VALUE
0 success
-99 No rights on this.dbname.str
-100 event in execution (parallel execution is impossible)
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd, MEM_ROOT *mem_root)
{
Security_context *save_ctx;
/* this one is local and not needed after exec */
Security_context security_ctx;
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
thd->change_security_context(definer_user, definer_host, dbname,
&security_ctx, &save_ctx);
if (!sphead && (ret= compile(thd, mem_root)))
goto done;
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
/* Will compile every time a new sp_head on different root */
free_sp();
done:
thd->restore_security_context(save_ctx);
/*
1. Don't cache sphead if allocated on another mem_root
2. Don't call security_ctx.destroy() because this will free our dbname.str
name.str and definer.str
*/
if (mem_root && sphead)
{
delete sphead;
sphead= 0;
}
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/* /*
Frees the memory of the sp_head object we hold Frees the memory of the sp_head object we hold
SYNOPSIS SYNOPSIS
...@@ -1816,7 +1726,6 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root) ...@@ -1816,7 +1726,6 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
DBUG_PRINT("note", ("success compiling %s.%s", dbname.str, name.str)); DBUG_PRINT("note", ("success compiling %s.%s", dbname.str, name.str));
sphead= lex.sphead; sphead= lex.sphead;
sphead->m_db= dbname;
sphead->set_definer(definer.str, definer.length); sphead->set_definer(definer.str, definer.length);
sphead->set_info(0, 0, &lex.sp_chistics, sql_mode); sphead->set_info(0, 0, &lex.sp_chistics, sql_mode);
...@@ -1847,11 +1756,77 @@ done: ...@@ -1847,11 +1756,77 @@ done:
} }
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
RETURN VALUE
0 success
-99 No rights on this.dbname.str
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd)
{
Security_context *save_ctx;
/* this one is local and not needed after exec */
Security_context security_ctx;
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
if ((ret= compile(thd, NULL)))
goto done;
thd->change_security_context(definer_user, definer_host, dbname,
&security_ctx, &save_ctx);
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
thd->restore_security_context(save_ctx);
done:
free_sp();
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/* /*
Checks whether two events are in the same schema Checks whether two events are in the same schema
SYNOPSIS SYNOPSIS
event_basic_db_equal() event_basic_db_equal()
db Schema
et Compare et->dbname to `db`
RETURN VALUE RETURN VALUE
TRUE Equal TRUE Equal
...@@ -1859,9 +1834,9 @@ done: ...@@ -1859,9 +1834,9 @@ done:
*/ */
bool bool
event_basic_db_equal(LEX_STRING *db, Event_basic *et) event_basic_db_equal(LEX_STRING db, Event_basic *et)
{ {
return !sortcmp_lex_string(et->dbname, *db, system_charset_info); return !sortcmp_lex_string(et->dbname, db, system_charset_info);
} }
......
...@@ -17,45 +17,16 @@ ...@@ -17,45 +17,16 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define EVEX_OK 0 #define EVEX_GET_FIELD_FAILED -2
#define EVEX_KEY_NOT_FOUND -1 #define EVEX_COMPILE_ERROR -3
#define EVEX_OPEN_TABLE_FAILED -2 #define EVEX_GENERAL_ERROR -4
#define EVEX_WRITE_ROW_FAILED -3 #define EVEX_BAD_PARAMS -5
#define EVEX_DELETE_ROW_FAILED -4 #define EVEX_MICROSECOND_UNSUP -6
#define EVEX_GET_FIELD_FAILED -5
#define EVEX_PARSE_ERROR -6
#define EVEX_INTERNAL_ERROR -7
#define EVEX_NO_DB_ERROR -8
#define EVEX_COMPILE_ERROR -19
#define EVEX_GENERAL_ERROR -20
#define EVEX_BAD_IDENTIFIER -21
#define EVEX_BODY_TOO_LONG -22
#define EVEX_BAD_PARAMS -23
#define EVEX_NOT_RUNNING -24
#define EVEX_MICROSECOND_UNSUP -25
#define EVEX_CANT_KILL -26
#define EVENT_EXEC_NO_MORE (1L << 0)
#define EVENT_NOT_USED (1L << 1)
#define EVENT_FREE_WHEN_FINISHED (1L << 2)
#define EVENT_EXEC_STARTED 0
#define EVENT_EXEC_ALREADY_EXEC 1
#define EVENT_EXEC_CANT_FORK 2
class sp_head; class sp_head;
class Sql_alloc; class Sql_alloc;
class Event_basic;
/* Compares only the schema part of the identifier */
bool
event_basic_db_equal( LEX_STRING *db, Event_basic *et);
/* Compares the whole identifier*/
bool
event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b);
class Event_basic class Event_basic
{ {
...@@ -206,7 +177,7 @@ public: ...@@ -206,7 +177,7 @@ public:
load_from_row(TABLE *table); load_from_row(TABLE *table);
int int
execute(THD *thd, MEM_ROOT *mem_root); execute(THD *thd);
private: private:
int int
get_fake_create_event(THD *thd, String *buf); get_fake_create_event(THD *thd, String *buf);
...@@ -274,7 +245,7 @@ public: ...@@ -274,7 +245,7 @@ public:
private: private:
int void
init_definer(THD *thd); init_definer(THD *thd);
void void
...@@ -303,4 +274,13 @@ private: ...@@ -303,4 +274,13 @@ private:
}; };
/* Compares only the schema part of the identifier */
bool
event_basic_db_equal(LEX_STRING db, Event_basic *et);
/* Compares the whole identifier*/
bool
event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b);
#endif /* _EVENT_DATA_OBJECTS_H_ */ #endif /* _EVENT_DATA_OBJECTS_H_ */
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include "event_db_repository.h" #include "event_db_repository.h"
#include "event_data_objects.h" #include "event_data_objects.h"
#include "sp_head.h"
#include "sp.h"
#include "events.h" #include "events.h"
#include "sql_show.h" #include "sql_show.h"
#include "sp.h"
#include "sp_head.h"
#define EVEX_DB_FIELD_LEN 64 #define EVEX_DB_FIELD_LEN 64
#define EVEX_NAME_FIELD_LEN 64 #define EVEX_NAME_FIELD_LEN 64
...@@ -509,8 +509,8 @@ check_parse_params(THD *thd, Event_parse_data *parse_data) ...@@ -509,8 +509,8 @@ check_parse_params(THD *thd, Event_parse_data *parse_data)
rows_affected [out] How many rows were affected rows_affected [out] How many rows were affected
RETURN VALUE RETURN VALUE
0 - OK 0 OK
EVEX_GENERAL_ERROR - Failure EVEX_GENERAL_ERROR Failure
DESCRIPTION DESCRIPTION
Creates an event. Relies on mysql_event_fill_row which is shared with Creates an event. Relies on mysql_event_fill_row which is shared with
...@@ -545,7 +545,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data, ...@@ -545,7 +545,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
parse_data->name.str)); parse_data->name.str));
DBUG_PRINT("info", ("check existance of an event with the same name")); DBUG_PRINT("info", ("check existance of an event with the same name"));
if (!find_event_by_name(thd, parse_data->dbname, parse_data->name, table)) if (!find_named_event(thd, parse_data->dbname, parse_data->name, table))
{ {
if (create_if_not) if (create_if_not)
{ {
...@@ -623,7 +623,7 @@ ok: ...@@ -623,7 +623,7 @@ ok:
(void) mysql_change_db(thd, old_db.str, 1); (void) mysql_change_db(thd, old_db.str, 1);
if (table) if (table)
close_thread_tables(thd); close_thread_tables(thd);
DBUG_RETURN(EVEX_OK); DBUG_RETURN(0);
err: err:
if (dbchanged) if (dbchanged)
...@@ -652,13 +652,13 @@ err: ...@@ -652,13 +652,13 @@ err:
alter in case of RENAME TO. alter in case of RENAME TO.
*/ */
int bool
Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data, Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
sp_name *new_name) LEX_STRING *new_dbname, LEX_STRING *new_name)
{ {
CHARSET_INFO *scs= system_charset_info; CHARSET_INFO *scs= system_charset_info;
TABLE *table= NULL; TABLE *table= NULL;
int ret= EVEX_OPEN_TABLE_FAILED; int ret;
DBUG_ENTER("Event_db_repository::update_event"); DBUG_ENTER("Event_db_repository::update_event");
if (open_event_table(thd, TL_WRITE, &table)) if (open_event_table(thd, TL_WRITE, &table))
...@@ -673,22 +673,22 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data, ...@@ -673,22 +673,22 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
DBUG_PRINT("info", ("dbname: %s", parse_data->dbname.str)); DBUG_PRINT("info", ("dbname: %s", parse_data->dbname.str));
DBUG_PRINT("info", ("name: %s", parse_data->name.str)); DBUG_PRINT("info", ("name: %s", parse_data->name.str));
DBUG_PRINT("info", ("user: %s", parse_data->definer.str)); DBUG_PRINT("info", ("user: %s", parse_data->definer.str));
if (new_name) if (new_dbname)
DBUG_PRINT("info", ("rename to: %s", new_name->m_name.str)); DBUG_PRINT("info", ("rename to: %s@%s", new_dbname->str, new_name->str));
/* first look whether we overwrite */ /* first look whether we overwrite */
if (new_name) if (new_dbname)
{ {
if (!sortcmp_lex_string(parse_data->name, new_name->m_name, scs) && if (!sortcmp_lex_string(parse_data->name, *new_name, scs) &&
!sortcmp_lex_string(parse_data->dbname, new_name->m_db, scs)) !sortcmp_lex_string(parse_data->dbname, *new_dbname, scs))
{ {
my_error(ER_EVENT_SAME_NAME, MYF(0), parse_data->name.str); my_error(ER_EVENT_SAME_NAME, MYF(0), parse_data->name.str);
goto err; goto err;
} }
if (!find_event_by_name(thd, new_name->m_db, new_name->m_name, table)) if (!find_named_event(thd, *new_dbname, *new_name, table))
{ {
my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->m_name.str); my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->str);
goto err; goto err;
} }
} }
...@@ -698,8 +698,7 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data, ...@@ -698,8 +698,7 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
overwrite the key and SE will tell us that it cannot find the already found overwrite the key and SE will tell us that it cannot find the already found
row (copied into record[1] later row (copied into record[1] later
*/ */
if (EVEX_KEY_NOT_FOUND == find_event_by_name(thd, parse_data->dbname, if (find_named_event(thd, parse_data->dbname, parse_data->name, table))
parse_data->name, table))
{ {
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), parse_data->name.str); my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), parse_data->name.str);
goto err; goto err;
...@@ -714,22 +713,20 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data, ...@@ -714,22 +713,20 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
mysql_event_fill_row() calls my_error() in case of error so no need to mysql_event_fill_row() calls my_error() in case of error so no need to
handle it here handle it here
*/ */
if ((ret= mysql_event_fill_row(thd, table, parse_data, TRUE))) if (mysql_event_fill_row(thd, table, parse_data, TRUE))
goto err; goto err;
if (new_name) if (new_dbname)
{ {
table->field[ET_FIELD_DB]-> table->field[ET_FIELD_DB]->store(new_dbname->str, new_dbname->length, scs);
store(new_name->m_db.str, new_name->m_db.length, scs); table->field[ET_FIELD_NAME]->store(new_name->str, new_name->length, scs);
table->field[ET_FIELD_NAME]->
store(new_name->m_name.str, new_name->m_name.length, scs);
} }
/* Close active transaction only if We are going to modify disk */ /* Close active transaction only if We are going to modify disk */
if (end_active_trans(thd)) if (end_active_trans(thd))
goto err; goto err;
if ((ret= table->file->ha_update_row(table->record[1], table->record[0]))) if (table->file->ha_update_row(table->record[1], table->record[0]))
{ {
my_error(ER_EVENT_STORE_FAILED, MYF(0), parse_data->name.str, ret); my_error(ER_EVENT_STORE_FAILED, MYF(0), parse_data->name.str, ret);
goto err; goto err;
...@@ -737,12 +734,12 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data, ...@@ -737,12 +734,12 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
/* close mysql.event or we crash later when loading the event from disk */ /* close mysql.event or we crash later when loading the event from disk */
close_thread_tables(thd); close_thread_tables(thd);
DBUG_RETURN(0); DBUG_RETURN(FALSE);
err: err:
if (table) if (table)
close_thread_tables(thd); close_thread_tables(thd);
DBUG_RETURN(EVEX_GENERAL_ERROR); DBUG_RETURN(TRUE);
} }
...@@ -759,11 +756,11 @@ err: ...@@ -759,11 +756,11 @@ err:
rows_affected [out] Affected number of rows is returned heres rows_affected [out] Affected number of rows is returned heres
RETURN VALUE RETURN VALUE
0 OK FALSE OK
!0 Error (my_error() called) TRUE Error (reported)
*/ */
int bool
Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name, Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
bool drop_if_exists, uint *rows_affected) bool drop_if_exists, uint *rows_affected)
{ {
...@@ -772,26 +769,24 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name, ...@@ -772,26 +769,24 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
int ret; int ret;
DBUG_ENTER("Event_db_repository::drop_event"); DBUG_ENTER("Event_db_repository::drop_event");
DBUG_PRINT("enter", ("db=%s name=%s", db.str, name.str)); DBUG_PRINT("enter", ("%s@%s", db.str, name.str));
ret= EVEX_OPEN_TABLE_FAILED;
thd->reset_n_backup_open_tables_state(&backup); thd->reset_n_backup_open_tables_state(&backup);
if (open_event_table(thd, TL_WRITE, &table)) if ((ret= open_event_table(thd, TL_WRITE, &table)))
{ {
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0)); my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
goto done; goto done;
} }
switch ((ret= find_event_by_name(thd, db, name, table))) { if (!(ret= find_named_event(thd, db, name, table)))
case 0: {
/* Close active transaction only if we are actually going to modify disk */ /* Close active transaction only if we are actually going to modify disk */
if ((ret= end_active_trans(thd))) if (!(ret= end_active_trans(thd)) &&
break; (ret= table->file->ha_delete_row(table->record[0])))
if ((ret= table->file->ha_delete_row(table->record[0])))
my_error(ER_EVENT_CANNOT_DELETE, MYF(0)); my_error(ER_EVENT_CANNOT_DELETE, MYF(0));
break; }
case EVEX_KEY_NOT_FOUND: else
{
if (drop_if_exists) if (drop_if_exists)
{ {
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
...@@ -800,15 +795,13 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name, ...@@ -800,15 +795,13 @@ Event_db_repository::drop_event(THD *thd, LEX_STRING db, LEX_STRING name,
ret= 0; ret= 0;
} else } else
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str); my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
break;
default:
;
} }
done: done:
if (table) if (table)
close_thread_tables(thd); close_thread_tables(thd);
thd->restore_backup_open_tables_state(&backup); thd->restore_backup_open_tables_state(&backup);
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -818,23 +811,23 @@ done: ...@@ -818,23 +811,23 @@ done:
is stored. is stored.
SYNOPSIS SYNOPSIS
Event_db_repository::find_event_by_name() Event_db_repository::find_named_event()
thd Thread thd Thread
db Schema db Schema
name Event name name Event name
table Opened mysql.event table Opened mysql.event
RETURN VALUE RETURN VALUE
0 OK FALSE OK
EVEX_KEY_NOT_FOUND No such event TRUE No such event
*/ */
int bool
Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db, Event_db_repository::find_named_event(THD *thd, LEX_STRING db, LEX_STRING name,
LEX_STRING name, TABLE *table) TABLE *table)
{ {
byte key[MAX_KEY_LENGTH]; byte key[MAX_KEY_LENGTH];
DBUG_ENTER("Event_db_repository::find_event_by_name"); DBUG_ENTER("Event_db_repository::find_named_event");
DBUG_PRINT("enter", ("name: %.*s", name.length, name.str)); DBUG_PRINT("enter", ("name: %.*s", name.length, name.str));
/* /*
...@@ -846,7 +839,7 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db, ...@@ -846,7 +839,7 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
*/ */
if (db.length > table->field[ET_FIELD_DB]->field_length || if (db.length > table->field[ET_FIELD_DB]->field_length ||
name.length > table->field[ET_FIELD_NAME]->field_length) name.length > table->field[ET_FIELD_NAME]->field_length)
DBUG_RETURN(EVEX_KEY_NOT_FOUND); DBUG_RETURN(TRUE);
table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin); table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin);
table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin); table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin);
...@@ -858,11 +851,11 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db, ...@@ -858,11 +851,11 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
HA_READ_KEY_EXACT)) HA_READ_KEY_EXACT))
{ {
DBUG_PRINT("info", ("Row not found")); DBUG_PRINT("info", ("Row not found"));
DBUG_RETURN(EVEX_KEY_NOT_FOUND); DBUG_RETURN(TRUE);
} }
DBUG_PRINT("info", ("Row found!")); DBUG_PRINT("info", ("Row found!"));
DBUG_RETURN(0); DBUG_RETURN(FALSE);
} }
...@@ -944,67 +937,6 @@ Event_db_repository::drop_events_by_field(THD *thd, ...@@ -944,67 +937,6 @@ Event_db_repository::drop_events_by_field(THD *thd,
} }
/*
Looks for a named event in mysql.event and in case of success returns
an object will data loaded from the table.
SYNOPSIS
Event_db_repository::find_event()
thd [in] THD
name [in] The name of the event to find
ett [out] Event's data if event is found
tbl [in] TABLE object to use when not NULL
NOTES
1) Use sp_name for look up, return in **ett if found
2) tbl is not closed at exit
RETURN VALUE
0 ok In this case *ett is set to the event
# error *ett == 0
*/
int
Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_basic *et)
{
TABLE *table= NULL;
int ret;
DBUG_ENTER("Event_db_repository::find_event");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
if (open_event_table(thd, TL_READ, &table))
{
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
ret= EVEX_GENERAL_ERROR;
goto done;
}
if ((ret= find_event_by_name(thd, dbname, name, table)))
{
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
goto done;
}
/*
1)The table should not be closed beforehand. ::load_from_row() only loads
and does not compile
2)::load_from_row() is silent on error therefore we emit error msg here
*/
if ((ret= et->load_from_row(table)))
{
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
goto done;
}
done:
if (table)
close_thread_tables(thd);
DBUG_RETURN(ret);
}
/* /*
Looks for a named event in mysql.event and then loads it from Looks for a named event in mysql.event and then loads it from
the table, compiles and inserts it into the cache. the table, compiles and inserts it into the cache.
...@@ -1017,14 +949,15 @@ done: ...@@ -1017,14 +949,15 @@ done:
etn_new [out] The loaded event etn_new [out] The loaded event
RETURN VALUE RETURN VALUE
OP_OK OK FALSE OK
OP_LOAD_ERROR Error during loading from disk TRUE Error (reported)
*/ */
int bool
Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname,
LEX_STRING name, Event_basic *etn) LEX_STRING name, Event_basic *etn)
{ {
TABLE *table= NULL;
int ret= 0; int ret= 0;
Open_tables_state backup; Open_tables_state backup;
...@@ -1032,12 +965,19 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, ...@@ -1032,12 +965,19 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname,
DBUG_PRINT("enter",("thd=0x%lx name:%*s",thd, name.length, name.str)); DBUG_PRINT("enter",("thd=0x%lx name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup); thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because find_event() has done it */
ret= find_event(thd, dbname, name, etn); if ((ret= open_event_table(thd, TL_READ, &table)))
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
else if ((ret= find_named_event(thd, dbname, name, table)))
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name.str);
else if ((ret= etn->load_from_row(table)))
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
if (table)
close_thread_tables(thd);
thd->restore_backup_open_tables_state(&backup); thd->restore_backup_open_tables_state(&backup);
/* In this case no memory was allocated so we don't need to clean */ /* In this case no memory was allocated so we don't need to clean */
if (ret)
DBUG_RETURN(OP_LOAD_ERROR);
DBUG_RETURN(OP_OK); DBUG_RETURN(ret);
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define EVEX_OPEN_TABLE_FAILED -1
enum enum_events_table_field enum enum_events_table_field
{ {
ET_FIELD_DB = 0, ET_FIELD_DB = 0,
...@@ -60,24 +62,23 @@ public: ...@@ -60,24 +62,23 @@ public:
create_event(THD *thd, Event_parse_data *parse_data, my_bool create_if_not, create_event(THD *thd, Event_parse_data *parse_data, my_bool create_if_not,
uint *rows_affected); uint *rows_affected);
int bool
update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name); update_event(THD *thd, Event_parse_data *parse_data, LEX_STRING *new_dbname,
LEX_STRING *new_name);
int bool
drop_event(THD *thd, LEX_STRING db, LEX_STRING name, bool drop_if_exists, drop_event(THD *thd, LEX_STRING db, LEX_STRING name, bool drop_if_exists,
uint *rows_affected); uint *rows_affected);
int int
drop_schema_events(THD *thd, LEX_STRING schema); drop_schema_events(THD *thd, LEX_STRING schema);
int bool
find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et); find_named_event(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
int bool
load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et); load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et);
int
find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
int int
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
......
...@@ -15,13 +15,14 @@ ...@@ -15,13 +15,14 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h" #include "mysql_priv.h"
#include "events.h"
#include "event_scheduler_ng.h"
#include "event_queue.h" #include "event_queue.h"
#include "event_data_objects.h" #include "event_data_objects.h"
#include "event_db_repository.h" #include "event_db_repository.h"
#include "sp_head.h" #include "event_scheduler_ng.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
#define EVENT_QUEUE_EXTENT 30
#ifdef __GNUC__ #ifdef __GNUC__
#if __GNUC__ >= 2 #if __GNUC__ >= 2
...@@ -36,21 +37,20 @@ ...@@ -36,21 +37,20 @@
/* /*
Compares the execute_at members of 2 Event_queue_element instances. Compares the execute_at members of two Event_queue_element instances.
Used as callback for the prioritized queue when shifting Used as callback for the prioritized queue when shifting
elements inside. elements inside.
SYNOPSIS SYNOPSIS
event_queue_element_data_compare_q() event_queue_element_data_compare_q()
vptr Not used (set it to NULL)
vptr - not used (set it to NULL) a First Event_queue_element object
a - first Event_queue_element object b Second Event_queue_element object
b - second Event_queue_element object
RETURN VALUE RETURN VALUE
-1 - a->execute_at < b->execute_at -1 a->execute_at < b->execute_at
0 - a->execute_at == b->execute_at 0 a->execute_at == b->execute_at
1 - a->execute_at > b->execute_at 1 a->execute_at > b->execute_at
NOTES NOTES
execute_at.second_part is not considered during comparison execute_at.second_part is not considered during comparison
...@@ -73,9 +73,13 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b) ...@@ -73,9 +73,13 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
Event_queue::Event_queue() Event_queue::Event_queue()
{ {
mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0; mutex_last_unlocked_at_line= mutex_last_locked_at_line=
mutex_last_unlocked_in_func= mutex_last_locked_in_func= ""; mutex_last_attempted_lock_at_line= 0;
mutex_queue_data_locked= FALSE;
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
} }
...@@ -107,24 +111,6 @@ Event_queue::deinit_mutexes() ...@@ -107,24 +111,6 @@ Event_queue::deinit_mutexes()
} }
/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
Event_queue::notify_observers()
*/
void
Event_queue::notify_observers()
{
DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
}
/* /*
Inits the queue Inits the queue
...@@ -148,8 +134,9 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) ...@@ -148,8 +134,9 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
db_repository= db_repo; db_repository= db_repo;
scheduler= sched; scheduler= sched;
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
event_queue_element_compare_q, NULL, 30 /*auto_extent*/)) 0 /*smallest_on_top*/, event_queue_element_compare_q,
NULL, EVENT_QUEUE_EXTENT))
{ {
sql_print_error("SCHEDULER: Can't initialize the execution queue"); sql_print_error("SCHEDULER: Can't initialize the execution queue");
ret= TRUE; ret= TRUE;
...@@ -172,7 +159,8 @@ end: ...@@ -172,7 +159,8 @@ end:
/* /*
Deinits the queue Deinits the queue. Remove all elements from it and destroys them
too.
SYNOPSIS SYNOPSIS
Event_queue::deinit_queue() Event_queue::deinit_queue()
...@@ -193,12 +181,12 @@ Event_queue::deinit_queue() ...@@ -193,12 +181,12 @@ Event_queue::deinit_queue()
/* /*
Creates an event in the scheduler queue Adds an event to the queue.
SYNOPSIS SYNOPSIS
Event_queue::create_event() Event_queue::create_event()
et The event to add dbname The schema of the new event
check_existence Whether to check if already loaded. name The name of the new event
RETURN VALUE RETURN VALUE
OP_OK OK or scheduler not working OP_OK OK or scheduler not working
...@@ -209,21 +197,21 @@ int ...@@ -209,21 +197,21 @@ int
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{ {
int res; int res;
Event_queue_element *element_new; Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event"); DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str)); DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
element_new= new Event_queue_element(); new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, dbname, name, element_new); res= db_repository->load_named_event(thd, dbname, name, new_element);
if (res || element_new->status == Event_queue_element::DISABLED) if (res || new_element->status == Event_queue_element::DISABLED)
delete element_new; delete new_element;
else else
{ {
element_new->compute_next_execution_time(); new_element->compute_next_execution_time();
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new)); DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) element_new); queue_insert_safe(&queue, (byte *) new_element);
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
notify_observers(); notify_observers();
...@@ -254,53 +242,54 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, ...@@ -254,53 +242,54 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name) LEX_STRING *new_schema, LEX_STRING *new_name)
{ {
int res; int res;
Event_queue_element *element_old= NULL, Event_queue_element *old_element= NULL,
*element_new; *new_element;
DBUG_ENTER("Event_queue::update_event"); DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str)); DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
element_new= new Event_queue_element(); new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname, res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
new_name? *new_name:name, element_new); new_name? *new_name:name, new_element);
if (res) if (res)
{ {
delete element_new; delete new_element;
goto end; goto end;
} }
else if (element_new->status == Event_queue_element::DISABLED) else if (new_element->status == Event_queue_element::DISABLED)
{ {
DBUG_PRINT("info", ("The event is disabled.")); DBUG_PRINT("info", ("The event is disabled."));
/* /*
Destroy the object but don't skip to end: because we may have to remove Destroy the object but don't skip to end: because we may have to remove
object from the cache. object from the cache.
*/ */
delete element_new; delete new_element;
element_new= NULL; new_element= NULL;
} }
else else
element_new->compute_next_execution_time(); new_element->compute_next_execution_time();
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
if (!(element_old= find_event(dbname, name, TRUE))) if (!(old_element= find_n_remove_event(dbname, name)))
{ {
DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED", DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
dbname.str, name.str)); dbname.str, name.str));
} }
/* If not disabled event */ /* If not disabled event */
if (element_new) if (new_element)
{ {
DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx", DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
element_new, element_old)); new_element, old_element));
queue_insert_safe(&queue, (byte *) element_new); queue_insert_safe(&queue, (byte *) new_element);
} }
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
notify_observers(); if (new_element)
notify_observers();
if (element_old) if (old_element)
delete element_old; delete old_element;
end: end:
DBUG_PRINT("info", ("res=%d", res)); DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res); DBUG_RETURN(res);
...@@ -326,7 +315,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) ...@@ -326,7 +315,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name)); DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
element= find_event(dbname, name, TRUE); element= find_n_remove_event(dbname, name);
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
if (element) if (element)
...@@ -343,48 +332,6 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) ...@@ -343,48 +332,6 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
} }
/*
Searches for an event in the queue
SYNOPSIS
Event_queue::find_event()
db The schema of the event to find
name The event to find
remove_from_q If found whether to remove from the Q
RETURN VALUE
NULL Not found
otherwise Address
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue
(signalling COND_new_work for instance).
*/
Event_queue_element *
Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
{
uint i;
DBUG_ENTER("Event_queue::find_event");
for (i= 0; i < queue.elements; ++i)
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
et->dbname.str, et->name.str));
if (event_basic_identifier_equal(db, name, et))
{
if (remove_from_q)
queue_remove(&queue, i);
DBUG_RETURN(et);
}
}
DBUG_RETURN(NULL);
}
/* /*
Drops all events from the in-memory queue and disk that match Drops all events from the in-memory queue and disk that match
certain pattern evaluated by a comparator function certain pattern evaluated by a comparator function
...@@ -404,7 +351,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) ...@@ -404,7 +351,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
void void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*comparator)(LEX_STRING *, Event_basic *)) bool (*comparator)(LEX_STRING, Event_basic *))
{ {
DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
...@@ -414,7 +361,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, ...@@ -414,7 +361,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
{ {
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
if (comparator(&pattern, et)) if (comparator(pattern, et))
{ {
/* /*
The queue is ordered. If we remove an element, then all elements after The queue is ordered. If we remove an element, then all elements after
...@@ -468,25 +415,59 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) ...@@ -468,25 +415,59 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
/* /*
Returns the number of elements in the queue Signals the observers (the main scheduler thread) that the
state of the queue has been changed.
SYNOPSIS SYNOPSIS
Event_queue::events_count() Event_queue::notify_observers()
*/
void
Event_queue::notify_observers()
{
DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
}
/*
Searches for an event in the queue
SYNOPSIS
Event_queue::find_n_remove_event()
db The schema of the event to find
name The event to find
RETURN VALUE RETURN VALUE
Number of Event_queue_element objects in the queue NULL Not found
otherwise Address
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue.
*/ */
uint Event_queue_element *
Event_queue::events_count() Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
{ {
uint n; uint i;
DBUG_ENTER("Event_scheduler::events_count"); DBUG_ENTER("Event_queue::find_n_remove_event");
LOCK_QUEUE_DATA();
n= queue.elements; for (i= 0; i < queue.elements; ++i)
UNLOCK_QUEUE_DATA(); {
DBUG_PRINT("info", ("n=%u", n)); Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_RETURN(n); DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
et->dbname.str, et->name.str));
if (event_basic_identifier_equal(db, name, et))
{
queue_remove(&queue, i);
DBUG_RETURN(et);
}
}
DBUG_RETURN(NULL);
} }
...@@ -620,6 +601,11 @@ end: ...@@ -620,6 +601,11 @@ end:
SYNOPSIS SYNOPSIS
Event_queue::check_system_tables() Event_queue::check_system_tables()
thd Thread
RETURN VALUE
FALSE OK
TRUE Error
*/ */
bool bool
...@@ -738,6 +724,14 @@ Event_queue::empty_queue() ...@@ -738,6 +724,14 @@ Event_queue::empty_queue()
} }
/*
Dumps the queue to the trace log.
SYNOPSIS
Event_queue::dbug_dump_queue()
now Current timestamp
*/
inline void inline void
Event_queue::dbug_dump_queue(time_t now) Event_queue::dbug_dump_queue(time_t now)
{ {
...@@ -761,12 +755,37 @@ Event_queue::dbug_dump_queue(time_t now) ...@@ -761,12 +755,37 @@ Event_queue::dbug_dump_queue(time_t now)
#endif #endif
} }
Event_job_data *
/*
Checks whether the top of the queue is elligible for execution and
returns an Event_job_data instance in case it should be executed.
`now` is compared against `execute_at` of the top element in the queue.
SYNOPSIS
Event_queue::dbug_dump_queue()
thd [in] Thread
now [in] Current timestamp
job_data [out] The object to execute
abstime [out] Time to sleep
RETURN VALUE
FALSE No error. If *job_data==NULL then top not elligible for execution.
Could be that there is no top. If abstime->tv_sec is set to value
greater than zero then use abstime with pthread_cond_timedwait().
If abstime->tv_sec is zero then sleep with pthread_cond_wait().
abstime->tv_nsec is always zero.
TRUE Error
*/
bool
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
Event_job_data **job_data,
struct timespec *abstime) struct timespec *abstime)
{ {
bool ret= FALSE;
struct timespec top_time; struct timespec top_time;
Event_job_data *et_new= NULL; *job_data= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
abstime->tv_nsec= 0; abstime->tv_nsec= 0;
...@@ -780,56 +799,58 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, ...@@ -780,56 +799,58 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
} }
dbug_dump_queue(now); dbug_dump_queue(now);
Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0)); Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
if (top_time.tv_sec <= now) top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
{
DBUG_PRINT("info", ("Ready for execution")); if (top_time.tv_sec > now)
abstime->tv_sec= 0;
et_new= new Event_job_data();
if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
et_new)))
{
delete et_new;
et_new= NULL;
DBUG_ASSERT(0);
break;
}
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
et->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event's status is %d", et->status));
et->update_timing_fields(thd);
if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
(et->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (et->dropped)
et->drop(thd);
delete et;
queue_remove(&queue, 0);
}
else
queue_replaced(&queue);
}
else
{ {
abstime->tv_sec= top_time.tv_sec; abstime->tv_sec= top_time.tv_sec;
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
abstime->tv_sec)); abstime->tv_sec));
break;
} }
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
*job_data= new Event_job_data();
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
*job_data)))
{
delete *job_data;
*job_data= NULL;
ret= TRUE;
break;
}
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event's status is %d", top->status));
top->update_timing_fields(thd);
if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
(top->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (top->dropped)
top->drop(thd);
delete top;
queue_remove(&queue, 0);
}
else
queue_replaced(&queue);
} while (0); } while (0);
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new, DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
abstime->tv_sec)); ret, *job_data, abstime->tv_sec));
if (et_new)
DBUG_PRINT("info", ("db=%s name=%s definer=%s", if (*job_data)
et_new->dbname.str, et_new->name.str, et_new->definer.str)); DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
DBUG_RETURN(et_new); (*job_data)->name.str, (*job_data)->definer.str));
DBUG_RETURN(ret);
} }
...@@ -848,10 +869,18 @@ Event_queue::lock_data(const char *func, uint line) ...@@ -848,10 +869,18 @@ Event_queue::lock_data(const char *func, uint line)
{ {
DBUG_ENTER("Event_queue::lock_data"); DBUG_ENTER("Event_queue::lock_data");
DBUG_PRINT("enter", ("func=%s line=%u", func, line)); DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_attempted_lock_in_func= func;
mutex_last_attempted_lock_at_line= line;
mutex_queue_data_attempting_lock= TRUE;
pthread_mutex_lock(&LOCK_event_queue); pthread_mutex_lock(&LOCK_event_queue);
mutex_last_attempted_lock_in_func= "";
mutex_last_attempted_lock_at_line= 0;
mutex_queue_data_attempting_lock= FALSE;
mutex_last_locked_in_func= func; mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line; mutex_last_locked_at_line= line;
mutex_queue_data_locked= TRUE; mutex_queue_data_locked= TRUE;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -921,6 +950,13 @@ Event_queue::dump_internal_status(THD *thd) ...@@ -921,6 +950,13 @@ Event_queue::dump_internal_status(THD *thd)
protocol->store(&int_string); protocol->store(&int_string);
ret= protocol->write(); ret= protocol->write();
/* queue_data_attempting_lock */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue data attempting lock"), scs);
int_string.set((longlong) mutex_queue_data_attempting_lock, scs);
protocol->store(&int_string);
ret= protocol->write();
/* last locked at*/ /* last locked at*/
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue last locked at"), scs); protocol->store(STRING_WITH_LEN("queue last locked at"), scs);
...@@ -940,6 +976,17 @@ Event_queue::dump_internal_status(THD *thd) ...@@ -940,6 +976,17 @@ Event_queue::dump_internal_status(THD *thd)
mutex_last_unlocked_at_line)); mutex_last_unlocked_at_line));
protocol->store(&tmp_string); protocol->store(&tmp_string);
ret= protocol->write(); ret= protocol->write();
/* last attempted lock at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue last attempted lock at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
mutex_last_attempted_lock_in_func,
mutex_last_attempted_lock_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
#endif #endif
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class sp_name;
class Event_basic; class Event_basic;
class Event_db_repository; class Event_db_repository;
class Event_job_data; class Event_job_data;
...@@ -57,31 +56,28 @@ public: ...@@ -57,31 +56,28 @@ public:
void void
drop_schema_events(THD *thd, LEX_STRING schema); drop_schema_events(THD *thd, LEX_STRING schema);
uint
events_count();
static bool static bool
check_system_tables(THD *thd); check_system_tables(THD *thd);
void void
recalculate_activation_times(THD *thd); recalculate_activation_times(THD *thd);
Event_job_data * bool
get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime); get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data,
struct timespec *abstime);
bool bool
dump_internal_status(THD *thd); dump_internal_status(THD *thd);
protected: protected:
Event_queue_element * Event_queue_element *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); find_n_remove_event(LEX_STRING db, LEX_STRING name);
int int
load_events_from_db(THD *thd); load_events_from_db(THD *thd);
void void
drop_matching_events(THD *thd, LEX_STRING pattern, drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING *, Event_basic *)); bool (*)(LEX_STRING, Event_basic *));
void void
empty_queue(); empty_queue();
...@@ -93,9 +89,12 @@ protected: ...@@ -93,9 +89,12 @@ protected:
uint mutex_last_locked_at_line; uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line; uint mutex_last_unlocked_at_line;
uint mutex_last_attempted_lock_at_line;
const char* mutex_last_locked_in_func; const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func; const char* mutex_last_unlocked_in_func;
const char* mutex_last_attempted_lock_in_func;
bool mutex_queue_data_locked; bool mutex_queue_data_locked;
bool mutex_queue_data_attempting_lock;
/* helper functions for working with mutexes & conditionals */ /* helper functions for working with mutexes & conditionals */
void void
......
...@@ -176,6 +176,7 @@ deinit_event_thread(THD *thd) ...@@ -176,6 +176,7 @@ deinit_event_thread(THD *thd)
my_thread_end(); my_thread_end();
} }
/* /*
Function that executes the scheduler, Function that executes the scheduler,
...@@ -271,7 +272,7 @@ event_worker_ng_thread(void *arg) ...@@ -271,7 +272,7 @@ event_worker_ng_thread(void *arg)
thd->enable_slow_log= TRUE; thd->enable_slow_log= TRUE;
ret= event->execute(thd, thd->mem_root); ret= event->execute(thd);
evex_print_warnings(thd, event); evex_print_warnings(thd, event);
...@@ -506,8 +507,13 @@ Event_scheduler_ng::run(THD *thd) ...@@ -506,8 +507,13 @@ Event_scheduler_ng::run(THD *thd)
{ {
thd->end_time(); thd->end_time();
/* Gets a minimized version */ /* Gets a minimized version */
job_data= queue-> if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
get_top_for_execution_if_time(thd, thd->query_start(), &abstime); &job_data, &abstime))
{
sql_print_information("SCHEDULER: Serious error during getting next"
" event to execute. Stopping.");
break;
}
DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
"abs_time.tv_sec=%d", "abs_time.tv_sec=%d",
......
...@@ -41,10 +41,6 @@ ...@@ -41,10 +41,6 @@
- Add logging to file - Add logging to file
Warning:
- For now parallel execution is not possible because the same sp_head cannot
be executed few times!!! There is still no lock attached to particular
event.
*/ */
...@@ -84,18 +80,14 @@ ulong Events::opt_event_scheduler= 2; ...@@ -84,18 +80,14 @@ ulong Events::opt_event_scheduler= 2;
SYNOPSIS SYNOPSIS
sortcmp_lex_string() sortcmp_lex_string()
s First LEX_STRING
s - first LEX_STRING t Second LEX_STRING
t - second LEX_STRING cs Charset
cs - charset
RETURN VALUE RETURN VALUE
-1 - s < t -1 s < t
0 - s == t 0 s == t
1 - s > t 1 s > t
Notes
TIME.second_part is not considered during comparison
*/ */
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
...@@ -104,6 +96,7 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) ...@@ -104,6 +96,7 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
(uchar *) t.str,t.length, 0); (uchar *) t.str,t.length, 0);
} }
/* /*
Accessor for the singleton instance. Accessor for the singleton instance.
...@@ -131,13 +124,13 @@ Events::get_instance() ...@@ -131,13 +124,13 @@ Events::get_instance()
SYNOPSIS SYNOPSIS
Events::reconstruct_interval_expression() Events::reconstruct_interval_expression()
buf - preallocated String buffer to add the value to buf Preallocated String buffer to add the value to
interval - the interval type (for instance YEAR_MONTH) interval The interval type (for instance YEAR_MONTH)
expression - the value in the lowest entity expression The value in the lowest entity
RETURN VALUE RETURN VALUE
0 - OK 0 OK
1 - Error 1 Error
*/ */
int int
...@@ -256,7 +249,7 @@ common_1_lev_code: ...@@ -256,7 +249,7 @@ common_1_lev_code:
/* /*
Open mysql.event table for read Opens mysql.event table with specified lock
SYNOPSIS SYNOPSIS
Events::open_event_table() Events::open_event_table()
...@@ -283,11 +276,10 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type, ...@@ -283,11 +276,10 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
SYNOPSIS SYNOPSIS
Events::create_event() Events::create_event()
thd THD thd [in] THD
et event's data et [in] Event's data from parsing stage
create_options Options specified when in the query. We are if_not_exists [in] Whether IF NOT EXISTS was specified in the DDL
interested whether there is IF NOT EXISTS rows_affected [out] How many rows were affected
rows_affected How many rows were affected
RETURN VALUE RETURN VALUE
0 OK 0 OK
...@@ -328,9 +320,10 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists, ...@@ -328,9 +320,10 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
SYNOPSIS SYNOPSIS
Events::update_event() Events::update_event()
thd THD thd [in] THD
et Event's data from parsing stage et [in] Event's data from parsing stage
new_name Set in case of RENAME TO. rename_to [in] Set in case of RENAME TO.
rows_affected [out] How many rows were affected.
RETURN VALUE RETURN VALUE
0 OK 0 OK
...@@ -338,26 +331,25 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists, ...@@ -338,26 +331,25 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
NOTES NOTES
et contains data about dbname and event name. et contains data about dbname and event name.
new_name is the new name of the event, if not null (this means new_name is the new name of the event, if not null this means
that RENAME TO was specified in the query) that RENAME TO was specified in the query
*/ */
int int
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected) uint *rows_affected)
{ {
int ret; int ret;
DBUG_ENTER("Events::update_event"); DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to? &rename_to->m_db: NULL;
LEX_STRING *new_name= rename_to? &rename_to->m_name: NULL;
pthread_mutex_lock(&LOCK_event_metadata); pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */ /* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_name))) if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{ {
if ((ret= event_queue->update_event(thd, if ((ret= event_queue->update_event(thd, parse_data->dbname,
parse_data->dbname, parse_data->name, new_dbname, new_name)))
parse_data->name,
new_name? &new_name->m_db: NULL,
new_name? &new_name->m_name: NULL)))
{ {
DBUG_ASSERT(ret == OP_LOAD_ERROR); DBUG_ASSERT(ret == OP_LOAD_ERROR);
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0)); my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
...@@ -374,16 +366,16 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, ...@@ -374,16 +366,16 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
SYNOPSIS SYNOPSIS
Events::drop_event() Events::drop_event()
thd THD thd [in] THD
dbname Event's schema dbname [in] Event's schema
name Event's name name [in] Event's name
if_exists When set and the event does not exist => warning onto if_exists [in] When set and the event does not exist =>
the stack warning onto the stack
rows_affected Affected number of rows is returned heres rows_affected [out] Affected number of rows is returned here
only_from_disk Whether to remove the event from the queue too. In case only_from_disk [in] Whether to remove the event from the queue too.
of Event_job_data::drop() it's needed to do only disk In case of Event_job_data::drop() it's needed to
drop because Event_queue will handle removal from memory do only disk drop because Event_queue will handle
queue. removal from memory queue.
RETURN VALUE RETURN VALUE
0 OK 0 OK
...@@ -429,7 +421,7 @@ Events::drop_schema_events(THD *thd, char *db) ...@@ -429,7 +421,7 @@ Events::drop_schema_events(THD *thd, char *db)
int ret= 0; int ret= 0;
LEX_STRING db_lex= {db, strlen(db)}; LEX_STRING db_lex= {db, strlen(db)};
DBUG_ENTER("evex_drop_db_events"); DBUG_ENTER("Events::drop_schema_events");
DBUG_PRINT("enter", ("dropping events from %s", db)); DBUG_PRINT("enter", ("dropping events from %s", db));
pthread_mutex_lock(&LOCK_event_metadata); pthread_mutex_lock(&LOCK_event_metadata);
...@@ -455,24 +447,22 @@ Events::drop_schema_events(THD *thd, char *db) ...@@ -455,24 +447,22 @@ Events::drop_schema_events(THD *thd, char *db)
*/ */
int int
Events::show_create_event(THD *thd, sp_name *spn) Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{ {
CHARSET_INFO *scs= system_charset_info;
int ret; int ret;
Event_timed *et= new Event_timed(); Event_timed *et= new Event_timed();
Open_tables_state backup;
DBUG_ENTER("Events::show_create_event"); DBUG_ENTER("Events::show_create_event");
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str)); DBUG_PRINT("enter", ("name: %s@%s", dbname.str, name.str));
thd->reset_n_backup_open_tables_state(&backup); ret= db_repository->load_named_event(thd, dbname, name, et);
ret= db_repository->find_event(thd, spn->m_db, spn->m_name, et);
thd->restore_backup_open_tables_state(&backup);
if (!ret) if (!ret)
{ {
Protocol *protocol= thd->protocol; Protocol *protocol= thd->protocol;
char show_str_buf[768]; char show_str_buf[10 * STRING_BUFFER_USUAL_SIZE];
String show_str(show_str_buf, sizeof(show_str_buf), system_charset_info); String show_str(show_str_buf, sizeof(show_str_buf), scs);
List<Item> field_list; List<Item> field_list;
byte *sql_mode_str; byte *sql_mode_str;
ulong sql_mode_len=0; ulong sql_mode_len=0;
...@@ -491,18 +481,19 @@ Events::show_create_event(THD *thd, sp_name *spn) ...@@ -491,18 +481,19 @@ Events::show_create_event(THD *thd, sp_name *spn)
field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len)); field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len));
field_list.push_back(new Item_empty_string("Create Event", field_list.
show_str.length())); push_back(new Item_empty_string("Create Event", show_str.length()));
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF)) Protocol::SEND_EOF))
goto err; goto err;
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(et->name.str, et->name.length, system_charset_info); protocol->store(et->name.str, et->name.length, scs);
protocol->store((char*) sql_mode_str, sql_mode_len, system_charset_info); protocol->store((char*) sql_mode_str, sql_mode_len, scs);
protocol->store(show_str.c_ptr(), show_str.length(), system_charset_info); protocol->store(show_str.c_ptr(), show_str.length(), scs);
ret= protocol->write(); ret= protocol->write();
send_eof(thd); send_eof(thd);
} }
...@@ -546,7 +537,8 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */) ...@@ -546,7 +537,8 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
DBUG_RETURN(1); DBUG_RETURN(1);
db= thd->lex->select_lex.db; db= thd->lex->select_lex.db;
} }
DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db)); DBUG_RETURN(get_instance()->db_repository->
fill_schema_events(thd, tables, db));
} }
...@@ -561,14 +553,12 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */) ...@@ -561,14 +553,12 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
RETURN VALUE RETURN VALUE
0 OK 0 OK
1 Error 1 Error in case the scheduler can't start
*/ */
int int
Events::init() Events::init()
{ {
int ret= 0;
Event_db_repository *db_repo;
DBUG_ENTER("Events::init"); DBUG_ENTER("Events::init");
event_queue->init_queue(db_repository, scheduler_ng); event_queue->init_queue(db_repository, scheduler_ng);
scheduler_ng->init_scheduler(event_queue); scheduler_ng->init_scheduler(event_queue);
...@@ -653,7 +643,10 @@ Events::destroy_mutexes() ...@@ -653,7 +643,10 @@ Events::destroy_mutexes()
/* /*
Proxy for Event_scheduler::dump_internal_status Dumps the internal status of the scheduler and the memory cache
into a table with two columns - Name & Value. Different properties
which could be useful for debugging for instance deadlocks are
returned.
SYNOPSIS SYNOPSIS
Events::dump_internal_status() Events::dump_internal_status()
...@@ -733,8 +726,8 @@ Events::stop_execution_of_events() ...@@ -733,8 +726,8 @@ Events::stop_execution_of_events()
Events::is_started() Events::is_started()
RETURN VALUE RETURN VALUE
TRUE Yes TRUE Yes
FALSE No FALSE No
*/ */
bool bool
......
...@@ -81,7 +81,7 @@ public: ...@@ -81,7 +81,7 @@ public:
uint *rows_affected); uint *rows_affected);
int int
update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected); uint *rows_affected);
int int
...@@ -95,7 +95,7 @@ public: ...@@ -95,7 +95,7 @@ public:
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
int int
show_create_event(THD *thd, sp_name *spn); show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
/* Needed for both SHOW CREATE EVENT and INFORMATION_SCHEMA */ /* Needed for both SHOW CREATE EVENT and INFORMATION_SCHEMA */
static int static int
......
...@@ -3926,7 +3926,8 @@ end_with_restore_list: ...@@ -3926,7 +3926,8 @@ end_with_restore_list:
} }
if (lex->sql_command == SQLCOM_SHOW_CREATE_EVENT) if (lex->sql_command == SQLCOM_SHOW_CREATE_EVENT)
res= Events::get_instance()->show_create_event(thd, lex->spname); res= Events::get_instance()->show_create_event(thd, lex->spname->m_db,
lex->spname->m_name);
else else
{ {
uint affected= 1; uint affected= 1;
......
...@@ -1447,7 +1447,8 @@ ev_sql_stmt: ...@@ -1447,7 +1447,8 @@ ev_sql_stmt:
LEX *lex=Lex; LEX *lex=Lex;
// return back to the original memory root ASAP // return back to the original memory root ASAP
lex->sphead->init_strings(YYTHD, lex, NULL); lex->sphead->init_strings(YYTHD, lex,
Lex->event_parse_data->identifier);
lex->sphead->restore_thd_mem_root(YYTHD); lex->sphead->restore_thd_mem_root(YYTHD);
lex->sp_chistics.suid= SP_IS_SUID;//always the definer! lex->sp_chistics.suid= SP_IS_SUID;//always the definer!
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment