#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
#include "trxman.h"

/*
  Sentinel (min/max) elements for the active and committed transaction
  lists, and the head of the pool of free TRX objects.
*/
TRX active_list_min, active_list_max,
    committed_list_min, committed_list_max, *pool;

pthread_mutex_t LOCK_trx_list;
uint trxman_active_transactions, trxman_allocated_transactions;
TrID global_trid_generator;

TRX **short_id_to_trx;
my_atomic_rwlock_t LOCK_short_id_to_trx;

LF_HASH trid_to_trx;

static byte *trx_get_hash_key(const byte *trx, uint *len, my_bool unused)
{
  *len= sizeof(TrID);
  return (byte *) & ((*((TRX **)trx))->trid);
}

int trxman_init()
{
  pthread_mutex_init(&LOCK_trx_list, MY_MUTEX_INIT_FAST);
  active_list_max.trid= active_list_min.trid= 0;
  active_list_max.min_read_from= ~0;
  active_list_max.next= active_list_min.prev= 0;
  active_list_max.prev= &active_list_min;
  active_list_min.next= &active_list_max;
  trxman_active_transactions= 0;
  trxman_allocated_transactions= 0;

  committed_list_max.commit_trid= ~0;
  committed_list_max.next= committed_list_min.prev= 0;
  committed_list_max.prev= &committed_list_min;
  committed_list_min.next= &committed_list_max;

  pool= 0;
  global_trid_generator= 0; /* set later by the recovery code */
  lf_hash_init(&trid_to_trx, sizeof(TRX *), LF_HASH_UNIQUE, 0, 0,
               trx_get_hash_key, 0);
  my_atomic_rwlock_init(&LOCK_short_id_to_trx);
  short_id_to_trx= (TRX **)my_malloc(SHORT_ID_MAX*sizeof(TRX *),
                                     MYF(MY_WME|MY_ZEROFILL));
  if (!short_id_to_trx)
    return 1;
  short_id_to_trx--; /* min short_id is 1 */
  return 0;
}

void trxman_destroy()
{
  DBUG_ASSERT(trid_to_trx.count == 0);
  DBUG_ASSERT(trxman_active_transactions == 0);
  DBUG_ASSERT(active_list_max.prev == &active_list_min);
  DBUG_ASSERT(active_list_min.next == &active_list_max);
  DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
  DBUG_ASSERT(committed_list_min.next == &committed_list_max);

  while (pool) /* free all TRX objects remaining in the pool */
  {
    TRX *tmp= pool->next;
    my_free((void *)pool, MYF(0));
    pool= tmp;
  }
  lf_hash_destroy(&trid_to_trx);
  pthread_mutex_destroy(&LOCK_trx_list);
  my_atomic_rwlock_destroy(&LOCK_short_id_to_trx);
  my_free((void *)(short_id_to_trx+1), MYF(0)); /* undo "short_id_to_trx--" */
}

static TrID new_trid()
{
  DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL); /* TrID fits in 6 bytes */
  safe_mutex_assert_owner(&LOCK_trx_list);
  return ++global_trid_generator;
}

/*
  Find a free slot in short_id_to_trx[] and claim it for 'trx'.
  The starting slot is a pseudo-random function of the trid generator
  and the TRX address; from there we probe linearly, wrapping around.
*/
static void set_short_id(TRX *trx)
{
  /* "+ 1" keeps the starting slot in [1..SHORT_ID_MAX] - slot 0 does not exist */
  int i= (global_trid_generator + (intptr)trx) * 312089 % SHORT_ID_MAX + 1;

  my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
  for ( ; ; i= i % SHORT_ID_MAX + 1) /* the range is [1..SHORT_ID_MAX] */
  {
    void *tmp= NULL;
    if (short_id_to_trx[i] == NULL &&
        my_atomic_casptr((void **)&short_id_to_trx[i], &tmp, trx))
      break;
  }
  my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
  trx->short_id= i;
}
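/*
  A minimal illustrative sketch, not part of the transaction manager:
  the TRX pool used below is a Treiber-style lock-free stack.  The
  generic push/pop pattern looks like this ('NODE', 'top',
  'example_push' and 'example_pop' are hypothetical names).  Note that
  the bare-CAS pop is only safe in this file because trxman_new_trx()
  pops while holding LOCK_trx_list, so there is a single popper and no
  ABA problem; trxman_free_trx() may push concurrently at any time.
*/
#ifdef TRXMAN_DOC_EXAMPLE
typedef struct st_example_node { struct st_example_node *next; } NODE;
static NODE *top;

static void example_push(NODE *node)
{
  NODE *old= top;
  do
  {
    node->next= old;       /* link the new node above the current top */
  } while (!my_atomic_casptr((void **)&top, (void **)&old, node));
}

static NODE *example_pop(void)
{
  NODE *node= top;
  while (node &&
         !my_atomic_casptr((void **)&top, (void **)&node, node->next))
    /* 'node' was refreshed by the failed CAS - retry */ ;
  return node;
}
#endif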
TRX *trxman_new_trx()
{
  TRX *trx;

  my_atomic_add32(&trxman_active_transactions, 1);

  /*
    See trxman_end_trx() for why we need a mutex here.  And as we have
    a mutex anyway, we may as well do everything under it: allocating
    a TRX, incrementing trxman_active_transactions, setting
    trx->min_read_from.  All of the above is fast.  Generating a
    short_id may be slow, as it involves scanning a big array, so it
    is still done outside of the mutex.
  */
  pthread_mutex_lock(&LOCK_trx_list);

  /* pop a free TRX object from the pool, if there is one */
  trx= pool;
  while (trx &&
         !my_atomic_casptr((void **)&pool, (void **)&trx, trx->next))
    /* no-op */;

  if (!trx)
  {
    if (!(trx= (TRX *)my_malloc(sizeof(TRX), MYF(MY_WME))))
    {
      /* on malloc failure, undo what we have done so far */
      pthread_mutex_unlock(&LOCK_trx_list);
      my_atomic_add32(&trxman_active_transactions, -1);
      return 0;
    }
    trxman_allocated_transactions++;
  }

  /* the oldest active transaction is the oldest we may need to read from */
  trx->min_read_from= active_list_min.next->trid;

  trx->trid= new_trid();
  trx->short_id= 0;

  /* link into the active list, just before the active_list_max sentinel */
  trx->next= &active_list_max;
  trx->prev= active_list_max.prev;
  active_list_max.prev= trx->prev->next= trx;
  pthread_mutex_unlock(&LOCK_trx_list);

  trx->pins= lf_hash_get_pins(&trid_to_trx);

  if (!trx->min_read_from) /* the active list was empty */
    trx->min_read_from= trx->trid;

  trx->commit_trid= 0;

  set_short_id(trx); /* this must be the last! */

  return trx;
}

/*
  Remove a trx from the active list, move it to the committed list,
  and set commit_trid.

  TODO
    Integrate with the lock manager and the log manager.  That means:
    a common "commit" mutex - forcing the log and setting commit_trid
    must be done atomically (QQ how the heck could it be done with
    group commit ???).
    trid_to_trx, active_list_*, and committed_list_* can be updated
    asynchronously.
*/
void trxman_end_trx(TRX *trx, my_bool commit)
{
  int res;
  TRX *free_me= 0;
  LF_PINS *pins= trx->pins;

  pthread_mutex_lock(&LOCK_trx_list);

  /* unlink from the active list */
  trx->next->prev= trx->prev;
  trx->prev->next= trx->next;

  /*
    If this was the oldest active transaction, the minimal
    min_read_from may have advanced: committed transactions that
    nobody can read from anymore can be purged.
  */
  if (trx->prev == &active_list_min)
  {
    TRX *t;
    for (t= committed_list_min.next;
         t->commit_trid < active_list_min.next->min_read_from;
         t= t->next)
      /* no-op */;

    if (t != committed_list_min.next)
    {
      /* detach the purged prefix into the free_me chain */
      free_me= committed_list_min.next;
      committed_list_min.next= t;
      t->prev->next= 0; /* terminate the free_me chain */
      t->prev= &committed_list_min;
    }
  }

  /* release the short_id slot */
  my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
  my_atomic_storeptr((void **)&short_id_to_trx[trx->short_id], 0);
  my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);

  /*
    On commit, keep the transaction around only if there are other
    active transactions that may need to check visibility against it;
    otherwise (rollback, or nobody left to care) free it right away.
  */
  if (commit && active_list_min.next != &active_list_max)
  {
    trx->commit_trid= global_trid_generator;

    trx->next= &committed_list_max;
    trx->prev= committed_list_max.prev;
    committed_list_max.prev= trx->prev->next= trx;

    res= lf_hash_insert(&trid_to_trx, pins, &trx);
    DBUG_ASSERT(res == 0);
  }
  else
  {
    trx->next= free_me;
    free_me= trx;
  }
  pthread_mutex_unlock(&LOCK_trx_list);
  my_atomic_add32(&trxman_active_transactions, -1);

  while (free_me) /* remove purged trxs from the hash, return them to the pool */
  {
    TRX *t= free_me;
    free_me= free_me->next;
    res= lf_hash_delete(&trid_to_trx, pins, &t->trid, sizeof(TrID));
    trxman_free_trx(t);
  }
  lf_hash_put_pins(pins);
}

/*
  free a trx (that is, add it to the pool of free TRX objects)
*/
void trxman_free_trx(TRX *trx)
{
  TRX *tmp= pool;

  do
  {
    trx->next= tmp;
  } while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trx));
}

/*
  Can transaction 'trx' read changes made by transaction 'trid'?
*/
my_bool trx_can_read_from(TRX *trx, TrID trid)
{
  TRX **found;
  my_bool can;

  if (trid < trx->min_read_from)
    return TRUE;  /* committed before anything we can see started */
  if (trid > trx->trid)
    return FALSE; /* started after us */

  found= lf_hash_search(&trid_to_trx, trx->pins, &trid, sizeof(trid));
  if (!found)
    return FALSE; /* not in the hash = not committed = cannot read */

  can= (*found)->commit_trid < trx->trid;
  lf_unpin(trx->pins, 2);
  return can;
}
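/*
  A minimal usage sketch, not part of the transaction manager: how a
  caller might combine the API above into a visibility check for a row
  version.  'row_creator_trid' stands for the TrID stamped on a row
  version by the transaction that created it - the name is
  hypothetical, real row formats differ.
*/
#ifdef TRXMAN_DOC_EXAMPLE
static my_bool example_row_is_visible(TRX *trx, TrID row_creator_trid)
{
  if (row_creator_trid == trx->trid)
    return TRUE; /* our own changes are always visible to us */
  /* otherwise visible only if the creator committed before we started */
  return trx_can_read_from(trx, row_creator_trid);
}
#endif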