Commit 5c81f889 authored by John Esmet's avatar John Esmet

FT-249 Move de/serialization code to the message buffer class itself,

further simplifying the logic that is contained in
ft/ft_node-serialize.cc
parent ebc1a7ea
......@@ -341,41 +341,26 @@ wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *co
return 0;
}
static void
serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
{
static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
wbuf_nocrc_char(wb, ch);
// serialize the message buffer, first the number of entries, then the elements
wbuf_nocrc_int(wb, toku_bnc_n_entries(bnc));
struct msg_serialize_fn {
struct wbuf *wb;
msg_serialize_fn(struct wbuf *w) : wb(w) { }
int operator()(FT_MSG msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type;
paranoid_invariant((int) type >= 0 && (int) type < 256);
wbuf_nocrc_char(wb, (unsigned char) type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, msg->msn);
wbuf_nocrc_xids(wb, ft_msg_get_xids(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
return 0;
}
} serialize_fn(wb);
bnc->msg_buffer.iterate(serialize_fn);
// serialize the message buffer
bnc->msg_buffer.serialize_to_wbuf(wb);
// serialize the message trees (num entries, offsets array):
// first, verify their contents are consistent with the message buffer
bnc_verify_message_trees(bnc);
// serialize the message trees (num entries, offsets array):
// fresh, stale, broadcast
// fresh
wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
// stale
wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
// broadcast
wbuf_nocrc_int(wb, bnc->broadcast_list.size());
bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
}
......@@ -875,67 +860,23 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
static void
deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, const toku::comparator &cmp) {
int n_in_this_buffer = rbuf_int(rbuf);
int32_t *fresh_offsets = nullptr, *stale_offsets = nullptr;
int32_t *broadcast_offsets = nullptr;
int nfresh = 0, nstale = 0;
int nbroadcast_offsets = 0;
int32_t nfresh = 0, nstale = 0, nbroadcast = 0;
int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets;
// Only sort buffers if we have a valid comparison function. In certain scenarios,
// like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
// for simple inspection and don't actually require that the message buffers are
// properly sorted. This is very ugly, but correct.
const bool sort_buffers = cmp.valid();
const bool sort = cmp.valid();
if (sort_buffers) {
XMALLOC_N(n_in_this_buffer, stale_offsets);
XMALLOC_N(n_in_this_buffer, fresh_offsets);
XMALLOC_N(n_in_this_buffer, broadcast_offsets);
}
// read in the message buffer
bnc->msg_buffer.deserialize_from_rbuf(rbuf,
sort ? &fresh_offsets : nullptr, &nfresh,
sort ? &stale_offsets : nullptr, &nstale,
sort ? &broadcast_offsets : nullptr, &nbroadcast);
bnc->msg_buffer.resize(rbuf->size + 64);
for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
// this is weird but it's necessary to pass icc and gcc together
unsigned char ctype = rbuf_char(rbuf);
enum ft_msg_type type = (enum ft_msg_type) ctype;
bool is_fresh = rbuf_char(rbuf);
MSN msn = rbuf_msn(rbuf);
XIDS xids;
xids_create_from_buffer(rbuf, &xids);
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen);
int32_t *dest = nullptr;
if (sort_buffers) {
if (ft_msg_type_applies_once(type)) {
if (is_fresh) {
dest = &fresh_offsets[nfresh];
nfresh++;
} else {
dest = &stale_offsets[nstale];
nstale++;
}
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) {
dest = &broadcast_offsets[nbroadcast_offsets];
nbroadcast_offsets++;
} else {
abort();
}
}
// TODO: Function to parse stuff out of an rbuf into an FT_MSG
DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
bnc->msg_buffer.enqueue(&msg, is_fresh, dest);
xids_destroy(&xids);
}
invariant(rbuf->ndone == rbuf->size);
if (sort_buffers) {
if (sort) {
int n_in_this_buffer = nfresh + nstale + nbroadcast;
struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer);
toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra);
bnc->fresh_message_tree.destroy();
......@@ -944,66 +885,44 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, const tok
bnc->stale_message_tree.destroy();
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer);
bnc->broadcast_list.destroy();
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, n_in_this_buffer);
}
}
// effect: deserialize a single message from rbuf and enqueue the result into the given message buffer
static void
msg_buffer_deserialize_msg_from_rbuf(message_buffer *msg_buffer, struct rbuf *rbuf) {
bytevec key, val;
ITEMLEN keylen, vallen;
enum ft_msg_type type = (enum ft_msg_type) rbuf_char(rbuf);
bool is_fresh = rbuf_char(rbuf);
MSN msn = rbuf_msn(rbuf);
XIDS xids;
xids_create_from_buffer(rbuf, &xids);
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen);
// TODO: Function to parse stuff out of an rbuf into an FT_MSG
DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
msg_buffer->enqueue(&msg, is_fresh, nullptr);
xids_destroy(&xids);
}
static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf) {
int n_in_this_buffer = rbuf_int(rbuf);
int nfresh = 0, nstale = 0, nbroadcast_offsets = 0;
int32_t *XMALLOC_N(n_in_this_buffer, stale_offsets);
int32_t *XMALLOC_N(n_in_this_buffer, fresh_offsets);
int32_t *XMALLOC_N(n_in_this_buffer, broadcast_offsets);
bnc->msg_buffer.resize(rbuf->size + 64);
for (int i = 0; i < n_in_this_buffer; i++) {
msg_buffer_deserialize_msg_from_rbuf(&bnc->msg_buffer, rbuf);
}
// read in the message buffer
bnc->msg_buffer.deserialize_from_rbuf(rbuf,
nullptr, nullptr, // fresh_offsets, nfresh,
nullptr, nullptr, // stale_offsets, nstale,
nullptr, nullptr); // broadcast_offsets, nbroadcast
// read in each message tree (fresh, stale, broadcast)
nfresh = rbuf_int(rbuf);
int32_t nfresh = rbuf_int(rbuf);
int32_t *XMALLOC_N(nfresh, fresh_offsets);
for (int i = 0; i < nfresh; i++) {
fresh_offsets[i] = rbuf_int(rbuf);
}
nstale = rbuf_int(rbuf);
int32_t nstale = rbuf_int(rbuf);
int32_t *XMALLOC_N(nstale, stale_offsets);
for (int i = 0; i < nstale; i++) {
stale_offsets[i] = rbuf_int(rbuf);
}
nbroadcast_offsets = rbuf_int(rbuf);
for (int i = 0; i < nbroadcast_offsets; i++) {
int32_t nbroadcast = rbuf_int(rbuf);
int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
for (int i = 0; i < nbroadcast; i++) {
broadcast_offsets[i] = rbuf_int(rbuf);
}
// build OMTs out of each offset array
bnc->fresh_message_tree.destroy();
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer);
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
bnc->stale_message_tree.destroy();
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer);
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
bnc->broadcast_list.destroy();
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
}
// dump a buffer to stderr
......@@ -1776,7 +1695,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
int32_t *fresh_offsets = nullptr;
int32_t *broadcast_offsets = nullptr;
int nfresh = 0;
int nbroadcast_offsets = 0;
int nbroadcast = 0;
// We skip 'stale' offsets for upgraded nodes.
if (sort_buffers) {
......@@ -1811,8 +1730,8 @@ deserialize_and_upgrade_internal_node(FTNODE node,
dest = &fresh_offsets[nfresh];
nfresh++;
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) {
dest = &broadcast_offsets[nbroadcast_offsets];
nbroadcast_offsets++;
dest = &broadcast_offsets[nbroadcast];
nbroadcast++;
} else {
abort();
}
......@@ -1838,7 +1757,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
bnc->fresh_message_tree.destroy();
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer);
bnc->broadcast_list.destroy();
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, n_in_this_buffer);
}
}
......
......@@ -110,7 +110,63 @@ void message_buffer::destroy() {
}
}
void message_buffer::resize(size_t new_size) {
void message_buffer::deserialize_from_rbuf(struct rbuf *rb,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **stale_offsets, int32_t *nstale,
int32_t **broadcast_offsets, int32_t *nbroadcast) {
// read the number of messages in this buffer
int n_in_this_buffer = rbuf_int(rb);
if (fresh_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *fresh_offsets);
}
if (stale_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *stale_offsets);
}
if (broadcast_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *broadcast_offsets);
}
_resize(rb->size + 64); // rb->size is a good hint for how big the buffer will be
// read in each message individually
for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
// this is weird but it's necessary to pass icc and gcc together
unsigned char ctype = rbuf_char(rb);
enum ft_msg_type type = (enum ft_msg_type) ctype;
bool is_fresh = rbuf_char(rb);
MSN msn = rbuf_msn(rb);
XIDS xids;
xids_create_from_buffer(rb, &xids);
rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rb, &val, &vallen);
int32_t *dest = nullptr;
if (ft_msg_type_applies_once(type)) {
if (is_fresh) {
dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
} else {
dest = stale_offsets ? *stale_offsets + (*nstale)++ : nullptr;
}
} else {
invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
}
// TODO: Function to parse stuff out of an rbuf into an FT_MSG
DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
enqueue(&msg, is_fresh, dest);
xids_destroy(&xids);
}
invariant(num_entries() == n_in_this_buffer);
}
void message_buffer::_resize(size_t new_size) {
XREALLOC_N(new_size, _memory);
_memory_size = new_size;
}
......@@ -134,7 +190,7 @@ void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) {
if (_memory == nullptr || need_space_total > _memory_size) {
// resize the buffer to the next power of 2 greater than the needed space
int next_2 = next_power_of_two(need_space_total);
resize(next_2);
_resize(next_2);
}
ITEMLEN keylen = ft_msg_get_keylen(msg);
ITEMLEN datalen = ft_msg_get_vallen(msg);
......@@ -212,6 +268,26 @@ bool message_buffer::equals(message_buffer *other) const {
memcmp(_memory, other->_memory, _memory_used) == 0);
}
void message_buffer::serialize_to_wbuf(struct wbuf *wb) const {
wbuf_nocrc_int(wb, num_entries());
struct msg_serialize_fn {
struct wbuf *wb;
msg_serialize_fn(struct wbuf *w) : wb(w) { }
int operator()(FT_MSG msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type;
paranoid_invariant((int) type >= 0 && (int) type < 256);
wbuf_nocrc_char(wb, (unsigned char) type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, msg->msn);
wbuf_nocrc_xids(wb, ft_msg_get_xids(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
return 0;
}
} serialize_fn(wb);
iterate(serialize_fn);
}
size_t message_buffer::msg_memsize_in_buffer(FT_MSG msg) {
const uint32_t keylen = ft_msg_get_keylen(msg);
const uint32_t datalen = ft_msg_get_vallen(msg);
......
......@@ -102,7 +102,14 @@ class message_buffer {
void destroy();
void resize(size_t new_size);
// effect: deserializes a message buffer from the given rbuf
// returns: *fresh_offsets (etc) malloc'd to be num_entries large and
// populated with *nfresh (etc) offsets in the message buffer
// requires: if fresh_offsets (etc) != nullptr, then nfresh != nullptr
void deserialize_from_rbuf(struct rbuf *rb,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **stale_offsets, int32_t *nstale,
int32_t **broadcast_offsets, int32_t *nbroadcast);
void enqueue(FT_MSG msg, bool is_fresh, int32_t *offset);
......@@ -139,9 +146,13 @@ class message_buffer {
bool equals(message_buffer *other) const;
void serialize_to_wbuf(struct wbuf *wb) const;
static size_t msg_memsize_in_buffer(FT_MSG msg);
private:
void _resize(size_t new_size);
// If this isn't packged, the compiler aligns the xids array and we waste a lot of space
struct __attribute__((__packed__)) buffer_entry {
unsigned int keylen;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment