Commit e2219ec9 authored by Sergei Golubchik's avatar Sergei Golubchik

wt_thd_lazy_init(), per-thread deadlock search depths and timeouts

mysys/array.c:
  lazy alloc in dynamic array
sql-common/client.c:
  for dynamic array, specify init_alloc==alloc_increment explicitly
sql/mysqld.cc:
  per-thread deadlock search depths and timeouts
sql/set_var.cc:
  per-thread deadlock search depths and timeouts
sql/sql_class.h:
  per-thread deadlock search depths and timeouts
parent f8c1059c
......@@ -38,9 +38,6 @@ struct st_wt_resource_id {
} value;
};
extern uint wt_timeout_short, wt_deadlock_search_depth_short;
extern uint wt_timeout_long, wt_deadlock_search_depth_long;
#define WT_WAIT_STATS 24
#define WT_CYCLE_STATS 32
extern ulonglong wt_wait_table[WT_WAIT_STATS];
......@@ -105,6 +102,11 @@ typedef struct st_wt_thd {
*/
WT_RESOURCE *waiting_for;
LF_PINS *pins;
/* pointers to values */
ulong *timeout_short, *deadlock_search_depth_short;
ulong *timeout_long, *deadlock_search_depth_long;
/*
weight relates to the desirability of a transaction being killed if it's
part of a deadlock. In a deadlock situation transactions with lower weights
......@@ -149,7 +151,7 @@ typedef struct st_wt_thd {
void wt_init(void);
void wt_end(void);
void wt_thd_init(WT_THD *);
void wt_thd_lazy_init(WT_THD *, ulong *, ulong *, ulong *, ulong *);
void wt_thd_destroy(WT_THD *);
int wt_thd_will_wait_for(WT_THD *, WT_THD *, WT_RESOURCE_ID *);
int wt_thd_dontwait(WT_THD *);
......
......@@ -51,19 +51,14 @@ my_bool init_dynamic_array2(DYNAMIC_ARRAY *array, uint element_size,
if (init_alloc > 8 && alloc_increment > init_alloc * 2)
alloc_increment=init_alloc*2;
}
if (!init_alloc)
{
init_alloc=alloc_increment;
init_buffer= 0;
}
array->elements=0;
array->max_element=init_alloc;
array->alloc_increment=alloc_increment;
array->size_of_element=element_size;
if ((array->buffer= init_buffer))
DBUG_RETURN(FALSE);
if (!(array->buffer=(uchar*) my_malloc_ci(element_size*init_alloc,
if (init_alloc &&
!(array->buffer=(uchar*) my_malloc_ci(element_size*init_alloc,
MYF(MY_WME))))
{
array->max_element=0;
......
......@@ -43,9 +43,6 @@
#include <waiting_threads.h>
#include <m_string.h>
uint wt_timeout_short=100, wt_deadlock_search_depth_short=4;
uint wt_timeout_long=10000, wt_deadlock_search_depth_long=15;
/*
status variables:
distribution of cycle lengths
......@@ -73,13 +70,13 @@ static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
my_atomic_rwlock_wrunlock(&success_stats_lock); \
} while (0)
#define increment_cycle_stats(X,MAX) \
#define increment_cycle_stats(X,SLOT) \
do { \
uint i= (X), j= (MAX) == wt_deadlock_search_depth_long; \
uint i= (X); \
if (i >= WT_CYCLE_STATS) \
i= WT_CYCLE_STATS; \
my_atomic_rwlock_wrlock(&cycle_stats_lock); \
my_atomic_add32(&wt_cycle_stats[j][i], 1); \
my_atomic_add32(&wt_cycle_stats[SLOT][i], 1); \
my_atomic_rwlock_wrunlock(&cycle_stats_lock); \
} while (0)
......@@ -190,14 +187,29 @@ void wt_end()
DBUG_VOID_RETURN;
}
void wt_thd_init(WT_THD *thd)
static void fix_thd_pins(WT_THD *thd)
{
DBUG_ENTER("wt_thd_init");
if (unlikely(thd->pins == 0))
{
thd->pins=lf_hash_get_pins(&reshash);
#ifndef DBUG_OFF
thd->name=my_thread_name();
#endif
}
}
my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 10, 5);
thd->pins=lf_hash_get_pins(&reshash);
void wt_thd_lazy_init(WT_THD *thd, ulong *ds, ulong *ts, ulong *dl, ulong *tl)
{
DBUG_ENTER("wt_thd_lazy_init");
thd->waiting_for=0;
thd->my_resources.buffer= 0;
thd->my_resources.elements= 0;
thd->weight=0;
thd->deadlock_search_depth_short= ds;
thd->timeout_short= ts;
thd->deadlock_search_depth_long= dl;
thd->timeout_long= tl;
my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 0, 5);
#ifndef DBUG_OFF
thd->name=my_thread_name();
#endif
......@@ -208,12 +220,12 @@ void wt_thd_destroy(WT_THD *thd)
{
DBUG_ENTER("wt_thd_destroy");
if (thd->my_resources.buffer == 0)
DBUG_VOID_RETURN; /* nothing to do */
DBUG_ASSERT(thd->my_resources.elements == 0);
if (thd->pins != 0)
lf_hash_put_pins(thd->pins);
delete_dynamic(&thd->my_resources);
lf_hash_put_pins(thd->pins);
thd->waiting_for=0;
DBUG_VOID_RETURN;
}
......@@ -297,7 +309,8 @@ retry:
if (cursor == arg->thd)
{
ret= WT_DEADLOCK;
increment_cycle_stats(depth, arg->max_depth);
increment_cycle_stats(depth, arg->max_depth ==
*arg->thd->deadlock_search_depth_long);
arg->victim= cursor;
goto end;
}
......@@ -340,7 +353,8 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
ret= deadlock_search(&arg, blocker, depth);
if (ret == WT_DEPTH_EXCEEDED)
{
increment_cycle_stats(WT_CYCLE_STATS, max_depth);
increment_cycle_stats(WT_CYCLE_STATS, max_depth ==
*thd->deadlock_search_depth_long);
ret= WT_OK;
}
if (ret == WT_DEADLOCK && depth)
......@@ -379,6 +393,8 @@ static void unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
DBUG_VOID_RETURN;
}
fix_thd_pins(thd);
/* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
{
key= &rc->id;
......@@ -450,8 +466,7 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu",
thd->name, blocker->name, resid->value.num));
if (unlikely(thd->my_resources.buffer == 0))
wt_thd_init(thd);
fix_thd_pins(thd);
if (thd->waiting_for == 0)
{
......@@ -538,7 +553,7 @@ retry:
}
rc_unlock(rc);
if (deadlock(thd, blocker, 1, wt_deadlock_search_depth_short))
if (deadlock(thd, blocker, 1, *thd->deadlock_search_depth_short))
{
wt_thd_dontwait(thd);
DBUG_RETURN(WT_DEADLOCK);
......@@ -584,16 +599,16 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_OK;
rc_unlock(rc);
set_timespec_time_nsec(timeout, starttime, wt_timeout_short*ULL(1000));
set_timespec_time_nsec(timeout, starttime, (*thd->timeout_short)*ULL(1000));
if (ret == WT_TIMEOUT)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT)
{
if (deadlock(thd, thd, 0, wt_deadlock_search_depth_long))
if (deadlock(thd, thd, 0, *thd->deadlock_search_depth_long))
ret= WT_DEADLOCK;
else if (wt_timeout_long > wt_timeout_short)
else if (*thd->timeout_long > *thd->timeout_short)
{
set_timespec_time_nsec(timeout, starttime, wt_timeout_long*ULL(1000));
set_timespec_time_nsec(timeout, starttime, (*thd->timeout_long)*ULL(1000));
if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
}
......@@ -644,7 +659,7 @@ void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid)
if (rc->mutex)
safe_mutex_assert_owner(rc->mutex);
#endif
}
}
unlock_lock_and_free_resource(thd, rc);
if (resid)
{
......
......@@ -996,7 +996,7 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd)
{
options->init_commands= (DYNAMIC_ARRAY*)my_malloc(sizeof(DYNAMIC_ARRAY),
MYF(MY_WME));
init_dynamic_array(options->init_commands,sizeof(char*),0,5 CALLER_INFO);
init_dynamic_array(options->init_commands,sizeof(char*),5,5 CALLER_INFO);
}
if (!(tmp= my_strdup(cmd,MYF(MY_WME))) ||
......
......@@ -5570,7 +5570,11 @@ enum options_mysqld
OPT_MIN_EXAMINED_ROW_LIMIT,
OPT_LOG_SLOW_SLAVE_STATEMENTS,
OPT_DEBUG_CRC, OPT_DEBUG_ON, OPT_OLD_MODE,
OPT_SLAVE_EXEC_MODE
OPT_SLAVE_EXEC_MODE,
OPT_DEADLOCK_SEARCH_DEPTH_SHORT,
OPT_DEADLOCK_SEARCH_DEPTH_LONG,
OPT_DEADLOCK_TIMEOUT_SHORT,
OPT_DEADLOCK_TIMEOUT_LONG
};
......@@ -5693,6 +5697,26 @@ struct my_option my_long_options[] =
NO_ARG, 0, 0, 0, 0, 0, 0},
{"datadir", 'h', "Path to the database root.", (uchar**) &mysql_data_home,
(uchar**) &mysql_data_home, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"deadlock-search-depth-short", OPT_DEADLOCK_SEARCH_DEPTH_SHORT,
"Short search depth for the two-step deadlock detection",
(uchar**) &global_system_variables.wt_deadlock_search_depth_short,
(uchar**) &global_system_variables.wt_deadlock_search_depth_short,
0, GET_ULONG, REQUIRED_ARG, 4, 0, 32, 0, 0, 0},
{"deadlock-search-depth-long", OPT_DEADLOCK_SEARCH_DEPTH_LONG,
"Long search depth for the two-step deadlock detection",
(uchar**) &global_system_variables.wt_deadlock_search_depth_long,
(uchar**) &global_system_variables.wt_deadlock_search_depth_long,
0, GET_ULONG, REQUIRED_ARG, 15, 0, 33, 0, 0, 0},
{"deadlock-timeout-short", OPT_DEADLOCK_TIMEOUT_SHORT,
"Short timeout for the two-step deadlock detection (in microseconds)",
(uchar**) &global_system_variables.wt_timeout_short,
(uchar**) &global_system_variables.wt_timeout_short,
0, GET_ULONG, REQUIRED_ARG, 100, 0, ULONG_MAX, 0, 0, 0},
{"deadlock-timeout-long", OPT_DEADLOCK_TIMEOUT_LONG,
"Long timeout for the two-step deadlock detection (in microseconds)",
(uchar**) &global_system_variables.wt_timeout_long,
(uchar**) &global_system_variables.wt_timeout_long,
0, GET_ULONG, REQUIRED_ARG, 10000, 0, ULONG_MAX, 0, 0, 0},
#ifndef DBUG_OFF
{"debug", '#', "Debug log.", (uchar**) &default_dbug_option,
(uchar**) &default_dbug_option, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
......
......@@ -59,7 +59,7 @@
#include <thr_alarm.h>
#include <myisam.h>
#include <my_dir.h>
#include <waiting_threads.h>
#include "events.h"
/* WITH_NDBCLUSTER_STORAGE_ENGINE */
......@@ -227,6 +227,19 @@ static sys_var_long_ptr sys_concurrent_insert(&vars, "concurrent_insert",
static sys_var_long_ptr sys_connect_timeout(&vars, "connect_timeout",
&connect_timeout);
static sys_var_const_str sys_datadir(&vars, "datadir", mysql_real_data_home);
static sys_var_thd_ulong sys_deadlock_search_depth_short(&vars,
"deadlock_search_depth_short",
&SV::wt_deadlock_search_depth_short);
static sys_var_thd_ulong sys_deadlock_search_depth_long(&vars,
"deadlock_search_depth_long",
&SV::wt_deadlock_search_depth_long);
static sys_var_thd_ulong sys_deadlock_timeout_short(&vars,
"deadlock_timeout_short",
&SV::wt_timeout_short);
static sys_var_thd_ulong sys_deadlock_timeout_long(&vars,
"deadlock_timeout_long",
&SV::wt_timeout_long);
#ifndef DBUG_OFF
static sys_var_thd_dbug sys_dbug(&vars, "debug");
#endif
......
......@@ -353,6 +353,9 @@ struct system_variables
DATE_TIME_FORMAT *time_format;
my_bool sysdate_is_now;
/* deadlock detection */
ulong wt_timeout_short, wt_deadlock_search_depth_short;
ulong wt_timeout_long, wt_deadlock_search_depth_long;
};
......@@ -1349,9 +1352,14 @@ public:
st_transactions()
{
#ifdef USING_TRANSACTIONS
THD *thd=current_thd;
bzero((char*)this, sizeof(*this));
xid_state.xid.null();
init_sql_alloc(&mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
wt_thd_lazy_init(&wt, &thd->variables.wt_deadlock_search_depth_short,
&thd->variables.wt_timeout_short,
&thd->variables.wt_deadlock_search_depth_long,
&thd->variables.wt_timeout_long);
#else
xid_state.xa_state= XA_NOTR;
#endif
......
......@@ -26,6 +26,9 @@ struct test_wt_thd {
uint i, cnt;
pthread_mutex_t lock;
ulong wt_timeout_short=100, wt_deadlock_search_depth_short=4;
ulong wt_timeout_long=10000, wt_deadlock_search_depth_long=15;
#define reset(ARRAY) bzero(ARRAY, sizeof(ARRAY))
enum { LATEST, RANDOM, YOUNGEST, LOCKS } kill_strategy;
......@@ -55,15 +58,6 @@ pthread_handler_t test_wt(void *arg)
if (kill_strategy == LOCKS)
thds[id].thd.weight= 0;
/*
wt_thd_init() is supposed to be called in the thread that will use it.
We didn't do that, and now need to fix the broken object.
*/
thds[id].thd.pins->stack_ends_here= & my_thread_var->stack_ends_here;
#ifndef DBUG_OFF
thds[id].thd.name=my_thread_name();
#endif
for (m= *(int *)arg; m ; m--)
{
WT_RESOURCE_ID resid;
......@@ -81,11 +75,6 @@ retry:
i= rnd() % (THREADS-1);
if (i >= id) i++;
#ifndef DBUG_OFF
if (thds[i].thd.name==0)
goto retry;
#endif
for (k=n; k >=j; k--)
if (blockers[k] == i)
goto retry;
......@@ -133,7 +122,7 @@ retry:
#define DEL "(deleted)"
char *x=malloc(strlen(thds[id].thd.name)+sizeof(DEL)+1);
strxmov(x, thds[id].thd.name, DEL, 0);
thds[id].thd.name=x; /* it's a memory leak, go on, shot me */
thds[id].thd.name=x; /* it's a memory leak, go on, shoot me */
}
#endif
......@@ -149,12 +138,6 @@ void do_one_test()
{
double sum, sum0;
#ifndef DBUG_OFF
for (cnt=0; cnt < THREADS; cnt++)
thds[cnt].thd.name=0;
#endif
reset(wt_cycle_stats);
reset(wt_wait_stats);
wt_success_stats=0;
......@@ -195,7 +178,9 @@ void do_tests()
wt_init();
for (cnt=0; cnt < THREADS; cnt++)
{
wt_thd_init(& thds[cnt].thd);
wt_thd_lazy_init(& thds[cnt].thd,
& wt_deadlock_search_depth_short, & wt_timeout_short,
& wt_deadlock_search_depth_long, & wt_timeout_long);
pthread_mutex_init(& thds[cnt].lock, 0);
}
{
......@@ -240,6 +225,14 @@ void do_tests()
wt_thd_release_all(& thds[2].thd);
wt_thd_release_all(& thds[3].thd);
pthread_mutex_unlock(&lock);
for (cnt=0; cnt < 3; cnt++)
{
wt_thd_destroy(& thds[cnt].thd);
wt_thd_lazy_init(& thds[cnt].thd,
& wt_deadlock_search_depth_short, & wt_timeout_short,
& wt_deadlock_search_depth_long, & wt_timeout_long);
}
}
wt_deadlock_search_depth_short=6;
......@@ -248,9 +241,9 @@ void do_tests()
wt_deadlock_search_depth_long=16;
DBUG_PRINT("wt", ("================= stress test ==================="));
diag("timeout_short=%d us, deadlock_search_depth_short=%d",
diag("timeout_short=%lu us, deadlock_search_depth_short=%lu",
wt_timeout_short, wt_deadlock_search_depth_short);
diag("timeout_long=%d us, deadlock_search_depth_long=%d",
diag("timeout_long=%lu us, deadlock_search_depth_long=%lu",
wt_timeout_long, wt_deadlock_search_depth_long);
#define test_kill_strategy(X) \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment