Commit 266b8ea4 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

pma move logging works better. Addresses #27

git-svn-id: file:///svn/tokudb@920 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3604deea
...@@ -649,13 +649,9 @@ static int __pma_array_size(PMA pma __attribute__((unused)), int asksize) { ...@@ -649,13 +649,9 @@ static int __pma_array_size(PMA pma __attribute__((unused)), int asksize) {
return n; return n;
} }
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) { int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
int i; pma->N = newsize;
int n;
int oldN = pma->N;
n = __pma_array_size(pma, asksize);
pma->N = n;
if (pma->pairs == 0) if (pma->pairs == 0)
pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *)); pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
else else
...@@ -664,9 +660,20 @@ static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pm ...@@ -664,9 +660,20 @@ static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pm
return -1; return -1;
pma->pairs[pma->N] = (void *) 0xdeadbeef; pma->pairs[pma->N] = (void *) 0xdeadbeef;
for (i=startz; i<pma->N; i++) { int i;
for (i=oldsize; i<pma->N; i++) {
pma->pairs[i] = 0; pma->pairs[i] = 0;
} }
return 0;
}
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) {
int n;
int oldN = pma->N;
n = __pma_array_size(pma, asksize);
int r = toku_resize_pma_exactly(pma, startz, n);
if (r!=0) return r;
toku_pmainternal_calculate_parameters(pma); toku_pmainternal_calculate_parameters(pma);
toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n); toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
return 0; return 0;
...@@ -1640,3 +1647,45 @@ int toku_pma_set_at_index (PMA pma, int idx, DBT *key, DBT *value) { ...@@ -1640,3 +1647,45 @@ int toku_pma_set_at_index (PMA pma, int idx, DBT *key, DBT *value) {
pma->n_pairs_present++; pma->n_pairs_present++;
return 0; return 0;
} }
// assume no cursors
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
u_int32_t i;
for (i=0; i<fromto.size; i++) {
// First handle the case for sliding something left. We can simply move it.
{
int a=fromto.array[i].a;
int b=fromto.array[i].b;
if (b==a) continue;
if (b<a) {
assert(pma->pairs[b]==0);
pma->pairs[b] = pma->pairs[a];
pma->pairs[a] = 0;
continue;
}
}
// Otherwise slide things to the right We have to find the rightmost thing that slides right and move it first.
{
// We must slide things to the right.
// Find the next index that does want to go to the left
u_int32_t j;
for (j=i+1; j<fromto.size && fromto.array[j].b <= fromto.array[j].a; j++) {
/*nothing */
}
// everything from i (inclusive) to j (exclusive) wants to slide to the right.
u_int32_t jdown;
for (jdown=j-1; 1; jdown--) {
int a=fromto.array[jdown].a;
int b=fromto.array[jdown].b;
if (a!=b) {
assert(pma->pairs[b]==0);
pma->pairs[b] = pma->pairs[a];
pma->pairs[a] = 0;
}
if (i==jdown) break; // Do it this way so everything can be unsigned and we won't try to go negative.
}
i=j-1;
}
}
return 0;
}
...@@ -144,8 +144,15 @@ void toku_pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi ...@@ -144,8 +144,15 @@ void toku_pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi
void toku_pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint); void toku_pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint);
// Set the PMA's size, without moving anything.
int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize);
int toku_pma_set_at_index (PMA, int /*index*/, DBT */*key*/, DBT */*value*/); // If the index is wrong or there is a value already, return nonzero int toku_pma_set_at_index (PMA, int /*index*/, DBT */*key*/, DBT */*value*/); // If the index is wrong or there is a value already, return nonzero
// Requires: No open cursors on the pma.
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
void toku_pma_show_stats (void); void toku_pma_show_stats (void);
#endif #endif
...@@ -133,6 +133,7 @@ static void toku_recover_fopen (struct logtype_fopen *c) { ...@@ -133,6 +133,7 @@ static void toku_recover_fopen (struct logtype_fopen *c) {
toku_free(fname); toku_free(fname);
toku_free(c->fname.data); toku_free(c->fname.data);
} }
static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) { static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; BRT brt;
...@@ -148,16 +149,42 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) { ...@@ -148,16 +149,42 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
assert(r==0); assert(r==0);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer += KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer += KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
} }
static void toku_recover_resizepma (struct logtype_resizepma *c) { static void toku_recover_resizepma (struct logtype_resizepma *c) {
c=c; CACHEFILE cf;
abort(); BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
assert(r==0);
void *node_v;
r = toku_cachetable_get_and_pin (cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
r = toku_resize_pma_exactly (node->u.l.buffer, c->oldsize, c->newsize);
assert(r==0);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
} }
static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) { static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
c=c; CACHEFILE cf;
abort(); BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
assert(r==0);
void *node_v;
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
r = toku_pma_move_indices (node->u.l.buffer, c->fromto);
// The bytes in bufer and fingerprint shouldn't change
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
} }
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment