Commit 03ea974a authored by Rich Prohaska's avatar Rich Prohaska

build the dupsort split key

git-svn-id: file:///svn/tokudb@605 c7de825b-a66e-492c-adef-691d508d4ae1
parent f0dc1356
......@@ -63,6 +63,8 @@ static void __pma_delete_resume(PMA pma, int here);
*/
static int __pma_count_cursor_refs(PMA pma, int here);
static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b, DB *db);
/**************************** end of static functions forward declarations. *********************/
......@@ -614,6 +616,11 @@ static int __pma_resize_array(PMA pma, int asksize, int startz) {
return 0;
}
int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun) {
pma->compare_fun = compare_fun;
return 0;
}
int pma_set_dup_mode(PMA pma, int dup_mode) {
assert(dup_mode == 0 || dup_mode == DB_DUP || dup_mode == (DB_DUP+DB_DUPSORT));
pma->dup_mode = dup_mode;
......@@ -1291,6 +1298,15 @@ static void __pma_relocate_kvpairs(PMA pma) {
#endif
static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b, DB *db) {
DBT dbta, dbtb;
int cmp = pma->compare_fun(db, fill_dbt(&dbta, kv_pair_key(a), kv_pair_keylen(a)), fill_dbt(&dbtb, kv_pair_key(b), kv_pair_keylen(b)));
if (cmp == 0 && (pma->dup_mode & DB_DUPSORT)) {
cmp = pma->dup_compare_fun(db, fill_dbt(&dbta, kv_pair_val(a), kv_pair_vallen(b)), fill_dbt(&dbtb, kv_pair_val(b), kv_pair_vallen(b)));
}
return cmp;
}
int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4fp, u_int32_t *leftfingerprint,
PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4fp, u_int32_t *rightfingerprint) {
......@@ -1363,20 +1379,24 @@ int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
}
if (splitk) {
struct kv_pair *kv = pairs[spliti-1].pair;
assert(kv_pair_valid(kv));
splitk->size = kv_pair_keylen(kv);
splitk->data = memdup(kv_pair_key(kv), splitk->size);
struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & DB_DUPSORT) {
int kl = kv_pair_keylen(a);
int vl = kv_pair_vallen(a);
splitk->size = (sizeof vl) + kl + vl;
splitk->data = toku_malloc(splitk->size);
memcpy(splitk->data, &vl, sizeof vl);
memcpy(splitk->data + (sizeof vl), kv_pair_key(a), kl);
memcpy(splitk->data + (sizeof vl) + kl, kv_pair_val(a), vl);
} else {
splitk->size = kv_pair_keylen(a);
splitk->data = memdup(kv_pair_key(a), splitk->size);
}
splitk->flags = BRT_PIVOT_PRESENT_L;
if (spliti < npairs) {
kv = pairs[spliti].pair;
DBT k2;
int cmp = origpma->compare_fun(db, splitk, fill_dbt(&k2, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0) {
if (spliti < npairs && __pma_compare_kv(origpma, a, pairs[spliti].pair, db) == 0) {
splitk->flags += BRT_PIVOT_PRESENT_R;
}
}
}
/* put the first half of pairs into the left pma */
n = spliti;
......
......@@ -21,6 +21,8 @@ typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize);
int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun);
/* set the duplicate mode
0 -> no duplications, DB_DUP, DB_DUPSORT */
int pma_set_dup_mode(PMA pma, int mode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment