Fix for bug#22740 Events: Decouple Event_queue from Event_db_repository

This patch implements the idea of the bug report by making Event_queue
unaware of Event_db_repository by making a higher level class - Events,
which is aware of most of all classes, responsible for passing all data
needed for adding/updating/deleting an event to/from the queue.

Introduces few new classes :
 - Event_worker_thread
 - Event_queue_element_for_exec
parent db5c1646
......@@ -20,6 +20,8 @@
#include "event_db_repository.h"
#include "sp_head.h"
/* That's a provisional solution */
extern Event_db_repository events_event_db_repository;
#define EVEX_MAX_INTERVAL_VALUE 1000000000L
......@@ -30,6 +32,47 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
static void
event_restore_security_context(THD *thd, Security_context *backup);
/*
Initiliazes dbname and name of an Event_queue_element_for_exec
object
SYNOPSIS
Event_queue_element_for_exec::init()
RETURN VALUE
FALSE OK
TRUE Error (OOM)
*/
bool
Event_queue_element_for_exec::init(LEX_STRING db, LEX_STRING n)
{
if (!(dbname.str= my_strndup(db.str, dbname.length= db.length, MYF(MY_WME))))
return TRUE;
if (!(name.str= my_strndup(n.str, name.length= n.length, MYF(MY_WME))))
{
my_free((gptr) dbname.str, MYF(0));
return TRUE;
}
return FALSE;
}
/*
Destructor
SYNOPSIS
Event_queue_element_for_exec::~Event_queue_element_for_exec()
*/
Event_queue_element_for_exec::~Event_queue_element_for_exec()
{
my_free((gptr) dbname.str, MYF(0));
my_free((gptr) name.str, MYF(0));
}
/*
Returns a new instance
......@@ -743,7 +786,7 @@ Event_timed::~Event_timed()
*/
Event_job_data::Event_job_data()
:thd(NULL), sphead(NULL), sql_mode(0)
:sphead(NULL), sql_mode(0)
{
}
......@@ -1239,6 +1282,7 @@ Event_queue_element::compute_next_execution_time()
DBUG_PRINT("info", ("Dropped: %d", dropped));
status= Event_queue_element::DISABLED;
status_changed= TRUE;
dropped= TRUE;
goto ret;
}
......@@ -1446,32 +1490,6 @@ Event_queue_element::mark_last_executed(THD *thd)
}
/*
Drops the event
SYNOPSIS
Event_queue_element::drop()
thd thread context
RETURN VALUE
0 OK
-1 Cannot open mysql.event
-2 Cannot find the event in mysql.event (already deleted?)
others return code from SE in case deletion of the event row
failed.
*/
int
Event_queue_element::drop(THD *thd)
{
DBUG_ENTER("Event_queue_element::drop");
DBUG_RETURN(Events::get_instance()->
drop_event(thd, dbname, name, FALSE, TRUE));
}
/*
Saves status and last_executed_at to the disk if changed.
......@@ -1503,13 +1521,13 @@ Event_queue_element::update_timing_fields(THD *thd)
thd->reset_n_backup_open_tables_state(&backup);
if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table))
if (events_event_db_repository.open_event_table(thd, TL_WRITE, &table))
{
ret= TRUE;
goto done;
}
fields= table->field;
if ((ret= Events::get_instance()->db_repository->
if ((ret= events_event_db_repository.
find_named_event(thd, dbname, name, table)))
goto done;
......
......@@ -27,6 +27,27 @@ class sp_head;
class Sql_alloc;
class Event_queue_element_for_exec
{
public:
Event_queue_element_for_exec(){};
~Event_queue_element_for_exec();
bool
init(LEX_STRING dbname, LEX_STRING name);
LEX_STRING dbname;
LEX_STRING name;
bool dropped;
THD *thd;
private:
/* Prevent use of these */
Event_queue_element_for_exec(const Event_queue_element_for_exec &);
void operator=(Event_queue_element_for_exec &);
};
class Event_basic
{
protected:
......@@ -96,9 +117,6 @@ public:
bool
compute_next_execution_time();
int
drop(THD *thd);
void
mark_last_executed(THD *thd);
......@@ -160,7 +178,6 @@ public:
class Event_job_data : public Event_basic
{
public:
THD *thd;
sp_head *sphead;
LEX_STRING body;
......
......@@ -16,7 +16,6 @@
#include "mysql_priv.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
......@@ -136,16 +135,14 @@ Event_queue::deinit_mutexes()
*/
bool
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
Event_queue::init_queue(THD *thd)
{
bool res;
struct event_queue_param *event_queue_param_value= NULL;
DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this: 0x%lx", (long) this));
LOCK_QUEUE_DATA();
db_repository= db_repo;
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
0 /*max_on_top*/, event_queue_element_compare_q,
......@@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
goto err;
}
res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
if (res)
deinit_queue();
DBUG_RETURN(res);
DBUG_RETURN(FALSE);
err:
UNLOCK_QUEUE_DATA();
......@@ -204,37 +197,29 @@ Event_queue::deinit_queue()
Event_queue::create_event()
dbname The schema of the new event
name The name of the new event
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
*/
int
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
void
Event_queue::create_event(THD *thd, Event_queue_element *new_element)
{
int res;
Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str));
DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd,
new_element->dbname.str, new_element->name.str));
new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, dbname, name, new_element);
if (res || new_element->status == Event_queue_element::DISABLED)
if (new_element->status == Event_queue_element::DISABLED)
delete new_element;
else
{
new_element->compute_next_execution_time();
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
LOCK_QUEUE_DATA();
DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element));
queue_insert_safe(&queue, (byte *) new_element);
dbug_dump_queue(thd->query_start());
pthread_cond_broadcast(&COND_queue_state);
UNLOCK_QUEUE_DATA();
}
DBUG_RETURN(res);
DBUG_VOID_RETURN;
}
......@@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
name Name of the event
new_schema New schema, in case of RENAME TO, otherwise NULL
new_name New name, in case of RENAME TO, otherwise NULL
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
*/
int
void
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
Event_queue_element *new_element)
{
int res;
Event_queue_element *new_element;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str));
new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
new_name ? *new_name:name, new_element);
if (res)
{
delete new_element;
goto end;
}
else if (new_element->status == Event_queue_element::DISABLED)
if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
......@@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
DBUG_VOID_RETURN;
}
......@@ -453,133 +420,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
}
/*
Loads all ENABLED events from mysql.event into the prioritized
queue. Called during scheduler main thread initialization. Compiles
the events. Creates Event_queue_element instances for every ENABLED event
from mysql.event.
SYNOPSIS
Event_queue::load_events_from_db()
thd - Thread context. Used for memory allocation in some cases.
RETURN VALUE
0 OK
!0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
EVEX_COMPILE_ERROR) - in all these cases mysql.event was
tampered.
NOTES
Reports the error to the console
*/
int
Event_queue::load_events_from_db(THD *thd)
{
TABLE *table;
READ_RECORD read_record_info;
int ret= -1;
uint count= 0;
bool clean_the_queue= TRUE;
DBUG_ENTER("Event_queue::load_events_from_db");
DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
{
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
Event_queue_element *et;
if (!(et= new Event_queue_element))
{
DBUG_PRINT("info", ("Out of memory"));
break;
}
DBUG_PRINT("info", ("Loading event from row."));
if ((ret= et->load_from_row(table)))
{
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
break;
}
if (et->status != Event_queue_element::ENABLED)
{
DBUG_PRINT("info",("%s is disabled",et->name.str));
delete et;
continue;
}
/* let's find when to be executed */
if (et->compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
" Skipping", et->dbname.str, et->name.str);
continue;
}
{
Event_job_data temp_job_data;
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
temp_job_data.load_from_row(table);
/*
We load only on scheduler root just to check whether the body
compiles.
*/
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
break;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
et->dbname.str, et->name.str);
break;
default:
break;
}
thd->end_statement();
thd->cleanup_after_query();
}
if (ret)
{
delete et;
goto end;
}
queue_insert_safe(&queue, (byte *) et);
count++;
}
clean_the_queue= FALSE;
end:
end_read_record(&read_record_info);
if (clean_the_queue)
{
empty_queue();
ret= -1;
}
else
{
ret= 0;
sql_print_information("SCHEDULER: Loaded %d event%s", count,
(count == 1)?"":"s");
}
close_thread_tables(thd);
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
/*
Recalculates activation times in the queue. There is one reason for
that. Because the values (execute_at) by which the queue is ordered are
......@@ -629,7 +469,7 @@ Event_queue::empty_queue()
{
uint i;
DBUG_ENTER("Event_queue::empty_queue");
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements));
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
/* empty the queue */
for (i= 0; i < queue.elements; ++i)
......@@ -690,31 +530,27 @@ static const char *queue_wait_msg= "Waiting for next activation";
SYNOPSIS
Event_queue::get_top_for_execution_if_time()
thd [in] Thread
job_data [out] The object to execute
thd [in] Thread
event_name [out] The object to execute
RETURN VALUE
FALSE No error. If *job_data==NULL then top not elligible for execution.
Could be that there is no top.
TRUE Error
FALSE No error. event_name != NULL
TRUE Serious error
*/
bool
Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
Event_queue::get_top_for_execution_if_time(THD *thd,
Event_queue_element_for_exec **event_name)
{
bool ret= FALSE;
struct timespec top_time;
Event_queue_element *top= NULL;
bool to_free= FALSE;
bool to_drop= FALSE;
*job_data= NULL;
*event_name= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
LOCK_QUEUE_DATA();
for (;;)
{
int res;
Event_queue_element *top= NULL;
/* Break loop if thd has been killed */
if (thd->killed)
......@@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
continue;
}
DBUG_PRINT("info", ("Ready for execution"));
if (!(*job_data= new Event_job_data()))
{
ret= TRUE;
break;
}
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
*job_data)))
if (!(*event_name= new Event_queue_element_for_exec()) ||
(*event_name)->init(top->dbname, top->name))
{
DBUG_PRINT("error", ("Got %d from load_named_event", res));
delete *job_data;
*job_data= NULL;
ret= TRUE;
break;
}
DBUG_PRINT("info", ("Ready for execution"));
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
(*job_data)->execution_count= top->execution_count;
top->execution_count++;
(*event_name)->dropped= top->dropped;
top->update_timing_fields(thd);
if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
(top->status == Event_queue_element::DISABLED))
if (top->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("removing from the queue"));
sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
top->dbname.str, top->name.str,
top->dropped? "Dropping.":"");
to_free= TRUE;
to_drop= top->dropped;
delete top;
queue_remove(&queue, 0);
}
else
......@@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
}
end:
UNLOCK_QUEUE_DATA();
if (to_drop)
{
DBUG_PRINT("info", ("Dropping from disk"));
top->drop(thd);
}
if (to_free)
delete top;
DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data));
DBUG_PRINT("info", ("returning %d et_new: 0x%lx ",
ret, (long) *event_name));
if (*job_data)
DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str,
(*job_data)->name.str, (*job_data)->definer.str));
if (*event_name)
DBUG_PRINT("info", ("db: %s name: %s",
(*event_name)->dbname.str, (*event_name)->name.str));
DBUG_RETURN(ret);
}
......
......@@ -16,12 +16,10 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class Event_basic;
class Event_db_repository;
class Event_job_data;
class Event_queue_element;
class Event_queue_element_for_exec;
class THD;
class Event_scheduler;
class Event_queue
{
......@@ -35,19 +33,19 @@ public:
deinit_mutexes();
bool
init_queue(THD *thd, Event_db_repository *db_repo);
init_queue(THD *thd);
void
deinit_queue();
/* Methods for queue management follow */
int
create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
void
create_event(THD *thd, Event_queue_element *new_element);
int
void
update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name);
Event_queue_element *new_element);
void
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
......@@ -59,14 +57,15 @@ public:
recalculate_activation_times(THD *thd);
bool
get_top_for_execution_if_time(THD *thd, Event_job_data **job_data);
get_top_for_execution_if_time(THD *thd,
Event_queue_element_for_exec **event_name);
void
dump_internal_status();
int
load_events_from_db(THD *thd);
void
empty_queue();
protected:
void
find_n_remove_event(LEX_STRING db, LEX_STRING name);
......@@ -76,8 +75,6 @@ protected:
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING, Event_basic *));
void
empty_queue();
void
dbug_dump_queue(time_t now);
......@@ -86,11 +83,7 @@ protected:
pthread_mutex_t LOCK_event_queue;
pthread_cond_t COND_queue_state;
Event_db_repository *db_repository;
Event_scheduler *scheduler;
/* The sorted queue with the Event_job_data objects */
/* The sorted queue with the Event_queue_element objects */
QUEUE queue;
TIME next_activation_at;
......
......@@ -18,6 +18,7 @@
#include "event_data_objects.h"
#include "event_scheduler.h"
#include "event_queue.h"
#include "event_db_repository.h"
#ifdef __GNUC__
#if __GNUC__ >= 2
......@@ -34,6 +35,11 @@
extern pthread_attr_t connection_attrib;
Event_db_repository *Event_worker_thread::db_repository;
Events *Event_worker_thread::events_facade;
static
const LEX_STRING scheduler_states_names[] =
{
......@@ -60,8 +66,8 @@ struct scheduler_param {
et The event itself
*/
static void
evex_print_warnings(THD *thd, Event_job_data *et)
void
Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
{
MYSQL_ERROR *err;
DBUG_ENTER("evex_print_warnings");
......@@ -253,49 +259,97 @@ event_worker_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd;
Event_job_data *event= (Event_job_data *)arg;
int ret;
Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
thd= event->thd;
thd->thread_stack= (char *) &thd; // remember where our stack is
DBUG_ENTER("event_worker_thread");
if (!post_init_event_thread(thd))
Event_worker_thread worker_thread;
worker_thread.run(thd, (Event_queue_element_for_exec *)arg);
deinit_event_thread(thd);
return 0; // Can't return anything here
}
/*
Function that executes an event in a child thread. Setups the
environment for the event execution and cleans after that.
SYNOPSIS
Event_worker_thread::run()
thd Thread context
event The Event_queue_element_for_exec object to be processed
*/
void
Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
{
int ret;
Event_job_data *job_data= NULL;
DBUG_ENTER("Event_worker_thread::run");
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
"THD=0x%lx", time(NULL), thd));
if (post_init_event_thread(thd))
goto end;
if (!(job_data= new Event_job_data()))
goto end;
else if ((ret= db_repository->
load_named_event(thd, event->dbname, event->name, job_data)))
{
DBUG_PRINT("info", ("Baikonur, time is %ld, BURAN reporting and operational."
"THD: 0x%lx",
(long) time(NULL), (long) thd));
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. "
"Execution %u",
event->dbname.str, event->name.str,
event->definer.str, thd->thread_id,
event->execution_count);
thd->enable_slow_log= TRUE;
ret= event->execute(thd);
evex_print_warnings(thd, event);
sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
"RetCode=%d", event->dbname.str, event->name.str,
event->definer.str, thd->thread_id, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
event->dbname.str, event->name.str,
event->definer.str);
else if (ret == EVEX_MICROSECOND_UNSUP)
sql_print_information("SCHEDULER: MICROSECOND is not supported");
DBUG_PRINT("error", ("Got %d from load_named_event", ret));
goto end;
}
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ",
job_data->dbname.str, job_data->name.str,
job_data->definer.str, thd->thread_id);
thd->enable_slow_log= TRUE;
ret= job_data->execute(thd);
print_warnings(thd, job_data);
sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
"RetCode=%d", job_data->dbname.str, job_data->name.str,
job_data->definer.str, thd->thread_id, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
job_data->dbname.str, job_data->name.str,
job_data->definer.str);
else if (ret == EVEX_MICROSECOND_UNSUP)
end:
delete job_data;
if (event->dropped)
{
sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str,
event->name.str);
/*
Using db_repository can lead to a race condition because we access
the table without holding LOCK_metadata.
Scenario:
1. CREATE EVENT xyz AT ... (conn thread)
2. execute xyz (worker)
3. CREATE EVENT XYZ EVERY ... (conn thread)
4. drop xyz (worker)
5. XYZ was just created on disk but `drop xyz` of the worker dropped it.
A consequent load to create Event_queue_element will fail.
If all operations are performed under LOCK_metadata there is no such
problem. However, this comes at the price of introduction bi-directional
association between class Events and class Event_worker_thread.
*/
events_facade->drop_event(thd, event->dbname, event->name, FALSE);
}
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
event->name.str));
delete event;
deinit_event_thread(thd);
DBUG_RETURN(0); // Can't return anything here
delete event;
}
......@@ -441,7 +495,6 @@ bool
Event_scheduler::run(THD *thd)
{
int res= FALSE;
Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
sql_print_information("SCHEDULER: Manager thread started with id %lu",
......@@ -454,18 +507,20 @@ Event_scheduler::run(THD *thd)
while (is_running())
{
Event_queue_element_for_exec *event_name;
/* Gets a minimized version */
if (queue->get_top_for_execution_if_time(thd, &job_data))
if (queue->get_top_for_execution_if_time(thd, &event_name))
{
sql_print_information("SCHEDULER: Serious error during getting next "
"event to execute. Stopping");
break;
}
DBUG_PRINT("info", ("get_top returned job_data: 0x%lx", (long) job_data));
if (job_data)
DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name));
if (event_name)
{
if ((res= execute_top(thd, job_data)))
if ((res= execute_top(thd, event_name)))
break;
}
else
......@@ -499,7 +554,7 @@ Event_scheduler::run(THD *thd)
*/
bool
Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name)
{
THD *new_thd;
pthread_t th;
......@@ -510,13 +565,13 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
pre_init_event_thread(new_thd);
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
job_data->thd= new_thd;
event_name->thd= new_thd;
DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
job_data->dbname.str, job_data->name.str));
event_name->dbname.str, event_name->name.str));
/* Major failure */
if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
job_data)))
event_name)))
goto error;
++started_events;
......@@ -537,7 +592,7 @@ error:
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
delete job_data;
delete event_name;
DBUG_RETURN(TRUE);
}
......
......@@ -15,8 +15,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class Event_queue;
class Event_job_data;
class Event_db_repository;
class Events;
void
pre_init_event_thread(THD* thd);
......@@ -27,6 +30,29 @@ post_init_event_thread(THD* thd);
void
deinit_event_thread(THD *thd);
class Event_worker_thread
{
public:
static void
init(Events *events, Event_db_repository *db_repo)
{
db_repository= db_repo;
events_facade= events;
}
void
run(THD *thd, Event_queue_element_for_exec *event);
private:
void
print_warnings(THD *thd, Event_job_data *et);
static Event_db_repository *db_repository;
static Events *events_facade;
};
class Event_scheduler
{
public:
......@@ -71,10 +97,9 @@ private:
uint
workers_count();
/* helper functions */
bool
execute_top(THD *thd, Event_job_data *job_data);
execute_top(THD *thd, Event_queue_element_for_exec *event_name);
/* helper functions for working with mutexes & conditionals */
void
......
......@@ -97,7 +97,7 @@ Event_queue events_event_queue;
static
Event_scheduler events_event_scheduler;
static
Event_db_repository events_event_db_repository;
Events Events::singleton;
......@@ -295,29 +295,6 @@ Events::Events()
}
/*
Opens mysql.event table with specified lock
SYNOPSIS
Events::open_event_table()
thd Thread context
lock_type How to lock the table
table We will store the open table here
RETURN VALUE
1 Cannot lock table
2 The table is corrupted - different number of fields
0 OK
*/
int
Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
TABLE **table)
{
return db_repository->open_event_table(thd, lock_type, table);
}
/*
The function exported to the world for creating of events.
......@@ -351,16 +328,24 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists)
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists)))
{
if ((ret= event_queue->create_event(thd, parse_data->dbname,
parse_data->name)))
Event_queue_element *new_element;
if (!(new_element= new Event_queue_element()))
ret= TRUE; // OOM
else if ((ret= db_repository->load_named_event(thd, parse_data->dbname,
parse_data->name,
new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
delete new_element;
}
else
event_queue->create_event(thd, new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
}
......@@ -387,6 +372,7 @@ bool
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
{
int ret;
Event_queue_element *new_element;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL;
LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL;
......@@ -400,12 +386,20 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{
if ((ret= event_queue->update_event(thd, parse_data->dbname,
parse_data->name, new_dbname, new_name)))
LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname;
LEX_STRING name= new_name ? *new_name : parse_data->name;
if (!(new_element= new Event_queue_element()))
ret= TRUE; // OOM
else if ((ret= db_repository->load_named_event(thd, dbname, name,
new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
delete new_element;
}
else
event_queue->update_event(thd, parse_data->dbname, parse_data->name,
new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
......@@ -423,10 +417,6 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
name [in] Event's name
if_exists [in] When set and the event does not exist =>
warning onto the stack
only_from_disk [in] Whether to remove the event from the queue too.
In case of Event_job_data::drop() it's needed to
do only disk drop because Event_queue will handle
removal from memory queue.
RETURN VALUE
FALSE OK
......@@ -434,8 +424,7 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
*/
bool
Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
bool only_from_disk)
Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists)
{
int ret;
DBUG_ENTER("Events::drop_event");
......@@ -448,10 +437,7 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->drop_event(thd, dbname, name, if_exists)))
{
if (!only_from_disk)
event_queue->drop_event(thd, dbname, name);
}
event_queue->drop_event(thd, dbname, name);
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
}
......@@ -655,11 +641,12 @@ Events::init()
}
check_system_tables_error= FALSE;
if (event_queue->init_queue(thd, db_repository))
if (event_queue->init_queue(thd) || load_events_from_db(thd))
{
sql_print_error("SCHEDULER: Error while loading from disk.");
goto end;
}
scheduler->init_scheduler(event_queue);
DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON ||
......@@ -667,6 +654,7 @@ Events::init()
if (opt_event_scheduler == Events::EVENTS_ON)
res= scheduler->start();
Event_worker_thread::init(this, db_repository);
end:
delete thd;
/* Remember that we don't have a THD */
......@@ -903,3 +891,131 @@ Events::check_system_tables(THD *thd)
DBUG_RETURN(ret);
}
/*
Loads all ENABLED events from mysql.event into the prioritized
queue. Called during scheduler main thread initialization. Compiles
the events. Creates Event_queue_element instances for every ENABLED event
from mysql.event.
SYNOPSIS
Events::load_events_from_db()
thd Thread context. Used for memory allocation in some cases.
RETURN VALUE
0 OK
!0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
EVEX_COMPILE_ERROR) - in all these cases mysql.event was
tampered.
NOTES
Reports the error to the console
*/
int
Events::load_events_from_db(THD *thd)
{
TABLE *table;
READ_RECORD read_record_info;
int ret= -1;
uint count= 0;
bool clean_the_queue= TRUE;
DBUG_ENTER("Events::load_events_from_db");
DBUG_PRINT("enter", ("thd=0x%lx", thd));
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
{
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
Event_queue_element *et;
if (!(et= new Event_queue_element))
{
DBUG_PRINT("info", ("Out of memory"));
break;
}
DBUG_PRINT("info", ("Loading event from row."));
if ((ret= et->load_from_row(table)))
{
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
break;
}
if (et->status != Event_queue_element::ENABLED)
{
DBUG_PRINT("info",("%s is disabled",et->name.str));
delete et;
continue;
}
/* let's find when to be executed */
if (et->compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
" Skipping", et->dbname.str, et->name.str);
continue;
}
{
Event_job_data temp_job_data;
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
temp_job_data.load_from_row(table);
/*
We load only on scheduler root just to check whether the body
compiles.
*/
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
break;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
et->dbname.str, et->name.str);
break;
default:
break;
}
thd->end_statement();
thd->cleanup_after_query();
}
if (ret)
{
delete et;
goto end;
}
DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
event_queue->create_event(thd, et);
count++;
}
clean_the_queue= FALSE;
end:
end_read_record(&read_record_info);
if (clean_the_queue)
{
event_queue->empty_queue();
ret= -1;
}
else
{
ret= 0;
sql_print_information("SCHEDULER: Loaded %d event%s", count,
(count == 1)?"":"s");
}
close_thread_tables(thd);
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
......@@ -42,13 +42,6 @@ sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs);
class Events
{
public:
/*
Quite NOT the best practice and will be removed once
Event_timed::drop() and Event_timed is fixed not do drop directly
or other scheme will be found.
*/
friend class Event_queue_element;
/* The order should match the order in opt_typelib */
enum enum_opt_event_scheduler
{
......@@ -92,15 +85,11 @@ public:
update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to);
bool
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
bool only_from_disk);
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists);
void
drop_schema_events(THD *thd, char *db);
int
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
bool
show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
......@@ -119,6 +108,9 @@ private:
bool
check_system_tables(THD *thd);
int
load_events_from_db(THD *thd);
/* Singleton DP is used */
Events();
~Events(){}
......
......@@ -4047,8 +4047,7 @@ end_with_restore_list:
if (!(res= Events::get_instance()->drop_event(thd,
lex->spname->m_db,
lex->spname->m_name,
lex->drop_if_exists,
FALSE)))
lex->drop_if_exists)))
send_ok(thd);
}
break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment