loader.c 17.8 KB
Newer Older
1 2 3 4 5 6 7 8
/* -*- mode: C; c-basic-offset: 4 -*-
 *
 * Copyright (c) 2007, 2008, 2009, 2010 Tokutek Inc.  All rights reserved." 
 * The technology is licensed by the Massachusetts Institute of Technology, 
 * Rutgers State University of New Jersey, and the Research Foundation of 
 * State University of New York at Stony Brook under United States of America 
 * Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
 */
9 10 11 12
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ident "Copyright (c) 2007-2009 Tokutek Inc.  All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ident "$Id$"
13 14

/*
 *   The loader (DB_LOADER): bulk-loads rows into one or more dictionaries.
 */

18
#include <toku_portability.h>
19 20
#include <stdio.h>
#include <string.h>
21
#include "ydb-internal.h"
22
#include "../newbrt/brtloader.h"
23
#include "loader.h"
24 25 26
#include "ydb_load.h"
#include "checkpoint.h"
#include "brt-internal.h"
27
#include "ydb_db.h"
28

Dave Wells's avatar
Dave Wells committed
29 30 31 32 33

/*
 * Assertion flavours.  All three expand to a plain assert(); the distinct
 * names only record WHY a condition is being checked:
 *   lazy_assert     - the failing path is simply not implemented yet
 *   invariant       - a condition that must always hold
 *   resource_assert - a required resource is missing; no way to recover
 */
#define lazy_assert(a)     assert(a)
#define invariant(a)       assert(a)
#define resource_assert(a) assert(a)

34
enum { MAX_FILE_SIZE = 256 };   /* byte capacity of each loader's temp-file-name template buffer */
35

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.

// File-wide loader counters.  Meant for human display only, so updates to
// them are not required to be perfectly thread-safe.
static LOADER_STATUS_S loader_status;

// Fill in the descriptive fields (keyname, type, legend) of status row k.
// The legend gets a "loader: " prefix via string-literal concatenation.
#define STATUS_INIT(k,t,l) do {                              \
        loader_status.status[k].keyname = #k;                \
        loader_status.status[k].type    = t;                 \
        loader_status.status[k].legend  = "loader: " l;      \
    } while (0)

// One-time setup of the descriptive part of every status row.
// The value fields are left alone: as part of a file-scope static object
// they already start out zeroed.
static void
status_init(void) {
    STATUS_INIT(LOADER_CREATE,      UINT64, "number of loaders successfully created");
    STATUS_INIT(LOADER_CREATE_FAIL, UINT64, "number of calls to toku_loader_create_loader() that failed");
    STATUS_INIT(LOADER_PUT,         UINT64, "number of calls to loader->put() succeeded");
    STATUS_INIT(LOADER_PUT_FAIL,    UINT64, "number of calls to loader->put() failed");
    STATUS_INIT(LOADER_CLOSE,       UINT64, "number of calls to loader->close() that succeeded");
    STATUS_INIT(LOADER_CLOSE_FAIL,  UINT64, "number of calls to loader->close() that failed");
    STATUS_INIT(LOADER_ABORT,       UINT64, "number of calls to loader->abort()");
    STATUS_INIT(LOADER_CURRENT,     UINT64, "number of loaders currently in existence");
    STATUS_INIT(LOADER_MAX,         UINT64, "max number of loaders that ever existed simultaneously");
    loader_status.initialized = true;
}
#undef STATUS_INIT

// Copy a snapshot of the loader status into *statp.  The descriptive fields
// are initialized lazily the first time anyone asks.
void
toku_loader_get_status(LOADER_STATUS statp) {
    if (!loader_status.initialized) {
        status_init();
    }
    *statp = loader_status;
}

// Shorthand for the numeric value slot of status row x.
#define STATUS_VALUE(x) loader_status.status[x].value.num

76

77 78 79
struct __toku_loader_internal {
    DB_ENV *env;
    DB_TXN *txn;
80
    BRTLOADER brt_loader;
81 82 83
    int N;
    DB **dbs; /* [N] */
    DB *src_db;
84
    uint32_t *db_flags;
85
    uint32_t *dbt_flags;
86
    uint32_t loader_flags;
87
    void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
88
    void *error_extra;
89 90
    int  (*poll_func)(void *poll_extra, float progress);
    void *poll_extra;
91 92 93 94
    char *temp_file_template;

    DBT *ekeys;
    DBT *evals;
95

96 97 98 99 100 101
    DBT err_key;   /* error key */
    DBT err_val;   /* error val */
    int err_i;     /* error i   */
    int err_errno;

    char **inames_in_env; /* [N]  inames of new files to be created */
102 103
};

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
/* Take one reference on the source db (when present) and one on each
 * destination db.  Balanced by loader_release_refs(), which
 * free_loader_resources() calls. */
static void
loader_add_refs(DB_LOADER *loader) {
    struct __toku_loader_internal *li = loader->i;

    if (li->src_db != NULL) {
        toku_db_add_ref(li->src_db);
    }
    for (int k = 0; k < li->N; ++k) {
        toku_db_add_ref(li->dbs[k]);
    }
}

/* Drop the references taken by loader_add_refs(): the source db (when
 * present) and every destination db. */
static void
loader_release_refs(DB_LOADER *loader) {
    struct __toku_loader_internal *li = loader->i;

    if (li->src_db != NULL) {
        toku_db_release_ref(li->src_db);
    }
    for (int k = 0; k < li->N; ++k) {
        toku_db_release_ref(li->dbs[k]);
    }
}

120 121 122 123
/*
 *  free_loader_resources() releases everything hanging off loader->i,
 *      then loader->i itself, and finally sets loader->i to NULL.
 *  Does nothing when loader->i is already NULL.
 *  Any field that was freed earlier must have been reset to NULL.
 *  The brt_loader is NOT touched: it must already be closed or destroyed.
 */
static void free_loader_resources(DB_LOADER *loader)
{
    struct __toku_loader_internal *li = loader->i;
    if (li == NULL)
        return;

    loader_release_refs(loader);

    /* Scratch DBTs: only buffers flagged DB_DBT_REALLOC are ours to free. */
    for (int k = 0; k < li->N; k++) {
        DBT *ek = li->ekeys ? &li->ekeys[k] : NULL;
        DBT *ev = li->evals ? &li->evals[k] : NULL;
        if (ek != NULL && ek->data != NULL && ek->flags == DB_DBT_REALLOC)
            toku_free(ek->data);
        if (ev != NULL && ev->data != NULL && ev->flags == DB_DBT_REALLOC)
            toku_free(ev->data);
    }
    if (li->ekeys != NULL)         toku_free(li->ekeys);
    if (li->evals != NULL)         toku_free(li->evals);

    /* copies of the key/value saved by a failed put() */
    if (li->err_key.data != NULL)  toku_free(li->err_key.data);
    if (li->err_val.data != NULL)  toku_free(li->err_val.data);

    /* the iname strings and the array holding them */
    if (li->inames_in_env != NULL) {
        for (int k = 0; k < li->N; k++) {
            if (li->inames_in_env[k] != NULL)
                toku_free(li->inames_in_env[k]);
        }
        toku_free(li->inames_in_env);
    }

    if (li->temp_file_template != NULL) toku_free(li->temp_file_template);

    toku_free(li);
    loader->i = NULL;
}

/* Destroy a DB_LOADER handle: its internal state first (when there is a
 * handle at all), then the handle itself.  toku_free() is reached even for
 * a NULL handle. */
static void free_loader(DB_LOADER *loader)
{
    if (loader != NULL) {
        free_loader_resources(loader);
    }
    toku_free(loader);
}

168
static const char loader_temp_prefix[] = "tokuld"; /* leading part of every loader temp file name (#2536) */
169 170
static const char loader_temp_suffix[] = "XXXXXX"; /* placeholder tail of the temp-file name template */

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
/*
 * Finish a brt bulk load.  First toku_brt_loader_close() runs; the real work
 * of the load happens there.  Then each destination DB is redirected to its
 * newly built dictionary (inames_in_env[k]).
 * Returns 0, or the first nonzero error from either step; redirection stops
 * at the first failure.
 * Must be called WITHOUT the ydb lock held: the lock is taken around each
 * toku_dictionary_redirect() call.
 */
static int brt_loader_close_and_redirect(DB_LOADER *loader) {
    struct __toku_loader_internal *li = loader->i;

    int rc = toku_brt_loader_close(li->brt_loader,
                                   li->error_callback, li->error_extra,
                                   li->poll_func,      li->poll_extra);
    if (rc != 0)
        return rc;

    for (int k = 0; rc == 0 && k < li->N; k++) {
        toku_ydb_lock();   /* dictionary_redirect needs the ydb lock */
        rc = toku_dictionary_redirect(li->inames_in_env[k],
                                      li->dbs[k]->i->brt,
                                      db_txn_struct_i(li->txn)->tokutxn);
        toku_ydb_unlock();
    }
    return rc;
}
190 191 192 193 194 195 196

// loader_flags currently has three possible values:
//   0                   use brt loader
//   USE_PUTS            do not use brt loader, use log suppression mechanism (2440)
//                       which results in recursive call here via toku_db_pre_acquire_table_lock()
//   DB_PRELOCKED_WRITE  do not use brt loader, this is the recursive (inner) call via 
//                       toku_db_pre_acquire_table_lock()
197 198 199 200 201 202
int toku_loader_create_loader(DB_ENV *env, 
                              DB_TXN *txn, 
                              DB_LOADER **blp, 
                              DB *src_db, 
                              int N, 
                              DB *dbs[], 
203
                              uint32_t db_flags[N], 
204
                              uint32_t dbt_flags[N], 
205
                              uint32_t loader_flags)
206
{
207
    int rval;
208
    BOOL use_brt_loader = (loader_flags == 0); 
209

210 211
    *blp = NULL;           // set later when created

212
    DB_LOADER *loader = NULL;
213
    XCALLOC(loader);       // init to all zeroes (thus initializing the error_callback and poll_func)
214
    XCALLOC(loader->i);    // init to all zeroes (thus initializing all pointers to NULL)
215 216 217 218 219

    loader->i->env                = env;
    loader->i->txn                = txn;
    loader->i->N                  = N;
    loader->i->dbs                = dbs;
220
    loader->i->src_db             = src_db;
221 222 223 224 225
    loader->i->db_flags           = db_flags;
    loader->i->dbt_flags          = dbt_flags;
    loader->i->loader_flags       = loader_flags;
    loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE);

226
    int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/%s%s", env->i->real_tmp_dir, loader_temp_prefix, loader_temp_suffix);
227
    if ( !(n>0 && n<MAX_FILE_SIZE) ) {
228
        rval = -1;
229
        goto create_exit;
230
    }
231 232 233 234 235

    memset(&loader->i->err_key, 0, sizeof(loader->i->err_key));
    memset(&loader->i->err_val, 0, sizeof(loader->i->err_val));
    loader->i->err_i      = 0;
    loader->i->err_errno  = 0;
236

237
    loader->set_error_callback     = toku_loader_set_error_callback;
238
    loader->set_poll_function      = toku_loader_set_poll_function;
239 240
    loader->put                    = toku_loader_put;
    loader->close                  = toku_loader_close;
241
    loader->abort                  = toku_loader_abort;
242

243
    int r = 0;
244 245
    // lock tables and check empty
    for(int i=0;i<N;i++) {
246
        if (!(loader_flags&DB_PRELOCKED_WRITE)) {
247
            r = toku_db_pre_acquire_table_lock(dbs[i], txn);
248
            if (r!=0) break;
249
        }
250
        r = !toku_brt_is_empty_fast(dbs[i]->i->brt);
251 252 253
        if (r!=0) break;
    }
    if ( r!=0 ) {
254
        rval = -1;
255
        goto create_exit;
256 257
    }

258
    {
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
        brt_compare_func compare_functions[N];
        for (int i=0; i<N; i++) {
            compare_functions[i] = env->i->bt_compare;
        }

        // time to open the big kahuna
        char **XMALLOC_N(N, new_inames_in_env);
        BRT *XMALLOC_N(N, brts);
        for (int i=0; i<N; i++) {
            brts[i] = dbs[i]->i->brt;
        }
        loader->i->ekeys = NULL;
        loader->i->evals = NULL;
        LSN load_lsn;
        r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_brt_loader);
        if ( r!=0 ) {
            toku_free(new_inames_in_env);
            toku_free(brts);
            rval = r;
            goto create_exit;
        }
        TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
        r = toku_brt_loader_open(&loader->i->brt_loader,
                                 loader->i->env->i->cachetable,
                                 loader->i->env->i->generate_row_for_put,
                                 src_db,
                                 N,
                                 brts,
                                 (const char **)new_inames_in_env,
                                 compare_functions,
                                 loader->i->temp_file_template,
                                 load_lsn,
                                 ttxn);
        if ( r!=0 ) {
            toku_free(new_inames_in_env);
            toku_free(brts);
            rval = r;
            goto create_exit;
        }
        loader->i->inames_in_env = new_inames_in_env;
        toku_free(brts);

        if (loader->i->loader_flags & LOADER_USE_PUTS) {
            XCALLOC_N(loader->i->N, loader->i->ekeys);
            XCALLOC_N(loader->i->N, loader->i->evals);
            toku_ydb_unlock();
            // the following function grabs the ydb lock, so we
            // first unlock before calling it
            rval = brt_loader_close_and_redirect(loader);
            toku_ydb_lock();
            assert_zero(rval);
            for (int i=0; i<N; i++) {
                loader->i->ekeys[i].flags = DB_DBT_REALLOC;
                loader->i->evals[i].flags = DB_DBT_REALLOC;
                toku_brt_suppress_recovery_logs(dbs[i]->i->brt, db_txn_struct_i(txn)->tokutxn);
314
            }
315 316 317 318 319 320
            loader->i->brt_loader = NULL;
            // close the brtloader and skip to the redirection
            rval = 0;
        }

        rval = 0;
321
    }
322
    *blp = loader;
323
 create_exit:
324
    loader_add_refs(loader);
325
    if (rval == 0) {
326 327 328 329
        (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1);
        (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1);
        if (STATUS_VALUE(LOADER_CURRENT) > STATUS_VALUE(LOADER_MAX) )
            STATUS_VALUE(LOADER_MAX) = STATUS_VALUE(LOADER_CURRENT);  // not worth a lock to make threadsafe, may be inaccurate
330
    }
331
    else {
332
        (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1);
333 334
        free_loader(loader);
    }
335
    return rval;
336 337 338
}

/* Install the progress callback that is passed through to the brt loader
 * when it is closed.  Always returns 0. */
int toku_loader_set_poll_function(DB_LOADER *loader,
                                  int (*poll_func)(void *extra, float progress),
                                  void *poll_extra)
{
    invariant(loader != NULL);
    struct __toku_loader_internal *li = loader->i;
    li->poll_func  = poll_func;
    li->poll_extra = poll_extra;
    return 0;
}

348
int toku_loader_set_error_callback(DB_LOADER *loader, 
349 350
                                   void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra),
				   void *error_extra) 
351
{
352
    invariant(loader != NULL);
353
    loader->i->error_callback = error_cb;
354
    loader->i->error_extra    = error_extra;
355 356 357 358 359
    return 0;
}

// Store a private heap copy of src's bytes in dst; free_loader_resources()
// frees it later.  An empty DBT, or an allocation failure, leaves dst empty
// (data NULL, size 0).  This way memcpy() never gets a NULL destination.
static void loader_save_err_dbt(DBT *dst, const DBT *src)
{
    dst->data = NULL;
    dst->size = 0;
    if (src->size == 0)
        return;
    void *copy = toku_malloc(src->size);
    if (copy == NULL)
        return;   // best effort: the error code itself is still recorded
    memcpy(copy, src->data, src->size);
    dst->data = copy;
    dst->size = src->size;
}

// Add one key/value pair to the load.
//
// In LOADER_USE_PUTS mode the row is inserted directly with
// env->put_multiple(); otherwise it is handed to the brt bulk loader.
//
// Returns 0 on success.  On failure returns -1, deliberately content-free.
// The first failing key, value and error code are saved in loader->i and
// reported to error_callback when the loader is closed or aborted.  Once an
// error has been recorded, every later put is skipped and also returns -1.
int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
{
    int r = 0;
    int i = 0;
    //      err_i is unused now( always 0).  How would we know which dictionary
    //      the error happens in?  (put_multiple and toku_brt_loader_put do NOT report
    //      which dictionary).

    // skip put if error already found
    if ( loader->i->err_errno != 0 ) {
        r = -1;
        goto cleanup;
    }

    if (loader->i->loader_flags & LOADER_USE_PUTS) {
        r = loader->i->env->put_multiple(loader->i->env,
                                         loader->i->src_db, // src_db
                                         loader->i->txn,
                                         key, val,
                                         loader->i->N, // num_dbs
                                         loader->i->dbs, // (DB**)db_array
                                         loader->i->ekeys,
                                         loader->i->evals,
                                         loader->i->db_flags); // flags_array
    }
    else {
        // calling toku_brt_loader_put without a lock assumes that the
        //  handlerton is guaranteeing single access to the loader
        // future multi-threaded solutions may need to protect this call
        r = toku_brt_loader_put(loader->i->brt_loader, key, val);
    }
    if ( r != 0 ) {
        // spec says errors all happen on close
        //   - have to save key, val, errno (r) and i for duplicate callback
        loader_save_err_dbt(&loader->i->err_key, key);
        loader_save_err_dbt(&loader->i->err_val, val);
        loader->i->err_i = i;
        loader->i->err_errno = r;

        // deliberately return content free value
        //   - must call error_callback to get error info
        r = -1;
    }
 cleanup:
    if (r==0)
        STATUS_VALUE(LOADER_PUT)++;  // executed too often to be worth making threadsafe
    else
        STATUS_VALUE(LOADER_PUT_FAIL)++;
    return r;
}

/*
 * Finish (or give up on) the load and destroy the loader handle.
 *
 * No put() error recorded:
 *   - brt-loader mode: close the brt loader and redirect the dictionaries.
 *   - LOADER_USE_PUTS mode: the rows are already in place; nothing more to do.
 * A put() error recorded:
 *   - report it to error_callback (if one is installed);
 *   - brt-loader mode: abort the brt loader and return that abort's result;
 *   - LOADER_USE_PUTS mode: return the saved error code.
 * NOTE(review): in brt-loader mode a recorded put() error does not by itself
 * make close() fail -- the result comes from toku_brt_loader_abort().
 *
 * The handle is freed under the ydb lock.
 */
int toku_loader_close(DB_LOADER *loader)
{
    (void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);

    struct __toku_loader_internal *li = loader->i;
    int uses_puts = (li->loader_flags & LOADER_USE_PUTS) != 0;
    int result = 0;

    if (li->err_errno == 0) {
        if (!uses_puts) {
            result = brt_loader_close_and_redirect(loader);
        }
    } else {
        if (li->error_callback != NULL) {
            li->error_callback(li->dbs[li->err_i], li->err_i, li->err_errno,
                               &li->err_key, &li->err_val, li->error_extra);
        }
        result = uses_puts ? li->err_errno
                           : toku_brt_loader_abort(li->brt_loader, TRUE);
    }

    toku_ydb_lock();
    free_loader(loader);
    toku_ydb_unlock();

    if (result == 0) {
        (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE), 1);
    } else {
        (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE_FAIL), 1);
    }
    return result;
}

445 446
/*
 * Abandon the load and destroy the loader handle.
 * Any error saved by put() is first reported to error_callback (if one is
 * installed).  In brt-loader mode the brt loader is aborted.  LOADER_USE_PUTS
 * mode has no brt loader left to abort; create_loader set it to NULL.
 * The handle is freed under the ydb lock.
 * Returns 0, or the result of toku_brt_loader_abort().
 */
int toku_loader_abort(DB_LOADER *loader)
{
    (void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);
    (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_ABORT), 1);

    struct __toku_loader_internal *li = loader->i;
    int result = 0;

    if (li->err_errno != 0 && li->error_callback != NULL) {
        li->error_callback(li->dbs[li->err_i], li->err_i, li->err_errno,
                           &li->err_key, &li->err_val, li->error_extra);
    }

    if ((li->loader_flags & LOADER_USE_PUTS) == 0) {
        result = toku_brt_loader_abort(li->brt_loader, TRUE);
    }

    toku_ydb_lock();
    free_loader(loader);
    toku_ydb_unlock();
    return result;
}
464

465

466 467 468 469
// Remove leftover loader temp files from the environment's temp directory
// (env->i->real_tmp_dir).  A directory entry matches when its name starts
// with loader_temp_prefix and is exactly
// strlen(loader_temp_prefix) + strlen(loader_temp_suffix) characters long.
//
// Returns 0 on success.  If opendir() fails, returns its errno.  Otherwise
// returns the errno of the last failing unlink()/closedir().  An unlink
// failure is reported with perror() and the scan continues.
int toku_loader_cleanup_temp_files(DB_ENV *env) {
    const size_t prefix_len = strlen(loader_temp_prefix);
    const size_t temp_name_len = prefix_len + strlen(loader_temp_suffix);
    int result;
    struct dirent *de;
    char * dir = env->i->real_tmp_dir;
    DIR *d = opendir(dir);
    if (d==0) {
        result = errno; goto exit;
    }

    result = 0;
    while ((de = readdir(d))) {
        // Check the length first, and compare the prefix with strncmp(),
        // which stops at the NUL.  A memcmp() over prefix_len bytes would
        // read past the end of shorter names such as "." and "..".
        if (strlen(de->d_name) == temp_name_len &&
            strncmp(de->d_name, loader_temp_prefix, prefix_len) == 0) {
            int fnamelen = strlen(dir) + 1 + strlen(de->d_name) + 1; // One for the slash and one for the trailing NUL.
            char fname[fnamelen];
            int l = snprintf(fname, fnamelen, "%s/%s", dir, de->d_name);
            assert(l+1 == fnamelen);
            int r = unlink(fname);
            if (r!=0) {
                result = errno;
                perror("Trying to delete a rolltmp file");
            }
        }
    }
    {
        int r = closedir(d);
        if (r == -1)
            result = errno;
    }

exit:
    return result;
}
500

501 502 503 504


// STATUS_VALUE is file-local shorthand for the loader status counters;
// undefine it at the end of the file.
#undef STATUS_VALUE