ftloader.cc 113 KB
Newer Older
1 2
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3
#ident "$Id$"
4
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
5 6
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

7 8 9 10 11 12
#include <toku_portability.h>

#if !TOKU_WINDOWS
#include <arpa/inet.h>
#endif

13 14 15 16
#include <stdio.h>
#include <memory.h>
#include <errno.h>
#include <toku_assert.h>
17 18 19
#include <string.h>
#include <fcntl.h>
#include "x1764.h"
20 21
#include "ftloader-internal.h"
#include "ft-internal.h"
22
#include "sub_block.h"
23
#include "sub_block_map.h"
24
#include "pqueue.h"
25
#include "dbufio.h"
26 27
#include "leafentry.h"
#include "log-internal.h"
28
#include "ft.h"
29

30
#if defined(__cilkplusplus)
31 32 33 34 35 36
#error DISABLING CILK ARTS CILK
#endif

#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
37 38 39 40
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
41
#define cilk_worker_count 1
42 43
#endif

44
// Test hook: when non-NULL this function is used instead of fwrite, so tests
// can inject artificial write failures.
static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL;

void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
    os_fwrite_fun=fwrite_fun;
}

// Route all loader writes through the test hook when one is installed,
// otherwise through plain stdio fwrite.
static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
    size_t (*fun)(const void *, size_t, size_t, FILE *) = os_fwrite_fun;
    if (fun == NULL)
        fun = fwrite;
    return fun(ptr, size, nmemb, stream);
}

57 58 59 60

// 1024 is the right size_factor for production.  
// Different values for these sizes may be used for testing.
static uint32_t size_factor = 1024;
61 62
static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE;
static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
63

64
// Tuning constants for the loader pipeline.
enum { EXTRACTOR_QUEUE_DEPTH = 2,  // rowsets queued between the foreground thread and the extractor
       FILE_BUFFER_SIZE  = 1<<24,  // stdio buffer attached to each temp file (16MB)
       MIN_ROWSET_MEMORY = 1<<23,  // lower bound on the memory given to one rowset (8MB)
       MIN_MERGE_FANIN   = 2,      // merge at least this many temp files per pass
       FRACTAL_WRITER_QUEUE_DEPTH = 3,  // rowsets queued in front of each fractal writer thread
       FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,  // queue depth plus one in flight on each side
       DBUFIO_DEPTH = 2,           // double buffering for merge input files
       TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
       MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
};
74 75 76


void
77
toku_ft_loader_set_size_factor(uint32_t factor) {
78 79
// For test purposes only
    size_factor = factor;
80
    default_loader_nodesize = (size_factor==1) ? (1<<15) : FT_DEFAULT_NODE_SIZE;
81 82
}

83
uint64_t
84
toku_ft_loader_get_rowset_budget_for_testing (void)
85 86 87 88
// For test purposes only.  In production, the rowset size is determined by negotation with the cachetable for some memory.  (See #2613).
{
    return 16ULL*size_factor*1024ULL;
}
89

90
// Initialize the loader's mutex.  Must be called exactly once per loader;
// asserts that the mutex has not already been initialized.
void ft_loader_lock_init(FTLOADER bl) {
    invariant(!bl->mutex_init);
    toku_mutex_init(&bl->mutex, NULL); 
    bl->mutex_init = true;
}

96
// Destroy the loader's mutex if it was initialized.  Safe to call on a
// loader whose mutex was never set up (e.g. partial-construction cleanup).
void ft_loader_lock_destroy(FTLOADER bl) {
    if (bl->mutex_init) {
        toku_mutex_destroy(&bl->mutex);
        bl->mutex_init = false;
    }
}

103
// Acquire the loader's mutex; asserts it has been initialized first.
static void ft_loader_lock(FTLOADER bl) {
    invariant(bl->mutex_init);
    toku_mutex_lock(&bl->mutex);
}

108
// Release the loader's mutex; asserts it has been initialized first.
static void ft_loader_unlock(FTLOADER bl) {
    invariant(bl->mutex_init);
    toku_mutex_unlock(&bl->mutex);
}

113 114
static int add_big_buffer(struct file_info *file) {
    int result = 0;
Yoni Fogel's avatar
Yoni Fogel committed
115
    bool newbuffer = false;
116
    if (file->buffer == NULL) {
117
        file->buffer = toku_malloc(file->buffer_size);
118
        if (file->buffer == NULL)
119
            result = get_error_errno();
120
        else
Yoni Fogel's avatar
Yoni Fogel committed
121
            newbuffer = true;
122
    }
123 124 125
    if (result == 0) {
        int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size);
        if (r != 0) {
126
            result = get_error_errno();
127 128 129 130 131 132 133
            if (newbuffer) {
                toku_free(file->buffer);
                file->buffer = NULL;
            }
        }
    } 
    return result;
134 135 136 137 138 139 140 141
}

// Release the big stdio buffer, if one was attached.  A no-op otherwise.
static void cleanup_big_buffer(struct file_info *file) {
    if (file->buffer == NULL)
        return;
    toku_free(file->buffer);
    file->buffer = NULL;
}
142

143
int ft_loader_init_file_infos (struct file_infos *fi) {
144
    int result = 0;
145
    toku_mutex_init(&fi->lock, NULL);
146 147 148 149 150
    fi->n_files = 0;
    fi->n_files_limit = 1;
    fi->n_files_open = 0;
    fi->n_files_extant = 0;
    MALLOC_N(fi->n_files_limit, fi->file_infos);
151
    if (fi->file_infos == NULL)
152
        result = get_error_errno();
153
    return result;
154 155
}

Yoni Fogel's avatar
Yoni Fogel committed
156
void ft_loader_fi_destroy (struct file_infos *fi, bool is_error)
// Effect: Free the resources in the fi.
// If is_error then we close and unlink all the temp files.
// If !is_error then requires that all the temp files have been closed and destroyed
// No error codes are returned.  If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care.
{
    if (fi->file_infos == NULL) {
        // ft_loader_init_file_infos guarantees this isn't null, so if it is, we know it hasn't been inited yet and we don't need to destroy it.
        return;
    }
    toku_mutex_destroy(&fi->lock);
    if (!is_error) {
        // On a clean shutdown every temp file must already be closed and unlinked.
        invariant(fi->n_files_open==0);
        invariant(fi->n_files_extant==0);
    }
    for (int i=0; i<fi->n_files; i++) {
        if (fi->file_infos[i].is_open) {
            invariant(is_error);
            toku_os_fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case.
        }
        if (fi->file_infos[i].is_extant) {
            invariant(is_error);
            unlink(fi->file_infos[i].fname);
            toku_free(fi->file_infos[i].fname);
        }
        // Free the big stdio buffer whether or not the file was still open.
        cleanup_big_buffer(&fi->file_infos[i]);
    }
    toku_free(fi->file_infos);
    fi->n_files=0;
    fi->n_files_limit=0;
    fi->file_infos = NULL;
}

189
static int open_file_add (struct file_infos *fi,
190 191 192
                          FILE *file,
                          char *fname,
                          /* out */ FIDX *idx)
193
{
194
    int result = 0;
195
    toku_mutex_lock(&fi->lock);
196
    if (fi->n_files >= fi->n_files_limit) {
197 198
        fi->n_files_limit *=2;
        XREALLOC_N(fi->n_files_limit, fi->file_infos);
199
    }
Dave Wells's avatar
Dave Wells committed
200
    invariant(fi->n_files < fi->n_files_limit);
Yoni Fogel's avatar
Yoni Fogel committed
201 202
    fi->file_infos[fi->n_files].is_open   = true;
    fi->file_infos[fi->n_files].is_extant = true;
203 204
    fi->file_infos[fi->n_files].fname     = fname;
    fi->file_infos[fi->n_files].file      = file;
205
    fi->file_infos[fi->n_files].n_rows    = 0;
206
    fi->file_infos[fi->n_files].buffer_size = FILE_BUFFER_SIZE;
207
    fi->file_infos[fi->n_files].buffer    = NULL;
208 209 210 211 212 213 214
    result = add_big_buffer(&fi->file_infos[fi->n_files]);
    if (result == 0) {
        idx->idx = fi->n_files;
        fi->n_files++;
        fi->n_files_extant++;
        fi->n_files_open++;
    }
215
   toku_mutex_unlock(&fi->lock);
216
    return result;
217 218
}

219
int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
220
    int result = 0;
221
    toku_mutex_lock(&fi->lock);
222
    int i = idx.idx;
Dave Wells's avatar
Dave Wells committed
223 224 225
    invariant(i>=0 && i<fi->n_files);
    invariant(!fi->file_infos[i].is_open);
    invariant(fi->file_infos[i].is_extant);
226
    fi->file_infos[i].file = toku_os_fopen(fi->file_infos[i].fname, mode);
227
    if (fi->file_infos[i].file == NULL) { 
228
        result = get_error_errno();
229
    } else {
Yoni Fogel's avatar
Yoni Fogel committed
230
        fi->file_infos[i].is_open = true;
231 232 233 234
        // No longer need the big buffer for reopened files.  Don't allocate the space, we need it elsewhere.
        //add_big_buffer(&fi->file_infos[i]);
        fi->n_files_open++;
    }
235
    toku_mutex_unlock(&fi->lock);
236
    return result;
237 238
}

Yoni Fogel's avatar
Yoni Fogel committed
239
int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open)
240
{
241
    int result = 0;
242
    toku_mutex_lock(&fi->lock); 
Dave Wells's avatar
Dave Wells committed
243
    invariant(idx.idx >=0 && idx.idx < fi->n_files);
244 245 246
    if (fi->file_infos[idx.idx].is_open) {
        invariant(fi->n_files_open>0);   // loader-cleanup-test failure
        fi->n_files_open--;
Yoni Fogel's avatar
Yoni Fogel committed
247
        fi->file_infos[idx.idx].is_open = false;
248 249
        int r = toku_os_fclose(fi->file_infos[idx.idx].file);
        if (r)
250
            result = get_error_errno();
251
        cleanup_big_buffer(&fi->file_infos[idx.idx]);
252
    } else if (require_open)
253
        result = EINVAL;
254
    toku_mutex_unlock(&fi->lock); 
255
    return result;
256 257
}

258
int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx) {
259
    int result = 0;
260
    toku_mutex_lock(&fi->lock);
261
    int id = idx.idx;
Dave Wells's avatar
Dave Wells committed
262
    invariant(id >=0 && id < fi->n_files);
263 264 265 266
    if (fi->file_infos[id].is_extant) { // must still exist
        invariant(fi->n_files_extant>0);
        fi->n_files_extant--;
        invariant(!fi->file_infos[id].is_open); // must be closed before we unlink
Yoni Fogel's avatar
Yoni Fogel committed
267
        fi->file_infos[id].is_extant = false;
268 269
        int r = unlink(fi->file_infos[id].fname);  
        if (r != 0) 
270
            result = get_error_errno();
271 272 273 274
        toku_free(fi->file_infos[id].fname);
        fi->file_infos[id].fname = NULL;
    } else
        result = EINVAL;
275
    toku_mutex_unlock(&fi->lock);
276
    return result;
277 278
}

279
int
280
ft_loader_fi_close_all(struct file_infos *fi) {
281
    int rval = 0;
282
    for (int i = 0; i < fi->n_files; i++) {
283
        int r;
284
        FIDX idx = { i };
Yoni Fogel's avatar
Yoni Fogel committed
285
        r = ft_loader_fi_close(fi, idx, false);  // ignore files that are already closed
286 287
        if (rval == 0 && r)
            rval = r;  // capture first error
288
    }
289
    return rval;
290 291
}

292
int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx)
/* Effect: Open a temporary file in read-write mode.  Save enough information to close and delete the file later.
 * Return value: 0 on success, an error number otherwise.
 *  On error, *file_idx and *fnamep will be unmodified.
 *  The open file will be saved in bl->file_infos so that even if errors happen we can free them all.
 */
{
    int result = 0;
    FILE *f = NULL;
    int fd = -1;
    char *fname = toku_strdup(bl->temp_file_template);    
    if (fname == NULL)
        result = get_error_errno();
    else {
        fd = mkstemp(fname);
        if (fd < 0) { 
            result = get_error_errno();
        } else {
            f = toku_os_fdopen(fd, "r+");
            if (f == NULL)
                result = get_error_errno();
            else
                result = open_file_add(&bl->file_infos, f, fname, file_idx);
        }
    }
    if (result != 0) {
        // Clean up whatever we created.  Once fdopen succeeded, the stream
        // owns fd, so close exactly one of the two -- closing the raw fd and
        // then fclosing the stream would close the descriptor twice.
        if (f != NULL)
            toku_os_fclose(f);  // don't check for error because we're already in an error case
        else if (fd >= 0)
            toku_os_close(fd);
        if (fd >= 0)
            unlink(fname);      // the file exists only if mkstemp succeeded
        if (fname != NULL)
            toku_free(fname);
    }
    return result;
}

Yoni Fogel's avatar
Yoni Fogel committed
330
// Free every resource owned by the loader, then the loader itself.
// is_error: when true, temp files may still be open/extant and are closed
// and unlinked here; when false they must already have been cleaned up.
// Safe to call on a partially-constructed loader (CALLOC zeroed all fields,
// and freeing NULL is a no-op).
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) {
    ft_loader_lock_destroy(bl);

    // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
    toku_free(bl->dbs);
    toku_free(bl->descriptors);
    toku_free(bl->root_xids_that_created);
    if (bl->new_fnames_in_env) {
        for (int i = 0; i < bl->N; i++)
            toku_free((char*)bl->new_fnames_in_env[i]);
        toku_free(bl->new_fnames_in_env);
    }
    toku_free(bl->extracted_datasizes);
    toku_free(bl->bt_compare_funs);
    toku_free((char*)bl->temp_file_template);
    // Close/unlink temp files (when is_error) and free the table.
    ft_loader_fi_destroy(&bl->file_infos, is_error);

    for (int i = 0; i < bl->N; i++) 
        destroy_rowset(&bl->rows[i]);
    toku_free(bl->rows);

    for (int i = 0; i < bl->N; i++)
        destroy_merge_fileset(&bl->fs[i]);
    toku_free(bl->fs);

    if (bl->last_key) {
        for (int i=0; i < bl->N; i++) {
            toku_free(bl->last_key[i].data);
        }
        toku_free(bl->last_key);
        bl->last_key = NULL;
    }

    destroy_rowset(&bl->primary_rowset);

    // The fractal queues must have been destroyed (set to NULL) before we get here.
    for (int i=0; i<bl->N; i++) {
        if ( bl->fractal_queues ) {
            invariant(bl->fractal_queues[i]==NULL);
        }
    }
    toku_free(bl->fractal_threads);
    toku_free(bl->fractal_queues);
    toku_free(bl->fractal_threads_live);

    // Give back any memory reserved from the cachetable at construction time.
    if (bl->did_reserve_memory) {
        invariant(bl->cachetable);
        toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory);
    }

    ft_loader_destroy_error_callback(&bl->error_callback);
    ft_loader_destroy_poll_callback(&bl->poll_callback);

    //printf("Progress=%d/%d\n", bl->progress, PROGRESS_MAX);

    toku_free(bl);
}

387 388
static void *extractor_thread (void*);

389
#define MAX(a,b) (((a)<(b)) ? (b) : (a))
390

391
static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
// Return how much memory can be allocated for each rowset.
{
    if (size_factor==1) {
        // Testing configuration (size_factor forced to 1): keep rowsets tiny.
        return 16*1024;
    } else {
        // There is a primary rowset being maintained by the foreground thread.
        // There could be two more in the queue.
        // There is one rowset for each index (bl->N) being filled in.
        // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
        int n_copies = (1 // primary rowset
                        +EXTRACTOR_QUEUE_DEPTH  // the number of primaries in the queue
                        +bl->N // the N rowsets being constructed by the extractor thread.
                        +bl->N // the N sort buffers
                        +1     // Give the extractor thread one more so that it can have temporary space for sorting.  This is overkill.
                        );
        int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE;  // for each index we are writing to a file at any given time.
        int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
        // Never hand out less than MIN_ROWSET_MEMORY, even if the division above goes small (or negative).
        return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
    }
}

413
// Wait until the fractal worker count has been published (is nonzero) and
// return it.  Busy-waits, yielding the thread between polls; the loader
// lock protects the read.
static unsigned ft_loader_get_fractal_workers_count(FTLOADER bl) {
    unsigned w = 0;
    while (1) {
        ft_loader_lock(bl);
        w = bl->fractal_workers;
        ft_loader_unlock(bl);
        if (w != 0)
            break;
        toku_pthread_yield();  // maybe use a cond var instead
    }
    return w;
}

426 427
// Publish the fractal worker count exactly once: the first caller wins,
// later calls are no-ops.  Guarded by the loader lock.
static void ft_loader_set_fractal_workers_count(FTLOADER bl) {
    ft_loader_lock(bl);
    if (bl->fractal_workers == 0)
        bl->fractal_workers = cilk_worker_count;
    ft_loader_unlock(bl);
}

433 434 435 436
// To compute a merge, we have a certain amount of memory to work with.
// We perform only one fanin at a time.
// If the fanout is F then we are using
//   F merges.  Each merge uses
437
//   DBUFIO_DEPTH buffers for double buffering.  Each buffer is of size at least MERGE_BUF_SIZE
438
// so the memory is
439
//   F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
440 441
// We use some additional space to buffer the outputs. 
//  That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
442
//  And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
443
//  And if we are doing a fractal, each worker could have a fractal tree that it's working on.
444 445 446
//
// DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY

Yoni Fogel's avatar
Yoni Fogel committed
447
// How much of the reserved memory a merge pass may use.  The final (fractal)
// pass must first set aside per-worker node buffers for the writer threads.
static int64_t memory_avail_during_merge(FTLOADER bl, bool is_fractal_node) {
    int64_t avail_memory = bl->reserved_memory;
    if (!is_fractal_node)
        return avail_memory;
    // One compressed plus one uncompressed node-sized buffer per fractal worker.
    int64_t per_worker = (int64_t)default_loader_nodesize * 2;
    int64_t workers = (int64_t)ft_loader_get_fractal_workers_count(bl);
    return avail_memory - workers * per_worker;
}

Yoni Fogel's avatar
Yoni Fogel committed
457
// Number of temp files to read in one merge pass, sized so each input gets
// TARGET_MERGE_BUF_SIZE * DBUFIO_DEPTH of memory; never below MIN_MERGE_FANIN.
static int merge_fanin (FTLOADER bl, bool is_fractal_node) {
    int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
    int64_t nbuffers = memory_avail / (int64_t)TARGET_MERGE_BUF_SIZE;
    if (is_fractal_node) {
        // The fractal writer queue consumes some of the buffers.
        nbuffers -= FRACTAL_WRITER_ROWSETS;
    }
    int64_t fanin = nbuffers / (int64_t)DBUFIO_DEPTH;
    return MAX(fanin, (int)MIN_MERGE_FANIN);
}

Yoni Fogel's avatar
Yoni Fogel committed
466
// Buffer size for each merge input (plus, when writing to a fractal queue,
// each queued rowset): the available memory split evenly over all buffers,
// but never below MIN_MERGE_BUF_SIZE.
static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, bool is_fractal_node // if it is being sent to a q
                                                ) {
    int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
    int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
    if (is_fractal_node)
        nbuffers += FRACTAL_WRITER_ROWSETS;
    int64_t per_buffer = memory_avail / nbuffers;
    return MAX(per_buffer, (int64_t)MIN_MERGE_BUF_SIZE);
}
474

475
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                   CACHETABLE cachetable,
                                   generate_row_for_put_func g,
                                   DB *src_db,
                                   int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/],
                                   const char *new_fnames_in_env[/*N*/],
                                   ft_compare_func bt_compare_functions[/*N*/],
                                   const char *temp_file_template,
                                   LSN load_lsn,
                                   TOKUTXN txn,
                                   bool reserve_memory)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
// On any allocation failure the partially-built loader is torn down via
// toku_ft_loader_internal_destroy and the errno is returned; on success *blp
// is set and 0 is returned.
{
    FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
    if (!bl) return get_error_errno();

    bl->generate_row_for_put = g;
    bl->cachetable = cachetable;
    // Reserve working memory up front: from the cachetable when we have one,
    // otherwise fall back to a fixed budget.
    if (reserve_memory && bl->cachetable) {
        bl->did_reserve_memory = true;
        bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
    }
    else {
        bl->did_reserve_memory = false;
        bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
    }
    //printf("Reserved memory=%ld\n", bl->reserved_memory);

    bl->src_db = src_db;
    bl->N = N;
    bl->load_lsn = load_lsn;
    if (txn) {
        bl->load_root_xid = txn->ancestor_txnid64;
    }
    else {
        bl->load_root_xid = TXNID_NONE;
    }
    
    ft_loader_init_error_callback(&bl->error_callback);
    ft_loader_init_poll_callback(&bl->poll_callback);

// Allocation helpers: on failure, destroy the partially-built loader and
// return the errno from this function.
#define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; }
#define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; } lval = v; } while (0)

    // Per-index arrays, each of length N.
    MY_CALLOC_N(N, bl->root_xids_that_created);
    for (int i=0; i<N; i++) if (brts[i]) bl->root_xids_that_created[i]=brts[i]->ft->h->root_xid_that_created;
    MY_CALLOC_N(N, bl->dbs);
    for (int i=0; i<N; i++) if (brts[i]) bl->dbs[i]=dbs[i];
    MY_CALLOC_N(N, bl->descriptors);
    for (int i=0; i<N; i++) if (brts[i]) bl->descriptors[i]=&brts[i]->ft->descriptor;
    MY_CALLOC_N(N, bl->new_fnames_in_env);
    for (int i=0; i<N; i++) SET_TO_MY_STRDUP(bl->new_fnames_in_env[i], new_fnames_in_env[i]);
    MY_CALLOC_N(N, bl->extracted_datasizes); // the calloc_n zeroed everything, which is what we want
    MY_CALLOC_N(N, bl->bt_compare_funs);
    for (int i=0; i<N; i++) bl->bt_compare_funs[i] = bt_compare_functions[i];

    // Fractal writer bookkeeping (queues/threads are created later).
    MY_CALLOC_N(N, bl->fractal_queues);
    for (int i=0; i<N; i++) bl->fractal_queues[i]=NULL;
    MY_CALLOC_N(N, bl->fractal_threads);
    MY_CALLOC_N(N, bl->fractal_threads_live);
    for (int i=0; i<N; i++) bl->fractal_threads_live[i] = false;

    {
        int r = ft_loader_init_file_infos(&bl->file_infos); 
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }

    SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);

    bl->n_rows   = 0; 
    bl->progress = 0;
    bl->progress_callback_result = 0;

    // Per-index rowsets, merge filesets, and last-key trackers.
    MY_CALLOC_N(N, bl->rows);
    MY_CALLOC_N(N, bl->fs);
    MY_CALLOC_N(N, bl->last_key);
    for(int i=0;i<N;i++) {
        { 
            int r = init_rowset(&bl->rows[i], memory_per_rowset_during_extract(bl)); 
            if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; } 
        }
        init_merge_fileset(&bl->fs[i]);
        bl->last_key[i].flags = DB_DBT_REALLOC; // don't really need this, but it's nice to maintain it.  We use ulen to keep track of the realloced space.
    }

    { 
        int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl)); 
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    {   int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); 
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
    {
        ft_loader_lock_init(bl);
    }

    *blp = bl;

    return 0;
}

577
int toku_ft_loader_open (/* out */ FTLOADER *blp,
                          CACHETABLE cachetable,
                          generate_row_for_put_func g,
                          DB *src_db,
                          int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/],
                          const char *new_fnames_in_env[/*N*/],
                          ft_compare_func bt_compare_functions[/*N*/],
                          const char *temp_file_template,
                          LSN load_lsn,
                          TOKUTXN txn,
                          bool reserve_memory)
/* Effect: called by DB_ENV->create_loader to create a brt loader.
 * Arguments:
 *   blp                  Return the brt loader here.
 *   g                    The function for generating a row
 *   src_db               The source database.  Needed by g.  May be NULL if that's ok with g.
 *   N                    The number of dbs to create.
 *   dbs                  An array of open databases.  Used by g.  The data will be put in these database.
 *   new_fnames           The file names (these strings are owned by the caller: we make a copy for our own purposes).
 *   temp_file_template   A template suitable for mkstemp()
 * Return value: 0 on success, an error number otherwise.
 */
{
    int result = 0;
    {
        // Build the loader state itself (no threads yet).
        int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db,
                                              N, brts, dbs,
                                              new_fnames_in_env,
                                              bt_compare_functions,
                                              temp_file_template,
                                              load_lsn,
                                              txn,
                                              reserve_memory);
        if (r!=0) result = r;
    }
    if (result==0) {
        // Start the extractor thread; on failure tear down everything
        // internal_init built.
        FTLOADER bl = *blp;
        int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); 
        if (r==0) {
            bl->extractor_live = true;
        } else  { 
            result = r;
            (void) toku_ft_loader_internal_destroy(bl, true);
        }
    }
    return result;
}
624

Yoni Fogel's avatar
Yoni Fogel committed
625
// Record a loader-wide error; when callback is set and the error was newly
// recorded (set_error returned 0), invoke the user's error callback now.
static void ft_loader_set_panic(FTLOADER bl, int error, bool callback) {
    int r = ft_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
    if (callback && r == 0) {
        ft_loader_call_error_function(&bl->error_callback);
    }
}

Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
631
// One of the tests uses this.
632
FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i) {
633
    toku_mutex_lock(&bl->file_infos.lock);
Dave Wells's avatar
Dave Wells committed
634 635
    invariant(i.idx >=0 && i.idx < bl->file_infos.n_files);
    invariant(bl->file_infos.file_infos[i.idx].is_open);
636
    FILE *result=bl->file_infos.file_infos[i.idx].file;
637
    toku_mutex_unlock(&bl->file_infos.lock);
638
    return result;
639
}
640

641
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADER UU(bl))
/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
 * Arguments:
 *   ptr    the data to be written.
 *   size   the amount of data to be written.
 *   nmemb  the number of units of size to be written.
 *   stream write the data here.
 *   bl     passed so we can panic the ft_loader if something goes wrong (recording the error number).
 * Return value: 0 on success, an error number otherwise.
 */
{
    size_t r = do_fwrite(ptr, size, nmemb, stream);
    if (r!=nmemb) {
        int e;
        if (os_fwrite_fun)    // if using hook to induce artificial errors (for testing) ...
            e = get_maybe_error_errno();        // ... then there is no error in the stream, but there is one in errno
        else
            e = ferror(stream);
        invariant(e!=0);
        return e;
    }
    return 0;
}

665
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream)
/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number.
 * Arguments:
 *  ptr      read data into here.
 *  size     size of data element to be read.
 *  nmemb    number of data elements to be read.
 *  stream   where to read the data from.
 * Return value: 0 on success, EOF at a clean end-of-file before anything was
 *  read, an error number otherwise.  A short or failed read never returns 0:
 *  when ferror() reports no error (observed in practice -- see the old
 *  commented-out invariant) we return EIO instead of claiming success.
 */
{
    size_t r = fread(ptr, size, nmemb, stream);
    if (r == nmemb) {
        return 0;               // got everything that was asked for
    }
    if (r == 0 && feof(stream)) {
        return EOF;             // clean end of file, nothing read
    }
    // Short read, or zero read without EOF.  The old code returned ferror()
    // directly, which could be 0 here ("r == 0 && !feof && e == 0, how does
    // this happen?") and thus silently reported success on a failed read.
    int e = ferror(stream);
    return e != 0 ? e : EIO;
}

691
// Serialize a DBT to datafile as a 4-byte length followed by the payload
// bytes.  If dataoff is non-NULL, advance it by the total bytes written.
// Returns 0 on success or an error number from bl_fwrite.
static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER bl)
{
    int r;
    int dlen = dbt->size;
    if ((r=bl_fwrite(&dlen,     sizeof(dlen), 1,    datafile, bl))) return r;
    if ((r=bl_fwrite(dbt->data, 1,            dlen, datafile, bl))) return r;
    if (dataoff)
        *dataoff += dlen + sizeof(dlen);
    return 0;
}

702
// Read a length-prefixed DBT (the format written by bl_write_dbt) from
// stream, growing dbt->data as needed; ulen tracks the realloc'd capacity.
// Returns 0 on success or an error number from bl_fread.
static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream)
{
    int len;
    {
        int r;
        if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r;
        invariant(len>=0);
    }
    // Grow the destination buffer if the payload won't fit.
    if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
    {
        int r;
        if ((r = bl_fread(dbt->data, 1, len, stream)))     return r;
    }
    dbt->size = len;
    return 0;
}

719 720 721
// Read a length-prefixed DBT from a dbufio fileset (double-buffered file
// reader), growing dbt->data as needed.  Returns 0 on success,
// TOKUDB_NO_DATA if the data ends prematurely, an errno on allocation
// failure, or a dbufio error code.
static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int filenum)
{
    int result = 0;
    uint32_t len;
    {
        // Read the 4-byte length prefix.
        size_t n_read;
        int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read);
        if (r!=0) {
            result = r;
        } else if (n_read<sizeof(len)) {
            result = TOKUDB_NO_DATA; // must have run out of data prematurely.  This is not EOF, it's a real error.
        }
    }
    if (result==0) {
        // Grow the destination buffer if the payload won't fit.
        if (dbt->ulen<len) {
            void * data = toku_realloc(dbt->data, len);
            if (data==NULL) {
                result = get_error_errno();
            } else {
                dbt->ulen=len;
                dbt->data=data;
            }
        }
    }
    if (result==0) {
        // Read the payload itself.
        size_t n_read;
        int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read);
        if (r!=0) {
            result = r;
        } else if (n_read<len) {
            result = TOKUDB_NO_DATA; // must have run out of data prematurely.  This is not EOF, it's a real error.
        } else {
            dbt->size = len;
        }
    }
    return result;
}


Yoni Fogel's avatar
Yoni Fogel committed
758
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, FTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file.  Increment *dataoff so that it's up to date.
 * Arguments:
 *   key, val   write these.
 *   data       the FIDX of the file being written (used to bump its row count).
 *   dataf      the open FILE to write them to.
 *   dataoff    a pointer to a counter that keeps track of the amount of data written so far.
 *   bl         the ft_loader (passed so we can panic if needed).
 * Return value: 0 on success, an error number otherwise.
 */
{
    int r = bl_write_dbt(key, dataf, dataoff, bl);
    if (r) return r;
    r = bl_write_dbt(val, dataf, dataoff, bl);
    if (r) return r;
    // Bump the per-file row count under the file_infos lock.
    toku_mutex_lock(&bl->file_infos.lock);
    bl->file_infos.file_infos[data.idx].n_rows++;
    toku_mutex_unlock(&bl->file_infos.lock);
    return 0;
}

780 781 782 783 784 785 786 787 788 789 790
int loader_read_row (FILE *f, DBT *key, DBT *val)
/* Effect: Read a key value pair from a file.  The DBTs must have DB_DBT_REALLOC set.
 * Arguments:
 *    f         where to read it from.
 *    key, val  read it into these.
 *    bl        passed so we can panic if needed.
 * Return value: 0 on success, an error number otherwise.
 * Requires:   The DBTs must have DB_DBT_REALLOC
 */
{
    {
791 792
        int r = bl_read_dbt(key, f);
        if (r!=0) return r;
793 794
    }
    {
795 796
        int r = bl_read_dbt(val, f);
        if (r!=0) return r;
797 798 799 800 801
    }
    return 0;
}

static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *key, DBT *val)
/* Effect: Read a key value pair from a dbufio fileset.  The DBTs must have DB_DBT_REALLOC set.
 * Arguments:
 *    bfs, filenum  where to read it from.
 *    key, val      read it into these.
 * Return value: 0 on success, an error number otherwise.
 * Requires:   The DBTs must have DB_DBT_REALLOC
 */
{
    int r = bl_read_dbt_from_dbufio(key, bfs, filenum);
    if (r == 0)
        r = bl_read_dbt_from_dbufio(val, bfs, filenum);
    return r;
}


823
int init_rowset (struct rowset *rows, uint64_t memory_budget) 
824 825
/* Effect: Initialize a collection of rows to be empty. */
{
826 827
    int result = 0;

828 829
    rows->memory_budget = memory_budget;

830 831 832
    rows->rows = NULL;
    rows->data = NULL;

833 834 835
    rows->n_rows = 0;
    rows->n_rows_limit = 100;
    MALLOC_N(rows->n_rows_limit, rows->rows);
836
    if (rows->rows == NULL)
837
        result = get_error_errno();
838
    rows->n_bytes = 0;
839 840
    rows->n_bytes_limit = (size_factor==1) ? 1024*size_factor*16 : memory_budget;
    //printf("%s:%d n_bytes_limit=%ld (size_factor based limit=%d)\n", __FILE__, __LINE__, rows->n_bytes_limit, 1024*size_factor*16);
841
    rows->data = (char *) toku_malloc(rows->n_bytes_limit);
842
    if (rows->rows==NULL || rows->data==NULL) {
843
        if (result == 0)
844
            result = get_error_errno();
845 846 847 848
        toku_free(rows->rows);
        toku_free(rows->data);
        rows->rows = NULL;
        rows->data = NULL;
849
    }
850
    return result;
851
}
852

853 854 855 856
// Reset *rows to the all-zero state (empty, with no buffers owned).
static void zero_rowset (struct rowset *rows) {
    memset(rows, 0, sizeof(*rows));
}

857
void destroy_rowset (struct rowset *rows) {
858 859 860 861 862
    if ( rows ) {
        toku_free(rows->data);
        toku_free(rows->rows);
        zero_rowset(rows);
    }
863
}
864

865 866 867
static int row_wont_fit (struct rowset *rows, size_t size)
/* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */ 
{
868 869
    // Account for the memory used by the data and also the row structures.
    size_t memory_in_use = (rows->n_rows*sizeof(struct row)
870
                            + rows->n_bytes);
871
    return (rows->memory_budget <  memory_in_use + size);
872
}
873

874
int add_row (struct rowset *rows, DBT *key, DBT *val)
875 876
/* Effect: add a row to a collection. */
{
877
    int result = 0;
878
    if (rows->n_rows >= rows->n_rows_limit) {
879 880
        struct row *old_rows = rows->rows;
        size_t old_n_rows_limit = rows->n_rows_limit;
881 882
        rows->n_rows_limit *= 2;
        REALLOC_N(rows->n_rows_limit, rows->rows);
883
        if (rows->rows == NULL) {
884
            result = get_error_errno();
885 886 887 888
            rows->rows = old_rows;
            rows->n_rows_limit = old_n_rows_limit;
            return result;
        }
889 890 891
    }
    size_t off      = rows->n_bytes;
    size_t next_off = off + key->size + val->size;
892 893 894 895

    struct row newrow; 
    memset(&newrow, 0, sizeof newrow); newrow.off = off; newrow.klen = key->size; newrow.vlen = val->size;

896 897
    rows->rows[rows->n_rows++] = newrow;
    if (next_off > rows->n_bytes_limit) {
898
        size_t old_n_bytes_limit = rows->n_bytes_limit;
899 900 901 902
        while (next_off > rows->n_bytes_limit) {
            rows->n_bytes_limit = rows->n_bytes_limit*2; 
        }
        invariant(next_off <= rows->n_bytes_limit);
903
        char *old_data = rows->data;
904
        REALLOC_N(rows->n_bytes_limit, rows->data);
905
        if (rows->data == NULL) {
906
            result = get_error_errno();
907
            rows->data = old_data;
908
            rows->n_bytes_limit = old_n_bytes_limit;
909 910
            return result;
        }
911 912 913 914
    }
    memcpy(rows->data+off,           key->data, key->size);
    memcpy(rows->data+off+key->size, val->data, val->size);
    rows->n_bytes = next_off;
915
    return result;
916 917
}

918
static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset);
919

920
static int finish_primary_rows_internal (FTLOADER bl)
// now we have been asked to finish up.
// Be sure to destroy the rowsets.
{
    int *MALLOC_N(bl->N, ra);
    if (ra == NULL)
        return get_error_errno();

#if defined(HAVE_CILK)
    #pragma cilk grainsize = 1
#endif
    cilk_for (int which = 0; which < bl->N; which++) {
        // Sort and spill the remaining buffered rows of each index,
        // then reset that rowset to empty.
        ra[which] = sort_and_write_rows(bl->rows[which], &(bl->fs[which]), bl, which, bl->dbs[which], bl->bt_compare_funs[which]);
        zero_rowset(&bl->rows[which]);
    }
    // Implicit cilk_sync after that cilk_for loop.

    // If several indexes failed, report the last error code found.
    int result = 0;
    for (int which = 0; which < bl->N; which++) {
        if (ra[which] != 0)
            result = ra[which];
    }
    toku_free(ra);
    return result;
}

948
// Forward to the implementation.
static int finish_primary_rows (FTLOADER bl) {
    return finish_primary_rows_internal(bl);
}

static void* extractor_thread (void *blv) {
953
    FTLOADER bl = (FTLOADER)blv;
954
    int r = 0;
955
    while (1) {
956 957 958 959 960 961 962 963 964 965 966 967 968
        void *item;
        {
            int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
            if (rq==EOF) break;
            invariant(rq==0); // other errors are arbitrarily bad.
        }
        struct rowset *primary_rowset = (struct rowset *)item;

        //printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows);

        // Now we have some rows to output
        {
            r = process_primary_rows(bl, primary_rowset);
969
            if (r)
Yoni Fogel's avatar
Yoni Fogel committed
970
                ft_loader_set_panic(bl, r, false);
971
        }
972 973 974
    }

    //printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
975
    if (r == 0) {
976 977
        r = finish_primary_rows(bl); 
        if (r) 
Yoni Fogel's avatar
Yoni Fogel committed
978
            ft_loader_set_panic(bl, r, false);
979
        
980 981
    }
    return NULL;
982 983
}

984
// Hand the current primary rowset to the extractor thread (by value, through
// a freshly malloc'd struct) and zero the loader's copy in its place.
static void enqueue_for_extraction (FTLOADER bl) {
    struct rowset *XMALLOC(enqueue_me);
    *enqueue_me = bl->primary_rowset;
    zero_rowset(&bl->primary_rowset);
    int r = queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
    resource_assert_zero(r);
}

993
static int loader_do_put(FTLOADER bl,
994 995 996
                         DBT *pkey,
                         DBT *pval)
{
997 998 999
    int result;
    result = add_row(&bl->primary_rowset, pkey, pval);
    if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
1000 1001 1002 1003
        // queue the rows for further processing by the extractor thread.
        //printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
        enqueue_for_extraction(bl);
        {
1004
            int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl)); 
1005
            // bl->primary_rowset will get destroyed by toku_ft_loader_abort
1006 1007
            if (r != 0) 
                result = r;
1008
        }
1009
    }
1010
    return result;
1011 1012
}

1013
static int 
1014
finish_extractor (FTLOADER bl) {
1015 1016
    //printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);

1017 1018
    int rval;

1019
    if (bl->primary_rowset.n_rows>0) {
1020
        enqueue_for_extraction(bl);
1021
    } else {
1022
        destroy_rowset(&bl->primary_rowset);
1023 1024 1025
    }
    //printf("%s:%d please finish extraction\n", __FILE__, __LINE__);
    {
1026 1027
        int r = queue_eof(bl->primary_rowset_queue);
        invariant(r==0);
1028 1029 1030
    }
    //printf("%s:%d joining\n", __FILE__, __LINE__);
    {
1031 1032
        void *toku_pthread_retval;
        int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
1033 1034
        resource_assert_zero(r); 
        invariant(toku_pthread_retval == NULL);
Yoni Fogel's avatar
Yoni Fogel committed
1035
        bl->extractor_live = false;
1036 1037
    }
    {
1038 1039
        int r = queue_destroy(bl->primary_rowset_queue);
        invariant(r==0);
1040
    }
1041

1042
    rval = ft_loader_fi_close_all(&bl->file_infos);
1043

1044
   //printf("%s:%d joined\n", __FILE__, __LINE__);
1045
    return rval;
1046 1047 1048
}

static const DBT zero_dbt = {0,0,0,0};

// Build a DBT pointing at DATA with the given SIZE; all other fields zero.
static DBT make_dbt (void *data, uint32_t size) {
    DBT dbt = zero_dbt;
    dbt.data = data;
    dbt.size = size;
    return dbt;
}

1057
#define inc_error_count() error_count++
1058

1059
// Pick the transaction id to stamp into leafentries for WHICH_DB: the load's
// root xid when the dictionary's creating root xid differs from it,
// TXNID_NONE otherwise.
static TXNID leafentry_xid(FTLOADER bl, int which_db) {
    if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db])
        return bl->load_root_xid;
    return TXNID_NONE;
}

1066
// Size in memory of the leafentry that will be built for a key/value pair:
// the clean layout when there is no transaction id, the MVCC-committed
// layout otherwise.
size_t ft_loader_leafentry_size(size_t key_size, size_t val_size, TXNID xid) {
    return (xid == TXNID_NONE)
        ? LE_CLEAN_MEMSIZE(key_size, val_size)
        : LE_MVCC_COMMITTED_MEMSIZE(key_size, val_size);
}

1075
static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
// Process the rows in primary_rowset (generating the secondary rows for every
// index and buffering/spilling them), and then destroy the rowset.
// Returns 0 on success, or one of the per-index error codes.
{
    int error_count = 0;
    // cilk++ bug int error_codes[bl-N]; 
    int *XMALLOC_N(bl->N, error_codes);

    // Do parallelize this loop with cilk_grainsize = 1 so that every iteration will run in parallel.
#if defined(HAVE_CILK)
    #pragma cilk grainsize = 1
#endif
    cilk_for (int i = 0; i < bl->N; i++) {
        unsigned int klimit,vlimit; // maximum row sizes.
        toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);

        error_codes[i] = 0;
        struct rowset *rows = &(bl->rows[i]);
        struct merge_fileset *fs = &(bl->fs[i]);
        ft_compare_func compare = bl->bt_compare_funs[i];

        // Reusable DBTs for the generated secondary key/value.
        DBT skey = zero_dbt;
        skey.flags = DB_DBT_REALLOC;
        DBT sval=skey;

        // Don't parallelize this loop, or we have to lock access to add_row() which would be a lot of overhead.
        // Also this way we can reuse the DB_DBT_REALLOC'd value inside skey and sval without a race.
        for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
            if (error_count) break;

            struct row *prow = &primary_rowset->rows[prownum];
            DBT pkey = zero_dbt;
            DBT pval = zero_dbt;
            pkey.data = primary_rowset->data + prow->off;
            pkey.size = prow->klen;
            pval.data = primary_rowset->data + prow->off + prow->klen;
            pval.size = prow->vlen;

            {
                int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval);
                if (r != 0) {
                    error_codes[i] = r;
                    inc_error_count();
                    break;
                }
                if (skey.size > klimit) {
                    error_codes[i] = EINVAL;
                    fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit);
                    inc_error_count();
                    break;
                }
                if (sval.size > vlimit) {
                    error_codes[i] = EINVAL;
                    fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit);
                    inc_error_count();
                    break;
                }
            }

            bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i));

            if (row_wont_fit(rows, skey.size + sval.size)) {
                // Spill the full rowset to a temp file, then start a fresh one.
                int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows.  If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
                // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
                int r2 = init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
                if (r == 0)
                    r = r2;  // BUGFIX: a failed init_rowset used to be ignored, letting add_row below write through NULL buffers.
                if (r != 0) {
                    error_codes[i] = r;
                    inc_error_count();
                    break;
                }
            }
            int r = add_row(rows, &skey, &sval);
            if (r != 0) {
                error_codes[i] = r;
                inc_error_count();
                break;
            }

            //flags==0 means generate_row_for_put callback changed it
            //(and freed any memory necessary to do so) so that values are now stored
            //in temporary memory that does not need to be freed.  We need to continue
            //using DB_DBT_REALLOC however.
            if (skey.flags == 0) {
                toku_init_dbt(&skey);
                skey.flags = DB_DBT_REALLOC;
            }
            if (sval.flags == 0) {
                toku_init_dbt(&sval);
                sval.flags = DB_DBT_REALLOC;
            }
        }

        // Free the reusable DBT buffers (only if we still own them).
        if (skey.flags) {
            toku_free(skey.data); skey.data = NULL;
        }
        if (sval.flags) {
            toku_free(sval.data); sval.data = NULL;
        }
    }
    
    destroy_rowset(primary_rowset);
    toku_free(primary_rowset);

    // If any iteration failed, report one of the error codes (the last found).
    int r = 0;
    if (error_count > 0) {
        for (int i=0; i<bl->N; i++) {
            if (error_codes[i]) r = error_codes[i];
        }
        invariant(r); // found the error 
    }
    toku_free(error_codes);
    return r;
}

1192
// Forward to the implementation.
static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset) {
    return process_primary_rows_internal(bl, primary_rowset);
}
 
1197
int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
/* Effect: Put a key-value pair into the brt loader.  Called by DB_LOADER->put().
 * Return value: 0 on success, an error number otherwise.
 */
{
    // Refuse new rows once an error has been recorded.
    if (ft_loader_get_error(&bl->error_callback))
        return EINVAL; // previous panic
    bl->n_rows++;
    return loader_do_put(bl, key, val);
}

Yoni Fogel's avatar
Yoni Fogel committed
1209
// Set the loader's row counter.
void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows) {
    bl->n_rows = n_rows;
}

Yoni Fogel's avatar
Yoni Fogel committed
1213
// Return the loader's current row counter.
uint64_t toku_ft_loader_get_n_rows(FTLOADER bl) {
    return bl->n_rows;
}

int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1218 1219 1220 1221
                           int which_db, DB *dest_db, ft_compare_func compare,
                           
                           FTLOADER bl,
                           struct rowset *rowset)
1222
/* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and write them into dest.
1223
 *   This function is suitable for use in a mergesort.
1224
 *   If a pair of duplicate keys is ever noticed, then call the error_callback function (if it exists), and return DB_KEYEXIST.
1225 1226 1227 1228 1229 1230 1231 1232 1233
 * Arguments:
 *   dest    write the rows here
 *   a,b     the rows being merged
 *   an,bn   the lenth of a and b respectively.
 *   dest_db We need the dest_db to run the comparison function.
 *   compare We need the compare function for the dest_db.
 */
{
    while (an>0 && bn>0) {
1234 1235
        DBT akey; memset(&akey, 0, sizeof akey); akey.data=rowset->data+a->off; akey.size=a->klen;
        DBT bkey; memset(&bkey, 0, sizeof bkey); bkey.data=rowset->data+b->off; bkey.size=b->klen;
1236

1237 1238
        int compare_result = compare(dest_db, &akey, &bkey);
        if (compare_result==0) {
1239 1240
            if (bl->error_callback.error_callback) {
                DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
1241
                ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
            }
            return DB_KEYEXIST;
        } else if (compare_result<0) {
            // a is smaller
            *dest = *a;
            dest++; a++; an--;
        } else {
            *dest = *b;
            dest++; b++; bn--;
        }
1252 1253
    }
    while (an>0) {
1254 1255
        *dest = *a;
        dest++; a++; an--;
1256 1257
    }
    while (bn>0) {
1258 1259
        *dest = *b;
        dest++; b++; bn--;
1260
    }
1261
    return 0;
1262 1263
}

1264
static int binary_search (int *location,
1265 1266 1267 1268 1269 1270
                          const DBT *key,
                          struct row a[/*an*/], int an,
                          int abefore,
                          int which_db, DB *dest_db, ft_compare_func compare,
                          FTLOADER bl,
                          struct rowset *rowset)
1271 1272 1273 1274 1275 1276 1277
// Given a sorted array of rows a, and a dbt key, find the first row in a that is > key.
// If no such row exists, then consider the result to be equal to an.
// On success store abefore+the index into *location
// Return 0 on success.
// Return DB_KEYEXIST if we find a row that is equal to key.
{
    if (an==0) {
1278 1279
        *location = abefore;
        return 0;
1280
    } else {
1281 1282 1283 1284 1285
        int a2 = an/2;
        DBT akey = make_dbt(rowset->data+a[a2].off,  a[a2].klen);
        int compare_result = compare(dest_db, key, &akey);
        if (compare_result==0) {
            if (bl->error_callback.error_callback) {
1286
                DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen,  a[a2].vlen);
1287
                ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315
            }
            return DB_KEYEXIST;
        } else if (compare_result<0) {
            // key is before a2
            if (an==1) {
                *location = abefore;
                return 0;
            } else {
                return binary_search(location, key,
                                     a,    a2,
                                     abefore,
                                     which_db, dest_db, compare, bl, rowset);
            }
        } else {
            // key is after a2
            if (an==1) {
                *location = abefore + 1;
                return 0;
            } else {
                return binary_search(location, key,
                                     a+a2, an-a2,
                                     abefore+a2,
                                     which_db, dest_db, compare, bl, rowset);
            }
        }
    }
}
                   
1316 1317 1318 1319

#define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; }

static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1320 1321 1322
                             int which_db, DB *dest_db, ft_compare_func compare,
                             FTLOADER bl,
                             struct rowset *rowset)
1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333
/* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest.
 *   This function is a cilk function with parallelism, and is suitable for use in a mergesort.
 * Arguments:
 *   dest    write the rows here
 *   a,b     the rows being merged
 *   an,bn   the lenth of a and b respectively.
 *   dest_db We need the dest_db to run the comparison function.
 *   compare We need the compare function for the dest_db.
 */
{
    if (an + bn < 10000) {
1334
        return merge_row_arrays_base(dest, a, an, b, bn, which_db, dest_db, compare, bl, rowset);
1335 1336
    }
    if (an < bn) {
1337 1338
        SWAP(struct row *,a, b)
        SWAP(int         ,an,bn)
1339 1340 1341 1342 1343 1344
    }
    // an >= bn
    int a2 = an/2;
    DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
    int b2 = 0; // initialize to zero so we can add the answer in.
    {
1345 1346
        int r = binary_search(&b2, &akey, b, bn, 0, which_db, dest_db, compare, bl, rowset);
        if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code.
1347 1348 1349 1350 1351 1352 1353 1354 1355
    }
    int ra, rb;
    ra = cilk_spawn merge_row_arrays(dest,       a,    a2,    b,    b2,    which_db, dest_db, compare, bl, rowset);
    rb =            merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset);
    cilk_sync;
    if (ra!=0) return ra;
    else       return rb;
}

1356
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset)
1357 1358 1359 1360 1361 1362 1363 1364
/* Sort an array of rows (using mergesort).
 * Arguments:
 *   rows   sort this array of rows.
 *   n      the length of the array.
 *   dest_db  used by the comparison function.
 *   compare  the compare function
 */
{
1365
    if (n<=1) return 0; // base case is sorted
1366
    int mid = n/2;
1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
    int r1, r2;
    r1 = cilk_spawn mergesort_row_array (rows,     mid,   which_db, dest_db, compare, bl, rowset);

    // Don't spawn this one explicitly
    r2 =            mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);

    cilk_sync;
    if (r1!=0) return r1;
    if (r2!=0) return r2;

1377
    struct row *MALLOC_N(n, tmp); 
1378
    if (tmp == NULL) return get_error_errno();
1379
    {
1380 1381 1382 1383 1384
        int r = merge_row_arrays(tmp, rows, mid, rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
        if (r!=0) {
            toku_free(tmp);
            return r;
        }
1385
    }
1386 1387
    memcpy(rows, tmp, sizeof(*tmp)*n);
    toku_free(tmp);
1388
    return 0;
1389
}
1390 1391

// C function for testing mergesort_row_array 
1392
int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset) {
1393
    return mergesort_row_array (rows, n, which_db, dest_db, compare, bl, rowset);
1394
}
1395

1396
static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, ft_compare_func compare,
1397
                      FTLOADER bl)
1398
/* Effect: Sort a collection of rows.
1399 1400
 * If any duplicates are found, then call the error_callback function and return non zero.
 * Otherwise return 0.
1401 1402 1403
 * Arguments:
 *   rowset    the */
{
1404
    return mergesort_row_array(rows->rows, rows->n_rows, which_db, dest_db, compare, bl, rows);
1405 1406 1407 1408 1409
}

/* filesets Maintain a collection of files.  Typically these files are each individually sorted, and we will merge them.
 * These files have two parts, one is for the data rows, and the other is a collection of offsets so we an more easily parallelize the manipulation (e.g., by allowing us to find the offset of the ith row quickly). */

1410
void init_merge_fileset (struct merge_fileset *fs)
1411 1412
/* Effect: Initialize a fileset */ 
{
Yoni Fogel's avatar
Yoni Fogel committed
1413
    fs->have_sorted_output = false;
1414 1415 1416 1417
    fs->sorted_output      = FIDX_NULL;
    fs->prev_key           = zero_dbt;
    fs->prev_key.flags     = DB_DBT_REALLOC;

1418 1419
    fs->n_temp_files = 0;
    fs->n_temp_files_limit = 0;
1420
    fs->data_fidxs = NULL;
1421 1422
}

1423 1424 1425
void destroy_merge_fileset (struct merge_fileset *fs)
/* Effect: Destroy a fileset. */
{
1426
    if ( fs ) {
1427
        toku_destroy_dbt(&fs->prev_key);
1428 1429 1430 1431 1432
        fs->n_temp_files = 0;
        fs->n_temp_files_limit = 0;
        toku_free(fs->data_fidxs);
        fs->data_fidxs = NULL;
    }
1433 1434 1435
}


1436
static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
/* Effect: Open a new temp data file and append it to the fileset.
 * Arguments:
 *   bl     the ft_loader (needed to panic if anything goes wrong, and also to get the temp_file_template).
 *   fs     the fileset
 *   ffile  (out) the newly opened data file
 * Returns 0 on success, or the error from ft_loader_open_temp_file.
 */
{
    FIDX sfile;
    int r = ft_loader_open_temp_file(bl, &sfile);
    if (r != 0)
        return r;

    // Grow the fidx array if it is full (doubling growth).
    if (fs->n_temp_files+1 > fs->n_temp_files_limit) {
        fs->n_temp_files_limit = (fs->n_temp_files+1)*2;
        XREALLOC_N(fs->n_temp_files_limit, fs->data_fidxs);
    }
    fs->data_fidxs[fs->n_temp_files++] = sfile;

    *ffile = sfile;
    return 0;
}
1459

1460
// RFP maybe this should be buried in the ft_loader struct
1461
// This was previously a cilk lock, but now we need it to work for pthreads too.
1462
static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER;
1463

1464
static int update_progress (int N,
1465 1466
                            FTLOADER bl,
                            const char *UU(message))
1467
{
1468 1469
    // Need a lock here because of cilk and also the various pthreads.
    // Must protect the increment and the call to the poll_function.
1470
    toku_mutex_lock(&update_progress_lock);
1471
    bl->progress+=N;
1472

1473 1474
    int result;
    if (bl->progress_callback_result == 0) {
1475 1476 1477 1478 1479
        //printf(" %20s: %d ", message, bl->progress);
        result = ft_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
        if (result!=0) {
            bl->progress_callback_result = result;
        }
1480
    } else {
1481
        result = bl->progress_callback_result;
1482
    }
1483
    toku_mutex_unlock(&update_progress_lock);
1484
    return result;
1485 1486
}

1487

1488
// Append every row of ROWS to the temp file SFILE.
// Returns 0 on success, or the first error from loader_write_row.
static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
    FILE *sstream = toku_bl_fidx2file(bl, sfile);
    for (size_t i = 0; i < rows.n_rows; i++) {
        const struct row *row = &rows.rows[i];
        DBT skey = make_dbt(rows.data + row->off,             row->klen);
        DBT sval = make_dbt(rows.data + row->off + row->klen, row->vlen);

        uint64_t soffset = 0; // don't really need this.
        int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
        if (r != 0)
            return r;
    }
    return 0;
}


1502
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file.
 * Note:  The loader maintains for each index the most recently written-to file, as well as the DBT for the last key written into that file.
 *   If this rowset is sorted and all greater than that dbt, then we append to the file (skipping the sort, and reducing the number of temporary files).
 * Arguments:
 *   rows    the rowset (passed by value; this function consumes it)
 *   fs      the fileset into which the sorted data will be added
 *   bl      the ft_loader
 *   which_db index of the destination DB (passed through to sort_rows)
 *   dest_db the DB, needed for the comparison function.
 *   compare The comparison function.
 * Returns 0 on success, otherwise an error number.
 * Destroy the rowset after finishing it.
 * Note: There is no sense in trying to calculate progress by this function since it's done concurrently with the loader->put operation.
 * Note first time called: invariant: fs->have_sorted_output == false
 */
{
    // TODO: erase the files, and deal with all the cleanup on error paths
    int result;
    if (rows.n_rows == 0) {
        // empty rowset: nothing to sort or write
        result = 0;
    } else {
        result = sort_rows(&rows, which_db, dest_db, compare, bl);

        if (result == 0) {
            // Decide between appending to the current sorted output file and
            // starting a fresh temp file.
            DBT min_rowset_key = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
            if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &min_rowset_key) < 0) {
                // write everything to the same output if the max key in the temp file (prev_key) is < min of the sorted rowset
                result = write_rowset_to_file(bl, fs->sorted_output, rows);
                if (result == 0) {
                    // set the max key in the temp file to the max key in the sorted rowset
                    result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                }
            } else {
                // write the sorted rowset into a new temp file
                if (fs->have_sorted_output) {
                    // the old output can no longer be appended to; close it first
                    fs->have_sorted_output = false;
                    result = ft_loader_fi_close(&bl->file_infos, fs->sorted_output, true);
                }
                if (result == 0) {
                    FIDX sfile = FIDX_NULL;
                    result = extend_fileset(bl, fs, &sfile);
                    if (result == 0) {
                        result = write_rowset_to_file(bl, sfile, rows);
                        if (result == 0) {
                            fs->have_sorted_output = true; fs->sorted_output = sfile;
                            // set the max key in the temp file to the max key in the sorted rowset
                            result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                        }
                    }
                }
                // Note: if result == 0 then invariant fs->have_sorted_output == true
            }
        }
    }

    // the rowset is consumed whether or not the write succeeded
    destroy_rowset(&rows);

    return result;
}
1570 1571

// C function for testing sort_and_write_rows.
// Thin wrapper: dereferences ROWS so the real function (which takes the
// rowset by value and consumes it) can be driven from test code.
int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare) {
    return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
}
1575

Yoni Fogel's avatar
Yoni Fogel committed
1576
int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output.  All the files remain open after the merge.
 *   This merge is performed in one pass, so don't pass too many files in.  If you need a tree of merges do it elsewhere.
 *   If TO_Q is true then we write rowsets into queue Q.  Otherwise we write into dest_data.
 * Modifies:  May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.)
 * Requires: The number of sources is at least one, and each of the input files must have at least one row in it.
 * Arguments:
 *   to_q         boolean indicating that output is queue (true) or a file (false)
 *   dest_data    where to write the sorted data
 *   q            where to write the sorted data
 *   n_sources    how many source files.
 *   bfs          the dbufio fileset that buffers reads from the sources.
 *   srcs_fidxs   the array of source data files.
 *   bl           the ft_loader.
 *   which_db     index of the destination DB (for the comparison function).
 *   dest_db      the destination DB (used in the comparison function).
 *   progress_allocation  share of the progress meter this merge may consume.
 * Return value: 0 on success, otherwise an error number.
 * The fidxs are not closed by this function.
 *
 * Implementation: a classic k-way merge.  A priority queue holds one
 * (key,val) pair per non-exhausted source; each iteration pops the minimum,
 * emits it (to the queue or the dest file), and refills from the source
 * that supplied it.
 */
{
    int result = 0;

    FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data);

    // Per-source current key/value and file offset (VLAs sized by n_sources).
    DBT keys[n_sources];
    DBT vals[n_sources];
    uint64_t dataoff[n_sources];
    DBT zero = zero_dbt;  zero.flags=DB_DBT_REALLOC;

    for (int i=0; i<n_sources; i++) {
        keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
    }

    pqueue_t      *pq = NULL;
    pqueue_node_t *MALLOC_N(n_sources, pq_nodes); // freed in cleanup
    if (pq_nodes == NULL) { result = get_error_errno(); }

    if (result==0) {
        int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
        if (r!=0) result = r; 
    }

    uint64_t n_rows = 0;  // total input rows, for progress estimation
    if (result==0) {
        // load pqueue with first value from each source
        for (int i=0; i<n_sources; i++) {
            int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
            if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
            if (r!=0) {
                result = r;
                break;
            }

            pq_nodes[i].key = &keys[i];
            pq_nodes[i].val = &vals[i];
            pq_nodes[i].i   = i;
            r = pqueue_insert(pq, &pq_nodes[i]);
            if (r!=0) {
                result = r;
                // path tested by loader-dup-test5.tdbrun
                break;
            }

            dataoff[i] = 0;
            // file_infos is shared with other loader threads, so take its lock
            toku_mutex_lock(&bl->file_infos.lock);
            n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
            toku_mutex_unlock(&bl->file_infos.lock);
        }
    }
    uint64_t n_rows_done = 0;

    struct rowset *output_rowset = NULL;
    if (result==0 && to_q) {
        XMALLOC(output_rowset); // freed in cleanup
        int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
        if (r!=0) result = r;
    }
    
    // Main merge loop: runs until all sources are exhausted or an error occurs.
    while (result==0 && pqueue_size(pq)>0) {
        int mini;
        {
            // get the minimum 
            pqueue_node_t *node;
            int r = pqueue_pop(pq, &node);
            if (r!=0) {
                result = r;
                invariant(0);
                break;
            }
            mini = node->i;
        }
        if (to_q) {
            // emit into the current rowset; ship it to the queue when full
            if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
                {
                    int r = queue_enq(q, (void*)output_rowset, 1, NULL);
                    if (r!=0) {
                        result = r;
                        break;
                    }
                }
                XMALLOC(output_rowset); // freed in cleanup
                {
                    int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
                    if (r!=0) {        
                        result = r;
                        break;
                    }
                }
            }
            {
                int r = add_row(output_rowset, &keys[mini], &vals[mini]);
                if (r!=0) {
                    result = r;
                    break;
                }
            }
        } else {
            // write it to the dest file
            int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
            if (r!=0) {
                result = r;
                break;
            }
        }
        
        {
            // read next row from file that just sourced min value 
            int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
            if (r!=0) {
                if (r==EOF) {
                    // on feof, queue size permanently smaller
                    toku_free(keys[mini].data);  keys[mini].data = NULL;
                    toku_free(vals[mini].data);  vals[mini].data = NULL;
                } else {
                    fprintf(stderr, "%s:%d r=%d errno=%d bfs=%p mini=%d\n", __FILE__, __LINE__, r, get_maybe_error_errno(), bfs, mini);
                    dbufio_print(bfs);
                    result = r;
                    break;
                }
            } else {
                // insert value into queue (re-populate queue)
                pq_nodes[mini].key = &keys[mini];
                r = pqueue_insert(pq, &pq_nodes[mini]);
                if (r!=0) {
                    // Note: This error path tested by loader-dup-test1.tdbrun (and by loader-dup-test4)
                    result = r;
                    break;
                }
            }
        }
 
        n_rows_done++;
        const uint64_t rows_per_report = size_factor*1024;
        if (n_rows_done%rows_per_report==0) {
            // need to update the progress.
            double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report);
            invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
            int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
            progress_allocation -= progress_just_done;
            // ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result.
            int r = update_progress(progress_just_done, bl, "in file merge");
            if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
        }
    }
    // flush the final (partial) rowset; on success the queue owns it
    if (result==0 && to_q) {
        int r = queue_enq(q, (void*)output_rowset, 1, NULL);
        if (r!=0) 
            result = r;
        else 
            output_rowset = NULL;
    }

    // cleanup
    for (int i=0; i<n_sources; i++) {
        toku_free(keys[i].data);  keys[i].data = NULL;
        toku_free(vals[i].data);  vals[i].data = NULL;
    }
    if (output_rowset) {
        destroy_rowset(output_rowset);
        toku_free(output_rowset);
    }
    if (pq) { pqueue_free(pq); pq=NULL; }
    toku_free(pq_nodes);
    {
        int r = update_progress(progress_allocation, bl, "end of merge_some_files");
        if (r!=0 && result==0) result = r;
    }
    return result;
}

Yoni Fogel's avatar
Yoni Fogel committed
1769
// Wrapper around toku_merge_some_files_using_dbufio: collect the fds of the
// source files, build a dbufio fileset over them, run the merge, then tear
// the fileset down.  On error the fileset is panicked first so its
// background reader stops before destruction.  Returns 0 or the first
// error encountered.
static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
{
    int result = 0;
    DBUFIO_FILESET bfs = NULL;
    int *MALLOC_N(n_sources, fds);
    if (fds==NULL) result=get_error_errno();
    if (result==0) {
        for (int i=0; i<n_sources; i++) {
            int r = fileno(toku_bl_fidx2file(bl, srcs_fidxs[i])); // we rely on the fact that when the files are closed, the fd is also closed.
            if (r==-1) {
                result=get_error_errno();
                break;
            }
            fds[i] = r;
        }
    }
    if (result==0) {
        // the fileset takes ownership of buffering the reads on these fds
        int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q));
        if (r!=0) { result = r; }
    }
        
    if (result==0) {
        int r = toku_merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation);
        if (r!=0) { result = r; }
    }

    if (bfs!=NULL) {
        // on error, panic the fileset so destroy doesn't block on pending I/O
        if (result != 0)
            (void) panic_dbufio_fileset(bfs, result);
        int r = destroy_dbufio_fileset(bfs);
        if (r!=0 && result==0) result=r;
        bfs = NULL;
    }
    if (fds!=NULL) {
        toku_free(fds);
        fds = NULL;
    }
    return result;
}
1808

1809 1810 1811 1812 1813
// Return the smaller of two ints.
static int int_min (int a, int b)
{
    return (b < a) ? b : a;
}
1814

1815 1816 1817
// How many passes of B-way merging does it take to reduce N sorted runs
// down to a single run?  (0 when N <= 1.)
static int n_passes (int N, int B) {
    int passes = 0;
    for (; N > 1; passes++) {
        N = (N + B - 1) / B;   // one pass merges groups of up to B runs
    }
    return passes;
}

1824
int merge_files (struct merge_fileset *fs,
                 FTLOADER bl,
                 // These are needed for the comparison function and error callback.
                 int which_db, DB *dest_db, ft_compare_func compare,
                 int progress_allocation,
                 // Write rowsets into this queue.
                 QUEUE output_q
                 )
/* Effect:  Given a fileset, merge all the files writing all the answers into a queue.
 *   All the files in fs, and any temporary files will be closed and unlinked (and the fileset will be empty)
 * Return value: 0 on success, otherwise an error number.
 *   On error *fs will contain no open files.  All the files (including any temporary files) will be closed and unlinked.
 *    (however the fs will still need to be deallocated.)
 *
 * Implementation: repeated passes of k-way merges.  Earlier passes merge
 * groups of temp files into new temp files (next_file_set); the final pass
 * (when few enough files remain) merges directly into output_q.  The
 * progress budget is split across the estimated number of passes.
 */
{
    // fan-in is forced down to 4 when size_factor==1 (test configuration)
    const int final_mergelimit   = (size_factor == 1) ? 4 : merge_fanin(bl, true); // try for a merge to the leaf level
    const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, false); // try for a merge at nonleaf.
    int n_passes_left  = (fs->n_temp_files<=final_mergelimit)
        ? 1
        : 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
    int result = 0;
    while (fs->n_temp_files > 0) {
        // give this pass an equal share of the remaining progress budget
        int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
        progress_allocation -= progress_allocation_for_this_pass;

        invariant(fs->n_temp_files>0);
        struct merge_fileset next_file_set;
        // last pass: few enough files that we can merge straight to the queue
        bool to_queue = (bool)(fs->n_temp_files <= final_mergelimit);
        init_merge_fileset(&next_file_set);
        while (fs->n_temp_files>0) {
            // grab some files and merge them.
            int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files);

            // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
            int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
            progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;

            FIDX merged_data = FIDX_NULL;

            FIDX *XMALLOC_N(n_to_merge, data_fidxs);
            for (int i=0; i<n_to_merge; i++) {
                data_fidxs[i] = FIDX_NULL;
            }
            // take the n_to_merge files from the tail of the fileset and reopen
            // them for reading
            for (int i=0; i<n_to_merge; i++) {
                int idx = fs->n_temp_files -1 -i;
                FIDX fidx = fs->data_fidxs[idx];
                result = ft_loader_fi_reopen(&bl->file_infos, fidx, "r");
                if (result) break;
                data_fidxs[i] = fidx;
            }
            if (result==0 && !to_queue) {
                // intermediate pass: merged output goes into a fresh temp file
                result = extend_fileset(bl, &next_file_set,  &merged_data);
            }

            if (result==0) {
                result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
                // if result!=0, fall through
                if (result==0) {
                    /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
                }
            }

            // close and unlink the consumed inputs even if the merge failed
            for (int i=0; i<n_to_merge; i++) {
                if (!fidx_is_null(data_fidxs[i])) {
                    {
                        int r = ft_loader_fi_close(&bl->file_infos, data_fidxs[i], true);
                        if (r!=0 && result==0) result = r;
                    }
                    {
                        int r = ft_loader_fi_unlink(&bl->file_infos, data_fidxs[i]);
                        if (r!=0 && result==0) result = r;
                    }
                    data_fidxs[i] = FIDX_NULL;
                }
            }

            fs->n_temp_files -= n_to_merge;
            if (!to_queue && !fidx_is_null(merged_data)) {
                int r = ft_loader_fi_close(&bl->file_infos, merged_data, true);
                if (r!=0 && result==0) result = r;
            }
            toku_free(data_fidxs);

            if (result!=0) break;
        }

        // the merged outputs become the inputs of the next pass
        destroy_merge_fileset(fs);
        *fs = next_file_set;

        // Update the progress
        n_passes_left--;

        if (result==0) { invariant(progress_allocation_for_this_pass==0); }

        if (result!=0) break;
    }
    if (result) ft_loader_set_panic(bl, result, true);

    {
        // always signal EOF so the queue's consumer terminates
        int r = queue_eof(output_q);
        if (r!=0 && result==0) result = r;
    }
    // It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
    {
        int r = update_progress(progress_allocation, bl, "did merge_files");
        if (r!=0 && result==0) result = r;
    }
    return result;
}

1941 1942 1943
// Records the block number of one finished subtree root.
struct subtree_info {
    int64_t block;
};
1944

1945 1946 1947 1948 1949 1950
// The list of subtree roots produced while building one level of the tree;
// grown by allocate_node, released by subtrees_info_destroy.
struct subtrees_info {
    int64_t next_free_block;
    int64_t n_subtrees;       // was n_blocks
    int64_t n_subtrees_limit; // allocated capacity of `subtrees`
    struct subtree_info *subtrees;
};
1951

1952
// Reset *p to an empty subtree list with no storage allocated.
static void subtrees_info_init(struct subtrees_info *p) {
    p->next_free_block  = 0;
    p->n_subtrees       = 0;
    p->n_subtrees_limit = 0;
    p->subtrees         = NULL;
}

// Release the subtree array; the struct itself is owned by the caller.
static void subtrees_info_destroy(struct subtrees_info *p) {
    toku_free(p->subtrees);
    p->subtrees = NULL;
}

1962
static void allocate_node (struct subtrees_info *sts, int64_t b) {
1963
    if (sts->n_subtrees >= sts->n_subtrees_limit) {
1964 1965
        sts->n_subtrees_limit *= 2;
        XREALLOC_N(sts->n_subtrees_limit, sts->subtrees);
1966 1967 1968 1969
    }
    sts->subtrees[sts->n_subtrees].block = b;
    sts->n_subtrees++;
}
1970

1971 1972 1973 1974
// A growable in-memory byte buffer used while serializing output blocks.
struct dbuf {
    unsigned char *buf;
    int buflen; // allocated capacity of buf
    int off;    // number of bytes appended so far
    int error;  // sticky errno set by a failed realloc in putbuf_bytes; once set, further puts are no-ops
};
1977

1978
// State for one leaf node under construction: created by start_leaf,
// filled by add_pair_to_leafnode, serialized by finish_leafnode.
struct leaf_buf {
    BLOCKNUM blocknum;            // block number assigned to this leaf
    TXNID xid;                    // creating txn, or TXNID_NONE (see start_leaf)
    uint64_t nkeys, ndata, dsize; // counters zeroed by start_leaf -- presumably #keys, #rows, data bytes; confirm against add_pair_to_leafnode
    FTNODE node;                  // the in-memory leaf node being filled
    XIDS xids;                    // xid stack: root xids, or root+xid when xid != TXNID_NONE
    uint64_t off;                 // zeroed by start_leaf; updated elsewhere -- NOTE(review): purpose not visible in this chunk
};
1986

1987 1988 1989
// One block-translation entry: the block's byte offset and size in the
// output file (written out by write_translation_table).
struct translation {
    int64_t off, size;
};
1990

1991 1992 1993
// Shared state for writing the output fractal-tree file.  Offset and
// translation-table updates are serialized through `mutex` (see
// allocate_block and seek_align).
struct dbout {
    int fd;                 // output file descriptor; -1 until assigned
    toku_off_t current_off; // next write position in the file

    int64_t n_translations;          // translation entries in use
    int64_t n_translations_limit;    // allocated capacity of `translation`
    struct translation *translation; // block number -> (offset, size)
    toku_mutex_t mutex;              // guards the fields above
    FT h;                            // header of the tree being written
};
2001

2002
static inline void dbout_init(struct dbout *out, FT h) {
2003 2004 2005 2006
    out->fd = -1;
    out->current_off = 0;
    out->n_translations = out->n_translations_limit = 0;
    out->translation = NULL;
2007
    toku_mutex_init(&out->mutex, NULL);
2008
    out->h = h;
2009 2010 2011
}

static inline void dbout_destroy(struct dbout *out) {
2012 2013 2014 2015
    if (out->fd >= 0) {
        toku_os_close(out->fd);
        out->fd = -1;
    }
2016
    toku_free(out->translation);
2017
    out->translation = NULL;
2018
    toku_mutex_destroy(&out->mutex);
2019 2020
}

2021
// Acquire the dbout mutex protecting current_off and the translation table.
static inline void dbout_lock(struct dbout *out) {
    toku_mutex_lock(&out->mutex);
}

2025
// Release the dbout mutex.
static inline void dbout_unlock(struct dbout *out) {
    toku_mutex_unlock(&out->mutex);
}

2029 2030 2031 2032 2033 2034
// Round the output position up to the next 4096-byte boundary and seek the
// output fd there.  Caller must hold the dbout lock (see seek_align).
static void seek_align_locked(struct dbout *out) {
    toku_off_t old_current_off = out->current_off;
    int alignment = 4096;
    // round up to a multiple of `alignment` (alignment is a power of two)
    out->current_off += alignment-1;
    out->current_off &= ~(alignment-1);
    toku_off_t r = lseek(out->fd, out->current_off, SEEK_SET);
    invariant(r==out->current_off);
    invariant(out->current_off >= old_current_off);
    invariant(out->current_off < old_current_off+alignment);
    invariant(out->current_off % alignment == 0);
}

2041 2042 2043 2044 2045
// Lock-taking wrapper around seek_align_locked.
static void seek_align(struct dbout *out) {
    dbout_lock(out);
    seek_align_locked(out);
    dbout_unlock(out);
}
2046 2047

static void dbuf_init (struct dbuf *dbuf) {
2048 2049 2050 2051
    dbuf->buf = 0;
    dbuf->buflen = 0;
    dbuf->off = 0;
    dbuf->error = 0;
2052
}
2053

2054
// Free the buffer's storage; the struct itself is owned by the caller.
static void dbuf_destroy (struct dbuf *dbuf) {
    toku_free(dbuf->buf); dbuf->buf = NULL;
}

2058
static int allocate_block (struct dbout *out, int64_t *ret_block_number)
// Effect: Allocate the next block number (the next translation-table slot)
//   and return it in *ret_block_number, growing the table if needed.
// Returns 0 on success; on allocation failure returns errno and leaves the
//   translation table in its previous, still-valid state.
// Thread-safe: serialized on the dbout lock.
{
    int result = 0;
    dbout_lock(out);
    int64_t block_number = out->n_translations;
    if (block_number >= out->n_translations_limit) {
        // remember the old table so we can roll back if the realloc fails
        int64_t old_n_translations_limit = out->n_translations_limit;
        struct translation *old_translation = out->translation;
        if (out->n_translations_limit==0) {
            out->n_translations_limit = 1;
        } else {
            out->n_translations_limit *= 2;
        }
        REALLOC_N(out->n_translations_limit, out->translation);
        if (out->translation == NULL) {
            result = get_error_errno();
            invariant(result);
            out->n_translations_limit = old_n_translations_limit;
            out->translation = old_translation;
            goto cleanup;
        }
    }
    out->n_translations++;
    *ret_block_number = block_number;
cleanup:
    dbout_unlock(out);
    return result;
}

2088
// Append NBYTES bytes to the buffer, growing it geometrically as needed.
// On reallocation failure, record errno in dbuf->error and keep the old
// buffer; once error is set, every subsequent put is silently ignored.
static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) {
    if (!dbuf->error && dbuf->off + nbytes > dbuf->buflen) {
        unsigned char *oldbuf = dbuf->buf;
        int oldbuflen = dbuf->buflen;
        // grow to twice the needed size, amortizing future reallocations
        dbuf->buflen += dbuf->off + nbytes;
        dbuf->buflen *= 2;
        REALLOC_N(dbuf->buflen, dbuf->buf);
        if (dbuf->buf == NULL) {
            dbuf->error = get_error_errno();
            dbuf->buf = oldbuf;
            dbuf->buflen = oldbuflen;
        }
    }
    if (!dbuf->error) {
        memcpy(dbuf->buf + dbuf->off, bytes, nbytes);
        dbuf->off += nbytes;
    }
}

// Append a 32-bit integer (4 bytes, native byte order) to the buffer.
static void putbuf_int32 (struct dbuf *dbuf, int v) {
    putbuf_bytes(dbuf, &v, 4);
}
2110

2111
// Append a 64-bit integer as two 32-bit halves, high half first.
static void putbuf_int64 (struct dbuf *dbuf, long long v) {
    putbuf_int32(dbuf, v>>32);
    putbuf_int32(dbuf, v&0xFFFFFFFF);
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
2116
// Begin building a new leaf node that will live at block LBLOCKNUM.
// Allocates and returns a leaf_buf holding a fresh empty FTNODE (height 0,
// one basement node, marked PT_AVAIL).  When XID is not TXNID_NONE the
// buffer's xid stack is root+XID; otherwise it is just the root xids.
// Caller fills the buffer via add_pair_to_leafnode and completes it with
// finish_leafnode.
static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid, uint32_t UU(target_nodesize)) {
    // the block number must already have been allocated (see allocate_block)
    invariant(lblocknum < out->n_translations_limit);

    struct leaf_buf *XMALLOC(lbuf);
    lbuf->blocknum.b = lblocknum;
    lbuf->xid = xid;
    lbuf->nkeys = lbuf->ndata = lbuf->dsize = 0;
    lbuf->off = 0;

    lbuf->xids = xids_get_root_xids();
    if (xid != TXNID_NONE) {
        // extend the root xid stack with the creating transaction's xid
        XIDS new_xids = NULL;
        int r = xids_create_child(lbuf->xids, &new_xids, xid); 
        assert(r == 0 && new_xids);
        xids_destroy(&lbuf->xids);
        lbuf->xids = new_xids;
    }

    FTNODE XMALLOC(node);
    toku_initialize_empty_ftnode(node, lbuf->blocknum, 0 /*height*/, 1 /*basement nodes*/, FT_LAYOUT_VERSION, 0);
    BP_STATE(node, 0) = PT_AVAIL;  // the single basement node is in memory
    lbuf->node = node;

    return lbuf;
}

2142 2143
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method);
static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method);
2144
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update);
2145
static int write_translation_table (struct dbout *out, long long *off_of_translation_p);
2146
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk);
2147

2148
// Discard everything remaining in the writer queue, freeing each rowset,
// until the queue reports EOF.  Used on error paths so producers are not
// left blocked on a full queue.
static void drain_writer_q(QUEUE q) {
    for (;;) {
        void *item;
        int r = queue_deq(q, &item, NULL, NULL);
        if (r == EOF)
            break;
        invariant(r == 0);
        struct rowset *rs = (struct rowset *) item;
        destroy_rowset(rs);
        toku_free(rs);
    }
}

2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182
// If *maxkey owns its memory (flags == DB_DBT_REALLOC), free it and reset
// the DBT; otherwise leave it untouched.
static void cleanup_maxkey(DBT *maxkey) {
    if (maxkey->flags != DB_DBT_REALLOC)
        return;
    toku_free(maxkey->data);
    maxkey->data = NULL;
    maxkey->flags = 0;
}

// Replace *maxkey with *key, first freeing any memory the old maxkey owned.
// The new maxkey aliases key's storage (no copy); use copy_maxkey to make
// it own its bytes.
static void update_maxkey(DBT *maxkey, DBT *key) {
    cleanup_maxkey(maxkey);
    *maxkey = *key;
}

// Turn *maxkey from an alias into an owned (DB_DBT_REALLOC) copy of the
// same bytes.  Returns 0 on success, else the error from toku_dbt_set;
// on error *maxkey is left unchanged.
static int copy_maxkey(DBT *maxkey) {
    DBT owned;
    toku_init_dbt_flags(&owned, DB_DBT_REALLOC);
    int r = toku_dbt_set(maxkey->size, maxkey->data, &owned, NULL);
    if (r == 0)
        update_maxkey(maxkey, &owned);
    return r;
}

2183
static int toku_loader_write_ft_from_q (FTLOADER bl,
                                         const DESCRIPTOR descriptor,
                                         int fd, // write to here
                                         int progress_allocation,
                                         QUEUE q,
                                         uint64_t total_disksize_estimate,
                                         int which_db,
                                         uint32_t target_nodesize,
                                         uint32_t target_basementnodesize,
                                         enum toku_compression_method target_compression_method)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree.  Closes fd.
// Returns 0 on success, or the first error encountered.  On every exit path
// the queue is drained, fd is closed, and all locally allocated structures
// (translation table, subtree info, ft header) are freed.
{
    // set the number of fractal tree writer threads so that we can partition memory in the merger
    ft_loader_set_fractal_workers_count(bl);

    int result = 0;
    int r;

    // The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>)
    // The pivots_fname is the name of the pivots file.
    // Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree.
    int64_t n_pivots=0; // number of pivots in pivots_file
    FIDX pivots_file;  // the file

    r = ft_loader_open_temp_file (bl, &pivots_file);
    if (r) {
        // Could not even open the pivots file: drain the queue so the
        // producer does not block, then bail.
        result = r; 
        drain_writer_q(q); 
        return result;
    }
    FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);

    TXNID root_xid_that_created = TXNID_NONE;
    if (bl->root_xids_that_created)
        root_xid_that_created = bl->root_xids_that_created[which_db];

    // TODO: (Zardosht/Yoni/Leif), do this code properly
    struct ft ft;
    toku_ft_init(&ft, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method);

    struct dbout out;
    ZERO_STRUCT(out);
    dbout_init(&out, &ft);
    out.fd = fd;
    out.current_off = 8192; // leave 8K reserved at beginning
    out.n_translations = 3; // 3 translations reserved at the beginning
    out.n_translations_limit = 4;
    MALLOC_N(out.n_translations_limit, out.translation);
    if (out.translation == NULL) {
        result = get_error_errno();
        dbout_destroy(&out);
        drain_writer_q(q);
        toku_free(ft.h);
        return result;
    }

    // The blocks_array will contain all the block numbers that correspond to the pivots.  Generally there should be one more block than pivot.
    struct subtrees_info sts; 
    subtrees_info_init(&sts);
    sts.next_free_block  = 3;
    sts.n_subtrees       = 0;
    sts.n_subtrees_limit = 1;
    MALLOC_N(sts.n_subtrees_limit, sts.subtrees);
    if (sts.subtrees == NULL) {
        result = get_error_errno();
        subtrees_info_destroy(&sts);
        dbout_destroy(&out);
        drain_writer_q(q);
        toku_free(ft.h);
        return result;
    }

    out.translation[0].off = -2LL; out.translation[0].size = 0; // block 0 is NULL
    invariant(1==RESERVED_BLOCKNUM_TRANSLATION);
    invariant(2==RESERVED_BLOCKNUM_DESCRIPTOR);
    out.translation[1].off = -1;                                // block 1 is the block translation, filled in later
    out.translation[2].off = -1;                                // block 2 is the descriptor
    seek_align(&out);
    int64_t lblock = 0;  // make gcc --happy
    result = allocate_block(&out, &lblock);
    invariant(result == 0); // can not fail since translations reserved above

    TXNID le_xid = leafentry_xid(bl, which_db);
    struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
    uint64_t n_rows_remaining = bl->n_rows;
    uint64_t old_n_rows_remaining = bl->n_rows;

    uint64_t  used_estimate = 0;  // how much diskspace have we used up?

    DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node

    STAT64INFO_S deltas = ZEROSTATS;
    // Main consumer loop: dequeue rowsets (already sorted/merged upstream)
    // and append their rows to leaf nodes, spawning a node write whenever
    // the current leaf is full enough.
    while (result == 0) {
        void *item;
        {
            int rr = queue_deq(q, &item, NULL, NULL);
            if (rr == EOF) break;
            if (rr != 0) {
                ft_loader_set_panic(bl, rr, true); // error after cilk sync
                break;
            }
        }
        struct rowset *output_rowset = (struct rowset *)item;

        for (unsigned int i = 0; i < output_rowset->n_rows; i++) {
            // key and val alias the rowset's data buffer; they are only
            // valid until destroy_rowset() below.
            DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off,                               output_rowset->rows[i].klen);
            DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);

            size_t this_leafentry_size = ft_loader_leafentry_size(key.size, val.size, le_xid);

            used_estimate += this_leafentry_size;

            // Spawn off a node if
            //   a) there is at least one row in it, and
            //   b) this item would make the nodesize too big, or
            //   c) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
            uint64_t remaining_amount = total_disksize_estimate - used_estimate;
            uint64_t used_here = lbuf->off + 1000;             // leave 1000 for various overheads.
            uint64_t target_size = (target_nodesize*7L)/8;     // use only 7/8 of the node.
            uint64_t used_here_with_next_key = used_here + this_leafentry_size;
            if (lbuf->nkeys > 0 &&
                ((used_here_with_next_key >= target_size) || (used_here + remaining_amount >= target_size && lbuf->off > remaining_amount))) {

                // Charge this node's share of the progress budget,
                // proportional to the rows consumed since the last node.
                int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
                progress_allocation -= progress_this_node;
                old_n_rows_remaining = n_rows_remaining;

                allocate_node(&sts, lblock);

                n_pivots++;

                // The max key of the finished node becomes a pivot.
                invariant(maxkey.data != NULL);
                if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
                    ft_loader_set_panic(bl, r, true); // error after cilk sync
                    if (result == 0) result = r;
                    break;
                }

                // Serialize and write the finished leaf in parallel;
                // finish_leafnode takes ownership of lbuf.
                cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method);
                lbuf = NULL;

                r = allocate_block(&out, &lblock);
                if (r != 0) {
                    ft_loader_set_panic(bl, r, true);
                    if (result == 0) result = r;
                    break;
                }
                lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
            }

            add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size, this_leafentry_size, &deltas);
            n_rows_remaining--;

            update_maxkey(&maxkey, &key); // set the new maxkey to the current key
        }

        r = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed
        if (result == 0)
            result = r;
        destroy_rowset(output_rowset);
        toku_free(output_rowset);

        if (result == 0)
            result = ft_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly
    }

    if (deltas.numrows || deltas.numbytes) {
        toku_ft_update_stats(&ft.in_memory_stats, deltas);
    }

    cleanup_maxkey(&maxkey);

    // Flush the final (possibly partial) leaf node synchronously.
    if (lbuf) {
        allocate_node(&sts, lblock);
        {
            int p = progress_allocation/2;
            finish_leafnode(&out, lbuf, p, bl, target_basementnodesize, target_compression_method);
            progress_allocation -= p;
        }
    }

    // Wait for all spawned finish_leafnode calls before touching shared state.
    cilk_sync;

    if (result == 0) {
        result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
    }

    if (result != 0) goto error;

    // We haven't paniced, so the sum should add up.
    invariant(used_estimate == total_disksize_estimate);

    n_pivots++;

    {
        DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
        r = bl_write_dbt(&key, pivots_stream, NULL, bl);
        if (r) { 
            result = r; goto error;
        }
    }

    // Build the interior of the tree from the leaf pivots, bottom-up.
    r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
    if (r) {
        result = r; goto error;
    }

    {
        invariant(sts.n_subtrees==1);
        out.h->h->root_blocknum = make_blocknum(sts.subtrees[0].block);
        toku_free(sts.subtrees); sts.subtrees = NULL;

        // write the descriptor
        {
            seek_align(&out);
            invariant(out.n_translations >= RESERVED_BLOCKNUM_DESCRIPTOR);
            invariant(out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off == -1);
            out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off = out.current_off;
            size_t desc_size = 4+toku_serialize_descriptor_size(descriptor); // +4 for the trailing checksum
            invariant(desc_size>0);
            out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].size = desc_size;
            struct wbuf wbuf;
            char *XMALLOC_N(desc_size, buf);
            wbuf_init(&wbuf, buf, desc_size);
            toku_serialize_descriptor_contents_to_wbuf(&wbuf, descriptor);
            uint32_t checksum = x1764_finish(&wbuf.checksum);
            wbuf_int(&wbuf, checksum);
            invariant(wbuf.ndone==desc_size);
            r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
            out.current_off += desc_size;
            toku_free(buf);    // wbuf_destroy
            if (r) {
                result = r; goto error;
            }
        }

        long long off_of_translation;
        r = write_translation_table(&out, &off_of_translation);
        if (r) {
            result = r; goto error;
        }

        // Translation table size on disk: n+1 records of 16 bytes plus a
        // 4-byte checksum (see write_translation_table's layout).
        r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4);
        if (r) {
            result = r; goto error;
        }

        r = update_progress(progress_allocation, bl, "wrote tdb file");
        if (r) {
            result = r; goto error;
        }
    }

    r = fsync(out.fd);
    if (r) {
        result = get_error_errno(); goto error;
    }

    // Do we need to pay attention to user_said_stop?  Or should the guy at the other end of the queue pay attention and send in an EOF.

 error:
    // Common cleanup: runs on success as well as on error.
    {
        int rr = toku_os_close(fd);
        if (rr)
            result = get_error_errno();
    }
    out.fd = -1;

    subtrees_info_destroy(&sts);
    dbout_destroy(&out);
    drain_writer_q(q);
    toku_free(ft.h);

    return result;
}

2459
int toku_loader_write_brt_from_q_in_C (FTLOADER                bl,
2460 2461 2462 2463 2464
                                       const DESCRIPTOR descriptor,
                                       int                      fd, // write to here
                                       int                      progress_allocation,
                                       QUEUE                    q,
                                       uint64_t                 total_disksize_estimate,
2465
                                       int                      which_db,
2466
                                       uint32_t                 target_nodesize,
2467 2468
                                       uint32_t                 target_basementnodesize,
                                       enum toku_compression_method target_compression_method)
2469 2470
// This is probably only for testing.
{
2471
    target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
2472
    target_basementnodesize = target_basementnodesize == 0 ? default_loader_basementnodesize : target_basementnodesize;
2473
    return toku_loader_write_ft_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
2474 2475 2476 2477 2478
}


static void* fractal_thread (void *ftav) {
    struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
2479
    int r = toku_loader_write_ft_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
2480 2481 2482 2483
    fta->errno_result = r;
    return NULL;
}

2484
static int loader_do_i (FTLOADER bl,
                        int which_db,
                        DB *dest_db,
                        ft_compare_func compare,
                        const DESCRIPTOR descriptor,
                        const char *new_fname,
                        int progress_allocation // how much progress do I need to add into bl->progress by the end..
                        )
/* Effect: Handle the file creating for one particular DB in the bulk loader. */
/* Requires: The data is fully extracted, so we can do merges out of files and write the ft file. */
/* Runs the merge on this thread while a spawned fractal_thread consumes the
 * merged rowsets from a queue and writes the ft file.  Returns 0 or the
 * first error from queue creation, file open, merge, or the writer thread. */
{
    //printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
    struct merge_fileset *fs = &(bl->fs[which_db]);
    struct rowset *rows = &(bl->rows[which_db]);
    invariant(rows->data==NULL); // the rows should be all cleaned up already

    // a better allocation would be to figure out roughly how many merge passes we'll need.
    int allocation_for_merge = (2*progress_allocation)/3;
    progress_allocation -= allocation_for_merge;

    int r;
    r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
    if (r) goto error;

    {
        mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
        int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621
        if (fd < 0) {
            r = get_error_errno(); goto error;
        }

        // Pull the target node geometry and compression from the destination DB.
        uint32_t target_nodesize, target_basementnodesize;
        enum toku_compression_method target_compression_method;
        r = dest_db->get_pagesize(dest_db, &target_nodesize);
        invariant_zero(r);
        r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
        invariant_zero(r);
        r = dest_db->get_compression_method(dest_db, &target_compression_method);
        invariant_zero(r);

        // This structure must stay live until the join below.
        struct fractal_thread_args fta = { bl,
                                           descriptor,
                                           fd,
                                           progress_allocation,
                                           bl->fractal_queues[which_db],
                                           bl->extracted_datasizes[which_db],
                                           0,
                                           which_db,
                                           target_nodesize,
                                           target_basementnodesize,
                                           target_compression_method,
        };

        r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
        if (r) {
            int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]);
            // ignore r2, since we already have an error
            goto error;
        }
        invariant(bl->fractal_threads_live[which_db]==false);
        bl->fractal_threads_live[which_db] = true;

        // Producer side: merge the temp files, enqueueing rowsets for the writer thread.
        r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);

        {
            void *toku_pthread_retval;
            int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
            invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here.  A previous bug but that struct into a C block statement.
            resource_assert_zero(r2);
            invariant(toku_pthread_retval==NULL);
            invariant(bl->fractal_threads_live[which_db]);
            bl->fractal_threads_live[which_db] = false;
            // Prefer the merge error; otherwise report the writer thread's result.
            if (r == 0) r = fta.errno_result;
        }
    }

 error: // this is the cleanup code.  Even if r==0 (no error) we fall through to here.
    {
        int r2 = queue_destroy(bl->fractal_queues[which_db]);
        invariant(r2==0);
        bl->fractal_queues[which_db]=NULL;
    }

    // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
    toku_free(rows->data); rows->data = NULL;
    toku_free(rows->rows); rows->rows = NULL;
    toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
    return r;
}

2575
static int toku_ft_loader_close_internal (FTLOADER bl)
/* Effect: Close the bulk loader.
 * Return all the file descriptors in the array fds. */
/* Writes one ft file per destination DB via loader_do_i, fsyncs the
 * directory holding the new files, then destroys the loader state.
 * Returns 0 on success or the first error encountered; the loader is
 * destroyed on every path (with the error flag set on failure). */
{
    int result = 0;
    if (bl->N == 0)
        // No destination DBs: nothing to write, just report full progress.
        result = update_progress(PROGRESS_MAX, bl, "done");
    else {
        int remaining_progress = PROGRESS_MAX;
        for (int i = 0; i < bl->N; i++) {
            // Take the unallocated progress and divide it among the unfinished jobs.
            // This calculation allocates all of the PROGRESS_MAX bits of progress to some job.
            int allocate_here = remaining_progress/(bl->N - i);
            remaining_progress -= allocate_here;
            char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]);
            result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, allocate_here);
            toku_free(fname_in_cwd);
            if (result != 0) 
                goto error;
            invariant(0 <= bl->progress && bl->progress <= PROGRESS_MAX);
        }
        if (result==0) invariant(remaining_progress==0);

        // fsync the directory containing the new tokudb files.
        char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]);
        int r = toku_fsync_directory(fname0);
        toku_free(fname0);
        if (r != 0) {
            result = r; goto error;
        }
    }
    // At this point all temp files must be closed and all progress consumed.
    invariant(bl->file_infos.n_files_open   == 0);
    invariant(bl->file_infos.n_files_extant == 0);
    invariant(bl->progress == PROGRESS_MAX);
 error:
    toku_ft_loader_internal_destroy(bl, (bool)(result!=0));
    return result;
}

2614 2615
int toku_ft_loader_close (FTLOADER bl,
                           ft_loader_error_func error_function, void *error_extra,
2616 2617
                           ft_loader_poll_func  poll_function,  void *poll_extra
                           )
2618
{
2619 2620
    int result = 0;

2621 2622 2623 2624
    int r;

    //printf("Closing\n");

2625
    ft_loader_set_error_function(&bl->error_callback, error_function, error_extra);
2626

2627
    ft_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
2628

2629 2630
    if (bl->extractor_live) {
        r = finish_extractor(bl);
2631 2632
        if (r)
            result = r;
2633
        invariant(!bl->extractor_live);
2634
    }
2635 2636

    // check for an error during extraction
2637
    if (result == 0) {
2638
        r = ft_loader_call_error_function(&bl->error_callback);
2639 2640
        if (r)
            result = r;
2641 2642
    }

2643
    if (result == 0) {
2644
        r = toku_ft_loader_close_internal(bl);
2645 2646 2647
        if (r && result == 0)
            result = r;
    } else
Yoni Fogel's avatar
Yoni Fogel committed
2648
        toku_ft_loader_internal_destroy(bl, true);
2649

2650
    return result;
2651 2652
}

2653
// Finish the extractor thread explicitly.  Returns EINVAL if the extractor
// is not running; otherwise returns finish_extractor's result (0 on success).
int toku_ft_loader_finish_extractor(FTLOADER bl) {
    if (!bl->extractor_live)
        return EINVAL;
    int r = finish_extractor(bl);
    invariant(!bl->extractor_live);
    return r;
}

Yoni Fogel's avatar
Yoni Fogel committed
2665
int toku_ft_loader_abort(FTLOADER bl, bool is_error) 
/* Effect : Abort the bulk loader, free ft_loader resources */
/* Returns 0, or the error from shutting down the extractor thread. */
{
    int result = 0;

    // cleanup the extractor thread
    if (bl->extractor_live) {
        int rr = finish_extractor(bl);
        if (rr != 0)
            result = rr;
        invariant(!bl->extractor_live);
    }

    // No fractal writer threads may still be running when we abort.
    for (int i = 0; i < bl->N; i++) {
        invariant(!bl->fractal_threads_live[i]);
    }

    toku_ft_loader_internal_destroy(bl, is_error);
    return result;
}

2685 2686
// Report any error posted to the loader's error callback through *error.
// The function itself always returns 0.
int toku_ft_loader_get_error(FTLOADER bl, int *error) {
    int err = ft_loader_get_error(&bl->error_callback);
    *error = err;
    return 0;
}

2690
// Append one key/value pair to the in-progress leaf node, updating the
// leaf buffer's bookkeeping (key/data counts, data size, estimated disk
// offset) and the caller's STAT64INFO delta accumulator.  The pair is
// applied as an FT_INSERT message onto basement node 0 of the leaf.
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update) {
    lbuf->nkeys++;
    lbuf->ndata++;
    lbuf->dsize += keylen + vallen;
    lbuf->off += this_leafentry_size;

    // append this key val pair to the leafnode 
    // #3588 TODO just make a clean ule and append it to the omt
    // #3588 TODO can do the rebalancing here and avoid a lot of work later
    FTNODE leafnode = lbuf->node;
    // Keys arrive in sorted order, so the new entry always goes at the end
    // of basement 0's OMT.
    uint32_t idx = toku_omt_size(BLB_BUFFER(leafnode, 0));
    DBT thekey = { .data = key, .size = (uint32_t) keylen }; 
    DBT theval = { .data = val, .size = (uint32_t) vallen };
    FT_MSG_S cmd = { .type = FT_INSERT,
                     .msn = ZERO_MSN,
                     .xids = lbuf->xids,
                     .u = { .id = { &thekey, &theval } } };
    uint64_t workdone=0;
    toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, &workdone, stats_to_update);
}

2711
static int write_literal(struct dbout *out, void*data,  size_t len) {
Dave Wells's avatar
Dave Wells committed
2712
    invariant(out->current_off%4096==0);
2713 2714 2715 2716
    int result = toku_os_write(out->fd, data, len);
    if (result == 0)
        out->current_off+=len;
    return result;
2717 2718
}

2719
// Serialize a completed leaf node, write it to the output file, record its
// offset/size in the block translation table, and free the leaf buffer.
// Takes ownership of lbuf (node, xids, and the struct itself are freed).
// May run concurrently with other finish_leafnode calls (cilk_spawn), so
// file output and translation updates are done under the dbout lock.
// On any failure the loader is put into the panic state.
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
    int result = 0;

    // serialize leaf to buffer
    size_t serialized_leaf_size = 0;
    size_t uncompressed_serialized_leaf_size = 0;
    char *serialized_leaf = NULL;
    FTNODE_DISK_DATA ndd = NULL;
    result = toku_serialize_ftnode_to_memory(lbuf->node, &ndd, target_basementnodesize, target_compression_method, true, true, &serialized_leaf_size, &uncompressed_serialized_leaf_size, &serialized_leaf);

    // write it out
    if (result == 0) {
        dbout_lock(out);
        long long off_of_leaf = out->current_off;
        result = write_literal(out, serialized_leaf, serialized_leaf_size);
        if (result == 0) {
            out->translation[lbuf->blocknum.b].off  = off_of_leaf;
            out->translation[lbuf->blocknum.b].size = serialized_leaf_size;
            seek_align_locked(out);
        }
        dbout_unlock(out);
    }

    // free the node
    if (serialized_leaf) {
        toku_free(ndd);
        toku_free(serialized_leaf);
    }
    toku_ftnode_free(&lbuf->node);
    xids_destroy(&lbuf->xids);
    toku_free(lbuf);

    //printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
    if (result == 0)
        result = update_progress(progress_allocation, bl, "wrote node");

    if (result)
        ft_loader_set_panic(bl, result, true);
}

// Serialize the block translation table and pwrite it at the next aligned
// offset.  On-disk layout: n_translations (8 bytes), a -1 free-list link
// (8 bytes), then 16 bytes (off,size) per translation, then a 4-byte x1764
// checksum — hence the n*16+20 size.  The table's own location is recorded
// in translation slot 1 (RESERVED_BLOCKNUM_TRANSLATION) before it is
// serialized, so the table describes itself.  Returns 0 or an error from
// buffer growth / pwrite; the table's file offset is returned through
// *off_of_translation_p.
static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
    seek_align(out);
    struct dbuf ttable;
    dbuf_init(&ttable);
    long long off_of_translation = out->current_off;
    long long bt_size_on_disk = out->n_translations * 16 + 20;
    putbuf_int64(&ttable, out->n_translations);    // number of records
    putbuf_int64(&ttable, -1LL); // the linked list
    out->translation[1].off = off_of_translation;
    out->translation[1].size = bt_size_on_disk;
    for (int i=0; i<out->n_translations; i++) {
        putbuf_int64(&ttable, out->translation[i].off);
        putbuf_int64(&ttable, out->translation[i].size);
    }
    unsigned int checksum = x1764_memory(ttable.buf, ttable.off);
    putbuf_int32(&ttable, checksum);
    // ttable.error records any allocation failure during the putbuf calls.
    int result = ttable.error;
    if (result == 0) {
        invariant(bt_size_on_disk==ttable.off);
        result = toku_os_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation);
    }
    dbuf_destroy(&ttable);
    *off_of_translation_p = off_of_translation;
    return result;
}

2785
static int
2786
write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) {
2787
    int result = 0;
2788
    size_t size = toku_serialize_ft_size(out->h->h);
2789 2790
    struct wbuf wbuf;
    char *MALLOC_N(size, buf);
2791
    if (buf == NULL) {
2792
        result = get_error_errno();
2793 2794
    } else {
        wbuf_init(&wbuf, buf, size);
2795 2796
        out->h->h->on_disk_stats = out->h->in_memory_stats;
        toku_serialize_ft_to_wbuf(&wbuf, out->h->h, translation_location_on_disk, translation_size_on_disk);
2797 2798 2799 2800 2801 2802 2803
        if (wbuf.ndone != size)
            result = EINVAL;
        else
            result = toku_os_pwrite(out->fd, wbuf.buf, wbuf.ndone, 0);
        toku_free(buf);
    }
    return result;
2804 2805
}

2806
static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl,
2807
                      /*out*/ DBT pivots[/*n_to_read*/])
2808 2809
// pivots is an array to be filled in.  The pivots array is uninitialized.
{
2810 2811 2812
    for (int i = 0; i < n_to_read; i++)
        pivots[i] = zero_dbt;

2813
    FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
2814 2815 2816

    int result = 0;
    for (int i = 0; i < n_to_read; i++) {
2817 2818
        int r = bl_read_dbt(&pivots[i], pivots_stream);
        if (r != 0) {
2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829
            result = r;
            break;
        }
    }
    return result;
}

// Free the data buffers of n pivot DBTs, then the pivot array itself.
static void delete_pivots(DBT pivots[], int n) {
    for (int idx = 0; idx < n; ++idx) {
        toku_free(pivots[idx].data);
    }
    toku_free(pivots);
}

static int setup_nonleaf_block (int n_children,
                                struct subtrees_info *subtrees,         FIDX pivots_file,        int64_t first_child_offset_in_subtrees,
                                struct subtrees_info *next_subtrees,    FIDX next_pivots_file,
                                struct dbout *out, FTLOADER bl,
                                /*out*/int64_t *blocknum,
                                /*out*/struct subtree_info **subtrees_info_p,
                                /*out*/DBT **pivots_p)
// Do the serial part of setting up a non leaf block.
//   Read the pivots out of the file, and store them in a newly allocated array of DBTs (returned in *pivots_p)  There are (n_blocks_to_use-1) of these.
//   Copy the final pivot into the next_pivots file instead of returning it.
//   Copy the subtree_info from the subtrees structure, and store them in a newly allocated array of subtree_infos (return in *subtrees_info_p).  There are n_blocks_to_use of these.
//   Allocate a block number and return it in *blocknum.
//   Store the blocknum in the next_blocks structure, so it can be combined with the pivots at the next level of the tree.
//   Update n_blocks_used and n_translations.
// This code cannot be called in parallel because of all the race conditions.
// The actual creation of the node can be called in parallel after this work is done.
// Returns 0 on success; on any failure the pivots array (and its data) is
// freed and the out-parameters are left unset.
{
    //printf("Nonleaf has children :"); for(int i=0; i<n_children; i++) printf(" %ld", subtrees->subtrees[i].block); printf("\n");

    int result = 0;
    
    DBT *MALLOC_N(n_children, pivots);
    if (pivots == NULL) {
        result = get_error_errno();
    }

    if (result == 0) {
        int r = read_some_pivots(pivots_file, n_children, bl, pivots);
        if (r)
            result = r;
    }

    if (result == 0) {
        // Forward the last pivot to the next level's pivots file; it bounds
        // this whole node rather than separating two of its children.
        FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
        int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl);
        if (r)
            result = r;
    }

    if (result == 0) {
        // The last pivot was written to the next_pivots file, so we free it now instead of returning it.
        toku_free(pivots[n_children-1].data);
        pivots[n_children-1] = zero_dbt;

        // Snapshot the children's subtree info for the (possibly parallel)
        // node construction step.
        struct subtree_info *XMALLOC_N(n_children, subtrees_array);
        for (int i = 0; i < n_children; i++) {
            int64_t from_blocknum = first_child_offset_in_subtrees + i;
            subtrees_array[i] = subtrees->subtrees[from_blocknum];
        }

        int r = allocate_block(out, blocknum);
        if (r) {
            toku_free(subtrees_array);
            result = r;
        } else {
            // Register the new block at the next level of the tree.
            allocate_node(next_subtrees, *blocknum);
            
            *pivots_p = pivots;
            *subtrees_info_p = subtrees_array;
        }
    }

    if (result != 0) {
        if (pivots) {
            delete_pivots(pivots, n_children); pivots = NULL;
        }
    }

    return result;
}

2903
// Build one nonleaf (internal) node of the output tree from n_children already-written
// subtrees, serialize it, and write it to the output file at the next aligned offset.
// On any failure the loader is put into the panic state via ft_loader_set_panic (this
// function returns void; callers detect errors later through the loader's error callback).
// Ownership: takes ownership of (and frees) the pivots array, the keys it points to,
// and the subtree_info array, whether or not an error occurs.
//   bl                   - the loader (used only to report errors).
//   out                  - output-file state; its translation table and current offset
//                          are updated under dbout_lock.
//   blocknum_of_new_node - block number previously allocated for this node.
//   n_children           - number of child subtrees; there are n_children-1 pivot keys.
//   pivots               - array of n_children DBTs; only the first n_children-1 are
//                          used as pivot keys (freed here).
//   subtree_info         - per-child block numbers (freed here).
//   height               - height of the new node; must be > 0 (nonleaf).
// NOTE(review): may run as a cilk_spawn task from write_nonleaves, so the dbout lock
// around the translation/offset update is what serializes concurrent writers.
static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
                                DBT *pivots, /* must free this array, as well as the things it points to */
                                struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t UU(target_nodesize), uint32_t target_basementnodesize, enum toku_compression_method target_compression_method)
{
    //Nodes do not currently touch descriptors
    invariant(height > 0);

    int result = 0;

    // Create an empty in-memory node, then fill in its pivot keys and child pointers.
    FTNODE XMALLOC(node);
    toku_initialize_empty_ftnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
                                  FT_LAYOUT_VERSION, 0);
    node->totalchildkeylens = 0;
    for (int i=0; i<n_children-1; i++) {
        // Clone each pivot into the node; the originals are freed below.
        toku_clone_dbt(&node->childkeys[i], pivots[i]);
        node->totalchildkeylens += pivots[i].size;
    }
    assert(node->bp);
    for (int i=0; i<n_children; i++) {
        BP_BLOCKNUM(node,i)  = make_blocknum(subtree_info[i].block); 
        BP_STATE(node,i) = PT_AVAIL;
    }

    FTNODE_DISK_DATA ndd = NULL;
    if (result == 0) {
        size_t n_bytes;
        size_t n_uncompressed_bytes;
        char *bytes;
        int r;
        // Serialize (and compress) the node into a malloc'd buffer.
        r = toku_serialize_ftnode_to_memory(node, &ndd, target_basementnodesize, target_compression_method, true, true, &n_bytes, &n_uncompressed_bytes, &bytes);
        if (r) {
            result = r;
        } else {
            // Record where this block lands in the file and append the bytes.
            // The lock serializes concurrent spawned writers updating out->current_off.
            dbout_lock(out);
            out->translation[blocknum_of_new_node].off = out->current_off;
            out->translation[blocknum_of_new_node].size = n_bytes;
            //fprintf(stderr, "Wrote internal node at %ld (%ld bytes)\n", out->current_off, n_bytes);
            //for (uint32_t i=0; i<n_bytes; i++) { unsigned char b = bytes[i]; printf("%d:%02x (%d) ('%c')\n", i, b, b, (b>=' ' && b<128) ? b : '*'); }
            r = write_literal(out, bytes, n_bytes); 
            if (r)
                result = r;
            else
                seek_align_locked(out);
            dbout_unlock(out);
            toku_free(bytes);
        }
    }

    // Free everything we own, regardless of success: the pivot keys (both the caller's
    // copies and the clones inside the node), the per-child buffers, and the node itself.
    for (int i=0; i<n_children-1; i++) {
        toku_free(pivots[i].data);
        toku_free(node->childkeys[i].data);
    }
    for (int i=0; i<n_children; i++) {
        destroy_nonleaf_childinfo(BNC(node,i));
    }
    toku_free(pivots);
    toku_free(node->bp);
    toku_free(node->childkeys);
    toku_free(node);
    toku_free(ndd);
    toku_free(subtree_info);

    // Errors cannot be returned (void, possibly running in a spawned task), so
    // park them in the loader's panic state for the caller to pick up.
    if (result != 0)
        ft_loader_set_panic(bl, result, true);
}

2969
// Build all nonleaf levels of the output tree, bottom-up, until a single root
// subtree remains.  Each pass over the loop consumes the current level's subtree
// list (sts) and pivots file, groups the subtrees into internal nodes (written
// possibly in parallel via cilk_spawn), and produces the next level's subtree
// list and pivots file.  Returns 0 on success or the first error encountered.
// On return the pivots temp file(s) have been closed and unlinked.
static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
    int result = 0;
    int height = 1;

    // Watch out for the case where we saved the last pivot but didn't write any more nodes out.
    // The trick is not to look at n_pivots, but to look at blocks.n_blocks
    while (sts->n_subtrees > 1) {
        // If there is more than one block in blocks, then we must build another level of the tree.

        // we need to create a pivots file for the pivots of the next level.
        // and a blocks_array
        // So for example.
        //  1) we grab 16 pivots and 16 blocks.
        //  2) We put the 15 pivots and 16 blocks into an non-leaf node.
        //  3) We put the 16th pivot into the next pivots file.
        {
            // Rewind the current level's pivots file so setup_nonleaf_block can
            // read the pivots back from the beginning.
            int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET);
            if (r!=0) { return get_error_errno(); }
        }

        FIDX next_pivots_file;
        { 
            int r = ft_loader_open_temp_file (bl, &next_pivots_file); 
            if (r != 0) { result = r; break; } 
        }

        // Accumulates the subtrees (new internal nodes) of the level being built.
        struct subtrees_info next_sts; 
        subtrees_info_init(&next_sts);
        next_sts.n_subtrees = 0;
        next_sts.n_subtrees_limit = 1;
        XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);

        // Fan-out per internal node.  We stop this loop while >= 2*n_per_block
        // subtrees remain so the tail can always be split into nodes of legal size
        // (every node gets at least 2 children).
        const int n_per_block = 15;
        int64_t n_subtrees_used = 0;
        while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) {
            // grab the first N_PER_BLOCK and build a node.
            DBT *pivots;
            int64_t blocknum_of_new_node;
            struct subtree_info *subtree_info;
            int r = setup_nonleaf_block (n_per_block,
                                         sts, pivots_fidx, n_subtrees_used,
                                         &next_sts, next_pivots_file,
                                         out, bl,
                                         &blocknum_of_new_node, &subtree_info, &pivots);
            if (r) {
                result = r;
                break;
            } else {
                cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node.
                n_subtrees_used += n_per_block;
            }
        }

        int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used;
        if (result == 0) {
            // Now we have a one or two blocks at the end to handle.
            invariant(n_blocks_left>=2);
            if (n_blocks_left > n_per_block) {
                // Write half the remaining blocks
                int64_t n_first = n_blocks_left/2;
                DBT *pivots;
                int64_t blocknum_of_new_node;
                struct subtree_info *subtree_info;
                int r = setup_nonleaf_block(n_first,
                                            sts, pivots_fidx, n_subtrees_used,
                                            &next_sts, next_pivots_file,
                                            out, bl,
                                            &blocknum_of_new_node, &subtree_info, &pivots);
                if (r) {
                    result = r;
                } else {
                    cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
                    n_blocks_left -= n_first;
                    n_subtrees_used += n_first;
                }
            }
        }
        if (result == 0) {
            // Write the last block. 
            DBT *pivots;
            int64_t blocknum_of_new_node;
            struct subtree_info *subtree_info;
            int r = setup_nonleaf_block(n_blocks_left,
                                        sts, pivots_fidx, n_subtrees_used,
                                        &next_sts, next_pivots_file,
                                        out, bl,
                                        &blocknum_of_new_node, &subtree_info, &pivots);
            if (r) {
                result = r;
            } else {
                cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
                n_subtrees_used += n_blocks_left;
            }
        }
        if (result == 0)
            invariant(n_subtrees_used == sts->n_subtrees);

        // Wait for all spawned write_nonleaf_node tasks of this level to finish
        // before reusing/advancing the level state below.
        cilk_sync;

        if (result == 0) // pick up write_nonleaf_node errors
            result = ft_loader_get_error(&bl->error_callback);

        // Now set things up for the next iteration.
        // Close and unlink the consumed pivots file, promote next_sts/next_pivots_file
        // to be the current level, and go up one level.  Errors here are recorded but
        // only the first error overall is kept (r != 0 && result == 0 convention).
        int r = ft_loader_fi_close(&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r;
        r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx);    if (r != 0 && result == 0) result = r;
        pivots_fidx = next_pivots_file;
        toku_free(sts->subtrees); sts->subtrees = NULL;
        *sts = next_sts;
        height++;

        if (result)
            break;
    }
    // Dispose of the final (or error-path current) pivots file.
    { int r = ft_loader_fi_close (&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r; }
    { int r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; }
    return result;
}

3087 3088
// Plain-C-callable shim: forwards to ft_loader_set_fractal_workers_count so that
// callers without access to the C++ entry point can set the worker count.
void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl) {
    // Delegate directly; no additional state is touched here.
    ft_loader_set_fractal_workers_count(bl);
}
3090

3091