diff --git a/newbrt/brt.c b/newbrt/brt.c index 79334b9763ad32ba374eba524768423da30f779f..7d7b6719996328dd6e56951195c199683b7a08d3 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -2947,6 +2947,76 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT return 0; } +/* clear BRT_PIVOT_PRESENT_R on pivot childnum if (key, val) compares equal to that pivot, and mark the node dirty. nothing is done when childnum is negative. returns 1 if the pivot matched, else 0 */ +static int brt_node_maybe_clear_right_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) { + int match = 0; + if (0 <= childnum) { + if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) { + assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R); + node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_R; + node->dirty = 1; + match = 1; + } + } + return match; +} + +/* clear BRT_PIVOT_PRESENT_L on pivot childnum if (key, val) compares equal to that pivot, and mark the node dirty. nothing is done unless childnum < n_children-1. returns 1 if the pivot matched, else 0 */ +static int brt_node_maybe_clear_left_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) { + int match = 0; + if (childnum < node->u.n.n_children - 1) { + if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) { + assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_L); + node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_L; + node->dirty = 1; + match = 1; + } + } + return match; +} + +/* return 1 if some pivot of this node compares equal to (key, val) and still has its left or right present flag set, else 0 */ +static int brt_node_any_key_present(BRTNODE node, DBT *key, DBT *val, BRT t) { + int i; + for (i=0; i<node->u.n.n_children-1; i++) + if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[i]) && (node->u.n.pivotflags[i] & (BRT_PIVOT_PRESENT_L + BRT_PIVOT_PRESENT_R))) + return 1; + return 0; +} + +/* read the current pair from the pma cursor (the value is only fetched for DUPSORT trees), then visit cursor->path from index path_len-2 down to 0 clearing the present flags of pivots equal to that pair. the walk stops at the first node where no pivot matched or where a matching pivot still has a present flag set */ +static void brt_cursor_maybe_clear_pivot_flags(BRT_CURSOR cursor) { + int r; + DBT key, *k; + toku_init_dbt(&key); key.flags = DB_DBT_MALLOC; + k = &key; + + DBT val, *v; + if (cursor->brt->flags & TOKU_DB_DUPSORT) { + toku_init_dbt(&val); val.flags = DB_DBT_MALLOC; + v = &val; + } else + v = 0; + + r = 
toku_pma_cursor_get_current(cursor->pmacurs, k, v, 1); assert(r == 0); + + int i; + for (i = cursor->path_len - 2; i >= 0; i -= 1) { /* path[path_len-1] is the height-0 node the pair was deleted from; start one entry above it */ + BRTNODE node = cursor->path[i]; + int childnum = cursor->pathcnum[i]; + int match; + match = brt_node_maybe_clear_left_pivot_flag(node, k, v, childnum, cursor->brt); + match += brt_node_maybe_clear_right_pivot_flag(node, k, v, childnum-1, cursor->brt); + if (!match) break; /* neither pivot bordering this child equals the key: nothing was cleared here */ + + /* stop once a pivot equal to the key still has a present flag set in this node */ + if (brt_node_any_key_present(node, k, v, cursor->brt)) break; + } + + toku_free(k->data); + if (v) toku_free(v->data); +} + /* delete the key and value under the cursor */ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__))) { int r; @@ -2954,11 +3024,12 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused_ if (cursor->path_len > 0) { BRTNODE node = cursor->path[cursor->path_len-1]; assert(node->height == 0); - int kvsize; - r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint); + int kvsize, lastmatch; + r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint, &lastmatch); if (r == 0) { node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kvsize; node->dirty = 1; + if (lastmatch) brt_cursor_maybe_clear_pivot_flags(cursor); /* lastmatch is only read after a successful delete */ } } else r = DB_NOTFOUND; diff --git a/newbrt/pma-test.c b/newbrt/pma-test.c index 9d8b34a42a88266d8dcc54f39da2f7bd45cd1f74..9c089ac6ece0cffcd8647637f00c5a9865bb421f 100644 --- a/newbrt/pma-test.c +++ b/newbrt/pma-test.c @@ -1898,21 +1898,18 @@ static void test_pma_cursor_delete_under() { assert(error == 0); PMA_CURSOR cursor; - error = toku_pma_cursor(pma, &cursor, &skey, &sval); - assert(error == 0); + error = toku_pma_cursor(pma, &cursor, &skey, &sval); assert(error == 0); - int kvsize; + int kvsize, lastkey; /* delete under an uninitialized cursor should fail */ - error = 
toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint); + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, 0); assert(error == DB_NOTFOUND); - DBT key, val; int k, v; - int i; - /* insert 0 .. n-1 */ + int i; for (i=0; i<n; i++) { k = htonl(i); v = i; @@ -1925,6 +1922,7 @@ static void test_pma_cursor_delete_under() { assert(error == DB_NOTFOUND); break; } + DBT key, val; toku_init_dbt(&key); key.flags = DB_DBT_MALLOC; toku_init_dbt(&val); val.flags = DB_DBT_MALLOC; error = toku_pma_cursor_get_current(cursor, &key, &val, 0); @@ -1937,21 +1935,94 @@ static void test_pma_cursor_delete_under() { toku_free(val.data); /* delete under should succeed */ - error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint); - assert(error == 0); + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, &lastkey); + assert(error == 0 && lastkey); /* the keys inserted above are all distinct, so each delete removes the last pair for its key */ /* 2nd delete under should fail */ - error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint); + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, 0); assert(error == DB_NOTFOUND); } assert(i == n); - error = toku_pma_cursor_free(&cursor); - assert(error == 0); + error = toku_pma_cursor_free(&cursor); assert(error == 0); assert(toku_pma_n_entries(pma) == 0); - error = toku_pma_free(&pma); + error = toku_pma_free(&pma); assert(error == 0); +} +/* cursor delete test for a pma in the given dup_mode: insert key 1, then n-2 pairs with key 2, then key 3, with values 0 .. n-1 in insertion order; delete every pair through a cursor and check the lastkey result for the first pair and the last two pairs */ +static void test_pma_cursor_delete_under_mode(int n, int dup_mode) { + printf("test_pma_cursor_delete_under_mode:%d %d\n", n, dup_mode); + + int error; + PMA pma; + + u_int32_t rand4fingerprint = random(); + u_int32_t sum = 0; + u_int32_t expect_fingerprint = 0; + + error = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, n * (8 + sizeof (int) + sizeof (int))); assert(error == 0); + + error = toku_pma_set_dup_mode(pma, dup_mode); assert(error == 0); + + 
PMA_CURSOR cursor; + error = toku_pma_cursor(pma, &cursor, &skey, &sval); assert(error == 0); + + int kvsize, lastkey; + + /* delete under an uninitialized cursor should fail */ + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, &lastkey); + assert(error == DB_NOTFOUND); + + int k, v; + + k = htonl(1); v = 0; + do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); + + /* insert n-2 dups of key 2 with values 1 .. n-2 */ + int i; + for (i=1; i < n-1; i++) { + k = htonl(2); + v = i; + do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); + } + + k = htonl(3); v = i; + do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); + + for (i=0;;i++) { + error = toku_pma_cursor_set_position_next(cursor); + if (error != 0) { + assert(error == DB_NOTFOUND); + break; + } + DBT key, val; + toku_init_dbt(&key); key.flags = DB_DBT_MALLOC; + toku_init_dbt(&val); val.flags = DB_DBT_MALLOC; + error = toku_pma_cursor_get_current(cursor, &key, &val, 0); + assert(error == 0); + int vv; + assert(val.size == sizeof vv); + memcpy(&vv, val.data, val.size); + assert(vv == i); + toku_free(key.data); + toku_free(val.data); + + /* delete under should succeed */ + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, &lastkey); + assert(error == 0); + if (i == 0 || i >= n-2) assert(lastkey); /* key 1, the final key 2 dup and key 3 are each the last pair for their key */ + + /* 2nd delete under should fail */ + error = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint, &lastkey); + assert(error == DB_NOTFOUND); + } + assert(i == n); + + error = toku_pma_cursor_free(&cursor); assert(error == 0); + assert(toku_pma_n_entries(pma) == 0); + + error = toku_pma_free(&pma); assert(error == 0); } static void test_pma_cursor_set_both() {
@@ -2451,6 +2522,8 @@ static void pma_tests (void) { test_pma_cursor_set_key(); local_memory_check_all_free(); test_pma_cursor_set_range(); local_memory_check_all_free(); test_pma_cursor_delete_under(); local_memory_check_all_free(); + test_pma_cursor_delete_under_mode(3, TOKU_DB_DUP); local_memory_check_all_free(); + test_pma_cursor_delete_under_mode(3, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free(); test_pma_cursor_set_both(); local_memory_check_all_free(); test_dup(); } diff --git a/newbrt/pma.c b/newbrt/pma.c index 6a6c28908cd24ef932d14030b264acf0314019e1..accdf5d64a78f695c77480addc11aed5b7ecf66b 100644 --- a/newbrt/pma.c +++ b/newbrt/pma.c @@ -865,7 +865,88 @@ int toku_pma_cursor_set_range(PMA_CURSOR c, DBT *key) { return r; } -int toku_pma_cursor_delete_under(PMA_CURSOR c, int *kvsize, u_int32_t rand4sem, u_int32_t *fingerprint) { +/* scan forward from index here to the first in-use slot below n. set *found to 1 if that slot's key compares equal to k and, when v is not null, its value compares equal to v; otherwise *found is 0. returns the index reached (n if there is no in-use slot). only that one slot is examined */ +static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) { + assert(0 <= here); + *found = 0; + while (here < n && !kv_pair_inuse(pma->pairs[here])) + here += 1; + if (here < n) { + struct kv_pair *kv = kv_pair_ptr(pma->pairs[here]); + DBT k2, v2; + int cmp = pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv_pair_key(kv), kv_pair_keylen(kv))); + if (cmp == 0 && v) + cmp = pma->dup_compare_fun(pma->db, v, toku_fill_dbt(&v2, kv_pair_val(kv), kv_pair_vallen(kv))); + if (cmp == 0) + *found = 1; + } + return here; +} + +/* scan backward from index here to the first in-use slot at or above 0. set *found to 1 if that slot's key compares equal to k and, when v is not null, its value compares equal to v; otherwise *found is 0. returns the index reached (-1 if there is no in-use slot). only that one slot is examined */ +static int pma_prev_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) { + assert(here < n); + *found = 0; + while (0 <= here && 
!kv_pair_inuse(pma->pairs[here])) + here -= 1; + if (0 <= here) { + struct kv_pair *kv = kv_pair_ptr(pma->pairs[here]); + DBT k2, v2; + int cmp = pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv_pair_key(kv), kv_pair_keylen(kv))); + if (cmp == 0 && v) + cmp = pma->dup_compare_fun(pma->db, v, toku_fill_dbt(&v2, kv_pair_val(kv), kv_pair_vallen(kv))); + if (cmp == 0) + *found = 1; + } + return here; +} + +/* set *lastkeymatch to 1 if the kv pair under the cursor is the last valid pair in the pma with its key (key and value in DUPSORT mode), else to 0. when the pma is not in DUP mode the result is always 1. + compare with the next and previous in-use pma entries, stepping over matching entries that are in use but not valid */ + +static void pma_cursor_key_last(PMA_CURSOR c, int *lastkeymatch) { + *lastkeymatch = 1; + PMA pma = c->pma; + if (pma->dup_mode & TOKU_DB_DUP) { + int here, found; + + /* get the current key */ + here = c->position; assert(0 <= here && here < (int) pma->N); + struct kv_pair *kv = kv_pair_ptr(pma->pairs[here]); + DBT currentkey; toku_fill_dbt(&currentkey, kv_pair_key(kv), kv_pair_keylen(kv)); + DBT currentval, *v; + + if (pma->dup_mode & TOKU_DB_DUPSORT) { + toku_fill_dbt(&currentval, kv_pair_val(kv), kv_pair_vallen(kv)); + v = &currentval; + } else + v = 0; + + /* check if the next key == current key */ + here = c->position+1; + for (;; here += 1) { /* advance past a matching entry that is not valid, otherwise the same slot is found forever */ + here = pma_next_key(pma, &currentkey, v, here, pma->N, &found); + if (!found) break; + if (kv_pair_valid(pma->pairs[here])) { + *lastkeymatch = 0; /* next key == current key */ + return; + } + } + + /* check if the prev key == current key */ + here = c->position-1; + for (;; here -= 1) { /* step back past a matching entry that is not valid; -1 is accepted by pma_prev_key */ + here = pma_prev_key(pma, &currentkey, v, here, pma->N, &found); + if (!found) break; + if (kv_pair_valid(pma->pairs[here])) { + *lastkeymatch = 0; /* prev key == current key */ + return; + } + } + } +} + +int toku_pma_cursor_delete_under(PMA_CURSOR c, int *kvsize, u_int32_t rand4sem, u_int32_t *fingerprint, int *lastkeymatch) { int r = DB_NOTFOUND; if (c->position >= 0) { PMA pma = c->pma; @@ -877,6 +958,8 @@ int toku_pma_cursor_delete_under(PMA_CURSOR c, int *kvsize, u_int32_t rand4sem, *fingerprint -= rand4sem*toku_calccrc32_kvpair 
(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv)); pma->pairs[c->position] = kv_pair_set_deleted(kv); r = 0; + if (lastkeymatch) /* optional out-param; a null pointer skips the neighbour scan */ + pma_cursor_key_last(c, lastkeymatch); } } return r; @@ -1057,21 +1140,6 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK } } -/* find the next matching key in the pma starting from index here */ -static int pma_next_key(PMA pma, DBT *k, int here, int n, int *found) { - assert(0 <= here); - *found = 0; - while (here < n && !kv_pair_inuse(pma->pairs[here])) - here += 1; - if (here < n) { - struct kv_pair *kv = kv_pair_ptr(pma->pairs[here]); - DBT k2; - if (0 == pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv_pair_key(kv), kv_pair_keylen(kv)))) - *found = 1; - } - return here; -} - static int pma_delete_dup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) { /* find the left most matching key in the pma */ int found; @@ -1092,7 +1160,7 @@ static int pma_delete_dup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *finge } } /* find the next matching key in the pma */ - righthere = pma_next_key(pma, k, righthere+1, pma->N, &rightfound); + righthere = pma_next_key(pma, k, 0, righthere+1, pma->N, &rightfound); /* v == 0: match on the key only */ } if (found) { /* check the density of the region centered around the deleted pairs */ diff --git a/newbrt/pma.h b/newbrt/pma.h index abbdc61eb098036cda9ecadba509f4f72bf61330..cf9cefc0f00abbd75f2e0a19ec228aebc7e68233 100644 --- a/newbrt/pma.h +++ b/newbrt/pma.h @@ -117,7 +117,7 @@ int toku_pma_cursor_set_key(PMA_CURSOR c, DBT *key); int toku_pma_cursor_set_range(PMA_CURSOR c, DBT *key); -/* delete the key value pair under the cursor, return the size of the pair */ -int toku_pma_cursor_delete_under(PMA_CURSOR /*c*/, int */*kvsize*/, u_int32_t /*rand4sem*/, u_int32_t */*fingerprint*/); +/* delete the key value pair under the cursor, return the size of the pair. if lastkeymatch is not null and the delete succeeds, *lastkeymatch is set to 0 when a neighbouring valid pair has a matching key (and a matching value in DUPSORT mode), else to 1 */ +int toku_pma_cursor_delete_under(PMA_CURSOR /*c*/, int */*kvsize*/, u_int32_t /*rand4sem*/, u_int32_t */*fingerprint*/, int */*lastkeymatch*/); int 
toku_pma_random_pick(PMA, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen); diff --git a/src/tests/test_cursor_delete.c b/src/tests/test_cursor_delete.c new file mode 100644 index 0000000000000000000000000000000000000000..f2c0140f2b24c15ce5526bb12918df6df9e4dd30 --- /dev/null +++ b/src/tests/test_cursor_delete.c @@ -0,0 +1,141 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." + +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <unistd.h> +#include <string.h> +#include <sys/stat.h> +#include <arpa/inet.h> +#include <db.h> + +#include "test.h" +/* fetch one pair with c_get(op) and assert that its key and value are ints equal to k and v (compared as stored, without byte swapping); the fetched buffers are freed before returning */ +void cursor_expect(DBC *cursor, int k, int v, int op) { + DBT key, val; + int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), op); + assert(r == 0); + assert(key.size == sizeof k); + int kk; + memcpy(&kk, key.data, key.size); + assert(val.size == sizeof v); + int vv; + memcpy(&vv, val.data, val.size); + if (kk != k || vv != v) printf("expect key %d got %d - %d %d\n", htonl(k), htonl(kk), htonl(v), htonl(vv)); + assert(kk == k); + assert(vv == v); + + free(key.data); + free(val.data); +} + +/* generate a multi-level tree of n = 16*npp pairs and delete all entries with a cursor. with DB_DUP set every pair has key 1 and value i, otherwise key i and value i. + the pivot flag changes are not asserted here (currently verified by inspection) */ + +void test_cursor_delete(int dup_mode) { + if (verbose) printf("test_cursor_delete:%d\n", dup_mode); + + int pagesize = 4096; + int elementsize = 32; + int npp = pagesize/elementsize; + int n = 16*npp; /* build a 2 level tree */ + + DB_ENV * const null_env = 0; + DB *db; + DB_TXN * const null_txn = 0; + const char * const fname = DIR "/" "test.cursor.delete.brt"; + int r; + + unlink(fname); + + /* create the dup database file */ + r = db_create(&db, null_env, 0); assert(r == 0); + r = db->set_flags(db, dup_mode); assert(r == 0); + r = db->set_pagesize(db, pagesize); assert(r == 0); + r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0); + + int i; + for 
(i=0; i<n; i++) { + int k = htonl(dup_mode & DB_DUP ? 1 : i); + int v = htonl(i); + DBT key, val; + r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0); + } + + /* walk the pairs with DB_NEXT, checking that they come back in insertion order and deleting each one as it is visited */ + DBC *cursor; + r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0); + + for (i=0; i<n; i++) { + cursor_expect(cursor, htonl(dup_mode & DB_DUP ? 1 : i), htonl(i), DB_NEXT); + + r = cursor->c_del(cursor, 0); assert(r == 0); + } + + r = cursor->c_close(cursor); assert(r == 0); + + r = db->close(db, 0); assert(r == 0); +} + +/* insert n identical (key 1, value 1) pairs into a sorted duplicate tree, then read back and delete n pairs through a cursor */ + +void test_cursor_delete_dupsort(int dup_mode) { + if (verbose) printf("test_cursor_delete_dupsort:%d\n", dup_mode); + + int pagesize = 4096; + int elementsize = 32; + int npp = pagesize/elementsize; + int n = 16*npp; /* build a 2 level tree */ + + DB_ENV * const null_env = 0; + DB *db; + DB_TXN * const null_txn = 0; + const char * const fname = DIR "/" "test.cursor.delete.brt"; + int r; + + unlink(fname); + + /* create the dup database file */ + r = db_create(&db, null_env, 0); assert(r == 0); + r = db->set_flags(db, dup_mode); assert(r == 0); + r = db->set_pagesize(db, pagesize); assert(r == 0); + r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0); + + int i; + for (i=0; i<n; i++) { + int k = htonl(1); + int v = htonl(1); + DBT key, val; + r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0); + } + + /* expect n copies of the same pair from DB_NEXT, deleting each one as it is visited */ + DBC *cursor; + r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0); + + for (i=0; i<n; i++) { + cursor_expect(cursor, htonl(1), htonl(1), DB_NEXT); + + r = cursor->c_del(cursor, 0); assert(r == 0); + } + + r = cursor->c_close(cursor); assert(r == 0); + + r = db->close(db, 0); assert(r == 0); +} + +int main(int argc, const char *argv[]) { + + parse_args(argc, argv); + /* remove and recreate DIR so the run starts from an empty directory; neither return value is checked */ + system("rm -rf " DIR); + 
mkdir(DIR, 0777); + + test_cursor_delete(0); + test_cursor_delete(DB_DUP); + test_cursor_delete(DB_DUP + DB_DUPSORT); + test_cursor_delete_dupsort(DB_DUP + DB_DUPSORT); + + return 0; +}