/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

#include <ft-internal.h>
#include <ft-flusher.h>
#include <ft-flusher-internal.h>
#include <ft-cachetable-wrappers.h>
#include <ft.h>
#include <toku_assert.h>
#include <portability/toku_atomic.h>

/* Status is intended for display to humans to help understand system behavior.
 * It does not need to be perfectly thread-safe.
 */
static FT_FLUSHER_STATUS_S ft_flusher_status;

#define STATUS_INIT(k,t,l) {                                            \
        ft_flusher_status.status[k].keyname = #k;                      \
        ft_flusher_status.status[k].type    = t;                       \
        ft_flusher_status.status[k].legend  = "brt flusher: " l;       \
    }

#define STATUS_VALUE(x) ft_flusher_status.status[x].value.num
void toku_ft_flusher_status_init(void) {
    // Note: this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(FT_FLUSHER_CLEANER_TOTAL_NODES,                UINT64, "total nodes potentially flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_H1_NODES,                   UINT64, "height-one nodes flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_HGT1_NODES,                 UINT64, "height-greater-than-one nodes flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_EMPTY_NODES,                UINT64, "nodes cleaned which had empty buffers");
    STATUS_INIT(FT_FLUSHER_CLEANER_NODES_DIRTIED,              UINT64, "nodes dirtied by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE,            UINT64, "max bytes in a buffer flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE,            UINT64, "min bytes in a buffer flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE,          UINT64, "total bytes in buffers flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE,        UINT64, "max workdone in a buffer flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE,        UINT64, "min workdone in a buffer flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE,      UINT64, "total workdone in buffers flushed by cleaner thread");
    STATUS_INIT(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED,    UINT64, "times cleaner thread tries to merge a leaf");
    STATUS_INIT(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING,    UINT64, "cleaner thread leaf merges in progress");
    STATUS_INIT(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED,  UINT64, "cleaner thread leaf merges successful");
    STATUS_INIT(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE, UINT64, "nodes dirtied by cleaner thread leaf merges");
    STATUS_INIT(FT_FLUSHER_FLUSH_TOTAL,                        UINT64, "total number of flushes done by flusher threads or cleaner threads");
    STATUS_INIT(FT_FLUSHER_FLUSH_IN_MEMORY,                    UINT64, "number of in memory flushes");
    STATUS_INIT(FT_FLUSHER_FLUSH_NEEDED_IO,                    UINT64, "number of flushes that read something off disk");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES,                     UINT64, "number of flushes that triggered another flush in child");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_1,                   UINT64, "number of flushes that triggered 1 cascading flush");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_2,                   UINT64, "number of flushes that triggered 2 cascading flushes");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_3,                   UINT64, "number of flushes that triggered 3 cascading flushes");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_4,                   UINT64, "number of flushes that triggered 4 cascading flushes");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_5,                   UINT64, "number of flushes that triggered 5 cascading flushes");
    STATUS_INIT(FT_FLUSHER_FLUSH_CASCADES_GT_5,                UINT64, "number of flushes that triggered over 5 cascading flushes");
    STATUS_INIT(FT_FLUSHER_SPLIT_LEAF,                         UINT64, "leaf node splits");
    STATUS_INIT(FT_FLUSHER_SPLIT_NONLEAF,                      UINT64, "nonleaf node splits");
    STATUS_INIT(FT_FLUSHER_MERGE_LEAF,                         UINT64, "leaf node merges");
    STATUS_INIT(FT_FLUSHER_MERGE_NONLEAF,                      UINT64, "nonleaf node merges");
    STATUS_INIT(FT_FLUSHER_BALANCE_LEAF,                       UINT64, "leaf node balances");

    STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = UINT64_MAX;
    STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = UINT64_MAX;

    ft_flusher_status.initialized = true;
}
#undef STATUS_INIT

void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
    if (!ft_flusher_status.initialized) {
        toku_ft_flusher_status_init();
    }
    *status = ft_flusher_status;
}

//
// For test purposes only.
// These callbacks are never used in production code, only as a way
//  to test the system (for example, by causing crashes at predictable times).
//
static void (*flusher_thread_callback)(int, void*) = NULL;
static void *flusher_thread_callback_extra = NULL;

void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
                                      void* extra) {
    flusher_thread_callback = callback_f;
    flusher_thread_callback_extra = extra;
}

static void call_flusher_thread_callback(int flt_state) {
    if (flusher_thread_callback) {
        flusher_thread_callback(flt_state, flusher_thread_callback_extra);
    }
}

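// Return the index of the "heaviest" child of a nonleaf node, where a
// child's weight is the number of bytes in its message buffer plus the
// work already done (BP_WORKDONE) on its behalf.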
static int
find_heaviest_child(FTNODE node)
{
    int max_child = 0;
    int max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
    int i;

    if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
    paranoid_invariant(node->n_children>0);
    for (i=1; i<node->n_children; i++) {
#ifdef TOKU_DEBUG_PARANOID
        if (BP_WORKDONE(node,i)) {
            assert(toku_bnc_nbytesinbuf(BNC(node,i)) > 0);
        }
#endif
        int this_weight = toku_bnc_nbytesinbuf(BNC(node,i)) + BP_WORKDONE(node,i);
        if (0) printf(" %d", this_weight);
        if (max_weight < this_weight) {
            max_child = i;
            max_weight = this_weight;
        }
    }
    if (0) printf("\n");
    return max_child;
}

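// Record one flush of `child` in the status counters: the total flush
// count, the cascade-depth histogram, and whether the flush could run
// entirely in memory or had to read a child partition off disk.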
static void
update_flush_status(FTNODE child, int cascades) {
    STATUS_VALUE(FT_FLUSHER_FLUSH_TOTAL)++;
    if (cascades > 0) {
        STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES)++;
        switch (cascades) {
        case 1:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
        case 2:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
        case 3:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
        case 4:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
        case 5:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
        default:
            STATUS_VALUE(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
        }
    }
    bool flush_needs_io = false;
    for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
        if (BP_STATE(child, i) == PT_ON_DISK) {
            flush_needs_io = true;
        }
    }
    if (flush_needs_io) {
        STATUS_VALUE(FT_FLUSHER_FLUSH_NEEDED_IO)++;
    } else {
        STATUS_VALUE(FT_FLUSHER_FLUSH_IN_MEMORY)++;
    }
}

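// Possibly evict basement nodes of a leaf child before flushing into it.
// Only applies when the child is a clean leaf with more than one basement
// node; a basement is evicted if its applied MSN is ahead of the parent's
// on-disk MSN.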
static void
maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT h)
{
    // If the node is already fully in memory, as in upgrade, we don't
    // need to destroy the basement nodes because they are all equally
    // up to date.
    if (child->n_children > 1 && 
        child->height == 0 && 
        !child->dirty) {
        for (int i = 0; i < child->n_children; ++i) {
            if (BP_STATE(child, i) == PT_AVAIL &&
                node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn) 
            {
                toku_evict_bn_from_memory(child, i, h);
            }
        }
    }
}

static void
ft_merge_child(
    FT h,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa);

static int
pick_heaviest_child(FT UU(h),
                    FTNODE parent,
                    void* UU(extra))
{
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
    return childnum;
}

bool
dont_destroy_basement_nodes(void* UU(extra))
{
    return false;
}

static bool
do_destroy_basement_nodes(void* UU(extra))
{
    return true;
}

bool
always_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return true;
}

bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return false;
}

/**
 * Flusher thread ("normal" flushing) implementation.
 */
struct flush_status_update_extra {
    int cascades;
    uint32_t nodesize;
};

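// Recursive-flush predicate shared by the flusher-thread and cleaner
// advice: keep descending only while the child itself is gorged (over the
// target nodesize).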
static bool
recurse_if_child_is_gorged(FTNODE child, void* extra)
{
    struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
    return toku_ft_nonleaf_is_gorged(child, fste->nodesize);
}

int
default_pick_child_after_split(FT UU(h),
                               FTNODE UU(parent),
                               int UU(childnuma),
                               int UU(childnumb),
                               void* UU(extra))
{
    return -1;
}

void
default_merge_child(struct flusher_advice *fa,
                    FT h,
                    FTNODE parent,
                    int childnum,
                    FTNODE child,
                    void* UU(extra))
{
    //
    // There is probably a way to pass FTNODE child
    // into ft_merge_child, but for simplicity for now,
    // we are just going to unpin child and
    // let ft_merge_child pin it again
    //
    toku_unpin_ftnode_off_client_thread(h, child);
    //
    // it is the responsibility of ft_merge_child to unlock the parent
    //
    bool did_react;
    ft_merge_child(h, parent, childnum, &did_react, fa);
}

void
flusher_advice_init(
    struct flusher_advice *fa,
    FA_PICK_CHILD pick_child,
    FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
    FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
    FA_MAYBE_MERGE_CHILD maybe_merge_child,
    FA_UPDATE_STATUS update_status,
    FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
    void* extra
    )
{
    fa->pick_child = pick_child;
    fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
    fa->should_recursively_flush = should_recursively_flush;
    fa->maybe_merge_child = maybe_merge_child;
    fa->update_status = update_status;
    fa->pick_child_after_split = pick_child_after_split;
    fa->extra = extra;
}

static void
flt_update_status(FTNODE child,
                 int UU(dirtied),
                 void* extra)
{
    struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    // If `flush_some_child` decides to recurse after this, we'll need
    // cascades to increase.  If not it doesn't matter.
    fste->cascades++;
}

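// Advice for flusher threads: chase the heaviest child, keep basement
// nodes in memory, and recurse only while the child is gorged.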
static void
flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        dont_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        default_merge_child,
                        flt_update_status,
                        default_pick_child_after_split,
                        fste);
}

struct ctm_extra {
    bool is_last_child;
    DBT target_key;
};

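// Pick-child callback for a cleaner-thread leaf merge: re-descend toward
// the leaf identified by the saved target key, or directly to the last
// child when the merge target was the last child of a height-1 parent.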
static int
ctm_pick_child(FT h,
               FTNODE parent,
               void* extra)
{
    struct ctm_extra* ctme = (struct ctm_extra *) extra;
    int childnum;
    if (parent->height == 1 && ctme->is_last_child) {
        childnum = parent->n_children - 1;
    }
    else {
        childnum = toku_ftnode_which_child(
            parent,
            &ctme->target_key,
            &h->cmp_descriptor,
            h->compare_fun);
    }
    return childnum;
}

static void
ctm_update_status(
    FTNODE UU(child),
    int dirtied,
    void* UU(extra)
    )
{
    STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
}

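// Merge callback for the leaf-merge re-descent: once the target leaf is
// reached, count the merge as completed, then fall through to the default
// merge logic.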
static void
ctm_maybe_merge_child(struct flusher_advice *fa,
                      FT h,
                      FTNODE parent,
                      int childnum,
                      FTNODE child,
                      void *extra)
{
    if (child->height == 0) {
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
    }
    default_merge_child(fa, h, parent, childnum, child, extra);
}

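// Cleaner-thread merge.  A nonleaf child can be merged in place, but a
// leaf merge must restart from the root: save a pivot that identifies the
// leaf, unpin parent and child, pin the root, and re-descend with the
// ctm_* advice, which performs the merge when it reaches the leaf again.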
static void
ct_maybe_merge_child(struct flusher_advice *fa,
                     FT h,
                     FTNODE parent,
                     int childnum,
                     FTNODE child,
                     void* extra)
{
    if (child->height > 0) {
        default_merge_child(fa, h, parent, childnum, child, extra);
    }
    else {
        struct ctm_extra ctme;
        paranoid_invariant(parent->n_children > 1);
        int pivot_to_save;
        //
        // we have two cases: either childnum is the last child, in which
        // case there is no pivot to its right and we save the preceding
        // pivot instead, or it is not the last child, in which case the
        // pivot at childnum is sufficient for identifying the leaf to be
        // merged
        //
        if (childnum == (parent->n_children - 1)) {
            ctme.is_last_child = true;
            pivot_to_save = childnum - 1;
        }
        else {
            ctme.is_last_child = false;
            pivot_to_save = childnum;
        }
        const DBT *pivot = &parent->childkeys[pivot_to_save];
        toku_clone_dbt(&ctme.target_key, *pivot);

        // at this point, ctme is properly setup, now we can do the merge
        struct flusher_advice new_fa;
        flusher_advice_init(
            &new_fa,
            ctm_pick_child,
            dont_destroy_basement_nodes,
            always_recursively_flush,
            ctm_maybe_merge_child,
            ctm_update_status,
            default_pick_child_after_split,
            &ctme);

        toku_unpin_ftnode_off_client_thread(h, parent);
        toku_unpin_ftnode_off_client_thread(h, child);

        FTNODE root_node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root;
            toku_calculate_root_offset_pointer(h, &root, &fullhash);
            struct ftnode_fetch_extra bfe;
            fill_bfe_for_full_read(&bfe, h);
            toku_pin_ftnode_off_client_thread(h, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, 0, NULL, &root_node);
            toku_assert_entire_node_in_memory(root_node);
        }

        (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        flush_some_child(h, root_node, &new_fa);

        (void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_destroy_dbt(&ctme.target_key);
    }
}

static void
ct_update_status(FTNODE child,
                 int dirtied,
                 void* extra)
{
    struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    STATUS_VALUE(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
    // Incrementing this in case `flush_some_child` decides to recurse.
    fste->cascades++;
}

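// Advice for the cleaner thread: like the flusher-thread advice, except
// that basement nodes may be destroyed and leaf merges are attempted via
// ct_maybe_merge_child.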
static void
ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        do_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        ct_maybe_merge_child,
                        ct_update_status,
                        default_pick_child_after_split,
                        fste);
}

//
// This returns true if the node MAY be reactive,
// false if we are absolutely sure that it is NOT reactive.
// The reason for inaccuracy is that the node may be
// a leaf node that is not entirely in memory. If so, then
// we cannot be sure if the node is reactive.
//
static bool may_node_be_reactive(FTNODE node)
{
    if (node->height == 0) return true;
    else {
        return (get_nonleaf_reactivity(node) != RE_STABLE);
    }
}

/* NODE is a node with a child.
 * childnum was split into two nodes childa, and childb.  childa is the same as the original child.  childb is a new child.
 * We must slide things around, & move things from the old table to the new tables.
 * Requires: the CHILDNUMth buffer of node is empty.
 * We don't push anything down to children.  We split the node, and things land wherever they land.
 * We must delete the old buffer (but the old child is already deleted.)
 * On return, the new children and node STAY PINNED.
 */
static void
handle_split_of_child(
    FTNODE node,
    int childnum,
    FTNODE childa,
    FTNODE childb,
    DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
    )
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(0 <= childnum);
    paranoid_invariant(childnum < node->n_children);
    toku_assert_entire_node_in_memory(node);
    toku_assert_entire_node_in_memory(childa);
    toku_assert_entire_node_in_memory(childb);
    NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
    paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
    int cnum;
    WHEN_NOT_GCOV(
    if (toku_ft_debug_mode) {
        int i;
        printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
        printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
        for(i=0; i<node->n_children-1; i++) printf(" %s", (char *) node->childkeys[i].data);
        printf("\n");
    }
                 )

    node->dirty = 1;

    XREALLOC_N(node->n_children+1, node->bp);
    XREALLOC_N(node->n_children, node->childkeys);
    // Slide the children over.
    // suppose n_children is 10 and childnum is 5, meaning node->childnum[5] just got split
    // this moves node->bp[6] through node->bp[9] over to
    // node->bp[7] through node->bp[10]
    for (cnum=node->n_children; cnum>childnum+1; cnum--) {
        node->bp[cnum] = node->bp[cnum-1];
    }
    memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
    node->n_children++;

    paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child

    BP_BLOCKNUM(node, childnum+1) = childb->thisnodename;
    BP_WORKDONE(node, childnum+1)  = 0;
    BP_STATE(node,childnum+1) = PT_AVAIL;

    NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
    for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
        // just split the flows in half for now, can't guess much better
        // at the moment
        new_bnc->flow[i] = old_bnc->flow[i] / 2;
        old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
    }
    set_BNC(node, childnum+1, new_bnc);

    // Slide the keys over
    {
        for (cnum=node->n_children-2; cnum>childnum; cnum--) {
            toku_copy_dbt(&node->childkeys[cnum], node->childkeys[cnum-1]);
        }
        //if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
        toku_copy_dbt(&node->childkeys[childnum], *splitk);
        node->totalchildkeylens += splitk->size;
    }

    WHEN_NOT_GCOV(
    if (toku_ft_debug_mode) {
        int i;
        printf("%s:%d splitkeys:", __FILE__, __LINE__);
        for(i=0; i<node->n_children-2; i++) printf(" %s", (char*)node->childkeys[i].data);
        printf("\n");
    }
                 )

    /* Keep pushing to the children, but not if the children would require a pushdown */
    toku_assert_entire_node_in_memory(node);
    toku_assert_entire_node_in_memory(childa);
    toku_assert_entire_node_in_memory(childb);

    VERIFY_NODE(t, node);
    VERIFY_NODE(t, childa);
    VERIFY_NODE(t, childb);
}

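// OMT-iteration callback: assert that a single leafentry lies within the
// basement node's mempool.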
static int
UU() verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv)
{
    LEAFENTRY CAST_FROM_VOIDP(le, lev);
    struct mempool *CAST_FROM_VOIDP(mp, mpv);
    int r = toku_mempool_inrange(mp, le, leafentry_memsize(le));
    lazy_assert(r);
    return 0;
}

static void
verify_all_in_mempool(FTNODE UU() node)
{
#ifdef TOKU_DEBUG_PARANOID
    if (node->height==0) {
        for (int i = 0; i < node->n_children; i++) {
            invariant(BP_STATE(node,i) == PT_AVAIL);
            BASEMENTNODE bn = BLB(node, i);
            toku_omt_iterate(bn->buffer, verify_in_mempool, &bn->buffer_mempool);
        }
    }
#endif
}

static uint64_t
ftleaf_disk_size(FTNODE node)
// Effect: get the total disk size of all the leafentries in the leaf node
{
    paranoid_invariant(node->height == 0);
    toku_assert_entire_node_in_memory(node);
    uint64_t retval = 0;
    for (int i = 0; i < node->n_children; i++) {
        OMT curr_buffer = BLB_BUFFER(node, i);
        const uint32_t n_leafentries = toku_omt_size(curr_buffer);
        for (uint32_t j=0; j < n_leafentries; j++) {
            OMTVALUE v;
            int r = toku_omt_fetch(curr_buffer, j, &v);
            assert_zero(r);
            LEAFENTRY CAST_FROM_VOIDP(curr_le, v);
            retval += leafentry_disksize(curr_le);
        }
    }
    return retval;
}

static void
ftleaf_get_split_loc(
    FTNODE node,
    enum split_mode split_mode,
    int *num_left_bns,   // which basement within leaf
    int *num_left_les    // which key within basement
    )
// Effect: Find the location within a leaf node where we want to perform a split
// num_left_bns is how many basement nodes (which OMT) should be split to the left.
// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split.
{
    switch (split_mode) {
    case SPLIT_LEFT_HEAVY: {
        *num_left_bns = node->n_children;
        *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
        if (*num_left_les == 0) {
            *num_left_bns = node->n_children - 1;
            *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
        }
        goto exit;
    }
    case SPLIT_RIGHT_HEAVY: {
        *num_left_bns = 1;
        *num_left_les = 1;
        goto exit;
    }
    case SPLIT_EVENLY: {
        paranoid_invariant(node->height == 0);
        // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
        uint64_t sumlesizes = ftleaf_disk_size(node);
        uint32_t size_so_far = 0;
        for (int i = 0; i < node->n_children; i++) {
            OMT curr_buffer = BLB_BUFFER(node, i);
            uint32_t n_leafentries = toku_omt_size(curr_buffer);
            for (uint32_t j=0; j < n_leafentries; j++) {
                OMTVALUE lev;
                int r = toku_omt_fetch(curr_buffer, j, &lev);
                assert_zero(r);
                LEAFENTRY CAST_FROM_VOIDP(curr_le, lev);
                size_so_far += leafentry_disksize(curr_le);
                if (size_so_far >= sumlesizes/2) {
                    *num_left_bns = i + 1;
                    *num_left_les = j + 1;
                    if (*num_left_bns == node->n_children &&
                        (unsigned int) *num_left_les == n_leafentries) {
                        // need to correct for when we're splitting after the
                        // last element, that makes no sense
                        if (*num_left_les > 1) {
                            (*num_left_les)--;
                        } else if (*num_left_bns > 1) {
                            (*num_left_bns)--;
                            *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
                        } else {
                            // we are trying to split a leaf with only one
                            // leafentry in it
                            abort();
                        }
                    }
                    goto exit;
                }
            }
        }
    }
    }
    abort();
exit:
    return;
}

// TODO: (Zardosht) possibly get rid of this function and use toku_omt_split_at in
// ftleaf_split
static void
move_leafentries(
    BASEMENTNODE dest_bn,
    BASEMENTNODE src_bn,
    uint32_t lbi, //lower bound inclusive
    uint32_t ube, //upper bound exclusive
    uint32_t* num_bytes_moved
    )
// Effect: move leafentries in the range [lbi, ube) from src_bn's OMT to a newly created OMT in dest_bn
{
    paranoid_invariant(lbi < ube);
    OMTVALUE *XMALLOC_N(ube-lbi, newleafpointers);    // create new omt

    size_t mpsize = toku_mempool_get_used_space(&src_bn->buffer_mempool);   // overkill, but safe
    struct mempool *dest_mp = &dest_bn->buffer_mempool;
    struct mempool *src_mp  = &src_bn->buffer_mempool;
    toku_mempool_construct(dest_mp, mpsize);

    uint32_t i = 0;
    *num_bytes_moved = 0;
    for (i = lbi; i < ube; i++) {
        OMTVALUE lev;
        toku_omt_fetch(src_bn->buffer, i, &lev);
        LEAFENTRY CAST_FROM_VOIDP(curr_le, lev);
        size_t le_size = leafentry_memsize(curr_le);
        *num_bytes_moved += leafentry_disksize(curr_le);
        LEAFENTRY CAST_FROM_VOIDP(new_le, toku_mempool_malloc(dest_mp, le_size, 1));
        memcpy(new_le, curr_le, le_size);
        newleafpointers[i-lbi] = new_le;
        toku_mempool_mfree(src_mp, curr_le, le_size);
    }

    toku_omt_create_steal_sorted_array(&dest_bn->buffer, &newleafpointers, ube-lbi, ube-lbi);
    // now remove the elements from src_omt
    for (i=ube-1; i >= lbi; i--) {
        toku_omt_delete_at(src_bn->buffer, i);
    }
}

void
ftleaf_split(
    FT h,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    bool create_new_node,
    enum split_mode split_mode,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
// Effect: Split a leaf node.
// Argument "node" is node to be split.
// Upon return:
//   nodea and nodeb point to new nodes that result from split of "node"
//   nodea is the left node that results from the split
//   splitk is the right-most key of nodea
{

    paranoid_invariant(node->height == 0);
    STATUS_VALUE(FT_FLUSHER_SPLIT_LEAF)++;
    if (node->n_children) {
        // First move all the accumulated stat64info deltas into the first basement.
        // After the split, either both nodes or neither node will be included in the next checkpoint.
        // The accumulated stats in the dictionary will be correct in either case.
        // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
        // correct information for a basement that is divided between two leafnodes (i.e. when split is
        // not on a basement boundary).
        STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
        BASEMENTNODE bn = BLB(node,0);
        bn->stat64_delta = delta_for_leafnode;
    }


    FTNODE B;
    uint32_t fullhash;
    BLOCKNUM name;

    if (create_new_node) {
        // put value in cachetable and do checkpointing
        // of dependent nodes
        //
        // We do this here, before evaluating the last_bn_on_left
        // and last_le_on_left_within_bn because this operation
        // may write to disk the dependent nodes.
        // While doing so, we may rebalance the leaf node
        // we are splitting, thereby invalidating the
        // values of last_bn_on_left and last_le_on_left_within_bn.
        // So, we must call this before evaluating
        // those two values
        cachetable_put_empty_node_with_dep_nodes(
            h,
            num_dependent_nodes,
            dependent_nodes,
            &name,
            &fullhash,
            &B
            );
    }


    paranoid_invariant(node->height==0);
    toku_assert_entire_node_in_memory(node);
    verify_all_in_mempool(node);
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;

    // variables that say where we will do the split.
    // After the split, there will be num_left_bns basement nodes in the left node,
    // and the last basement node in the left node will have num_left_les leafentries.
    int num_left_bns;
    int num_left_les;
    ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
    {
        // did we split right on the boundary between basement nodes?
        const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) toku_omt_size(BLB_BUFFER(node, num_left_bns - 1)));
        // Now we know where we are going to break it
        // the two nodes will have a total of n_children+1 basement nodes
        // and n_children-1 pivots
        // the left node, node, will have last_bn_on_left+1 basement nodes
        // the right node, B, will have n_children-last_bn_on_left basement nodes
        // the pivots of node will be the first last_bn_on_left pivots that originally exist
        // the pivots of B will be the last (n_children - 1 - last_bn_on_left) pivots that originally exist

        // Note: The basements will not be rebalanced.  Only the mempool of the basement that is split
        //       (if split_on_boundary is false) will be affected.  All other mempools will remain intact. ???

        //set up the basement nodes in the new node
        int num_children_in_node = num_left_bns;
        // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
        // while it's not on the boundary, we do need node->n_children
        // children in B.
        int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
        if (num_children_in_b == 0) {
            // for uneven split, make sure we have at least 1 bn
            paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
            num_children_in_b = 1;
        }
        paranoid_invariant(num_children_in_node > 0);
        if (create_new_node) {
            toku_initialize_empty_ftnode(
                B,
                name,
                0,
                num_children_in_b,
                h->h->layout_version,
                h->h->flags);
            B->fullhash = fullhash;
        }
        else {
            B = *nodeb;
            REALLOC_N(num_children_in_b-1, B->childkeys);
            REALLOC_N(num_children_in_b,   B->bp);
            B->n_children = num_children_in_b;
            for (int i = 0; i < num_children_in_b; i++) {
                BP_BLOCKNUM(B,i).b = 0;
                BP_STATE(B,i) = PT_AVAIL;
                BP_WORKDONE(B,i) = 0;
                set_BLB(B, i, toku_create_empty_bn());
            }
        }

        // now move all the data

        int curr_src_bn_index = num_left_bns - 1;
        int curr_dest_bn_index = 0;

        // handle the move of a subset of data in last_bn_on_left from node to B
        if (!split_on_boundary) {
            BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
            uint32_t diff_size = 0;
            destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
            set_BNULL(B, curr_dest_bn_index);
            set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
            move_leafentries(BLB(B, curr_dest_bn_index),
                             BLB(node, curr_src_bn_index),
                             num_left_les,         // first row to be moved to B
                             toku_omt_size(BLB_BUFFER(node, curr_src_bn_index)),    // number of rows in basement to be split
                             &diff_size);
            BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
            BLB_NBYTESINBUF(node, curr_src_bn_index) -= diff_size;
            BLB_NBYTESINBUF(B, curr_dest_bn_index) += diff_size;
            curr_dest_bn_index++;
        }
        curr_src_bn_index++;

        paranoid_invariant(B->n_children >= curr_dest_bn_index);
        paranoid_invariant(node->n_children >= curr_src_bn_index);

        // move the rest of the basement nodes
        for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
            destroy_basement_node(BLB(B, curr_dest_bn_index));
            set_BNULL(B, curr_dest_bn_index);
            B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
        }
        if (curr_dest_bn_index < B->n_children) {
            // B already has an empty basement node here.
            BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
        }

        //
        // now handle the pivots
        //

        // the child index in the original node that corresponds to the
        // first node in the right node of the split
        int base_index = num_left_bns - (split_on_boundary ? 0 : 1);
        // make pivots in B
        for (int i=0; i < num_children_in_b-1; i++) {
            toku_copy_dbt(&B->childkeys[i], node->childkeys[i+base_index]);
            B->totalchildkeylens += node->childkeys[i+base_index].size;
            node->totalchildkeylens -= node->childkeys[i+base_index].size;
            toku_init_dbt(&node->childkeys[i+base_index]);
        }
        if (split_on_boundary && num_left_bns < node->n_children) {
            if (splitk) {
                toku_copy_dbt(splitk, node->childkeys[num_left_bns - 1]);
            } else {
                toku_destroy_dbt(&node->childkeys[num_left_bns - 1]);
            }
        } else if (splitk) {
            OMTVALUE lev;
            OMT buffer = BLB_BUFFER(node, num_left_bns - 1);
            int r = toku_omt_fetch(buffer, toku_omt_size(buffer) - 1, &lev);
            assert_zero(r); // that fetch should have worked.
            LEAFENTRY CAST_FROM_VOIDP(le, lev);
            uint32_t keylen;
            void *key = le_key_and_len(le, &keylen);
            toku_fill_dbt(splitk, toku_xmemdup(key, keylen), keylen);
            splitk->flags = DB_DBT_MALLOC;
        }

        node->n_children = num_children_in_node;
        REALLOC_N(num_children_in_node, node->bp);
        REALLOC_N(num_children_in_node-1, node->childkeys);
    }

    verify_all_in_mempool(node);
    verify_all_in_mempool(B);

    node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
    B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;

    node->dirty = 1;
    B->dirty = 1;

    *nodea = node;
    *nodeb = B;

}    // end of ftleaf_split()

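// Split a nonleaf node in half by child count: the low half of the
// children stays in `node`, the high half moves to a new node B, and the
// pivot between the halves is returned in *splitk.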
void
ft_nonleaf_split(
    FT h,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
{
    //VERIFY_NODE(t,node);
    STATUS_VALUE(FT_FLUSHER_SPLIT_NONLEAF)++;
    toku_assert_entire_node_in_memory(node);
    int old_n_children = node->n_children;
    int n_children_in_a = old_n_children/2;
    int n_children_in_b = old_n_children-n_children_in_a;
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
    FTNODE B;
    paranoid_invariant(node->height>0);
    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split?  We need at least two children to split.
    create_new_ftnode_with_dep_nodes(h, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
    {
        /* The first n_children_in_a go into node a.
         * That means that the first n_children_in_a-1 keys go into node a.
         * The splitter key is key number n_children_in_a */
        int i;

        for (i=n_children_in_a; i<old_n_children; i++) {
            int targchild = i-n_children_in_a;
            // TODO: Figure out better way to handle this
            // the problem is that create_new_ftnode_with_dep_nodes for B creates
            // all the data structures, whereas we really don't want it to fill
            // in anything for the bp's.
            // Now we have to go free what it just created so we can
            // slide the bp over
            destroy_nonleaf_childinfo(BNC(B, targchild));
            // now move the bp over
            B->bp[targchild] = node->bp[i];
            memset(&node->bp[i], 0, sizeof(node->bp[0]));

            // Delete a child, removing the preceeding pivot key.  The child number must be > 0
            {
                paranoid_invariant(i>0);
                if (i>n_children_in_a) {
                    toku_copy_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]);
                    B->totalchildkeylens += node->childkeys[i-1].size;
                    node->totalchildkeylens -= node->childkeys[i-1].size;
                    toku_init_dbt(&node->childkeys[i-1]);
                }
            }
        }

        node->n_children=n_children_in_a;

        toku_copy_dbt(splitk, node->childkeys[n_children_in_a-1]);
        node->totalchildkeylens -= node->childkeys[n_children_in_a-1].size;

        REALLOC_N(n_children_in_a,   node->bp);
        REALLOC_N(n_children_in_a-1, node->childkeys);
    }

    node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
    B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;

    node->dirty = 1;
    B->dirty = 1;
    toku_assert_entire_node_in_memory(node);
    toku_assert_entire_node_in_memory(B);
    //VERIFY_NODE(t,node);
    //VERIFY_NODE(t,B);
    *nodea = node;
    *nodeb = B;
}

//
// the responsibility of ft_split_child is to take locked FTNODEs node and child
// and do the following:
//  - split child,
//  - fix node,
//  - release lock on node
//  - possibly flush either new children created from split, otherwise unlock children
//
static void
ft_split_child(
    FT h,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode,
    struct flusher_advice *fa)
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
    FTNODE nodea, nodeb;
    DBT splitk;

    // for test
    call_flusher_thread_callback(flt_flush_before_split);

    FTNODE dep_nodes[2];
    dep_nodes[0] = node;
    dep_nodes[1] = child;
    if (child->height==0) {
        ftleaf_split(h, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
    } else {
        ft_nonleaf_split(h, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
    }
    // printf("%s:%d child did split\n", __FILE__, __LINE__);
    handle_split_of_child (node, childnum, nodea, nodeb, &splitk);

    // for test
    call_flusher_thread_callback(flt_flush_during_split);

    // at this point, the split is complete
    // now we need to unlock node,
    // and possibly continue
    // flushing one of the children
    int picked_child = fa->pick_child_after_split(h, node, childnum, childnum + 1, fa->extra);
    toku_unpin_ftnode_off_client_thread(h, node);
    if (picked_child == childnum ||
        (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
        toku_unpin_ftnode_off_client_thread(h, nodeb);
        flush_some_child(h, nodea, fa);
    }
    else if (picked_child == childnum + 1 ||
             (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
        toku_unpin_ftnode_off_client_thread(h, nodea);
        flush_some_child(h, nodeb, fa);
    }
    else {
        toku_unpin_ftnode_off_client_thread(h, nodea);
        toku_unpin_ftnode_off_client_thread(h, nodeb);
    }
}

static void
flush_this_child(
    FT h,
    FTNODE node,
    FTNODE child,
    int childnum,
    struct flusher_advice *fa)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
{
    update_flush_status(child, 0);
    toku_assert_entire_node_in_memory(node);
    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(node, child, h);
    }
    bring_node_fully_into_memory(child, h);
    toku_assert_entire_node_in_memory(child);
    paranoid_invariant(node->height>0);
    paranoid_invariant(child->thisnodename.b!=0);
    // VERIFY_NODE does not work off client thread as of now
    //VERIFY_NODE(t, child);
    node->dirty = 1;
    child->dirty = 1;

    BP_WORKDONE(node, childnum) = 0;  // this buffer is drained, no work has been done by its contents
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    set_BNC(node, childnum, toku_create_empty_nl());

    // now we have a bnc to flush to the child
    toku_bnc_flush_to_child(h, bnc, child);
    destroy_nonleaf_childinfo(bnc);
}

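// Move every basement node and pivot from b into a, manufacturing a pivot
// from a's last leafentry when a's last basement is nonempty.  On return,
// a holds all the data and b is left with zero children.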
static void
merge_leaf_nodes(FTNODE a, FTNODE b)
{
    STATUS_VALUE(FT_FLUSHER_MERGE_LEAF)++;
    toku_assert_entire_node_in_memory(a);
    toku_assert_entire_node_in_memory(b);
    paranoid_invariant(a->height == 0);
    paranoid_invariant(b->height == 0);
    paranoid_invariant(a->n_children > 0);
    paranoid_invariant(b->n_children > 0);

    // Mark nodes as dirty before moving basements from b to a.
    // This way, whatever deltas are accumulated in the basements are
    // applied to the in_memory_stats in the header if they have not already
    // been (if nodes are clean).
    // TODO(leif): this is no longer the way in_memory_stats is
    // maintained. verify that it's ok to move this just before the unpin
    // and then do that.
    a->dirty = 1;
    b->dirty = 1;

    OMT a_last_buffer = BLB_BUFFER(a, a->n_children-1);
    // this bool states whether the last basement node in a has any items.
    // If it does, it stays in the merge. If it does not, the last basement node
    // of a gets eliminated because we do not have a pivot to store for it (it has no elements)
    const bool a_has_tail = toku_omt_size(a_last_buffer) > 0;

    // move each basement node from b to a
    // move the pivots, adding one of what used to be max(a)
    // move the estimates
    int num_children = a->n_children + b->n_children;
    if (!a_has_tail) {
        uint lastchild = a->n_children-1;
        BASEMENTNODE bn = BLB(a, lastchild);
        {
            // verify that last basement in a is empty, then destroy mempool
            struct mempool * mp = &bn->buffer_mempool;
            size_t used_space = toku_mempool_get_used_space(mp);
            invariant_zero(used_space);
            toku_mempool_destroy(mp);
        }
        destroy_basement_node(bn);
        set_BNULL(a, a->n_children-1);
        num_children--;
    }

    //realloc pivots and basement nodes in a
    REALLOC_N(num_children, a->bp);
    REALLOC_N(num_children-1, a->childkeys);

    // fill in pivot for what used to be max of node 'a', if it is needed
    if (a_has_tail) {
        OMTVALUE lev;
        toku_omt_fetch(a_last_buffer, toku_omt_size(a_last_buffer) - 1, &lev);
        LEAFENTRY CAST_FROM_VOIDP(le, lev);
        uint32_t keylen;
        void *key = le_key_and_len(le, &keylen);
        toku_fill_dbt(&a->childkeys[a->n_children-1], toku_xmemdup(key, keylen), keylen);
        a->totalchildkeylens += keylen;
    }

    uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
    for (int i = 0; i < b->n_children; i++) {
        a->bp[i+offset] = b->bp[i];
        memset(&b->bp[i],0,sizeof(b->bp[0]));
        if (i < (b->n_children-1)) {
            toku_copy_dbt(&a->childkeys[i+offset], b->childkeys[i]);
            toku_init_dbt(&b->childkeys[i]);
        }
    }
    a->totalchildkeylens += b->totalchildkeylens;
    a->n_children = num_children;

    // now that all the data has been moved from b to a, we can destroy the data in b
    // b can remain untouched, as it will be destroyed later
    b->totalchildkeylens = 0;
    b->n_children = 0;
}

static void balance_leaf_nodes(
    FTNODE a,
    FTNODE b,
    DBT *splitk)
// Effect:
//  If b is bigger then move stuff from b to a until b is the smaller.
//  If a is bigger then move stuff from a to b until a is the smaller.
{
    STATUS_VALUE(FT_FLUSHER_BALANCE_LEAF)++;
    // first merge all the data into a
    merge_leaf_nodes(a,b);
    // now split them
    // because we are not creating a new node, we can pass in no dependent nodes
    ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
}

static void
maybe_merge_pinned_leaf_nodes(
    FTNODE a,
    FTNODE b,
    DBT *parent_splitk,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: Either merge a and b into one node (merge them into a) and set *did_merge = true.
//	   (We do this if the resulting node is not fissible)
//	   or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
//	   (If a and b are already evenly distributed, we may do nothing.)
{
    unsigned int sizea = toku_serialize_ftnode_size(a);
    unsigned int sizeb = toku_serialize_ftnode_size(b);
    if ((sizea + sizeb)*4 > (nodesize*3)) {
        // the combined size is more than 3/4 of a node, so don't merge them.
        *did_merge = false;
        if (sizea*4 > nodesize && sizeb*4 > nodesize) {
            // no need to do anything if both are more than 1/4 of a node.
            *did_rebalance = false;
            toku_clone_dbt(splitk, *parent_splitk);
            return;
        }
        // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
        toku_destroy_dbt(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk (if we don't merge) we'll malloc a new one.
        *did_rebalance = true;
        balance_leaf_nodes(a, b, splitk);
    } else {
        // we are merging them.
        *did_merge = true;
        *did_rebalance = false;
        toku_init_dbt(splitk);
        toku_destroy_dbt(parent_splitk); // if we are merging, the splitk gets freed.
        merge_leaf_nodes(a, b);
    }
}

static void
maybe_merge_pinned_nonleaf_nodes(
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk)
{
    toku_assert_entire_node_in_memory(a);
    toku_assert_entire_node_in_memory(b);
    paranoid_invariant(parent_splitk->data);
    int old_n_children = a->n_children;
    int new_n_children = old_n_children + b->n_children;
    XREALLOC_N(new_n_children, a->bp);
    memcpy(a->bp + old_n_children,
           b->bp,
           b->n_children*sizeof(b->bp[0]));
    memset(b->bp,0,b->n_children*sizeof(b->bp[0]));

    XREALLOC_N(new_n_children-1, a->childkeys);
    toku_copy_dbt(&a->childkeys[old_n_children-1], *parent_splitk);
    a->totalchildkeylens += parent_splitk->size;
    for (int i = 0; i < b->n_children; ++i) {
        toku_copy_dbt(&a->childkeys[old_n_children + i], b->childkeys[i]);
        a->totalchildkeylens += b->childkeys[i].size;
        toku_init_dbt(&b->childkeys[i]);
    }
    a->n_children = new_n_children;

    b->totalchildkeylens = 0;
    b->n_children = 0;
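    // For illustration: if a had children [a0, a1] with pivot [k0] and b had
    // children [b0, b1, b2] with pivots [bk0, bk1], a now holds
    //   a->bp        = [a0, a1, b0, b1, b2]
    //   a->childkeys = [k0, parent_splitk, bk0, bk1]
    // b's child pointers and pivots were zeroed/reset above so that destroying
    // b cannot free memory that a now owns.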

    a->dirty = 1;
    b->dirty = 1;

    *did_merge = true;
    *did_rebalance = false;
    toku_init_dbt(splitk);

    STATUS_VALUE(FT_FLUSHER_MERGE_NONLEAF)++;
}

static void
maybe_merge_pinned_nodes(
    FTNODE parent,
    DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
//	   (We do this if the resulting node is not fissible)
//	   or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
//	   (If a and b are already evenly distributed, we may do nothing.)
//  If we distribute:
//    For leaf nodes, we distribute the leafentries evenly.
//    For nonleaf nodes, we distribute the children evenly.  That may leave one or both of the nodes overfull, but that's OK.
//  If we distribute, we set *splitk to a malloced pivot key.
// Parameters:
//  parent		The parent of the two nodes to be merged.
//  parent_splitk	The pivot key between a and b.	 This is either free()'d or returned in *splitk.
//  a			The first node to merge.
//  b			The second node to merge.
//  did_merge		(OUT):	Did the two nodes actually get merged?
//  did_rebalance	(OUT):	Did the two nodes get rebalanced instead of merged?
//  splitk		(OUT):	If the two nodes did not get merged, the new pivot key between the two nodes.
//  nodesize		The target node size.
{
    MSN msn_max;
    paranoid_invariant(a->height == b->height);
    toku_assert_entire_node_in_memory(parent);
    toku_assert_entire_node_in_memory(a);
    toku_assert_entire_node_in_memory(b);
    parent->dirty = 1;   // just to make sure
    {
        MSN msna = a->max_msn_applied_to_node_on_disk;
        MSN msnb = b->max_msn_applied_to_node_on_disk;
        msn_max = (msna.msn > msnb.msn) ? msna : msnb;
    }
    if (a->height == 0) {
        maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
    } else {
        maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
    }
    if (*did_merge || *did_rebalance) {
        // accurate for leaf nodes because all msgs above have been
        // applied, accurate for non-leaf nodes because buffer immediately
        // above each node has been flushed
        a->max_msn_applied_to_node_on_disk = msn_max;
        b->max_msn_applied_to_node_on_disk = msn_max;
    }
}

static void merge_remove_key_callback(
    BLOCKNUM *bp,
    bool for_checkpoint,
    void *extra)
{
    FT h = (FT) extra;
    toku_free_blocknum(h->blocktable, bp, h, for_checkpoint);
}

//
// Takes as input a locked node and a childnum_to_merge
// As output, two of node's children are merged or rebalanced, and node is unlocked
//
static void
ft_merge_child(
    FT h,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa)
{
    // this function should not be called
    // if the child is not mergeable
    paranoid_invariant(node->n_children > 1);
    toku_assert_entire_node_in_memory(node);

    int childnuma,childnumb;
    if (childnum_to_merge > 0) {
        childnuma = childnum_to_merge-1;
        childnumb = childnum_to_merge;
    } else {
        childnuma = childnum_to_merge;
        childnumb = childnum_to_merge+1;
    }
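    // For example, childnum_to_merge == 3 pairs children 2 and 3, while
    // childnum_to_merge == 0 pairs children 0 and 1.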
    paranoid_invariant(0 <= childnuma);
    paranoid_invariant(childnuma+1 == childnumb);
    paranoid_invariant(childnumb < node->n_children);

    paranoid_invariant(node->height>0);

    // We suspect that at least one of the children is fusible, but they might not be.
    // for test
    call_flusher_thread_callback(flt_flush_before_merge);

    FTNODE childa, childb;
    {
        uint32_t childfullhash = compute_child_fullhash(h->cf, node, childnuma);
        struct ftnode_fetch_extra bfe;
        fill_bfe_for_full_read(&bfe, h);
        toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa);
    }
    // for test
    call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
    {
        FTNODE dep_nodes[2];
        dep_nodes[0] = node;
        dep_nodes[1] = childa;
        uint32_t childfullhash = compute_child_fullhash(h->cf, node, childnumb);
        struct ftnode_fetch_extra bfe;
        fill_bfe_for_full_read(&bfe, h);
        toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb);
    }

    if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
        flush_this_child(h, node, childa, childnuma, fa);
    }
    if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
        flush_this_child(h, node, childb, childnumb, fa);
    }

    // now we have both children pinned in main memory, and cachetable locked,
    // so no checkpoints will occur.

    bool did_merge, did_rebalance;
    {
        DBT splitk;
        toku_init_dbt(&splitk);
        DBT *old_split_key = &node->childkeys[childnuma];
        unsigned int deleted_size = old_split_key->size;
        maybe_merge_pinned_nodes(node, &node->childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk, h->h->nodesize);
        if (childa->height>0) {
            for (int i=0; i+1<childa->n_children; i++) {
                paranoid_invariant(childa->childkeys[i].data);
            }
        }
        //toku_verify_estimates(t,childa);
        // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
        *did_react = (bool)(did_merge || did_rebalance);
        if (did_merge) {
            paranoid_invariant(!splitk.data);
        } else {
            paranoid_invariant(splitk.data);
        }

        node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.

        if (did_merge) {
            NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
            NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
            for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
                remaining_bnc->flow[i] += merged_bnc->flow[i];
            }
            destroy_nonleaf_childinfo(merged_bnc);
            set_BNULL(node, childnumb);
            node->n_children--;
            memmove(&node->bp[childnumb],
                    &node->bp[childnumb+1],
                    (node->n_children-childnumb)*sizeof(node->bp[0]));
            REALLOC_N(node->n_children, node->bp);
            memmove(&node->childkeys[childnuma],
                    &node->childkeys[childnuma+1],
                    (node->n_children-childnumb)*sizeof(node->childkeys[0]));
            REALLOC_N(node->n_children-1, node->childkeys);
            paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->thisnodename.b);
            childa->dirty = 1;  // just to make sure
            childb->dirty = 1;  // just to make sure
        } else {
            // flow will be inaccurate for a while, oh well.  the children
            // are leaves in this case so it's not a huge deal (we're
            // pretty far down the tree)

            // If we didn't merge the nodes, then we need the correct pivot.
            toku_copy_dbt(&node->childkeys[childnuma], splitk);
            node->totalchildkeylens += node->childkeys[childnuma].size;
            node->dirty = 1;
        }
    }
    //
    // now we possibly flush the children
    //
    if (did_merge) {
        // for test
        call_flusher_thread_callback(flt_flush_before_unpin_remove);

        // merge_remove_key_callback will free the blocknum
        int rrb = toku_cachetable_unpin_and_remove(
            h->cf,
            childb->ct_pair,
            merge_remove_key_callback,
            h
            );
        assert_zero(rrb);

        // for test
        call_flusher_thread_callback(ft_flush_aflter_merge);

        // unlock the parent
        paranoid_invariant(node->dirty);
        toku_unpin_ftnode_off_client_thread(h, node);
    }
    else {
        // for test
        call_flusher_thread_callback(ft_flush_aflter_rebalance);

        // unlock the parent
        paranoid_invariant(node->dirty);
        toku_unpin_ftnode_off_client_thread(h, node);
        toku_unpin_ftnode_off_client_thread(h, childb);
    }
    if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
        flush_some_child(h, childa, fa);
    }
    else {
        toku_unpin_ftnode_off_client_thread(h, childa);
    }
}

void
flush_some_child(
    FT h,
    FTNODE parent,
    struct flusher_advice *fa)
// Effect: This function does the following:
//   - Pick a child of parent (the heaviest child),
//   - flush from parent to child,
//   - possibly split/merge child.
//   - if child is gorged, recursively proceed with child
//  Note that parent is already locked
//  Upon exit of this function, parent is unlocked and no
//  new nodes (such as a child) remain locked
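//  For example, a height-2 parent flushes its heaviest buffer into a height-1
//  child; if that child is now gorged and fa->should_recursively_flush agrees,
//  we recurse into the child, stopping at a leaf or at a stable node.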
{
    int dirtied = 0;
    NONLEAF_CHILDINFO bnc = NULL;
    paranoid_invariant(parent->height>0);
    toku_assert_entire_node_in_memory(parent);

    // pick the child we want to flush to
    int childnum = fa->pick_child(h, parent, fa->extra);

    // for test
    call_flusher_thread_callback(flt_flush_before_child_pin);

    // get the child into memory
    BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
    toku_verify_blocknum_allocated(h->blocktable, targetchild);
    uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
    FTNODE child;
    struct ftnode_fetch_extra bfe;
    // Note that we don't read the entire node into memory yet.
    // The idea is to do the minimum amount of work before releasing the parent lock.
    fill_bfe_for_min_read(&bfe, h);
    toku_pin_ftnode_off_client_thread(h, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child);

    // for test
    call_flusher_thread_callback(ft_flush_aflter_child_pin);

    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(parent, child, h);
    }

    // Note that at this point, we don't have the entire child in memory.
    // Let's do a quick check to see if the child may be reactive
    // If the child cannot be reactive, then we can safely unlock
    // the parent before finishing reading in the entire child node.
    bool may_child_be_reactive = may_node_be_reactive(child);
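    // ("Reactive" means the child might need to split (fissible) or merge
    // (fusible); if neither can happen, the parent's pivots cannot change
    // underneath us, so we don't need to keep the parent pinned.)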

    paranoid_invariant(child->thisnodename.b!=0);
    //VERIFY_NODE(brt, child);

    // only do the following work if there is a flush to perform
    if (toku_bnc_n_entries(BNC(parent, childnum)) > 0) {
        if (!parent->dirty) {
            dirtied++;
            parent->dirty = 1;
        }
        // detach buffer
        BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
        bnc = BNC(parent, childnum);
        NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
        memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
        set_BNC(parent, childnum, new_bnc);
    }

    //
    // at this point, the buffer has been detached from the parent
    // and a new empty buffer has been placed in its stead
    // so, if we are absolutely sure that the child is not
    // reactive, we can unpin the parent
    //
    if (!may_child_be_reactive) {
        toku_unpin_ftnode_off_client_thread(h, parent);
        parent = NULL;
    }

    //
    // now, if necessary, read/decompress the rest of child into memory,
    // so that we can proceed and apply the flush
    //
    bring_node_fully_into_memory(child, h);

    // It is possible that, after reading in the entire child,
    // we now know that the child is not reactive.
    // If so, we can unpin the parent right now:
    // we won't be splitting/merging the child,
    // and we have already replaced the parent's bnc
    // with a fresh one.
    enum reactivity child_re = get_node_reactivity(child, h->h->nodesize);
    if (parent && child_re == RE_STABLE) {
        toku_unpin_ftnode_off_client_thread(h, parent);
        parent = NULL;
    }

    // from above, we know at this point that either the bnc
    // is detached from the parent (which may be unpinned),
    // and we have to apply the flush, or there was no data
    // in the buffer to flush, and as a result, flushing is not necessary
    // and bnc is NULL
    if (bnc != NULL) {
        if (!child->dirty) {
            dirtied++;
            child->dirty = 1;
        }
        // do the actual flush
        toku_bnc_flush_to_child(
            h,
            bnc,
            child
            );
        destroy_nonleaf_childinfo(bnc);
    }

    fa->update_status(child, dirtied, fa->extra);
    // let's get the reactivity of the child again,
    // it is possible that the flush got rid of some values
    // and now the child is no longer reactive
    child_re = get_node_reactivity(child, h->h->nodesize);
    // if the parent has been unpinned above, then
    // this is our only option, even if the child is not stable
    // if the child is not stable, we'll handle it the next
    // time we need to flush to the child
    if (!parent ||
        child_re == RE_STABLE ||
        (child_re == RE_FUSIBLE && parent->n_children == 1)
        )
    {
        if (parent) {
            toku_unpin_ftnode_off_client_thread(h, parent);
            parent = NULL;
        }
        //
        // it is the responsibility of flush_some_child to unpin child
        //
        if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
            flush_some_child(h, child, fa);
        }
        else {
            toku_unpin_ftnode_off_client_thread(h, child);
        }
    }
    else if (child_re == RE_FISSIBLE) {
        //
        // it is the responsibility of `ft_split_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        ft_split_child(h, parent, childnum, child, SPLIT_EVENLY, fa);
    }
    else if (child_re == RE_FUSIBLE) {
        //
        // it is the responsibility of `maybe_merge_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        fa->maybe_merge_child(fa, h, parent, childnum, child, fa->extra);
    }
    else {
        abort();
    }
}

static void
update_cleaner_status(
    FTNODE node,
    int childnum)
{
    STATUS_VALUE(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
    if (node->height == 1) {
        STATUS_VALUE(FT_FLUSHER_CLEANER_H1_NODES)++;
    } else {
        STATUS_VALUE(FT_FLUSHER_CLEANER_HGT1_NODES)++;
    }

    unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
    if (nbytesinbuf == 0) {
        STATUS_VALUE(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
    } else {
        if (nbytesinbuf > STATUS_VALUE(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
            STATUS_VALUE(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
        }
        if (nbytesinbuf < STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
            STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
        }
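        // (The MIN counters above and below are only meaningful if they start
        // out large; presumably they are initialized to UINT64_MAX at startup
        // rather than left at the zero the other counters start from.)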
        STATUS_VALUE(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;

        uint64_t workdone = BP_WORKDONE(node, childnum);
        if (workdone > STATUS_VALUE(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
            STATUS_VALUE(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
        }
        if (workdone < STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
            STATUS_VALUE(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
        }
        STATUS_VALUE(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
    }
}

static void
dummy_update_status(
    FTNODE UU(child),
    int UU(dirtied),
    void* UU(extra)
    ) 
{
}

static int
dummy_pick_heaviest_child(FT UU(h),
                    FTNODE UU(parent),
                    void* UU(extra))
{
    abort();
    return -1;
}

void toku_ft_split_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode
    )
{
    struct flusher_advice fa;
    flusher_advice_init(
        &fa,
        dummy_pick_heaviest_child,
        dont_destroy_basement_nodes,
        never_recursively_flush,
        default_merge_child,
        dummy_update_status,
        default_pick_child_after_split,
        NULL
        );
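    // This advice performs exactly one split: it never recurses
    // (never_recursively_flush) and never has to pick a buffer to flush, so
    // dummy_pick_heaviest_child should be unreachable (it aborts if called).
    // toku_ft_merge_child below builds the same advice for a single merge.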
    ft_split_child(
        ft,
        node,
        childnum, // childnum to split
        child,
        split_mode,
        &fa
        );
}

void toku_ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum
    )
{
    struct flusher_advice fa;
    flusher_advice_init(
        &fa,
        dummy_pick_heaviest_child,
        dont_destroy_basement_nodes,
        never_recursively_flush,
        default_merge_child,
        dummy_update_status,
        default_pick_child_after_split,
        NULL
        );
    bool did_react;
    ft_merge_child(
        ft,
        node,
        childnum, // childnum to merge
        &did_react,
        &fa
        );
}

int
toku_ftnode_cleaner_callback(
    void *ftnode_pv,
    BLOCKNUM blocknum,
    uint32_t fullhash,
    void *extraargs)
{
    FTNODE node = (FTNODE) ftnode_pv;
    invariant(node->thisnodename.b == blocknum.b);
    invariant(node->fullhash == fullhash);
    invariant(node->height > 0);   // we should never pick a leaf node (for now at least)
    FT h = (FT) extraargs;
    bring_node_fully_into_memory(node, h);
    int childnum = find_heaviest_child(node);
    update_cleaner_status(node, childnum);

    // Either flush_some_child will unlock the node, or we do it here.
    if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
        struct flusher_advice fa;
        struct flush_status_update_extra fste;
        ct_flusher_advice_init(&fa, &fste, h->h->nodesize);
        flush_some_child(h, node, &fa);
    } else {
        toku_unpin_ftnode_off_client_thread(h, node);
    }
    return 0;
}

struct flusher_extra {
    FT h;
    FTNODE node;
    NONLEAF_CHILDINFO bnc;
};

//
// This is the function that gets called by a
// background thread. Its purpose is to complete
// a flush, and possibly do a split/merge.
//
static void flush_node_fun(void *fe_v)
{
    struct flusher_extra* fe = (struct flusher_extra *) fe_v;
    // The node that has been placed on the background
    // thread may not be fully in memory. Some message
    // buffers may be compressed. Before performing
    // any operations, we must first make sure
    // the node is fully in memory
    //
    // If we have a bnc, that means fe->node is a child, and we've already
    // destroyed its basement nodes if necessary, so we now need to either
    // read them back in, or just do the regular partial fetch.  If we
    // don't, that means fe->node is a parent, so we need to do this anyway.
    bring_node_fully_into_memory(fe->node,fe->h);
    fe->node->dirty = 1;

    struct flusher_advice fa;
    struct flush_status_update_extra fste;
    flt_flusher_advice_init(&fa, &fste, fe->h->h->nodesize);

    if (fe->bnc) {
        // In this case, we have a bnc to flush to a node

        // for test purposes
        call_flusher_thread_callback(flt_flush_before_applying_inbox);

        toku_bnc_flush_to_child(
            fe->h,
            fe->bnc,
            fe->node
            );
        destroy_nonleaf_childinfo(fe->bnc);

        // after the flush has completed, now check to see if the node needs flushing
        // If so, call flush_some_child on the node, and it is the responsibility
        // of flush_some_child to unlock the node
        // otherwise, we unlock the node here.
        if (fe->node->height > 0 && toku_ft_nonleaf_is_gorged(fe->node, fe->h->h->nodesize)) {
            flush_some_child(fe->h, fe->node, &fa);
        }
        else {
            toku_unpin_ftnode_off_client_thread(fe->h,fe->node);
        }
    }
    else {
        // In this case, we were just passed a node with no
        // bnc, which means we are tasked with flushing some
        // buffer in the node.
        // It is the responsibility of flush_some_child to unlock the node
        flush_some_child(fe->h, fe->node, &fa);
    }
    remove_background_job_from_cf(fe->h->cf);
    toku_free(fe);
}

static void
place_node_and_bnc_on_background_thread(
    FT h,
    FTNODE node,
    NONLEAF_CHILDINFO bnc)
{
    struct flusher_extra *XMALLOC(fe);
    fe->h = h;
    fe->node = node;
    fe->bnc = bnc;
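    // Hand the work off to the cachefile's background worker pool (kibbutz);
    // flush_node_fun runs asynchronously with fe and is responsible for
    // freeing it and for removing the background job from the cachefile.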
    cachefile_kibbutz_enq(h->cf, flush_node_fun, fe);
}

//
// This takes as input a gorged, locked,  non-leaf node named parent
// and sets up a flush to be done in the background.
// The flush is setup like this:
//  - We call maybe_get_and_pin_clean on the child we want to flush to in order to try to lock the child
//  - if we successfully pin the child, and the child does not need to be split or merged
//     then we detach the buffer, place the child and buffer onto a background thread, and
//     have the flush complete in the background, and unlock the parent. The child will be
//     unlocked on the background thread
//  - if any of the above does not happen (child cannot be locked,
//     child needs to be split/merged), then we place the parent on the background thread.
//     The parent will be unlocked on the background thread
//
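// In short: if the child can be pinned and cannot be reactive, we enqueue
// (child, detached bnc) and unpin the parent here; otherwise we enqueue
// (parent, NULL) and let the background thread do the whole flush.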
void
flush_node_on_background_thread(FT h, FTNODE parent)
{
    //
    // first let's see if we can detach buffer on client thread
    // and pick the child we want to flush to
    //
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
    //
    // see if we can pin the child
    //
    FTNODE child;
    uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
    int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
    if (r != 0) {
        // In this case, we could not lock the child, so just place the parent on the background thread
        // In the callback, we will use flush_some_child, which checks to
        // see if we should blow away the old basement nodes.
        place_node_and_bnc_on_background_thread(h, parent, NULL);
    }
    else {
        //
        // successfully locked child
        //
        bool may_child_be_reactive = may_node_be_reactive(child);
        if (!may_child_be_reactive) {
            // We're going to unpin the parent, so before we do, we must
            // check to see if we need to blow away the basement nodes to
            // keep the MSN invariants intact.
            maybe_destroy_child_blbs(parent, child, h);

            //
            // can detach buffer and unpin root here
            //
            parent->dirty = 1;
            BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
            NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
            NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
            memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
            set_BNC(parent, childnum, new_bnc);

            //
            // at this point, the buffer has been detached from the parent
            // and a new empty buffer has been placed in its stead
            // so, because we know for sure the child is not
            // reactive, we can unpin the parent
            //
            place_node_and_bnc_on_background_thread(h, child, bnc);
            toku_unpin_ftnode(h, parent);
        }
        else {
            // because the child may be reactive, we need to
            // put parent on background thread.
            // As a result, we unlock the child here.
            toku_unpin_ftnode(h, child);
            // Again, we'll have the parent on the background thread, so
            // we don't need to destroy the basement nodes yet.
            place_node_and_bnc_on_background_thread(h, parent, NULL);
        }
    }
}

#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
void
toku_ft_flusher_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ft_flusher_status, sizeof ft_flusher_status);
}

#undef STATUS_VALUE