Commit 8e971a05 authored by unknown's avatar unknown

Merge bk-internal.mysql.com:/home/bk/mysql-maria

into  janus.mylan:/usr/home/serg/Abk/mysql-maria


storage/maria/trnman.c:
  Auto merged
parents 79ff2b03 7ca33ae5
......@@ -47,7 +47,7 @@ typedef struct {pthread_mutex_t rw;} my_atomic_rwlock_t;
#endif
#define make_atomic_add_body(S) int ## S sav; sav= *a; *a+= v; v=sav;
#define make_atomic_swap_body(S) int ## S sav; sav= *a; *a= v; v=sav;
#define make_atomic_fas_body(S) int ## S sav; sav= *a; *a= v; v=sav;
#define make_atomic_cas_body(S) if ((ret= (*a == *cmp))) *a= set; else *cmp=*a;
#define make_atomic_load_body(S) ret= *a;
#define make_atomic_store_body(S) *a= v;
......
......@@ -43,7 +43,7 @@
#define make_atomic_add_body(S) \
asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
#endif
#define make_atomic_swap_body(S) \
#define make_atomic_fas_body(S) \
asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
#define make_atomic_cas_body(S) \
asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;" \
......
......@@ -43,7 +43,7 @@
_asm setz al \
_asm movzx ret, al \
}
#define make_atomic_swap_body(S) \
#define make_atomic_fas_body(S) \
_asm { \
_asm mov reg_ ## S, v \
_asm xchg *a, reg_ ## S \
......
......@@ -14,6 +14,40 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
This header defines five atomic operations:
my_atomic_add#(&var, what)
add 'what' to *var, and return the old value of *var
my_atomic_fas#(&var, what)
'Fetch And Store'
store 'what' in *var, and return the old value of *var
my_atomic_cas#(&var, &old, new)
'Compare And Swap'
if *var is equal to *old, then store 'new' in *var, and return TRUE
otherwise store *var in *old, and return FALSE
my_atomic_load#(&var)
return *var
my_atomic_store#(&var, what)
store 'what' in *var
'#' is substituted by a size suffix - 8, 16, 32, or ptr
(e.g. my_atomic_add8, my_atomic_fas32, my_atomic_casptr).
NOTE This operations are not always atomic, so they always must be
enclosed in my_atomic_rwlock_rdlock(lock)/my_atomic_rwlock_rdunlock(lock)
or my_atomic_rwlock_wrlock(lock)/my_atomic_rwlock_wrunlock(lock).
Hint: if a code block makes intensive use of atomic ops, it make sense
to take/release rwlock once for the whole block, not for every statement.
On architectures where these operations are really atomic, rwlocks will
be optimized away.
*/
#ifndef my_atomic_rwlock_init
#define intptr void *
......@@ -43,11 +77,11 @@ STATIC_INLINE int ## S my_atomic_add ## S( \
return v; \
}
#define make_atomic_swap(S) \
STATIC_INLINE int ## S my_atomic_swap ## S( \
#define make_atomic_fas(S) \
STATIC_INLINE int ## S my_atomic_fas ## S( \
int ## S volatile *a, int ## S v) \
{ \
make_atomic_swap_body(S); \
make_atomic_fas_body(S); \
return v; \
}
......@@ -80,8 +114,8 @@ STATIC_INLINE void my_atomic_store ## S( \
#define make_atomic_add(S) \
extern int ## S my_atomic_add ## S(int ## S volatile *a, int ## S v);
#define make_atomic_swap(S) \
extern int ## S my_atomic_swap ## S(int ## S volatile *a, int ## S v);
#define make_atomic_fas(S) \
extern int ## S my_atomic_fas ## S(int ## S volatile *a, int ## S v);
#define make_atomic_cas(S) \
extern int my_atomic_cas ## S(int ## S volatile *a, int ## S *cmp, int ## S set);
......@@ -113,10 +147,10 @@ make_atomic_store(16)
make_atomic_store(32)
make_atomic_store(ptr)
make_atomic_swap( 8)
make_atomic_swap(16)
make_atomic_swap(32)
make_atomic_swap(ptr)
make_atomic_fas( 8)
make_atomic_fas(16)
make_atomic_fas(32)
make_atomic_fas(ptr)
#ifdef _atomic_h_cleanup_
#include _atomic_h_cleanup_
......@@ -127,12 +161,12 @@ make_atomic_swap(ptr)
#undef make_atomic_cas
#undef make_atomic_load
#undef make_atomic_store
#undef make_atomic_swap
#undef make_atomic_fas
#undef make_atomic_add_body
#undef make_atomic_cas_body
#undef make_atomic_load_body
#undef make_atomic_store_body
#undef make_atomic_swap_body
#undef make_atomic_fas_body
#undef intptr
#ifndef LF_BACKOFF
......
// TODO multi-pinbox
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
......@@ -17,24 +18,25 @@
/*
wait-free concurrent allocator based on pinning addresses
It works as follows: every thread (strictly speaking - every CPU, but it's
too difficult to do) has a small array of pointers. They're called "pins".
Before using an object its address must be stored in this array (pinned).
When an object is no longer necessary its address must be removed from
this array (unpinned). When a thread wants to free() an object it
scans all pins of all threads to see if somebody has this object pinned.
If yes - the object is not freed (but stored in a purgatory).
To reduce the cost of a single free() pins are not scanned on every free()
but only added to (thread-local) purgatory. On every LF_PURGATORY_SIZE
free() purgatory is scanned and all unpinned objects are freed.
It works as follows: every thread (strictly speaking - every CPU, but
it's too difficult to do) has a small array of pointers. They're called
"pins". Before using an object its address must be stored in this array
(pinned). When an object is no longer necessary its address must be
removed from this array (unpinned). When a thread wants to free() an
object it scans all pins of all threads to see if somebody has this
object pinned. If yes - the object is not freed (but stored in a
"purgatory"). To reduce the cost of a single free() pins are not scanned
on every free() but only added to (thread-local) purgatory. On every
LF_PURGATORY_SIZE free() purgatory is scanned and all unpinned objects
are freed.
Pins are used to solve ABA problem. To use pins one must obey
a pinning protocol:
1. Let's assume that PTR is a shared pointer to an object. Shared means
that any thread may modify it anytime to point to a different object and
free the old object. Later the freed object may be potentially allocated
by another thread. If we're unlucky that another thread may set PTR to
point to this object again. This is ABA problem.
that any thread may modify it anytime to point to a different object
and free the old object. Later the freed object may be potentially
allocated by another thread. If we're unlucky that another thread may
set PTR to point to this object again. This is ABA problem.
2. Create a local pointer LOCAL_PTR.
3. Pin the PTR in a loop:
do
......@@ -42,31 +44,31 @@
LOCAL_PTR= PTR;
pin(PTR, PIN_NUMBER);
} while (LOCAL_PTR != PTR)
4. It is guaranteed that after the loop is ended, LOCAL_PTR
4. It is guaranteed that after the loop has ended, LOCAL_PTR
points to an object (or NULL, if PTR may be NULL), that
will never be freed. It is not guaranteed though
that LOCAL_PTR == PTR
that LOCAL_PTR == PTR (as PTR can change any time)
5. When done working with the object, remove the pin:
unpin(PIN_NUMBER)
6. When copying pins (as in the list:
6. When copying pins (as in the list traversing loop:
pin(CUR, 1);
while ()
{
pin(CUR, 0);
do
{
NEXT=CUR->next;
pin(NEXT, 1);
} while (NEXT != CUR->next);
do // standard
{ // pinning
NEXT=CUR->next; // loop
pin(NEXT, 0); // see #3
} while (NEXT != CUR->next); // above
...
...
pin(CUR, 1);
CUR=NEXT;
pin(CUR, 1); // copy pin[0] to pin[1]
}
which keeps CUR address constantly pinned), note than pins may be copied
only upwards (!!!), that is pin N to pin M > N.
7. Don't keep the object pinned longer than necessary - the number of pins
you have is limited (and small), keeping an object pinned prevents its
reuse and cause unnecessary mallocs.
which keeps CUR address constantly pinned), note than pins may be
copied only upwards (!!!), that is pin[N] to pin[M], M > N.
7. Don't keep the object pinned longer than necessary - the number of
pins you have is limited (and small), keeping an object pinned
prevents its reuse and cause unnecessary mallocs.
Implementation details:
Pins are given away from a "pinbox". Pinbox is stack-based allocator.
......@@ -85,7 +87,7 @@
static void _lf_pinbox_real_free(LF_PINS *pins);
/*
Initialize a pinbox. Must be usually called from lf_alloc_init.
Initialize a pinbox. Normally called from lf_alloc_init.
See the latter for details.
*/
void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
......@@ -214,9 +216,9 @@ static int ptr_cmp(void **a, void **b)
*/
void _lf_pinbox_free(LF_PINS *pins, void *addr)
{
add_to_purgatory(pins, addr);
if (pins->purgatory_count % LF_PURGATORY_SIZE)
_lf_pinbox_real_free(pins);
add_to_purgatory(pins, addr);
}
struct st_harvester {
......
......@@ -26,10 +26,15 @@
Every element is aligned to sizeof(element) boundary
(to avoid false sharing if element is big enough).
LF_DYNARRAY is a recursive structure. On the zero level
LF_DYNARRAY::level[0] it's an array of LF_DYNARRAY_LEVEL_LENGTH elements,
on the first level it's an array of LF_DYNARRAY_LEVEL_LENGTH pointers
to arrays of elements, on the second level it's an array of pointers
to arrays of pointers to arrays of elements. And so on.
Actually, it's wait-free, not lock-free ;-)
*/
#undef DBUG_OFF
#include <my_global.h>
#include <strings.h>
#include <my_sys.h>
......@@ -75,6 +80,10 @@ static const int dynarray_idxes_in_prev_level[LF_DYNARRAY_LEVELS]=
LF_DYNARRAY_LEVEL_LENGTH
};
/*
Returns a valid lvalue pointer to the element number 'idx'.
Allocates memory if necessary.
*/
void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr= 0;
......@@ -123,6 +132,10 @@ void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
return ptr + array->size_of_element * idx;
}
/*
Returns a pointer to the element number 'idx'
or NULL if an element does not exists
*/
void *_lf_dynarray_value(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr= 0;
......@@ -157,6 +170,16 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
return 0;
}
/*
Calls func(array, arg) on every array of LF_DYNARRAY_LEVEL_LENGTH elements
in lf_dynarray.
DESCRIPTION
lf_dynarray consists of a set of arrays, LF_DYNARRAY_LEVEL_LENGTH elements
each. _lf_dynarray_iterate() calls user-supplied function on every array
from the set. It is the fastest way to scan the array, faster than
for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); }
*/
int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
{
int i, res;
......
......@@ -29,22 +29,35 @@
LF_REQUIRE_PINS(3);
/* An element of the list */
typedef struct {
intptr volatile link;
uint32 hashnr;
intptr volatile link; /* a pointer to the next element in a listand a flag */
uint32 hashnr; /* reversed hash number, for sorting */
const uchar *key;
uint keylen;
} LF_SLIST;
/*
a structure to pass the context (pointers two the three successive elements
in a list) from lfind to linsert/ldelete
*/
typedef struct {
intptr volatile *prev;
LF_SLIST *curr, *next;
} CURSOR;
/*
the last bit in LF_SLIST::link is a "deleted" flag.
the helper macros below convert it to a pure pointer or a pure flag
*/
#define PTR(V) (LF_SLIST *)((V) & (~(intptr)1))
#define DELETED(V) ((V) & 1)
/*
DESCRIPTION
Search for hashnr/key/keylen in the list starting from 'head' and
position the cursor. The list is ORDER BY hashnr, key
RETURN
0 - not found
1 - found
......@@ -53,7 +66,7 @@ typedef struct {
cursor is positioned in either case
pins[0..2] are used, they are NOT removed on return
*/
static int lfind(LF_SLIST * volatile *head, uint32 hashnr,
static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
const uchar *key, uint keylen, CURSOR *cursor, LF_PINS *pins)
{
uint32 cur_hashnr;
......@@ -89,7 +102,8 @@ retry:
if (cur_hashnr >= hashnr)
{
int r=1;
if (cur_hashnr > hashnr || (r=memcmp(cur_key, key, keylen)) >= 0)
if (cur_hashnr > hashnr ||
(r=my_strnncoll(cs, cur_key, cur_keylen, key, keylen)) >= 0)
return !r;
}
cursor->prev=&(cursor->curr->link);
......@@ -112,22 +126,26 @@ retry:
}
/*
DESCRIPTION
insert a 'node' in the list that starts from 'head' in the correct
position (as found by lfind)
RETURN
0 - inserted
not 0 - a pointer to a conflict (not pinned and thus unusable)
not 0 - a pointer to a duplicate (not pinned and thus unusable)
NOTE
it uses pins[0..2], on return all pins are removed.
*/
static LF_SLIST *linsert(LF_SLIST * volatile *head, LF_SLIST *node,
LF_PINS *pins, uint flags)
static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
LF_SLIST *node, LF_PINS *pins, uint flags)
{
CURSOR cursor;
int res=-1;
do
{
if (lfind(head, node->hashnr, node->key, node->keylen,
if (lfind(head, cs, node->hashnr, node->key, node->keylen,
&cursor, pins) &&
(flags & LF_HASH_UNIQUE))
res=0; /* duplicate found */
......@@ -147,13 +165,18 @@ static LF_SLIST *linsert(LF_SLIST * volatile *head, LF_SLIST *node,
}
/*
DESCRIPTION
deletes a node as identified by hashnr/keey/keylen from the list
that starts from 'head'
RETURN
0 - ok
1 - not found
NOTE
it uses pins[0..2], on return all pins are removed.
*/
static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
const uchar *key, uint keylen, LF_PINS *pins)
{
CURSOR cursor;
......@@ -161,7 +184,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
do
{
if (!lfind(head, hashnr, key, keylen, &cursor, pins))
if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins))
res= 1;
else
if (my_atomic_casptr((void **)&(cursor.curr->link),
......@@ -171,7 +194,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
(void **)&cursor.curr, cursor.next))
_lf_alloc_free(pins, cursor.curr);
else
lfind(head, hashnr, key, keylen, &cursor, pins);
lfind(head, cs, hashnr, key, keylen, &cursor, pins);
res= 0;
}
} while (res == -1);
......@@ -182,18 +205,24 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
}
/*
DESCRIPTION
searches for a node as identified by hashnr/keey/keylen in the list
that starts from 'head'
RETURN
0 - not found
node - found
NOTE
it uses pins[0..2], on return the pin[2] keeps the node found
all other pins are removed.
*/
static LF_SLIST *lsearch(LF_SLIST * volatile *head, uint32 hashnr,
const uchar *key, uint keylen, LF_PINS *pins)
static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
uint32 hashnr, const uchar *key, uint keylen,
LF_PINS *pins)
{
CURSOR cursor;
int res=lfind(head, hashnr, key, keylen, &cursor, pins);
int res=lfind(head, cs, hashnr, key, keylen, &cursor, pins);
if (res) _lf_pin(pins, 2, cursor.curr);
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
......@@ -219,6 +248,9 @@ static inline uint calc_hash(LF_HASH *hash, const uchar *key, uint keylen)
#define MAX_LOAD 1.0
static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
/*
Initializes lf_hash, the arguments are compatible with hash_init
*/
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset)
......@@ -254,9 +286,14 @@ void lf_hash_destroy(LF_HASH *hash)
}
/*
DESCRIPTION
inserts a new element to a hash. it will have a _copy_ of
data, not a pointer to it.
RETURN
0 - inserted
1 - didn't (unique key conflict)
NOTE
see linsert() for pin usage notes
*/
......@@ -275,7 +312,7 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
node->hashnr=my_reverse_bits(hashnr) | 1;
if (linsert(el, node, pins, hash->flags))
if (linsert(el, hash->charset, node, pins, hash->flags))
{
_lf_alloc_free(pins, node);
lf_rwunlock_by_pins(pins);
......@@ -305,7 +342,8 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
if (ldelete(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins))
if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
(uchar *)key, keylen, pins))
{
lf_rwunlock_by_pins(pins);
return 1;
......@@ -329,7 +367,8 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
found= lsearch(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins);
found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
(uchar *)key, keylen, pins);
lf_rwunlock_by_pins(pins);
return found ? found+1 : 0;
}
......@@ -348,7 +387,7 @@ static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
dummy->hashnr=my_reverse_bits(bucket);
dummy->key=dummy_key;
dummy->keylen=0;
if ((cur= linsert(el, dummy, pins, 0)))
if ((cur= linsert(el, hash->charset, dummy, pins, 0)))
{
my_free((void *)dummy, MYF(0));
dummy= cur;
......
......@@ -124,7 +124,7 @@ int trnman_init()
this could only be called in the "idle" state - no transaction can be
running. See asserts below.
*/
int trnman_destroy()
void trnman_destroy()
{
DBUG_ASSERT(trid_to_committed_trn.count == 0);
DBUG_ASSERT(trnman_active_transactions == 0);
......@@ -198,7 +198,10 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
/* Allocating a new TRN structure */
trn= pool;
/* Popping an unused TRN from the pool */
/*
Popping an unused TRN from the pool
(ABA isn't possible, we're behind a mutex
*/
my_atomic_rwlock_wrlock(&LOCK_pool);
while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn,
(void *)trn->next))
......@@ -265,7 +268,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
*/
void trnman_end_trn(TRN *trn, my_bool commit)
{
int res;
TRN *free_me= 0;
LF_PINS *pins= trn->pins;
......@@ -303,8 +305,9 @@ void trnman_end_trn(TRN *trn, my_bool commit)
*/
if (commit && active_list_min.next != &active_list_max)
{
trn->commit_trid= global_trid_generator;
int res;
trn->commit_trid= global_trid_generator;
trn->next= &committed_list_max;
trn->prev= committed_list_max.prev;
committed_list_max.prev= trn->prev->next= trn;
......@@ -328,13 +331,18 @@ void trnman_end_trn(TRN *trn, my_bool commit)
my_atomic_storeptr((void **)&short_trid_to_active_trn[trn->short_id], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
/*
we, under the mutex, removed going-in-free_me transactions from the
active and committed lists, thus nobody else may see them when it scans
those lists, and thus nobody may want to free them. Now we don't
need a mutex to access free_me list
*/
while (free_me) // XXX send them to the purge thread
{
int res;
TRN *t= free_me;
free_me= free_me->next;
res= lf_hash_delete(&trid_to_committed_trn, pins, &t->trid, sizeof(TrID));
lf_hash_delete(&trid_to_committed_trn, pins, &t->trid, sizeof(TrID));
trnman_free_trn(t);
}
......
......@@ -44,7 +44,7 @@ struct st_transaction
extern uint trnman_active_transactions, trnman_allocated_transactions;
int trnman_init(void);
int trnman_destroy(void);
void trnman_destroy(void);
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond);
void trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_commit_trn(T) trnman_end_trn(T, TRUE)
......
......@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#undef EXTRA_VERBOSE
//#define EXTRA_VERBOSE
#include <tap.h>
......@@ -46,7 +46,7 @@ LOCK_OWNER *loid2lo(uint16 loid)
lockman_release_locks(&lockman, loid2lo(O));print_lockhash(&lockman)
#define test_lock(O, R, L, S, RES) \
ok(lockman_getlock(&lockman, loid2lo(O), R, L) == RES, \
"lo" #O "> " S " lock resource " #R " with " #L "-lock"); \
"lo" #O "> " S "lock resource " #R " with " #L "-lock"); \
print_lockhash(&lockman)
#define lock_ok_a(O, R, L) \
test_lock(O, R, L, "", GOT_THE_LOCK)
......@@ -107,38 +107,49 @@ void test_lockman_simple()
unlock_all(3);
unlock_all(4);
lock_ok_i(1, 1, IX);
lock_ok_i(2, 1, IX);
lock_conflict(1, 1, S);
lock_conflict(2, 1, X);
unlock_all(1);
unlock_all(2);
}
pthread_attr_t rt_attr;
pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
int thread_number= 0, timeouts= 0;
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
pthread_t *threads;
ulonglong now= my_getsystime();
int i;
thread_number= timeouts= 0;
litmus= 0;
threads= (pthread_t *)my_malloc(sizeof(void *)*n, MYF(0));
if (!threads)
{
diag("Out of memory");
abort();
}
diag("Running %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads= n ; n ; n--)
if (pthread_create(&t, &rt_attr, handler, &m))
rt_num_threads= n;
for (i= 0; i < n ; i++)
if (pthread_create(threads+i, 0, handler, &m))
{
diag("Could not create thread");
litmus++;
rt_num_threads--;
abort();
}
pthread_mutex_lock(&rt_mutex);
while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex);
for (i= 0 ; i < n ; i++)
pthread_join(threads[i], 0);
now= my_getsystime()-now;
ok(litmus == 0, "Finished %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
my_free((void*)threads, MYF(0));
}
pthread_mutex_t rt_mutex;
int Nrows= 100;
int Ntables= 10;
int table_lock_ratio= 10;
......@@ -222,10 +233,7 @@ pthread_handler_t test_lockman(void *arg)
rt_num_threads--;
timeouts+= timeout;
if (!rt_num_threads)
{
pthread_cond_signal(&rt_cond);
diag("number of timeouts: %d", timeouts);
}
pthread_mutex_unlock(&rt_mutex);
return 0;
......@@ -236,16 +244,13 @@ int main()
int i;
my_init();
pthread_mutex_init(&rt_mutex, 0);
plan(31);
plan(35);
if (my_atomic_initialize())
return exit_status();
pthread_attr_init(&rt_attr);
pthread_attr_setdetachstate(&rt_attr, PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&rt_mutex, 0);
pthread_cond_init(&rt_cond, 0);
lockman_init(&lockman, &loid2lo, 50);
......@@ -290,12 +295,10 @@ int main()
ulonglong now= my_getsystime();
lockman_destroy(&lockman);
now= my_getsystime()-now;
diag("lockman_destroy: %g", ((double)now)/1e7);
diag("lockman_destroy: %g secs", ((double)now)/1e7);
}
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
pthread_attr_destroy(&rt_attr);
my_end(0);
return exit_status();
}
......
......@@ -22,9 +22,7 @@
#include <lf.h>
#include "../trnman.h"
pthread_attr_t rt_attr;
pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
......@@ -74,8 +72,6 @@ end:
}
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
if (!rt_num_threads)
pthread_cond_signal(&rt_cond);
pthread_mutex_unlock(&rt_mutex);
return 0;
......@@ -84,25 +80,32 @@ end:
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
pthread_t *threads;
ulonglong now= my_getsystime();
int i;
litmus= 0;
threads= (pthread_t *)my_malloc(sizeof(void *)*n, MYF(0));
if (!threads)
{
diag("Out of memory");
abort();
}
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads= n ; n ; n--)
if (pthread_create(&t, &rt_attr, handler, &m))
rt_num_threads= n;
for (i= 0; i < n ; i++)
if (pthread_create(threads+i, 0, handler, &m))
{
diag("Could not create thread");
litmus++;
rt_num_threads--;
abort();
}
pthread_mutex_lock(&rt_mutex);
while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex);
for (i= 0 ; i < n ; i++)
pthread_join(threads[i], 0);
now= my_getsystime()-now;
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
ok(litmus == 0, "Tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
my_free((void*)threads, MYF(0));
}
#define ok_read_from(T1, T2, RES) \
......@@ -157,10 +160,7 @@ int main()
if (my_atomic_initialize())
return exit_status();
pthread_attr_init(&rt_attr);
pthread_attr_setdetachstate(&rt_attr, PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&rt_mutex, 0);
pthread_cond_init(&rt_cond, 0);
#define CYCLES 10000
#define THREADS 10
......@@ -179,8 +179,6 @@ int main()
}
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
pthread_attr_destroy(&rt_attr);
my_end(0);
return exit_status();
}
......
......@@ -21,24 +21,19 @@
#include <my_atomic.h>
#include <lf.h>
volatile uint32 a32,b32,c32;
volatile uint32 a32,b32,c32, N;
my_atomic_rwlock_t rwl;
LF_ALLOCATOR lf_allocator;
LF_HASH lf_hash;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
int N;
/* add and sub a random number in a loop. Must get 0 at the end */
pthread_handler_t test_atomic_add_handler(void *arg)
{
int m=(*(int *)arg)/2;
int m= (*(int *)arg)/2;
int32 x;
for (x=((int)(intptr)(&m)); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
x=(x*m+0x87654321) & INT_MAX32;
x= (x*m+0x87654321) & INT_MAX32;
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x);
my_atomic_rwlock_wrunlock(&rwl);
......@@ -47,10 +42,6 @@ pthread_handler_t test_atomic_add_handler(void *arg)
my_atomic_add32(&a32, -x);
my_atomic_rwlock_wrunlock(&rwl);
}
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
......@@ -62,24 +53,24 @@ pthread_handler_t test_atomic_add_handler(void *arg)
5. subtract result from a32
must get 0 in a32 at the end
*/
pthread_handler_t test_atomic_swap_handler(void *arg)
pthread_handler_t test_atomic_fas_handler(void *arg)
{
int m=*(int *)arg;
uint32 x=my_atomic_add32(&b32, 1);
int m= *(int *)arg;
uint32 x= my_atomic_add32(&b32, 1);
my_atomic_add32(&a32, x);
for (; m ; m--)
{
my_atomic_rwlock_wrlock(&rwl);
x=my_atomic_swap32(&c32, x);
x= my_atomic_fas32(&c32, x);
my_atomic_rwlock_wrunlock(&rwl);
}
if (!x)
{
my_atomic_rwlock_wrlock(&rwl);
x=my_atomic_swap32(&c32, x);
x= my_atomic_fas32(&c32, x);
my_atomic_rwlock_wrunlock(&rwl);
}
......@@ -87,10 +78,6 @@ pthread_handler_t test_atomic_swap_handler(void *arg)
my_atomic_add32(&a32, -x);
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
......@@ -101,29 +88,25 @@ pthread_handler_t test_atomic_swap_handler(void *arg)
*/
pthread_handler_t test_atomic_cas_handler(void *arg)
{
int m=(*(int *)arg)/2, ok=0;
int m= (*(int *)arg)/2, ok= 0;
int32 x, y;
for (x=((int)(intptr)(&m)); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
my_atomic_rwlock_wrlock(&rwl);
y=my_atomic_load32(&a32);
y= my_atomic_load32(&a32);
my_atomic_rwlock_wrunlock(&rwl);
x=(x*m+0x87654321) & INT_MAX32;
x= (x*m+0x87654321) & INT_MAX32;
do {
my_atomic_rwlock_wrlock(&rwl);
ok=my_atomic_cas32(&a32, &y, y+x);
ok= my_atomic_cas32(&a32, &y, y+x);
my_atomic_rwlock_wrunlock(&rwl);
} while (!ok) ;
do {
my_atomic_rwlock_wrlock(&rwl);
ok=my_atomic_cas32(&a32, &y, y-x);
ok= my_atomic_cas32(&a32, &y, y-x);
my_atomic_rwlock_wrunlock(&rwl);
} while (!ok) ;
}
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
......@@ -132,23 +115,18 @@ pthread_handler_t test_atomic_cas_handler(void *arg)
*/
pthread_handler_t test_lf_pinbox(void *arg)
{
int m=*(int *)arg;
int32 x=0;
int m= *(int *)arg;
int32 x= 0;
LF_PINS *pins;
pins=lf_pinbox_get_pins(&lf_allocator.pinbox);
pins= lf_pinbox_get_pins(&lf_allocator.pinbox);
for (x=((int)(intptr)(&m)); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
lf_pinbox_put_pins(pins);
pins=lf_pinbox_get_pins(&lf_allocator.pinbox);
pins= lf_pinbox_get_pins(&lf_allocator.pinbox);
}
lf_pinbox_put_pins(pins);
pthread_mutex_lock(&mutex);
N--;
if (!N)
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
......@@ -159,122 +137,123 @@ typedef union {
pthread_handler_t test_lf_alloc(void *arg)
{
int m=(*(int *)arg)/2;
int32 x,y=0;
int m= (*(int *)arg)/2;
int32 x,y= 0;
LF_PINS *pins;
pins=lf_alloc_get_pins(&lf_allocator);
pins= lf_alloc_get_pins(&lf_allocator);
for (x=((int)(intptr)(&m)); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
TLA *node1, *node2;
x=(x*m+0x87654321) & INT_MAX32;
node1=(TLA *)lf_alloc_new(pins);
node1->data=x;
y+=node1->data;
node1->data=0;
node2=(TLA *)lf_alloc_new(pins);
node2->data=x;
y-=node2->data;
node2->data=0;
x= (x*m+0x87654321) & INT_MAX32;
node1= (TLA *)lf_alloc_new(pins);
node1->data= x;
y+= node1->data;
node1->data= 0;
node2= (TLA *)lf_alloc_new(pins);
node2->data= x;
y-= node2->data;
node2->data= 0;
lf_alloc_free(pins, node1);
lf_alloc_free(pins, node2);
}
lf_alloc_put_pins(pins);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, y);
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
N--;
if (!N)
if (my_atomic_add32(&N, -1) == 1)
{
diag("%d mallocs, %d pins in stack",
lf_allocator.mallocs, lf_allocator.pinbox.pins_in_stack);
#ifdef MY_LF_EXTRA_DEBUG
a32|=lf_allocator.mallocs - lf_alloc_in_pool(&lf_allocator);
a32|= lf_allocator.mallocs - lf_alloc_in_pool(&lf_allocator);
#endif
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
my_atomic_rwlock_wrunlock(&rwl);
return 0;
}
#define N_TLH 1000
pthread_handler_t test_lf_hash(void *arg)
{
int m=(*(int *)arg)/(2*N_TLH);
int32 x,y,z,sum=0, ins=0;
int m= (*(int *)arg)/(2*N_TLH);
int32 x,y,z,sum= 0, ins= 0;
LF_PINS *pins;
pins=lf_hash_get_pins(&lf_hash);
pins= lf_hash_get_pins(&lf_hash);
for (x=((int)(intptr)(&m)); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
int i;
y=x;
for (i=0; i < N_TLH; i++)
y= x;
for (i= 0; i < N_TLH; i++)
{
x=(x*(m+i)+0x87654321) & INT_MAX32;
z=(x<0) ? -x : x;
x= (x*(m+i)+0x87654321) & INT_MAX32;
z= (x<0) ? -x : x;
if (lf_hash_insert(&lf_hash, pins, &z))
{
sum+=z;
sum+= z;
ins++;
}
}
for (i=0; i < N_TLH; i++)
for (i= 0; i < N_TLH; i++)
{
y=(y*(m+i)+0x87654321) & INT_MAX32;
z=(y<0) ? -y : y;
y= (y*(m+i)+0x87654321) & INT_MAX32;
z= (y<0) ? -y : y;
if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z)))
sum-=z;
sum-= z;
}
}
lf_hash_put_pins(pins);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, sum);
my_atomic_add32(&b32, ins);
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
N--;
if (!N)
if (my_atomic_add32(&N, -1) == 1)
{
diag("%d mallocs, %d pins in stack, %d hash size, %d inserts",
lf_hash.alloc.mallocs, lf_hash.alloc.pinbox.pins_in_stack,
lf_hash.size, b32);
a32|=lf_hash.count;
pthread_cond_signal(&cond);
a32|= lf_hash.count;
}
pthread_mutex_unlock(&mutex);
my_atomic_rwlock_wrunlock(&rwl);
return 0;
}
void test_atomic(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now=my_getsystime();
pthread_t *threads;
ulonglong now= my_getsystime();
int i;
a32= 0;
b32= 0;
c32= 0;
threads= (pthread_t *)my_malloc(sizeof(void *)*n, MYF(0));
if (!threads)
{
diag("Out of memory");
abort();
}
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (N=n ; n ; n--)
N= n;
for (i= 0 ; i < n ; i++)
{
if (pthread_create(&t, &thr_attr, handler, &m) != 0)
if (pthread_create(threads+i, 0, handler, &m) != 0)
{
diag("Could not create thread");
a32= 1;
goto err;
abort();
}
}
pthread_mutex_lock(&mutex);
while (N)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now=my_getsystime()-now;
for (i= 0 ; i < n ; i++)
pthread_join(threads[i], 0);
now= my_getsystime()-now;
err:
ok(a32 == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, a32);
my_free((void *)threads, MYF(0));
}
int main()
......@@ -289,10 +268,6 @@ int main()
plan(7);
ok(err == 0, "my_atomic_initialize() returned %d", err);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
my_atomic_rwlock_init(&rwl);
lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used));
lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
......@@ -306,18 +281,15 @@ int main()
#define THREADS 100
test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS,CYCLES);
test_atomic("my_atomic_swap32", test_atomic_swap_handler, THREADS,CYCLES);
test_atomic("my_atomic_fas32", test_atomic_fas_handler, THREADS,CYCLES);
test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS,CYCLES);
test_atomic("lf_pinbox", test_lf_pinbox, THREADS,CYCLES);
test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES/10);
lf_hash_destroy(&lf_hash);
lf_alloc_destroy(&lf_allocator);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr);
my_atomic_rwlock_destroy(&rwl);
my_end(0);
return exit_status();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment