Commit ba961b40 authored by Yoni Fogel's avatar Yoni Fogel

Closes #1693 Zombie brts pass off responsibilities to others.

When a new brt is open, it takes over responsibilities of all existing zombie brts, and closes them.
When a brt closes, if other brts exist, it passes off responsibilities and closes immediately.  It only becomes a zombie if no other brts exist.

(Everything above is in context of a single header).

git-svn-id: file:///svn/toku/tokudb@11533 c7de825b-a66e-492c-adef-691d508d4ae1
parent d997f45e
......@@ -282,12 +282,14 @@ unlock_for_blocktable (BLOCK_TABLE bt) {
}
void
toku_block_lock_for_multiple_operations (BLOCK_TABLE bt) {
toku_brtheader_lock (struct brt_header *h) {
BLOCK_TABLE bt = h->blocktable;
lock_for_blocktable(bt);
}
void
toku_block_unlock_for_multiple_operations (BLOCK_TABLE bt) {
toku_brtheader_unlock (struct brt_header *h) {
BLOCK_TABLE bt = h->blocktable;
assert(bt->is_locked);
unlock_for_blocktable(bt);
}
......
......@@ -20,8 +20,8 @@ void toku_blocktable_create_new(BLOCK_TABLE *btp);
void toku_blocktable_create_from_buffer(BLOCK_TABLE *btp, DISKOFF location_on_disk, DISKOFF size_on_disk, unsigned char *translation_buffer);
void toku_blocktable_destroy(BLOCK_TABLE *btp);
void toku_block_lock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt);
void toku_brtheader_lock(struct brt_header *h);
void toku_brtheader_unlock(struct brt_header *h);
void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, struct brt_header *h);
......
......@@ -162,7 +162,6 @@ struct brt_header {
// LSB indicates which header location is used on disk so this
// counter is effectively a boolean which alternates with each checkpoint.
LSN checkpoint_lsn; // LSN of creation of "checkpoint-begin" record in log.
int refcount;
int dirty;
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
......@@ -180,6 +179,8 @@ struct brt_header {
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
TXNID txnid_that_created_or_locked_when_empty;
struct list live_brts;
struct list zombie_brts;
};
struct brt {
......@@ -205,6 +206,9 @@ struct brt {
int was_closed; //True when this brt was closed, but is being kept around for transactions.
int (*close_db)(DB*, u_int32_t);
u_int32_t close_flags;
struct list live_brt_link;
struct list zombie_brt_link;
};
/* serialization code */
......
......@@ -1030,7 +1030,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0;
if (h->panic) return h->panic;
assert(h->type==BRTHEADER_CHECKPOINT_INPROGRESS);
toku_block_lock_for_multiple_operations(h->blocktable);
toku_brtheader_lock(h);
struct wbuf w_translation;
int64_t size_translation;
int64_t address_translation;
......@@ -1051,7 +1051,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
}
assert(w_main.ndone==size_main);
}
toku_block_unlock_for_multiple_operations(h->blocktable);
toku_brtheader_unlock(h);
char *writing_what;
lock_for_pwrite();
{
......@@ -1225,6 +1225,8 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
h->dirty=0;
h->panic = 0;
h->panic_string = 0;
list_init(&h->live_brts);
list_init(&h->zombie_brts);
//version MUST be in network order on disk regardless of disk order
h->layout_version = rbuf_network_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_10);
......
......@@ -2871,6 +2871,8 @@ brt_init_header (BRT t) {
toku_allocate_blocknum(t->h->blocktable, &root, t->h);
t->h->root = root;
list_init(&t->h->live_brts);
list_init(&t->h->zombie_brts);
int r = brt_init_header_partial(t);
if (r==0) toku_block_verify_no_free_blocknums(t->h->blocktable);
return r;
......@@ -2919,6 +2921,36 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
return 0;
}
static void
brtheader_note_brt_close(BRT t) {
struct brt_header *h = t->h;
if (h) { //Might not yet have been opened.
toku_brtheader_lock(h);
list_remove(&t->live_brt_link);
list_remove(&t->zombie_brt_link);
toku_brtheader_unlock(h);
}
}
static int
brtheader_note_brt_open(BRT live) {
struct brt_header *h = live->h;
int retval = 0;
toku_brtheader_lock(h);
while (!list_empty(&h->zombie_brts)) {
//Remove dead brt from list
BRT zombie = list_struct(list_pop(&h->zombie_brts), struct brt, zombie_brt_link);
toku_brtheader_unlock(h); //Cannot be holding lock when swapping brts.
retval = toku_txn_note_swap_brt(live, zombie); //Steal responsibility, close
toku_brtheader_lock(h);
if (retval) break;
}
if (retval==0)
list_push(&h->live_brts, &live->live_brt_link);
toku_brtheader_unlock(h);
return retval;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
int r;
......@@ -3001,8 +3033,15 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
r = EINVAL;
goto died_after_read_and_pin;
}
//TODO: Disallow changing if exists two brts with the same header (counting this one)
// The upgrade would be impossible/very hard!
toku_brtheader_lock(t->h);
if (!list_empty(&t->h->live_brts) || !list_empty(&t->h->zombie_brts)) {
//Disallow changing if exists two brts with the same header (counting this one)
//The upgrade would be impossible/very hard!
r = EINVAL;
toku_brtheader_unlock(t->h);
goto died_after_read_and_pin;
}
toku_brtheader_unlock(t->h);
DISKOFF offset;
//4 for checksum
toku_realloc_descriptor_on_disk(t->h->blocktable, toku_serialize_descriptor_size(&t->temp_descriptor)+4, &offset, t->h);
......@@ -3015,6 +3054,8 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
t->temp_descriptor.dbt.data = NULL;
t->did_set_descriptor = 0;
}
r = brtheader_note_brt_open(t);
if (r!=0) goto died_after_read_and_pin;
if (t->db) t->db->descriptor = &t->h->descriptor.dbt;
if (txn_created) {
assert(txn);
......@@ -3079,13 +3120,13 @@ toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), LSN checkpoint_lsn, vo
int r = h->panic;
if (r==0) {
// hold lock around copying and clearing of dirty bit
toku_block_lock_for_multiple_operations (h->blocktable);
toku_brtheader_lock (h);
assert(h->type == BRTHEADER_CURRENT);
assert(h->checkpoint_header == NULL);
brtheader_copy_for_checkpoint(h, checkpoint_lsn);
h->dirty = 0; // this is only place this bit is cleared (in currentheader)
toku_block_translation_note_start_checkpoint_unlocked(h->blocktable);
toku_block_unlock_for_multiple_operations (h->blocktable);
toku_brtheader_unlock (h);
}
return r;
}
......@@ -3159,6 +3200,10 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
{
struct brt_header *h = header_v;
assert(h->type == BRTHEADER_CURRENT);
toku_brtheader_lock(h);
assert(list_empty(&h->live_brts));
assert(list_empty(&h->zombie_brts));
toku_brtheader_unlock(h);
int r = 0;
if (h->dirty) { // this is the only place this bit is tested (in currentheader)
//TODO: #1627 put meaningful LSN in for begin_checkpoint
......@@ -3180,22 +3225,42 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
}
int
toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags) {
toku_brt_db_delay_closed (BRT zombie, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags) {
//Requires: close_db needs to call toku_close_brt to delete the final reference.
int r;
if (brt->was_closed) r = EINVAL;
else if (brt->db && brt->db!=db) r = EINVAL;
struct brt_header *h = zombie->h;
if (zombie->was_closed) r = EINVAL;
else if (zombie->db && zombie->db!=db) r = EINVAL;
else {
assert(brt->close_db==NULL);
brt->close_db = close_db;
brt->close_flags = close_flags;
brt->was_closed = 1;
if (!brt->db) brt->db = db;
if (toku_omt_size(brt->txns) == 0) {
assert(zombie->close_db==NULL);
zombie->close_db = close_db;
zombie->close_flags = close_flags;
zombie->was_closed = 1;
if (!zombie->db) zombie->db = db;
if (toku_omt_size(zombie->txns) == 0) {
//Close immediately.
r = brt->close_db(brt->db, brt->close_flags);
r = zombie->close_db(zombie->db, zombie->close_flags);
}
else {
//Try to pass responsibility off.
toku_brtheader_lock(zombie->h);
list_remove(&zombie->live_brt_link); //Remove from live.
BRT replacement = NULL;
if (!list_empty(&h->live_brts)) {
replacement = list_struct(list_head(&h->live_brts), struct brt, live_brt_link);
}
else if (!list_empty(&h->zombie_brts)) {
replacement = list_struct(list_head(&h->zombie_brts), struct brt, zombie_brt_link);
}
list_push(&h->zombie_brts, &zombie->zombie_brt_link); //Add to dead list.
toku_brtheader_unlock(zombie->h);
if (replacement == NULL) r = 0; //Just delay close
else {
//Pass responsibility off and close zombie.
//Skip adding to dead list
r = toku_txn_note_swap_brt(replacement, zombie);
}
}
else r = 0;
}
return r;
}
......@@ -3213,6 +3278,7 @@ int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) {
r=toku_txn_note_close_brt(brt);
assert(r==0);
toku_omt_destroy(&brt->txns);
brtheader_note_brt_close(brt);
if (brt->cf) {
if (logger) {
......@@ -3241,6 +3307,8 @@ int toku_brt_create(BRT *brt_ptr) {
if (brt == 0)
return ENOMEM;
memset(brt, 0, sizeof *brt);
list_init(&brt->live_brt_link);
list_init(&brt->zombie_brt_link);
list_init(&brt->cursors);
brt->flags = 0;
brt->did_set_flags = 0;
......@@ -4683,7 +4751,7 @@ int toku_brt_truncate (BRT brt) {
// TODO log the truncate?
toku_block_lock_for_multiple_operations(brt->h->blocktable);
toku_brtheader_lock(brt->h);
if (r==0) {
//Free all data blocknums and associated disk space (if not held on to by checkpoint)
toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h);
......@@ -4693,7 +4761,7 @@ int toku_brt_truncate (BRT brt) {
r = brt_init_header_partial(brt);
}
toku_block_unlock_for_multiple_operations(brt->h->blocktable);
toku_brtheader_unlock(brt->h);
return r;
}
......
......@@ -1141,6 +1141,42 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
return 0;
}
struct swap_brt_extra {
BRT live;
BRT zombie;
};
static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) {
struct swap_brt_extra *info = extra;
TOKUTXN txn = txnv;
OMTVALUE zombie_again=NULL;
u_int32_t index;
int r;
r = toku_txn_note_brt(txn, info->live); //Add new brt.
assert(r==0);
r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index, NULL);
assert(r==0);
assert((void*)zombie_again==info->zombie);
r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt.
assert(r==0);
return 0;
}
int toku_txn_note_swap_brt (BRT live, BRT zombie) {
struct swap_brt_extra swap = {.live = live, .zombie = zombie};
int r = toku_omt_iterate(zombie->txns, swap_brt, &swap);
assert(r==0);
toku_omt_clear(zombie->txns);
//Close immediately.
assert(zombie->close_db);
r = zombie->close_db(zombie->db, zombie->close_flags);
return r;
}
static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
TOKUTXN txn = txnv;
BRT brt = brtv;
......
......@@ -174,6 +174,7 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt);
int toku_txn_note_swap_brt (BRT t, BRT deadbeat);
// find the TOKUTXN object by xid
// if found then return 0 and set txnptr to the address of the TOKUTXN object
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment