Commit 81eadfca authored by unknown's avatar unknown

WL#1034 update

QUEUE implementation working now. this should be ready more or less
for testing once the debug output is being cleaned and some things
around DYNAMIC_ARRAY are cleaned
- fix handling in case of errors that lead to crashes, now no more crashes
  in case of table corruption and such.


include/queues.h:
  introduce a safe version of queue_insert that will extend the queue if
  necessary. the auto_extent is passed to the _ex version of init_queue()
mysys/queues.c:
  add init_queue_ex() implementation
  add queue_insert_safe() implementation
sql/event.cc:
  - move mysql_priv.h inclusion to event_priv.h
  - use a priority queue instead of DYNAMIC_ARRAY which is sorted
sql/event.h:
  reorder
sql/event_executor.cc:
  reorder
sql/event_priv.h:
  - reorder a bit
  - add macroses and functions for queue manipulation which stay on top
   of QUEUE (partly implemented for DYNAMIC_ARRAY but will be cleared to be
   only for QUEUE).
sql/event_timed.cc:
  allocate one more byte and zeroterminate, really
parent 1c5573a4
......@@ -35,6 +35,7 @@ typedef struct st_queue {
uint offset_to_key; /* compare is done on element+offset */
int max_at_top; /* Set if queue_top gives max */
int (*compare)(void *, byte *,byte *);
uint auto_extent;
} QUEUE;
#define queue_top(queue) ((queue)->root[1])
......@@ -49,14 +50,19 @@ typedef int (*queue_compare)(void *,byte *, byte *);
int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
void *first_cmp_arg);
int init_queue_ex(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
void *first_cmp_arg, uint auto_extent);
int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
void *first_cmp_arg);
int resize_queue(QUEUE *queue, uint max_elements);
void delete_queue(QUEUE *queue);
void queue_insert(QUEUE *queue,byte *element);
int queue_insert_safe(QUEUE *queue, byte *element);
byte *queue_remove(QUEUE *queue,uint idx);
#define queue_remove_all(queue) { (queue)->elements= 0; }
#define queue_is_full(queue) (queue->elements == queue->max_elements)
void _downheap(QUEUE *queue,uint idx);
void queue_fix(QUEUE *queue);
#define is_queue_inited(queue) ((queue)->root != 0)
......
......@@ -19,7 +19,7 @@
Implemention of queues from "Algoritms in C" by Robert Sedgewick.
An optimisation of _downheap suggested in Exercise 7.51 in "Data
Structures & Algorithms in C++" by Mark Allen Weiss, Second Edition
was implemented by Mikael Ronstrm 2005. Also the O(N) algorithm
was implemented by Mikael Ronstrom 2005. Also the O(N) algorithm
of queue_fix was implemented.
*/
......@@ -67,6 +67,46 @@ int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
}
/*
Init queue, uses init_queue internally for init work but also accepts
auto_extent as parameter
SYNOPSIS
init_queue_ex()
queue Queue to initialise
max_elements Max elements that will be put in queue
offset_to_key Offset to key in element stored in queue
Used when sending pointers to compare function
max_at_top Set to 1 if you want biggest element on top.
compare Compare function for elements, takes 3 arguments.
first_cmp_arg First argument to compare function
auto_extent When the queue is full and there is insert operation
extend the queue.
NOTES
Will allocate max_element pointers for queue array
RETURN
0 ok
1 Could not allocate memory
*/
int init_queue_ex(QUEUE *queue, uint max_elements, uint offset_to_key,
pbool max_at_top, int (*compare) (void *, byte *, byte *),
void *first_cmp_arg, uint auto_extent)
{
int ret;
DBUG_ENTER("init_queue_ex");
if ((ret= init_queue(queue, max_elements, offset_to_key, max_at_top, compare,
first_cmp_arg)))
DBUG_RETURN(ret);
queue->auto_extent= auto_extent;
DBUG_RETURN(0);
}
/*
Reinitialize queue for other usage
......@@ -192,6 +232,31 @@ void queue_insert(register QUEUE *queue, byte *element)
}
}
/*
Does safe insert. If no more space left on the queue resize it.
Return codes:
0 - OK
1 - Cannot allocate more memory
2 - auto_extend is 0, the operation would
*/
int queue_insert_safe(register QUEUE *queue, byte *element)
{
if (queue->elements == queue->max_elements)
{
if (!queue->auto_extent)
return 2;
else if (resize_queue(queue, queue->max_elements + queue->auto_extent))
return 1;
}
queue_insert(queue, element);
return 0;
}
/* Remove item from queue */
/* Returns pointer to removed element */
......
......@@ -14,16 +14,14 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
#include "event.h"
#include "event_priv.h"
#include "event.h"
#include "sp.h"
/*
TODO list :
- The default value of created/modified should not be 0000-00-00 because of
STRICT mode restricions.
- Remove m_ prefixes of member variables.
- Use timestamps instead of datetime.
......@@ -53,9 +51,17 @@
- Consider using conditional variable when doing shutdown instead of
waiting till all worker threads end.
- Make event_timed::get_show_create_event() work
- Add function documentation whenever needed.
- Add logging to file
- Move comparison code to class event_timed
- Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY.
This will skip copy operation as well as will simplify the code which is
now aware of events_array DYNAMIC_ARRAY
Warning:
- For now parallel execution is not possible because the same sp_head cannot be
executed few times!!! There is still no lock attached to particular event.
......@@ -67,19 +73,60 @@
bool mysql_event_table_exists= 1;
DYNAMIC_ARRAY events_array;
DYNAMIC_ARRAY evex_executing_queue;
DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
QUEUE EXEC_QUEUE_QUEUE_NAME;
MEM_ROOT evex_mem_root;
//extern volatile uint thread_running;
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//////////////// Static functions follow ///////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
void
evex_queue_init(EVEX_QUEUE_TYPE *queue)
{
#ifndef EVEX_USE_QUEUE
VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100));
#else
if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/,
0 /*smallest_on_top*/, event_timed_compare_q, NULL,
100 /*auto_extent*/))
sql_print_error("Insufficient memory to initialize executing queue.");
#endif
}
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element)
{
#ifndef EVEX_USE_QUEUE
VOID(push_dynamic(queue, element));
return 0;
#else
return queue_insert_safe(queue, element);
#endif
}
void
evex_queue_top_updated(EVEX_QUEUE_TYPE *queue)
{
#ifdef EVEX_USE_QUEUE
queue_replaced(queue);
#endif
}
void
evex_queue_sort(EVEX_QUEUE_TYPE *queue)
{
#ifndef EVEX_USE_QUEUE
qsort((gptr) dynamic_element(queue, 0, event_timed**),
queue->elements,
sizeof(event_timed **),
(qsort_cmp) event_timed_compare);
#endif
}
/* NOTE Andrey: Document better
Compares two TIME structures.
......@@ -98,7 +145,7 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
}
inline int
int
my_time_compare(TIME *a, TIME *b)
{
/*
......@@ -107,6 +154,7 @@ my_time_compare(TIME *a, TIME *b)
*/
DBUG_ENTER("my_time_compare");
if (a->year > b->year)
DBUG_RETURN(1);
......@@ -143,19 +191,53 @@ my_time_compare(TIME *a, TIME *b)
if (a->second < b->second)
DBUG_RETURN(-1);
/*!! second_part is not compared !*/
if (a->second_part > b->second_part)
DBUG_RETURN(1);
if (a->second_part < b->second_part)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
evex_time_diff(TIME *a, TIME *b)
{
my_bool in_gap;
DBUG_ENTER("my_time_diff");
return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
}
inline int
event_timed_compare(event_timed **a, event_timed **b)
{
return my_time_compare(&(*a)->execute_at, &(*b)->execute_at);
my_ulonglong a_t, b_t;
a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at)*100L +
(*a)->execute_at.second_part;
b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at)*100L +
(*b)->execute_at.second_part;
if (a_t > b_t)
return 1;
else if (a_t < b_t)
return -1;
else
return 0;
// return my_time_compare(&(*a)->execute_at, &(*b)->execute_at);
}
int
event_timed_compare_q(void *vptr, byte* a, byte *b)
{
return event_timed_compare((event_timed **)&a, (event_timed **)&b);
}
/*
Open mysql.event table for read
......@@ -660,7 +742,10 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
VOID(push_dynamic(&events_array,(gptr) ett));
ett_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
/**
VOID(push_dynamic(&evex_executing_queue, (gptr) &ett_copy));
**/
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy);
/*
There is a copy in the array which we don't need. sphead won't be
......@@ -674,10 +759,13 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock)
qsort of events_array.elements (the current number of elements).
We know that the elements are stored in a contiguous block w/o holes.
*/
/**
qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
evex_executing_queue.elements,
sizeof(event_timed **),
(qsort_cmp) event_timed_compare);
**/
evex_queue_sort(&EVEX_EQ_NAME);
if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
......@@ -703,7 +791,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
if (use_lock)
VOID(pthread_mutex_lock(&LOCK_event_arrays));
/**
for (i= 0; i < evex_executing_queue.elements; ++i)
{
event_timed *et= *dynamic_element(&evex_executing_queue, i, event_timed**);
......@@ -733,6 +821,25 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
goto done;
}
}
**/
for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
{
event_timed *et= *evex_queue_element(&EVEX_EQ_NAME, i, event_timed**);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?",db->str,name->str, et->dbname.str,
et->name.str));
if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info))
{
int idx= get_index_dynamic(&events_array, (gptr) et);
//we are lucky the event is in the executing queue, no need of second pass
//destruct first and then remove. the destructor will delete sp_head
et->free_sp();
evex_queue_delete_element(&EVEX_EQ_NAME, idx);
evex_queue_delete_element(&EVEX_EQ_NAME, i);
// ok, we have cleaned
goto done;
}
}
/*
ToDo Andrey : Think about whether second pass is needed. All events
......
......@@ -16,11 +16,9 @@
#ifndef _EVENT_H_
#define _EVENT_H_
#include "sp_head.h"
#include "sp.h"
extern ulong opt_event_executor;
#include "sp.h"
#include "sp_head.h"
#define EVEX_OK SP_OK
#define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND
......
This diff is collapsed.
......@@ -16,8 +16,11 @@
#ifndef _EVENT_PRIV_H_
#define _EVENT_PRIV_H_
#include "mysql_priv.h"
#define EVEX_USE_QUEUE
#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
{ VOID(pthread_mutex_unlock(&__mutex)); goto __label; }
......@@ -41,14 +44,6 @@ enum
EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */
};
extern bool evex_is_running;
extern bool mysql_event_table_exists;
extern DYNAMIC_ARRAY events_array;
extern DYNAMIC_ARRAY evex_executing_queue;
extern MEM_ROOT evex_mem_root;
extern pthread_mutex_t LOCK_event_arrays,
LOCK_workers_count,
LOCK_evex_running;
int
......@@ -60,4 +55,71 @@ evex_db_find_event_aux(THD *thd, const LEX_STRING dbname,
TABLE *
evex_open_event_table(THD *thd, enum thr_lock_type lock_type);
int
event_timed_compare_q(void *vptr, byte* a, byte *b);
int
evex_time_diff(TIME *a, TIME *b);
#define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue
#ifdef EVEX_USE_QUEUE
#define EVEX_QUEUE_TYPE QUEUE
#define EVEX_PTOQEL byte *
#define EVEX_EQ_NAME executing_queue
#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
#define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue))
#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
#define evex_queue_destroy(queue) delete_queue(queue)
#define evex_queue_first_updated(queue) queue_replaced(queue)
#define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
#else
#define EVEX_QUEUE_TYPE DYNAMIC_ARRAY
#define EVEX_PTOQEL gptr
#define EVEX_EQ_NAME evex_executing_queue
#define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast)
#define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx);
#define evex_queue_destroy(queue) delete_dynamic(queue)
/*
push_dynamic() expects ptr to the memory to put in, to make things fast
so when a pointer has to be put inside a ptr-to-ptr is being passed
*/
#define evex_queue_first_updated(queue)
#define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element))
#endif
void
evex_queue_init(EVEX_QUEUE_TYPE *queue);
int
evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element);
void
evex_queue_sort(EVEX_QUEUE_TYPE *queue);
#define evex_queue_num_elements(queue) queue.elements
extern bool evex_is_running;
extern bool mysql_event_table_exists;
extern DYNAMIC_ARRAY events_array;
extern DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
extern QUEUE EXEC_QUEUE_QUEUE_NAME;
extern MEM_ROOT evex_mem_root;
extern pthread_mutex_t LOCK_event_arrays,
LOCK_workers_count,
LOCK_evex_running;
extern ulonglong evex_main_thread_id;
#endif /* _EVENT_PRIV_H_ */
......@@ -14,9 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
#include "event.h"
#include "event_priv.h"
#include "event.h"
#include "sp.h"
......@@ -789,7 +788,7 @@ event_timed::get_show_create_event(THD *thd, uint *length)
+ strlen("DO ") +
+ body.length + strlen(";");
ret= dst= (char*) alloc_root(thd->mem_root, len);
ret= dst= (char*) alloc_root(thd->mem_root, len + 1);
memcpy(dst, "CREATE EVENT ", tmp_len= strlen("CREATE EVENT "));
dst+= tmp_len;
memcpy(dst, dbname.str, tmp_len=dbname.length);
......@@ -832,7 +831,7 @@ event_timed::get_show_create_event(THD *thd, uint *length)
*dst= '\0';
*length= len;
dst[len]= '\0';
return ret;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment