Commit 078e510e authored by Sergei Golubchik's avatar Sergei Golubchik

Merge branch 'merge/merge-tokudb-5.6' into 10.0

parents 2e914acb e312e2e6
SET(TOKUDB_VERSION 5.6.31-77.0)
SET(TOKUDB_VERSION 5.6.32-78.1)
# PerconaFT only supports x86-64 and cmake-2.8.9+
IF(CMAKE_VERSION VERSION_LESS "2.8.9")
MESSAGE(STATUS "CMake 2.8.9 or higher is required by TokuDB")
......
......@@ -367,8 +367,8 @@ static void print_db_env_struct (void) {
"int (*checkpointing_get_period) (DB_ENV*, uint32_t*) /* Retrieve the delay between automatic checkpoints. 0 means disabled. */",
"int (*cleaner_set_period) (DB_ENV*, uint32_t) /* Change the delay between automatic cleaner attempts. 0 means disabled. */",
"int (*cleaner_get_period) (DB_ENV*, uint32_t*) /* Retrieve the delay between automatic cleaner attempts. 0 means disabled. */",
"int (*cleaner_set_iterations) (DB_ENV*, uint32_t) /* Change the number of attempts on each cleaner invokation. 0 means disabled. */",
"int (*cleaner_get_iterations) (DB_ENV*, uint32_t*) /* Retrieve the number of attempts on each cleaner invokation. 0 means disabled. */",
"int (*cleaner_set_iterations) (DB_ENV*, uint32_t) /* Change the number of attempts on each cleaner invocation. 0 means disabled. */",
"int (*cleaner_get_iterations) (DB_ENV*, uint32_t*) /* Retrieve the number of attempts on each cleaner invocation. 0 means disabled. */",
"int (*evictor_set_enable_partial_eviction) (DB_ENV*, bool) /* Enables or disabled partial eviction of nodes from cachetable. */",
"int (*evictor_get_enable_partial_eviction) (DB_ENV*, bool*) /* Retrieve the status of partial eviction of nodes from cachetable. */",
"int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */",
......
......@@ -103,6 +103,7 @@ set_cflags_if_supported(
-Wno-pointer-bool-conversion
-fno-rtti
-fno-exceptions
-Wno-error=nonnull-compare
)
## set_cflags_if_supported_named("-Weffc++" -Weffcpp)
......
......@@ -55,8 +55,8 @@ set(FT_SOURCES
msg_buffer
node
pivotkeys
serialize/rbtree_mhs
serialize/block_allocator
serialize/block_allocator_strategy
serialize/block_table
serialize/compress
serialize/ft_node-serialize
......
......@@ -496,7 +496,7 @@ handle_split_of_child(
// We never set the rightmost blocknum to be the root.
// Instead, we wait for the root to split and let promotion initialize the rightmost
// blocknum to be the first non-root leaf node on the right extreme to recieve an insert.
// blocknum to be the first non-root leaf node on the right extreme to receive an insert.
BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
if (childa->blocknum.b == rightmost_blocknum.b) {
......@@ -1470,7 +1470,7 @@ void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// It is possible after reading in the entire child,
// that we now know that the child is not reactive
// if so, we can unpin parent right now
// we wont be splitting/merging child
// we won't be splitting/merging child
// and we have already replaced the bnc
// for the root with a fresh one
enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
......
This diff is collapsed.
......@@ -73,30 +73,20 @@ static bool recount_rows_interrupt(void* extra, uint64_t deleted_rows) {
return rre->_cancelled =
rre->_progress_callback(rre->_keys, deleted_rows, rre->_progress_extra);
}
int toku_ft_recount_rows(
FT_HANDLE ft,
int (*progress_callback)(
uint64_t count,
uint64_t deleted,
void* progress_extra),
void* progress_extra) {
int toku_ft_recount_rows(FT_HANDLE ft,
int (*progress_callback)(uint64_t count,
uint64_t deleted,
void* progress_extra),
void* progress_extra) {
int ret = 0;
recount_rows_extra_t rre = {
progress_callback,
progress_extra,
0,
false
};
recount_rows_extra_t rre = {progress_callback, progress_extra, 0, false};
ft_cursor c;
ret = toku_ft_cursor_create(ft, &c, nullptr, C_READ_ANY, false, false);
if (ret) return ret;
if (ret)
return ret;
toku_ft_cursor_set_check_interrupt_cb(
&c,
recount_rows_interrupt,
&rre);
toku_ft_cursor_set_check_interrupt_cb(&c, recount_rows_interrupt, &rre);
ret = toku_ft_cursor_first(&c, recount_rows_found, &rre);
while (FT_LIKELY(ret == 0)) {
......@@ -108,6 +98,7 @@ int toku_ft_recount_rows(
if (rre._cancelled == false) {
// update ft count
toku_unsafe_set(&ft->ft->in_memory_logical_rows, rre._keys);
ft->ft->h->dirty = 1;
ret = 0;
}
......
......@@ -903,6 +903,9 @@ void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
// must be returned in toku_ft_stat64.
if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
if (ft->in_memory_logical_rows == (uint64_t)-1) {
toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), 1);
}
}
}
......
......@@ -301,7 +301,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
// For test purposes only. (In production, the rowset size is determined by negotiation with the cachetable for some memory. See #2613.)
uint64_t toku_ft_loader_get_rowset_budget_for_testing (void);
int toku_ft_loader_finish_extractor(FTLOADER bl);
......
......@@ -91,7 +91,7 @@ toku_ft_loader_set_size_factor(uint32_t factor) {
uint64_t
toku_ft_loader_get_rowset_budget_for_testing (void)
// For test purposes only. In production, the rowset size is determined by negotation with the cachetable for some memory. (See #2613).
// For test purposes only. In production, the rowset size is determined by negotiation with the cachetable for some memory. (See #2613).
{
return 16ULL*size_factor*1024ULL;
}
......
......@@ -373,52 +373,48 @@ find_bounds_within_message_tree(
}
}
/**
* For each message in the ancestor's buffer (determined by childnum) that
* is key-wise between lower_bound_exclusive and upper_bound_inclusive,
* apply the message to the basement node. We treat the bounds as minus
* or plus infinity respectively if they are NULL. Do not mark the node
* as dirty (preserve previous state of 'dirty' bit).
*/
// For each message in the ancestor's buffer (determined by childnum) that
// is key-wise between lower_bound_exclusive and upper_bound_inclusive,
// apply the message to the basement node. We treat the bounds as minus
// or plus infinity respectively if they are NULL. Do not mark the node
// as dirty (preserve previous state of 'dirty' bit).
static void bnc_apply_messages_to_basement_node(
FT_HANDLE t, // used for comparison function
BASEMENTNODE bn, // where to apply messages
FT_HANDLE t, // used for comparison function
BASEMENTNODE bn, // where to apply messages
FTNODE ancestor, // the ancestor node where we can find messages to apply
int childnum, // which child buffer of ancestor contains messages we want
const pivot_bounds &bounds, // contains pivot key bounds of this basement node
txn_gc_info* gc_info,
bool* msgs_applied) {
int childnum, // which child buffer of ancestor contains messages we want
const pivot_bounds &
bounds, // contains pivot key bounds of this basement node
txn_gc_info *gc_info,
bool *msgs_applied) {
int r;
NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
// Determine the offsets in the message trees between which we need to
// apply messages from this buffer
STAT64INFO_S stats_delta = {0,0};
STAT64INFO_S stats_delta = {0, 0};
uint64_t workdone_this_ancestor = 0;
int64_t logical_rows_delta = 0;
uint32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) {
find_bounds_within_message_tree(
t->ft->cmp,
bnc->stale_message_tree,
&bnc->msg_buffer,
bounds,
&stale_lbi,
&stale_ube);
find_bounds_within_message_tree(t->ft->cmp,
bnc->stale_message_tree,
&bnc->msg_buffer,
bounds,
&stale_lbi,
&stale_ube);
} else {
stale_lbi = 0;
stale_ube = 0;
}
uint32_t fresh_lbi, fresh_ube;
find_bounds_within_message_tree(
t->ft->cmp,
bnc->fresh_message_tree,
&bnc->msg_buffer,
bounds,
&fresh_lbi,
&fresh_ube);
find_bounds_within_message_tree(t->ft->cmp,
bnc->fresh_message_tree,
&bnc->msg_buffer,
bounds,
&fresh_lbi,
&fresh_ube);
// We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of
......@@ -432,44 +428,53 @@ static void bnc_apply_messages_to_basement_node(
// We have messages in multiple trees, so we grab all
// the relevant messages' offsets and sort them by MSN, then apply
// them in MSN order.
const int buffer_size = ((stale_ube - stale_lbi) +
(fresh_ube - fresh_lbi) +
bnc->broadcast_list.size());
const int buffer_size =
((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
bnc->broadcast_list.size());
toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
struct store_msg_buffer_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
.i = 0};
// Populate offsets array with offsets to stale messages
r = bnc->stale_message_tree.iterate_on_range<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(stale_lbi, stale_ube, &sfo_extra);
r = bnc->stale_message_tree
.iterate_on_range<struct store_msg_buffer_offset_extra,
store_msg_buffer_offset>(
stale_lbi, stale_ube, &sfo_extra);
assert_zero(r);
// Then store fresh offsets, and mark them to be moved to stale later.
r = bnc->fresh_message_tree.iterate_and_mark_range<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(fresh_lbi, fresh_ube, &sfo_extra);
r = bnc->fresh_message_tree
.iterate_and_mark_range<struct store_msg_buffer_offset_extra,
store_msg_buffer_offset>(
fresh_lbi, fresh_ube, &sfo_extra);
assert_zero(r);
// Store offsets of all broadcast messages.
r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra, store_msg_buffer_offset>(&sfo_extra);
r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
store_msg_buffer_offset>(&sfo_extra);
assert_zero(r);
invariant(sfo_extra.i == buffer_size);
// Sort by MSN.
toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->msg_buffer);
toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
mergesort_r(offsets, buffer_size, bnc->msg_buffer);
// Apply the messages in MSN order.
for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true;
do_bn_apply_msg(
t,
bn,
&bnc->msg_buffer,
offsets[i],
gc_info,
&workdone_this_ancestor,
&stats_delta,
&logical_rows_delta);
do_bn_apply_msg(t,
bn,
&bnc->msg_buffer,
offsets[i],
gc_info,
&workdone_this_ancestor,
&stats_delta,
&logical_rows_delta);
}
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
// No stale messages to apply, we just apply fresh messages, and mark
// them to be moved to stale later.
struct iterate_do_bn_apply_msg_extra iter_extra = {
.t = t,
.bn = bn,
......@@ -477,16 +482,20 @@ static void bnc_apply_messages_to_basement_node(
.gc_info = gc_info,
.workdone = &workdone_this_ancestor,
.stats_to_update = &stats_delta,
.logical_rows_delta = &logical_rows_delta
};
if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(fresh_lbi, fresh_ube, &iter_extra);
.logical_rows_delta = &logical_rows_delta};
if (fresh_ube - fresh_lbi > 0)
*msgs_applied = true;
r = bnc->fresh_message_tree
.iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
iterate_do_bn_apply_msg>(
fresh_lbi, fresh_ube, &iter_extra);
assert_zero(r);
} else {
invariant(fresh_lbi == fresh_ube);
// No fresh messages to apply, we just apply stale messages.
if (stale_ube - stale_lbi > 0) *msgs_applied = true;
if (stale_ube - stale_lbi > 0)
*msgs_applied = true;
struct iterate_do_bn_apply_msg_extra iter_extra = {
.t = t,
.bn = bn,
......@@ -494,22 +503,26 @@ static void bnc_apply_messages_to_basement_node(
.gc_info = gc_info,
.workdone = &workdone_this_ancestor,
.stats_to_update = &stats_delta,
.logical_rows_delta = &logical_rows_delta
};
.logical_rows_delta = &logical_rows_delta};
r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(stale_lbi, stale_ube, &iter_extra);
r = bnc->stale_message_tree
.iterate_on_range<struct iterate_do_bn_apply_msg_extra,
iterate_do_bn_apply_msg>(
stale_lbi, stale_ube, &iter_extra);
assert_zero(r);
}
//
// update stats
//
if (workdone_this_ancestor > 0) {
(void) toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum), workdone_this_ancestor);
(void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
workdone_this_ancestor);
}
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
}
toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
bn->logical_rows_delta += logical_rows_delta;
}
static void
......
......@@ -199,6 +199,7 @@ struct ftnode_leaf_basement_node {
MSN max_msn_applied; // max message sequence number applied
bool stale_ancestor_messages_applied;
STAT64INFO_S stat64_delta; // change in stat64 counters since basement was last written to disk
int64_t logical_rows_delta;
};
typedef struct ftnode_leaf_basement_node *BASEMENTNODE;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <algorithm>
#include <string.h>
#include "portability/toku_assert.h"
#include "ft/serialize/block_allocator_strategy.h"
static uint64_t _align(uint64_t value, uint64_t ba_alignment) {
return ((value + ba_alignment - 1) / ba_alignment) * ba_alignment;
}
static uint64_t _roundup_to_power_of_two(uint64_t value) {
uint64_t r = 4096;
while (r < value) {
r *= 2;
invariant(r > 0);
}
return r;
}
// First fit block allocation
static struct block_allocator::blockpair *
_first_fit(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment,
uint64_t max_padding) {
if (n_blocks == 1) {
// won't enter loop, can't underflow the direction < 0 case
return nullptr;
}
struct block_allocator::blockpair *bp = &blocks_array[0];
for (uint64_t n_spaces_to_check = n_blocks - 1; n_spaces_to_check > 0;
n_spaces_to_check--, bp++) {
// Consider the space after bp
uint64_t padded_alignment = max_padding != 0 ? _align(max_padding, alignment) : alignment;
uint64_t possible_offset = _align(bp->offset + bp->size, padded_alignment);
if (possible_offset + size <= bp[1].offset) { // bp[1] is always valid since bp < &blocks_array[n_blocks-1]
invariant(bp - blocks_array < (int64_t) n_blocks);
return bp;
}
}
return nullptr;
}
static struct block_allocator::blockpair *
_first_fit_bw(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment,
uint64_t max_padding, struct block_allocator::blockpair *blocks_array_limit) {
if (n_blocks == 1) {
// won't enter loop, can't underflow the direction < 0 case
return nullptr;
}
struct block_allocator::blockpair *bp = &blocks_array[-1];
for (uint64_t n_spaces_to_check = n_blocks - 1; n_spaces_to_check > 0;
n_spaces_to_check--, bp--) {
// Consider the space after bp
uint64_t padded_alignment = max_padding != 0 ? _align(max_padding, alignment) : alignment;
uint64_t possible_offset = _align(bp->offset + bp->size, padded_alignment);
if (&bp[1] < blocks_array_limit && possible_offset + size <= bp[1].offset) {
invariant(blocks_array - bp < (int64_t) n_blocks);
return bp;
}
}
return nullptr;
}
struct block_allocator::blockpair *
block_allocator_strategy::first_fit(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment) {
return _first_fit(blocks_array, n_blocks, size, alignment, 0);
}
// Best fit block allocation
struct block_allocator::blockpair *
block_allocator_strategy::best_fit(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment) {
struct block_allocator::blockpair *best_bp = nullptr;
uint64_t best_hole_size = 0;
for (uint64_t blocknum = 0; blocknum + 1 < n_blocks; blocknum++) {
// Consider the space after blocknum
struct block_allocator::blockpair *bp = &blocks_array[blocknum];
uint64_t possible_offset = _align(bp->offset + bp->size, alignment);
uint64_t possible_end_offset = possible_offset + size;
if (possible_end_offset <= bp[1].offset) {
// It fits here. Is it the best fit?
uint64_t hole_size = bp[1].offset - possible_end_offset;
if (best_bp == nullptr || hole_size < best_hole_size) {
best_hole_size = hole_size;
best_bp = bp;
}
}
}
return best_bp;
}
static uint64_t padded_fit_alignment = 4096;
// TODO: These compiler specific directives should be abstracted in a portability header
// portability/toku_compiler.h?
__attribute__((__constructor__))
static void determine_padded_fit_alignment_from_env(void) {
// TODO: Should be in portability as 'toku_os_getenv()?'
const char *s = getenv("TOKU_BA_PADDED_FIT_ALIGNMENT");
if (s != nullptr && strlen(s) > 0) {
const int64_t alignment = strtoll(s, nullptr, 10);
if (alignment <= 0) {
fprintf(stderr, "tokuft: error: block allocator padded fit alignment found in environment (%s), "
"but it's out of range (should be an integer > 0). defaulting to %" PRIu64 "\n",
s, padded_fit_alignment);
} else {
padded_fit_alignment = _roundup_to_power_of_two(alignment);
fprintf(stderr, "tokuft: setting block allocator padded fit alignment to %" PRIu64 "\n",
padded_fit_alignment);
}
}
}
// First fit into a block that is oversized by up to max_padding.
// The hope is that if we purposefully waste a bit of space at allocation
// time we'll be more likely to reuse this block later.
struct block_allocator::blockpair *
block_allocator_strategy::padded_fit(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment) {
return _first_fit(blocks_array, n_blocks, size, alignment, padded_fit_alignment);
}
static double hot_zone_threshold = 0.85;
// TODO: These compiler specific directives should be abstracted in a portability header
// portability/toku_compiler.h?
__attribute__((__constructor__))
static void determine_hot_zone_threshold_from_env(void) {
// TODO: Should be in portability as 'toku_os_getenv()?'
const char *s = getenv("TOKU_BA_HOT_ZONE_THRESHOLD");
if (s != nullptr && strlen(s) > 0) {
const double hot_zone = strtod(s, nullptr);
if (hot_zone < 1 || hot_zone > 99) {
fprintf(stderr, "tokuft: error: block allocator hot zone threshold found in environment (%s), "
"but it's out of range (should be an integer 1 through 99). defaulting to 85\n", s);
hot_zone_threshold = 85 / 100;
} else {
fprintf(stderr, "tokuft: setting block allocator hot zone threshold to %s\n", s);
hot_zone_threshold = hot_zone / 100;
}
}
}
struct block_allocator::blockpair *
block_allocator_strategy::heat_zone(struct block_allocator::blockpair *blocks_array,
uint64_t n_blocks, uint64_t size, uint64_t alignment,
uint64_t heat) {
if (heat > 0) {
struct block_allocator::blockpair *bp, *boundary_bp;
// Hot allocation. Find the beginning of the hot zone.
boundary_bp = &blocks_array[n_blocks - 1];
uint64_t highest_offset = _align(boundary_bp->offset + boundary_bp->size, alignment);
uint64_t hot_zone_offset = static_cast<uint64_t>(hot_zone_threshold * highest_offset);
boundary_bp = std::lower_bound(blocks_array, blocks_array + n_blocks, hot_zone_offset);
uint64_t blocks_in_zone = (blocks_array + n_blocks) - boundary_bp;
uint64_t blocks_outside_zone = boundary_bp - blocks_array;
invariant(blocks_in_zone + blocks_outside_zone == n_blocks);
if (blocks_in_zone > 0) {
// Find the first fit in the hot zone, going forward.
bp = _first_fit(boundary_bp, blocks_in_zone, size, alignment, 0);
if (bp != nullptr) {
return bp;
}
}
if (blocks_outside_zone > 0) {
// Find the first fit in the cold zone, going backwards.
bp = _first_fit_bw(boundary_bp, blocks_outside_zone, size, alignment, 0, &blocks_array[n_blocks]);
if (bp != nullptr) {
return bp;
}
}
} else {
// Cold allocations are simply first-fit from the beginning.
return _first_fit(blocks_array, n_blocks, size, alignment, 0);
}
return nullptr;
}
......@@ -235,7 +235,7 @@ void toku_decompress (Bytef *dest, uLongf destLen,
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
char windowBits = source[1];
int8_t windowBits = source[1];
int r = inflateInit2(&strm, windowBits);
lazy_assert(r == Z_OK);
strm.next_out = dest;
......
......@@ -99,13 +99,11 @@ void toku_ft_serialize_layer_init(void) {
num_cores = toku_os_get_number_active_processors();
int r = toku_thread_pool_create(&ft_pool, num_cores);
lazy_assert_zero(r);
block_allocator::maybe_initialize_trace();
toku_serialize_in_parallel = false;
}
void toku_ft_serialize_layer_destroy(void) {
toku_thread_pool_destroy(&ft_pool);
block_allocator::maybe_close_trace();
}
enum { FILE_CHANGE_INCREMENT = (16 << 20) };
......@@ -773,19 +771,23 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
return 0;
}
int
toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) {
int toku_serialize_ftnode_to(int fd,
BLOCKNUM blocknum,
FTNODE node,
FTNODE_DISK_DATA *ndd,
bool do_rebalancing,
FT ft,
bool for_checkpoint) {
size_t n_to_write;
size_t n_uncompressed_bytes;
char *compressed_buf = nullptr;
// because toku_serialize_ftnode_to is only called for
// because toku_serialize_ftnode_to is only called for
// in toku_ftnode_flush_callback, we pass false
// for in_parallel. The reasoning is that when we write
// nodes to disk via toku_ftnode_flush_callback, we
// nodes to disk via toku_ftnode_flush_callback, we
// assume that it is being done on a non-critical
// background thread (probably for checkpointing), and therefore
// background thread (probably for checkpointing), and therefore
// should not hog CPU,
//
// Should the above facts change, we may want to revisit
......@@ -802,32 +804,32 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
toku_unsafe_fetch(&toku_serialize_in_parallel),
&n_to_write,
&n_uncompressed_bytes,
&compressed_buf
);
&compressed_buf);
if (r != 0) {
return r;
}
// If the node has never been written, then write the whole buffer, including the zeros
invariant(blocknum.b>=0);
// If the node has never been written, then write the whole buffer,
// including the zeros
invariant(blocknum.b >= 0);
DISKOFF offset;
// Dirties the ft
ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
ft, fd, for_checkpoint,
// Allocations for nodes high in the tree are considered 'hot',
// as they are likely to move again in the next checkpoint.
node->height);
ft->blocktable.realloc_on_disk(
blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
tokutime_t t0 = toku_time_now();
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
tokutime_t t1 = toku_time_now();
tokutime_t io_time = t1 - t0;
toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
toku_ft_status_update_flush_reason(
node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
node->dirty = 0; // See #1957. Must set the node to be clean after
// serializing it so that it doesn't get written again on
// the next checkpoint or eviction.
return 0;
}
......@@ -994,6 +996,7 @@ BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) {
bn->seqinsert = orig_bn->seqinsert;
bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied;
bn->stat64_delta = orig_bn->stat64_delta;
bn->logical_rows_delta = orig_bn->logical_rows_delta;
bn->data_buffer.clone(&orig_bn->data_buffer);
return bn;
}
......@@ -1004,6 +1007,7 @@ BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
bn->seqinsert = 0;
bn->stale_ancestor_messages_applied = false;
bn->stat64_delta = ZEROSTATS;
bn->logical_rows_delta = 0;
bn->data_buffer.init_zero();
return bn;
}
......@@ -1897,7 +1901,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
/* out */ int *layout_version_p);
// This function upgrades a version 14 or 13 ftnode to the current
// verison. NOTE: This code assumes the first field of the rbuf has
// version. NOTE: This code assumes the first field of the rbuf has
// already been read from the buffer (namely the layout_version of the
// ftnode.)
static int
......@@ -2488,9 +2492,12 @@ toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIAL
serialized->blocknum = log->blocknum;
}
int
toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
FT ft, bool for_checkpoint) {
int toku_serialize_rollback_log_to(int fd,
ROLLBACK_LOG_NODE log,
SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
bool is_serialized,
FT ft,
bool for_checkpoint) {
size_t n_to_write;
char *compressed_buf;
struct serialized_rollback_log_node serialized_local;
......@@ -2511,21 +2518,21 @@ toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBA
serialized_log->n_sub_blocks,
serialized_log->sub_block,
ft->h->compression_method,
&n_to_write, &compressed_buf);
&n_to_write,
&compressed_buf);
// Dirties the ft
DISKOFF offset;
ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
ft, fd, for_checkpoint,
// We consider rollback log flushing the hottest possible allocation,
// since rollback logs are short-lived compared to FT nodes.
INT_MAX);
ft->blocktable.realloc_on_disk(
blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
toku_free(compressed_buf);
if (!is_serialized) {
toku_static_serialized_rollback_log_destroy(&serialized_local);
log->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
log->dirty = 0; // See #1957. Must set the node to be clean after
// serializing it so that it doesn't get written again
// on the next checkpoint or eviction.
}
return 0;
}
......@@ -2704,7 +2711,7 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s
}
static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
// This function exists solely to accomodate future changes in compression.
// This function exists solely to accommodate future changes in compression.
int r = 0;
if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) ||
(FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) ||
......
This diff is collapsed.
This diff is collapsed.
......@@ -45,7 +45,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
// #5978 is fixed. Here is what we do. We have four pairs with
// blocknums and fullhashes of 1,2,3,4. The cachetable has only
// two bucket mutexes, so 1 and 3 share a pair mutex, as do 2 and 4.
// We pin all four with expensive write locks. Then, on backgroud threads,
// We pin all four with expensive write locks. Then, on background threads,
// we call get_and_pin_nonblocking on 3, where the unlockers unpins 2, and
// we call get_and_pin_nonblocking on 4, where the unlockers unpins 1. Run this
// enough times, and we should see a deadlock before the fix, and no deadlock
......
......@@ -77,7 +77,7 @@ flush (
//
// test the following things for simple cloning:
// - verifies that after teh checkpoint ends, the PAIR is properly
// - verifies that after the checkpoint ends, the PAIR is properly
// dirty or clean based on the second unpin
//
static void
......
......@@ -164,17 +164,16 @@ static void test_read_what_was_written (void) {
int r;
const int NVALS=10000;
if (verbose) printf("test_read_what_was_written(): "); fflush(stdout);
if (verbose) {
printf("test_read_what_was_written(): "); fflush(stdout);
}
unlink(fname);
toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
r = toku_open_ft_handle(fname, 1, &ft, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun); assert(r==0);
r = toku_close_ft_handle_nolsn(ft, 0); assert(r==0);
toku_cachetable_close(&ct);
toku_cachetable_close(&ct);
/* Now see if we can read an empty tree in. */
toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
......@@ -189,8 +188,6 @@ static void test_read_what_was_written (void) {
r = toku_close_ft_handle_nolsn(ft, 0); assert(r==0);
toku_cachetable_close(&ct);
/* Now see if we can read it in and get the value. */
toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
r = toku_open_ft_handle(fname, 0, &ft, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun); assert(r==0);
......
......@@ -109,7 +109,9 @@ static int run_test(void)
r = pqueue_pop(pq, &node); assert(r==0);
if (verbose) printf("%d : %d\n", i, *(int*)(node->key->data));
if ( *(int*)(node->key->data) != i ) {
if (verbose) printf("FAIL\n"); return -1;
if (verbose)
printf("FAIL\n");
return -1;
}
}
pqueue_free(pq);
......
This diff is collapsed.
This diff is collapsed.
# commited insert
# committed insert
key k1
insert committed 0 v100
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment