diff --git a/mysql-test/r/events_stress.result b/mysql-test/r/events_stress.result new file mode 100644 index 0000000000000000000000000000000000000000..9f95cfad75d8d21849469358f116e930b1c95f3b --- /dev/null +++ b/mysql-test/r/events_stress.result @@ -0,0 +1,46 @@ +CREATE DATABASE IF NOT EXISTS events_test; +CREATE DATABASE events_test2; +USE events_test2; +CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +USE events_test; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +3 +DROP DATABASE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +0 +"Now testing stability - dropping db -> events while they are running" +CREATE DATABASE events_test2; +USE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +1000 +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +0 +CREATE DATABASE events_test3; +USE events_test3; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; +COUNT(*) +950 +CREATE DATABASE events_test4; +USE events_test4; +CREATE DATABASE events_test2; +USE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +1050 +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +DROP DATABASE events_test3; +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test4; +SET GLOBAL event_scheduler=1; +USE events_test; +DROP DATABASE events_test; diff --git a/mysql-test/t/events.test b/mysql-test/t/events.test index 947d8e7576b3dbc6d07746965aa573ab845421da..2755d4ec02e70f140bdce25afc5c2c1e8e42b49c 100644 --- a/mysql-test/t/events.test +++ b/mysql-test/t/events.test @@ -274,7 +274,6 @@ drop event one_event; --echo "Sleep a bit so the server closes the second connection" --sleep 2 - create event e_26 on schedule at '2017-01-01 00:00:00' disable do set @a = 5; select db, name, body, definer, convert_tz(execute_at, 'UTC', 'SYSTEM'), on_completion from mysql.event; drop event e_26; diff --git a/mysql-test/t/events_stress.test b/mysql-test/t/events_stress.test new file mode 100644 index 0000000000000000000000000000000000000000..f6eed79425c848d106ff897454751602c443b2c5 --- /dev/null +++ b/mysql-test/t/events_stress.test @@ -0,0 +1,80 @@ +CREATE DATABASE IF NOT EXISTS events_test; +# +# DROP DATABASE test start (bug #16406) +# +CREATE DATABASE events_test2; +USE events_test2; +CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +USE events_test; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +DROP DATABASE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; + +--echo "Now testing stability - dropping db -> events while they are running" +CREATE DATABASE events_test2; +USE events_test2; +--disable_query_log +let $1= 1000; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SET GLOBAL event_scheduler=1; +--sleep 4 +DROP DATABASE events_test2; + +SET GLOBAL event_scheduler=0; +--sleep 2 +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +CREATE DATABASE events_test3; +USE events_test3; +--disable_query_log +let $1= 950; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; +--sleep 3 +CREATE DATABASE events_test4; +USE events_test4; +--disable_query_log +let $1= 860; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log + + +CREATE DATABASE events_test2; +USE events_test2; +--disable_query_log +let $1= 1050; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +--sleep 6 +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +DROP DATABASE events_test3; +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test4; +SET GLOBAL event_scheduler=1; +USE events_test; +# +# DROP DATABASE test end (bug #16406) +# +DROP DATABASE events_test; diff --git a/sql/event.cc b/sql/event.cc index c0a043aaf09641e93a0be296bcfc4bd0b394ccb3..76d802b9ac01e969a225b7ff0b79c8369f40dc8c 100644 --- a/sql/event.cc +++ b/sql/event.cc @@ -514,6 +514,28 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) SYNOPSIS evex_db_find_event_aux() + thd Thread context + et evet_timed object containing dbname, name & definer + table TABLE object for open mysql.event table. + + RETURN VALUE + 0 - Routine found + EVEX_KEY_NOT_FOUND - No routine with given name +*/ + +inline int +evex_db_find_event_aux(THD *thd, event_timed *et, TABLE *table) +{ + return evex_db_find_event_by_name(thd, et->dbname, et->name, + et->definer, table); +} + + +/* + Find row in open mysql.event table representing event + + SYNOPSIS + evex_db_find_event_by_name() thd Thread context dbname Name of event's database rname Name of the event inside the db @@ -525,13 +547,13 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) */ int -evex_db_find_event_aux(THD *thd, const LEX_STRING dbname, - const LEX_STRING ev_name, - const LEX_STRING user_name, - TABLE *table) +evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, + const LEX_STRING ev_name, + const LEX_STRING user_name, + TABLE *table) { byte key[MAX_KEY_LENGTH]; - DBUG_ENTER("evex_db_find_event_aux"); + DBUG_ENTER("evex_db_find_event_by_name"); DBUG_PRINT("enter", ("name: %.*s", ev_name.length, ev_name.str)); /* @@ -710,7 +732,7 @@ db_create_event(THD *thd, event_timed *et, my_bool create_if_not, } DBUG_PRINT("info", ("check existance of an event with the same name")); - if (!evex_db_find_event_aux(thd, et->dbname, et->name, et->definer, table)) + if (!evex_db_find_event_aux(thd, et, table)) { if (create_if_not) { @@ -848,7 +870,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name) goto err; } - if (!evex_db_find_event_aux(thd, new_name->m_db, new_name->m_name, + if (!evex_db_find_event_by_name(thd, new_name->m_db, new_name->m_name, et->definer, table)) { my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->m_name.str); @@ -861,8 +883,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name) overwrite the key and SE will tell us that it cannot find the already found row (copied into record[1] later */ - if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et->dbname, et->name, - et->definer, table)) + if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et, table)) { my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), et->name.str); goto err; @@ -943,8 +964,8 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, event_timed **ett, goto done; } - if ((ret= evex_db_find_event_aux(thd, name->m_db, name->m_name, *definer, - table))) + if ((ret= evex_db_find_event_by_name(thd, name->m_db, name->m_name, *definer, + table))) { my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name->m_name.str); goto done; @@ -1089,7 +1110,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock, if (!sortcmp_lex_string(*name, et->name, system_charset_info) && !sortcmp_lex_string(*db, et->dbname, system_charset_info)) { - if (!et->is_running()) + if (et->can_spawn_now()) { DBUG_PRINT("evex_remove_from_cache", ("not running - free and delete")); et->free_sp(); @@ -1393,3 +1414,187 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) DBUG_RETURN(ret); } + + +/* + evex_drop_db_events - Drops all events in the selected database + + thd - Thread + db - ASCIIZ the name of the database + + Returns: + 0 - OK + 1 - Failed to delete a specific row + 2 - Got NULL while reading db name from a row + + Note: + The algo is the following + 1. Go through the in-memory cache, if the scheduler is working + and for every event whose dbname matches the database we drop + check whether is currently in execution: + - event_timed::can_spawn() returns true -> the event is not + being executed in a child thread. The reason not to use + event_timed::is_running() is that the latter shows only if + it is being executed, which is 99% of the time in the thread + but there are some initiliazations before and after the + anonymous SP is being called. So if we delete in this moment + -=> *boom*, so we have to check whether the thread has been + spawned and can_spawn() is the right method. + - event_timed::can_spawn() returns false -> being runned ATM + just set the flags so it should drop itself. + +*/ + +int +evex_drop_db_events(THD *thd, char *db) +{ + TABLE *table; + READ_RECORD read_record_info; + MYSQL_LOCK *lock; + int ret= 0; + int i; + LEX_STRING db_lex= {db, strlen(db)}; + + DBUG_ENTER("evex_drop_db_events"); + DBUG_PRINT("info",("dropping events from %s", db)); + + + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + + if ((ret= evex_open_event_table(thd, TL_WRITE, &table))) + { + sql_print_error("Table mysql.event is damaged."); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_RETURN(SP_OPEN_TABLE_FAILED); + } + + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + VOID(pthread_mutex_lock(&LOCK_evex_running)); + if (!evex_is_running) + goto skip_memory; + + for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) + { + event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); + if (sortcmp_lex_string(et->dbname, db_lex, system_charset_info)) + continue; + + if (et->can_spawn_now_n_lock(thd)) + { + DBUG_PRINT("info",("event %s not running - direct delete", et->name.str)); + if (!(ret= evex_db_find_event_aux(thd, et, table))) + { + DBUG_PRINT("info",("event %s found on disk", et->name.str)); + if ((ret= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error while deleting a row - dropping " + "a database. Skipping the rest."); + my_error(ER_EVENT_DROP_FAILED, MYF(0), et->name.str); + goto end; + } + DBUG_PRINT("info",("deleted event [%s] num [%d]. Time to free mem", + et->name.str, i)); + } + else if (ret == EVEX_KEY_NOT_FOUND) + { + sql_print_error("Expected to find event %s.%s of %s on disk-not there.", + et->dbname.str, et->name.str, et->definer.str); + } + et->free_sp(); + delete et; + et= 0; + /* no need to call et->spawn_unlock because we already cleaned et */ + } + else + { + DBUG_PRINT("info",("event %s is running. setting exec_no_more and dropped", + et->name.str)); + et->flags|= EVENT_EXEC_NO_MORE; + et->dropped= TRUE; + } + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + evex_queue_delete_element(&EVEX_EQ_NAME, i);// 1 is top + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + /* + decrease so we start at the same position, there will be + less elements in the queue, it will still be ordered so on + next iteration it will be again i the current element or if + no more we finish. + */ + --i; + } + +skip_memory: + /* + The reasoning behind having two loops is the following: + If there was only one loop, the table-scan, then for every element which + matches, the queue in memory has to be searched to remove the element. + While if we go first over the queue and remove what's in there we have only + one pass over it and after finishing it, moving to table-scan for the disabled + events. This needs quite less time and means quite less locking on + LOCK_event_arrays. + */ + DBUG_PRINT("info",("Mem-cache checked, now going to db for disabled events")); + /* only enabled events are in memory, so we go now and delete the rest */ + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info)) && !ret) + { + char *et_db; + + if ((et_db= get_field(thd->mem_root, table->field[EVEX_FIELD_DB])) == NULL) + { + ret= 2; + break; + } + + LEX_STRING et_db_lex= {et_db, strlen(et_db)}; + if (!sortcmp_lex_string(et_db_lex, db_lex, system_charset_info)) + { + event_timed ett; + char *ptr; + + if ((ptr= get_field(thd->mem_root, table->field[EVEX_FIELD_STATUS])) + == NullS) + { + sql_print_error("Error while loading from mysql.event. " + "Table probably corrupted"); + goto end; + } + /* + When not running nothing is in memory so we have to clean + everything. + We don't delete EVENT_ENABLED events when the scheduler is running + because maybe this is an event which we asked to drop itself when + it is finished and it hasn't finished yet, so we don't touch it. + It will drop itself. The not running ENABLED events has been already + deleted from ha_delete_row() above in the loop over the QUEUE + (in case the executor is running). + 'D' stands for DISABLED, 'E' for ENABLED - it's an enum + */ + if ((evex_is_running && ptr[0] == 'D') || !evex_is_running) + { + DBUG_PRINT("info", ("Dropping %s.%s", et_db, ett.name.str)); + if ((ret= table->file->ha_delete_row(table->record[0]))) + { + my_error(ER_EVENT_DROP_FAILED, MYF(0), ett.name.str); + goto end; + } + } + } + } + DBUG_PRINT("info",("Disk checked for disabled events. Finishing.")); + +end: + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + end_read_record(&read_record_info); + + thd->version--; // Force close to free memory + + close_thread_tables(thd); + + DBUG_RETURN(ret); +} diff --git a/sql/event.h b/sql/event.h index 9218d34f6bcaf9c8853d746321e893e584788ce6..f6e1ecd3188add7c70ad83727be674c3e2096b40 100644 --- a/sql/event.h +++ b/sql/event.h @@ -79,6 +79,8 @@ class event_timed { event_timed(const event_timed &); /* Prevent use of these */ void operator=(event_timed &); + my_bool in_spawned_thread; + ulong locked_by_thread_id; my_bool running; pthread_mutex_t LOCK_running; @@ -117,13 +119,14 @@ class event_timed bool free_sphead_on_delete; uint flags;//all kind of purposes - event_timed():running(0), status_changed(false), last_executed_changed(false), - expression(0), created(0), modified(0), - on_completion(MYSQL_EVENT_ON_COMPLETION_DROP), - status(MYSQL_EVENT_ENABLED), sphead(0), sql_mode(0), - body_begin(0), dropped(false), free_sphead_on_delete(true), - flags(0) - + event_timed():in_spawned_thread(0),locked_by_thread_id(0), + running(0), status_changed(false), + last_executed_changed(false), expression(0), created(0), + modified(0), on_completion(MYSQL_EVENT_ON_COMPLETION_DROP), + status(MYSQL_EVENT_ENABLED), sphead(0), sql_mode(0), + body_begin(0), dropped(false), + free_sphead_on_delete(true), flags(0) + { pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST); init(); @@ -200,7 +203,44 @@ class event_timed return ret; } - void free_sp() + /* + Checks whether the object is being used in a spawned thread. + This method is for very basic checking. Use ::can_spawn_now_n_lock() + for most of the cases. + */ + + my_bool + can_spawn_now() + { + my_bool ret; + VOID(pthread_mutex_lock(&this->LOCK_running)); + ret= !in_spawned_thread; + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; + } + + /* + Checks whether this thread can lock the object for modification -> + preventing being spawned for execution, and locks if possible. + use ::can_spawn_now() only for basic checking because a race + condition may occur between the check and eventual modification (deletion) + of the object. + */ + + my_bool + can_spawn_now_n_lock(THD *thd); + + int + spawn_unlock(THD *thd); + + int + spawn_now(void * (*thread_func)(void*)); + + void + spawn_thread_finish(THD *thd); + + void + free_sp() { delete sphead; sphead= 0; @@ -239,6 +279,10 @@ event_reconstruct_interval_expression(String *buf, interval_type interval, longlong expression); +int +evex_drop_db_events(THD *thd, char *db); + + int init_events(); diff --git a/sql/event_executor.cc b/sql/event_executor.cc index 43be372e96ccd07b5a9eaf48675be4f52f8cb54c..9483c2ab1654cc584e5a9c8eecccf0cca560d5b6 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -18,6 +18,11 @@ #include "event.h" #include "sp.h" +#define WAIT_STATUS_READY 0 +#define WAIT_STATUS_EMPTY_QUEUE 1 +#define WAIT_STATUS_NEW_TOP_EVENT 2 +#define WAIT_STATUS_STOP_EXECUTOR 3 + /* Make this define DBUG_FAULTY_THR to be able to put breakpoints inside @@ -294,6 +299,85 @@ init_event_thread(THD* thd) } +/* + This function waits till the time next event in the queue should be + executed. + + Returns + WAIT_STATUS_READY There is an event to be executed right now + WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped. + WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled + on top. Restart ticking. + WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0; +*/ + +static int +executor_wait_till_next_event_exec(THD *thd) +{ + event_timed *et; + TIME time_now; + int t2sleep; + + DBUG_ENTER("executor_wait_till_next_event_exec"); + /* + now let's see how much time to sleep, we know there is at least 1 + element in the queue. + */ + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_RETURN(1); + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + DBUG_ASSERT(et); + if (et->status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); + if (et->dropped) + et->drop(thd); + delete et; + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + sql_print_information("Event found disabled, dropping."); + DBUG_RETURN(1); + } + + DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); + // set the internal clock of thd + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); + t2sleep= evex_time_diff(&et->execute_at, &time_now); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + + DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); + if (t2sleep > 0) + { + /* + We sleep t2sleep seconds but we check every second whether this thread + has been killed, or there is a new candidate + */ + while (t2sleep-- && !thd->killed && event_executor_running_global_var && + evex_queue_num_elements(EVEX_EQ_NAME) && + (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) + { + DBUG_PRINT("evex main thread",("will sleep a bit more")); + my_sleep(1000000); + } + } + + int ret= 0; + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + ret= 1; + else if (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) != et) + ret= 2; + if (thd->killed && event_executor_running_global_var) + ret= 3; + + DBUG_RETURN(ret); +} + + /* The main scheduler thread. Inits the priority queue on start and destroys it on thread shutdown. Forks child threads for every event @@ -313,9 +397,9 @@ pthread_handler_t event_executor_main(void *arg) { THD *thd; /* needs to be first for thread_stack */ - ulonglong iter_num= 0; uint i=0, j=0; my_ulonglong cnt= 0; + TIME time_now; DBUG_ENTER("event_executor_main"); DBUG_PRINT("event_executor_main", ("EVEX thread started")); @@ -330,15 +414,16 @@ event_executor_main(void *arg) if (sizeof(my_time_t) != sizeof(time_t)) { - sql_print_error("sizeof(my_time_t) != sizeof(time_t) ." + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." "The scheduler will not work correctly. Stopping."); + DBUG_ASSERT(0); goto err_no_thd; } //TODO Andrey: Check for NULL if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create THD for event_executor_main"); + sql_print_error("SCHEDULER: Cannot create THD for the main thread."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is @@ -346,7 +431,7 @@ event_executor_main(void *arg) pthread_detach_this_thread(); if (init_event_thread(thd)) - goto err; + goto finish; /* make this thread visible it has no vio -> show processlist won't see it @@ -360,7 +445,7 @@ event_executor_main(void *arg) thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - DBUG_PRINT("EVEX main thread", ("Initing events_queuey")); + DBUG_PRINT("EVEX main thread", ("Initing events_queue")); /* eventually manifest that we are running, not to crashe because of @@ -376,15 +461,14 @@ event_executor_main(void *arg) thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); if (evex_load_events_from_db(thd)) - goto err; + goto finish; evex_main_thread_id= thd->thread_id; - sql_print_information("Scheduler thread started"); + sql_print_information("SCHEDULER: Main thread started"); while (!thd->killed) { TIME time_now; - my_time_t now; event_timed *et; cnt++; @@ -393,7 +477,7 @@ event_executor_main(void *arg) thd->proc_info = "Sleeping"; if (!event_executor_running_global_var) { - sql_print_information("Scheduler asked to stop."); + sql_print_information("SCHEDULER: Asked to stop."); break; } @@ -402,62 +486,31 @@ event_executor_main(void *arg) my_sleep(1000000);// sleep 1s continue; } - - { - int t2sleep; - /* - now let's see how much time to sleep, we know there is at least 1 - element in the queue. - */ - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - continue; - } - et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); - if (et->status == MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); - if (et->dropped) - et->drop(thd); - delete et; - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - sql_print_information("Event found disabled, dropping."); - continue; - } - - DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); - time((time_t *)&now); - my_tz_UTC->gmt_sec_to_TIME(&time_now, now); - t2sleep= evex_time_diff(&et->execute_at, &time_now); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - - DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); - if (t2sleep > 0) - { - /* - We sleep t2sleep seconds but we check every second whether this thread - has been killed, or there is a new candidate - */ - while (t2sleep-- && !thd->killed && event_executor_running_global_var && - evex_queue_num_elements(EVEX_EQ_NAME) && - (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) - { - DBUG_PRINT("evex main thread",("will sleep a bit more")); - my_sleep(1000000); - } - } - if (!event_executor_running_global_var) - { - sql_print_information("Scheduler asked to stop."); - break; - } + +restart_ticking: + switch (executor_wait_till_next_event_exec(thd)) { + case WAIT_STATUS_READY: // time to execute the event on top + DBUG_PRINT("evex main thread",("time to execute an event")); + break; + case WAIT_STATUS_EMPTY_QUEUE: // no more events + DBUG_PRINT("evex main thread",("no more events")); + continue; + break; + case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue + DBUG_PRINT("evex main thread",("restart ticking")); + goto restart_ticking; + case WAIT_STATUS_STOP_EXECUTOR: + sql_print_information("SCHEDULER: Asked to stop."); + goto finish; + break; + default: + DBUG_ASSERT(0); } VOID(pthread_mutex_lock(&LOCK_event_arrays)); + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); if (!evex_queue_num_elements(EVEX_EQ_NAME)) { @@ -479,14 +532,14 @@ event_executor_main(void *arg) DBUG_PRINT("evex main thread",("it's right time")); if (et->status == MYSQL_EVENT_ENABLED) { - pthread_t th; + int fork_ret_code; DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str, TIME_to_ulonglong_datetime(&et->execute_at))); et->mark_last_executed(thd); if (et->compute_next_execution_time()) { - sql_print_error("Error while computing time of %s.%s . " + sql_print_error("SCHEDULER: Error while computing time of %s.%s . " "Disabling after execution.", et->dbname.str, et->name.str); et->status= MYSQL_EVENT_DISABLED; @@ -495,13 +548,23 @@ event_executor_main(void *arg) TIME_to_ulonglong_datetime(&et->execute_at))); et->update_fields(thd); - ++iter_num; - DBUG_PRINT("info", (" Spawning a thread %d", iter_num)); #ifndef DBUG_FAULTY_THR - if (pthread_create(&th,&connection_attrib,event_executor_worker,(void*)et)) - { - sql_print_error("Problem while trying to create a thread"); - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); + thread_safe_increment(workers_count, &LOCK_workers_count); + switch ((fork_ret_code= et->spawn_now(event_executor_worker))) { + case EVENT_EXEC_CANT_FORK: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_error("SCHEDULER: Problem while trying to create a thread"); + UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish); + case EVENT_EXEC_ALREADY_EXEC: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", + et->dbname.str, et->name.str); + break; + default: + DBUG_ASSERT(!fork_ret_code); + if (fork_ret_code) + thread_safe_decrement(workers_count, &LOCK_workers_count); + break; } #else event_executor_worker((void *) et); @@ -511,22 +574,21 @@ event_executor_main(void *arg) et->flags |= EVENT_EXEC_NO_MORE; if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1 else evex_queue_first_updated(&EVEX_EQ_NAME); } DBUG_PRINT("evex main thread",("unlocking")); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); }// while +finish: -err: // First manifest that this thread does not work and then destroy VOID(pthread_mutex_lock(&LOCK_evex_running)); evex_is_running= false; evex_main_thread_id= 0; VOID(pthread_mutex_unlock(&LOCK_evex_running)); - sql_print_information("Event scheduler stopping. Waiting for worker threads to finish."); /* TODO: A better will be with a conditional variable @@ -535,21 +597,33 @@ event_executor_main(void *arg) Read workers_count without lock, no need for locking. In the worst case we have to wait 1sec more. */ - while (workers_count) - my_sleep(1000000);// 1s + sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish."); + while (1) + { + VOID(pthread_mutex_lock(&LOCK_workers_count)); + if (!workers_count) + { + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + break; + } + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + my_sleep(1000000);// 1s + } /* - LEX_STRINGs reside in the memory root and will be destroyed with it. - Hence no need of delete but only freeing of SP + First we free all objects ... + Lock because a DROP DATABASE could be running in parallel and it locks on these */ - // First we free all objects ... + sql_print_information("SCHEDULER: Emptying the queue."); + VOID(pthread_mutex_lock(&LOCK_event_arrays)); for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) { event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); et->free_sp(); delete et; } - // ... then we can thras the whole queue at once + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + // ... then we can thrash the whole queue at once evex_queue_destroy(&EVEX_EQ_NAME); thd->proc_info = "Clearing"; @@ -573,7 +647,7 @@ event_executor_main(void *arg) VOID(pthread_mutex_unlock(&LOCK_evex_running)); free_root(&evex_mem_root, MYF(0)); - sql_print_information("Event scheduler stopped."); + sql_print_information("SCHEDULER: Stopped."); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -600,9 +674,6 @@ event_executor_worker(void *event_void) MEM_ROOT worker_mem_root; DBUG_ENTER("event_executor_worker"); - VOID(pthread_mutex_lock(&LOCK_workers_count)); - ++workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); @@ -611,7 +682,7 @@ event_executor_worker(void *event_void) if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create a THD structure in a scheduler worker thread"); + sql_print_error("SCHEDULER: Cannot create a THD structure in an worker."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is @@ -653,14 +724,7 @@ event_executor_worker(void *event_void) event->dbname.str, event->name.str, event->definer.str); } - if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("event_executor_worker", - ("%s exec no more. to drop=%d",event->name.str, event->dropped)); - if (event->dropped) - event->drop(thd); - delete event; - } + event->spawn_thread_finish(thd); err: @@ -689,10 +753,7 @@ event_executor_worker(void *event_void) err_no_thd: free_root(&worker_mem_root, MYF(0)); - - VOID(pthread_mutex_lock(&LOCK_workers_count)); - --workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); + thread_safe_decrement(workers_count, &LOCK_workers_count); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd) if ((ret= evex_open_event_table(thd, TL_READ, &table))) { - sql_print_error("Table mysql.event is damaged."); + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); DBUG_RETURN(SP_OPEN_TABLE_FAILED); } @@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd) if ((ret= et->load_from_row(&evex_mem_root, table))) { - sql_print_error("Error while loading from mysql.event. " + sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); goto end; } @@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd) if ((ret= et->compile(thd, &evex_mem_root))) { - sql_print_error("Error while compiling %s.%s. Aborting load.", + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", et->dbname.str, et->name.str); goto end; } @@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd) // let's find when to be executed if (et->compute_next_execution_time()) { - sql_print_error("Error while computing execution time of %s.%s. Skipping", - et->dbname.str, et->name.str); + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); continue; } @@ -799,7 +860,7 @@ evex_load_events_from_db(THD *thd) thd->version--; // Force close to free memory close_thread_tables(thd); - sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s"); + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); diff --git a/sql/event_priv.h b/sql/event_priv.h index 0d4cf3a29935e0b72d9869cf091488e69486c5e0..4f575a962126eb2668a076659cec332c772eb080 100644 --- a/sql/event_priv.h +++ b/sql/event_priv.h @@ -19,6 +19,10 @@ #include "mysql_priv.h" +#define EVENT_EXEC_STARTED 0 +#define EVENT_EXEC_ALREADY_EXEC 1 +#define EVENT_EXEC_CANT_FORK 2 + #define EVEX_USE_QUEUE #define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \ @@ -32,10 +36,10 @@ int my_time_compare(TIME *a, TIME *b); int -evex_db_find_event_aux(THD *thd, const LEX_STRING dbname, - const LEX_STRING rname, - const LEX_STRING definer, - TABLE *table); +evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, + const LEX_STRING ev_name, + const LEX_STRING user_name, + TABLE *table); int event_timed_compare_q(void *vptr, byte* a, byte *b); diff --git a/sql/event_timed.cc b/sql/event_timed.cc index 34e9b50f7a753ef77db1b8baf3e561dfc8a9feb7..d76a9777d4a07a499060386af645475866165500 100644 --- a/sql/event_timed.cc +++ b/sql/event_timed.cc @@ -908,7 +908,7 @@ event_timed::drop(THD *thd) Saves status and last_executed_at to the disk if changed. SYNOPSIS - event_timed::drop() + event_timed::update_fields() thd - thread context RETURN VALUE @@ -945,7 +945,7 @@ event_timed::update_fields(THD *thd) } - if ((ret= evex_db_find_event_aux(thd, dbname, name, definer, table))) + if ((ret= evex_db_find_event_by_name(thd, dbname, name, definer, table))) goto done; store_record(table,record[1]); @@ -1204,6 +1204,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) MEM_ROOT *tmp_mem_root= 0; LEX *old_lex= thd->lex, lex; char *old_db; + int old_db_length; event_timed *ett; sp_name *spn; char *old_query; @@ -1237,7 +1238,9 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) old_query_len= thd->query_length; old_query= thd->query; old_db= thd->db; + old_db_length= thd->db_length; thd->db= dbname.str; + thd->db_length= dbname.length; get_create_event(thd, &show_create); @@ -1303,3 +1306,135 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) DBUG_RETURN(ret); } + +/* + Checks whether this thread can lock the object for modification -> + preventing being spawned for execution, and locks if possible. + use ::can_spawn_now() only for basic checking because a race + condition may occur between the check and eventual modification (deletion) + of the object. + + Returns + true - locked + false - cannot lock +*/ + +my_bool +event_timed::can_spawn_now_n_lock(THD *thd) +{ + my_bool ret= FALSE; + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + in_spawned_thread= TRUE; + ret= TRUE; + locked_by_thread_id= thd->thread_id; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; +} + + +extern pthread_attr_t connection_attrib; + +/* + Checks whether is possible and forks a thread. Passes self as argument. + + Returns + EVENT_EXEC_STARTED - OK + EVENT_EXEC_ALREADY_EXEC - Thread not forked, already working + EVENT_EXEC_CANT_FORK - Unable to spawn thread (error) +*/ + +int +event_timed::spawn_now(void * (*thread_func)(void*)) +{ + int ret= EVENT_EXEC_STARTED; + static uint exec_num= 0; + DBUG_ENTER("event_timed::spawn_now"); + DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str)); + + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + pthread_t th; + in_spawned_thread= true; + if (pthread_create(&th, &connection_attrib, thread_func, (void*)this)) + { + DBUG_PRINT("info", ("problem while spawning thread")); + ret= EVENT_EXEC_CANT_FORK; + in_spawned_thread= false; + } +#ifndef DBUG_OFF + else + { + sql_print_information("SCHEDULER: Started thread %d", ++exec_num); + DBUG_PRINT("info", ("thread spawned")); + } +#endif + } + else + { + DBUG_PRINT("info", ("already in spawned thread. skipping")); + ret= EVENT_EXEC_ALREADY_EXEC; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + + DBUG_RETURN(ret); +} + + +void +event_timed::spawn_thread_finish(THD *thd) +{ + DBUG_ENTER("event_timed::spawn_thread_finish"); + VOID(pthread_mutex_lock(&this->LOCK_running)); + in_spawned_thread= false; + if ((flags & EVENT_EXEC_NO_MORE) || status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("info", ("%s exec no more. to drop=%d", name.str, dropped)); + if (dropped) + drop(thd); + VOID(pthread_mutex_unlock(&this->LOCK_running)); + delete this; + DBUG_VOID_RETURN; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + DBUG_VOID_RETURN; +} + + +/* + Unlocks the object after it has been locked with ::can_spawn_now_n_lock() + + Returns + 0 - ok + 1 - not locked by this thread + +*/ + + +int +event_timed::spawn_unlock(THD *thd) +{ + int ret= 0; + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + if (locked_by_thread_id == thd->thread_id) + { + in_spawned_thread= FALSE; + locked_by_thread_id= 0; + } + else + { + sql_print_error("A thread tries to unlock when he hasn't locked. " + "thread_id=%ld locked by %ld", + thd->thread_id, locked_by_thread_id); + DBUG_ASSERT(0); + ret= 1; + } + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; +} diff --git a/sql/sql_db.cc b/sql/sql_db.cc index a7a7327bb87f2815ab954caa075d6846993742a8..885643cf8991abd67d2e9e82f739d8daa0faba55 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -20,6 +20,7 @@ #include "mysql_priv.h" #include <mysys_err.h> #include "sp.h" +#include "event.h" #include <my_dir.h> #include <m_ctype.h> #ifdef __WIN__ @@ -870,6 +871,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) exit: (void)sp_drop_db_routines(thd, db); /* QQ Ignore errors for now */ + (void)evex_drop_db_events(thd, db); /* QQ Ignore errors for now */ start_waiting_global_read_lock(thd); /* If this database was the client's selected database, we silently change the