Commit 12a55aea authored by unknown

lock manager passed unit tests


storage/maria/trnman.c:
  comments
include/my_dbug.h:
  make DBUG_ASSERT always a statement
storage/maria/lockman.h:
  comments
include/lf.h:
  lf_pinbox - don't use a fixed-size purgatory.
mysys/lf_alloc-pin.c:
  lf_pinbox - don't use a fixed-size purgatory.
mysys/lf_hash.c:
  lf_pinbox - don't use a fixed-size purgatory.
storage/maria/lockman.c:
  removed IGNORE_ME/UPGRADED matching - it was wrong in the first place.
  updated for "lf_pinbox - don't use a fixed-size purgatory"
storage/maria/unittest/lockman-t.c:
  IGNORE_ME/UPGRADED pair counting bugtest.
  more tests
unittest/mysys/my_atomic-t.c:
  lf_pinbox - don't use a fixed-size purgatory.
parent 67a2b7cf
......@@ -96,20 +96,21 @@ typedef void lf_pinbox_free_func(void *, void *);
typedef struct {
LF_DYNARRAY pinstack;
lf_pinbox_free_func *free_func;
void * free_func_arg;
void *free_func_arg;
uint free_ptr_offset;
uint32 volatile pinstack_top_ver; /* this is a versioned pointer */
uint32 volatile pins_in_stack; /* number of elements in array */
} LF_PINBOX;
/* we want sizeof(LF_PINS) to be close to 128 to avoid false sharing */
/* we want sizeof(LF_PINS) to be 128 to avoid false sharing */
typedef struct {
void * volatile pin[LF_PINBOX_PINS];
void * purgatory[LF_PURGATORY_SIZE];
LF_PINBOX *pinbox;
void *purgatory;
uint32 purgatory_count;
uint32 volatile link;
char pad[128-sizeof(uint32)*2
-sizeof(void *)*(LF_PINBOX_PINS+LF_PURGATORY_SIZE+1)];
-sizeof(void *)*(LF_PINBOX_PINS+2)];
} LF_PINS;
#define lf_rwlock_by_pins(PINS) \
......@@ -147,8 +148,8 @@ typedef struct {
#define _lf_assert_pin(PINS, PIN) assert((PINS)->pin[PIN] != 0)
#define _lf_assert_unpin(PINS, PIN) assert((PINS)->pin[PIN]==0)
void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
void * free_func_arg);
void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
lf_pinbox_free_func *free_func, void * free_func_arg);
void lf_pinbox_destroy(LF_PINBOX *pinbox);
lock_wrap(lf_pinbox_get_pins, LF_PINS *,
......@@ -181,7 +182,7 @@ typedef struct st_lf_allocator {
uint32 volatile mallocs;
} LF_ALLOCATOR;
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size);
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset);
void lf_alloc_destroy(LF_ALLOCATOR *allocator);
uint lf_alloc_in_pool(LF_ALLOCATOR *allocator);
#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR))
......
......@@ -100,7 +100,7 @@ extern FILE *_db_fp_(void);
#define DBUG_LONGJMP(a1) longjmp(a1)
#define DBUG_DUMP(keyword,a1,a2)
#define DBUG_END()
#define DBUG_ASSERT(A)
#define DBUG_ASSERT(A) do { } while(0)
#define DBUG_LOCK_FILE
#define DBUG_FILE (stderr)
#define DBUG_UNLOCK_FILE
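The do { } while(0) form matters because the old empty expansion left constructs like `if (cond) DBUG_ASSERT(x);` with an empty if-body in non-debug builds, which many compilers warn about and which makes debug and non-debug builds parse differently. A minimal illustration (variable names are hypothetical):

if (buf == NULL)
  DBUG_ASSERT(error_reported);  /* now a real (no-op) statement in non-debug
                                   builds too, and the trailing semicolon is
                                   required in both builds */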
......
......@@ -15,21 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
concurrent allocator based on pinning addresses
strictly speaking it's not lock-free, as it can be blocked
if a thread's purgatory is full and all addresses from there
are pinned.
But until the above happens, it's wait-free.
It can be made strictly wait-free by increasing the purgatory size.
If it's larger than pins_in_stack*LF_PINBOX_PINS, then the apocalyptic
condition above will never happen. But then the memory requirements
will be O(pins_in_stack^2).
Note that for large purgatory sizes it makes sense to remove the
purgatory array and link objects in a list using an embedded pointer.
wait-free concurrent allocator based on pinning addresses
TODO test with more than 256 threads
TODO test w/o alloca
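For orientation, this is the pin-validate-retry pattern everything below relies on, lifted from _lf_alloc_new() further down in this file (a reader's sketch with added comments, not new API):

do
{
  node= allocator->top;                 /* read the shared top pointer */
  _lf_pin(pins, 0, node);               /* publish the address we are about to use */
} while (node != allocator->top && LF_BACKOFF); /* if top changed, node may have
                                                   been freed before our pin was
                                                   visible - retry */
/* node is now safe to dereference until _lf_unpin(pins, 0) */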
......@@ -43,15 +29,17 @@
static void _lf_pinbox_real_free(LF_PINS *pins);
void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
void *free_func_arg)
void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
lf_pinbox_free_func *free_func,void *free_func_arg)
{
DBUG_ASSERT(sizeof(LF_PINS) == 128);
DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0);
lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS));
pinbox->pinstack_top_ver=0;
pinbox->pins_in_stack=0;
pinbox->free_func=free_func;
pinbox->free_func_arg=free_func_arg;
pinbox->pinstack_top_ver= 0;
pinbox->pins_in_stack= 0;
pinbox->free_ptr_offset= free_ptr_offset;
pinbox->free_func= free_func;
pinbox->free_func_arg= free_func_arg;
}
void lf_pinbox_destroy(LF_PINBOX *pinbox)
......@@ -64,58 +52,64 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
uint32 pins, next, top_ver;
LF_PINS *el;
top_ver=pinbox->pinstack_top_ver;
top_ver= pinbox->pinstack_top_ver;
do
{
if (!(pins=top_ver % LF_PINBOX_MAX_PINS))
if (!(pins= top_ver % LF_PINBOX_MAX_PINS))
{
pins=my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
el=(LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
break;
}
el=(LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
next=el->link;
el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
next= el->link;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins+next+LF_PINBOX_MAX_PINS));
el->link=pins;
el->purgatory_count=0;
el->pinbox=pinbox;
el->link= pins;
el->purgatory_count= 0;
el->pinbox= pinbox;
return el;
}
void _lf_pinbox_put_pins(LF_PINS *pins)
{
LF_PINBOX *pinbox=pins->pinbox;
LF_PINBOX *pinbox= pins->pinbox;
uint32 top_ver, nr;
nr=pins->link;
nr= pins->link;
#ifdef MY_LF_EXTRA_DEBUG
{
int i;
for (i=0; i < LF_PINBOX_PINS; i++)
for (i= 0; i < LF_PINBOX_PINS; i++)
assert(pins->pin[i] == 0);
}
#endif
/*
Note: this will deadlock if other threads are waiting for the caller
to do something after _lf_pinbox_put_pins(), while they hold pinned
addresses that the caller wants to free.
Thus: only free pins when all work is done and nobody can be waiting for you!!!
*/
while (pins->purgatory_count)
{
_lf_pinbox_real_free(pins);
if (pins->purgatory_count && my_getncpus() == 1)
if (pins->purgatory_count)
{
my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
pthread_yield();
my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
}
}
top_ver=pinbox->pinstack_top_ver;
top_ver= pinbox->pinstack_top_ver;
if (nr == pinbox->pins_in_stack)
{
int32 tmp=nr;
int32 tmp= nr;
if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1))
goto ret;
}
do
{
pins->link=top_ver % LF_PINBOX_MAX_PINS;
pins->link= top_ver % LF_PINBOX_MAX_PINS;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins->link+nr+LF_PINBOX_MAX_PINS));
ret:
......@@ -127,19 +121,20 @@ static int ptr_cmp(void **a, void **b)
return *a < *b ? -1 : *a == *b ? 0 : 1;
}
#define add_to_purgatory(PINS, ADDR) \
do \
{ \
*(void **)((char *)(ADDR)+(PINS)->pinbox->free_ptr_offset)= \
(PINS)->purgatory; \
(PINS)->purgatory= (ADDR); \
(PINS)->purgatory_count++; \
} while (0)
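What free_ptr_offset means for a client of the allocator, in a hedged sketch (MY_ELEM and its fields are hypothetical): every element must contain a pointer-sized, pointer-aligned area that may be overwritten once the element is handed to _lf_pinbox_free(); add_to_purgatory() above chains freed elements through it. lf_hash.c, for instance, passes offsetof(LF_SLIST, key), since the key bytes are dead data by the time a node is freed.

typedef struct {
  int   payload;      /* live data */
  void *free_link;    /* reused as the purgatory link after the element is freed */
} MY_ELEM;

lf_alloc_init(&allocator, sizeof(MY_ELEM), offsetof(MY_ELEM, free_link));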
void _lf_pinbox_free(LF_PINS *pins, void *addr)
{
while (pins->purgatory_count == LF_PURGATORY_SIZE)
{
if (pins->purgatory_count % LF_PURGATORY_SIZE)
_lf_pinbox_real_free(pins);
if (pins->purgatory_count == LF_PURGATORY_SIZE && my_getncpus() == 1)
{
my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
pthread_yield();
my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
}
}
pins->purgatory[pins->purgatory_count++]=addr;
add_to_purgatory(pins, addr);
}
struct st_harvester {
......@@ -178,11 +173,11 @@ static int match_pins(LF_PINS *el, void *addr)
static void _lf_pinbox_real_free(LF_PINS *pins)
{
int npins;
void **addr=0;
void **start, **cur, **end=pins->purgatory+pins->purgatory_count;
LF_PINBOX *pinbox=pins->pinbox;
void *list;
void **addr;
LF_PINBOX *pinbox= pins->pinbox;
npins=pinbox->pins_in_stack+1;
npins= pinbox->pins_in_stack+1;
#ifdef HAVE_ALLOCA
/* create a sorted list of pinned addresses, to speed up searches */
......@@ -190,64 +185,64 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
{
struct st_harvester hv;
addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins);
hv.granary=addr;
hv.npins=npins;
hv.granary= addr;
hv.npins= npins;
_lf_dynarray_iterate(&pinbox->pinstack,
(lf_dynarray_func)harvest_pins, &hv);
npins=hv.granary-addr;
npins= hv.granary-addr;
if (npins)
qsort(addr, npins, sizeof(void *), (qsort_cmp)ptr_cmp);
}
else
#endif
addr= 0;
start= cur= pins->purgatory;
end= start+pins->purgatory_count;
for (; cur < end; cur++)
list= pins->purgatory;
pins->purgatory= 0;
pins->purgatory_count= 0;
while (list)
{
void *cur= list;
list= *(void **)((char *)cur+pinbox->free_ptr_offset);
if (npins)
{
if (addr)
{
void **a,**b,**c;
for (a=addr, b=addr+npins-1, c=a+(b-a)/2; b-a>1; c=a+(b-a)/2)
if (*cur == *c)
a=b=c;
else if (*cur > *c)
a=c;
for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2)
if (cur == *c)
a= b= c;
else if (cur > *c)
a= c;
else
b=c;
if (*cur == *a || *cur == *b)
b= c;
if (cur == *a || cur == *b)
goto found;
}
else
{
if (_lf_dynarray_iterate(&pinbox->pinstack,
(lf_dynarray_func)match_pins, *cur))
(lf_dynarray_func)match_pins, cur))
goto found;
}
}
/* not pinned - freeing */
pinbox->free_func(*cur, pinbox->free_func_arg);
pinbox->free_func(cur, pinbox->free_func_arg);
continue;
found:
/* pinned - keeping */
*start++=*cur;
add_to_purgatory(pins, cur);
}
pins->purgatory_count=start-pins->purgatory;
#ifdef MY_LF_EXTRA_DEBUG
while (start < pins->purgatory + LF_PURGATORY_SIZE)
*start++=0;
#endif
}
static void alloc_free(void *node, LF_ALLOCATOR *allocator)
{
void *tmp;
tmp=allocator->top;
tmp= allocator->top;
do
{
(*(void **)node)=tmp;
(*(void **)node)= tmp;
} while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) &&
LF_BACKOFF);
}
......@@ -255,18 +250,18 @@ static void alloc_free(void *node, LF_ALLOCATOR *allocator)
LF_REQUIRE_PINS(1);
void *_lf_alloc_new(LF_PINS *pins)
{
LF_ALLOCATOR *allocator=(LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
void *node;
for (;;)
{
do
{
node=allocator->top;
node= allocator->top;
_lf_pin(pins, 0, node);
} while (node !=allocator->top && LF_BACKOFF);
} while (node != allocator->top && LF_BACKOFF);
if (!node)
{
if (!(node=my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL))))
if (!(node= my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL))))
goto ret;
#ifdef MY_LF_EXTRA_DEBUG
my_atomic_add32(&allocator->mallocs, 1);
......@@ -282,27 +277,27 @@ ret:
return node;
}
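Putting the allocator API together, a hedged lifecycle sketch in the spirit of the unit tests (assuming lf_alloc_get_pins/lf_alloc_new/lf_alloc_free are the usual lf.h wrappers over the underscored functions used here):

LF_PINS *pins= lf_alloc_get_pins(&allocator);  /* once per thread */
void *node= lf_alloc_new(pins);       /* pop from the shared stack, or malloc */
/* ... use node ... */
lf_alloc_free(pins, node);            /* lands in this thread's purgatory */
lf_pinbox_put_pins(pins);             /* drains the purgatory and returns the pins */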
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size)
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
{
lf_pinbox_init(&allocator->pinbox,
lf_pinbox_init(&allocator->pinbox, free_ptr_offset,
(lf_pinbox_free_func *)alloc_free, allocator);
allocator->top=0;
allocator->mallocs=0;
allocator->element_size=size;
allocator->top= 0;
allocator->mallocs= 0;
allocator->element_size= size;
DBUG_ASSERT(size >= (int)sizeof(void *));
}
void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{
void *el=allocator->top;
void *el= allocator->top;
while (el)
{
void *tmp=*(void **)el;
void *tmp= *(void **)el;
my_free(el, MYF(0));
el=tmp;
el= tmp;
}
lf_pinbox_destroy(&allocator->pinbox);
allocator->top=0;
allocator->top= 0;
}
/*
......@@ -313,7 +308,8 @@ uint lf_alloc_in_pool(LF_ALLOCATOR *allocator)
{
uint i;
void *node;
for (node=allocator->top, i=0; node; node=*(void **)node, i++) /* no op */;
for (node= allocator->top, i= 0; node; node= *(void **)node, i++)
/* no op */;
return i;
}
......@@ -20,6 +20,7 @@
TODO
try to get rid of dummy nodes ?
for non-unique hash, count only _distinct_ values
(but how to do it in lf_hash_delete ?)
*/
#include <my_global.h>
#include <my_sys.h>
......@@ -222,7 +223,8 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset)
{
lf_alloc_init(&hash->alloc,sizeof(LF_SLIST)+element_size);
lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
offsetof(LF_SLIST, key));
lf_dynarray_init(&hash->array, sizeof(LF_SLIST **));
hash->size=1;
hash->count=0;
......
// TODO lock escalation, instant duration locks
// TODO instant duration locks
// automatically place S instead of LS if possible
/*
TODO optimization: table locks - they have completely
......@@ -21,6 +21,94 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Generic Lock Manager
The lock manager handles locks on "resources"; a resource must be uniquely
identified by a 64-bit number. The lock manager itself does not imply
anything about the nature of a resource - it can be a row, a table, a
database, or anything at all.
Locks belong to "lock owners". A lock owner is uniquely identified by a
16-bit number. A function loid2lo must be provided by the application;
it takes such a number as an argument and returns the corresponding
LOCK_OWNER structure.
Lock levels are completely defined by three tables. The lock compatibility
matrix specifies which locks can be held at the same time on a resource.
The lock combining matrix specifies which single lock level behaves the
same as a pair of locks of given levels. The getlock_result matrix
simplifies intention locking and lock escalation for an application:
basically, it defines which locks are intention locks and which locks are
"loose" locks. It is only used to provide better diagnostics for the
application; the lock manager itself does not differentiate between
normal, intention, and loose locks.
Internally the lock manager is based on a lock-free hash, see lf_hash.c
for details. All locks are stored in a hash, with a resource id as a search
key, so all locks for the same resource will be considered collisions and
will be put in one (lock-free) linked list. The main lock-handling
logic is in the inner loop that searches for a lock in such a linked
list - lockfind().
This works as follows. Locks are generally added to the end of the list
(with one exception, see below). When scanning the list it is always
possible to determine which locks are granted (active) and which locks are
waiting: the first lock is obviously active, the second is active if it's
compatible with the first, and so on - a lock is active if it's compatible
with all previous locks and all locks before it are also active.
To evaluate "compatible with all previous locks", all previous locks are
accumulated in the prev_lock variable using lock_combining_matrix.
Lock upgrades: when a thread that has a lock on a given resource
requests a new lock on the same resource, and the old lock is not enough
to satisfy the new lock's requirements (which is defined by
lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is
placed in the list. Depending on the other locks it is immediately active,
or it will wait for other locks. Here's the exception to the "locks are
added to the end" rule - upgraded locks are added after the last active
lock but before all waiting locks. The old lock (the one we upgraded from)
is not removed from the list - indeed, we may need to return to it later
if the new lock was taken in a savepoint that gets rolled back. So the old
lock is marked as "ignored" (IGNORE_ME flag), and the new lock gets an
UPGRADED flag.
Loose locks add an important exception to the above: loose locks do not
always commute with other locks. In the list IX-LS both locks are active,
while in the list LS-IX only the first lock is active. This creates a
problem for lock upgrades. If the list was IX-LS and the owner of the
first lock wants to place an LS lock (which can be immediately granted),
the IX lock is upgraded to LSIX and the list becomes IX-LS-LSIX, which,
according to the lock compatibility matrix, means that the last lock is
waiting - of course it all happened only because IX and LS were swapped
and they don't commute. To work around this there's an ACTIVE flag, which
is set in every lock that never waited (was placed active); this flag
overrides the "compatible with all previous locks" rule.
When a lock is placed at the end of the list, one of three things happens.
Either it is compatible with all locks and all of them are active - then
the new lock becomes active at once. Or it conflicts with some lock - then
the conflicting lock is returned in the 'blocker' variable and the calling
thread waits on a pthread condition in the LOCK_OWNER structure of the
owner of the conflicting lock. Or the new lock is compatible with all
locks, but some existing locks are not compatible with the locks before
them (example: request IS when the list is S-IX) - that is, not all locks
are active. In this case the first waiting lock is returned in the
'blocker' variable; lockman_getlock() notices that the 'blocker' does not
conflict with the requested lock, and "dereferences" it to find the lock
it is itself waiting on. The calling thread then waits on that same lock.
To better support table-row relations, where one needs to lock the table
with an intention lock before locking the row, extended diagnostics are
provided. When an intention lock (presumably on a table) is granted,
lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row,
perhaps the thread already has a normal lock on this table),
GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual),
or GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check
whether it's possible to lock the row, but no need to actually lock it -
perhaps the thread has a loose lock on this table). This is defined by
the getlock_result[] table.
*/
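To make the description above concrete, here is a hedged sketch of how an application drives the lock manager; loarray and loid2lo() follow the unit test (its exact body is presumed), and lockman/lo/table_id/row_id are illustrative names, not the author's code:

LOCK_OWNER *loid2lo(uint16 loid)
{
  return loarray + loid - 1;            /* presumed 1-based mapping, as in lockman-t.c */
}

lockman_init(&lockman, loid2lo, 50);    /* timeout in ms, judging by the deadline
                                           arithmetic in lockman_getlock() */
switch (lockman_getlock(&lockman, lo, table_id, IX))  /* intention-lock the table */
{
case GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE:
  if (lockman_getlock(&lockman, lo, row_id, X) == DIDNT_GET_THE_LOCK)
    lockman_release_locks(&lockman, lo);  /* timed out - drop all locks, abort */
  break;
case GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE:
  /* loose table lock: only verify the row is lockable, no need to hold it */
  break;
case GOT_THE_LOCK:
  break;                                  /* the table lock already covers the row */
case DIDNT_GET_THE_LOCK:
  lockman_release_locks(&lockman, lo);    /* conflict on the table itself */
}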
#include <my_global.h>
#include <my_sys.h>
#include <my_bit.h>
......@@ -36,11 +124,6 @@
') Though you can take an LS lock while somebody has an S lock, it makes no
sense - it's simpler to take an S lock too.
") Strictly speaking you can take an LX lock while somebody has an S lock.
But in this case you lock no rows, because all rows are already locked by
that somebody. So we prefer to define that LX cannot be taken when S
exists. The same holds for LX and X.
1 - compatible
0 - incompatible
-1 - "impossible", so that we can assert the impossibility.
......@@ -107,8 +190,8 @@ static enum lock_type lock_combining_matrix[10][10]=
Defines below help to preserve the table structure.
I/L/A values are self explanatory
x means the combination is possible (assert should not crash)
but cannot happen in row locks, only in table locks (S,X), or
lock escalations (LS,LX)
but it cannot happen in row locks, only in table locks (S,X),
or lock escalations (LS,LX)
*/
#define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
#define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
......@@ -138,8 +221,7 @@ typedef struct lockman_lock {
uint64 resource;
struct lockman_lock *lonext;
intptr volatile link;
uint32 hashnr;
//#warning TODO - remove hashnr from LOCK
uint32 hashnr; // TODO - remove hashnr from LOCK
uint16 loid;
uchar lock; /* sizeof(uchar) <= sizeof(enum) */
uchar flags;
......@@ -147,6 +229,7 @@ typedef struct lockman_lock {
#define IGNORE_ME 1
#define UPGRADED 2
#define ACTIVE 4
typedef struct {
intptr volatile *prev;
......@@ -171,7 +254,7 @@ static int lockfind(LOCK * volatile *head, LOCK *node,
my_bool cur_active, compatible, upgrading, prev_active;
enum lock_type lock, prev_lock, cur_lock;
uint16 loid, cur_loid;
int upgraded_pairs, cur_flags, flags;
int cur_flags, flags;
hashnr= node->hashnr;
resource= node->resource;
......@@ -187,7 +270,6 @@ retry:
upgrading= FALSE;
cursor->blocker= cursor->upgrade_from= 0;
_lf_unpin(pins, 3);
upgraded_pairs= 0;
do {
cursor->curr= PTR(*cursor->prev);
_lf_pin(pins,1,cursor->curr);
......@@ -217,28 +299,24 @@ retry:
(cur_hashnr == hashnr && cur_resource >= resource))
{
if (cur_hashnr > hashnr || cur_resource > resource)
{
if (upgraded_pairs != 0)
goto retry;
break;
}
/* ok, we have a lock for this resource */
DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0);
DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0);
if (cur_flags & UPGRADED)
upgraded_pairs++;
if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME))
{
DBUG_ASSERT(cur_active);
upgraded_pairs--;
if (cur_loid == loid)
cursor->upgrade_from= cursor->curr;
}
else
{
prev_active= cur_active;
if (cur_flags & ACTIVE)
DBUG_ASSERT(prev_active == TRUE);
else
cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
if (upgrading && !cur_active && upgraded_pairs == 0)
if (upgrading && !cur_active)
break;
if (prev_active && !cur_active)
{
......@@ -253,8 +331,14 @@ retry:
if (lock_combining_matrix[cur_lock][lock] == cur_lock)
{
/* new lock is compatible */
return cur_active ? ALREADY_HAVE_THE_LOCK
: ALREADY_HAVE_THE_REQUEST;
if (cur_active)
{
cursor->blocker= cursor->curr; /* loose-locks! */
_lf_unpin(pins, 3); /* loose-locks! */
return ALREADY_HAVE_THE_LOCK;
}
else
return ALREADY_HAVE_THE_REQUEST;
}
/* not compatible, upgrading */
upgrading= TRUE;
......@@ -268,11 +352,11 @@ retry:
cursor->blocker= cursor->curr;
_lf_pin(pins, 3, cursor->curr);
}
}
prev_lock= lock_combining_matrix[prev_lock][cur_lock];
DBUG_ASSERT(prev_lock != N);
}
}
}
cursor->prev= &(cursor->curr->link);
_lf_pin(pins, 2, cursor->curr);
}
......@@ -335,6 +419,10 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
node->flags|= UPGRADED;
node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock];
}
if (!(res & NEED_TO_WAIT))
node->flags|= ACTIVE;
else
node->flags&= ~ACTIVE; /* if we're retrying on REPEAT_ONCE_MORE */
node->link= (intptr)cursor.curr;
DBUG_ASSERT(node->link != (intptr)node);
DBUG_ASSERT(cursor.prev != &node->link);
......@@ -349,13 +437,13 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
_lf_unpin(pins, 1);
_lf_unpin(pins, 2);
/*
note that cursor.curr is NOT pinned on return.
this is ok as it's either a dummy node for initialize_bucket
note that blocker is not necessarily pinned here (when it's == curr).
this is ok, as it is then either a dummy node from initialize_bucket,
and dummy nodes don't need pinning,
or it's a lock of the same transaction for lockman_getlock,
and such a lock cannot be removed by another thread
*/
*blocker= cursor.blocker ? cursor.blocker : cursor.curr;
*blocker= cursor.blocker;
return res;
}
......@@ -419,7 +507,7 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout)
{
lf_alloc_init(&lm->alloc,sizeof(LOCK));
lf_alloc_init(&lm->alloc,sizeof(LOCK), offsetof(LOCK,lonext));
lf_dynarray_init(&lm->array, sizeof(LOCK **));
lm->size= 1;
lm->count= 0;
......@@ -516,13 +604,13 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
res= lockinsert(el, node, pins, &blocker);
if (res & ALREADY_HAVE)
{
int r;
old_lock= blocker->lock;
_lf_assert_unpin(pins, 3); /* unpin should not be needed */
_lf_alloc_free(pins, node);
lf_rwunlock_by_pins(pins);
res= getlock_result[old_lock][lock];
DBUG_ASSERT(res);
return res;
r= getlock_result[old_lock][lock];
DBUG_ASSERT(r);
return r;
}
/* a new value was added to the hash */
csize= lm->size;
......@@ -537,9 +625,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
struct timespec timeout;
_lf_assert_pin(pins, 3); /* blocker must be pinned here */
lf_rwunlock_by_pins(pins);
wait_for_lo= lm->loid_to_lo(blocker->loid);
/*
now, this is tricky. blocker is not necessarily a LOCK
we're waiting for. If it's compatible with what we want,
......@@ -550,12 +637,9 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
if (lock_compatibility_matrix[blocker->lock][lock])
{
blocker= wait_for_lo->all_locks;
lf_pin(pins, 3, blocker);
_lf_pin(pins, 3, blocker);
if (blocker != wait_for_lo->all_locks)
{
lf_rwlock_by_pins(pins);
continue;
}
wait_for_lo= wait_for_lo->waiting_for;
}
......@@ -565,11 +649,11 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
to an unrelated - albeit valid - LOCK_OWNER
*/
if (!wait_for_lo)
{
/* blocker transaction has ended, short id was released */
lf_rwlock_by_pins(pins);
continue;
}
lo->waiting_for= wait_for_lo;
lf_rwunlock_by_pins(pins);
/*
We lock a mutex - it may belong to a wrong LOCK_OWNER, but it must
belong to _some_ LOCK_OWNER. It means, we can never free() a LOCK_OWNER,
......@@ -587,9 +671,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
lf_rwlock_by_pins(pins);
continue;
}
/* yuck. waiting */
lo->waiting_for= wait_for_lo;
/* yuck. waiting */
deadline= my_getsystime() + lm->lock_timeout * 10000;
timeout.tv_sec= deadline/10000000;
timeout.tv_nsec= (deadline % 10000000) * 100;
......@@ -607,11 +690,12 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
Instead we're relying on the caller to abort the transaction,
and release all locks at once - see lockman_release_locks()
*/
_lf_unpin(pins, 3);
lf_rwunlock_by_pins(pins);
return DIDNT_GET_THE_LOCK;
}
lo->waiting_for= 0;
}
lo->waiting_for= 0;
_lf_assert_unpin(pins, 3); /* unpin should not be needed */
lf_rwunlock_by_pins(pins);
return getlock_result[lock][lock];
......@@ -626,14 +710,15 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
*/
int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
{
LOCK * volatile *el, *node;
LOCK * volatile *el, *node, *next;
uint bucket;
LF_PINS *pins= lo->pins;
pthread_mutex_lock(lo->mutex);
lf_rwlock_by_pins(pins);
for (node= lo->all_locks; node; node= node->lonext)
for (node= lo->all_locks; node; node= next)
{
next= node->lonext;
bucket= calc_hash(node->resource) % lm->size;
el= _lf_dynarray_lvalue(&lm->array, bucket);
if (*el == NULL)
......@@ -650,12 +735,12 @@ int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
}
#ifdef MY_LF_EXTRA_DEBUG
static char *lock2str[]=
{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
/*
NOTE
the function below is NOT thread-safe !!!
*/
static char *lock2str[]=
{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
void print_lockhash(LOCKMAN *lm)
{
LOCK *el= *(LOCK **)_lf_dynarray_lvalue(&lm->array, 0);
......@@ -664,17 +749,21 @@ void print_lockhash(LOCKMAN *lm)
{
intptr next= el->link;
if (el->hashnr & 1)
{
printf("0x%08x { resource %llu, loid %u, lock %s",
el->hashnr, el->resource, el->loid, lock2str[el->lock]);
if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
if (el->flags & UPGRADED) printf(" UPGRADED");
if (el->flags & ACTIVE) printf(" ACTIVE");
if (DELETED(next)) printf(" ***DELETED***");
printf("}\n");
}
else
{
printf("0x%08x { dummy ", el->hashnr);
/*printf("0x%08x { dummy }\n", el->hashnr);*/
DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X);
}
if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
if (el->flags & UPGRADED) printf(" UPGRADED");
printf("}\n");
el= (LOCK *)next;
el= PTR(next);
}
}
#endif
......
......@@ -18,7 +18,10 @@
#define _lockman_h
/*
N - "no lock", not a lock, used sometimes to simplify the code
Lock levels:
^^^^^^^^^^^
N - "no lock", not a lock, used sometimes internally to simplify the code
S - Shared
X - eXclusive
IS - Intention Shared
......@@ -35,8 +38,8 @@ struct lockman_lock;
typedef struct st_lock_owner LOCK_OWNER;
struct st_lock_owner {
LF_PINS *pins;
struct lockman_lock *all_locks;
LF_PINS *pins; /* must be allocated from lockman's pinbox */
struct lockman_lock *all_locks; /* a LIFO */
LOCK_OWNER *waiting_for;
pthread_cond_t *cond; /* transactions waiting for this, wait on 'cond' */
pthread_mutex_t *mutex; /* mutex is required to use 'cond' */
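A hedged sketch of the setup an application (here: the unit test, whose loarray/mutexes/conds arrays appear below) is expected to perform; lf_alloc_get_pins is assumed to be the usual lf.h wrapper, and the loid field is assumed from loid2lo()'s contract:

LOCK_OWNER *lo= &loarray[i];
pthread_mutex_init(&mutexes[i], 0);
pthread_cond_init(&conds[i], 0);
lo->pins= lf_alloc_get_pins(&lockman.alloc);  /* from lockman's pinbox, as required above */
lo->all_locks= 0;
lo->waiting_for= 0;
lo->mutex= &mutexes[i];
lo->cond= &conds[i];
lo->loid= i+1;                                /* assumed 1-based owner id */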
......
......@@ -128,6 +128,11 @@ static void set_short_trid(TRN *trn)
trn->locks.loid= i;
}
/*
DESCRIPTION
start a new transaction; allocate and initialize the transaction object.
mutex and cond will be used for lock waits
*/
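A hedged usage sketch of the function declared below (error handling elided):

pthread_mutex_t mutex;
pthread_cond_t cond;
TRN *trn;

pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
trn= trnman_new_trn(&mutex, &cond);  /* trn->locks is this transaction's LOCK_OWNER */
/* ... take locks, do work ... */
trnman_end_trn(trn, TRUE);           /* TRUE = commit, FALSE = abort */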
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
{
TRN *trn;
......@@ -148,6 +153,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trnman_active_transactions++;
trn= pool;
/* popping an element from a stack */
my_atomic_rwlock_wrlock(&LOCK_pool);
while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn,
(void *)trn->next))
......@@ -213,9 +219,12 @@ void trnman_end_trn(TRN *trn, my_bool commit)
LF_PINS *pins= trn->pins;
pthread_mutex_lock(&LOCK_trn_list);
/* remove from active list */
trn->next->prev= trn->prev;
trn->prev->next= trn->next;
/* if this transaction was the oldest - clean up committed list */
if (trn->prev == &active_list_min)
{
TRN *t;
......@@ -232,6 +241,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
}
}
/* add transaction to the committed list (for read-from relations) */
if (commit && active_list_min.next != &active_list_max)
{
trn->commit_trid= global_trid_generator;
......@@ -243,7 +253,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
res= lf_hash_insert(&trid_to_trn, pins, &trn);
DBUG_ASSERT(res == 0);
}
else
else /* or free it right away */
{
trn->next= free_me;
free_me= trn;
......@@ -251,6 +261,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
trnman_active_transactions--;
pthread_mutex_unlock(&LOCK_trn_list);
/* the rest is done outside of a critical section */
lockman_release_locks(&maria_lockman, &trn->locks);
trn->locks.mutex= 0;
trn->locks.cond= 0;
......@@ -258,7 +269,6 @@ void trnman_end_trn(TRN *trn, my_bool commit)
my_atomic_storeptr((void **)&short_trid_to_trn[trn->locks.loid], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
while (free_me) // XXX send them to the purge thread
{
int res;
......@@ -288,7 +298,7 @@ void trnman_free_trn(TRN *trn)
do
{
/*
without volatile cast gcc-3.4.4 moved the assignment
without this volatile cast gcc-3.4.4 moved the assignment
down after the loop at -O2
*/
*(TRN * volatile *)&(trn->next)= tmp;
......@@ -317,13 +327,13 @@ my_bool trnman_can_read_from(TRN *trn, TrID trid)
LF_REQUIRE_PINS(3);
if (trid < trn->min_read_from)
return TRUE;
return TRUE; /* can read */
if (trid > trn->trid)
return FALSE;
return FALSE; /* cannot read */
found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
if (!found)
return FALSE; /* not in the hash of committed transactions = cannot read*/
return FALSE; /* not in the hash of committed transactions = cannot read */
can= (*found)->commit_trid < trn->trid;
lf_unpin(trn->pins, 2);
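In other words, the intended call site during a versioned read looks roughly like this (the version-chain layout is hypothetical):

for (v= newest_version(row); v; v= v->older)
  if (trnman_can_read_from(trn, v->trid))
    return v;                        /* first version whose creator we may read from */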
......
......@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//#define EXTRA_VERBOSE
#undef EXTRA_VERBOSE
#include <tap.h>
......@@ -24,7 +24,7 @@
#include <lf.h>
#include "../lockman.h"
#define Nlos 10
#define Nlos 100
LOCK_OWNER loarray[Nlos];
pthread_mutex_t mutexes[Nlos];
pthread_cond_t conds[Nlos];
......@@ -51,8 +51,7 @@ LOCK_OWNER *loid2lo(uint16 loid)
#define lock_ok_a(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK)
#define lock_ok_i(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE)
#define lock_ok_l(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE)
#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK); \
unlock_all(O)
#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK);
void test_lockman_simple()
{
......@@ -64,7 +63,8 @@ void test_lockman_simple()
lock_ok_a(1, 1, X);
lock_ok_i(2, 2, IX);
/* failures */
lock_conflict(2,1,X); /* this removes all locks of lo2 */
lock_conflict(2,1,X);
unlock_all(2);
lock_ok_a(1,2,S);
lock_ok_a(1,2,IS);
lock_ok_a(1,2,LS);
......@@ -72,8 +72,36 @@ void test_lockman_simple()
lock_ok_a(2,3,LS);
lock_ok_i(1,3,IX);
lock_ok_l(2,3,IS);
lockman_release_locks(&lockman, loid2lo(1));
lockman_release_locks(&lockman, loid2lo(2));
unlock_all(1);
unlock_all(2);
lock_ok_i(1,1,IX);
lock_conflict(2,1,S);
lock_ok_a(1,1,LS);
unlock_all(1);
unlock_all(2);
lock_ok_i(1,1,IX);
lock_ok_a(2,1,LS);
lock_ok_a(1,1,LS);
lock_ok_i(1,1,IX);
lock_ok_i(3,1,IS);
unlock_all(1);
unlock_all(2);
unlock_all(3);
lock_ok_i(1,4,IS);
lock_ok_i(2,4,IS);
lock_ok_i(3,4,IS);
lock_ok_a(3,4,LS);
lock_ok_i(4,4,IS);
lock_conflict(4,4,IX);
lock_conflict(2,4,IX);
lock_ok_a(1,4,LS);
unlock_all(1);
unlock_all(2);
unlock_all(3);
unlock_all(4);
}
......@@ -82,11 +110,13 @@ pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
int thread_number= 0, timeouts= 0;
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now= my_getsystime();
thread_number= timeouts= 0;
litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
......@@ -100,13 +130,12 @@ void run_test(const char *test, pthread_handler handler, int n, int m)
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
}
int thread_number= 0, timeouts=0;
#define Nrows 1000
#define Ntables 10
#define TABLE_LOCK_RATIO 10
int Nrows= 100;
int Ntables= 10;
int table_lock_ratio= 10;
enum lock_type lock_array[6]={S,X,LS,LX,IS,IX};
char *lock2str[6]={"S","X","LS","LX","IS","IX"};
char *res2str[6]={
char *res2str[4]={
"DIDN'T GET THE LOCK",
"GOT THE LOCK",
"GOT THE LOCK NEED TO LOCK A SUBRESOURCE",
......@@ -128,10 +157,11 @@ pthread_handler_t test_lockman(void *arg)
row= x % Nrows + Ntables;
table= row % Ntables;
locklevel= (x/Nrows) & 3;
if ((x/Nrows/4) % TABLE_LOCK_RATIO == 0)
if (table_lock_ratio && (x/Nrows/4) % table_lock_ratio == 0)
{ /* table lock */
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel]);
DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table, lock2str[locklevel], res2str[res]));
DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table,
lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
......@@ -145,7 +175,8 @@ pthread_handler_t test_lockman(void *arg)
{ /* row lock */
locklevel&= 1;
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel + 4]);
DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row, lock2str[locklevel+4], res2str[res]));
DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row,
lock2str[locklevel+4], res2str[res]));
switch (res)
{
case DIDNT_GET_THE_LOCK:
......@@ -159,7 +190,8 @@ pthread_handler_t test_lockman(void *arg)
/* not implemented, so take a regular lock */
case GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE:
res= lockman_getlock(&lockman, lo, row, lock_array[locklevel]);
DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row, lock2str[locklevel], res2str[res]));
DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row,
lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
......@@ -196,7 +228,7 @@ int main()
my_init();
plan(14);
plan(31);
if (my_atomic_initialize())
return exit_status();
......@@ -222,11 +254,21 @@ int main()
test_lockman_simple();
#define CYCLES 100
#define CYCLES 1000
#define THREADS Nlos /* don't change this line */
/* mixed load, stress-test with random locks */
Nrows= 100;
Ntables= 10;
table_lock_ratio= 10;
run_test("lockman", test_lockman, THREADS,CYCLES);
/* "real-life" simulation - many rows, no table locks */
Nrows= 1000000;
Ntables= 10;
table_lock_ratio= 0;
run_test("lockman", test_lockman, THREADS,10000);
for (i= 0; i < Nlos; i++)
{
lockman_release_locks(&lockman, &loarray[i]);
......@@ -235,7 +277,12 @@ int main()
lf_pinbox_put_pins(loarray[i].pins);
}
{
ulonglong now= my_getsystime();
lockman_destroy(&lockman);
now= my_getsystime()-now;
diag("lockman_destroy: %g", ((double)now)/1e7);
}
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
......
......@@ -154,7 +154,7 @@ pthread_handler_t test_lf_pinbox(void *arg)
typedef union {
int32 data;
void *not_used; /* to guarantee sizeof(TLA) >= sizeof(void *) */
void *not_used;
} TLA;
pthread_handler_t test_lf_alloc(void *arg)
......@@ -294,7 +294,7 @@ int main()
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
my_atomic_rwlock_init(&rwl);
lf_alloc_init(&lf_allocator, sizeof(TLA));
lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used));
lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
&my_charset_bin);
......