Commit ba79f499 authored by andrey@lmy004's avatar andrey@lmy004

WL#1034 update

(cleanups, leaks fixed)
parent 854d2d4b
......@@ -23,8 +23,6 @@
- The default value of created/modified should not be 0000-00-00 because of
STRICT mode restricions.
- Use timestamps instead of datetime.
- CREATE EVENT should not go into binary log! Does it now? The SQL statements
issued by the EVENT are replicated.
I have an idea how to solve the problem at failover. So the status field
......@@ -43,13 +41,14 @@
- Maybe move all allocations during parsing to evex_mem_root thus saving
double parsing in evex_create_event!
- If the server is killed (stopping) try to kill executing events..
- If the server is killed (stopping) try to kill executing events?
- What happens if one renames an event in the DB while it is in memory?
Or even deleting it?
- Consider using conditional variable when doing shutdown instead of
waiting till all worker threads end.
waiting till all worker threads end.
- Make event_timed::get_show_create_event() work
- Add function documentation whenever needed.
......@@ -58,10 +57,6 @@
- Move comparison code to class event_timed
- Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY.
This will skip copy operation as well as will simplify the code which is
now aware of events_array DYNAMIC_ARRAY
Warning:
- For now parallel execution is not possible because the same sp_head cannot be
executed few times!!! There is still no lock attached to particular event.
......@@ -72,7 +67,6 @@ Warning:
bool mysql_event_table_exists= 1;
DYNAMIC_ARRAY events_array;
QUEUE EVEX_EQ_NAME;
MEM_ROOT evex_mem_root;
......@@ -81,54 +75,12 @@ MEM_ROOT evex_mem_root;
void
evex_queue_init(EVEX_QUEUE_TYPE *queue)
{
#ifndef EVEX_USE_QUEUE
VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100));
#else
if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/,
0 /*smallest_on_top*/, event_timed_compare_q, NULL,
100 /*auto_extent*/))
sql_print_error("Insufficient memory to initialize executing queue.");
#endif
}
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element)
{
#ifndef EVEX_USE_QUEUE
VOID(push_dynamic(queue, element));
return 0;
#else
return queue_insert_safe(queue, element);
#endif
}
void
evex_queue_top_updated(EVEX_QUEUE_TYPE *queue)
{
#ifdef EVEX_USE_QUEUE
queue_replaced(queue);
#endif
}
void
evex_queue_sort(EVEX_QUEUE_TYPE *queue)
{
#ifndef EVEX_USE_QUEUE
qsort((gptr) dynamic_element(queue, 0, event_timed**),
queue->elements,
sizeof(event_timed **),
(qsort_cmp) event_timed_compare);
#endif
}
/* NOTE Andrey: Document better
Compares two TIME structures.
a > b -> 1
a = b -> 0
a < b -> -1
*/
static
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
......@@ -714,7 +666,7 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
{
int ret= 0;
MEM_ROOT *tmp_mem_root;
event_timed *ett, *ett_copy;
event_timed *ett;
DBUG_ENTER("db_load_and_compile_event");
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str));
......@@ -737,18 +689,12 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
if (use_lock)
VOID(pthread_mutex_lock(&LOCK_event_arrays));
VOID(push_dynamic(&events_array,(gptr) ett));
ett_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy);
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett);
/*
There is a copy in the array which we don't need. sphead won't be
destroyed.
*/
ett->free_sphead_on_delete= false;
delete ett;
if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
......@@ -783,43 +729,14 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info))
{
int idx= get_index_dynamic(&events_array, (gptr) et);
//we are lucky the event is in the executing queue, no need of second pass
//destruct first and then remove. the destructor will delete sp_head
et->free_sp();
delete_dynamic_element(&events_array, idx);
delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, i);
// ok, we have cleaned
goto done;
}
}
/*
ToDo Andrey : Think about whether second pass is needed. All events
that are in memory are enabled. If an event is being
disabled (by a SQL stmt) it will be uncached. Hmm...
However is this true for events that has been
disabled because of another reason like - no need
to be executed because ENDS is in the past?
For instance, second_pass is needed when an event
was created as DISABLED but then altered as ENABLED.
*/
/*
we haven't found the event in the executing queue. This is nice! :)
Look for it in the events_array.
*/
for (i= 0; i < events_array.elements; ++i)
{
event_timed *et= dynamic_element(&events_array, i, event_timed*);
if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info))
{
delete_dynamic_element(&events_array, i);
break;
}
}
done:
if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
......
......@@ -160,6 +160,7 @@ event_executor_main(void *arg)
THD *thd; /* needs to be first for thread_stack */
ulonglong iter_num= 0;
uint i=0, j=0;
my_ulonglong cnt= 0;
DBUG_ENTER("event_executor_main");
DBUG_PRINT("event_executor_main", ("EVEX thread started"));
......@@ -194,24 +195,16 @@ event_executor_main(void *arg)
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
DBUG_PRINT("EVEX main thread", ("Initing events_array"));
VOID(pthread_mutex_lock(&LOCK_event_arrays));
/*
my_malloc is used as underlying allocator which does not use a mem_root
thus data should be freed at later stage.
*/
VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100));
evex_queue_init(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_PRINT("EVEX main thread", ("Initing events_queuey"));
/*
eventually manifest that we are running, not to crashe because of
usage of non-initialized memory structures.
*/
VOID(pthread_mutex_lock(&LOCK_evex_running));
VOID(pthread_mutex_lock(&LOCK_event_arrays));
evex_queue_init(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
evex_is_running= true;
event_executor_running_global_var= opt_event_executor;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
......@@ -222,15 +215,16 @@ event_executor_main(void *arg)
THD_CHECK_SENTRY(thd);
/* Read queries from the IO/THREAD until this thread is killed */
evex_main_thread_id= thd->thread_id;
sql_print_information("Scheduler thread started");
while (!thd->killed)
{
TIME time_now;
my_time_t now;
my_ulonglong cnt;
event_timed *et;
DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt));
cnt++;
DBUG_PRINT("info", ("EVEX External Loop %d", cnt));
if (cnt > 1000) continue;
thd->proc_info = "Sleeping";
if (!evex_queue_num_elements(EVEX_EQ_NAME) ||
!event_executor_running_global_var)
......@@ -326,18 +320,12 @@ event_executor_main(void *arg)
}
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
{
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
if (et->dropped)
{
// we have to drop the event
int idx;
et->drop(thd);
idx= get_index_dynamic(&events_array, (gptr) et);
DBUG_ASSERT(idx != -1);
delete_dynamic_element(&events_array, idx);
}
delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
} else
evex_queue_first_updated(&EVEX_EQ_NAME);
evex_queue_first_updated(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
}// while
......@@ -365,15 +353,15 @@ err:
LEX_STRINGs reside in the memory root and will be destroyed with it.
Hence no need of delete but only freeing of SP
*/
for (i= 0; i < events_array.elements; ++i)
dynamic_element(&events_array, i, event_timed*)->free_sp();
VOID(pthread_mutex_lock(&LOCK_event_arrays));
// No need to use lock here if EVEX is not running but anyway
delete_queue(&executing_queue);
// First we free all objects ...
for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
{
event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
et->free_sp();
delete et;
}
// ... then we can thras the whole queue at once
evex_queue_destroy(&EVEX_EQ_NAME);
delete_dynamic(&events_array);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
thd->proc_info = "Clearing";
DBUG_ASSERT(thd->net.buff != 0);
......@@ -529,7 +517,7 @@ evex_load_events_from_db(THD *thd)
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
event_timed *et, *et_copy;
event_timed *et;
if (!(et= new event_timed))
{
DBUG_PRINT("evex_load_events_from_db", ("Out of memory"));
......@@ -564,20 +552,10 @@ evex_load_events_from_db(THD *thd)
et->compute_next_execution_time();
DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list."));
VOID(push_dynamic(&events_array,(gptr) et));
/*
We always add at the end so the number of elements - 1 is the place
in the buffer.
DYNAMIC_ARRAY copies the object bit by bit so we have a hollow copy
in event_array. We don't need the original therefore we delete it.
*/
et_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy);
printf("%p %s\n", et_copy, et_copy->name.str);
et->free_sphead_on_delete= false;
delete et;
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et);
DBUG_PRINT("evex_load_events_from_db", ("%p %*s",
et, et->name.length,et->name.str));
}
ret= 0;
......
......@@ -67,52 +67,29 @@ evex_time_diff(TIME *a, TIME *b);
#define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue
#ifdef EVEX_USE_QUEUE
#define EVEX_QUEUE_TYPE QUEUE
#define EVEX_PTOQEL byte *
#define EVEX_EQ_NAME executing_queue
#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
#define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue))
#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
#define evex_queue_destroy(queue) delete_queue(queue)
#define evex_queue_first_updated(queue) queue_replaced(queue)
#define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
#else
#define EVEX_QUEUE_TYPE DYNAMIC_ARRAY
#define EVEX_PTOQEL gptr
#define EVEX_EQ_NAME evex_executing_queue
#define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast)
#define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx);
#define evex_queue_destroy(queue) delete_dynamic(queue)
/*
push_dynamic() expects ptr to the memory to put in, to make things fast
so when a pointer has to be put inside a ptr-to-ptr is being passed
*/
#define evex_queue_first_updated(queue)
#define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element))
#endif
#define EVEX_QUEUE_TYPE QUEUE
#define EVEX_PTOQEL byte *
#define EVEX_EQ_NAME executing_queue
#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
#define evex_queue_element(queue, idx, __cast) ((__cast)queue_element(queue, idx))
#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
#define evex_queue_destroy(queue) delete_queue(queue)
#define evex_queue_first_updated(queue) queue_replaced(queue)
#define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
void
evex_queue_init(EVEX_QUEUE_TYPE *queue);
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element);
void
evex_queue_sort(EVEX_QUEUE_TYPE *queue);
evex_queue_init(EVEX_QUEUE_TYPE *queue);
#define evex_queue_num_elements(queue) queue.elements
extern bool evex_is_running;
extern bool mysql_event_table_exists;
extern DYNAMIC_ARRAY events_array;
//extern DYNAMIC_ARRAY events_array;
extern MEM_ROOT evex_mem_root;
extern pthread_mutex_t LOCK_event_arrays,
LOCK_workers_count,
......
......@@ -939,6 +939,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
sphead->optimize();
ret= 0;
done:
delete lex.et;
lex_end(&lex);
thd->lex= old_lex;
thd->query= old_query;
......
......@@ -3685,9 +3685,11 @@ end_with_restore_list:
res= true;
break;
}
if (check_access(thd, EVENT_ACL, lex->et->dbname.str, 0, 0, 0,
is_schema_db(lex->et->dbname.str)))
break;
switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT:
res= evex_create_event(thd, lex->et, (uint) lex->create_info.options);
......@@ -5652,6 +5654,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
delete thd->lex->sphead;
thd->lex->sphead= NULL;
}
if (thd->lex->et)
{
delete thd->lex->et;
thd->lex->et= NULL;
}
}
else
{
......@@ -5687,6 +5694,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
delete thd->lex->sphead;
thd->lex->sphead= NULL;
}
if (thd->lex->et)
{
delete thd->lex->et;
thd->lex->et= NULL;
}
}
thd->proc_info="freeing items";
thd->end_statement();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment