Commit 7ca33ae5 authored by unknown's avatar unknown

comments, minor changes

---
comments


mysys/lf_alloc-pin.c:
  comments
mysys/lf_dynarray.c:
  comments
mysys/lf_hash.c:
  comments, charset-aware comparison
storage/maria/trnman.c:
  comments
storage/maria/unittest/lockman-t.c:
  test case for a bug
unittest/mysys/my_atomic-t.c:
  removed mistakenly copied line
parent ce707d9f
// TODO multi-pinbox
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
......@@ -17,24 +18,25 @@
/*
wait-free concurrent allocator based on pinning addresses
It works as follows: every thread (strictly speaking - every CPU, but it's
too difficult to do) has a small array of pointers. They're called "pins".
Before using an object its address must be stored in this array (pinned).
When an object is no longer necessary its address must be removed from
this array (unpinned). When a thread wants to free() an object it
scans all pins of all threads to see if somebody has this object pinned.
If yes - the object is not freed (but stored in a purgatory).
To reduce the cost of a single free() pins are not scanned on every free()
but only added to (thread-local) purgatory. On every LF_PURGATORY_SIZE
free() purgatory is scanned and all unpinned objects are freed.
It works as follows: every thread (strictly speaking - every CPU, but
it's too difficult to do) has a small array of pointers. They're called
"pins". Before using an object its address must be stored in this array
(pinned). When an object is no longer necessary its address must be
removed from this array (unpinned). When a thread wants to free() an
object it scans all pins of all threads to see if somebody has this
object pinned. If yes - the object is not freed (but stored in a
"purgatory"). To reduce the cost of a single free() pins are not scanned
on every free() but only added to (thread-local) purgatory. On every
LF_PURGATORY_SIZE free() purgatory is scanned and all unpinned objects
are freed.
Pins are used to solve ABA problem. To use pins one must obey
a pinning protocol:
1. Let's assume that PTR is a shared pointer to an object. Shared means
that any thread may modify it anytime to point to a different object and
free the old object. Later the freed object may be potentially allocated
by another thread. If we're unlucky that another thread may set PTR to
point to this object again. This is ABA problem.
that any thread may modify it anytime to point to a different object
and free the old object. Later the freed object may be potentially
allocated by another thread. If we're unlucky that another thread may
set PTR to point to this object again. This is ABA problem.
2. Create a local pointer LOCAL_PTR.
3. Pin the PTR in a loop:
do
......@@ -42,31 +44,31 @@
LOCAL_PTR= PTR;
pin(PTR, PIN_NUMBER);
} while (LOCAL_PTR != PTR)
4. It is guaranteed that after the loop is ended, LOCAL_PTR
4. It is guaranteed that after the loop has ended, LOCAL_PTR
points to an object (or NULL, if PTR may be NULL), that
will never be freed. It is not guaranteed though
that LOCAL_PTR == PTR
that LOCAL_PTR == PTR (as PTR can change any time)
5. When done working with the object, remove the pin:
unpin(PIN_NUMBER)
6. When copying pins (as in the list:
6. When copying pins (as in the list traversing loop:
pin(CUR, 1);
while ()
{
pin(CUR, 0);
do
{
NEXT=CUR->next;
pin(NEXT, 1);
} while (NEXT != CUR->next);
do // standard
{ // pinning
NEXT=CUR->next; // loop
pin(NEXT, 0); // see #3
} while (NEXT != CUR->next); // above
...
...
pin(CUR, 1);
CUR=NEXT;
pin(CUR, 1); // copy pin[0] to pin[1]
}
which keeps CUR address constantly pinned), note than pins may be copied
only upwards (!!!), that is pin N to pin M > N.
7. Don't keep the object pinned longer than necessary - the number of pins
you have is limited (and small), keeping an object pinned prevents its
reuse and cause unnecessary mallocs.
which keeps CUR address constantly pinned), note than pins may be
copied only upwards (!!!), that is pin[N] to pin[M], M > N.
7. Don't keep the object pinned longer than necessary - the number of
pins you have is limited (and small), keeping an object pinned
prevents its reuse and cause unnecessary mallocs.
Implementation details:
Pins are given away from a "pinbox". Pinbox is stack-based allocator.
......@@ -85,7 +87,7 @@
static void _lf_pinbox_real_free(LF_PINS *pins);
/*
Initialize a pinbox. Must be usually called from lf_alloc_init.
Initialize a pinbox. Normally called from lf_alloc_init.
See the latter for details.
*/
void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
......@@ -214,9 +216,9 @@ static int ptr_cmp(void **a, void **b)
*/
void _lf_pinbox_free(LF_PINS *pins, void *addr)
{
add_to_purgatory(pins, addr);
if (pins->purgatory_count % LF_PURGATORY_SIZE)
_lf_pinbox_real_free(pins);
add_to_purgatory(pins, addr);
}
struct st_harvester {
......
......@@ -26,10 +26,15 @@
Every element is aligned to sizeof(element) boundary
(to avoid false sharing if element is big enough).
LF_DYNARRAY is a recursive structure. On the zero level
LF_DYNARRAY::level[0] it's an array of LF_DYNARRAY_LEVEL_LENGTH elements,
on the first level it's an array of LF_DYNARRAY_LEVEL_LENGTH pointers
to arrays of elements, on the second level it's an array of pointers
to arrays of pointers to arrays of elements. And so on.
Actually, it's wait-free, not lock-free ;-)
*/
#undef DBUG_OFF
#include <my_global.h>
#include <strings.h>
#include <my_sys.h>
......@@ -75,6 +80,10 @@ static const int dynarray_idxes_in_prev_level[LF_DYNARRAY_LEVELS]=
LF_DYNARRAY_LEVEL_LENGTH
};
/*
Returns a valid lvalue pointer to the element number 'idx'.
Allocates memory if necessary.
*/
void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr= 0;
......@@ -123,6 +132,10 @@ void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
return ptr + array->size_of_element * idx;
}
/*
Returns a pointer to the element number 'idx'
or NULL if an element does not exists
*/
void *_lf_dynarray_value(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr= 0;
......@@ -157,6 +170,16 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
return 0;
}
/*
Calls func(array, arg) on every array of LF_DYNARRAY_LEVEL_LENGTH elements
in lf_dynarray.
DESCRIPTION
lf_dynarray consists of a set of arrays, LF_DYNARRAY_LEVEL_LENGTH elements
each. _lf_dynarray_iterate() calls user-supplied function on every array
from the set. It is the fastest way to scan the array, faster than
for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); }
*/
int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
{
int i, res;
......
......@@ -29,22 +29,35 @@
LF_REQUIRE_PINS(3);
/* An element of the list */
typedef struct {
intptr volatile link;
uint32 hashnr;
intptr volatile link; /* a pointer to the next element in a listand a flag */
uint32 hashnr; /* reversed hash number, for sorting */
const uchar *key;
uint keylen;
} LF_SLIST;
/*
a structure to pass the context (pointers two the three successive elements
in a list) from lfind to linsert/ldelete
*/
typedef struct {
intptr volatile *prev;
LF_SLIST *curr, *next;
} CURSOR;
/*
the last bit in LF_SLIST::link is a "deleted" flag.
the helper macros below convert it to a pure pointer or a pure flag
*/
#define PTR(V) (LF_SLIST *)((V) & (~(intptr)1))
#define DELETED(V) ((V) & 1)
/*
DESCRIPTION
Search for hashnr/key/keylen in the list starting from 'head' and
position the cursor. The list is ORDER BY hashnr, key
RETURN
0 - not found
1 - found
......@@ -53,7 +66,7 @@ typedef struct {
cursor is positioned in either case
pins[0..2] are used, they are NOT removed on return
*/
static int lfind(LF_SLIST * volatile *head, uint32 hashnr,
static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
const uchar *key, uint keylen, CURSOR *cursor, LF_PINS *pins)
{
uint32 cur_hashnr;
......@@ -89,7 +102,8 @@ retry:
if (cur_hashnr >= hashnr)
{
int r=1;
if (cur_hashnr > hashnr || (r=memcmp(cur_key, key, keylen)) >= 0)
if (cur_hashnr > hashnr ||
(r=my_strnncoll(cs, cur_key, cur_keylen, key, keylen)) >= 0)
return !r;
}
cursor->prev=&(cursor->curr->link);
......@@ -112,22 +126,26 @@ retry:
}
/*
DESCRIPTION
insert a 'node' in the list that starts from 'head' in the correct
position (as found by lfind)
RETURN
0 - inserted
not 0 - a pointer to a conflict (not pinned and thus unusable)
not 0 - a pointer to a duplicate (not pinned and thus unusable)
NOTE
it uses pins[0..2], on return all pins are removed.
*/
static LF_SLIST *linsert(LF_SLIST * volatile *head, LF_SLIST *node,
LF_PINS *pins, uint flags)
static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
LF_SLIST *node, LF_PINS *pins, uint flags)
{
CURSOR cursor;
int res=-1;
do
{
if (lfind(head, node->hashnr, node->key, node->keylen,
if (lfind(head, cs, node->hashnr, node->key, node->keylen,
&cursor, pins) &&
(flags & LF_HASH_UNIQUE))
res=0; /* duplicate found */
......@@ -147,13 +165,18 @@ static LF_SLIST *linsert(LF_SLIST * volatile *head, LF_SLIST *node,
}
/*
DESCRIPTION
deletes a node as identified by hashnr/keey/keylen from the list
that starts from 'head'
RETURN
0 - ok
1 - not found
NOTE
it uses pins[0..2], on return all pins are removed.
*/
static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
const uchar *key, uint keylen, LF_PINS *pins)
{
CURSOR cursor;
......@@ -161,7 +184,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
do
{
if (!lfind(head, hashnr, key, keylen, &cursor, pins))
if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins))
res= 1;
else
if (my_atomic_casptr((void **)&(cursor.curr->link),
......@@ -171,7 +194,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
(void **)&cursor.curr, cursor.next))
_lf_alloc_free(pins, cursor.curr);
else
lfind(head, hashnr, key, keylen, &cursor, pins);
lfind(head, cs, hashnr, key, keylen, &cursor, pins);
res= 0;
}
} while (res == -1);
......@@ -182,18 +205,24 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
}
/*
DESCRIPTION
searches for a node as identified by hashnr/keey/keylen in the list
that starts from 'head'
RETURN
0 - not found
node - found
NOTE
it uses pins[0..2], on return the pin[2] keeps the node found
all other pins are removed.
*/
static LF_SLIST *lsearch(LF_SLIST * volatile *head, uint32 hashnr,
const uchar *key, uint keylen, LF_PINS *pins)
static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
uint32 hashnr, const uchar *key, uint keylen,
LF_PINS *pins)
{
CURSOR cursor;
int res=lfind(head, hashnr, key, keylen, &cursor, pins);
int res=lfind(head, cs, hashnr, key, keylen, &cursor, pins);
if (res) _lf_pin(pins, 2, cursor.curr);
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
......@@ -219,6 +248,9 @@ static inline uint calc_hash(LF_HASH *hash, const uchar *key, uint keylen)
#define MAX_LOAD 1.0
static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
/*
Initializes lf_hash, the arguments are compatible with hash_init
*/
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset)
......@@ -254,9 +286,14 @@ void lf_hash_destroy(LF_HASH *hash)
}
/*
DESCRIPTION
inserts a new element to a hash. it will have a _copy_ of
data, not a pointer to it.
RETURN
0 - inserted
1 - didn't (unique key conflict)
NOTE
see linsert() for pin usage notes
*/
......@@ -275,7 +312,7 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
node->hashnr=my_reverse_bits(hashnr) | 1;
if (linsert(el, node, pins, hash->flags))
if (linsert(el, hash->charset, node, pins, hash->flags))
{
_lf_alloc_free(pins, node);
lf_rwunlock_by_pins(pins);
......@@ -305,7 +342,8 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
if (ldelete(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins))
if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
(uchar *)key, keylen, pins))
{
lf_rwunlock_by_pins(pins);
return 1;
......@@ -329,7 +367,8 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
found= lsearch(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins);
found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
(uchar *)key, keylen, pins);
lf_rwunlock_by_pins(pins);
return found ? found+1 : 0;
}
......@@ -348,7 +387,7 @@ static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
dummy->hashnr=my_reverse_bits(bucket);
dummy->key=dummy_key;
dummy->keylen=0;
if ((cur= linsert(el, dummy, pins, 0)))
if ((cur= linsert(el, hash->charset, dummy, pins, 0)))
{
my_free((void *)dummy, MYF(0));
dummy= cur;
......
......@@ -198,7 +198,10 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
/* Allocating a new TRN structure */
trn= pool;
/* Popping an unused TRN from the pool */
/*
Popping an unused TRN from the pool
(ABA isn't possible, we're behind a mutex
*/
my_atomic_rwlock_wrlock(&LOCK_pool);
while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn,
(void *)trn->next))
......@@ -328,6 +331,12 @@ void trnman_end_trn(TRN *trn, my_bool commit)
my_atomic_storeptr((void **)&short_trid_to_active_trn[trn->short_id], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
/*
we, under the mutex, removed going-in-free_me transactions from the
active and committed lists, thus nobody else may see them when it scans
those lists, and thus nobody may want to free them. Now we don't
need a mutex to access free_me list
*/
while (free_me) // XXX send them to the purge thread
{
TRN *t= free_me;
......
......@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#undef EXTRA_VERBOSE
//#define EXTRA_VERBOSE
#include <tap.h>
......@@ -46,7 +46,7 @@ LOCK_OWNER *loid2lo(uint16 loid)
lockman_release_locks(&lockman, loid2lo(O));print_lockhash(&lockman)
#define test_lock(O, R, L, S, RES) \
ok(lockman_getlock(&lockman, loid2lo(O), R, L) == RES, \
"lo" #O "> " S " lock resource " #R " with " #L "-lock"); \
"lo" #O "> " S "lock resource " #R " with " #L "-lock"); \
print_lockhash(&lockman)
#define lock_ok_a(O, R, L) \
test_lock(O, R, L, "", GOT_THE_LOCK)
......@@ -107,6 +107,12 @@ void test_lockman_simple()
unlock_all(3);
unlock_all(4);
lock_ok_i(1, 1, IX);
lock_ok_i(2, 1, IX);
lock_conflict(1, 1, S);
lock_conflict(2, 1, X);
unlock_all(1);
unlock_all(2);
}
int rt_num_threads;
......@@ -240,7 +246,7 @@ int main()
my_init();
pthread_mutex_init(&rt_mutex, 0);
plan(31);
plan(35);
if (my_atomic_initialize())
return exit_status();
......@@ -289,7 +295,7 @@ int main()
ulonglong now= my_getsystime();
lockman_destroy(&lockman);
now= my_getsystime()-now;
diag("lockman_destroy: %g", ((double)now)/1e7);
diag("lockman_destroy: %g secs", ((double)now)/1e7);
}
pthread_mutex_destroy(&rt_mutex);
......
......@@ -209,7 +209,6 @@ pthread_handler_t test_lf_hash(void *arg)
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, sum);
my_atomic_add32(&b32, ins);
my_atomic_add32(&a32, y);
if (my_atomic_add32(&N, -1) == 1)
{
......@@ -281,12 +280,12 @@ int main()
#endif
#define THREADS 100
test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS,CYCLES);
test_atomic("my_atomic_fas32", test_atomic_fas_handler, THREADS,CYCLES);
test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS,CYCLES);
test_atomic("lf_pinbox", test_lf_pinbox, THREADS,CYCLES);
test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES);
test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS,CYCLES);
test_atomic("my_atomic_fas32", test_atomic_fas_handler, THREADS,CYCLES);
test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS,CYCLES);
test_atomic("lf_pinbox", test_lf_pinbox, THREADS,CYCLES);
test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES/10);
lf_hash_destroy(&lf_hash);
lf_alloc_destroy(&lf_allocator);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment