Commit 3b840ade authored by andrey@lmy004's avatar andrey@lmy004

WL#3337 (Event scheduler new architecture)

Cleaned up the code a bit. Fixed few leaks.
This code still does not load events on server startup
from disk. The problem is that there is a need for a THD instance, which
does not exist during server boot. This will be solved soon.
Still Event_timed is used both for the memory queue and for exectution.
This will be changed according to WL#3337 probably in the next commit.
parent 2bdd872e
......@@ -530,18 +530,14 @@ Event_parse_data::init_ends(THD *thd, Item *new_ends)
Event_timed::Event_timed()
*/
Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
running(0), thread_id(0), status_changed(false),
Event_timed::Event_timed():status_changed(false),
last_executed_changed(false), expression(0),
created(0), modified(0),
on_completion(Event_timed::ON_COMPLETION_DROP),
status(Event_timed::ENABLED), sphead(0),
sql_mode(0), dropped(false),
free_sphead_on_delete(true), flags(0)
sql_mode(0), dropped(false), flags(0)
{
pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST);
pthread_cond_init(&this->COND_finished, NULL);
init();
}
......@@ -555,49 +551,11 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
Event_timed::~Event_timed()
{
deinit_mutexes();
free_root(&mem_root, MYF(0));
if (free_sphead_on_delete)
free_sp();
}
/*
Destructor
SYNOPSIS
Event_timed::deinit_mutexes()
*/
void
Event_timed::deinit_mutexes()
{
pthread_mutex_destroy(&this->LOCK_running);
pthread_cond_destroy(&this->COND_finished);
}
/*
Checks whether the event is running
SYNOPSIS
Event_timed::is_running()
*/
bool
Event_timed::is_running()
{
bool ret;
VOID(pthread_mutex_lock(&this->LOCK_running));
ret= running;
VOID(pthread_mutex_unlock(&this->LOCK_running));
return ret;
}
/*
Init all member variables
......@@ -1253,7 +1211,7 @@ Event_timed::update_fields(THD *thd)
Open_tables_state backup;
int ret;
DBUG_ENTER("Event_timed::update_time_fields");
DBUG_ENTER("Event_timed::update_fields");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
......@@ -1382,7 +1340,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
Executes the event (the underlying sp_head object);
SYNOPSIS
evex_fill_row()
Event_timed::execute()
thd THD
mem_root If != NULL use it to compile the event on it
......@@ -1607,149 +1565,6 @@ done:
}
extern pthread_attr_t connection_attrib;
/*
Checks whether is possible and forks a thread. Passes self as argument.
RETURN VALUE
EVENT_EXEC_STARTED OK
EVENT_EXEC_ALREADY_EXEC Thread not forked, already working
EVENT_EXEC_CANT_FORK Unable to spawn thread (error)
*/
int
Event_timed::spawn_now(void * (*thread_func)(void*), void *arg)
{
THD *thd= current_thd;
int ret= EVENT_EXEC_STARTED;
DBUG_ENTER("Event_timed::spawn_now");
DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str));
VOID(pthread_mutex_lock(&this->LOCK_running));
DBUG_PRINT("info", ("SCHEDULER: execute_at of %s is %lld", name.str,
TIME_to_ulonglong_datetime(&execute_at)));
mark_last_executed(thd);
if (compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
"Disabling after execution.", dbname.str, name.str);
status= DISABLED;
}
DBUG_PRINT("evex manager", ("[%10s] next exec at [%llu]", name.str,
TIME_to_ulonglong_datetime(&execute_at)));
/*
1. For one-time event : year is > 0 and expression is 0
2. For recurring, expression is != -=> check execute_at_null in this case
*/
if ((execute_at.year && !expression) || execute_at_null)
{
sql_print_information("SCHEDULER: [%s.%s of %s] no more executions "
"after this one", dbname.str, name.str,
definer.str);
flags |= EVENT_EXEC_NO_MORE | EVENT_FREE_WHEN_FINISHED;
}
update_fields(thd);
if (!in_spawned_thread)
{
pthread_t th;
in_spawned_thread= true;
if (pthread_create(&th, &connection_attrib, thread_func, arg))
{
DBUG_PRINT("info", ("problem while spawning thread"));
ret= EVENT_EXEC_CANT_FORK;
in_spawned_thread= false;
}
}
else
{
DBUG_PRINT("info", ("already in spawned thread. skipping"));
ret= EVENT_EXEC_ALREADY_EXEC;
}
VOID(pthread_mutex_unlock(&this->LOCK_running));
DBUG_RETURN(ret);
}
bool
Event_timed::spawn_thread_finish(THD *thd)
{
bool should_free;
DBUG_ENTER("Event_timed::spawn_thread_finish");
VOID(pthread_mutex_lock(&LOCK_running));
in_spawned_thread= false;
DBUG_PRINT("info", ("Sending COND_finished for thread %d", thread_id));
thread_id= 0;
if (dropped)
drop(thd);
pthread_cond_broadcast(&COND_finished);
should_free= flags & EVENT_FREE_WHEN_FINISHED;
VOID(pthread_mutex_unlock(&LOCK_running));
DBUG_RETURN(should_free);
}
/*
Kills a running event
SYNOPSIS
Event_timed::kill_thread()
RETURN VALUE
0 OK
-1 EVEX_CANT_KILL
!0 Error
*/
int
Event_timed::kill_thread(THD *thd)
{
int ret= 0;
DBUG_ENTER("Event_timed::kill_thread");
pthread_mutex_lock(&LOCK_running);
DBUG_PRINT("info", ("thread_id=%lu", thread_id));
if (thread_id == thd->thread_id)
{
/*
We don't kill ourselves in cases like :
alter event e_43 do alter event e_43 do set @a = 4 because
we will never receive COND_finished.
*/
DBUG_PRINT("info", ("It's not safe to kill ourselves in self altering queries"));
ret= EVEX_CANT_KILL;
}
else if (thread_id && !(ret= kill_one_thread(thd, thread_id, false)))
{
thd->enter_cond(&COND_finished, &LOCK_running, "Waiting for finished");
DBUG_PRINT("info", ("Waiting for COND_finished from thread %d", thread_id));
while (thread_id)
pthread_cond_wait(&COND_finished, &LOCK_running);
DBUG_PRINT("info", ("Got COND_finished"));
/* This will implicitly unlock LOCK_running. Hence we return before that */
thd->exit_cond("");
DBUG_RETURN(0);
}
else if (!thread_id && in_spawned_thread)
{
/*
Because the manager thread waits for the forked thread to update thread_id
this situation is impossible.
*/
DBUG_ASSERT(0);
}
pthread_mutex_unlock(&LOCK_running);
DBUG_PRINT("exit", ("%d", ret));
DBUG_RETURN(ret);
}
/*
Checks whether two events have the same name
......
......@@ -63,12 +63,6 @@ class Event_timed
{
Event_timed(const Event_timed &); /* Prevent use of these */
void operator=(Event_timed &);
my_bool in_spawned_thread;
ulong locked_by_thread_id;
my_bool running;
ulong thread_id;
pthread_mutex_t LOCK_running;
pthread_cond_t COND_finished;
bool status_changed;
bool last_executed_changed;
......@@ -118,7 +112,6 @@ public:
ulong sql_mode;
bool dropped;
bool free_sphead_on_delete;
uint flags;//all kind of purposes
static void *operator new(size_t size)
......@@ -146,9 +139,6 @@ public:
void
init();
void
deinit_mutexes();
int
load_from_row(TABLE *table);
......@@ -173,23 +163,8 @@ public:
int
compile(THD *thd, MEM_ROOT *mem_root);
bool
is_running();
int
spawn_now(void * (*thread_func)(void*), void *arg);
bool
spawn_thread_finish(THD *thd);
void
free_sp();
int
kill_thread(THD *thd);
void
set_thread_id(ulong tid) { thread_id= tid; }
};
......
......@@ -374,8 +374,7 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
ret= read_record_info.read_record(&read_record_info);
if (ret == 0)
ret= copy_event_to_schema_table(thd, schema_table, event_table);
}
while (ret == 0);
} while (ret == 0);
DBUG_PRINT("info", ("Scan finished. ret=%d", ret));
end_read_record(&read_record_info);
......@@ -464,8 +463,7 @@ Event_db_repository::fill_schema_events(THD *thd, TABLE_LIST *tables, char *db)
int
Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_timed **ett,
TABLE *tbl, MEM_ROOT *root)
Event_timed **ett, TABLE *tbl)
{
TABLE *table;
int ret;
......@@ -505,7 +503,7 @@ done:
if (ret)
{
delete et;
et= 0;
et= NULL;
}
/* don't close the table if we haven't opened it ourselves */
if (!tbl && table)
......@@ -518,7 +516,6 @@ done:
int
Event_db_repository::init_repository()
{
init_alloc_root(&repo_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
return 0;
}
......@@ -526,7 +523,6 @@ Event_db_repository::init_repository()
void
Event_db_repository::deinit_repository()
{
free_root(&repo_root, MYF(0));
}
......@@ -731,7 +727,8 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
parse_data->name.str));
DBUG_PRINT("info", ("check existance of an event with the same name"));
if (!evex_db_find_event_by_name(thd, parse_data->dbname, parse_data->name, table))
if (!evex_db_find_event_by_name(thd, parse_data->dbname,
parse_data->name, table))
{
if (create_if_not)
{
......@@ -1026,14 +1023,12 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
*/
if (db.length > table->field[ET_FIELD_DB]->field_length ||
name.length > table->field[ET_FIELD_NAME]->field_length)
DBUG_RETURN(EVEX_KEY_NOT_FOUND);
table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin);
table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin);
key_copy(key, table->record[0], table->key_info,
table->key_info->key_length);
key_copy(key, table->record[0], table->key_info, table->key_info->key_length);
if (table->file->index_read_idx(table->record[0], 0, key,
table->key_info->key_length,
......@@ -1125,7 +1120,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
the table, compiles and inserts it into the cache.
SYNOPSIS
Event_scheduler::load_named_event()
Event_db_repository::load_named_event_timed()
thd THD
etn The name of the event to load and compile on scheduler's root
etn_new The loaded event
......@@ -1136,7 +1131,8 @@ Event_db_repository::drop_events_by_field(THD *thd,
*/
int
Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_db_repository::load_named_event_timed(THD *thd, LEX_STRING dbname,
LEX_STRING name,
Event_timed **etn_new)
{
int ret= 0;
......@@ -1144,12 +1140,12 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
Event_timed *et_loaded= NULL;
Open_tables_state backup;
DBUG_ENTER("Event_db_repository::load_named_event");
DBUG_ENTER("Event_db_repository::load_named_event_timed");
DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because db_find_event() has done it */
ret= find_event(thd, dbname, name, &et_loaded, NULL, &repo_root);
ret= find_event(thd, dbname, name, &et_loaded, NULL);
thd->restore_backup_open_tables_state(&backup);
/* In this case no memory was allocated so we don't need to clean */
if (ret)
......@@ -1171,3 +1167,57 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK);
}
/*
Looks for a named event in mysql.event and then loads it from
the table, compiles and inserts it into the cache.
SYNOPSIS
Event_db_repository::load_named_event_job()
thd THD
etn The name of the event to load and compile on scheduler's root
etn_new The loaded event
RETURN VALUE
NULL Error during compile or the event is non-enabled.
otherwise Address
*/
int
Event_db_repository::load_named_event_job(THD *thd, LEX_STRING dbname,
LEX_STRING name,
Event_job_data **etn_new)
{
int ret= 0;
MEM_ROOT *tmp_mem_root;
Event_timed *et_loaded= NULL;
Open_tables_state backup;
DBUG_ENTER("Event_db_repository::load_named_event_job");
DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
#if 0
thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because db_find_event() has done it */
ret= find_event(thd, dbname, name, &et_loaded, NULL);
thd->restore_backup_open_tables_state(&backup);
/* In this case no memory was allocated so we don't need to clean */
if (ret)
DBUG_RETURN(OP_LOAD_ERROR);
if (et_loaded->status != Event_timed::ENABLED)
{
/*
We don't load non-enabled events.
In db_find_event() `et_new` was allocated on the heap and not on
scheduler_root therefore we delete it here.
*/
delete et_loaded;
DBUG_RETURN(OP_DISABLED_EVENT);
}
et_loaded->compute_next_execution_time();
*etn_new= et_loaded;
#endif
DBUG_RETURN(OP_OK);
}
......@@ -56,6 +56,7 @@ fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */);
class Event_timed;
class Event_parse_data;
class Event_queue_element;
class Event_job_data;
class Event_db_repository
{
......@@ -88,10 +89,15 @@ public:
int
find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **ett,
TABLE *tbl, MEM_ROOT *root);
TABLE *tbl);
int
load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **etn_new);
load_named_event_timed(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_timed **etn_new);
int
load_named_event_job(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_job_data **etn_new);
int
find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
......@@ -116,8 +122,6 @@ private:
static bool
check_system_tables(THD *thd);
MEM_ROOT repo_root;
/* Prevent use of these */
Event_db_repository(const Event_db_repository &);
void operator=(Event_db_repository &);
......
This diff is collapsed.
......@@ -38,15 +38,15 @@ public:
deinit_mutexes();
bool
init(Event_db_repository *db_repo);
init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched);
void
deinit();
deinit_queue();
/* Methods for queue management follow */
int
create_event(THD *thd, Event_parse_data *et, bool check_existence);
create_event(THD *thd, Event_parse_data *et);
int
update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
......@@ -55,13 +55,9 @@ public:
bool
drop_event(THD *thd, sp_name *name);
int
void
drop_schema_events(THD *thd, LEX_STRING schema);
int
drop_user_events(THD *thd, LEX_STRING *definer)
{ DBUG_ASSERT(0); return 0;}
uint
events_count();
......@@ -89,7 +85,7 @@ public:
void
top_changed();
///////////////protected
protected:
Event_timed *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
......@@ -105,8 +101,6 @@ public:
Event_db_repository *db_repository;
/* The sorted queue with the Event_timed objects */
QUEUE queue;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
......@@ -122,10 +116,16 @@ public:
unlock_data(const char *func, uint line);
void
on_queue_change();
notify_observers();
void
dbug_dump_queue(time_t now);
Event_scheduler_ng *scheduler;
protected:
//public:
/* The sorted queue with the Event_timed objects */
QUEUE queue;
};
......
This diff is collapsed.
......@@ -16,191 +16,4 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class sp_name;
class Event_timed;
class Event_db_repository;
class Event_queue;
class THD;
int
events_init();
void
events_shutdown();
#include "event_queue.h"
#include "event_scheduler.h"
class Event_scheduler
{
public:
enum enum_state
{
UNINITIALIZED= 0,
INITIALIZED,
COMMENCING,
CANTSTART,
RUNNING,
SUSPENDED,
IN_SHUTDOWN
};
enum enum_suspend_or_resume
{
SUSPEND= 1,
RESUME= 2
};
/* This is the current status of the life-cycle of the scheduler. */
enum enum_state state;
static void
create_instance(Event_queue *queue);
static void
init_mutexes();
static void
destroy_mutexes();
/* Singleton access */
static Event_scheduler*
get_instance();
bool
init(Event_db_repository *db_repo);
void
destroy();
/* State changing methods follow */
bool
start();
int
stop();
bool
start_suspended();
/*
Need to be public because has to be called from the function
passed to pthread_create.
*/
bool
run(THD *thd);
int
suspend_or_resume(enum enum_suspend_or_resume action);
/*
static void
init_mutexes();
static void
destroy_mutexes();
*/
void
report_error_during_start();
/* Information retrieving methods follow */
enum enum_state
get_state();
bool
initialized();
static int
dump_internal_status(THD *thd);
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
void
unlock_data(const char *func, uint line);
int
cond_wait(int cond, pthread_mutex_t *mutex);
void
queue_changed();
Event_queue *event_queue;
protected:
uint
workers_count();
/* helper functions */
bool
execute_top(THD *thd, Event_timed *et);
void
clean_memory(THD *thd);
void
stop_all_running_events(THD *thd);
bool
check_n_suspend_if_needed(THD *thd);
bool
check_n_wait_for_non_empty_queue(THD *thd);
/* Singleton DP is used */
Event_scheduler();
pthread_mutex_t LOCK_data;
pthread_mutex_t *LOCK_scheduler_data;
/* The MEM_ROOT of the object */
MEM_ROOT scheduler_root;
/* Set to start the scheduler in suspended state */
bool start_scheduler_suspended;
/*
Holds the thread id of the executor thread or 0 if the executor is not
running. It is used by ::shutdown() to know which thread to kill with
kill_one_thread(). The latter wake ups a thread if it is waiting on a
conditional variable and sets thd->killed to non-zero.
*/
ulong thread_id;
enum enum_cond_vars
{
COND_NONE= -1,
COND_new_work= 0,
COND_started_or_stopped,
COND_suspend_or_resume,
/* Must be always last */
COND_LAST
};
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func;
int cond_waiting_on;
bool mutex_scheduler_data_locked;
static const char * const cond_vars_names[COND_LAST];
pthread_cond_t cond_vars[COND_LAST];
/* Singleton instance */
static Event_scheduler *singleton;
private:
/* Prevent use of these */
Event_scheduler(const Event_scheduler &);
void operator=(Event_scheduler &);
};
#endif /* _EVENT_SCHEDULER_H_ */
......@@ -212,6 +212,7 @@ end:
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
DBUG_RETURN(0); // Against gcc warnings
}
......@@ -296,26 +297,22 @@ end:
delete event;
my_thread_end();
DBUG_RETURN(0); // Against gcc warnings
}
bool
Event_scheduler_ng::init(Event_queue *q)
Event_scheduler_ng::init_scheduler(Event_queue *q)
{
thread_id= 0;
state= INITIALIZED;
/* init memory root */
queue= q;
return FALSE;
}
void
Event_scheduler_ng::deinit()
{
}
Event_scheduler_ng::deinit_scheduler() {}
void
......@@ -477,7 +474,6 @@ Event_scheduler_ng::run(THD *thd)
pthread_cond_signal(&COND_state);
error:
state= INITIALIZED;
stop_all_running_events(thd);
UNLOCK_SCHEDULER_DATA();
sql_print_information("SCHEDULER: Stopped");
......@@ -560,98 +556,18 @@ Event_scheduler_ng::workers_count()
}
/*
Stops all running events
SYNOPSIS
Event_scheduler::stop_all_running_events()
thd Thread
NOTE
LOCK_scheduler data must be acquired prior to call to this method
*/
void
Event_scheduler_ng::stop_all_running_events(THD *thd)
{
CHARSET_INFO *scs= system_charset_info;
uint i;
DYNAMIC_ARRAY running_threads;
THD *tmp;
DBUG_ENTER("Event_scheduler::stop_all_running_events");
DBUG_PRINT("enter", ("workers_count=%d", workers_count()));
my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10);
bool had_super= FALSE;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (tmp->command == COM_DAEMON)
continue;
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
push_dynamic(&running_threads, (gptr) &tmp->thread_id);
}
VOID(pthread_mutex_unlock(&LOCK_thread_count));
/* We need temporarily SUPER_ACL to be able to kill our offsprings */
if (!(thd->security_ctx->master_access & SUPER_ACL))
thd->security_ctx->master_access|= SUPER_ACL;
else
had_super= TRUE;
char tmp_buff[10*STRING_BUFFER_USUAL_SIZE];
char int_buff[STRING_BUFFER_USUAL_SIZE];
String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
String int_string(int_buff, sizeof(int_buff), scs);
tmp_string.length(0);
for (i= 0; i < running_threads.elements; ++i)
{
int ret;
ulong thd_id= *dynamic_element(&running_threads, i, ulong*);
int_string.set((longlong) thd_id,scs);
tmp_string.append(int_string);
if (i < running_threads.elements - 1)
tmp_string.append(' ');
if ((ret= kill_one_thread(thd, thd_id, FALSE)))
{
sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret);
break;
}
}
if (running_threads.elements)
sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr());
if (!had_super)
thd->security_ctx->master_access &= ~SUPER_ACL;
delete_dynamic(&running_threads);
sql_print_information("SCHEDULER: Waiting for worker threads to finish");
while (workers_count())
my_sleep(100000);
DBUG_VOID_RETURN;
}
/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
Event_scheduler::queue_changed()
Event_scheduler_ng::queue_changed()
*/
void
Event_scheduler_ng::queue_changed()
{
DBUG_ENTER("Event_scheduler::queue_changed");
DBUG_ENTER("Event_scheduler_ng::queue_changed");
DBUG_PRINT("info", ("Sending COND_state"));
pthread_cond_signal(&COND_state);
DBUG_VOID_RETURN;
......@@ -662,8 +578,7 @@ void
Event_scheduler_ng::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_scheduler_ng::lock_mutex");
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
&LOCK_scheduler_state, func, line));
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
pthread_mutex_lock(&LOCK_scheduler_state);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
......@@ -675,9 +590,8 @@ Event_scheduler_ng::lock_data(const char *func, uint line)
void
Event_scheduler_ng::unlock_data(const char *func, uint line)
{
DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex");
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
&LOCK_scheduler_state, func, line));
DBUG_ENTER("Event_scheduler_ng::unlock_mutex");
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
......
......@@ -48,10 +48,10 @@ public:
run(THD *thd);
bool
init(Event_queue *queue);
init_scheduler(Event_queue *queue);
void
deinit();
deinit_scheduler();
void
init_mutexes();
......@@ -78,9 +78,6 @@ private:
bool
execute_top(THD *thd, Event_timed *job_data);
void
stop_all_running_events(THD *thd);
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
......@@ -104,7 +101,6 @@ private:
pthread_cond_t COND_state;
Event_queue *queue;
Event_db_repository *db_repository;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
......
......@@ -17,10 +17,10 @@
#include "mysql_priv.h"
#include "events.h"
#include "event_data_objects.h"
#include "event_scheduler.h"
#include "event_db_repository.h"
#include "sp_head.h"
#include "event_queue.h"
#include "event_scheduler_ng.h"
#include "sp_head.h"
/*
TODO list :
......@@ -48,6 +48,21 @@ Warning:
*/
/*
If the user (un)intentionally removes an event directly from mysql.event
the following sequence has to be used to be able to remove the in-memory
counterpart.
1. CREATE EVENT the_name ON SCHEDULE EVERY 1 SECOND DISABLE DO SELECT 1;
2. DROP EVENT the_name
In other words, the first one will create a row in mysql.event . In the
second step because there will be a line, disk based drop will pass and
the scheduler will remove the memory counterpart. The reason is that
in-memory queue does not check whether the event we try to drop from memory
is disabled. Disabled events are not kept in-memory because they are not
eligible for execution.
*/
const char *event_scheduler_state_names[]=
{ "OFF", "0", "ON", "1", "SUSPEND", "2", NullS };
......@@ -284,17 +299,15 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
*/
int
Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options,
Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::create_event");
if (!(ret= db_repository->
create_event(thd, parse_data,
create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists,
rows_affected)))
{
if ((ret= event_queue->create_event(thd, parse_data, true)))
if ((ret= event_queue->create_event(thd, parse_data)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
}
/* No need to close the table, it will be closed in sql_parse::do_command */
......@@ -350,9 +363,10 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
SYNOPSIS
Events::drop_event()
thd THD
name event's name
drop_if_exists if set and the event not existing => warning onto the stack
rows_affected affected number of rows is returned heres
name Event's name
if_exists When set and the event does not exist => warning onto
the stack
rows_affected Affected number of rows is returned heres
RETURN VALUE
0 OK
......@@ -360,15 +374,13 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
*/
int
Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists,
uint *rows_affected)
Events::drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::drop_event");
if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name,
drop_if_exists, rows_affected)))
if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, if_exists,
rows_affected)))
{
if ((ret= event_queue->drop_event(thd, name)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
......@@ -401,7 +413,7 @@ Events::show_create_event(THD *thd, sp_name *spn)
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str));
thd->reset_n_backup_open_tables_state(&backup);
ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL, thd->mem_root);
ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL);
thd->restore_backup_open_tables_state(&backup);
if (!ret)
......@@ -472,7 +484,7 @@ Events::drop_schema_events(THD *thd, char *db)
DBUG_ENTER("evex_drop_db_events");
DBUG_PRINT("enter", ("dropping events from %s", db));
ret= event_queue->drop_schema_events(thd, db_lex);
event_queue->drop_schema_events(thd, db_lex);
ret= db_repository->drop_schema_events(thd, db_lex);
DBUG_RETURN(ret);
......@@ -500,9 +512,8 @@ Events::init()
Event_db_repository *db_repo;
DBUG_ENTER("Events::init");
db_repository->init_repository();
event_queue->init(db_repository);
event_queue->scheduler= scheduler_ng;
scheduler_ng->init(event_queue);
event_queue->init_queue(db_repository, scheduler_ng);
scheduler_ng->init_scheduler(event_queue);
/* it should be an assignment! */
if (opt_event_scheduler)
......@@ -532,8 +543,9 @@ Events::deinit()
DBUG_ENTER("Events::deinit");
scheduler_ng->stop();
scheduler_ng->deinit();
event_queue->deinit();
scheduler_ng->deinit_scheduler();
event_queue->deinit_queue();
db_repository->deinit_repository();
DBUG_VOID_RETURN;
......
......@@ -75,7 +75,7 @@ public:
get_instance();
int
create_event(THD *thd, Event_parse_data *parse_data, uint create_options,
create_event(THD *thd, Event_parse_data *parse_data, bool if_exists,
uint *rows_affected);
int
......@@ -83,7 +83,7 @@ public:
uint *rows_affected);
int
drop_event(THD *thd, sp_name *name, bool drop_if_exists, uint *rows_affected);
drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected);
int
drop_schema_events(THD *thd, char *db);
......@@ -105,9 +105,9 @@ public:
int
dump_internal_status(THD *thd);
Event_db_repository *db_repository;
Event_queue *event_queue;
Event_scheduler_ng *scheduler_ng;
Event_db_repository *db_repository;
private:
/* Singleton DP is used */
......
......@@ -57,7 +57,6 @@
#include <myisam.h>
#include <my_dir.h>
#include "event_scheduler.h"
#include "events.h"
/* WITH_BERKELEY_STORAGE_ENGINE */
......@@ -3894,7 +3893,6 @@ bool
sys_var_event_scheduler::update(THD *thd, set_var *var)
{
int res;
Event_scheduler *scheduler= Event_scheduler::get_instance();
/* here start the thread if not running. */
DBUG_ENTER("sys_var_event_scheduler::update");
if (Events::opt_event_scheduler == 0)
......@@ -3927,8 +3925,6 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
LEX_STRING *base)
{
Event_scheduler *scheduler= Event_scheduler::get_instance();
if (Events::opt_event_scheduler == 0)
thd->sys_var_tmp.long_value= 0;
else if (Events::get_instance()->is_started())
......
......@@ -3832,25 +3832,26 @@ end_with_restore_list:
case SQLCOM_CREATE_EVENT:
case SQLCOM_ALTER_EVENT:
{
uint rows_affected= 1;
uint affected= 1;
DBUG_ASSERT(lex->event_parse_data);
switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT:
res= Events::get_instance()->create_event(thd, lex->event_parse_data,
(uint) lex->create_info.options,
&rows_affected);
res= Events::get_instance()->
create_event(thd, lex->event_parse_data,
lex->create_info.options & HA_LEX_CREATE_IF_NOT_EXISTS,
&affected);
break;
case SQLCOM_ALTER_EVENT:
res= Events::get_instance()->update_event(thd, lex->event_parse_data,
lex->spname, &rows_affected);
res= Events::get_instance()->
update_event(thd, lex->event_parse_data, lex->spname, &affected);
break;
default:;
}
DBUG_PRINT("info", ("CREATE/ALTER/DROP returned error code=%d af_rows=%d",
res, rows_affected));
DBUG_PRINT("info",("DDL error code=%d affected=%d", res, affected));
if (!res)
send_ok(thd, rows_affected);
send_ok(thd, affected);
/* Don't do it, if we are inside a SP */
if (!thd->spcont)
{
delete lex->sphead;
......@@ -3867,8 +3868,7 @@ end_with_restore_list:
if (! lex->spname->m_db.str)
{
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
res= true;
break;
goto error;
}
if (check_access(thd, EVENT_ACL, lex->spname->m_db.str, 0, 0, 0,
is_schema_db(lex->spname->m_db.str)))
......@@ -3885,11 +3885,10 @@ end_with_restore_list:
res= Events::get_instance()->show_create_event(thd, lex->spname);
else
{
uint rows_affected= 1;
if (!(res= Events::get_instance()->drop_event(thd, lex->spname,
lex->drop_if_exists,
&rows_affected)))
send_ok(thd, rows_affected);
uint affected= 1;
if (!(res= Events::get_instance()->
drop_event(thd, lex->spname, lex->drop_if_exists, &affected)))
send_ok(thd, affected);
}
break;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment