Commit fb818dd7 authored by unknown's avatar unknown

more post-review fixes - comments, renames, error checks in unit tests

concurrency bug in lock manager


include/my_global.h:
  compile-time assert macro
mysys/my_atomic.c:
  use compile_time_assert() macro
storage/maria/lockman.c:
  bug in concurrent lockdelete (with retries)
storage/maria/trnman.c:
  more post-review fixes - comments, renames
storage/maria/trnman.h:
  more post-review fixes - comments
storage/maria/unittest/lockman-t.c:
  friendlier error checks
storage/maria/unittest/trnman-t.c:
  friendlier error checks
parent a79868ae
...@@ -428,6 +428,13 @@ C_MODE_END ...@@ -428,6 +428,13 @@ C_MODE_END
*/ */
#include <assert.h> #include <assert.h>
/*
  An assert that is checked at compile time. X must be an integer constant
  expression.

  If X is false the array below is declared with a negative size and the
  compilation fails; if X is true the macro has no runtime effect.
  The do { } while(0) wrapper makes the expansion a single statement, so the
  macro can be used in an unbraced if/else and several times in one scope.
  It can only be used where a statement is allowed, i.e. inside a function.
  The array is cast to void only to avoid an "unused variable" warning at
  every place the macro is used.
*/
#define compile_time_assert(X)                                  \
  do                                                            \
  {                                                             \
    char compile_time_assert[(X) ? 1 : -1];                     \
    (void) compile_time_assert;                                 \
  } while(0)
/* Go around some bugs in different OS and compilers */ /* Go around some bugs in different OS and compilers */
#if defined (HPUX11) && defined(_LARGEFILE_SOURCE) #if defined (HPUX11) && defined(_LARGEFILE_SOURCE)
#define _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
*/ */
int my_atomic_initialize() int my_atomic_initialize()
{ {
char assert_the_size[sizeof(intptr) == sizeof(void *) ? 1 : -1]; compile_time_assert(sizeof(intptr) == sizeof(void *));
/* currently the only thing worth checking is SMP/UP issue */ /* currently the only thing worth checking is SMP/UP issue */
#ifdef MY_ATOMIC_MODE_DUMMY #ifdef MY_ATOMIC_MODE_DUMMY
return my_getncpus() == 1 ? MY_ATOMIC_OK : MY_ATOMIC_NOT_1CPU; return my_getncpus() == 1 ? MY_ATOMIC_OK : MY_ATOMIC_NOT_1CPU;
......
...@@ -421,13 +421,14 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins, ...@@ -421,13 +421,14 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
} }
if (!(res & NEED_TO_WAIT)) if (!(res & NEED_TO_WAIT))
node->flags|= ACTIVE; node->flags|= ACTIVE;
else
node->flags&= ~ACTIVE; /* if we're retrying on REPEAT_ONCE_MORE */
node->link= (intptr)cursor.curr; node->link= (intptr)cursor.curr;
DBUG_ASSERT(node->link != (intptr)node); DBUG_ASSERT(node->link != (intptr)node);
DBUG_ASSERT(cursor.prev != &node->link); DBUG_ASSERT(cursor.prev != &node->link);
if (!my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node)) if (!my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
{
res= REPEAT_ONCE_MORE; res= REPEAT_ONCE_MORE;
node->flags&= ~ACTIVE;
}
if (res & LOCK_UPGRADE) if (res & LOCK_UPGRADE)
cursor.upgrade_from->flags|= IGNORE_ME; cursor.upgrade_from->flags|= IGNORE_ME;
} }
...@@ -496,7 +497,11 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins) ...@@ -496,7 +497,11 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
lockfind(head, node, &cursor, pins); lockfind(head, node, &cursor, pins);
} }
else else
{
res= REPEAT_ONCE_MORE; res= REPEAT_ONCE_MORE;
if (cursor.upgrade_from) /* to satisfy the assert in lockfind */
cursor.upgrade_from->flags|= IGNORE_ME;
}
} while (res == REPEAT_ONCE_MORE); } while (res == REPEAT_ONCE_MORE);
_lf_unpin(pins, 0); _lf_unpin(pins, 0);
_lf_unpin(pins, 1); _lf_unpin(pins, 1);
...@@ -744,7 +749,7 @@ static char *lock2str[]= ...@@ -744,7 +749,7 @@ static char *lock2str[]=
void print_lockhash(LOCKMAN *lm) void print_lockhash(LOCKMAN *lm)
{ {
LOCK *el= *(LOCK **)_lf_dynarray_lvalue(&lm->array, 0); LOCK *el= *(LOCK **)_lf_dynarray_lvalue(&lm->array, 0);
printf("hash: size:%u count:%u\n", lm->size, lm->count); printf("hash: size %u count %u\n", lm->size, lm->count);
while (el) while (el)
{ {
intptr next= el->link; intptr next= el->link;
......
...@@ -48,25 +48,35 @@ static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool; ...@@ -48,25 +48,35 @@ static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool;
static LOCKMAN maria_lockman; static LOCKMAN maria_lockman;
static byte *trn_get_hash_key(const byte *trn, uint* len, my_bool unused) /*
{ short transaction id is at the same time its identifier
*len= sizeof(TrID); for a lock manager - its lock owner identifier (loid)
return (byte *) & ((*((TRN **)trn))->trid); */
} #define short_id locks.loid
static LOCK_OWNER *trnman_short_trid_to_TRN(uint16 short_trid) /*
NOTE
Just as short_id doubles as loid, this function doubles as
short_trid_to_LOCK_OWNER. See the compile-time assert below.
*/
static TRN *short_trid_to_TRN(uint16 short_trid)
{ {
TRN *trn; TRN *trn;
compile_time_assert(offsetof(TRN, locks) == 0);
my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn);
trn= my_atomic_loadptr((void **)&short_trid_to_active_trn[short_trid]); trn= my_atomic_loadptr((void **)&short_trid_to_active_trn[short_trid]);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
return (LOCK_OWNER *)trn; return (TRN *)trn;
} }
int trnman_init() static byte *trn_get_hash_key(const byte *trn, uint* len, my_bool unused)
{ {
pthread_mutex_init(&LOCK_trn_list, MY_MUTEX_INIT_FAST); *len= sizeof(TrID);
return (byte *) & ((*((TRN **)trn))->trid);
}
int trnman_init()
{
/* /*
Initialize lists. Initialize lists.
active_list_max.min_read_from must be larger than any trid, active_list_max.min_read_from must be larger than any trid,
...@@ -95,6 +105,7 @@ int trnman_init() ...@@ -95,6 +105,7 @@ int trnman_init()
global_trid_generator= 0; /* set later by the recovery code */ global_trid_generator= 0; /* set later by the recovery code */
lf_hash_init(&trid_to_committed_trn, sizeof(TRN*), LF_HASH_UNIQUE, lf_hash_init(&trid_to_committed_trn, sizeof(TRN*), LF_HASH_UNIQUE,
0, 0, trn_get_hash_key, 0); 0, 0, trn_get_hash_key, 0);
pthread_mutex_init(&LOCK_trn_list, MY_MUTEX_INIT_FAST);
my_atomic_rwlock_init(&LOCK_short_trid_to_trn); my_atomic_rwlock_init(&LOCK_short_trid_to_trn);
my_atomic_rwlock_init(&LOCK_pool); my_atomic_rwlock_init(&LOCK_pool);
short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*), short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*),
...@@ -103,7 +114,7 @@ int trnman_init() ...@@ -103,7 +114,7 @@ int trnman_init()
return 1; return 1;
short_trid_to_active_trn--; /* min short_trid is 1 */ short_trid_to_active_trn--; /* min short_trid is 1 */
lockman_init(&maria_lockman, &trnman_short_trid_to_TRN, 10000); lockman_init(&maria_lockman, (loid_to_lo_func *)&short_trid_to_TRN, 10000);
return 0; return 0;
} }
...@@ -162,7 +173,7 @@ static void set_short_trid(TRN *trn) ...@@ -162,7 +173,7 @@ static void set_short_trid(TRN *trn)
break; break;
} }
my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn);
trn->locks.loid= i; trn->short_id= i;
} }
/* /*
...@@ -210,7 +221,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -210,7 +221,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trn->min_read_from= active_list_min.next->trid; trn->min_read_from= active_list_min.next->trid;
trn->trid= new_trid(); trn->trid= new_trid();
trn->locks.loid= 0; trn->short_id= 0;
trn->next= &active_list_max; trn->next= &active_list_max;
trn->prev= active_list_max.prev; trn->prev= active_list_max.prev;
...@@ -242,6 +253,15 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -242,6 +253,15 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
/* /*
remove a trn from the active list. remove a trn from the active list.
if necessary - move to committed list and set commit_trid if necessary - move to committed list and set commit_trid
NOTE
Locks are released at the end. In particular, after placing the
transaction in the committed list, and after setting commit_trid. This is
important, as commit_trid affects visibility. Locks don't affect
anything; they simply delay execution of other threads, so they could be
released arbitrarily late. In other words, releasing the locks serves as
a starting signal for other threads - they start to run. So
everything they may need must be ready at that point.
*/ */
void trnman_end_trn(TRN *trn, my_bool commit) void trnman_end_trn(TRN *trn, my_bool commit)
{ {
...@@ -305,7 +325,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) ...@@ -305,7 +325,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
trn->locks.mutex= 0; trn->locks.mutex= 0;
trn->locks.cond= 0; trn->locks.cond= 0;
my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn);
my_atomic_storeptr((void **)&short_trid_to_active_trn[trn->locks.loid], 0); my_atomic_storeptr((void **)&short_trid_to_active_trn[trn->short_id], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
while (free_me) // XXX send them to the purge thread while (free_me) // XXX send them to the purge thread
...@@ -325,9 +345,13 @@ void trnman_end_trn(TRN *trn, my_bool commit) ...@@ -325,9 +345,13 @@ void trnman_end_trn(TRN *trn, my_bool commit)
/* /*
free a trn (add to the pool, that is) free a trn (add to the pool, that is)
note - we can never really free() a TRN if there's at least one note - we can never really free() a TRN if there's at least one other
other running transaction - see, e.g., how lock waits are implemented running transaction - see, e.g., how lock waits are implemented in
in lockman.c lockman.c
The same is true for other lock-free data structures too. We may need some
kind of FLUSH command to reset them all - ensuring that no transactions are
running. It may even be called automatically on checkpoints if no
transactions are running.
*/ */
void trnman_free_trn(TRN *trn) void trnman_free_trn(TRN *trn)
{ {
......
...@@ -22,9 +22,17 @@ ...@@ -22,9 +22,17 @@
typedef uint64 TrID; /* our TrID is 6 bytes */ typedef uint64 TrID; /* our TrID is 6 bytes */
typedef struct st_transaction TRN; typedef struct st_transaction TRN;
/*
trid - 6-byte transaction identifier. Assigned when a transaction
is created. A transaction can always be identified by its trid,
even after the transaction has ended.
short_trid - 2-byte transaction identifier. Identifies a running
transaction; it is reassigned when the transaction ends.
*/
struct st_transaction struct st_transaction
{ {
LOCK_OWNER locks; LOCK_OWNER locks; /* must be the first! see short_trid_to_TRN() */
LF_PINS *pins; LF_PINS *pins;
TrID trid, min_read_from, commit_trid; TrID trid, min_read_from, commit_trid;
TRN *next, *prev; TRN *next, *prev;
......
...@@ -123,16 +123,20 @@ void run_test(const char *test, pthread_handler handler, int n, int m) ...@@ -123,16 +123,20 @@ void run_test(const char *test, pthread_handler handler, int n, int m)
thread_number= timeouts= 0; thread_number= timeouts= 0;
litmus= 0; litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m); diag("Running %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads= n ; n ; n--) for (rt_num_threads= n ; n ; n--)
if (pthread_create(&t, &rt_attr, handler, &m)) if (pthread_create(&t, &rt_attr, handler, &m))
abort(); {
diag("Could not create thread");
litmus++;
rt_num_threads--;
}
pthread_mutex_lock(&rt_mutex); pthread_mutex_lock(&rt_mutex);
while (rt_num_threads) while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex); pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex); pthread_mutex_unlock(&rt_mutex);
now= my_getsystime()-now; now= my_getsystime()-now;
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus); ok(litmus == 0, "Finished %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
} }
int Nrows= 100; int Nrows= 100;
...@@ -266,13 +270,13 @@ int main() ...@@ -266,13 +270,13 @@ int main()
Nrows= 100; Nrows= 100;
Ntables= 10; Ntables= 10;
table_lock_ratio= 10; table_lock_ratio= 10;
run_test("lockman", test_lockman, THREADS, CYCLES); run_test("\"random lock\" stress test", test_lockman, THREADS, CYCLES);
/* "real-life" simulation - many rows, no table locks */ /* "real-life" simulation - many rows, no table locks */
Nrows= 1000000; Nrows= 1000000;
Ntables= 10; Ntables= 10;
table_lock_ratio= 0; table_lock_ratio= 0;
run_test("lockman", test_lockman, THREADS, 10000); run_test("\"real-life\" simulation test", test_lockman, THREADS, CYCLES*10);
for (i= 0; i < Nlos; i++) for (i= 0; i < Nlos; i++)
{ {
......
...@@ -52,14 +52,21 @@ pthread_handler_t test_trnman(void *arg) ...@@ -52,14 +52,21 @@ pthread_handler_t test_trnman(void *arg)
y= x= (x*3628273133 + 1500450271) % 9576890767; /* three prime numbers */ y= x= (x*3628273133 + 1500450271) % 9576890767; /* three prime numbers */
m-= n= x % MAX_ITER; m-= n= x % MAX_ITER;
for (i= 0; i < n; i++) for (i= 0; i < n; i++)
{
trn[i]= trnman_new_trn(&mutexes[i], &conds[i]); trn[i]= trnman_new_trn(&mutexes[i], &conds[i]);
if (!trn[i])
{
diag("trnman_new_trn() failed");
litmus++;
}
}
for (i= 0; i < n; i++) for (i= 0; i < n; i++)
{ {
y= (y*19 + 7) % 31; y= (y*19 + 7) % 31;
trnman_end_trn(trn[i], y & 1); trnman_end_trn(trn[i], y & 1);
} }
} }
end:
for (i= 0; i < MAX_ITER; i++) for (i= 0; i < MAX_ITER; i++)
{ {
pthread_mutex_destroy(&mutexes[i]); pthread_mutex_destroy(&mutexes[i]);
...@@ -85,7 +92,11 @@ void run_test(const char *test, pthread_handler handler, int n, int m) ...@@ -85,7 +92,11 @@ void run_test(const char *test, pthread_handler handler, int n, int m)
diag("Testing %s with %d threads, %d iterations... ", test, n, m); diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads= n ; n ; n--) for (rt_num_threads= n ; n ; n--)
if (pthread_create(&t, &rt_attr, handler, &m)) if (pthread_create(&t, &rt_attr, handler, &m))
abort(); {
diag("Could not create thread");
litmus++;
rt_num_threads--;
}
pthread_mutex_lock(&rt_mutex); pthread_mutex_lock(&rt_mutex);
while (rt_num_threads) while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex); pthread_cond_wait(&rt_cond, &rt_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment