Commit 9172fe82 authored by Rich Prohaska's avatar Rich Prohaska

simplify the pma searches. addresses #250

git-svn-id: file:///svn/tokudb@1780 c7de825b-a66e-492c-adef-691d508d4ae1
parent 11f669d5
......@@ -1869,6 +1869,39 @@ static void test_pma_already_there() {
assert(error == 0);
}
static void test_pma_cursor_first(int n) {
if (verbose) printf("test_pma_cursor_first:%d\n", n);
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
int r;
PMA pma;
r = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); assert(r == 0);
PMA_CURSOR cursor;
r = toku_pma_cursor(pma, &cursor, &skey, &sval); assert(r == 0);
DBT key, val;
int k, v;
int i;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_pma_insert(pma, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), NULL_ARGS, rand4fingerprint, &sum);
assert(r == 0);
}
for (i=0; ; i++) {
r = toku_pma_cursor_set_position_first(cursor);
if (r != 0) break;
k = htonl(i);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), 0, rand4fingerprint, &sum, 0); assert(r == 0);
}
assert(i == n);
r = toku_pma_cursor_free(&cursor); assert(r == 0);
r = toku_pma_free(&pma); assert(r == 0);
}
static void test_pma_cursor_set_key() {
if (verbose) printf("test_pma_cursor_set_key\n");
......@@ -2525,6 +2558,7 @@ static void pma_tests (void) {
test_pma_insert_or_replace(); local_memory_check_all_free();
test_pma_delete();
test_pma_already_there(); local_memory_check_all_free();
test_pma_cursor_first(8); local_memory_check_all_free();
test_pma_cursor_set_key(); local_memory_check_all_free();
test_pma_cursor_set_range(); local_memory_check_all_free();
test_pma_cursor_delete_under(); local_memory_check_all_free();
......
......@@ -268,8 +268,8 @@ static int pma_compare_dbt_kv(PMA pma, DBT *k, DBT *v, struct kv_pair *kv) {
return cmp;
}
/* search the index for a matching key */
static unsigned int __pma_search(PMA pma, DBT *k, int lo, int hi, int *found) {
/* search the index for a matching key and maybe value */
static unsigned int pma_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *found) {
assert(0 <= lo && lo <= hi);
if (lo >= hi) {
*found = 0;
......@@ -281,70 +281,15 @@ static unsigned int __pma_search(PMA pma, DBT *k, int lo, int hi, int *found) {
while (mi < hi && !kv_pair_inuse(pma->pairs[mi]))
mi++;
if (mi >= hi)
return __pma_search(pma, k, lo, omi, found);
int cmp = pma_compare_dbt_kv(pma, k, 0, kv_pair_ptr(pma->pairs[mi]));
if (cmp > 0)
return __pma_search(pma, k, mi+1, hi, found);
if (cmp < 0)
return __pma_search(pma, k, lo, mi, found);
*found = 1;
return mi;
}
}
/* search the index for the rightmost matching key */
static unsigned int pma_right_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *found) {
assert(0 <= lo && lo <= hi);
if (lo >= hi) {
*found = 0;
return lo;
} else {
int mi = (lo + hi)/2;
assert(lo <= mi && mi < hi);
int omi = mi;
while (mi < hi && !kv_pair_inuse(pma->pairs[mi]))
mi++;
if (mi >= hi)
return pma_right_search(pma, k, v, lo, omi, found);
int cmp = pma_compare_dbt_kv(pma, k, v, kv_pair_ptr(pma->pairs[mi]));
if (cmp > 0)
return pma_right_search(pma, k, v, mi+1, hi, found);
if (cmp < 0)
return pma_right_search(pma, k, v, lo, mi, found);
/* we have a match, try to find a match on the right tree */
int here;
here = pma_right_search(pma, k, v, mi+1, hi, found);
if (*found == 0)
here = mi;
*found = 1;
return here;
}
}
/* search the index for the left most matching key */
static unsigned int pma_left_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *found) {
assert(0 <= lo && lo <= hi);
if (lo >= hi) {
*found = 0;
return lo;
} else {
int mi = (lo + hi)/2;
assert(lo <= mi && mi < hi);
int omi = mi;
while (mi < hi && !kv_pair_inuse(pma->pairs[mi]))
mi++;
if (mi >= hi)
return pma_left_search(pma, k, v, lo, omi, found);
return pma_search(pma, k, v, lo, omi, found);
int cmp = pma_compare_dbt_kv(pma, k, v, kv_pair_ptr(pma->pairs[mi]));
if (cmp > 0)
return pma_left_search(pma, k, v, mi+1, hi, found);
return pma_search(pma, k, v, mi+1, hi, found);
if (cmp < 0)
return pma_left_search(pma, k, v, lo, mi, found);
return pma_search(pma, k, v, lo, mi, found);
/* we have a match, try to find a match on the left tree */
int here;
here = pma_left_search(pma, k, v, lo, mi, found);
/* we have a match, try to find a better match on the left tree */
int here = pma_search(pma, k, v, lo, mi, found);
if (*found == 0)
here = mi;
*found = 1;
......@@ -359,51 +304,9 @@ static unsigned int pma_left_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int
// For example: if the array is full of small keys, that means we return toku_pma_index_limit(pma), which is off the end of teh array.
// For example: if the array is full of large keys, then we return 0.
int toku_pmainternal_find (PMA pma, DBT *k) {
#if 1
int lo=0, hi=toku_pma_index_limit(pma);
/* lo and hi are the minimum and maximum values (inclusive) that we could possibly return. */
pma_count_finds++;
while (lo<hi) {
int mid;
// Scan forward looking for a non-null value.
for (mid=(lo+hi)/2; mid<hi; mid++) {
struct kv_pair *kv = pma->pairs[mid];
if (kv_pair_inuse(kv)) {
// Found one.
kv = kv_pair_ptr(kv);
DBT k2;
int cmp = pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv->key, kv->keylen));
if (cmp==0) return mid;
else if (cmp<0) {
/* key is smaller than the midpoint, so look in the low half. */
hi = (lo+hi)/2; /* recalculate the midpoint, since mid is no necessarily the midpoint now. */
pma_count_divides++;
goto next_range;
} else {
/* key is larger than the midpoint. So look in the high half. */
lo = mid+1; /* The smallest value we could want to return is lo. */
pma_count_divides++;
goto next_range;
}
/* Not reached */
}
pma_count_scans++;
}
/* If we got here, all from mid to hi were null, so adjust hi to the midpoint. */
/* If the whole array is null, we'll end up returning index 0, which is good. */
hi = (lo+hi)/2;
pma_count_divides++;
next_range: ; /* We have adjusted lo and hi, so look again. */
}
assert(0<=lo);
assert(lo==hi);
assert((unsigned)hi <= toku_pma_index_limit(pma));
return lo;
#else
int found;
int lo = __pma_search(pma, k, 0, pma->N, &found);
int lo = pma_search(pma, k, 0, 0, pma->N, &found);
return lo;
#endif
}
//int min (int i, int j) { if (i<j) return i; else return j; }
......@@ -799,7 +702,7 @@ static int pma_prev_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) {
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma;
unsigned int here; int found;
here = pma_left_search(pma, key, val, 0, pma->N, &found);
here = pma_search(pma, key, val, 0, pma->N, &found);
assert(here<=toku_pma_index_limit(pma));
int r = DB_NOTFOUND;
/* skip any deleted pairs that match */
......@@ -816,7 +719,7 @@ int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val) {
int toku_pma_cursor_set_range_both(PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma;
unsigned int here; int found;
here = pma_left_search(pma, key, val, 0, pma->N, &found);
here = pma_search(pma, key, val, 0, pma->N, &found);
assert(here<=toku_pma_index_limit(pma));
/* find the first valid pair where key[here] >= key */
......@@ -925,20 +828,12 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
enum pma_errors toku_pma_lookup (PMA pma, DBT *k, DBT *v) {
unsigned int here;
int found;
if (pma->dup_mode & TOKU_DB_DUPSORT) {
here = pma_left_search(pma, k, 0, 0, pma->N, &found);
} else
here = toku_pmainternal_find(pma, k);
assert(here<=toku_pma_index_limit(pma));
if (here==toku_pma_index_limit(pma)) return DB_NOTFOUND;
DBT k2;
struct kv_pair *pair;
pair = pma->pairs[here];
if (kv_pair_valid(pair) && pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, pair->key, pair->keylen))==0) {
return toku_dbt_set_value(v, pair->key + pair->keylen, pair->vallen, &pma->sval);
} else {
here = pma_search(pma, k, 0, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here];
if (found && kv_pair_valid(kv))
return toku_dbt_set_value(v, kv->key + kv->keylen, kv->vallen, &pma->sval);
else
return DB_NOTFOUND;
}
}
/* returns 0 if OK.
......@@ -979,26 +874,20 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
int found;
unsigned int idx;
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = pma_right_search(pma, k, v, 0, pma->N, &found);
if (found) return BRT_ALREADY_THERE;
} else {
idx = toku_pmainternal_find(pma, k);
if (idx < toku_pma_index_limit(pma) && pma->pairs[idx]) {
DBT k2;
struct kv_pair *kv = kv_pair_ptr(pma->pairs[idx]);
if (0==pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv->key, kv->keylen))) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma_mfree_kv_pair(pma, pma->pairs[idx]);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
}
if (pma->dup_mode & TOKU_DB_DUPSORT)
idx = pma_search(pma, k, v, 0, pma->N, &found);
else
idx = pma_search(pma, k, 0, 0, pma->N, &found);
if (found) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma_mfree_kv_pair(pma, pma->pairs[idx]);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
......@@ -1024,7 +913,7 @@ static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_
/* find the left most matching key in the pma */
int found;
unsigned int lefthere;
lefthere = pma_left_search(pma, k, v, 0, pma->N, &found);
lefthere = pma_search(pma, k, v, 0, pma->N, &found);
int rightfound = found, righthere = lefthere;
while (rightfound) {
struct kv_pair *kv = pma->pairs[righthere];
......@@ -1053,11 +942,9 @@ static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int3
/* find the left most matching key in the pma */
int found;
unsigned int here;
here = pma_left_search(pma, k, v, 0, pma->N, &found);
if (!found)
return DB_NOTFOUND;
here = pma_search(pma, k, v, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here];
if (!kv_pair_valid(kv))
if (!found || !kv_pair_valid(kv))
return DB_NOTFOUND;
*deleted_size = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
......@@ -1166,43 +1053,33 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r;
struct kv_pair *kv;
unsigned int idx;
int found;
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = pma_right_search(pma, k, v, 0, pma->N, &found);
if (found) {
kv = kv_pair_ptr(pma->pairs[idx]); goto replaceit;
if (pma->dup_mode & TOKU_DB_DUPSORT)
idx = pma_search(pma, k, v, 0, pma->N, &found);
else
idx = pma_search(pma, k, 0, 0, pma->N, &found);
if (found) {
struct kv_pair *kv = pma->pairs[idx];
if (kv_pair_deleted(kv)) {
*replaced_v_size = -1;
pma->pairs[idx] = kv_pair_ptr(kv); /* mark as not deleted */
} else {
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r;
}
} else {
idx = toku_pmainternal_find(pma, k);
if (idx < toku_pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2;
// printf("%s:%d\n", __FILE__, __LINE__);
kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(pma->db, k, toku_fill_dbt(&k2, kv->key, kv->keylen))) {
replaceit:
if (kv_pair_deleted(pma->pairs[idx])) {
*replaced_v_size = -1;
pma->pairs[idx] = kv;
} else {
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r;
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
pma_mfree_kv_pair(pma, kv);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
}
r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
pma_mfree_kv_pair(pma, kv);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
}
r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment