From 0a074cbe91dd282cecef1be191e8edb828c17daa Mon Sep 17 00:00:00 2001
From: "Bradley C. Kuszmaul" <bradley@tokutek.com>
Date: Mon, 23 Jul 2007 20:10:16 +0000
Subject: [PATCH] pma with compare fun works, and also uses DBTs for most
 interface work

git-svn-id: file:///svn/tokudb@31 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/pma-internal.h |   3 +-
 newbrt/pma-test.c     | 116 ++++++++++++++++++++++--------------------
 newbrt/pma.c          |  68 +++++++++----------------
 newbrt/pma.h          |   8 +--
 newbrt/ybt.c          |   8 +++
 newbrt/ybt.h          |   1 +
 6 files changed, 102 insertions(+), 102 deletions(-)

diff --git a/newbrt/pma-internal.h b/newbrt/pma-internal.h
index 9f4f3958ac2..265bc43a790 100644
--- a/newbrt/pma-internal.h
+++ b/newbrt/pma-internal.h
@@ -28,6 +28,7 @@ struct pma {
 			  *  The densitystep is 0.10. */
     PMA_CURSOR cursors_head, cursors_tail;
     int (*compare_fun)(DB*,DBT*,DBT*);
+    void *skey, *sval; /* used in dbts */
 };
 
 int pmainternal_count_region (struct pair *pairs, int lo, int hi);
@@ -35,5 +36,5 @@ void pmainternal_calculate_parameters (PMA pma);
 int pmainternal_smooth_region (struct pair *pairs, int n, int idx);
 int pmainternal_printpairs (struct pair *pairs, int N);
 int pmainternal_make_space_at (PMA pma, int idx);
-int pmainternal_find (PMA pma, bytevec key, int keylen);
+int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
 void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
diff --git a/newbrt/pma-test.c b/newbrt/pma-test.c
index 526079382c4..97b8aca09b8 100644
--- a/newbrt/pma-test.c
+++ b/newbrt/pma-test.c
@@ -70,37 +70,38 @@ static void test_pma_find (void) {
     int i;
     int r;
     const int N = 16;
+    DBT k;
     MALLOC(pma);
     MALLOC_N(N,pma->pairs);
     // All that is needed to test pma_find is N and pairs.
     pma->N = N;
     for (i=0; i<N; i++) pma->pairs[i].key=0;
     assert(pma_index_limit(pma)==N);
-    r=pmainternal_find(pma, "hello", 5);
+    r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
     assert(r==0);
 
     pma->pairs[5].key="hello";
     pma->pairs[5].keylen=5;
     assert(pma_index_limit(pma)==N);
-    r=pmainternal_find(pma, "hello", 5);
+    r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
     assert(pma_index_limit(pma)==N);
     assert(r==5);
-    r=pmainternal_find(pma, "there", 5);
+    r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
     assert(r==6);
-    r=pmainternal_find(pma, "aaa", 3);
+    r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
     assert(r==0);
 
     pma->pairs[N-1].key="there";
     pma->pairs[N-1].keylen=5;
-    r=pmainternal_find(pma, "hello", 5);
+    r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
     assert(r==5);
-    r=pmainternal_find(pma, "there", 5);
+    r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
     assert(r==N-1);
-    r=pmainternal_find(pma, "aaa", 3);
+    r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
     assert(r==0);
-    r=pmainternal_find(pma, "hellob", 6);
+    r=pmainternal_find(pma, fill_dbt(&k, "hellob", 6), 0);
     assert(r==6);
-    r=pmainternal_find(pma, "zzz", 3);
+    r=pmainternal_find(pma, fill_dbt(&k, "zzz", 3), 0);
     assert(r==N);
     toku_free(pma->pairs);
     toku_free(pma);
@@ -203,25 +204,26 @@ static void test_pma_random_pick (void) {
     int r = pma_create(&pma, default_compare_fun);
     bytevec key,val;
     ITEMLEN keylen,vallen;
+    DBT k,v;
     assert(r==0);
     r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
     assert(r==DB_NOTFOUND);
-    r = pma_insert(pma, "hello", 6, "there", 6);
+    r = pma_insert(pma, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
     assert(r==BRT_OK);
     r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
     assert(r==0);
     assert(keylen==6); assert(vallen==6);
     assert(strcmp(key,"hello")==0);
     assert(strcmp(val,"there")==0);
-    r = pma_delete(pma, "nothello", 9);
+    r = pma_delete(pma, fill_dbt(&k, "nothello", 9), 0);
     assert(r==DB_NOTFOUND);
-    r = pma_delete(pma, "hello", 6);
+    r = pma_delete(pma, fill_dbt(&k, "hello", 6), 0);
     assert(r==BRT_OK);
 
     r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
     assert(r==DB_NOTFOUND);
     
-    r = pma_insert(pma, "hello", 6, "there", 6);
+    r = pma_insert(pma, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
     assert(r==BRT_OK);
 
 
@@ -231,20 +233,20 @@ static void test_pma_random_pick (void) {
     assert(strcmp(key,"hello")==0);
     assert(strcmp(val,"there")==0);
 
-    r = pma_insert(pma, "aaa", 4, "athere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aab", 4, "bthere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aac", 4, "cthere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aad", 4, "dthere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aae", 4, "ethere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aaf", 4, "fthere", 7); assert(r==BRT_OK);
-    r = pma_insert(pma, "aag", 4, "gthere", 7); assert(r==BRT_OK);
-    r = pma_delete(pma, "aaa", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "aab", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "aac", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "aad", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "aae", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "aag", 4);              assert(r==BRT_OK);
-    r = pma_delete(pma, "hello", 6);            assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aaa", 4), fill_dbt(&v, "athere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aab", 4), fill_dbt(&v, "bthere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aac", 4), fill_dbt(&v, "cthere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aad", 4), fill_dbt(&v, "dthere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aae", 4), fill_dbt(&v, "ethere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aaf", 4), fill_dbt(&v, "fthere", 7), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "aag", 4), fill_dbt(&v, "gthere", 7), 0); assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aaa", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aab", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aac", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aad", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aae", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "aag", 4), 0);              assert(r==BRT_OK);
+    r = pma_delete(pma, fill_dbt(&k, "hello", 6), 0);            assert(r==BRT_OK);
    
     r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
     assert(r==0);
@@ -258,34 +260,37 @@ static void test_pma_random_pick (void) {
 static void test_find_insert (void) {
     PMA pma;
     int r;
-    bytevec dv;
-    ITEMLEN dl;
+    DBT k,v;
     pma_create(&pma, default_compare_fun);
-    r=pma_lookup(pma, "aaa", 3, &dv, &dl);
+    r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
     assert(r==DB_NOTFOUND);
 
-    r=pma_insert(pma, "aaa", 3, "aaadata", 7);
+    r=pma_insert(pma, fill_dbt(&k, "aaa", 3), fill_dbt(&v, "aaadata", 7), 0);
     assert(r==BRT_OK);
 
-    dv=0; dl=0;
-    r=pma_lookup(pma, "aaa", 3, &dv, &dl);
+    ybt_init(&v);
+    r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
     assert(r==BRT_OK);
-    assert(keycompare(dv,dl,"aaadata", 7)==0);
+    assert(v.size==7);
+    assert(keycompare(v.data,v.size,"aaadata", 7)==0);
+    //toku_free(v.data); v.data=0;
 
-    r=pma_insert(pma, "bbb", 4, "bbbdata", 8);
+    r=pma_insert(pma, fill_dbt(&k, "bbb", 4), fill_dbt(&v, "bbbdata", 8), 0);
     assert(r==BRT_OK);
 
-    r=pma_lookup(pma, "aaa", 3, &dv, &dl);
+    ybt_init(&v);
+    r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
     assert(r==BRT_OK);
-    assert(keycompare(dv,dl,"aaadata", 7)==0);
+    assert(keycompare(v.data,v.size,"aaadata", 7)==0);
 
-    r=pma_lookup(pma, "bbb", 4, &dv, &dl);
+    ybt_init(&v);
+    r=pma_lookup(pma, fill_dbt(&k, "bbb", 4), &v, 0);
     assert(r==BRT_OK);
-    assert(keycompare(dv,dl,"bbbdata", 8)==0);
+    assert(keycompare(v.data,v.size,"bbbdata", 8)==0);
 
     assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
     
-    r=pma_insert(pma, "00000", 6, "d0", 3);
+    r=pma_insert(pma, fill_dbt(&k, "00000", 6), fill_dbt(&v, "d0", 3), 0);
     assert(r==BRT_OK);
 
     assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
@@ -301,7 +306,7 @@ static void test_find_insert (void) {
 	    snprintf(string,10,"%05d",i);
 	    snprintf(dstring,10,"d%d", i);
 	    printf("Inserting %d: string=%s dstring=%s\n", i, string, dstring);
-	    r=pma_insert(pma, string, strlen(string)+1, dstring, strlen(dstring)+1);
+	    r=pma_insert(pma, fill_dbt(&k, string, strlen(string)+1), fill_dbt(&v, dstring, strlen(dstring)+1), 0);
 	    assert(r==BRT_OK);
 	}
     }
@@ -327,12 +332,13 @@ static void test_pma_iterate_internal (PMA pma, int expected_k, int expected_v)
 static void test_pma_iterate (void) {
     PMA pma;
     int r;
+    DBT k,v;
     pma_create(&pma, default_compare_fun);
-    r=pma_insert(pma, "42", 3, "-19", 4);
+    r=pma_insert(pma, fill_dbt(&k, "42", 3), fill_dbt(&v, "-19", 4), 0);
     assert(r==BRT_OK);
     test_pma_iterate_internal(pma, 42, -19);
 
-    r=pma_insert(pma, "12", 3, "-100", 5);
+    r=pma_insert(pma, fill_dbt(&k, "12", 3), fill_dbt(&v, "-100", 5), 0);
     assert(r==BRT_OK);
     test_pma_iterate_internal(pma, 42+12, -19-100);
     r=pma_free(&pma); assert(r==0); assert(pma==0);
@@ -343,11 +349,12 @@ static void test_pma_iterate2 (void) {
     int r;
     int sum=0;
     int n_items=0;
+    DBT k,v;
     r=pma_create(&pma0, default_compare_fun); assert(r==0);
     r=pma_create(&pma1, default_compare_fun); assert(r==0);
-    pma_insert(pma0, "a", 2, "aval", 5);
-    pma_insert(pma0, "b", 2, "bval", 5);
-    pma_insert(pma1, "x", 2, "xval", 5);
+    pma_insert(pma0, fill_dbt(&k, "a", 2), fill_dbt(&v, "aval", 5), 0);
+    pma_insert(pma0, fill_dbt(&k, "b", 2), fill_dbt(&v, "bval", 5), 0);
+    pma_insert(pma1, fill_dbt(&k, "x", 2), fill_dbt(&v, "xval", 5), 0);
     PMA_ITERATE(pma0,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)),dl, (n_items++,sum+=kl+dl));
     PMA_ITERATE(pma1,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)), dl, (n_items++,sum+=kl+dl));
     assert(sum==21);
@@ -422,10 +429,11 @@ void test_pma_cursor_3 (void) {
     PMA_CURSOR c=0;
     int r;
     DBT key,val;
+    DBT k,v;
     r=pma_create(&pma, default_compare_fun); assert(r==0);
-    r=pma_insert(pma, "x", 2, "xx", 3); assert(r==BRT_OK);
-    r=pma_insert(pma, "m", 2, "mm", 3); assert(r==BRT_OK);
-    r=pma_insert(pma, "aa", 3, "a", 2); assert(r==BRT_OK);
+    r=pma_insert(pma, fill_dbt(&k, "x", 2),  fill_dbt(&v, "xx", 3), 0); assert(r==BRT_OK);
+    r=pma_insert(pma, fill_dbt(&k, "m", 2),  fill_dbt(&v, "mm", 3), 0); assert(r==BRT_OK);
+    r=pma_insert(pma, fill_dbt(&k, "aa", 3), fill_dbt(&v,"a", 2),   0); assert(r==BRT_OK);
     ybt_init(&key); key.flags=DB_DBT_REALLOC;
     ybt_init(&val); val.flags=DB_DBT_REALLOC;
     r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
@@ -478,7 +486,6 @@ int wrong_endian_compare_fun (DB *ignore __attribute__((__unused__)),
     int siz = a->size;
     assert(a->size==b->size); // This function requires that the keys be the same size.
     
-    abort();
     for (i=0; i<a->size; i++) {
 	if (ad[siz-1-i]<bd[siz-1-i]) return -1;
 	if (ad[siz-1-i]>bd[siz-1-i]) return +1;
@@ -495,11 +502,12 @@ void test_pma_compare_fun (int wrong_endian_p) {
     char *right_endian_expected_keys[] = {"00", "01", "10", "11"};
     char **expected_keys = wrong_endian_p ? wrong_endian_expected_keys : right_endian_expected_keys;
     int i;
+    DBT k,v;
     r = pma_create(&pma, wrong_endian_p ? wrong_endian_compare_fun : default_compare_fun); assert(r==0);
-    r = pma_insert(pma, "10", 3, "10v", 4); assert(r==BRT_OK);
-    r = pma_insert(pma, "00", 3, "00v", 4); assert(r==BRT_OK);
-    r = pma_insert(pma, "01", 3, "01v", 4); assert(r==BRT_OK);
-    r = pma_insert(pma, "11", 3, "11v", 4); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "10", 3), fill_dbt(&v, "10v", 4), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "00", 3), fill_dbt(&v, "00v", 4), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "01", 3), fill_dbt(&v, "01v", 4), 0); assert(r==BRT_OK);
+    r = pma_insert(pma, fill_dbt(&k, "11", 3), fill_dbt(&v, "11v", 4), 0); assert(r==BRT_OK);
     ybt_init(&key); key.flags=DB_DBT_REALLOC;
     ybt_init(&val); val.flags=DB_DBT_REALLOC;
     r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
diff --git a/newbrt/pma.c b/newbrt/pma.c
index 23e7265e1a4..d8134d2c1d2 100644
--- a/newbrt/pma.c
+++ b/newbrt/pma.c
@@ -92,7 +92,7 @@ void pma_show_stats (void) {
 // For example: if the array is empty, that means we return 0.
 // For example: if the array is full of small keys, that means we return pma_index_limit(pma), which is off the end of teh array.
 // For example: if the array is full of large keys, then we return 0.
-int pmainternal_find (PMA pma, bytevec key, int keylen) {
+int pmainternal_find (PMA pma, DBT *k, DB *db) {
     int lo=0, hi=pma_index_limit(pma);
     /* lo and hi are the minimum and maximum values (inclusive) that we could possibly return. */
     pma_count_finds++;
@@ -102,7 +102,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) {
 	for (mid=(lo+hi)/2; mid<hi; mid++) {
 	    if (pma->pairs[mid].key!=0) {
 		// Found one.
-		int cmp = keycompare(key,keylen, pma->pairs[mid].key, pma->pairs[mid].keylen);
+		DBT k2;
+		int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid].key, pma->pairs[mid].keylen));
 		if (cmp==0) return mid;
 		else if (cmp<0) {
 		    /* key is smaller than the midpoint, so look in the low half. */
@@ -131,7 +132,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) {
     /* If lo points at something, the something should not be smaller than key. */
     if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo].key) {
 	//printf("lo=%d\n", lo);
-	assert(0 >= keycompare(key, keylen, pma->pairs[lo].key, pma->pairs[lo].keylen));
+	DBT k2;
+	assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo].key, pma->pairs[lo].keylen)));
     }
     return lo;
 }
@@ -283,6 +285,8 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,DBT*,DBT*)) {
     pmainternal_calculate_parameters(result);
     result->cursors_head = result->cursors_tail = 0;
     result->compare_fun = compare_fun;
+    result->skey=0;
+    result->sval = 0;
     *pma = result;
     assert((unsigned long)result->pairs[result->N].key==0xdeadbeefL);
     return 0;
@@ -446,16 +450,13 @@ int pmainternal_make_space_at (PMA pma, int idx) {
 }
 
 
-/* Exposes internals of the PMA by returning a pointer to the guts.
- * Don't modify the returned data.  Don't free it. */
-enum pma_errors pma_lookup (PMA pma, bytevec key, ITEMLEN keylen, bytevec*val, ITEMLEN *vallen) {
-    int l = pmainternal_find(pma, key, keylen);
+enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
+    DBT k2;
+    int l = pmainternal_find(pma, k, db);
     assert(0<=l ); assert(l<=pma_index_limit(pma));
     if (l==pma_index_limit(pma)) return DB_NOTFOUND;
-    if (keycompare(key,keylen,pma->pairs[l].key,pma->pairs[l].keylen)==0) {
-	*val = pma->pairs[l].val;
-	*vallen = pma->pairs[l].vallen;
-	return BRT_OK;
+    if (pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) {
+	return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval);
     } else {
 	return DB_NOTFOUND;
     }
@@ -481,15 +482,18 @@ int pma_free (PMA *pmap) {
     }
     toku_free(pma->pairs);
     toku_free(pma);
+    if (pma->skey) toku_free(pma->skey);
+    if (pma->sval) toku_free(pma->sval);
     *pmap=0;
     return 0;
 }
 
 /* Copies keylen and datalen */ 
-int pma_insert (PMA pma, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen) {
-    int idx = pmainternal_find(pma, key, keylen);
+int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
+    int idx = pmainternal_find(pma, k, db);
     if (idx < pma_index_limit(pma) && pma->pairs[idx].key) {
-	if (0==keycompare(key, keylen, pma->pairs[idx].key, pma->pairs[idx].keylen)) {
+	DBT k2;
+	if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx].key, pma->pairs[idx].keylen))) {
 	    return BRT_ALREADY_THERE; /* It is already here.  Return an error. */
 	}
     }
@@ -497,40 +501,16 @@ int pma_insert (PMA pma, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN data
 	idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
     }
     assert(!pma->pairs[idx].key);
-    pma->pairs[idx].key    = memdup(key, keylen);
-    pma->pairs[idx].keylen = keylen;
-    pma->pairs[idx].val    = memdup(data, datalen);
-    pma->pairs[idx].vallen = datalen;
+    pma->pairs[idx].key    = memdup(k->data, k->size);
+    pma->pairs[idx].keylen = k->size;
+    pma->pairs[idx].val    = memdup(v->data, v->size);
+    pma->pairs[idx].vallen = v->size;
     pma->n_pairs_present++;
     return BRT_OK;
 }    
 
-#if 0
-void smooth_after_delete (PMA pma, int idx) {
-    int size=pma->uplgN;
-    int lo=idx;
-    int hi=idx;
-    double density=0.1;
-    while (1) {
-	lo=idx-size/2;
-	hi=idx+size/2;
-	if (lo<0) { hi-=lo; lo=0; }
-	else if (hi>pma_index_limit(pma)) { lo-=(hi-pma_index_limit(pma)); hi=pma_index_limit(pma); }
-	else { ; /* nothing */ }
-
-	assert(density<0.25);
-	{
-	    int count=pmainternal_count_region(pma->pairs, lo, hi);
-	    if (count/(double)(hi-lo) >= density) break;
-	    if (lo==0 && hi==pma_index_limit(pma)) {
-		/* The array needs to be shrunk */
-		
-}
-#endif
-	    
-
-int pma_delete (PMA pma, bytevec key, ITEMLEN keylen) {
-    int l = pmainternal_find(pma, key, keylen);
+int pma_delete (PMA pma, DBT *k, DB *db) {
+    int l = pmainternal_find(pma, k, db);
     if (pma->pairs[l].key==0) {
 	printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
 	return DB_NOTFOUND;
diff --git a/newbrt/pma.h b/newbrt/pma.h
index fe0c8f47ac3..0cbca0499b4 100644
--- a/newbrt/pma.h
+++ b/newbrt/pma.h
@@ -24,15 +24,17 @@ int  pma_n_entries (PMA);
 /* The values returned should not be modified.by the caller. */
 /* Any cursors should be updated. */
 /* Duplicates the key and keylen. */
-enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
+//enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
+// The DB pointer is there so that the comparison function can be called.
+enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*);
 /* This returns an error if the key is NOT present. */
 int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
 /* This returns an error if the key is NOT present. */
-int pma_delete (PMA, bytevec key, ITEMLEN keylen);
+int pma_delete (PMA, DBT *, DB*);
 
 /* Exposes internals of the PMA by returning a pointer to the guts.
  * Don't modify the returned data.  Don't free it. */
-enum pma_errors pma_lookup (PMA, bytevec key, ITEMLEN keylen, bytevec*data, ITEMLEN *datalen);
+enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
 
 /* Move the cursor to the beginning or the end or to a key */
 int pma_cursor (PMA, PMA_CURSOR *);
diff --git a/newbrt/ybt.c b/newbrt/ybt.c
index d2bf6622d68..8f446c2dd68 100644
--- a/newbrt/ybt.c
+++ b/newbrt/ybt.c
@@ -9,6 +9,14 @@ int ybt_init (DBT *ybt) {
     return 0;
 }
 
+DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
+    ybt_init(dbt);
+    dbt->size=len;
+    dbt->data=(char*)k;
+    return dbt;
+}
+
+
 int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) {
     if (ybt->flags==DB_DBT_MALLOC) {
     domalloc:
diff --git a/newbrt/ybt.h b/newbrt/ybt.h
index f8a0480d053..13a69c5a35c 100644
--- a/newbrt/ybt.h
+++ b/newbrt/ybt.h
@@ -7,6 +7,7 @@
 
 
 int ybt_init (DBT *);
+DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
 int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
 
 #endif
-- 
2.30.9