Commit b95f342f authored by andrey@lmy004's avatar andrey@lmy004

WL #1034 (update)

- improve the stability of the executor
- make create event if not exists work as before
parent fa9f2a77
...@@ -71,9 +71,8 @@ MEM_ROOT evex_mem_root; ...@@ -71,9 +71,8 @@ MEM_ROOT evex_mem_root;
void void
evex_queue_init(EVEX_QUEUE_TYPE *queue) evex_queue_init(EVEX_QUEUE_TYPE *queue)
{ {
if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/, if (init_queue_ex(queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
0 /*smallest_on_top*/, event_timed_compare_q, NULL, event_timed_compare_q, NULL, 30 /*auto_extent*/))
100 /*auto_extent*/))
sql_print_error("Insufficient memory to initialize executing queue."); sql_print_error("Insufficient memory to initialize executing queue.");
} }
...@@ -81,8 +80,7 @@ evex_queue_init(EVEX_QUEUE_TYPE *queue) ...@@ -81,8 +80,7 @@ evex_queue_init(EVEX_QUEUE_TYPE *queue)
static static
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
{ {
return cs->coll->strnncollsp(cs, return cs->coll->strnncollsp(cs, (unsigned char *) s.str,s.length,
(unsigned char *) s.str,s.length,
(unsigned char *) t.str,t.length, 0); (unsigned char *) t.str,t.length, 0);
} }
...@@ -90,94 +88,29 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) ...@@ -90,94 +88,29 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
int int
my_time_compare(TIME *a, TIME *b) my_time_compare(TIME *a, TIME *b)
{ {
/* my_ulonglong a_t= TIME_to_ulonglong_datetime(a)*100L + a->second_part;
Or maybe it is faster to use TIME_to_ulonglong_datetime my_ulonglong b_t= TIME_to_ulonglong_datetime(b)*100L + b->second_part;
for "a" and "b"
*/
DBUG_ENTER("my_time_compare");
if (a->year > b->year)
DBUG_RETURN(1);
if (a->year < b->year)
DBUG_RETURN(-1);
if (a->month > b->month)
DBUG_RETURN(1);
if (a->month < b->month)
DBUG_RETURN(-1);
if (a->day > b->day)
DBUG_RETURN(1);
if (a->day < b->day)
DBUG_RETURN(-1);
if (a->hour > b->hour)
DBUG_RETURN(1);
if (a->hour < b->hour)
DBUG_RETURN(-1);
if (a->minute > b->minute)
DBUG_RETURN(1);
if (a->minute < b->minute)
DBUG_RETURN(-1);
if (a->second > b->second)
DBUG_RETURN(1);
if (a->second < b->second)
DBUG_RETURN(-1);
if (a->second_part > b->second_part)
DBUG_RETURN(1);
if (a->second_part < b->second_part)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
evex_time_diff(TIME *a, TIME *b)
{
my_bool in_gap;
DBUG_ENTER("my_time_diff");
return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
}
inline int
event_timed_compare(event_timed **a, event_timed **b)
{
my_ulonglong a_t, b_t;
a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at)*100L +
(*a)->execute_at.second_part;
b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at)*100L +
(*b)->execute_at.second_part;
if (a_t > b_t) if (a_t > b_t)
return 1; return 1;
else if (a_t < b_t) else if (a_t < b_t)
return -1; return -1;
else
return 0; return 0;
}
inline int
event_timed_compare(event_timed *a, event_timed *b)
{
return my_time_compare(&a->execute_at, &b->execute_at);
} }
int int
event_timed_compare_q(void *vptr, byte* a, byte *b) event_timed_compare_q(void *vptr, byte* a, byte *b)
{ {
return event_timed_compare((event_timed **)&a, (event_timed **)&b); return event_timed_compare((event_timed *)a, (event_timed *)b);
} }
...@@ -371,6 +304,8 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update) ...@@ -371,6 +304,8 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update)
db_create_event() db_create_event()
thd THD thd THD
et event_timed object containing information for the event et event_timed object containing information for the event
create_if_not - if an warning should be generated in case event exists
rows_affected - how many rows were affected
Return value Return value
0 - OK 0 - OK
...@@ -381,9 +316,10 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update) ...@@ -381,9 +316,10 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update)
*/ */
static int static int
db_create_event(THD *thd, event_timed *et) db_create_event(THD *thd, event_timed *et, my_bool create_if_not,
uint *rows_affected)
{ {
int ret= EVEX_OK; int ret= 0;
TABLE *table; TABLE *table;
char definer[HOSTNAME_LENGTH+USERNAME_LENGTH+2]; char definer[HOSTNAME_LENGTH+USERNAME_LENGTH+2];
char olddb[128]; char olddb[128];
...@@ -391,7 +327,7 @@ db_create_event(THD *thd, event_timed *et) ...@@ -391,7 +327,7 @@ db_create_event(THD *thd, event_timed *et)
DBUG_ENTER("db_create_event"); DBUG_ENTER("db_create_event");
DBUG_PRINT("enter", ("name: %.*s", et->name.length, et->name.str)); DBUG_PRINT("enter", ("name: %.*s", et->name.length, et->name.str));
*rows_affected= 0;
DBUG_PRINT("info", ("open mysql.event for update")); DBUG_PRINT("info", ("open mysql.event for update"));
if (evex_open_event_table(thd, TL_WRITE, &table)) if (evex_open_event_table(thd, TL_WRITE, &table))
{ {
...@@ -402,8 +338,10 @@ db_create_event(THD *thd, event_timed *et) ...@@ -402,8 +338,10 @@ db_create_event(THD *thd, event_timed *et)
DBUG_PRINT("info", ("check existance of an event with the same name")); DBUG_PRINT("info", ("check existance of an event with the same name"));
if (!evex_db_find_event_aux(thd, et->dbname, et->name, table)) if (!evex_db_find_event_aux(thd, et->dbname, et->name, table))
{ {
my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), et->name.str); push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
goto err; ER_EVENT_ALREADY_EXISTS, ER(ER_EVENT_ALREADY_EXISTS),
et->name.str);
goto ok;
} }
DBUG_PRINT("info", ("non-existant, go forward")); DBUG_PRINT("info", ("non-existant, go forward"));
...@@ -462,6 +400,8 @@ db_create_event(THD *thd, event_timed *et) ...@@ -462,6 +400,8 @@ db_create_event(THD *thd, event_timed *et)
mysql_bin_log.write(&qinfo); mysql_bin_log.write(&qinfo);
} }
*rows_affected= 1;
ok:
if (dbchanged) if (dbchanged)
(void) mysql_change_db(thd, olddb, 1); (void) mysql_change_db(thd, olddb, 1);
if (table) if (table)
...@@ -755,6 +695,7 @@ done: ...@@ -755,6 +695,7 @@ done:
et event's data et event's data
create_options Options specified when in the query. We are create_options Options specified when in the query. We are
interested whether there is IF NOT EXISTS interested whether there is IF NOT EXISTS
rows_affected How many rows were affected
NOTES NOTES
- in case there is an event with the same name (db) and - in case there is an event with the same name (db) and
...@@ -762,7 +703,8 @@ done: ...@@ -762,7 +703,8 @@ done:
*/ */
int int
evex_create_event(THD *thd, event_timed *et, uint create_options) evex_create_event(THD *thd, event_timed *et, uint create_options,
uint *rows_affected)
{ {
int ret = 0; int ret = 0;
...@@ -770,22 +712,9 @@ evex_create_event(THD *thd, event_timed *et, uint create_options) ...@@ -770,22 +712,9 @@ evex_create_event(THD *thd, event_timed *et, uint create_options)
DBUG_PRINT("enter", ("name: %*s options:%d", et->name.length, DBUG_PRINT("enter", ("name: %*s options:%d", et->name.length,
et->name.str, create_options)); et->name.str, create_options));
if ((ret = db_create_event(thd, et)) == EVEX_WRITE_ROW_FAILED && if ((ret = db_create_event(thd, et,
(create_options & HA_LEX_CREATE_IF_NOT_EXISTS)) create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
{ rows_affected)))
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
"EVENT", et->name.str);
ret= 0;
goto done;
}
/*
A warning is thrown only when create_options is set to
HA_LEX_CREATE_IF_NOT_EXISTS. In this case if EVEX_WRITE_ROW_FAILED,
which means that we have duplicated key -> warning. In all
other cases -> error.
*/
if (ret)
goto done; goto done;
VOID(pthread_mutex_lock(&LOCK_evex_running)); VOID(pthread_mutex_lock(&LOCK_evex_running));
...@@ -819,7 +748,8 @@ done: ...@@ -819,7 +748,8 @@ done:
*/ */
int int
evex_update_event(THD *thd, event_timed *et, sp_name *new_name) evex_update_event(THD *thd, event_timed *et, sp_name *new_name,
uint *rows_affected)
{ {
int ret, i; int ret, i;
bool need_second_pass= true; bool need_second_pass= true;
...@@ -873,7 +803,8 @@ done: ...@@ -873,7 +803,8 @@ done:
*/ */
int int
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists) evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists,
uint *rows_affected)
{ {
TABLE *table; TABLE *table;
int ret= EVEX_OPEN_TABLE_FAILED; int ret= EVEX_OPEN_TABLE_FAILED;
......
...@@ -173,13 +173,16 @@ public: ...@@ -173,13 +173,16 @@ public:
int int
evex_create_event(THD *thd, event_timed *et, uint create_options); evex_create_event(THD *thd, event_timed *et, uint create_options,
uint *rows_affected);
int int
evex_update_event(THD *thd, event_timed *et, sp_name *new_name); evex_update_event(THD *thd, event_timed *et, sp_name *new_name,
uint *rows_affected);
int int
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists); evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists,
uint *rows_affected);
int int
......
...@@ -61,8 +61,15 @@ event_executor_worker(void *arg); ...@@ -61,8 +61,15 @@ event_executor_worker(void *arg);
pthread_handler_t pthread_handler_t
event_executor_main(void *arg); event_executor_main(void *arg);
static static int
void evex_init_mutexes() evex_time_diff(TIME *a, TIME *b)
{
return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
}
static void
evex_init_mutexes()
{ {
if (evex_mutexes_initted) if (evex_mutexes_initted)
return; return;
...@@ -239,8 +246,6 @@ event_executor_main(void *arg) ...@@ -239,8 +246,6 @@ event_executor_main(void *arg)
{ {
int t2sleep; int t2sleep;
/* /*
now let's see how much time to sleep, we know there is at least 1 now let's see how much time to sleep, we know there is at least 1
element in the queue. element in the queue.
...@@ -272,7 +277,7 @@ event_executor_main(void *arg) ...@@ -272,7 +277,7 @@ event_executor_main(void *arg)
{ {
/* /*
We sleep t2sleep seconds but we check every second whether this thread We sleep t2sleep seconds but we check every second whether this thread
has been killed, or there is new candidate has been killed, or there is a new candidate
*/ */
while (t2sleep-- && !thd->killed && while (t2sleep-- && !thd->killed &&
evex_queue_num_elements(EVEX_EQ_NAME) && evex_queue_num_elements(EVEX_EQ_NAME) &&
...@@ -308,10 +313,13 @@ event_executor_main(void *arg) ...@@ -308,10 +313,13 @@ event_executor_main(void *arg)
{ {
pthread_t th; pthread_t th;
printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
et->mark_last_executed();
et->compute_next_execution_time();
printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
et->update_fields(thd);
DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num)); DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num));
// sql_print_information(" Spawning a thread %d", ++iter_num);
#ifndef DBUG_FAULTY_THR #ifndef DBUG_FAULTY_THR
// sql_print_information(" Thread is not debuggable!");
if (pthread_create(&th, NULL, event_executor_worker, (void*)et)) if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
{ {
sql_print_error("Problem while trying to create a thread"); sql_print_error("Problem while trying to create a thread");
...@@ -320,24 +328,15 @@ event_executor_main(void *arg) ...@@ -320,24 +328,15 @@ event_executor_main(void *arg)
#else #else
event_executor_worker((void *) et); event_executor_worker((void *) et);
#endif #endif
printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
et->mark_last_executed();
et->compute_next_execution_time();
printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
et->update_fields(thd);
if ((et->execute_at.year && !et->expression) || if ((et->execute_at.year && !et->expression) ||
TIME_to_ulonglong_datetime(&et->execute_at) == 0) TIME_to_ulonglong_datetime(&et->execute_at) == 0)
et->flags |= EVENT_EXEC_NO_MORE; et->flags |= EVENT_EXEC_NO_MORE;
}
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
{
if (et->dropped)
et->drop(thd);
delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
} else else
evex_queue_first_updated(&EVEX_EQ_NAME); evex_queue_first_updated(&EVEX_EQ_NAME);
}
VOID(pthread_mutex_unlock(&LOCK_event_arrays)); VOID(pthread_mutex_unlock(&LOCK_event_arrays));
}// while }// while
...@@ -454,7 +453,8 @@ event_executor_worker(void *event_void) ...@@ -454,7 +453,8 @@ event_executor_worker(void *event_void)
strxnmov(thd->security_ctx->priv_host, sizeof(thd->security_ctx->priv_host), strxnmov(thd->security_ctx->priv_host, sizeof(thd->security_ctx->priv_host),
event->definer_host.str, NullS); event->definer_host.str, NullS);
thd->security_ctx->user= thd->security_ctx->priv_user= my_strdup(event->definer_user.str, MYF(0)); thd->security_ctx->user= thd->security_ctx->priv_user=
my_strdup(event->definer_user.str, MYF(0));
thd->db= event->dbname.str; thd->db= event->dbname.str;
if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0, if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0,
...@@ -467,6 +467,13 @@ event_executor_worker(void *event_void) ...@@ -467,6 +467,13 @@ event_executor_worker(void *event_void)
sql_print_information(" EVEX EXECUTED event for event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret); sql_print_information(" EVEX EXECUTED event for event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret);
DBUG_PRINT("info", (" EVEX EXECUTED event for event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret)); DBUG_PRINT("info", (" EVEX EXECUTED event for event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret));
} }
if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED)
{
if (event->dropped)
event->drop(thd);
delete event;
}
thd->db= 0; thd->db= 0;
err: err:
......
...@@ -59,10 +59,6 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); ...@@ -59,10 +59,6 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
int int
event_timed_compare_q(void *vptr, byte* a, byte *b); event_timed_compare_q(void *vptr, byte* a, byte *b);
int
evex_time_diff(TIME *a, TIME *b);
#define EXEC_QUEUE_QUEUE_NAME executing_queue #define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue #define EXEC_QUEUE_DARR_NAME evex_executing_queue
......
...@@ -5722,7 +5722,7 @@ ER_DROP_PARTITION_WHEN_FK_DEFINED ...@@ -5722,7 +5722,7 @@ ER_DROP_PARTITION_WHEN_FK_DEFINED
ER_PLUGIN_IS_NOT_LOADED ER_PLUGIN_IS_NOT_LOADED
eng "Plugin '%-.64s' is not loaded" eng "Plugin '%-.64s' is not loaded"
ER_EVENT_ALREADY_EXISTS ER_EVENT_ALREADY_EXISTS
eng "Event %s already exists" eng "Event '%-.64s' already exists"
ER_EVENT_STORE_FAILED ER_EVENT_STORE_FAILED
eng "Failed to store event %s. Error code %d from storage engine." eng "Failed to store event %s. Error code %d from storage engine."
ER_EVENT_DOES_NOT_EXIST ER_EVENT_DOES_NOT_EXIST
......
...@@ -3675,6 +3675,7 @@ end_with_restore_list: ...@@ -3675,6 +3675,7 @@ end_with_restore_list:
case SQLCOM_ALTER_EVENT: case SQLCOM_ALTER_EVENT:
case SQLCOM_DROP_EVENT: case SQLCOM_DROP_EVENT:
{ {
uint rows_affected= 1;
DBUG_ASSERT(lex->et); DBUG_ASSERT(lex->et);
do { do {
if (! lex->et->dbname.str) if (! lex->et->dbname.str)
...@@ -3690,17 +3691,18 @@ end_with_restore_list: ...@@ -3690,17 +3691,18 @@ end_with_restore_list:
switch (lex->sql_command) { switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT: case SQLCOM_CREATE_EVENT:
res= evex_create_event(thd, lex->et, (uint) lex->create_info.options); res= evex_create_event(thd, lex->et, (uint) lex->create_info.options,
&rows_affected);
break; break;
case SQLCOM_ALTER_EVENT: case SQLCOM_ALTER_EVENT:
res= evex_update_event(thd, lex->et, lex->spname); res= evex_update_event(thd, lex->et, lex->spname, &rows_affected);
break; break;
case SQLCOM_DROP_EVENT: case SQLCOM_DROP_EVENT:
res= evex_drop_event(thd, lex->et, lex->drop_if_exists); res= evex_drop_event(thd, lex->et, lex->drop_if_exists, &rows_affected);
default:; default:;
} }
if (!res) if (!res)
send_ok(thd, 1); send_ok(thd, rows_affected);
/* lex->unit.cleanup() is called outside, no need to call it here */ /* lex->unit.cleanup() is called outside, no need to call it here */
} while (0); } while (0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment