Commit 5bd70821 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3923], merge to main

git-svn-id: file:///svn/toku/tokudb@35506 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3f2dd44f
...@@ -576,6 +576,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -576,6 +576,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -591,6 +591,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -591,6 +591,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -597,6 +597,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -597,6 +597,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -597,6 +597,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -597,6 +597,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -601,6 +601,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -601,6 +601,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -791,6 +791,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ ...@@ -791,6 +791,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf("void db_env_set_loader_size_factor (uint32_t) %s;\n", VISIBLE); printf("void db_env_set_loader_size_factor (uint32_t) %s;\n", VISIBLE);
printf("void db_env_set_mvcc_garbage_collection_verification(u_int32_t) %s;\n", VISIBLE); printf("void db_env_set_mvcc_garbage_collection_verification(u_int32_t) %s;\n", VISIBLE);
printf("void db_env_enable_engine_status(u_int32_t) %s;\n", VISIBLE); printf("void db_env_enable_engine_status(u_int32_t) %s;\n", VISIBLE);
printf("void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) %s;\n", VISIBLE);
printf("#if defined(__cplusplus)\n}\n#endif\n"); printf("#if defined(__cplusplus)\n}\n#endif\n");
printf("#endif\n"); printf("#endif\n");
return 0; return 0;
......
...@@ -535,6 +535,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -535,6 +535,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -535,6 +535,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi ...@@ -535,6 +535,7 @@ void db_env_set_recover_callback2 (void (*)(void*), void*) __attribute__((__visi
void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default"))); void db_env_set_loader_size_factor (uint32_t) __attribute__((__visibility__("default")));
void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default"))); void db_env_set_mvcc_garbage_collection_verification(u_int32_t) __attribute__((__visibility__("default")));
void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default"))); void db_env_enable_engine_status(u_int32_t) __attribute__((__visibility__("default")));
void db_env_set_flusher_thread_callback (void (*)(int, void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -51,6 +51,7 @@ BRT_SOURCES = \ ...@@ -51,6 +51,7 @@ BRT_SOURCES = \
dbufio \ dbufio \
fifo \ fifo \
key \ key \
kibbutz \
leafentry \ leafentry \
le-cursor \ le-cursor \
logfilemgr \ logfilemgr \
......
...@@ -509,6 +509,8 @@ struct brt { ...@@ -509,6 +509,8 @@ struct brt {
struct toku_list zombie_brt_link; struct toku_list zombie_brt_link;
}; };
long brtnode_memory_size (BRTNODE node);
/* serialization code */ /* serialization code */
void void
toku_create_compressed_partition_from_available( toku_create_compressed_partition_from_available(
...@@ -566,7 +568,7 @@ struct brtenv { ...@@ -566,7 +568,7 @@ struct brtenv {
long long checksum_number; long long checksum_number;
}; };
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint); extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, long* new_size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs); extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs);
extern void toku_brtnode_pe_est_callback(void* brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs); extern void toku_brtnode_pe_est_callback(void* brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs); extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs);
...@@ -649,6 +651,8 @@ struct pivot_bounds { ...@@ -649,6 +651,8 @@ struct pivot_bounds {
struct kv_pair const * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages). struct kv_pair const * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages).
}; };
void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds);
int int
toku_brt_search_which_child( toku_brt_search_which_child(
DB *cmp_extra, DB *cmp_extra,
...@@ -770,25 +774,29 @@ typedef struct brt_status { ...@@ -770,25 +774,29 @@ typedef struct brt_status {
void toku_brt_get_status(BRT_STATUS); void toku_brt_get_status(BRT_STATUS);
void void
brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, BOOL create_new_node); brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, BOOL create_new_node, u_int32_t num_dependent_nodes, BRTNODE* dependent_nodes);
void void
brt_leaf_apply_cmd_once ( brt_leaf_apply_cmd_once (
BASEMENTNODE bn, BASEMENTNODE bn,
SUBTREE_EST se, SUBTREE_EST se,
const BRT_MSG cmd, const BRT_MSG cmd,
u_int32_t idx, u_int32_t idx,
LEAFENTRY le, LEAFENTRY le,
TOKULOGGER logger, OMT snapshot_txnids,
OMT live_list_reverse,
uint64_t *workdonep uint64_t *workdonep
); );
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ANCESTORS ancestors, uint64_t *workdone); void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdone, OMT snapshot_txnids, OMT live_list_reverse);
void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created); void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value. // Reset the root_xid_that_created field to the given value.
// This redefines which xid created the dictionary. // This redefines which xid created the dictionary.
void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extra);
C_END C_END
#endif #endif
...@@ -747,7 +747,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, ...@@ -747,7 +747,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
// Each partition represents a compressed sub block // Each partition represents a compressed sub block
// For internal nodes, a sub block is a message buffer // For internal nodes, a sub block is a message buffer
// For leaf nodes, a sub block is a basement node // For leaf nodes, a sub block is a basement node
struct sub_block *MALLOC_N(npartitions, sb); struct sub_block *XMALLOC_N(npartitions, sb);
struct sub_block sb_node_info; struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) { for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]);; sub_block_init(&sb[i]);;
...@@ -874,9 +874,9 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, ...@@ -874,9 +874,9 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
int nfresh = 0, nstale = 0; int nfresh = 0, nstale = 0;
int nbroadcast_offsets = 0; int nbroadcast_offsets = 0;
if (cmp) { if (cmp) {
MALLOC_N(n_in_this_buffer, stale_offsets); XMALLOC_N(n_in_this_buffer, stale_offsets);
MALLOC_N(n_in_this_buffer, fresh_offsets); XMALLOC_N(n_in_this_buffer, fresh_offsets);
MALLOC_N(n_in_this_buffer, broadcast_offsets); XMALLOC_N(n_in_this_buffer, broadcast_offsets);
} }
for (int i = 0; i < n_in_this_buffer; i++) { for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen; bytevec key; ITEMLEN keylen;
......
...@@ -200,7 +200,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t ...@@ -200,7 +200,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
toku_brtnode_pf_callback, toku_brtnode_pf_callback,
&bfe, &bfe,
brt->h brt->h
); );
if (r!=0) return r; if (r!=0) return r;
BRTNODE node=node_v; BRTNODE node=node_v;
assert(node->height>0); assert(node->height>0);
......
...@@ -97,8 +97,69 @@ get_ith_leafentry (BASEMENTNODE bn, int i) { ...@@ -97,8 +97,69 @@ get_ith_leafentry (BASEMENTNODE bn, int i) {
if (!keep_going_on_failure) goto done; \ if (!keep_going_on_failure) goto done; \
}}) }})
int struct count_msgs_extra {
toku_verify_brtnode (BRT brt, int count;
DBT *key;
MSN msn;
FIFO fifo;
DB *cmp_extra;
brt_compare_func cmp;
};
static int
count_msgs(OMTVALUE v, u_int32_t UU(idx), void *ve)
{
struct count_msgs_extra *e = ve;
long offset = (long) v;
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset);
DBT dbt;
const DBT *buffer_key = fill_dbt_for_fifo_entry(&dbt, entry);
if (e->cmp(e->cmp_extra, e->key, buffer_key) == 0 && entry->msn.msn == e->msn.msn) {
e->count++;
}
return 0;
}
struct verify_message_tree_extra {
FIFO fifo;
bool broadcast;
bool is_fresh;
int i;
int verbose;
BLOCKNUM blocknum;
int keep_going_on_failure;
};
static int
verify_message_tree(OMTVALUE v, u_int32_t UU(idx), void *ve)
{
struct verify_message_tree_extra *e = ve;
int verbose = e->verbose;
BLOCKNUM blocknum = e->blocknum;
int keep_going_on_failure = e->keep_going_on_failure;
int result = 0;
long offset = (long) v;
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset);
if (e->broadcast) {
VERIFY_ASSERTION(brt_msg_type_applies_all((enum brt_msg_type) entry->type) || brt_msg_type_does_nothing((enum brt_msg_type) entry->type),
e->i, "message found in broadcast list that is not a broadcast");
} else {
VERIFY_ASSERTION(brt_msg_type_applies_once((enum brt_msg_type) entry->type),
e->i, "message found in fresh or stale message tree that does not apply once");
if (e->is_fresh) {
VERIFY_ASSERTION(entry->is_fresh,
e->i, "message found in fresh message tree that is not fresh");
} else {
VERIFY_ASSERTION(!entry->is_fresh,
e->i, "message found in stale message tree that is fresh");
}
}
done:
return result;
}
int
toku_verify_brtnode (BRT brt,
MSN rootmsn, MSN parentmsn, MSN rootmsn, MSN parentmsn,
BLOCKNUM blocknum, int height, BLOCKNUM blocknum, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
...@@ -110,7 +171,7 @@ toku_verify_brtnode (BRT brt, ...@@ -110,7 +171,7 @@ toku_verify_brtnode (BRT brt,
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
MSN thismsn; MSN thismsn;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum); u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
{ {
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
...@@ -148,7 +209,7 @@ toku_verify_brtnode (BRT brt, ...@@ -148,7 +209,7 @@ toku_verify_brtnode (BRT brt,
} }
if (node->height > 0) { if (node->height > 0) {
VERIFY_ASSERTION((parentmsn.msn >= thismsn.msn), 0, "node msn must be descending down tree, newest messages at top"); VERIFY_ASSERTION((parentmsn.msn >= thismsn.msn), 0, "node msn must be descending down tree, newest messages at top");
} }
// Verify that all the pivot keys are in order. // Verify that all the pivot keys are in order.
for (int i = 0; i < node->n_children-2; i++) { for (int i = 0; i < node->n_children-2; i++) {
int compare = compare_pairs(brt, node->childkeys[i], node->childkeys[i+1]); int compare = compare_pairs(brt, node->childkeys[i], node->childkeys[i+1]);
...@@ -165,7 +226,7 @@ toku_verify_brtnode (BRT brt, ...@@ -165,7 +226,7 @@ toku_verify_brtnode (BRT brt,
VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot"); VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
} }
} }
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
struct kv_pair *curr_less_pivot = (i==0) ? lesser_pivot : node->childkeys[i-1]; struct kv_pair *curr_less_pivot = (i==0) ? lesser_pivot : node->childkeys[i-1];
struct kv_pair *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->childkeys[i]; struct kv_pair *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->childkeys[i];
...@@ -173,15 +234,64 @@ toku_verify_brtnode (BRT brt, ...@@ -173,15 +234,64 @@ toku_verify_brtnode (BRT brt,
MSN lastmsn = ZERO_MSN; MSN lastmsn = ZERO_MSN;
// Verify that messages in the buffers are in the right place. // Verify that messages in the buffers are in the right place.
NONLEAF_CHILDINFO bnc = BNC(node, i); NONLEAF_CHILDINFO bnc = BNC(node, i);
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, type, msn, xid, UU(is_fresh), FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, itype, msn, xid, is_fresh,
{ ({
int r = verify_msg_in_child_buffer(brt, type, msn, key, keylen, data, datalen, xid, enum brt_msg_type type = (enum brt_msg_type) itype;
int r = verify_msg_in_child_buffer(brt, type, msn, key, keylen, data, datalen, xid,
curr_less_pivot, curr_less_pivot,
curr_geq_pivot); curr_geq_pivot);
VERIFY_ASSERTION(r==0, i, "A message in the buffer is out of place"); VERIFY_ASSERTION(r==0, i, "A message in the buffer is out of place");
VERIFY_ASSERTION((msn.msn > lastmsn.msn), i, "msn per msg must be monotonically increasing toward newer messages in buffer"); VERIFY_ASSERTION((msn.msn > lastmsn.msn), i, "msn per msg must be monotonically increasing toward newer messages in buffer");
VERIFY_ASSERTION((msn.msn <= thismsn.msn), i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory"); VERIFY_ASSERTION((msn.msn <= thismsn.msn), i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
}); DBT keydbt;
struct count_msgs_extra extra = { .count = 0, .key = toku_fill_dbt(&keydbt, key, keylen),
.msn = msn, .fifo = bnc->buffer,
.cmp_extra = brt->db, .cmp = brt->compare_fun };
toku_omt_iterate(bnc->fresh_message_tree, count_msgs, &extra);
if (brt_msg_type_applies_all(type) || brt_msg_type_does_nothing(type)) {
VERIFY_ASSERTION(extra.count == 0, i, "a broadcast message was found in the fresh message tree");
} else {
VERIFY_ASSERTION(brt_msg_type_applies_once(type), i, "a message was found that does not apply either to all or to only one key");
if (is_fresh) {
VERIFY_ASSERTION(extra.count == 1, i, "a fresh message was not found in the fresh message tree");
} else {
VERIFY_ASSERTION(extra.count == 0, i, "a stale message was found in the fresh message tree");
}
}
extra.count = 0;
toku_omt_iterate(bnc->stale_message_tree, count_msgs, &extra);
if (brt_msg_type_applies_all(type) || brt_msg_type_does_nothing(type)) {
VERIFY_ASSERTION(extra.count == 0, i, "a broadcast message was found in the stale message tree");
} else {
VERIFY_ASSERTION(brt_msg_type_applies_once(type), i, "a message was found that does not apply either to all or to only one key");
if (is_fresh) {
VERIFY_ASSERTION(extra.count == 0, i, "a fresh message was found in the stale message tree");
} else {
VERIFY_ASSERTION(extra.count == 1, i, "a stale message was not found in the stale message tree");
}
}
extra.count = 0;
toku_omt_iterate(bnc->broadcast_list, count_msgs, &extra);
if (brt_msg_type_applies_all(type) || brt_msg_type_does_nothing(type)) {
VERIFY_ASSERTION(extra.count == 1, i, "a broadcast message was not found in the broadcast list");
} else {
VERIFY_ASSERTION(brt_msg_type_applies_once(type), i, "a message was found that does not apply either to all or to only one key");
if (is_fresh) {
VERIFY_ASSERTION(extra.count == 0, i, "a broadcast message was found in the fresh message tree");
} else {
VERIFY_ASSERTION(extra.count == 0, i, "a broadcast message was found in the fresh message tree");
}
}
}));
struct verify_message_tree_extra extra = { .fifo = bnc->buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = blocknum, .keep_going_on_failure = keep_going_on_failure };
int r = toku_omt_iterate(bnc->fresh_message_tree, verify_message_tree, &extra);
if (r != 0) { result = r; goto done; }
extra.is_fresh = false;
r = toku_omt_iterate(bnc->stale_message_tree, verify_message_tree, &extra);
if (r != 0) { result = r; goto done; }
extra.broadcast = true;
r = toku_omt_iterate(bnc->broadcast_list, verify_message_tree, &extra);
if (r != 0) { result = r; goto done; }
} }
else { else {
BASEMENTNODE bn = BLB(node, i); BASEMENTNODE bn = BLB(node, i);
...@@ -237,12 +347,7 @@ toku_verify_brt_with_progress (BRT brt, int (*progress_callback)(void *extra, fl ...@@ -237,12 +347,7 @@ toku_verify_brt_with_progress (BRT brt, int (*progress_callback)(void *extra, fl
assert(brt->h); assert(brt->h);
u_int32_t root_hash; u_int32_t root_hash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &root_hash); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &root_hash);
int n_pinned_before = toku_cachefile_count_pinned(brt->cf, 0);
int r = toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, *rootp, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going); int r = toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, *rootp, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
int n_pinned_after = toku_cachefile_count_pinned(brt->cf, 0);
if (n_pinned_before!=n_pinned_after) {// this may stop working if we release the ydb lock (in some future version of the code).
fprintf(stderr, "%s:%d n_pinned_before=%d n_pinned_after=%d\n", __FILE__, __LINE__, n_pinned_before, n_pinned_after);
}
return r; return r;
} }
......
This diff is collapsed.
...@@ -2776,7 +2776,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2776,7 +2776,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
DBT theval = { .data = val, .size = vallen }; DBT theval = { .data = val, .size = vallen };
BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, lbuf->xids, .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, lbuf->xids, .u.id = { &thekey, &theval } };
uint64_t workdone=0; uint64_t workdone=0;
brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, &workdone); brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL, &workdone);
} }
static int write_literal(struct dbout *out, void*data, size_t len) { static int write_literal(struct dbout *out, void*data, size_t len) {
...@@ -3002,6 +3002,8 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -3002,6 +3002,8 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
BRTNODE XMALLOC(node); BRTNODE XMALLOC(node);
toku_initialize_empty_brtnode(node, make_blocknum(blocknum_of_new_node), height, n_children, toku_initialize_empty_brtnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
BRT_LAYOUT_VERSION, target_nodesize, 0); BRT_LAYOUT_VERSION, target_nodesize, 0);
for (int i=0; i<n_children-1; i++)
node->childkeys[i] = NULL;
unsigned int totalchildkeylens = 0; unsigned int totalchildkeylens = 0;
for (int i=0; i<n_children-1; i++) { for (int i=0; i<n_children-1; i++) {
struct kv_pair *childkey = kv_pair_malloc(pivots[i].data, pivots[i].size, NULL, 0); struct kv_pair *childkey = kv_pair_malloc(pivots[i].data, pivots[i].size, NULL, 0);
......
This diff is collapsed.
...@@ -52,6 +52,10 @@ int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CAC ...@@ -52,6 +52,10 @@ int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CAC
// Return the filename // Return the filename
char * toku_cachefile_fname_in_cwd (CACHEFILE cf); char * toku_cachefile_fname_in_cwd (CACHEFILE cf);
// return the fd
int toku_cachefile_fd(CACHEFILE cf);
// TODO: #1510 Add comments on how these behave // TODO: #1510 Add comments on how these behave
int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER); int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER);
int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
...@@ -62,6 +66,9 @@ int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, ...@@ -62,6 +66,9 @@ int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
// Requires no locks be held that are taken by the checkpoint function // Requires no locks be held that are taken by the checkpoint function
void toku_cachetable_minicron_shutdown(CACHETABLE ct); void toku_cachetable_minicron_shutdown(CACHETABLE ct);
// Wait for the cachefile's background work to finish.
void toku_cachefile_wait_for_background_work_to_quiesce(CACHEFILE cf);
// Close the cachetable. // Close the cachetable.
// Effects: All of the memory objects are flushed to disk, and the cachetable is destroyed. // Effects: All of the memory objects are flushed to disk, and the cachetable is destroyed.
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */ int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
...@@ -79,9 +86,6 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/, ...@@ -79,9 +86,6 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/,
const char *fname_in_env, const char *fname_in_env,
BOOL with_filenum, FILENUM filenum, BOOL reserved); BOOL with_filenum, FILENUM filenum, BOOL reserved);
// Change the binding of which file is attached to a cachefile. Close the old fd. Use the new fd.
int toku_cachefile_redirect (CACHEFILE cf, int fd, const char *fname_in_env);
int toku_cachetable_reserve_filenum (CACHETABLE ct, FILENUM *reserved_filenum, BOOL with_filenum, FILENUM filenum); int toku_cachetable_reserve_filenum (CACHETABLE ct, FILENUM *reserved_filenum, BOOL with_filenum, FILENUM filenum);
void toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum); void toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum);
...@@ -116,7 +120,7 @@ enum partial_eviction_cost { ...@@ -116,7 +120,7 @@ enum partial_eviction_cost {
// When for_checkpoint is true, this was a 'pending' write // When for_checkpoint is true, this was a 'pending' write
// Returns: 0 if success, otherwise an error number. // Returns: 0 if success, otherwise an error number.
// Can access fd (fd is protected by a readlock during call) // Can access fd (fd is protected by a readlock during call)
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void *value, void *write_extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint); typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void *value, void *write_extraargs, long size, long* new_size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
// The fetch callback is called when a thread is attempting to get and pin a memory // The fetch callback is called when a thread is attempting to get and pin a memory
// object and it is not in the cachetable. // object and it is not in the cachetable.
...@@ -155,6 +159,8 @@ typedef BOOL (*CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK)(void *brtnode_pv, voi ...@@ -155,6 +159,8 @@ typedef BOOL (*CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK)(void *brtnode_pv, voi
// Returns: 0 if success, otherwise an error number. // Returns: 0 if success, otherwise an error number.
typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *brtnode_pv, void *read_extraargs, int fd, long *sizep); typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *brtnode_pv, void *read_extraargs, int fd, long *sizep);
typedef void (*CACHETABLE_GET_KEY_AND_FULLHASH)(CACHEKEY* cachekey, u_int32_t* fullhash, void* extra);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*), int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
...@@ -175,6 +181,45 @@ void *toku_cachefile_get_userdata(CACHEFILE); ...@@ -175,6 +181,45 @@ void *toku_cachefile_get_userdata(CACHEFILE);
CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf); CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
// Effect: Get the cachetable. // Effect: Get the cachetable.
// cachetable pair clean or dirty WRT external memory
enum cachetable_dirty {
CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile
CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile
};
void toku_checkpoint_pairs(
CACHEFILE cf,
u_int32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
u_int32_t* dependent_fullhash, //array of fullhashes of dependent pairs
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
// put something into the cachetable and checkpoint dependent pairs
// if the checkpointing is necessary
int toku_cachetable_put_with_dep_pairs(
CACHEFILE cachefile,
CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
void*value,
long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *write_extraargs,
void *get_key_and_fullhash_extra,
u_int32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
u_int32_t* dependent_fullhash, //array of fullhashes of dependent pairs
enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
CACHEKEY* key,
u_int32_t* fullhash
);
// Put a memory object into the cachetable. // Put a memory object into the cachetable.
// Effects: Lookup the key in the cachetable. If the key is not in the cachetable, // Effects: Lookup the key in the cachetable. If the key is not in the cachetable,
// then insert the pair and pin it. Otherwise return an error. Some of the key // then insert the pair and pin it. Otherwise return an error. Some of the key
...@@ -189,6 +234,29 @@ int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -189,6 +234,29 @@ int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void *write_extraargs void *write_extraargs
); );
int toku_cachetable_get_and_pin_with_dep_pairs (
CACHEFILE cachefile,
CACHEKEY key,
u_int32_t fullhash,
void**value,
long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
void* read_extraargs,
void* write_extraargs,
u_int32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
u_int32_t* dependent_fullhash, //array of fullhashes of dependent pairs
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
// Get and pin a memory object. // Get and pin a memory object.
// Effects: If the memory object is in the cachetable, acquire a read lock on it. // Effects: If the memory object is in the cachetable, acquire a read lock on it.
// Otherwise, fetch it from storage by calling the fetch callback. If the fetch // Otherwise, fetch it from storage by calling the fetch callback. If the fetch
...@@ -250,12 +318,6 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash ...@@ -250,12 +318,6 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**); int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
// Effect: Like maybe get and pin, but may pin a clean pair. // Effect: Like maybe get and pin, but may pin a clean pair.
// cachetable pair clean or dirty WRT external memory
enum cachetable_dirty {
CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile
CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile
};
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size); int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
// Effect: Unpin a memory object // Effect: Unpin a memory object
// Modifies: If the memory object is in the cachetable, then OR the dirty flag, // Modifies: If the memory object is in the cachetable, then OR the dirty flag,
...@@ -263,11 +325,18 @@ int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetab ...@@ -263,11 +325,18 @@ int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetab
// Returns: 0 if success, otherwise returns an error number. // Returns: 0 if success, otherwise returns an error number.
// Requires: The ct is locked. // Requires: The ct is locked.
int toku_cachetable_unpin_ct_prelocked(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size); int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked. // Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked. // Requires: The ct is NOT locked.
int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing something already present is OK. */ void toku_cachetable_prelock(CACHEFILE cf);
// Effect: locks cachetable
void toku_cachetable_unlock(CACHEFILE cf);
// Effect: unlocks cachetable
int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY, BOOL); /* Removing something already present is OK. */
// Effect: Remove an object from the cachetable. Don't write it back. // Effect: Remove an object from the cachetable. Don't write it back.
// Requires: The object must be pinned exactly once. // Requires: The object must be pinned exactly once.
...@@ -386,6 +455,11 @@ int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf, ...@@ -386,6 +455,11 @@ int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf,
// Verify the whole cachetable that the cachefile is in. Slow. // Verify the whole cachetable that the cachefile is in. Slow.
void toku_cachefile_verify (CACHEFILE cf); void toku_cachefile_verify (CACHEFILE cf);
int64_t toku_cachetable_size_slowslow (CACHETABLE t);
int64_t toku_cachetable_size_discrepancy (CACHETABLE t);
int64_t toku_cachetable_size_discrepancy_pinned (CACHETABLE t);
int64_t toku_cachetable_size_slow (CACHETABLE t);
// Verify the cachetable. Slow. // Verify the cachetable. Slow.
void toku_cachetable_verify (CACHETABLE t); void toku_cachetable_verify (CACHETABLE t);
...@@ -434,8 +508,15 @@ void toku_cachetable_set_lock_unlock_for_io (CACHETABLE ct, void (*ydb_lock_call ...@@ -434,8 +508,15 @@ void toku_cachetable_set_lock_unlock_for_io (CACHETABLE ct, void (*ydb_lock_call
// Effect: When we do I/O we may need to release locks (e.g., the ydb lock). These functions release the lock acquire the lock. // Effect: When we do I/O we may need to release locks (e.g., the ydb lock). These functions release the lock acquire the lock.
#if 0
int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[]); int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[]);
#endif
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra);
// Effect: Add a job to the cachetable's collection of work to do. Note that function f must call notify_cachefile_that_kibbutz_job_finishing()
void notify_cachefile_that_kibbutz_job_finishing (CACHEFILE cf);
// Effect: When a kibbutz job finishes in a cachefile, the cachetable must be notified.
// test-only function // test-only function
extern int toku_cachetable_get_checkpointing_user_data_status(void); extern int toku_cachetable_get_checkpointing_user_data_status(void);
......
...@@ -34,7 +34,7 @@ static struct fifo_entry *fifo_peek(struct fifo *fifo) { ...@@ -34,7 +34,7 @@ static struct fifo_entry *fifo_peek(struct fifo *fifo) {
} }
int toku_fifo_create(FIFO *ptr) { int toku_fifo_create(FIFO *ptr) {
struct fifo *MALLOC(fifo); struct fifo *XMALLOC(fifo);
if (fifo == 0) return ENOMEM; if (fifo == 0) return ENOMEM;
fifo_init(fifo); fifo_init(fifo);
*ptr = fifo; *ptr = fifo;
...@@ -65,7 +65,7 @@ static int next_power_of_two (int n) { ...@@ -65,7 +65,7 @@ static int next_power_of_two (int n) {
void toku_fifo_size_hint(FIFO fifo, size_t size) { void toku_fifo_size_hint(FIFO fifo, size_t size) {
if (fifo->memory == NULL) { if (fifo->memory == NULL) {
fifo->memory_size = next_power_of_two(size); fifo->memory_size = next_power_of_two(size);
fifo->memory = toku_malloc(fifo->memory_size); fifo->memory = toku_xmalloc(fifo->memory_size);
} }
} }
...@@ -77,7 +77,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d ...@@ -77,7 +77,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
int need_space_total = fifo->memory_used+need_space_here; int need_space_total = fifo->memory_used+need_space_here;
if (fifo->memory == NULL) { if (fifo->memory == NULL) {
fifo->memory_size = next_power_of_two(need_space_total); fifo->memory_size = next_power_of_two(need_space_total);
fifo->memory = toku_malloc(fifo->memory_size); fifo->memory = toku_xmalloc(fifo->memory_size);
} }
if (fifo->memory_start+need_space_total > fifo->memory_size) { if (fifo->memory_start+need_space_total > fifo->memory_size) {
// Out of memory at the end. // Out of memory at the end.
...@@ -85,7 +85,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d ...@@ -85,7 +85,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
if ((2*next_2 > fifo->memory_size) if ((2*next_2 > fifo->memory_size)
|| (8*next_2 < fifo->memory_size)) { || (8*next_2 < fifo->memory_size)) {
// resize the fifo // resize the fifo
char *newmem = toku_malloc(next_2); char *newmem = toku_xmalloc(next_2);
char *oldmem = fifo->memory; char *oldmem = fifo->memory;
if (newmem==0) return ENOMEM; if (newmem==0) return ENOMEM;
memcpy(newmem, oldmem+fifo->memory_start, fifo->memory_used); memcpy(newmem, oldmem+fifo->memory_start, fifo->memory_used);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <stdbool.h>
#include "toku_pthread.h"
#include "kibbutz.h"
#include "includes.h"
// A Kibbutz is a collection of workers and some work to do.
struct todo {
void (*f)(void *extra);
void *extra;
struct todo *next;
struct todo *prev;
};
struct kid {
struct kibbutz *k;
int id;
};
struct kibbutz {
toku_pthread_mutex_t mutex;
toku_pthread_cond_t cond;
bool please_shutdown;
struct todo *head, *tail; // head is the next thing to do.
int n_workers;
pthread_t *workers; // an array of n_workers
struct kid *ids; // pass this in when creating a worker so it knows who it is.
};
static void *work_on_kibbutz (void *);
KIBBUTZ toku_kibbutz_create (int n_workers) {
KIBBUTZ XMALLOC(k);
{ int r = toku_pthread_mutex_init(&k->mutex, NULL); assert(r==0); }
{ int r = toku_pthread_cond_init(&k->cond, NULL); assert(r==0); }
k->please_shutdown = false;
k->head = NULL;
k->tail = NULL;
k->n_workers = n_workers;
XMALLOC_N(n_workers, k->workers);
XMALLOC_N(n_workers, k->ids);
for (int i=0; i<n_workers; i++) {
k->ids[i].k = k;
k->ids[i].id = i;
int r = toku_pthread_create(&k->workers[i], NULL, work_on_kibbutz, &k->ids[i]);
assert(r==0);
}
return k;
}
static void klock (KIBBUTZ k) {
int r = toku_pthread_mutex_lock(&k->mutex);
assert(r==0);
}
static void kunlock (KIBBUTZ k) {
int r = toku_pthread_mutex_unlock(&k->mutex);
assert(r==0);
}
static void kwait (KIBBUTZ k) {
int r = toku_pthread_cond_wait(&k->cond, &k->mutex);
assert(r==0);
}
static void ksignal (KIBBUTZ k) {
int r = toku_pthread_cond_signal(&k->cond);
assert(r==0);
}
static void *work_on_kibbutz (void *kidv) {
struct kid *kid = kidv;
KIBBUTZ k = kid->k;
klock(k);
while (1) {
while (k->tail) {
struct todo *item = k->tail;
k->tail = item->prev;
if (k->tail==NULL) {
k->head=NULL;
} else {
// if there are other things to do, then wake up the next guy, if there is one.
ksignal(k);
}
kunlock(k);
item->f(item->extra);
toku_free(item);
klock(k);
// if there's another item on k->head, then we'll just go grab it now, without waiting for a signal.
}
if (k->please_shutdown) {
// Don't follow this unless the work is all done, so that when we set please_shutdown, all the work finishes before any threads quit.
ksignal(k); // must wake up anyone else who is waiting, so they can shut down.
kunlock(k);
return NULL;
}
// There is no work to do and it's not time to shutdown, so wait.
kwait(k);
}
}
void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *extra) {
struct todo *XMALLOC(td);
td->f = f;
td->extra = extra;
klock(k);
assert(!k->please_shutdown);
td->next = k->head;
td->prev = NULL;
if (k->head) {
assert(k->head->prev == NULL);
k->head->prev = td;
}
k->head = td;
if (k->tail==NULL) k->tail = td;
ksignal(k);
kunlock(k);
}
void toku_kibbutz_destroy (KIBBUTZ k)
// Effect: wait for all the enqueued work to finish, and then destroy the kibbutz.
// Note: It is an error for to perform kibbutz_enq operations after this is called.
{
klock(k);
assert(!k->please_shutdown);
k->please_shutdown = true;
ksignal(k); // must wake everyone up to tell them to shutdown.
kunlock(k);
for (int i=0; i<k->n_workers; i++) {
void *result;
int r = toku_pthread_join(k->workers[i], &result);
assert(r==0);
assert(result==NULL);
}
toku_free(k->workers);
toku_free(k->ids);
{ int r = toku_pthread_cond_destroy(&k->cond); assert(r==0); }
{ int r = toku_pthread_mutex_destroy(&k->mutex); assert(r==0); }
toku_free(k);
}
#ifndef KIBBUTZ_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "c_dialects.h"
C_BEGIN
typedef struct kibbutz *KIBBUTZ;
KIBBUTZ toku_kibbutz_create (int n_workers);
void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *extra);
void toku_kibbutz_destroy (KIBBUTZ k);
C_END
#endif
...@@ -82,6 +82,7 @@ struct tokulogger { ...@@ -82,6 +82,7 @@ struct tokulogger {
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock // To access these, you must have the input lock
toku_pthread_mutex_t txn_list_lock; // a lock protecting live_list_reverse and snapshot_txnids for now TODO: revisit this decision
LSN lsn; // the next available lsn LSN lsn; // the next available lsn
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that? OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
OMT live_root_txns; // a sorted tree. OMT live_root_txns; // a sorted tree.
......
...@@ -66,6 +66,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -66,6 +66,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
// ct is uninitialized on purpose // ct is uninitialized on purpose
result->lg_max = 100<<20; // 100MB default result->lg_max = 100<<20; // 100MB default
// lsn is uninitialized // lsn is uninitialized
r = toku_pthread_mutex_init(&result->txn_list_lock, 0); if (r!=0) goto panic;
r = toku_omt_create(&result->live_txns); if (r!=0) goto panic; r = toku_omt_create(&result->live_txns); if (r!=0) goto panic;
r = toku_omt_create(&result->live_root_txns); if (r!=0) goto panic; r = toku_omt_create(&result->live_root_txns); if (r!=0) goto panic;
r = toku_omt_create(&result->snapshot_txnids); if (r!=0) goto panic; r = toku_omt_create(&result->snapshot_txnids); if (r!=0) goto panic;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef TOKU_NBMUTEX_H
#define TOKU_NBMUTEX_H
#ident "$Id: rwlock.h 32279 2011-06-29 13:51:57Z bkuszmaul $"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_assert.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
//Use case:
//Use case:
// General purpose non blocking mutex with properties:
// 1. one writer at a time
// An external mutex must be locked when using these functions. An alternate
// design would bury a mutex into the nb_mutex itself. While this may
// increase parallelism at the expense of single thread performance, we
// are experimenting with a single higher level lock.
typedef struct nb_mutex *NB_MUTEX;
struct nb_mutex {
int writer; // the number of writers
int want_write; // the number of blocked writers
toku_pthread_cond_t wait_write;
};
// initialize a read write lock
static __attribute__((__unused__))
void
nb_mutex_init(NB_MUTEX nb_mutex) {
int r;
nb_mutex->writer = nb_mutex->want_write = 0;
r = toku_pthread_cond_init(&nb_mutex->wait_write, 0); assert(r == 0);
}
// destroy a read write lock
static __attribute__((__unused__))
void
nb_mutex_destroy(NB_MUTEX nb_mutex) {
int r;
assert(nb_mutex->writer == 0 && nb_mutex->want_write == 0);
r = toku_pthread_cond_destroy(&nb_mutex->wait_write); assert(r == 0);
}
// obtain a write lock
// expects: mutex is locked
static inline void nb_mutex_write_lock(NB_MUTEX nb_mutex, toku_pthread_mutex_t *mutex) {
if (nb_mutex->writer) {
nb_mutex->want_write++;
while (nb_mutex->writer) {
int r = toku_pthread_cond_wait(&nb_mutex->wait_write, mutex); assert(r == 0);
}
nb_mutex->want_write--;
}
nb_mutex->writer++;
}
// release a write lock
// expects: mutex is locked
static inline void nb_mutex_write_unlock(NB_MUTEX nb_mutex) {
assert(nb_mutex->writer == 1);
nb_mutex->writer--;
if (nb_mutex->want_write) {
int r = toku_pthread_cond_signal(&nb_mutex->wait_write); assert(r == 0);
}
}
// returns: the number of writers who are waiting for the lock
static inline int nb_mutex_blocked_writers(NB_MUTEX nb_mutex) {
return nb_mutex->want_write;
}
// returns: the number of writers
static inline int nb_mutex_writers(NB_MUTEX nb_mutex) {
return nb_mutex->writer;
}
// returns: the sum of the number of readers, pending readers, writers, and
// pending writers
static inline int nb_mutex_users(NB_MUTEX nb_mutex) {
return nb_mutex->writer + nb_mutex->want_write;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
...@@ -52,7 +52,7 @@ struct omt { ...@@ -52,7 +52,7 @@ struct omt {
static inline int static inline int
omt_create_no_array(OMT *omtp) { omt_create_no_array(OMT *omtp) {
OMT MALLOC(result); OMT XMALLOC(result);
if (result==NULL) return ENOMEM; if (result==NULL) return ENOMEM;
result->is_array = TRUE; result->is_array = TRUE;
result->i.a.num_values = 0; result->i.a.num_values = 0;
...@@ -67,11 +67,7 @@ static int omt_create_internal(OMT *omtp, u_int32_t num_starting_nodes) { ...@@ -67,11 +67,7 @@ static int omt_create_internal(OMT *omtp, u_int32_t num_starting_nodes) {
if (r) return r; if (r) return r;
if (num_starting_nodes < 2) num_starting_nodes = 2; if (num_starting_nodes < 2) num_starting_nodes = 2;
result->capacity = 2*num_starting_nodes; result->capacity = 2*num_starting_nodes;
MALLOC_N(result->capacity, result->i.a.values); XMALLOC_N(result->capacity, result->i.a.values);
if (result->i.a.values==NULL) {
toku_free(result);
return errno;
}
*omtp = result; *omtp = result;
return 0; return 0;
} }
...@@ -144,7 +140,7 @@ static inline int maybe_resize_array(OMT omt, u_int32_t n) { ...@@ -144,7 +140,7 @@ static inline int maybe_resize_array(OMT omt, u_int32_t n) {
u_int32_t room = omt->capacity - omt->i.a.start_idx; u_int32_t room = omt->capacity - omt->i.a.start_idx;
if (room<n || omt->capacity/2>=new_size) { if (room<n || omt->capacity/2>=new_size) {
OMTVALUE *MALLOC_N(new_size, tmp_values); OMTVALUE *XMALLOC_N(new_size, tmp_values);
if (tmp_values==NULL) return errno; if (tmp_values==NULL) return errno;
memcpy(tmp_values, omt->i.a.values+omt->i.a.start_idx, memcpy(tmp_values, omt->i.a.values+omt->i.a.start_idx,
omt->i.a.num_values*sizeof(*tmp_values)); omt->i.a.num_values*sizeof(*tmp_values));
...@@ -162,7 +158,7 @@ static int omt_convert_to_tree(OMT omt) { ...@@ -162,7 +158,7 @@ static int omt_convert_to_tree(OMT omt) {
u_int32_t new_size = num_nodes*2; u_int32_t new_size = num_nodes*2;
new_size = new_size < 4 ? 4 : new_size; new_size = new_size < 4 ? 4 : new_size;
OMT_NODE MALLOC_N(new_size, new_nodes); OMT_NODE XMALLOC_N(new_size, new_nodes);
if (new_nodes==NULL) return errno; if (new_nodes==NULL) return errno;
OMTVALUE *values = omt->i.a.values; OMTVALUE *values = omt->i.a.values;
OMTVALUE *tmp_values = values + omt->i.a.start_idx; OMTVALUE *tmp_values = values + omt->i.a.start_idx;
...@@ -182,7 +178,7 @@ static int omt_convert_to_array(OMT omt) { ...@@ -182,7 +178,7 @@ static int omt_convert_to_array(OMT omt) {
u_int32_t new_size = 2*num_values; u_int32_t new_size = 2*num_values;
new_size = new_size < 4 ? 4 : new_size; new_size = new_size < 4 ? 4 : new_size;
OMTVALUE *MALLOC_N(new_size, tmp_values); OMTVALUE *XMALLOC_N(new_size, tmp_values);
if (tmp_values==NULL) return errno; if (tmp_values==NULL) return errno;
fill_array_with_subtree_values(omt, tmp_values, omt->i.t.root); fill_array_with_subtree_values(omt, tmp_values, omt->i.t.root);
toku_free(omt->i.t.nodes); toku_free(omt->i.t.nodes);
...@@ -262,7 +258,7 @@ static inline void rebalance(OMT omt, node_idx *n_idxp) { ...@@ -262,7 +258,7 @@ static inline void rebalance(OMT omt, node_idx *n_idxp) {
} }
else { else {
malloced = TRUE; malloced = TRUE;
MALLOC_N(n->weight, tmp_array); XMALLOC_N(n->weight, tmp_array);
if (tmp_array==NULL) return; //Don't rebalance. Still a working tree. if (tmp_array==NULL) return; //Don't rebalance. Still a working tree.
} }
fill_array_with_subtree_idxs(omt, tmp_array, idx); fill_array_with_subtree_idxs(omt, tmp_array, idx);
...@@ -781,6 +777,87 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) { ...@@ -781,6 +777,87 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
return 0; return 0;
} }
struct copy_data_extra {
OMTVALUE *a;
u_int32_t eltsize;
};
static int copy_data_iter(OMTVALUE v, u_int32_t idx, void *ve) {
struct copy_data_extra *e = ve;
memcpy(e->a[idx], v, e->eltsize);
return 0;
}
static int omt_copy_data(OMTVALUE *a, OMT omt, u_int32_t eltsize) {
struct copy_data_extra extra = { .a = a, .eltsize = eltsize };
if (omt->is_array) {
return iterate_internal_array(omt, 0, omt_size(omt), copy_data_iter, &extra);
}
return iterate_internal(omt, 0, nweight(omt, omt->i.t.root), omt->i.t.root, 0, copy_data_iter, &extra);
}
int toku_omt_clone(OMT *dest, OMT src, u_int32_t eltsize) {
u_int32_t size = omt_size(src);
if (size == 0) {
*dest = NULL;
return 0;
}
OMTVALUE *a = toku_xmalloc((sizeof *a) * size);
for (u_int32_t i = 0; i < size; ++i) {
a[i] = toku_xmalloc(eltsize);
}
int r = omt_copy_data(a, src, eltsize);
if (r != 0) { goto err; }
r = toku_omt_create_steal_sorted_array(dest, &a, size, size);
if (r != 0) { goto err; }
return 0;
err:
toku_free(a);
return r;
}
int toku_omt_clone_pool(OMT *dest, OMT src, u_int32_t eltsize) {
u_int32_t size = omt_size(src);
if (size == 0) {
*dest = NULL;
return 0;
}
OMTVALUE *a = toku_xmalloc((sizeof *a) * size);
unsigned char *data = toku_xmalloc(eltsize * size);
for (u_int32_t i = 0; i < size; ++i) {
a[i] = &data[eltsize * i];
}
int r = omt_copy_data(a, src, eltsize);
if (r != 0) { goto err; }
r = toku_omt_create_steal_sorted_array(dest, &a, size, size);
if (r != 0) { goto err; }
return 0;
err:
toku_free(data);
toku_free(a);
return r;
}
int toku_omt_clone_noptr(OMT *dest, OMT src) {
u_int32_t size = omt_size(src);
if (size == 0) {
*dest = NULL;
return 0;
}
OMTVALUE *a = toku_xmalloc((sizeof *a) * size);
if (src->is_array) {
memcpy(a, src->i.a.values + src->i.a.start_idx, size * (sizeof *src->i.a.values));
} else {
fill_array_with_subtree_values(src, a, src->i.t.root);
}
int r = toku_omt_create_steal_sorted_array(dest, &a, size, size);
if (r != 0) { goto err; }
return 0;
err:
toku_free(a);
return r;
}
void toku_omt_clear(OMT omt) { void toku_omt_clear(OMT omt) {
if (omt->is_array) { if (omt->is_array) {
omt->i.a.start_idx = 0; omt->i.a.start_idx = 0;
......
...@@ -407,6 +407,36 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomt); ...@@ -407,6 +407,36 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomt);
// On error, nothing is modified. // On error, nothing is modified.
// Performance: time=O(n) is acceptable, but one can imagine implementations that are O(\log n) worst-case. // Performance: time=O(n) is acceptable, but one can imagine implementations that are O(\log n) worst-case.
int toku_omt_clone(OMT *dest, OMT src, u_int32_t eltsize);
// Effect: Creates a copy of an omt.
// Sets *dest to the clone
// Each element is allocated separately with toku_xmalloc and is assumed to be eltsize big.
// Returns 0 on success
// ENOMEM on out of memory.
// On error, nothing is modified.
// Performance: time between O(n) and O(n log n), depending how long it
// takes to traverse src.
int toku_omt_clone_pool(OMT *dest, OMT src, u_int32_t eltsize);
// Effect: Creates a copy of an omt.
// Sets *dest to the clone
// Each element is copied to a contiguous buffer allocated with toku_xmalloc and each element is assumed to be eltsize big.
// Returns 0 on success
// ENOMEM on out of memory.
// On error, nothing is modified.
// Performance: time between O(n) and O(n log n), depending how long it
// takes to traverse src.
int toku_omt_clone_noptr(OMT *dest, OMT src);
// Effect: Creates a copy of an omt.
// Sets *dest to the clone
// Each element is assumed to be stored directly in the omt, that is, the OMTVALUEs are not pointers, they are data. Thus no extra memory allocation is required.
// Returns 0 on success
// ENOMEM on out of memory.
// On error, nothing is modified.
// Performance: time between O(n) and O(n log n), depending how long it
// takes to traverse src.
void toku_omt_clear(OMT omt); void toku_omt_clear(OMT omt);
// Effect: Set the tree to be empty. // Effect: Set the tree to be empty.
// Note: Will not reallocate or resize any memory, since returning void precludes calling malloc. // Note: Will not reallocate or resize any memory, since returning void precludes calling malloc.
......
...@@ -56,7 +56,7 @@ toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -56,7 +56,7 @@ toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (txn->pinned_inprogress_rollback_log == log) { if (txn->pinned_inprogress_rollback_log == log) {
txn->pinned_inprogress_rollback_log = NULL; txn->pinned_inprogress_rollback_log = NULL;
} }
r = toku_cachetable_unpin_and_remove (cf, log->thislogname); r = toku_cachetable_unpin_and_remove (cf, log->thislogname, FALSE);
assert(r==0); assert(r==0);
toku_free_blocknum(h->blocktable, &to_free, h); toku_free_blocknum(h->blocktable, &to_free, h);
return r; return r;
...@@ -141,10 +141,10 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -141,10 +141,10 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
int int
toku_find_xid_by_xid (OMTVALUE v, void *xidv) { toku_find_xid_by_xid (OMTVALUE v, void *xidv) {
TXNID *xid = v; TXNID xid = (TXNID) v;
TXNID *xidfind = xidv; TXNID xidfind = (TXNID) xidv;
if (*xid<*xidfind) return -1; if (xid<xidfind) return -1;
if (*xid>*xidfind) return +1; if (xid>xidfind) return +1;
return 0; return 0;
} }
...@@ -168,8 +168,8 @@ toku_find_pair_by_xid (OMTVALUE v, void *xidv) { ...@@ -168,8 +168,8 @@ toku_find_pair_by_xid (OMTVALUE v, void *xidv) {
static int static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) { live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv; TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of txn that is closing TXNID xid = txn->txnid64; // xid of txn that is closing
TXNID *live_xid = live_xidv; // xid on closing txn's live list TXNID *live_xid = live_xidv; // xid on closing txn's live list
OMTVALUE pairv; OMTVALUE pairv;
XID_PAIR pair; XID_PAIR pair;
uint32_t idx; uint32_t idx;
...@@ -182,20 +182,20 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi ...@@ -182,20 +182,20 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
invariant(pair->xid1 == *live_xid); //sanity check invariant(pair->xid1 == *live_xid); //sanity check
if (pair->xid2 == xid) { if (pair->xid2 == xid) {
//There is a record that needs to be either deleted or updated //There is a record that needs to be either deleted or updated
TXNID *olderxid; TXNID olderxid;
OMTVALUE olderv; OMTVALUE olderv;
uint32_t olderidx; uint32_t olderidx;
OMT snapshot = txn->logger->snapshot_txnids; OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE; BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid // find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx); r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
if (r==0) { if (r==0) {
//There is an older txn //There is an older txn
olderxid = olderv; olderxid = (TXNID) olderv;
invariant(*olderxid < xid); invariant(olderxid < xid);
if (*olderxid >= *live_xid) { if (olderxid >= *live_xid) {
//Older txn is new enough, we need to update. //Older txn is new enough, we need to update.
pair->xid2 = *olderxid; pair->xid2 = olderxid;
should_delete = FALSE; should_delete = FALSE;
} }
} }
...@@ -232,6 +232,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -232,6 +232,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b); assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b); assert(txn->current_rollback.b == ROLLBACK_NONE.b);
int r; int r;
r = toku_pthread_mutex_lock(&txn->logger->txn_list_lock); assert_zero(r);
{ {
{ {
//Remove txn from list (omt) of live transactions //Remove txn from list (omt) of live transactions
...@@ -266,13 +267,12 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -266,13 +267,12 @@ void toku_rollback_txn_close (TOKUTXN txn) {
u_int32_t idx; u_int32_t idx;
OMTVALUE v; OMTVALUE v;
//Free memory used for snapshot_txnids //Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx); r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0); invariant(r==0);
TXNID *xid = v; TXNID xid = (TXNID) v;
invariant(*xid == txn->txnid64); invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn->logger->snapshot_txnids, idx); r = toku_omt_delete_at(txn->logger->snapshot_txnids, idx);
invariant(r==0); invariant(r==0);
toku_free(v);
} }
live_list_reverse_note_txn_end(txn); live_list_reverse_note_txn_end(txn);
{ {
...@@ -287,6 +287,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -287,6 +287,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
} }
} }
} }
r = toku_pthread_mutex_unlock(&txn->logger->txn_list_lock); assert_zero(r);
assert(txn->logger->oldest_living_xid <= txn->txnid64); assert(txn->logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == txn->logger->oldest_living_xid) { if (txn->txnid64 == txn->logger->oldest_living_xid) {
...@@ -466,7 +467,7 @@ toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) { ...@@ -466,7 +467,7 @@ toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) {
} }
static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void *extraargs, long UU(size), void *rollback_v, void *extraargs, long size, long* new_size,
BOOL write_me, BOOL keep_me, BOOL for_checkpoint) { BOOL write_me, BOOL keep_me, BOOL for_checkpoint) {
int r; int r;
ROLLBACK_LOG_NODE log = rollback_v; ROLLBACK_LOG_NODE log = rollback_v;
...@@ -491,6 +492,7 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM ...@@ -491,6 +492,7 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
} }
} }
} }
*new_size = size;
if (!keep_me) { if (!keep_me) {
toku_rollback_log_free(&log); toku_rollback_log_free(&log);
} }
...@@ -583,6 +585,16 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o ...@@ -583,6 +585,16 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
return 0; return 0;
} }
int
toku_unpin_inprogress_rollback_log(TOKUTXN txn) {
if (txn->pinned_inprogress_rollback_log) {
return toku_rollback_log_unpin(txn, txn->pinned_inprogress_rollback_log);
}
else {
return 0;
}
}
int int
toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) { toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r; int r;
...@@ -827,7 +839,8 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO ...@@ -827,7 +839,8 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
toku_rollback_pf_req_callback, toku_rollback_pf_req_callback,
toku_rollback_pf_callback, toku_rollback_pf_callback,
h, h,
h); h
);
assert(r==0); assert(r==0);
log = (ROLLBACK_LOG_NODE)log_v; log = (ROLLBACK_LOG_NODE)log_v;
} }
......
...@@ -21,6 +21,7 @@ int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE ...@@ -21,6 +21,7 @@ int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result); int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result);
int toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log); int toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log); int toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_unpin_inprogress_rollback_log(TOKUTXN txn);
int toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log); int toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
......
...@@ -119,7 +119,7 @@ static void *my_malloc(size_t n) { ...@@ -119,7 +119,7 @@ static void *my_malloc(size_t n) {
if (n >= my_big_malloc_limit) { if (n >= my_big_malloc_limit) {
(void) toku_sync_fetch_and_increment_int32(&my_big_malloc_count); // my_big_malloc_count++; (void) toku_sync_fetch_and_increment_int32(&my_big_malloc_count); // my_big_malloc_count++;
if (do_malloc_errors) { if (do_malloc_errors) {
if (event_add_and_fetch()== event_count_trigger) { if (event_add_and_fetch() == event_count_trigger) {
event_hit(); event_hit();
errno = ENOMEM; errno = ENOMEM;
return NULL; return NULL;
......
...@@ -40,9 +40,10 @@ static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT * ...@@ -40,9 +40,10 @@ static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *
abort(); abort();
} }
static void write_dbfile (char *template, int n, char *output_name, BOOL expect_error, int testno) { static int write_dbfile (char *template, int n, char *output_name, BOOL expect_error, int testno) {
if (verbose) printf("test start %d %d testno=%d\n", n, expect_error, testno); if (verbose) printf("test start %d %d testno=%d\n", n, expect_error, testno);
int result = 0;
DB *dest_db = NULL; DB *dest_db = NULL;
struct brtloader_s bl = { struct brtloader_s bl = {
.temp_file_template = template, .temp_file_template = template,
...@@ -74,7 +75,6 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -74,7 +75,6 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL); brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
brt_loader_init_poll_callback(&bl.poll_callback); brt_loader_init_poll_callback(&bl.poll_callback);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r); r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
// destroy_rowset(&aset);
brtloader_fi_close_all(&bl.file_infos); brtloader_fi_close_all(&bl.file_infos);
...@@ -132,9 +132,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -132,9 +132,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_error_function(&bl.error_callback, NULL, NULL); brt_loader_set_error_function(&bl.error_callback, NULL, NULL);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL); brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0); result = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0);
toku_set_func_malloc_only(NULL); toku_set_func_malloc_only(NULL);
toku_set_func_realloc_only(NULL); toku_set_func_realloc_only(NULL);
...@@ -147,11 +145,12 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -147,11 +145,12 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_lock_destroy(&bl); brt_loader_lock_destroy(&bl);
r = queue_destroy(q2); r = queue_destroy(q2);
//if (r != 0) printf("WARNING%d r=%d\n", __LINE__, r);
assert(r==0); assert(r==0);
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, expect_error); brtloader_fi_destroy(&bl.file_infos, expect_error);
return result;
} }
static int usage(const char *progname, int n) { static int usage(const char *progname, int n) {
...@@ -222,7 +221,7 @@ int test_main (int argc, const char *argv[]) { ...@@ -222,7 +221,7 @@ int test_main (int argc, const char *argv[]) {
int r; int r;
r = system(unlink_all); CKERR(r); r = system(unlink_all); CKERR(r);
r = toku_os_mkdir(directory, 0755); CKERR(r); r = toku_os_mkdir(directory, 0755); CKERR(r);
write_dbfile(template, n, output_name, FALSE, 0); r = write_dbfile(template, n, output_name, FALSE, 0); CKERR(r);
if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count); if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
if (verbose) printf("my_realloc_count=%d big_count=%d\n", my_realloc_count, my_big_realloc_count); if (verbose) printf("my_realloc_count=%d big_count=%d\n", my_realloc_count, my_big_realloc_count);
...@@ -230,13 +229,19 @@ int test_main (int argc, const char *argv[]) { ...@@ -230,13 +229,19 @@ int test_main (int argc, const char *argv[]) {
int event_limit = event_count; int event_limit = event_count;
if (verbose) printf("event_limit=%d\n", event_limit); if (verbose) printf("event_limit=%d\n", event_limit);
// we computed an upper bound on the number of events. since the loader continues to malloc after a
// malloc failure, the actual number of events that can induce a failed load is less than the upper
// bound.
for (int i = 1; i <= event_limit; i++) { for (int i = 1; i <= event_limit; i++) {
reset_event_counts(); reset_event_counts();
reset_my_malloc_counts(); reset_my_malloc_counts();
event_count_trigger = i; event_count_trigger = i;
r = system(unlink_all); CKERR(r); r = system(unlink_all); CKERR(r);
r = toku_os_mkdir(directory, 0755); CKERR(r); r = toku_os_mkdir(directory, 0755); CKERR(r);
write_dbfile(template, n, output_name, TRUE, i); r = write_dbfile(template, n, output_name, TRUE, i);
if (verbose) printf("event_count=%d\n", event_count);
if (r == 0)
break;
} }
r = system(unlink_all); CKERR(r); r = system(unlink_all); CKERR(r);
......
This diff is collapsed.
...@@ -10,6 +10,7 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -10,6 +10,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
void *v __attribute__((__unused__)), void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)), BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -58,7 +59,7 @@ pe_callback ( ...@@ -58,7 +59,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 0; *bytes_freed = bytes_to_free;
return 0; return 0;
} }
......
...@@ -32,7 +32,7 @@ sleep_random (void) ...@@ -32,7 +32,7 @@ sleep_random (void)
int expect_value = 42; // initially 42, later 43 int expect_value = 42; // initially 42, later 43
static void static void
flush (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), void *value, void *UU(extraargs), long size, BOOL write_me, BOOL keep_me, BOOL UU(for_checkpoint)) flush (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), void *value, void *UU(extraargs), long size, long* UU(new_size), BOOL write_me, BOOL keep_me, BOOL UU(for_checkpoint))
{ {
// printf("f"); // printf("f");
assert(size == item_size); assert(size == item_size);
...@@ -72,7 +72,7 @@ pe_callback ( ...@@ -72,7 +72,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 0; *bytes_freed = bytes_to_free;
return 0; return 0;
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -12,7 +12,7 @@ static const int item_size = 1; ...@@ -12,7 +12,7 @@ static const int item_size = 1;
static int n_flush, n_write_me, n_keep_me, n_fetch; static int n_flush, n_write_me, n_keep_me, n_fetch;
static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL UU(for_checkpoint)) { static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *extraargs, long size, long* UU(new_size), BOOL write_me, BOOL keep_me, BOOL UU(for_checkpoint)) {
cf = cf; key = key; value = value; extraargs = extraargs; cf = cf; key = key; value = value; extraargs = extraargs;
// assert(key == make_blocknum((long)value)); // assert(key == make_blocknum((long)value));
assert(size == item_size); assert(size == item_size);
...@@ -41,7 +41,7 @@ pe_callback ( ...@@ -41,7 +41,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 0; *bytes_freed = bytes_to_free;
return 0; return 0;
} }
......
...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
void *v __attribute__((__unused__)), void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)), BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -38,7 +39,7 @@ pe_callback ( ...@@ -38,7 +39,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 0; *bytes_freed = bytes_to_free;
return 0; return 0;
} }
......
...@@ -15,6 +15,7 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,6 +15,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
void *v __attribute__((__unused__)), void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)), BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -65,7 +66,7 @@ pe_callback ( ...@@ -65,7 +66,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 0; *bytes_freed = bytes_to_free;
return 0; return 0;
} }
......
...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
void *v, void *v,
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep, BOOL keep,
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -49,6 +50,7 @@ other_flush (CACHEFILE f __attribute__((__unused__)), ...@@ -49,6 +50,7 @@ other_flush (CACHEFILE f __attribute__((__unused__)),
void *v __attribute__((__unused__)), void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)), BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -75,7 +77,7 @@ pe_callback ( ...@@ -75,7 +77,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 1; *bytes_freed = bytes_to_free-1;
expected_bytes_to_free--; expected_bytes_to_free--;
int* foo = brtnode_pv; int* foo = brtnode_pv;
int blah = *foo; int blah = *foo;
......
...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -12,6 +12,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
void* UU(v), void* UU(v),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep, BOOL keep,
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -49,6 +50,7 @@ other_flush (CACHEFILE f __attribute__((__unused__)), ...@@ -49,6 +50,7 @@ other_flush (CACHEFILE f __attribute__((__unused__)),
void *v __attribute__((__unused__)), void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)), void *e __attribute__((__unused__)),
long s __attribute__((__unused__)), long s __attribute__((__unused__)),
long* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)), BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)), BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)) BOOL c __attribute__((__unused__))
...@@ -75,7 +77,7 @@ pe_callback ( ...@@ -75,7 +77,7 @@ pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = 1; *bytes_freed = bytes_to_free-1;
printf("calling pe_callback\n"); printf("calling pe_callback\n");
expected_bytes_to_free--; expected_bytes_to_free--;
int* foo = brtnode_pv; int* foo = brtnode_pv;
...@@ -92,6 +94,7 @@ other_pe_callback ( ...@@ -92,6 +94,7 @@ other_pe_callback (
void* extraargs __attribute__((__unused__)) void* extraargs __attribute__((__unused__))
) )
{ {
*bytes_freed = bytes_to_free;
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) { static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment