Commit 5d626c5c authored by andrey@example.com's avatar andrey@example.com

WL#3337 (Event scheduler new architecture)

Post-review fixes. Mostly whitespace, int-to-bool return value, fixed comments
parent 5b2608db
......@@ -105,8 +105,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc \
event_scheduler.cc events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \
event_scheduler.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc
......
......@@ -59,19 +59,17 @@ Event_parse_data::new_instance(THD *thd)
*/
Event_parse_data::Event_parse_data()
:on_completion(ON_COMPLETION_DROP), status(ENABLED),
item_starts(NULL), item_ends(NULL), item_execute_at(NULL),
starts_null(TRUE), ends_null(TRUE), execute_at_null(TRUE),
item_expression(NULL), expression(0)
{
DBUG_ENTER("Event_parse_data::Event_parse_data");
item_execute_at= item_expression= item_starts= item_ends= NULL;
status= ENABLED;
on_completion= ON_COMPLETION_DROP;
expression= 0;
/* Actually in the parser STARTS is always set */
set_zero_time(&starts, MYSQL_TIMESTAMP_DATETIME);
set_zero_time(&ends, MYSQL_TIMESTAMP_DATETIME);
set_zero_time(&execute_at, MYSQL_TIMESTAMP_DATETIME);
starts_null= ends_null= execute_at_null= TRUE;
body.str= comment.str= NULL;
body.length= comment.length= 0;
......@@ -161,7 +159,7 @@ Event_parse_data::init_body(THD *thd)
*/
if ((*(body_end - 1) == '*') && (*body_end == '/'))
{
DBUG_PRINT("info", ("consumend one '*" "/' comment in the query '%s'",
DBUG_PRINT("info", ("consumend one '*" "/' comment in the query '%s'",
body_begin));
body.length-= 2;
body_end-= 2;
......@@ -217,7 +215,7 @@ Event_parse_data::init_execute_at(THD *thd)
DBUG_ASSERT(starts_null && ends_null);
/* let's check whether time is in the past */
thd->variables.time_zone->gmt_sec_to_TIME(&time_tmp,
thd->variables.time_zone->gmt_sec_to_TIME(&time_tmp,
(my_time_t) thd->query_start());
if ((not_used= item_execute_at->get_date(&ltime, TIME_NO_ZERO_DATE)))
......@@ -736,8 +734,8 @@ Event_timed::~Event_timed()
Event_job_data::Event_job_data()
*/
Event_job_data::Event_job_data():
thd(NULL), sphead(NULL), sql_mode(0)
Event_job_data::Event_job_data()
:thd(NULL), sphead(NULL), sql_mode(0)
{
}
......@@ -1073,7 +1071,7 @@ bool get_next_time(TIME *next, TIME *start, TIME *time_now, TIME *last_exec,
{
longlong seconds_diff;
long microsec_diff;
if (calc_time_diff(time_now, start, 1, &seconds_diff, &microsec_diff))
{
DBUG_PRINT("error", ("negative difference"));
......@@ -1115,14 +1113,16 @@ bool get_next_time(TIME *next, TIME *start, TIME *time_now, TIME *last_exec,
interval.month= (diff_months / months)*months;
/*
Check if the same month as last_exec (always set - prerequisite)
An event happens at most once per month so there is no way to schedule
it two times for the current month. This saves us from two calls to
date_add_interval() if the event was just executed. But if the scheduler
is started and there was at least 1 scheduled date skipped this one does
not help and two calls to date_add_interval() will be done, which is a
bit more expensive but compared to the rareness of the case is neglectable.
An event happens at most once per month so there is no way to
schedule it two times for the current month. This saves us from two
calls to date_add_interval() if the event was just executed. But if
the scheduler is started and there was at least 1 scheduled date
skipped this one does not help and two calls to date_add_interval()
will be done, which is a bit more expensive but compared to the
rareness of the case is neglectable.
*/
if (time_now->year==last_exec->year && time_now->month==last_exec->month)
if (time_now->year == last_exec->year &&
time_now->month == last_exec->month)
interval.month+= months;
tmp= *start;
......@@ -1289,7 +1289,7 @@ Event_queue_element::compute_next_execution_time()
}
goto ret;
}
else if (starts_null && ends_null)
else if (starts_null && ends_null)
{
/* starts is always set, so this is a dead branch !! */
DBUG_PRINT("info", ("Neither STARTS nor ENDS are set"));
......@@ -1333,7 +1333,7 @@ Event_queue_element::compute_next_execution_time()
{
TIME next_exec;
if (get_next_time(&next_exec, &starts, &time_now,
if (get_next_time(&next_exec, &starts, &time_now,
last_executed.year? &last_executed:&starts,
expression, interval))
goto err;
......@@ -1454,7 +1454,8 @@ Event_queue_element::drop(THD *thd)
RETURN VALUE
FALSE OK
TRUE Error while opening mysql.event for writing or during write on disk
TRUE Error while opening mysql.event for writing or during
write on disk
*/
bool
......@@ -1645,9 +1646,9 @@ Event_job_data::execute(THD *thd)
event_change_security_context(thd, definer_user, definer_host, dbname,
&save_ctx);
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
THD::~THD will clean this or if there is DROP DATABASE in the
SP then it will be free there. It should not point to our buffer
which is allocated on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
......@@ -1719,7 +1720,6 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
switch (get_fake_create_event(thd, &show_create)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("Scheduler");
DBUG_RETURN(EVEX_MICROSECOND_UNSUP);
case 0:
break;
......@@ -1769,7 +1769,8 @@ Event_job_data::compile(THD *thd, MEM_ROOT *mem_root)
Free lex associated resources
QQ: Do we really need all this stuff here?
*/
sql_print_error("error during compile of %s.%s or thd->is_fatal_error=%d",
sql_print_error("SCHEDULER: Error during compilation of %s.%s or "
"thd->is_fatal_error=%d",
dbname.str, name.str, thd->is_fatal_error);
lex.unit.cleanup();
......@@ -1832,10 +1833,13 @@ event_basic_db_equal(LEX_STRING db, Event_basic *et)
/*
Checks whether two events are equal by identifiers
Checks whether an event has equal `db` and `name`
SYNOPSIS
event_basic_identifier_equal()
db Schema
name Name
et The event object
RETURN VALUE
TRUE Equal
......@@ -1851,7 +1855,8 @@ event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b)
/*
Switches the security context
Switches the security context.
SYNOPSIS
event_change_security_context()
thd Thread
......@@ -1859,7 +1864,7 @@ event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b)
host The host of the user
db The schema for which the security_ctx will be loaded
backup Where to store the old context
RETURN VALUE
FALSE OK
TRUE Error (generates error too)
......@@ -1887,7 +1892,8 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
/*
Restores the security context
Restores the security context.
SYNOPSIS
event_restore_security_context()
thd Thread
......
......@@ -37,7 +37,7 @@ class Event_basic
LEX_STRING dbname;
LEX_STRING name;
LEX_STRING definer;// combination of user and host
Event_basic();
virtual ~Event_basic();
......@@ -90,7 +90,7 @@ class Event_queue_element : public Event_basic
Event_queue_element();
virtual ~Event_queue_element();
virtual int
load_from_row(TABLE *table);
......@@ -187,7 +187,7 @@ class Event_job_data : public Event_basic
int
get_fake_create_event(THD *thd, String *buf);
Event_job_data(const Event_job_data &); /* Prevent use of these */
Event_job_data(const Event_job_data &); /* Prevent use of these */
void operator=(Event_job_data &);
};
......@@ -237,7 +237,7 @@ class Event_parse_data : public Sql_alloc
new_instance(THD *thd);
bool
check_parse_data(THD *);
check_parse_data(THD *thd);
void
init_body(THD *thd);
......
......@@ -22,14 +22,12 @@
#include "sp.h"
#include "sp_head.h"
#define EVEX_DB_FIELD_LEN 64
#define EVEX_NAME_FIELD_LEN 64
static
time_t mysql_event_last_create_time= 0L;
static
TABLE_FIELD_W_TYPE event_table_fields[ET_FIELD_COUNT] =
TABLE_FIELD_W_TYPE const event_table_fields[ET_FIELD_COUNT] =
{
{
{(char *) STRING_WITH_LEN("db")},
......@@ -250,18 +248,18 @@ mysql_event_fill_row(THD *thd, TABLE *table, Event_parse_data *et,
/*
Performs an index scan of event_table (mysql.event) and fills schema_table.
Synopsis
SYNOPSIS
Event_db_repository::index_read_for_db_for_i_s()
thd Thread
schema_table The I_S.EVENTS table
event_table The event table to use for loading (mysql.event)
Returns
RETURN VALUE
0 OK
1 Error
*/
int
bool
Event_db_repository::index_read_for_db_for_i_s(THD *thd, TABLE *schema_table,
TABLE *event_table, char *db)
{
......@@ -305,33 +303,34 @@ Event_db_repository::index_read_for_db_for_i_s(THD *thd, TABLE *schema_table,
event_table->file->ha_index_end();
/* ret is guaranteed to be != 0 */
if (ret == HA_ERR_END_OF_FILE || ret == HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(0);
DBUG_RETURN(1);
DBUG_RETURN(FALSE);
DBUG_RETURN(TRUE);
}
/*
Performs a table scan of event_table (mysql.event) and fills schema_table.
Synopsis
SYNOPSIS
Events_db_repository::table_scan_all_for_i_s()
thd Thread
schema_table The I_S.EVENTS in memory table
event_table The event table to use for loading.
Returns
0 OK
1 Error
RETURN VALUE
FALSE OK
TRUE Error
*/
int
bool
Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
TABLE *event_table)
{
int ret;
READ_RECORD read_record_info;
DBUG_ENTER("Event_db_repository::table_scan_all_for_i_s");
init_read_record(&read_record_info, thd, event_table, NULL, 1, 0);
/*
......@@ -350,7 +349,7 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
end_read_record(&read_record_info);
/* ret is guaranteed to be != 0 */
DBUG_RETURN(ret == -1? 0:1);
DBUG_RETURN(ret == -1? FALSE:TRUE);
}
......@@ -358,15 +357,15 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
Fills I_S.EVENTS with data loaded from mysql.event. Also used by
SHOW EVENTS
Synopsis
SYNOPSIS
Event_db_repository::fill_schema_events()
thd Thread
tables The schema table
db If not NULL then get events only from this schema
Returns
0 OK
1 Error
RETURN VALUE
FALSE OK
TRUE Error
*/
int
......@@ -455,16 +454,16 @@ Event_db_repository::open_event_table(THD *thd, enum thr_lock_type lock_type,
/*
Checks parameters which we got from the parsing phase.
Checks parameters which we got from the parsing phase.
SYNOPSIS
check_parse_params()
thd THD
et event's data
SYNOPSIS
check_parse_params()
thd Thread context
parse_data Event's data
RETURNS
0 OK
EVEX_BAD_PARAMS Error (reported)
RETURN VALUE
FALSE OK
TRUE Error (reported)
*/
static int
......@@ -504,7 +503,7 @@ check_parse_params(THD *thd, Event_parse_data *parse_data)
SYNOPSIS
Event_db_repository::create_event()
thd [in] THD
et [in] Object containing info about the event
parse_data [in] Object containing info about the event
create_if_not [in] Whether to generate anwarning in case event exists
rows_affected [out] How many rows were affected
......@@ -517,7 +516,7 @@ check_parse_params(THD *thd, Event_parse_data *parse_data)
::update_event. The name of the event is inside "et".
*/
int
bool
Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
my_bool create_if_not, uint *rows_affected)
{
......@@ -531,6 +530,9 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
DBUG_ENTER("Event_db_repository::create_event");
*rows_affected= 0;
if (check_parse_params(thd, parse_data))
goto err;
DBUG_PRINT("info", ("open mysql.event for update"));
if (open_event_table(thd, TL_WRITE, &table))
{
......@@ -538,8 +540,6 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
goto err;
}
if (check_parse_params(thd, parse_data))
goto err;
DBUG_PRINT("info", ("name: %.*s", parse_data->name.length,
parse_data->name.str));
......@@ -570,16 +570,17 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
if (system_charset_info->cset->
numchars(system_charset_info, parse_data->dbname.str,
parse_data->dbname.str +
parse_data->dbname.length) > EVEX_DB_FIELD_LEN)
parse_data->dbname.str + parse_data->dbname.length) >
table->field[ET_FIELD_DB]->char_length())
{
my_error(ER_TOO_LONG_IDENT, MYF(0), parse_data->dbname.str);
goto err;
}
if (system_charset_info->cset->
numchars(system_charset_info, parse_data->name.str,
parse_data->name.str +
parse_data->name.length) > EVEX_DB_FIELD_LEN)
parse_data->name.str + parse_data->name.length) >
table->field[ET_FIELD_NAME]->char_length())
{
my_error(ER_TOO_LONG_IDENT, MYF(0), parse_data->name.str);
goto err;
......@@ -622,36 +623,18 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
if (dbchanged)
(void) mysql_change_db(thd, old_db.str, 1);
/*
When valgrinded, the following call may lead to the following error:
Syscall param pwrite64(buf) points to uninitialised byte(s)
at 0x406003B: do_pwrite64 (in /lib/tls/libpthread.so.0)
by 0x40600EF: pwrite64 (in /lib/tls/libpthread.so.0)
by 0x856FF74: my_pwrite (my_pread.c:146)
by 0x85734E1: flush_cached_blocks (mf_keycache.c:2280)
....
Address 0x6618110 is 56 bytes inside a block of size 927,772 alloc'd
at 0x401C451: malloc (vg_replace_malloc.c:149)
by 0x8578CDC: _mymalloc (safemalloc.c:138)
by 0x858E5E2: my_large_malloc (my_largepage.c:65)
by 0x8570634: init_key_cache (mf_keycache.c:343)
by 0x82EDA51: ha_init_key_cache(char const*, st_key_cache*) (handler.cc:2509)
by 0x8212071: process_key_caches(int (*)(char const*, st_key_cache*))
(set_var.cc:3824)
by 0x8206D75: init_server_components() (mysqld.cc:3304)
by 0x8207163: main (mysqld.cc:3578)
I think it is safe not to think about it.
This statement may cause a spooky valgrind warning at startup
inside init_key_cache on my system (ahristov, 2006/08/10)
*/
close_thread_tables(thd);
DBUG_RETURN(0);
DBUG_RETURN(FALSE);
err:
if (dbchanged)
(void) mysql_change_db(thd, old_db.str, 1);
if (table)
close_thread_tables(thd);
DBUG_RETURN(EVEX_GENERAL_ERROR);
DBUG_RETURN(TRUE);
}
......@@ -665,8 +648,8 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
et event's data
RETURN VALUE
0 OK
EVEX_GENERAL_ERROR Error occured and reported
FALSE OK
TRUE Error (reported)
NOTES
sp_name is passed since this is the name of the event to
......@@ -679,7 +662,6 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
{
CHARSET_INFO *scs= system_charset_info;
TABLE *table= NULL;
int ret;
DBUG_ENTER("Event_db_repository::update_event");
if (open_event_table(thd, TL_WRITE, &table))
......@@ -698,7 +680,7 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
DBUG_PRINT("info", ("rename to: %s@%s", new_dbname->str, new_name->str));
/* first look whether we overwrite */
if (new_dbname)
if (new_name)
{
if (!sortcmp_lex_string(parse_data->name, *new_name, scs) &&
!sortcmp_lex_string(parse_data->dbname, *new_dbname, scs))
......@@ -747,9 +729,10 @@ Event_db_repository::update_event(THD *thd, Event_parse_data *parse_data,
if (end_active_trans(thd))
goto err;
if (table->file->ha_update_row(table->record[1], table->record[0]))
int res;
if ((res= table->file->ha_update_row(table->record[1], table->record[0])))
{
my_error(ER_EVENT_STORE_FAILED, MYF(0), parse_data->name.str, ret);
my_error(ER_EVENT_STORE_FAILED, MYF(0), parse_data->name.str, res);
goto err;
}
......@@ -887,16 +870,14 @@ Event_db_repository::find_named_event(THD *thd, LEX_STRING db, LEX_STRING name,
Event_db_repository::drop_schema_events()
thd Thread
schema The database to clean from events
RETURN VALUE
0 OK
!0 Error (Reported)
*/
int
void
Event_db_repository::drop_schema_events(THD *thd, LEX_STRING schema)
{
return drop_events_by_field(thd, ET_FIELD_DB, schema);
DBUG_ENTER("Event_db_repository::drop_schema_events");
drop_events_by_field(thd, ET_FIELD_DB, schema);
DBUG_VOID_RETURN;
}
......@@ -909,28 +890,29 @@ Event_db_repository::drop_schema_events(THD *thd, LEX_STRING schema)
table mysql.event TABLE
field Which field of the row to use for matching
field_value The value that should match
RETURN VALUE
0 OK
!0 Error from ha_delete_row
*/
int
void
Event_db_repository::drop_events_by_field(THD *thd,
enum enum_events_table_field field,
LEX_STRING field_value)
{
int ret= 0;
TABLE *table= NULL;
Open_tables_state backup;
READ_RECORD read_record_info;
DBUG_ENTER("Event_db_repository::drop_events_by_field");
DBUG_PRINT("enter", ("field=%d field_value=%s", field, field_value.str));
if (open_event_table(thd, TL_WRITE, &table))
{
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
DBUG_RETURN(1);
/*
Currently being used only for DROP DATABASE - In this case we don't need
error message since the OK packet has been sent. But for DROP USER we
could need it.
my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0));
*/
DBUG_VOID_RETURN;
}
/* only enabled events are in memory, so we go now and delete the rest */
......@@ -939,22 +921,20 @@ Event_db_repository::drop_events_by_field(THD *thd,
{
char *et_field= get_field(thd->mem_root, table->field[field]);
LEX_STRING et_field_lex= {et_field, strlen(et_field)};
LEX_STRING et_field_lex= { et_field, strlen(et_field) };
DBUG_PRINT("info", ("Current event %s name=%s", et_field,
get_field(thd->mem_root, table->field[ET_FIELD_NAME])));
if (!sortcmp_lex_string(et_field_lex, field_value, system_charset_info))
{
DBUG_PRINT("info", ("Dropping"));
if ((ret= table->file->ha_delete_row(table->record[0])))
my_error(ER_EVENT_DROP_FAILED, MYF(0),
get_field(thd->mem_root, table->field[ET_FIELD_NAME]));
ret= table->file->ha_delete_row(table->record[0]);
}
}
end_read_record(&read_record_info);
thd->version--; /* Force close to free memory */
close_thread_tables(thd);
DBUG_RETURN(ret);
DBUG_VOID_RETURN;
}
......@@ -964,10 +944,10 @@ Event_db_repository::drop_events_by_field(THD *thd,
SYNOPSIS
Event_db_repository::load_named_event()
thd [in] THD
thd [in] Thread context
dbname [in] Event's db name
name [in] Event's name
etn_new [out] The loaded event
etn [out] The loaded event
RETURN VALUE
FALSE OK
......
......@@ -47,9 +47,6 @@ events_table_index_read_for_db(THD *thd, TABLE *schema_table,
int
events_table_scan_all(THD *thd, TABLE *schema_table, TABLE *event_table);
int
fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */);
class Event_basic;
class Event_parse_data;
......@@ -58,7 +55,7 @@ class Event_db_repository
public:
Event_db_repository(){}
int
bool
create_event(THD *thd, Event_parse_data *parse_data, my_bool create_if_not,
uint *rows_affected);
......@@ -70,7 +67,7 @@ class Event_db_repository
drop_event(THD *thd, LEX_STRING db, LEX_STRING name, bool drop_if_exists,
uint *rows_affected);
int
void
drop_schema_events(THD *thd, LEX_STRING schema);
bool
......@@ -79,7 +76,6 @@ class Event_db_repository
bool
load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_basic *et);
int
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
......@@ -87,14 +83,14 @@ class Event_db_repository
fill_schema_events(THD *thd, TABLE_LIST *tables, char *db);
private:
int
void
drop_events_by_field(THD *thd, enum enum_events_table_field field,
LEX_STRING field_value);
int
bool
index_read_for_db_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table,
char *db);
int
bool
table_scan_all_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table);
static bool
......
......@@ -73,35 +73,6 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
}
pthread_handler_t
event_queue_loader_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
struct event_queue_param *param= (struct event_queue_param *) arg;
thd->thread_stack= (char *) &thd;
if (post_init_event_thread(thd))
goto end;
DBUG_ENTER("event_queue_loader_thread");
pthread_mutex_lock(&param->LOCK_loaded);
param->queue->check_system_tables(thd);
param->queue->load_events_from_db(thd);
param->loading_finished= TRUE;
pthread_cond_signal(&param->COND_loaded);
pthread_mutex_unlock(&param->LOCK_loaded);
end:
deinit_event_thread(thd);
DBUG_RETURN(0); // Against gcc warnings
}
/*
Constructor of class Event_queue.
......@@ -110,14 +81,12 @@ event_queue_loader_thread(void *arg)
*/
Event_queue::Event_queue()
:mutex_last_unlocked_at_line(0), mutex_last_locked_at_line(0),
mutex_last_attempted_lock_at_line(0),
mutex_queue_data_locked(FALSE), mutex_queue_data_attempting_lock(FALSE)
{
mutex_last_unlocked_at_line= mutex_last_locked_at_line=
mutex_last_attempted_lock_at_line= 0;
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
}
......@@ -150,7 +119,12 @@ Event_queue::deinit_mutexes()
/*
Inits the queue
This is a queue's constructor. Until this method is called, the
queue is unusable. We don't use a C++ constructor instead in
order to be able to check the return value. The queue is
initialized once at server startup. Initialization can fail in
case of a failure reading events from the database or out of
memory.
SYNOPSIS
Event_queue::init()
......@@ -161,9 +135,9 @@ Event_queue::deinit_mutexes()
*/
bool
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
Event_scheduler *sched)
{
THD *new_thd;
pthread_t th;
bool res;
struct event_queue_param *event_queue_param_value= NULL;
......@@ -186,43 +160,16 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
if (sizeof(my_time_t) != sizeof(time_t))
{
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler may not work correctly. Stopping.");
"The scheduler may not work correctly. Stopping");
DBUG_ASSERT(0);
goto err;
}
if (!(new_thd= new THD))
goto err;
pre_init_event_thread(new_thd);
new_thd->security_ctx->set_user((char*)"event_scheduler_loader");
event_queue_param_value= (struct event_queue_param *)
my_malloc(sizeof(struct event_queue_param), MYF(0));
event_queue_param_value->thd= new_thd;
event_queue_param_value->queue= this;
event_queue_param_value->loading_finished= FALSE;
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
(void*)event_queue_param_value)))
{
do {
pthread_cond_wait(&event_queue_param_value->COND_loaded,
&event_queue_param_value->LOCK_loaded);
} while (event_queue_param_value->loading_finished == FALSE);
}
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
pthread_cond_destroy(&event_queue_param_value->COND_loaded);
my_free((char *)event_queue_param_value, MYF(0));
res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
if (res)
deinit_queue();
DBUG_RETURN(res);
err:
......@@ -316,16 +263,15 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
{
int res;
Event_queue_element *old_element= NULL,
*new_element;
Event_queue_element *new_element;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
new_name? *new_name:name, new_element);
res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
new_name ? *new_name:name, new_element);
if (res)
{
delete new_element;
......@@ -345,16 +291,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
if (!(old_element= find_n_remove_event(dbname, name)))
{
DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
dbname.str, name.str));
}
find_n_remove_event(dbname, name);
/* If not disabled event */
if (new_element)
{
DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
new_element, old_element));
DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
}
dbug_dump_queue(thd->query_start());
......@@ -363,8 +305,6 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
if (new_element)
notify_observers();
if (old_element)
delete old_element;
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
......@@ -385,19 +325,13 @@ void
Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
Event_queue_element *element;
DBUG_ENTER("Event_queue::drop_event");
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA();
element= find_n_remove_event(dbname, name);
find_n_remove_event(dbname, name);
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
if (element)
delete element;
else
DBUG_PRINT("info", ("No such event found, probably DISABLED"));
/*
We don't signal here because the scheduler will catch the change
......@@ -429,10 +363,10 @@ void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*comparator)(LEX_STRING, Event_basic *))
{
uint i= 0;
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%s", pattern.str));
uint i= 0;
while (i < queue.elements)
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
......@@ -440,10 +374,10 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
if (comparator(pattern, et))
{
/*
The queue is ordered. If we remove an element, then all elements after
it will shift one position to the left, if we imagine it as an array
from left to the right. In this case we should not increment the
counter and the (i < queue.elements) condition is ok.
The queue is ordered. If we remove an element, then all elements
after it will shift one position to the left, if we imagine it as
an array from left to the right. In this case we should not
increment the counter and the (i < queue.elements) condition is ok.
*/
queue_remove(&queue, i);
delete et;
......@@ -453,12 +387,12 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
}
/*
We don't call notify_observers() . If we remove the top event:
1. The queue is empty. The scheduler will wake up at some time and realize
that the queue is empty. If create_event() comes inbetween it will
signal the scheduler
2. The queue is not empty, but the next event after the previous top, won't
be executed any time sooner than the element we removed. Hence, we may
not notify the scheduler and it will realize the change when it
1. The queue is empty. The scheduler will wake up at some time and
realize that the queue is empty. If create_event() comes inbetween
it will signal the scheduler
2. The queue is not empty, but the next event after the previous top,
won't be executed any time sooner than the element we removed. Hence,
we may not notify the scheduler and it will realize the change when it
wakes up from timedwait.
*/
......@@ -472,11 +406,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
SYNOPSIS
Event_queue::drop_schema_events()
thd THD
db The schema name
RETURN VALUE
>=0 Number of dropped events
thd HD
schema The schema name
*/
void
......@@ -516,16 +447,12 @@ Event_queue::notify_observers()
db The schema of the event to find
name The event to find
RETURN VALUE
NULL Not found
otherwise Address
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue.
*/
Event_queue_element *
void
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
{
uint i;
......@@ -539,11 +466,12 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
if (event_basic_identifier_equal(db, name, et))
{
queue_remove(&queue, i);
DBUG_RETURN(et);
delete et;
break;
}
}
DBUG_RETURN(NULL);
DBUG_VOID_RETURN;
}
......@@ -583,7 +511,7 @@ Event_queue::load_events_from_db(THD *thd)
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
{
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
......@@ -625,14 +553,17 @@ Event_queue::load_events_from_db(THD *thd)
temp_job_data.load_from_row(table);
/* We load only on scheduler root just to check whether the body compiles */
/*
We load only on scheduler root just to check whether the body
compiles.
*/
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
break;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
et->dbname.str, et->name.str);
break;
default:
......@@ -663,12 +594,10 @@ Event_queue::load_events_from_db(THD *thd)
else
{
ret= 0;
sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
sql_print_information("SCHEDULER: Loaded %d event%s", count,
(count == 1)?"":"s");
}
/* Force close to free memory */
thd->version--;
close_thread_tables(thd);
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
......@@ -676,71 +605,6 @@ Event_queue::load_events_from_db(THD *thd)
}
/*
Opens mysql.db and mysql.user and checks whether:
1. mysql.db has column Event_priv at column 20 (0 based);
2. mysql.user has column Event_priv at column 29 (0 based);
SYNOPSIS
Event_queue::check_system_tables()
thd Thread
RETURN VALUE
FALSE OK
TRUE Error
*/
void
Event_queue::check_system_tables(THD *thd)
{
TABLE_LIST tables;
bool not_used;
Open_tables_state backup;
bool ret;
DBUG_ENTER("Event_queue::check_system_tables");
DBUG_PRINT("enter", ("thd=0x%lx", thd));
thd->reset_n_backup_open_tables_state(&backup);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "db";
tables.lock_type= TL_READ;
if ((ret= simple_open_n_lock_tables(thd, &tables)))
{
sql_print_error("Cannot open mysql.db");
goto end;
}
ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
mysql_db_table_fields, &mysql_db_table_last_check,
ER_CANNOT_LOAD_FROM_TABLE);
close_thread_tables(thd);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "user";
tables.lock_type= TL_READ;
if (simple_open_n_lock_tables(thd, &tables))
sql_print_error("Cannot open mysql.db");
else
{
if (tables.table->s->fields < 29 ||
strncmp(tables.table->field[29]->field_name,
STRING_WITH_LEN("Event_priv")))
sql_print_error("mysql.user has no `Event_priv` column at position 29");
close_thread_tables(thd);
}
end:
thd->restore_backup_open_tables_state(&backup);
DBUG_VOID_RETURN;
}
/*
Recalculates activation times in the queue. There is one reason for
that. Because the values (execute_at) by which the queue is ordered are
......@@ -782,7 +646,7 @@ Event_queue::recalculate_activation_times(THD *thd)
Event_queue::empty_queue()
NOTE
Should be called with LOCK_event_queue locked
Should be called with LOCK_event_queue locked
*/
void
......
......@@ -31,12 +31,12 @@ class Event_queue
void
init_mutexes();
void
deinit_mutexes();
bool
init_queue(Event_db_repository *db_repo, Event_scheduler *sched);
init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched);
void
deinit_queue();
......@@ -56,9 +56,6 @@ class Event_queue
void
drop_schema_events(THD *thd, LEX_STRING schema);
void
check_system_tables(THD *thd);
void
recalculate_activation_times(THD *thd);
......@@ -72,7 +69,7 @@ class Event_queue
load_events_from_db(THD *thd);
protected:
Event_queue_element *
void
find_n_remove_event(LEX_STRING db, LEX_STRING name);
......@@ -107,7 +104,7 @@ class Event_queue
const char* mutex_last_attempted_lock_in_func;
bool mutex_queue_data_locked;
bool mutex_queue_data_attempting_lock;
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
......
......@@ -384,7 +384,7 @@ Event_scheduler::start()
if (!(new_thd= new THD))
{
sql_print_error("SCHEDULER: Cannot init manager event thread.");
sql_print_error("SCHEDULER: Cannot init manager event thread");
ret= TRUE;
goto end;
}
......@@ -441,7 +441,7 @@ Event_scheduler::start()
bool
Event_scheduler::run(THD *thd)
{
int res;
int res= FALSE;
struct timespec abstime;
Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
......@@ -464,7 +464,7 @@ Event_scheduler::run(THD *thd)
&job_data, &abstime))
{
sql_print_information("SCHEDULER: Serious error during getting next"
" event to execute. Stopping.");
" event to execute. Stopping");
break;
}
......@@ -532,7 +532,7 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
pthread_t th;
int res= 0;
DBUG_ENTER("Event_scheduler::execute_top");
if (!(new_thd= new THD))
if (!(new_thd= new THD()))
goto error;
pre_init_event_thread(new_thd);
......
......@@ -31,12 +31,13 @@ deinit_event_thread(THD *thd);
class Event_scheduler
{
public:
Event_scheduler(){}
Event_scheduler():state(UNINITIALIZED){}
~Event_scheduler(){}
enum enum_state
{
INITIALIZED = 0,
UNINITIALIZED = 0,
INITIALIZED,
RUNNING,
STOPPING
};
......@@ -50,12 +51,12 @@ class Event_scheduler
stop();
/*
Need to be public because has to be called from the function
Need to be public because has to be called from the function
passed to pthread_create.
*/
bool
run(THD *thd);
void
init_scheduler(Event_queue *queue);
......@@ -64,7 +65,7 @@ class Event_scheduler
void
init_mutexes();
void
deinit_mutexes();
......@@ -112,7 +113,7 @@ class Event_scheduler
ulong thread_id;
pthread_cond_t COND_state;
Event_queue *queue;
uint mutex_last_locked_at_line;
......@@ -121,7 +122,7 @@ class Event_scheduler
const char* mutex_last_unlocked_in_func;
bool mutex_scheduler_data_locked;
bool waiting_on_cond;
ulonglong started_events;
private:
......
......@@ -70,6 +70,15 @@ TYPELIB Events::opt_typelib=
NULL
};
static
Event_queue events_event_queue;
static
Event_scheduler events_event_scheduler;
static
Event_db_repository events_event_db_repository;
Events Events::singleton;
ulong Events::opt_event_scheduler= 2;
......@@ -218,7 +227,7 @@ Events::reconstruct_interval_expression(String *buf, interval_type interval,
expr= tmp_expr - (tmp_expr/60)*60;
/* the code after the switch will finish */
}
break;
break;
case INTERVAL_DAY_MICROSECOND:
case INTERVAL_HOUR_MICROSECOND:
case INTERVAL_MINUTE_MICROSECOND:
......@@ -247,6 +256,22 @@ Events::reconstruct_interval_expression(String *buf, interval_type interval,
return 0;
}
/*
Constructor of Events class. It's called when events.o
is loaded. Assigning addressed of static variables in this
object file.
SYNOPSIS
Events::Events()
*/
Events::Events()
{
scheduler= &events_event_scheduler;
event_queue= &events_event_queue;
db_repository= &events_event_db_repository;
}
/*
Opens mysql.event table with specified lock
......@@ -265,7 +290,7 @@ Events::reconstruct_interval_expression(String *buf, interval_type interval,
int
Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
TABLE **table)
TABLE **table)
{
return db_repository->open_event_table(thd, lock_type, table);
}
......@@ -277,25 +302,30 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
SYNOPSIS
Events::create_event()
thd [in] THD
et [in] Event's data from parsing stage
parse_data [in] Event's data from parsing stage
if_not_exists [in] Whether IF NOT EXISTS was specified in the DDL
rows_affected [out] How many rows were affected
RETURN VALUE
0 OK
!0 Error (Reported)
FALSE OK
TRUE Error (Reported)
NOTES
In case there is an event with the same name (db) and
IF NOT EXISTS is specified, an warning is put into the stack.
*/
int
bool
Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::create_event");
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
......@@ -321,13 +351,13 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
SYNOPSIS
Events::update_event()
thd [in] THD
et [in] Event's data from parsing stage
parse_data [in] Event's data from parsing stage
rename_to [in] Set in case of RENAME TO.
rows_affected [out] How many rows were affected.
RETURN VALUE
0 OK
!0 Error
FALSE OK
TRUE Error
NOTES
et contains data about dbname and event name.
......@@ -335,14 +365,19 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
that RENAME TO was specified in the query
*/
int
bool
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to? &rename_to->m_db: NULL;
LEX_STRING *new_name= rename_to? &rename_to->m_name: NULL;
LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL;
LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL;
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
......@@ -378,16 +413,21 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
removal from memory queue.
RETURN VALUE
0 OK
!0 Error (reported)
FALSE OK
TRUE Error (reported)
*/
int
bool
Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
uint *rows_affected, bool only_from_disk)
{
int ret;
DBUG_ENTER("Events::drop_event");
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
......@@ -409,27 +449,27 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
Events::drop_schema_events()
thd Thread
db ASCIIZ schema name
RETURN VALUE
0 OK
!0 Error
*/
int
void
Events::drop_schema_events(THD *thd, char *db)
{
int ret= 0;
LEX_STRING db_lex= {db, strlen(db)};
LEX_STRING const db_lex= { db, strlen(db) };
DBUG_ENTER("Events::drop_schema_events");
DBUG_PRINT("enter", ("dropping events from %s", db));
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_VOID_RETURN;
}
pthread_mutex_lock(&LOCK_event_metadata);
event_queue->drop_schema_events(thd, db_lex);
ret= db_repository->drop_schema_events(thd, db_lex);
db_repository->drop_schema_events(thd, db_lex);
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
DBUG_VOID_RETURN;
}
......@@ -438,15 +478,15 @@ Events::drop_schema_events(THD *thd, char *db)
SYNOPSIS
Events::show_create_event()
thd THD
spn the name of the event (db, name)
thd Thread context
spn The name of the event (db, name)
RETURN VALUE
0 OK
1 Error during writing to the wire
FALSE OK
TRUE Error during writing to the wire
*/
int
bool
Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
CHARSET_INFO *scs= system_charset_info;
......@@ -455,6 +495,11 @@ Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_ENTER("Events::show_create_event");
DBUG_PRINT("enter", ("name: %s@%s", dbname.str, name.str));
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
ret= db_repository->load_named_event(thd, dbname, name, et);
......@@ -481,8 +526,8 @@ Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len));
field_list.
push_back(new Item_empty_string("Create Event", show_str.length()));
field_list.push_back(new Item_empty_string("Create Event",
show_str.length()));
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
......@@ -501,7 +546,7 @@ Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_RETURN(ret);
err:
delete et;
DBUG_RETURN(1);
DBUG_RETURN(TRUE);
}
......@@ -511,7 +556,7 @@ Events::show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
SYNOPSIS
Events::fill_schema_events()
thd Thread
thd Thread context
tables The schema table
cond Unused
......@@ -525,6 +570,13 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
{
char *db= NULL;
DBUG_ENTER("Events::fill_schema_events");
Events *myself= get_instance();
if (unlikely(myself->check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
If it's SHOW EVENTS then thd->lex->select_lex.db is guaranteed not to
be NULL. Let's do an assert anyway.
......@@ -537,8 +589,7 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
DBUG_RETURN(1);
db= thd->lex->select_lex.db;
}
DBUG_RETURN(get_instance()->db_repository->
fill_schema_events(thd, tables, db));
DBUG_RETURN(myself->db_repository->fill_schema_events(thd, tables, db));
}
......@@ -552,31 +603,60 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
This function is not synchronized.
RETURN VALUE
0 OK
1 Error in case the scheduler can't start
FALSE OK
TRUE Error in case the scheduler can't start
*/
bool
Events::init()
{
int res;
THD *thd;
bool res= FALSE;
DBUG_ENTER("Events::init");
if (event_queue->init_queue(db_repository, scheduler))
/* We need a temporary THD during boot */
if (!(thd= new THD()))
{
sql_print_information("SCHEDULER: Error while loading from disk.");
DBUG_RETURN(TRUE);
res= TRUE;
goto end;
}
/*
The thread stack does not start from this function but we cannot
guess the real value. So better some value that doesn't assert than
no value.
*/
thd->thread_stack= (char*) &thd;
thd->store_globals();
if (check_system_tables(thd))
{
check_system_tables_error= TRUE;
sql_print_error("SCHEDULER: The system tables are damaged. "
"The scheduler subsystem will be unusable during this run.");
goto end;
}
check_system_tables_error= FALSE;
if (event_queue->init_queue(thd, db_repository, scheduler))
{
sql_print_error("SCHEDULER: Error while loading from disk.");
goto end;
}
scheduler->init_scheduler(event_queue);
/* it should be an assignment! */
if (opt_event_scheduler)
{
DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2);
if (opt_event_scheduler == 1)
DBUG_RETURN(scheduler->start());
res= scheduler->start();
}
DBUG_RETURN(FALSE);
end:
delete thd;
/* Remember that we don't have a THD */
my_pthread_setspecific_ptr(THR_THD, NULL);
DBUG_RETURN(res);
}
......@@ -595,10 +675,13 @@ Events::deinit()
{
DBUG_ENTER("Events::deinit");
scheduler->stop();
scheduler->deinit_scheduler();
if (likely(!check_system_tables_error))
{
scheduler->stop();
scheduler->deinit_scheduler();
event_queue->deinit_queue();
event_queue->deinit_queue();
}
DBUG_VOID_RETURN;
}
......@@ -616,13 +699,7 @@ void
Events::init_mutexes()
{
pthread_mutex_init(&LOCK_event_metadata, MY_MUTEX_INIT_FAST);
db_repository= new Event_db_repository;
event_queue= new Event_queue;
event_queue->init_mutexes();
scheduler= new Event_scheduler;
scheduler->init_mutexes();
}
......@@ -639,11 +716,6 @@ Events::destroy_mutexes()
{
event_queue->deinit_mutexes();
scheduler->deinit_mutexes();
delete scheduler;
delete db_repository;
delete event_queue;
pthread_mutex_destroy(&LOCK_event_metadata);
}
......@@ -700,6 +772,11 @@ bool
Events::start_execution_of_events()
{
DBUG_ENTER("Events::start_execution_of_events");
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
DBUG_RETURN(scheduler->start());
}
......@@ -721,6 +798,11 @@ bool
Events::stop_execution_of_events()
{
DBUG_ENTER("Events::stop_execution_of_events");
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
DBUG_RETURN(scheduler->stop());
}
......@@ -737,8 +819,86 @@ Events::stop_execution_of_events()
*/
bool
Events::is_started()
Events::is_execution_of_events_started()
{
DBUG_ENTER("Events::is_started");
DBUG_ENTER("Events::is_execution_of_events_started");
if (unlikely(check_system_tables_error))
{
my_error(ER_EVENTS_DB_ERROR, MYF(0));
DBUG_RETURN(FALSE);
}
DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING);
}
/*
Opens mysql.db and mysql.user and checks whether:
1. mysql.db has column Event_priv at column 20 (0 based);
2. mysql.user has column Event_priv at column 29 (0 based);
SYNOPSIS
Events::check_system_tables()
thd Thread
RETURN VALUE
FALSE OK
TRUE Error
*/
bool
Events::check_system_tables(THD *thd)
{
TABLE_LIST tables;
bool not_used;
Open_tables_state backup;
bool ret= FALSE;
DBUG_ENTER("Events::check_system_tables");
DBUG_PRINT("enter", ("thd=0x%lx", thd));
thd->reset_n_backup_open_tables_state(&backup);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "db";
tables.lock_type= TL_READ;
if ((ret= simple_open_n_lock_tables(thd, &tables)))
{
sql_print_error("SCHEDULER: Cannot open mysql.db");
ret= TRUE;
}
ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
mysql_db_table_fields, &mysql_db_table_last_check,
ER_CANNOT_LOAD_FROM_TABLE);
close_thread_tables(thd);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "user";
tables.lock_type= TL_READ;
if (simple_open_n_lock_tables(thd, &tables))
{
sql_print_error("SCHEDULER: Cannot open mysql.user");
ret= TRUE;
}
else
{
if (tables.table->s->fields < 29 ||
strncmp(tables.table->field[29]->field_name,
STRING_WITH_LEN("Event_priv")))
{
sql_print_error("mysql.user has no `Event_priv` column at position %d",
29);
ret= TRUE;
}
close_thread_tables(thd);
}
end:
thd->restore_backup_open_tables_state(&backup);
DBUG_RETURN(ret);
}
......@@ -42,59 +42,59 @@ sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs);
class Events
{
public:
friend class Event_queue_element;
/*
Quite NOT the best practice and will be removed once
Event_timed::drop() and Event_timed is fixed not do drop directly
or other scheme will be found.
*/
friend class Event_queue_element;
static ulong opt_event_scheduler;
static TYPELIB opt_typelib;
bool
init();
void
deinit();
void
init_mutexes();
void
destroy_mutexes();
bool
start_execution_of_events();
bool
stop_execution_of_events();
bool
is_started();
is_execution_of_events_started();
static Events*
static Events *
get_instance();
int
bool
create_event(THD *thd, Event_parse_data *parse_data, bool if_exists,
uint *rows_affected);
int
bool
update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to,
uint *rows_affected);
int
bool
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
uint *rows_affected, bool only_from_disk);
int
void
drop_schema_events(THD *thd, char *db);
int
open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
int
bool
show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
/* Needed for both SHOW CREATE EVENT and INFORMATION_SCHEMA */
......@@ -104,23 +104,28 @@ class Events
static int
fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */);
bool
dump_internal_status(THD *thd);
private:
bool
check_system_tables(THD *thd);
/* Singleton DP is used */
Events(){}
Events();
~Events(){}
/* Singleton instance */
static Events singleton;
Event_queue *event_queue;
Event_scheduler *scheduler;
Event_scheduler *scheduler;
Event_db_repository *db_repository;
pthread_mutex_t LOCK_event_metadata;
pthread_mutex_t LOCK_event_metadata;
bool check_system_tables_error;
/* Prevent use of these */
Events(const Events &);
......
......@@ -3978,7 +3978,7 @@ byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
{
if (Events::opt_event_scheduler == 0)
thd->sys_var_tmp.long_value= 0;
else if (Events::get_instance()->is_started())
else if (Events::get_instance()->is_execution_of_events_started())
thd->sys_var_tmp.long_value= 1;
else
thd->sys_var_tmp.long_value= 2;
......
......@@ -5851,3 +5851,5 @@ ER_CANT_DROP_LOG_TABLE
eng "Cannot drop log table if log is enabled"
ER_EVENT_RECURSIVITY_FORBIDDEN
eng "Recursivity of EVENT DDL statements is forbidden when body is present"
ER_EVENTS_DB_ERROR
eng "Cannot proceed because the tables used by events were found damaged at server start"
......@@ -949,7 +949,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
exit:
(void)sp_drop_db_routines(thd, db); /* QQ Ignore errors for now */
error= Events::get_instance()->drop_schema_events(thd, db);
Events::get_instance()->drop_schema_events(thd, db);
/*
If this database was the client's selected database, we silently
change the client's selected database to nothing (to have an empty
......
......@@ -1445,11 +1445,11 @@ ev_sql_stmt:
{
LEX *lex=Lex;
// return back to the original memory root ASAP
/* return back to the original memory root ASAP */
lex->sphead->init_strings(YYTHD, lex);
lex->sphead->restore_thd_mem_root(YYTHD);
lex->sp_chistics.suid= SP_IS_SUID;//always the definer!
lex->sp_chistics.suid= SP_IS_SUID; //always the definer!
Lex->event_parse_data->init_body(YYTHD);
}
......@@ -1568,10 +1568,10 @@ create_function_tail:
sp->m_type= TYPE_ENUM_FUNCTION;
lex->sphead= sp;
/*
* We have to turn of CLIENT_MULTI_QUERIES while parsing a
* stored procedure, otherwise yylex will chop it into pieces
* at each ';'.
*/
We have to turn off CLIENT_MULTI_QUERIES while parsing a
stored procedure, otherwise yylex will chop it into pieces
at each ';'.
*/
$<ulong_num>$= YYTHD->client_capabilities & CLIENT_MULTI_QUERIES;
YYTHD->client_capabilities &= ~CLIENT_MULTI_QUERIES;
lex->sphead->m_param_begin= lex->tok_start+1;
......@@ -4673,25 +4673,24 @@ alter:
{}
| ALTER EVENT_SYM sp_name
/*
BE CAREFUL when you add a new rule to update the block where
YYTHD->client_capabilities is set back to original value
BE CAREFUL when you add a new rule to update the block where
YYTHD->client_capabilities is set back to original value
*/
{
/*
It is safe to use Lex->spname because
ALTER EVENT xxx RENATE TO yyy DO ALTER EVENT RENAME TO
is not allowed. Lex->spname is used in the case of RENAME TO
If it had to be supported spname had to be added to
Event_parse_data.
It is safe to use Lex->spname because
ALTER EVENT xxx RENATE TO yyy DO ALTER EVENT RENAME TO
is not allowed. Lex->spname is used in the case of RENAME TO
If it had to be supported spname had to be added to
Event_parse_data.
*/
Lex->spname= NULL;
if (!(Lex->event_parse_data= Event_parse_data::new_instance(YYTHD)))
YYABORT;
Lex->event_parse_data->identifier= $3;
/*
We have to turn of CLIENT_MULTI_QUERIES while parsing a
We have to turn off CLIENT_MULTI_QUERIES while parsing a
stored procedure, otherwise yylex will chop it into pieces
at each ';'.
*/
......@@ -4757,9 +4756,11 @@ ev_alter_on_schedule_completion: /* empty */ { $$= 0;}
opt_ev_rename_to: /* empty */ { $$= 0;}
| RENAME TO_SYM sp_name
{
LEX *lex=Lex;
lex->spname= $3; //use lex's spname to hold the new name
//the original name is in the Event_parse_data object
/*
Use lex's spname to hold the new name.
The original name is in the Event_parse_data object
*/
Lex->spname= $3;
$$= 1;
}
;
......@@ -4783,7 +4784,7 @@ alter_commands:
| remove_partitioning
| partitioning
/*
This part was added for release 5.1 by Mikael Ronström.
This part was added for release 5.1 by Mikael Ronstrm.
From here we insert a number of commands to manage the partitions of a
partitioned table such as adding partitions, dropping partitions,
reorganising partitions in various manners. In future releases the list
......
......@@ -2352,28 +2352,28 @@ bool check_column_name(const char *name)
Checks whether a table is intact. Should be done *just* after the table has
been opened.
Synopsis
SYNOPSIS
table_check_intact()
table - the table to check
table_f_count - expected number of columns in the table
table_def - expected structure of the table (column name and type)
last_create_time- the table->file->create_time of the table in memory
we have checked last time
error_num - ER_XXXX from the error messages file. When 0 no error
is sent to the client in case types does not match.
If different col number either
ER_COL_COUNT_DOESNT_MATCH_PLEASE_UPDATE or
ER_COL_COUNT_DOESNT_MATCH_CORRUPTED is used
table The table to check
table_f_count Expected number of columns in the table
table_def Expected structure of the table (column name and type)
last_create_time The table->file->create_time of the table in memory
we have checked last time
error_num ER_XXXX from the error messages file. When 0 no error
is sent to the client in case types does not match.
If different col number either
ER_COL_COUNT_DOESNT_MATCH_PLEASE_UPDATE or
ER_COL_COUNT_DOESNT_MATCH_CORRUPTED is used
RETURNS
0 - OK
1 - There was an error
FALSE OK
TRUE There was an error
*/
my_bool
table_check_intact(TABLE *table, uint table_f_count,
TABLE_FIELD_W_TYPE *table_def, time_t *last_create_time,
int error_num)
table_check_intact(TABLE *table, const uint table_f_count,
const TABLE_FIELD_W_TYPE *table_def,
time_t *last_create_time, int error_num)
{
uint i;
my_bool error= FALSE;
......@@ -2388,7 +2388,7 @@ table_check_intact(TABLE *table, uint table_f_count,
DBUG_PRINT("info", ("I am suspecting, checking table"));
if (fields_diff_count)
{
// previous MySQL version
/* previous MySQL version */
error= TRUE;
if (MYSQL_VERSION_ID > table->s->mysql_version)
{
......@@ -2411,22 +2411,22 @@ table_check_intact(TABLE *table, uint table_f_count,
else
{
/*
moving from newer mysql to older one -> let's say not an error but
Moving from newer mysql to older one -> let's say not an error but
will check the definition afterwards. If a column was added at the
end then we don't care much since it's not in the middle.
*/
error= FALSE;
}
}
//definitely something has changed
/* definitely something has changed */
char buffer[255];
for (i=0 ; i < table_f_count; i++, table_def++)
{
String sql_type(buffer, sizeof(buffer), system_charset_info);
sql_type.length(0);
/*
name changes are not fatal, we use sequence numbers => no prob for us
but this can show tampered table or broken table.
Name changes are not fatal, we use sequence numbers => no problem
for us but this can show tampered table or broken table.
*/
if (i < table->s->fields)
{
......@@ -2440,7 +2440,7 @@ table_check_intact(TABLE *table, uint table_f_count,
}
/*
IF the type does not match than something is really wrong
If the type does not match than something is really wrong
Check up to length - 1. Why?
1. datetime -> datetim -> the same
2. int(11) -> int(11 -> the same
......
......@@ -965,9 +965,9 @@ typedef struct st_table_field_w_type
my_bool
table_check_intact(TABLE *table, uint table_f_count,
TABLE_FIELD_W_TYPE *table_def, time_t *last_create_time,
int error_num);
table_check_intact(TABLE *table, const uint table_f_count,
const TABLE_FIELD_W_TYPE * const table_def,
time_t *last_create_time, int error_num);
static inline my_bitmap_map *tmp_use_all_columns(TABLE *table,
MY_BITMAP *bitmap)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment