From 230d0d46329e44a417ddc0aaa3854aa018000d83 Mon Sep 17 00:00:00 2001
From: Rich Prohaska <prohaska@tokutek.com>
Date: Wed, 5 Dec 2007 22:07:48 +0000
Subject: [PATCH] add DBC->get DB_NEXT_DUP.  addresses #121

git-svn-id: file:///svn/tokudb@948 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/brt.c              | 46 ++++++++++++++++++++++++++++++++++++++-
 newbrt/pma.c              | 15 ++-----------
 src/tests/test_dup_next.c | 15 ++++++++-----
 3 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/newbrt/brt.c b/newbrt/brt.c
index eaac064cf3b..b7c0f7e64ef 100644
--- a/newbrt/brt.c
+++ b/newbrt/brt.c
@@ -2860,6 +2860,50 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
 	r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
 	if (r == 0) assert_cursor_path(cursor);
         break;
+    case DB_NEXT_DUP:
+        if (cursor->path_len<=0) {
+            r = EINVAL; goto died0;
+        }
+        /* get the current key, move the cursor, get the new key and compare them
+           if the keys are the same then return the duplicate key and data */
+
+        DBT k1; memset(&k1, 0, sizeof k1); 
+        r = toku_pma_cursor_get_current(cursor->pmacurs, &k1, 0); if (r != 0) goto died0;
+        r = toku_pma_cursor_set_position_next(cursor->pmacurs);
+        if (r == 0) {
+            DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
+            r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0); if (r != 0) goto died0;
+            int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
+            toku_free(k2.data);
+            if (cmp == 0) {
+                r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
+                if (r != 0) {
+                    toku_pma_cursor_set_position_prev(cursor->pmacurs); goto died0;
+                }
+            } else {
+                toku_pma_cursor_set_position_prev(cursor->pmacurs); r = DB_NOTFOUND; goto died0;
+            }
+        } else if (r == DB_NOTFOUND) {
+            /* we are at the end of the pma. move to the next tuple in the tree and search there */
+            r = brtcurs_set_position_next(cursor, 0, txn); 
+            if (r != 0) {
+                unpin_cursor(cursor);
+                brtcurs_set_position_last(cursor, *rootp, kbt, txn, null_brtnode);
+                goto died0;
+            }
+            DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
+            r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0); assert(r == 0);
+            int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
+            toku_free(k2.data);
+            if (cmp != 0) {
+                brtcurs_set_position_prev(cursor, 0, txn);
+                r = DB_NOTFOUND;
+            } else
+                r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
+            if (r != 0) goto died0;
+        } else
+            goto died0;
+        break;
     case DB_PREV:
         if (cursor->path_len<= 0)
             goto do_db_last;
@@ -2879,7 +2923,7 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
         assert(r == 0);
         r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_SET, txn, null_brtnode);
         if (r != 0) goto died0;
-        r = toku_pma_cursor_get_current_data(cursor->pmacurs, vbt);
+        r = toku_pma_cursor_get_current(cursor->pmacurs, 0, vbt);
         if (r != 0) goto died0;
         break;
     case DB_GET_BOTH:
diff --git a/newbrt/pma.c b/newbrt/pma.c
index a841b6ff636..9fbcd7e268d 100644
--- a/newbrt/pma.c
+++ b/newbrt/pma.c
@@ -785,17 +785,6 @@ int toku_pma_cursor_set_position_next (PMA_CURSOR c) {
     return DB_NOTFOUND;
 }
 
-int toku_pma_cursor_get_current_data(PMA_CURSOR c, DBT *data) {
-    if (c->position == -1)
-        return DB_NOTFOUND;
-    PMA pma = c->pma;
-    struct kv_pair *pair = pma->pairs[c->position];
-    if (!kv_pair_valid(pair)) 
-        return BRT_KEYEMPTY;
-    toku_dbt_set_value(data, kv_pair_val(pair), kv_pair_vallen(pair), c->ssval);
-    return 0;
-}
-
 int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) {
     if (c->position == -1)
         return DB_NOTFOUND;
@@ -803,8 +792,8 @@ int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) {
     struct kv_pair *pair = pma->pairs[c->position];
     if (!kv_pair_valid(pair)) 
         return BRT_KEYEMPTY;
-    toku_dbt_set_value(key, kv_pair_key(pair), kv_pair_keylen(pair), c->sskey);
-    toku_dbt_set_value(val, kv_pair_val(pair), kv_pair_vallen(pair), c->ssval);
+    if (key) toku_dbt_set_value(key, kv_pair_key(pair), kv_pair_keylen(pair), c->sskey);
+    if (val) toku_dbt_set_value(val, kv_pair_val(pair), kv_pair_vallen(pair), c->ssval);
     return 0;
 }
 
diff --git a/src/tests/test_dup_next.c b/src/tests/test_dup_next.c
index dc5fe581d4a..bb51c39325d 100644
--- a/src/tests/test_dup_next.c
+++ b/src/tests/test_dup_next.c
@@ -96,8 +96,8 @@ int expect_cursor_get(DBC *cursor, int expectk, int expectv, int op) {
     return r;
 }
 
-void test_dup_next(int n, int dup_mode) {
-    if (verbose) printf("test_dup_next:%d %d\n", n, dup_mode);
+void test_dup_next(int n, int dup_mode, int bracket_dups) {
+    if (verbose) printf("test_dup_next:%d %d %d\n", n, dup_mode, bracket_dups);
 
     DB_ENV * const null_env = 0;
     DB *db;
@@ -114,8 +114,8 @@ void test_dup_next(int n, int dup_mode) {
     r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
 
     db_put(db, 0, 0);
-    db_put(db, 2, 0);
-    /* seq inserts to build the tree */
+    if (bracket_dups) db_put(db, 2, 0);
+
     int i;
     for (i=0; i<n; i++) {
         int k = htonl(1);
@@ -137,6 +137,8 @@ void test_dup_next(int n, int dup_mode) {
 
     r = expect_cursor_get(cursor, htonl(1), htonl(i), DB_NEXT_DUP); assert(r == DB_NOTFOUND);
 
+    r = expect_cursor_get(cursor, htonl(1), htonl(i-1), DB_CURRENT); assert(r == 0);
+
     r = cursor->c_close(cursor); assert(r == 0);
 
     r = db->close(db, 0); assert(r == 0);
@@ -150,8 +152,9 @@ int main(int argc, const char *argv[]) {
     system("rm -rf " DIR);
     mkdir(DIR, 0777);
 
-    for (i = 1; i <= (1<<16); i *= 2) {
-         test_dup_next(i, DB_DUP + DB_DUPSORT);
+    for (i = 1; i <= 65536; i *= 2) {
+        test_dup_next(i, DB_DUP + DB_DUPSORT, 0);
+        test_dup_next(i, DB_DUP + DB_DUPSORT, 1);
     }
 
     return 0;
-- 
2.30.9