Commit 62296eab authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log some pma movements. Addresses #27.

git-svn-id: file:///svn/tokudb@911 c7de825b-a66e-492c-adef-691d508d4ae1
parent b7933628
...@@ -390,7 +390,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -390,7 +390,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
} }
if (n_in_buf > 0) { if (n_in_buf > 0) {
u_int32_t actual_sum = 0; u_int32_t actual_sum = 0;
r = toku_pma_bulk_insert(result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum); r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum);
if (r!=0) goto died_21; if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) { if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf); //fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
......
...@@ -393,7 +393,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v ...@@ -393,7 +393,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v
} }
static int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) { static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) {
BRTNODE A,B; BRTNODE A,B;
assert(node->height==0); assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
...@@ -408,9 +408,9 @@ static int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, D ...@@ -408,9 +408,9 @@ static int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, D
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize); //printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
int r; int r;
r = toku_pma_split(node->u.l.buffer, &node->u.l.n_bytes_in_buffer, splitk, r = toku_pma_split(txn, filenum, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, splitk,
A->u.l.buffer, &A->u.l.n_bytes_in_buffer, A->rand4fingerprint, &A->local_fingerprint, A->thisnodename, A->u.l.buffer, &A->u.l.n_bytes_in_buffer, A->rand4fingerprint, &A->local_fingerprint,
B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint); A->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
assert(r == 0); assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */ /* Remove it from the cache table, and free its storage. */
...@@ -933,11 +933,12 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -933,11 +933,12 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int debug, int debug,
TOKUTXN txn) { TOKUTXN txn) {
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint); // toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
FILENUM filenum = toku_cachefile_filenum(t->cf);
if (cmd->type == BRT_INSERT) { if (cmd->type == BRT_INSERT) {
DBT *k = cmd->u.id.key; DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val; DBT *v = cmd->u.id.val;
int replaced_v_size; int replaced_v_size;
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint); enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint);
assert(pma_status==BRT_OK); assert(pma_status==BRT_OK);
//printf("replaced_v_size=%d\n", replaced_v_size); //printf("replaced_v_size=%d\n", replaced_v_size);
if (replaced_v_size>=0) { if (replaced_v_size>=0) {
...@@ -951,7 +952,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -951,7 +952,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
// If it doesn't fit, then split the leaf. // If it doesn't fit, then split the leaf.
if (toku_serialize_brtnode_size(node) > node->nodesize) { if (toku_serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (t, node, nodea, nodeb, splitk); int r = brtleaf_split (txn, filenum, t, node, nodea, nodeb, splitk);
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey); //printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey);
split_count++; split_count++;
......
...@@ -56,4 +56,11 @@ typedef struct loggedbrtheader { ...@@ -56,4 +56,11 @@ typedef struct loggedbrtheader {
} u; } u;
} LOGGEDBRTHEADER; } LOGGEDBRTHEADER;
typedef struct intpairarray {
u_int32_t size;
struct intpair {
u_int32_t a,b;
} *array;
} INTPAIRARRAY;
#endif #endif
...@@ -72,3 +72,7 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) { ...@@ -72,3 +72,7 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
assert(bs.n_named_roots=0); assert(bs.n_named_roots=0);
return 4+4+4+8+8+4+8; return 4+4+4+8+8+4+8;
} }
static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) {
return 4+(4+4)*pa.size;
}
...@@ -469,7 +469,7 @@ int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *l ...@@ -469,7 +469,7 @@ int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *l
return 0; return 0;
} }
int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_int32_t *len) { int toku_fread_LOGGEDBRTHEADER (FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_int32_t *len) {
int r; int r;
r = toku_fread_u_int32_t(f, &v->size, crc, len); if (r!=0) return r; r = toku_fread_u_int32_t(f, &v->size, crc, len); if (r!=0) return r;
r = toku_fread_u_int32_t(f, &v->flags, crc, len); if (r!=0) return r; r = toku_fread_u_int32_t(f, &v->flags, crc, len); if (r!=0) return r;
...@@ -482,6 +482,19 @@ int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_in ...@@ -482,6 +482,19 @@ int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_in
return 0; return 0;
} }
int toku_fread_INTPAIRARRAY (FILE *f, INTPAIRARRAY *v, u_int32_t *crc, u_int32_t *len) {
int r;
u_int32_t i;
r = toku_fread_u_int32_t(f, &v->size, crc, len); if (r!=0) return r;
MALLOC_N(v->size, v->array);
if (v->array!=0) return errno;
for (i=0; i<v->size; i++) {
r = toku_fread_u_int32_t(f, &v->array[i].a, crc, len); if (r!=0) return r;
r = toku_fread_u_int32_t(f, &v->array[i].b, crc, len); if (r!=0) return r;
}
return 0;
}
int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *format __attribute__((__unused__))) { int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *format __attribute__((__unused__))) {
LSN v; LSN v;
int r = toku_fread_LSN(inf, &v, crc, len); int r = toku_fread_LSN(inf, &v, crc, len);
...@@ -559,6 +572,20 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname, ...@@ -559,6 +572,20 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname,
} }
int toku_logprint_INTPAIRARRAY (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *format __attribute__((__unused__))) {
INTPAIRARRAY v;
u_int32_t i;
int r = toku_fread_INTPAIRARRAY(inf, &v, crc, len);
if (r!=0) return r;
fprintf(outf, " %s={size=%d array={", fieldname, v.size);
for (i=0; i<v.size; i++) {
if (i!=0) fprintf(outf, " ");
fprintf(outf, "{%d %d}", v.array[i].a, v.array[i].b);
}
toku_free(v.array);
return 0;
}
int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp) { int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp) {
{ {
char magic[8]; char magic[8];
......
...@@ -42,6 +42,7 @@ int toku_fread_TXNID (FILE *f, TXNID *txnid, u_int32_t *crc, u_int32_t *len); ...@@ -42,6 +42,7 @@ int toku_fread_TXNID (FILE *f, TXNID *txnid, u_int32_t *crc, u_int32_t *len);
// fills in the bs with malloced data. // fills in the bs with malloced data.
int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *len); int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *len);
int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_int32_t *len); int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_int32_t *len);
int toku_fread_INTPAIRARRAY (FILE *f, INTPAIRARRAY *v, u_int32_t *crc, u_int32_t *len);
int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *); int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *); int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
...@@ -51,6 +52,7 @@ int toku_logprint_DISKOFF (FILE *outf, FILE *inf, const char *fieldname, ...@@ -51,6 +52,7 @@ int toku_logprint_DISKOFF (FILE *outf, FILE *inf, const char *fieldname,
int toku_logprint_u_int8_t (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *); int toku_logprint_u_int8_t (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
int toku_logprint_u_int32_t (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *); int toku_logprint_u_int32_t (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *); int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
int toku_logprint_INTPAIRARRAY (FILE *outf, FILE *inf, const char *fieldname, u_int32_t *crc, u_int32_t *len, const char *);
int toku_read_and_print_logmagic (FILE *f, u_int32_t *version); int toku_read_and_print_logmagic (FILE *f, u_int32_t *version);
......
...@@ -71,6 +71,17 @@ const struct logtype logtypes[] = { ...@@ -71,6 +71,17 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"resizepma", 'R', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "oldsize", 0},
{"u_int32_t", "newsize", 0},
NULLFIELD}},
{"pmadistribute", 'M', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"INTPAIRARRAY", "fromto", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}} {0,0,FA{NULLFIELD}}
}; };
......
...@@ -36,9 +36,9 @@ struct pma { ...@@ -36,9 +36,9 @@ struct pma {
int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi); int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void toku_pmainternal_calculate_parameters (PMA pma); void toku_pmainternal_calculate_parameters (PMA pma);
int toku_pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base, PMA pma); int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/);
int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N); int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N);
int toku_pmainternal_make_space_at (PMA pma, int idx); int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, int *new_index);
int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called. int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called.
void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */ void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
......
...@@ -16,7 +16,7 @@ static DB * const null_db = 0; ...@@ -16,7 +16,7 @@ static DB * const null_db = 0;
static const DISKOFF null_diskoff = -1; static const DISKOFF null_diskoff = -1;
static const FILENUM null_filenum = {0}; static const FILENUM null_filenum = {0};
#define NULL_ARGS null_txn, null_diskoff #define NULL_ARGS null_txn, null_filenum, null_diskoff
void *skey=0, *sval=0; void *skey=0, *sval=0;
...@@ -30,7 +30,7 @@ void local_memory_check_all_free(void) { ...@@ -30,7 +30,7 @@ void local_memory_check_all_free(void) {
static void test_make_space_at (void) { static void test_make_space_at (void) {
PMA pma; PMA pma;
char *key; char *key;
int r; int r, newi;
struct kv_pair *key_A, *key_B; struct kv_pair *key_A, *key_B;
key = "A"; key = "A";
...@@ -41,17 +41,19 @@ static void test_make_space_at (void) { ...@@ -41,17 +41,19 @@ static void test_make_space_at (void) {
r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0); assert(r==0);
assert(toku_pma_n_entries(pma)==0); assert(toku_pma_n_entries(pma)==0);
r=toku_pmainternal_make_space_at(pma, 2); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi);
assert(r==0);
assert(toku_pma_index_limit(pma)==4); assert(toku_pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL); assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL);
toku_print_pma(pma); toku_print_pma(pma);
pma->pairs[2] = key_A; pma->pairs[2] = key_A;
pma->n_pairs_present++; pma->n_pairs_present++;
r=toku_pmainternal_make_space_at(pma,2); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi);
printf("Requested space at 2, got space at %d\n", r); assert(r==0);
printf("Requested space at 2, got space at %d\n", newi);
toku_print_pma(pma); toku_print_pma(pma);
assert(pma->pairs[r]==0); assert(pma->pairs[newi]==0);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL); assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL);
assert(toku_pma_index_limit(pma)==4); assert(toku_pma_index_limit(pma)==4);
...@@ -61,8 +63,9 @@ static void test_make_space_at (void) { ...@@ -61,8 +63,9 @@ static void test_make_space_at (void) {
pma->pairs[3] = 0; pma->pairs[3] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
toku_print_pma(pma); toku_print_pma(pma);
r=toku_pmainternal_make_space_at(pma,0); toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi);
printf("Requested space at 0, got space at %d\n", r); assert(r==0);
printf("Requested space at 0, got space at %d\n", newi);
toku_print_pma(pma); toku_print_pma(pma);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL); // make sure it doesn't go off the end. assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL); // make sure it doesn't go off the end.
...@@ -77,14 +80,15 @@ static void test_make_space_at (void) { ...@@ -77,14 +80,15 @@ static void test_make_space_at (void) {
pma->pairs[7] = 0; pma->pairs[7] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
toku_print_pma(pma); toku_print_pma(pma);
r=toku_pmainternal_make_space_at(pma,5); r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi);
assert(r==0);
toku_print_pma(pma); toku_print_pma(pma);
printf("r=%d\n", r); printf("r=%d\n", newi);
{ {
int i; int i;
for (i=0; i<toku_pma_index_limit(pma); i++) { for (i=0; i<toku_pma_index_limit(pma); i++) {
if (pma->pairs[i]) { if (pma->pairs[i]) {
assert(i<r); assert(i<newi);
pma->pairs[i] = 0; pma->pairs[i] = 0;
} }
} }
...@@ -170,7 +174,7 @@ static void test_smooth_region_N (int N) { ...@@ -170,7 +174,7 @@ static void test_smooth_region_N (int N) {
} }
} }
toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat);
r = toku_pmainternal_smooth_region(pairs, N, insertat, 0, 0); toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r);
toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r); toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r);
assert(0<=r); assert(r<N); assert(0<=r); assert(r<N);
assert(pairs[r]==0); assert(pairs[r]==0);
...@@ -211,7 +215,8 @@ static void test_smooth_region6 (void) { ...@@ -211,7 +215,8 @@ static void test_smooth_region6 (void) {
key = "B"; key = "B";
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0); pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r = toku_pmainternal_smooth_region(pairs, N, 2, 0, 0); int r;
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r);
printf("{ "); printf("{ ");
for (i=0; i<N; i++) for (i=0; i<N; i++)
printf("%s ", pairs[i] ? pairs[i]->key : "?"); printf("%s ", pairs[i] ? pairs[i]->key : "?");
...@@ -859,7 +864,10 @@ static void test_pma_split_n(int n) { ...@@ -859,7 +864,10 @@ static void test_pma_split_n(int n) {
printf("a:"); toku_print_pma(pmaa); printf("a:"); toku_print_pma(pmaa);
error = toku_pma_split(pmaa, 0, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum); error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmab); toku_pma_verify(pmab);
...@@ -925,7 +933,10 @@ static void test_pma_dup_split_n(int n, int dup_mode) { ...@@ -925,7 +933,10 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
DBT splitk; DBT splitk;
error = toku_pma_split(pmaa, 0, &splitk, pmab, 0, brand, &bsum, pmac, 0, crand, &csum); error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, &splitk,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmab); toku_pma_verify(pmab);
...@@ -997,7 +1008,10 @@ static void test_pma_split_varkey(void) { ...@@ -997,7 +1008,10 @@ static void test_pma_split_varkey(void) {
printf("a:"); toku_print_pma(pmaa); printf("a:"); toku_print_pma(pmaa);
error = toku_pma_split(pmaa, 0, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum); error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, 0,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
assert(error == 0); assert(error == 0);
toku_pma_verify(pmaa); toku_pma_verify(pmaa);
toku_pma_verify(pmab); toku_pma_verify(pmab);
...@@ -1143,7 +1157,9 @@ static void test_pma_split_cursor(void) { ...@@ -1143,7 +1157,9 @@ static void test_pma_split_cursor(void) {
// print_cursor("cursorc", cursorc); // print_cursor("cursorc", cursorc);
assert_cursor_val(cursorc, 16); assert_cursor_val(cursorc, 16);
error = toku_pma_split(pmaa, 0, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum); error = toku_pma_split(null_txn, null_filenum, pmaa, 0, 0,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
assert(error == 0); assert(error == 0);
toku_pma_verify_fingerprint(pmab, brand, bsum); toku_pma_verify_fingerprint(pmab, brand, bsum);
...@@ -1253,7 +1269,7 @@ static void test_pma_bulk_insert_n(int n) { ...@@ -1253,7 +1269,7 @@ static void test_pma_bulk_insert_n(int n) {
} }
/* bulk insert n kv pairs */ /* bulk insert n kv pairs */
error = toku_pma_bulk_insert(pma, keys, vals, n, rand4fingerprint, &sum); error = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum);
assert(error == 0); assert(error == 0);
assert(sum==expect_fingerprint); assert(sum==expect_fingerprint);
toku_pma_verify(pma); toku_pma_verify(pma);
......
This diff is collapsed.
...@@ -47,15 +47,15 @@ int toku_pma_n_entries (PMA); ...@@ -47,15 +47,15 @@ int toku_pma_n_entries (PMA);
/* Duplicates the key and keylen. */ /* Duplicates the key and keylen. */
//enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); //enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called. // The DB pointer is there so that the comparison function can be called.
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN txn, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/); enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int toku_pma_delete (PMA, DBT *, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size); int toku_pma_delete (PMA, DBT *, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size);
int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */ int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN txn, DISKOFF, TOKUTXN /*txn*/, FILENUM, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/); u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
...@@ -73,9 +73,10 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*); ...@@ -73,9 +73,10 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
* leftpma - the pma assigned keys <= pivot key * leftpma - the pma assigned keys <= pivot key
* rightpma - the pma assigned keys > pivot key * rightpma - the pma assigned keys > pivot key
*/ */
int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, int toku_pma_split(TOKUTXN, FILENUM,
PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4sum, u_int32_t *leftfingerprint, PMA /*origpma*/, unsigned int */*origpma_size*/, DBT */*splitk*/,
PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4sum, u_int32_t *rightfingerprint); DISKOFF /*leftdiskoff*/, PMA /*leftpma*/, unsigned int */*leftpma_size*/, u_int32_t /*leftrand4sum*/, u_int32_t */*leftfingerprint*/,
DISKOFF /*rightdiskoff*/, PMA /*rightpma*/, unsigned int */*rightpma_size*/, u_int32_t /*rightrand4sum*/, u_int32_t */*rightfingerprint*/);
/* /*
* Insert several key value pairs into an empty pma. * Insert several key value pairs into an empty pma.
...@@ -88,7 +89,7 @@ int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, ...@@ -88,7 +89,7 @@ int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk,
* vals - an array of values * vals - an array of values
* n_newpairs - the number of key value pairs * n_newpairs - the number of key value pairs
*/ */
int toku_pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint); int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint);
/* Move the cursor to the beginning or the end or to a key */ /* Move the cursor to the beginning or the end or to a key */
int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS. int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS.
......
...@@ -145,9 +145,19 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) { ...@@ -145,9 +145,19 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
assert(node->height==0); assert(node->height==0);
DBT key,data; DBT key,data;
r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len)); r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len));
assert(r==0);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer += KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer += KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
assert(r==0); }
static void toku_recover_resizepma (struct logtype_resizepma *c) {
c=c;
abort();
}
static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
c=c;
abort();
} }
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
......
...@@ -137,4 +137,13 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) { ...@@ -137,4 +137,13 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) {
} }
} }
static inline void wbuf_INTPAIRARRAY (struct wbuf *w, INTPAIRARRAY h) {
u_int32_t i;
wbuf_int(w, h.size);
for (i=0; i<h.size; i++) {
wbuf_int(w, h.array[i].a);
wbuf_int(w, h.array[i].b);
}
}
#endif #endif
...@@ -32,7 +32,7 @@ static void make_db (void) { ...@@ -32,7 +32,7 @@ static void make_db (void) {
r=tid->commit(tid, 0); assert(r==0); r=tid->commit(tid, 0); assert(r==0);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0); r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
for (i=0; i<10; i++) { for (i=0; i<2; i++) {
char hello[30], there[30]; char hello[30], there[30];
DBT key,data; DBT key,data;
snprintf(hello, sizeof(hello), "hello%ld.%d", random(), i); snprintf(hello, sizeof(hello), "hello%ld.%d", random(), i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment