Commit 15e9a90a authored by Yoni Fogel

Addresses #1510 Final merge into main. Delete #1510 branch.

svn merge -r 11048:11110 ../tokudb.1510

git-svn-id: file:///svn/toku/tokudb@11112 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7a4217dc
...@@ -6,11 +6,12 @@ TOKUROOT=./ ...@@ -6,11 +6,12 @@ TOKUROOT=./
include $(TOKUROOT)toku_include/Makefile.include include $(TOKUROOT)toku_include/Makefile.include
default: build default: build
SRCDIRS = $(OS_CHOICE) newbrt src cxx utils db-benchmark-test db-benchmark-test-cxx SRCDIRS = $(OS_CHOICE) newbrt src/range_tree src/lock_tree src cxx utils db-benchmark-test db-benchmark-test-cxx
BUILDDIRS = $(SRCDIRS) man/texi BUILDDIRS = $(SRCDIRS) man/texi
newbrt.dir: $(OS_CHOICE).dir newbrt.dir: $(OS_CHOICE).dir
src.dir: newbrt.dir src/lock_tree.dir: src/range_tree.dir
src.dir: newbrt.dir src/lock_tree.dir
cxx.dir: src.dir cxx.dir: src.dir
db-benchmark-test.dir: src.dir db-benchmark-test.dir: src.dir
db-benchmark-test-cxx.dir: cxx.dir db-benchmark-test-cxx.dir: cxx.dir
......
...@@ -122,7 +122,13 @@ typedef enum { ...@@ -122,7 +122,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[8]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[2];
void *app_private; /* 32-bit offset=36 size=4, 64=bit offset=72 size=8 */ void *app_private; /* 32-bit offset=36 size=4, 64=bit offset=72 size=8 */
void* __toku_dummy1[27]; void* __toku_dummy1[27];
char __toku_dummy2[64]; char __toku_dummy2[64];
......
...@@ -124,7 +124,13 @@ typedef enum { ...@@ -124,7 +124,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[10]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[4];
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */ void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
void* __toku_dummy1[25]; void* __toku_dummy1[25];
char __toku_dummy2[96]; char __toku_dummy2[96];
......
...@@ -125,7 +125,13 @@ typedef enum { ...@@ -125,7 +125,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[10]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[4];
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */ void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
void* __toku_dummy1[40]; void* __toku_dummy1[40];
char __toku_dummy2[128]; char __toku_dummy2[128];
......
...@@ -125,7 +125,13 @@ typedef enum { ...@@ -125,7 +125,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[12]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */ void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[38]; void* __toku_dummy1[38];
char __toku_dummy2[128]; char __toku_dummy2[128];
......
...@@ -127,7 +127,13 @@ typedef enum { ...@@ -127,7 +127,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[12]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */ void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[39]; void* __toku_dummy1[39];
char __toku_dummy2[144]; char __toku_dummy2[144];
......
...@@ -321,7 +321,17 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -321,7 +321,17 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
// Don't produce db_btree_stat records. // Don't produce db_btree_stat records.
//print_struct("db_btree_stat", 0, db_btree_stat_fields32, db_btree_stat_fields64, sizeof(db_btree_stat_fields32)/sizeof(db_btree_stat_fields32[0]), 0); //print_struct("db_btree_stat", 0, db_btree_stat_fields32, db_btree_stat_fields64, sizeof(db_btree_stat_fields32)/sizeof(db_btree_stat_fields32[0]), 0);
assert(sizeof(db_env_fields32)==sizeof(db_env_fields64)); assert(sizeof(db_env_fields32)==sizeof(db_env_fields64));
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), 0); {
const char *extra[]={
"int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */",
"int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */",
"int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */",
"int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */",
"int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */",
"int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */",
NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
}
assert(sizeof(db_key_range_fields32)==sizeof(db_key_range_fields64)); assert(sizeof(db_key_range_fields32)==sizeof(db_key_range_fields64));
print_struct("db_key_range", 0, db_key_range_fields32, db_key_range_fields64, sizeof(db_key_range_fields32)/sizeof(db_key_range_fields32[0]), 0); print_struct("db_key_range", 0, db_key_range_fields32, db_key_range_fields64, sizeof(db_key_range_fields32)/sizeof(db_key_range_fields32[0]), 0);
......
...@@ -116,11 +116,11 @@ static void test_db_exceptions (void) { ...@@ -116,11 +116,11 @@ static void test_db_exceptions (void) {
} }
{ {
Db db2(&env, 0); Db db2(&env, 0);
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, DB_CREATE, 0777), EINVAL); // sub DB cannot exist TC(db2.open(0, FNAME, "sub.db", DB_BTREE, DB_CREATE, 0777), ENOTDIR); // sub DB cannot exist
} }
{ {
Db db2(&env, 0); Db db2(&env, 0);
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, 0, 0777), EINVAL); // sub DB cannot exist withou DB_CREATE TC(db2.open(0, FNAME, "sub.db", DB_BTREE, 0, 0777), ENOTDIR); // sub DB cannot exist withou DB_CREATE
} }
{ {
Dbc *curs; Dbc *curs;
......
...@@ -127,7 +127,13 @@ typedef enum { ...@@ -127,7 +127,13 @@ typedef enum {
#endif #endif
struct __toku_db_env { struct __toku_db_env {
struct __toku_db_env_internal *i; struct __toku_db_env_internal *i;
void* __toku_dummy0[12]; int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */ void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[39]; void* __toku_dummy1[39];
char __toku_dummy2[144]; char __toku_dummy2[144];
......
...@@ -39,6 +39,7 @@ struct translation { //This is the BTT (block translation table) ...@@ -39,6 +39,7 @@ struct translation { //This is the BTT (block translation table)
enum {RESERVED_BLOCKNUM_NULL=0, enum {RESERVED_BLOCKNUM_NULL=0,
RESERVED_BLOCKNUM_TRANSLATION=1, RESERVED_BLOCKNUM_TRANSLATION=1,
RESERVED_BLOCKNUM_FIFO=2, RESERVED_BLOCKNUM_FIFO=2,
RESERVED_BLOCKNUM_DESCRIPTOR=3,
RESERVED_BLOCKNUMS}; RESERVED_BLOCKNUMS};
static const BLOCKNUM freelist_null = {-1}; // in a freelist, this indicates end of list static const BLOCKNUM freelist_null = {-1}; // in a freelist, this indicates end of list
...@@ -97,6 +98,24 @@ brtheader_set_dirty(struct brt_header *h, BOOL for_checkpoint){ ...@@ -97,6 +98,24 @@ brtheader_set_dirty(struct brt_header *h, BOOL for_checkpoint){
} }
} }
// Ask the cachefile layer to consider shrinking h->cf, but only when the block
// allocator's allocated limit is now below SIZE_NEEDED_BEFORE.
// Asserts that bt->is_locked is set on entry.
static void
maybe_truncate_cachefile(BLOCK_TABLE bt, struct brt_header *h, u_int64_t size_needed_before) {
    assert(bt->is_locked);
    const u_int64_t limit_now = block_allocator_allocated_limit(bt->block_allocator);
    if (limit_now >= size_needed_before) {
        // The limit did not shrink: skip the toku_os_get_file_size kernel call
        // that toku_maybe_truncate_cachefile would make.
        return;
    }
    toku_maybe_truncate_cachefile(h->cf, limit_now);
}
// Under the block table lock, offer the cachefile behind H a chance to shrink
// to the block allocator's current allocated limit.  Unlike the static helper
// maybe_truncate_cachefile, there is no "before" size to compare against, so
// the truncation check is always attempted.
void
toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, struct brt_header *h) {
    lock_for_blocktable(bt);
    toku_maybe_truncate_cachefile(h->cf,
                                  block_allocator_allocated_limit(bt->block_allocator));
    unlock_for_blocktable(bt);
}
static void static void
copy_translation(struct translation * dst, struct translation * src, enum translation_type newtype) { copy_translation(struct translation * dst, struct translation * src, enum translation_type newtype) {
...@@ -182,9 +201,10 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt); ...@@ -182,9 +201,10 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
// move inprogress to checkpoint (resetting type) // move inprogress to checkpoint (resetting type)
// inprogress = NULL // inprogress = NULL
void void
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt) { toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, struct brt_header *h) {
// Free unused blocks // Free unused blocks
lock_for_blocktable(bt); lock_for_blocktable(bt);
u_int64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
assert(bt->inprogress.block_translation); assert(bt->inprogress.block_translation);
if (bt->checkpoint_skipped || bt->checkpoint_failed) { if (bt->checkpoint_skipped || bt->checkpoint_failed) {
cleanup_failed_checkpoint(bt); cleanup_failed_checkpoint(bt);
...@@ -210,6 +230,7 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt); ...@@ -210,6 +230,7 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
bt->checkpointed = bt->inprogress; bt->checkpointed = bt->inprogress;
bt->checkpointed.type = TRANSLATION_CHECKPOINTED; bt->checkpointed.type = TRANSLATION_CHECKPOINTED;
memset(&bt->inprogress, 0, sizeof(bt->inprogress)); memset(&bt->inprogress, 0, sizeof(bt->inprogress));
maybe_truncate_cachefile(bt, h, allocated_limit_at_start);
end: end:
unlock_for_blocktable(bt); unlock_for_blocktable(bt);
} }
...@@ -317,12 +338,11 @@ translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_tra ...@@ -317,12 +338,11 @@ translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_tra
} }
static void static void
blocknum_realloc_on_disk_unlocked (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) { blocknum_realloc_on_disk_internal (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) {
assert(bt->is_locked); assert(bt->is_locked);
brtheader_set_dirty(h, for_checkpoint); brtheader_set_dirty(h, for_checkpoint);
struct translation *t = &bt->current; struct translation *t = &bt->current;
verify_valid_freeable_blocknum(t, b);
struct block_translation_pair old_pair = t->block_translation[b.b]; struct block_translation_pair old_pair = t->block_translation[b.b];
PRNTF("old", b.b, old_pair.size, old_pair.u.diskoff, bt); PRNTF("old", b.b, old_pair.size, old_pair.u.diskoff, bt);
//Free the old block if it is not still in use by the checkpoint in progress or the previous checkpoint //Free the old block if it is not still in use by the checkpoint in progress or the previous checkpoint
...@@ -346,13 +366,14 @@ PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.di ...@@ -346,13 +366,14 @@ PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.di
assert(b.b < bt->inprogress.length_of_array); assert(b.b < bt->inprogress.length_of_array);
bt->inprogress.block_translation[b.b] = t->block_translation[b.b]; bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
} }
} }
void void
toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) { toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) {
lock_for_blocktable(bt); lock_for_blocktable(bt);
blocknum_realloc_on_disk_unlocked(bt, b, size, offset, h, for_checkpoint); struct translation *t = &bt->current;
verify_valid_freeable_blocknum(t, b);
blocknum_realloc_on_disk_internal(bt, b, size, offset, h, for_checkpoint);
unlock_for_blocktable(bt); unlock_for_blocktable(bt);
} }
...@@ -446,13 +467,19 @@ toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w, ...@@ -446,13 +467,19 @@ toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w,
// Perhaps rename: purpose is get disk address of a block, given its blocknum (blockid?) // Perhaps rename: purpose is get disk address of a block, given its blocknum (blockid?)
void static void
toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) { translate_blocknum_to_offset_size_unlocked(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable(bt);
struct translation *t = &bt->current; struct translation *t = &bt->current;
verify_valid_blocknum(t, b); verify_valid_blocknum(t, b);
if (offset) *offset = t->block_translation[b.b].u.diskoff; if (offset) *offset = t->block_translation[b.b].u.diskoff;
if (size) *size = t->block_translation[b.b].size; if (size) *size = t->block_translation[b.b].size;
}
// Perhaps rename: purpose is get disk address of a block, given its blocknum (blockid?)
void
toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable(bt);
translate_blocknum_to_offset_size_unlocked(bt, b, offset, size);
unlock_for_blocktable(bt); unlock_for_blocktable(bt);
} }
...@@ -468,12 +495,20 @@ maybe_expand_translation (struct translation *t) { ...@@ -468,12 +495,20 @@ maybe_expand_translation (struct translation *t) {
u_int64_t i; u_int64_t i;
for (i = t->length_of_array; i < new_length; i++) { for (i = t->length_of_array; i < new_length; i++) {
t->block_translation[i].u.next_free_blocknum = freelist_null; t->block_translation[i].u.next_free_blocknum = freelist_null;
t->block_translation[i].size = size_is_free; t->block_translation[i].size = size_is_free;
} }
t->length_of_array = new_length; t->length_of_array = new_length;
} }
} }
// Store the reserved descriptor blocknum (RESERVED_BLOCKNUM_DESCRIPTOR) in *RES
// and mark header H dirty (not for checkpoint).  The blocknum is a fixed
// constant; nothing is taken from the freelist.
void
toku_allocate_blocknum_for_descriptor(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
    lock_for_blocktable(bt);
    const BLOCKNUM descriptor_blocknum = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
    *res = descriptor_blocknum;
    brtheader_set_dirty(h, FALSE);
    unlock_for_blocktable(bt);
}
void void
toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) { toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
assert(bt->is_locked); assert(bt->is_locked);
...@@ -549,16 +584,17 @@ toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) { ...@@ -549,16 +584,17 @@ toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
void void
toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h) { toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h) {
assert(bt->is_locked); assert(bt->is_locked);
u_int64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
brtheader_set_dirty(h, FALSE); brtheader_set_dirty(h, FALSE);
//Free all used blocks except descriptor //Free all regular/data blocks (non reserved)
BLOCKNUM keep_only = h->descriptor.b; //Meta data is stored in reserved blocks
struct translation *t = &bt->current; struct translation *t = &bt->current;
int64_t i; int64_t i;
for (i=RESERVED_BLOCKNUMS; i<t->smallest_never_used_blocknum.b; i++) { for (i=RESERVED_BLOCKNUMS; i<t->smallest_never_used_blocknum.b; i++) {
if (i==keep_only.b) continue;
BLOCKNUM b = make_blocknum(i); BLOCKNUM b = make_blocknum(i);
if (t->block_translation[i].size > 0) free_blocknum_unlocked(bt, &b, h); if (t->block_translation[i].size >= 0) free_blocknum_unlocked(bt, &b, h);
} }
maybe_truncate_cachefile(bt, h, allocated_limit_at_start);
} }
//Verify there are no free blocks. //Verify there are no free blocks.
...@@ -669,8 +705,8 @@ blocktable_create_internal (void) { ...@@ -669,8 +705,8 @@ blocktable_create_internal (void) {
static void static void
translation_default(struct translation *t) { // destination into which to create a default translation translation_default(struct translation *t) { // destination into which to create a default translation
t->type = TRANSLATION_CHECKPOINTED; t->type = TRANSLATION_CHECKPOINTED;
t->smallest_never_used_blocknum = make_blocknum(RESERVED_BLOCKNUMS); t->smallest_never_used_blocknum = make_blocknum(RESERVED_BLOCKNUMS);
t->length_of_array = t->smallest_never_used_blocknum.b + 1; //Enough to store the root t->length_of_array = t->smallest_never_used_blocknum.b;
t->blocknum_freelist_head = freelist_null; t->blocknum_freelist_head = freelist_null;
XMALLOC_N(t->length_of_array, t->block_translation); XMALLOC_N(t->length_of_array, t->block_translation);
int64_t i; int64_t i;
...@@ -856,3 +892,20 @@ toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, in ...@@ -856,3 +892,20 @@ toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, in
if (used_sizep) *used_sizep = info.used_space; if (used_sizep) *used_sizep = info.used_space;
} }
// Reallocate the on-disk space of the reserved descriptor block to SIZE bytes,
// passing OFFSET through to blocknum_realloc_on_disk_internal.  Runs under the
// block table lock and is never treated as part of a checkpoint
// (for_checkpoint is FALSE).
void
toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, struct brt_header * h) {
    lock_for_blocktable(bt);
    blocknum_realloc_on_disk_internal(bt,
                                      make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR),
                                      size, offset, h, FALSE);
    unlock_for_blocktable(bt);
}
// Look up the reserved descriptor block in the current translation, under the
// block table lock, and pass OFFSET and SIZE to
// translate_blocknum_to_offset_size_unlocked to be filled in.
void
toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size) {
    lock_for_blocktable(bt);
    translate_blocknum_to_offset_size_unlocked(bt,
                                               make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR),
                                               offset, size);
    unlock_for_blocktable(bt);
}
...@@ -22,10 +22,11 @@ void toku_block_lock_for_multiple_operations(BLOCK_TABLE bt); ...@@ -22,10 +22,11 @@ void toku_block_lock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt); void toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt); void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, struct brt_header *h);
void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h); void toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h);
void toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, struct brt_header *h);
//Blocknums //Blocknums
void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h); void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
...@@ -35,6 +36,8 @@ void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b); ...@@ -35,6 +36,8 @@ void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt); void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
void toku_realloc_fifo_on_disk_unlocked (BLOCK_TABLE, DISKOFF size, DISKOFF *offset); void toku_realloc_fifo_on_disk_unlocked (BLOCK_TABLE, DISKOFF size, DISKOFF *offset);
void toku_get_fifo_offset_on_disk(BLOCK_TABLE bt, DISKOFF *offset); void toku_get_fifo_offset_on_disk(BLOCK_TABLE bt, DISKOFF *offset);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, struct brt_header * h);
void toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size);
//Blocks and Blocknums //Blocks and Blocknums
void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint); void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint);
......
...@@ -151,14 +151,10 @@ struct remembered_hash { ...@@ -151,14 +151,10 @@ struct remembered_hash {
enum brtheader_type {BRTHEADER_CURRENT=1, BRTHEADER_CHECKPOINT_INPROGRESS}; enum brtheader_type {BRTHEADER_CURRENT=1, BRTHEADER_CHECKPOINT_INPROGRESS};
// Descriptor stored in the brt header (struct brt_header's `descriptor` member).
struct descriptor {
struct simple_dbt sdbt; // descriptor payload -- presumably the raw descriptor bytes; TODO confirm against simple_dbt users
BLOCKNUM b; // blocknum belonging to the descriptor (the one block truncate used to keep)
};
struct brt_header { struct brt_header {
enum brtheader_type type; enum brtheader_type type;
struct brt_header * checkpoint_header; struct brt_header * checkpoint_header;
CACHEFILE cf;
u_int64_t checkpoint_count; // Free-running counter incremented once per checkpoint (toggling LSB). u_int64_t checkpoint_count; // Free-running counter incremented once per checkpoint (toggling LSB).
// LSB indicates which header location is used on disk so this // LSB indicates which header location is used on disk so this
// counter is effectively a boolean which alternates with each checkpoint. // counter is effectively a boolean which alternates with each checkpoint.
...@@ -172,7 +168,7 @@ struct brt_header { ...@@ -172,7 +168,7 @@ struct brt_header {
BLOCKNUM root; // roots of the dictionary BLOCKNUM root; // roots of the dictionary
struct remembered_hash root_hash; // hash of the root offset. struct remembered_hash root_hash; // hash of the root offset.
unsigned int flags; unsigned int flags;
struct descriptor descriptor; struct simple_dbt descriptor;
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory. FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
......
...@@ -23,35 +23,6 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) { ...@@ -23,35 +23,6 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b; return ((a+b-1)/b)*b;
} }
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
{
int r = toku_os_get_file_size(fd, &file_size);
assert(r==0);
}
assert(file_size >= 0);
if ((u_int64_t)file_size < size) {
const int N = umin64(size, 16<<20); // Double the size of the file, or add 16MB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
}
return 0;
}
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator. // This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
static toku_pthread_mutex_t pwrite_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER; static toku_pthread_mutex_t pwrite_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int pwrite_is_locked=0; static int pwrite_is_locked=0;
...@@ -79,6 +50,76 @@ unlock_for_pwrite (void) { ...@@ -79,6 +50,76 @@ unlock_for_pwrite (void) {
assert(r==0); assert(r==0);
} }
// 16MiB: the unit used when sizing file preallocation and the truncation threshold.
enum {FILE_CHANGE_INCREMENT = (16<<20)};
void
toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used)
// Effect: If file size >= SIZE_USED+32MiB, shrink the file to SIZE_USED rounded
//  up to a multiple of 32MiB.
//  (32 instead of 16.. hysteresis).
// No status is returned.  A failed size query is ignored only when the
//  cachefile is /dev/null; every other failure trips an assert.
{
    //Check file size before taking pwrite lock to reduce likelihood of taking
    //the lock needlessly.
    //Check file size after taking lock to avoid race conditions.
    int64_t file_size;
    {
        int r = toku_os_get_file_size(toku_cachefile_fd(cf), &file_size);
        if (r!=0 && toku_cachefile_is_dev_null(cf)) goto done;
        assert(r==0);
        assert(file_size >= 0);
    }
    // If file space is overallocated by at least 32M
    if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
        lock_for_pwrite();
        {
            int r = toku_os_get_file_size(toku_cachefile_fd(cf), &file_size);
            if (r!=0 && toku_cachefile_is_dev_null(cf)) goto cleanup;
            assert(r==0);
            assert(file_size >= 0);
        }
        if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
            // The new length is derived from the space in use, not from the
            // current file length: rounding file_size UP can never produce a
            // smaller value, so the assert below would always fire.  Because
            // file_size >= size_used + 32MiB, rounding size_used up to the next
            // 32MiB boundary is guaranteed to be strictly below file_size.
            toku_off_t new_size = alignup(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
            assert(new_size < file_size);
            int r = toku_cachefile_truncate(cf, new_size);
            assert(r==0);
        }
    cleanup:
        unlock_for_pwrite();
    }
done:
    return;
}
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MiB whichever is less.
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
{
int r = toku_os_get_file_size(fd, &file_size);
assert(r==0);
}
assert(file_size >= 0);
if ((u_int64_t)file_size < size) {
const int N = umin64(size, FILE_CHANGE_INCREMENT); // Double the size of the file, or add 16MiB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
}
return 0;
}
static int static int
toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote) toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote)
// requires that the pwrite has been locked // requires that the pwrite has been locked
...@@ -476,8 +517,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -476,8 +517,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r; int r;
{ {
lock_for_pwrite();
//TODO: #1463 START (might not be the entire range
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0); assert(blocknum.b>=0);
//printf("%s:%d h=%p\n", __FILE__, __LINE__, h); //printf("%s:%d h=%p\n", __FILE__, __LINE__, h);
...@@ -491,13 +530,13 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -491,13 +530,13 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset, toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, for_checkpoint); h, for_checkpoint);
ssize_t n_wrote; ssize_t n_wrote;
lock_for_pwrite();
r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote); r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote);
if (r) { if (r) {
// fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r)); // fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
} else { } else {
r=0; r=0;
} }
//TODO: #1463 END
unlock_for_pwrite(); unlock_for_pwrite();
} }
...@@ -929,18 +968,11 @@ int toku_serialize_brt_header_size (struct brt_header *UU(h)) { ...@@ -929,18 +968,11 @@ int toku_serialize_brt_header_size (struct brt_header *UU(h)) {
); );
size+=(+8 // diskoff size+=(+8 // diskoff
+4 // flags +4 // flags
+8 // blocknum of descriptor
); );
assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE); assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE);
return size; return size;
} }
static void
serialize_descriptor_to_wbuf(struct wbuf *wb, struct descriptor *desc) {
wbuf_BLOCKNUM (wb, desc->b);
//descriptor contents are written to disk on opening brt
}
int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h, DISKOFF translation_location_on_disk, DISKOFF translation_size_on_disk) { int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h, DISKOFF translation_location_on_disk, DISKOFF translation_size_on_disk) {
unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size? unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size?
wbuf_literal_bytes(wbuf, "tokudata", 8); wbuf_literal_bytes(wbuf, "tokudata", 8);
...@@ -956,7 +988,6 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h, ...@@ -956,7 +988,6 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h,
wbuf_DISKOFF(wbuf, translation_size_on_disk); wbuf_DISKOFF(wbuf, translation_size_on_disk);
wbuf_BLOCKNUM(wbuf, h->root); wbuf_BLOCKNUM(wbuf, h->root);
wbuf_int (wbuf, h->flags); wbuf_int (wbuf, h->flags);
serialize_descriptor_to_wbuf(wbuf, &h->descriptor);
u_int32_t checksum = x1764_finish(&wbuf->checksum); u_int32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum); wbuf_int(wbuf, checksum);
assert(wbuf->ndone<=wbuf->size); assert(wbuf->ndone<=wbuf->size);
...@@ -967,7 +998,6 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -967,7 +998,6 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0; int rr = 0;
if (h->panic) return h->panic; if (h->panic) return h->panic;
assert(h->type==BRTHEADER_CHECKPOINT_INPROGRESS); assert(h->type==BRTHEADER_CHECKPOINT_INPROGRESS);
lock_for_pwrite();
toku_block_lock_for_multiple_operations(h->blocktable); toku_block_lock_for_multiple_operations(h->blocktable);
struct wbuf w_translation; struct wbuf w_translation;
int64_t size_translation; int64_t size_translation;
...@@ -991,6 +1021,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -991,6 +1021,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
} }
toku_block_unlock_for_multiple_operations(h->blocktable); toku_block_unlock_for_multiple_operations(h->blocktable);
char *writing_what; char *writing_what;
lock_for_pwrite();
{ {
//Actual Write translation table //Actual Write translation table
ssize_t nwrote; ssize_t nwrote;
...@@ -1033,6 +1064,9 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -1033,6 +1064,9 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
return rr; return rr;
} }
//Descriptor is written to disk during toku_brt_open iff we have a new (or changed)
//descriptor.
//Descriptors are NOT written during the header checkpoint process.
int int
toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) { toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) {
int r; int r;
...@@ -1057,17 +1091,14 @@ toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) { ...@@ -1057,17 +1091,14 @@ toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) {
} }
static void static void
deserialize_descriptor_from(int fd, struct rbuf *rb, struct brt_header *h, struct descriptor *desc) { deserialize_descriptor_from(int fd, struct brt_header *h, struct simple_dbt *desc) {
DISKOFF offset;
DISKOFF size;
toku_get_descriptor_offset_size(h->blocktable, &offset, &size);
memset(desc, 0, sizeof(*desc)); memset(desc, 0, sizeof(*desc));
desc->b = rbuf_blocknum(rb); if (size > 0) {
if (desc->b.b > 0) {
DISKOFF offset;
DISKOFF size;
toku_translate_blocknum_to_offset_size(h->blocktable, desc->b, &offset, &size);
assert(size>=4); //4 for checksum assert(size>=4); //4 for checksum
{ {
unsigned char *XMALLOC_N(size, dbuf); unsigned char *XMALLOC_N(size, dbuf);
{ {
lock_for_pwrite(); lock_for_pwrite();
...@@ -1082,8 +1113,8 @@ deserialize_descriptor_from(int fd, struct rbuf *rb, struct brt_header *h, struc ...@@ -1082,8 +1113,8 @@ deserialize_descriptor_from(int fd, struct rbuf *rb, struct brt_header *h, struc
u_int32_t stored_x1764 = toku_dtoh32(*(int*)(dbuf + size-4)); u_int32_t stored_x1764 = toku_dtoh32(*(int*)(dbuf + size-4));
assert(x1764 == stored_x1764); assert(x1764 == stored_x1764);
} }
desc->sdbt.len = size-4; desc->len = size-4;
desc->sdbt.data = dbuf; //Uses 4 extra bytes, but fast. desc->data = dbuf; //Uses 4 extra bytes, but fast.
} }
} }
} }
...@@ -1099,7 +1130,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) { ...@@ -1099,7 +1130,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
struct rbuf rc = *rb; struct rbuf rc = *rb;
memset(rb, 0, sizeof(*rb)); memset(rb, 0, sizeof(*rb));
struct brt_header *MALLOC(h); struct brt_header *CALLOC(h);
if (h==0) return errno; if (h==0) return errno;
int ret=-1; int ret=-1;
if (0) { died1: toku_free(h); return ret; } if (0) { died1: toku_free(h); return ret; }
...@@ -1135,19 +1166,19 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) { ...@@ -1135,19 +1166,19 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
ssize_t r = pread(fd, tbuf, translation_size_on_disk, translation_address_on_disk); ssize_t r = pread(fd, tbuf, translation_size_on_disk, translation_address_on_disk);
assert(r==translation_size_on_disk); assert(r==translation_size_on_disk);
} }
unlock_for_pwrite();
// Create table and read in data. // Create table and read in data.
toku_blocktable_create_from_buffer(&h->blocktable, toku_blocktable_create_from_buffer(&h->blocktable,
translation_address_on_disk, translation_address_on_disk,
translation_size_on_disk, translation_size_on_disk,
tbuf); tbuf);
unlock_for_pwrite();
toku_free(tbuf); toku_free(tbuf);
} }
h->root = rbuf_blocknum(&rc); h->root = rbuf_blocknum(&rc);
h->root_hash.valid = FALSE; h->root_hash.valid = FALSE;
h->flags = rbuf_int(&rc); h->flags = rbuf_int(&rc);
deserialize_descriptor_from(fd, &rc, h, &h->descriptor); deserialize_descriptor_from(fd, h, &h->descriptor);
(void)rbuf_int(&rc); //Read in checksum and ignore (already verified). (void)rbuf_int(&rc); //Read in checksum and ignore (already verified).
if (rc.ndone!=rc.size) {ret = EINVAL; goto died1;} if (rc.ndone!=rc.size) {ret = EINVAL; goto died1;}
toku_free(rc.buf); toku_free(rc.buf);
......
...@@ -617,7 +617,7 @@ brtheader_destroy(struct brt_header *h) { ...@@ -617,7 +617,7 @@ brtheader_destroy(struct brt_header *h) {
else { else {
assert(h->type == BRTHEADER_CURRENT); assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable); toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.sdbt.data) toku_free(h->descriptor.sdbt.data); if (h->descriptor.data) toku_free(h->descriptor.data);
} }
} }
...@@ -2861,6 +2861,8 @@ static int ...@@ -2861,6 +2861,8 @@ static int
brt_init_header_partial (BRT t) { brt_init_header_partial (BRT t) {
int r; int r;
t->h->flags = t->flags; t->h->flags = t->flags;
if (t->h->cf!=NULL) assert(t->h->cf == t->cf);
t->h->cf = t->cf;
t->h->nodesize=t->nodesize; t->h->nodesize=t->nodesize;
compute_and_fill_remembered_hash(t); compute_and_fill_remembered_hash(t);
...@@ -2950,6 +2952,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header ...@@ -2950,6 +2952,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
struct brt_header *h; struct brt_header *h;
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), &h); int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), &h);
if (r!=0) return r; if (r!=0) return r;
h->cf = cf;
h->root_put_counter = global_root_put_counter++; h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint); toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint);
*header = h; *header = h;
...@@ -3029,25 +3032,27 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre ...@@ -3029,25 +3032,27 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
} }
} }
if (t->did_set_descriptor) { if (t->did_set_descriptor) {
if (t->h->descriptor.sdbt.len!=t->temp_descriptor.size || if (t->h->descriptor.len!=t->temp_descriptor.size ||
memcmp(t->h->descriptor.sdbt.data, t->temp_descriptor.data, t->temp_descriptor.size)) { memcmp(t->h->descriptor.data, t->temp_descriptor.data, t->temp_descriptor.size)) {
if (t->h->descriptor.b.b <= 0) toku_allocate_blocknum(t->h->blocktable, &t->h->descriptor.b, t->h);
DISKOFF offset; DISKOFF offset;
//4 for checksum //4 for checksum
toku_blocknum_realloc_on_disk(t->h->blocktable, t->h->descriptor.b, t->temp_descriptor.size+4, &offset, t->h, FALSE); toku_realloc_descriptor_on_disk(t->h->blocktable, t->temp_descriptor.size+4, &offset, t->h);
r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(t->cf), &t->temp_descriptor, offset); r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(t->cf), &t->temp_descriptor, offset);
if (r!=0) goto died_after_read_and_pin; if (r!=0) goto died_after_read_and_pin;
if (t->h->descriptor.sdbt.data) toku_free(t->h->descriptor.sdbt.data); if (t->h->descriptor.data) toku_free(t->h->descriptor.data);
t->h->descriptor.sdbt.data = t->temp_descriptor.data; t->h->descriptor.data = t->temp_descriptor.data;
t->h->descriptor.sdbt.len = t->temp_descriptor.size; t->h->descriptor.len = t->temp_descriptor.size;
t->temp_descriptor.data = NULL; t->temp_descriptor.data = NULL;
} }
t->did_set_descriptor = 0; t->did_set_descriptor = 0;
} }
if (t->db) { if (t->db) {
toku_fill_dbt(&t->db->descriptor, t->h->descriptor.sdbt.data, t->h->descriptor.sdbt.len); toku_fill_dbt(&t->db->descriptor, t->h->descriptor.data, t->h->descriptor.len);
} }
assert(t->h); assert(t->h);
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
toku_maybe_truncate_cachefile_on_open(t->h->blocktable, t->h);
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE -> %p\n", t)); WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE -> %p\n", t));
return 0; return 0;
} }
...@@ -3179,10 +3184,10 @@ toku_brtheader_end_checkpoint (CACHEFILE cachefile, void *header_v) { ...@@ -3179,10 +3184,10 @@ toku_brtheader_end_checkpoint (CACHEFILE cachefile, void *header_v) {
else else
h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
} }
toku_block_translation_note_end_checkpoint(h->blocktable); toku_block_translation_note_end_checkpoint(h->blocktable, h);
brtheader_free(h->checkpoint_header);
h->checkpoint_header = NULL;
} }
if (h->checkpoint_header) brtheader_free(h->checkpoint_header);
h->checkpoint_header = NULL;
return r; return r;
} }
...@@ -4695,18 +4700,19 @@ int toku_dump_brt (FILE *f, BRT brt) { ...@@ -4695,18 +4700,19 @@ int toku_dump_brt (FILE *f, BRT brt) {
int toku_brt_truncate (BRT brt) { int toku_brt_truncate (BRT brt) {
int r; int r;
// flush the cached tree blocks // flush the cached tree blocks and remove all related pairs from the cachetable
r = toku_brt_flush(brt); r = toku_brt_flush(brt);
// TODO log the truncate? // TODO log the truncate?
toku_block_lock_for_multiple_operations(brt->h->blocktable); toku_block_lock_for_multiple_operations(brt->h->blocktable);
if (r==0) { if (r==0) {
// reinit the header //Free all data blocknums and associated disk space (if not held on to by checkpoint)
toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h); toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h);
//Assign blocknum for root block, also dirty the header //Assign blocknum for root block, also dirty the header
toku_allocate_blocknum_unlocked(brt->h->blocktable, &brt->h->root, brt->h); toku_allocate_blocknum_unlocked(brt->h->blocktable, &brt->h->root, brt->h);
// reinit the header
brtheader_partial_destroy(brt->h); brtheader_partial_destroy(brt->h);
r = brt_init_header_partial(brt); r = brt_init_header_partial(brt);
} }
......
...@@ -138,6 +138,9 @@ void toku_brt_destroy(void); ...@@ -138,6 +138,9 @@ void toku_brt_destroy(void);
void toku_pwrite_lock_init(void); void toku_pwrite_lock_init(void);
void toku_pwrite_lock_destroy(void); void toku_pwrite_lock_destroy(void);
void toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used);
// Effect: truncate file if overallocated by at least 32MiB
int maybe_preallocate_in_file (int fd, u_int64_t size); int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less. // Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
......
...@@ -58,6 +58,13 @@ enum ctpair_state { ...@@ -58,6 +58,13 @@ enum ctpair_state {
CTPAIR_WRITING = 3, // being written from memory CTPAIR_WRITING = 3, // being written from memory
}; };
/* The workqueue pointer cq is set in:
* cachetable_complete_write_pair() cq is cleared, called from many paths, cachetable lock is held during this function
* cachetable_flush_cachefile() called during close and truncate, cachetable lock is held during this function
* toku_cachetable_unpin_and_remove() called during node merge, cachetable lock is held during this function
*
*/
typedef struct ctpair *PAIR; typedef struct ctpair *PAIR;
struct ctpair { struct ctpair {
enum typ_tag tag; enum typ_tag tag;
...@@ -315,11 +322,20 @@ int toku_cachefile_fd (CACHEFILE cf) { ...@@ -315,11 +322,20 @@ int toku_cachefile_fd (CACHEFILE cf) {
return cf->fd; return cf->fd;
} }
int toku_cachefile_truncate0 (CACHEFILE cf) { BOOL
toku_cachefile_is_dev_null (CACHEFILE cf) {
return cf->fname==NULL;
}
int
toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size) {
int r; int r;
r = ftruncate(cf->fd, 0); if (cf->fname==NULL) r = 0; //Don't truncate /dev/null
if (r != 0) else {
r = errno; r = ftruncate(cf->fd, new_size);
if (r != 0)
r = errno;
}
return r; return r;
} }
...@@ -868,12 +884,6 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p) ...@@ -868,12 +884,6 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p)
// this is essentially a flush_and_maybe_remove except that // this is essentially a flush_and_maybe_remove except that
// we already have p->rwlock and we just do the write in our own thread. // we already have p->rwlock and we just do the write in our own thread.
assert(p->dirty); // it must be dirty if its pending. assert(p->dirty); // it must be dirty if its pending.
#if 0
// TODO: Determine if this is legal, and/or required. Commented out for now
// I believe if it has a queue, removing it it will break whatever's waiting for it.
// p->cq = 0; // I don't want any delay, just do it.
#endif
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
assert(ct->size_writing>=0); assert(ct->size_writing>=0);
ct->size_writing += p->size; ct->size_writing += p->size;
...@@ -1186,8 +1196,6 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -1186,8 +1196,6 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
// work queue. // work queue.
unsigned i; unsigned i;
//THIS LOOP IS NOT THREAD SAFE! Has race condition since flush_and_maybe_remove releases cachetable lock
unsigned num_pairs = 0; unsigned num_pairs = 0;
unsigned list_size = 16; unsigned list_size = 16;
PAIR *list = NULL; PAIR *list = NULL;
...@@ -1335,7 +1343,7 @@ log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v)) ...@@ -1335,7 +1343,7 @@ log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v))
int int
toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// Requires: All three checkpoint-relevant locks must be held (see src/checkpoint.c). // Requires: All three checkpoint-relevant locks must be held (see checkpoint.c).
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record. // Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Use the begin_checkpoint callback to take necessary snapshots (header, btt) // Use the begin_checkpoint callback to take necessary snapshots (header, btt)
// Mark every dirty node as "pending." ("Pending" means that the node must be // Mark every dirty node as "pending." ("Pending" means that the node must be
...@@ -1431,7 +1439,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1431,7 +1439,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
int int
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) { toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
// Requires: The big checkpoint lock must be held (see src/checkpoint.c). // Requires: The big checkpoint lock must be held (see checkpoint.c).
// Algorithm: Write all pending nodes to disk // Algorithm: Write all pending nodes to disk
// Use checkpoint callback to write snapshot information to disk (header, btt) // Use checkpoint callback to write snapshot information to disk (header, btt)
// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks // Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
...@@ -1670,7 +1678,7 @@ int ...@@ -1670,7 +1678,7 @@ int
toku_cachefile_fsync(CACHEFILE cf) { toku_cachefile_fsync(CACHEFILE cf) {
int r; int r;
if (cf->fname==0) r = 0; //Don't fsync /dev/null if (cf->fname==NULL) r = 0; //Don't fsync /dev/null
else r = fsync(cf->fd); else r = fsync(cf->fd);
return r; return r;
} }
......
...@@ -177,8 +177,10 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname); ...@@ -177,8 +177,10 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
int toku_cachefile_redirect_nullfd (CACHEFILE cf); int toku_cachefile_redirect_nullfd (CACHEFILE cf);
// Truncate a cachefile // Truncate a cachefile
// Effect: set the cachefile size to 0 int toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size);
int toku_cachefile_truncate0 (CACHEFILE cf);
//has it been redirected to dev null?
BOOL toku_cachefile_is_dev_null (CACHEFILE cf);
// Return the logger associated with the cachefile // Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE); TOKULOGGER toku_cachefile_logger (CACHEFILE);
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* The purpose of this file is to implement the high-level logic for * The purpose of this file is to implement the high-level logic for
* taking a checkpoint. * taking a checkpoint.
* *
* There are three locks used for taking a checkpoint. They are: * There are three locks used for taking a checkpoint. They are listed below.
* *
* NOTE: The reader-writer locks may be held by either multiple clients * NOTE: The reader-writer locks may be held by either multiple clients
* or the checkpoint function. (The checkpoint function has the role * or the checkpoint function. (The checkpoint function has the role
...@@ -32,11 +32,10 @@ ...@@ -32,11 +32,10 @@
* (the checkpoint function is non-re-entrant), and to prevent certain operations * (the checkpoint function is non-re-entrant), and to prevent certain operations
* that should not happen during a checkpoint. * that should not happen during a checkpoint.
* The following operations must take the checkpoint_safe lock: * The following operations must take the checkpoint_safe lock:
* - open a dictionary
* - close a dictionary
* - delete a dictionary * - delete a dictionary
* - truncate a dictionary
* - rename a dictionary * - rename a dictionary
* The application can use this lock to disable checkpointing during other sensitive
* operations, such as making a backup copy of the database.
* *
* - ydb_big_lock * - ydb_big_lock
* This is the existing lock used to serialize all access to tokudb. * This is the existing lock used to serialize all access to tokudb.
...@@ -44,15 +43,13 @@ ...@@ -44,15 +43,13 @@
* to set all the "pending" bits and to create the checkpoint-in-progress versions * to set all the "pending" bits and to create the checkpoint-in-progress versions
* of the header and translation table (btt). * of the header and translation table (btt).
* *
* Once the "pending" bits are set and the snapshots are take of the header and btt, * Once the "pending" bits are set and the snapshots are taken of the header and btt,
* most normal database operations are permitted to resume. * most normal database operations are permitted to resume.
* *
* *
* *
*****/ *****/
#include <stdio.h>
#include "toku_portability.h" #include "toku_portability.h"
#include "brttypes.h" #include "brttypes.h"
#include "cachetable.h" #include "cachetable.h"
...@@ -170,8 +167,6 @@ int ...@@ -170,8 +167,6 @@ int
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) { toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
int r; int r;
printf("Yipes, checkpoint is being tested\n");
assert(initialized); assert(initialized);
multi_operation_checkpoint_lock(); multi_operation_checkpoint_lock();
checkpoint_safe_checkpoint_lock(); checkpoint_safe_checkpoint_lock();
......
...@@ -6,13 +6,10 @@ ...@@ -6,13 +6,10 @@
#ident "$Id$" #ident "$Id$"
// TODO: $1510 Need callback mechanism so newbrt layer can get/release ydb lock.
/****** /******
* *
* NOTE: atomic operation lock is highest level lock * NOTE: multi_operation_lock is highest level lock
* checkpoint_safe lock is next level lock * checkpoint_safe_lock is next level lock
* ydb_big_lock is next level lock * ydb_big_lock is next level lock
* *
* Locks must always be taken in this sequence (highest level first). * Locks must always be taken in this sequence (highest level first).
...@@ -20,13 +17,12 @@ ...@@ -20,13 +17,12 @@
*/ */
/****** TODO: ydb layer should be taking this lock /******
* Client code must hold the checkpoint_safe lock during the following operations: * Client code must hold the checkpoint_safe lock during the following operations:
* - delete a dictionary * - delete a dictionary via DB->remove
* - truncate a dictionary * - delete a dictionary via DB_TXN->abort(txn) (where txn created a dictionary)
* - rename a dictionary * - rename a dictionary //TODO: Handlerton rename needs to take this
* - abort a transaction that created a dictionary (abort causes dictionary delete) * //TODO: Handlerton rename needs to be recoded for transaction recovery
* - abort a transaction that had a table lock on an empty table (abort causes dictionary truncate)
*****/ *****/
void toku_checkpoint_safe_client_lock(void); void toku_checkpoint_safe_client_lock(void);
...@@ -35,9 +31,8 @@ void toku_checkpoint_safe_client_unlock(void); ...@@ -35,9 +31,8 @@ void toku_checkpoint_safe_client_unlock(void);
/****** TODO: rename these functions as something like begin_atomic_operation and end_atomic_operation /******
* these may need ydb wrappers * These functions are called from the ydb level.
* These functions are called from the handlerton level.
* Client code must hold the multi_operation lock during the following operations: * Client code must hold the multi_operation lock during the following operations:
* - insertion into multiple indexes * - insertion into multiple indexes
* - replace into (simultaneous delete/insert on a single key) * - replace into (simultaneous delete/insert on a single key)
......
...@@ -408,13 +408,13 @@ void toku_logger_txn_close (TOKUTXN txn) { ...@@ -408,13 +408,13 @@ void toku_logger_txn_close (TOKUTXN txn) {
return; return;
} }
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) { int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv); rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv);
return r; return r;
} }
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) { int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv); rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv);
if (r!=0) return r; if (r!=0) return r;
...@@ -426,7 +426,7 @@ static int note_brt_used_in_parent_txn(OMTVALUE brtv, u_int32_t UU(index), void* ...@@ -426,7 +426,7 @@ static int note_brt_used_in_parent_txn(OMTVALUE brtv, u_int32_t UU(index), void*
} }
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void*yieldv) { int toku_logger_commit (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv) {
// printf("%s:%d committing\n", __FILE__, __LINE__); // printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit // panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks. int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
...@@ -497,7 +497,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void ...@@ -497,7 +497,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void
r = toku_commit_rollback_item(txn, item, yield, yieldv); r = toku_commit_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r; if (r!=0) return r;
count++; count++;
if (count%2 == 0) yield(yieldv); if (count%2 == 0) yield(NULL, yieldv);
} }
} }
...@@ -866,7 +866,7 @@ toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unused__) ...@@ -866,7 +866,7 @@ toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unused__)
#endif #endif
// Doesn't close the txn, just performs the abort operations. // Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) { int toku_logger_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
//printf("%s:%d aborting\n", __FILE__, __LINE__); //printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
...@@ -883,7 +883,7 @@ int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) { ...@@ -883,7 +883,7 @@ int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) {
int r = toku_abort_rollback_item(txn, item, yield, yieldv); int r = toku_abort_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r; if (r!=0) return r;
count++; count++;
if (count%2 == 0) yield(yieldv); if (count%2 == 0) yield(NULL, yieldv);
} }
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
......
...@@ -40,11 +40,13 @@ int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t); ...@@ -40,11 +40,13 @@ int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *); int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_set_lg_bsize(TOKULOGGER, u_int32_t); int toku_logger_set_lg_bsize(TOKULOGGER, u_int32_t);
typedef void(*voidfp)(void);
typedef void(*YIELDF)(voidfp, void*);
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v); int toku_logger_commit (TOKUTXN txn, int no_sync, YIELDF yield, void*yield_v);
// Doesn't close the txn, just performs the abort operations. // Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN, void(*/*yield*/)(void*), void*/*yield_v*/); int toku_logger_abort(TOKUTXN, YIELDF, void*/*yield_v*/);
// Closes a txn. Call after commiting or aborting. // Closes a txn. Call after commiting or aborting.
void toku_logger_txn_close (TOKUTXN); void toku_logger_txn_close (TOKUTXN);
...@@ -157,16 +159,16 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn); ...@@ -157,16 +159,16 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn);
struct roll_entry; struct roll_entry;
int toku_rollback_fileentries (int fd, int toku_rollback_fileentries (int fd,
TOKUTXN txn, TOKUTXN txn,
void (*yield)(void*yieldv), YIELDF yield,
void * yieldv); void * yieldv);
int toku_commit_fileentries (int fd, int toku_commit_fileentries (int fd,
TOKUTXN txn, TOKUTXN txn,
void (*yield)(void*yieldv), YIELDF yield,
void * yieldv); void * yieldv);
// do the commit items. Call yield(yield_v) once in a while. // do the commit items. Call yield(yield_v) once in a while.
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v); int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yield_v);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v); int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yield_v);
int toku_txn_note_brt (TOKUTXN txn, BRT brt); int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt); int toku_txn_note_close_brt (BRT brt);
......
...@@ -225,10 +225,10 @@ generate_log_struct (void) { ...@@ -225,10 +225,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name); fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n"); fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
fprintf(hf, "int toku_commit_%s (", lt->name); fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n"); fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
}); });
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
/* rollback and rollforward routines. */ /* rollback and rollforward routines. */
#include "includes.h" #include "includes.h"
#include "checkpoint.h"
// these flags control whether or not we send commit messages for // these flags control whether or not we send commit messages for
// various operations // various operations
...@@ -11,8 +12,6 @@ ...@@ -11,8 +12,6 @@
#define TOKU_DO_COMMIT_CMD_DELETE 1 #define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1 #define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
typedef void (*YIELDF)(void*);
int int
toku_commit_fcreate (TXNID UU(xid), toku_commit_fcreate (TXNID UU(xid),
FILENUM UU(filenum), FILENUM UU(filenum),
...@@ -29,9 +28,10 @@ toku_rollback_fcreate (TXNID UU(xid), ...@@ -29,9 +28,10 @@ toku_rollback_fcreate (TXNID UU(xid),
FILENUM filenum, FILENUM filenum,
BYTESTRING bs_fname, BYTESTRING bs_fname,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF yield,
void* UU(yield_v)) void* yield_v)
{ {
yield(toku_checkpoint_safe_client_lock, yield_v);
char *fname = fixup_fname(&bs_fname); char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory; char *directory = txn->logger->directory;
int full_len=strlen(fname)+strlen(directory)+2; int full_len=strlen(fname)+strlen(directory)+2;
...@@ -48,6 +48,7 @@ toku_rollback_fcreate (TXNID UU(xid), ...@@ -48,6 +48,7 @@ toku_rollback_fcreate (TXNID UU(xid),
r = unlink(full_fname); r = unlink(full_fname);
assert(r==0); assert(r==0);
toku_free(fname); toku_free(fname);
toku_checkpoint_safe_client_unlock();
return 0; return 0;
} }
...@@ -218,7 +219,7 @@ toku_commit_fileentries (int fd, ...@@ -218,7 +219,7 @@ toku_commit_fileentries (int fd,
if (r!=0) goto finish; if (r!=0) goto finish;
memarena_clear(ma); memarena_clear(ma);
count++; count++;
if (count%2==0) yield(yieldv); if (count%2==0) yield(NULL, yieldv);
} }
finish: finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); } { int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
...@@ -245,7 +246,7 @@ toku_rollback_fileentries (int fd, ...@@ -245,7 +246,7 @@ toku_rollback_fileentries (int fd,
if (r!=0) goto finish; if (r!=0) goto finish;
memarena_clear(ma); memarena_clear(ma);
count++; count++;
if (count%2==0) yield(yieldv); if (count%2==0) yield(NULL, yieldv);
} }
finish: finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); } { int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
......
...@@ -83,6 +83,7 @@ BDB_DONTRUN_TESTS = \ ...@@ -83,6 +83,7 @@ BDB_DONTRUN_TESTS = \
test_db_descriptor \ test_db_descriptor \
test_db_descriptor_named_db \ test_db_descriptor_named_db \
test_heaviside_straddle_1622 \ test_heaviside_straddle_1622 \
test_dbremove_old \
#\ ends prev line #\ ends prev line
ifeq ($(OS_CHOICE),windows) ifeq ($(OS_CHOICE),windows)
......
...@@ -58,6 +58,8 @@ struct __toku_db_env_internal { ...@@ -58,6 +58,8 @@ struct __toku_db_env_internal {
char *tmp_dir; char *tmp_dir;
char *lg_dir; char *lg_dir;
char **data_dirs; char **data_dirs;
int (*bt_compare) (DB *, const DBT *, const DBT *);
int (*dup_compare) (DB *, const DBT *, const DBT *);
u_int32_t n_data_dirs; u_int32_t n_data_dirs;
//void (*noticecall)(DB_ENV *, db_notices); //void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size; unsigned long cachetable_size;
......
...@@ -29,6 +29,7 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All ...@@ -29,6 +29,7 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All
#include "memory.h" #include "memory.h"
#include "dlmalloc.h" #include "dlmalloc.h"
#include "checkpoint.h" #include "checkpoint.h"
#include "key.h"
#ifdef TOKUTRACE #ifdef TOKUTRACE
#define DB_ENV_CREATE_FUN db_env_create_toku10 #define DB_ENV_CREATE_FUN db_env_create_toku10
...@@ -785,6 +786,80 @@ static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t fla ...@@ -785,6 +786,80 @@ static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t fla
toku_ydb_lock(); int r = toku_env_txn_stat(env, statp, flags); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_env_txn_stat(env, statp, flags); toku_ydb_unlock(); return r;
} }
// DB_ENV->checkpointing_postpone: used for 'rename table' or any other
// operation that must be disjoint from a checkpoint.
// Returns EINVAL if the environment has not been opened; otherwise acquires
// the checkpoint-safe client lock and returns 0.  The matching release is
// done by env_checkpointing_resume.
// Installed directly on the DB_ENV (no ydb-lock wrapper).
static int
env_checkpointing_postpone(DB_ENV * env) {
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env)) {
        return EINVAL;   // only meaningful on an opened environment
    }
    toku_checkpoint_safe_client_lock();
    return 0;
}
// DB_ENV->checkpointing_resume: tells tokudb that a previous 'postpone' is
// no longer necessary.
// Returns EINVAL if the environment has not been opened; otherwise releases
// the checkpoint-safe client lock and returns 0.
// Installed directly on the DB_ENV (no ydb-lock wrapper).
static int
env_checkpointing_resume(DB_ENV * env) {
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env)) {
        return EINVAL;   // only meaningful on an opened environment
    }
    toku_checkpoint_safe_client_unlock();
    return 0;
}
// DB_ENV->checkpointing_begin_atomic_operation: begins a set of operations
// that must be atomic as far as checkpoints are concerned (e.g. inserting
// into every index of one table).
// Returns EINVAL if the environment has not been opened; otherwise acquires
// the multi-operation client lock and returns 0.  The matching release is
// done by env_checkpointing_end_atomic_operation.
static int
env_checkpointing_begin_atomic_operation(DB_ENV * env) {
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env)) {
        return EINVAL;   // only meaningful on an opened environment
    }
    toku_multi_operation_client_lock();
    return 0;
}
// DB_ENV->checkpointing_end_atomic_operation: ends a set of operations that
// must be atomic as far as checkpoints are concerned.
// Returns EINVAL if the environment has not been opened; otherwise releases
// the multi-operation client lock and returns 0.
static int
env_checkpointing_end_atomic_operation(DB_ENV * env) {
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env)) {
        return EINVAL;   // only meaningful on an opened environment
    }
    toku_multi_operation_client_unlock();
    return 0;
}
// Record the default duplicate (data) comparison function for this
// environment in env->i->dup_compare.
// Must be called before the environment is opened: returns EINVAL once it is
// open, 0 on success.
// Does not take the ydb lock itself; see locked_env_set_default_dup_compare.
static int
env_set_default_dup_compare(DB_ENV * env, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
    HANDLE_PANICKED_ENV(env);
    if (env_opened(env)) {
        return EINVAL;   // too late: the default may only be changed before open
    }
    env->i->dup_compare = dup_compare;
    return 0;
}
// Public entry point for DB_ENV->set_default_dup_compare: runs
// env_set_default_dup_compare while holding the ydb lock and passes its
// return code through unchanged.
static int
locked_env_set_default_dup_compare(DB_ENV * env, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
    int status;

    toku_ydb_lock();
    status = env_set_default_dup_compare(env, dup_compare);
    toku_ydb_unlock();

    return status;
}
// Record the default (key) comparison function for all DBs in this
// environment in env->i->bt_compare.  Per the DB_ENV header note this is
// required for recovery, since the DBs cannot be opened manually there.
// Must be called before the environment is opened: returns EINVAL once it is
// open, 0 on success.
// Does not take the ydb lock itself; see locked_env_set_default_bt_compare.
static int
env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
    HANDLE_PANICKED_ENV(env);
    if (env_opened(env)) {
        return EINVAL;   // too late: the default may only be changed before open
    }
    env->i->bt_compare = bt_compare;
    return 0;
}
// Public entry point for DB_ENV->set_default_bt_compare: runs
// env_set_default_bt_compare while holding the ydb lock and passes its
// return code through unchanged.
static int
locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
    int status;

    toku_ydb_lock();
    status = env_set_default_bt_compare(env, bt_compare);
    toku_ydb_unlock();

    return status;
}
static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
static int toku_db_lt_panic(DB* db, int r); static int toku_db_lt_panic(DB* db, int r);
...@@ -802,6 +877,12 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -802,6 +877,12 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
if (result == 0) { r = ENOMEM; goto cleanup; } if (result == 0) { r = ENOMEM; goto cleanup; }
memset(result, 0, sizeof *result); memset(result, 0, sizeof *result);
result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_locked_env_err; result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_locked_env_err;
result->set_default_bt_compare = locked_env_set_default_bt_compare;
result->set_default_dup_compare = locked_env_set_default_dup_compare;
result->checkpointing_postpone = env_checkpointing_postpone;
result->checkpointing_resume = env_checkpointing_resume;
result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
result->open = locked_env_open; result->open = locked_env_open;
result->close = locked_env_close; result->close = locked_env_close;
result->txn_checkpoint = toku_env_txn_checkpoint; result->txn_checkpoint = toku_env_txn_checkpoint;
...@@ -835,6 +916,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -835,6 +916,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
MALLOC(result->i); MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; } if (result->i == 0) { r = ENOMEM; goto cleanup; }
memset(result->i, 0, sizeof *result->i); memset(result->i, 0, sizeof *result->i);
result->i->bt_compare = toku_default_compare_fun;
result->i->dup_compare = toku_default_compare_fun;
result->i->is_panicked=0; result->i->is_panicked=0;
result->i->panic_string = 0; result->i->panic_string = 0;
result->i->ref_count = 1; result->i->ref_count = 1;
...@@ -904,8 +987,9 @@ static int toku_txn_release_locks(DB_TXN* txn) { ...@@ -904,8 +987,9 @@ static int toku_txn_release_locks(DB_TXN* txn) {
// Yield the lock so someone else can work, and then reacquire the lock. // Yield the lock so someone else can work, and then reacquire the lock.
// Useful while processing commit or rollback logs, to allow others to access the system. // Useful while processing commit or rollback logs, to allow others to access the system.
static void ydb_yield (void *UU(v)) { static void ydb_yield (voidfp f, void *UU(v)) {
toku_ydb_unlock(); toku_ydb_unlock();
if (f) f();
toku_ydb_lock(); toku_ydb_lock();
} }
...@@ -3064,6 +3148,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3 ...@@ -3064,6 +3148,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3
static int static int
toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t flags) { toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
BOOL need_close = TRUE;
char *directory_name = NULL; char *directory_name = NULL;
int r = find_db_file(db->dbenv, fname, &directory_name); int r = find_db_file(db->dbenv, fname, &directory_name);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
...@@ -3076,10 +3161,15 @@ toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t f ...@@ -3076,10 +3161,15 @@ toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t f
int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname); int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
assert(bytes==(int)sizeof(subdb_full_name)-1); assert(bytes==(int)sizeof(subdb_full_name)-1);
const char *null_subdbname = NULL; const char *null_subdbname = NULL;
need_close = FALSE;
r = toku_db_remove(db, subdb_full_name, null_subdbname, flags); r = toku_db_remove(db, subdb_full_name, null_subdbname, flags);
} }
cleanup: cleanup:
if (need_close) {
int r2 = toku_db_close(db, 0);
if (r==0) r = r2;
}
if (directory_name) toku_free(directory_name); if (directory_name) toku_free(directory_name);
return r; return r;
} }
...@@ -3144,6 +3234,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3 ...@@ -3144,6 +3234,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3
TODO: Verify the DB file is not in use (either a single db file or TODO: Verify the DB file is not in use (either a single db file or
a file with multi-databases). a file with multi-databases).
TODO: Check the other directories in the environment for the file TODO: Check the other directories in the environment for the file
TODO: Alert the BRT layer (for logging/recovery purposes)
*/ */
static int toku_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) { static int toku_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
...@@ -3438,11 +3529,21 @@ static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t ...@@ -3438,11 +3529,21 @@ static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
} }
static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) { static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_remove(db, fname, dbname, flags); toku_ydb_unlock(); return r; toku_checkpoint_safe_client_lock();
toku_ydb_lock();
int r = toku_db_remove(db, fname, dbname, flags);
toku_ydb_unlock();
toku_checkpoint_safe_client_unlock();
return r;
} }
static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) { static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_rename(db, namea, nameb, namec, flags); toku_ydb_unlock(); return r; toku_checkpoint_safe_client_lock();
toku_ydb_lock();
int r = toku_db_rename(db, namea, nameb, namec, flags);
toku_ydb_unlock();
toku_checkpoint_safe_client_unlock();
return r;
} }
static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) { static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
...@@ -3493,8 +3594,7 @@ static const DBT* toku_db_dbt_neg_infty(void) { ...@@ -3493,8 +3594,7 @@ static const DBT* toku_db_dbt_neg_infty(void) {
} }
static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) { static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r\ toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r;
;
} }
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
...@@ -3577,6 +3677,10 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -3577,6 +3677,10 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
env_unref(env); env_unref(env);
return ENOMEM; return ENOMEM;
} }
r = toku_brt_set_bt_compare(result->i->brt, env->i->bt_compare);
assert(r==0);
r = toku_brt_set_dup_compare(result->i->brt, env->i->dup_compare);
assert(r==0);
ydb_add_ref(); ydb_add_ref();
*db = result; *db = result;
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment