ydb_db.c 33.4 KB
Newer Older
1
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
// vim: expandtab:ts=8:sw=4:softtabstop=4:
3 4
#ident "Copyright (c) 2007-2009 Tokutek Inc.  All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
5
#ident "$Id$"
6 7 8 9

#include <ctype.h>
#include <db.h>
#include "ydb-internal.h"
10
#include <ft/ft.h>
11 12
#include <ft/ft-flusher.h>
#include <ft/checkpoint.h>
13 14
#include "indexer.h"
#include "ydb_load.h"
15
#include <ft/log_header.h>
16 17 18 19
#include "ydb_cursor.h"
#include "ydb_row_lock.h"
#include "ydb_db.h"
#include "ydb_write.h"
20
#include <lock_tree/locktree.h>
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

// Counters reported by this layer.  The object has static storage duration,
// so every value field starts out as zero.
static YDB_DB_LAYER_STATUS_S ydb_db_layer_status;

// STATUS_VALUE(k) names the numeric value slot of counter k.  Any previous
// definition is discarded first (#undef of an undefined macro is a no-op).
#undef STATUS_VALUE
#define STATUS_VALUE(x) ydb_db_layer_status.status[x].value.num

// STATUS_INIT(k, t, l): fill in the descriptive fields of counter k -- its
// key name (the stringized enum name), value type t and legend l.
// Wrapped in do { } while (0) so `STATUS_INIT(...);` is exactly one
// statement and stays correct inside an unbraced if/else.
#define STATUS_INIT(k,t,l) do {                         \
        ydb_db_layer_status.status[k].keyname = #k;     \
        ydb_db_layer_status.status[k].type    = t;      \
        ydb_db_layer_status.status[k].legend  = l;      \
    } while (0)

// One-time setup of the status table's metadata.
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler (static storage).
static void
ydb_db_layer_status_init (void) {
    STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS,      UINT64,   "directory write locks");
    STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL, UINT64,   "directory write locks fail");
    STATUS_INIT(YDB_LAYER_LOGSUPPRESS,                UINT64,   "log suppress");
    STATUS_INIT(YDB_LAYER_LOGSUPPRESS_FAIL,           UINT64,   "log suppress fail");
    ydb_db_layer_status.initialized = true;
}
#undef STATUS_INIT

// Copy a snapshot of this layer's status counters into *statp.
// The counter metadata (names, types, legends) is set up lazily on the
// first request.
void
ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp) {
    if (!ydb_db_layer_status.initialized) {
        ydb_db_layer_status_init();
    }
    *statp = ydb_db_layer_status;   // whole-struct copy
}

// Zero *dbt (so data == NULL, size == 0) and mark it DB_DBT_REALLOC so the
// callee may allocate/grow the buffer.  Returns dbt for call chaining.
static inline DBT*
init_dbt_realloc(DBT *dbt) {
    DBT *const cleared = memset(dbt, 0, sizeof *dbt);
    cleared->flags = DB_DBT_REALLOC;
    return cleared;
}

// Build a filesystem-friendly hint from a dictionary name: alphanumeric
// characters are copied through, and every run of other characters is
// collapsed into a single '_'.  Example: "./db/main.tokudb" -> "_db_main_tokudb".
// Requires: hint has room for strlen(dname) + 1 bytes (the output is never
// longer than the input).
static void
create_iname_hint(const char *dname, char *hint) {
    bool in_separator_run = false;
    for (; *dname; dname++) {
        // <ctype.h> functions take an int that must be representable as
        // unsigned char (or EOF); passing a negative plain char, e.g. a byte
        // of a UTF-8 sequence, is undefined behaviour.
        unsigned char c = (unsigned char) *dname;
        if (isalnum(c)) {
            *hint++ = (char) c;
            in_separator_run = false;
        } else if (!in_separator_run) {
            *hint++ = '_';
            in_separator_run = true;
        }
    }
    *hint = '\0';
}


// Build the internal file name ("iname") for a new dictionary.
//   n <  0: "<hint>_<id>_<version>.tokudb"                 (mark is ignored)
//   n >= 0: "<hint>_<id>_<version>_<mark>_<n>.tokudb"
// id, the ft layout version and n are written in hex; mark must be a single
// character such as "B" or "P" (intended for use by the loader, which will
// create many inames using one txnid).
// The name is placed under env's data_dir when one is configured.
// Returns the string built by toku_construct_full_name(); the caller owns it
// and releases it with toku_free().
static char *
create_iname(DB_ENV *env, u_int64_t id, const char *hint, const char *mark, int n) {
    int bytes;
    char inamebase[strlen(hint) +
                   8 +  // hex file format version
                   16 + // hex id (normally the txnid)
                   8  + // hex value of n if non-neg
                   sizeof("_B___.tokudb")]; // separators, mark, suffix and NUL
    if (n < 0) {
        bytes = snprintf(inamebase, sizeof(inamebase),
                         "%s_%"PRIx64"_%"PRIx32".tokudb",
                         hint, id, (uint32_t) FT_LAYOUT_VERSION);
    } else {
        invariant(strlen(mark) == 1);
        // PRIx32 expects a uint32_t, so the int arguments are converted
        // explicitly (n is known to be non-negative here).
        bytes = snprintf(inamebase, sizeof(inamebase),
                         "%s_%"PRIx64"_%"PRIx32"_%s_%"PRIx32".tokudb",
                         hint, id, (uint32_t) FT_LAYOUT_VERSION, mark, (uint32_t) n);
    }
    // snprintf must neither fail nor truncate.
    assert(bytes>0);
    assert(bytes<=(int)sizeof(inamebase)-1);
    char *rval;
    if (env->i->data_dir)
        rval = toku_construct_full_name(2, env->i->data_dir, inamebase);
    else
        rval = toku_construct_full_name(1, inamebase);
    assert(rval);
    return rval;
}

static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode);

// Effect: Do the work required of DB->close(): unregister a user-opened DB
// from its environment, close its ft handle, drop its locktree reference (if
// any), and free the DB and its internal state.  Always returns 0.
// Requires: the multi_operation client lock is held.
int
toku_db_close(DB * db) {
    // Only DBs with a dname were registered with the env; internal
    // (non-user) dictionaries have no dname.  The ft layer may still be using
    // the dictionary internally after the user closes it.
    bool const registered_with_env = db_opened(db) && db->i->dname != NULL;
    if (registered_with_env) {
        env_note_db_closed(db->dbenv, db);
    }

    toku_ft_handle_close(db->i->ft_handle);
    if (db->i->lt != NULL) {
        toku_lt_remove_db_ref(db->i->lt);
    }
    toku_sdbt_cleanup(&db->i->skey);
    toku_sdbt_cleanup(&db->i->sval);

    char *const dname = db->i->dname;
    if (dname != NULL) {
        toku_free(dname);
    }
    toku_free(db->i);
    toku_free(db);
    return 0;
}

///////////
//db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor

// Look up `key` and pass the result to callback f (with `extra`), using a
// temporary cursor that is closed before returning.  The isolation flags and
// DB_RMW choose how the cursor is created; the isolation flags are then
// stripped before the remaining flags reach toku_c_getf_set().  Returns the
// lookup's error if any, otherwise the cursor-close result.
int
db_getf_set(DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    uint32_t const cursor_flags = (flags & (DB_ISOLATION_FLAGS | DB_RMW)) | DBC_DISABLE_PREFETCHING;
    u_int32_t const getf_flags = flags & ~DB_ISOLATION_FLAGS;

    DBC *cursor;
    int r = toku_db_cursor_internal(db, txn, &cursor, cursor_flags, 1);
    if (r != 0) {
        return r;
    }
    r = toku_c_getf_set(cursor, getf_flags, key, f, extra);
    int const close_r = toku_c_close(cursor);
    return (r != 0) ? r : close_r;
}

// Returns nonzero iff dbt has none of DB_DBT_MALLOC, DB_DBT_REALLOC or
// DB_DBT_USERMEM set.  toku_db_get() rejects such a data DBT with EINVAL
// when the DB was opened with DB_THREAD.
static inline int
db_thread_need_flags(DBT *dbt) {
    // Flag masks are combined with bitwise OR; '+' only coincides with '|'
    // while the constants happen to be distinct single bits.
    const u_int32_t memory_mgmt_flags = DB_DBT_MALLOC | DB_DBT_REALLOC | DB_DBT_USERMEM;
    return (dbt->flags & memory_mgmt_flags) == 0;
}

// Point lookup: fetch the value stored under *key into *data via a temporary
// DB_SET cursor.  Accepted flags are the isolation flags and
// DB_PRELOCKED / DB_PRELOCKED_WRITE; anything else returns EINVAL
// (DB_GET_BOTH is no longer supported, #2862).  On a DB opened with
// DB_THREAD, *data must carry DB_DBT_MALLOC, DB_DBT_REALLOC or
// DB_DBT_USERMEM, otherwise EINVAL.
int
toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);

    if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data)) {
        return EINVAL;
    }

    u_int32_t const iso_flags   = flags & DB_ISOLATION_FLAGS;
    u_int32_t const lock_flags  = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
    u_int32_t const other_flags = flags & ~lock_flags & ~DB_ISOLATION_FLAGS;
    if (other_flags != 0) {
        return EINVAL;
    }

    DBC *cursor;
    int r = toku_db_cursor_internal(db, txn, &cursor, iso_flags | DBC_DISABLE_PREFETCHING, 1);
    if (r != 0) {
        return r;
    }
    r = toku_c_get(cursor, key, data, DB_SET | lock_flags);
    int const close_r = toku_c_close(cursor);
    return r ? r : close_r;
}

// Open sub-database `dbname` of `fname` by opening the single-dictionary
// name "fname/dbname" through toku_db_open().  EINVAL if either name is NULL.
static int
db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
    if (fname == NULL || dbname == NULL) {
        return EINVAL;
    }
    size_t const fname_len  = strlen(fname);
    size_t const dbname_len = strlen(dbname);
    // fname, the '/' separator, dbname and the terminating NUL
    char joined[fname_len + 1 + dbname_len + 1];
    int const written = snprintf(joined, sizeof joined, "%s/%s", fname, dbname);
    assert(written == (int) sizeof joined - 1);
    return toku_db_open(db, txn, joined, NULL, dbtype, flags, mode);
}

// inames are created here.
// algorithm:
//  begin txn
//  convert dname to iname (possibly creating new iname)
210
//  open file (toku_ft_handle_open() will handle logging)
211 212
//  close txn
//  if created a new iname, take full range lock
213
// Requires: no checkpoint may take place during this function, which is enforced by holding the multi_operation_client_lock.
214 215 216
static int 
toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
    HANDLE_PANICKED_DB(db);
217
    if (dbname != NULL) {
218
        return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
219
    }
220 221 222

    // at this point fname is the dname
    //This code ONLY supports single-db files.
223
    assert(dbname == NULL);
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
    const char * dname = fname;  // db_open_subdb() converts (fname, dbname) to dname

    ////////////////////////////// do some level of parameter checking.
    u_int32_t unused_flags = flags;
    int r;
    if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
    int is_db_excl    = flags & DB_EXCL;    unused_flags&=~DB_EXCL;
    int is_db_create  = flags & DB_CREATE;  unused_flags&=~DB_CREATE;
    int is_db_hot_index  = flags & DB_IS_HOT_INDEX;  unused_flags&=~DB_IS_HOT_INDEX;

    //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
                                            unused_flags&=~DB_READ_UNCOMMITTED;
                                            unused_flags&=~DB_READ_COMMITTED;
                                            unused_flags&=~DB_SERIALIZABLE;
    if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags
    if (is_db_excl && !is_db_create) return EINVAL;
    if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;

    /* tokudb supports no duplicates and sorted duplicates only */
    unsigned int tflags;
244
    r = toku_ft_get_flags(db->i->ft_handle, &tflags);
245 246 247
    if (r != 0) 
        return r;

248 249 250
    if (db_opened(db)) {
        // it was already open
        return EINVAL;
251
    }
252
    //////////////////////////////
253 254 255 256 257 258 259 260

    // convert dname to iname
    //  - look up dname, get iname
    //  - if dname does not exist, create iname and make entry in directory
    DBT dname_dbt;  // holds dname
    DBT iname_dbt;  // holds iname_in_env
    toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
    init_dbt_realloc(&iname_dbt);  // sets iname_dbt.data = NULL
261
    r = toku_db_get(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
262
    char *iname = iname_dbt.data;
263
    if (r == DB_NOTFOUND && !is_db_create) {
264
        r = ENOENT;
265
    } else if (r==0 && is_db_excl) {
266
        r = EEXIST;
267
    } else if (r == DB_NOTFOUND) {
268 269 270 271 272
        char hint[strlen(dname) + 1];

        // create iname and make entry in directory
        u_int64_t id = 0;

273 274
        if (txn) {
            id = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
275 276 277 278 279
        }
        create_iname_hint(dname, hint);
        iname = create_iname(db->dbenv, id, hint, NULL, -1);  // allocated memory for iname
        toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
        //
280
        // put_flags will be 0 for performance only, avoid unnecessary query
281 282 283 284
        // if we are creating a hot index, per #3166, we do not want the write lock  in directory grabbed.
        // directory read lock is grabbed in toku_db_get above
        //
        u_int32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0); 
285
        r = toku_db_put(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, put_flags, TRUE);  
286 287 288 289
    }

    // we now have an iname
    if (r == 0) {
290 291
        r = db_open_iname(db, txn, iname, flags, mode);
        if (r == 0) {
292 293 294 295 296
            db->i->dname = toku_xstrdup(dname);
            env_note_db_opened(db->dbenv, db);  // tell env that a new db handle is open (using dname)
        }
    }

297 298
    if (iname) {
        toku_free(iname);
299 300 301 302
    }
    return r;
}

303 304 305 306 307 308 309 310
// set the descriptor and cmp_descriptor to the
// descriptors from the given ft
static void
db_set_descriptors(DB *db, FT_HANDLE ft_handle) {
    db->descriptor = toku_ft_get_descriptor(ft_handle);
    db->cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
}

311
// callback that sets the descriptors when 
312
// a dictionary is redirected at the ft layer
313 314 315 316
// I wonder if client applications can safely access
// the descriptor via db->descriptor, because
// a redirect may be happening underneath the covers.
// Need to investigate further.
317 318
static void
db_on_redirect_callback(FT_HANDLE ft_handle, void* extra) {
319 320
    DB *db = extra;
    db_set_descriptors(db, ft_handle);
321 322
}

323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
struct lt_on_create_callback_extra {
    DB_TXN *txn;
    FT_HANDLE ft_handle;
};

// when a locktree is created, clone a ft handle and store it
// as userdata so we can close it later.
static void
lt_on_create_callback(toku_lock_tree *lt, void *extra) {
    int r;
    struct lt_on_create_callback_extra *info = extra;
    TOKUTXN ttxn = info->txn ? db_txn_struct_i(info->txn)->tokutxn : NULL;
    FT_HANDLE ft_handle = info->ft_handle;

    FT_HANDLE cloned_ft_handle;
    r = toku_ft_handle_clone(&cloned_ft_handle, ft_handle, ttxn);
    invariant_zero(r);

    assert(toku_lt_get_userdata(lt) == NULL);
    toku_lt_set_userdata(lt, cloned_ft_handle);
}

// Invoked when a locktree closes: close the cloned ft handle that
// lt_on_create_callback() stored as the locktree's userdata (must be set).
static void
lt_on_close_callback(toku_lock_tree *lt) {
    FT_HANDLE cloned = toku_lt_get_userdata(lt);
    assert(cloned != NULL);
    toku_ft_handle_close(cloned);
}

353 354 355 356 357 358
int 
db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, u_int32_t flags, int mode) {
    int r;

    //Set comparison functions if not yet set.
    if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
359
        r = toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
360 361 362 363
        assert(r==0);
        db->i->key_compare_was_set = TRUE;
    }
    if (db->dbenv->i->update_function) {
364
        r = toku_ft_set_update(db->i->ft_handle,db->dbenv->i->update_function);
365 366
        assert(r==0);
    }
367 368
    toku_ft_set_redirect_callback(
        db->i->ft_handle,
369 370 371
        db_on_redirect_callback,
        db
        );
372 373 374 375 376 377 378 379 380 381
    BOOL need_locktree = (BOOL)((db->dbenv->i->open_flags & DB_INIT_LOCK) &&
                                (db->dbenv->i->open_flags & DB_INIT_TXN));

    int is_db_excl    = flags & DB_EXCL;    flags&=~DB_EXCL;
    int is_db_create  = flags & DB_CREATE;  flags&=~DB_CREATE;
    //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
                                            flags&=~DB_READ_UNCOMMITTED;
                                            flags&=~DB_READ_COMMITTED;
                                            flags&=~DB_SERIALIZABLE;
                                            flags&=~DB_IS_HOT_INDEX;
382 383 384 385
    // unknown or conflicting flags are bad
    if ((flags & ~DB_THREAD) || (is_db_excl && !is_db_create)) {
        return EINVAL;
    }
386 387 388

    /* tokudb supports no duplicates and sorted duplicates only */
    unsigned int tflags;
389
    r = toku_ft_get_flags(db->i->ft_handle, &tflags);
390
    if (r != 0)  {
391
        return r;
392
    }
393

394
    if (db_opened(db)) {
395
        return EINVAL;              /* It was already open. */
396
    }
397 398 399 400
    
    db->i->open_flags = flags;
    db->i->open_mode = mode;

401 402
    FT_HANDLE ft_handle = db->i->ft_handle;
    r = toku_ft_handle_open(ft_handle, iname_in_env,
403 404 405
                      is_db_create, is_db_excl,
                      db->dbenv->i->cachetable,
                      txn ? db_txn_struct_i(txn)->tokutxn : NULL_TXN);
406
    if (r != 0) {
407
        goto error_cleanup;
408
    }
409 410

    db->i->opened = 1;
411

412 413 414
    // now that the handle has successfully opened, a valid descriptor
    // is in the ft. we need to set the db's descriptor pointers
    db_set_descriptors(db, ft_handle);
415

416
    if (need_locktree) {
417
        db->i->dict_id = toku_ft_get_dictionary_id(db->i->ft_handle);
418 419 420 421 422 423 424 425 426
        struct lt_on_create_callback_extra on_create_extra = {
            .txn = txn,
            .ft_handle = db->i->ft_handle,
        };
        r = toku_ltm_get_lt(db->dbenv->i->ltm, &db->i->lt, db->i->dict_id, db->cmp_descriptor, 
                toku_ft_get_bt_compare(db->i->ft_handle), lt_on_create_callback, &on_create_extra, lt_on_close_callback);
        if (r != 0) { 
            goto error_cleanup; 
        }
427 428 429 430 431 432 433
    }
    return 0;
 
error_cleanup:
    db->i->dict_id = DICTIONARY_ID_NONE;
    db->i->opened = 0;
    if (db->i->lt) {
434
        toku_lt_remove_db_ref(db->i->lt);
435 436 437 438 439 440 441 442
        db->i->lt = NULL;
    }
    return r;
}

// Return the maximum key and val size in 
// *key_size and *val_size respectively
static void
443
toku_db_get_max_row_size(DB * UU(db), uint32_t * max_key_size, uint32_t * max_val_size) {
444 445
    *max_key_size = 0;
    *max_val_size = 0;
446
    toku_ft_get_maximum_advised_key_value_lengths(max_key_size, max_val_size);
447 448 449 450 451 452 453 454 455 456 457 458
}

// Take a point write lock on this DB's dname entry in the environment's
// directory dictionary, counting successes and failures in the layer status.
// DBs without a dname are skipped and 0 is returned.
int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
    char *const dname = db->i->dname;
    // bad hack because some environment dictionaries do not have a dname
    if (dname == NULL) {
        return 0;
    }

    // Using the same key for both range ends makes this a point lock.
    DBT key_in_directory = { .data = dname, .size = strlen(dname)+1 };
    int const r = get_range_lock(db->dbenv->i->directory, txn, &key_in_directory, &key_in_directory, LOCK_REQUEST_WRITE);
    if (r != 0) {
        STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL)++;  // accountability
    } else {
        STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS)++;  // accountability
    }
    return r;
}

//
466 467
// This function is used both to set an initial descriptor of a DB and to
// change a descriptor. (only way to set a descriptor of a DB)
468
//
469 470 471 472 473 474
// Requires:
//  - The caller must not call put_multiple, del_multiple, or update_multiple concurrently
//  - The caller must not have a hot index running concurrently on db
//  - If the caller has passed DB_UPDATE_CMP_DESCRIPTOR as a flag, then he is calling this function
//     ONLY immediately after creating the dictionary and before doing any actual work on the dictionary.
//
475 476 477 478 479 480 481 482
static int 
toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    int r;
    TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
    DBT old_descriptor;
    BOOL is_db_hot_index  = ((flags & DB_IS_HOT_INDEX) != 0);
483
    BOOL update_cmp_descriptor = ((flags & DB_UPDATE_CMP_DESCRIPTOR) != 0);
484 485 486 487 488 489 490 491 492 493

    toku_init_dbt(&old_descriptor);
    if (!db_opened(db) || !txn || !descriptor || (descriptor->size>0 && !descriptor->data)){
        r = EINVAL;
        goto cleanup;
    }
    if (txn->parent != NULL) {
        r = EINVAL; // cannot have a parent if you are a resetting op
        goto cleanup;
    }
494 495 496
    // For a hot index, this is an initial descriptor.
    // We do not support (yet) hcad with hot index concurrently on a single table, which
    // would require changing a descriptor for a hot index.
497
    if (!is_db_hot_index) {
498
        r = toku_db_pre_acquire_table_lock(db, txn);
499 500
        if (r != 0) { goto cleanup; }    
    }
501

502 503
    old_descriptor.size = db->descriptor->dbt.size;
    old_descriptor.data = toku_memdup(db->descriptor->dbt.data, db->descriptor->dbt.size);
504 505
    r = toku_ft_change_descriptor(
        db->i->ft_handle, 
506 507 508 509 510 511
        &old_descriptor, 
        descriptor, 
        TRUE, 
        ttxn, 
        update_cmp_descriptor
        );
512 513 514 515 516 517 518 519
    if (r != 0) { goto cleanup; }

    // the lock tree uses a copy of the header's descriptor for comparisons.
    // if we need to update the cmp descriptor, we need to make sure the lock
    // tree can get a copy of the new descriptor.
    if (update_cmp_descriptor) {
        toku_lt_update_descriptor(db->i->lt, db->cmp_descriptor);
    }
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
cleanup:
    if (old_descriptor.data) toku_free(old_descriptor.data);
    return r;
}

// DB->set_flags: no per-DB flags are stored.  Passing nonzero flags to an
// already-open DB returns EINVAL (the following matches BDB); every other
// call succeeds and is otherwise ignored.
static int
toku_db_set_flags(DB *db, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    bool const changing_open_db = db_opened(db) && flags != 0;
    return changing_open_db ? EINVAL : 0;
}

// DB->get_flags: always reports 0 in *pflags.  EINVAL if pflags is NULL.
static int
toku_db_get_flags(DB *db, u_int32_t *pflags) {
    HANDLE_PANICKED_DB(db);
    if (pflags == NULL) {
        return EINVAL;
    }
    *pflags = 0;
    return 0;
}

543 544 545 546 547 548 549 550
static int 
toku_db_change_pagesize(DB *db, u_int32_t pagesize) {
    HANDLE_PANICKED_DB(db);
    if (!db_opened(db)) return EINVAL;
    toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
    return 0;
}

551 552 553
static int 
toku_db_set_pagesize(DB *db, u_int32_t pagesize) {
    HANDLE_PANICKED_DB(db);
554 555 556
    if (db_opened(db)) return EINVAL;
    toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
    return 0;
557 558 559 560 561
}

// Store the DB's node size in *pagesize_ptr.  Always returns 0.
static int
toku_db_get_pagesize(DB *db, u_int32_t *pagesize_ptr) {
    HANDLE_PANICKED_DB(db);
    FT_HANDLE const ft = db->i->ft_handle;
    toku_ft_handle_get_nodesize(ft, pagesize_ptr);
    return 0;
}

// Change the basement-node size of an already-open DB.  EINVAL if the DB is
// not open.
static int
toku_db_change_readpagesize(DB *db, u_int32_t readpagesize) {
    HANDLE_PANICKED_DB(db);
    int r = EINVAL;
    if (db_opened(db)) {
        toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
        r = 0;
    }
    return r;
}

// Set the basement-node size before the DB is opened.  EINVAL once the DB
// is open.
static int
toku_db_set_readpagesize(DB *db, u_int32_t readpagesize) {
    HANDLE_PANICKED_DB(db);
    int r = EINVAL;
    if (!db_opened(db)) {
        toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
        r = 0;
    }
    return r;
}

// Store the DB's basement-node size in *readpagesize_ptr.  Always returns 0.
static int
toku_db_get_readpagesize(DB *db, u_int32_t *readpagesize_ptr) {
    HANDLE_PANICKED_DB(db);
    FT_HANDLE const ft = db->i->ft_handle;
    toku_ft_handle_get_basementnodesize(ft, readpagesize_ptr);
    return 0;
}

// Change the compression method of an already-open DB.  EINVAL if the DB is
// not open.
static int
toku_db_change_compression_method(DB *db, enum toku_compression_method compression_method) {
    HANDLE_PANICKED_DB(db);
    int r = EINVAL;
    if (db_opened(db)) {
        toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
        r = 0;
    }
    return r;
}

597 598 599
static int 
toku_db_set_compression_method(DB *db, enum toku_compression_method compression_method) {
    HANDLE_PANICKED_DB(db);
600 601 602
    if (db_opened(db)) return EINVAL;
    toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
    return 0;
603 604 605 606 607
}

// Store the DB's compression method in *compression_method_ptr.  Always
// returns 0.
static int
toku_db_get_compression_method(DB *db, enum toku_compression_method *compression_method_ptr) {
    HANDLE_PANICKED_DB(db);
    FT_HANDLE const ft = db->i->ft_handle;
    toku_ft_handle_get_compression_method(ft, compression_method_ptr);
    return 0;
}

612 613 614 615
static int 
toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
616
    struct ftstat64_s ftstat;
617 618 619 620
    TOKUTXN tokutxn = NULL;
    if (txn != NULL) {
        tokutxn = db_txn_struct_i(txn)->tokutxn;
    }
621
    int r = toku_ft_handle_stat64(db->i->ft_handle, tokutxn, &ftstat);
622
    if (r==0) {
623 624 625 626 627 628 629
        s->bt_nkeys = ftstat.nkeys;
        s->bt_ndata = ftstat.ndata;
        s->bt_dsize = ftstat.dsize;
        s->bt_fsize = ftstat.fsize;
        s->bt_create_time_sec = ftstat.create_time_sec;
        s->bt_modify_time_sec = ftstat.modify_time_sec;
        s->bt_verify_time_sec = ftstat.verify_time_sec;
630 631 632 633 634 635 636 637 638
    }
    return r;
}

// Estimate how many keys are less than, equal to and greater than *key
// using toku_ft_keyrange().  *is_exact is set to 0 on success because
// ft_keyrange cannot report exactness.
static int
toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* key, u_int64_t* less, u_int64_t* equal, u_int64_t* greater, int* is_exact) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);

    // note that toku_ft_keyrange does not have a txn param
    // this will be fixed later
    // temporarily, because the caller, locked_db_keyrange,
    // has the ydb lock, we are ok
    int const r = toku_ft_keyrange(db->i->ft_handle, key, less, equal, greater);
    if (r == 0) {
        // temporarily set is_exact to 0 because ft_keyrange does not have this parameter
        *is_exact = 0;
    }
    return r;
}

// needed by loader.c
int 
653
toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
654 655 656 657 658 659 660 661
    HANDLE_PANICKED_DB(db);
    if (!db->i->lt || !txn) return 0;
    int r;
    r = get_range_lock(db, txn, toku_lt_neg_infinity, toku_lt_infinity, LOCK_REQUEST_WRITE);
    return r;
}

static int 
662
locked_db_close(DB * db, u_int32_t UU(flags)) {
663 664
    // cannot begin a checkpoint
    toku_multi_operation_client_lock();
665
    int r = toku_db_close(db);
666
    toku_multi_operation_client_unlock();
667 668 669 670 671 672
    return r;
}

// DB->get wrapper: runs toku_db_get() inside an automatically created
// transaction when txn is NULL (per toku_db_construct_autotxn), finishing
// that transaction through toku_db_destruct_autotxn().
int
autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data, u_int32_t flags) {
    BOOL changed;
    int r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
    if (r == 0) {
        r = toku_db_get(db, txn, key, data, flags);
        r = toku_db_destruct_autotxn(txn, r, changed);
    }
    return r;
}

// DB->getf_set wrapper: runs db_getf_set() inside an automatically created
// transaction when txn is NULL (per toku_db_construct_autotxn), finishing
// that transaction through toku_db_destruct_autotxn().
static inline int
autotxn_db_getf_set (DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
    BOOL changed;
    int r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
    if (r == 0) {
        r = db_getf_set(db, txn, flags, key, f, extra);
        r = toku_db_destruct_autotxn(txn, r, changed);
    }
    return r;
}

// DB->open(): when the environment uses transactions, the open runs in a
// child of txn that is committed (DB_TXN_NOSYNC) on success and aborted on
// failure.  The open itself holds the multi-operation client lock so no
// checkpoint can begin, and DB_AUTO_COMMIT is stripped from flags.
static int
locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);

    DB_ENV *env = db->dbenv;
    bool const using_txns = (env->i->open_flags & DB_INIT_TXN) != 0;
    DB_TXN *child_txn = NULL;
    if (using_txns) {
        int const begin_r = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
        invariant_zero(begin_r);
    }

    toku_multi_operation_client_lock();     // cannot begin a checkpoint
    int const r = toku_db_open(db, child_txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
    toku_multi_operation_client_unlock();

    if (using_txns) {
        int const end_r = (r == 0) ? locked_txn_commit(child_txn, DB_TXN_NOSYNC)
                                   : locked_txn_abort(child_txn);
        invariant_zero(end_r);
    }
    return r;
}

// DB->change_descriptor(): run toku_db_change_descriptor() while holding the
// multi-operation client lock so no checkpoint can begin meanwhile.
static int
locked_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t flags) {
    int r;
    toku_multi_operation_client_lock();     // Cannot begin checkpoint
    r = toku_db_change_descriptor(db, txn, descriptor, flags);
    toku_multi_operation_client_unlock();   // Can now begin checkpoint
    return r;
}

static void 
727
toku_db_set_errfile (DB *db, FILE *errfile) {
728 729 730 731 732
    db->dbenv->set_errfile(db->dbenv, errfile);
}

// TODO 2216 delete this
static int 
733
toku_db_fd(DB * UU(db), int * UU(fdp)) {
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
    return 0;
}
// Return the locktree's +infinity sentinel DBT (the upper end used for the
// whole-table range lock).  Always the same pointer, hence `pure`.
static const DBT* toku_db_dbt_pos_infty(void) __attribute__((pure));
static const DBT *
toku_db_dbt_pos_infty(void)
{
    const DBT *const upper_sentinel = toku_lt_infinity;
    return upper_sentinel;
}

// Return the locktree's -infinity sentinel DBT (the lower end used for the
// whole-table range lock).  Always the same pointer, hence `pure`.
static const DBT* toku_db_dbt_neg_infty(void) __attribute__((pure));
static const DBT *
toku_db_dbt_neg_infty(void)
{
    const DBT *const lower_sentinel = toku_lt_neg_infinity;
    return lower_sentinel;
}

// Optimize the dictionary by delegating to toku_ft_optimize() on the DB's
// ft handle; returns its result.
static int
toku_db_optimize(DB *db) {
    HANDLE_PANICKED_DB(db);
    return toku_ft_optimize(db->i->ft_handle);
}

// Hot-optimize the dictionary via toku_ft_hot_optimize(), reporting progress
// through progress_callback(progress_extra, progress).  This function takes
// no locks of its own; it returns the ft layer's result.
static int
toku_db_hot_optimize(DB *db,
                     int (*progress_callback)(void *extra, float progress),
                     void *progress_extra)
{
    HANDLE_PANICKED_DB(db);
    return toku_ft_hot_optimize(db->i->ft_handle,
                                progress_callback,
                                progress_extra);
}

// DB->optimize(): toku_db_optimize does a message injection, so it runs
// under the multi-operation client lock to keep a checkpoint from starting.
static int
locked_db_optimize(DB *db) {
    int r;
    toku_multi_operation_client_lock();     // Cannot begin checkpoint
    r = toku_db_optimize(db);
    toku_multi_operation_client_unlock();
    return r;
}

static int
781
toku_db_get_fragmentation(DB * db, TOKU_DB_FRAGMENTATION report) {
782 783 784 785 786
    HANDLE_PANICKED_DB(db);
    int r;
    if (!db_opened(db))
        r = toku_ydb_do_error(db->dbenv, EINVAL, "Fragmentation report available only on open DBs.\n");
    else
787
        r = toku_ft_get_fragmentation(db->i->ft_handle, report);
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
    return r;
}

// Attach `indexer` to db, or detach it by passing NULL.  Returns EINVAL
// (leaving the current indexer in place) when an indexer is already attached
// and a non-NULL one is passed.
int
toku_db_set_indexer(DB *db, DB_INDEXER * indexer) {
    bool const would_overwrite = (db->i->indexer != NULL) && (indexer != NULL);
    if (would_overwrite) {
        return EINVAL;  // you are trying to overwrite a valid indexer
    }
    db->i->indexer = indexer;
    return 0;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
804 805 806 807 808
DB_INDEXER *
toku_db_get_indexer(DB *db) {
    return db->i->indexer;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
809
static void 
Zardosht Kasheff's avatar
Zardosht Kasheff committed
810 811
db_get_indexer(DB *db, DB_INDEXER **indexer_ptr) {
    *indexer_ptr = toku_db_get_indexer(db);
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831
}

// Adapter state for verification progress: the caller's optional callback
// and the opaque argument to pass it.
struct ydb_verify_context {
    int (*progress_callback)(void *extra, float progress);
    void *progress_extra;
};

// Progress callback handed to the ft verifier.  `extra` is a
// struct ydb_verify_context; forwards to the caller's callback when one was
// supplied and returns its result, otherwise returns 0.
static int
ydb_verify_progress_callback(void *extra, float progress) {
    const struct ydb_verify_context *ctx = extra;
    if (ctx->progress_callback == NULL) {
        return 0;
    }
    return ctx->progress_callback(ctx->progress_extra, progress);
}

// Verify the DB's fractal tree via toku_verify_ft_with_progress(), routing
// progress reports through ydb_verify_progress_callback to the caller's
// (optional) progress_callback.  verbose/keep_going are passed through.
static int
toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_going) {
    struct ydb_verify_context context = {
        .progress_callback = progress_callback,
        .progress_extra    = progress_extra,
    };
    return toku_verify_ft_with_progress(db->i->ft_handle, ydb_verify_progress_callback, &context, verbose, keep_going);
}

836
int toku_setup_db_internal (DB **dbp, DB_ENV *env, u_int32_t flags, FT_HANDLE brt, bool is_open) {
837 838 839 840 841 842 843 844 845 846 847 848
    if (flags || env == NULL) 
        return EINVAL;

    if (!env_opened(env))
        return EINVAL;
    
    DB *MALLOC(result);
    if (result == 0) {
        return ENOMEM;
    }
    memset(result, 0, sizeof *result);
    result->dbenv = env;
849 850 851 852 853 854
    MALLOC(result->i);
    if (result->i == 0) {
        toku_free(result);
        return ENOMEM;
    }
    memset(result->i, 0, sizeof *result->i);
855
    result->i->ft_handle = brt;
856 857 858 859 860 861 862 863 864 865 866 867 868 869
    result->i->opened = is_open;
    *dbp = result;
    return 0;
}

// Create a new, unopened DB handle in env.
//
// Creates a fresh ft handle, wraps it in a DB via toku_setup_db_internal
// (is_open = false), and installs the DB method table.  On success *db
// receives the handle and 0 is returned.
//
// Returns EINVAL if flags is nonzero, env is NULL, or env has not been
// opened.  Otherwise any nonzero result from toku_ft_handle_create or
// toku_setup_db_internal is returned as-is.
int 
toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
    if (flags || env == NULL) 
        return EINVAL;

    if (!env_opened(env))
        return EINVAL;

    FT_HANDLE brt;
    int r = toku_ft_handle_create(&brt);
    if (r != 0) return r;

    r = toku_setup_db_internal(db, env, flags, brt, false);
    // NOTE(review): brt is not released on this failure path.  Since flags and
    // env were validated above, the only remaining failure from
    // toku_setup_db_internal is ENOMEM.  Confirm which ft-layer close routine
    // is safe for an unopened handle and call it here to plug the leak.
    if (r != 0) return r;

    DB *result = *db;

    // methods that grab the ydb lock
#define SDB(name) result->name = locked_db_ ## name
    SDB(close);
    SDB(open);
    SDB(change_descriptor);
    SDB(optimize);
#undef SDB

    // methods that do not take the ydb lock
#define USDB(name) result->name = toku_db_ ## name
    USDB(set_errfile);
    USDB(set_pagesize);
    USDB(get_pagesize);
    USDB(change_pagesize);
    USDB(set_readpagesize);
    USDB(get_readpagesize);
    USDB(change_readpagesize);
    USDB(set_compression_method);
    USDB(get_compression_method);
    USDB(change_compression_method);
    USDB(set_flags);
    USDB(get_flags);
    USDB(fd);
    USDB(get_max_row_size);
    USDB(set_indexer);
    USDB(pre_acquire_table_lock);
    USDB(pre_acquire_fileops_lock);
    USDB(key_range64);
    USDB(hot_optimize);
    USDB(stat64);
    USDB(verify_with_progress);
    USDB(cursor);
    USDB(dbt_pos_infty);
    USDB(dbt_neg_infty);
    USDB(get_fragmentation);
#undef USDB

    result->get_indexer = db_get_indexer;
    result->del = autotxn_db_del;
    result->put = autotxn_db_put;
    result->update = autotxn_db_update;
    result->update_broadcast = autotxn_db_update_broadcast;

    // unlocked methods
    result->get = autotxn_db_get;
    result->getf_set = autotxn_db_getf_set;

    // Explicit initial state.  Most of these are already zero after
    // toku_setup_db_internal's memset, but they are spelled out for clarity.
    result->i->dict_id = DICTIONARY_ID_NONE;
    result->i->opened = 0;
    result->i->open_flags = 0;
    result->i->open_mode = 0;
    result->i->indexer = NULL;
    *db = result;
    return 0;
}

934
// When the loader is created, it makes this call (toku_env_load_inames).
935 936 937 938 939 940 941 942 943 944 945
// For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname.  This will also take a write lock
// on the directory entries.  The write lock will be released when
// the transaction of the loader is completed.
// If the transaction commits, the new inames are in place.
// If the transaction aborts, the old inames will be restored.
// The new inames are returned to the caller.  
// It is the caller's responsibility to free them.
// If "mark_as_loader" is true, then include a mark in the iname
// to indicate that the file is created by the brt loader.
// Return 0 on success (could fail if write lock not available).
946 947 948
static int
load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn, BOOL mark_as_loader) {
    int rval = 0;
949 950 951 952 953 954 955 956
    int i;
    
    TXNID xid = 0;
    DBT dname_dbt;  // holds dname
    DBT iname_dbt;  // holds new iname
    
    char * mark;

957
    if (mark_as_loader) {
958
        mark = "B";
959
    } else {
960
        mark = "P";
961
    }
962 963

    for (i=0; i<N; i++) {
964
        new_inames_in_env[i] = NULL;
965 966
    }

967 968
    if (txn) {
        xid = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
969 970
    }
    for (i = 0; i < N; i++) {
971 972 973 974 975 976 977
        char * dname = dbs[i]->i->dname;
        toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
        // now create new iname
        char hint[strlen(dname) + 1];
        create_iname_hint(dname, hint);
        char * new_iname = create_iname(env, xid, hint, mark, i);               // allocates memory for iname_in_env
        new_inames_in_env[i] = new_iname;
978
        toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1);      // iname_in_env goes in directory
979
        rval = toku_db_put(env->i->directory, txn, &dname_dbt, &iname_dbt, 0, TRUE);
980
        if (rval) break;
981 982 983
    }

    // Generate load log entries.
984
    if (!rval && txn) {
985 986 987 988
        TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
        int do_fsync = 0;
        LSN *get_lsn = NULL;
        for (i = 0; i < N; i++) {
989
            FT_HANDLE brt  = dbs[i]->i->ft_handle;
990 991 992 993 994
            //Fsync is necessary for the last one only.
            if (i==N-1) {
                do_fsync = 1; //We only need a single fsync of logs.
                get_lsn  = load_lsn; //Set pointer to capture the last lsn.
            }
995
            rval = toku_ft_load(brt, ttxn, new_inames_in_env[i], do_fsync, get_lsn);
996 997 998
            if (rval) break;
        }
    }
999 1000 1001 1002 1003 1004 1005 1006 1007 1008
    return rval;
}

// Transactional wrapper around load_inames().
//
// When the environment was opened with DB_INIT_TXN, the iname replacement
// runs inside a child transaction of txn.  The child is committed on success
// and aborted on failure.  The multi-operation client lock is held across
// load_inames() so that a checkpoint cannot begin in the middle of it.
//
// On success, new_inames_in_env[0..N-1] hold newly allocated inames that the
// caller must free.  On failure, every entry has been freed and set to NULL,
// whether or not transactions are in use.  Previously cleanup only happened
// in the transactional case, so inames leaked without DB_INIT_TXN.
// Returns 0 on success, otherwise the error returned by load_inames().
int
locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn, BOOL mark_as_loader) {
    DB_TXN *child_txn = NULL;
    int using_txns = env->i->open_flags & DB_INIT_TXN;
    if (using_txns) {
        int ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
        invariant_zero(ret);
    }

    // cannot begin a checkpoint
    toku_multi_operation_client_lock();
    int r = load_inames(env, child_txn, N, dbs, new_inames_in_env, load_lsn, mark_as_loader);
    toku_multi_operation_client_unlock();

    if (using_txns) {
        int ret;
        if (r == 0) {
            ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
        } else {
            ret = locked_txn_abort(child_txn);
        }
        invariant_zero(ret);
    }

    if (r != 0) {
        // Release any inames allocated before the failure.  NULLing the slots
        // keeps a caller that also frees the array from double-freeing.
        for (int i = 0; i < N; i++) {
            if (new_inames_in_env[i]) {
                toku_free(new_inames_in_env[i]);
                new_inames_in_env[i] = NULL;
            }
        }
    }
    return r;
}

1037

1038 1039
#undef STATUS_VALUE

1040 1041
#include <valgrind/helgrind.h>
void __attribute__((constructor)) toku_ydb_db_helgrind_ignore(void);
1042
void
1043 1044
toku_ydb_db_helgrind_ignore(void) {
    VALGRIND_HG_DISABLE_CHECKING(&ydb_db_layer_status, sizeof ydb_db_layer_status);
1045
}