/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

/*

Managing the tree shape:  How insertion, deletion, and querying work

When we insert a message into the FT_HANDLE, here's what happens.

to insert a message at the root

    - find the root node
    - capture the next msn of the root node and assign it to the message
    - split the root if it needs to be split
    - insert the message into the root buffer
    - if the root is too full, then flush_some_child() of the root on a flusher thread
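
a rough sketch of the steps above, in the style of the pseudocode below
(names here are illustrative, not the actual functions):

    insert_message_at_root (ft, msg) {
      node = find_root(ft)
      msg.msn = next_msn(node)                // capture and advance the root's msn
      if (root_needs_split(node))
        split_root(node)
      put_message_in_root_buffer(node, msg)
      if (node_is_gorged(node))
        enqueue_work_item(flush_some_child, node)   // runs on a flusher thread
    }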

flusher functions use an advice struct which provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h

to flush some child, given a parent and some advice
    - pick the child using advice->pick_child()
    - remove that child's buffer from the parent
    - flush the buffer to the child
    - if the child has stable reactivity and
      advice->should_recursively_flush() is true, then
      flush_some_child() of the child
    - otherwise split the child if it needs to be split
    - otherwise maybe merge the child if it needs to be merged
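
sketched as pseudocode (illustrative names, not the real API):

    flush_some_child (parent, advice) {
      childnum = advice->pick_child(parent)
      buffer = detach_child_buffer(parent, childnum)     // parent's buffer for that child
      apply_buffer_to_child(child(parent, childnum), buffer)
      re = get_node_reactivity(child)
      if (re == RE_STABLE && advice->should_recursively_flush(child))
        flush_some_child(child, advice)
      else if (re == RE_FISSIBLE)
        split_child(parent, childnum)
      else if (re == RE_FUSIBLE)
        maybe_merge_child(parent, childnum)
    }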

flusher threads:

    flusher threads are created on demand as the result of internal nodes
    becoming gorged by insertions. this allows flushing to be done somewhere
    other than the client thread. these work items are enqueued onto
    the cachetable kibbutz and are done in first-in, first-out order.

cleaner threads:

    the cleaner thread wakes up every so often (say, 1 second) and chooses
    a small number (say, 5) of nodes as candidates for a flush. the one
    with the largest cache pressure is chosen to be flushed. cache pressure
    is a function of the size of the node in the cachetable plus the work done.
    the cleaner thread need not actually do a flush when awoken, so only
    nodes that have sufficient cache pressure are flushed.
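
one cleaner iteration, sketched (the constants and names are illustrative;
the real values are tunable):

    cleaner_thread_iteration () {
      sleep for the cleaner period (say, 1 second)
      candidates = choose_some_nodes(5)
      victim = the candidate with the largest cache pressure
               // cache pressure = size in the cachetable plus workdone
      if (cache_pressure(victim) is sufficient)
        flush_some_child(victim, cleaner_advice)
      // otherwise do nothing this round
    }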

checkpointing:

    the checkpoint thread wakes up every minute to checkpoint dirty nodes
    to disk. at the time of this writing, nodes during checkpoint are
    locked and cannot be queried or flushed to. a design in which nodes
    are copied before checkpoint is being considered as a way to reduce
    the performance variability caused by a checkpoint locking too
    many nodes and preventing other threads from traversing down the tree,
    for a query or otherwise.

To shrink a file: Let X be the size of the reachable data.
    We define an acceptable bloat constant of C.  For example, we set C=2 if we are willing to allow the file to be as much as 2X in size.
    The goal is to find the smallest amount of stuff we can move to get the file down to size CX.
    That seems like a difficult problem, so we use the following heuristics:
       If we can relocate the last block to a lower location, then do so immediately.  (The file gets smaller right away, so even though the new location
         may not even be in the first CX bytes, we are making the file smaller.)
       Otherwise all of the earlier blocks are smaller than the last block (of size L).  So find the smallest region that has L free bytes in it.
         (This can be computed in one pass.)
         Move the first allocated block in that region to some location not in the interior of the region.
               (Outside of the region is OK, and reallocating the block at the edge of the region is OK.)
            This has the effect of creating a smaller region with at least L free bytes in it.
         Go back to the top (because by now some other block may have been allocated or freed).
    Claim: if there are no other allocations going on concurrently, then this algorithm will shrink the file reasonably efficiently.  By this I mean that
       each block of shrinkage does the smallest amount of work possible.  That doesn't mean that the work overall is minimized.
    Note: If there are other allocations and deallocations going on concurrently, we might never get enough space to move the last block.  But it takes a lot
      of allocations and deallocations to make that happen, and it's probably reasonable for the file not to shrink in this case.
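
the heuristic above, as a pseudocode loop (illustrative only):

    shrink_file (blocktable, C, X) {
      while (file_size > C*X) {
        L = size of the last block
        if (there is a free slot below the last block that fits L)
          relocate the last block there and truncate the file; continue
        region = smallest region containing L free bytes    // one pass over the block map
        move the first allocated block in that region to a location
          outside the region (or at its edge)
        // the region now has a smaller extent with at least L free bytes;
        // retry from the top, since concurrent allocations and frees
        // may have changed the picture
      }
    }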

To split or merge a child of a node:
Split_or_merge (node, childnum) {
  If the child needs to be split (it's a leaf with too much stuff or a nonleaf with too much fanout)
    fetch the node and the child into main memory.
    split the child, producing two nodes A and B, and also a pivot.  Don't worry if the resulting child is still too big or too small.  Fix it on the next pass.
    fixup node to point at the two new children.  Don't worry about the node getting too much fanout.
    return;
  If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4))
    fetch node, the child and a sibling of the child into main memory.
    move all messages from the node to the two children (so that the FIFOs are empty)
    If the two siblings together fit into one node then
      merge the two siblings.
      fixup the node to point at one child
    Otherwise
      load balance the content of the two nodes
    Don't worry about the resulting children having too many messages or otherwise being too big or too small.  Fix it on the next pass.
}

Here's how querying works:

lookups:
    - As of Dr. No, we don't do any tree shaping on lookup.
    - We don't promote eagerly or use aggressive promotion or passive-aggressive
    promotion.  We just push messages down according to the traditional FT_HANDLE
    algorithm on insertions.
    - when a node is brought into memory, we apply ancestor messages above it.

basement nodes, bulk fetch, and partial fetch:
    - leaf nodes are comprised of N basement nodes, each of nominal size. when
    a query hits a leaf node, it may require one or more basement nodes to be in memory.
    - for point queries, we do not read the entire node into memory. instead,
      we only read in the required basement node
    - for range queries, cursors may return cursor continue in their callback
      to take the shortcut path until the end of the basement node.
    - for range queries, cursors may prelock a range of keys (with or without a txn).
      the fractal tree will prefetch nodes aggressively until the end of the range.
    - without a prelocked range, range queries behave like successive point queries.
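
a point query, sketched at this level (illustrative names only):

    point_query (ft, key) {
      node = root of ft
      while (node.height > 0)
        node = fetch_child(node, which_child(node, key))  // pivots via partial fetch
      bn = basement_node_containing(node, key)
      fetch_and_decompress(bn)              // only this basement, not the whole leaf
      apply_ancestor_messages(node, bn)     // bring the basement up to date
      return lookup_in_basement(bn, key)
    }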

*/

#include "includes.h"
#include "checkpoint.h"
#include "mempool.h"
// Access to nested transaction logic
#include "ule.h"
#include "xids.h"
#include "sub_block.h"
#include "sort-tmpl.h"
#include <ft-cachetable-wrappers.h>
#include <ft-flusher.h>
#include <valgrind/helgrind.h>
#include "txn_manager.h"
#include "partitioned_counter.h"

#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif

static const uint32_t this_version = FT_LAYOUT_VERSION;

/* Status is intended for display to humans to help understand system behavior.
 * It does not need to be perfectly thread-safe.
 */
static FT_STATUS_S ft_status;

#define STATUS_INIT(k,t,l) {                            \
        ft_status.status[k].keyname = #k;              \
        ft_status.status[k].type    = t;               \
        ft_status.status[k].legend  = "brt: " l;       \
    }

static toku_mutex_t ft_open_close_lock;

static void
status_init(void)
{
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(FT_UPDATES,                                UINT64, "dictionary updates");
    STATUS_INIT(FT_UPDATES_BROADCAST,                      UINT64, "dictionary broadcast updates");
    STATUS_INIT(FT_DESCRIPTOR_SET,                         UINT64, "descriptor set");
    STATUS_INIT(FT_PARTIAL_EVICTIONS_NONLEAF,              UINT64, "nonleaf node partial evictions");
    STATUS_INIT(FT_PARTIAL_EVICTIONS_LEAF,                 UINT64, "leaf node partial evictions");
    STATUS_INIT(FT_MSN_DISCARDS,                           UINT64, "messages ignored by leaf due to msn");
    STATUS_INIT(FT_MAX_WORKDONE,                           UINT64, "max workdone over all buffers");
    STATUS_INIT(FT_TOTAL_RETRIES,                          UINT64, "total search retries due to TRY_AGAIN");
    STATUS_INIT(FT_MAX_SEARCH_EXCESS_RETRIES,              UINT64, "max excess search retries (retries - tree height) due to TRY_AGAIN");
    STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHT,                 UINT64, "searches requiring more tries than the height of the tree");
    STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHTPLUS3,            UINT64, "searches requiring more tries than the height of the tree plus three");
    STATUS_INIT(FT_DISK_FLUSH_LEAF,                        UINT64, "leaf nodes flushed to disk (not for checkpoint)");
    STATUS_INIT(FT_DISK_FLUSH_NONLEAF,                     UINT64, "nonleaf nodes flushed to disk (not for checkpoint)");
    STATUS_INIT(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT,         UINT64, "leaf nodes flushed to disk (for checkpoint)");
    STATUS_INIT(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT,      UINT64, "nonleaf nodes flushed to disk (for checkpoint)");
    STATUS_INIT(FT_CREATE_LEAF,                            UINT64, "leaf nodes created");
    STATUS_INIT(FT_CREATE_NONLEAF,                         UINT64, "nonleaf nodes created");
    STATUS_INIT(FT_DESTROY_LEAF,                           UINT64, "leaf nodes destroyed");
    STATUS_INIT(FT_DESTROY_NONLEAF,                        UINT64, "nonleaf nodes destroyed");
    STATUS_INIT(FT_MSG_BYTES_IN,                           UINT64, "bytes of messages injected at root (all trees)");
    STATUS_INIT(FT_MSG_BYTES_OUT,                          UINT64, "bytes of messages flushed from h1 nodes to leaves");
    STATUS_INIT(FT_MSG_BYTES_CURR,                         UINT64, "bytes of messages currently in trees (estimate)");
    STATUS_INIT(FT_MSG_BYTES_MAX,                          UINT64, "max bytes of messages ever in trees (estimate)");
    STATUS_INIT(FT_MSG_NUM,                                UINT64, "messages injected at root");
    STATUS_INIT(FT_MSG_NUM_BROADCAST,                      UINT64, "broadcast messages injected at root");
    STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL,      UINT64, "basements decompressed as a target of a query");
    STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE,  UINT64, "basements decompressed for prelocked range");
    STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH,    UINT64, "basements decompressed for prefetch");
    STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE,       UINT64, "basements decompressed for write");
    STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL,     UINT64, "buffers decompressed as a target of a query");
    STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, UINT64, "buffers decompressed for prelocked range");
    STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH,   UINT64, "buffers decompressed for prefetch");
    STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE,      UINT64, "buffers decompressed for write");
    STATUS_INIT(FT_NUM_PIVOTS_FETCHED_QUERY,               UINT64, "pivots fetched for query");
    STATUS_INIT(FT_NUM_PIVOTS_FETCHED_PREFETCH,            UINT64, "pivots fetched for prefetch");
    STATUS_INIT(FT_NUM_PIVOTS_FETCHED_WRITE,               UINT64, "pivots fetched for write");
    STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_NORMAL,           UINT64, "basements fetched as a target of a query");
    STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE,       UINT64, "basements fetched for prelocked range");
    STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_PREFETCH,         UINT64, "basements fetched for prefetch");
    STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_WRITE,            UINT64, "basements fetched for write");
    STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_NORMAL,          UINT64, "buffers fetched as a target of a query");
    STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE,      UINT64, "buffers fetched for prelocked range");
    STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH,        UINT64, "buffers fetched for prefetch");
    STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE,           UINT64, "buffers fetched for write");

    ft_status.initialized = true;
}
#undef STATUS_INIT

void
toku_ft_get_status(FT_STATUS s) {
    if (!ft_status.initialized) {
        status_init();
    }
    *s = ft_status;
}

#define STATUS_VALUE(x) ft_status.status[x].value.num

bool is_entire_node_in_memory(FTNODE node) {
    for (int i = 0; i < node->n_children; i++) {
        if(BP_STATE(node,i) != PT_AVAIL) {
            return false;
        }
    }
    return true;
}

void
toku_assert_entire_node_in_memory(FTNODE node) {
    assert(is_entire_node_in_memory(node));
}

static uint32_t
get_leaf_num_entries(FTNODE node) {
    uint32_t result = 0;
    int i;
    toku_assert_entire_node_in_memory(node);
    for ( i = 0; i < node->n_children; i++) {
        result += toku_omt_size(BLB_BUFFER(node, i));
    }
    return result;
}

static enum reactivity
get_leaf_reactivity (FTNODE node) {
    enum reactivity re = RE_STABLE;
    toku_assert_entire_node_in_memory(node);
    assert(node->height==0);
    unsigned int size = toku_serialize_ftnode_size(node);
    if (size > node->nodesize && get_leaf_num_entries(node) > 1) {
        re = RE_FISSIBLE;
    }
    else if ((size*4) < node->nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
        re = RE_FUSIBLE;
    }
    return re;
}

enum reactivity
get_nonleaf_reactivity (FTNODE node) {
    assert(node->height>0);
    int n_children = node->n_children;
    if (n_children > TREE_FANOUT) return RE_FISSIBLE;
    if (n_children*4 < TREE_FANOUT) return RE_FUSIBLE;
    return RE_STABLE;
}

enum reactivity
get_node_reactivity (FTNODE node) {
    toku_assert_entire_node_in_memory(node);
    if (node->height==0)
        return get_leaf_reactivity(node);
    else
        return get_nonleaf_reactivity(node);
}

unsigned int
toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc)
{
    return toku_fifo_buffer_size_in_use(bnc->buffer);
}

// return true if the size of the buffers plus the amount of work done is large enough.  (But return false if there is nothing to be flushed (the buffers are empty)).
bool
toku_ft_nonleaf_is_gorged (FTNODE node) {
    uint64_t size = toku_serialize_ftnode_size(node);

    bool buffers_are_empty = true;
    toku_assert_entire_node_in_memory(node);
    //
    // the nonleaf node is gorged if the following holds true:
    //  - the buffers are non-empty
    //  - the total workdone by the buffers PLUS the size of the buffers
    //     is greater than node->nodesize (which as of Maxwell should be
    //     4MB)
    //
    assert(node->height > 0);
    for (int child = 0; child < node->n_children; ++child) {
        size += BP_WORKDONE(node, child);
    }
    for (int child = 0; child < node->n_children; ++child) {
        if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
            buffers_are_empty = false;
            break;
        }
    }
    return ((size > node->nodesize)
            &&
            (!buffers_are_empty));
}

static void ft_verify_flags(FT ft, FTNODE node) {
    assert(ft->h->flags == node->flags);
}

int toku_ft_debug_mode = 0;

uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) {
    assert(node->height>0 && childnum<node->n_children);
    return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum));
}

// TODO: (Zardosht) look into this and possibly fix and use
static void __attribute__((__unused__))
ft_leaf_check_leaf_stats (FTNODE node)
{
    assert(node);
    assert(false);
    // static int count=0; count++;
    // if (node->height>0) return;
    // struct subtree_estimates e = calc_leaf_stats(node);
    // assert(e.ndata == node->u.l.leaf_stats.ndata);
    // assert(e.nkeys == node->u.l.leaf_stats.nkeys);
    // assert(e.dsize == node->u.l.leaf_stats.dsize);
    // assert(node->u.l.leaf_stats.exact);
}

int
toku_bnc_n_entries(NONLEAF_CHILDINFO bnc)
{
    return toku_fifo_n_entries(bnc->buffer);
}

static const DBT *prepivotkey (FTNODE node, int childnum, const DBT * const lower_bound_exclusive) {
    if (childnum==0)
        return lower_bound_exclusive;
    else {
        return &node->childkeys[childnum-1];
    }
}

static const DBT *postpivotkey (FTNODE node, int childnum, const DBT * const upper_bound_inclusive) {
    if (childnum+1 == node->n_children)
        return upper_bound_inclusive;
    else {
        return &node->childkeys[childnum];
    }
}

static struct pivot_bounds next_pivot_keys (FTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
    struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
                              .upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
    return pb;
}

// how much memory does this child buffer consume?
long
toku_bnc_memory_size(NONLEAF_CHILDINFO bnc)
{
    return (sizeof(*bnc) +
            toku_fifo_memory_footprint(bnc->buffer) +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

// how much memory in this child buffer holds useful data?
// originally created solely for use by test program(s).
long
toku_bnc_memory_used(NONLEAF_CHILDINFO bnc)
{
    return (sizeof(*bnc) +
            toku_fifo_memory_size_in_use(bnc->buffer) +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

static long
get_avail_internal_node_partition_size(FTNODE node, int i)
{
    assert(node->height > 0);
    return toku_bnc_memory_size(BNC(node, i));
}

static long
ftnode_cachepressure_size(FTNODE node)
{
    long retval = 0;
    bool totally_empty = true;
    if (node->height == 0) {
        goto exit;
    }
    else {
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) {
                continue;
            }
            else if (BP_STATE(node,i) == PT_COMPRESSED) {
                SUB_BLOCK sb = BSB(node, i);
                totally_empty = false;
                retval += sb->compressed_size;
            }
            else if (BP_STATE(node,i) == PT_AVAIL) {
                totally_empty = totally_empty && (toku_bnc_n_entries(BNC(node, i)) == 0);
                retval += get_avail_internal_node_partition_size(node, i);
                retval += BP_WORKDONE(node, i);
            }
            else {
                assert(false);
            }
        }
    }
exit:
    if (totally_empty) {
        return 0;
    }
    return retval;
}

long
ftnode_memory_size (FTNODE node)
// Effect: Estimate how much main memory a node requires.
{
    long retval = 0;
    int n_children = node->n_children;
    retval += sizeof(*node);
    retval += (n_children)*(sizeof(node->bp[0]));
    retval += node->totalchildkeylens;

    // now calculate the sizes of the partitions
    for (int i = 0; i < n_children; i++) {
        if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) {
            continue;
        }
        else if (BP_STATE(node,i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node, i);
            retval += sizeof(*sb);
            retval += sb->compressed_size;
        }
        else if (BP_STATE(node,i) == PT_AVAIL) {
            if (node->height > 0) {
                retval += get_avail_internal_node_partition_size(node, i);
            }
            else {
                BASEMENTNODE bn = BLB(node, i);
                retval += sizeof(*bn);
                {
                    // include fragmentation overhead but do not include space in the
                    // mempool that has not yet been allocated for leaf entries
                    size_t poolsize = toku_mempool_footprint(&bn->buffer_mempool);
                    invariant (poolsize >= BLB_NBYTESINBUF(node,i));
                    retval += poolsize;
                }
                OMT curr_omt = BLB_BUFFER(node, i);
                retval += (toku_omt_memory_size(curr_omt));
            }
        }
        else {
            assert(false);
        }
    }
    return retval;
}

PAIR_ATTR make_ftnode_pair_attr(FTNODE node) {
    long size = ftnode_memory_size(node);
    long cachepressure_size = ftnode_cachepressure_size(node);
    PAIR_ATTR result={
        .size = size,
        .nonleaf_size = (node->height > 0) ? size : 0,
        .leaf_size = (node->height > 0) ? 0 : size,
        .rollback_size = 0,
        .cache_pressure_size = cachepressure_size,
        .is_valid = true
    };
    return result;
}

PAIR_ATTR make_invalid_pair_attr(void) {
    PAIR_ATTR result={
        .size = 0,
        .nonleaf_size = 0,
        .leaf_size = 0,
        .rollback_size = 0,
        .cache_pressure_size = 0,
        .is_valid = false
    };
    return result;
}


// assign unique dictionary id
static uint64_t dict_id_serial = 1;
static DICTIONARY_ID
next_dict_id(void) {
    uint64_t i = __sync_fetch_and_add(&dict_id_serial, 1);
    assert(i);        // guarantee unique dictionary id by asserting 64-bit counter never wraps
    DICTIONARY_ID d = {.dictid = i};
    return d;
}

//
// Given a bfe and a childnum, returns whether the query that constructed the bfe
// wants the child available.
// Requires: bfe->child_to_read to have been set
//
bool
toku_bfe_wants_child_available (struct ftnode_fetch_extra* bfe, int childnum)
{
    if (bfe->type == ftnode_fetch_all ||
        (bfe->type == ftnode_fetch_subset && bfe->child_to_read == childnum))
    {
        return true;
    }
    else {
        return false;
    }
}

int
toku_bfe_leftmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node)
{
    lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
    if (bfe->left_is_neg_infty) {
        return 0;
    } else if (bfe->range_lock_left_key == NULL) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, bfe->range_lock_left_key, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
    }
}

int
toku_bfe_rightmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node)
{
    lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
    if (bfe->right_is_pos_infty) {
        return node->n_children - 1;
    } else if (bfe->range_lock_right_key == NULL) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, bfe->range_lock_right_key, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
    }
}

static int
ft_cursor_rightmost_child_wanted(FT_CURSOR cursor, FT_HANDLE brt, FTNODE node)
{
    if (cursor->right_is_pos_infty) {
        return node->n_children - 1;
    } else if (cursor->range_lock_right_key.data == NULL) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, &cursor->range_lock_right_key, &brt->ft->cmp_descriptor, brt->ft->compare_fun);
    }
}
571
STAT64INFO_S
572
toku_get_and_clear_basement_stats(FTNODE leafnode) {
573 574 575
    invariant(leafnode->height == 0);
    STAT64INFO_S deltas = ZEROSTATS;
    for (int i = 0; i < leafnode->n_children; i++) {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
576 577 578 579 580
        BASEMENTNODE bn = BLB(leafnode, i);
        invariant(BP_STATE(leafnode,i) == PT_AVAIL);
        deltas.numrows  += bn->stat64_delta.numrows;
        deltas.numbytes += bn->stat64_delta.numbytes;
        bn->stat64_delta = ZEROSTATS;
581 582 583 584
    }
    return deltas;
}

Yoni Fogel's avatar
Yoni Fogel committed
585
static void ft_status_update_flush_reason(FTNODE node, bool for_checkpoint) {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
586 587
    if (node->height == 0) {
        if (for_checkpoint) {
588
            __sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT), 1);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
589 590
        }
        else {
591
            __sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_LEAF), 1);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
592 593 594 595
        }
    }
    else {
        if (for_checkpoint) {
596
            __sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT), 1);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
597 598
        }
        else {
599
            __sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_NONLEAF), 1);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
600 601 602 603
        }
    }
}

static void ftnode_update_disk_stats(
    FTNODE ftnode,
    FT ft,
    bool for_checkpoint
    )
{
    STAT64INFO_S deltas = ZEROSTATS;
    // capture deltas before rebalancing basements for serialization
    deltas = toku_get_and_clear_basement_stats(ftnode);
    // locking not necessary here with respect to checkpointing
    // in Clayface (because of the pending lock and cachetable lock
    // in toku_cachetable_begin_checkpoint)
    // essentially, if we are dealing with a for_checkpoint
    // parameter in a function that is called by the flush_callback,
    // then the cachetable needs to ensure that this is called in a safe
    // manner that does not interfere with the beginning
    // of a checkpoint, which it does with the cachetable lock
    // and pending lock
    toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
    if (for_checkpoint) {
        toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
    }
}

static void ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
    for (int i = 0; i < node->n_children; i++) {
        BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
        assert(BP_STATE(node,i) == PT_AVAIL);
        BP_STATE(cloned_node,i) = PT_AVAIL;
        BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
        if (node->height == 0) {
            set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
        }
        else {
            set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
        }
    }
}

643
void toku_ftnode_clone_callback(
644 645 646
    void* value_data,
    void** cloned_value_data,
    PAIR_ATTR* new_attr,
Yoni Fogel's avatar
Yoni Fogel committed
647
    bool for_checkpoint,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
648 649 650
    void* write_extraargs
    )
{
651
    FTNODE node = (FTNODE) value_data;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
652
    toku_assert_entire_node_in_memory(node);
653
    FT ft = (FT) write_extraargs;
654 655
    FTNODE XMALLOC(cloned_node);
    //FTNODE cloned_node = (FTNODE)toku_xmalloc(sizeof(*FTNODE));
Zardosht Kasheff's avatar
Zardosht Kasheff committed
656 657
    memset(cloned_node, 0, sizeof(*cloned_node));
    if (node->height == 0) {
658
        // set header stats, must be done before rebalancing
659
        ftnode_update_disk_stats(node, ft, for_checkpoint);
660
        // rebalance the leaf node
661
        rebalance_ftnode_leaf(node, ft->h->basementnodesize);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681
    }

    cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk;
    cloned_node->nodesize = node->nodesize;
    cloned_node->flags = node->flags;
    cloned_node->thisnodename = node->thisnodename;
    cloned_node->layout_version = node->layout_version;
    cloned_node->layout_version_original = node->layout_version_original;
    cloned_node->layout_version_read_from_disk = node->layout_version_read_from_disk;
    cloned_node->build_id = node->build_id;
    cloned_node->height = node->height;
    cloned_node->dirty = node->dirty;
    cloned_node->fullhash = node->fullhash;
    cloned_node->n_children = node->n_children;
    cloned_node->totalchildkeylens = node->totalchildkeylens;

    XMALLOC_N(node->n_children-1, cloned_node->childkeys);
    XMALLOC_N(node->n_children, cloned_node->bp);
    // clone pivots
    for (int i = 0; i < node->n_children-1; i++) {
        toku_clone_dbt(&cloned_node->childkeys[i], node->childkeys[i]);
    }
    // clone partition
    ftnode_clone_partitions(node, cloned_node);

    // clear dirty bit
    node->dirty = 0;
    cloned_node->dirty = 0;
    node->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    // set new pair attr if necessary
    if (node->height == 0) {
        *new_attr = make_ftnode_pair_attr(node);
    }
    else {
        new_attr->is_valid = false;
    }
    *cloned_value_data = cloned_node;
}


void toku_ftnode_flush_callback (
    CACHEFILE UU(cachefile),
    int fd,
    BLOCKNUM nodename,
    void *ftnode_v,
    void** disk_data,
    void *extraargs,
    PAIR_ATTR size __attribute__((unused)),
    PAIR_ATTR* new_size,
    bool write_me,
    bool keep_me,
    bool for_checkpoint,
    bool is_clone
    )
{
    FT h = (FT) extraargs;
    FTNODE ftnode = (FTNODE) ftnode_v;
    FTNODE_DISK_DATA* ndd = (FTNODE_DISK_DATA*)disk_data;
    assert(ftnode->thisnodename.b==nodename.b);
    int height = ftnode->height;
    if (write_me) {
        if (height == 0 && !is_clone) {
            ftnode_update_disk_stats(ftnode, h, for_checkpoint);
        }
        if (!h->panic) { // if the brt panicked, stop writing, otherwise try to write it.
            toku_assert_entire_node_in_memory(ftnode);
            int r = toku_serialize_ftnode_to(fd, ftnode->thisnodename, ftnode, ndd, !is_clone, h, for_checkpoint);
            assert_zero(r);
            ftnode->layout_version_read_from_disk = FT_LAYOUT_VERSION;
        }
        ft_status_update_flush_reason(ftnode, for_checkpoint);
    }
    if (!keep_me) {
        if (!is_clone) {
            toku_free(*disk_data);
        }
        else {
            if (ftnode->height == 0) {
                for (int i = 0; i < ftnode->n_children; i++) {
                    if (BP_STATE(ftnode,i) == PT_AVAIL) {
                        BASEMENTNODE bn = BLB(ftnode, i);
                        toku_ft_decrease_stats(&h->in_memory_stats, bn->stat64_delta);
                    }
                }
            }
        }
        toku_ftnode_free(&ftnode);
    }
    else {
        *new_size = make_ftnode_pair_attr(ftnode);
    }
}

void
toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
{
    if (bfe->type == ftnode_fetch_prefetch) {
        STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_PREFETCH)++;
    } else if (bfe->type == ftnode_fetch_all) {
        STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_WRITE)++;
    } else if (bfe->type == ftnode_fetch_subset) {
        STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_QUERY)++;
    }
}

int toku_ftnode_fetch_callback (CACHEFILE UU(cachefile), PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash,
                                 void **ftnode_pv,  void** disk_data, PAIR_ATTR *sizep, int *dirtyp, void *extraargs) {
    assert(extraargs);
    assert(*ftnode_pv == NULL);
    FTNODE_DISK_DATA* ndd = (FTNODE_DISK_DATA*)disk_data;
    struct ftnode_fetch_extra *bfe = (struct ftnode_fetch_extra *)extraargs;
    FTNODE *node=(FTNODE*)ftnode_pv;
    // deserialize the node, must pass the bfe in because we cannot
    // evaluate what piece of the node is necessary until we get it at
    // least partially into memory
    int r = toku_deserialize_ftnode_from(fd, nodename, fullhash, node, ndd, bfe);
    if (r != 0) {
        if (r == TOKUDB_BAD_CHECKSUM) {
            fprintf(stderr,
                    "Checksum failure while reading node in file %s.\n",
                    toku_cachefile_fname_in_env(cachefile));
        } else {
            fprintf(stderr, "Error deserializing node, errno = %d\n", r);
        }
        // make absolutely sure we crash before doing anything else.
        assert(false);
    }

    if (r == 0) {
        *sizep = make_ftnode_pair_attr(*node);
        (*node)->ct_pair = p;
        *dirtyp = (*node)->dirty;  // deserialize could mark the node as dirty (presumably for upgrade)
    }
    return r;
}

void toku_ftnode_pe_est_callback(
    void* ftnode_pv,
    void* disk_data,
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    assert(ftnode_pv != NULL);
    long bytes_to_free = 0;
    FTNODE node = (FTNODE)ftnode_pv;
    if (node->dirty || node->height == 0 ||
        node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
        *bytes_freed_estimate = 0;
        *cost = PE_CHEAP;
        goto exit;
    }

    //
    // we are dealing with a clean internal node
    //
    *cost = PE_EXPENSIVE;
    // now lets get an estimate for how much data we can free up
    // we estimate the compressed size of data to be how large
    // the compressed data is on disk
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL && BP_SHOULD_EVICT(node,i)) {
            // calculate how much data would be freed if
            // we compress this node and add it to
            // bytes_to_free

            // first get an estimate for how much space will be taken
            // after compression, it is simply the size of compressed
            // data on disk plus the size of the struct that holds it
            FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
            uint32_t compressed_data_size = BP_SIZE(ndd, i);
            compressed_data_size += sizeof(struct sub_block);

            // now get the space taken now
            uint32_t decompressed_data_size = get_avail_internal_node_partition_size(node,i);
            bytes_to_free += (decompressed_data_size - compressed_data_size);
        }
    }

    *bytes_freed_estimate = bytes_to_free;
exit:
    return;
}
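// A minimal standalone sketch of the estimate above, using hypothetical
// stand-in types (partition_size, estimate_bytes_freed are illustrative, not
// part of this codebase): partial eviction of a clean internal node replaces
// each evictable partition with its compressed form, so the freed bytes are
// the in-memory (decompressed) size minus the compressed on-disk size.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-partition sizes, standing in for BP_SIZE(ndd, i)
// and get_avail_internal_node_partition_size(node, i).
struct partition_size {
    std::size_t decompressed;  // bytes currently held in memory
    std::size_t compressed;    // bytes the compressed copy occupies on disk
    bool evictable;            // PT_AVAIL and the clock says evict
};

// Mirrors the loop in toku_ftnode_pe_est_callback: sum the size
// difference over every partition that would be compressed away.
static long estimate_bytes_freed(const std::vector<partition_size>& parts) {
    long bytes_to_free = 0;
    for (const partition_size& p : parts) {
        if (p.evictable) {
            bytes_to_free += static_cast<long>(p.decompressed - p.compressed);
        }
    }
    return bytes_to_free;
}
```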

static void
compress_internal_node_partition(FTNODE node, int i, enum toku_compression_method compression_method)
{
    // if we should evict, compress the
    // message buffer into a sub_block
    assert(BP_STATE(node, i) == PT_AVAIL);
    assert(node->height > 0);
    SUB_BLOCK XMALLOC(sb);
    sub_block_init(sb);
    toku_create_compressed_partition_from_available(node, i, compression_method, sb);

    // now free the old partition and replace it with this
    destroy_nonleaf_childinfo(BNC(node,i));
    set_BSB(node, i, sb);
    BP_STATE(node,i) = PT_COMPRESSED;
}

void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) {
    // free the basement node
    assert(!node->dirty);
    BASEMENTNODE bn = BLB(node, childnum);
    toku_ft_decrease_stats(&h->in_memory_stats, bn->stat64_delta);
    struct mempool * mp = &bn->buffer_mempool;
    toku_mempool_destroy(mp);
    destroy_basement_node(bn);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
}

// callback for partially evicting a node
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* new_attr, void* extraargs) {
    FTNODE node = (FTNODE)ftnode_pv;
    FT ft = (FT) extraargs;
    // Don't partially evict dirty nodes
    if (node->dirty) {
        goto exit;
    }
    // Don't partially evict nodes whose partitions can't be read back
    // from disk individually
    if (node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
        goto exit;
    }
    //
    // partial eviction for nonleaf nodes
    //
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node,i) == PT_AVAIL) {
                if (BP_SHOULD_EVICT(node,i)) {
                    STATUS_VALUE(FT_PARTIAL_EVICTIONS_NONLEAF)++;
                    cilk_spawn compress_internal_node_partition(node, i, ft->h->compression_method);
                }
                else {
                    BP_SWEEP_CLOCK(node,i);
                }
            }
            else {
                continue;
            }
        }
        cilk_sync;
    }
    //
    // partial eviction strategy for basement nodes:
    //  if the bn is compressed, evict it
    //  else: check if it requires eviction, if it does, evict it, if not, sweep the clock count
    //
    else {
        for (int i = 0; i < node->n_children; i++) {
            // Get rid of compressed stuff no matter what.
            if (BP_STATE(node,i) == PT_COMPRESSED) {
                STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++;
                SUB_BLOCK sb = BSB(node, i);
                toku_free(sb->compressed_ptr);
                toku_free(sb);
                set_BNULL(node, i);
                BP_STATE(node,i) = PT_ON_DISK;
            }
            else if (BP_STATE(node,i) == PT_AVAIL) {
                if (BP_SHOULD_EVICT(node,i)) {
                    STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++;
                    toku_evict_bn_from_memory(node, i, ft);
                }
                else {
                    BP_SWEEP_CLOCK(node,i);
                }
            }
            else if (BP_STATE(node,i) == PT_ON_DISK) {
                continue;
            }
            else {
                assert(false);
            }
        }
    }

exit:
    *new_attr = make_ftnode_pair_attr(node);
    return 0;
}
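// The basement-node eviction strategy described in the comment above
// (always evict compressed partitions; evict available ones only when the
// clock says they are cold, otherwise sweep the clock) can be sketched as a
// pure decision function. The enums and leaf_partition_action below are
// hypothetical stand-ins, not part of this codebase.

```cpp
#include <cassert>

// Stand-ins for the real partition states (BP_STATE values).
enum pt_state { PT_ON_DISK, PT_COMPRESSED, PT_AVAIL };

// What the partial-eviction pass does with one leaf partition.
enum pe_action { EVICT, SWEEP_CLOCK, SKIP };

// Mirrors the leaf branch of toku_ftnode_pe_callback: compressed
// partitions are freed unconditionally; available ones are evicted only
// when the clock count marks them cold, otherwise the clock is swept;
// on-disk partitions hold no memory, so there is nothing to do.
static pe_action leaf_partition_action(pt_state state, bool should_evict) {
    if (state == PT_COMPRESSED) return EVICT;
    if (state == PT_AVAIL) return should_evict ? EVICT : SWEEP_CLOCK;
    return SKIP;  // PT_ON_DISK
}
```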


// Callback that states if a partial fetch of the node is necessary
// Currently, this function is responsible for the following things:
//  - reporting to the cachetable whether a partial fetch is required (as required by the contract of the callback)
//  - A couple of things that are NOT required by the callback, but we do for efficiency and simplicity reasons:
//   - for queries, set the value of bfe->child_to_read so that the query that called this can proceed with the query
//      as opposed to having to evaluate toku_ft_search_which_child again. This is done to make the in-memory query faster
//   - touch the necessary partition's clock. The reason we do it here is so that there is one central place it is done, and not done
//      by all the various callers
//
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
    // placeholder for now
    bool retval = false;
    FTNODE node = (FTNODE) ftnode_pv;
    struct ftnode_fetch_extra *bfe = (struct ftnode_fetch_extra *) read_extraargs;
    //
    // The three types of fetches that the brt layer may request are:
    //  - ftnode_fetch_none: no partitions are necessary (example use: stat64)
    //  - ftnode_fetch_subset: some subset is necessary (example use: toku_ft_search)
    //  - ftnode_fetch_all: entire node is necessary (example use: flush, split, merge)
    // The code below checks if the necessary partitions are already in memory,
    // and if they are, return false, and if not, return true
    //
    if (bfe->type == ftnode_fetch_none) {
        retval = false;
    }
    else if (bfe->type == ftnode_fetch_all) {
        retval = false;
        for (int i = 0; i < node->n_children; i++) {
            BP_TOUCH_CLOCK(node,i);
            // if we find a partition that is not available,
            // then a partial fetch is required because
            // the entire node must be made available
            if (BP_STATE(node,i) != PT_AVAIL) {
                retval = true;
            }
        }
    }
    else if (bfe->type == ftnode_fetch_subset) {
        // we do not take into account prefetching yet
        // as of now, if we need a subset, the only thing
        // we can possibly require is a single basement node
        // we find out what basement node the query cares about
        // and check if it is available
        assert(bfe->h->compare_fun);
        assert(bfe->search);
        bfe->child_to_read = toku_ft_search_which_child(
            &bfe->h->cmp_descriptor,
            bfe->h->compare_fun,
            node,
            bfe->search
            );
        BP_TOUCH_CLOCK(node,bfe->child_to_read);
        // child we want to read is not available, must set retval to true
        retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
    }
    else if (bfe->type == ftnode_fetch_prefetch) {
        // makes no sense to have prefetching disabled
        // and still call this function
        assert(!bfe->disable_prefetching);
        int lc = toku_bfe_leftmost_child_wanted(bfe, node);
        int rc = toku_bfe_rightmost_child_wanted(bfe, node);
        for (int i = lc; i <= rc; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) {
                retval = true;
            }
        }
    }
    else {
        // we have a bug. The type should be known
        assert(false);
    }
    return retval;
}
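// Stripped of clock touching and child selection, the pf_req decision above
// reduces to: fetch_none never needs a fetch; fetch_all needs one iff any
// partition is unavailable; fetch_subset/prefetch need one iff any partition
// in the wanted child range is unavailable. The sketch below uses
// hypothetical types (fetch_type, partial_fetch_required), not the real
// bfe/node structures.

```cpp
#include <cassert>
#include <vector>

// Stand-ins for ftnode_fetch_none/subset/prefetch/all.
enum fetch_type { FETCH_NONE, FETCH_SUBSET, FETCH_PREFETCH, FETCH_ALL };

// avail[i] stands in for BP_STATE(node, i) == PT_AVAIL; [lc, rc] is the
// wanted child range (a single searched child for a subset fetch).
static bool partial_fetch_required(fetch_type type,
                                   const std::vector<bool>& avail,
                                   int lc, int rc) {
    if (type == FETCH_NONE) return false;
    int lo = (type == FETCH_ALL) ? 0 : lc;
    int hi = (type == FETCH_ALL) ? (int)avail.size() - 1 : rc;
    for (int i = lo; i <= hi; i++) {
        if (!avail[i]) return true;  // a wanted partition is missing
    }
    return false;
}
```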

static void
ft_status_update_partial_fetch_reason(
    struct ftnode_fetch_extra* UU(bfe),
    int UU(i),
    int UU(state),
    bool UU(is_leaf)
    )
{
    invariant(state == PT_COMPRESSED || state == PT_ON_DISK);
    if (is_leaf) {
        if (bfe->type == ftnode_fetch_prefetch) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH)++;
            } else {
                STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_PREFETCH)++;
            }
        } else if (bfe->type == ftnode_fetch_all) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE)++;
            } else {
                STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_WRITE)++;
            }
        } else if (i == bfe->child_to_read) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL)++;
            } else {
                STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_NORMAL)++;
            }
        } else {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE)++;
            } else {
                STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE)++;
            }
        }
    }
    else {
        if (bfe->type == ftnode_fetch_prefetch) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH)++;
            } else {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH)++;
            }
        } else if (bfe->type == ftnode_fetch_all) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE)++;
            } else {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_WRITE)++;
            }
        } else if (i == bfe->child_to_read) {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL)++;
            } else {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_NORMAL)++;
            }
        } else {
            if (state == PT_COMPRESSED) {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE)++;
            } else {
                STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE)++;
            }
        }
    }
}

// callback for partially reading a node
// could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
    int r = 0;
    FTNODE node = (FTNODE) ftnode_pv;
    FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
    struct ftnode_fetch_extra *bfe = (struct ftnode_fetch_extra *) read_extraargs;
    // there must be a reason this is being called. If we get a garbage type or the type is ftnode_fetch_none,
    // then something went wrong
    assert((bfe->type == ftnode_fetch_subset) || (bfe->type == ftnode_fetch_all) || (bfe->type == ftnode_fetch_prefetch));
    // determine the range to prefetch
    int lc, rc;
    if (!bfe->disable_prefetching &&
        (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch)
        )
    {
        lc = toku_bfe_leftmost_child_wanted(bfe, node);
        rc = toku_bfe_rightmost_child_wanted(bfe, node);
    } else {
        lc = -1;
        rc = -1;
    }
    // TODO: possibly cilkify expensive operations in this loop
    // TODO: review this with others to see if it can be made faster
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            continue;
        }
        if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
            ft_status_update_partial_fetch_reason(bfe, i, BP_STATE(node, i), (node->height == 0));
            if (BP_STATE(node,i) == PT_COMPRESSED) {
                r = toku_deserialize_bp_from_compressed(node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
            }
            else if (BP_STATE(node,i) == PT_ON_DISK) {
                r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
            }
            else {
                assert(false);
            }
        }

        if (r != 0) {
            if (r == TOKUDB_BAD_CHECKSUM) {
                fprintf(stderr,
                        "Checksum failure while reading node partition in file %s.\n",
                        toku_cachefile_fname_in_env(bfe->h->cf));
            } else {
                fprintf(stderr,
                        "Error while reading node partition, errno = %d\n",
                        get_maybe_error_errno());
            }
            assert(false);
        }
    }

    *sizep = make_ftnode_pair_attr(node);

    return 0;
}

static int
leafval_heaviside_le (uint32_t klen, void *kval,
                      struct cmd_leafval_heaviside_extra *be) {
    DBT dbt;
    DBT const * const key = be->key;
    FAKE_DB(db, be->desc);
    return be->compare_fun(&db,
                           toku_fill_dbt(&dbt, kval, klen),
                           key);
}

//TODO: #1125 optimize
int
toku_cmd_leafval_heaviside (OMTVALUE lev, void *extra) {
    LEAFENTRY CAST_FROM_VOIDP(le, lev);
    struct cmd_leafval_heaviside_extra *CAST_FROM_VOIDP(be, extra);
    uint32_t keylen;
    void*     key = le_key_and_len(le, &keylen);
    return leafval_heaviside_le(keylen, key, be);
}

static int
ft_compare_pivot(DESCRIPTOR desc, ft_compare_func cmp, const DBT *key, const DBT *pivot)
{
    int r;
    FAKE_DB(db, desc);
    r = cmp(&db, key, pivot);
    return r;
}


// destroys the internals of the ftnode, but it does not free the values
// that are stored
// this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf
// MUST NOT do anything besides free the structures that have been allocated
void toku_destroy_ftnode_internals(FTNODE node)
{
    for (int i=0; i<node->n_children-1; i++) {
        toku_free(node->childkeys[i].data);
    }
    toku_free(node->childkeys);
    node->childkeys = NULL;

    for (int i=0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            if (node->height > 0) {
                destroy_nonleaf_childinfo(BNC(node,i));
            } else {
                destroy_basement_node(BLB(node, i));
            }
        } else if (BP_STATE(node,i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node,i);
            toku_free(sb->compressed_ptr);
            toku_free(sb);
        } else {
            assert(is_BNULL(node, i));
        }
        set_BNULL(node, i);
    }
    toku_free(node->bp);
    node->bp = NULL;

}


/* Frees a node, including all the stuff in the hash table. */
void toku_ftnode_free (FTNODE *nodep) {

    //TODO: #1378 Take omt lock (via ftnode) around call to toku_omt_destroy().

    FTNODE node=*nodep;
    if (node->height == 0) {
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node,i) == PT_AVAIL) {
                struct mempool * mp = &(BLB_BUFFER_MEMPOOL(node, i));
                toku_mempool_destroy(mp);
            }
        }
        STATUS_VALUE(FT_DESTROY_LEAF)++;
    } else {
        STATUS_VALUE(FT_DESTROY_NONLEAF)++;
    }
    toku_destroy_ftnode_internals(node);
    toku_free(node);
    *nodep=0;
}

void
toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_children, int layout_version, unsigned int nodesize, unsigned int flags)
// Effect: Fill in N as an empty ftnode.
{
    assert(layout_version != 0);
    assert(height >= 0);

    if (height == 0)
        STATUS_VALUE(FT_CREATE_LEAF)++;
    else
        STATUS_VALUE(FT_CREATE_NONLEAF)++;

    n->max_msn_applied_to_node_on_disk = ZERO_MSN;    // correct value for root node, harmless for others
    n->nodesize = nodesize;
    n->flags = flags;
    n->thisnodename = nodename;
    n->layout_version               = layout_version;
    n->layout_version_original = layout_version;
    n->layout_version_read_from_disk = layout_version;
    n->height = height;
    n->totalchildkeylens = 0;
    n->childkeys = 0;
    n->bp = 0;
    n->n_children = num_children;

    if (num_children > 0) {
        XMALLOC_N(num_children-1, n->childkeys);
        XMALLOC_N(num_children, n->bp);
        for (int i = 0; i < num_children; i++) {
            BP_BLOCKNUM(n,i).b=0;
            BP_STATE(n,i) = PT_INVALID;
            BP_WORKDONE(n,i) = 0;
            BP_INIT_TOUCHED_CLOCK(n, i);
            set_BNULL(n,i);
            if (height > 0) {
                set_BNC(n, i, toku_create_empty_nl());
            } else {
                set_BLB(n, i, toku_create_empty_bn());
            }
        }
    }
    n->dirty = 1;  // special case exception, it's okay to mark as dirty because the basements are empty
}

static void
ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp)
// Effect:  Create a new root node whose two children are the split of oldroot.
//  oldroot is unpinned in the process.
//  Leave the new root pinned.
{
    FTNODE newroot;

    BLOCKNUM old_blocknum = oldroot->thisnodename;
    uint32_t old_fullhash = oldroot->fullhash;
    PAIR old_pair = oldroot->ct_pair;
    
    int new_height = oldroot->height+1;
    uint32_t new_fullhash;
    BLOCKNUM new_blocknum;
    PAIR new_pair = NULL;

    cachetable_put_empty_node_with_dep_nodes(
        ft,
        1,
        &oldroot,
        &new_blocknum,
        &new_fullhash,
        &newroot
        );
    new_pair = newroot->ct_pair;
    
    assert(newroot);
    assert(new_height > 0);
    toku_initialize_empty_ftnode (
        newroot, 
        new_blocknum, 
        new_height, 
        1, 
        ft->h->layout_version, 
        ft->h->nodesize, 
        ft->h->flags
        );
    MSN msna = oldroot->max_msn_applied_to_node_on_disk;
    newroot->max_msn_applied_to_node_on_disk = msna;
    BP_STATE(newroot,0) = PT_AVAIL;
    newroot->dirty = 1;

    // now do the "switcheroo"
    BP_BLOCKNUM(newroot,0) = new_blocknum;
    newroot->thisnodename = old_blocknum;
    newroot->fullhash = old_fullhash;
    newroot->ct_pair = old_pair;

    oldroot->thisnodename = new_blocknum;
    oldroot->fullhash = new_fullhash;
    oldroot->ct_pair = new_pair;

    toku_cachetable_swap_pair_values(old_pair, new_pair);

    toku_ft_split_child(
        ft,
        newroot,
        0, // childnum to split
        oldroot
        );

    // ft_split_child released locks on newroot
    // and oldroot, so now we repin and
    // return to caller
    struct ftnode_fetch_extra bfe;
    fill_bfe_for_full_read(&bfe, ft);
    toku_pin_ftnode_off_client_thread(
        ft,
        old_blocknum,
        old_fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE, // may_modify_node
        0,
        NULL,
        newrootp
        );
}
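// The "switcheroo" in ft_init_new_root keeps the tree's root at a constant
// block number: the new root adopts the old root's blocknum, fullhash, and
// cachetable pair, and the old root takes the freshly allocated ones. A
// minimal sketch of just that identity swap, with a hypothetical
// node_identity struct standing in for the FTNODE fields:

```cpp
#include <cassert>
#include <utility>

// Hypothetical stand-in for the identity fields swapped above.
struct node_identity {
    long blocknum;       // thisnodename.b
    unsigned fullhash;   // cachetable hash of the block
};

// After the swap, readers looking up the old root's block number find
// the new root, so the root's on-disk address never changes as it splits.
static void swap_root_identity(node_identity& oldroot, node_identity& newroot) {
    std::swap(oldroot.blocknum, newroot.blocknum);
    std::swap(oldroot.fullhash, newroot.fullhash);
}
```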

static void
init_childinfo(FTNODE node, int childnum, FTNODE child) {
    BP_BLOCKNUM(node,childnum) = child->thisnodename;
    BP_STATE(node,childnum) = PT_AVAIL;
    BP_WORKDONE(node, childnum)   = 0;
    set_BNC(node, childnum, toku_create_empty_nl());
}

static void
init_childkey(FTNODE node, int childnum, const DBT *pivotkey) {
    toku_copyref_dbt(&node->childkeys[childnum], *pivotkey);
    node->totalchildkeylens += pivotkey->size;
}

// Used only by test programs: append a child node to a parent node
void
toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
    int childnum = node->n_children;
    node->n_children++;
    XREALLOC_N(node->n_children, node->bp);
    init_childinfo(node, childnum, child);
    XREALLOC_N(node->n_children-1, node->childkeys);
    if (pivotkey) {
        invariant(childnum > 0);
        init_childkey(node, childnum-1, pivotkey);
1384
    }
1385
    node->dirty = 1;
1386 1387
}

static void
ft_leaf_delete_leafentry (
    BASEMENTNODE bn,
    uint32_t idx,
    LEAFENTRY le
    )
// Effect: Delete leafentry
//   idx is the location where it is
//   le is the leafentry to be deleted
{
    // Figure out if one of the other keys is the same key

    {
        int r = toku_omt_delete_at(bn->buffer, idx);
        assert(r==0);
    }

    bn->n_bytes_in_buffer -= leafentry_disksize(le);

    toku_mempool_mfree(&bn->buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le is no good any more.
}

void
toku_ft_bn_apply_cmd_once (
    BASEMENTNODE bn,
    const FT_MSG cmd,
    uint32_t idx,
    LEAFENTRY le,
    uint64_t *workdone,
    STAT64INFO stats_to_update
    )
// Effect: Apply cmd to leafentry (msn is ignored)
//         Calculate work done by message on leafentry and add it to caller's workdone counter.
//   idx is the location where it goes
//   le is old leafentry
{
    // ft_leaf_check_leaf_stats(node);

    size_t newsize=0, oldsize=0, workdone_this_le=0;
    LEAFENTRY new_le=0;
    void *maybe_free = 0;
    int64_t numbytes_delta = 0;  // how many bytes of user data (not including overhead) were added or deleted from this row
    int64_t numrows_delta = 0;   // will be +1 or -1 or 0 (if row was added or deleted or not)

    if (le)
        oldsize = leafentry_memsize(le);

    // apply_msg_to_leafentry() may call mempool_malloc_from_omt() to allocate more space.
    // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
    // no longer in use.  We'll have to release the old mempool later.
    {
        int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
        invariant(r==0);
    }

    if (new_le) assert(newsize == leafentry_disksize(new_le));
    if (le && new_le) {
        bn->n_bytes_in_buffer -= oldsize;
        bn->n_bytes_in_buffer += newsize;

        // This mfree must occur after the mempool_malloc so that when
        // the mempool is compressed everything is accounted for.  But
        // we must compute the size before doing the mempool mfree
        // because otherwise the le pointer is no good.
        toku_mempool_mfree(&bn->buffer_mempool, 0, oldsize); // Must pass 0, since le may be no good any more.

        {
            int r = toku_omt_set_at(bn->buffer, new_le, idx);
            invariant(r==0);
        }

        workdone_this_le = (oldsize > newsize ? oldsize : newsize);  // work done is max of le size before and after message application

    } else {           // we did not just replace a row, so ...
        if (le) {
            //            ... we just deleted a row ...
            // It was there, note that it's gone and remove it from the mempool
            ft_leaf_delete_leafentry (bn, idx, le);
            workdone_this_le = oldsize;
            numrows_delta = -1;
        }
        if (new_le) {
            //            ... or we just added a row
            int r = toku_omt_insert_at(bn->buffer, new_le, idx);
            invariant(r==0);
            bn->n_bytes_in_buffer += newsize;
            workdone_this_le = newsize;
            numrows_delta = 1;
        }
    }
    if (workdone) {  // test programs may call with NULL
        uint64_t new_workdone = __sync_add_and_fetch(workdone, workdone_this_le);
        if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE))
            STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone;
    }

    // if we created a new mempool buffer, free the old one
    if (maybe_free) toku_free(maybe_free);

    // now update stat64 statistics
    bn->stat64_delta.numrows  += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    // the only reason stats_to_update may be null is for tests
    if (stats_to_update) {
        stats_to_update->numrows += numrows_delta;
        stats_to_update->numbytes += numbytes_delta;
    }

}
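The workdone accounting above reduces to a small pure function: replacing a row costs the larger of the old and new leafentry sizes, a delete costs the old size, and an insert costs the new size. A minimal standalone sketch (hypothetical helper, not part of this file):

```cpp
#include <cstdint>

// Hypothetical sketch of the workdone_this_le cases above, factored out.
// Not part of ft-ops.cc; names are illustrative only.
static uint64_t workdone_for(uint64_t oldsize, uint64_t newsize,
                             bool had_old, bool has_new) {
    if (had_old && has_new) return oldsize > newsize ? oldsize : newsize; // row replaced
    if (had_old) return oldsize;   // row deleted
    if (has_new) return newsize;   // row inserted
    return 0;                      // message had no effect
}
```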

static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number.  We want to make sure that the user actually passes us the setval_extra_s that we passed in.
struct setval_extra_s {
    uint32_t  tag;
    bool did_set_val;
    int         setval_r;    // any error code that setval_fun wants to return goes here.
    // need arguments for toku_ft_bn_apply_cmd_once
    BASEMENTNODE bn;
    MSN msn;              // captured from original message, not currently used
    XIDS xids;
    const DBT *key;
    uint32_t idx;
    LEAFENTRY le;
    uint64_t * workdone;  // set by toku_ft_bn_apply_cmd_once()
    STAT64INFO stats_to_update;
};

/*
 * If new_val == NULL, we send a delete message instead of an insert.
 * This happens here instead of in do_delete() for consistency.
 * setval_fun() is called from handlerton, passing in svextra_v
 * from setval_extra_s input arg to brt->update_fun().
 */
static void setval_fun (const DBT *new_val, void *svextra_v) {
    struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
    assert(svextra->tag==setval_tag);
    assert(!svextra->did_set_val);
    svextra->did_set_val = true;

    {
        // can't leave scope until toku_ft_bn_apply_cmd_once if
        // this is a delete
        DBT val;
        FT_MSG_S msg = { FT_NONE, svextra->msn, svextra->xids,
                         .u = { .id = {svextra->key, NULL} } };
        if (new_val) {
            msg.type = FT_INSERT;
            msg.u.id.val = new_val;
        } else {
            msg.type = FT_DELETE_ANY;
            toku_init_dbt(&val);
            msg.u.id.val = &val;
        }
        toku_ft_bn_apply_cmd_once(svextra->bn, &msg,
                                   svextra->idx, svextra->le,
                                   svextra->workdone, svextra->stats_to_update);
        svextra->setval_r = 0;
    }
}

// We are already past the msn filter (in toku_ft_bn_apply_cmd(), which calls do_update()),
// so capturing the msn in the setval_extra_s is not strictly required.  The alternative
// would be to put a dummy msn in the messages created by setval_fun(), but preserving
// the original msn seems cleaner and it preserves accountability at a lower layer.
static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn, FT_MSG cmd, uint32_t idx,
                     LEAFENTRY le,
                     uint64_t * workdone,
                     STAT64INFO stats_to_update) {
    LEAFENTRY le_for_update;
    DBT key;
    const DBT *keyp;
    const DBT *update_function_extra;
    DBT vdbt;
    const DBT *vdbtp;

    // the location of data depends whether this is a regular or
    // broadcast update
    if (cmd->type == FT_UPDATE) {
        // key is passed in with command (should be same as from le)
        // update function extra is passed in with command
        STATUS_VALUE(FT_UPDATES)++;
        keyp = cmd->u.id.key;
        update_function_extra = cmd->u.id.val;
    } else if (cmd->type == FT_UPDATE_BROADCAST_ALL) {
        // key is not passed in with broadcast, it comes from le
        // update function extra is passed in with command
        assert(le);  // for broadcast updates, we just hit all leafentries
                     // so this cannot be null
        assert(cmd->u.id.key->size == 0);
        STATUS_VALUE(FT_UPDATES_BROADCAST)++;
        keyp = toku_fill_dbt(&key, le_key(le), le_keylen(le));
        update_function_extra = cmd->u.id.val;
    } else {
        assert(false);
    }

    if (le && !le_latest_is_del(le)) {
        // if the latest val exists, use it, and we'll use the leafentry later
        uint32_t vallen;
        void *valp = le_latest_val_and_len(le, &vallen);
        vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
        le_for_update = le;
    } else {
        // otherwise, the val and leafentry are both going to be null
        vdbtp = NULL;
        le_for_update = NULL;
    }

    struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids,
                                          keyp, idx, le_for_update, workdone, stats_to_update};
    // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
    FAKE_DB(db, desc);
    int r = update_fun(
        &db,
        keyp,
        vdbtp,
        update_function_extra,
        setval_fun, &setval_extra
        );

    if (r == 0) { r = setval_extra.setval_r; }
    return r;
}

// Should be renamed as something like "apply_cmd_to_basement()."
void
toku_ft_bn_apply_cmd (
    ft_compare_func compare_fun,
    ft_update_func update_fun,
    DESCRIPTOR desc,
    BASEMENTNODE bn,
    FT_MSG cmd,
    uint64_t *workdone,
    STAT64INFO stats_to_update
    )
// Effect:
//   Put a cmd into a leaf.
//   Calculate work done by message on leafnode and add it to caller's workdone counter.
// The leaf could end up "too big" or "too small".  The caller must fix that up.
{
    LEAFENTRY storeddata;
    OMTVALUE storeddatav=NULL;

    uint32_t omt_size;
    int r;
    struct cmd_leafval_heaviside_extra be = {compare_fun, desc, cmd->u.id.key};

    unsigned int doing_seqinsert = bn->seqinsert;
    bn->seqinsert = 0;

    switch (cmd->type) {
    case FT_INSERT_NO_OVERWRITE:
    case FT_INSERT: {
        uint32_t idx;
        if (doing_seqinsert) {
            idx = toku_omt_size(bn->buffer);
            r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav);
            if (r != 0) goto fz;
            CAST_FROM_VOIDP(storeddata, storeddatav);
            int cmp = toku_cmd_leafval_heaviside(storeddata, &be);
            if (cmp >= 0) goto fz;
            r = DB_NOTFOUND;
        } else {
        fz:
            r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
                                   &storeddatav, &idx);
        }
        if (r==DB_NOTFOUND) {
            storeddata = 0;
        } else {
            assert(r==0);
            CAST_FROM_VOIDP(storeddata, storeddatav);
        }
        toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);

        // if the insertion point is within a window of the right edge of
        // the leaf then it is sequential
        // window = min(32, number of leaf entries/16)
        {
            uint32_t s = toku_omt_size(bn->buffer);
            uint32_t w = s / 16;
            if (w == 0) w = 1;
            if (w > 32) w = 32;

            // within the window?
            if (s - idx <= w)
                bn->seqinsert = doing_seqinsert + 1;
        }
        break;
    }
    case FT_DELETE_ANY:
    case FT_ABORT_ANY:
    case FT_COMMIT_ANY: {
        uint32_t idx;
        // Apply to all the matches

        r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
                               &storeddatav, &idx);
        if (r == DB_NOTFOUND) break;
        assert(r==0);
        CAST_FROM_VOIDP(storeddata, storeddatav);

        while (1) {
            uint32_t num_leafentries_before = toku_omt_size(bn->buffer);

            toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);

            {
                // Now we must find the next leafentry.
                uint32_t num_leafentries_after = toku_omt_size(bn->buffer);
                //idx is the index of the leafentry we just modified.
                //If the leafentry was deleted, we will have one less leafentry in
                //the omt than we started with and the next leafentry will be at the
                //same index as the deleted one. Otherwise, the next leafentry will
                //be at the next index (+1).
                assert(num_leafentries_before == num_leafentries_after ||
                       num_leafentries_before-1 == num_leafentries_after);
                if (num_leafentries_after==num_leafentries_before) idx++; //Not deleted, advance index.

                assert(idx <= num_leafentries_after);
                if (idx == num_leafentries_after) break; //Reached the end of the leaf
                r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
                assert_zero(r);
            }
            CAST_FROM_VOIDP(storeddata, storeddatav);
            {        // Continue only if the next record that we found has the same key.
                DBT adbt;
                uint32_t keylen;
                void *keyp = le_key_and_len(storeddata, &keylen);
                FAKE_DB(db, desc);
                if (compare_fun(&db,
                                toku_fill_dbt(&adbt, keyp, keylen),
                                cmd->u.id.key) != 0)
                    break;
            }
        }

        break;
    }
    case FT_OPTIMIZE_FOR_UPGRADE:
        // fall through so that optimize_for_upgrade performs rest of the optimize logic
    case FT_COMMIT_BROADCAST_ALL:
    case FT_OPTIMIZE:
        // Apply to all leafentries
        omt_size = toku_omt_size(bn->buffer);
        for (uint32_t idx = 0; idx < omt_size; ) {
            r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
            assert_zero(r);
            CAST_FROM_VOIDP(storeddata, storeddatav);
            int deleted = 0;
            if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
                toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);
                uint32_t new_omt_size = toku_omt_size(bn->buffer);
                if (new_omt_size != omt_size) {
                    assert(new_omt_size+1 == omt_size);
                    //Item was deleted.
                    deleted = 1;
                }
            }
            if (deleted)
                omt_size--;
            else
                idx++;
        }
        assert(toku_omt_size(bn->buffer) == omt_size);

        break;
    case FT_COMMIT_BROADCAST_TXN:
    case FT_ABORT_BROADCAST_TXN:
        // Apply to all leafentries if txn is represented
        omt_size = toku_omt_size(bn->buffer);
        for (uint32_t idx = 0; idx < omt_size; ) {
            r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
            assert_zero(r);
            CAST_FROM_VOIDP(storeddata, storeddatav);
            int deleted = 0;
            if (le_has_xids(storeddata, cmd->xids)) {
                toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);
                uint32_t new_omt_size = toku_omt_size(bn->buffer);
                if (new_omt_size != omt_size) {
                    assert(new_omt_size+1 == omt_size);
                    //Item was deleted.
                    deleted = 1;
                }
            }
            if (deleted)
                omt_size--;
            else
                idx++;
        }
        assert(toku_omt_size(bn->buffer) == omt_size);

        break;
    case FT_UPDATE: {
        uint32_t idx;
        r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
                               &storeddatav, &idx);
        if (r==DB_NOTFOUND) {
            r = do_update(update_fun, desc, bn, cmd, idx, NULL, workdone, stats_to_update);
        } else if (r==0) {
            CAST_FROM_VOIDP(storeddata, storeddatav);
            r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update);
        } // otherwise, a worse error, just return it
        break;
    }
    case FT_UPDATE_BROADCAST_ALL: {
        // apply to all leafentries.
        uint32_t idx = 0;
        uint32_t num_leafentries_before;
        while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) {
            r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
            assert(r==0);
            CAST_FROM_VOIDP(storeddata, storeddatav);
            r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update);
            // TODO(leif): This early return means get_leaf_reactivity()
            // and VERIFY_NODE() never get called.  Is this a problem?
            assert(r==0);

            if (num_leafentries_before == toku_omt_size(bn->buffer)) {
                // we didn't delete something, so increment the index.
                idx++;
            }
        }
        break;
    }
    case FT_NONE: break; // don't do anything
    }

    return;
}
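The sequential-insert heuristic in the FT_INSERT case above can be reduced to a standalone predicate (hypothetical helper, not part of this file): an insert counts as sequential when it lands within min(32, max(1, n/16)) slots of the right edge of the leaf.

```cpp
#include <cstdint>

// Illustrative sketch of the seqinsert window test above.  Not part of
// ft-ops.cc; the name is invented for this example.
static bool insert_is_sequential(uint32_t n_entries, uint32_t idx) {
    uint32_t w = n_entries / 16;   // window scales with leaf size...
    if (w == 0) w = 1;             // ...but is at least 1 slot...
    if (w > 32) w = 32;            // ...and at most 32 slots
    return n_entries - idx <= w;   // close enough to the right edge?
}
```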

static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
            DESCRIPTOR descriptor, ft_compare_func key_cmp)
{
    FAKE_DB(db, descriptor);
    int r = key_cmp(&db, a, b);
    if (r == 0) {
        if (amsn.msn > bmsn.msn) {
            r = +1;
        } else if (amsn.msn < bmsn.msn) {
            r = -1;
        } else {
            r = 0;
        }
    }
    return r;
}
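key_msn_cmp() orders messages by key first and breaks ties by MSN, so two messages for the same key sort in the order they were issued. A simplified standalone sketch using plain C strings in place of DBTs (hypothetical types, for illustration only):

```cpp
#include <cstdint>
#include <cstring>

// Illustrative stand-ins for MSN and the key comparator; not the real types.
struct ExMsn { uint64_t msn; };

// Same shape as key_msn_cmp() above: key order first, MSN as tie-breaker.
static int ex_key_msn_cmp(const char *a, ExMsn amsn, const char *b, ExMsn bmsn) {
    int r = strcmp(a, b);
    if (r == 0) {
        if (amsn.msn > bmsn.msn) r = +1;
        else if (amsn.msn < bmsn.msn) r = -1;
    }
    return r;
}
```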

int
toku_fifo_entry_key_msn_heaviside(const int32_t &offset, const struct toku_fifo_entry_key_msn_heaviside_extra &extra)
{
    const struct fifo_entry *query = toku_fifo_get_entry(extra.fifo, offset);
    DBT qdbt;
    const DBT *query_key = fill_dbt_for_fifo_entry(&qdbt, query);
    const DBT *target_key = extra.key;
    return key_msn_cmp(query_key, target_key, query->msn, extra.msn,
                       extra.desc, extra.cmp);
}

int
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo)
{
    const struct fifo_entry *a = toku_fifo_get_entry(extra.fifo, ao);
    const struct fifo_entry *b = toku_fifo_get_entry(extra.fifo, bo);
    DBT adbt, bdbt;
    const DBT *akey = fill_dbt_for_fifo_entry(&adbt, a);
    const DBT *bkey = fill_dbt_for_fifo_entry(&bdbt, b);
    return key_msn_cmp(akey, bkey, a->msn, b->msn,
                       extra.desc, extra.cmp);
}

int
toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp)
// Effect: Enqueue the message represented by the parameters into the
//   bnc's buffer, and put it in either the fresh or stale message tree,
//   or the broadcast list.
//
// This is only exported for tests.
{
    int32_t offset;
    int r = toku_fifo_enq(bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, &offset);
    assert_zero(r);
    if (ft_msg_type_applies_once(type)) {
        DBT keydbt;
        struct toku_fifo_entry_key_msn_heaviside_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer, .key = toku_fill_dbt(&keydbt, key, keylen), .msn = msn };
        if (is_fresh) {
            r = bnc->fresh_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        } else {
            r = bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        }
    } else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) {
        const uint32_t idx = bnc->broadcast_list.size();
        r = bnc->broadcast_list.insert_at(offset, idx);
        assert_zero(r);
    } else {
        abort();
    }
    return r;
}

// append a cmd to a nonleaf node's child buffer
// should be static, but used by test programs
void
toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
    assert(BP_STATE(node,childnum) == PT_AVAIL);
    int r = toku_bnc_insert_msg(BNC(node, childnum), key->data, key->size, val->data, val->size, type, msn, xids, is_fresh, desc, compare_fun);
    invariant_zero(r);
    node->dirty = 1;
}

static void ft_nonleaf_cmd_once_to_child (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, unsigned int childnum, FT_MSG cmd, bool is_fresh)
// Previously we had passive aggressive promotion, but that causes a lot of I/O at the checkpoint.  So now we are just putting it in the buffer here.
// Also we don't worry about the node getting overfull here.  It's the caller's problem.
{
    toku_ft_append_to_child_buffer(compare_fun, desc, node, childnum, cmd->type, cmd->msn, cmd->xids, is_fresh, cmd->u.id.key, cmd->u.id.val);
}

/* Find the leftmost child that may contain the key.
 * If the key exists it will be in the child whose number
 * is the return value of this function.
 */
unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k,
                                      DESCRIPTOR desc, ft_compare_func cmp) {
#define DO_PIVOT_SEARCH_LR 0
#if DO_PIVOT_SEARCH_LR
    int i;
    for (i=0; i<node->n_children-1; i++) {
        int c = ft_compare_pivot(desc, cmp, k, &node->childkeys[i]);
        if (c > 0) continue;
        if (c < 0) return i;
        return i;
    }
    return node->n_children-1;
#else
#endif
#define DO_PIVOT_SEARCH_RL 0
#if DO_PIVOT_SEARCH_RL
    // give preference for appending to the dictionary.  no change for
    // random keys
    int i;
    for (i = node->n_children-2; i >= 0; i--) {
        int c = ft_compare_pivot(desc, cmp, k, &node->childkeys[i]);
        if (c > 0) return i+1;
    }
    return 0;
#endif
#define DO_PIVOT_BIN_SEARCH 1
#if DO_PIVOT_BIN_SEARCH
    // a funny case of no pivots
    if (node->n_children <= 1) return 0;

    // check the last key to optimize seq insertions
    int n = node->n_children-1;
    int c = ft_compare_pivot(desc, cmp, k, &node->childkeys[n-1]);
    if (c > 0) return n;

    // binary search the pivots
    int lo = 0;
    int hi = n-1; // skip the last one, we checked it above
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        c = ft_compare_pivot(desc, cmp, k, &node->childkeys[mi]);
        if (c > 0) {
            lo = mi+1;
            continue;
        }
        if (c < 0) {
            hi = mi;
            continue;
        }
        return mi;
    }
    return lo;
#endif
}
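The DO_PIVOT_BIN_SEARCH branch above can be sketched over plain ints (hypothetical helper, not part of this file): with n children there are n-1 pivots, the last pivot is checked first to favor sequential inserts, and the remaining pivots are binary searched for the leftmost child that may contain the key.

```cpp
#include <vector>

// Illustrative sketch of the pivot binary search above; names invented.
static int which_child(const std::vector<int> &pivots, int key) {
    int n = (int)pivots.size() + 1;      // number of children
    if (n <= 1) return 0;                // a funny case of no pivots
    if (key > pivots[n-2]) return n-1;   // right of the last pivot (seq insert fast path)
    int lo = 0, hi = n-2;                // binary search remaining pivots
    while (lo < hi) {
        int mi = (lo + hi) / 2;
        if (key > pivots[mi]) lo = mi+1;
        else if (key < pivots[mi]) hi = mi;
        else return mi;                  // key == pivot: leftmost child that may hold it
    }
    return lo;
}
```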

// Used for HOT.
unsigned int
toku_ftnode_hot_next_child(FTNODE node,
                            const DBT *k,
                            DESCRIPTOR desc,
                            ft_compare_func cmp) {
    int low = 0;
    int hi = node->n_children - 1;
    int mi;
    while (low < hi) {
        mi = (low + hi) / 2;
        int r = ft_compare_pivot(desc, cmp, k, &node->childkeys[mi]);
        if (r > 0) {
            low = mi + 1;
        } else if (r < 0) {
            hi = mi;
        } else {
            // if they were exactly equal, then we want the sub-tree under
            // the next pivot.
            return mi + 1;
        }
    }
    invariant(low == hi);
    return low;
}

// TODO Use this function to clean up other places where bits of messages are passed around
//      such as toku_bnc_insert_msg() and the call stack above it.
static size_t
ft_msg_size(FT_MSG msg) {
    size_t keylen = msg->u.id.key->size;
    size_t vallen = msg->u.id.val->size;
    size_t xids_size = xids_get_serialize_size(msg->xids);
    size_t rval = keylen + vallen + KEY_VALUE_OVERHEAD + FT_CMD_OVERHEAD + xids_size;
    return rval;
}


static void ft_nonleaf_cmd_once(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
// Effect: Insert a message into a nonleaf.  We may put it into a child, possibly causing the child to become reactive.
//  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
//  The re_array[i] gets set to reactivity of any modified child.
{
    /* find the right subtree */
    //TODO: accesses key, val directly
    unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun);

    ft_nonleaf_cmd_once_to_child (compare_fun, desc, node, childnum, cmd, is_fresh);
}

static void
ft_nonleaf_cmd_all (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
// Effect: Put the cmd into a nonleaf node.  We put it into all children, possibly causing the children to become reactive.
//  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
//  The re_array[i] gets set to the reactivity of any modified child i.  (And there may be several such children.)
{
    int i;
    for (i = 0; i < node->n_children; i++) {
        ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, i, cmd, is_fresh);
    }
}

static bool
ft_msg_applies_once(FT_MSG cmd)
{
    return ft_msg_type_applies_once(cmd->type);
}

static bool
ft_msg_applies_all(FT_MSG cmd)
{
    return ft_msg_type_applies_all(cmd->type);
}

static bool
ft_msg_does_nothing(FT_MSG cmd)
{
    return ft_msg_type_does_nothing(cmd->type);
}

static void
ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
// Effect: Put the cmd into a nonleaf node.  We may put it into a child, possibly causing the child to become reactive.
//  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
//  The re_array[i] gets set to the reactivity of any modified child i.  (And there may be several such children.)
//
{

    //
    // see comments in toku_ft_leaf_apply_cmd
    // to understand why we handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and don't do it in toku_ft_node_put_cmd
    //
    MSN cmd_msn = cmd->msn;
    invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    node->max_msn_applied_to_node_on_disk = cmd_msn;

    //TODO: Accessing type directly
    switch (cmd->type) {
    case FT_INSERT_NO_OVERWRITE:
    case FT_INSERT:
    case FT_DELETE_ANY:
    case FT_ABORT_ANY:
    case FT_COMMIT_ANY:
    case FT_UPDATE:
        ft_nonleaf_cmd_once(compare_fun, desc, node, cmd, is_fresh);
        return;
    case FT_COMMIT_BROADCAST_ALL:
    case FT_COMMIT_BROADCAST_TXN:
    case FT_ABORT_BROADCAST_TXN:
    case FT_OPTIMIZE:
    case FT_OPTIMIZE_FOR_UPGRADE:
    case FT_UPDATE_BROADCAST_ALL:
        ft_nonleaf_cmd_all (compare_fun, desc, node, cmd, is_fresh);  // send message to all children
        return;
    case FT_NONE:
        return;
    }
    abort(); // cannot happen
}


// If the root is too full, grow the tree: the root may be replaced via
// ft_init_new_root(), and *nodep is updated to point at the (new) root.
static void
ft_process_maybe_reactive_root (FT ft, FTNODE *nodep) {
    FTNODE node = *nodep;
    toku_assert_entire_node_in_memory(node);
    enum reactivity re = get_node_reactivity(node);
    switch (re) {
    case RE_STABLE:
        return;
    case RE_FISSIBLE:
    {
        ft_init_new_root(ft, node, nodep);
        return;
    }
    case RE_FUSIBLE:
        return; // Cannot merge anything at the root, so return happy.
    }
    abort(); // cannot happen
}

// Garbage collect one leaf entry.
static void
ft_basement_node_gc_once(BASEMENTNODE bn,
                          uint32_t index,
                          LEAFENTRY leaf_entry,
                          const xid_omt_t &snapshot_xids,
                          const rx_omt_t &referenced_xids,
                          const xid_omt_t &live_root_txns,
                          STAT64INFO_S * delta)
{
    assert(leaf_entry);

    // There is no point in running GC if there is only one committed
    // leaf entry.
    if (leaf_entry->type != LE_MVCC || leaf_entry->u.mvcc.num_cxrs <= 1) { // MAKE ACCESSOR
        goto exit;
    }

    size_t oldsize;
    oldsize = 0;
    size_t newsize;
    newsize = 0;
    LEAFENTRY new_leaf_entry;
    new_leaf_entry = NULL;

    // The mempool doesn't free itself.  When it allocates new memory,
    // this pointer will be set to the older memory that must now be
    // freed.
    void * maybe_free;
    maybe_free = NULL;

    // Cache the size of the leaf entry.
    oldsize = leafentry_memsize(leaf_entry);
    garbage_collect_leafentry(leaf_entry,
                              &new_leaf_entry,
                              &newsize,
                              bn->buffer,
                              &bn->buffer_mempool,
                              &maybe_free,
                              snapshot_xids,
                              referenced_xids,
                              live_root_txns);

    // These will represent the number of bytes and rows changed as
    // part of the garbage collection.
    int64_t numbytes_delta;
    numbytes_delta = newsize - oldsize;
    int64_t numrows_delta;
    numrows_delta = 0;
    if (new_leaf_entry) {
        // If we have a new leaf entry, we must update the size of the
        // memory object.
        bn->n_bytes_in_buffer -= oldsize;
        bn->n_bytes_in_buffer += newsize;
        toku_mempool_mfree(&bn->buffer_mempool, 0, oldsize);
        toku_omt_set_at(bn->buffer, new_leaf_entry, index);
        numrows_delta = 0;
    } else {
        // Our garbage collection removed the leaf entry so we must
        // remove it from the mempool.
        ft_leaf_delete_leafentry(bn, index, leaf_entry);
        numrows_delta = -1;
    }

    // If we created a new mempool buffer we must free the
    // old/original buffer.
    if (maybe_free) {
        toku_free(maybe_free);
    }

    // Update stats.
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    delta->numrows += numrows_delta;
    delta->numbytes += numbytes_delta;

exit:
    return;
}
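The delta bookkeeping above reduces to a small pure function: a rewritten entry moves the byte count by `newsize - oldsize`, while a removed entry also drops the row count by one (its `newsize` is zero). A sketch under assumed types; all names here are illustrative, not from the real headers:

```cpp
#include <cassert>
#include <cstdint>
#include <cstddef>

// Illustrative mirror of the accounting above.
struct gc_delta { int64_t numrows; int64_t numbytes; };

static gc_delta account_gc(size_t oldsize, size_t newsize, bool entry_survived) {
    gc_delta d = {0, 0};
    if (entry_survived) {
        d.numbytes = (int64_t)newsize - (int64_t)oldsize;  // entry rewritten in place
    } else {
        d.numbytes = -(int64_t)oldsize;  // entry removed entirely (newsize would be 0)
        d.numrows = -1;
    }
    return d;
}
```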

// Garbage collect all leaf entries for a given basement node.
static void
basement_node_gc_all_les(BASEMENTNODE bn,
                         const xid_omt_t &snapshot_xids,
                         const rx_omt_t &referenced_xids,
                         const xid_omt_t &live_root_txns,
                         STAT64INFO_S * delta)
{
    int r = 0;
    uint32_t index = 0;
    uint32_t num_leafentries_before;
    while (index < (num_leafentries_before = toku_omt_size(bn->buffer))) {
        OMTVALUE storedatav = NULL;
        LEAFENTRY leaf_entry;
        r = toku_omt_fetch(bn->buffer, index, &storedatav);
        assert(r == 0);
        CAST_FROM_VOIDP(leaf_entry, storedatav);
        ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, delta);
        // Check if the leaf entry was deleted or not.
        if (num_leafentries_before == toku_omt_size(bn->buffer)) {
            ++index;
        }
    }
}
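The loop above advances the index only when the container size is unchanged, so a deletion (which shifts later entries down into the current slot) is not skipped. The same pattern on a plain vector, as a standalone sketch:

```cpp
#include <cassert>
#include <vector>
#include <cstddef>

// Remove negative values in place while scanning, advancing the index only
// when the current element survives -- mirroring the GC loop's
// "advance only if size is unchanged" rule.
static void drop_negatives(std::vector<int> &v) {
    size_t index = 0;
    while (index < v.size()) {
        size_t size_before = v.size();
        if (v[index] < 0) {
            v.erase(v.begin() + index);  // later entries shift down into 'index'
        }
        if (size_before == v.size()) {
            ++index;  // nothing was deleted, so move on
        }
    }
}
```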

// Garbage collect all leaf entries.
static void
ft_leaf_gc_all_les(FTNODE node,
                    FT h,
                    const xid_omt_t &snapshot_xids,
                    const rx_omt_t &referenced_xids,
                    const xid_omt_t &live_root_txns)
{
    toku_assert_entire_node_in_memory(node);
    assert(node->height == 0);
    // Loop through each leaf entry, garbage collecting as we go.
    for (int i = 0; i < node->n_children; ++i) {
        // Perform the garbage collection.
        BASEMENTNODE bn = BLB(node, i);
        STAT64INFO_S delta;
        delta.numrows = 0;
        delta.numbytes = 0;
        basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, &delta);
        toku_ft_update_stats(&h->in_memory_stats, delta);
    }
}

int
toku_bnc_flush_to_child(
    FT h,
    NONLEAF_CHILDINFO bnc,
    FTNODE child
    )
{
    assert(bnc);
    assert(toku_fifo_n_entries(bnc->buffer)>0);
    STAT64INFO_S stats_delta = {0,0};
    FIFO_ITERATE(
        bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
        ({
            DBT hk,hv;
            FT_MSG_S ftcmd = { type, msn, xids, .u = { .id = { toku_fill_dbt(&hk, key, keylen),
                                                               toku_fill_dbt(&hv, val, vallen) } } };
            toku_ft_node_put_cmd(
                h->compare_fun,
                h->update_fun,
                &h->cmp_descriptor,
                child,
                &ftcmd,
                is_fresh,
                &stats_delta
                );
        }));
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&h->in_memory_stats, stats_delta);
    }
    // Run garbage collection if the child is a leaf node.
    TOKULOGGER logger = toku_cachefile_logger(h->cf);
    if (child->height == 0 && logger) {
        xid_omt_t snapshot_txnids;
        rx_omt_t referenced_xids;
        xid_omt_t live_root_txns;
        toku_txn_manager_clone_state_for_gc(
            logger->txn_manager,
            snapshot_txnids,
            referenced_xids,
            live_root_txns
            );
        size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
        STATUS_VALUE(FT_MSG_BYTES_OUT) += buffsize;
        // may be misleading if there's a broadcast message in there
        STATUS_VALUE(FT_MSG_BYTES_CURR) -= buffsize;
        // Perform the garbage collection.
        ft_leaf_gc_all_les(child, h, snapshot_txnids, referenced_xids, live_root_txns);

        // Free the OMTs we used for garbage collecting.
        snapshot_txnids.destroy();
        live_root_txns.destroy();
        referenced_xids.destroy();
    }

    return 0;
}
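Flushing a buffered child is conceptually just draining a FIFO of messages into the child in order and accumulating one aggregate stats delta. A toy standalone version; the queue, message, and leaf types here are invented for illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>

// Toy model of toku_bnc_flush_to_child: every buffered message is applied to
// the child in FIFO order, and the aggregate stats change is applied once at
// the end (like the single toku_ft_update_stats call above).
struct toy_msg { std::string key; std::string val; };
struct toy_leaf { int64_t numrows = 0; int64_t numbytes = 0; };

static void flush_buffer_to_child(std::deque<toy_msg> &buffer, toy_leaf &child) {
    int64_t rows_delta = 0, bytes_delta = 0;
    while (!buffer.empty()) {
        toy_msg m = buffer.front();
        buffer.pop_front();
        rows_delta += 1;  // treat every message as an insert of one row
        bytes_delta += (int64_t)(m.key.size() + m.val.size());
    }
    child.numrows += rows_delta;   // stats updated once, after the drain
    child.numbytes += bytes_delta;
}
```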

void bring_node_fully_into_memory(FTNODE node, FT h)
{
    if (!is_entire_node_in_memory(node)) {
        struct ftnode_fetch_extra bfe;
        fill_bfe_for_full_read(&bfe, h);
        toku_cachetable_pf_pinned_pair(
            node,
            toku_ftnode_pf_callback,
            &bfe,
            h->cf,
            node->thisnodename,
            toku_cachetable_hash(h->cf, node->thisnodename)
            );
    }
}

void
toku_ft_node_put_cmd (
    ft_compare_func compare_fun,
    ft_update_func update_fun,
    DESCRIPTOR desc,
    FTNODE node,
    FT_MSG cmd,
    bool is_fresh,
    STAT64INFO stats_to_update
    )
// Effect: Push CMD into the subtree rooted at NODE.
//   If NODE is a leaf, then
//   put CMD into leaf, applying it to the leafentries
//   If NODE is a nonleaf, then push the cmd into the FIFO(s) of the relevant child(ren).
//   The node may become overfull.  That's not our problem.
{
    toku_assert_entire_node_in_memory(node);
    //
    // see comments in toku_ft_leaf_apply_cmd
    // to understand why we don't handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and instead defer to these functions
    //
    if (node->height==0) {
        uint64_t workdone = 0;
        toku_ft_leaf_apply_cmd(
            compare_fun,
            update_fun,
            desc,
            node,
            cmd,
            &workdone,
            stats_to_update
            );
    } else {
        ft_nonleaf_put_cmd(compare_fun, desc, node, cmd, is_fresh);
    }
}

static const struct pivot_bounds infinite_bounds = {.lower_bound_exclusive=NULL,
                                                    .upper_bound_inclusive=NULL};

// Effect: applies the cmd to the leaf if the appropriate basement node is in memory.
//           This function is called during message injection and/or flushing, so the entire
//           node MUST be in memory.
void toku_ft_leaf_apply_cmd(
    ft_compare_func compare_fun,
    ft_update_func update_fun,
    DESCRIPTOR desc,
    FTNODE node,
    FT_MSG cmd,
    uint64_t *workdone,
    STAT64INFO stats_to_update
    )
{
    VERIFY_NODE(t, node);
    toku_assert_entire_node_in_memory(node);

    //
    // Because toku_ft_leaf_apply_cmd is called with the intent of permanently
    // applying a message to a leaf node (meaning the message is permanently applied
    // and will be purged from the system after this call, as opposed to
    // toku_apply_ancestors_messages_to_node, which applies a message
    // for a query, but the message may still reside in the system and
    // be reapplied later), we mark the node as dirty and
    // take the opportunity to update node->max_msn_applied_to_node_on_disk.
    //
    node->dirty = 1;

    //
    // we cannot blindly update node->max_msn_applied_to_node_on_disk,
    // we must check to see if the msn is greater than the one already stored,
    // because the cmd may have already been applied earlier (via
    // toku_apply_ancestors_messages_to_node) to answer a query
    //
    // This is why we handle node->max_msn_applied_to_node_on_disk both here
    // and in ft_nonleaf_put_cmd, as opposed to in one location, toku_ft_node_put_cmd.
    //
    MSN cmd_msn = cmd->msn;
    if (cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
        node->max_msn_applied_to_node_on_disk = cmd_msn;
    }

    if (ft_msg_applies_once(cmd)) {
        unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun);
        if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) {
            BLB(node, childnum)->max_msn_applied = cmd->msn;
            toku_ft_bn_apply_cmd(compare_fun,
                                  update_fun,
                                  desc,
                                  BLB(node, childnum),
                                  cmd,
                                  workdone,
                                  stats_to_update);
        } else {
            STATUS_VALUE(FT_MSN_DISCARDS)++;
        }
    }
    else if (ft_msg_applies_all(cmd)) {
        for (int childnum=0; childnum<node->n_children; childnum++) {
            if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) {
                BLB(node, childnum)->max_msn_applied = cmd->msn;
                toku_ft_bn_apply_cmd(compare_fun,
                                      update_fun,
                                      desc,
                                      BLB(node, childnum),
                                      cmd,
                                      workdone,
                                      stats_to_update);
            } else {
                STATUS_VALUE(FT_MSN_DISCARDS)++;
            }
        }
    }
    else if (!ft_msg_does_nothing(cmd)) {
        assert(false);
    }
    }
    VERIFY_NODE(t, node);
}
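The MSN guard above is what makes message application idempotent: a message replayed against a basement node that has already seen it is discarded rather than applied twice. A standalone sketch of that filter, with types invented for illustration:

```cpp
#include <cassert>
#include <cstdint>

// Toy basement node that applies a message only if its sequence number (MSN)
// is strictly greater than the largest MSN already applied -- replays are
// silently discarded, so applying the same message twice is safe.
struct toy_bn {
    uint64_t max_msn_applied = 0;
    int64_t sum = 0;  // stand-in for the node's contents
};

// Returns true if the message was applied, false if it was discarded.
static bool apply_msg(toy_bn &bn, uint64_t msn, int64_t delta) {
    if (msn <= bn.max_msn_applied) {
        return false;  // already applied (e.g. earlier, to answer a query)
    }
    bn.max_msn_applied = msn;
    bn.sum += delta;
    return true;
}
```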

static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd)
// Effect:  Put CMD into brt's root node, and update
// the value of root's max_msn_applied_to_node_on_disk
{
    FTNODE node = *nodep;
    toku_assert_entire_node_in_memory(node);
    MSN cmd_msn = cmd->msn;
    invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    STAT64INFO_S stats_delta = {0,0};
    toku_ft_node_put_cmd(
        h->compare_fun,
        h->update_fun,
        &h->cmp_descriptor,
        node,
        cmd,
        true,
        &stats_delta
        );
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&h->in_memory_stats, stats_delta);
    }
    //
    // assumption is that toku_ft_node_put_cmd will
    // mark the node as dirty.
    // enforcing invariant here.
    //
    invariant(node->dirty != 0);

    // update some status variables
    if (node->height != 0) {
        uint64_t msgsize = ft_msg_size(cmd);
        STATUS_VALUE(FT_MSG_BYTES_IN) += msgsize;
        STATUS_VALUE(FT_MSG_BYTES_CURR) += msgsize;
        if (STATUS_VALUE(FT_MSG_BYTES_CURR) > STATUS_VALUE(FT_MSG_BYTES_MAX)) {
            STATUS_VALUE(FT_MSG_BYTES_MAX) = STATUS_VALUE(FT_MSG_BYTES_CURR);
        }
        STATUS_VALUE(FT_MSG_NUM)++;
        if (ft_msg_applies_all(cmd)) {
            STATUS_VALUE(FT_MSG_NUM_BROADCAST)++;
        }
    }
}

int
toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd)
// Effect:
//  - assign msn to cmd
//  - push the cmd into the brt
//  - cmd will set new msn in tree
{
    // blackhole fractal trees drop all messages, so do nothing.
    if (ft->blackhole) {
        return 0;
    }

    FTNODE node;
    CACHEKEY root_key;
    //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
    assert(ft);
    //
    // As of Dr. Noga, the following code is currently protected by two locks:
    //  - the ydb lock
    //  - header's tree lock
    //
    // We hold the header's tree lock to stop other threads from
    // descending down the tree while the root node may change.
    // The root node may change when ft_process_maybe_reactive_root is called.
    // Other threads (such as the cleaner thread or hot optimize) that want to
    // descend down the tree must grab the header's tree lock, so they are
    // ensured that what they think is the root's blocknum is actually
    // the root's blocknum.
    //
    // We also hold the ydb lock for a number of reasons, but an important
    // one is to make sure that a begin_checkpoint may not start while
    // this code is executing. A begin_checkpoint does (at least) two things
    // that can interfere with the operations here:
    //  - copies the header to a checkpoint header. Because we may change
    //    the root blocknum below, we don't want the header to be copied in
    //    the middle of these operations.
    //  - Takes note of the log's LSN. Because this put operation has
    //    already been logged, this message injection must be included
    //    in any checkpoint that contains this put's logentry.
    //    Holding the ydb lock throughout this function ensures that fact.
    //  As of Dr. Noga, I (Zardosht) THINK these are the only reasons why
    //  the ydb lock must be held for this function, but there may be
    //  others
    //
    {
        uint32_t fullhash;
        toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);

        // get the root node
        struct ftnode_fetch_extra bfe;
        fill_bfe_for_full_read(&bfe, ft);
        toku_pin_ftnode_off_client_thread(
            ft,
            root_key,
            fullhash,
            &bfe,
            PL_WRITE_EXPENSIVE, // may_modify_node
            0,
            NULL,
            &node
            );
        toku_assert_entire_node_in_memory(node);

        //VERIFY_NODE(brt, node);
        assert(node->fullhash==fullhash);
        ft_verify_flags(ft, node);

        // first handle a reactive root, then put in the message
        // ft_process_maybe_reactive_root may release and re-pin the root
        // so we cannot set the msn yet.
        ft_process_maybe_reactive_root(ft, &node);

        cmd->msn.msn = node->max_msn_applied_to_node_on_disk.msn + 1;
        // Note, the lower level function that filters messages based on
        // msn, (toku_ft_bn_apply_cmd() or ft_nonleaf_put_cmd()) will capture
        // the msn and store it in the relevant node, including the root
        // node. This is how the new msn is set in the root.
    }
    push_something_at_root(ft, &node, cmd);
    // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
    invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);

    // if we call flush_some_child, then that function unpins the root
    // otherwise, we unpin ourselves
    if (node->height > 0 && toku_ft_nonleaf_is_gorged(node)) {
        flush_node_on_background_thread(ft, node);
    }
    else {
        toku_unpin_ftnode(ft, node);  // unpin root
    }

    return 0;
}
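Message injection at the root assigns the next MSN from the root's high-water mark, applies the message, and can then assert that the mark advanced to exactly that MSN. A minimal standalone model of that sequence (invented types, not the real MSN machinery):

```cpp
#include <cassert>
#include <cstdint>

// Toy model of MSN assignment in toku_ft_root_put_cmd: each new message gets
// the successor of the largest MSN the root has applied, so after application
// the root's high-water mark equals the message's MSN.
struct toy_root { uint64_t max_msn_applied = 0; };

static uint64_t inject_at_root(toy_root &root) {
    uint64_t msn = root.max_msn_applied + 1;  // capture the next msn
    assert(msn > root.max_msn_applied);       // same invariant as the real code
    // "apply" the message: the apply path records the msn in the node
    root.max_msn_applied = msn;
    return msn;
}
```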

// Effect: Insert the key-val pair into brt.
int toku_ft_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn) {
    return toku_ft_maybe_insert(brt, key, val, txn, false, ZERO_LSN, true, FT_INSERT);
}

int
toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
    int r = 0;
    assert(txn);
    toku_txn_force_fsync_on_commit(txn);  //If the txn commits, the commit MUST be in the log
                                          //before the (old) file is actually unlinked
    TOKULOGGER logger = toku_txn_logger(txn);

    BYTESTRING new_iname_bs = {.len=(uint32_t) strlen(new_iname), .data=(char*)new_iname};
    r = toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs);
    if (r==0 && do_log && logger) {
        TXNID xid = toku_txn_get_txnid(txn);
        r = toku_log_load(logger, load_lsn, do_fsync, txn, xid, old_filenum, new_iname_bs);
    }
    return r;
}

// 2954
// this function handles the tasks needed to be recoverable
//  - write to rollback log
//  - write to recovery log
int
toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn)
{
    int r = 0;
    assert(txn);
    TOKULOGGER logger = toku_txn_logger(txn);

    // write to the rollback log
    r = toku_logger_save_rollback_hot_index(txn, &filenums);
    if (r==0 && do_log && logger) {
        TXNID xid = toku_txn_get_txnid(txn);
        // write to the recovery log
        r = toku_log_hot_index(logger, hot_index_lsn, do_fsync, txn, xid, filenums);
    }
    return r;
}

// Effect: Optimize the brt.
int
toku_ft_optimize (FT_HANDLE brt) {
    int r = 0;

    TOKULOGGER logger = toku_cachefile_logger(brt->ft->cf);
    if (logger) {
        TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager);

        XIDS root_xids = xids_get_root_xids();
        XIDS message_xids;
        if (oldest == TXNID_NONE_LIVING) {
            message_xids = root_xids;
        }
        else {
            r = xids_create_child(root_xids, &message_xids, oldest);
            invariant(r==0);
        }

        DBT key;
        DBT val;
        toku_init_dbt(&key);
        toku_init_dbt(&val);
        FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u = { .id = {&key,&val} } };
        r = toku_ft_root_put_cmd(brt->ft, &ftcmd);
        xids_destroy(&message_xids);
    }
    return r;
}

int
toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) {
    int r = 0;
    FILENUM old_filenum = toku_cachefile_filenum(brt->ft->cf);
    int do_log = 1;
    r = toku_ft_load_recovery(txn, old_filenum, new_iname, do_fsync, do_log, load_lsn);
    return r;
}

// 2954
// brt actions for logging hot index filenums
int
toku_ft_hot_index(FT_HANDLE brt __attribute__ ((unused)), TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn) {
    int r = 0;
    int do_log = 1;
    r = toku_ft_hot_index_recovery(txn, filenums, do_fsync, do_log, lsn);
    return r;
}

int
toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val) {
    int r = 0;
    TOKULOGGER logger = toku_txn_logger(txn);
    if (logger && brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
        BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
        TXNID xid = toku_txn_get_txnid(txn);
        // if (type == FT_INSERT)
            r = toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
        // else
            // r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
    }
    return r;
}

int
toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val) {
    int r = 0;
    assert(txn);
    assert(num_fts > 0);
    TOKULOGGER logger = toku_txn_logger(txn);
    if (logger) {
        FILENUM         fnums[num_fts];
        int i;
        uint32_t num_unsuppressed_fts = 0;
        for (i = 0; i < num_fts; i++) {
            if (brts[i]->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
                //Logging not suppressed for this brt.
                fnums[num_unsuppressed_fts++] = toku_cachefile_filenum(brts[i]->ft->cf);
            }
        }
        if (num_unsuppressed_fts) {
            FILENUMS filenums = {.num = num_unsuppressed_fts, .filenums = fnums};
            BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
            BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
            TXNID xid = toku_txn_get_txnid(txn);
            FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
            r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
        }
    }
    return r;
}

int
toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) {
    assert(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
    int r = 0;
    XIDS message_xids = xids_get_root_xids(); //By default use committed messages
    TXNID xid = toku_txn_get_txnid(txn);
    if (txn) {
        if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
            BYTESTRING keybs = {key->size, (char *) key->data};
            r = toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
            if (r!=0) return r;
            toku_txn_maybe_note_ft(txn, ft_h->ft);
            //We have transactions, and this is not 2440.  We must send the full root-to-leaf-path
            message_xids = toku_txn_get_xids(txn);
        }
        else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
            //We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary.  We must send the full root-to-leaf-path
            message_xids = toku_txn_get_xids(txn);
        }
    }
    TOKULOGGER logger = toku_txn_logger(txn);
    if (do_logging && logger &&
        ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
        BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
        if (type == FT_INSERT) {
            r = toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
        }
        else {
            r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
        }
        if (r!=0) return r;
    }

    LSN treelsn;
    if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
        r = 0;
    } else {
        r = toku_ft_send_insert(ft_h, key, val, message_xids, type);
    }
    return r;
}
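The `oplsn`/`treelsn` check above implements recovery idempotence: an operation whose LSN is at or below the tree's checkpoint LSN is already reflected on disk and is skipped rather than re-sent. A standalone sketch of that test, with invented names:

```cpp
#include <cassert>
#include <cstdint>

// Toy version of the oplsn/treelsn check: a logged operation is replayed into
// the tree only if it is newer than the tree's checkpoint.
static bool should_replay(bool oplsn_valid, uint64_t oplsn, uint64_t tree_checkpoint_lsn) {
    // During normal (non-recovery) operation oplsn is not valid, so we always send.
    if (oplsn_valid && oplsn <= tree_checkpoint_lsn) {
        return false;  // already reflected in the checkpointed tree
    }
    return true;
}
```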

static int
ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) {
    msg->xids = (txn
                 ? toku_txn_get_xids(txn)
                 : xids_get_root_xids());
    int r = toku_ft_root_put_cmd(brt->ft, msg);
    return r;
}

int
2758
toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra,
Yoni Fogel's avatar
Yoni Fogel committed
2759 2760
                      TOKUTXN txn, bool oplsn_valid, LSN oplsn,
                      bool do_logging) {
2761 2762 2763 2764
    int r = 0;

    TXNID xid = toku_txn_get_txnid(txn);
    if (txn) {
2765
        BYTESTRING keybs = { key->size, (char *) key->data };
2766
        r = toku_logger_save_rollback_cmdupdate(
2767
            txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
2768
        if (r != 0) { goto cleanup; }
2769
        toku_txn_maybe_note_ft(txn, ft_h->ft);
2770 2771
    }

2772 2773
    TOKULOGGER logger;
    logger = toku_txn_logger(txn);
2774
    if (do_logging && logger &&
2775
        ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
2776
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2777
        BYTESTRING extrabs = {.len=update_function_extra->size,
2778
                              .data = (char *) update_function_extra->data};
2779
        r = toku_log_enq_update(logger, NULL, 0, txn,
2780
                                toku_cachefile_filenum(ft_h->ft->cf),
2781 2782
                                xid, keybs, extrabs);
        if (r != 0) { goto cleanup; }
2783 2784 2785 2786
    }

    LSN treelsn;
    if (oplsn_valid &&
2787
        oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2788
        r = 0;
2789
    } else {
2790
        FT_MSG_S msg = { FT_UPDATE, ZERO_MSN, NULL,
2791
                         .u = { .id = { key, update_function_extra } } };
2792
        r = ft_send_update_msg(ft_h, &msg, txn);
2793 2794 2795 2796 2797 2798 2799
    }

cleanup:
    return r;
}

int
2800
toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
Yoni Fogel's avatar
Yoni Fogel committed
2801 2802
                                TOKUTXN txn, bool oplsn_valid, LSN oplsn,
                                bool do_logging, bool is_resetting_op) {
2803 2804 2805
    int r = 0;

    TXNID xid = toku_txn_get_txnid(txn);
Yoni Fogel's avatar
Yoni Fogel committed
2806
    uint8_t  resetting = is_resetting_op ? 1 : 0;
2807
    if (txn) {
2808
        r = toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting);
2809
        if (r != 0) { goto cleanup; }
2810
        toku_txn_maybe_note_ft(txn, ft_h->ft);
2811 2812
    }

2813 2814
    TOKULOGGER logger;
    logger = toku_txn_logger(txn);
2815
    if (do_logging && logger &&
2816
        ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
2817
        BYTESTRING extrabs = {.len=update_function_extra->size,
2818
                              .data = (char *) update_function_extra->data};
2819
        r = toku_log_enq_updatebroadcast(logger, NULL, 0, txn,
2820
                                         toku_cachefile_filenum(ft_h->ft->cf),
2821 2822
                                         xid, extrabs, resetting);
        if (r != 0) { goto cleanup; }
2823 2824
    }

2825
    //TODO(yoni): remove treelsn here and similar calls (no longer being used)
2826 2827
    LSN treelsn;
    if (oplsn_valid &&
2828
        oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2829
        r = 0;
2830
    } else {
2831 2832
        DBT nullkey;
        const DBT *nullkeyp = toku_init_dbt(&nullkey);
2833
        FT_MSG_S msg = { FT_UPDATE_BROADCAST_ALL, ZERO_MSN, NULL,
2834
                         .u = { .id = { nullkeyp, update_function_extra } } };
2835
        r = ft_send_update_msg(ft_h, &msg, txn);
2836 2837 2838 2839 2840 2841
    }

cleanup:
    return r;
}

int
toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type) {
    FT_MSG_S ftcmd = { type, ZERO_MSN, xids, .u = { .id = { key, val } } };
    int r = toku_ft_root_put_cmd(brt->ft, &ftcmd);
    return r;
}

int
toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids) {
    DBT val;
    FT_MSG_S ftcmd = { FT_COMMIT_ANY, ZERO_MSN, xids, .u = { .id = { key, toku_init_dbt(&val) } } };
    int r = toku_ft_root_put_cmd(brt->ft, &ftcmd);
    return r;
}

int toku_ft_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn) {
    return toku_ft_maybe_delete(brt, key, txn, false, ZERO_LSN, true);
}

int
toku_ft_log_del(TOKUTXN txn, FT_HANDLE brt, const DBT *key) {
    int r = 0;
    TOKULOGGER logger = toku_txn_logger(txn);
    if (logger && brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
        TXNID xid = toku_txn_get_txnid(txn);
        r = toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(brt->ft->cf), xid, keybs);
    }
    return r;
}

int
toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val) {
    int r = 0;
    assert(txn);
    assert(num_fts > 0);
    TOKULOGGER logger = toku_txn_logger(txn);
    if (logger) {
        FILENUM         fnums[num_fts];
        int i;
        uint32_t num_unsuppressed_fts = 0;
        for (i = 0; i < num_fts; i++) {
            if (brts[i]->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
                //Logging not suppressed for this brt.
                fnums[num_unsuppressed_fts++] = toku_cachefile_filenum(brts[i]->ft->cf);
            }
        }
        if (num_unsuppressed_fts) {
            FILENUMS filenums = {.num = num_unsuppressed_fts, .filenums = fnums};
            BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
            BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
            TXNID xid = toku_txn_get_txnid(txn);
            FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
            r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
        }
    }
    return r;
}

int
toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging) {
    int r;
    XIDS message_xids = xids_get_root_xids(); //By default use committed messages
    TXNID xid = toku_txn_get_txnid(txn);
    if (txn) {
        if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
            BYTESTRING keybs = {key->size, (char *) key->data};
            r = toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
            if (r!=0) return r;
            toku_txn_maybe_note_ft(txn, ft_h->ft);
            //We have transactions, and this is not 2440.  We must send the full root-to-leaf path of xids.
            message_xids = toku_txn_get_xids(txn);
        }
        else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
            //We have transactions, and this is 2440, but the txn doing 2440 did not create the dictionary.  We must send the full root-to-leaf path of xids.
            message_xids = toku_txn_get_xids(txn);
        }
    }
    TOKULOGGER logger = toku_txn_logger(txn);
    if (do_logging && logger &&
        ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
        r = toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
        if (r!=0) return r;
    }

    LSN treelsn;
    if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
        r = 0;
    } else {
        r = toku_ft_send_delete(ft_h, key, message_xids);
    }
    return r;
}

int
toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids) {
    DBT val; toku_init_dbt(&val);
    FT_MSG_S ftcmd = { FT_DELETE_ANY, ZERO_MSN, xids, .u = { .id = { key, &val } } };
    int result = toku_ft_root_put_cmd(brt->ft, &ftcmd);
    return result;
}

/* mempool support */

struct omt_compressor_state {
    struct mempool *new_kvspace;
    OMT omt;
};

static int move_it (OMTVALUE lev, uint32_t idx, void *v) {
    LEAFENTRY CAST_FROM_VOIDP(le, lev);
    struct omt_compressor_state *CAST_FROM_VOIDP(oc, v);
    uint32_t size = leafentry_memsize(le);
    LEAFENTRY CAST_FROM_VOIDP(newdata, toku_mempool_malloc(oc->new_kvspace, size, 1));
    lazy_assert(newdata); // we do this on a fresh mempool, so nothing bad should happen
    memcpy(newdata, le, size);
    toku_omt_set_at(oc->omt, newdata, idx);
    return 0;
}

// Compress things, and grow the mempool if needed.
// TODO 4092 should copy data to new memory, then call toku_mempool_destroy() followed by toku_mempool_init()
static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size, void **maybe_free) {
    uint32_t total_size_needed = memp->free_offset-memp->frag_size + added_size;
    if (total_size_needed+total_size_needed/4 >= memp->size) {
        memp->size = total_size_needed+total_size_needed/4;
    }
    void *newmem = toku_xmalloc(memp->size);
    struct mempool new_kvspace;
    toku_mempool_init(&new_kvspace, newmem, memp->size);
    struct omt_compressor_state oc = { &new_kvspace, omt };
    toku_omt_iterate(omt, move_it, &oc);

    if (maybe_free) {
        *maybe_free = memp->base;
    } else {
        toku_free(memp->base);
    }
    *memp = new_kvspace;
    return 0;
}

void *
mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free) {
    void *v = toku_mempool_malloc(mp, size, 1);
    if (v==0) {
        if (0 == omt_compress_kvspace(omt, mp, size, maybe_free)) {
            v = toku_mempool_malloc(mp, size, 1);
            lazy_assert(v);
        }
    }
    return v;
}

/* ******************** open, close and create  ********************** */

// Test-only function (not used in the running system). This one has no env.
int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *ft_handle_p, int nodesize,
                   int basementnodesize,
                   enum toku_compression_method compression_method,
                   CACHETABLE cachetable, TOKUTXN txn,
                   int (*compare_fun)(DB *, const DBT*,const DBT*)) {
    FT_HANDLE brt;
    int r;
    const int only_create = 0;

    r = toku_ft_handle_create(&brt);
    if (r != 0)
        return r;
    toku_ft_handle_set_nodesize(brt, nodesize);
    toku_ft_handle_set_basementnodesize(brt, basementnodesize);
    toku_ft_handle_set_compression_method(brt, compression_method);
    r = toku_ft_set_bt_compare(brt, compare_fun); assert_zero(r);

    r = toku_ft_handle_open(brt, fname, is_create, only_create, cachetable, txn);
    if (r != 0) {
        return r;
    }

    *ft_handle_p = brt;
    return r;
}

// open a file for use by the brt
// Requires:  File does not exist.
static int ft_create_file(FT_HANDLE UU(brt), const char *fname, int *fdp) {
    mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
    int r;
    int fd;
    int er;
    fd = open(fname, O_RDWR | O_BINARY, mode);
    assert(fd==-1);
    if ((er = get_maybe_error_errno()) != ENOENT) {
        return er;
    }
    fd = open(fname, O_RDWR | O_CREAT | O_BINARY, mode);
    if (fd==-1) {
        r = get_error_errno();
        return r;
    }

    r = toku_fsync_directory(fname);
    resource_assert_zero(r);

    *fdp = fd;
    return 0;
}

// open a file for use by the brt.  if the file does not exist, error
static int ft_open_file(const char *fname, int *fdp) {
    mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
    int fd;
    fd = open(fname, O_RDWR | O_BINARY, mode);
    if (fd==-1) {
        return get_error_errno();
    }
    *fdp = fd;
    return 0;
}

void
toku_ft_handle_set_compression_method(FT_HANDLE t, enum toku_compression_method method)
{
    if (t->ft) {
        toku_ft_set_compression_method(t->ft, method);
    }
    else {
        t->options.compression_method = method;
    }
}

void
toku_ft_handle_get_compression_method(FT_HANDLE t, enum toku_compression_method *methodp)
{
    if (t->ft) {
        toku_ft_get_compression_method(t->ft, methodp);
    }
    else {
        *methodp = t->options.compression_method;
    }
}

static int
verify_builtin_comparisons_consistent(FT_HANDLE t, uint32_t flags) {
    if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->options.compare_fun != toku_builtin_compare_fun))
        return EINVAL;
    return 0;
}

//
// See comments in toku_db_change_descriptor to understand invariants
// in the system when this function is called
//
int
toku_ft_change_descriptor(
    FT_HANDLE ft_h,
    const DBT* old_descriptor,
    const DBT* new_descriptor,
    bool do_log,
    TOKUTXN txn,
    bool update_cmp_descriptor
    )
{
    int r = 0;
    DESCRIPTOR_S new_d;

    // if running with txns, save to rollback + write to recovery log
    if (txn) {
        // put information into rollback file
        BYTESTRING old_desc_bs = { old_descriptor->size, (char *) old_descriptor->data };
        BYTESTRING new_desc_bs = { new_descriptor->size, (char *) new_descriptor->data };
        r = toku_logger_save_rollback_change_fdescriptor(
            txn,
            toku_cachefile_filenum(ft_h->ft->cf),
            &old_desc_bs
            );
        if (r != 0) { goto cleanup; }
        toku_txn_maybe_note_ft(txn, ft_h->ft);

        if (do_log) {
            TOKULOGGER logger = toku_txn_logger(txn);
            TXNID xid = toku_txn_get_txnid(txn);
            r = toku_log_change_fdescriptor(
                logger, NULL, 0,
                txn,
                toku_cachefile_filenum(ft_h->ft->cf),
                xid,
                old_desc_bs,
                new_desc_bs,
                update_cmp_descriptor
                );
            if (r != 0) { goto cleanup; }
        }
    }

    // write new_descriptor to header
    new_d.dbt = *new_descriptor;
    toku_ft_update_descriptor(ft_h->ft, &new_d);
    // very infrequent operation, worth precise threadsafe count
    if (r == 0) {
        STATUS_VALUE(FT_DESCRIPTOR_SET)++;
    }
    if (r!=0) goto cleanup;

    if (update_cmp_descriptor) {
        toku_ft_update_cmp_descriptor(ft_h->ft);
    }
cleanup:
    return r;
}

static void
toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
    struct ft_options options = {
        .nodesize = ft->h->nodesize,
        .basementnodesize = ft->h->basementnodesize,
        .compression_method = ft->h->compression_method,
        .flags = ft->h->flags,
        .compare_fun = ft->compare_fun,
        .update_fun = ft->update_fun
    };
    t->options = options;
    t->did_set_flags = true;
}

// This is the actual open, used for various purposes, such as normal use, recovery, and redirect.
// fname_in_env is the iname, relative to the env_dir  (data_dir is already in iname as prefix).
// The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn .
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
static int
ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
    int r;
    bool txn_created = false;
    char *fname_in_cwd = NULL;
    CACHEFILE cf = NULL;
    FT ft = NULL;
    bool did_create = false;
    toku_ft_open_close_lock();

    if (ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r!=0) { goto exit; }
    }
    if (txn && txn->logger->is_panicked) {
        r = EINVAL;
        goto exit;
    }

    assert(is_create || !only_create);
    FILENUM reserved_filenum;
    reserved_filenum = use_filenum;
    fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env);
    {
        int fd = -1;
        r = ft_open_file(fname_in_cwd, &fd);
        if (reserved_filenum.fileid == FILENUM_NONE.fileid) {
            reserved_filenum = toku_cachetable_reserve_filenum(cachetable);
        }
        if (r==ENOENT && is_create) {
            did_create = true;
            mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
            if (txn) {
                BYTESTRING bs = { .len=(uint32_t) strlen(fname_in_env), .data = (char*)fname_in_env };
                r = toku_logger_save_rollback_fcreate(txn, reserved_filenum, &bs); // bs is a copy of the fname relative to the environment
                assert_zero(r);
            }
            txn_created = (bool)(txn!=NULL);
            r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method);
            assert_zero(r); // only possible failure is panic, which we check above
            r = ft_create_file(ft_h, fname_in_cwd, &fd);
            assert_zero(r);
        }
        if (r) { goto exit; }
        r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum);
        if (r) { goto exit; }
    }
    assert(ft_h->options.nodesize>0);
    bool was_already_open;
    if (is_create) {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
        if (r==TOKUDB_DICTIONARY_NO_HEADER) {
            r = toku_create_new_ft(&ft, &ft_h->options, cf, txn);
            if (r) { goto exit; }
        }
        else if (r!=0) {
            goto exit;
        }
        else if (only_create) {
            assert_zero(r);
            r = EEXIST;
            goto exit;
        }
        // if we get here, then is_create was true but only_create was false,
        // so it is ok for toku_read_ft_and_store_in_cachefile to have read
        // the header
    } else {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
        if (r) { goto exit; }
    }
    if (!ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r) { goto exit; }
    } else if (ft_h->options.flags != ft->h->flags) {                  /* if flags have been set then flags must match */
        r = EINVAL;
        goto exit;
    }
    toku_ft_handle_inherit_options(ft_h, ft);

    if (!was_already_open) {
        if (!did_create) { //Only log the fopen that OPENs the file.  If it was already open, don't log.
            r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags);
            assert_zero(r);
        }
    }
    int use_reserved_dict_id;
    use_reserved_dict_id = use_dictionary_id.dictid != DICTIONARY_ID_NONE.dictid;
    if (!was_already_open) {
        DICTIONARY_ID dict_id;
        if (use_reserved_dict_id) {
            dict_id = use_dictionary_id;
        }
        else {
            dict_id = next_dict_id();
        }
        ft->dict_id = dict_id;
    }
    else {
        // dict_id is already in header
        if (use_reserved_dict_id) {
            assert(ft->dict_id.dictid == use_dictionary_id.dictid);
        }
    }
    assert(ft);
    assert(ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    assert(ft->dict_id.dictid < dict_id_serial);

    // important note here,
    // after this point, where we associate the header
    // with the brt, the function is not allowed to fail
    // Code that handles failure (located below "exit"),
    // depends on this
    toku_ft_note_ft_handle_open(ft, ft_h);
    if (txn_created) {
        assert(txn);
        toku_ft_suppress_rollbacks(ft, txn);
        toku_txn_maybe_note_ft(txn, ft);
    }

    // Opening a brt may restore to a previous checkpoint.  Truncate if necessary.
    {
        int fd = toku_cachefile_get_fd (ft->cf);
        toku_maybe_truncate_file_on_open(ft->blocktable, fd);
    }

    r = 0;
exit:
    if (fname_in_cwd) {
        toku_free(fname_in_cwd);
    }
    if (r != 0 && cf) {
        if (ft) {
            // we only call toku_ft_note_ft_handle_open
            // when the function succeeds, so if we are here,
            // then that means we have a reference to the header
            // but we have not linked it to this brt. So,
            // we can simply try to remove the header.
            // We don't need to unlink this brt from the header
            toku_ft_grab_reflock(ft);
            bool needed = toku_ft_needed_unlocked(ft);
            toku_ft_release_reflock(ft);
            if (!needed) {
                //Close immediately.
                char *error_string = NULL;
                r = toku_ft_evict_from_memory(ft, &error_string, false, ZERO_LSN);
                lazy_assert_zero(r);
            }
        }
        else {
            toku_cachefile_close(&cf, 0, false, ZERO_LSN);
        }
    }
    toku_ft_open_close_unlock();
    return r;
}

// Open a brt for the purpose of recovery, which requires that the brt be open to a pre-determined FILENUM
// and may require a specific checkpointed version of the file.
// (dict_id is assigned by the ft_handle_open() function.)
int
toku_ft_handle_open_recovery(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, LSN max_acceptable_lsn) {
    int r;
    assert(use_filenum.fileid != FILENUM_NONE.fileid);
    r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable,
                 txn, use_filenum, DICTIONARY_ID_NONE, max_acceptable_lsn);
    return r;
}

// Open a brt in normal use.  The FILENUM and dict_id are assigned by the ft_handle_open() function.
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
int
toku_ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn) {
    int r;
    r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable, txn, FILENUM_NONE, DICTIONARY_ID_NONE, MAX_LSN);
    return r;
}

// clone an ft handle. the cloned handle has a new dict_id but refers to the same fractal tree
int
toku_ft_handle_clone(FT_HANDLE *cloned_ft_handle, FT_HANDLE ft_handle, TOKUTXN txn) {
    int r;
    FT_HANDLE result_ft_handle;
    r = toku_ft_handle_create(&result_ft_handle);
    resource_assert_zero(r);

    // we're cloning, so the handle better have an open ft and open cf
    invariant(ft_handle->ft);
    invariant(ft_handle->ft->cf);

    // inherit the options of the ft whose handle is being cloned.
    toku_ft_handle_inherit_options(result_ft_handle, ft_handle->ft);

    // we can clone the handle by creating a new handle with the same fname
    CACHEFILE cf = ft_handle->ft->cf;
    CACHETABLE ct = toku_cachefile_get_cachetable(cf);
    const char *fname_in_env = toku_cachefile_fname_in_env(cf);
    r = toku_ft_handle_open(result_ft_handle, fname_in_env, false, false, ct, txn);
    if (r != 0) {
        toku_ft_handle_close(result_ft_handle);
        result_ft_handle = NULL;
    }
    *cloned_ft_handle = result_ft_handle;
    return r;
}

// Open a brt in normal use.  The FILENUM and dict_id are assigned by the ft_handle_open() function.
int
toku_ft_handle_open_with_dict_id(
    FT_HANDLE t,
    const char *fname_in_env,
    int is_create,
    int only_create,
    CACHETABLE cachetable,
    TOKUTXN txn,
    DICTIONARY_ID use_dictionary_id
    )
{
    int r;
    r = ft_handle_open(
        t,
        fname_in_env,
        is_create,
        only_create,
        cachetable,
        txn,
        FILENUM_NONE,
        use_dictionary_id,
        MAX_LSN
        );
    return r;
}

DICTIONARY_ID
toku_ft_get_dictionary_id(FT_HANDLE brt) {
    FT h = brt->ft;
    DICTIONARY_ID dict_id = h->dict_id;
    return dict_id;
}

void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) {
    ft_handle->did_set_flags = true;
    ft_handle->options.flags = flags;
}

void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) {
    *flags = ft_handle->options.flags;
}

void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len)
// return the maximum advisable key value lengths.  The brt doesn't enforce these.
{
    *max_key_len = 32*1024;
    *max_val_len = 32*1024*1024;
}

void toku_ft_handle_set_nodesize(FT_HANDLE ft_handle, unsigned int nodesize) {
    if (ft_handle->ft) {
        toku_ft_set_nodesize(ft_handle->ft, nodesize);
    }
    else {
        ft_handle->options.nodesize = nodesize;
    }
}

void toku_ft_handle_get_nodesize(FT_HANDLE ft_handle, unsigned int *nodesize) {
    if (ft_handle->ft) {
        toku_ft_get_nodesize(ft_handle->ft, nodesize);
    }
    else {
        *nodesize = ft_handle->options.nodesize;
    }
}

void toku_ft_handle_set_basementnodesize(FT_HANDLE ft_handle, unsigned int basementnodesize) {
    if (ft_handle->ft) {
        toku_ft_set_basementnodesize(ft_handle->ft, basementnodesize);
    }
    else {
        ft_handle->options.basementnodesize = basementnodesize;
    }
}

void toku_ft_handle_get_basementnodesize(FT_HANDLE ft_handle, unsigned int *basementnodesize) {
    if (ft_handle->ft) {
        toku_ft_get_basementnodesize(ft_handle->ft, basementnodesize);
    }
    else {
        *basementnodesize = ft_handle->options.basementnodesize;
    }
}

int toku_ft_set_bt_compare(FT_HANDLE brt, int (*bt_compare)(DB*, const DBT*, const DBT*)) {
    brt->options.compare_fun = bt_compare;
    return 0;
}

void toku_ft_set_redirect_callback(FT_HANDLE brt, on_redirect_callback redir_cb, void* extra) {
    brt->redirect_callback = redir_cb;
    brt->redirect_callback_extra = extra;
}

int toku_ft_set_update(FT_HANDLE brt, ft_update_func update_fun) {
    brt->options.update_fun = update_fun;
    return 0;
}

ft_compare_func toku_ft_get_bt_compare (FT_HANDLE brt) {
    return brt->options.compare_fun;
}

static void
ft_remove_handle_ref_callback(FT UU(ft), void *extra) {
    FT_HANDLE CAST_FROM_VOIDP(handle, extra);
    toku_list_remove(&handle->live_ft_handle_link);
}

// close an ft handle during normal operation. the underlying ft may or may not close,
// depending if there are still references. an lsn for this close will come from the logger.
void
toku_ft_handle_close(FT_HANDLE ft_handle) {
    // There are error paths in the ft_handle_open that end with ft_handle->ft==NULL.
    FT ft = ft_handle->ft;
    if (ft) {
        const bool oplsn_valid = false;
        toku_ft_remove_reference(ft, oplsn_valid, ZERO_LSN, ft_remove_handle_ref_callback, ft_handle);
    }
    toku_free(ft_handle);
}

// close an ft handle during recovery. the underlying ft must close, and will use the given lsn.
void
toku_ft_handle_close_recovery(FT_HANDLE ft_handle, LSN oplsn) {
    FT ft = ft_handle->ft;
    // the ft must exist if closing during recovery. error paths during
    // open for recovery should close handles using toku_ft_handle_close()
    assert(ft);
    const bool oplsn_valid = true;
    toku_ft_remove_reference(ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, ft_handle);
    toku_free(ft_handle);
}

// TODO: remove this, callers should instead just use toku_ft_handle_close()
int
toku_close_ft_handle_nolsn (FT_HANDLE ft_handle, char** UU(error_string)) {
    toku_ft_handle_close(ft_handle);
    return 0;
}

int toku_ft_handle_create(FT_HANDLE *ft_handle_ptr) {
    FT_HANDLE MALLOC(brt);
    if (brt == 0)
        return ENOMEM;
    memset(brt, 0, sizeof *brt);
    toku_list_init(&brt->live_ft_handle_link);
    brt->options.flags = 0;
    brt->did_set_flags = false;
    brt->options.nodesize = FT_DEFAULT_NODE_SIZE;
    brt->options.basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
    brt->options.compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
    brt->options.compare_fun = toku_builtin_compare_fun;
    brt->options.update_fun = NULL;
    *ft_handle_ptr = brt;
    return 0;
}

/* ************* CURSORS ********************* */

static inline void
ft_cursor_cleanup_dbts(FT_CURSOR c) {
    if (c->key.data) toku_free(c->key.data);
    if (c->val.data) toku_free(c->val.data);
    memset(&c->key, 0, sizeof(c->key));
    memset(&c->val, 0, sizeof(c->val));
}

//
// This function is used by the leafentry iterators.
// returns TOKUDB_ACCEPT if the live transaction context is allowed to read a value
// that was written by the transaction with TXNID id.
// The live transaction context may read the value if either id is the root ancestor of context, or if
// id was committed before context's snapshot was taken.
// For id to be committed before context's snapshot was taken, the following must be true:
//  - id < context->snapshot_txnid64 AND id is not in context's live root transaction list
// For the above to NOT be true:
//  - id > context->snapshot_txnid64 OR id is in context's live root transaction list
//
static int
does_txn_read_entry(TXNID id, TOKUTXN context) {
    int rval;
    TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(context);
    if (id < oldest_live_in_snapshot || id == context->ancestor_txnid64) {
        rval = TOKUDB_ACCEPT;
    }
    else if (id > context->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*context->live_root_txn_list, id)) {
        rval = 0;
    }
    else {
        rval = TOKUDB_ACCEPT;
    }
    return rval;
}

3575
static inline void
3576 3577
ft_cursor_extract_key_and_val(LEAFENTRY le,
                               FT_CURSOR cursor,
Yoni Fogel's avatar
Yoni Fogel committed
3578
                               uint32_t *keylen,
3579
                               void            **key,
Yoni Fogel's avatar
Yoni Fogel committed
3580
                               uint32_t *vallen,
3581
                               void            **val) {
    if (toku_ft_cursor_is_leaf_mode(cursor)) {
        *key = le_key_and_len(le, keylen);
        *val = le;
        *vallen = leafentry_memsize(le);
    } else if (cursor->is_snapshot_read) {
        le_iterate_val(
            le,
            does_txn_read_entry,
            val,
            vallen,
            cursor->ttxn
            );
        *key = le_key_and_len(le, keylen);
    } else {
        *key = le_key_and_len(le, keylen);
        *val = le_latest_val_and_len(le, vallen);
    }
}

int toku_ft_cursor (
    FT_HANDLE brt,
    FT_CURSOR *cursorptr,
    TOKUTXN ttxn,
    bool is_snapshot_read,
    bool disable_prefetching
    )
{
    if (is_snapshot_read) {
        invariant(ttxn != NULL);
        int accepted = does_txn_read_entry(brt->ft->h->root_xid_that_created, ttxn);
        if (accepted!=TOKUDB_ACCEPT) {
            invariant(accepted==0);
            return TOKUDB_MVCC_DICTIONARY_TOO_NEW;
        }
    }
    FT_CURSOR MALLOC(cursor);
    // if this cursor is to do read_committed fetches, then the txn objects must be valid.
    if (cursor == 0)
        return ENOMEM;
    memset(cursor, 0, sizeof(*cursor));
    cursor->ft_handle = brt;
    cursor->prefetching = false;
    toku_init_dbt(&cursor->range_lock_left_key);
    toku_init_dbt(&cursor->range_lock_right_key);
    cursor->left_is_neg_infty = false;
    cursor->right_is_pos_infty = false;
    cursor->is_snapshot_read = is_snapshot_read;
    cursor->is_leaf_mode = false;
    cursor->ttxn = ttxn;
    cursor->disable_prefetching = disable_prefetching;
    cursor->is_temporary = false;
    *cursorptr = cursor;
    return 0;
}

void
toku_ft_cursor_set_temporary(FT_CURSOR ftcursor) {
    ftcursor->is_temporary = true;
}

void
toku_ft_cursor_set_leaf_mode(FT_CURSOR ftcursor) {
    ftcursor->is_leaf_mode = true;
}

int
toku_ft_cursor_is_leaf_mode(FT_CURSOR ftcursor) {
    return ftcursor->is_leaf_mode;
}

void
toku_ft_cursor_set_range_lock(FT_CURSOR cursor, const DBT *left, const DBT *right,
                               bool left_is_neg_infty, bool right_is_pos_infty)
{
    if (cursor->range_lock_left_key.data) {
        toku_free(cursor->range_lock_left_key.data);
        toku_init_dbt(&cursor->range_lock_left_key);
    }
    if (cursor->range_lock_right_key.data) {
        toku_free(cursor->range_lock_right_key.data);
        toku_init_dbt(&cursor->range_lock_right_key);
    }

    if (left_is_neg_infty) {
        cursor->left_is_neg_infty = true;
    } else {
        toku_fill_dbt(&cursor->range_lock_left_key,
                      toku_xmemdup(left->data, left->size), left->size);
    }
    if (right_is_pos_infty) {
        cursor->right_is_pos_infty = true;
    } else {
        toku_fill_dbt(&cursor->range_lock_right_key,
                      toku_xmemdup(right->data, right->size), right->size);
    }
}

int toku_ft_cursor_close(FT_CURSOR cursor) {
    ft_cursor_cleanup_dbts(cursor);
    if (cursor->range_lock_left_key.data) {
        toku_free(cursor->range_lock_left_key.data);
        toku_destroy_dbt(&cursor->range_lock_left_key);
    }
    if (cursor->range_lock_right_key.data) {
        toku_free(cursor->range_lock_right_key.data);
        toku_destroy_dbt(&cursor->range_lock_right_key);
    }
    toku_free(cursor);
    return 0;
}

static inline void ft_cursor_set_prefetching(FT_CURSOR cursor) {
    cursor->prefetching = true;
}

static inline bool ft_cursor_prefetching(FT_CURSOR cursor) {
    return cursor->prefetching;
}

// Return true if cursor is uninitialized, false otherwise.
static bool
ft_cursor_not_set(FT_CURSOR cursor) {
    assert((cursor->key.data==NULL) == (cursor->val.data==NULL));
    return (bool)(cursor->key.data == NULL);
}

static int
pair_leafval_heaviside_le (uint32_t klen, void *kval,
                           ft_search_t *search) {
    DBT x;
    int cmp = search->compare(search,
                              search->k ? toku_fill_dbt(&x, kval, klen) : 0);
    // The search->compare function returns only 0 or 1
    switch (search->direction) {
    case FT_SEARCH_LEFT:   return cmp==0 ? -1 : +1;
    case FT_SEARCH_RIGHT:  return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches.
    }
    abort(); return 0;
}


static int
heaviside_from_search_t (OMTVALUE lev, void *extra) {
    LEAFENTRY CAST_FROM_VOIDP(le, lev);
    ft_search_t *CAST_FROM_VOIDP(search, extra);
    uint32_t keylen;
    void* key = le_key_and_len(le, &keylen);

    return pair_leafval_heaviside_le (keylen, key, search);
3732 3733 3734
}


//
// Returns true if, from this cursor's point of view, the value to be read
// has been deleted.
//
static inline int
is_le_val_del(LEAFENTRY le, FT_CURSOR ftcursor) {
    int rval;
    if (ftcursor->is_snapshot_read) {
        bool is_del;
        le_iterate_is_del(
            le,
            does_txn_read_entry,
            &is_del,
            ftcursor->ttxn
            );
        rval = is_del;
    }
    else {
        rval = le_latest_is_del(le);
    }
    return rval;
}

static const DBT zero_dbt = {0,0,0,0};

static void search_save_bound (ft_search_t *search, DBT *pivot) {
    if (search->have_pivot_bound) {
        toku_free(search->pivot_bound.data);
    }
    search->pivot_bound = zero_dbt;
    search->pivot_bound.data = toku_malloc(pivot->size);
    search->pivot_bound.size = pivot->size;
    memcpy(search->pivot_bound.data, pivot->data, pivot->size);
    search->have_pivot_bound = true;
}

static bool search_pivot_is_bounded (ft_search_t *search, DESCRIPTOR desc, ft_compare_func cmp, DBT *pivot) __attribute__((unused));
static bool search_pivot_is_bounded (ft_search_t *search, DESCRIPTOR desc, ft_compare_func cmp, DBT *pivot)
// Effect:  Return true iff the pivot has already been searched (for fixing #3522.)
//  If searching from left to right, if we have already searched all the values less than pivot, we don't want to search again.
//  If searching from right to left, if we have already searched all the values greater than pivot, we don't want to search again.
{
    if (!search->have_pivot_bound) return true; // isn't bounded.
    FAKE_DB(db, desc);
    int comp = cmp(&db, pivot, &search->pivot_bound);
    if (search->direction == FT_SEARCH_LEFT) {
        // searching from left to right.  If the comparison function says the pivot is <= something we already compared, don't do it again.
        return comp>0;
    } else {
        return comp<0;
    }
}

struct store_fifo_offset_extra {
    int32_t *offsets;
    int i;
};

__attribute__((nonnull(3)))
static int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra)
{
    extra->offsets[extra->i] = offset;
    extra->i++;
    return 0;
}

/**
 * Given pointers to offsets within a FIFO where we can find messages,
 * figure out the MSN of each message, and compare those MSNs.  Returns 1,
 * 0, or -1 if a is larger than, equal to, or smaller than b.
 */
static int
fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo)
{
    const struct fifo_entry *a = toku_fifo_get_entry(fifo, ao);
    const struct fifo_entry *b = toku_fifo_get_entry(fifo, bo);
    if (a->msn.msn > b->msn.msn) {
        return +1;
    }
    if (a->msn.msn < b->msn.msn) {
        return -1;
    }
    return 0;
3817 3818
}

/**
 * Given a fifo_entry, either decompose it into its parameters and call
 * toku_ft_bn_apply_cmd, or discard it, based on its MSN and the MSN of the
 * basement node.
 */
static void
do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, struct fifo_entry *entry, STAT64INFO stats_to_update)
{
    // The messages are being iterated over in (key,msn) order or just in
    // msn order, so all the messages for one key, from one buffer, are in
    // ascending msn order.  So it's ok that we don't update the basement
    // node's msn until the end.
    if (entry->msn.msn > bn->max_msn_applied.msn) {
        ITEMLEN keylen = entry->keylen;
        ITEMLEN vallen = entry->vallen;
        enum ft_msg_type type = fifo_entry_get_msg_type(entry);
        MSN msn = entry->msn;
        const XIDS xids = (XIDS) &entry->xids_s;
        bytevec key = xids_get_end_of_array(xids);
        bytevec val = (uint8_t*)key + entry->keylen;

        DBT hk;
        toku_fill_dbt(&hk, key, keylen);
        DBT hv;
        FT_MSG_S ftcmd = { type, msn, xids, .u = { .id = { &hk, toku_fill_dbt(&hv, val, vallen) } } };
        toku_ft_bn_apply_cmd(
            t->ft->compare_fun,
            t->ft->update_fun,
            &t->ft->cmp_descriptor,
            bn,
            &ftcmd,
            &BP_WORKDONE(ancestor, childnum),
            stats_to_update
            );
    } else {
        STATUS_VALUE(FT_MSN_DISCARDS)++;
    }
    // We must always mark entry as stale since it has been marked
    // (using omt::iterate_and_mark_range)
    // It is possible to call do_bn_apply_cmd even when it won't apply the message because
    // the node containing it could have been evicted and brought back in.
    entry->is_fresh = false;
3861 3862
}

struct iterate_do_bn_apply_cmd_extra {
    FT_HANDLE t;
    BASEMENTNODE bn;
    FTNODE ancestor;
    int childnum;
    STAT64INFO stats_to_update;
};

__attribute__((nonnull(3)))
static int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
{
    NONLEAF_CHILDINFO bnc = BNC(e->ancestor, e->childnum);
    struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset);
    do_bn_apply_cmd(e->t, e->bn, e->ancestor, e->childnum, entry, e->stats_to_update);
    return 0;
}

/**
 * Given the bounds of the basement node to which we will apply messages,
 * find the indexes within message_tree which contain the range of
 * relevant messages.
 *
 * The message tree contains offsets into the buffer, where messages are
 * found.  The pivot_bounds are the lower bound exclusive and upper bound
 * inclusive, because they come from pivot keys in the tree.  We want OMT
 * indices, which must have the lower bound be inclusive and the upper
 * bound exclusive.  We will get these by telling toku_omt_find to look
 * for something strictly bigger than each of our pivot bounds.
 *
 * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
 * bound exclusive).
 */
template<typename find_bounds_omt_t>
static void
find_bounds_within_message_tree(
    DESCRIPTOR desc,       /// used for cmp
    ft_compare_func cmp,  /// used to compare keys
    const find_bounds_omt_t &message_tree,      /// tree holding FIFO offsets, in which we want to look for indices
    FIFO buffer,           /// buffer in which messages are found
    struct pivot_bounds const * const bounds,  /// key bounds within the basement node we're applying messages to
    uint32_t *lbi,        /// (output) "lower bound inclusive" (index into message_tree)
    uint32_t *ube         /// (output) "upper bound exclusive" (index into message_tree)
    )
{
    int r = 0;

    if (bounds->lower_bound_exclusive) {
        // By setting msn to MAX_MSN and by using direction of +1, we will
        // get the first message greater than (in (key, msn) order) any
        // message (with any msn) with the key lower_bound_exclusive.
        // This will be a message we want to try applying, so it is the
        // "lower bound inclusive" within the message_tree.
        struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra;
        ZERO_STRUCT(lbi_extra);
        lbi_extra.desc = desc;
        lbi_extra.cmp = cmp;
        lbi_extra.fifo = buffer;
        lbi_extra.key = bounds->lower_bound_exclusive;
        lbi_extra.msn = MAX_MSN;
        int32_t found_lb;
        r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
        if (r == DB_NOTFOUND) {
            // There is no relevant data (the lower bound is bigger than
            // any message in this tree), so we have no range and we're
            // done.
            *lbi = 0;
            *ube = 0;
            return;
        }
        if (bounds->upper_bound_inclusive) {
            // Check if what we found for lbi is greater than the upper
            // bound inclusive that we have.  If so, there are no relevant
            // messages between these bounds.
            const DBT *ubi = bounds->upper_bound_inclusive;
            const int32_t offset = found_lb;
            DBT found_lbidbt;
            fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
            FAKE_DB(db, desc);
            int c = cmp(&db, &found_lbidbt, ubi);
            // These DBTs really are both inclusive bounds, so we need
            // strict inequality in order to determine that there's
            // nothing between them.  If they're equal, then we actually
            // need to apply the message pointed to by lbi, and also
            // anything with the same key but a bigger msn.
            if (c > 0) {
                *lbi = 0;
                *ube = 0;
                return;
            }
        }
    } else {
        // No lower bound given, it's negative infinity, so we start at
        // the first message in the OMT.
        *lbi = 0;
    }
    if (bounds->upper_bound_inclusive) {
        // Again, we use an msn of MAX_MSN and a direction of +1 to get
        // the first thing bigger than the upper_bound_inclusive key.
        // This is therefore the smallest thing we don't want to apply,
        // and toku_omt_iterate_on_range will not examine it.
        struct toku_fifo_entry_key_msn_heaviside_extra ube_extra;
        ZERO_STRUCT(ube_extra);
        ube_extra.desc = desc;
        ube_extra.cmp = cmp;
        ube_extra.fifo = buffer;
        ube_extra.key = bounds->upper_bound_inclusive;
        ube_extra.msn = MAX_MSN;
        r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
        if (r == DB_NOTFOUND) {
            // Couldn't find anything in the buffer bigger than our key,
            // so we need to look at everything up to the end of
            // message_tree.
            *ube = message_tree.size();
        }
    } else {
        // No upper bound given, it's positive infinity, so we need to go
        // through the end of the OMT.
        *ube = message_tree.size();
    }
}

/**
 * For each message in the ancestor's buffer (determined by childnum) that
 * is key-wise between lower_bound_exclusive and upper_bound_inclusive,
 * apply the message to the basement node.  We treat the bounds as minus
 * or plus infinity respectively if they are NULL.  Do not mark the node
 * as dirty (preserve previous state of 'dirty' bit).
 */
static void
bnc_apply_messages_to_basement_node(
    FT_HANDLE t,             // used for comparison function
    BASEMENTNODE bn,   // where to apply messages
    FTNODE ancestor,  // the ancestor node where we can find messages to apply
    int childnum,      // which child buffer of ancestor contains messages we want
    struct pivot_bounds const * const bounds,  // contains pivot key bounds of this basement node
    bool* msgs_applied
    )
{
    int r;
    NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);

    // Determine the offsets in the message trees between which we need to
    // apply messages from this buffer
    STAT64INFO_S stats_delta = {0,0};
    uint32_t stale_lbi, stale_ube;
    if (!bn->stale_ancestor_messages_applied) {
        find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
    } else {
        stale_lbi = 0;
        stale_ube = 0;
    }
    uint32_t fresh_lbi, fresh_ube;
    find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube);

    // We now know where all the messages we must apply are, so one of the
    // following 4 cases will do the application, depending on which of
    // the lists contains relevant messages:
    //
    // 1. broadcast messages and anything else, or a mix of fresh and stale
    // 2. only fresh messages
    // 3. only stale messages
    if (bnc->broadcast_list.size() > 0 ||
        (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
        // We have messages in multiple trees, so we grab all
        // the relevant messages' offsets and sort them by MSN, then apply
        // them in MSN order.
        const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size());
        int32_t *XMALLOC_N(buffer_size, offsets);
        struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };

        // Populate offsets array with offsets to stale messages
        r = bnc->stale_message_tree.iterate_on_range<struct store_fifo_offset_extra, store_fifo_offset>(stale_lbi, stale_ube, &sfo_extra);
        assert_zero(r);

        // Then store fresh offsets, and mark them to be moved to stale later.
        r = bnc->fresh_message_tree.iterate_and_mark_range<struct store_fifo_offset_extra, store_fifo_offset>(fresh_lbi, fresh_ube, &sfo_extra);
        assert_zero(r);

        // Store offsets of all broadcast messages.
        r = bnc->broadcast_list.iterate<struct store_fifo_offset_extra, store_fifo_offset>(&sfo_extra);
        assert_zero(r);
        invariant(sfo_extra.i == buffer_size);

        // Sort by MSN.
        r = toku::sort<int32_t, FIFO, fifo_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->buffer);
        assert_zero(r);

        // Apply the messages in MSN order.
        for (int i = 0; i < buffer_size; ++i) {
            *msgs_applied = true;
            struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
            do_bn_apply_cmd(t, bn, ancestor, childnum, entry, &stats_delta);
        }

        toku_free(offsets);
    } else if (stale_lbi == stale_ube) {
        // No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
        struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum, .stats_to_update = &stats_delta};
        if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
        r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra);
        assert_zero(r);
    } else {
        invariant(fresh_lbi == fresh_ube);
        // No fresh messages to apply, we just apply stale messages.

        if (stale_ube - stale_lbi > 0) *msgs_applied = true;
        struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum , .stats_to_update = &stats_delta};

        r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(stale_lbi, stale_ube, &iter_extra);
        assert_zero(r);
    }
    //
    // update stats
    //
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
    }
}

void
toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied)
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
//   If the leaf node is not already up-to-date, then record the work done
//   for that leaf in each ancestor.
// Requires:
//   This is being called when pinning a leaf node for the query path.
//   The entire root-to-leaf path is pinned and appears in the ancestors list.
{
    VERIFY_NODE(t, node);
    invariant(node->height == 0);
    // know we are a leaf node
    // An important invariant:
    // We MUST bring every available basement node up to date.
    // flushing on the cleaner thread depends on this. This invariant
    // allows the cleaner thread to just pick an internal node and flush it
    // as opposed to being forced to start from the root.
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
        BASEMENTNODE curr_bn = BLB(node, i);
        struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
        for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
                assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
                bnc_apply_messages_to_basement_node(
                    t,
                    curr_bn,
                    curr_ancestors->node,
                    curr_ancestors->childnum,
                    &curr_bounds,
                    msgs_applied
                    );
                // We don't want to check this ancestor node again if the
                // next time we query it, the msn hasn't changed.
                curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
            }
        }
        // At this point, we know all the stale messages above this
        // basement node have been applied, and any new messages will be
        // fresh, so we don't need to look at stale messages for this
        // basement node, unless it gets evicted (and this field becomes
        // false when it's read in again).
        curr_bn->stale_ancestor_messages_applied = true;
    }
    VERIFY_NODE(t, node);
}

bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path)
// Effect: Determine whether there are messages in a node's ancestors
//  which must be applied to it.  These messages are in the correct
//  keyrange for any available basement nodes, and are in nodes with the
//  correct max_msn_applied_to_node_on_disk.
// Notes:
//  This is an approximate query.
// Output:
//  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
//    ancestors.  This is used later to update basement nodes'
//    max_msn_applied values in case we don't do the full algorithm.
// Returns:
//  true if there may be some such messages
//  false only if there are definitely no such messages
// Rationale:
//  When we pin a node with a read lock, we want to quickly determine if
//  we should exchange it for a write lock in preparation for applying
//  messages.  If there are no messages, we don't need the write lock.
{
    invariant(node->height == 0);
    MSN max_msn_applied = ZERO_MSN;
    bool needs_ancestors_messages = false;
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
        BASEMENTNODE bn = BLB(node, i);
        struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
        for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
                assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
                NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
                if (bnc->broadcast_list.size() > 0) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
                if (!bn->stale_ancestor_messages_applied) {
                    uint32_t stale_lbi, stale_ube;
                    find_bounds_within_message_tree(&ft->cmp_descriptor,
                                                    ft->compare_fun,
                                                    bnc->stale_message_tree,
                                                    bnc->buffer,
                                                    &curr_bounds,
                                                    &stale_lbi,
                                                    &stale_ube);
                    if (stale_lbi < stale_ube) {
                        needs_ancestors_messages = true;
                        goto cleanup;
                    }
                }
                uint32_t fresh_lbi, fresh_ube;
                find_bounds_within_message_tree(&ft->cmp_descriptor,
                                                ft->compare_fun,
                                                bnc->fresh_message_tree,
                                                bnc->buffer,
                                                &curr_bounds,
                                                &fresh_lbi,
                                                &fresh_ube);
                if (fresh_lbi < fresh_ube) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
                if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied.msn) {
                    max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
                }
            }
        }
    }
    *max_msn_in_path = max_msn_applied;
cleanup:
    return needs_ancestors_messages;
}

void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied) {
    invariant(node->height == 0);
    // At this point, we aren't going to run toku_apply_... but that
    // doesn't mean max_msn_applied doesn't need to be updated.
    // This function runs in a shared access context, so to silence tools
    // like DRD, we use a CAS and ignore the result.
    // Any threads trying to update these basement nodes should be
    // updating them to the same thing (since they all have a read lock on
    // the same root-to-leaf path) so this is safe.
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
        BASEMENTNODE bn = BLB(node, i);
        // Remember, this all happens in the context of a read lock.
        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
            (void) __sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
        }
    }
}

struct copy_to_stale_extra {
    FT ft;
    NONLEAF_CHILDINFO bnc;
};

__attribute__((nonnull(3)))
static int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
    struct fifo_entry *entry = toku_fifo_get_entry(extra->bnc->buffer, offset);
    DBT keydbt;
    DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
    struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft->cmp_descriptor, .cmp = extra->ft->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
    int r = extra->bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, heaviside_extra, nullptr);
    invariant_zero(r);
    return 0;
}

__attribute__((nonnull))
void
toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
    invariant(node->height > 0);
    // TODO: could be cilkified
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) {
            continue;
        }
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        // We can't delete things out of the fresh tree inside the above
        // procedures because we're still looking at the fresh tree.  Instead
        // we have to move messages after we're done looking at it.
        struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
        int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
        invariant_zero(r);
        bnc->fresh_message_tree.delete_all_marked();
    }
}

static int
ft_cursor_shortcut (
    FT_CURSOR cursor,
    int direction,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    uint32_t *keylen,
    void **key,
    uint32_t *vallen,
    void **val
    );

// This is a bottom layer of the search functions.
static int
ft_search_basement_node(
    BASEMENTNODE bn,
    ft_search_t *search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    bool can_bulk_fetch
    )
{
    // Now we have to convert from ft_search_t to the heaviside function with a direction.  What a pain...

    int direction;
    switch (search->direction) {
    case FT_SEARCH_LEFT:   direction = +1; goto ok;
    case FT_SEARCH_RIGHT:  direction = -1; goto ok;
    }
    return EINVAL;  // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ;
    OMTVALUE datav;
    uint32_t idx = 0;
    int r = toku_omt_find(bn->buffer,
                          heaviside_from_search_t,
                          search,
                          direction,
                          &datav, &idx);
    if (r!=0) return r;

    LEAFENTRY CAST_FROM_VOIDP(le, datav);
    if (toku_ft_cursor_is_leaf_mode(ftcursor))
        goto got_a_good_value;        // leaf mode cursors see all leaf entries
    if (is_le_val_del(le,ftcursor)) {
        // Provisionally deleted stuff is gone.
        // So we need to scan in the direction to see if we can find something
        while (1) {
            switch (search->direction) {
            case FT_SEARCH_LEFT:
                idx++;
                if (idx >= toku_omt_size(bn->buffer))
                    return DB_NOTFOUND;
                break;
            case FT_SEARCH_RIGHT:
                if (idx == 0)
                    return DB_NOTFOUND;
                idx--;
                break;
            default:
                assert(false);
            }
            r = toku_omt_fetch(bn->buffer, idx, &datav);
            assert_zero(r); // we just validated the index
            CAST_FROM_VOIDP(le, datav);
            if (!is_le_val_del(le,ftcursor)) goto got_a_good_value;
        }
    }
got_a_good_value:
    {
        uint32_t keylen;
        void *key;
        uint32_t vallen;
        void *val;

        ft_cursor_extract_key_and_val(le,
                                       ftcursor,
                                       &keylen,
                                       &key,
                                       &vallen,
                                       &val
            );

        r = getf(keylen, key, vallen, val, getf_v, false);
        if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
            ftcursor->leaf_info.to_be.omt   = bn->buffer;
            ftcursor->leaf_info.to_be.index = idx;

            if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
                r = ft_cursor_shortcut(
                    ftcursor,
                    direction,
                    getf,
                    getf_v,
                    &keylen,
                    &key,
                    &vallen,
                    &val
                    );
            }

            ft_cursor_cleanup_dbts(ftcursor);
            if (!ftcursor->is_temporary) {
                ftcursor->key.data = toku_memdup(key, keylen);
                ftcursor->val.data = toku_memdup(val, vallen);
                ftcursor->key.size = keylen;
                ftcursor->val.size = vallen;
            }
            //The search was successful.  Prefetching can continue.
            *doprefetch = true;
        }
    }
    if (r == TOKUDB_CURSOR_CONTINUE) r = 0;
    return r;
}

static int
ft_search_node (
    FT_HANDLE brt,
    FTNODE node,
    ft_search_t *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS,
    struct pivot_bounds const * const bounds,
    bool can_bulk_fetch
    );

// the number of nodes to prefetch
#define TOKU_DO_PREFETCH 1
#if TOKU_DO_PREFETCH

static int
ftnode_fetch_callback_and_free_bfe(CACHEFILE cf, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int *dirtyp, void *extraargs)
{
    int r = toku_ftnode_fetch_callback(cf, p, fd, nodename, fullhash, ftnode_pv, disk_data, sizep, dirtyp, extraargs);
    struct ftnode_fetch_extra *CAST_FROM_VOIDP(ffe, extraargs);
    destroy_bfe_for_prefetch(ffe);
    toku_free(ffe);
    return r;
}

static int
ftnode_pf_callback_and_free_bfe(void *ftnode_pv, void* disk_data, void *read_extraargs, int fd, PAIR_ATTR *sizep)
{
    int r = toku_ftnode_pf_callback(ftnode_pv, disk_data, read_extraargs, fd, sizep);
    struct ftnode_fetch_extra *CAST_FROM_VOIDP(ffe, read_extraargs);
    destroy_bfe_for_prefetch(ffe);
    toku_free(ffe);
    return r;
}

static void
ft_node_maybe_prefetch(FT_HANDLE brt, FTNODE node, int childnum, FT_CURSOR ftcursor, bool *doprefetch) {

    // if we want to prefetch in the tree
    // then prefetch the next children if there are any
    if (*doprefetch && ft_cursor_prefetching(ftcursor) && !ftcursor->disable_prefetching) {
        int rc = ft_cursor_rightmost_child_wanted(ftcursor, brt, node);
        for (int i = childnum + 1; (i <= childnum + TOKU_DO_PREFETCH) && (i <= rc); i++) {
            BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i);
            uint32_t nextfullhash = compute_child_fullhash(brt->ft->cf, node, i);
            struct ftnode_fetch_extra *MALLOC(bfe);
            fill_bfe_for_prefetch(bfe, brt->ft, ftcursor);
            bool doing_prefetch = false;
            toku_cachefile_prefetch(
                brt->ft->cf,
                nextchildblocknum,
                nextfullhash,
                get_write_callbacks_for_node(brt->ft),
                ftnode_fetch_callback_and_free_bfe,
                toku_ftnode_pf_req_callback,
                ftnode_pf_callback_and_free_bfe,
                bfe,
                &doing_prefetch
                );
            if (!doing_prefetch) {
                destroy_bfe_for_prefetch(bfe);
                toku_free(bfe);
            }
            *doprefetch = false;
        }
    }
}

#endif

struct unlock_ftnode_extra {
    FT_HANDLE ft_handle;
    FTNODE node;
    bool msgs_applied;
};
// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (void *v) {
    struct unlock_ftnode_extra *x = NULL;
    CAST_FROM_VOIDP(x, v);
    FT_HANDLE brt = x->ft_handle;
    FTNODE node = x->node;
    // CT lock is held
    int r = toku_cachetable_unpin_ct_prelocked_no_flush(
        brt->ft->cf,
        node->ct_pair,
        (enum cachetable_dirty) node->dirty,
        x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr()
        );
    assert(r==0);
}

/* search in a node's child */
static int
ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool *doprefetch, FT_CURSOR ftcursor, UNLOCKERS unlockers,
                 ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool can_bulk_fetch)
// Effect: Search in a node's child.  Searches are read-only now (at least as far as the hardcopy is concerned).
{
    struct ancestors next_ancestors = {node, childnum, ancestors};

    BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
    uint32_t fullhash = compute_child_fullhash(brt->ft->cf, node, childnum);
    FTNODE childnode;

    struct ftnode_fetch_extra bfe;
    fill_bfe_for_subset_read(
        &bfe,
        brt->ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching
        );
    bool msgs_applied = false;
    {
        int rr = toku_pin_ftnode_batched(brt, childblocknum, fullhash,
                                         unlockers,
                                         &next_ancestors, bounds,
                                         &bfe,
                                         PL_READ, // we try to get a read lock, but we may upgrade to a write lock on a leaf for message application.
                                         true,
                                         (node->height == 1), // end_batch_on_success true iff child is a leaf
                                         &childnode,
                                         &msgs_applied);
        if (rr==TOKUDB_TRY_AGAIN) {
            // We're going to try again, so we aren't pinning any more
            // nodes in this batch.
            toku_cachetable_end_batched_pin(brt->ft->cf);
            return rr;
        }
        // We end the batch before applying ancestor messages if we get
        // all the way to a leaf.
        invariant_zero(rr);
    }

    struct unlock_ftnode_extra unlock_extra   = {brt,childnode,msgs_applied};
    struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, unlockers};

    int r = ft_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, ftcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
    if (r!=TOKUDB_TRY_AGAIN) {
#if TOKU_DO_PREFETCH
        // maybe prefetch the next child
        if (r == 0 && node->height == 1) {
            ft_node_maybe_prefetch(brt, node, childnum, ftcursor, doprefetch);
        }
#endif

        assert(next_unlockers.locked);
        if (msgs_applied) {
            toku_unpin_ftnode(brt->ft, childnode);
        }
        else {
            toku_unpin_ftnode_read_only(brt, childnode);
        }
    } else {
        // try again.

        // there are two cases where we get TOKUDB_TRY_AGAIN
        //  case 1 is when some later call to toku_pin_ftnode returned
        //  that value and unpinned all the nodes anyway. case 2
        //  is when ft_search_node had to stop its search because
        //  some piece of a node that it needed was not in memory. In this case,
        //  the node was not unpinned, so we unpin it here
        if (next_unlockers.locked) {
            if (msgs_applied) {
                toku_unpin_ftnode(brt->ft, childnode);
            }
            else {
                toku_unpin_ftnode_read_only(brt, childnode);
            }
        }
    }

    return r;
}

static inline int
search_which_child_cmp_with_bound(DB *db, ft_compare_func cmp, FTNODE node, int childnum, ft_search_t *search, DBT *dbt)
{
    return cmp(db, toku_copyref_dbt(dbt, node->childkeys[childnum]), &search->pivot_bound);
}

int
toku_ft_search_which_child(
    DESCRIPTOR desc,
    ft_compare_func cmp,
    FTNODE node,
    ft_search_t *search
    )
{
#define DO_SEARCH_WHICH_CHILD_BINARY 1
#if DO_SEARCH_WHICH_CHILD_BINARY
    if (node->n_children <= 1) return 0;

    DBT pivotkey;
    toku_init_dbt(&pivotkey);
    int lo = 0;
    int hi = node->n_children - 1;
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        toku_copyref_dbt(&pivotkey, node->childkeys[mi]);
        // search->compare is really strange, and only works well with a
        // linear search, it makes binary search a pita.
        //
        // if you are searching left to right, it returns
        //   "0" for pivots that are < the target, and
        //   "1" for pivots that are >= the target
        // if you are searching right to left, it's the opposite.
        //
        // so if we're searching from the left and search->compare says
        // "1", we want to go left from here, if it says "0" we want to go
        // right.  searching from the right does the opposite.
        bool c = search->compare(search, &pivotkey);
        if (((search->direction == FT_SEARCH_LEFT) && c) ||
            ((search->direction == FT_SEARCH_RIGHT) && !c)) {
            hi = mi;
        } else {
            assert(((search->direction == FT_SEARCH_LEFT) && !c) ||
                   ((search->direction == FT_SEARCH_RIGHT) && c));
            lo = mi + 1;
        }
    }
    // ready to return something, if the pivot is bounded, we have to move
    // over a bit to get away from what we've already searched
    if (search->have_pivot_bound) {
        FAKE_DB(db, desc);
        if (search->direction == FT_SEARCH_LEFT) {
            while (lo < node->n_children - 1 &&
                   search_which_child_cmp_with_bound(&db, cmp, node, lo, search, &pivotkey) <= 0) {
                // searching left to right, if the comparison says the
                // current pivot (lo) is left of or equal to our bound,
                // don't search that child again
                lo++;
            }
        } else {
            while (lo > 0 &&
                   search_which_child_cmp_with_bound(&db, cmp, node, lo - 1, search, &pivotkey) >= 0) {
                // searching right to left, same argument as just above
                // (but we had to pass lo - 1 because the pivot between lo
                // and the thing just less than it is at that position in
                // the childkeys array)
                lo--;
            }
        }
    }
    return lo;
#endif
#define DO_SEARCH_WHICH_CHILD_LINEAR 0
#if DO_SEARCH_WHICH_CHILD_LINEAR
    int c;
    DBT pivotkey;
    toku_init_dbt(&pivotkey);

    /* binary search is overkill for a small array */
    int child[node->n_children];

    /* scan left to right or right to left depending on the search direction */
    for (c = 0; c < node->n_children; c++) {
        child[c] = (search->direction == FT_SEARCH_LEFT) ? c : node->n_children - 1 - c;
    }
    for (c = 0; c < node->n_children-1; c++) {
        int p = (search->direction == FT_SEARCH_LEFT) ? child[c] : child[c] - 1;
        toku_copyref_dbt(&pivotkey, node->childkeys[p]);
        if (search_pivot_is_bounded(search, desc, cmp, &pivotkey) && search->compare(search, &pivotkey)) {
            return child[c];
        }
    }
    /* check the first (left) or last (right) node if nothing has been found */
    return child[c];
#endif
}

static void
maybe_search_save_bound(
    FTNODE node,
    int child_searched,
    ft_search_t *search)
{
    int p = (search->direction == FT_SEARCH_LEFT) ? child_searched : child_searched - 1;
    if (p >= 0 && p < node->n_children-1) {
        search_save_bound(search, &node->childkeys[p]);
    }
}

static int
ft_search_node(
    FT_HANDLE brt,
    FTNODE node,
    ft_search_t *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS ancestors,
    struct pivot_bounds const * const bounds,
    bool can_bulk_fetch
    )
{
    int r = 0;
    // assert that we got a valid child_to_search
    assert(child_to_search >= 0 && child_to_search < node->n_children);
    //
    // At this point, we must have the necessary partition available to continue the search
    //
    assert(BP_STATE(node,child_to_search) == PT_AVAIL);
    // When we enter, we are in a batch.  If we search a node but get
    // DB_NOTFOUND and need to search the next node, we'll need to start
    // another batch.
    bool must_begin_batch = false;
    while (child_to_search >= 0 && child_to_search < node->n_children) {
        //
        // Normally, the child we want to use is available, as we checked
        // before entering this while loop. However, if we pass through
        // the loop once, getting DB_NOTFOUND for this first value
        // of child_to_search, we enter the while loop again with a
        // child_to_search that may not be in memory. If it is not,
        // we need to return TOKUDB_TRY_AGAIN so the query can
        // read the appropriate partition into memory
        //
        if (BP_STATE(node,child_to_search) != PT_AVAIL) {
            return TOKUDB_TRY_AGAIN;
        }
        const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
        if (node->height > 0) {
            if (must_begin_batch) {
                toku_cachetable_begin_batched_pin(brt->ft->cf);
                must_begin_batch = false;
            }
            r = ft_search_child(
                brt,
                node,
                child_to_search,
                search,
                getf,
                getf_v,
                doprefetch,
                ftcursor,
                unlockers,
                ancestors,
                &next_bounds,
                can_bulk_fetch
                );
        }
        else {
            r = ft_search_basement_node(
                BLB(node, child_to_search),
                search,
                getf,
                getf_v,
                doprefetch,
                ftcursor,
                can_bulk_fetch
                );
        }
        if (r == 0) return r; //Success

        if (r != DB_NOTFOUND) {
            return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
        }
        // not really necessary, just put this here so that reading the
        // code becomes simpler. The point is at this point in the code,
        // we know that we got DB_NOTFOUND and we have to continue
        assert(r == DB_NOTFOUND);
        // we have a new pivotkey
        if (node->height == 0) {
            // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
            const DBT *pivot = NULL;
            if (search->direction == FT_SEARCH_LEFT)
                pivot = next_bounds.upper_bound_inclusive; // left -> right
            else
                pivot = next_bounds.lower_bound_exclusive; // right -> left
            if (pivot) {
                int rr = getf(pivot->size, pivot->data, 0, NULL, getf_v, true);
                if (rr != 0)
                    return rr; // lock was not granted
            }
        }

        // If we got a DB_NOTFOUND then we have to search the next record.  Possibly everything present is not visible.
        // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified.  Something like this is needed for DB_NEXT, but
        // for point queries, it's overkill.  If we got a DB_NOTFOUND on a point query then we should just stop looking.
        // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress.
        // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
        // So save the pivot key in the search object.
        maybe_search_save_bound(node, child_to_search, search);

        // We're about to pin some more nodes, but we thought we were done before.
        must_begin_batch = true;
        if (search->direction == FT_SEARCH_LEFT) {
            child_to_search++;
        }
        else {
            child_to_search--;
        }
    }
    return r;
}

static int
toku_ft_search (FT_HANDLE brt, ft_search_t *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, FT_CURSOR ftcursor, bool can_bulk_fetch)
// Effect: Perform a search.  Associate cursor with a leaf if possible.
// All searches are performed through this function.
{
    int r;
    uint trycount = 0;     // How many tries did it take to get the result?
    FT ft = brt->ft;

try_again:

    trycount++;
    assert(ft);

    //
    // Here is how searches work
    // At a high level, we descend down the tree, using the search parameter
    // to guide us towards where to look. But the search parameter is not
    // used here to determine which child of a node to read (regardless
    // of whether that child is another node or a basement node)
    // The search parameter is used while we are pinning the node into
    // memory, because that is when the system needs to ensure that
    // the appropriate partition of the child we are using is in memory.
    // So, here are the steps for a search (and this applies to this function
    // as well as ft_search_child):
    //  - Take the search parameter, and create a ftnode_fetch_extra, that will be used by toku_pin_ftnode(_holding_lock)
    //  - Call toku_pin_ftnode(_holding_lock) with the bfe as the extra for the fetch callback (in case the node is not at all in memory)
    //       and the partial fetch callback (in case the node is perhaps partially in memory) to fetch the node
    //  - This eventually calls either toku_ftnode_fetch_callback or toku_ftnode_pf_req_callback depending on whether the node is in
    //     memory at all or not.
    //  - Within these functions, the "ft_search_t search" parameter is used to evaluate which child the search is interested in.
    //     If the node is not in memory at all, toku_ftnode_fetch_callback will read the node and decompress only the partition for the
    //     relevant child, be it a message buffer or basement node. If the node is in memory, then toku_ftnode_pf_req_callback
    //     will tell the cachetable that a partial fetch is required if and only if the relevant child is not in memory. If the relevant child
    //     is not in memory, then toku_ftnode_pf_callback is called to fetch the partition.
    //  - These functions set bfe->child_to_read so that the search code does not need to reevaluate it.
    //  - Just to reiterate, all of the last item happens within toku_ftnode_pin(_holding_lock)
    //  - At this point, toku_ftnode_pin_holding_lock has returned, with bfe.child_to_read set,
    //  - ft_search_node is called, assuming that the node and its relevant partition are in memory.
    //
    struct ftnode_fetch_extra bfe;
    fill_bfe_for_subset_read(
        &bfe,
        ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching
        );
    FTNODE node = NULL;
    {
        uint32_t fullhash;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
        // Begin a batch of pins here.  If a child gets TOKUDB_TRY_AGAIN
        // it must immediately end the batch.  Otherwise, it must end the
        // batch as soon as it pins the leaf.  The batch will never be
        // ended in this function.
        toku_cachetable_begin_batched_pin(ft->cf);
        toku_pin_ftnode_off_client_thread_batched(
            ft,
            root_key,
            fullhash,
            &bfe,
            PL_READ, // may_modify_node set to false, because root cannot change during search
            0,
            NULL,
            &node
            );
        if (node->height == 0) {
            // The root is a leaf, must end the batch now because we
            // won't apply ancestor messages, which is where we usually
            // end it.
            toku_cachetable_end_batched_pin(ft->cf);
        }
    }

    uint tree_height = node->height + 1;  // How high is the tree?  This is the height of the root node plus one (leaf is at height 0).

    struct unlock_ftnode_extra unlock_extra   = {brt,node,false};
    struct unlockers                unlockers      = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

    {
        bool doprefetch = false;
        //static int counter = 0;         counter++;
        r = ft_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds, can_bulk_fetch);
        if (r==TOKUDB_TRY_AGAIN) {
            // there are two cases where we get TOKUDB_TRY_AGAIN
            //  case 1 is when some later call to toku_pin_ftnode returned
            //  that value and unpinned all the nodes anyway. case 2
            //  is when ft_search_node had to stop its search because
            //  some piece of a node that it needed was not in memory.
            //  In this case, the node was not unpinned, so we unpin it here
            if (unlockers.locked) {
                toku_unpin_ftnode_read_only(brt, node);
            }
            goto try_again;
        } else {
            assert(unlockers.locked);
        }
    }

    assert(unlockers.locked);
    toku_unpin_ftnode_read_only(brt, node);

    //Heaviside function (+direction) queries define only a lower or upper
    //bound.  Some queries require both an upper and lower bound.
    //They do this by wrapping the FT_GET_CALLBACK_FUNCTION with another
    //test that checks for the other bound.  If the other bound fails,
    //it returns TOKUDB_FOUND_BUT_REJECTED which means not found, but
    //stop searching immediately, as opposed to DB_NOTFOUND
    //which can mean not found, but keep looking in another leaf.
    if (r==TOKUDB_FOUND_BUT_REJECTED) r = DB_NOTFOUND;
    else if (r==DB_NOTFOUND) {
        //We truly did not find an answer to the query.
        //Therefore, the FT_GET_CALLBACK_FUNCTION has NOT been called.
        //The contract specifies that the callback function must be called
        //for 'r= (0|DB_NOTFOUND|TOKUDB_FOUND_BUT_REJECTED)'
        //TODO: #1378 This is not the ultimate location of this call to the
        //callback.  It is surely wrong for node-level locking, and probably
        //wrong for the STRADDLE callback for heaviside functions (two sets of key/vals)
        int r2 = getf(0,NULL, 0,NULL, getf_v, false);
        if (r2!=0) r = r2;
    }
    {   // accounting (to detect and measure thrashing)
        uint retrycount = trycount - 1;         // how many retries were needed?
        if (retrycount) STATUS_VALUE(FT_TOTAL_RETRIES) += retrycount;
        if (retrycount > tree_height) {         // if at least one node was read from disk more than once
            STATUS_VALUE(FT_SEARCH_TRIES_GT_HEIGHT)++;
            uint excess_tries = retrycount - tree_height;
            if (excess_tries > STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES))
                STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES) = excess_tries;
            if (retrycount > (tree_height+3))
                STATUS_VALUE(FT_SEARCH_TRIES_GT_HEIGHTPLUS3)++;
        }
    }
    return r;
}

struct ft_cursor_search_struct {
    FT_GET_CALLBACK_FUNCTION getf;
    void *getf_v;
    FT_CURSOR cursor;
    ft_search_t *search;
};

/* search for the first kv pair that matches the search object */
static int
ft_cursor_search(FT_CURSOR cursor, ft_search_t *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool can_bulk_fetch)
{
    int r = toku_ft_search(cursor->ft_handle, search, getf, getf_v, cursor, can_bulk_fetch);
    return r;
}

static inline int compare_k_x(FT_HANDLE brt, const DBT *k, const DBT *x) {
    FAKE_DB(db, &brt->ft->cmp_descriptor);
    return brt->ft->compare_fun(&db, k, x);
}

static int
ft_cursor_compare_one(ft_search_t *search __attribute__((__unused__)), DBT *x __attribute__((__unused__)))
{
    return 1;
}

static int ft_cursor_compare_set(ft_search_t *search, DBT *x) {
    FT_HANDLE CAST_FROM_VOIDP(brt, search->context);
    return compare_k_x(brt, search->k, x) <= 0; /* return min xy: kv <= xy */
}

static int
ft_cursor_current_getf(ITEMLEN keylen,                 bytevec key,
                        ITEMLEN vallen,                 bytevec val,
                        void *v, bool lock_only) {
    struct ft_cursor_search_struct *CAST_FROM_VOIDP(bcss, v);
    int r;
    if (key==NULL) {
        r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only);
    } else {
        FT_CURSOR cursor = bcss->cursor;
        DBT newkey;
        toku_fill_dbt(&newkey, key, keylen);
        if (compare_k_x(cursor->ft_handle, &cursor->key, &newkey) != 0) {
            r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only); // This was once DB_KEYEMPTY
            if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
        }
        else
            r = bcss->getf(keylen, key, vallen, val, bcss->getf_v, lock_only);
    }
    return r;
}

int
toku_ft_cursor_current(FT_CURSOR cursor, int op, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    if (ft_cursor_not_set(cursor))
        return EINVAL;
    if (op == DB_CURRENT) {
        struct ft_cursor_search_struct bcss = {getf, getf_v, cursor, 0};
        ft_search_t search; ft_search_init(&search, ft_cursor_compare_set, FT_SEARCH_LEFT, &cursor->key, cursor->ft_handle);
        int r = toku_ft_search(cursor->ft_handle, &search, ft_cursor_current_getf, &bcss, cursor, false);
        ft_search_finish(&search);
        return r;
    }
    return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v, false); // ft_cursor_copyout(cursor, outkey, outval);
}

int
toku_ft_cursor_first(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_LEFT, 0, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
    ft_search_finish(&search);
    return r;
}

int
toku_ft_cursor_last(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_RIGHT, 0, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
    ft_search_finish(&search);
    return r;
}

static int ft_cursor_compare_next(ft_search_t *search, DBT *x) {
    FT_HANDLE CAST_FROM_VOIDP(brt, search->context);
    return compare_k_x(brt, search->k, x) < 0; /* return min xy: kv < xy */
}

static int
ft_cursor_shortcut (
    FT_CURSOR cursor,
    int direction,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    uint32_t *keylen,
    void **key,
    uint32_t *vallen,
    void **val
    )
{
    int r = 0;
    uint32_t index = cursor->leaf_info.to_be.index;
    OMT omt = cursor->leaf_info.to_be.omt;
    // if we are searching towards the end, limit is last element
    // if we are searching towards the beginning, limit is the first element
    uint32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0;

    //Starting with the prev, find the first real (non-provdel) leafentry.
    OMTVALUE lev = NULL;
    while (index != limit) {
        index += direction;
        r = toku_omt_fetch(omt, index, &lev);
        assert_zero(r);
        LEAFENTRY CAST_FROM_VOIDP(le, lev);

        if (toku_ft_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {

            ft_cursor_extract_key_and_val(
                le,
                cursor,
                keylen,
                key,
                vallen,
                val
                );

            r = getf(*keylen, *key, *vallen, *val, getf_v, false);
            if (r == 0 || r == TOKUDB_CURSOR_CONTINUE) {
                //Update cursor.
                cursor->leaf_info.to_be.index = index;
            }
            if (r == TOKUDB_CURSOR_CONTINUE) {
                continue;
            }
            else {
                break;
            }
        }
    }

    return r;
}

int
toku_ft_cursor_next(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_next, FT_SEARCH_LEFT, &cursor->key, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, true);
    ft_search_finish(&search);
    if (r == 0) ft_cursor_set_prefetching(cursor);
    return r;
}

static int
ft_cursor_search_eq_k_x_getf(ITEMLEN keylen,               bytevec key,
                              ITEMLEN vallen,               bytevec val,
                              void *v, bool lock_only) {
    struct ft_cursor_search_struct *CAST_FROM_VOIDP(bcss, v);
    int r;
    if (key==NULL) {
        r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, false);
    } else {
        FT_CURSOR cursor = bcss->cursor;
        DBT newkey;
        toku_fill_dbt(&newkey, key, keylen);
        if (compare_k_x(cursor->ft_handle, bcss->search->k, &newkey) == 0) {
            r = bcss->getf(keylen, key, vallen, val, bcss->getf_v, lock_only);
        } else {
            r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v, lock_only);
            if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
        }
    }
    return r;
}

/* search for the kv pair that matches the search object and is equal to k */
static int
ft_cursor_search_eq_k_x(FT_CURSOR cursor, ft_search_t *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    struct ft_cursor_search_struct bcss = {getf, getf_v, cursor, search};
    int r = toku_ft_search(cursor->ft_handle, search, ft_cursor_search_eq_k_x_getf, &bcss, cursor, false);
    return r;
}

static int ft_cursor_compare_prev(ft_search_t *search, DBT *x) {
    FT_HANDLE CAST_FROM_VOIDP(brt, search->context);
    return compare_k_x(brt, search->k, x) > 0; /* return max xy: kv > xy */
}

int
toku_ft_cursor_prev(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_prev, FT_SEARCH_RIGHT, &cursor->key, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, true);
    ft_search_finish(&search);
    return r;
}

static int ft_cursor_compare_set_range(ft_search_t *search, DBT *x) {
    FT_HANDLE CAST_FROM_VOIDP(brt, search->context);
    return compare_k_x(brt, search->k,        x) <= 0; /* return kv <= xy */
}

int
toku_ft_cursor_set(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, cursor->ft_handle);
    int r = ft_cursor_search_eq_k_x(cursor, &search, getf, getf_v);
    ft_search_finish(&search);
    return r;
}

int
toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
    ft_search_finish(&search);
    return r;
}

static int ft_cursor_compare_set_range_reverse(ft_search_t *search, DBT *x) {
    FT_HANDLE CAST_FROM_VOIDP(brt, search->context);
    return compare_k_x(brt, search->k, x) >= 0; /* return kv >= xy */
}

int
toku_ft_cursor_set_range_reverse(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range_reverse, FT_SEARCH_RIGHT, key, cursor->ft_handle);
    int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
    ft_search_finish(&search);
    return r;
}


//TODO: When tests have been rewritten, get rid of this function.
//Only used by tests.
int
toku_ft_cursor_get (FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags)
{
    int op = get_flags & DB_OPFLAGS_MASK;
    if (get_flags & ~DB_OPFLAGS_MASK)
        return EINVAL;

    switch (op) {
    case DB_CURRENT:
    case DB_CURRENT_BINDING:
        return toku_ft_cursor_current(cursor, op, getf, getf_v);
    case DB_FIRST:
        return toku_ft_cursor_first(cursor, getf, getf_v);
    case DB_LAST:
        return toku_ft_cursor_last(cursor, getf, getf_v);
    case DB_NEXT:
    case DB_NEXT_NODUP:
        if (ft_cursor_not_set(cursor))
            return toku_ft_cursor_first(cursor, getf, getf_v);
        else
            return toku_ft_cursor_next(cursor, getf, getf_v);
    case DB_PREV:
    case DB_PREV_NODUP:
        if (ft_cursor_not_set(cursor))
            return toku_ft_cursor_last(cursor, getf, getf_v);
        else
            return toku_ft_cursor_prev(cursor, getf, getf_v);
    case DB_SET:
        return toku_ft_cursor_set(cursor, key, getf, getf_v);
    case DB_SET_RANGE:
        return toku_ft_cursor_set_range(cursor, key, getf, getf_v);
    default: ;// Fall through
    }
    return EINVAL;
}
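
// Usage sketch (callback name hypothetical, not defined in this file): a point
// lookup through the flag-dispatch API above would look like
//     int r = toku_ft_cursor_get(cursor, &key, my_getf, my_extra, DB_SET);
// where my_getf is a FT_GET_CALLBACK_FUNCTION that copies the found pair out.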

void
toku_ft_cursor_peek(FT_CURSOR cursor, const DBT **pkey, const DBT **pval)
// Effect: Retrieves a pointer to the DBTs for the current key and value.
// Requires:  The caller may not modify the DBTs or the memory to which they point.
// Requires:  The caller must be in the context of a
// FT_GET_(STRADDLE_)CALLBACK_FUNCTION
{
    *pkey = &cursor->key;
    *pval = &cursor->val;
}

//We pass in toku_dbt_fake to the search functions, since it will not pass the
//key (or val) to the heaviside function if key (or val) is NULL.
//It is not used for anything else;
//the actual 'extra' information for the heaviside function is inside the
//wrapper.
static const DBT __toku_dbt_fake = {};
static const DBT* const toku_dbt_fake = &__toku_dbt_fake;

bool toku_ft_cursor_uninitialized(FT_CURSOR c) {
    return ft_cursor_not_set(c);
}


/* ********************************* lookup **************************************/

int
toku_ft_lookup (FT_HANDLE brt, DBT *k, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
    int r, rr;
    FT_CURSOR cursor;

    rr = toku_ft_cursor(brt, &cursor, NULL, false, false);
    if (rr != 0) return rr;

    int op = DB_SET;
    r = toku_ft_cursor_get(cursor, k, getf, getf_v, op);

    rr = toku_ft_cursor_close(cursor); assert_zero(rr);

    return r;
}

/* ********************************* delete **************************************/
static int
getf_nothing (ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), bytevec UU(val), void *UU(pair_v), bool UU(lock_only)) {
    return 0;
}

int
toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN txn) {
    int r;

    int unchecked_flags = flags;
    bool error_if_missing = (bool) !(flags&DB_DELETE_ANY);
    unchecked_flags &= ~DB_DELETE_ANY;
    if (unchecked_flags!=0) r = EINVAL;
    else if (ft_cursor_not_set(cursor)) r = EINVAL;
    else {
        r = 0;
        if (error_if_missing) {
            r = toku_ft_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL);
        }
        if (r == 0) {
            r = toku_ft_delete(cursor->ft_handle, &cursor->key, txn);
        }
    }
    return r;
}

/* ********************* keyrange ************************ */


struct keyrange_compare_s {
    FT_HANDLE ft_handle;
    DBT *key;
};

static int
keyrange_compare (OMTVALUE lev, void *extra) {
    LEAFENTRY CAST_FROM_VOIDP(le, lev);
    uint32_t keylen;
    void* key = le_key_and_len(le, &keylen);
    DBT   omt_dbt;
    toku_fill_dbt(&omt_dbt, key, keylen);
    struct keyrange_compare_s *CAST_FROM_VOIDP(s, extra);
    // TODO: maybe put a const fake_db in the header
    FAKE_DB(db, &s->ft_handle->ft->cmp_descriptor);
    return s->ft_handle->ft->compare_fun(&db, &omt_dbt, s->key);
}

static void
keyrange_in_leaf_partition (FT_HANDLE brt, FTNODE node, DBT *key, int child_number, uint64_t estimated_num_rows,
                            uint64_t *less, uint64_t *equal, uint64_t *greater)
// If the partition is in main memory then count exactly; otherwise estimate the number.
// If KEY==NULL then use an arbitrary key (leftmost or zero)
{
    assert(node->height == 0); // we are in a leaf
    if (BP_STATE(node, child_number) == PT_AVAIL) {
        // If the partition is in main memory then get an exact count.
        struct keyrange_compare_s s = {brt,key};
        BASEMENTNODE bn = BLB(node, child_number);
        OMTVALUE datav;
        uint32_t idx = 0;
        // if key is NULL then set r==-1 and idx==0.
        int r = key ? toku_omt_find_zero(bn->buffer, keyrange_compare, &s, &datav, &idx) : -1;
        if (r==0) {
            *less    = idx;
            *equal   = 1;
            *greater = toku_omt_size(bn->buffer)-idx-1;
        } else {
            // If not found, then the idx says where it's between.
            *less    = idx;
            *equal   = 0;
            *greater = toku_omt_size(bn->buffer)-idx;
        }
    } else {
        *less    = estimated_num_rows / 2;
        *equal   = 0;
        *greater = *less;
    }
}

static int
toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
                            DBT *key, uint64_t *less, uint64_t *equal, uint64_t *greater,
                            uint64_t estimated_num_rows,
                            struct ftnode_fetch_extra *bfe, // set up to read a minimal read.
                            struct unlockers *unlockers, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Implementation note: Assign values to less, equal, and greater, and then on the way out (returning up the stack) we add more values in.
{
    int r = 0;
    // if KEY is NULL then use the leftmost key.
    int child_number = key ? toku_ftnode_which_child (node, key, &brt->ft->cmp_descriptor, brt->ft->compare_fun) : 0;
    uint64_t rows_per_child = estimated_num_rows / node->n_children;
    if (node->height == 0) {

        keyrange_in_leaf_partition(brt, node, key, child_number, rows_per_child, less, equal, greater);

        *less    += rows_per_child * child_number;
        *greater += rows_per_child * (node->n_children - child_number - 1);

    } else {
        // do the child.
        struct ancestors next_ancestors = {node, child_number, ancestors};
        BLOCKNUM childblocknum = BP_BLOCKNUM(node, child_number);
        uint32_t fullhash = compute_child_fullhash(brt->ft->cf, node, child_number);
        FTNODE childnode;
        bool msgs_applied = false;
        r = toku_pin_ftnode_batched(
            brt,
            childblocknum,
            fullhash,
            unlockers,
            &next_ancestors,
            bounds,
            bfe,
            PL_READ, // may_modify_node is false, because node guaranteed to not change
            false,
            false,
            &childnode,
            &msgs_applied
            );
        assert(!msgs_applied);
        if (r != TOKUDB_TRY_AGAIN) {
            assert(r == 0);

            struct unlock_ftnode_extra unlock_extra   = {brt,childnode,false};
            struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, unlockers};
            const struct pivot_bounds next_bounds = next_pivot_keys(node, child_number, bounds);

            r = toku_ft_keyrange_internal(brt, childnode, key, less, equal, greater, rows_per_child,
                                           bfe, &next_unlockers, &next_ancestors, &next_bounds);
            if (r != TOKUDB_TRY_AGAIN) {
                assert(r == 0);

                *less    += rows_per_child * child_number;
                *greater += rows_per_child * (node->n_children - child_number - 1);

                assert(unlockers->locked);
                toku_unpin_ftnode_read_only(brt, childnode);
            }
        }
    }
    return r;
}

int
toku_ft_keyrange (FT_HANDLE brt, DBT *key, uint64_t *less_p, uint64_t *equal_p, uint64_t *greater_p)
// Effect: Return an estimate of the number of keys to the left, the number equal, and the number to the right of the key.
//   The values are an estimate.
//   If you perform a keyrange on two keys that are in the same in-memory and uncompressed basement,
//   you can use the keys_right numbers (or the keys_left numbers) to get an exact number of keys in the range,
//   if the basement does not change between the keyrange queries.
//   TODO 4184: What to do with a NULL key?
//   If KEY is NULL then the system picks an arbitrary key and returns it.
{
    assert(brt->ft);
    struct ftnode_fetch_extra bfe;
    fill_bfe_for_min_read(&bfe, brt->ft);  // read pivot keys but not message buffers
try_again:
    {
        uint64_t less = 0, equal = 0, greater = 0;
        FTNODE node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root_key;
            toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
            toku_cachetable_begin_batched_pin(brt->ft->cf);
            toku_pin_ftnode_off_client_thread_batched(
                brt->ft,
                root_key,
                fullhash,
                &bfe,
                PL_READ, // may_modify_node, cannot change root during keyrange
                0,
                NULL,
                &node
                );
        }

        struct unlock_ftnode_extra unlock_extra = {brt,node,false};
        struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

        {
            int64_t numrows = brt->ft->in_memory_stats.numrows;
            if (numrows < 0)
                numrows = 0;  // prevent appearance of a negative number
            int r = toku_ft_keyrange_internal (brt, node, key,
                                                &less, &equal, &greater,
                                                numrows,
                                                &bfe, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
            assert(r == 0 || r == TOKUDB_TRY_AGAIN);
            toku_cachetable_end_batched_pin(brt->ft->cf);
            if (r == TOKUDB_TRY_AGAIN) {
                assert(!unlockers.locked);
                goto try_again;
            }
        }
        assert(unlockers.locked);
        toku_unpin_ftnode_read_only(brt, node);
        *less_p    = less;
        *equal_p   = equal;
        *greater_p = greater;
    }
    return 0;
}
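
// Usage sketch (variable names hypothetical): estimating the selectivity of a key:
//     uint64_t less, equal, greater;
//     int r = toku_ft_keyrange(ft_handle, &key, &less, &equal, &greater);
// On success, less + equal + greater approximates the total row count, so
// equal + greater estimates how many rows a scan from this key onward would touch.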

int
toku_ft_handle_stat64 (FT_HANDLE brt, TOKUTXN UU(txn), struct ftstat64_s *s) {
    assert(brt->ft);
    toku_ft_stat64(brt->ft, s);
    return 0;
}

/* ********************* debugging dump ************************ */
static int
toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) {
    int result=0;
    FTNODE node;
    toku_get_node_for_verify(blocknum, brt, &node);
    result=toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
    uint32_t fullhash = toku_cachetable_hash(brt->ft->cf, blocknum);
    struct ftnode_fetch_extra bfe;
    fill_bfe_for_full_read(&bfe, brt->ft);
    toku_pin_ftnode_off_client_thread(
        brt->ft,
        blocknum,
        fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE,
        0,
        NULL,
        &node
        );
    assert(node->fullhash==fullhash);
    fprintf(file, "%*sNode=%p\n", depth, "", node);

    fprintf(file, "%*sNode %" PRId64 " nodesize=%u height=%d n_children=%d  keyrange=%s %s\n",
            depth, "", blocknum.b, node->nodesize, node->height, node->n_children, (char*)(lorange ? lorange->data : 0), (char*)(hirange ? hirange->data : 0));
    {
        int i;
        for (i=0; i+1< node->n_children; i++) {
            fprintf(file, "%*spivotkey %d =", depth+1, "", i);
            toku_print_BYTESTRING(file, node->childkeys[i].size, (char *) node->childkeys[i].data);
            fprintf(file, "\n");
        }
        for (i=0; i< node->n_children; i++) {
            if (node->height > 0) {
                NONLEAF_CHILDINFO bnc = BNC(node, i);
                fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc));
                FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, type, msn, xids, UU(is_fresh),
                             {
                                 data=data; datalen=datalen; keylen=keylen;
                                 fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n", depth+2, "", xids_get_innermost_xid(xids), (unsigned)toku_dtoh32(*(int*)key), type, msn.msn);
                                 //assert(strlen((char*)key)+1==keylen);
                                 //assert(strlen((char*)data)+1==datalen);
                             });
            }
            else {
                int size = toku_omt_size(BLB_BUFFER(node, i));
                if (0)
                    for (int j=0; j<size; j++) {
                        OMTVALUE v = 0;
                        int r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v);
                        assert_zero(r);
                        LEAFENTRY CAST_FROM_VOIDP(le, v);
                        fprintf(file, " [%d]=", j);
                        print_leafentry(file, le);
                        fprintf(file, "\n");
                    }
                //               printf(" (%d)%u ", len, *(int*)le_key(data)));
                fprintf(file, "\n");
            }
        }
        if (node->height > 0) {
            for (i=0; i<node->n_children; i++) {
                fprintf(file, "%*schild %d\n", depth, "", i);
                if (i>0) {
                    char *CAST_FROM_VOIDP(key, node->childkeys[i-1].data);
                    fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->childkeys[i-1].size, (unsigned)toku_dtoh32(*(int*)key));
                }
                toku_dump_ftnode(file, brt, BP_BLOCKNUM(node, i), depth+4,
                                  (i==0) ? lorange : &node->childkeys[i-1],
                                  (i==node->n_children-1) ? hirange : &node->childkeys[i]);
            }
        }
    }
    toku_unpin_ftnode_off_client_thread(brt->ft, node);
    return result;
}

int toku_dump_ft (FILE *f, FT_HANDLE brt) {
    int r;
    assert(brt->ft);
    toku_dump_translation_table(f, brt->ft->blocktable);
    {
        uint32_t fullhash = 0;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
        r = toku_dump_ftnode(f, brt, root_key, 0, 0, 0);
    }
    return r;
}

int toku_ft_layer_init(void) {
    int r = 0;
    //Portability must be initialized first
    r = toku_portability_init();
    if (r) { goto exit; }

    partitioned_counters_init();
    toku_checkpoint_init();
    toku_ft_serialize_layer_init();
    toku_mutex_init(&ft_open_close_lock, NULL);
exit:
    return r;
}

void toku_ft_layer_destroy(void) {
    toku_mutex_destroy(&ft_open_close_lock);
    toku_ft_serialize_layer_destroy();
    toku_checkpoint_destroy();
    partitioned_counters_destroy();
    //Portability must be cleaned up last
    toku_portability_destroy();
}

// This lock serializes all opens and closes because the cachetable requires that clients do not try to open or close a cachefile in parallel.  We made
// it coarser by not allowing any cachefiles to be open or closed in parallel.
void toku_ft_open_close_lock(void) {
    toku_mutex_lock(&ft_open_close_lock);
}

void toku_ft_open_close_unlock(void) {
    toku_mutex_unlock(&ft_open_close_lock);
}

//Suppress both rollback and recovery logs.
void
toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn) {
    assert(brt->ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(txn));
    assert(brt->ft->txnid_that_suppressed_recovery_logs           == TXNID_NONE);
    brt->ft->txnid_that_suppressed_recovery_logs                   = toku_txn_get_txnid(txn);
    txn->checkpoint_needed_before_commit = true;
}

int toku_ft_handle_set_panic(FT_HANDLE brt, int panic, const char *panic_string) {
    return toku_ft_set_panic(brt->ft, panic, panic_string);
}

// Prepare to remove a dictionary from the database when this transaction is committed:
//  - mark transaction as NEED fsync on commit
//  - make entry in rollback log
//  - make fdelete entry in recovery log
//
// Effect: when the txn commits, the ft's cachefile will be marked as unlink
//         on close. see toku_commit_fdelete and how unlink on close works
//         in toku_cachefile_close();
// Requires: serialized with begin checkpoint
//           this does not need to take the open close lock because
//           1.) the ft/cf cannot go away because we have a live handle.
//           2.) we're not setting the unlink on close bit _here_. that
//           happens on txn commit (as the name suggests).
//           3.) we're already holding the multi operation lock to
//           synchronize with begin checkpoint.
// Contract: the iname of the ft should never be reused.
int
toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) {
    int r;
    CACHEFILE cf;

    assert(txn);
    cf = handle->ft->cf;
    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));

    toku_txn_maybe_note_ft(txn, ft);

    // If the txn commits, the commit MUST be in the log before the file is actually unlinked
    toku_txn_force_fsync_on_commit(txn);
    // make entry in rollback log
    FILENUM filenum = toku_cachefile_filenum(cf);
    r = toku_logger_save_rollback_fdelete(txn, filenum);
    assert_zero(r);
    // make entry in recovery log
    r = toku_logger_log_fdelete(txn, filenum);
    return r;
}

// Non-transactional version of fdelete
//
// Effect: The ft file is unlinked when the handle closes and its ft is not
//         pinned by checkpoint. see toku_remove_ft_ref() and how unlink on
//         close works in toku_cachefile_close();
// Requires: serialized with begin checkpoint
void
toku_ft_unlink(FT_HANDLE handle) {
    CACHEFILE cf;
    cf = handle->ft->cf;
    toku_cachefile_unlink_on_close(cf);
}

int
toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) {
    int r;

    int fd = toku_cachefile_get_fd(brt->ft->cf);
    toku_ft_lock(brt->ft);

    int64_t file_size;
    r = toku_os_get_file_size(fd, &file_size);
    if (r==0) {
        report->file_size_bytes = file_size;
        toku_block_table_get_fragmentation_unlocked(brt->ft->blocktable, report);
    }
    toku_ft_unlock(brt->ft);
    return r;
}

static bool is_empty_fast_iter (FT_HANDLE brt, FTNODE node) {
    if (node->height > 0) {
        for (int childnum=0; childnum<node->n_children; childnum++) {
            if (toku_bnc_nbytesinbuf(BNC(node, childnum)) != 0) {
                return false; // it's not empty if there are bytes in buffers
            }
            FTNODE childnode;
            {
                BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
                uint32_t fullhash =  compute_child_fullhash(brt->ft->cf, node, childnum);
                struct ftnode_fetch_extra bfe;
                fill_bfe_for_full_read(&bfe, brt->ft);
                // don't need to pass in dependent nodes as we are not
                // modifying nodes we are pinning
                toku_pin_ftnode_off_client_thread(
                    brt->ft,
                    childblocknum,
                    fullhash,
                    &bfe,
                    PL_READ, // may_modify_node set to false, as nodes not modified
                    0,
                    NULL,
                    &childnode
                    );
            }
            bool child_is_empty = is_empty_fast_iter(brt, childnode);
            toku_unpin_ftnode(brt->ft, childnode);
            if (!child_is_empty) return false;
        }
        return true;
    } else {
        // leaf:  If the omt is empty, we are happy.
        for (int i = 0; i < node->n_children; i++) {
            if (toku_omt_size(BLB_BUFFER(node, i))) {
                return false;
            }
        }
        return true;
    }
}

bool toku_ft_is_empty_fast (FT_HANDLE brt)
// A fast check to see if the tree is empty.  If there are any messages or leafentries, we consider the tree to be nonempty.  It's possible that those
// messages and leafentries would all optimize away and that the tree is empty, but we'll say it is nonempty.
{
    uint32_t fullhash;
    FTNODE node;
    {
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
        struct ftnode_fetch_extra bfe;
        fill_bfe_for_full_read(&bfe, brt->ft);
        toku_pin_ftnode_off_client_thread(
            brt->ft,
            root_key,
            fullhash,
            &bfe,
            PL_READ, // may_modify_node set to false, node does not change
            0,
            NULL,
            &node
            );
    }
    bool r = is_empty_fast_iter(brt, node);
    toku_unpin_ftnode(brt->ft, node);
    return r;
}

// test-only
int toku_ft_strerror_r(int error, char *buf, size_t buflen)
{
    if (error>=0) {
        return (long) strerror_r(error, buf, buflen);
    } else {
        switch (error) {
        case DB_KEYEXIST:
            snprintf(buf, buflen, "Key exists");
            return 0;
        case TOKUDB_CANCELED:
            snprintf(buf, buflen, "User canceled operation");
            return 0;
        default:
            snprintf(buf, buflen, "Unknown error %d", error);
            return EINVAL;
        }
    }
}

#include <valgrind/helgrind.h>
void __attribute__((__constructor__)) toku_ft_helgrind_ignore(void);
void
toku_ft_helgrind_ignore(void) {
    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status);
}

#undef STATUS_VALUE