diff --git a/newbrt/brt-old.c b/newbrt/brt-old.c index 763e4dac1b286513035cc7537da6f6afa18e9e4f..5a8e1ac0a4b5c4c072e597b155e3029581198b91 100644 --- a/newbrt/brt-old.c +++ b/newbrt/brt-old.c @@ -1458,324 +1458,6 @@ int toku_cmd_leafval_bessel (OMTVALUE lev, void *extra) { // Whenever anything provisional is happening, it's XID must match the cmd's. -static int apply_cmd_to_le_committed (u_int32_t klen, void *kval, - u_int32_t dlen, void *dval, - BRT_CMD cmd, - u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { - //assert(cmd->u.id.key->size == klen); - //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); - switch (cmd->type) { - case BRT_INSERT: - return le_both(cmd->xid, - klen, kval, - dlen, dval, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - case BRT_DELETE_ANY: - case BRT_DELETE_BOTH: - return le_provdel(cmd->xid, - klen, kval, - dlen, dval, - newlen, disksize, new_data); - case BRT_ABORT_BOTH: - case BRT_ABORT_ANY: - case BRT_COMMIT_BOTH: - case BRT_COMMIT_ANY: - // Just return the original committed record - return le_committed(klen, kval, dlen, dval, - newlen, disksize, new_data); - case BRT_NONE: break; - } - assert(0); - return 0; -} - -static int apply_cmd_to_le_both (TXNID xid, - u_int32_t klen, void *kval, - u_int32_t clen, void *cval, - u_int32_t plen, void *pval, - BRT_CMD cmd, - u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { - u_int32_t prev_len; - void *prev_val; - if (xid==cmd->xid) { - // The xids match, so throw away the provisional value. - prev_len = clen; prev_val = cval; - } else { - // If the xids don't match, then we are moving the provisional value to committed status. - prev_len = plen; prev_val = pval; - } - // keep the committed value for rollback. - //assert(cmd->u.id.key->size == klen); - //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); - switch (cmd->type) { - case BRT_INSERT: - return le_both(cmd->xid, - klen, kval, - prev_len, prev_val, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - case BRT_DELETE_ANY: - case BRT_DELETE_BOTH: - return le_provdel(cmd->xid, - klen, kval, - prev_len, prev_val, - newlen, disksize, new_data); - case BRT_ABORT_BOTH: - case BRT_ABORT_ANY: - // I don't see how you could have an abort where the xids don't match. But do it anyway. - return le_committed(klen, kval, - prev_len, prev_val, - newlen, disksize, new_data); - case BRT_COMMIT_BOTH: - case BRT_COMMIT_ANY: - // In the future we won't even have these commit messages. - return le_committed(klen, kval, - plen, pval, - newlen, disksize, new_data); - case BRT_NONE: break; - } - assert(0); - return 0; -} - -static int apply_cmd_to_le_provdel (TXNID xid, - u_int32_t klen, void *kval, - u_int32_t clen, void *cval, - BRT_CMD cmd, - u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { - // keep the committed value for rollback - //assert(cmd->u.id.key->size == klen); - //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); - switch (cmd->type) { - case BRT_INSERT: - if (cmd->xid == xid) { - return le_both(cmd->xid, - klen, kval, - clen, cval, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - } else { - // It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect) - return le_provpair(cmd->xid, - klen, kval, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - } - case BRT_DELETE_ANY: - case BRT_DELETE_BOTH: - if (cmd->xid == xid) { - // A delete of a delete could conceivably return the identical value, saving a malloc and a free, but to simplify things we just reallocate it - // because othewise we have to notice not to free() the olditem. - return le_provdel(cmd->xid, - klen, kval, - clen, cval, - newlen, disksize, new_data); - } else { - // The commited value is deleted, and we are deleting, so treat as a delete. - *new_data = 0; - return 0; - } - case BRT_ABORT_BOTH: - case BRT_ABORT_ANY: - // I don't see how the xids could not match... - return le_committed(klen, kval, - clen, cval, - newlen, disksize, new_data); - case BRT_COMMIT_BOTH: - case BRT_COMMIT_ANY: - *new_data = 0; - return 0; - case BRT_NONE: break; - } - assert(0); - return 0; -} - -static int apply_cmd_to_le_provpair (TXNID xid, - u_int32_t klen, void *kval, - u_int32_t plen , void *pval, - BRT_CMD cmd, - u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { - //assert(cmd->u.id.key->size == klen); - //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); - switch (cmd->type) { - case BRT_INSERT: - if (cmd->xid == xid) { - // it's still a provpair (the old prov value is lost) - return le_provpair(cmd->xid, - klen, kval, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - } else { - // the old prov was actually committed. - return le_both(cmd->xid, - klen, kval, - plen, pval, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, new_data); - } - case BRT_DELETE_BOTH: - case BRT_DELETE_ANY: - if (cmd->xid == xid) { - // A delete of a provisional pair is nothign - *new_data = 0; - return 0; - } else { - // The prov pair is actually a committed value. - return le_provdel(cmd->xid, - klen, kval, - plen, pval, - newlen, disksize, new_data); - } - case BRT_ABORT_BOTH: - case BRT_ABORT_ANY: - // An abort of a provisional pair is nothing. - *new_data = 0; - return 0; - case BRT_COMMIT_ANY: - case BRT_COMMIT_BOTH: - return le_committed(klen, kval, - plen, pval, - newlen, disksize, new_data); - case BRT_NONE: break; - } - assert(0); - return 0; -} - -static int apply_cmd_to_leaf (BRT_CMD cmd, - void *stored_data, // NULL if there was no stored data. - u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { - if (stored_data==0) { - switch (cmd->type) { - case BRT_INSERT: - { - LEAFENTRY le; - int r = le_provpair(cmd->xid, - cmd->u.id.key->size, cmd->u.id.key->data, - cmd->u.id.val->size, cmd->u.id.val->data, - newlen, disksize, &le); - if (r==0) *new_data=le; - return r; - } - case BRT_DELETE_BOTH: - case BRT_DELETE_ANY: - case BRT_ABORT_BOTH: - case BRT_ABORT_ANY: - case BRT_COMMIT_BOTH: - case BRT_COMMIT_ANY: - *new_data = 0; - return 0; // Don't have to insert anything. - case BRT_NONE: - break; - } - assert(0); - return 0; - } else { - LESWITCHCALL(stored_data, apply_cmd_to, cmd, - newlen, disksize, new_data); - } - abort(); return 0; // make certain compilers happy -} - -static int -should_compare_both_keys (BRTNODE node, BRT_CMD cmd) { - switch (cmd->type) { - case BRT_INSERT: - return node->flags & TOKU_DB_DUPSORT; - case BRT_DELETE_BOTH: - case BRT_ABORT_BOTH: - case BRT_COMMIT_BOTH: - return 1; - case BRT_DELETE_ANY: - case BRT_ABORT_ANY: - case BRT_COMMIT_ANY: - return 0; - case BRT_NONE: - break; - } - assert(0); - return 0; -} - -static int brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, - u_int32_t idx, LEAFENTRY le) { - FILENUM filenum = toku_cachefile_filenum(t->cf); - u_int32_t newlen=0, newdisksize=0; - LEAFENTRY newdata=0; - int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &newdata); - if (r!=0) return r; - if (newdata) assert(newdisksize == leafentry_disksize(newdata)); - - //printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid); - //toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data); - //printf(" "); - //toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data); - //printf(" to \n"); - //print_leafentry(stdout, le); printf("\n"); - //printf(" got "); print_leafentry(stdout, newdata); printf("\n"); - - if (le && newdata) { - if (t->txn_that_created != cmd->xid) { - if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; - if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; - } - - node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); - node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le); - - u_int32_t size = leafentry_memsize(le); - - LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen); - assert(new_le); - memcpy(new_le, newdata, newlen); - - // This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for. - // But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good. - toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more. - - node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; - node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); - toku_free(newdata); - - if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) return r; - - } else { - if (le) { - // It's there, note that it's gone and remove it from the mempool - - if (t->txn_that_created != cmd->xid) { - if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; - } - - if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r; - - node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); - node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le); - - toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more. - - } - if (newdata) { - LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen); - assert(new_le); - memcpy(new_le, newdata, newlen); - if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r; - - if (t->txn_that_created != cmd->xid) { - if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; - } - - node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; - node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); - toku_free(newdata); - } - } -// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv)); - return 0; -} - static int brt_leaf_put_cmd_simple (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, u_int64_t *new_size /*OUT*/ diff --git a/newbrt/brt.c b/newbrt/brt.c index 3d42e7de051091810f07508128953f8a8249b065..acdfef97a51aadf8dc34aaaaee3473741051a154 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -134,6 +134,15 @@ message are not overfull. (But they may be underfull or too fat or too thin.) // even from unrelated BRTs. This way we only invalidate an OMTCURSOR if static u_int64_t global_root_put_counter = 0; +enum should_status { SHOULD_OK, SHOULD_MERGE, SHOULD_SPLIT }; + +//#define SLOW +#ifdef SLOW +#define VERIFY_NODE(t,n) (toku_verify_counts(n), toku_verify_estimates(t,n)) +#else +#define VERIFY_NODE(t,n) ((void)0) +#endif + static void fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT UU(brt), TOKULOGGER UU(logger)) // Effect: Sum the child fingerprint (and leafentry estimates) and store them in NODE. @@ -316,21 +325,485 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r return 0; } -enum should_status { SHOULD_OK, SHOULD_MERGE, SHOULD_SPLIT }; +static int +should_compare_both_keys (BRTNODE node, BRT_CMD cmd) +// Effect: Return nonzero if we need to compare both the key and the value. +{ + switch (cmd->type) { + case BRT_INSERT: + return node->flags & TOKU_DB_DUPSORT; + case BRT_DELETE_BOTH: + case BRT_ABORT_BOTH: + case BRT_COMMIT_BOTH: + return 1; + case BRT_DELETE_ANY: + case BRT_ABORT_ANY: + case BRT_COMMIT_ANY: + return 0; + case BRT_NONE: + break; + } + assert(0); + return 0; +} + +static int apply_cmd_to_le_committed (u_int32_t klen, void *kval, + u_int32_t dlen, void *dval, + BRT_CMD cmd, + u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { + //assert(cmd->u.id.key->size == klen); + //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); + switch (cmd->type) { + case BRT_INSERT: + return le_both(cmd->xid, + klen, kval, + dlen, dval, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + case BRT_DELETE_ANY: + case BRT_DELETE_BOTH: + return le_provdel(cmd->xid, + klen, kval, + dlen, dval, + newlen, disksize, new_data); + case BRT_ABORT_BOTH: + case BRT_ABORT_ANY: + case BRT_COMMIT_BOTH: + case BRT_COMMIT_ANY: + // Just return the original committed record + return le_committed(klen, kval, dlen, dval, + newlen, disksize, new_data); + case BRT_NONE: break; + } + assert(0); + return 0; +} + +static int apply_cmd_to_le_both (TXNID xid, + u_int32_t klen, void *kval, + u_int32_t clen, void *cval, + u_int32_t plen, void *pval, + BRT_CMD cmd, + u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { + u_int32_t prev_len; + void *prev_val; + if (xid==cmd->xid) { + // The xids match, so throw away the provisional value. + prev_len = clen; prev_val = cval; + } else { + // If the xids don't match, then we are moving the provisional value to committed status. + prev_len = plen; prev_val = pval; + } + // keep the committed value for rollback. + //assert(cmd->u.id.key->size == klen); + //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); + switch (cmd->type) { + case BRT_INSERT: + return le_both(cmd->xid, + klen, kval, + prev_len, prev_val, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + case BRT_DELETE_ANY: + case BRT_DELETE_BOTH: + return le_provdel(cmd->xid, + klen, kval, + prev_len, prev_val, + newlen, disksize, new_data); + case BRT_ABORT_BOTH: + case BRT_ABORT_ANY: + // I don't see how you could have an abort where the xids don't match. But do it anyway. + return le_committed(klen, kval, + prev_len, prev_val, + newlen, disksize, new_data); + case BRT_COMMIT_BOTH: + case BRT_COMMIT_ANY: + // In the future we won't even have these commit messages. + return le_committed(klen, kval, + plen, pval, + newlen, disksize, new_data); + case BRT_NONE: break; + } + assert(0); + return 0; +} + +static int apply_cmd_to_le_provdel (TXNID xid, + u_int32_t klen, void *kval, + u_int32_t clen, void *cval, + BRT_CMD cmd, + u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { + // keep the committed value for rollback + //assert(cmd->u.id.key->size == klen); + //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); + switch (cmd->type) { + case BRT_INSERT: + if (cmd->xid == xid) { + return le_both(cmd->xid, + klen, kval, + clen, cval, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + } else { + // It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect) + return le_provpair(cmd->xid, + klen, kval, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + } + case BRT_DELETE_ANY: + case BRT_DELETE_BOTH: + if (cmd->xid == xid) { + // A delete of a delete could conceivably return the identical value, saving a malloc and a free, but to simplify things we just reallocate it + // because othewise we have to notice not to free() the olditem. + return le_provdel(cmd->xid, + klen, kval, + clen, cval, + newlen, disksize, new_data); + } else { + // The commited value is deleted, and we are deleting, so treat as a delete. + *new_data = 0; + return 0; + } + case BRT_ABORT_BOTH: + case BRT_ABORT_ANY: + // I don't see how the xids could not match... + return le_committed(klen, kval, + clen, cval, + newlen, disksize, new_data); + case BRT_COMMIT_BOTH: + case BRT_COMMIT_ANY: + *new_data = 0; + return 0; + case BRT_NONE: break; + } + assert(0); + return 0; +} + +static int apply_cmd_to_le_provpair (TXNID xid, + u_int32_t klen, void *kval, + u_int32_t plen , void *pval, + BRT_CMD cmd, + u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { + //assert(cmd->u.id.key->size == klen); + //assert(memcmp(cmd->u.id.key->data, kval, klen)==0); + switch (cmd->type) { + case BRT_INSERT: + if (cmd->xid == xid) { + // it's still a provpair (the old prov value is lost) + return le_provpair(cmd->xid, + klen, kval, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + } else { + // the old prov was actually committed. + return le_both(cmd->xid, + klen, kval, + plen, pval, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, new_data); + } + case BRT_DELETE_BOTH: + case BRT_DELETE_ANY: + if (cmd->xid == xid) { + // A delete of a provisional pair is nothign + *new_data = 0; + return 0; + } else { + // The prov pair is actually a committed value. + return le_provdel(cmd->xid, + klen, kval, + plen, pval, + newlen, disksize, new_data); + } + case BRT_ABORT_BOTH: + case BRT_ABORT_ANY: + // An abort of a provisional pair is nothing. + *new_data = 0; + return 0; + case BRT_COMMIT_ANY: + case BRT_COMMIT_BOTH: + return le_committed(klen, kval, + plen, pval, + newlen, disksize, new_data); + case BRT_NONE: break; + } + assert(0); + return 0; +} + +static int +apply_cmd_to_leaf (BRT_CMD cmd, + void *stored_data, // NULL if there was no stored data. + u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) +{ + if (stored_data==0) { + switch (cmd->type) { + case BRT_INSERT: + { + LEAFENTRY le; + int r = le_provpair(cmd->xid, + cmd->u.id.key->size, cmd->u.id.key->data, + cmd->u.id.val->size, cmd->u.id.val->data, + newlen, disksize, &le); + if (r==0) *new_data=le; + return r; + } + case BRT_DELETE_BOTH: + case BRT_DELETE_ANY: + case BRT_ABORT_BOTH: + case BRT_ABORT_ANY: + case BRT_COMMIT_BOTH: + case BRT_COMMIT_ANY: + *new_data = 0; + return 0; // Don't have to insert anything. + case BRT_NONE: + break; + } + assert(0); + return 0; + } else { + LESWITCHCALL(stored_data, apply_cmd_to, cmd, + newlen, disksize, new_data); + } + abort(); return 0; // make certain compilers happy +} + +static int +brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, + u_int32_t idx, LEAFENTRY le) +// Effect: Apply cmd to leafentry +// idx is the location where it goes +// le is old leafentry +{ + FILENUM filenum = toku_cachefile_filenum(t->cf); + u_int32_t newlen=0, newdisksize=0; + LEAFENTRY newdata=0; + int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &newdata); + if (r!=0) return r; + if (newdata) assert(newdisksize == leafentry_disksize(newdata)); + + //printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid); + //toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data); + //printf(" "); + //toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data); + //printf(" to \n"); + //print_leafentry(stdout, le); printf("\n"); + //printf(" got "); print_leafentry(stdout, newdata); printf("\n"); + + if (le && newdata) { + if (t->txn_that_created != cmd->xid) { + if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; + if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; + } + + node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); + node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le); + + u_int32_t size = leafentry_memsize(le); + + LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen); + assert(new_le); + memcpy(new_le, newdata, newlen); + + // This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for. + // But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good. + toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more. + + node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; + node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); + toku_free(newdata); + + if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) return r; + + } else { + if (le) { + // It's there, note that it's gone and remove it from the mempool + + if (t->txn_that_created != cmd->xid) { + if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; + } + + if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r; + + node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); + node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le); + + toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more. + + } + if (newdata) { + LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen); + assert(new_le); + memcpy(new_le, newdata, newlen); + if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r; + + if (t->txn_that_created != cmd->xid) { + if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; + } + + node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; + node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); + toku_free(newdata); + } + } +// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv)); + return 0; +} + +static int +brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, + u_int64_t *new_size /*OUT*/ + ) +// Effect: Put a cmd into a leaf. +// Return the serialization size in *new_size. +// The leaf could end up "too big". It is up to the caller to fix that up. +{ +// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint); + VERIFY_NODE(t, node); + assert(node->height==0); + + LEAFENTRY storeddata; + OMTVALUE storeddatav=NULL; + + u_int32_t idx; + int r; + int compare_both = should_compare_both_keys(node, cmd); + struct cmd_leafval_bessel_extra be = {t, cmd, compare_both}; + + //static int counter=0; + //counter++; + //printf("counter=%d\n", counter); + + switch (cmd->type) { + case BRT_INSERT: + if (node->u.l.seqinsert) { + idx = toku_omt_size(node->u.l.buffer); + r = toku_omt_fetch(node->u.l.buffer, idx-1, &storeddatav, NULL); + if (r != 0) goto fz; + storeddata = storeddatav; + int cmp = toku_cmd_leafval_bessel(storeddata, &be); + if (cmp >= 0) goto fz; + r = DB_NOTFOUND; + } else { + fz: + r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, + &storeddatav, &idx, NULL); + } + if (r==DB_NOTFOUND) { + storeddata = 0; + } else if (r!=0) { + return r; + } else { + storeddata=storeddatav; + } + + r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata); + if (r!=0) return r; + + // if the insertion point is within a window of the right edge of + // the leaf then it is sequential + + // window = min(32, number of leaf entries/16) + u_int32_t s = toku_omt_size(node->u.l.buffer); + u_int32_t w = s / 16; + if (w == 0) w = 1; + if (w > 32) w = 32; + + // within the window? + if (s - idx <= w) { + node->u.l.seqinsert += 1; + } else { + node->u.l.seqinsert = 0; + } + break; + case BRT_DELETE_BOTH: + case BRT_ABORT_BOTH: + case BRT_COMMIT_BOTH: + + // Delete the one item + r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, + &storeddatav, &idx, NULL); + if (r == DB_NOTFOUND) break; + if (r != 0) return r; + storeddata=storeddatav; + + VERIFY_NODE(t, node); + + static int count=0; + count++; + r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata); + if (r!=0) return r; + + VERIFY_NODE(t, node); + break; + + case BRT_DELETE_ANY: + case BRT_ABORT_ANY: + case BRT_COMMIT_ANY: + // Delete all the matches + + r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, + &storeddatav, &idx, NULL); + if (r == DB_NOTFOUND) break; + if (r != 0) return r; + storeddata=storeddatav; + + while (1) { + int vallen = le_any_vallen(storeddata); + void *save_val = toku_memdup(le_any_val(storeddata), vallen); + + r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata); + if (r!=0) return r; + + // Now we must find the next one. + DBT valdbt; + BRT_CMD_S ncmd = { cmd->type, cmd->xid, .u.id={cmd->u.id.key, toku_fill_dbt(&valdbt, save_val, vallen)}}; + struct cmd_leafval_bessel_extra nbe = {t, &ncmd, 1}; + r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_bessel, &nbe, +1, + &storeddatav, &idx, NULL); + + toku_free(save_val); + if (r!=0) break; + storeddata=storeddatav; + { // Continue only if the next record that we found has the same key. + DBT adbt; + if (t->compare_fun(t->db, + toku_fill_dbt(&adbt, le_any_key(storeddata), le_any_keylen(storeddata)), + cmd->u.id.key) != 0) + break; + } + } + + break; + + case BRT_NONE: return EINVAL; + } + /// All done doing the work + + node->dirty = 1; + +// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint); + + VERIFY_NODE(t, node); + *new_size = toku_serialize_brtnode_size(node); + return 0; +} static int -brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum should_status *should, BOOL *did_io) +brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum should_status *should, int *io_count) // Effect: Push CMD into the subtree rooted at NODE, and indicate whether as a result NODE should split or should merge. // If NODE is a leaf, then // put CMD into leaf, applying it to the leafentries // If NODE is a nonleaf, then copy the cmd into the relevant child fifos. // For each child fifo that is empty and where the child is in main memory put the command into the child (using this same algorithm) -// Use *did_io to determine whether I/O has already been performed. If it has then we avoid doing additional I/O. +// Use *io_count to determine whether I/O has already been performed. Once I/O has occured, we avoid additional I/O by leaving nodes that are overfull, underfull, overfat, or underfat. // Set *should as follows: // { SHOULD_SPLIT if the node is overfull // *should = { SHOULD_MERGE if the node is underfull // { SHOULD_OK if the node is ok. (Those cases are mutually exclusive.) -// If we perform I/O then set *did_io to true. +// For every I/O increment *io_count { if (node->height==0) { int r; @@ -343,7 +816,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum shoul } else { int r; u_int32_t new_fanout = 0; // Some compiler bug in gcc is complaining that this is uninitialized. - r = brt_nonleaf_put_cmd(t, node, cmd, logger, &new_fanout); + r = brt_nonleaf_put_cmd(t, node, cmd, logger, &new_fanout, io_count); if (r!=0) return 0; if (new_fanout > TREE_FANOUT) *should = SHOULD_SPLIT; else if (new_fanout*4 < TREE_FANOUT) *should = SHOULD_MERGE;