Commit 2cd8b437 authored by John Esmet's avatar John Esmet

FT-249 FT-256 Add a message buffer class to replace FIFO. Use a real

functor instead of a macro for iterate.
parent 4f1762f8
...@@ -30,7 +30,6 @@ set(FT_SOURCES ...@@ -30,7 +30,6 @@ set(FT_SOURCES
cachetable cachetable
checkpoint checkpoint
compress compress
fifo
ft ft
ft-cachetable-wrappers ft-cachetable-wrappers
ft-flusher ft-flusher
...@@ -52,6 +51,7 @@ set(FT_SOURCES ...@@ -52,6 +51,7 @@ set(FT_SOURCES
logfilemgr logfilemgr
logger logger
log_upgrade log_upgrade
msg_buffer
quicklz quicklz
recover recover
rollback rollback
......
...@@ -198,9 +198,6 @@ void block_allocator_print (BLOCK_ALLOCATOR ba); ...@@ -198,9 +198,6 @@ void block_allocator_print (BLOCK_ALLOCATOR ba);
uint64_t block_allocator_allocated_limit (BLOCK_ALLOCATOR ba); uint64_t block_allocator_allocated_limit (BLOCK_ALLOCATOR ba);
// Effect: Return the unallocated block address of "infinite" size. // Effect: Return the unallocated block address of "infinite" size.
// That is, return the smallest address that is above all the allocated blocks. // That is, return the smallest address that is above all the allocated blocks.
// Rationale: When writing the root FIFO we don't know how big the block is.
// So we start at the "infinite" block, write the fifo, and then
// allocate_block_at of the correct size and offset to account for the root FIFO.
int block_allocator_get_nth_block_in_layout_order (BLOCK_ALLOCATOR ba, uint64_t b, uint64_t *offset, uint64_t *size); int block_allocator_get_nth_block_in_layout_order (BLOCK_ALLOCATOR ba, uint64_t b, uint64_t *offset, uint64_t *size);
// Effect: Consider the blocks in sorted order. The reserved block at the beginning is number 0. The next one is number 1 and so forth. // Effect: Consider the blocks in sorted order. The reserved block at the beginning is number 0. The next one is number 1 and so forth.
......
...@@ -106,7 +106,6 @@ PATENT RIGHTS GRANT: ...@@ -106,7 +106,6 @@ PATENT RIGHTS GRANT:
#include "ft_layout_version.h" #include "ft_layout_version.h"
#include "block_allocator.h" #include "block_allocator.h"
#include "cachetable.h" #include "cachetable.h"
#include "fifo.h"
#include "ft-ops.h" #include "ft-ops.h"
#include "toku_list.h" #include "toku_list.h"
#include <util/omt.h> #include <util/omt.h>
...@@ -118,6 +117,7 @@ PATENT RIGHTS GRANT: ...@@ -118,6 +117,7 @@ PATENT RIGHTS GRANT:
#include "ft/bndata.h" #include "ft/bndata.h"
#include "ft/rollback.h" #include "ft/rollback.h"
#include "ft/ft-search.h" #include "ft/ft-search.h"
#include "ft/msg_buffer.h"
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */ enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { FT_MSG_OVERHEAD = (2 + sizeof(MSN)) }; // the type plus freshness plus MSN enum { FT_MSG_OVERHEAD = (2 + sizeof(MSN)) }; // the type plus freshness plus MSN
...@@ -207,10 +207,10 @@ struct ftnode_fetch_extra { ...@@ -207,10 +207,10 @@ struct ftnode_fetch_extra {
}; };
typedef struct ftnode_fetch_extra *FTNODE_FETCH_EXTRA; typedef struct ftnode_fetch_extra *FTNODE_FETCH_EXTRA;
struct toku_fifo_entry_key_msn_heaviside_extra { struct toku_msg_buffer_key_msn_heaviside_extra {
DESCRIPTOR desc; DESCRIPTOR desc;
ft_compare_func cmp; ft_compare_func cmp;
FIFO fifo; message_buffer *msg_buffer;
const DBT *key; const DBT *key;
MSN msn; MSN msn;
}; };
...@@ -218,24 +218,24 @@ struct toku_fifo_entry_key_msn_heaviside_extra { ...@@ -218,24 +218,24 @@ struct toku_fifo_entry_key_msn_heaviside_extra {
// comparison function for inserting messages into a // comparison function for inserting messages into a
// ftnode_nonleaf_childinfo's message_tree // ftnode_nonleaf_childinfo's message_tree
int int
toku_fifo_entry_key_msn_heaviside(const int32_t &v, const struct toku_fifo_entry_key_msn_heaviside_extra &extra); toku_msg_buffer_key_msn_heaviside(const int32_t &v, const struct toku_msg_buffer_key_msn_heaviside_extra &extra);
struct toku_fifo_entry_key_msn_cmp_extra { struct toku_msg_buffer_key_msn_cmp_extra {
DESCRIPTOR desc; DESCRIPTOR desc;
ft_compare_func cmp; ft_compare_func cmp;
FIFO fifo; message_buffer *msg_buffer;
}; };
// same thing for qsort_r // same thing for qsort_r
int int
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extrap, const int &a, const int &b); toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extrap, const int &a, const int &b);
typedef toku::omt<int32_t> off_omt_t; typedef toku::omt<int32_t> off_omt_t;
typedef toku::omt<int32_t, int32_t, true> marked_off_omt_t; typedef toku::omt<int32_t, int32_t, true> marked_off_omt_t;
// data of an available partition of a nonleaf ftnode // data of an available partition of a nonleaf ftnode
struct ftnode_nonleaf_childinfo { struct ftnode_nonleaf_childinfo {
FIFO buffer; message_buffer msg_buffer;
off_omt_t broadcast_list; off_omt_t broadcast_list;
marked_off_omt_t fresh_message_tree; marked_off_omt_t fresh_message_tree;
off_omt_t stale_message_tree; off_omt_t stale_message_tree;
...@@ -946,9 +946,6 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto ...@@ -946,9 +946,6 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto
__attribute__((nonnull)) __attribute__((nonnull))
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read); void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read);
__attribute__((const,nonnull))
size_t toku_ft_msg_memsize_in_fifo(FT_MSG msg);
int int
toku_ft_search_which_child( toku_ft_search_which_child(
DESCRIPTOR desc, DESCRIPTOR desc,
......
...@@ -168,7 +168,7 @@ Split_or_merge (node, childnum) { ...@@ -168,7 +168,7 @@ Split_or_merge (node, childnum) {
return; return;
If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4) If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4)
fetch node, the child and a sibling of the child into main memory. fetch node, the child and a sibling of the child into main memory.
move all messages from the node to the two children (so that the FIFOs are empty) move all messages from the node to the two children (so that the message buffers are empty)
If the two siblings together fit into one node then If the two siblings together fit into one node then
merge the two siblings. merge the two siblings.
fixup the node to point at one child fixup the node to point at one child
...@@ -491,7 +491,7 @@ get_node_reactivity(FT ft, FTNODE node) { ...@@ -491,7 +491,7 @@ get_node_reactivity(FT ft, FTNODE node) {
unsigned int unsigned int
toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc)
{ {
return toku_fifo_buffer_size_in_use(bnc->buffer); return bnc->msg_buffer.buffer_size_in_use();
} }
// return true if the size of the buffers plus the amount of work done is large enough. (But return false if there is nothing to be flushed (the buffers empty)). // return true if the size of the buffers plus the amount of work done is large enough. (But return false if there is nothing to be flushed (the buffers empty)).
...@@ -538,7 +538,7 @@ uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) { ...@@ -538,7 +538,7 @@ uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) {
int int
toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) toku_bnc_n_entries(NONLEAF_CHILDINFO bnc)
{ {
return toku_fifo_n_entries(bnc->buffer); return bnc->msg_buffer.num_entries();
} }
static const DBT *prepivotkey (FTNODE node, int childnum, const DBT * const lower_bound_exclusive) { static const DBT *prepivotkey (FTNODE node, int childnum, const DBT * const lower_bound_exclusive) {
...@@ -567,7 +567,7 @@ long ...@@ -567,7 +567,7 @@ long
toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) toku_bnc_memory_size(NONLEAF_CHILDINFO bnc)
{ {
return (sizeof(*bnc) + return (sizeof(*bnc) +
toku_fifo_memory_footprint(bnc->buffer) + bnc->msg_buffer.memory_footprint() +
bnc->fresh_message_tree.memory_size() + bnc->fresh_message_tree.memory_size() +
bnc->stale_message_tree.memory_size() + bnc->stale_message_tree.memory_size() +
bnc->broadcast_list.memory_size()); bnc->broadcast_list.memory_size());
...@@ -579,7 +579,7 @@ long ...@@ -579,7 +579,7 @@ long
toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) toku_bnc_memory_used(NONLEAF_CHILDINFO bnc)
{ {
return (sizeof(*bnc) + return (sizeof(*bnc) +
toku_fifo_memory_size_in_use(bnc->buffer) + bnc->msg_buffer.memory_size_in_use() +
bnc->fresh_message_tree.memory_size() + bnc->fresh_message_tree.memory_size() +
bnc->stale_message_tree.memory_size() + bnc->stale_message_tree.memory_size() +
bnc->broadcast_list.memory_size()); bnc->broadcast_list.memory_size());
...@@ -2162,46 +2162,43 @@ key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, ...@@ -2162,46 +2162,43 @@ key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
} }
int int
toku_fifo_entry_key_msn_heaviside(const int32_t &offset, const struct toku_fifo_entry_key_msn_heaviside_extra &extra) toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra)
{ {
const struct fifo_entry *query = toku_fifo_get_entry(extra.fifo, offset); MSN query_msn;
DBT qdbt; DBT query_key;
const DBT *query_key = fill_dbt_for_fifo_entry(&qdbt, query); extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
const DBT *target_key = extra.key; return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn,
return key_msn_cmp(query_key, target_key, query->msn, extra.msn,
extra.desc, extra.cmp); extra.desc, extra.cmp);
} }
int int
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo)
{ {
const struct fifo_entry *a = toku_fifo_get_entry(extra.fifo, ao); MSN amsn, bmsn;
const struct fifo_entry *b = toku_fifo_get_entry(extra.fifo, bo); DBT akey, bkey;
DBT adbt, bdbt; extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
const DBT *akey = fill_dbt_for_fifo_entry(&adbt, a); extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
const DBT *bkey = fill_dbt_for_fifo_entry(&bdbt, b); return key_msn_cmp(&akey, &bkey, amsn, bmsn,
return key_msn_cmp(akey, bkey, a->msn, b->msn,
extra.desc, extra.cmp); extra.desc, extra.cmp);
} }
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp)
// Effect: Enqueue the message represented by the parameters into the // Effect: Enqueue the message represented by the parameters into the
// bnc's buffer, and put it in either the fresh or stale message tree, // bnc's buffer, and put it in either the fresh or stale message tree,
// or the broadcast list. // or the broadcast list.
// static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, FT_MSG msg, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp) {
// This is only exported for tests. int r = 0;
{
int32_t offset; int32_t offset;
int r = toku_fifo_enq(bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, &offset); bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
assert_zero(r); enum ft_msg_type type = ft_msg_get_type(msg);
if (ft_msg_type_applies_once(type)) { if (ft_msg_type_applies_once(type)) {
DBT keydbt; DBT key;
struct toku_fifo_entry_key_msn_heaviside_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer, .key = toku_fill_dbt(&keydbt, key, keylen), .msn = msn }; toku_fill_dbt(&key, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
struct toku_msg_buffer_key_msn_heaviside_extra extra = { .desc = desc, .cmp = cmp, .msg_buffer = &bnc->msg_buffer, .key = &key, .msn = msg->msn };
if (is_fresh) { if (is_fresh) {
r = bnc->fresh_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, extra, nullptr); r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
assert_zero(r); assert_zero(r);
} else { } else {
r = bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, extra, nullptr); r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
assert_zero(r); assert_zero(r);
} }
} else { } else {
...@@ -2212,14 +2209,32 @@ void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, ...@@ -2212,14 +2209,32 @@ void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen,
} }
} }
// This is only exported for tests.
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp)
{
DBT k, v;
FT_MSG_S msg = {
type, msn, xids, .u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen) } }
};
bnc_insert_msg(bnc, &msg, is_fresh, desc, cmp);
}
// append a msg to a nonleaf node's child buffer // append a msg to a nonleaf node's child buffer
// should be static, but used by test programs static void ft_append_msg_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node,
void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) { int childnum, FT_MSG msg, bool is_fresh) {
paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL); paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
toku_bnc_insert_msg(BNC(node, childnum), key->data, key->size, val->data, val->size, type, msn, xids, is_fresh, desc, compare_fun); bnc_insert_msg(BNC(node, childnum), msg, is_fresh, desc, compare_fun);
node->dirty = 1; node->dirty = 1;
} }
// This is only exported for tests.
void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
FT_MSG_S msg = {
type, msn, xids, .u = { .id = { key, val } }
};
ft_append_msg_to_child_buffer(compare_fun, desc, node, childnum, &msg, is_fresh);
}
static void ft_nonleaf_msg_once_to_child(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG msg, bool is_fresh, size_t flow_deltas[]) static void ft_nonleaf_msg_once_to_child(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG msg, bool is_fresh, size_t flow_deltas[])
// Previously we had passive aggressive promotion, but that causes a lot of I/O a the checkpoint. So now we are just putting it in the buffer here. // Previously we had passive aggressive promotion, but that causes a lot of I/O a the checkpoint. So now we are just putting it in the buffer here.
// Also we don't worry about the node getting overfull here. It's the caller's problem. // Also we don't worry about the node getting overfull here. It's the caller's problem.
...@@ -2227,7 +2242,7 @@ static void ft_nonleaf_msg_once_to_child(ft_compare_func compare_fun, DESCRIPTOR ...@@ -2227,7 +2242,7 @@ static void ft_nonleaf_msg_once_to_child(ft_compare_func compare_fun, DESCRIPTOR
unsigned int childnum = (target_childnum >= 0 unsigned int childnum = (target_childnum >= 0
? target_childnum ? target_childnum
: toku_ftnode_which_child(node, msg->u.id.key, desc, compare_fun)); : toku_ftnode_which_child(node, msg->u.id.key, desc, compare_fun));
toku_ft_append_to_child_buffer(compare_fun, desc, node, childnum, msg->type, msg->msn, msg->xids, is_fresh, msg->u.id.key, msg->u.id.val); ft_append_msg_to_child_buffer(compare_fun, desc, node, childnum, msg, is_fresh);
NONLEAF_CHILDINFO bnc = BNC(node, childnum); NONLEAF_CHILDINFO bnc = BNC(node, childnum);
bnc->flow[0] += flow_deltas[0]; bnc->flow[0] += flow_deltas[0];
bnc->flow[1] += flow_deltas[1]; bnc->flow[1] += flow_deltas[1];
...@@ -2514,8 +2529,6 @@ void toku_bnc_flush_to_child( ...@@ -2514,8 +2529,6 @@ void toku_bnc_flush_to_child(
) )
{ {
paranoid_invariant(bnc); paranoid_invariant(bnc);
STAT64INFO_S stats_delta = {0,0};
size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer);
TOKULOGGER logger = toku_cachefile_logger(ft->cf); TOKULOGGER logger = toku_cachefile_logger(ft->cf);
TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr; TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
...@@ -2531,21 +2544,30 @@ void toku_bnc_flush_to_child( ...@@ -2531,21 +2544,30 @@ void toku_bnc_flush_to_child(
oldest_referenced_xid_for_simple_gc, oldest_referenced_xid_for_simple_gc,
child->oldest_referenced_xid_known, child->oldest_referenced_xid_known,
true); true);
FIFO_ITERATE( struct flush_msg_fn {
bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, FT ft;
({ FTNODE child;
DBT hk,hv; NONLEAF_CHILDINFO bnc;
FT_MSG_S ftmsg = { type, msn, xids, .u = { .id = { toku_fill_dbt(&hk, key, keylen), txn_gc_info *gc_info;
toku_fill_dbt(&hv, val, vallen) } } };
STAT64INFO_S stats_delta;
size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();
flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
stats_delta = { 0, 0 };
}
int operator()(FT_MSG msg, bool is_fresh) {
size_t flow_deltas[] = { 0, 0 }; size_t flow_deltas[] = { 0, 0 };
size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
if (remaining_memsize <= bnc->flow[0]) { if (remaining_memsize <= bnc->flow[0]) {
// this message is in the current checkpoint's worth of // this message is in the current checkpoint's worth of
// the end of the fifo // the end of the message buffer
flow_deltas[0] = FIFO_CURRENT_ENTRY_MEMSIZE; flow_deltas[0] = memsize_in_buffer;
} else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) { } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
// this message is in the last checkpoint's worth of the // this message is in the last checkpoint's worth of the
// end of the fifo // end of the message buffer
flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE; flow_deltas[1] = memsize_in_buffer;
} }
toku_ft_node_put_msg( toku_ft_node_put_msg(
ft->compare_fun, ft->compare_fun,
...@@ -2553,22 +2575,26 @@ void toku_bnc_flush_to_child( ...@@ -2553,22 +2575,26 @@ void toku_bnc_flush_to_child(
&ft->cmp_descriptor, &ft->cmp_descriptor,
child, child,
-1, -1,
&ftmsg, msg,
is_fresh, is_fresh,
&gc_info, gc_info,
flow_deltas, flow_deltas,
&stats_delta &stats_delta
); );
remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE; remaining_memsize -= memsize_in_buffer;
})); return 0;
}
} flush_fn(ft, child, bnc, &gc_info);
bnc->msg_buffer.iterate(flush_fn);
child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known; child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
invariant(remaining_memsize == 0); invariant(flush_fn.remaining_memsize == 0);
if (stats_delta.numbytes || stats_delta.numrows) { if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, stats_delta); toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
} }
if (do_garbage_collection) { if (do_garbage_collection) {
size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer); size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
STATUS_INC(FT_MSG_BYTES_OUT, buffsize); STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there // may be misleading if there's a broadcast message in there
STATUS_INC(FT_MSG_BYTES_CURR, -buffsize); STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
...@@ -2597,7 +2623,7 @@ toku_ft_node_put_msg ( ...@@ -2597,7 +2623,7 @@ toku_ft_node_put_msg (
// Effect: Push message into the subtree rooted at NODE. // Effect: Push message into the subtree rooted at NODE.
// If NODE is a leaf, then // If NODE is a leaf, then
// put message into leaf, applying it to the leafentries // put message into leaf, applying it to the leafentries
// If NODE is a nonleaf, then push the message into the FIFO(s) of the relevent child(ren). // If NODE is a nonleaf, then push the message into the message buffer(s) of the relevent child(ren).
// The node may become overfull. That's not our problem. // The node may become overfull. That's not our problem.
{ {
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
...@@ -3192,7 +3218,7 @@ void toku_ft_root_put_msg( ...@@ -3192,7 +3218,7 @@ void toku_ft_root_put_msg(
struct ftnode_fetch_extra bfe; struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft); fill_bfe_for_full_read(&bfe, ft);
size_t flow_deltas[] = { toku_ft_msg_memsize_in_fifo(msg), 0 }; size_t flow_deltas[] = { message_buffer::msg_memsize_in_buffer(msg), 0 };
pair_lock_type lock_type; pair_lock_type lock_type;
lock_type = PL_READ; // try first for a read lock lock_type = PL_READ; // try first for a read lock
...@@ -4656,13 +4682,13 @@ is_le_val_del(LEAFENTRY le, FT_CURSOR ftcursor) { ...@@ -4656,13 +4682,13 @@ is_le_val_del(LEAFENTRY le, FT_CURSOR ftcursor) {
return rval; return rval;
} }
struct store_fifo_offset_extra { struct store_msg_buffer_offset_extra {
int32_t *offsets; int32_t *offsets;
int i; int i;
}; };
int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra) __attribute__((nonnull(3))); int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra) int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
{ {
extra->offsets[extra->i] = offset; extra->offsets[extra->i] = offset;
extra->i++; extra->i++;
...@@ -4670,55 +4696,46 @@ int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct stor ...@@ -4670,55 +4696,46 @@ int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct stor
} }
/** /**
* Given pointers to offsets within a FIFO where we can find messages, * Given pointers to offsets within a message buffer where we can find messages,
* figure out the MSN of each message, and compare those MSNs. Returns 1, * figure out the MSN of each message, and compare those MSNs. Returns 1,
* 0, or -1 if a is larger than, equal to, or smaller than b. * 0, or -1 if a is larger than, equal to, or smaller than b.
*/ */
int fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo); int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
int fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo) int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
{ {
const struct fifo_entry *a = toku_fifo_get_entry(fifo, ao); MSN amsn, bmsn;
const struct fifo_entry *b = toku_fifo_get_entry(fifo, bo); msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
if (a->msn.msn > b->msn.msn) { msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
if (amsn.msn > bmsn.msn) {
return +1; return +1;
} }
if (a->msn.msn < b->msn.msn) { if (amsn.msn < bmsn.msn) {
return -1; return -1;
} }
return 0; return 0;
} }
/** /**
* Given a fifo_entry, either decompose it into its parameters and call * Given a message buffer and and offset, apply the message with toku_ft_bn_apply_msg, or discard it,
* toku_ft_bn_apply_msg, or discard it, based on its MSN and the MSN of the * based on its MSN and the MSN of the basement node.
* basement node.
*/ */
static void static void
do_bn_apply_msg(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, txn_gc_info *gc_info, uint64_t *workdone, STAT64INFO stats_to_update) do_bn_apply_msg(FT_HANDLE ft_handle, BASEMENTNODE bn, message_buffer *msg_buffer, int32_t offset,
{ txn_gc_info *gc_info, uint64_t *workdone, STAT64INFO stats_to_update) {
DBT k, v;
FT_MSG_S msg = msg_buffer->get_message(offset, &k, &v);
// The messages are being iterated over in (key,msn) order or just in // The messages are being iterated over in (key,msn) order or just in
// msn order, so all the messages for one key, from one buffer, are in // msn order, so all the messages for one key, from one buffer, are in
// ascending msn order. So it's ok that we don't update the basement // ascending msn order. So it's ok that we don't update the basement
// node's msn until the end. // node's msn until the end.
if (entry->msn.msn > bn->max_msn_applied.msn) { if (msg.msn.msn > bn->max_msn_applied.msn) {
ITEMLEN keylen = entry->keylen;
ITEMLEN vallen = entry->vallen;
enum ft_msg_type type = (enum ft_msg_type) entry->type;
MSN msn = entry->msn;
const XIDS xids = (XIDS) &entry->xids_s;
bytevec key = xids_get_end_of_array(xids);
bytevec val = (uint8_t*)key + entry->keylen;
DBT hk;
toku_fill_dbt(&hk, key, keylen);
DBT hv;
FT_MSG_S ftmsg = { type, msn, xids, .u = { .id = { &hk, toku_fill_dbt(&hv, val, vallen) } } };
toku_ft_bn_apply_msg( toku_ft_bn_apply_msg(
t->ft->compare_fun, ft_handle->ft->compare_fun,
t->ft->update_fun, ft_handle->ft->update_fun,
&t->ft->cmp_descriptor, &ft_handle->ft->cmp_descriptor,
bn, bn,
&ftmsg, &msg,
gc_info, gc_info,
workdone, workdone,
stats_to_update stats_to_update
...@@ -4726,13 +4743,15 @@ do_bn_apply_msg(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, txn_gc_i ...@@ -4726,13 +4743,15 @@ do_bn_apply_msg(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, txn_gc_i
} else { } else {
STATUS_INC(FT_MSN_DISCARDS, 1); STATUS_INC(FT_MSN_DISCARDS, 1);
} }
// We must always mark entry as stale since it has been marked
// We must always mark message as stale since it has been marked
// (using omt::iterate_and_mark_range) // (using omt::iterate_and_mark_range)
// It is possible to call do_bn_apply_msg even when it won't apply the message because // It is possible to call do_bn_apply_msg even when it won't apply the message because
// the node containing it could have been evicted and brought back in. // the node containing it could have been evicted and brought back in.
entry->is_fresh = false; msg_buffer->set_freshness(offset, false);
} }
struct iterate_do_bn_apply_msg_extra { struct iterate_do_bn_apply_msg_extra {
FT_HANDLE t; FT_HANDLE t;
BASEMENTNODE bn; BASEMENTNODE bn;
...@@ -4745,8 +4764,7 @@ struct iterate_do_bn_apply_msg_extra { ...@@ -4745,8 +4764,7 @@ struct iterate_do_bn_apply_msg_extra {
int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) __attribute__((nonnull(3))); int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) __attribute__((nonnull(3)));
int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e)
{ {
struct fifo_entry *entry = toku_fifo_get_entry(e->bnc->buffer, offset); do_bn_apply_msg(e->t, e->bn, &e->bnc->msg_buffer, offset, e->gc_info, e->workdone, e->stats_to_update);
do_bn_apply_msg(e->t, e->bn, entry, e->gc_info, e->workdone, e->stats_to_update);
return 0; return 0;
} }
...@@ -4770,8 +4788,8 @@ static void ...@@ -4770,8 +4788,8 @@ static void
find_bounds_within_message_tree( find_bounds_within_message_tree(
DESCRIPTOR desc, /// used for cmp DESCRIPTOR desc, /// used for cmp
ft_compare_func cmp, /// used to compare keys ft_compare_func cmp, /// used to compare keys
const find_bounds_omt_t &message_tree, /// tree holding FIFO offsets, in which we want to look for indices const find_bounds_omt_t &message_tree, /// tree holding message buffer offsets, in which we want to look for indices
FIFO buffer, /// buffer in which messages are found message_buffer *msg_buffer, /// message buffer in which messages are found
struct pivot_bounds const * const bounds, /// key bounds within the basement node we're applying messages to struct pivot_bounds const * const bounds, /// key bounds within the basement node we're applying messages to
uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree) uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree)
uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree) uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree)
...@@ -4785,15 +4803,15 @@ find_bounds_within_message_tree( ...@@ -4785,15 +4803,15 @@ find_bounds_within_message_tree(
// message (with any msn) with the key lower_bound_exclusive. // message (with any msn) with the key lower_bound_exclusive.
// This will be a message we want to try applying, so it is the // This will be a message we want to try applying, so it is the
// "lower bound inclusive" within the message_tree. // "lower bound inclusive" within the message_tree.
struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra; struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra;
ZERO_STRUCT(lbi_extra); ZERO_STRUCT(lbi_extra);
lbi_extra.desc = desc; lbi_extra.desc = desc;
lbi_extra.cmp = cmp; lbi_extra.cmp = cmp;
lbi_extra.fifo = buffer; lbi_extra.msg_buffer = msg_buffer;
lbi_extra.key = bounds->lower_bound_exclusive; lbi_extra.key = bounds->lower_bound_exclusive;
lbi_extra.msn = MAX_MSN; lbi_extra.msn = MAX_MSN;
int32_t found_lb; int32_t found_lb;
r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi); r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
// There is no relevant data (the lower bound is bigger than // There is no relevant data (the lower bound is bigger than
// any message in this tree), so we have no range and we're // any message in this tree), so we have no range and we're
...@@ -4809,7 +4827,7 @@ find_bounds_within_message_tree( ...@@ -4809,7 +4827,7 @@ find_bounds_within_message_tree(
const DBT *ubi = bounds->upper_bound_inclusive; const DBT *ubi = bounds->upper_bound_inclusive;
const int32_t offset = found_lb; const int32_t offset = found_lb;
DBT found_lbidbt; DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset)); msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
FAKE_DB(db, desc); FAKE_DB(db, desc);
int c = cmp(&db, &found_lbidbt, ubi); int c = cmp(&db, &found_lbidbt, ubi);
// These DBTs really are both inclusive bounds, so we need // These DBTs really are both inclusive bounds, so we need
...@@ -4833,14 +4851,14 @@ find_bounds_within_message_tree( ...@@ -4833,14 +4851,14 @@ find_bounds_within_message_tree(
// the first thing bigger than the upper_bound_inclusive key. // the first thing bigger than the upper_bound_inclusive key.
// This is therefore the smallest thing we don't want to apply, // This is therefore the smallest thing we don't want to apply,
// and omt::iterate_on_range will not examine it. // and omt::iterate_on_range will not examine it.
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra; struct toku_msg_buffer_key_msn_heaviside_extra ube_extra;
ZERO_STRUCT(ube_extra); ZERO_STRUCT(ube_extra);
ube_extra.desc = desc; ube_extra.desc = desc;
ube_extra.cmp = cmp; ube_extra.cmp = cmp;
ube_extra.fifo = buffer; ube_extra.msg_buffer = msg_buffer;
ube_extra.key = bounds->upper_bound_inclusive; ube_extra.key = bounds->upper_bound_inclusive;
ube_extra.msn = MAX_MSN; ube_extra.msn = MAX_MSN;
r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(ube_extra, +1, nullptr, ube); r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
// Couldn't find anything in the buffer bigger than our key, // Couldn't find anything in the buffer bigger than our key,
// so we need to look at everything up to the end of // so we need to look at everything up to the end of
...@@ -4882,13 +4900,13 @@ bnc_apply_messages_to_basement_node( ...@@ -4882,13 +4900,13 @@ bnc_apply_messages_to_basement_node(
uint32_t stale_lbi, stale_ube; uint32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) { if (!bn->stale_ancestor_messages_applied) {
find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube); find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->stale_message_tree, &bnc->msg_buffer, bounds, &stale_lbi, &stale_ube);
} else { } else {
stale_lbi = 0; stale_lbi = 0;
stale_ube = 0; stale_ube = 0;
} }
uint32_t fresh_lbi, fresh_ube; uint32_t fresh_lbi, fresh_ube;
find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube); find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->fresh_message_tree, &bnc->msg_buffer, bounds, &fresh_lbi, &fresh_ube);
// We now know where all the messages we must apply are, so one of the // We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of // following 4 cases will do the application, depending on which of
...@@ -4905,30 +4923,29 @@ bnc_apply_messages_to_basement_node( ...@@ -4905,30 +4923,29 @@ bnc_apply_messages_to_basement_node(
const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size()); const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size());
toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t)); toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get()); int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 }; struct store_msg_buffer_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
// Populate offsets array with offsets to stale messages // Populate offsets array with offsets to stale messages
r = bnc->stale_message_tree.iterate_on_range<struct store_fifo_offset_extra, store_fifo_offset>(stale_lbi, stale_ube, &sfo_extra); r = bnc->stale_message_tree.iterate_on_range<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(stale_lbi, stale_ube, &sfo_extra);
assert_zero(r); assert_zero(r);
// Then store fresh offsets, and mark them to be moved to stale later. // Then store fresh offsets, and mark them to be moved to stale later.
r = bnc->fresh_message_tree.iterate_and_mark_range<struct store_fifo_offset_extra, store_fifo_offset>(fresh_lbi, fresh_ube, &sfo_extra); r = bnc->fresh_message_tree.iterate_and_mark_range<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(fresh_lbi, fresh_ube, &sfo_extra);
assert_zero(r); assert_zero(r);
// Store offsets of all broadcast messages. // Store offsets of all broadcast messages.
r = bnc->broadcast_list.iterate<struct store_fifo_offset_extra, store_fifo_offset>(&sfo_extra); r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(&sfo_extra);
assert_zero(r); assert_zero(r);
invariant(sfo_extra.i == buffer_size); invariant(sfo_extra.i == buffer_size);
// Sort by MSN. // Sort by MSN.
r = toku::sort<int32_t, FIFO, fifo_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->buffer); r = toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->msg_buffer);
assert_zero(r); assert_zero(r);
// Apply the messages in MSN order. // Apply the messages in MSN order.
for (int i = 0; i < buffer_size; ++i) { for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true; *msgs_applied = true;
struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]); do_bn_apply_msg(t, bn, &bnc->msg_buffer, offsets[i], gc_info, &workdone_this_ancestor, &stats_delta);
do_bn_apply_msg(t, bn, entry, gc_info, &workdone_this_ancestor, &stats_delta);
} }
} else if (stale_lbi == stale_ube) { } else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later. // No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
...@@ -5084,7 +5101,7 @@ static bool bn_needs_ancestors_messages( ...@@ -5084,7 +5101,7 @@ static bool bn_needs_ancestors_messages(
find_bounds_within_message_tree(&ft->cmp_descriptor, find_bounds_within_message_tree(&ft->cmp_descriptor,
ft->compare_fun, ft->compare_fun,
bnc->stale_message_tree, bnc->stale_message_tree,
bnc->buffer, &bnc->msg_buffer,
&curr_bounds, &curr_bounds,
&stale_lbi, &stale_lbi,
&stale_ube); &stale_ube);
...@@ -5097,7 +5114,7 @@ static bool bn_needs_ancestors_messages( ...@@ -5097,7 +5114,7 @@ static bool bn_needs_ancestors_messages(
find_bounds_within_message_tree(&ft->cmp_descriptor, find_bounds_within_message_tree(&ft->cmp_descriptor,
ft->compare_fun, ft->compare_fun,
bnc->fresh_message_tree, bnc->fresh_message_tree,
bnc->buffer, &bnc->msg_buffer,
&curr_bounds, &curr_bounds,
&fresh_lbi, &fresh_lbi,
&fresh_ube); &fresh_ube);
...@@ -5208,11 +5225,11 @@ struct copy_to_stale_extra { ...@@ -5208,11 +5225,11 @@ struct copy_to_stale_extra {
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3))); int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{ {
struct fifo_entry *entry = toku_fifo_get_entry(extra->bnc->buffer, offset); MSN msn;
DBT keydbt; DBT key;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry); extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft->cmp_descriptor, .cmp = extra->ft->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn }; struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft->cmp_descriptor, .cmp = extra->ft->compare_fun, .msg_buffer = &extra->bnc->msg_buffer, .key = &key, .msn = msn };
int r = extra->bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, heaviside_extra, nullptr); int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
invariant_zero(r); invariant_zero(r);
return 0; return 0;
} }
...@@ -6786,13 +6803,20 @@ toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth, ...@@ -6786,13 +6803,20 @@ toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth,
if (node->height > 0) { if (node->height > 0) {
NONLEAF_CHILDINFO bnc = BNC(node, i); NONLEAF_CHILDINFO bnc = BNC(node, i);
fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc)); fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc));
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, type, msn, xids, UU(is_fresh), struct print_msg_fn {
{ FILE *file;
data=data; datalen=datalen; keylen=keylen; int depth;
fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n", depth+2, "", xids_get_innermost_xid(xids), (unsigned)toku_dtoh32(*(int*)key), type, msn.msn); print_msg_fn(FILE *f, int d) : file(f), depth(d) { }
//assert(strlen((char*)key)+1==keylen); int operator()(FT_MSG msg, bool UU(is_fresh)) {
//assert(strlen((char*)data)+1==datalen); fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n",
}); depth+2, "",
xids_get_innermost_xid(ft_msg_get_xids(msg)),
(unsigned)toku_dtoh32(*(int*)ft_msg_get_key(msg)),
ft_msg_get_type(msg), msg->msn.msn);
return 0;
}
} print_fn(file, depth);
bnc->msg_buffer.iterate(print_fn);
} }
else { else {
int size = BLB_DATA(node, i)->num_klpairs(); int size = BLB_DATA(node, i)->num_klpairs();
......
...@@ -120,6 +120,7 @@ static int ...@@ -120,6 +120,7 @@ static int
verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot)
__attribute__((warn_unused_result)); __attribute__((warn_unused_result));
UU()
static int static int
verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) { verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) {
int result = 0; int result = 0;
...@@ -169,7 +170,7 @@ get_ith_key_dbt (BASEMENTNODE bn, int i) { ...@@ -169,7 +170,7 @@ get_ith_key_dbt (BASEMENTNODE bn, int i) {
struct count_msgs_extra { struct count_msgs_extra {
int count; int count;
MSN msn; MSN msn;
FIFO fifo; message_buffer *msg_buffer;
}; };
// template-only function, but must be extern // template-only function, but must be extern
...@@ -177,15 +178,16 @@ int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_ ...@@ -177,15 +178,16 @@ int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_
__attribute__((nonnull(3))); __attribute__((nonnull(3)));
int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e) int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
{ {
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset); MSN msn;
if (entry->msn.msn == e->msn.msn) { e->msg_buffer->get_message_key_msn(offset, nullptr, &msn);
if (msn.msn == e->msn.msn) {
e->count++; e->count++;
} }
return 0; return 0;
} }
struct verify_message_tree_extra { struct verify_message_tree_extra {
FIFO fifo; message_buffer *msg_buffer;
bool broadcast; bool broadcast;
bool is_fresh; bool is_fresh;
int i; int i;
...@@ -202,20 +204,22 @@ int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct ve ...@@ -202,20 +204,22 @@ int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct ve
BLOCKNUM blocknum = e->blocknum; BLOCKNUM blocknum = e->blocknum;
int keep_going_on_failure = e->keep_going_on_failure; int keep_going_on_failure = e->keep_going_on_failure;
int result = 0; int result = 0;
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset); DBT k, v;
FT_MSG_S msg = e->msg_buffer->get_message(offset, &k, &v);
bool is_fresh = e->msg_buffer->get_freshness(offset);
if (e->broadcast) { if (e->broadcast) {
VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) entry->type) || ft_msg_type_does_nothing((enum ft_msg_type) entry->type), VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type),
e->i, "message found in broadcast list that is not a broadcast"); e->i, "message found in broadcast list that is not a broadcast");
} else { } else {
VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) entry->type), VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type),
e->i, "message found in fresh or stale message tree that does not apply once"); e->i, "message found in fresh or stale message tree that does not apply once");
if (e->is_fresh) { if (e->is_fresh) {
if (e->messages_have_been_moved) { if (e->messages_have_been_moved) {
VERIFY_ASSERTION(entry->is_fresh, VERIFY_ASSERTION(is_fresh,
e->i, "message found in fresh message tree that is not fresh"); e->i, "message found in fresh message tree that is not fresh");
} }
} else { } else {
VERIFY_ASSERTION(!entry->is_fresh, VERIFY_ASSERTION(!is_fresh,
e->i, "message found in stale message tree that is fresh"); e->i, "message found in stale message tree that is fresh");
} }
} }
...@@ -235,15 +239,15 @@ int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct ...@@ -235,15 +239,15 @@ int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct
BLOCKNUM blocknum = e->blocknum; BLOCKNUM blocknum = e->blocknum;
int keep_going_on_failure = e->keep_going_on_failure; int keep_going_on_failure = e->keep_going_on_failure;
int result = 0; int result = 0;
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset); bool is_fresh = e->msg_buffer->get_freshness(offset);
VERIFY_ASSERTION(!entry->is_fresh, e->i, "marked message found in the fresh message tree that is fresh"); VERIFY_ASSERTION(!is_fresh, e->i, "marked message found in the fresh message tree that is fresh");
done: done:
return result; return result;
} }
template<typename verify_omt_t> template<typename verify_omt_t>
static int static int
verify_sorted_by_key_msn(FT_HANDLE ft_handle, FIFO fifo, const verify_omt_t &mt) { verify_sorted_by_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const verify_omt_t &mt) {
int result = 0; int result = 0;
size_t last_offset = 0; size_t last_offset = 0;
for (uint32_t i = 0; i < mt.size(); i++) { for (uint32_t i = 0; i < mt.size(); i++) {
...@@ -251,12 +255,12 @@ verify_sorted_by_key_msn(FT_HANDLE ft_handle, FIFO fifo, const verify_omt_t &mt) ...@@ -251,12 +255,12 @@ verify_sorted_by_key_msn(FT_HANDLE ft_handle, FIFO fifo, const verify_omt_t &mt)
int r = mt.fetch(i, &offset); int r = mt.fetch(i, &offset);
assert_zero(r); assert_zero(r);
if (i > 0) { if (i > 0) {
struct toku_fifo_entry_key_msn_cmp_extra extra; struct toku_msg_buffer_key_msn_cmp_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.desc = &ft_handle->ft->cmp_descriptor; extra.desc = &ft_handle->ft->cmp_descriptor;
extra.cmp = ft_handle->ft->compare_fun; extra.cmp = ft_handle->ft->compare_fun;
extra.fifo = fifo; extra.msg_buffer = msg_buffer;
if (toku_fifo_entry_key_msn_cmp(extra, last_offset, offset) >= 0) { if (toku_msg_buffer_key_msn_cmp(extra, last_offset, offset) >= 0) {
result = TOKUDB_NEEDS_REPAIR; result = TOKUDB_NEEDS_REPAIR;
break; break;
} }
...@@ -268,15 +272,15 @@ verify_sorted_by_key_msn(FT_HANDLE ft_handle, FIFO fifo, const verify_omt_t &mt) ...@@ -268,15 +272,15 @@ verify_sorted_by_key_msn(FT_HANDLE ft_handle, FIFO fifo, const verify_omt_t &mt)
template<typename count_omt_t> template<typename count_omt_t>
static int static int
count_eq_key_msn(FT_HANDLE ft_handle, FIFO fifo, const count_omt_t &mt, const DBT *key, MSN msn) { count_eq_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const count_omt_t &mt, const DBT *key, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra; struct toku_msg_buffer_key_msn_heaviside_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.desc = &ft_handle->ft->cmp_descriptor; extra.desc = &ft_handle->ft->cmp_descriptor;
extra.cmp = ft_handle->ft->compare_fun; extra.cmp = ft_handle->ft->compare_fun;
extra.fifo = fifo; extra.msg_buffer = msg_buffer;
extra.key = key; extra.key = key;
extra.msn = msn; extra.msn = msn;
int r = mt.template find_zero<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(extra, nullptr, nullptr); int r = mt.template find_zero<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(extra, nullptr, nullptr);
int count; int count;
if (r == 0) { if (r == 0) {
count = 1; count = 1;
...@@ -308,6 +312,80 @@ toku_get_node_for_verify( ...@@ -308,6 +312,80 @@ toku_get_node_for_verify(
); );
} }
struct verify_msg_fn {
FT_HANDLE ft_handle;
NONLEAF_CHILDINFO bnc;
const DBT *curr_less_pivot;
const DBT *curr_geq_pivot;
BLOCKNUM blocknum;
MSN this_msn;
int verbose;
int keep_going_on_failure;
bool messages_have_been_moved;
MSN last_msn;
int msg_i;
int result = 0; // needed by VERIFY_ASSERTION
verify_msg_fn(FT_HANDLE handle, NONLEAF_CHILDINFO nl, const DBT *less, const DBT *geq,
BLOCKNUM b, MSN tmsn, int v, int k, bool m) :
ft_handle(handle), bnc(nl), curr_less_pivot(less), curr_geq_pivot(geq),
blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) {
}
int operator()(FT_MSG msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type;
MSN msn = msg->msn;
XIDS xid = msg->xids;
const void *key = ft_msg_get_key(msg);
const void *data = ft_msg_get_val(msg);
ITEMLEN keylen = ft_msg_get_keylen(msg);
ITEMLEN datalen = ft_msg_get_vallen(msg);
int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid,
curr_less_pivot,
curr_geq_pivot);
VERIFY_ASSERTION(r == 0, msg_i, "A message in the buffer is out of place");
VERIFY_ASSERTION((msn.msn > last_msn.msn), msg_i, "msn per msg must be monotonically increasing toward newer messages in buffer");
VERIFY_ASSERTION((msn.msn <= this_msn.msn), msg_i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
if (ft_msg_type_applies_once(type)) {
int count;
DBT keydbt;
toku_fill_dbt(&keydbt, key, keylen);
int total_count = 0;
count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
total_count += count;
if (is_fresh) {
VERIFY_ASSERTION(count == 1, msg_i, "a fresh message was not found in the fresh message tree");
} else if (messages_have_been_moved) {
VERIFY_ASSERTION(count == 0, msg_i, "a stale message was found in the fresh message tree");
}
VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the fresh message tree");
count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree, &keydbt, msn);
total_count += count;
if (is_fresh) {
VERIFY_ASSERTION(count == 0, msg_i, "a fresh message was found in the stale message tree");
} else if (messages_have_been_moved) {
VERIFY_ASSERTION(count == 1, msg_i, "a stale message was not found in the stale message tree");
}
VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the stale message tree");
VERIFY_ASSERTION(total_count <= 1, msg_i, "a message was found in both message trees (or more than once in a single tree)");
VERIFY_ASSERTION(total_count >= 1, msg_i, "a message was not found in either message tree");
} else {
VERIFY_ASSERTION(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type), msg_i, "a message was found that does not apply either to all or to only one key");
struct count_msgs_extra extra = { .count = 0, .msn = msn, .msg_buffer = &bnc->msg_buffer };
bnc->broadcast_list.iterate<struct count_msgs_extra, count_msgs>(&extra);
VERIFY_ASSERTION(extra.count == 1, msg_i, "a broadcast message was not found in the broadcast list");
}
last_msn = msn;
msg_i++;
done:
return result;
}
};
static int static int
toku_verify_ftnode_internal(FT_HANDLE ft_handle, toku_verify_ftnode_internal(FT_HANDLE ft_handle,
MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above, MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
...@@ -351,55 +429,18 @@ toku_verify_ftnode_internal(FT_HANDLE ft_handle, ...@@ -351,55 +429,18 @@ toku_verify_ftnode_internal(FT_HANDLE ft_handle,
const DBT *curr_less_pivot = (i==0) ? lesser_pivot : &node->childkeys[i-1]; const DBT *curr_less_pivot = (i==0) ? lesser_pivot : &node->childkeys[i-1];
const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : &node->childkeys[i]; const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : &node->childkeys[i];
if (node->height > 0) { if (node->height > 0) {
MSN last_msn = ZERO_MSN;
// Verify that messages in the buffers are in the right place.
NONLEAF_CHILDINFO bnc = BNC(node, i); NONLEAF_CHILDINFO bnc = BNC(node, i);
VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, bnc->buffer, bnc->fresh_message_tree) == 0, i, "fresh_message_tree"); // Verify that messages in the buffers are in the right place.
VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, bnc->buffer, bnc->stale_message_tree) == 0, i, "stale_message_tree"); VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree) == 0, i, "fresh_message_tree");
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, itype, msn, xid, is_fresh, VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree) == 0, i, "stale_message_tree");
({
enum ft_msg_type type = (enum ft_msg_type) itype; verify_msg_fn verify_msg(ft_handle, bnc, curr_less_pivot, curr_geq_pivot,
int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid, blocknum, this_msn, verbose, keep_going_on_failure, messages_have_been_moved);
curr_less_pivot, int r = bnc->msg_buffer.iterate(verify_msg);
curr_geq_pivot); if (r != 0) { result = r; goto done; }
VERIFY_ASSERTION(r==0, i, "A message in the buffer is out of place");
VERIFY_ASSERTION((msn.msn > last_msn.msn), i, "msn per msg must be monotonically increasing toward newer messages in buffer"); struct verify_message_tree_extra extra = { .msg_buffer = &bnc->msg_buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->thisnodename, .keep_going_on_failure = keep_going_on_failure, .messages_have_been_moved = messages_have_been_moved };
VERIFY_ASSERTION((msn.msn <= this_msn.msn), i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory"); r = bnc->fresh_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
if (ft_msg_type_applies_once(type)) {
int count;
DBT keydbt;
toku_fill_dbt(&keydbt, key, keylen);
int total_count = 0;
count = count_eq_key_msn(ft_handle, bnc->buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
total_count += count;
if (is_fresh) {
VERIFY_ASSERTION(count == 1, i, "a fresh message was not found in the fresh message tree");
} else if (messages_have_been_moved) {
VERIFY_ASSERTION(count == 0, i, "a stale message was found in the fresh message tree");
}
VERIFY_ASSERTION(count <= 1, i, "a message was found multiple times in the fresh message tree");
count = count_eq_key_msn(ft_handle, bnc->buffer, bnc->stale_message_tree, &keydbt, msn);
total_count += count;
if (is_fresh) {
VERIFY_ASSERTION(count == 0, i, "a fresh message was found in the stale message tree");
} else if (messages_have_been_moved) {
VERIFY_ASSERTION(count == 1, i, "a stale message was not found in the stale message tree");
}
VERIFY_ASSERTION(count <= 1, i, "a message was found multiple times in the stale message tree");
VERIFY_ASSERTION(total_count <= 1, i, "a message was found in both message trees (or more than once in a single tree)");
VERIFY_ASSERTION(total_count >= 1, i, "a message was not found in either message tree");
} else {
VERIFY_ASSERTION(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type), i, "a message was found that does not apply either to all or to only one key");
struct count_msgs_extra extra = { .count = 0, .msn = msn, .fifo = bnc->buffer };
bnc->broadcast_list.iterate<struct count_msgs_extra, count_msgs>(&extra);
VERIFY_ASSERTION(extra.count == 1, i, "a broadcast message was not found in the broadcast list");
}
last_msn = msn;
}));
struct verify_message_tree_extra extra = { .fifo = bnc->buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->thisnodename, .keep_going_on_failure = keep_going_on_failure, .messages_have_been_moved = messages_have_been_moved };
int r = bnc->fresh_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
if (r != 0) { result = r; goto done; } if (r != 0) { result = r; goto done; }
extra.is_fresh = false; extra.is_fresh = false;
r = bnc->stale_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra); r = bnc->stale_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
......
...@@ -187,7 +187,7 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { ...@@ -187,7 +187,7 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
} }
// Maps to cf->begin_checkpoint_userdata // Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...). // Create checkpoint-in-progress versions of header and translation (btt)
// Has access to fd (it is protected). // Has access to fd (it is protected).
// //
// Not reentrant for a single FT (see ft_checkpoint) // Not reentrant for a single FT (see ft_checkpoint)
......
...@@ -2,7 +2,8 @@ ...@@ -2,7 +2,8 @@
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/* The purpose of this file is to provide access to the ft_msg, /* The purpose of this file is to provide access to the ft_msg,
* which is the ephemeral version of the fifo_msg. * which is the ephemeral version of the messages that lives in
* a message buffer.
*/ */
#ifndef FT_MSG_H #ifndef FT_MSG_H
......
...@@ -310,26 +310,26 @@ serialize_ftnode_partition_size (FTNODE node, int i) ...@@ -310,26 +310,26 @@ serialize_ftnode_partition_size (FTNODE node, int i)
} }
#define FTNODE_PARTITION_DMT_LEAVES 0xaa #define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_FIFO_MSG 0xbb #define FTNODE_PARTITION_MSG_BUFFER 0xbb
UU() static int UU() static int
assert_fresh(const int32_t &offset, const uint32_t UU(idx), struct fifo *const f) { assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
struct fifo_entry *entry = toku_fifo_get_entry(f, offset); bool is_fresh = msg_buffer->get_freshness(offset);
assert(entry->is_fresh); assert(is_fresh);
return 0; return 0;
} }
UU() static int UU() static int
assert_stale(const int32_t &offset, const uint32_t UU(idx), struct fifo *const f) { assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
struct fifo_entry *entry = toku_fifo_get_entry(f, offset); bool is_fresh = msg_buffer->get_freshness(offset);
assert(!entry->is_fresh); assert(!is_fresh);
return 0; return 0;
} }
static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) { static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
#ifdef TOKU_DEBUG_PARANOID #ifdef TOKU_DEBUG_PARANOID
bnc->fresh_message_tree.iterate<struct fifo, assert_fresh>(bnc->buffer); bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
bnc->stale_message_tree.iterate<struct fifo, assert_stale>(bnc->buffer); bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
#endif #endif
} }
...@@ -342,21 +342,27 @@ wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *co ...@@ -342,21 +342,27 @@ wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *co
static void static void
serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
{ {
unsigned char ch = FTNODE_PARTITION_FIFO_MSG; unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
wbuf_nocrc_char(wb, ch); wbuf_nocrc_char(wb, ch);
// serialize the FIFO, first the number of entries, then the elements
// serialize the message buffer, first the number of entries, then the elements
wbuf_nocrc_int(wb, toku_bnc_n_entries(bnc)); wbuf_nocrc_int(wb, toku_bnc_n_entries(bnc));
FIFO_ITERATE( struct msg_serialize_fn {
bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, struct wbuf *wb;
{ msg_serialize_fn(struct wbuf *w) : wb(w) { }
int operator()(FT_MSG msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type;
paranoid_invariant((int) type >= 0 && (int) type < 256); paranoid_invariant((int) type >= 0 && (int) type < 256);
wbuf_nocrc_char(wb, (unsigned char) type); wbuf_nocrc_char(wb, (unsigned char) type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh); wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, msn); wbuf_MSN(wb, msg->msn);
wbuf_nocrc_xids(wb, xids); wbuf_nocrc_xids(wb, ft_msg_get_xids(msg));
wbuf_nocrc_bytes(wb, key, keylen); wbuf_nocrc_bytes(wb, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
wbuf_nocrc_bytes(wb, data, datalen); wbuf_nocrc_bytes(wb, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
}); return 0;
}
} serialize_fn(wb);
bnc->msg_buffer.iterate(serialize_fn);
bnc_verify_message_trees(bnc); bnc_verify_message_trees(bnc);
...@@ -1084,7 +1090,7 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, ...@@ -1084,7 +1090,7 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
XMALLOC_N(n_in_this_buffer, fresh_offsets); XMALLOC_N(n_in_this_buffer, fresh_offsets);
XMALLOC_N(n_in_this_buffer, broadcast_offsets); XMALLOC_N(n_in_this_buffer, broadcast_offsets);
} }
toku_fifo_resize(bnc->buffer, rbuf->size + 64); bnc->msg_buffer.resize(rbuf->size + 64);
for (int i = 0; i < n_in_this_buffer; i++) { for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen; bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen; bytevec val; ITEMLEN vallen;
...@@ -1116,19 +1122,24 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, ...@@ -1116,19 +1122,24 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
} else { } else {
dest = NULL; dest = NULL;
} }
r = toku_fifo_enq(bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, dest); /* Copies the data into the fifo */ // TODO: Function to parse stuff out of an rbuf into an FT_MSG
lazy_assert_zero(r); DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
bnc->msg_buffer.enqueue(&msg, is_fresh, dest);
xids_destroy(&xids); xids_destroy(&xids);
} }
invariant(rbuf->ndone == rbuf->size); invariant(rbuf->ndone == rbuf->size);
if (cmp) { if (cmp) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer }; struct toku_msg_buffer_key_msn_cmp_extra extra = { .desc = desc, .cmp = cmp, .msg_buffer = &bnc->msg_buffer };
r = toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra); r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra);
assert_zero(r); assert_zero(r);
bnc->fresh_message_tree.destroy(); bnc->fresh_message_tree.destroy();
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer); bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer);
r = toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra); r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra);
assert_zero(r); assert_zero(r);
bnc->stale_message_tree.destroy(); bnc->stale_message_tree.destroy();
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer); bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer);
...@@ -1137,9 +1148,9 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, ...@@ -1137,9 +1148,9 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
} }
} }
// effect: deserialize a single message from rbuf and enque the result into the given fifo // effect: deserialize a single message from rbuf and enqueue the result into the given message buffer
static void static void
fifo_deserialize_msg_from_rbuf(FIFO fifo, struct rbuf *rbuf) { msg_buffer_deserialize_msg_from_rbuf(message_buffer *msg_buffer, struct rbuf *rbuf) {
bytevec key, val; bytevec key, val;
ITEMLEN keylen, vallen; ITEMLEN keylen, vallen;
enum ft_msg_type type = (enum ft_msg_type) rbuf_char(rbuf); enum ft_msg_type type = (enum ft_msg_type) rbuf_char(rbuf);
...@@ -1149,8 +1160,13 @@ fifo_deserialize_msg_from_rbuf(FIFO fifo, struct rbuf *rbuf) { ...@@ -1149,8 +1160,13 @@ fifo_deserialize_msg_from_rbuf(FIFO fifo, struct rbuf *rbuf) {
xids_create_from_buffer(rbuf, &xids); xids_create_from_buffer(rbuf, &xids);
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */ rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen); rbuf_bytes(rbuf, &val, &vallen);
int r = toku_fifo_enq(fifo, key, keylen, val, vallen, type, msn, xids, is_fresh, nullptr); // TODO: Function to parse stuff out of an rbuf into an FT_MSG
lazy_assert_zero(r); DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
msg_buffer->enqueue(&msg, is_fresh, nullptr);
xids_destroy(&xids); xids_destroy(&xids);
} }
...@@ -1162,9 +1178,9 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf) { ...@@ -1162,9 +1178,9 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf) {
int32_t *XMALLOC_N(n_in_this_buffer, fresh_offsets); int32_t *XMALLOC_N(n_in_this_buffer, fresh_offsets);
int32_t *XMALLOC_N(n_in_this_buffer, broadcast_offsets); int32_t *XMALLOC_N(n_in_this_buffer, broadcast_offsets);
toku_fifo_resize(bnc->buffer, rbuf->size + 64); bnc->msg_buffer.resize(rbuf->size + 64);
for (int i = 0; i < n_in_this_buffer; i++) { for (int i = 0; i < n_in_this_buffer; i++) {
fifo_deserialize_msg_from_rbuf(bnc->buffer, rbuf); msg_buffer_deserialize_msg_from_rbuf(&bnc->msg_buffer, rbuf);
} }
// read in each message tree (fresh, stale, broadcast) // read in each message tree (fresh, stale, broadcast)
...@@ -1253,7 +1269,7 @@ BASEMENTNODE toku_create_empty_bn_no_buffer(void) { ...@@ -1253,7 +1269,7 @@ BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
NONLEAF_CHILDINFO toku_create_empty_nl(void) { NONLEAF_CHILDINFO toku_create_empty_nl(void) {
NONLEAF_CHILDINFO XMALLOC(cn); NONLEAF_CHILDINFO XMALLOC(cn);
int r = toku_fifo_create(&cn->buffer); assert_zero(r); cn->msg_buffer.create();
cn->fresh_message_tree.create_no_array(); cn->fresh_message_tree.create_no_array();
cn->stale_message_tree.create_no_array(); cn->stale_message_tree.create_no_array();
cn->broadcast_list.create_no_array(); cn->broadcast_list.create_no_array();
...@@ -1261,10 +1277,10 @@ NONLEAF_CHILDINFO toku_create_empty_nl(void) { ...@@ -1261,10 +1277,10 @@ NONLEAF_CHILDINFO toku_create_empty_nl(void) {
return cn; return cn;
} }
// must clone the OMTs, since we serialize them along with the FIFO // must clone the OMTs, since we serialize them along with the message buffer
NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) { NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
NONLEAF_CHILDINFO XMALLOC(cn); NONLEAF_CHILDINFO XMALLOC(cn);
toku_fifo_clone(orig_childinfo->buffer, &cn->buffer); cn->msg_buffer.clone(&orig_childinfo->msg_buffer);
cn->fresh_message_tree.create_no_array(); cn->fresh_message_tree.create_no_array();
cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree); cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);
cn->stale_message_tree.create_no_array(); cn->stale_message_tree.create_no_array();
...@@ -1283,7 +1299,7 @@ void destroy_basement_node (BASEMENTNODE bn) ...@@ -1283,7 +1299,7 @@ void destroy_basement_node (BASEMENTNODE bn)
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl) void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
{ {
toku_fifo_free(&nl->buffer); nl->msg_buffer.destroy();
nl->fresh_message_tree.destroy(); nl->fresh_message_tree.destroy();
nl->stale_message_tree.destroy(); nl->stale_message_tree.destroy();
nl->broadcast_list.destroy(); nl->broadcast_list.destroy();
...@@ -1615,7 +1631,7 @@ deserialize_ftnode_partition( ...@@ -1615,7 +1631,7 @@ deserialize_ftnode_partition(
ch = rbuf_char(&rb); ch = rbuf_char(&rb);
if (node->height > 0) { if (node->height > 0) {
assert(ch == FTNODE_PARTITION_FIFO_MSG); assert(ch == FTNODE_PARTITION_MSG_BUFFER);
NONLEAF_CHILDINFO bnc = BNC(node, childnum); NONLEAF_CHILDINFO bnc = BNC(node, childnum);
if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) { if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
// Layout version <= 26 did not serialize sorted message trees to disk. // Layout version <= 26 did not serialize sorted message trees to disk.
...@@ -1827,7 +1843,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, ...@@ -1827,7 +1843,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
paranoid_invariant(is_valid_ftnode_fetch_type(bfe->type)); paranoid_invariant(is_valid_ftnode_fetch_type(bfe->type));
// setup the memory of the partitions // setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node // for partitions being decompressed, create either message buffer or basement node
// for partitions staying compressed, create sub_block // for partitions staying compressed, create sub_block
setup_ftnode_partitions(node, bfe, false); setup_ftnode_partitions(node, bfe, false);
...@@ -1995,7 +2011,7 @@ deserialize_and_upgrade_internal_node(FTNODE node, ...@@ -1995,7 +2011,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
highest_msn.msn = lowest.msn + n_in_this_buffer; highest_msn.msn = lowest.msn + n_in_this_buffer;
} }
// Create the FIFO entires from the deserialized buffer. // Create the message buffers from the deserialized buffer.
for (int j = 0; j < n_in_this_buffer; ++j) { for (int j = 0; j < n_in_this_buffer; ++j) {
bytevec key; ITEMLEN keylen; bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen; bytevec val; ITEMLEN vallen;
...@@ -2025,25 +2041,21 @@ deserialize_and_upgrade_internal_node(FTNODE node, ...@@ -2025,25 +2041,21 @@ deserialize_and_upgrade_internal_node(FTNODE node,
// Increment our MSN, the last message should have the // Increment our MSN, the last message should have the
// newest/highest MSN. See above for a full explanation. // newest/highest MSN. See above for a full explanation.
lowest.msn++; lowest.msn++;
r = toku_fifo_enq(bnc->buffer, // TODO: Function to parse stuff out of an rbuf into an FT_MSG
key, DBT k, v;
keylen, FT_MSG_S msg = {
val, type, lowest, xids,
vallen, .u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
type, };
lowest, bnc->msg_buffer.enqueue(&msg, true, dest);
xids,
true,
dest);
lazy_assert_zero(r);
xids_destroy(&xids); xids_destroy(&xids);
} }
if (bfe->h->compare_fun) { if (bfe->h->compare_fun) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &bfe->h->cmp_descriptor, struct toku_msg_buffer_key_msn_cmp_extra extra = { .desc = &bfe->h->cmp_descriptor,
.cmp = bfe->h->compare_fun, .cmp = bfe->h->compare_fun,
.fifo = bnc->buffer }; .msg_buffer = &bnc->msg_buffer };
typedef toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp> key_msn_sort; typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> key_msn_sort;
r = key_msn_sort::mergesort_r(fresh_offsets, nfresh, extra); r = key_msn_sort::mergesort_r(fresh_offsets, nfresh, extra);
assert_zero(r); assert_zero(r);
bnc->fresh_message_tree.destroy(); bnc->fresh_message_tree.destroy();
...@@ -2053,7 +2065,7 @@ deserialize_and_upgrade_internal_node(FTNODE node, ...@@ -2053,7 +2065,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
} }
} }
// Assign the highest msn from our upgrade message FIFO queues. // Assign the highest msn from our upgrade message buffers
node->max_msn_applied_to_node_on_disk = highest_msn; node->max_msn_applied_to_node_on_disk = highest_msn;
// Since we assigned MSNs to this node's messages, we need to dirty it. // Since we assigned MSNs to this node's messages, we need to dirty it.
node->dirty = 1; node->dirty = 1;
...@@ -2433,7 +2445,7 @@ deserialize_ftnode_from_rbuf( ...@@ -2433,7 +2445,7 @@ deserialize_ftnode_from_rbuf(
paranoid_invariant(is_valid_ftnode_fetch_type(bfe->type)); paranoid_invariant(is_valid_ftnode_fetch_type(bfe->type));
// setup the memory of the partitions // setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node // for partitions being decompressed, create either message buffer or basement node
// for partitions staying compressed, create sub_block // for partitions staying compressed, create sub_block
setup_ftnode_partitions(node, bfe, true); setup_ftnode_partitions(node, bfe, true);
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/* /*
COPYING CONDITIONS NOTICE: COPYING CONDITIONS NOTICE:
...@@ -30,7 +30,7 @@ COPYING CONDITIONS NOTICE: ...@@ -30,7 +30,7 @@ COPYING CONDITIONS NOTICE:
COPYRIGHT NOTICE: COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library. TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc. Copyright (C) 2014 Tokutek, Inc.
DISCLAIMER: DISCLAIMER:
...@@ -86,69 +86,33 @@ PATENT RIGHTS GRANT: ...@@ -86,69 +86,33 @@ PATENT RIGHTS GRANT:
under this License. under this License.
*/ */
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved." #include "ft/msg_buffer.h"
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #include "ft/ybt.h"
#include "fifo.h"
#include "xids.h"
#include "ybt.h"
#include <memory.h>
#include <toku_assert.h>
struct fifo {
int n_items_in_fifo;
char *memory; // An array of bytes into which fifo_entries are embedded.
int memory_size; // How big is fifo_memory
int memory_used; // How many bytes are in use?
};
static void fifo_init(struct fifo *fifo) {
fifo->n_items_in_fifo = 0;
fifo->memory = 0;
fifo->memory_size = 0;
fifo->memory_used = 0;
}
__attribute__((const,nonnull))
static int fifo_entry_size(struct fifo_entry *entry) {
return sizeof (struct fifo_entry) + entry->keylen + entry->vallen
+ xids_get_size(&entry->xids_s)
- sizeof(XIDS_S); //Prevent double counting from fifo_entry+xids_get_size
}
__attribute__((const,nonnull))
size_t toku_ft_msg_memsize_in_fifo(FT_MSG msg) {
// This must stay in sync with fifo_entry_size because that's what we
// really trust. But sometimes we only have an in-memory FT_MSG, not
// a serialized fifo_entry so we have to fake it.
return sizeof (struct fifo_entry) + msg->u.id.key->size + msg->u.id.val->size
+ xids_get_size(msg->xids)
- sizeof(XIDS_S);
}
int toku_fifo_create(FIFO *ptr) { void message_buffer::create() {
struct fifo *XMALLOC(fifo); _num_entries = 0;
if (fifo == 0) return ENOMEM; _memory = nullptr;
fifo_init(fifo); _memory_size = 0;
*ptr = fifo; _memory_used = 0;
return 0;
} }
void toku_fifo_resize(FIFO fifo, size_t new_size) { void message_buffer::clone(message_buffer *src) {
XREALLOC_N(new_size, fifo->memory); _num_entries = src->_num_entries;
fifo->memory_size = new_size; _memory_used = src->_memory_used;
_memory_size = src->_memory_size;
XMALLOC_N(_memory_size, _memory);
memcpy(_memory, src->_memory, _memory_size);
} }
void toku_fifo_free(FIFO *ptr) { void message_buffer::destroy() {
FIFO fifo = *ptr; if (_memory != nullptr) {
if (fifo->memory) toku_free(fifo->memory); toku_free(_memory);
fifo->memory=0; }
toku_free(fifo);
*ptr = 0;
} }
int toku_fifo_n_entries(FIFO fifo) { void message_buffer::resize(size_t new_size) {
return fifo->n_items_in_fifo; XREALLOC_N(new_size, _memory);
_memory_size = new_size;
} }
static int next_power_of_two (int n) { static int next_power_of_two (int n) {
...@@ -160,94 +124,101 @@ static int next_power_of_two (int n) { ...@@ -160,94 +124,101 @@ static int next_power_of_two (int n) {
return r; return r;
} }
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, int32_t *dest) { struct message_buffer::buffer_entry *message_buffer::get_buffer_entry(int32_t offset) const {
int need_space_here = sizeof(struct fifo_entry) return (struct buffer_entry *) (_memory + offset);
}
void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) {
ITEMLEN keylen = ft_msg_get_keylen(msg);
ITEMLEN datalen = ft_msg_get_vallen(msg);
XIDS xids = ft_msg_get_xids(msg);
int need_space_here = sizeof(struct buffer_entry)
+ keylen + datalen + keylen + datalen
+ xids_get_size(xids) + xids_get_size(xids)
- sizeof(XIDS_S); //Prevent double counting - sizeof(XIDS_S); //Prevent double counting
int need_space_total = fifo->memory_used+need_space_here; int need_space_total = _memory_used + need_space_here;
if (fifo->memory == NULL || need_space_total > fifo->memory_size) { if (_memory == nullptr || need_space_total > _memory_size) {
// resize the fifo to the next power of 2 greater than the needed space // resize the buffer to the next power of 2 greater than the needed space
int next_2 = next_power_of_two(need_space_total); int next_2 = next_power_of_two(need_space_total);
toku_fifo_resize(fifo, next_2); resize(next_2);
} }
struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_used); struct buffer_entry *entry = get_buffer_entry(_memory_used);
entry->type = (unsigned char) type; entry->type = (unsigned char) ft_msg_get_type(msg);
entry->msn = msn; entry->msn = msg->msn;
xids_cpy(&entry->xids_s, xids); xids_cpy(&entry->xids_s, xids);
entry->is_fresh = is_fresh; entry->is_fresh = is_fresh;
entry->keylen = keylen;
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s); unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
memcpy(e_key, key, keylen); entry->keylen = keylen;
memcpy(e_key, ft_msg_get_key(msg), keylen);
entry->vallen = datalen; entry->vallen = datalen;
memcpy(e_key + keylen, data, datalen); memcpy(e_key + keylen, ft_msg_get_val(msg), datalen);
if (dest) { if (offset) {
*dest = fifo->memory_used; *offset = _memory_used;
} }
fifo->n_items_in_fifo++; _num_entries++;
fifo->memory_used += need_space_here; _memory_used += need_space_here;
return 0;
} }
int toku_fifo_iterate_internal_start(FIFO UU(fifo)) { return 0; } void message_buffer::set_freshness(int32_t offset, bool is_fresh) {
int toku_fifo_iterate_internal_has_more(FIFO fifo, int off) { return off < fifo->memory_used; } struct buffer_entry *entry = get_buffer_entry(offset);
int toku_fifo_iterate_internal_next(FIFO fifo, int off) { entry->is_fresh = is_fresh;
struct fifo_entry *e = (struct fifo_entry *)(fifo->memory + off);
return off + fifo_entry_size(e);
}
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
return (struct fifo_entry *)(fifo->memory + off);
} }
size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) {
return fifo_entry_size(e); bool message_buffer::get_freshness(int32_t offset) const {
struct buffer_entry *entry = get_buffer_entry(offset);
return entry->is_fresh;
} }
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void *arg) { FT_MSG_S message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const {
FIFO_ITERATE(fifo, struct buffer_entry *entry = get_buffer_entry(offset);
key, keylen, data, datalen, type, msn, xids, is_fresh, ITEMLEN keylen = entry->keylen;
f(key,keylen,data,datalen,type,msn,xids,is_fresh, arg)); ITEMLEN vallen = entry->vallen;
enum ft_msg_type type = (enum ft_msg_type) entry->type;
MSN msn = entry->msn;
const XIDS xids = (XIDS) &entry->xids_s;
bytevec key = xids_get_end_of_array(xids);
bytevec val = (uint8_t *) key + entry->keylen;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(keydbt, key, keylen), toku_fill_dbt(valdbt, val, vallen) } }
};
return msg;
} }
unsigned int toku_fifo_buffer_size_in_use (FIFO fifo) { void message_buffer::get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const {
return fifo->memory_used; struct buffer_entry *entry = get_buffer_entry(offset);
if (key != nullptr) {
toku_fill_dbt(key, xids_get_end_of_array((XIDS) &entry->xids_s), entry->keylen);
}
if (msn != nullptr) {
*msn = entry->msn;
}
} }
unsigned long toku_fifo_memory_size_in_use(FIFO fifo) { int message_buffer::num_entries() const {
return sizeof(*fifo)+fifo->memory_used; return _num_entries;
} }
unsigned long toku_fifo_memory_footprint(FIFO fifo) { size_t message_buffer::buffer_size_in_use() const {
size_t size_used = toku_memory_footprint(fifo->memory, fifo->memory_used); return _memory_used;
long rval = sizeof(*fifo) + size_used;
return rval;
} }
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry) { size_t message_buffer::memory_size_in_use() const {
return toku_fill_dbt(dbt, xids_get_end_of_array((XIDS) &entry->xids_s), entry->keylen); return sizeof(*this) + _memory_used;
} }
struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off) { size_t message_buffer::memory_footprint() const {
return toku_fifo_iterate_internal_get_entry(fifo, off); return sizeof(*this) + toku_memory_footprint(_memory, _memory_used);
} }
void toku_fifo_clone(FIFO orig_fifo, FIFO* cloned_fifo) { bool message_buffer::equals(message_buffer *other) const {
struct fifo *XMALLOC(new_fifo); return (_memory_used == other->_memory_used &&
assert(new_fifo); memcmp(_memory, other->_memory, _memory_used) == 0);
new_fifo->n_items_in_fifo = orig_fifo->n_items_in_fifo;
new_fifo->memory_used = orig_fifo->memory_used;
new_fifo->memory_size = new_fifo->memory_used;
XMALLOC_N(new_fifo->memory_size, new_fifo->memory);
memcpy(
new_fifo->memory,
orig_fifo->memory,
new_fifo->memory_size
);
*cloned_fifo = new_fifo;
} }
bool toku_are_fifos_same(FIFO fifo1, FIFO fifo2) { size_t message_buffer::msg_memsize_in_buffer(FT_MSG msg) {
return ( return sizeof(struct buffer_entry)
fifo1->memory_used == fifo2->memory_used && + msg->u.id.key->size + msg->u.id.val->size
memcmp(fifo1->memory, fifo2->memory, fifo1->memory_used) == 0 + xids_get_size(msg->xids)
); - sizeof(XIDS_S);
} }
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef FIFO_H
#define FIFO_H
#ident "$Id$"
/* /*
COPYING CONDITIONS NOTICE: COPYING CONDITIONS NOTICE:
...@@ -32,7 +30,7 @@ COPYING CONDITIONS NOTICE: ...@@ -32,7 +30,7 @@ COPYING CONDITIONS NOTICE:
COPYRIGHT NOTICE: COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library. TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc. Copyright (C) 2014 Tokutek, Inc.
DISCLAIMER: DISCLAIMER:
...@@ -88,77 +86,76 @@ PATENT RIGHTS GRANT: ...@@ -88,77 +86,76 @@ PATENT RIGHTS GRANT:
under this License. under this License.
*/ */
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved." #pragma once
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "ft/fttypes.h" #include "ft/fttypes.h"
#include "ft/xids-internal.h" #include "ft/xids-internal.h"
#include "ft/xids.h" #include "ft/xids.h"
#include "ft/ft_msg.h" #include "ft/ft_msg.h"
#include "ft/ybt.h"
// If the fifo_entry is unpacked, the compiler aligns the xids array and we waste a lot of space class message_buffer {
struct __attribute__((__packed__)) fifo_entry { public:
unsigned int keylen; void create();
unsigned int vallen;
unsigned char type;
bool is_fresh;
MSN msn;
XIDS_S xids_s;
};
typedef struct fifo *FIFO;
int toku_fifo_create(FIFO *); void clone(message_buffer *dst);
void toku_fifo_resize(FIFO fifo, size_t new_size); void destroy();
void toku_fifo_free(FIFO *); void resize(size_t new_size);
int toku_fifo_n_entries(FIFO); void enqueue(FT_MSG msg, bool is_fresh, int32_t *offset);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, int32_t *dest); void set_freshness(int32_t offset, bool is_fresh);
unsigned int toku_fifo_buffer_size_in_use (FIFO fifo); bool get_freshness(int32_t offset) const;
unsigned long toku_fifo_memory_size_in_use(FIFO fifo); // return how much memory in the fifo holds useful data
unsigned long toku_fifo_memory_footprint(FIFO fifo); // return how much memory the fifo occupies FT_MSG_S get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const;
void toku_fifo_iterate(FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void*); void get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const;
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,is_freshvar,body) ({ \ int num_entries() const;
for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \
ITEMLEN keylenvar = e->keylen; \
ITEMLEN datalenvar = e->vallen; \
enum ft_msg_type typevar = (enum ft_msg_type) e->type; \
MSN msnvar = e->msn; \
XIDS xidsvar = &e->xids_s; \
bytevec keyvar = xids_get_end_of_array(xidsvar); \
bytevec datavar = (const uint8_t*)keyvar + e->keylen; \
bool is_freshvar = e->is_fresh; \
body; \
} })
#define FIFO_CURRENT_ENTRY_MEMSIZE toku_fifo_internal_entry_memsize(e) size_t buffer_size_in_use() const;
// Internal functions for the iterator. size_t memory_size_in_use() const;
int toku_fifo_iterate_internal_start(FIFO fifo);
int toku_fifo_iterate_internal_has_more(FIFO fifo, int off);
int toku_fifo_iterate_internal_next(FIFO fifo, int off);
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off);
size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) __attribute__((const,nonnull));
size_t toku_ft_msg_memsize_in_fifo(FT_MSG msg) __attribute__((const,nonnull));
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry); size_t memory_footprint() const;
struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off);
void toku_fifo_clone(FIFO orig_fifo, FIFO* cloned_fifo); template <typename F>
int iterate(F &fn) const {
for (int32_t offset = 0; offset < _memory_used; ) {
DBT k, v;
FT_MSG_S msg = get_message(offset, &k, &v);
bool is_fresh = get_freshness(offset);
int r = fn(&msg, is_fresh);
if (r != 0) {
return r;
}
offset += msg_memsize_in_buffer(&msg);
}
return 0;
}
bool toku_are_fifos_same(FIFO fifo1, FIFO fifo2); bool equals(message_buffer *other) const;
static size_t msg_memsize_in_buffer(FT_MSG msg);
private:
// If this isn't packged, the compiler aligns the xids array and we waste a lot of space
struct __attribute__((__packed__)) buffer_entry {
unsigned int keylen;
unsigned int vallen;
unsigned char type;
bool is_fresh;
MSN msn;
XIDS_S xids_s;
};
struct buffer_entry *get_buffer_entry(int32_t offset) const;
#endif int _num_entries;
char *_memory; // An array of bytes into which buffer entries are embedded.
int _memory_size; // How big is _memory
int _memory_used; // How many bytes are in use?
};
...@@ -94,28 +94,19 @@ PATENT RIGHTS GRANT: ...@@ -94,28 +94,19 @@ PATENT RIGHTS GRANT:
#include "test.h" #include "test.h"
static void static void
test_fifo_create (void) { test_create (void) {
int r; message_buffer msg_buffer;
FIFO f; msg_buffer.create();
msg_buffer.destroy();
f = 0;
r = toku_fifo_create(&f);
assert(r == 0); assert(f != 0);
toku_fifo_free(&f);
assert(f == 0);
} }
static void static void
test_fifo_enq (int n) { test_enqueue(int n) {
int r; int r;
FIFO f; message_buffer msg_buffer;
MSN startmsn = ZERO_MSN; MSN startmsn = ZERO_MSN;
f = 0; msg_buffer.create();
r = toku_fifo_create(&f);
assert(r == 0); assert(f != 0);
char *thekey = 0; int thekeylen; char *thekey = 0; int thekeylen;
char *theval = 0; int thevallen; char *theval = 0; int thevallen;
...@@ -146,38 +137,56 @@ test_fifo_enq (int n) { ...@@ -146,38 +137,56 @@ test_fifo_enq (int n) {
if (startmsn.msn == ZERO_MSN.msn) if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn; startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type) i; enum ft_msg_type type = (enum ft_msg_type) i;
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, type, msn, xids, true, NULL); assert(r == 0); DBT k, v;
FT_MSG_S msg = {
type, msn, xids, .u = { .id = { toku_fill_dbt(&k, thekey, thekeylen), toku_fill_dbt(&v, theval, thevallen) } }
};
msg_buffer.enqueue(&msg, true, nullptr);
xids_destroy(&xids); xids_destroy(&xids);
} }
int i = 0; struct checkit_fn {
FIFO_ITERATE(f, key, keylen, val, vallen, type, msn, xids, UU(is_fresh), { char *thekey;
if (verbose) printf("checkit %d %d %" PRIu64 "\n", i, type, msn.msn); int thekeylen;
assert(msn.msn == startmsn.msn + i); char *theval;
buildkey(i); int thevallen;
buildval(i); MSN startmsn;
assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0); int verbose;
assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0); int i;
assert(i % 256 == (int)type); checkit_fn(char *tk, int tkl, char *tv, int tvl, MSN smsn, bool v)
assert((TXNID)i==xids_get_innermost_xid(xids)); : thekey(tk), thekeylen(tkl), theval(tv), thevallen(tvl), startmsn(smsn), verbose(v), i(0) {
i += 1; }
}); int operator()(FT_MSG msg, bool UU(is_fresh)) {
assert(i == n); MSN msn = msg->msn;
enum ft_msg_type type = ft_msg_get_type(msg);
if (verbose) printf("checkit %d %d %" PRIu64 "\n", i, type, msn.msn);
assert(msn.msn == startmsn.msn + i);
buildkey(i);
buildval(i);
assert((int) ft_msg_get_keylen(msg) == thekeylen); assert(memcmp(ft_msg_get_key(msg), thekey, ft_msg_get_keylen(msg)) == 0);
assert((int) ft_msg_get_vallen(msg) == thevallen); assert(memcmp(ft_msg_get_val(msg), theval, ft_msg_get_vallen(msg)) == 0);
assert(i % 256 == (int)type);
assert((TXNID)i==xids_get_innermost_xid(ft_msg_get_xids(msg)));
i += 1;
return 0;
}
} checkit(thekey, thekeylen, theval, thevallen, startmsn, verbose);
msg_buffer.iterate(checkit);
assert(checkit.i == n);
if (thekey) toku_free(thekey); if (thekey) toku_free(thekey);
if (theval) toku_free(theval); if (theval) toku_free(theval);
toku_fifo_free(&f); msg_buffer.destroy();
assert(f == 0);
} }
int int
test_main(int argc, const char *argv[]) { test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv); default_parse_args(argc, argv);
initialize_dummymsn(); initialize_dummymsn();
test_fifo_create(); test_create();
test_fifo_enq(4); test_enqueue(4);
test_fifo_enq(512); test_enqueue(512);
return 0; return 0;
} }
...@@ -1160,13 +1160,13 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) { ...@@ -1160,13 +1160,13 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) {
assert(BP_BLOCKNUM(dn,0).b==30); assert(BP_BLOCKNUM(dn,0).b==30);
assert(BP_BLOCKNUM(dn,1).b==35); assert(BP_BLOCKNUM(dn,1).b==35);
FIFO src_fifo_1 = BNC(&sn, 0)->buffer; message_buffer *src_msg_buffer1 = &BNC(&sn, 0)->msg_buffer;
FIFO src_fifo_2 = BNC(&sn, 1)->buffer; message_buffer *src_msg_buffer2 = &BNC(&sn, 1)->msg_buffer;
FIFO dest_fifo_1 = BNC(dn, 0)->buffer; message_buffer *dest_msg_buffer1 = &BNC(dn, 0)->msg_buffer;
FIFO dest_fifo_2 = BNC(dn, 1)->buffer; message_buffer *dest_msg_buffer2 = &BNC(dn, 1)->msg_buffer;
assert(toku_are_fifos_same(src_fifo_1, dest_fifo_1)); assert(src_msg_buffer1->equals(dest_msg_buffer1));
assert(toku_are_fifos_same(src_fifo_2, dest_fifo_2)); assert(src_msg_buffer2->equals(dest_msg_buffer2));
toku_ftnode_free(&dn); toku_ftnode_free(&dn);
......
...@@ -384,41 +384,60 @@ flush_to_internal(FT_HANDLE t) { ...@@ -384,41 +384,60 @@ flush_to_internal(FT_HANDLE t) {
memset(parent_messages_present, 0, sizeof parent_messages_present); memset(parent_messages_present, 0, sizeof parent_messages_present);
memset(child_messages_present, 0, sizeof child_messages_present); memset(child_messages_present, 0, sizeof child_messages_present);
FIFO_ITERATE(child_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, struct checkit_fn {
{ int num_parent_messages;
DBT keydbt; FT_MSG *parent_messages;
DBT valdbt; int *parent_messages_present;
toku_fill_dbt(&keydbt, key, keylen); bool *parent_messages_is_fresh;
toku_fill_dbt(&valdbt, val, vallen); int num_child_messages;
int found = 0; FT_MSG *child_messages;
for (i = 0; i < num_parent_messages; ++i) { int *child_messages_present;
if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 && bool *child_messages_is_fresh;
msn.msn == parent_messages[i]->msn.msn) { checkit_fn(int np, FT_MSG *pm, int *npp, bool *pmf, int nc, FT_MSG *cm, int *ncp, bool *cmf) :
assert(parent_messages_present[i] == 0); num_parent_messages(np), parent_messages(pm), parent_messages_present(npp), parent_messages_is_fresh(pmf),
assert(found == 0); num_child_messages(nc), child_messages(cm), child_messages_present(ncp), child_messages_is_fresh(cmf) {
assert(dummy_cmp(NULL, &valdbt, parent_messages[i]->u.id.val) == 0); }
assert(type == parent_messages[i]->type); int operator()(FT_MSG msg, bool is_fresh) {
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(parent_messages[i]->xids)); DBT keydbt;
assert(parent_messages_is_fresh[i] == is_fresh); DBT valdbt;
parent_messages_present[i]++; toku_fill_dbt(&keydbt, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
found++; toku_fill_dbt(&valdbt, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
} int found = 0;
} MSN msn = msg->msn;
for (i = 0; i < num_child_messages; ++i) { enum ft_msg_type type = ft_msg_get_type(msg);
if (dummy_cmp(NULL, &keydbt, child_messages[i]->u.id.key) == 0 && XIDS xids = ft_msg_get_xids(msg);
msn.msn == child_messages[i]->msn.msn) { for (int i = 0; i < num_parent_messages; ++i) {
assert(child_messages_present[i] == 0); if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 &&
assert(found == 0); msn.msn == parent_messages[i]->msn.msn) {
assert(dummy_cmp(NULL, &valdbt, child_messages[i]->u.id.val) == 0); assert(parent_messages_present[i] == 0);
assert(type == child_messages[i]->type); assert(found == 0);
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(child_messages[i]->xids)); assert(dummy_cmp(NULL, &valdbt, parent_messages[i]->u.id.val) == 0);
assert(child_messages_is_fresh[i] == is_fresh); assert(type == parent_messages[i]->type);
child_messages_present[i]++; assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(parent_messages[i]->xids));
found++; assert(parent_messages_is_fresh[i] == is_fresh);
} parent_messages_present[i]++;
} found++;
assert(found == 1); }
}); }
for (int i = 0; i < num_child_messages; ++i) {
if (dummy_cmp(NULL, &keydbt, child_messages[i]->u.id.key) == 0 &&
msn.msn == child_messages[i]->msn.msn) {
assert(child_messages_present[i] == 0);
assert(found == 0);
assert(dummy_cmp(NULL, &valdbt, child_messages[i]->u.id.val) == 0);
assert(type == child_messages[i]->type);
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(child_messages[i]->xids));
assert(child_messages_is_fresh[i] == is_fresh);
child_messages_present[i]++;
found++;
}
}
assert(found == 1);
return 0;
}
} checkit(num_parent_messages, parent_messages, parent_messages_present, parent_messages_is_fresh,
num_child_messages, child_messages, child_messages_present, child_messages_is_fresh);
child_bnc->msg_buffer.iterate(checkit);
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
assert(parent_messages_present[i] == 1); assert(parent_messages_present[i] == 1);
...@@ -525,41 +544,60 @@ flush_to_internal_multiple(FT_HANDLE t) { ...@@ -525,41 +544,60 @@ flush_to_internal_multiple(FT_HANDLE t) {
memset(child_messages_present, 0, sizeof child_messages_present); memset(child_messages_present, 0, sizeof child_messages_present);
for (int j = 0; j < 8; ++j) { for (int j = 0; j < 8; ++j) {
FIFO_ITERATE(child_bncs[j]->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, struct checkit_fn {
{ int num_parent_messages;
DBT keydbt; FT_MSG *parent_messages;
DBT valdbt; int *parent_messages_present;
toku_fill_dbt(&keydbt, key, keylen); bool *parent_messages_is_fresh;
toku_fill_dbt(&valdbt, val, vallen); int num_child_messages;
int found = 0; FT_MSG *child_messages;
for (i = 0; i < num_parent_messages; ++i) { int *child_messages_present;
if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 && bool *child_messages_is_fresh;
msn.msn == parent_messages[i]->msn.msn) { checkit_fn(int np, FT_MSG *pm, int *npp, bool *pmf, int nc, FT_MSG *cm, int *ncp, bool *cmf) :
assert(parent_messages_present[i] == 0); num_parent_messages(np), parent_messages(pm), parent_messages_present(npp), parent_messages_is_fresh(pmf),
assert(found == 0); num_child_messages(nc), child_messages(cm), child_messages_present(ncp), child_messages_is_fresh(cmf) {
assert(dummy_cmp(NULL, &valdbt, parent_messages[i]->u.id.val) == 0); }
assert(type == parent_messages[i]->type); int operator()(FT_MSG msg, bool is_fresh) {
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(parent_messages[i]->xids)); DBT keydbt;
assert(parent_messages_is_fresh[i] == is_fresh); DBT valdbt;
parent_messages_present[i]++; toku_fill_dbt(&keydbt, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
found++; toku_fill_dbt(&valdbt, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
} int found = 0;
} MSN msn = msg->msn;
for (i = 0; i < num_child_messages; ++i) { enum ft_msg_type type = ft_msg_get_type(msg);
if (dummy_cmp(NULL, &keydbt, child_messages[i]->u.id.key) == 0 && XIDS xids = ft_msg_get_xids(msg);
msn.msn == child_messages[i]->msn.msn) { for (int i = 0; i < num_parent_messages; ++i) {
assert(child_messages_present[i] == 0); if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 &&
assert(found == 0); msn.msn == parent_messages[i]->msn.msn) {
assert(dummy_cmp(NULL, &valdbt, child_messages[i]->u.id.val) == 0); assert(parent_messages_present[i] == 0);
assert(type == child_messages[i]->type); assert(found == 0);
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(child_messages[i]->xids)); assert(dummy_cmp(NULL, &valdbt, parent_messages[i]->u.id.val) == 0);
assert(child_messages_is_fresh[i] == is_fresh); assert(type == parent_messages[i]->type);
child_messages_present[i]++; assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(parent_messages[i]->xids));
found++; assert(parent_messages_is_fresh[i] == is_fresh);
} parent_messages_present[i]++;
} found++;
assert(found == 1); }
}); }
for (int i = 0; i < num_child_messages; ++i) {
if (dummy_cmp(NULL, &keydbt, child_messages[i]->u.id.key) == 0 &&
msn.msn == child_messages[i]->msn.msn) {
assert(child_messages_present[i] == 0);
assert(found == 0);
assert(dummy_cmp(NULL, &valdbt, child_messages[i]->u.id.val) == 0);
assert(type == child_messages[i]->type);
assert(xids_get_innermost_xid(xids) == xids_get_innermost_xid(child_messages[i]->xids));
assert(child_messages_is_fresh[i] == is_fresh);
child_messages_present[i]++;
found++;
}
}
assert(found == 1);
return 0;
}
} checkit(num_parent_messages, parent_messages, parent_messages_present, parent_messages_is_fresh,
num_child_messages, child_messages, child_messages_present, child_messages_is_fresh);
child_bncs[j]->msg_buffer.iterate(checkit);
} }
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
...@@ -721,11 +759,13 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) { ...@@ -721,11 +759,13 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
bool msgs_applied; bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied, -1); toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied, -1);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, struct checkit_fn {
{ int operator()(FT_MSG UU(msg), bool is_fresh) {
key = key; keylen = keylen; val = val; vallen = vallen; type = type; msn = msn; xids = xids; assert(!is_fresh);
assert(!is_fresh); return 0;
}); }
} checkit;
parent_bnc->msg_buffer.iterate(checkit);
invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size() invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size()
== (uint32_t) num_parent_messages); == (uint32_t) num_parent_messages);
...@@ -947,23 +987,33 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -947,23 +987,33 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
bool msgs_applied; bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied, -1); toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied, -1);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, struct checkit_fn {
{ DBT *childkeys;
val = val; vallen = vallen; type = type; msn = msn; xids = xids; int num_parent_messages;
DBT keydbt; FT_MSG *parent_messages;
toku_fill_dbt(&keydbt, key, keylen); bool *parent_messages_is_fresh;
if (dummy_cmp(NULL, &keydbt, &childkeys[7]) > 0) { checkit_fn(DBT *ck, int np, FT_MSG *pm, bool *pmf) :
for (i = 0; i < num_parent_messages; ++i) { childkeys(ck), num_parent_messages(np), parent_messages(pm), parent_messages_is_fresh(pmf) {
if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 && }
msn.msn == parent_messages[i]->msn.msn) { int operator()(FT_MSG msg, bool is_fresh) {
assert(is_fresh == parent_messages_is_fresh[i]); DBT keydbt;
break; toku_fill_dbt(&keydbt, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
} MSN msn = msg->msn;
} if (dummy_cmp(NULL, &keydbt, &childkeys[7]) > 0) {
} else { for (int i = 0; i < num_parent_messages; ++i) {
assert(!is_fresh); if (dummy_cmp(NULL, &keydbt, parent_messages[i]->u.id.key) == 0 &&
} msn.msn == parent_messages[i]->msn.msn) {
}); assert(is_fresh == parent_messages_is_fresh[i]);
break;
}
}
} else {
assert(!is_fresh);
}
return 0;
}
} checkit(childkeys, num_parent_messages, parent_messages, parent_messages_is_fresh);
parent_bnc->msg_buffer.iterate(checkit);
toku_ftnode_free(&parentnode); toku_ftnode_free(&parentnode);
...@@ -1134,11 +1184,13 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -1134,11 +1184,13 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
bool msgs_applied; bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied, -1); toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied, -1);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, struct checkit_fn {
{ int operator()(FT_MSG UU(msg), bool is_fresh) {
key = key; keylen = keylen; val = val; vallen = vallen; type = type; msn = msn; xids = xids; assert(!is_fresh);
assert(!is_fresh); return 0;
}); }
} checkit;
parent_bnc->msg_buffer.iterate(checkit);
invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size() invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size()
== (uint32_t) num_parent_messages); == (uint32_t) num_parent_messages);
......
...@@ -276,38 +276,47 @@ static void dump_node(int fd, BLOCKNUM blocknum, FT h) { ...@@ -276,38 +276,47 @@ static void dump_node(int fd, BLOCKNUM blocknum, FT h) {
printf(" buffer contains %u bytes (%d items)\n", n_bytes, n_entries); printf(" buffer contains %u bytes (%d items)\n", n_bytes, n_entries);
} }
if (do_dump_data) { if (do_dump_data) {
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, typ, msn, xids, UU(is_fresh), struct dump_data_fn {
{ int operator()(FT_MSG msg, bool UU(is_fresh)) {
printf(" msn=%" PRIu64 " (0x%" PRIx64 ") ", msn.msn, msn.msn); enum ft_msg_type type = (enum ft_msg_type) msg->type;
printf(" TYPE="); MSN msn = msg->msn;
switch ((enum ft_msg_type)typ) { XIDS xids = msg->xids;
case FT_NONE: printf("NONE"); goto ok; const void *key = ft_msg_get_key(msg);
case FT_INSERT: printf("INSERT"); goto ok; const void *data = ft_msg_get_val(msg);
case FT_INSERT_NO_OVERWRITE: printf("INSERT_NO_OVERWRITE"); goto ok; ITEMLEN keylen = ft_msg_get_keylen(msg);
case FT_DELETE_ANY: printf("DELETE_ANY"); goto ok; ITEMLEN datalen = ft_msg_get_vallen(msg);
case FT_ABORT_ANY: printf("ABORT_ANY"); goto ok; printf(" msn=%" PRIu64 " (0x%" PRIx64 ") ", msn.msn, msn.msn);
case FT_COMMIT_ANY: printf("COMMIT_ANY"); goto ok; printf(" TYPE=");
case FT_COMMIT_BROADCAST_ALL: printf("COMMIT_BROADCAST_ALL"); goto ok; switch (type) {
case FT_COMMIT_BROADCAST_TXN: printf("COMMIT_BROADCAST_TXN"); goto ok; case FT_NONE: printf("NONE"); goto ok;
case FT_ABORT_BROADCAST_TXN: printf("ABORT_BROADCAST_TXN"); goto ok; case FT_INSERT: printf("INSERT"); goto ok;
case FT_OPTIMIZE: printf("OPTIMIZE"); goto ok; case FT_INSERT_NO_OVERWRITE: printf("INSERT_NO_OVERWRITE"); goto ok;
case FT_OPTIMIZE_FOR_UPGRADE: printf("OPTIMIZE_FOR_UPGRADE"); goto ok; case FT_DELETE_ANY: printf("DELETE_ANY"); goto ok;
case FT_UPDATE: printf("UPDATE"); goto ok; case FT_ABORT_ANY: printf("ABORT_ANY"); goto ok;
case FT_UPDATE_BROADCAST_ALL: printf("UPDATE_BROADCAST_ALL"); goto ok; case FT_COMMIT_ANY: printf("COMMIT_ANY"); goto ok;
} case FT_COMMIT_BROADCAST_ALL: printf("COMMIT_BROADCAST_ALL"); goto ok;
printf("HUH?"); case FT_COMMIT_BROADCAST_TXN: printf("COMMIT_BROADCAST_TXN"); goto ok;
ok: case FT_ABORT_BROADCAST_TXN: printf("ABORT_BROADCAST_TXN"); goto ok;
printf(" xid="); case FT_OPTIMIZE: printf("OPTIMIZE"); goto ok;
xids_fprintf(stdout, xids); case FT_OPTIMIZE_FOR_UPGRADE: printf("OPTIMIZE_FOR_UPGRADE"); goto ok;
printf(" "); case FT_UPDATE: printf("UPDATE"); goto ok;
print_item(key, keylen); case FT_UPDATE_BROADCAST_ALL: printf("UPDATE_BROADCAST_ALL"); goto ok;
if (datalen>0) { }
printf(" "); printf("HUH?");
print_item(data, datalen); ok:
} printf(" xid=");
printf("\n"); xids_fprintf(stdout, xids);
} printf(" ");
); print_item(key, keylen);
if (datalen>0) {
printf(" ");
print_item(data, datalen);
}
printf("\n");
return 0;
}
} dump_fn;
bnc->msg_buffer.iterate(dump_fn);
} }
} else { } else {
printf(" n_bytes_in_buffer= %" PRIu64 "", BLB_DATA(n, i)->get_disk_size()); printf(" n_bytes_in_buffer= %" PRIu64 "", BLB_DATA(n, i)->get_disk_size());
......
...@@ -98,6 +98,8 @@ PATENT RIGHTS GRANT: ...@@ -98,6 +98,8 @@ PATENT RIGHTS GRANT:
// ids[num_xids - 1] is the innermost transaction. // ids[num_xids - 1] is the innermost transaction.
// Should only be accessed by accessor functions xids_xxx, not directly. // Should only be accessed by accessor functions xids_xxx, not directly.
#include <portability/toku_stdint.h>
// If the xids struct is unpacked, the compiler aligns the ids[] and we waste a lot of space // If the xids struct is unpacked, the compiler aligns the ids[] and we waste a lot of space
typedef struct __attribute__((__packed__)) xids_t { typedef struct __attribute__((__packed__)) xids_t {
uint8_t num_xids; // maximum value of MAX_TRANSACTION_RECORDS - 1 ... uint8_t num_xids; // maximum value of MAX_TRANSACTION_RECORDS - 1 ...
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment