From 266b8ea463255564f3d2ccc4501519e2f5bbfa48 Mon Sep 17 00:00:00 2001
From: "Bradley C. Kuszmaul" <bradley@tokutek.com>
Date: Tue, 4 Dec 2007 17:59:03 +0000
Subject: [PATCH] pma move logging works better.  Addresses #27

git-svn-id: file:///svn/tokudb@920 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/pma.c     | 63 ++++++++++++++++++++++++++++++++++++++++++------
 newbrt/pma.h     |  7 ++++++
 newbrt/recover.c | 35 ++++++++++++++++++++++++---
 3 files changed, 94 insertions(+), 11 deletions(-)

diff --git a/newbrt/pma.c b/newbrt/pma.c
index b8d39e2e67..b861494f22 100644
--- a/newbrt/pma.c
+++ b/newbrt/pma.c
@@ -649,13 +649,9 @@ static int __pma_array_size(PMA pma __attribute__((unused)), int asksize) {
     return n;
 }
 
-static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) {
-    int i;
-    int n;
-    int oldN = pma->N;
+int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
+    pma->N = newsize;
 
-    n = __pma_array_size(pma, asksize);
-    pma->N = n;
     if (pma->pairs == 0)
         pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
     else
@@ -664,9 +660,20 @@ static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pm
         return -1;
     pma->pairs[pma->N] = (void *) 0xdeadbeef;
 
-    for (i=startz; i<pma->N; i++) {
+    int i;
+    for (i=oldsize; i<pma->N; i++) {
         pma->pairs[i] = 0;
     }
+    return 0;
+}
+
+static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) {
+    int n;
+    int oldN = pma->N;
+
+    n = __pma_array_size(pma, asksize);
+    int r = toku_resize_pma_exactly(pma, startz, n);
+    if (r!=0) return r;
     toku_pmainternal_calculate_parameters(pma);
     toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
     return 0;
@@ -1640,3 +1647,45 @@ int toku_pma_set_at_index (PMA pma, int idx, DBT *key, DBT *value) {
     pma->n_pairs_present++;
     return 0;
 }
+
+// assume no cursors
+int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
+    u_int32_t i;
+    for (i=0; i<fromto.size; i++) {
+	// First handle the case for sliding something left.  We can simply move it.
+	{
+	    int a=fromto.array[i].a;
+	    int b=fromto.array[i].b;
+	    if (b==a) continue;
+	    if (b<a) {
+		assert(pma->pairs[b]==0);
+		pma->pairs[b] = pma->pairs[a];
+		pma->pairs[a] = 0;
+		continue;
+	    }
+	}
+	// Otherwise slide things to the right  We have to find the rightmost thing that slides right and move it first.
+	{
+	    // We must slide things to the right.
+	    // Find the next index that does want to go to the left
+	    u_int32_t j;
+	    for (j=i+1; j<fromto.size && fromto.array[j].b <= fromto.array[j].a; j++) {
+		/*nothing */
+	    }
+	    // everything from i (inclusive) to j (exclusive) wants to slide to the right.
+	    u_int32_t jdown;
+	    for (jdown=j-1; 1; jdown--) {
+		int a=fromto.array[jdown].a;
+		int b=fromto.array[jdown].b;
+		if (a!=b) {
+		    assert(pma->pairs[b]==0);
+		    pma->pairs[b] = pma->pairs[a];
+		    pma->pairs[a] = 0;
+		}
+		if (i==jdown) break; // Do it this way so everything can be unsigned and we won't try to go negative.
+	    }
+	    i=j-1;
+	}
+    }
+    return 0;
+}
diff --git a/newbrt/pma.h b/newbrt/pma.h
index 0393891d08..cc0e7a7b11 100644
--- a/newbrt/pma.h
+++ b/newbrt/pma.h
@@ -144,8 +144,15 @@ void toku_pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi
 
 void toku_pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint);
 
+// Set the PMA's size, without moving anything.
+int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize);
+
 int toku_pma_set_at_index (PMA, int /*index*/, DBT */*key*/, DBT */*value*/); // If the index is wrong or there is a value already, return nonzero
 
+// Requires:  No open cursors on the pma.
+int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
+
 void toku_pma_show_stats (void);
 
+
 #endif
diff --git a/newbrt/recover.c b/newbrt/recover.c
index 4b0ef9483a..6f85d00384 100644
--- a/newbrt/recover.c
+++ b/newbrt/recover.c
@@ -133,6 +133,7 @@ static void toku_recover_fopen (struct logtype_fopen *c) {
     toku_free(fname);
     toku_free(c->fname.data);
 }
+
 static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
     CACHEFILE cf;
     BRT brt;
@@ -148,16 +149,42 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
     assert(r==0);
     node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
     node->u.l.n_bytes_in_buffer += KEY_VALUE_OVERHEAD + c->key.len + c->data.len; 
+    r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
+    assert(r==0);
 }
 
 static void toku_recover_resizepma (struct logtype_resizepma *c) {
-    c=c;
-    abort();
+    CACHEFILE cf;
+    BRT brt;
+    int r = find_cachefile(c->filenum, &cf, &brt);
+    assert(r==0);
+    void *node_v;
+    r = toku_cachetable_get_and_pin (cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
+    assert(r==0);
+    BRTNODE node = node_v;
+    assert(node->height==0);
+    r = toku_resize_pma_exactly (node->u.l.buffer, c->oldsize, c->newsize);
+    assert(r==0);
+    
+    r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
+    assert(r==0);
 }
 
 static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
-    c=c;
-    abort();
+    CACHEFILE cf;
+    BRT brt;
+    int r = find_cachefile(c->filenum, &cf, &brt);
+    assert(r==0);
+    void *node_v;
+    r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
+    assert(r==0);
+    BRTNODE node = node_v;
+    assert(node->height==0);
+    r = toku_pma_move_indices (node->u.l.buffer, c->fromto);
+    // The bytes in bufer and fingerprint shouldn't change
+
+    r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
+    assert(r==0);
 }
 
 int main (int argc, char *argv[]) {
-- 
2.30.9