Commit b85a5fdc authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log db close so that recovery will work right if the same db is opened and closed repeatedly.

Also the file numbers can thus be reused.
Don't pass the BRT into the flush commands, since the BRT may no longer be present.
Put a counter in to see how many rollback records are present.  (Addresses #698.)
Increment the file version to 4.
Fixes #545, #703.

Note: All the tests pass except
 * Many cxx tests are getting valgrind errors.  (Addresses #716.  Possibly causes #716.)
 * {{{test_log9.recover}}} fails with "Binary files ... differ".  These will presumably be fixed by #711 or #714.  (Addresses #711, #714.)
 * {{{test_log10.recover}}} fails.   There are two failures:
  1. A valgrind problem (see #718.)  (Addresses #718.  Possibly causes #718.)
  1. The "Binary files ... differ" issue.


git-svn-id: file:///svn/tokudb@3486 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4c964dd8
...@@ -20,6 +20,7 @@ int dbcreate(char *dbfile, char *dbname, int dbflags, int argc, char *argv[]) { ...@@ -20,6 +20,7 @@ int dbcreate(char *dbfile, char *dbname, int dbflags, int argc, char *argv[]) {
r = db->open(0, dbfile, dbname, DB_BTREE, DB_CREATE, 0777); r = db->open(0, dbfile, dbname, DB_BTREE, DB_CREATE, 0777);
if (r != 0) { if (r != 0) {
printf("db->open %s(%s) %d %s\n", dbfile, dbname, r, db_strerror(r)); printf("db->open %s(%s) %d %s\n", dbfile, dbname, r, db_strerror(r));
db->close(0); delete db;
env->close(0); delete env; env->close(0); delete env;
return 1; return 1;
} }
......
...@@ -178,7 +178,7 @@ check-fanout: ...@@ -178,7 +178,7 @@ check-fanout:
let BRT_FANOUT=BRT_FANOUT+1; \ let BRT_FANOUT=BRT_FANOUT+1; \
done done
log-test log-test2 log-test3 log-test4 log-test5 log-test6 benchmark-test brt-test brt-test0 brt-test1 brt-test2 brt-test3 brt-test4 brt-test5 test-brt-overflow brt-test-named-db brt-test-cursor brt-test-cursor-2 test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder: LDFLAGS+=-lz log-test log-test2 log-test3 log-test4 log-test5 log-test6 benchmark-test brt-test brt-test0 brt-test1 brt-test2 brt-test3 brt-test4 brt-test5 test-brt-overflow brt-test-named-db brt-test-cursor brt-test-cursor-2 test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder cachetable-test cachetable-test2: LDFLAGS+=-lz
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage # pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h fifo.h gpma.h brt.h brt-search.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h mempool.h leafentry.h BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h fifo.h gpma.h brt.h brt-search.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h mempool.h leafentry.h
...@@ -228,10 +228,10 @@ test_toku_malloc_plain_free: memory.o toku_assert.o ...@@ -228,10 +228,10 @@ test_toku_malloc_plain_free: memory.o toku_assert.o
cachetable-test.o: cachetable.h memory.h cachetable-test.o: cachetable.h memory.h
cachetable-test: cachetable.o memory.o cachetable-test.o primes.o toku_assert.o cachetable-test: $(OFILES)
cachetable-test2.o: cachetable.h memory.h cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o toku_assert.o cachetable-test2: $(OFILES)
benchmark-test: $(OFILES) benchmark-test: $(OFILES)
benchmark-test.o: brt.h brt-search.h ../include/db.h benchmark-test.o: brt.h brt-search.h ../include/db.h
......
...@@ -41,7 +41,7 @@ static void setup (void) { ...@@ -41,7 +41,7 @@ static void setup (void) {
static void shutdown (void) { static void shutdown (void) {
int r; int r;
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
static void long_long_to_array (unsigned char *a, unsigned long long l) { static void long_long_to_array (unsigned char *a, unsigned long long l) {
......
...@@ -111,6 +111,7 @@ struct brt_header { ...@@ -111,6 +111,7 @@ struct brt_header {
struct brt { struct brt {
CACHEFILE cf; CACHEFILE cf;
char *fname; // the filename
char *database_name; char *database_name;
// The header is shared. It is also ephemeral. // The header is shared. It is also ephemeral.
struct brt_header *h; struct brt_header *h;
...@@ -128,8 +129,8 @@ struct brt { ...@@ -128,8 +129,8 @@ struct brt {
}; };
/* serialization code */ /* serialization code */
void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node); void toku_serialize_brtnode_to(int fd, DISKOFF off, BRTNODE node);
int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsigned int flags, int nodesize); int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
...@@ -27,7 +27,7 @@ static void test_serialize(void) { ...@@ -27,7 +27,7 @@ static void test_serialize(void) {
sn.thisnodename = sn.nodesize*20; sn.thisnodename = sn.nodesize*20;
sn.disk_lsn.lsn = 789; sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456; sn.log_lsn.lsn = 123456;
sn.layout_version = 3; sn.layout_version = 4;
sn.height = 1; sn.height = 1;
sn.rand4fingerprint = randval; sn.rand4fingerprint = randval;
sn.local_fingerprint = 0; sn.local_fingerprint = 0;
...@@ -50,14 +50,14 @@ static void test_serialize(void) { ...@@ -50,14 +50,14 @@ static void test_serialize(void) {
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
toku_serialize_brtnode_to(fd, sn.nodesize*(DISKOFF)20, (DISKOFF)sn.nodesize, &sn); assert(r==0); toku_serialize_brtnode_to(fd, sn.nodesize*(DISKOFF)20, &sn); assert(r==0);
r = toku_deserialize_brtnode_from(fd, nodesize*(DISKOFF)20, &dn, sn.flags, nodesize); r = toku_deserialize_brtnode_from(fd, nodesize*(DISKOFF)20, &dn);
assert(r==0); assert(r==0);
assert(dn->thisnodename==nodesize*20); assert(dn->thisnodename==nodesize*20);
assert(dn->disk_lsn.lsn==123456); assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==3); assert(dn->layout_version ==4);
assert(dn->height == 1); assert(dn->height == 1);
assert(dn->rand4fingerprint==randval); assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2); assert(dn->u.n.n_children==2);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf" static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
4+ // nodesize
8+ // checkpoint number 8+ // checkpoint number
4+ // block size 4+ // block size
4+ // data size 4+ // data size
...@@ -92,7 +93,7 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) { ...@@ -92,7 +93,7 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
return result; return result;
} }
void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node) { void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
//printf("%s:%d serializing\n", __FILE__, __LINE__); //printf("%s:%d serializing\n", __FILE__, __LINE__);
struct wbuf w; struct wbuf w;
int i; int i;
...@@ -102,13 +103,13 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -102,13 +103,13 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node)
//printf("Sizes don't match: %d %d\n", calculated_size, toku_serialize_brtnode_size(node)); //printf("Sizes don't match: %d %d\n", calculated_size, toku_serialize_brtnode_size(node));
} }
#endif #endif
assert(calculated_size<=size); //assert(calculated_size<=size);
//char buf[size]; //char buf[size];
char *MALLOC_N(size,buf); char *MALLOC_N(node->nodesize,buf);
//toku_verify_counts(node); //toku_verify_counts(node);
assert(size>0); //assert(size>0);
wbuf_init(&w, buf, size);
//printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]); //printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]);
wbuf_init(&w, buf, node->nodesize);
wbuf_literal_bytes(&w, "toku", 4); wbuf_literal_bytes(&w, "toku", 4);
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4); if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4); else wbuf_literal_bytes(&w, "node", 4);
...@@ -116,6 +117,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -116,6 +117,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node)
wbuf_ulonglong(&w, node->log_lsn.lsn); wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, calculated_size); wbuf_uint(&w, calculated_size);
wbuf_int(&w, node->nodesize);
wbuf_uint(&w, node->flags); wbuf_uint(&w, node->flags);
wbuf_int(&w, node->height); wbuf_int(&w, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height); //printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
...@@ -192,13 +194,13 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -192,13 +194,13 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node)
wbuf_uint(&w, w.crc32); wbuf_uint(&w, w.crc32);
#endif #endif
memset(w.buf+w.ndone, 0, (size_t)(size-w.ndone)); // fill with zeros memset(w.buf+w.ndone, 0, (size_t)(node->nodesize-w.ndone)); // fill with zeros
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{ {
ssize_t r=pwrite(fd, w.buf, (size_t)size, off); // write the whole buffer, including the zeros ssize_t r=pwrite(fd, w.buf, (size_t)node->nodesize, off); // write the whole buffer, including the zeros
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno); if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==size); assert(r==node->nodesize);
} }
if (calculated_size!=w.ndone) if (calculated_size!=w.ndone)
...@@ -206,11 +208,11 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -206,11 +208,11 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, DISKOFF size, BRTNODE node)
assert(calculated_size==w.ndone); assert(calculated_size==w.ndone);
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size); //printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
assert(w.ndone<=size); assert(w.ndone<=node->nodesize);
toku_free(buf); toku_free(buf);
} }
int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsigned int flags, int nodesize) { int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode) {
TAGMALLOC(BRTNODE, result); TAGMALLOC(BRTNODE, result);
struct rbuf rc; struct rbuf rc;
int i; int i;
...@@ -264,7 +266,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign ...@@ -264,7 +266,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign
} }
} }
result->layout_version = rbuf_int(&rc); result->layout_version = rbuf_int(&rc);
if (result->layout_version!=3) { if (result->layout_version!=4) {
r=DB_BADFORMAT; r=DB_BADFORMAT;
goto died1; goto died1;
} }
...@@ -274,9 +276,9 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign ...@@ -274,9 +276,9 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign
unsigned int stored_size = rbuf_int(&rc); unsigned int stored_size = rbuf_int(&rc);
if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; } if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; }
} }
result->nodesize = nodesize; // How to compute the nodesize? result->nodesize = rbuf_int(&rc);
result->thisnodename = off; result->thisnodename = off;
result->flags = rbuf_int(&rc); assert(result->flags == (unsigned int) flags); result->flags = rbuf_int(&rc);
result->height = rbuf_int(&rc); result->height = rbuf_int(&rc);
result->rand4fingerprint = rbuf_int(&rc); result->rand4fingerprint = rbuf_int(&rc);
result->local_fingerprint = rbuf_int(&rc); result->local_fingerprint = rbuf_int(&rc);
...@@ -375,7 +377,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign ...@@ -375,7 +377,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, unsign
} }
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer); //printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
{ {
int mpsize = nodesize + nodesize/4; int mpsize = result->nodesize + result->nodesize/4;
void *mp = toku_malloc(mpsize); void *mp = toku_malloc(mpsize);
if (mp==0) return ENOMEM; // TODO cleanup if (mp==0) return ENOMEM; // TODO cleanup
toku_mempool_init(&result->u.l.buffer_mempool, mp, mpsize); toku_mempool_init(&result->u.l.buffer_mempool, mp, mpsize);
......
...@@ -83,7 +83,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) { ...@@ -83,7 +83,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
......
...@@ -144,7 +144,7 @@ static void test_brt_cursor_first(int n, DB *db) { ...@@ -144,7 +144,7 @@ static void test_brt_cursor_first(int n, DB *db) {
else else
assert_cursor_value(brt, DB_FIRST, 0); assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -185,7 +185,7 @@ static void test_brt_cursor_last(int n, DB *db) { ...@@ -185,7 +185,7 @@ static void test_brt_cursor_last(int n, DB *db) {
else else
assert_cursor_value(brt, DB_LAST, n-1); assert_cursor_value(brt, DB_LAST, n-1);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -228,7 +228,7 @@ static void test_brt_cursor_first_last(int n, DB *db) { ...@@ -228,7 +228,7 @@ static void test_brt_cursor_first_last(int n, DB *db) {
} else } else
assert_cursor_first_last(brt, 0, n-1); assert_cursor_first_last(brt, 0, n-1);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -272,7 +272,7 @@ static void test_brt_cursor_rfirst(int n, DB *db) { ...@@ -272,7 +272,7 @@ static void test_brt_cursor_rfirst(int n, DB *db) {
else else
assert_cursor_value(brt, DB_FIRST, 0); assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -343,7 +343,7 @@ static void test_brt_cursor_walk(int n, DB *db) { ...@@ -343,7 +343,7 @@ static void test_brt_cursor_walk(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_walk(brt, n); assert_cursor_walk(brt, n);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -415,7 +415,7 @@ static void test_brt_cursor_rwalk(int n, DB *db) { ...@@ -415,7 +415,7 @@ static void test_brt_cursor_rwalk(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_rwalk(brt, n); assert_cursor_rwalk(brt, n);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -502,7 +502,7 @@ static void test_brt_cursor_rand(int n, DB *db) { ...@@ -502,7 +502,7 @@ static void test_brt_cursor_rand(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_walk_inorder(brt, n); assert_cursor_walk_inorder(brt, n);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -583,7 +583,7 @@ static void test_brt_cursor_split(int n, DB *db) { ...@@ -583,7 +583,7 @@ static void test_brt_cursor_split(int n, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -617,7 +617,7 @@ static void test_multiple_brt_cursors(int n, DB *db) { ...@@ -617,7 +617,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -713,7 +713,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) { ...@@ -713,7 +713,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -785,7 +785,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) { ...@@ -785,7 +785,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -851,7 +851,7 @@ static void test_brt_cursor_set_range(int n, DB *db) { ...@@ -851,7 +851,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt); r = toku_close_brt(brt, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -912,7 +912,7 @@ static void test_brt_cursor_delete(int n, DB *db) { ...@@ -912,7 +912,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_brt_cursor_close(cursor); error = toku_brt_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_close_brt(brt); error = toku_close_brt(brt, 0);
assert(error == 0); assert(error == 0);
error = toku_cachetable_close(&ct); error = toku_cachetable_close(&ct);
...@@ -1015,7 +1015,7 @@ static void test_brt_cursor_get_both(int n, DB *db) { ...@@ -1015,7 +1015,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_brt_cursor_close(cursor); error = toku_brt_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_close_brt(brt); error = toku_close_brt(brt, 0);
assert(error == 0); assert(error == 0);
error = toku_cachetable_close(&ct); error = toku_cachetable_close(&ct);
......
...@@ -25,7 +25,7 @@ static void test_named_db (void) { ...@@ -25,7 +25,7 @@ static void test_named_db (void) {
toku_brt_insert(t0, toku_fill_dbt(&k, "good", 5), toku_fill_dbt(&v, "day", 4), null_txn); assert(r==0); toku_brt_insert(t0, toku_fill_dbt(&k, "good", 5), toku_fill_dbt(&v, "day", 4), null_txn); assert(r==0);
r = toku_close_brt(t0); assert(r==0); r = toku_close_brt(t0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
...@@ -40,7 +40,7 @@ static void test_named_db (void) { ...@@ -40,7 +40,7 @@ static void test_named_db (void) {
assert(strcmp(v.data,"day")==0); assert(strcmp(v.data,"day")==0);
} }
r = toku_close_brt(t0); assert(r==0); r = toku_close_brt(t0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
} }
......
This diff is collapsed.
...@@ -26,7 +26,7 @@ static void test0 (void) { ...@@ -26,7 +26,7 @@ static void test0 (void) {
assert(r==0); assert(r==0);
//printf("%s:%d test0\n", __FILE__, __LINE__); //printf("%s:%d test0\n", __FILE__, __LINE__);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
assert(r==0); assert(r==0);
......
...@@ -30,7 +30,7 @@ static void test1 (void) { ...@@ -30,7 +30,7 @@ static void test1 (void) {
assert(strcmp(v.data, "there")==0); assert(strcmp(v.data, "there")==0);
assert(v.size==6); assert(v.size==6);
} }
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
if (verbose) printf("test1 ok\n"); if (verbose) printf("test1 ok\n");
......
...@@ -43,7 +43,7 @@ static void test2 (int memcheck, int limit) { ...@@ -43,7 +43,7 @@ static void test2 (int memcheck, int limit) {
} }
if (verbose) printf("%s:%d inserted\n", __FILE__, __LINE__); if (verbose) printf("%s:%d inserted\n", __FILE__, __LINE__);
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
if (verbose) printf("test2 ok\n"); if (verbose) printf("test2 ok\n");
......
...@@ -43,7 +43,7 @@ static void test3 (int nodesize, int count, int memcheck) { ...@@ -43,7 +43,7 @@ static void test3 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
} }
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
gettimeofday(&t1, 0); gettimeofday(&t1, 0);
......
...@@ -43,7 +43,7 @@ static void test4 (int nodesize, int count, int memcheck) { ...@@ -43,7 +43,7 @@ static void test4 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
} }
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
gettimeofday(&t1, 0); gettimeofday(&t1, 0);
......
...@@ -54,7 +54,7 @@ static void test5 (void) { ...@@ -54,7 +54,7 @@ static void test5 (void) {
} }
if (verbose) printf("\n"); if (verbose) printf("\n");
toku_free(values); toku_free(values);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
} }
......
...@@ -139,7 +139,7 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b ...@@ -139,7 +139,7 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b
assert(brtnode->thisnodename==nodename); assert(brtnode->thisnodename==nodename);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) { if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode); toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode);
} }
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (!keep_me) { if (!keep_me) {
...@@ -148,10 +148,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b ...@@ -148,10 +148,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
} }
int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) { int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*UU(extraargs), LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv; BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize); int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result);
if (r == 0) { if (r == 0) {
*sizep = brtnode_size(*result); *sizep = brtnode_size(*result);
*written_lsn = (*result)->disk_lsn; *written_lsn = (*result)->disk_lsn;
...@@ -286,7 +285,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) ...@@ -286,7 +285,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->thisnodename = nodename; n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0. n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn; n->log_lsn = n->disk_lsn;
n->layout_version = 3; n->layout_version = 4;
n->height = height; n->height = height;
n->rand4fingerprint = random(); n->rand4fingerprint = random();
n->local_fingerprint = 0; n->local_fingerprint = 0;
...@@ -1892,12 +1891,18 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -1892,12 +1891,18 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
assert(is_create || !only_create); assert(is_create || !only_create);
assert(!load_flags || !only_create); assert(!load_flags || !only_create);
t->fname = toku_strdup(fname_in_env);
if (t->fname==0) {
r = errno;
if (0) { died00: if (t->fname) toku_free(t->fname); t->fname=0; }
goto died0;
}
if (dbname) { if (dbname) {
malloced_name = toku_strdup(dbname); malloced_name = toku_strdup(dbname);
if (malloced_name==0) { if (malloced_name==0) {
r = ENOMEM; r = ENOMEM;
if (0) { died0a: if(malloced_name) toku_free(malloced_name); } if (0) { died0a: if(malloced_name) toku_free(malloced_name); }
goto died0; goto died00;
} }
} }
t->database_name = malloced_name; t->database_name = malloced_name;
...@@ -1922,12 +1927,12 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -1922,12 +1927,12 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
} else } else
goto died0a; goto died0a;
} }
r=toku_cachetable_openfd(&t->cf, cachetable, fd); r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
if (r != 0) goto died0a; if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)); toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
} }
if (r!=0) { if (r!=0) {
if (0) { died1: toku_cachefile_close(&t->cf); } if (0) { died1: toku_cachefile_close(&t->cf, toku_txn_logger(txn)); }
t->database_name = 0; t->database_name = 0;
goto died0a; goto died0a;
} }
...@@ -2116,7 +2121,7 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne ...@@ -2116,7 +2121,7 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
return r; return r;
} }
int toku_close_brt (BRT brt) { int toku_close_brt (BRT brt, TOKULOGGER logger) {
int r; int r;
while (!list_empty(&brt->cursors)) { while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link); BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
...@@ -2124,11 +2129,19 @@ int toku_close_brt (BRT brt) { ...@@ -2124,11 +2129,19 @@ int toku_close_brt (BRT brt) {
if (r!=0) return r; if (r!=0) return r;
} }
if (brt->cf) { if (brt->cf) {
if (logger) {
assert(brt->fname);
BYTESTRING bs = {.len=strlen(brt->fname), .data=brt->fname};
LSN lsn;
r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out.
if (r!=0) return r;
}
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero. assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__); //printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
if ((r = toku_cachefile_close(&brt->cf))!=0) return r; if ((r = toku_cachefile_close(&brt->cf, logger))!=0) return r;
} }
if (brt->database_name) toku_free(brt->database_name); if (brt->database_name) toku_free(brt->database_name);
if (brt->fname) toku_free(brt->fname);
if (brt->skey) { toku_free(brt->skey); } if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); } if (brt->sval) { toku_free(brt->sval); }
toku_free(brt); toku_free(brt);
......
...@@ -29,7 +29,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN); ...@@ -29,7 +29,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v); int toku_brt_lookup (BRT brt, DBT *k, DBT *v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN); int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function. int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
int toku_close_brt (BRT); int toku_close_brt (BRT, TOKULOGGER);
int toku_dump_brt (BRT brt); int toku_dump_brt (BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */ void brt_fsync (BRT); /* fsync, but don't clear the caches. */
......
...@@ -219,7 +219,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) ...@@ -219,7 +219,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->thisnodename = nodename; n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0. n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn; n->log_lsn = n->disk_lsn;
n->layout_version = 3; n->layout_version = 4;
n->height = height; n->height = height;
n->rand4fingerprint = random(); n->rand4fingerprint = random();
n->local_fingerprint = 0; n->local_fingerprint = 0;
......
...@@ -42,9 +42,9 @@ void print_item (bytevec val, ITEMLEN len) { ...@@ -42,9 +42,9 @@ void print_item (bytevec val, ITEMLEN len) {
printf("\""); printf("\"");
} }
void dump_node (int f, DISKOFF off, struct brt_header *h) { void dump_node (int f, DISKOFF off) {
BRTNODE n; BRTNODE n;
int r = toku_deserialize_brtnode_from (f, off, &n, h->flags, h->nodesize); int r = toku_deserialize_brtnode_from (f, off, &n);
assert(r==0); assert(r==0);
assert(n!=0); assert(n!=0);
printf("brtnode\n"); printf("brtnode\n");
...@@ -124,7 +124,7 @@ int main (int argc, const char *argv[]) { ...@@ -124,7 +124,7 @@ int main (int argc, const char *argv[]) {
dump_header(f, &h); dump_header(f, &h);
DISKOFF off; DISKOFF off;
for (off=h->nodesize; off<h->unused_memory; off+=h->nodesize) { for (off=h->nodesize; off<h->unused_memory; off+=h->nodesize) {
dump_node(f, off, h); dump_node(f, off);
} }
return 0; return 0;
} }
...@@ -172,7 +172,7 @@ static void test0 (void) { ...@@ -172,7 +172,7 @@ static void test0 (void) {
expectN(7); expectN(7);
expectN(6); expectN(6);
expectN(1); expectN(1);
r=toku_cachefile_close(&f); r=toku_cachefile_close(&f, 0);
assert(r==0); assert(r==0);
r=toku_cachetable_close(&t); r=toku_cachetable_close(&t);
assert(r==0); assert(r==0);
...@@ -230,7 +230,7 @@ static void test_nested_pin (void) { ...@@ -230,7 +230,7 @@ static void test_nested_pin (void) {
r = toku_cachetable_unpin(f, 1, 0, test_object_size); r = toku_cachetable_unpin(f, 1, 0, test_object_size);
assert(r==0); assert(r==0);
r = toku_cachefile_close(&f); assert(r==0); r = toku_cachefile_close(&f, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0); r = toku_cachetable_close(&t); assert(r==0);
} }
...@@ -289,9 +289,9 @@ static void test_multi_filehandles (void) { ...@@ -289,9 +289,9 @@ static void test_multi_filehandles (void) {
r = toku_cachetable_maybe_get_and_pin(f1, 2, &v); assert(r==0); r = toku_cachetable_maybe_get_and_pin(f1, 2, &v); assert(r==0);
assert((unsigned long)v==125); assert((unsigned long)v==125);
r = toku_cachefile_close(&f1); assert(r==0); r = toku_cachefile_close(&f1, 0); assert(r==0);
r = toku_cachefile_close(&f2); assert(r==0); r = toku_cachefile_close(&f2, 0); assert(r==0);
r = toku_cachefile_close(&f3); assert(r==0); r = toku_cachefile_close(&f3, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0); r = toku_cachetable_close(&t); assert(r==0);
} }
...@@ -398,7 +398,7 @@ static void test_dirty() { ...@@ -398,7 +398,7 @@ static void test_dirty() {
assert(dirty == 1); assert(dirty == 1);
assert(pinned == 0); assert(pinned == 0);
r = toku_cachefile_close(&f); r = toku_cachefile_close(&f, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
...@@ -459,7 +459,7 @@ static void test_size_resize() { ...@@ -459,7 +459,7 @@ static void test_size_resize() {
r = toku_cachetable_unpin(f, key, CACHETABLE_CLEAN, new_size); r = toku_cachetable_unpin(f, key, CACHETABLE_CLEAN, new_size);
assert(r == 0); assert(r == 0);
r = toku_cachefile_close(&f); r = toku_cachefile_close(&f, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
...@@ -517,7 +517,7 @@ static void test_size_flush() { ...@@ -517,7 +517,7 @@ static void test_size_flush() {
assert(r == 0); assert(r == 0);
} }
r = toku_cachefile_close(&f); r = toku_cachefile_close(&f, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
...@@ -606,7 +606,7 @@ static void test_rename (void) { ...@@ -606,7 +606,7 @@ static void test_rename (void) {
} }
} }
r = toku_cachefile_close(&f); r = toku_cachefile_close(&f, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
......
...@@ -159,13 +159,13 @@ static void test_chaining (void) { ...@@ -159,13 +159,13 @@ static void test_chaining (void) {
//printf("Close %d (%p), now n_present=%d\n", i, f[i], n_present); //printf("Close %d (%p), now n_present=%d\n", i, f[i], n_present);
//print_ints(); //print_ints();
CACHEFILE oldcf=f[i]; CACHEFILE oldcf=f[i];
r = toku_cachefile_close(&f[i]); assert(r==0); r = toku_cachefile_close(&f[i], 0); assert(r==0);
file_is_not_present(oldcf); file_is_not_present(oldcf);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777); assert(r==0); r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777); assert(r==0);
} }
} }
for (i=0; i<N_FILES; i++) { for (i=0; i<N_FILES; i++) {
r = toku_cachefile_close(&f[i]); assert(r==0); r = toku_cachefile_close(&f[i], 0); assert(r==0);
} }
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "toku_assert.h" #include "toku_assert.h"
#include "yerror.h" #include "yerror.h"
#include "brt-internal.h" #include "brt-internal.h"
#include "log_header.h"
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
...@@ -72,6 +73,7 @@ struct cachefile { ...@@ -72,6 +73,7 @@ struct cachefile {
CACHETABLE cachetable; CACHETABLE cachetable;
struct fileid fileid; struct fileid fileid;
FILENUM filenum; FILENUM filenum;
char *fname;
}; };
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) { int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
...@@ -108,7 +110,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) { ...@@ -108,7 +110,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) {
} }
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. // If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) { int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fname) {
int r; int r;
CACHEFILE extant; CACHEFILE extant;
FILENUM max_filenum_in_use={0}; FILENUM max_filenum_in_use={0};
...@@ -137,6 +139,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) { ...@@ -137,6 +139,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
newcf->fd = fd; newcf->fd = fd;
newcf->cachetable = t; newcf->cachetable = t;
newcf->fileid = fileid; newcf->fileid = fileid;
newcf->fname = fname ? toku_strdup(fname) : 0;
t->cachefiles = newcf; t->cachefiles = newcf;
*cf = newcf; *cf = newcf;
return 0; return 0;
...@@ -146,7 +149,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) { ...@@ -146,7 +149,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) { int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) {
int fd = open(fname, flags, mode); int fd = open(fname, flags, mode);
if (fd<0) return errno; if (fd<0) return errno;
return toku_cachetable_openfd (cf, t, fd); return toku_cachetable_openfd (cf, t, fd, fname);
} }
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) { static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
...@@ -166,7 +169,7 @@ void toku_cachefile_refup (CACHEFILE cf) { ...@@ -166,7 +169,7 @@ void toku_cachefile_refup (CACHEFILE cf) {
cf->refcount++; cf->refcount++;
} }
int toku_cachefile_close (CACHEFILE *cfp) { int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
CACHEFILE cf = *cfp; CACHEFILE cf = *cfp;
assert(cf->refcount>0); assert(cf->refcount>0);
cf->refcount--; cf->refcount--;
...@@ -177,6 +180,13 @@ int toku_cachefile_close (CACHEFILE *cfp) { ...@@ -177,6 +180,13 @@ int toku_cachefile_close (CACHEFILE *cfp) {
assert(r == 0); assert(r == 0);
cf->fd = -1; cf->fd = -1;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
if (logger) {
assert(cf->fname);
BYTESTRING bs = {.len=strlen(cf->fname), .data=cf->fname};
r = toku_log_cfclose(logger, 0, 0, bs, cf->filenum);
}
if (cf->fname)
toku_free(cf->fname);
toku_free(cf); toku_free(cf);
*cfp=0; *cfp=0;
return r; return r;
......
...@@ -26,7 +26,7 @@ typedef long long CACHEKEY; ...@@ -26,7 +26,7 @@ typedef long long CACHEKEY;
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER); int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode); int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/); int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p); typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T; typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
...@@ -61,7 +61,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke ...@@ -61,7 +61,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */ //int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */ int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachefile_close (CACHEFILE*); int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
//int cachefile_flush (CACHEFILE); /* Flush everything related to the VOID* to disk and free all memory. Don't destroy the cachetable. */ //int cachefile_flush (CACHEFILE); /* Flush everything related to the VOID* to disk and free all memory. Don't destroy the cachetable. */
void toku_cachefile_refup (CACHEFILE cfp); // Increment the reference count. Use close to decrement it. void toku_cachefile_refup (CACHEFILE cfp); // Increment the reference count. Use close to decrement it.
......
...@@ -353,6 +353,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -353,6 +353,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->newest_logentry = item->prev; txn->newest_logentry = item->prev;
rolltype_dispatch(item, toku_free_rolltype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
} }
r = 0; r = 0;
} else if (txn->parent!=0) { } else if (txn->parent!=0) {
...@@ -378,6 +380,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -378,6 +380,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
if (r!=0) goto broken; if (r!=0) goto broken;
rolltype_dispatch(item, toku_free_rolltype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
} }
r = 0; r = 0;
} }
...@@ -704,6 +708,8 @@ int toku_logger_abort(TOKUTXN txn) { ...@@ -704,6 +708,8 @@ int toku_logger_abort(TOKUTXN txn) {
if (r!=0) return r; if (r!=0) return r;
rolltype_dispatch(item, toku_free_rolltype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
toku_free(txn); toku_free(txn);
......
...@@ -147,6 +147,12 @@ const struct logtype logtypes[] = { ...@@ -147,6 +147,12 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
NULLFIELD}}, NULLFIELD}},
{"brtclose", 'e', FA{{"BYTESTRING", "fname", 0}, // brtclose is logged when a particular brt is closed
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"cfclose", 'o', FA{{"BYTESTRING", "fname", 0}, // cfclose is logged when a cachefile actually closes ("cfclose" means cache file close)
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"brtdeq", 'U', FA{{"FILENUM", "filenum", 0}, {"brtdeq", 'U', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0}, {"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, {"u_int32_t", "childnum", 0},
...@@ -269,7 +275,7 @@ void generate_enum_internal (char *enum_name, char *enum_prefix, const struct lo ...@@ -269,7 +275,7 @@ void generate_enum_internal (char *enum_name, char *enum_prefix, const struct lo
count++; count++;
fprintf(hf, "\n"); fprintf(hf, "\n");
fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd); fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd);
if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice (second time for %s)\n", __FILE__, __LINE__, cmd, cmd, lt->name); abort(); } if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d: error: Command %d (%c) was used twice (second time for %s)\n", __FILE__, __LINE__, cmd, cmd, lt->name); abort(); }
used_cmds[cmd]=1; used_cmds[cmd]=1;
})); }));
fprintf(hf, "\n};\n\n"); fprintf(hf, "\n};\n\n");
...@@ -387,11 +393,11 @@ void generate_log_writer (void) { ...@@ -387,11 +393,11 @@ void generate_log_writer (void) {
fprintf(cf, " wbuf_int(&wbuf, buflen);\n"); fprintf(cf, " wbuf_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", 0xff&lt->command_and_flags); fprintf(cf, " wbuf_char(&wbuf, '%c');\n", 0xff&lt->command_and_flags);
fprintf(cf, " ml_lock(&logger->input_lock);\n"); fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " logger->lsn.lsn++;\n");
fprintf(cf, " LSN lsn = logger->lsn;\n"); fprintf(cf, " LSN lsn = logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n"); fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n");
fprintf(cf, " lbytes->lsn = lsn;\n"); fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n"); fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name)); fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n"); fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n");
...@@ -486,12 +492,15 @@ void generate_logprint (void) { ...@@ -486,12 +492,15 @@ void generate_logprint (void) {
} }
void generate_rollbacks (void) { void generate_rollbacks (void) {
fprintf(cf, " u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n");
fprintf(hf, "extern u_int64_t toku_logger_rollback_malloc_size, toku_logger_rollback_malloc_count;\n");
DO_ROLLBACKS(lt, ({ DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name); fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n"); fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=sizeof(*v);\n");
fprintf(cf, " if (v==0) return errno;\n"); fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff); fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
......
...@@ -48,7 +48,7 @@ void toku_recover_cleanup (void) { ...@@ -48,7 +48,7 @@ void toku_recover_cleanup (void) {
int i; int i;
for (i=0; i<n_cf_pairs; i++) { for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) { if (cf_pairs[i].brt) {
int r = toku_close_brt(cf_pairs[i].brt); int r = toku_close_brt(cf_pairs[i].brt, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf); //r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0); assert(r==0);
} }
...@@ -150,7 +150,7 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t ...@@ -150,7 +150,7 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
n->thisnodename = diskoff; n->thisnodename = diskoff;
n->log_lsn = n->disk_lsn = lsn; n->log_lsn = n->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn); //printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 3; n->layout_version = 4;
n->height = height; n->height = height;
n->rand4fingerprint = rand4fingerprint; n->rand4fingerprint = rand4fingerprint;
n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ??? n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
...@@ -422,13 +422,14 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM ...@@ -422,13 +422,14 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
assert(fd>=0); assert(fd>=0);
BRT MALLOC(brt); BRT MALLOC(brt);
assert(errno==0 && brt!=0); assert(errno==0 && brt!=0);
brt->database_name = fixedfname; brt->fname = fixedfname;
brt->database_name = 0;
brt->h=0; brt->h=0;
list_init(&brt->cursors); list_init(&brt->cursors);
brt->compare_fun = 0; brt->compare_fun = 0;
brt->dup_compare = 0; brt->dup_compare = 0;
brt->db = 0; brt->db = 0;
int r = toku_cachetable_openfd(&cf, ct, fd); int r = toku_cachetable_openfd(&cf, ct, fd, fixedfname);
assert(r==0); assert(r==0);
brt->skey = brt->sval = 0; brt->skey = brt->sval = 0;
brt->cf=cf; brt->cf=cf;
...@@ -436,6 +437,32 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM ...@@ -436,6 +437,32 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
} }
void toku_recover_brtclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
// Bump up the reference count
toku_cachefile_refup(pair->cf);
r = toku_close_brt(pair->brt, 0);
assert(r==0);
pair->brt=0;
toku_free_BYTESTRING(fname);
}
void toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (filenum.fileid==cf_pairs[i].filenum.fileid) {
int r = toku_cachefile_close(&cf_pairs[i].cf, 0);
assert(r==0);
cf_pairs[i] = cf_pairs[n_cf_pairs-1];
n_cf_pairs--;
break;
}
}
toku_free_BYTESTRING(fname);
}
void toku_recover_insertleafentry (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, LEAFENTRY newleafentry) { void toku_recover_insertleafentry (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, LEAFENTRY newleafentry) {
struct cf_pair *pair = NULL; struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -451,6 +478,7 @@ void toku_recover_insertleafentry (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_ ...@@ -451,6 +478,7 @@ void toku_recover_insertleafentry (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_
{ {
int memsize = leafentry_memsize(newleafentry); int memsize = leafentry_memsize(newleafentry);
void *mem = mempool_malloc_from_gpma(node->u.l.buffer, &node->u.l.buffer_mempool, memsize); void *mem = mempool_malloc_from_gpma(node->u.l.buffer, &node->u.l.buffer_mempool, memsize);
assert(mem);
memcpy(mem, newleafentry, memsize); memcpy(mem, newleafentry, memsize);
toku_gpma_set_at_index(node->u.l.buffer, pmaidx, memsize, mem); toku_gpma_set_at_index(node->u.l.buffer, pmaidx, memsize, mem);
node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + leafentry_disksize(newleafentry); node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + leafentry_disksize(newleafentry);
......
...@@ -44,7 +44,7 @@ int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING ...@@ -44,7 +44,7 @@ int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING
toku_fill_dbt(&data_dbt, data.data, data.len)}}; toku_fill_dbt(&data_dbt, data.data, data.len)}};
r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r; if (r!=0) return r;
return toku_cachefile_close(&cf); return toku_cachefile_close(&cf, toku_txn_logger(txn));
} }
int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) { int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
...@@ -58,7 +58,7 @@ int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRI ...@@ -58,7 +58,7 @@ int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRI
toku_fill_dbt(&data_dbt, data.data, data.len)}}; toku_fill_dbt(&data_dbt, data.data, data.len)}};
r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r; if (r!=0) return r;
return toku_cachefile_close(&cf); return toku_cachefile_close(&cf, toku_txn_logger(txn));
} }
int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) { int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
...@@ -80,7 +80,7 @@ int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN tx ...@@ -80,7 +80,7 @@ int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN tx
toku_init_dbt(&data_dbt)}}; toku_init_dbt(&data_dbt)}};
r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r; if (r!=0) return r;
return toku_cachefile_close(&cf); return toku_cachefile_close(&cf, toku_txn_logger(txn));
} }
int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) { int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) {
...@@ -94,5 +94,5 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN ...@@ -94,5 +94,5 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
toku_init_dbt(&data_dbt)}}; toku_init_dbt(&data_dbt)}};
r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r; if (r!=0) return r;
return toku_cachefile_close(&cf); return toku_cachefile_close(&cf, toku_txn_logger(txn));
} }
...@@ -40,7 +40,7 @@ void doit (void) { ...@@ -40,7 +40,7 @@ void doit (void) {
r = toku_brt_lookup(t, toku_fill_dbt(&k, "a", 2), toku_init_dbt(&v)); r = toku_brt_lookup(t, toku_fill_dbt(&k, "a", 2), toku_init_dbt(&v));
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
...@@ -48,5 +48,6 @@ int main (int argc, const char *argv[]) { ...@@ -48,5 +48,6 @@ int main (int argc, const char *argv[]) {
default_parse_args(argc, argv); default_parse_args(argc, argv);
doit(); doit();
if (verbose) printf("test ok\n"); if (verbose) printf("test ok\n");
toku_malloc_cleanup();
return 0; return 0;
} }
...@@ -31,7 +31,7 @@ void test_overflow (void) { ...@@ -31,7 +31,7 @@ void test_overflow (void) {
char key[]={'a'+i, 0}; char key[]={'a'+i, 0};
toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn);
} }
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -61,7 +61,7 @@ void doit (void) { ...@@ -61,7 +61,7 @@ void doit (void) {
r = toku_brt_lookup(t, &k, &v); r = toku_brt_lookup(t, &k, &v);
assert(r==0); assert(r==0);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -136,7 +136,7 @@ void doit (int ksize __attribute__((__unused__))) { ...@@ -136,7 +136,7 @@ void doit (int ksize __attribute__((__unused__))) {
r = toku_testsetup_root(t, anode); r = toku_testsetup_root(t, anode);
assert(r==0); assert(r==0);
r = toku_close_brt(t); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
//printf("ksize=%d, unused\n", ksize); //printf("ksize=%d, unused\n", ksize);
......
...@@ -342,6 +342,9 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo ...@@ -342,6 +342,9 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo
toku_logger_close(&env->i->logger); toku_logger_close(&env->i->logger);
goto died1; goto died1;
} }
} else {
r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
assert(r==0);
} }
unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool. unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
...@@ -364,7 +367,7 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo ...@@ -364,7 +367,7 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo
r = toku_brt_create_cachetable(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger); r = toku_brt_create_cachetable(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger);
if (r!=0) goto died2; if (r!=0) goto died2;
toku_logger_set_cachetable(env->i->logger, env->i->cachetable); if (env->i->logger) toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
return 0; return 0;
} }
...@@ -990,7 +993,7 @@ static int toku_db_close(DB * db, u_int32_t flags) { ...@@ -990,7 +993,7 @@ static int toku_db_close(DB * db, u_int32_t flags) {
} }
} }
flags=flags; flags=flags;
int r = toku_close_brt(db->i->brt); int r = toku_close_brt(db->i->brt, db->dbenv->i->logger);
if (r != 0) if (r != 0)
return r; return r;
if (db->i->db_id) { toku_db_id_remove_ref(db->i->db_id); } if (db->i->db_id) { toku_db_id_remove_ref(db->i->db_id); }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment