Commit 377446fa authored by unknown's avatar unknown

WL#3337 (Event scheduler new architecture)

This is the first cut of separating Event_scheduler in two
classes which are more specialized.
Inheritance was used to separate methods and member variables.
Still Event_scheduler is a child of Event_queue. This dependency
will be removed soon.


sql/event_data_objects.cc:
  add comments
sql/event_db_repository.cc:
  coding style
sql/event_db_repository.h:
  add a call, will be implemented later
sql/event_queue.cc:
  Event_queue, still as super-class of Event_scheduler
sql/event_queue.h:
  Event_queue as super-class of Event_scheduler. Trying to
  separate the two classes
sql/event_scheduler.cc:
  Event_scheduler as child class of Event_queue.
  Trying to separate both classes.
sql/event_scheduler.h:
  Event_scheduler as child class of Event_queue.
  Trying to separate both classes.
sql/events.cc:
  Don't allocate on the stack the scheduler but on the heap.
  The exact way it is done will be changed, that's ok for now.
parent 8d961c45
...@@ -25,6 +25,19 @@ ...@@ -25,6 +25,19 @@
#define EVEX_MAX_INTERVAL_VALUE 2147483647L #define EVEX_MAX_INTERVAL_VALUE 2147483647L
/*
Returns a new instance
SYNOPSIS
Event_parse_data::new_instance()
RETURN VALUE
Address or NULL in case of error
NOTE
Created on THD's mem_root
*/
Event_parse_data * Event_parse_data *
Event_parse_data::new_instance(THD *thd) Event_parse_data::new_instance(THD *thd)
{ {
...@@ -32,6 +45,13 @@ Event_parse_data::new_instance(THD *thd) ...@@ -32,6 +45,13 @@ Event_parse_data::new_instance(THD *thd)
} }
/*
Constructor
SYNOPSIS
Event_parse_data::Event_parse_data()
*/
Event_parse_data::Event_parse_data() Event_parse_data::Event_parse_data()
{ {
item_execute_at= item_expression= item_starts= item_ends= NULL; item_execute_at= item_expression= item_starts= item_ends= NULL;
......
...@@ -1297,3 +1297,4 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na ...@@ -1297,3 +1297,4 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK); DBUG_RETURN(OP_OK);
} }
...@@ -113,6 +113,9 @@ private: ...@@ -113,6 +113,9 @@ private:
int int
table_scan_all_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table); table_scan_all_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table);
static bool
check_system_tables(THD *thd);
MEM_ROOT repo_root; MEM_ROOT repo_root;
/* Prevent use of these */ /* Prevent use of these */
......
This diff is collapsed.
...@@ -16,5 +16,107 @@ ...@@ -16,5 +16,107 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class sp_name;
class Event_timed;
class Event_db_repository;
class THD;
typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
class Event_scheduler;
class Event_queue
{
public:
Event_queue();
static void
init_mutexes();
static void
destroy_mutexes();
bool
init(Event_db_repository *db_repo);
void
deinit();
/* Methods for queue management follow */
int
create_event(THD *thd, Event_parse_data *et, bool check_existence);
int
update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
LEX_STRING *new_name);
bool
drop_event(THD *thd, sp_name *name);
int
drop_schema_events(THD *thd, LEX_STRING schema);
int
drop_user_events(THD *thd, LEX_STRING *definer)
{ DBUG_ASSERT(0); return 0;}
uint
events_count();
uint
events_count_no_lock();
static bool
check_system_tables(THD *thd);
void
recalculate_queue(THD *thd);
void
empty_queue();
///////////////protected
Event_timed *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
int
load_events_from_db(THD *thd);
void
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(Event_timed *,LEX_STRING *));
/* LOCK_event_queue is the mutex which protects the access to the queue. */
pthread_mutex_t LOCK_event_queue;
Event_db_repository *db_repository;
/* The MEM_ROOT of the object */
MEM_ROOT scheduler_root;
/* The sorted queue with the Event_timed objects */
QUEUE queue;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func;
bool mutex_queue_data_locked;
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
void
unlock_data(const char *func, uint line);
static void
on_queue_change();
protected:
/* Singleton instance */
static Event_scheduler *singleton;
};
#endif /* _EVENT_QUEUE_H_ */ #endif /* _EVENT_QUEUE_H_ */
This diff is collapsed.
...@@ -21,7 +21,6 @@ class Event_timed; ...@@ -21,7 +21,6 @@ class Event_timed;
class Event_db_repository; class Event_db_repository;
class THD; class THD;
typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
int int
events_init(); events_init();
...@@ -29,10 +28,12 @@ events_init(); ...@@ -29,10 +28,12 @@ events_init();
void void
events_shutdown(); events_shutdown();
class Event_scheduler #include "event_queue.h"
#include "event_scheduler.h"
class Event_scheduler : public Event_queue
{ {
public: public:
enum enum_state enum enum_state
{ {
UNINITIALIZED= 0, UNINITIALIZED= 0,
...@@ -50,32 +51,22 @@ public: ...@@ -50,32 +51,22 @@ public:
RESUME= 2 RESUME= 2
}; };
/* Singleton access */ /* This is the current status of the life-cycle of the scheduler. */
static Event_scheduler* enum enum_state state;
get_instance();
/* Methods for queue management follow */
int
create_event(THD *thd, Event_parse_data *et, bool check_existence);
int
update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
LEX_STRING *new_name);
bool
drop_event(THD *thd, sp_name *name);
static void
create_instance();
int /* Singleton access */
drop_schema_events(THD *thd, LEX_STRING schema); static Event_scheduler*
get_instance();
int bool
drop_user_events(THD *thd, LEX_STRING *definer) init(Event_db_repository *db_repo);
{ DBUG_ASSERT(0); return 0;}
uint void
events_count(); destroy();
/* State changing methods follow */ /* State changing methods follow */
...@@ -97,19 +88,13 @@ public: ...@@ -97,19 +88,13 @@ public:
int int
suspend_or_resume(enum enum_suspend_or_resume action); suspend_or_resume(enum enum_suspend_or_resume action);
/*
bool
init(Event_db_repository *db_repo);
void
destroy();
static void static void
init_mutexes(); init_mutexes();
static void static void
destroy_mutexes(); destroy_mutexes();
*/
void void
report_error_during_start(); report_error_during_start();
...@@ -124,35 +109,34 @@ public: ...@@ -124,35 +109,34 @@ public:
static int static int
dump_internal_status(THD *thd); dump_internal_status(THD *thd);
static bool /* helper functions for working with mutexes & conditionals */
check_system_tables(THD *thd); void
lock_data(const char *func, uint line);
private: void
Event_timed * unlock_data(const char *func, uint line);
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
int
cond_wait(int cond, pthread_mutex_t *mutex);
void
queue_changed();
protected:
uint uint
workers_count(); workers_count();
bool
is_running_or_suspended();
/* helper functions */ /* helper functions */
bool bool
execute_top(THD *thd); execute_top(THD *thd, Event_timed *et);
void void
clean_queue(THD *thd); clean_memory(THD *thd);
void void
stop_all_running_events(THD *thd); stop_all_running_events(THD *thd);
int
load_events_from_db(THD *thd);
void
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(Event_timed *,LEX_STRING *));
bool bool
check_n_suspend_if_needed(THD *thd); check_n_suspend_if_needed(THD *thd);
...@@ -163,47 +147,13 @@ private: ...@@ -163,47 +147,13 @@ private:
/* Singleton DP is used */ /* Singleton DP is used */
Event_scheduler(); Event_scheduler();
enum enum_cond_vars
{
COND_NONE= -1,
/*
COND_new_work is a conditional used to signal that there is a change
of the queue that should inform the executor thread that new event should
be executed sooner than previously expected, because of add/replace event.
*/
COND_new_work= 0,
/*
COND_started is a conditional used to synchronize the thread in which
::start() was called and the spawned thread. ::start() spawns a new thread
and then waits on COND_started but also checks when awaken that `state` is
either RUNNING or CANTSTART. Then it returns back.
*/
COND_started_or_stopped,
/*
Conditional used for signalling from the scheduler thread back to the
thread that calls ::suspend() or ::resume. Synchronizing the calls.
*/
COND_suspend_or_resume,
/* Must be always last */
COND_LAST
};
/* Singleton instance */ pthread_mutex_t *LOCK_scheduler_data;
static Event_scheduler singleton;
/* This is the current status of the life-cycle of the manager. */
enum enum_state state;
/* Set to start the scheduler in suspended state */ /* Set to start the scheduler in suspended state */
bool start_scheduler_suspended; bool start_scheduler_suspended;
/*
LOCK_scheduler_data is the mutex which protects the access to the
manager's queue as well as used when signalling COND_new_work,
COND_started and COND_shutdown.
*/
pthread_mutex_t LOCK_scheduler_data;
/* /*
Holds the thread id of the executor thread or 0 if the executor is not Holds the thread id of the executor thread or 0 if the executor is not
running. It is used by ::shutdown() to know which thread to kill with running. It is used by ::shutdown() to know which thread to kill with
...@@ -212,33 +162,27 @@ private: ...@@ -212,33 +162,27 @@ private:
*/ */
ulong thread_id; ulong thread_id;
pthread_cond_t cond_vars[COND_LAST]; enum enum_cond_vars
static const char * const cond_vars_names[COND_LAST]; {
COND_NONE= -1,
/* The MEM_ROOT of the object */ COND_new_work= 0,
MEM_ROOT scheduler_root; COND_started_or_stopped,
COND_suspend_or_resume,
Event_db_repository *db_repository; /* Must be always last */
COND_LAST
};
/* The sorted queue with the Event_timed objects */ uint mutex_last_locked_at_line_nr;
QUEUE queue; uint mutex_last_unlocked_at_line_nr;
const char* mutex_last_locked_in_func_name;
uint mutex_last_locked_at_line; const char* mutex_last_unlocked_in_func_name;
uint mutex_last_unlocked_at_line; int cond_waiting_on;
const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func;
enum enum_cond_vars cond_waiting_on;
bool mutex_scheduler_data_locked; bool mutex_scheduler_data_locked;
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
void static const char * const cond_vars_names[COND_LAST];
unlock_data(const char *func, uint line);
int pthread_cond_t cond_vars[COND_LAST];
cond_wait(enum enum_cond_vars, pthread_mutex_t *mutex);
private: private:
/* Prevent use of these */ /* Prevent use of these */
......
...@@ -559,6 +559,7 @@ void ...@@ -559,6 +559,7 @@ void
Events::init_mutexes() Events::init_mutexes()
{ {
db_repository= new Event_db_repository; db_repository= new Event_db_repository;
Event_scheduler::create_instance();
Event_scheduler::init_mutexes(); Event_scheduler::init_mutexes();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment