Commit 9c96fde1 authored by Sergei Golubchik's avatar Sergei Golubchik

post-review fixes

include/atomic/generic-msvc.h:
  prevent possible compiler warnings
include/lf.h:
  comments, better definition for LF_HASH_OVERHEAD
include/maria.h:
  define MARIA_CANNOT_ROLLBACK here
include/my_pthread.h:
  avoid possible name clash
include/waiting_threads.h:
  comments, const, move WT_RESOURCE to waiting_threads.c
mysql-test/suite/maria/r/maria_notembedded.result:
  new test
mysql-test/suite/maria/t/maria_notembedded.test:
  new test - 5-way deadlock
mysys/lf_hash.c:
  better definition for LF_HASH_OVERHEAD
mysys/my_static.c:
  comment
mysys/my_thr_init.c:
  casts
mysys/waiting_threads.c:
  comments, asserts, etc
server-tools/instance-manager/parse.cc:
  fix my_init_dynamic_array() to follow new calling conventions
sql/mysqld.cc:
  call wt_init after set_proper_floating_point_mode
sql/sql_class.h:
  comment
storage/maria/ha_maria.cc:
  move MARIA_CANNOT_ROLLBACK to a common header
storage/maria/ma_commit.c:
  comment
storage/maria/ma_write.c:
  comments, check for HA_ERR_FOUND_DUPP_KEY
storage/maria/trnman.c:
  comments, assert
storage/maria/trnman.h:
  comments
storage/maria/unittest/trnman-t.c:
  be paranoid
unittest/mysys/lf-t.c:
  comments
unittest/mysys/waiting_threads-t.c:
  comments, safety, memory leak
parent e01f6c89
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -55,7 +55,7 @@ LONG _InterlockedExchangeAdd (LONG volatile *Addend, LONG Value); ...@@ -55,7 +55,7 @@ LONG _InterlockedExchangeAdd (LONG volatile *Addend, LONG Value);
#define IL_EXCHG_ADD32(X,Y) InterlockedExchangeAdd((volatile LONG *)(X),(Y)) #define IL_EXCHG_ADD32(X,Y) InterlockedExchangeAdd((volatile LONG *)(X),(Y))
#define IL_COMP_EXCHG32(X,Y,Z) InterlockedCompareExchange((volatile LONG *)(X),(Y),(Z)) #define IL_COMP_EXCHG32(X,Y,Z) InterlockedCompareExchange((volatile LONG *)(X),(Y),(Z))
#define IL_COMP_EXCHGptr InterlockedCompareExchangePointer #define IL_COMP_EXCHGptr InterlockedCompareExchangePointer
#define IL_EXCHG32 InterlockedExchange #define IL_EXCHG32(X,Y) InterlockedExchange((volatile LONG *)(X),(Y))
#define IL_EXCHGptr InterlockedExchangePointer #define IL_EXCHGptr InterlockedExchangePointer
#define make_atomic_add_body(S) \ #define make_atomic_add_body(S) \
v= IL_EXCHG_ADD ## S (a, v) v= IL_EXCHG_ADD ## S (a, v)
......
/* Copyright (C) 2007 MySQL AB /* Copyright (C) 2007-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -187,8 +187,8 @@ typedef struct st_lf_allocator { ...@@ -187,8 +187,8 @@ typedef struct st_lf_allocator {
uchar * volatile top; uchar * volatile top;
uint element_size; uint element_size;
uint32 volatile mallocs; uint32 volatile mallocs;
void (*constructor)(uchar *); void (*constructor)(uchar *); /* called, when an object is malloc()'ed */
void (*destructor)(uchar *); void (*destructor)(uchar *); /* called, when an object is free()'d */
} LF_ALLOCATOR; } LF_ALLOCATOR;
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset); void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset);
...@@ -219,7 +219,7 @@ lock_wrap(lf_alloc_new, void *, ...@@ -219,7 +219,7 @@ lock_wrap(lf_alloc_new, void *,
#define LF_HASH_UNIQUE 1 #define LF_HASH_UNIQUE 1
/* lf_hash overhead per element (that is, sizeof(LF_SLIST) */ /* lf_hash overhead per element (that is, sizeof(LF_SLIST) */
#define LF_HASH_OVERHEAD (sizeof(int*)*4) extern const int LF_HASH_OVERHEAD;
typedef struct { typedef struct {
LF_DYNARRAY array; /* hash itself */ LF_DYNARRAY array; /* hash itself */
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -33,6 +33,8 @@ extern "C" { ...@@ -33,6 +33,8 @@ extern "C" {
#include <myisamchk.h> #include <myisamchk.h>
#include <mysql/plugin.h> #include <mysql/plugin.h>
#define MARIA_CANNOT_ROLLBACK
/* /*
Limit max keys according to HA_MAX_POSSIBLE_KEY; See myisamchk.h for details Limit max keys according to HA_MAX_POSSIBLE_KEY; See myisamchk.h for details
*/ */
......
/* Copyright (C) 2000 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -430,19 +430,19 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex); ...@@ -430,19 +430,19 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex);
/* adapt for two different flavors of struct timespec */ /* adapt for two different flavors of struct timespec */
#ifdef HAVE_TIMESPEC_TS_SEC #ifdef HAVE_TIMESPEC_TS_SEC
#define TV_sec ts_sec #define MY_tv_sec ts_sec
#define TV_nsec ts_nsec #define MY_tv_nsec ts_nsec
#else #else
#define TV_sec tv_sec #define MY_tv_sec tv_sec
#define TV_nsec tv_nsec #define MY_tv_nsec tv_nsec
#endif /* HAVE_TIMESPEC_TS_SEC */ #endif /* HAVE_TIMESPEC_TS_SEC */
#ifndef set_timespec_time_nsec #ifndef set_timespec_time_nsec
#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \ #define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
ulonglong nsec= (NSEC); \ ulonglong nsec= (NSEC); \
ulonglong now= (TIME) + (nsec/100); \ ulonglong now= (TIME) + (nsec/100); \
(ABSTIME).TV_sec= (now / ULL(10000000)); \ (ABSTIME).MY_tv_sec= (now / ULL(10000000)); \
(ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \ (ABSTIME).MY_tv_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \
} while(0) } while(0)
#endif /* !set_timespec_time_nsec */ #endif /* !set_timespec_time_nsec */
......
/* Copyright (C) 2008 MySQL AB /* Copyright (C) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -24,16 +24,18 @@ ...@@ -24,16 +24,18 @@
C_MODE_START C_MODE_START
typedef struct st_wt_resource_id WT_RESOURCE_ID; typedef struct st_wt_resource_id WT_RESOURCE_ID;
typedef struct st_wt_resource WT_RESOURCE;
typedef struct st_wt_resource_type { typedef struct st_wt_resource_type {
int (*compare)(void *a, void *b); my_bool (*compare)(const void *a, const void *b);
const void *(*make_key)(WT_RESOURCE_ID *id, uint *len); const void *(*make_key)(const WT_RESOURCE_ID *id, uint *len); /* not used */
} WT_RESOURCE_TYPE; } WT_RESOURCE_TYPE;
struct st_wt_resource_id { struct st_wt_resource_id {
ulonglong value; ulonglong value;
WT_RESOURCE_TYPE *type; const WT_RESOURCE_TYPE *type;
}; };
/* the below differs from sizeof(WT_RESOURCE_ID) by the amount of padding */
#define sizeof_WT_RESOURCE_ID (sizeof(ulonglong)+sizeof(void*)) #define sizeof_WT_RESOURCE_ID (sizeof(ulonglong)+sizeof(void*))
#define WT_WAIT_STATS 24 #define WT_WAIT_STATS 24
...@@ -43,93 +45,17 @@ extern uint32 wt_wait_stats[WT_WAIT_STATS+1]; ...@@ -43,93 +45,17 @@ extern uint32 wt_wait_stats[WT_WAIT_STATS+1];
extern uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1]; extern uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1];
extern uint32 wt_success_stats; extern uint32 wt_success_stats;
/*
'lock' protects 'owners', 'state', and 'waiter_count'
'id' is read-only
a resource is picked up from a hash in a lock-free manner
it's returned pinned, so it cannot be freed at once
but it may be freed right after the pin is removed
to free a resource it should be
1. have no owners
2. have no waiters
two ways to access a resource:
1. find it in a hash
- it's returned pinned.
a) take a lock in exclusive mode
b) check the state, it should be ACTIVE
c) unpin
2. by a direct reference
- could only used if a resource cannot be freed
e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it
*/
typedef struct st_wt_resource {
WT_RESOURCE_ID id;
uint waiter_count;
enum { ACTIVE, FREE } state;
#ifndef DBUG_OFF
pthread_mutex_t *mutex;
#endif
/*
before the 'lock' all elements are mutable, after (and including) -
immutable in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init().
*/
#ifdef WT_RWLOCKS_USE_MUTEXES
/*
we need a special rwlock-like 'lock' to allow readers bypass
waiting writers, otherwise readers can deadlock. For example:
A waits on resource x, owned by B, B waits on resource y, owned
by A, we have a cycle (A->x->B->y->A)
Both A and B start deadlock detection:
A locks x B locks y
A goes deeper B goes deeper
A locks y B locks x
with mutexes it would deadlock. With rwlocks it won't, as long
as both A and B are taking read locks (and they do).
But other threads may take write locks. Assume there's
C who wants to start waiting on x, and D who wants to start
waiting on y.
A read-locks x B read-locks y
A goes deeper B goes deeper
=> C write-locks x (to add a new edge) D write-locks y
.. C is blocked D is blocked
A read-locks y B read-locks x
Now, if a read lock can bypass a pending wrote lock request, we're fine.
If it can not, we have a deadlock.
writer starvation is technically possible, but unlikely, because
the contention is expected to be low.
*/
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
uint readers: 16;
uint pending_writers: 15;
uint write_locked: 1;
} lock;
#else
rw_lock_t lock;
#endif
pthread_cond_t cond;
DYNAMIC_ARRAY owners;
} WT_RESOURCE;
typedef struct st_wt_thd { typedef struct st_wt_thd {
/* /*
XXX XXX
there's no protection (mutex) against concurrent access of there's no protection (mutex) against concurrent access of the
the dynarray below. it is assumed that a caller will have it dynarray below. it is assumed that a caller will have it anyway
automatically (not to protect this array but to protect its (not to protect this array but to protect its own - caller's -
own - caller's - data structures, and we'll get it for free. data structures), and we'll get it for free. A caller needs to
If not, we'll need to add a mutex ensure that a blocker won't release a resource before a blocked
thread starts waiting, which is usually done with a mutex.
If the above assumption is wrong, we'll need to add a mutex here.
*/ */
DYNAMIC_ARRAY my_resources; DYNAMIC_ARRAY my_resources;
/* /*
...@@ -141,8 +67,10 @@ typedef struct st_wt_thd { ...@@ -141,8 +67,10 @@ typedef struct st_wt_thd {
LF_PINS *pins; LF_PINS *pins;
/* pointers to values */ /* pointers to values */
ulong *timeout_short, *deadlock_search_depth_short; const ulong *timeout_short;
ulong *timeout_long, *deadlock_search_depth_long; const ulong *deadlock_search_depth_short;
const ulong *timeout_long;
const ulong *deadlock_search_depth_long;
/* /*
weight relates to the desirability of a transaction being killed if it's weight relates to the desirability of a transaction being killed if it's
...@@ -169,13 +97,13 @@ typedef struct st_wt_thd { ...@@ -169,13 +97,13 @@ typedef struct st_wt_thd {
*/ */
ulong volatile weight; ulong volatile weight;
/* /*
'killed' is indirectly protected by waiting_for->lock - 'killed' is indirectly protected by waiting_for->lock because
a killed thread needs to clear its 'waiting_for', and thus needs a lock. a killed thread needs to clear its 'waiting_for' and thus needs a lock.
That is a thread needs an exclusive lock to read 'killed' reliably. That is a thread needs an exclusive lock to read 'killed' reliably.
But other threads may change 'killed' from 0 to 1, a shared But other threads may change 'killed' from 0 to 1, a shared
lock is enough for that. lock is enough for that.
*/ */
my_bool volatile killed; my_bool killed;
#ifndef DBUG_OFF #ifndef DBUG_OFF
const char *name; const char *name;
#endif #endif
...@@ -189,13 +117,13 @@ typedef struct st_wt_thd { ...@@ -189,13 +117,13 @@ typedef struct st_wt_thd {
void wt_init(void); void wt_init(void);
void wt_end(void); void wt_end(void);
void wt_thd_lazy_init(WT_THD *, ulong *, ulong *, ulong *, ulong *); void wt_thd_lazy_init(WT_THD *, const ulong *, const ulong *, const ulong *, const ulong *);
void wt_thd_destroy(WT_THD *); void wt_thd_destroy(WT_THD *);
int wt_thd_will_wait_for(WT_THD *, WT_THD *, WT_RESOURCE_ID *); int wt_thd_will_wait_for(WT_THD *, WT_THD *, const WT_RESOURCE_ID *);
int wt_thd_cond_timedwait(WT_THD *, pthread_mutex_t *); int wt_thd_cond_timedwait(WT_THD *, pthread_mutex_t *);
void wt_thd_release(WT_THD *, WT_RESOURCE_ID *); void wt_thd_release(WT_THD *, const WT_RESOURCE_ID *);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0) #define wt_thd_release_all(THD) wt_thd_release((THD), 0)
int wt_resource_id_memcmp(void *, void *); int wt_resource_id_memcmp(const void *, const void *);
C_MODE_END C_MODE_END
......
...@@ -30,9 +30,24 @@ insert t1 values (2); ...@@ -30,9 +30,24 @@ insert t1 values (2);
lock table t1 write concurrent; lock table t1 write concurrent;
insert t1 values (3); insert t1 values (3);
insert t1 values (2); insert t1 values (2);
lock table t1 write concurrent;
insert t1 values (4);
insert t1 values (3); insert t1 values (3);
lock table t1 write concurrent;
insert t1 values (5);
insert t1 values (4);
lock table t1 write concurrent;
insert t1 values (6);
insert t1 values (5);
insert t1 values (6);
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
unlock tables; unlock tables;
ERROR 23000: Duplicate entry '2' for key 'a' ERROR 23000: Duplicate entry '2' for key 'a'
unlock tables; unlock tables;
ERROR 23000: Duplicate entry '3' for key 'a'
unlock tables;
ERROR 23000: Duplicate entry '4' for key 'a'
unlock tables;
ERROR 23000: Duplicate entry '5' for key 'a'
unlock tables;
drop table t1; drop table t1;
...@@ -33,27 +33,64 @@ drop table t1; ...@@ -33,27 +33,64 @@ drop table t1;
# #
create table t1 (a int unique) transactional=1; create table t1 (a int unique) transactional=1;
insert t1 values (1); insert t1 values (1);
lock table t1 write concurrent; lock table t1 write concurrent;
insert t1 values (2); insert t1 values (2);
connect(con_d,localhost,root,,);
connect(con_a,localhost,root,,);
lock table t1 write concurrent; lock table t1 write concurrent;
insert t1 values (3); insert t1 values (3);
send insert t1 values (2); send insert t1 values (2);
connect(con_b,localhost,root,,);
lock table t1 write concurrent;
insert t1 values (4);
send insert t1 values (3);
connect(con_c,localhost,root,,);
lock table t1 write concurrent;
insert t1 values (5);
send insert t1 values (4);
connect(con_d,localhost,root,,);
lock table t1 write concurrent;
insert t1 values (6);
send insert t1 values (5);
connection default; connection default;
let $wait_condition=select count(*) = 1 from information_schema.processlist where state="waiting for a resource"; let $wait_condition=select count(*) = 4 from information_schema.processlist where state="waiting for a resource";
--source include/wait_condition.inc --source include/wait_condition.inc
--error ER_LOCK_DEADLOCK --error ER_LOCK_DEADLOCK
insert t1 values (3); insert t1 values (6);
unlock tables; unlock tables;
connection con_a;
--error ER_DUP_ENTRY
reap;
unlock tables;
disconnect con_a;
connection con_b;
--error ER_DUP_ENTRY
reap;
unlock tables;
disconnect con_b;
connection con_c;
--error ER_DUP_ENTRY
reap;
unlock tables;
disconnect con_c;
connection con_d; connection con_d;
--error ER_DUP_ENTRY --error ER_DUP_ENTRY
reap; reap;
unlock tables; unlock tables;
disconnect con_d; disconnect con_d;
connection default; connection default;
drop table t1; drop table t1;
--disable_result_log --disable_result_log
--disable_query_log --disable_query_log
eval set session storage_engine=$default_engine; eval set session storage_engine=$default_engine;
......
/* QQ: TODO multi-pinbox */ /* QQ: TODO multi-pinbox */
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -330,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) ...@@ -330,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
{ {
int npins, alloca_size; int npins, alloca_size;
void *list, **addr; void *list, **addr;
uchar *first, *last= NULL; void *first, *last= NULL;
LF_PINBOX *pinbox= pins->pinbox; LF_PINBOX *pinbox= pins->pinbox;
LINT_INIT(first); LINT_INIT(first);
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -42,6 +42,8 @@ typedef struct { ...@@ -42,6 +42,8 @@ typedef struct {
*/ */
} LF_SLIST; } LF_SLIST;
const int LF_HASH_OVERHEAD= sizeof(LF_SLIST);
/* /*
a structure to pass the context (pointers two the three successive elements a structure to pass the context (pointers two the three successive elements
in a list) from lfind to linsert/ldelete in a list) from lfind to linsert/ldelete
...@@ -315,7 +317,6 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, ...@@ -315,7 +317,6 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key, uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset) CHARSET_INFO *charset)
{ {
compile_time_assert(sizeof(LF_SLIST) == LF_HASH_OVERHEAD);
lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size, lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
offsetof(LF_SLIST, key)); offsetof(LF_SLIST, key));
lf_dynarray_init(&hash->array, sizeof(LF_SLIST *)); lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
......
/* Copyright (C) 2000 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -101,6 +101,7 @@ static const char *proc_info_dummy(void *a __attribute__((unused)), ...@@ -101,6 +101,7 @@ static const char *proc_info_dummy(void *a __attribute__((unused)),
return 0; return 0;
} }
/* this is to be able to call set_thd_proc_info from the C code */
const char *(*proc_info_hook)(void *, const char *, const char *, const char *, const char *(*proc_info_hook)(void *, const char *, const char *, const char *,
const unsigned int)= proc_info_dummy; const unsigned int)= proc_info_dummy;
......
/* Copyright (C) 2000 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -332,7 +332,8 @@ my_bool my_thread_init(void) ...@@ -332,7 +332,8 @@ my_bool my_thread_init(void)
0); 0);
pthread_cond_init(&tmp->suspend, NULL); pthread_cond_init(&tmp->suspend, NULL);
tmp->stack_ends_here= &tmp + STACK_DIRECTION * my_thread_stack_size; tmp->stack_ends_here= (char*)&tmp +
STACK_DIRECTION * (long)my_thread_stack_size;
pthread_mutex_lock(&THR_LOCK_threads); pthread_mutex_lock(&THR_LOCK_threads);
tmp->id= ++thread_id; tmp->id= ++thread_id;
......
/* Copyright (C) 2008 MySQL AB /* Copyright (C) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -13,14 +13,16 @@ ...@@ -13,14 +13,16 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* /**
@file
"waiting threads" subsystem - a unified interface for threads to wait "waiting threads" subsystem - a unified interface for threads to wait
on each other, with built-in deadlock detection. on each other, with built-in deadlock detection.
Main concepts Main concepts
^^^^^^^^^^^^^ ^^^^^^^^^^^^^
a thread - is represented by a WT_THD structure. One physical thread a thread - is represented by a WT_THD structure. One physical thread
can have only one WT_THD descriptor. can have only one WT_THD descriptor at any given moment.
a resource - a thread does not wait for other threads directly, a resource - a thread does not wait for other threads directly,
instead it waits for a "resource", which is "owned" by other threads. instead it waits for a "resource", which is "owned" by other threads.
...@@ -37,9 +39,67 @@ ...@@ -37,9 +39,67 @@
a function that knows how to compare values of this resource type. a function that knows how to compare values of this resource type.
In the simple case it could be wt_resource_id_memcmp(). In the simple case it could be wt_resource_id_memcmp().
a wait-for graph - a graph, that represenst "wait-for" relationships.
It has two types of nodes - threads and resources. There are directed
edges from a thread to a resource it is waiting for (WT_THD::waiting_for),
from a thread to resources that it "owns" (WT_THD::my_resources),
and from a resource to threads that "own" it (WT_RESOURCE::owners)
Graph completeness
^^^^^^^^^^^^^^^^^^
For flawless deadlock detection wait-for graph must be complete.
It means that when a thread starts waiting it needs to know *all* its
blockers, and call wt_thd_will_wait_for() for every one of them.
Otherwise two phenomena should be expected:
1. Fuzzy timeouts:
thread A needs to get a lock, and is blocked by a thread B.
it waits.
Just before the timeout thread B releases the lock.
thread A is ready to grab the lock but discovers that it is also
blocked by a thread C.
It waits and times out.
As a result thread A has waited two timeout intervals, instead of one.
2. Unreliable cycle detection:
Thread A waits for threads B and C
Thread C waits for D
Thread D wants to start waiting for A
one can see immediately that thread D creates a cycle, and thus
a deadlock is detected.
But if thread A would only wait for B, and start waiting for C
when B would unlock, thread D would be allowed to wait, a deadlock
would be only detected when B unlocks or somebody times out.
These two phenomena don't affect a correctness, and strictly speaking,
the caller is not required to call wt_thd_will_wait_for() for *all*
blockers - it may optimize wt_thd_will_wait_for() calls. But they
may be perceived as bugs by users, it must be understood that such
an optimization comes with its price.
Usage Usage
^^^^^ ^^^^^
to use the interface one needs to use this thread's WT_THD,
First, the wt* subsystem must be initialized by calling
wt_init(). In the server you don't need to do it, it's done
in mysqld.cc.
Similarly, wt_end() frees wt* structures, should be called
at the end, but in the server mysqld.cc takes care of that.
Every WT_THD should be initialized with wt_thd_lazy_init().
After that they can be used in other wt_thd_* calls.
Before discarding, WT_THD should be free'd with
wt_thd_destroy(). In the server both are handled in sql_class.cc,
it's an error to try to do it manually.
To use the deadlock detection one needs to use this thread's WT_THD,
call wt_thd_will_wait_for() for every thread it needs to wait on, call wt_thd_will_wait_for() for every thread it needs to wait on,
then call wt_thd_cond_timedwait(). When thread releases a resource then call wt_thd_cond_timedwait(). When thread releases a resource
it should call wt_thd_release() (or wt_thd_release_all()) - it will it should call wt_thd_release() (or wt_thd_release_all()) - it will
...@@ -48,7 +108,7 @@ ...@@ -48,7 +108,7 @@
Just like with pthread's cond_wait, there could be spurious Just like with pthread's cond_wait, there could be spurious
wake-ups from wt_thd_cond_timedwait(). A caller is expected to wake-ups from wt_thd_cond_timedwait(). A caller is expected to
handle that. handle that (that is, to re-check the blocking criteria).
wt_thd_will_wait_for() and wt_thd_cond_timedwait() return either wt_thd_will_wait_for() and wt_thd_cond_timedwait() return either
WT_OK or WT_DEADLOCK. Additionally wt_thd_cond_timedwait() can return WT_OK or WT_DEADLOCK. Additionally wt_thd_cond_timedwait() can return
...@@ -79,8 +139,8 @@ ...@@ -79,8 +139,8 @@
wt_thd_cond_timedwait()), a number of timeouts, a deadlock cycle wt_thd_cond_timedwait()), a number of timeouts, a deadlock cycle
length distribution - number of deadlocks with every length from length distribution - number of deadlocks with every length from
1 to WT_CYCLE_STATS, and a wait time distribution - number 1 to WT_CYCLE_STATS, and a wait time distribution - number
of waits with a time from 1 us to 1 min in WT_CYCLE_STATS of waits with a time from 1 us to 1 min in WT_WAIT_STATS
intervals on a log scale. intervals on a log e scale.
*/ */
/* /*
...@@ -93,10 +153,11 @@ ...@@ -93,10 +153,11 @@
(example A=IX, B=IS, C=S, D=X) (example A=IX, B=IS, C=S, D=X)
you need to include lock level in the resource identifier - thread 1 you need to include lock level in the resource identifier - a
waiting for lock A on resource R and thread 2 waiting for lock B thread waiting for lock of the type A on resource R and another
on resource R should wait on different WT_RESOURCE structures, on different thread waiting for lock of the type B on resource R should wait on
{lock, resource} pairs. Otherwise the following is possible: different WT_RESOURCE structures, on different {lock, resource}
pairs. Otherwise the following is possible:
thread1> take S-lock on R thread1> take S-lock on R
thread2> take IS-lock on R thread2> take IS-lock on R
...@@ -113,40 +174,46 @@ ...@@ -113,40 +174,46 @@
#include <waiting_threads.h> #include <waiting_threads.h>
#include <m_string.h> #include <m_string.h>
/* /* status variables */
status variables:
distribution of cycle lengths
wait time log distribution
Note:
we call deadlock() twice per wait (with different search lengths). /**
it means a deadlock will be counted twice. It's difficult to avoid, preset table of wait intervals
as on the second search we could find a *different* deadlock and we
*want* to count it too. So we just count all deadlocks - two searches
mean two increments on the wt_cycle_stats.
*/ */
ulonglong wt_wait_table[WT_WAIT_STATS]; ulonglong wt_wait_table[WT_WAIT_STATS];
/**
wait time distribution (log e scale)
*/
uint32 wt_wait_stats[WT_WAIT_STATS+1]; uint32 wt_wait_stats[WT_WAIT_STATS+1];
uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats; /**
distribution of cycle lengths
first column tells whether this was during short or long detection
*/
uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1];
uint32 wt_success_stats;
static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock; static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
#ifdef SAFE_STATISTICS
#define incr(VAR, LOCK) \
do { \
my_atomic_rwlock_wrlock(&(LOCK)); \
my_atomic_add32(&(VAR), 1); \
my_atomic_rwlock_wrunlock(&(LOCK)); \
} while(0)
#else
#define incr(VAR,LOCK) do { (VAR)++; } while(0)
#endif
static void increment_success_stats() static void increment_success_stats()
{ {
my_atomic_rwlock_wrlock(&success_stats_lock); incr(wt_success_stats, success_stats_lock);
my_atomic_add32(&wt_success_stats, 1);
my_atomic_rwlock_wrunlock(&success_stats_lock);
} }
static void increment_cycle_stats(uint depth, uint slot) static void increment_cycle_stats(uint depth, uint slot)
{ {
if (depth >= WT_CYCLE_STATS) if (depth >= WT_CYCLE_STATS)
depth= WT_CYCLE_STATS; depth= WT_CYCLE_STATS;
my_atomic_rwlock_wrlock(&cycle_stats_lock); incr(wt_cycle_stats[slot][depth], cycle_stats_lock);
my_atomic_add32(&wt_cycle_stats[slot][depth], 1);
my_atomic_rwlock_wrunlock(&cycle_stats_lock);
} }
static void increment_wait_stats(ulonglong waited,int ret) static void increment_wait_stats(ulonglong waited,int ret)
...@@ -155,12 +222,89 @@ static void increment_wait_stats(ulonglong waited,int ret) ...@@ -155,12 +222,89 @@ static void increment_wait_stats(ulonglong waited,int ret)
if ((ret) == ETIMEDOUT) if ((ret) == ETIMEDOUT)
i= WT_WAIT_STATS; i= WT_WAIT_STATS;
else else
for (i=0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ; for (i= 0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ;
my_atomic_rwlock_wrlock(&wait_stats_lock); incr(wt_wait_stats[i], wait_stats_lock);
my_atomic_add32(wt_wait_stats+i, 1);
my_atomic_rwlock_wrunlock(&wait_stats_lock);
} }
/*
'lock' protects 'owners', 'state', and 'waiter_count'
'id' is read-only
a resource is picked up from a hash in a lock-free manner
it's returned pinned, so it cannot be freed at once
but it may be freed right after the pin is removed
to free a resource it should
1. have no owners
2. have no waiters
two ways to access a resource:
1. find it in a hash
- it's returned pinned.
a) take a lock in exclusive mode
b) check the state, it should be ACTIVE to be usable
c) unpin
2. by a direct reference
- could only used if a resource cannot be freed
e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it
*/
struct st_wt_resource {
WT_RESOURCE_ID id;
uint waiter_count;
enum { ACTIVE, FREE } state;
#ifndef DBUG_OFF
pthread_mutex_t *cond_mutex; /* a mutex for the 'cond' below */
#endif
/*
before the 'lock' all elements are mutable, after (and including) -
immutable in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init().
*/
#ifdef WT_RWLOCKS_USE_MUTEXES
/*
we need a special rwlock-like 'lock' to allow readers bypass
waiting writers, otherwise readers can deadlock. For example:
A waits on resource x, owned by B, B waits on resource y, owned
by A, we have a cycle (A->x->B->y->A)
Both A and B start deadlock detection:
A locks x B locks y
A goes deeper B goes deeper
A locks y B locks x
with mutexes it would deadlock. With rwlocks it won't, as long
as both A and B are taking read locks (and they do).
But other threads may take write locks. Assume there's
C who wants to start waiting on x, and D who wants to start
waiting on y.
A read-locks x B read-locks y
A goes deeper B goes deeper
=> C write-locks x (to add a new edge) D write-locks y
.. C is blocked D is blocked
A read-locks y B read-locks x
Now, if a read lock can bypass a pending wrote lock request, we're fine.
If it can not, we have a deadlock.
writer starvation is technically possible, but unlikely, because
the contention is expected to be low.
*/
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
uint readers: 16;
uint pending_writers: 15;
uint write_locked: 1;
} lock;
#else
rw_lock_t lock;
#endif
pthread_cond_t cond; /* the corresponding mutex is provided by the caller */
DYNAMIC_ARRAY owners;
};
#ifdef WT_RWLOCKS_USE_MUTEXES #ifdef WT_RWLOCKS_USE_MUTEXES
static void rc_rwlock_init(WT_RESOURCE *rc) static void rc_rwlock_init(WT_RESOURCE *rc)
{ {
...@@ -169,6 +313,8 @@ static void rc_rwlock_init(WT_RESOURCE *rc) ...@@ -169,6 +313,8 @@ static void rc_rwlock_init(WT_RESOURCE *rc)
} }
static void rc_rwlock_destroy(WT_RESOURCE *rc) static void rc_rwlock_destroy(WT_RESOURCE *rc)
{ {
DBUG_ASSERT(rc->lock.write_locked == 0);
DBUG_ASSERT(rc->lock.readers == 0);
pthread_cond_destroy(&rc->lock.cond); pthread_cond_destroy(&rc->lock.cond);
pthread_mutex_destroy(&rc->lock.mutex); pthread_mutex_destroy(&rc->lock.mutex);
} }
...@@ -188,7 +334,7 @@ static void rc_wrlock(WT_RESOURCE *rc) ...@@ -188,7 +334,7 @@ static void rc_wrlock(WT_RESOURCE *rc)
pthread_mutex_lock(&rc->lock.mutex); pthread_mutex_lock(&rc->lock.mutex);
while (rc->lock.write_locked || rc->lock.readers) while (rc->lock.write_locked || rc->lock.readers)
pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex); pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
rc->lock.write_locked=1; rc->lock.write_locked= 1;
pthread_mutex_unlock(&rc->lock.mutex); pthread_mutex_unlock(&rc->lock.mutex);
DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value)); DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
} }
...@@ -198,7 +344,7 @@ static void rc_unlock(WT_RESOURCE *rc) ...@@ -198,7 +344,7 @@ static void rc_unlock(WT_RESOURCE *rc)
pthread_mutex_lock(&rc->lock.mutex); pthread_mutex_lock(&rc->lock.mutex);
if (rc->lock.write_locked) if (rc->lock.write_locked)
{ {
rc->lock.write_locked=0; rc->lock.write_locked= 0;
pthread_cond_broadcast(&rc->lock.cond); pthread_cond_broadcast(&rc->lock.cond);
} }
else if (--rc->lock.readers == 0) else if (--rc->lock.readers == 0)
...@@ -242,12 +388,12 @@ static LF_HASH reshash; ...@@ -242,12 +388,12 @@ static LF_HASH reshash;
/** /**
WT_RESOURCE constructor WT_RESOURCE constructor
It's called from lf_hash and takes an offset to LF_SLIST instance. It's called from lf_hash and takes a pointer to an LF_SLIST instance.
WT_RESOURCE is located at arg+sizeof(LF_SLIST) WT_RESOURCE is located at arg+sizeof(LF_SLIST)
*/ */
static void wt_resource_init(uchar *arg) static void wt_resource_init(uchar *arg)
{ {
WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD); WT_RESOURCE *rc= (WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
DBUG_ENTER("wt_resource_init"); DBUG_ENTER("wt_resource_init");
bzero(rc, sizeof(*rc)); bzero(rc, sizeof(*rc));
...@@ -260,12 +406,12 @@ static void wt_resource_init(uchar *arg) ...@@ -260,12 +406,12 @@ static void wt_resource_init(uchar *arg)
/** /**
WT_RESOURCE destructor WT_RESOURCE destructor
It's called from lf_hash and takes an offset to LF_SLIST instance. It's called from lf_hash and takes a pointer to an LF_SLIST instance.
WT_RESOURCE is located at arg+sizeof(LF_SLIST) WT_RESOURCE is located at arg+sizeof(LF_SLIST)
*/ */
static void wt_resource_destroy(uchar *arg) static void wt_resource_destroy(uchar *arg)
{ {
WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD); WT_RESOURCE *rc= (WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
DBUG_ENTER("wt_resource_destroy"); DBUG_ENTER("wt_resource_destroy");
DBUG_ASSERT(rc->owners.elements == 0); DBUG_ASSERT(rc->owners.elements == 0);
...@@ -278,6 +424,7 @@ static void wt_resource_destroy(uchar *arg) ...@@ -278,6 +424,7 @@ static void wt_resource_destroy(uchar *arg)
void wt_init() void wt_init()
{ {
DBUG_ENTER("wt_init"); DBUG_ENTER("wt_init");
DBUG_ASSERT(reshash.alloc.constructor != wt_resource_init);
lf_hash_init(&reshash, sizeof(WT_RESOURCE), LF_HASH_UNIQUE, 0, lf_hash_init(&reshash, sizeof(WT_RESOURCE), LF_HASH_UNIQUE, 0,
sizeof_WT_RESOURCE_ID, 0, 0); sizeof_WT_RESOURCE_ID, 0, 0);
...@@ -293,15 +440,15 @@ void wt_init() ...@@ -293,15 +440,15 @@ void wt_init()
reshash.element_size= offsetof(WT_RESOURCE, lock); reshash.element_size= offsetof(WT_RESOURCE, lock);
bzero(wt_wait_stats, sizeof(wt_wait_stats)); bzero(wt_wait_stats, sizeof(wt_wait_stats));
bzero(wt_cycle_stats, sizeof(wt_cycle_stats)); bzero(wt_cycle_stats, sizeof(wt_cycle_stats));
wt_success_stats=0; wt_success_stats= 0;
{ /* initialize wt_wait_table[]. from 1 us to 1 min, log scale */ { /* initialize wt_wait_table[]. from 1 us to 1 min, log e scale */
int i; int i;
double from=log(1); /* 1 us */ double from= log(1); /* 1 us */
double to=log(60e6); /* 1 min */ double to= log(60e6); /* 1 min */
for (i=0; i < WT_WAIT_STATS; i++) for (i= 0; i < WT_WAIT_STATS; i++)
{ {
wt_wait_table[i]=(ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from); wt_wait_table[i]= (ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from);
DBUG_ASSERT(i==0 || wt_wait_table[i-1] != wt_wait_table[i]); DBUG_ASSERT(i == 0 || wt_wait_table[i-1] != wt_wait_table[i]);
} }
} }
my_atomic_rwlock_init(&cycle_stats_lock); my_atomic_rwlock_init(&cycle_stats_lock);
...@@ -325,7 +472,7 @@ void wt_end() ...@@ -325,7 +472,7 @@ void wt_end()
/** /**
Lazy WT_THD initialization Lazy WT_THD initialization
Cheap initialization of WT_THD. Only initialized fields that don't require Cheap initialization of WT_THD. Only initialize fields that don't require
memory allocations - basically, it only does assignments. The rest of the memory allocations - basically, it only does assignments. The rest of the
WT_THD structure will be initialized on demand, on the first use. WT_THD structure will be initialized on demand, on the first use.
This allows one to initialize lazily all WT_THD structures, even if some This allows one to initialize lazily all WT_THD structures, even if some
...@@ -335,14 +482,18 @@ void wt_end() ...@@ -335,14 +482,18 @@ void wt_end()
@param ts a pointer to deadlock timeout short value @param ts a pointer to deadlock timeout short value
@param dl a pointer to deadlock search depth long value @param dl a pointer to deadlock search depth long value
@param tl a pointer to deadlock timeout long value @param tl a pointer to deadlock timeout long value
@note these are pointers to values, and WT_THD stores them as pointers.
It allows one later to change search depths and timeouts for existing
threads. It also means that the pointers must stay valid for the lifetime
of WT_THD.
*/ */
void wt_thd_lazy_init(WT_THD *thd, ulong *ds, ulong *ts, ulong *dl, ulong *tl) void wt_thd_lazy_init(WT_THD *thd, const ulong *ds, const ulong *ts,
const ulong *dl, const ulong *tl)
{ {
DBUG_ENTER("wt_thd_lazy_init"); DBUG_ENTER("wt_thd_lazy_init");
thd->waiting_for=0; thd->waiting_for= 0;
thd->my_resources.buffer= 0; thd->weight= 0;
thd->my_resources.elements= 0;
thd->weight=0;
thd->deadlock_search_depth_short= ds; thd->deadlock_search_depth_short= ds;
thd->timeout_short= ts; thd->timeout_short= ts;
thd->deadlock_search_depth_long= dl; thd->deadlock_search_depth_long= dl;
...@@ -350,7 +501,7 @@ void wt_thd_lazy_init(WT_THD *thd, ulong *ds, ulong *ts, ulong *dl, ulong *tl) ...@@ -350,7 +501,7 @@ void wt_thd_lazy_init(WT_THD *thd, ulong *ds, ulong *ts, ulong *dl, ulong *tl)
/* dynamic array is also initialized lazily - without memory allocations */ /* dynamic array is also initialized lazily - without memory allocations */
my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 0, 5); my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 0, 5);
#ifndef DBUG_OFF #ifndef DBUG_OFF
thd->name=my_thread_name(); thd->name= my_thread_name();
#endif #endif
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -367,9 +518,9 @@ static int fix_thd_pins(WT_THD *thd) ...@@ -367,9 +518,9 @@ static int fix_thd_pins(WT_THD *thd)
{ {
if (unlikely(thd->pins == 0)) if (unlikely(thd->pins == 0))
{ {
thd->pins=lf_hash_get_pins(&reshash); thd->pins= lf_hash_get_pins(&reshash);
#ifndef DBUG_OFF #ifndef DBUG_OFF
thd->name=my_thread_name(); thd->name= my_thread_name();
#endif #endif
} }
return thd->pins == 0; return thd->pins == 0;
...@@ -380,12 +531,12 @@ void wt_thd_destroy(WT_THD *thd) ...@@ -380,12 +531,12 @@ void wt_thd_destroy(WT_THD *thd)
DBUG_ENTER("wt_thd_destroy"); DBUG_ENTER("wt_thd_destroy");
DBUG_ASSERT(thd->my_resources.elements == 0); DBUG_ASSERT(thd->my_resources.elements == 0);
DBUG_ASSERT(thd->waiting_for == 0);
if (thd->pins != 0) if (thd->pins != 0)
lf_hash_put_pins(thd->pins); lf_hash_put_pins(thd->pins);
delete_dynamic(&thd->my_resources); delete_dynamic(&thd->my_resources);
thd->waiting_for=0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/** /**
...@@ -394,7 +545,7 @@ void wt_thd_destroy(WT_THD *thd) ...@@ -394,7 +545,7 @@ void wt_thd_destroy(WT_THD *thd)
It can be used in WT_RESOURCE_TYPE structures where bytewise It can be used in WT_RESOURCE_TYPE structures where bytewise
comparison of values is sufficient. comparison of values is sufficient.
*/ */
int wt_resource_id_memcmp(void *a, void *b) int wt_resource_id_memcmp(const void *a, const void *b)
{ {
/* we use the fact that there's no padding in the middle of WT_RESOURCE_ID */ /* we use the fact that there's no padding in the middle of WT_RESOURCE_ID */
compile_time_assert(offsetof(WT_RESOURCE_ID, type) == sizeof(ulonglong)); compile_time_assert(offsetof(WT_RESOURCE_ID, type) == sizeof(ulonglong));
...@@ -405,10 +556,10 @@ int wt_resource_id_memcmp(void *a, void *b) ...@@ -405,10 +556,10 @@ int wt_resource_id_memcmp(void *a, void *b)
arguments for the recursive deadlock_search function arguments for the recursive deadlock_search function
*/ */
struct deadlock_arg { struct deadlock_arg {
WT_THD *thd; /**< starting point of a search */ WT_THD * const thd; /**< starting point of a search */
uint max_depth; /**< search depth limit */ uint const max_depth; /**< search depth limit */
WT_THD *victim; /**< a thread to be killed to resolve a deadlock */ WT_THD *victim; /**< a thread to be killed to resolve a deadlock */
WT_RESOURCE *rc; /**< see comment at the end of deadlock_search() */ WT_RESOURCE *last_locked_rc; /**< see comment at the end of deadlock_search() */
}; };
/** /**
...@@ -421,10 +572,10 @@ static void change_victim(WT_THD* found, struct deadlock_arg *arg) ...@@ -421,10 +572,10 @@ static void change_victim(WT_THD* found, struct deadlock_arg *arg)
if (arg->victim != arg->thd) if (arg->victim != arg->thd)
{ {
rc_unlock(arg->victim->waiting_for); /* release the previous victim */ rc_unlock(arg->victim->waiting_for); /* release the previous victim */
DBUG_ASSERT(arg->rc == found->waiting_for); DBUG_ASSERT(arg->last_locked_rc == found->waiting_for);
} }
arg->victim= found; arg->victim= found;
arg->rc= 0; arg->last_locked_rc= 0;
} }
} }
...@@ -444,7 +595,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -444,7 +595,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
LF_REQUIRE_PINS(1); LF_REQUIRE_PINS(1);
arg->rc= 0; arg->last_locked_rc= 0;
if (depth > arg->max_depth) if (depth > arg->max_depth)
{ {
...@@ -453,7 +604,10 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -453,7 +604,10 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
} }
retry: retry:
/* safe dereference as explained in lf_alloc-pin.c */ /*
safe dereference as explained in lf_alloc-pin.c
(in short: protects against lf_alloc_free() in lf_hash_delete())
*/
do do
{ {
rc= *shared_ptr; rc= *shared_ptr;
...@@ -469,6 +623,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -469,6 +623,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
rc_rdlock(rc); rc_rdlock(rc);
if (rc->state != ACTIVE || *shared_ptr != rc) if (rc->state != ACTIVE || *shared_ptr != rc)
{ {
/* blocker is not waiting on this resource anymore */
rc_unlock(rc); rc_unlock(rc);
lf_unpin(arg->thd->pins, 0); lf_unpin(arg->thd->pins, 0);
goto retry; goto retry;
...@@ -480,20 +635,22 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -480,20 +635,22 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
Below is not a pure depth-first search. It's a depth-first with a Below is not a pure depth-first search. It's a depth-first with a
slightest hint of breadth-first. Depth-first is: slightest hint of breadth-first. Depth-first is:
check(element): check(element, X):
foreach current in element->nodes[] do: foreach current in element->nodes[] do:
if current == element return error; if current == X return error;
check(current); check(current, X);
while we do while we do
check(element): check(element, X):
foreach current in element->nodes[] do: foreach current in element->nodes[] do:
if current == element return error; if current == X return error;
foreach current in element->nodes[] do: foreach current in element->nodes[] do:
check(current); check(current, X);
preferring shorter deadlocks over longer ones.
*/ */
for (i=0; i < rc->owners.elements; i++) for (i= 0; i < rc->owners.elements; i++)
{ {
cursor= *dynamic_element(&rc->owners, i, WT_THD**); cursor= *dynamic_element(&rc->owners, i, WT_THD**);
/* /*
...@@ -517,7 +674,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -517,7 +674,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
goto end; goto end;
} }
} }
for (i=0; i < rc->owners.elements; i++) for (i= 0; i < rc->owners.elements; i++)
{ {
cursor= *dynamic_element(&rc->owners, i, WT_THD**); cursor= *dynamic_element(&rc->owners, i, WT_THD**);
switch (deadlock_search(arg, cursor, depth+1)) { switch (deadlock_search(arg, cursor, depth+1)) {
...@@ -528,20 +685,21 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -528,20 +685,21 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
break; break;
case WT_DEADLOCK: case WT_DEADLOCK:
ret= WT_DEADLOCK; ret= WT_DEADLOCK;
change_victim(cursor, arg); /* also sets arg->rc to 0 */ change_victim(cursor, arg); /* also sets arg->last_locked_rc to 0 */
i= rc->owners.elements; /* jump out of the loop */ i= rc->owners.elements; /* jump out of the loop */
break; break;
default: default:
DBUG_ASSERT(0); DBUG_ASSERT(0);
} }
if (arg->rc) if (arg->last_locked_rc)
rc_unlock(arg->rc); rc_unlock(arg->last_locked_rc);
} }
end: end:
/* /*
Note that 'rc' is locked in this function, but it's never unlocked here. Note that 'rc' is locked in this function, but it's never unlocked here.
Instead it's saved in arg->rc and the *caller* is expected to unlock it. Instead it's saved in arg->last_locked_rc and the *caller* is
It's done to support different killing strategies. This is how it works: expected to unlock it. It's done to support different killing
strategies. This is how it works:
Assuming a graph Assuming a graph
thd->A->B->C->thd thd->A->B->C->thd
...@@ -552,9 +710,9 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -552,9 +710,9 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
on. Goes down recursively, locks B. Goes down recursively, locks C. on. Goes down recursively, locks B. Goes down recursively, locks C.
Notices that C is waiting on thd. Deadlock detected. Sets arg->victim=thd. Notices that C is waiting on thd. Deadlock detected. Sets arg->victim=thd.
Returns from the last deadlock_search() call. C stays locked! Returns from the last deadlock_search() call. C stays locked!
Now it checks whether C is a more appropriate victim then 'thd'. Now it checks whether C is a more appropriate victim than 'thd'.
If yes - arg->victim=C, otherwise C is unlocked. Returns. B stays locked. If yes - arg->victim=C, otherwise C is unlocked. Returns. B stays locked.
Now it checks whether B is a more appropriate victim then arg->victim. Now it checks whether B is a more appropriate victim than arg->victim.
If yes - old arg->victim is unlocked and arg->victim=B, If yes - old arg->victim is unlocked and arg->victim=B,
otherwise B is unlocked. Return. otherwise B is unlocked. Return.
And so on. And so on.
...@@ -566,7 +724,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -566,7 +724,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
is unrolled and we are back to deadlock() function, there are only two is unrolled and we are back to deadlock() function, there are only two
locks left - on thd and on the victim. locks left - on thd and on the victim.
*/ */
arg->rc= rc; arg->last_locked_rc= rc;
DBUG_PRINT("wt", ("exit: %s", DBUG_PRINT("wt", ("exit: %s",
ret == WT_DEPTH_EXCEEDED ? "WT_DEPTH_EXCEEDED" : ret == WT_DEPTH_EXCEEDED ? "WT_DEPTH_EXCEEDED" :
ret ? "WT_DEADLOCK" : "OK")); ret ? "WT_DEADLOCK" : "OK"));
...@@ -612,30 +770,31 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, ...@@ -612,30 +770,31 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
*/ */
if (ret == WT_DEADLOCK && depth) if (ret == WT_DEADLOCK && depth)
change_victim(blocker, &arg); change_victim(blocker, &arg);
if (arg.rc) if (arg.last_locked_rc)
{ {
/* /*
Special return code if there's nobody to wait for. Special return code if there's nobody to wait for.
depth == 0 means that we start the search from thd (thd == blocker). depth == 0 means that we start the search from thd (thd == blocker).
ret == WT_OK means that no cycle was found and arg.rc == thd->waiting_for. ret == WT_OK means that no cycle was found and
and arg.rc->owners.elements == 0 means that (applying the rule above) arg.last_locked_rc == thd->waiting_for.
thd->waiting_for->owners.elements == 0, and thd doesn't have anybody to and arg.last_locked_rc->owners.elements == 0 means that
wait for. (applying the rule above) thd->waiting_for->owners.elements == 0,
and thd doesn't have anybody to wait for.
*/ */
if (depth == 0 && ret == WT_OK && arg.rc->owners.elements == 0) if (depth == 0 && ret == WT_OK && arg.last_locked_rc->owners.elements == 0)
{ {
DBUG_ASSERT(thd == blocker); DBUG_ASSERT(thd == blocker);
DBUG_ASSERT(arg.rc == thd->waiting_for); DBUG_ASSERT(arg.last_locked_rc == thd->waiting_for);
ret= WT_FREE_TO_GO; ret= WT_FREE_TO_GO;
} }
rc_unlock(arg.rc); rc_unlock(arg.last_locked_rc);
} }
/* notify the victim, if appropriate */ /* notify the victim, if appropriate */
if (ret == WT_DEADLOCK && arg.victim != thd) if (ret == WT_DEADLOCK && arg.victim != thd)
{ {
DBUG_PRINT("wt", ("killing %s", arg.victim->name)); DBUG_PRINT("wt", ("killing %s", arg.victim->name));
arg.victim->killed=1; arg.victim->killed= 1;
pthread_cond_broadcast(&arg.victim->waiting_for->cond); pthread_cond_broadcast(&arg.victim->waiting_for->cond);
rc_unlock(arg.victim->waiting_for); rc_unlock(arg.victim->waiting_for);
ret= WT_OK; ret= WT_OK;
...@@ -659,7 +818,7 @@ static int unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc) ...@@ -659,7 +818,7 @@ static int unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
if (rc->owners.elements || rc->waiter_count) if (rc->owners.elements || rc->waiter_count)
{ {
DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters", DBUG_PRINT("wt", ("nothing to do, %u owners, %u waiters",
rc->owners.elements, rc->waiter_count)); rc->owners.elements, rc->waiter_count));
rc_unlock(rc); rc_unlock(rc);
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -683,12 +842,8 @@ static int unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc) ...@@ -683,12 +842,8 @@ static int unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
2. set the state to FREE 2. set the state to FREE
3. release the lock 3. release the lock
4. remove from the hash 4. remove from the hash
I *think* it's safe to release the lock while the element is still
in the hash. If not, the corrected procedure should be
3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
*/ */
rc->state=FREE; rc->state= FREE;
rc_unlock(rc); rc_unlock(rc);
DBUG_RETURN(lf_hash_delete(&reshash, thd->pins, key, keylen) == -1); DBUG_RETURN(lf_hash_delete(&reshash, thd->pins, key, keylen) == -1);
} }
...@@ -739,15 +894,19 @@ static int stop_waiting(WT_THD *thd) ...@@ -739,15 +894,19 @@ static int stop_waiting(WT_THD *thd)
/** /**
notify the system that a thread needs to wait for another thread notify the system that a thread needs to wait for another thread
called by a *waiter* to declare what resource it will wait for. called by a *waiter* to declare that it (thd) will wait for another
thread (blocker) on a specific resource (resid).
can be called many times, if many blockers own a blocking resource. can be called many times, if many blockers own a blocking resource.
but must always be called with the same resource id - a thread cannot but must always be called with the same resource id - a thread cannot
wait for more than one resource at a time. wait for more than one resource at a time.
@return WT_OK or WT_DEADLOCK
As a new edge is added to the wait-for graph, a deadlock detection is As a new edge is added to the wait-for graph, a deadlock detection is
performed for this new edge. performed for this new edge.
*/ */
int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker,
const WT_RESOURCE_ID *resid)
{ {
uint i; uint i;
WT_RESOURCE *rc; WT_RESOURCE *rc;
...@@ -822,7 +981,7 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) ...@@ -822,7 +981,7 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
/* /*
we can safely access the resource here, it's in the hash as it has we can safely access the resource here, it's in the hash as it has
at least one owner, and non-zero waiter_count non-zero waiter_count
*/ */
rc= thd->waiting_for; rc= thd->waiting_for;
rc_wrlock(rc); rc_wrlock(rc);
...@@ -835,7 +994,11 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) ...@@ -835,7 +994,11 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
DBUG_RETURN(WT_DEADLOCK); DBUG_RETURN(WT_DEADLOCK);
} }
} }
for (i=0; i < rc->owners.elements; i++) /*
Another thread could be waiting on this resource for this very 'blocker'.
In this case we should not add it to the list for the second time.
*/
for (i= 0; i < rc->owners.elements; i++)
if (*dynamic_element(&rc->owners, i, WT_THD**) == blocker) if (*dynamic_element(&rc->owners, i, WT_THD**) == blocker)
break; break;
if (i >= rc->owners.elements) if (i >= rc->owners.elements)
...@@ -854,19 +1017,21 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) ...@@ -854,19 +1017,21 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
} }
rc_unlock(rc); rc_unlock(rc);
if (deadlock(thd, blocker, 1, *thd->deadlock_search_depth_short)) if (deadlock(thd, blocker, 1, *thd->deadlock_search_depth_short) != WT_OK)
{ {
stop_waiting(thd); stop_waiting(thd);
DBUG_RETURN(WT_DEADLOCK); DBUG_RETURN(WT_DEADLOCK);
} }
DBUG_RETURN(0); DBUG_RETURN(WT_OK);
} }
/** /**
called by a *waiter* to start waiting called by a *waiter* (thd) to start waiting
It's supposed to be a drop-in replacement for It's supposed to be a drop-in replacement for
pthread_cond_timedwait(), and it takes mutex as an argument. pthread_cond_timedwait(), and it takes mutex as an argument.
@return one of WT_TIMEOUT, WT_DEADLOCK, WT_OK
*/ */
int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
{ {
...@@ -878,10 +1043,10 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ...@@ -878,10 +1043,10 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
DBUG_PRINT("wt", ("enter: thd=%s, rc=%p", thd->name, rc)); DBUG_PRINT("wt", ("enter: thd=%s, rc=%p", thd->name, rc));
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (rc->mutex) if (rc->cond_mutex)
DBUG_ASSERT(rc->mutex == mutex); DBUG_ASSERT(rc->cond_mutex == mutex);
else else
rc->mutex= mutex; rc->cond_mutex= mutex;
safe_mutex_assert_owner(mutex); safe_mutex_assert_owner(mutex);
#endif #endif
...@@ -890,20 +1055,27 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ...@@ -890,20 +1055,27 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
#ifdef __WIN__ #ifdef __WIN__
/* /*
only for the sake of Windows we distinguish between only for the sake of Windows we distinguish between
'before' and 'starttime' 'before' and 'starttime':
my_getsystime() returns high-resolution value, that cannot be used for
waiting (it doesn't follow system clock changes), but is good for time
intervals.
GetSystemTimeAsFileTime() follows system clock, but is low-resolution
and will result in lousy intervals.
*/ */
GetSystemTimeAsFileTime((PFILETIME)&starttime); GetSystemTimeAsFileTime((PFILETIME)&starttime);
#endif #endif
rc_wrlock(rc); rc_wrlock(rc);
if (rc->owners.elements == 0 || thd->killed) if (rc->owners.elements == 0)
ret= WT_OK; ret= WT_OK;
rc_unlock(rc); rc_unlock(rc);
set_timespec_time_nsec(timeout, starttime, (*thd->timeout_short)*ULL(1000)); set_timespec_time_nsec(timeout, starttime, (*thd->timeout_short)*ULL(1000));
if (ret == WT_TIMEOUT) if (ret == WT_TIMEOUT && !thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT) if (ret == WT_TIMEOUT && !thd->killed)
{ {
int r= deadlock(thd, thd, 0, *thd->deadlock_search_depth_long); int r= deadlock(thd, thd, 0, *thd->deadlock_search_depth_long);
if (r == WT_FREE_TO_GO) if (r == WT_FREE_TO_GO)
...@@ -935,24 +1107,25 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ...@@ -935,24 +1107,25 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
@param resid a resource to release. 0 to release all resources @param resid a resource to release. 0 to release all resources
*/ */
void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid) void wt_thd_release(WT_THD *thd, const WT_RESOURCE_ID *resid)
{ {
uint i; uint i;
DBUG_ENTER("wt_thd_release"); DBUG_ENTER("wt_thd_release");
for (i=0; i < thd->my_resources.elements; i++) for (i= 0; i < thd->my_resources.elements; i++)
{ {
uint j;
WT_RESOURCE *rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**); WT_RESOURCE *rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**);
if (!resid || (resid->type->compare(&rc->id, resid) == 0)) if (!resid || (resid->type->compare(&rc->id, resid) == 0))
{ {
uint j;
rc_wrlock(rc); rc_wrlock(rc);
/* /*
nobody's trying to free the resource now, nobody's trying to free the resource now,
as its owners[] array is not empty (at least thd must be there) as its owners[] array is not empty (at least thd must be there)
*/ */
DBUG_ASSERT(rc->state == ACTIVE); DBUG_ASSERT(rc->state == ACTIVE);
for (j=0; j < rc->owners.elements; j++) for (j= 0; j < rc->owners.elements; j++)
if (*dynamic_element(&rc->owners, j, WT_THD**) == thd) if (*dynamic_element(&rc->owners, j, WT_THD**) == thd)
break; break;
DBUG_ASSERT(j < rc->owners.elements); DBUG_ASSERT(j < rc->owners.elements);
...@@ -961,8 +1134,8 @@ void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid) ...@@ -961,8 +1134,8 @@ void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid)
{ {
pthread_cond_broadcast(&rc->cond); pthread_cond_broadcast(&rc->cond);
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (rc->mutex) if (rc->cond_mutex)
safe_mutex_assert_owner(rc->mutex); safe_mutex_assert_owner(rc->cond_mutex);
#endif #endif
} }
unlock_lock_and_free_resource(thd, rc); unlock_lock_and_free_resource(thd, rc);
......
...@@ -78,7 +78,7 @@ Named_value_arr::Named_value_arr() : ...@@ -78,7 +78,7 @@ Named_value_arr::Named_value_arr() :
bool Named_value_arr::init() bool Named_value_arr::init()
{ {
if (my_init_dynamic_array(&arr, sizeof(Named_value), 0, 32)) if (my_init_dynamic_array(&arr, sizeof(Named_value), 32, 32))
return TRUE; return TRUE;
initialized= TRUE; initialized= TRUE;
......
/* Copyright (C) 2000-2003 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -3718,8 +3718,6 @@ static int init_server_components() ...@@ -3718,8 +3718,6 @@ static int init_server_components()
if (table_cache_init() | table_def_init() | hostname_cache_init()) if (table_cache_init() | table_def_init() | hostname_cache_init())
unireg_abort(1); unireg_abort(1);
wt_init();
query_cache_result_size_limit(query_cache_limit); query_cache_result_size_limit(query_cache_limit);
query_cache_set_min_res_unit(query_cache_min_res_unit); query_cache_set_min_res_unit(query_cache_min_res_unit);
query_cache_init(); query_cache_init();
...@@ -3731,6 +3729,7 @@ static int init_server_components() ...@@ -3731,6 +3729,7 @@ static int init_server_components()
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
init_slave_list(); init_slave_list();
#endif #endif
wt_init();
/* Setup logs */ /* Setup logs */
...@@ -7471,7 +7470,7 @@ static void usage(void) ...@@ -7471,7 +7470,7 @@ static void usage(void)
default_collation_name= (char*) default_charset_info->name; default_collation_name= (char*) default_charset_info->name;
print_version(); print_version();
puts("\ puts("\
Copyright (C) 2000 MySQL AB, by Monty and others\n\ Copyright (C) 2000-2008 MySQL AB, Monty and others, 2008-2009 Sun Microsystems, Inc.\n\
This software comes with ABSOLUTELY NO WARRANTY. This is free software,\n\ This software comes with ABSOLUTELY NO WARRANTY. This is free software,\n\
and you are welcome to modify and redistribute it under the GPL license\n\n\ and you are welcome to modify and redistribute it under the GPL license\n\n\
Starts the MySQL database server\n"); Starts the MySQL database server\n");
......
/* Copyright (C) 2000-2006 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -1123,6 +1123,8 @@ bool THD::store_globals() ...@@ -1123,6 +1123,8 @@ bool THD::store_globals()
*/ */
mysys_var->id= thread_id; mysys_var->id= thread_id;
real_id= pthread_self(); // For debugging real_id= pthread_self(); // For debugging
mysys_var->stack_ends_here= thread_stack + // for consistency, see libevent_thread_proc
STACK_DIRECTION * (long)my_thread_stack_size;
/* /*
We have to call thr_lock_info_init() again here as THD may have been We have to call thr_lock_info_init() again here as THD may have been
......
/* Copyright (C) 2000-2006 MySQL AB /* Copyright (C) 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -1417,7 +1417,7 @@ class THD :public Statement, ...@@ -1417,7 +1417,7 @@ class THD :public Statement,
THD_TRANS stmt; // Trans for current statement THD_TRANS stmt; // Trans for current statement
bool on; // see ha_enable_transaction() bool on; // see ha_enable_transaction()
XID_STATE xid_state; XID_STATE xid_state;
WT_THD wt; WT_THD wt; ///< for deadlock detection
Rows_log_event *m_pending_rows_event; Rows_log_event *m_pending_rows_event;
/* /*
......
/* Copyright (C) 2006,2004 MySQL AB & MySQL Finland AB & TCX DataKonsult AB /* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
Copyright (C) 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -40,9 +41,11 @@ C_MODE_END ...@@ -40,9 +41,11 @@ C_MODE_END
Note that in future versions, only *transactional* Maria tables can Note that in future versions, only *transactional* Maria tables can
rollback, so this flag should be up or down conditionally. rollback, so this flag should be up or down conditionally.
*/ */
#define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS
#ifdef MARIA_CANNOT_ROLLBACK #ifdef MARIA_CANNOT_ROLLBACK
#define CANNOT_ROLLBACK_FLAG HA_NO_TRANSACTIONS
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0) #define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
#else
#define CANNOT_ROLLBACK_FLAG 0
#endif #endif
#define THD_TRN (*(TRN **)thd_ha_data(thd, maria_hton)) #define THD_TRN (*(TRN **)thd_ha_data(thd, maria_hton))
...@@ -716,7 +719,7 @@ handler(hton, table_arg), file(0), ...@@ -716,7 +719,7 @@ handler(hton, table_arg), file(0),
int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER | int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER |
HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
HA_DUPLICATE_POS | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY | HA_DUPLICATE_POS | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY |
HA_FILE_BASED | HA_CAN_GEOMETRY | MARIA_CANNOT_ROLLBACK | HA_FILE_BASED | HA_CAN_GEOMETRY | CANNOT_ROLLBACK_FLAG |
HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS | HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS |
HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT), HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT),
can_enable_indexes(1), bulk_insert_single_undo(BULK_INSERT_NONE) can_enable_indexes(1), bulk_insert_single_undo(BULK_INSERT_NONE)
......
/* Copyright (C) 2007 MySQL AB /* Copyright (C) 2007-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "trnman.h" #include "trnman.h"
/** /**
@brief writes a COMMIT record to log and commits transaction in memory writes a COMMIT record to log and commits transaction in memory
@param trn transaction @param trn transaction
...@@ -82,7 +82,7 @@ int ma_commit(TRN *trn) ...@@ -82,7 +82,7 @@ int ma_commit(TRN *trn)
/** /**
@brief Writes a COMMIT record for a transaciton associated with a file Writes a COMMIT record for a transaciton associated with a file
@param info Maria handler @param info Maria handler
...@@ -98,13 +98,17 @@ int maria_commit(MARIA_HA *info) ...@@ -98,13 +98,17 @@ int maria_commit(MARIA_HA *info)
/** /**
@brief Starts a transaction on a file handle Starts a transaction on a file handle
@param info Maria handler @param info Maria handler
@return Operation status @return Operation status
@retval 0 ok @retval 0 ok
@retval # Error code. @retval # Error code.
@note this can be used only in single-threaded programs (tests),
because we create a transaction (trnman_new_trn) with WT_THD=0.
XXX it needs to be fixed when we'll start using maria_begin from SQL.
*/ */
int maria_begin(MARIA_HA *info) int maria_begin(MARIA_HA *info)
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB /* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
Copyright (C) 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -39,8 +40,8 @@ static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key, ...@@ -39,8 +40,8 @@ static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
uchar *page, uchar **after_key); uchar *page, uchar **after_key);
static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key); static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key); static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key, static int _ma_ck_write_btree_with_log(MARIA_HA *, MARIA_KEY *, my_off_t *,
my_off_t *root, uint32 comp_flag); uint32);
static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff, static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
uint org_length, uint new_length, uint org_length, uint new_length,
const uchar *key_pos, const uchar *key_pos,
...@@ -181,9 +182,8 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -181,9 +182,8 @@ int maria_write(MARIA_HA *info, uchar *record)
else else
{ {
while (keyinfo->ck_insert(info, while (keyinfo->ck_insert(info,
(*keyinfo->make_key)(info, &int_key, i, (*keyinfo->make_key)(info, &int_key, i, buff, record,
buff, record, filepos, filepos, info->trn->trid)))
info->trn->trid)))
{ {
TRN *blocker; TRN *blocker;
DBUG_PRINT("error",("Got error: %d on write",my_errno)); DBUG_PRINT("error",("Got error: %d on write",my_errno));
...@@ -193,10 +193,12 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -193,10 +193,12 @@ int maria_write(MARIA_HA *info, uchar *record)
below doesn't work for them. below doesn't work for them.
Also, filter out non-thread maria use, and table modified in Also, filter out non-thread maria use, and table modified in
the same transaction. the same transaction.
At last, filter out non-dup-unique errors.
*/ */
if (!local_lock_tree) if (!local_lock_tree)
goto err; goto err;
if (info->dup_key_trid == info->trn->trid) if (info->dup_key_trid == info->trn->trid ||
my_errno != HA_ERR_FOUND_DUPP_KEY)
{ {
rw_unlock(&keyinfo->root_lock); rw_unlock(&keyinfo->root_lock);
goto err; goto err;
...@@ -257,6 +259,9 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -257,6 +259,9 @@ int maria_write(MARIA_HA *info, uchar *record)
} }
} }
rw_wrlock(&keyinfo->root_lock); rw_wrlock(&keyinfo->root_lock);
#ifndef MARIA_CANNOT_ROLLBACK
keyinfo->version++;
#endif
} }
} }
...@@ -671,12 +676,14 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key, ...@@ -671,12 +676,14 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
When the index will support true versioning - with multiple When the index will support true versioning - with multiple
identical values in the UNIQUE index, invisible to each other - identical values in the UNIQUE index, invisible to each other -
the following should be changed to "continue inserting keys, at the the following should be changed to "continue inserting keys, at the
end (of the row or statement) wait". Until it's done we cannot properly end (of the row or statement) wait". We need to wait on *all*
support deadlock timeouts. unique conflicts at once, not one-at-a-time, because we need to
know all blockers in advance, otherwise we'll have incomplete wait-for
graph.
*/ */
/* /*
transaction that has inserted the conflicting key is in progress. transaction that has inserted the conflicting key may be in progress.
wait for it to be committed or aborted. the caller will wait for it to be committed or aborted.
*/ */
info->dup_key_trid= _ma_trid_from_key(&tmp_key); info->dup_key_trid= _ma_trid_from_key(&tmp_key);
info->dup_key_pos= dup_key_pos; info->dup_key_pos= dup_key_pos;
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -42,7 +42,7 @@ static TrID global_trid_generator; ...@@ -42,7 +42,7 @@ static TrID global_trid_generator;
The default value is used when transaction manager not initialize; The default value is used when transaction manager not initialize;
Probably called from maria_chk Probably called from maria_chk
*/ */
static TrID trid_min_read_from= ~(TrID) 0; static TrID trid_min_read_from= MAX_TRID;
/* the mutex for everything above */ /* the mutex for everything above */
static pthread_mutex_t LOCK_trn_list; static pthread_mutex_t LOCK_trn_list;
...@@ -59,6 +59,7 @@ static TRN **short_trid_to_active_trn; ...@@ -59,6 +59,7 @@ static TRN **short_trid_to_active_trn;
/* locks for short_trid_to_active_trn and pool */ /* locks for short_trid_to_active_trn and pool */
static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool; static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool;
static my_bool default_trnman_end_trans_hook(TRN *, my_bool, my_bool); static my_bool default_trnman_end_trans_hook(TRN *, my_bool, my_bool);
static void trnman_free_trn(TRN *);
my_bool (*trnman_end_trans_hook)(TRN *, my_bool, my_bool)= my_bool (*trnman_end_trans_hook)(TRN *, my_bool, my_bool)=
default_trnman_end_trans_hook; default_trnman_end_trans_hook;
...@@ -88,6 +89,7 @@ void trnman_reset_locked_tables(TRN *trn, uint locked_tables) ...@@ -88,6 +89,7 @@ void trnman_reset_locked_tables(TRN *trn, uint locked_tables)
trn->locked_tables= locked_tables; trn->locked_tables= locked_tables;
} }
/** Wake up threads waiting for this transaction */
static void wt_thd_release_self(TRN *trn) static void wt_thd_release_self(TRN *trn)
{ {
if (trn->wt) if (trn->wt)
...@@ -149,12 +151,12 @@ int trnman_init(TrID initial_trid) ...@@ -149,12 +151,12 @@ int trnman_init(TrID initial_trid)
*/ */
active_list_max.trid= active_list_min.trid= 0; active_list_max.trid= active_list_min.trid= 0;
active_list_max.min_read_from= ~(TrID) 0; active_list_max.min_read_from= MAX_TRID;
active_list_max.next= active_list_min.prev= 0; active_list_max.next= active_list_min.prev= 0;
active_list_max.prev= &active_list_min; active_list_max.prev= &active_list_min;
active_list_min.next= &active_list_max; active_list_min.next= &active_list_max;
committed_list_max.commit_trid= ~(TrID) 0; committed_list_max.commit_trid= MAX_TRID;
committed_list_max.next= committed_list_min.prev= 0; committed_list_max.next= committed_list_min.prev= 0;
committed_list_max.prev= &committed_list_min; committed_list_max.prev= &committed_list_min;
committed_list_min.next= &committed_list_max; committed_list_min.next= &committed_list_max;
...@@ -198,6 +200,7 @@ void trnman_destroy() ...@@ -198,6 +200,7 @@ void trnman_destroy()
{ {
TRN *trn= pool; TRN *trn= pool;
pool= pool->next; pool= pool->next;
DBUG_ASSERT(trn->wt == NULL);
pthread_mutex_destroy(&trn->state_lock); pthread_mutex_destroy(&trn->state_lock);
my_free((void *)trn, MYF(0)); my_free((void *)trn, MYF(0));
} }
...@@ -251,10 +254,12 @@ static uint get_short_trid(TRN *trn) ...@@ -251,10 +254,12 @@ static uint get_short_trid(TRN *trn)
return res; return res;
} }
/* /**
DESCRIPTION Allocates and initialzies a new TRN object
start a new transaction, allocate and initialize transaction object
mutex and cond will be used for lock waits @note the 'wt' parameter can only be 0 in a single-threaded code (or,
generally, where threads cannot block each other), otherwise the
first call to the deadlock detector will sigsegv.
*/ */
TRN *trnman_new_trn(WT_THD *wt) TRN *trnman_new_trn(WT_THD *wt)
...@@ -338,7 +343,8 @@ TRN *trnman_new_trn(WT_THD *wt) ...@@ -338,7 +343,8 @@ TRN *trnman_new_trn(WT_THD *wt)
trn->min_read_from= trn->trid + 1; trn->min_read_from= trn->trid + 1;
} }
trn->commit_trid= ~(TrID)0; /* no other transaction can read changes done by this one */
trn->commit_trid= MAX_TRID;
trn->rec_lsn= trn->undo_lsn= trn->first_undo_lsn= 0; trn->rec_lsn= trn->undo_lsn= trn->first_undo_lsn= 0;
trn->used_tables= 0; trn->used_tables= 0;
...@@ -394,6 +400,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -394,6 +400,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
/* if a rollback, all UNDO records should have been executed */ /* if a rollback, all UNDO records should have been executed */
DBUG_ASSERT(commit || trn->undo_lsn == 0); DBUG_ASSERT(commit || trn->undo_lsn == 0);
DBUG_ASSERT(trn != &dummy_transaction_object);
DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list"));
pthread_mutex_lock(&LOCK_trn_list); pthread_mutex_lock(&LOCK_trn_list);
...@@ -429,6 +436,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -429,6 +436,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
} }
pthread_mutex_lock(&trn->state_lock); pthread_mutex_lock(&trn->state_lock);
if (commit)
trn->commit_trid= global_trid_generator; trn->commit_trid= global_trid_generator;
wt_thd_release_self(trn); wt_thd_release_self(trn);
pthread_mutex_unlock(&trn->state_lock); pthread_mutex_unlock(&trn->state_lock);
...@@ -502,7 +510,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -502,7 +510,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
running. It may even be called automatically on checkpoints if no running. It may even be called automatically on checkpoints if no
transactions are running. transactions are running.
*/ */
void trnman_free_trn(TRN *trn) static void trnman_free_trn(TRN *trn)
{ {
/* /*
union is to solve strict aliasing issue. union is to solve strict aliasing issue.
...@@ -580,6 +588,16 @@ int trnman_can_read_from(TRN *trn, TrID trid) ...@@ -580,6 +588,16 @@ int trnman_can_read_from(TRN *trn, TrID trid)
return can; return can;
} }
/**
Finds a TRN by its TrID
@param trn current trn. Needed for pinning pointers (see lf_pin)
@param trid trid to search for
@return found trn or 0
@note that trn is returned with its state locked!
*/
TRN *trnman_trid_to_trn(TRN *trn, TrID trid) TRN *trnman_trid_to_trn(TRN *trn, TrID trid)
{ {
TRN **found; TRN **found;
...@@ -604,7 +622,7 @@ TRN *trnman_trid_to_trn(TRN *trn, TrID trid) ...@@ -604,7 +622,7 @@ TRN *trnman_trid_to_trn(TRN *trn, TrID trid)
lf_hash_search_unpin(trn->pins); lf_hash_search_unpin(trn->pins);
/* Gotcha! */ /* Gotcha! */
return *found; /* note that TRN is returned locked !!! */ return *found;
} }
/* TODO: the stubs below are waiting for savepoints to be implemented */ /* TODO: the stubs below are waiting for savepoints to be implemented */
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -22,7 +22,7 @@ C_MODE_START ...@@ -22,7 +22,7 @@ C_MODE_START
#include "trnman_public.h" #include "trnman_public.h"
#include "ma_loghandler_lsn.h" #include "ma_loghandler_lsn.h"
/* /**
trid - 6 uchar transaction identifier. Assigned when a transaction trid - 6 uchar transaction identifier. Assigned when a transaction
is created. Transaction can always be identified by its trid, is created. Transaction can always be identified by its trid,
even after transaction has ended. even after transaction has ended.
...@@ -33,7 +33,7 @@ C_MODE_START ...@@ -33,7 +33,7 @@ C_MODE_START
when short_id is 0, TRN is not initialized, for all practical purposes when short_id is 0, TRN is not initialized, for all practical purposes
it could be considered unused. it could be considered unused.
when commit_trid is ~(TrID)0 the transaction is running, otherwise it's when commit_trid is MAX_TRID the transaction is running, otherwise it's
committed. committed.
state_lock mutex protects the state of a TRN, that is whether a TRN state_lock mutex protects the state of a TRN, that is whether a TRN
...@@ -46,7 +46,7 @@ struct st_ma_transaction ...@@ -46,7 +46,7 @@ struct st_ma_transaction
LF_PINS *pins; LF_PINS *pins;
WT_THD *wt; WT_THD *wt;
pthread_mutex_t state_lock; pthread_mutex_t state_lock;
void *used_tables; /* Tables used by transaction */ void *used_tables; /**< Tables used by transaction */
TRN *next, *prev; TRN *next, *prev;
TrID trid, min_read_from, commit_trid; TrID trid, min_read_from, commit_trid;
LSN rec_lsn, undo_lsn; LSN rec_lsn, undo_lsn;
...@@ -56,6 +56,7 @@ struct st_ma_transaction ...@@ -56,6 +56,7 @@ struct st_ma_transaction
}; };
#define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000) #define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000)
#define MAX_TRID (~(TrID)0)
extern WT_RESOURCE_TYPE ma_rc_dup_unique; extern WT_RESOURCE_TYPE ma_rc_dup_unique;
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -44,7 +44,6 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit); ...@@ -44,7 +44,6 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_commit_trn(T) trnman_end_trn(T, TRUE) #define trnman_commit_trn(T) trnman_end_trn(T, TRUE)
#define trnman_abort_trn(T) trnman_end_trn(T, FALSE) #define trnman_abort_trn(T) trnman_end_trn(T, FALSE)
#define trnman_rollback_trn(T) trnman_end_trn(T, FALSE) #define trnman_rollback_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn);
int trnman_can_read_from(TRN *trn, TrID trid); int trnman_can_read_from(TRN *trn, TrID trid);
TRN *trnman_trid_to_trn(TRN *trn, TrID trid); TRN *trnman_trid_to_trn(TRN *trn, TrID trid);
void trnman_new_statement(TRN *trn); void trnman_new_statement(TRN *trn);
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -40,7 +40,8 @@ pthread_handler_t test_trnman(void *arg) ...@@ -40,7 +40,8 @@ pthread_handler_t test_trnman(void *arg)
TRN *trn[MAX_ITER]; TRN *trn[MAX_ITER];
int m= (*(int *)arg); int m= (*(int *)arg);
my_thread_init(); if (my_thread_init())
BAIL_OUT("my_thread_init failed!");
for (x= ((int)(intptr)(&m)); m > 0; ) for (x= ((int)(intptr)(&m)); m > 0; )
{ {
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2008-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -13,6 +13,12 @@ ...@@ -13,6 +13,12 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
@file
Unit tests for lock-free algorithms of mysys
*/
#include "thr_template.c" #include "thr_template.c"
#include <lf.h> #include <lf.h>
...@@ -47,6 +53,10 @@ pthread_handler_t test_lf_pinbox(void *arg) ...@@ -47,6 +53,10 @@ pthread_handler_t test_lf_pinbox(void *arg)
return 0; return 0;
} }
/*
thread local data area, allocated using lf_alloc.
union is required to enforce the minimum required element size (sizeof(ptr))
*/
typedef union { typedef union {
int32 data; int32 data;
void *not_used; void *not_used;
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
......
/* Copyright (C) 2006 MySQL AB /* Copyright (C) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include "thr_template.c" #include "thr_template.c"
#include <waiting_threads.h> #include <waiting_threads.h>
#include <m_string.h> #include <m_string.h>
#include <locale.h>
struct test_wt_thd { struct test_wt_thd {
WT_THD thd; WT_THD thd;
...@@ -31,6 +30,7 @@ ulong wt_timeout_long=10000, wt_deadlock_search_depth_long=15; ...@@ -31,6 +30,7 @@ ulong wt_timeout_long=10000, wt_deadlock_search_depth_long=15;
#define reset(ARRAY) bzero(ARRAY, sizeof(ARRAY)) #define reset(ARRAY) bzero(ARRAY, sizeof(ARRAY))
/* see explanation of the kill strategies in waiting_threads.h */
enum { LATEST, RANDOM, YOUNGEST, LOCKS } kill_strategy; enum { LATEST, RANDOM, YOUNGEST, LOCKS } kill_strategy;
WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0}; WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0};
...@@ -68,13 +68,14 @@ pthread_handler_t test_wt(void *arg) ...@@ -68,13 +68,14 @@ pthread_handler_t test_wt(void *arg)
res= 0; res= 0;
/* prepare for waiting for a random number of random threads */
for (j= n= (rnd() % THREADS)/10; !res && j >= 0; j--) for (j= n= (rnd() % THREADS)/10; !res && j >= 0; j--)
{ {
retry: retry:
i= rnd() % (THREADS-1); i= rnd() % (THREADS-1); /* pick a random thread */
if (i >= id) i++; if (i >= id) i++; /* with a number from 0 to THREADS-1 excluding ours */
for (k=n; k >=j; k--) for (k=n; k >=j; k--) /* the one we didn't pick before */
if (blockers[k] == i) if (blockers[k] == i)
goto retry; goto retry;
blockers[j]= i; blockers[j]= i;
...@@ -121,7 +122,7 @@ pthread_handler_t test_wt(void *arg) ...@@ -121,7 +122,7 @@ pthread_handler_t test_wt(void *arg)
#define DEL "(deleted)" #define DEL "(deleted)"
char *x=malloc(strlen(thds[id].thd.name)+sizeof(DEL)+1); char *x=malloc(strlen(thds[id].thd.name)+sizeof(DEL)+1);
strxmov(x, thds[id].thd.name, DEL, 0); strxmov(x, thds[id].thd.name, DEL, 0);
thds[id].thd.name=x; /* it's a memory leak, go on, shoot me */ thds[id].thd.name=x;
} }
#endif #endif
...@@ -165,8 +166,8 @@ void do_one_test() ...@@ -165,8 +166,8 @@ void do_one_test()
void do_tests() void do_tests()
{ {
plan(12); plan(14);
compile_time_assert(THREADS >= 3); compile_time_assert(THREADS >= 4);
DBUG_PRINT("wt", ("================= initialization ===================")); DBUG_PRINT("wt", ("================= initialization ==================="));
...@@ -206,22 +207,22 @@ void do_tests() ...@@ -206,22 +207,22 @@ void do_tests()
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
bad= wt_thd_cond_timedwait(& thds[0].thd, &lock); bad= wt_thd_cond_timedwait(& thds[0].thd, &lock);
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
ok(bad == ETIMEDOUT, "timeout test returned %d", bad); ok(bad == WT_TIMEOUT, "timeout test returned %d", bad);
ok_wait(0,1,0); ok_wait(0,1,0);
ok_wait(1,2,1); ok_wait(1,2,1);
ok_deadlock(2,0,2); ok_deadlock(2,0,2);
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
wt_thd_cond_timedwait(& thds[0].thd, &lock); ok(wt_thd_cond_timedwait(& thds[0].thd, &lock) == WT_TIMEOUT, "as always");
wt_thd_cond_timedwait(& thds[1].thd, &lock); ok(wt_thd_cond_timedwait(& thds[1].thd, &lock) == WT_TIMEOUT, "as always");
wt_thd_release_all(& thds[0].thd); wt_thd_release_all(& thds[0].thd);
wt_thd_release_all(& thds[1].thd); wt_thd_release_all(& thds[1].thd);
wt_thd_release_all(& thds[2].thd); wt_thd_release_all(& thds[2].thd);
wt_thd_release_all(& thds[3].thd); wt_thd_release_all(& thds[3].thd);
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
for (cnt=0; cnt < 3; cnt++) for (cnt=0; cnt < 4; cnt++)
{ {
wt_thd_destroy(& thds[cnt].thd); wt_thd_destroy(& thds[cnt].thd);
wt_thd_lazy_init(& thds[cnt].thd, wt_thd_lazy_init(& thds[cnt].thd,
...@@ -261,6 +262,7 @@ void do_tests() ...@@ -261,6 +262,7 @@ void do_tests()
wt_thd_release_all(& thds[cnt].thd); wt_thd_release_all(& thds[cnt].thd);
wt_thd_destroy(& thds[cnt].thd); wt_thd_destroy(& thds[cnt].thd);
pthread_mutex_destroy(& thds[cnt].lock); pthread_mutex_destroy(& thds[cnt].lock);
free(thds[cnt].thd.name);
} }
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
wt_end(); wt_end();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment