Commit 74d050d0 authored by unknown's avatar unknown

maria transaction manager with unit tests


include/lf.h:
  few lf API changes
mysys/lf_alloc-pin.c:
  few lf API changes
mysys/lf_dynarray.c:
  few lf API changes
mysys/lf_hash.c:
  few lf API changes
storage/maria/Makefile.am:
  transaction manager
unittest/Makefile.am:
  maria transaction manager
unittest/mysys/my_atomic-t.c:
  ensure that values are positive
storage/maria/trxman.h:
  New BitKeeper file ``storage/maria/trxman.h''
unittest/maria/Makefile.am:
  New BitKeeper file ``unittest/maria/Makefile.am''
unittest/maria/trxman-t.c:
  New BitKeeper file ``unittest/maria/trxman-t.c''
storage/maria/trxman.c:
  comment clarified
parent d1a8a2c7
...@@ -2537,7 +2537,7 @@ AC_SUBST(MAKE_BINARY_DISTRIBUTION_OPTIONS) ...@@ -2537,7 +2537,7 @@ AC_SUBST(MAKE_BINARY_DISTRIBUTION_OPTIONS)
# Output results # Output results
AC_CONFIG_FILES(Makefile extra/Makefile mysys/Makefile dnl AC_CONFIG_FILES(Makefile extra/Makefile mysys/Makefile dnl
unittest/Makefile unittest/mytap/Makefile unittest/mytap/t/Makefile dnl unittest/Makefile unittest/mytap/Makefile unittest/mytap/t/Makefile dnl
unittest/mysys/Makefile unittest/examples/Makefile dnl unittest/mysys/Makefile unittest/examples/Makefile unittest/maria/Makefile dnl
strings/Makefile regex/Makefile storage/Makefile dnl strings/Makefile regex/Makefile storage/Makefile dnl
man/Makefile BUILD/Makefile vio/Makefile dnl man/Makefile BUILD/Makefile vio/Makefile dnl
libmysql/Makefile client/Makefile dnl libmysql/Makefile client/Makefile dnl
......
...@@ -66,7 +66,7 @@ typedef struct { ...@@ -66,7 +66,7 @@ typedef struct {
typedef int (*lf_dynarray_func)(void *, void *); typedef int (*lf_dynarray_func)(void *, void *);
void lf_dynarray_init(LF_DYNARRAY *array, uint element_size); void lf_dynarray_init(LF_DYNARRAY *array, uint element_size);
void lf_dynarray_end(LF_DYNARRAY *array); void lf_dynarray_destroy(LF_DYNARRAY *array);
nolock_wrap(lf_dynarray_nr, int, nolock_wrap(lf_dynarray_nr, int,
(LF_DYNARRAY *array, void *el), (LF_DYNARRAY *array, void *el),
...@@ -139,15 +139,15 @@ typedef struct { ...@@ -139,15 +139,15 @@ typedef struct {
#define _lf_unpin(PINS, PIN) _lf_pin(PINS, PIN, NULL) #define _lf_unpin(PINS, PIN) _lf_pin(PINS, PIN, NULL)
#define lf_pin(PINS, PIN, ADDR) \ #define lf_pin(PINS, PIN, ADDR) \
do { \ do { \
lf_lock_pins(PINS); \ lf_lock_by_pins(PINS); \
_lf_pin(PINS, PIN, ADDR); \ _lf_pin(PINS, PIN, ADDR); \
lf_unlock_pins(PINS); \ lf_unlock_by_pins(PINS); \
} while (0) } while (0)
#define lf_unpin(PINS, PIN) lf_pin(PINS, PIN, NULL) #define lf_unpin(PINS, PIN) lf_pin(PINS, PIN, NULL)
void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func, void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
void * free_func_arg); void * free_func_arg);
void lf_pinbox_end(LF_PINBOX *pinbox); void lf_pinbox_destroy(LF_PINBOX *pinbox);
lock_wrap(lf_pinbox_get_pins, LF_PINS *, lock_wrap(lf_pinbox_get_pins, LF_PINS *,
(LF_PINBOX *pinbox), (LF_PINBOX *pinbox),
...@@ -180,7 +180,7 @@ typedef struct st_lf_allocator { ...@@ -180,7 +180,7 @@ typedef struct st_lf_allocator {
} LF_ALLOCATOR; } LF_ALLOCATOR;
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size); void lf_alloc_init(LF_ALLOCATOR *allocator, uint size);
void lf_alloc_end(LF_ALLOCATOR *allocator); void lf_alloc_destroy(LF_ALLOCATOR *allocator);
uint lf_alloc_in_pool(LF_ALLOCATOR *allocator); uint lf_alloc_in_pool(LF_ALLOCATOR *allocator);
#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) #define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR))
#define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR)) #define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR))
...@@ -216,10 +216,10 @@ typedef struct { ...@@ -216,10 +216,10 @@ typedef struct {
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key, uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset); CHARSET_INFO *charset);
void lf_hash_end(LF_HASH *hash); void lf_hash_destroy(LF_HASH *hash);
int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data); int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data);
int lf_hash_search(LF_HASH *hash, LF_PINS *pins, const uchar *key, uint keylen); void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen);
int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const uchar *key, uint keylen); int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen);
#define _lf_hash_get_pins(HASH) _lf_alloc_get_pins(&(HASH)->alloc) #define _lf_hash_get_pins(HASH) _lf_alloc_get_pins(&(HASH)->alloc)
#define lf_hash_get_pins(HASH) lf_alloc_get_pins(&(HASH)->alloc) #define lf_hash_get_pins(HASH) lf_alloc_get_pins(&(HASH)->alloc)
#define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS) #define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS)
......
...@@ -54,9 +54,9 @@ void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func, ...@@ -54,9 +54,9 @@ void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
pinbox->free_func_arg=free_func_arg; pinbox->free_func_arg=free_func_arg;
} }
void lf_pinbox_end(LF_PINBOX *pinbox) void lf_pinbox_destroy(LF_PINBOX *pinbox)
{ {
lf_dynarray_end(&pinbox->pinstack); lf_dynarray_destroy(&pinbox->pinstack);
} }
LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox) LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
...@@ -292,7 +292,7 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size) ...@@ -292,7 +292,7 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size)
DBUG_ASSERT(size >= (int)sizeof(void *)); DBUG_ASSERT(size >= (int)sizeof(void *));
} }
void lf_alloc_end(LF_ALLOCATOR *allocator) void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{ {
void *el=allocator->top; void *el=allocator->top;
while (el) while (el)
...@@ -301,7 +301,7 @@ void lf_alloc_end(LF_ALLOCATOR *allocator) ...@@ -301,7 +301,7 @@ void lf_alloc_end(LF_ALLOCATOR *allocator)
my_free(el, MYF(0)); my_free(el, MYF(0));
el=tmp; el=tmp;
} }
lf_pinbox_end(&allocator->pinbox); lf_pinbox_destroy(&allocator->pinbox);
allocator->top=0; allocator->top=0;
} }
......
...@@ -57,7 +57,7 @@ static void recursive_free(void **alloc, int level) ...@@ -57,7 +57,7 @@ static void recursive_free(void **alloc, int level)
my_free(alloc[-1], MYF(0)); my_free(alloc[-1], MYF(0));
} }
void lf_dynarray_end(LF_DYNARRAY *array) void lf_dynarray_destroy(LF_DYNARRAY *array)
{ {
int i; int i;
for (i=0; i < LF_DYNARRAY_LEVELS; i++) for (i=0; i < LF_DYNARRAY_LEVELS; i++)
......
...@@ -228,14 +228,14 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, ...@@ -228,14 +228,14 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
hash->count=0; hash->count=0;
hash->element_size=element_size; hash->element_size=element_size;
hash->flags=flags; hash->flags=flags;
hash->charset=charset; hash->charset=charset ? charset : &my_charset_bin;
hash->key_offset=key_offset; hash->key_offset=key_offset;
hash->key_length=key_length; hash->key_length=key_length;
hash->get_key=get_key; hash->get_key=get_key;
DBUG_ASSERT(get_key ? !key_offset && !key_length : key_length); DBUG_ASSERT(get_key ? !key_offset && !key_length : key_length);
} }
void lf_hash_end(LF_HASH *hash) void lf_hash_destroy(LF_HASH *hash)
{ {
LF_SLIST *el=*(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0); LF_SLIST *el=*(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
while (el) while (el)
...@@ -244,109 +244,89 @@ void lf_hash_end(LF_HASH *hash) ...@@ -244,109 +244,89 @@ void lf_hash_end(LF_HASH *hash)
lf_alloc_real_free(&hash->alloc, el); lf_alloc_real_free(&hash->alloc, el);
el=(LF_SLIST *)next; el=(LF_SLIST *)next;
} }
lf_alloc_end(&hash->alloc); lf_alloc_destroy(&hash->alloc);
lf_dynarray_end(&hash->array); lf_dynarray_destroy(&hash->array);
} }
/* /*
RETURN
0 - inserted
1 - didn't (unique key conflict)
NOTE NOTE
see linsert() for pin usage see linsert() for pin usage notes
*/ */
int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data) int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
{ {
uint csize, bucket, hashnr, keylen; uint csize, bucket, hashnr;
LF_SLIST *node, * volatile *el; LF_SLIST *node, * volatile *el;
const uchar *key;
key= hash_key(hash, data, &keylen);
hashnr= calc_hash(hash, key, keylen);
bucket= hashnr % hash->size;
lf_lock_by_pins(pins); lf_lock_by_pins(pins);
node=(LF_SLIST *)_lf_alloc_new(pins); node=(LF_SLIST *)_lf_alloc_new(pins);
memcpy(node+1, data, hash->element_size); memcpy(node+1, data, hash->element_size);
node->key= hash_key(hash, node+1, &node->keylen);
hashnr= calc_hash(hash, node->key, node->keylen);
bucket= hashnr % hash->size;
el=_lf_dynarray_lvalue(&hash->array, bucket); el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL) if (*el == NULL)
initialize_bucket(hash, el, bucket, pins); initialize_bucket(hash, el, bucket, pins);
node->hashnr=my_reverse_bits(hashnr) | 1; node->hashnr=my_reverse_bits(hashnr) | 1;
node->key=((char *)(node+1))+(key-(uchar *)data);
node->keylen=keylen;
if (linsert(el, node, pins, hash->flags)) if (linsert(el, node, pins, hash->flags))
{ {
_lf_alloc_free(pins, node); _lf_alloc_free(pins, node);
lf_unlock_by_pins(pins); lf_unlock_by_pins(pins);
return 0; return 1;
} }
csize= hash->size; csize= hash->size;
if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD) if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD)
my_atomic_cas32(&hash->size, &csize, csize*2); my_atomic_cas32(&hash->size, &csize, csize*2);
#if 0
node=*(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
hashnr=0;
while (node)
{
assert (node->hashnr >= hashnr);
hashnr=node->hashnr;
node=(LF_SLIST *)node->link;
}
#endif
lf_unlock_by_pins(pins); lf_unlock_by_pins(pins);
return 1; return 0;
} }
/* /*
RETURN
0 - deleted
1 - didn't (not found)
NOTE NOTE
see ldelete() for pin usage see ldelete() for pin usage notes
*/ */
int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const uchar *key, uint keylen) int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
{ {
LF_SLIST * volatile *el; LF_SLIST * volatile *el;
uint bucket, hashnr=calc_hash(hash, key, keylen); uint bucket, hashnr=calc_hash(hash, (uchar *)key, keylen);
bucket= hashnr % hash->size; bucket= hashnr % hash->size;
lf_lock_by_pins(pins); lf_lock_by_pins(pins);
el=_lf_dynarray_lvalue(&hash->array, bucket); el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL) if (*el == NULL)
initialize_bucket(hash, el, bucket, pins); initialize_bucket(hash, el, bucket, pins);
if (ldelete(el, my_reverse_bits(hashnr) | 1, key, keylen, pins)) if (ldelete(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins))
{ {
lf_unlock_by_pins(pins); lf_unlock_by_pins(pins);
return 0; return 1;
} }
my_atomic_add32(&hash->count, -1); my_atomic_add32(&hash->count, -1);
#if 0
{
LF_SLIST *node=*(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
hashnr=0;
while (node)
{
assert (node->hashnr >= hashnr);
hashnr=node->hashnr;
node=(LF_SLIST *)node->link;
}
}
#endif
lf_unlock_by_pins(pins); lf_unlock_by_pins(pins);
return 1; return 0;
} }
/* /*
NOTE NOTE
see lsearch() for pin usage see lsearch() for pin usage notes
*/ */
int lf_hash_search(LF_HASH *hash, LF_PINS *pins, const uchar *key, uint keylen) void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
{ {
int res; LF_SLIST * volatile *el, *found;
LF_SLIST * volatile *el; uint bucket, hashnr=calc_hash(hash, (uchar *)key, keylen);
uint bucket, hashnr=calc_hash(hash, key, keylen);
bucket= hashnr % hash->size; bucket= hashnr % hash->size;
lf_lock_by_pins(pins); lf_lock_by_pins(pins);
el=_lf_dynarray_lvalue(&hash->array, bucket); el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL) if (*el == NULL)
initialize_bucket(hash, el, bucket, pins); initialize_bucket(hash, el, bucket, pins);
res=NULL != lsearch(el, my_reverse_bits(hashnr) | 1, key, keylen, pins); found= lsearch(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins);
lf_unlock_by_pins(pins); lf_unlock_by_pins(pins);
return res; return found+1;
} }
static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node, static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
......
...@@ -28,7 +28,9 @@ bin_PROGRAMS = maria_chk maria_pack maria_ftdump ...@@ -28,7 +28,9 @@ bin_PROGRAMS = maria_chk maria_pack maria_ftdump
maria_chk_DEPENDENCIES= $(LIBRARIES) maria_chk_DEPENDENCIES= $(LIBRARIES)
maria_pack_DEPENDENCIES=$(LIBRARIES) maria_pack_DEPENDENCIES=$(LIBRARIES)
noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test
noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h ma_ft_eval.h noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h \
ma_ft_eval.h trxman.h
ma_test1_DEPENDENCIES= $(LIBRARIES) ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test2_DEPENDENCIES= $(LIBRARIES) ma_test2_DEPENDENCIES= $(LIBRARIES)
ma_test3_DEPENDENCIES= $(LIBRARIES) ma_test3_DEPENDENCIES= $(LIBRARIES)
...@@ -53,7 +55,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \ ...@@ -53,7 +55,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_ft_update.c ma_ft_boolean_search.c \ ma_ft_update.c ma_ft_boolean_search.c \
ma_ft_nlq_search.c ft_maria.c ma_sort.c \ ma_ft_nlq_search.c ft_maria.c ma_sort.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \ ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_sp_key.c trxman.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
DEFS = DEFS =
......
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
#include "trxman.h"
TRX active_list_min, active_list_max,
committed_list_min, committed_list_max, *pool;
pthread_mutex_t LOCK_trx_list;
uint active_transactions;
TrID global_trid_generator;
TRX **short_id_to_trx;
my_atomic_rwlock_t LOCK_short_id_to_trx;
LF_HASH trid_to_trx;
static byte *trx_get_hash_key(const byte *trx,uint* len, my_bool unused)
{
*len= sizeof(TrID);
return (byte *) & ((*((TRX **)trx))->trid);
}
int trxman_init()
{
pthread_mutex_init(&LOCK_trx_list, MY_MUTEX_INIT_FAST);
active_list_max.trid= active_list_min.trid= 0;
active_list_max.min_read_from=~0;
active_list_max.next= active_list_min.prev= 0;
active_list_max.prev= &active_list_min;
active_list_min.next= &active_list_max;
active_transactions= 0;
committed_list_max.commit_trid= ~0;
committed_list_max.next= committed_list_min.prev= 0;
committed_list_max.prev= &committed_list_min;
committed_list_min.next= &committed_list_max;
pool=0;
global_trid_generator=0; /* set later by recovery code */
lf_hash_init(&trid_to_trx, sizeof(TRX*), LF_HASH_UNIQUE,
0, 0, trx_get_hash_key, 0);
my_atomic_rwlock_init(&LOCK_short_id_to_trx);
short_id_to_trx=(TRX **)my_malloc(SHORT_ID_MAX*sizeof(TRX*),
MYF(MY_WME|MY_ZEROFILL));
if (!short_id_to_trx)
return 1;
short_id_to_trx--; /* min short_id is 1 */
return 0;
}
int trxman_destroy()
{
DBUG_ASSERT(trid_to_trx.count == 0);
DBUG_ASSERT(active_transactions == 0);
DBUG_ASSERT(active_list_max.prev == &active_list_min);
DBUG_ASSERT(active_list_min.next == &active_list_max);
DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
DBUG_ASSERT(committed_list_min.next == &committed_list_max);
while (pool)
{
TRX *tmp=pool->next;
my_free(pool, MYF(0));
pool=tmp;
}
lf_hash_destroy(&trid_to_trx);
pthread_mutex_destroy(&LOCK_trx_list);
my_atomic_rwlock_destroy(&LOCK_short_id_to_trx);
my_free((void *)(short_id_to_trx+1), MYF(0));
}
static TrID new_trid()
{
DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL);
safe_mutex_assert_owner(&LOCK_trx_list);
return ++global_trid_generator;
}
static void set_short_id(TRX *trx)
{
int i= (global_trid_generator + (intptr)trx) * 312089 % SHORT_ID_MAX;
my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
for ( ; ; i= i % SHORT_ID_MAX + 1) /* the range is [1..SHORT_ID_MAX] */
{
void *tmp=NULL;
if (short_id_to_trx[i] == NULL &&
my_atomic_casptr((void **)&short_id_to_trx[i], &tmp, trx))
break;
}
my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
trx->short_id= i;
}
extern int global_malloc;
TRX *trxman_new_trx()
{
TRX *trx;
my_atomic_add32(&active_transactions, 1);
/*
we need a mutex here to ensure that
transactions in the active list are ordered by the trid.
So, incrementing global_trid_generator and
adding to the list must be atomic.
and as we have a mutex, we can as well do everything
under it - allocating a TRX, incrementing active_transactions,
setting trx->min_read_from.
Note that all the above is fast. generating short_id may be slow,
as it involves scanning a big array - so it's still done
outside of the mutex.
*/
pthread_mutex_lock(&LOCK_trx_list);
trx=pool;
while (trx && !my_atomic_casptr((void **)&pool, (void **)&trx, trx->next))
/* no-op */;
if (!trx)
{
trx=(TRX *)my_malloc(sizeof(TRX), MYF(MY_WME));
global_malloc++;
}
if (!trx)
return 0;
trx->min_read_from= active_list_min.next->trid;
trx->trid= new_trid();
trx->short_id= 0;
trx->next= &active_list_max;
trx->prev= active_list_max.prev;
active_list_max.prev= trx->prev->next= trx;
pthread_mutex_unlock(&LOCK_trx_list);
trx->pins=lf_hash_get_pins(&trid_to_trx);
if (!trx->min_read_from)
trx->min_read_from= trx->trid;
trx->commit_trid=0;
set_short_id(trx); /* this must be the last! */
return trx;
}
/*
remove a trx from the active list,
move to committed list,
set commit_trid
TODO
integrate with lock manager, log manager. That means:
a common "commit" mutex - forcing the log and setting commit_trid
must be done atomically (QQ how the heck it could be done with
group commit ???)
trid_to_trx, active_list_*, and committed_list_* can be
updated asyncronously.
*/
void trxman_end_trx(TRX *trx, my_bool commit)
{
int res;
TRX *free_me= 0;
LF_PINS *pins= trx->pins;
pthread_mutex_lock(&LOCK_trx_list);
trx->next->prev= trx->prev;
trx->prev->next= trx->next;
if (trx->prev == &active_list_min)
{
TRX *t;
for (t= committed_list_min.next;
t->commit_trid < active_list_min.next->min_read_from;
t= t->next) /* no-op */;
if (t != committed_list_min.next)
{
free_me= committed_list_min.next;
committed_list_min.next= t;
t->prev->next=0;
t->prev= &committed_list_min;
}
}
my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
my_atomic_storeptr((void **)&short_id_to_trx[trx->short_id], 0);
my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
if (commit && active_list_min.next != &active_list_max)
{
trx->commit_trid= global_trid_generator;
trx->next= &committed_list_max;
trx->prev= committed_list_max.prev;
committed_list_max.prev= trx->prev->next= trx;
res= lf_hash_insert(&trid_to_trx, pins, &trx);
DBUG_ASSERT(res == 0);
}
else
{
trx->next=free_me;
free_me=trx;
}
pthread_mutex_unlock(&LOCK_trx_list);
my_atomic_add32(&active_transactions, -1);
while (free_me)
{
int res;
TRX *t= free_me;
free_me= free_me->next;
res= lf_hash_delete(&trid_to_trx, pins, &t->trid, sizeof(TrID));
trxman_free_trx(t);
}
lf_hash_put_pins(pins);
}
/* free a trx (add to the pool, that is */
void trxman_free_trx(TRX *trx)
{
TRX *tmp=pool;
do
{
trx->next=tmp;
} while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trx));
}
my_bool trx_can_read_from(TRX *trx, TrID trid)
{
TRX *found;
my_bool can;
if (trid < trx->min_read_from)
return TRUE;
if (trid > trx->trid)
return FALSE;
found= lf_hash_search(&trid_to_trx, trx->pins, &trid, sizeof(trid));
if (!found)
return FALSE; /* not in the hash = cannot read */
can= found->commit_trid < trx->trid;
lf_unpin(trx->pins, 2);
return can;
}
typedef uint64 TrID; /* our TrID is 6 bytes */
typedef struct st_transaction
{
TrID trid, min_read_from, commit_trid;
struct st_transaction *next, *prev;
/* Note! if short_id is 0, trx is NOT initialized */
uint16 short_id;
LF_PINS *pins;
} TRX;
#define SHORT_ID_MAX 65535
extern uint active_transactions;
extern TRX **short_id_to_trx;
extern my_atomic_rwlock_t LOCK_short_id_to_trx;
int trxman_init();
int trxman_end();
TRX *trxman_new_trx();
void trxman_end_trx(TRX *trx, my_bool commit);
#define trxman_commit_trx(T) trxman_end_trx(T, TRUE)
#define trxman_abort_trx(T) trxman_end_trx(T, FALSE)
void trxman_free_trx(TRX *trx);
my_bool trx_can_read_from(TRX *trx, TrID trid);
SUBDIRS = mytap . mysys examples SUBDIRS = mytap mysys maria examples
noinst_SCRIPTS = unit noinst_SCRIPTS = unit
EXTRA_DIST = unit.pl EXTRA_DIST = unit.pl
CLEANFILES = unit CLEANFILES = unit
unittests = mytap mysys unittests = mytap mysys maria
test: unit test: unit
./unit run $(unittests) ./unit run $(unittests)
......
AM_CPPFLAGS = @ZLIB_INCLUDES@ -I$(top_builddir)/include
AM_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap
LDADD = $(top_builddir)/unittest/mytap/libmytap.a \
$(top_builddir)/storage/maria/libmaria.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a
noinst_PROGRAMS = trxman-t
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <tap.h>
#include <my_global.h>
#include <my_sys.h>
#include <my_atomic.h>
#include <lf.h>
#include "../../storage/maria/trxman.h"
pthread_attr_t rt_attr;
pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
/* template for a test: the goal is to have litmus==0 if the test passed
#define ITER nnn
pthread_handler_t test_XXXXXXXX(void *arg)
{
int m=(*(int *)arg)/ITER, x;
for (x=((int)(intptr)(&m)); m ; m--)
{
// do something with litmus
}
// do something more with litmus
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
if (!rt_num_threads)
{
diag("whatever diagnostics we want", blabla, foobar);
pthread_cond_signal(&rt_cond);
}
pthread_mutex_unlock(&rt_mutex);
return 0;
}
#undef ITER
*/
/*
create and end (commit or rollback) transactions randomly
*/
#define MAX_ITER 100
pthread_handler_t test_trxman(void *arg)
{
int m=(*(int *)arg);
uint x, y, i, j, n;
TRX *trx[MAX_ITER];
for (x=((int)(intptr)(&m)); m > 0; )
{
y= x= (x*3628273133 + 1500450271) % 9576890767; /* three prime numbers */
m-= n= x % MAX_ITER;
for (i=0; i < n; i++)
trx[i]=trxman_new_trx();
for (i=0; i < n; i++)
{
y=(y*19 + 7) % 31;
trxman_end_trx(trx[i], y & 1);
}
}
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
if (!rt_num_threads)
pthread_cond_signal(&rt_cond);
pthread_mutex_unlock(&rt_mutex);
return 0;
}
#undef MAX_ITER
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now=my_getsystime();
litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads=n ; n ; n--)
pthread_create(&t, &rt_attr, handler, &m);
pthread_mutex_lock(&rt_mutex);
while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex);
now=my_getsystime()-now;
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
}
int global_malloc=0;
int main()
{
plan(1);
if (my_atomic_initialize())
return exit_status();
my_init();
pthread_attr_init(&rt_attr);
pthread_attr_setdetachstate(&rt_attr,PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&rt_mutex, 0);
pthread_cond_init(&rt_cond, 0);
#define CYCLES 10000
#define THREADS 10
trxman_init();
run_test("trxman", test_trxman, THREADS,CYCLES);
trxman_destroy();
diag("mallocs: %d\n", global_malloc);
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
pthread_attr_destroy(&rt_attr);
my_end(0);
return exit_status();
}
...@@ -14,9 +14,6 @@ ...@@ -14,9 +14,6 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//#define MY_ATOMIC_MODE_RWLOCKS
//#define MY_ATOMIC_MODE_DUMMY
#include <tap.h> #include <tap.h>
#include <my_global.h> #include <my_global.h>
...@@ -41,7 +38,7 @@ pthread_handler_t test_atomic_add_handler(void *arg) ...@@ -41,7 +38,7 @@ pthread_handler_t test_atomic_add_handler(void *arg)
int32 x; int32 x;
for (x=((int)(intptr)(&m)); m ; m--) for (x=((int)(intptr)(&m)); m ; m--)
{ {
x=x*m+0x87654321; x=(x*m+0x87654321) & INT_MAX32;
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x); my_atomic_add32(&a32, x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
...@@ -111,7 +108,7 @@ pthread_handler_t test_atomic_cas_handler(void *arg) ...@@ -111,7 +108,7 @@ pthread_handler_t test_atomic_cas_handler(void *arg)
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
y=my_atomic_load32(&a32); y=my_atomic_load32(&a32);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
x=x*m+0x87654321; x=(x*m+0x87654321) & INT_MAX32;
do { do {
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
ok=my_atomic_cas32(&a32, &y, y+x); ok=my_atomic_cas32(&a32, &y, y+x);
...@@ -171,7 +168,7 @@ pthread_handler_t test_lf_alloc(void *arg) ...@@ -171,7 +168,7 @@ pthread_handler_t test_lf_alloc(void *arg)
for (x=((int)(intptr)(&m)); m ; m--) for (x=((int)(intptr)(&m)); m ; m--)
{ {
TLA *node1, *node2; TLA *node1, *node2;
x=x*m+0x87654321; x=(x*m+0x87654321) & INT_MAX32;
node1=(TLA *)lf_alloc_new(pins); node1=(TLA *)lf_alloc_new(pins);
node1->data=x; node1->data=x;
y+=node1->data; y+=node1->data;
...@@ -217,7 +214,7 @@ pthread_handler_t test_lf_hash(void *arg) ...@@ -217,7 +214,7 @@ pthread_handler_t test_lf_hash(void *arg)
y=x; y=x;
for (i=0; i < N_TLH; i++) for (i=0; i < N_TLH; i++)
{ {
x=x*(m+i)+0x87654321; x=(x*(m+i)+0x87654321) & INT_MAX32;
z=(x<0) ? -x : x; z=(x<0) ? -x : x;
if (lf_hash_insert(&lf_hash, pins, &z)) if (lf_hash_insert(&lf_hash, pins, &z))
{ {
...@@ -227,7 +224,7 @@ pthread_handler_t test_lf_hash(void *arg) ...@@ -227,7 +224,7 @@ pthread_handler_t test_lf_hash(void *arg)
} }
for (i=0; i < N_TLH; i++) for (i=0; i < N_TLH; i++)
{ {
y=y*(m+i)+0x87654321; y=(y*(m+i)+0x87654321) & INT_MAX32;
z=(y<0) ? -y : y; z=(y<0) ? -y : y;
if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z))) if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z)))
sum-=z; sum-=z;
...@@ -307,8 +304,8 @@ int main() ...@@ -307,8 +304,8 @@ int main()
test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES); test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES); test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES);
lf_hash_end(&lf_hash); lf_hash_destroy(&lf_hash);
lf_alloc_end(&lf_allocator); lf_alloc_destroy(&lf_allocator);
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond); pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr); pthread_attr_destroy(&thr_attr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment