Commit 08d61de7 authored by unknown's avatar unknown

Separated service thread framework to be used for update/delete purge...

Separated service thread framework to be used for update/delete purge (WL#3067) and group commit (WL#3080).

storage/maria/CMakeLists.txt:
  Added files of service thread framework.
storage/maria/Makefile.am:
  Added files of service thread framework.
storage/maria/ma_checkpoint.c:
  Separated service thread framework.
storage/maria/ma_servicethread.c:
  Added files of service thread framework.
storage/maria/ma_servicethread.h:
  Added files of service thread framework.
parent 7dc83c50
...@@ -43,7 +43,7 @@ SET(MARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c ...@@ -43,7 +43,7 @@ SET(MARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c
ma_sp_key.c ma_control_file.c ma_loghandler.c ma_sp_key.c ma_control_file.c ma_loghandler.c
ma_pagecache.c ma_pagecaches.c ma_pagecache.c ma_pagecaches.c
ma_checkpoint.c ma_recovery.c ma_commit.c ma_pagecrc.c ma_checkpoint.c ma_recovery.c ma_commit.c ma_pagecrc.c
ha_maria.h maria_def.h ma_recovery_util.c ha_maria.h maria_def.h ma_recovery_util.c ma_servicethread.c
) )
IF(NOT SOURCE_SUBLIBS) IF(NOT SOURCE_SUBLIBS)
......
...@@ -73,7 +73,8 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \ ...@@ -73,7 +73,8 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \ ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \
ma_checkpoint.h ma_recovery.h ma_commit.h ma_state.h \ ma_checkpoint.h ma_recovery.h ma_commit.h ma_state.h \
trnman_public.h ma_check_standalone.h \ trnman_public.h ma_check_standalone.h \
ma_key_recover.h ma_recovery_util.h ma_key_recover.h ma_recovery_util.h \
ma_servicethread.h
ma_test1_DEPENDENCIES= $(LIBRARIES) ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \ ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \ $(top_builddir)/storage/myisam/libmyisam.a \
...@@ -135,7 +136,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \ ...@@ -135,7 +136,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_pagecache.c ma_pagecaches.c \ ma_pagecache.c ma_pagecaches.c \
ma_checkpoint.c ma_recovery.c ma_commit.c \ ma_checkpoint.c ma_recovery.c ma_commit.c \
ma_pagecrc.c ma_recovery_util.c \ ma_pagecrc.c ma_recovery_util.c \
ha_maria.cc ha_maria.cc ma_servicethread.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? maria_log_control maria_log.0000* CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? maria_log_control maria_log.0000*
SUFFIXES = .sh SUFFIXES = .sh
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "ma_blockrec.h" #include "ma_blockrec.h"
#include "ma_checkpoint.h" #include "ma_checkpoint.h"
#include "ma_loghandler_lsn.h" #include "ma_loghandler_lsn.h"
#include "ma_servicethread.h"
/** @brief type of checkpoint currently running */ /** @brief type of checkpoint currently running */
...@@ -43,10 +44,9 @@ static CHECKPOINT_LEVEL checkpoint_in_progress= CHECKPOINT_NONE; ...@@ -43,10 +44,9 @@ static CHECKPOINT_LEVEL checkpoint_in_progress= CHECKPOINT_NONE;
static pthread_mutex_t LOCK_checkpoint; static pthread_mutex_t LOCK_checkpoint;
/** @brief for killing the background checkpoint thread */ /** @brief for killing the background checkpoint thread */
static pthread_cond_t COND_checkpoint; static pthread_cond_t COND_checkpoint;
/** @brief if checkpoint module was inited or not */ /** @brief control structure for checkpoint background thread */
static my_bool checkpoint_inited= FALSE; static MA_SERVICE_THREAD_CONTROL checkpoint_control=
/** @brief 'kill' flag for the background checkpoint thread */ {THREAD_DEAD, FALSE, &LOCK_checkpoint, &COND_checkpoint};
static int checkpoint_thread_die;
/* is ulong like pagecache->blocks_changed */ /* is ulong like pagecache->blocks_changed */
static ulong pages_to_flush_before_next_checkpoint; static ulong pages_to_flush_before_next_checkpoint;
static PAGECACHE_FILE *dfiles, /**< data files to flush in background */ static PAGECACHE_FILE *dfiles, /**< data files to flush in background */
...@@ -99,7 +99,7 @@ int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait) ...@@ -99,7 +99,7 @@ int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait)
int result= 0; int result= 0;
DBUG_ENTER("ma_checkpoint_execute"); DBUG_ENTER("ma_checkpoint_execute");
if (!checkpoint_inited) if (!checkpoint_control.inited)
{ {
/* /*
If ha_maria failed to start, maria_panic_hton is called, we come here. If ha_maria failed to start, maria_panic_hton is called, we come here.
...@@ -326,17 +326,17 @@ int ma_checkpoint_init(ulong interval) ...@@ -326,17 +326,17 @@ int ma_checkpoint_init(ulong interval)
pthread_t th; pthread_t th;
int res= 0; int res= 0;
DBUG_ENTER("ma_checkpoint_init"); DBUG_ENTER("ma_checkpoint_init");
checkpoint_inited= TRUE; if (ma_service_thread_control_init(&checkpoint_control))
checkpoint_thread_die= 2; /* not yet born == dead */
if (pthread_mutex_init(&LOCK_checkpoint, MY_MUTEX_INIT_SLOW) ||
pthread_cond_init(&COND_checkpoint, 0))
res= 1; res= 1;
else if (interval > 0) else if (interval > 0)
{ {
compile_time_assert(sizeof(void *) >= sizeof(ulong)); compile_time_assert(sizeof(void *) >= sizeof(ulong));
if (!(res= pthread_create(&th, NULL, ma_checkpoint_background, if (!(res= pthread_create(&th, NULL, ma_checkpoint_background,
(void *)interval))) (void *)interval)))
checkpoint_thread_die= 0; /* thread lives, will have to be killed */ {
/* thread lives, will have to be killed */
checkpoint_control.status= THREAD_RUNNING;
}
} }
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -423,30 +423,12 @@ void ma_checkpoint_end(void) ...@@ -423,30 +423,12 @@ void ma_checkpoint_end(void)
DBUG_EXECUTE_IF("maria_crash", DBUG_EXECUTE_IF("maria_crash",
{ DBUG_PRINT("maria_crash", ("now")); DBUG_ABORT(); }); { DBUG_PRINT("maria_crash", ("now")); DBUG_ABORT(); });
if (checkpoint_inited) if (checkpoint_control.inited)
{ {
pthread_mutex_lock(&LOCK_checkpoint); ma_service_thread_control_end(&checkpoint_control);
if (checkpoint_thread_die != 2) /* thread was started ok */
{
DBUG_PRINT("info",("killing Maria background checkpoint thread"));
checkpoint_thread_die= 1; /* kill it */
do /* and wait for it to be dead */
{
/* wake it up if it was in a sleep */
pthread_cond_broadcast(&COND_checkpoint);
DBUG_PRINT("info",("waiting for Maria background checkpoint thread"
" to die"));
pthread_cond_wait(&COND_checkpoint, &LOCK_checkpoint);
}
while (checkpoint_thread_die != 2);
}
pthread_mutex_unlock(&LOCK_checkpoint);
my_free((uchar *)dfiles, MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar *)dfiles, MYF(MY_ALLOW_ZERO_PTR));
my_free((uchar *)kfiles, MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar *)kfiles, MYF(MY_ALLOW_ZERO_PTR));
dfiles= kfiles= NULL; dfiles= kfiles= NULL;
pthread_mutex_destroy(&LOCK_checkpoint);
pthread_cond_destroy(&COND_checkpoint);
checkpoint_inited= FALSE;
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -586,7 +568,6 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -586,7 +568,6 @@ pthread_handler_t ma_checkpoint_background(void *arg)
#if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */ #if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */
sleeps=0; sleeps=0;
#endif #endif
struct timespec abstime;
switch (sleeps % interval) switch (sleeps % interval)
{ {
case 0: case 0:
...@@ -691,25 +672,11 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -691,25 +672,11 @@ pthread_handler_t ma_checkpoint_background(void *arg)
sleep_time= interval - (sleeps % interval); sleep_time= interval - (sleeps % interval);
} }
} }
pthread_mutex_lock(&LOCK_checkpoint); if (my_service_thread_sleep(&checkpoint_control,
if (checkpoint_thread_die == 1) sleep_time * 1000000000ULL))
break; break;
#if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */
pthread_mutex_unlock(&LOCK_checkpoint);
my_sleep(100000); /* a tenth of a second */
pthread_mutex_lock(&LOCK_checkpoint);
#else
/* To have a killable sleep, we use timedwait like our SQL GET_LOCK() */
DBUG_PRINT("info", ("sleeping %u seconds", sleep_time));
set_timespec(abstime, sleep_time);
pthread_cond_timedwait(&COND_checkpoint, &LOCK_checkpoint, &abstime);
#endif
if (checkpoint_thread_die == 1)
break;
pthread_mutex_unlock(&LOCK_checkpoint);
sleeps+= sleep_time; sleeps+= sleep_time;
} }
pthread_mutex_unlock(&LOCK_checkpoint);
DBUG_PRINT("info",("Maria background checkpoint thread ends")); DBUG_PRINT("info",("Maria background checkpoint thread ends"));
{ {
CHECKPOINT_LEVEL level= CHECKPOINT_FULL; CHECKPOINT_LEVEL level= CHECKPOINT_FULL;
...@@ -720,12 +687,7 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -720,12 +687,7 @@ pthread_handler_t ma_checkpoint_background(void *arg)
DBUG_EXECUTE_IF("maria_checkpoint_indirect", level= CHECKPOINT_INDIRECT;); DBUG_EXECUTE_IF("maria_checkpoint_indirect", level= CHECKPOINT_INDIRECT;);
ma_checkpoint_execute(level, FALSE); ma_checkpoint_execute(level, FALSE);
} }
pthread_mutex_lock(&LOCK_checkpoint); my_service_thread_signal_end(&checkpoint_control);
checkpoint_thread_die= 2; /* indicate that we are dead */
/* wake up ma_checkpoint_end() which may be waiting for our death */
pthread_cond_broadcast(&COND_checkpoint);
/* broadcast was inside unlock because ma_checkpoint_end() destroys mutex */
pthread_mutex_unlock(&LOCK_checkpoint);
my_thread_end(); my_thread_end();
return 0; return 0;
} }
......
#include "maria_def.h"
#include "ma_servicethread.h"
/**
Initializes the service thread
@param control control block
@return Operation status
@retval 0 OK
@retval 1 error
*/
int ma_service_thread_control_init(MA_SERVICE_THREAD_CONTROL *control)
{
int res= 0;
DBUG_ENTER("ma_service_thread_control_init");
DBUG_PRINT("init", ("control 0x%lx", (ulong) control));
control->inited= TRUE;
control->status= THREAD_DEAD; /* not yet born == dead */
res= (pthread_mutex_init(control->LOCK_control, MY_MUTEX_INIT_SLOW) ||
pthread_cond_init(control->COND_control, 0));
DBUG_PRINT("info", ("init: %s", (res ? "Error" : "OK")));
DBUG_RETURN(res);
}
/**
Kill the service thread
@param control control block
@note The service thread should react on condition and status equal
THREAD_DYING, by setting status THREAD_DEAD, and issuing message to
control thread via condition and exiting. The base way to do so is using
my_service_thread_sleep() and my_service_thread_signal_end()
*/
void ma_service_thread_control_end(MA_SERVICE_THREAD_CONTROL *control)
{
DBUG_ENTER("ma_service_thread_control_end");
DBUG_PRINT("init", ("control 0x%lx", (ulong) control));
DBUG_ASSERT(control->inited);
pthread_mutex_lock(control->LOCK_control);
if (control->status != THREAD_DEAD) /* thread was started OK */
{
DBUG_PRINT("info",("killing Maria background thread"));
control->status= THREAD_DYING; /* kill it */
do /* and wait for it to be dead */
{
/* wake it up if it was in a sleep */
pthread_cond_broadcast(control->COND_control);
DBUG_PRINT("info",("waiting for Maria background thread to die"));
pthread_cond_wait(control->COND_control, control->LOCK_control);
}
while (control->status != THREAD_DEAD);
}
pthread_mutex_unlock(control->LOCK_control);
pthread_mutex_destroy(control->LOCK_control);
pthread_cond_destroy(control->COND_control);
control->inited= FALSE;
DBUG_VOID_RETURN;
}
/**
Sleep for given number of nanoseconds with reaction on thread kill
@param control control block
@param sleep_time time of sleeping
@return Operation status
@retval FALSE Time out
@retval TRUE Thread should be killed
*/
my_bool my_service_thread_sleep(MA_SERVICE_THREAD_CONTROL *control,
ulonglong sleep_time)
{
struct timespec abstime;
my_bool res= FALSE;
DBUG_ENTER("my_service_thread_sleep");
DBUG_PRINT("init", ("control 0x%lx", (ulong) control));
pthread_mutex_lock(control->LOCK_control);
if (control->status == THREAD_DYING)
{
pthread_mutex_unlock(control->LOCK_control);
DBUG_RETURN(TRUE);
}
#if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */
pthread_mutex_unlock(&control->LOCK_control);
my_sleep(100000); /* a tenth of a second */
pthread_mutex_lock(&control->LOCK_control);
#else
/* To have a killable sleep, we use timedwait like our SQL GET_LOCK() */
DBUG_PRINT("info", ("sleeping %llu nano seconds", sleep_time));
if (sleep_time)
{
set_timespec_nsec(abstime, sleep_time);
pthread_cond_timedwait(control->COND_control,
control->LOCK_control, &abstime);
}
#endif
if (control->status == THREAD_DYING)
res= TRUE;
pthread_mutex_unlock(control->LOCK_control);
DBUG_RETURN(res);
}
/**
inform about thread exiting
@param control control block
*/
void my_service_thread_signal_end(MA_SERVICE_THREAD_CONTROL *control)
{
DBUG_ENTER("my_service_thread_signal_end");
DBUG_PRINT("init", ("control 0x%lx", (ulong) control));
pthread_mutex_lock(control->LOCK_control);
control->status = THREAD_DEAD; /* indicate that we are dead */
/*
wake up ma_service_thread_control_end which may be waiting for
our death
*/
pthread_cond_broadcast(control->COND_control);
/*
broadcast was inside unlock because ma_service_thread_control_end
destroys mutex
*/
pthread_mutex_unlock(control->LOCK_control);
DBUG_VOID_RETURN;
}
#include <my_pthread.h>
enum ma_service_thread_state {THREAD_RUNNING, THREAD_DYING, THREAD_DEAD};
typedef struct st_ma_service_thread_control
{
/** 'kill' flag for the background thread */
enum ma_service_thread_state status;
/** if thread module was inited or not */
my_bool inited;
/** for killing the background thread */
pthread_mutex_t *LOCK_control;
/** for killing the background thread */
pthread_cond_t *COND_control;
} MA_SERVICE_THREAD_CONTROL;
int ma_service_thread_control_init(MA_SERVICE_THREAD_CONTROL *control);
void ma_service_thread_control_end(MA_SERVICE_THREAD_CONTROL *control);
my_bool my_service_thread_sleep(MA_SERVICE_THREAD_CONTROL *control,
ulonglong sleep_time);
void my_service_thread_signal_end(MA_SERVICE_THREAD_CONTROL *control);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment