Commit 4103a85c authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Add the extraargs callback for fetch. Also make brt-serial-test run. ...

Add the extraargs callback for fetch.  Also make brt-serial-test run.  Addresses #1000, #1080, #1131.

git-svn-id: file:///svn/tokudb.1131b+1080a@6115 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9d232d1a
......@@ -138,13 +138,13 @@ struct brt_header {
BLOCKNUM free_blocks; // free list for blocks. Use -1 to indicate that there are no free blocks
BLOCKNUM unused_blocks; // first unused block
u_int64_t max_blocknum_translated;
u_int64_t translated_blocknum_limit;
struct block_translation_pair *block_translation;
// Where and how big is the block translation vector stored on disk.
// The size of the on_disk buffer may no longer match the max_blocknum_translated field, since blocks may have been allocated or freed.
// We need to remember this old information so we can free it properly.
u_int64_t block_translation_size_on_disk; // the size of the block (i.e. 8 times the number of entries)
u_int64_t block_translation_size_on_disk; // the size of the block containing the translation (i.e. 8 times the number of entries)
u_int64_t block_translation_address_on_disk; // 0 if there is no memory allocated
// The in-memory data structure for block allocation
......@@ -175,7 +175,7 @@ struct brt {
};
/* serialization code */
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node);
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, BRT brt);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, int tree_node_size);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -126,9 +126,8 @@ const int uncompressed_magic_len = (8 // tokuleaf or tokunode
const int compression_header_len = (4 // compressed_len
+4); // uncompressed_len
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node) {
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt) {
//printf("%s:%d serializing\n", __FILE__, __LINE__);
DISKOFF offset = blocknum.b * node->nodesize;
struct wbuf w;
int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
......@@ -255,7 +254,25 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node) {
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{
// If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0);
printf("%s:%d trans=%lu\n", __FILE__, __LINE__, brt->h->translated_blocknum_limit);
if (brt->h->translated_blocknum_limit > (u_int64_t)blocknum.b) {
u_int64_t new_limit = blocknum.b + 1;
u_int64_t old_limit = brt->h->translated_blocknum_limit;
XREALLOC_N(new_limit, brt->h->block_translation);
while (++old_limit < new_limit) {
brt->h->block_translation[old_limit].diskoff = 0;
brt->h->block_translation[old_limit].size = 0;
}
brt->h->translated_blocknum_limit = new_limit;
} else {
block_allocator_free_block(brt->h->block_allocator, brt->h->block_translation[blocknum.b].diskoff);
}
size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
u_int64_t offset;
block_allocator_alloc_block(brt->h->block_allocator, n_to_write, &offset);
brt->h->block_translation[blocknum.b].diskoff = offset;
brt->h->block_translation[blocknum.b].size = n_to_write;
ssize_t r=pwrite(fd, compressed_buf, n_to_write, offset);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write);
......@@ -593,8 +610,8 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
if (h->block_translation_address_on_disk != 0) {
block_allocator_free_block(h->block_allocator, h->block_translation_address_on_disk);
}
block_allocator_alloc_block(h->block_allocator, 4 + 8*h->max_blocknum_translated, &h->block_translation_address_on_disk);
wbuf_ulonglong(wbuf, h->max_blocknum_translated);
block_allocator_alloc_block(h->block_allocator, 4 + 8*h->translated_blocknum_limit, &h->block_translation_address_on_disk);
wbuf_ulonglong(wbuf, h->translated_blocknum_limit);
wbuf_DISKOFF(wbuf, h->block_translation_address_on_disk);
if (h->n_named_roots>=0) {
int i;
......@@ -629,11 +646,11 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
}
{
struct wbuf w;
u_int64_t size = 4 + h->max_blocknum_translated * 8; // 4 for the checksum
u_int64_t size = 4 + h->translated_blocknum_limit * 8; // 4 for the checksum
printf("%s:%d writing translation table of size %ld\n", __FILE__, __LINE__, size);
wbuf_init(&w, toku_malloc(size), size);
u_int64_t i;
for (i=0; i<h->max_blocknum_translated; i++) {
for (i=0; i<h->translated_blocknum_limit; i++) {
wbuf_ulonglong(&w, h->block_translation[i].diskoff);
wbuf_ulonglong(&w, h->block_translation[i].size);
}
......@@ -671,8 +688,8 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
h->free_blocks = rbuf_blocknum(&rc);
h->unused_blocks = rbuf_blocknum(&rc);
h->n_named_roots = rbuf_int(&rc);
h->max_blocknum_translated = rbuf_diskoff(&rc);
h->block_translation_size_on_disk = 4 + 8 * h->max_blocknum_translated;
h->translated_blocknum_limit = rbuf_diskoff(&rc);
h->block_translation_size_on_disk = 4 + 8 * h->translated_blocknum_limit;
h->block_translation_address_on_disk = rbuf_diskoff(&rc);
// Set up the the block translation buffer.
create_block_allocator(&h->block_allocator, h->nodesize);
......@@ -680,7 +697,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
h->block_translation = 0;
} else {
block_allocator_alloc_block_at(h->block_allocator, h->block_translation_address_on_disk, h->block_translation_size_on_disk);
XMALLOC_N(h->max_blocknum_translated, h->block_translation);
XMALLOC_N(h->translated_blocknum_limit, h->block_translation);
unsigned char *XMALLOC_N(h->block_translation_size_on_disk, tbuf);
{
ssize_t r = pread(fd, tbuf, h->block_translation_size_on_disk, h->block_translation_address_on_disk);
......@@ -701,7 +718,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
rt.ndone = 0;
rt.size = h->block_translation_size_on_disk-4;
assert(rt.size>0);
for (i=0; i<h->max_blocknum_translated; i++) {
for (i=0; i<h->translated_blocknum_limit; i++) {
h->block_translation[i].diskoff = rbuf_diskoff(&rt);
h->block_translation[i].size = rbuf_diskoff(&rt);
block_allocator_alloc_block_at(h->block_allocator, h->block_translation[i].diskoff, h->block_translation[i].size);
......
......@@ -161,7 +161,8 @@ static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
return cmp;
}
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRT brt = extraargs;
BRTNODE brtnode = brtnode_v;
// if ((write_me || keep_me) && (brtnode->height==0)) {
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
......@@ -175,7 +176,7 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
assert(brtnode->thisnodename.b==nodename.b);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode);
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, brt);
}
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (!keep_me) {
......@@ -213,7 +214,15 @@ void toku_brtheader_free (struct brt_header *h) {
toku_free(h);
}
void toku_brtheader_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
void toku_brtheader_flush_callback (CACHEFILE cachefile,
BLOCKNUM nodename,
void *header_v,
void *extra_args __attribute__((__unused__)),
long size __attribute__((unused)),
BOOL write_me,
BOOL keep_me,
LSN lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
struct brt_header *h = header_v;
assert(nodename.b==0);
assert(!h->dirty); // shouldn't be dirty once it is unpinned.
......@@ -2156,7 +2165,7 @@ static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
t->h->nodesize=t->nodesize;
t->h->free_blocks = make_blocknum(-1);
t->h->unused_blocks=make_blocknum(2);
t->h->max_blocknum_translated = 0;
t->h->translated_blocknum_limit = 0;
t->h->block_translation = 0;
t->h->block_translation_size_on_disk = 0;
t->h->block_translation_address_on_disk = 0;
......
......@@ -505,7 +505,7 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
#if DO_CALLBACK_UNLOCK
cachetable_unlock(ct);
#endif
p->flush_callback(p->cachefile, p->key, p->value, p->size, FALSE, FALSE,
p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, FALSE, FALSE,
ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
ctpair_destroy(p);
#if DO_CALLBACK_UNLOCK
......@@ -528,7 +528,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
cachetable_unlock(ct);
#endif
// write callback
p->flush_callback(p->cachefile, p->key, p->value, p->size, p->dirty && p->write_me, TRUE,
p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, p->dirty && p->write_me, TRUE,
ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
#if DO_CALLBACK_USLEEP
usleep(DO_CALLBACK_USLEEP);
......
......@@ -41,7 +41,7 @@ int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int fl
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
// the flush callback (write, free)
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
// the fetch callback
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
......
......@@ -53,7 +53,23 @@ static void test_serialize(void) {
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn); assert(r==0);
struct brt *XMALLOC(brt);
struct brt_header *XMALLOC(brt_h);
struct block_translation_pair *XMALLOC_N(21, btps);
memset(btps, 0, sizeof(btps));
brt->h = brt_h;
brt_h->translated_blocknum_limit = 1;
brt_h->block_translation = btps;
brt_h->block_translation[20].diskoff = 4096;
brt_h->block_translation[20].size = 100;
create_block_allocator(&brt_h->block_allocator, 4096);
{
u_int64_t b;
block_allocator_alloc_block(brt_h->block_allocator, 100, &b);
assert(b==4096);
}
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt); assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, nodesize);
assert(r==0);
......@@ -108,6 +124,12 @@ static void test_serialize(void) {
toku_fifo_free(&BNC_BUFFER(&sn,1));
toku_free(sn.u.n.childinfos);
toku_free(sn.u.n.childkeys);
block_allocator_free_block(brt_h->block_allocator, 4096);
destroy_block_allocator(&brt_h->block_allocator);
toku_free(btps);
toku_free(brt_h);
toku_free(brt);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
......
......@@ -43,7 +43,9 @@ static void* vals[KEYLIMIT];
static int n_keys=0;
static void r_flush (CACHEFILE f __attribute__((__unused__)),
CACHEKEY k, void *value,
CACHEKEY k,
void *value,
void *extra __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me,
......
......@@ -19,6 +19,7 @@ static double tdiff (struct timeval *a, struct timeval *b) {
static void f_flush (CACHEFILE f,
CACHEKEY key,
void *value,
void *extra __attribute__((__unused__)),
long size,
BOOL write_me,
BOOL keep_me,
......
......@@ -94,7 +94,15 @@ static void expectN(int64_t blocknum_n) {
static CACHEFILE expect_f;
static void flush (CACHEFILE f, CACHEKEY key, void*value, long size __attribute__((__unused__)), BOOL write_me __attribute__((__unused__)), BOOL keep_me __attribute__((__unused__)), LSN modified_lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
static void flush (CACHEFILE f,
CACHEKEY key,
void*value,
void *extra __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
LSN modified_lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
struct item *it = value;
int i;
......@@ -261,7 +269,9 @@ static void test0 (void) {
toku_memory_check_all_free();
}
static void flush_n (CACHEFILE f __attribute__((__unused__)), CACHEKEY key __attribute__((__unused__)), void *value,
static void flush_n (CACHEFILE f __attribute__((__unused__)), CACHEKEY key __attribute__((__unused__)),
void *value,
void *extra __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)), BOOL keep_me __attribute__((__unused__)),
LSN modified_lsn __attribute__((__unused__)), BOOL rename_p __attribute ((__unused__))) {
......@@ -324,6 +334,7 @@ static void test_nested_pin (void) {
static void null_flush (CACHEFILE cf __attribute__((__unused__)),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *extra __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
......@@ -391,7 +402,15 @@ static void test_multi_filehandles (void) {
r = toku_cachetable_close(&t); assert(r==0);
}
static void test_dirty_flush(CACHEFILE f, CACHEKEY key, void *value, long size, BOOL do_write, BOOL keep, LSN modified_lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
static void test_dirty_flush(CACHEFILE f,
CACHEKEY key,
void *value,
void *extra __attribute__((__unused__)),
long size,
BOOL do_write,
BOOL keep,
LSN modified_lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
if (verbose) printf("test_dirty_flush %p %" PRId64 " %p %ld %d %d\n", f, key.b, value, size, do_write, keep);
}
......@@ -508,7 +527,15 @@ static void test_dirty() {
static int test_size_debug;
static CACHEKEY test_size_flush_key;
static void test_size_flush_callback(CACHEFILE f, CACHEKEY key, void *value, long size, BOOL do_write, BOOL keep, LSN modified_lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
static void test_size_flush_callback(CACHEFILE f,
CACHEKEY key,
void *value,
void *extra __attribute__((__unused__)),
long size,
BOOL do_write,
BOOL keep,
LSN modified_lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %d %d\n", f, key.b, value, size, do_write, keep);
if (keep) {
assert(do_write != 0);
......
......@@ -99,6 +99,7 @@ static void file_is_not_present(CACHEFILE cf) {
static void flush_forchain (CACHEFILE f __attribute__((__unused__)),
CACHEKEY key,
void *value,
void *extra __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment