Commit 13a968e0 authored by Rich Prohaska's avatar Rich Prohaska

set pma dupsort insert dup dup behaviour to replace. addresses #178

git-svn-id: file:///svn/tokudb@1132 c7de825b-a66e-492c-adef-691d508d4ae1
parent 02cfc459
...@@ -2362,7 +2362,7 @@ static void test_dupsort_key_insert(int n, int dup_data) { ...@@ -2362,7 +2362,7 @@ static void test_dupsort_key_insert(int n, int dup_data) {
int values[n]; int values[n];
int i; int i;
for (i=0; i<n; i++) for (i=0; i<n; i++)
values[i] = (i==0 || dup_data) ? (int) htonl(random()) : values[i-1]; values[i] = (!dup_data || i==0) ? (int) htonl(random()) : values[i-1];
/* insert 2->n-i */ /* insert 2->n-i */
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
...@@ -2489,8 +2489,10 @@ static void test_dup() { ...@@ -2489,8 +2489,10 @@ static void test_dup() {
test_dup_key_delete(1000, TOKU_DB_DUP); local_memory_check_all_free(); test_dup_key_delete(1000, TOKU_DB_DUP); local_memory_check_all_free();
test_dupsort_key_insert(2, 0); local_memory_check_all_free(); test_dupsort_key_insert(2, 0); local_memory_check_all_free();
test_dupsort_key_insert(1000, 0); local_memory_check_all_free(); test_dupsort_key_insert(1000, 0); local_memory_check_all_free();
#if PMA_DUP_DUP
test_dupsort_key_insert(2, 1); local_memory_check_all_free(); test_dupsort_key_insert(2, 1); local_memory_check_all_free();
test_dupsort_key_insert(1000, 1); local_memory_check_all_free(); test_dupsort_key_insert(1000, 1); local_memory_check_all_free();
#endif
test_dup_key_delete(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free(); test_dup_key_delete(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_dup_key_delete(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free(); test_dup_key_delete(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_dup_key_lookup(32, TOKU_DB_DUP); local_memory_check_all_free(); test_dup_key_lookup(32, TOKU_DB_DUP); local_memory_check_all_free();
......
...@@ -1096,12 +1096,10 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK ...@@ -1096,12 +1096,10 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
if (pma->dup_mode & TOKU_DB_DUPSORT) { if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, 0, pma->N, &found); idx = __pma_dup_search(pma, k, v, 0, pma->N, &found);
if (found) if (found) return BRT_ALREADY_THERE;
idx += 1;
} else if (pma->dup_mode & TOKU_DB_DUP) { } else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, 0, pma->N, &found); idx = __pma_right_search(pma, k, 0, pma->N, &found);
if (found) if (found) idx += 1;
idx += 1;
} else { } else {
idx = toku_pmainternal_find(pma, k); idx = toku_pmainternal_find(pma, k);
if (idx < toku_pma_index_limit(pma) && pma->pairs[idx]) { if (idx < toku_pma_index_limit(pma) && pma->pairs[idx]) {
...@@ -1282,24 +1280,29 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1282,24 +1280,29 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
u_int32_t rand4fingerprint, u_int32_t *fingerprint) { u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r; int r;
struct kv_pair *kv;
unsigned int idx; unsigned int idx;
int found; int found;
if (pma->dup_mode & TOKU_DB_DUPSORT) { if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, 0, pma->N, &found); idx = __pma_dup_search(pma, k, v, 0, pma->N, &found);
if (found) #if PMA_DUP_DUP
idx += 1; if (found) idx += 1;
#else
if (found) {
kv = pma->pairs[idx]; goto replaceit;
}
#endif
} else if (pma->dup_mode & TOKU_DB_DUP) { } else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, 0, pma->N, &found); idx = __pma_right_search(pma, k, 0, pma->N, &found);
if (found) if (found) idx += 1;
idx += 1;
} else { } else {
idx = toku_pmainternal_find(pma, k); idx = toku_pmainternal_find(pma, k);
struct kv_pair *kv;
if (idx < toku_pma_index_limit(pma) && (kv = pma->pairs[idx])) { if (idx < toku_pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2; DBT k2;
// printf("%s:%d\n", __FILE__, __LINE__); // printf("%s:%d\n", __FILE__, __LINE__);
kv = kv_pair_ptr(kv); kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv->key, kv->keylen))) { if (0==pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv->key, kv->keylen))) {
replaceit:
if (!kv_pair_deleted(pma->pairs[idx])) { if (!kv_pair_deleted(pma->pairs[idx])) {
*replaced_v_size = kv->vallen; *replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv)); *fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
......
...@@ -57,7 +57,7 @@ void test_insert(int n, int dup_mode) { ...@@ -57,7 +57,7 @@ void test_insert(int n, int dup_mode) {
int values[n]; int values[n];
for (i=0; i<n; i++) for (i=0; i<n; i++)
values[i] = htonl((random() << 16) + i); values[i] = htonl((i << 16) + (random() & 0xffff));
int sortvalues[n]; int sortvalues[n];
for (i=0; i<n; i++) for (i=0; i<n; i++)
sortvalues[i] = values[i]; sortvalues[i] = values[i];
...@@ -160,7 +160,7 @@ void test_nonleaf_insert(int n, int dup_mode) { ...@@ -160,7 +160,7 @@ void test_nonleaf_insert(int n, int dup_mode) {
int values[n]; int values[n];
for (i=0; i<n; i++) for (i=0; i<n; i++)
values[i] = htonl((random() << 16) + i); values[i] = htonl((i << 16) + (random() & 0xffff));
int sortvalues[n]; int sortvalues[n];
for (i=0; i<n; i++) for (i=0; i<n; i++)
sortvalues[i] = values[i]; sortvalues[i] = values[i];
......
...@@ -13,78 +13,6 @@ ...@@ -13,78 +13,6 @@
#include "test.h" #include "test.h"
void test_hsoc_1(int pagesize, int dup_mode) {
if (verbose) printf("test_hsoc:%d %d\n", pagesize, dup_mode);
int npp = pagesize / 16;
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = DIR "/" "test.hsoc.brt";
int r;
system("rm -rf " DIR);
r=mkdir(DIR, 0777); assert(r==0);
/* create the dup database file */
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode); assert(r == 0);
r = db->set_pagesize(db, pagesize); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
int i;
DBT key, val;
int k, v;
/* force one leaf split */
for (i=0; i<npp; i++) {
k = htonl(i); v = i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
/* almost fill the leaves */
for (i=0; i<(npp/2)-4; i++) {
k = htonl(0);v = i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
for (i=0; i<(npp/2)-4; i++) {
k = htonl(npp); v = i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode); assert(r == 0);
r = db->set_pagesize(db, pagesize); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666); assert(r == 0);
/* do a cursor get k=0 to pull in leaf 0 */
DBC *cursor;
r = db->cursor(db,null_txn, &cursor, 0); assert(r == 0);
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); assert(r == 0);
free(key.data); free(val.data);
/* fill up buffer 2 in the root node */
for (i=0; i<235; i++) {
k = htonl(npp); v = i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
/* push a cmd to leaf 0 to cause it to split */
for (i=0; i<3; i++) {
k = htonl(0); v = i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = cursor->c_close(cursor); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
}
/* create a tree with 15 of 16 leaf nodes /* create a tree with 15 of 16 leaf nodes
each of the leaves should be about 1/2 full each of the leaves should be about 1/2 full
then almost fill leaf 0 and leaf 13 to almost full then almost fill leaf 0 and leaf 13 to almost full
...@@ -132,7 +60,7 @@ void test_hsoc(int pagesize, int dup_mode) { ...@@ -132,7 +60,7 @@ void test_hsoc(int pagesize, int dup_mode) {
/* almost fill leaf 0 */ /* almost fill leaf 0 */
if (verbose) printf("fill0\n"); if (verbose) printf("fill0\n");
for (i=0; i<(npp/2)-4; i++) { for (i=0; i<(npp/2)-4; i++) {
k = htonl(0);v = i; k = htonl(0); v = n+i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0); r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
...@@ -166,7 +94,7 @@ void test_hsoc(int pagesize, int dup_mode) { ...@@ -166,7 +94,7 @@ void test_hsoc(int pagesize, int dup_mode) {
/* push a cmd to leaf 0 to cause it to split */ /* push a cmd to leaf 0 to cause it to split */
for (i=0; i<3; i++) { for (i=0; i<3; i++) {
k = htonl(0); v = i; k = htonl(0); v = 2*n+i;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0); r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
...@@ -178,8 +106,7 @@ void test_hsoc(int pagesize, int dup_mode) { ...@@ -178,8 +106,7 @@ void test_hsoc(int pagesize, int dup_mode) {
int main(int argc, const char *argv[]) { int main(int argc, const char *argv[]) {
parse_args(argc, argv); parse_args(argc, argv);
// test_hsoc_1(4096, DB_DUP); test_hsoc(4096, DB_DUP + DB_DUPSORT);
test_hsoc(4096, DB_DUP);
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment