Commit f0596569 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improving logging for db-benchmark-test. Addresses #27.

git-svn-id: file:///svn/tokudb@1951 c7de825b-a66e-492c-adef-691d508d4ae1
parent efd6448d
......@@ -119,6 +119,7 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
//printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, node->local_fingerprint);
//printf("%s:%d w.ndone=%d n_children=%d\n", __FILE__, __LINE__, w.ndone, node->n_children);
if (node->height>0) {
assert(node->u.n.n_children>0);
// Local fingerprint is not actually stored while in main memory. Must calculate it.
// Subtract the child fingerprints from the subtree fingerprint to get the local fingerprint.
{
......
This diff is collapsed.
......@@ -77,7 +77,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
return 0;
}
/* peek at the head of the fifo */
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
......@@ -91,7 +91,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
int toku_fifo_deq(FIFO fifo) {
struct fifo_entry *entry = fifo_deq(fifo);
if (entry == 0) return ENOMEM;
if (entry == 0) return -1; // if entry is 0 then it was an empty fifo
toku_free_n(entry, fifo_entry_size(entry));
return 0;
}
......
......@@ -564,6 +564,7 @@ TXNID toku_txn_get_txnid (TOKUTXN txn) {
}
LSN toku_txn_get_last_lsn (TOKUTXN txn) {
if (txn==0) return (LSN){0};
return txn->last_lsn;
}
......
......@@ -88,12 +88,23 @@ const struct logtype logtypes[] = {
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
NULLFIELD}},
{"delchild", 'r', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"setchild", 'i', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"DISKOFF", "child", 0},
{"DISKOFF", "oldchild", 0},
{"DISKOFF", "newchild", 0},
NULLFIELD}},
{"setpivot", 'k', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
......@@ -105,6 +116,26 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"brtdeq", 'U', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"brtenq", 'Q', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"insertinleaf", 'I', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
......
......@@ -714,6 +714,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
if (toku_txn_get_last_lsn(txn).lsn>=3836455 && toku_txn_get_last_lsn(txn).lsn<=3836460) printf("%s:%d inserting\n", __FILE__, __LINE__);
int r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
......@@ -844,8 +845,10 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r;
int found;
int traceit = toku_txn_get_last_lsn(txn).lsn>=3836455 && toku_txn_get_last_lsn(txn).lsn<=3836460;
unsigned int idx = pma_search(pma, k, pma->dup_mode & TOKU_DB_DUPSORT ? v : 0, 0, pma->N, &found);
if (found) {
if (traceit) printf("%s:%d Already present at lsn %ld\n", __FILE__, __LINE__, toku_txn_get_last_lsn(txn).lsn);
struct kv_pair *kv = pma->pairs[idx];
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
......@@ -868,6 +871,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
if (traceit) printf("%s:%d inuse\n", __FILE__, __LINE__);
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
if (r!=0) return r;
idx=newidx;
......@@ -884,6 +888,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
if (traceit) printf("%s:%d inserting\n", __FILE__, __LINE__);
r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
}
......
......@@ -222,6 +222,13 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
*cf = pair->cf;
}
int toku_rollback_brtdeq (struct logtype_brtdeq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtdeq (struct logtype_brtdeq *le) { le=le; assert(0); }
int toku_rollback_brtenq (struct logtype_brtenq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtenq (struct logtype_brtenq *le) { le=le; assert(0); }
int toku_rollback_addchild (struct logtype_addchild *le, TOKUTXN txn) ABORTIT
void toku_recover_addchild (struct logtype_addchild *le) {
CACHEFILE cf;
......@@ -241,7 +248,7 @@ void toku_recover_addchild (struct logtype_addchild *le) {
}
node->u.n.childinfos[le->childnum].subtree_fingerprint = 0;
node->u.n.childkeys [le->childnum] = 0;
node->u.n.children [le->childnum] = -1;
node->u.n.children [le->childnum] = le->child;
int r= toku_fifo_create(&node->u.n.buffers[le->childnum]); assert(r==0);
node->u.n.n_bytes_in_buffer[le->childnum] = 0;
node->u.n.n_children++;
......@@ -250,6 +257,9 @@ void toku_recover_addchild (struct logtype_addchild *le) {
assert(r==0);
}
int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT
void toku_recover_delchild (struct logtype_delchild *le) { le=le; assert(0); }
int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT
void toku_recover_setchild (struct logtype_setchild *le) {
struct cf_pair *pair;
......@@ -262,7 +272,7 @@ void toku_recover_setchild (struct logtype_setchild *le) {
BRTNODE node = node_v;
assert(node->height>0);
assert(le->childnum < (unsigned)node->u.n.n_children);
node->u.n.children[le->childnum] = le->child;
node->u.n.children[le->childnum] = le->newchild;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......@@ -298,6 +308,7 @@ void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
assert((signed)le->childnum < node->u.n.n_children);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
......@@ -380,13 +391,11 @@ void toku_recover_deleteinleaf (struct logtype_deleteinleaf *c) {
BRTNODE node = node_v;
assert(node->height==0);
VERIFY_COUNTS(node);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx);
assert (r==0);
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment