Commit f92086d0 authored by unknown's avatar unknown

WL#1034 update

(cleanups, leaks fixed)


sql/event.cc:
  - update comments
  - remove dead code
  - event_timed is no more copied after allocation on a DYNAMIC_ARRAY
    because there is a problem when the array is reallocated - we get
    dangling pointers from the scheduling queue. anyway it makes little
    sense to keep them there except that cleaning is quite efficient but
    iterating over all events and cleaning them one by one is not that bad
    considering that happens only when the main scheduler thread is killed
    or during server shutdown.
sql/event_executor.cc:
  - DYNAMIC_ARRAY is no more
sql/event_priv.h:
  - remove unneeded code/defines. the scheduler's queue is of 
    type QUEUE and cannot run anymore on top of DYNAMIC_ARRAY
sql/event_timed.cc:
  - after parsing (in ::compile() ) destruct the event_timed object
sql/sql_parse.cc:
  in case of syntax error clean up lex->et because there could be an object
  created. as in the code the same is done for SPs.
parent 112d408f
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
- The default value of created/modified should not be 0000-00-00 because of - The default value of created/modified should not be 0000-00-00 because of
STRICT mode restricions. STRICT mode restricions.
- Use timestamps instead of datetime.
- CREATE EVENT should not go into binary log! Does it now? The SQL statements - CREATE EVENT should not go into binary log! Does it now? The SQL statements
issued by the EVENT are replicated. issued by the EVENT are replicated.
I have an idea how to solve the problem at failover. So the status field I have an idea how to solve the problem at failover. So the status field
...@@ -43,13 +41,14 @@ ...@@ -43,13 +41,14 @@
- Maybe move all allocations during parsing to evex_mem_root thus saving - Maybe move all allocations during parsing to evex_mem_root thus saving
double parsing in evex_create_event! double parsing in evex_create_event!
- If the server is killed (stopping) try to kill executing events.. - If the server is killed (stopping) try to kill executing events?
- What happens if one renames an event in the DB while it is in memory? - What happens if one renames an event in the DB while it is in memory?
Or even deleting it? Or even deleting it?
- Consider using conditional variable when doing shutdown instead of - Consider using conditional variable when doing shutdown instead of
waiting till all worker threads end. waiting till all worker threads end.
- Make event_timed::get_show_create_event() work - Make event_timed::get_show_create_event() work
- Add function documentation whenever needed. - Add function documentation whenever needed.
...@@ -58,10 +57,6 @@ ...@@ -58,10 +57,6 @@
- Move comparison code to class event_timed - Move comparison code to class event_timed
- Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY.
This will skip copy operation as well as will simplify the code which is
now aware of events_array DYNAMIC_ARRAY
Warning: Warning:
- For now parallel execution is not possible because the same sp_head cannot be - For now parallel execution is not possible because the same sp_head cannot be
executed few times!!! There is still no lock attached to particular event. executed few times!!! There is still no lock attached to particular event.
...@@ -72,7 +67,6 @@ ...@@ -72,7 +67,6 @@
bool mysql_event_table_exists= 1; bool mysql_event_table_exists= 1;
DYNAMIC_ARRAY events_array;
QUEUE EVEX_EQ_NAME; QUEUE EVEX_EQ_NAME;
MEM_ROOT evex_mem_root; MEM_ROOT evex_mem_root;
...@@ -81,54 +75,12 @@ MEM_ROOT evex_mem_root; ...@@ -81,54 +75,12 @@ MEM_ROOT evex_mem_root;
void void
evex_queue_init(EVEX_QUEUE_TYPE *queue) evex_queue_init(EVEX_QUEUE_TYPE *queue)
{ {
#ifndef EVEX_USE_QUEUE
VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100));
#else
if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/, if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/,
0 /*smallest_on_top*/, event_timed_compare_q, NULL, 0 /*smallest_on_top*/, event_timed_compare_q, NULL,
100 /*auto_extent*/)) 100 /*auto_extent*/))
sql_print_error("Insufficient memory to initialize executing queue."); sql_print_error("Insufficient memory to initialize executing queue.");
#endif
}
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element)
{
#ifndef EVEX_USE_QUEUE
VOID(push_dynamic(queue, element));
return 0;
#else
return queue_insert_safe(queue, element);
#endif
} }
void
evex_queue_top_updated(EVEX_QUEUE_TYPE *queue)
{
#ifdef EVEX_USE_QUEUE
queue_replaced(queue);
#endif
}
void
evex_queue_sort(EVEX_QUEUE_TYPE *queue)
{
#ifndef EVEX_USE_QUEUE
qsort((gptr) dynamic_element(queue, 0, event_timed**),
queue->elements,
sizeof(event_timed **),
(qsort_cmp) event_timed_compare);
#endif
}
/* NOTE Andrey: Document better
Compares two TIME structures.
a > b -> 1
a = b -> 0
a < b -> -1
*/
static static
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
...@@ -714,7 +666,7 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) ...@@ -714,7 +666,7 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
{ {
int ret= 0; int ret= 0;
MEM_ROOT *tmp_mem_root; MEM_ROOT *tmp_mem_root;
event_timed *ett, *ett_copy; event_timed *ett;
DBUG_ENTER("db_load_and_compile_event"); DBUG_ENTER("db_load_and_compile_event");
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str)); DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str));
...@@ -737,18 +689,12 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) ...@@ -737,18 +689,12 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
if (use_lock) if (use_lock)
VOID(pthread_mutex_lock(&LOCK_event_arrays)); VOID(pthread_mutex_lock(&LOCK_event_arrays));
VOID(push_dynamic(&events_array,(gptr) ett)); evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett);
ett_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy);
/* /*
There is a copy in the array which we don't need. sphead won't be There is a copy in the array which we don't need. sphead won't be
destroyed. destroyed.
*/ */
ett->free_sphead_on_delete= false;
delete ett;
if (use_lock) if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays)); VOID(pthread_mutex_unlock(&LOCK_event_arrays));
...@@ -783,43 +729,14 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock) ...@@ -783,43 +729,14 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
if (!sortcmp_lex_string(*name, et->name, system_charset_info) && if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info)) !sortcmp_lex_string(*db, et->dbname, system_charset_info))
{ {
int idx= get_index_dynamic(&events_array, (gptr) et);
//we are lucky the event is in the executing queue, no need of second pass
//destruct first and then remove. the destructor will delete sp_head
et->free_sp(); et->free_sp();
delete_dynamic_element(&events_array, idx); delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, i); evex_queue_delete_element(&EVEX_EQ_NAME, i);
// ok, we have cleaned // ok, we have cleaned
goto done; goto done;
} }
} }
/*
ToDo Andrey : Think about whether second pass is needed. All events
that are in memory are enabled. If an event is being
disabled (by a SQL stmt) it will be uncached. Hmm...
However is this true for events that has been
disabled because of another reason like - no need
to be executed because ENDS is in the past?
For instance, second_pass is needed when an event
was created as DISABLED but then altered as ENABLED.
*/
/*
we haven't found the event in the executing queue. This is nice! :)
Look for it in the events_array.
*/
for (i= 0; i < events_array.elements; ++i)
{
event_timed *et= dynamic_element(&events_array, i, event_timed*);
if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info))
{
delete_dynamic_element(&events_array, i);
break;
}
}
done: done:
if (use_lock) if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays)); VOID(pthread_mutex_unlock(&LOCK_event_arrays));
......
...@@ -160,6 +160,7 @@ event_executor_main(void *arg) ...@@ -160,6 +160,7 @@ event_executor_main(void *arg)
THD *thd; /* needs to be first for thread_stack */ THD *thd; /* needs to be first for thread_stack */
ulonglong iter_num= 0; ulonglong iter_num= 0;
uint i=0, j=0; uint i=0, j=0;
my_ulonglong cnt= 0;
DBUG_ENTER("event_executor_main"); DBUG_ENTER("event_executor_main");
DBUG_PRINT("event_executor_main", ("EVEX thread started")); DBUG_PRINT("event_executor_main", ("EVEX thread started"));
...@@ -194,24 +195,16 @@ event_executor_main(void *arg) ...@@ -194,24 +195,16 @@ event_executor_main(void *arg)
thread_running++; thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count)); VOID(pthread_mutex_unlock(&LOCK_thread_count));
DBUG_PRINT("EVEX main thread", ("Initing events_array")); DBUG_PRINT("EVEX main thread", ("Initing events_queuey"));
VOID(pthread_mutex_lock(&LOCK_event_arrays));
/*
my_malloc is used as underlying allocator which does not use a mem_root
thus data should be freed at later stage.
*/
VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100));
evex_queue_init(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
/* /*
eventually manifest that we are running, not to crashe because of eventually manifest that we are running, not to crashe because of
usage of non-initialized memory structures. usage of non-initialized memory structures.
*/ */
VOID(pthread_mutex_lock(&LOCK_evex_running)); VOID(pthread_mutex_lock(&LOCK_evex_running));
VOID(pthread_mutex_lock(&LOCK_event_arrays));
evex_queue_init(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
evex_is_running= true; evex_is_running= true;
event_executor_running_global_var= opt_event_executor; event_executor_running_global_var= opt_event_executor;
VOID(pthread_mutex_unlock(&LOCK_evex_running)); VOID(pthread_mutex_unlock(&LOCK_evex_running));
...@@ -222,15 +215,16 @@ event_executor_main(void *arg) ...@@ -222,15 +215,16 @@ event_executor_main(void *arg)
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
/* Read queries from the IO/THREAD until this thread is killed */ /* Read queries from the IO/THREAD until this thread is killed */
evex_main_thread_id= thd->thread_id; evex_main_thread_id= thd->thread_id;
sql_print_information("Scheduler thread started");
while (!thd->killed) while (!thd->killed)
{ {
TIME time_now; TIME time_now;
my_time_t now; my_time_t now;
my_ulonglong cnt;
event_timed *et; event_timed *et;
DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt)); cnt++;
DBUG_PRINT("info", ("EVEX External Loop %d", cnt));
if (cnt > 1000) continue;
thd->proc_info = "Sleeping"; thd->proc_info = "Sleeping";
if (!evex_queue_num_elements(EVEX_EQ_NAME) || if (!evex_queue_num_elements(EVEX_EQ_NAME) ||
!event_executor_running_global_var) !event_executor_running_global_var)
...@@ -326,18 +320,12 @@ event_executor_main(void *arg) ...@@ -326,18 +320,12 @@ event_executor_main(void *arg)
} }
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
{ {
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
if (et->dropped) if (et->dropped)
{
// we have to drop the event
int idx;
et->drop(thd); et->drop(thd);
idx= get_index_dynamic(&events_array, (gptr) et); delete et;
DBUG_ASSERT(idx != -1); evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
delete_dynamic_element(&events_array, idx);
}
} else } else
evex_queue_first_updated(&EVEX_EQ_NAME); evex_queue_first_updated(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays)); VOID(pthread_mutex_unlock(&LOCK_event_arrays));
}// while }// while
...@@ -365,15 +353,15 @@ event_executor_main(void *arg) ...@@ -365,15 +353,15 @@ event_executor_main(void *arg)
LEX_STRINGs reside in the memory root and will be destroyed with it. LEX_STRINGs reside in the memory root and will be destroyed with it.
Hence no need of delete but only freeing of SP Hence no need of delete but only freeing of SP
*/ */
for (i= 0; i < events_array.elements; ++i) // First we free all objects ...
dynamic_element(&events_array, i, event_timed*)->free_sp(); for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
{
VOID(pthread_mutex_lock(&LOCK_event_arrays)); event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
// No need to use lock here if EVEX is not running but anyway et->free_sp();
delete_queue(&executing_queue); delete et;
}
// ... then we can thras the whole queue at once
evex_queue_destroy(&EVEX_EQ_NAME); evex_queue_destroy(&EVEX_EQ_NAME);
delete_dynamic(&events_array);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
thd->proc_info = "Clearing"; thd->proc_info = "Clearing";
DBUG_ASSERT(thd->net.buff != 0); DBUG_ASSERT(thd->net.buff != 0);
...@@ -529,7 +517,7 @@ evex_load_events_from_db(THD *thd) ...@@ -529,7 +517,7 @@ evex_load_events_from_db(THD *thd)
init_read_record(&read_record_info, thd, table ,NULL,1,0); init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info))) while (!(read_record_info.read_record(&read_record_info)))
{ {
event_timed *et, *et_copy; event_timed *et;
if (!(et= new event_timed)) if (!(et= new event_timed))
{ {
DBUG_PRINT("evex_load_events_from_db", ("Out of memory")); DBUG_PRINT("evex_load_events_from_db", ("Out of memory"));
...@@ -564,20 +552,10 @@ evex_load_events_from_db(THD *thd) ...@@ -564,20 +552,10 @@ evex_load_events_from_db(THD *thd)
et->compute_next_execution_time(); et->compute_next_execution_time();
DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list.")); DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list."));
VOID(push_dynamic(&events_array,(gptr) et));
/* evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et);
We always add at the end so the number of elements - 1 is the place DBUG_PRINT("evex_load_events_from_db", ("%p %*s",
in the buffer. et, et->name.length,et->name.str));
DYNAMIC_ARRAY copies the object bit by bit so we have a hollow copy
in event_array. We don't need the original therefore we delete it.
*/
et_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy);
printf("%p %s\n", et_copy, et_copy->name.str);
et->free_sphead_on_delete= false;
delete et;
} }
ret= 0; ret= 0;
......
...@@ -67,52 +67,29 @@ evex_time_diff(TIME *a, TIME *b); ...@@ -67,52 +67,29 @@ evex_time_diff(TIME *a, TIME *b);
#define EXEC_QUEUE_QUEUE_NAME executing_queue #define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue #define EXEC_QUEUE_DARR_NAME evex_executing_queue
#ifdef EVEX_USE_QUEUE
#define EVEX_QUEUE_TYPE QUEUE
#define EVEX_PTOQEL byte *
#define EVEX_EQ_NAME executing_queue
#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
#define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue))
#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
#define evex_queue_destroy(queue) delete_queue(queue)
#define evex_queue_first_updated(queue) queue_replaced(queue)
#define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
#else
#define EVEX_QUEUE_TYPE DYNAMIC_ARRAY
#define EVEX_PTOQEL gptr
#define EVEX_EQ_NAME evex_executing_queue
#define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast)
#define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx);
#define evex_queue_destroy(queue) delete_dynamic(queue)
/*
push_dynamic() expects ptr to the memory to put in, to make things fast
so when a pointer has to be put inside a ptr-to-ptr is being passed
*/
#define evex_queue_first_updated(queue)
#define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element))
#endif
#define EVEX_QUEUE_TYPE QUEUE
#define EVEX_PTOQEL byte *
#define EVEX_EQ_NAME executing_queue
#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
#define evex_queue_element(queue, idx, __cast) ((__cast)queue_element(queue, idx))
#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
#define evex_queue_destroy(queue) delete_queue(queue)
#define evex_queue_first_updated(queue) queue_replaced(queue)
#define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
void
evex_queue_init(EVEX_QUEUE_TYPE *queue);
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element);
void void
evex_queue_sort(EVEX_QUEUE_TYPE *queue); evex_queue_init(EVEX_QUEUE_TYPE *queue);
#define evex_queue_num_elements(queue) queue.elements #define evex_queue_num_elements(queue) queue.elements
extern bool evex_is_running; extern bool evex_is_running;
extern bool mysql_event_table_exists; extern bool mysql_event_table_exists;
extern DYNAMIC_ARRAY events_array; //extern DYNAMIC_ARRAY events_array;
extern MEM_ROOT evex_mem_root; extern MEM_ROOT evex_mem_root;
extern pthread_mutex_t LOCK_event_arrays, extern pthread_mutex_t LOCK_event_arrays,
LOCK_workers_count, LOCK_workers_count,
......
...@@ -939,6 +939,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) ...@@ -939,6 +939,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
sphead->optimize(); sphead->optimize();
ret= 0; ret= 0;
done: done:
delete lex.et;
lex_end(&lex); lex_end(&lex);
thd->lex= old_lex; thd->lex= old_lex;
thd->query= old_query; thd->query= old_query;
......
...@@ -3685,9 +3685,11 @@ mysql_execute_command(THD *thd) ...@@ -3685,9 +3685,11 @@ mysql_execute_command(THD *thd)
res= true; res= true;
break; break;
} }
if (check_access(thd, EVENT_ACL, lex->et->dbname.str, 0, 0, 0, if (check_access(thd, EVENT_ACL, lex->et->dbname.str, 0, 0, 0,
is_schema_db(lex->et->dbname.str))) is_schema_db(lex->et->dbname.str)))
break; break;
switch (lex->sql_command) { switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT: case SQLCOM_CREATE_EVENT:
res= evex_create_event(thd, lex->et, (uint) lex->create_info.options); res= evex_create_event(thd, lex->et, (uint) lex->create_info.options);
...@@ -5652,6 +5654,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length) ...@@ -5652,6 +5654,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
delete thd->lex->sphead; delete thd->lex->sphead;
thd->lex->sphead= NULL; thd->lex->sphead= NULL;
} }
if (thd->lex->et)
{
delete thd->lex->et;
thd->lex->et= NULL;
}
} }
else else
{ {
...@@ -5687,6 +5694,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length) ...@@ -5687,6 +5694,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
delete thd->lex->sphead; delete thd->lex->sphead;
thd->lex->sphead= NULL; thd->lex->sphead= NULL;
} }
if (thd->lex->et)
{
delete thd->lex->et;
thd->lex->et= NULL;
}
} }
thd->proc_info="freeing items"; thd->proc_info="freeing items";
thd->end_statement(); thd->end_statement();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment