/* Scan the bench.tokudb/bench.db over and over. */
#define DONT_DEPRECATE_MALLOC

#include <toku_portability.h>
#include <toku_assert.h>
#include <db.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#ifdef TOKUDB
#include "key.h"
#include "cachetable.h"
#include "trace_mem.h"
#endif

/* Program name (argv[0]); recorded by parse_args and used in usage output. */
const char *pname;
/* Which experiment main() dispatches to; heavy-weight cursors are the default. */
enum run_mode { RUN_HWC, RUN_LWC, RUN_VERIFY, RUN_HEAVI, RUN_RANGE} run_mode = RUN_HWC;
/* Nonzero: open the environment with transactions/locking (cleared by --nox). */
int do_txns=1;
/* Nonzero: pre-acquire a read lock on the whole table (--prelock). */
int prelock=0;
/* Nonzero: pass lock_flag to the cursor get operations (--prelockflag, --prelockwriteflag). */
int prelockflag=0;
/* Flag bits OR-ed into cursor get calls when prelockflag applies. */
u_int32_t lock_flag = 0;
/* Stop a scan after this many rows; values <= 0 mean "no limit" (--count). */
long limitcount=-1;
/* Byte count handed to env->set_cachesize (--cachesize). */
u_int32_t cachesize = 127*1024*1024;
/* Nonzero: install mysql_key_compare as the btree comparator (--mysql). */
static int do_mysql = 0;
/* Bounds used by scanscan_range to pick random keys (--range). */
static u_int64_t start_range = 0;
static u_int64_t end_range = 0;
/* How many times each experiment is repeated (--experiments). */
static int n_experiments = 2;
/* Verbosity level; incremented once per --verbose. */
static int verbose = 0;
/* Print the command-line synopsis and the option summary to stderr.
 *
 * argv0 is the program name shown in the synopsis line.
 * Always returns 1, so callers can write exit(print_usage(pname)).
 *
 * The text lists every option parse_args() recognizes; the options that
 * parse_args() only compiles in under TOKUDB are marked as such. */
static int print_usage (const char *argv0) {
    fprintf(stderr, "Usage:\n%s [--hwc | --lwc | --verify-lwc | --heavi | --range START END] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0);
    fprintf(stderr, "  --hwc               run heavy weight cursors (this is the default)\n");
    fprintf(stderr, "  --verify-lwc        means to run the light weight cursor and the heavyweight cursor to verify that they get the same answer.\n");
    fprintf(stderr, "  --lwc               run light weight cursors instead of heavy weight cursors\n");
    fprintf(stderr, "  --heavi             run heaviside cursors (TOKUDB builds only)\n");
    fprintf(stderr, "  --range START END   seek a cursor to random keys derived from [START,END) instead of scanning\n");
    fprintf(stderr, "  --experiments <n>   repeat the selected experiment <n> times (default 2)\n");
    fprintf(stderr, "  --prelock           acquire a read lock on the entire table before running\n");
    fprintf(stderr, "  --prelockflag       pass DB_PRELOCKED to the cursor get operation whenever the locks have been acquired (TOKUDB builds only)\n");
    fprintf(stderr, "  --prelockwriteflag  pass DB_PRELOCKED_WRITE to the cursor get operation (TOKUDB builds only)\n");
    fprintf(stderr, "  --nox               no transactions (no locking)\n");
    fprintf(stderr, "  --count <count>     read the first COUNT rows and then  stop.\n");
    fprintf(stderr, "  --cachesize <n>     set the env cachesize to <n>\n");
    fprintf(stderr, "  --mysql             compare keys that are mysql big int not null types\n");
    fprintf(stderr, "  --env DIR           put db files in DIR instead of default\n");
    fprintf(stderr, "  --recover           add DB_RECOVER to the environment open flags\n");
    fprintf(stderr, "  --verbose           print extra timing and memory information (may be repeated)\n");
    return 1;
}

/* Handles shared by every experiment: created in scanscan_setup() and
 * released in scanscan_shutdown().  tid stays 0 when transactions are off. */
DB_ENV *env;
DB *db;
DB_TXN *tid=0;

/* Two-level stringification so that DIRSUF is macro-expanded before being quoted. */
#define STRINGIFY2(s) #s
#define STRINGIFY(s) STRINGIFY2(s)
/* Environment directory; replaced by the --env argument in parse_args(). */
const char *dbdir = "./bench."  STRINGIFY(DIRSUF); /* DIRSUF is passed in as a -D argument to the compiler. */
/* Flags for env->open with and without transactions; --recover adds DB_RECOVER to both. */
int env_open_flags_yesx = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL|DB_INIT_TXN|DB_INIT_LOG|DB_INIT_LOCK;
int env_open_flags_nox = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL;
/* Name of the database file passed to db->open. */
char *dbfilename = "bench.db";

/* Current time of day in seconds, with the microsecond part as the
 * fraction.  Asserts that gettimeofday() succeeds. */
static double gettime (void) {
    struct timeval now;
    int rc = gettimeofday(&now, 0);
    assert(rc==0);
    double whole_seconds = now.tv_sec;
    double microseconds  = now.tv_usec;
    return whole_seconds + 1e-6*microseconds;
}
static void parse_args (int argc, const char *argv[]) {
64
    pname=argv[0];
65
    argc--; argv++;
66
    int specified_run_mode=0;
67
    while (argc>0) {
68 69 70
        if (strcmp(*argv,"--verbose")==0) {
            verbose++;
        } else if (strcmp(*argv,"--verify-lwc")==0) {
71 72 73 74 75 76 77 78 79
	    if (specified_run_mode && run_mode!=RUN_VERIFY) { two_modes: fprintf(stderr, "You specified two run modes\n"); exit(1); }
	    run_mode = RUN_VERIFY;
	} else if (strcmp(*argv, "--lwc")==0)  {
	    if (specified_run_mode && run_mode!=RUN_LWC) goto two_modes;
	    run_mode = RUN_LWC;
	} else if (strcmp(*argv, "--hwc")==0)  {
	    if (specified_run_mode && run_mode!=RUN_VERIFY) goto two_modes;
	    run_mode = RUN_HWC;
	} else if (strcmp(*argv, "--prelock")==0) prelock=1;
80
#ifdef TOKUDB
81 82 83 84
	else if (strcmp(*argv, "--heavi")==0)  {
	    if (specified_run_mode && run_mode!=RUN_HEAVI) goto two_modes;
	    run_mode = RUN_HEAVI;
        }
85
        else if (strcmp(*argv, "--prelockflag")==0)      { prelockflag=1; lock_flag = DB_PRELOCKED; }
Yoni Fogel's avatar
Yoni Fogel committed
86
        else if (strcmp(*argv, "--prelockwriteflag")==0) { prelockflag=1; lock_flag = DB_PRELOCKED_WRITE; }
87
#endif
88
	else if (strcmp(*argv, "--nox")==0)              { do_txns=0; }
89 90
	else if (strcmp(*argv, "--count")==0)            {
	    char *end;
91
            argc--; argv++; 
92 93
	    errno=0; limitcount=strtol(*argv, &end, 10); assert(errno==0);
	    printf("Limiting count to %ld\n", limitcount);
94 95
        } else if (strcmp(*argv, "--cachesize")==0 && argc>0) {
            char *end;
96
            argc--; argv++; 
97
            cachesize=(u_int32_t)strtol(*argv, &end, 10);
98
	} else if (strcmp(*argv, "--env") == 0) {
99
            argc--; argv++;
100 101
	    if (argc==0) exit(print_usage(pname));
	    dbdir = *argv;
102 103
        } else if (strcmp(*argv, "--mysql") == 0) {
            do_mysql = 1;
104 105 106 107 108 109 110 111 112
        } else if (strcmp(*argv, "--range") == 0 && argc > 2) {
            run_mode = RUN_RANGE;
            argc--; argv++;
            start_range = strtoll(*argv, NULL, 10);
            argc--; argv++;
            end_range = strtoll(*argv, NULL, 10);
        } else if (strcmp(*argv, "--experiments") == 0 && argc > 1) {
            argc--; argv++;
            n_experiments = strtol(*argv, NULL, 10);
113 114 115
        } else if (strcmp(*argv, "--recover") == 0) {
            env_open_flags_yesx |= DB_RECOVER;
            env_open_flags_nox |= DB_RECOVER;
116
	} else {
117
            exit(print_usage(pname));
118
	}
119
	argc--; argv++;
120
    }
121 122 123 124 125 126
    //Prelocking is meaningless without transactions
    if (do_txns==0) {
        prelockflag=0;
        lock_flag=0;
        prelock=0;
    }
127 128 129
}


/* Load the 8 bytes starting at d into a uint64_t in host byte order.
 * memcpy is used so that d does not have to be aligned. */
static inline uint64_t mysql_get_bigint(unsigned char *d) {
    uint64_t value;
    memcpy(&value, d, sizeof(value));
    return value;
}

/* Btree comparison callback installed by scanscan_setup() when --mysql is given.
 *
 * Both keys must be exactly 9 bytes with a zero first byte (asserted);
 * the remaining 8 bytes are compared as unsigned 64-bit integers read in
 * host byte order.  Returns -1, 0 or +1.  mydb is unused. */
static int mysql_key_compare(DB *mydb __attribute__((unused)),
                               const DBT *adbt, const DBT *bdbt) {
    assert(adbt->size == 9 && bdbt->size == 9);
    unsigned char *lhs_bytes = adbt->data;
    unsigned char *rhs_bytes = bdbt->data;
    assert(lhs_bytes[0] == 0 && rhs_bytes[0] == 0);
    uint64_t lhs = mysql_get_bigint(lhs_bytes+1);
    uint64_t rhs = mysql_get_bigint(rhs_bytes+1);
    return (lhs > rhs) - (lhs < rhs);
}

static void scanscan_setup (void) {
151
    int r;
152
    r = db_env_create(&env, 0);                                                           assert(r==0);
153
    r = env->set_cachesize(env, 0, cachesize, 1);                                         assert(r==0);
154
    double tstart = gettime();
155
    r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);   assert(r==0);
156 157 158
    double tend = gettime();
    if (verbose)
        printf("env open %f seconds\n", tend-tstart);
159
    r = db_create(&db, env, 0);                                                           assert(r==0);
160 161 162
    if (do_mysql) {
        r = db->set_bt_compare(db, mysql_key_compare); assert(r == 0);
    }
163 164 165
    if (do_txns) {
	r = env->txn_begin(env, 0, &tid, 0);                                              assert(r==0);
    }
166
    r = db->open(db, tid, dbfilename, NULL, DB_BTREE, 0, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);                           assert(r==0);
167
#ifdef TOKUDB
168 169 170 171 172 173 174
    if (prelock) {
	r = db->pre_acquire_read_lock(db,
				      tid,
				      db->dbt_neg_infty(), db->dbt_neg_infty(),
				      db->dbt_pos_infty(), db->dbt_pos_infty());
	assert(r==0);
    }
175
#endif
176 177
}

static void scanscan_shutdown (void) {
179
    int r;
180
    r = db->close(db, 0);                                       assert(r==0);
181 182 183
    if (do_txns) {
	r = tid->commit(tid, 0);                                    assert(r==0);
    }
184
    r = env->close(env, 0);                                     assert(r==0);
185 186

#if 0 && defined TOKUDB
187
    {
Yoni Fogel's avatar
Yoni Fogel committed
188 189 190 191 192
	extern int toku_os_get_max_rss(int64_t*);
        int64_t mrss;
        int r = toku_os_get_max_rss(&mrss);
        assert(r==0);
	printf("maxrss=%.2fMB\n", mrss/256.0);
193
    }
194
#endif
195 196
}

static void scanscan_hwc (void) {
198 199
    int r;
    int counter=0;
200
    for (counter=0; counter<n_experiments; counter++) {
201 202 203
	long long totalbytes=0;
	int rowcounter=0;
	double prevtime = gettime();
204
	DBT k,v;
205 206
	DBC *dbc;
	r = db->cursor(db, tid, &dbc, 0);                           assert(r==0);
207 208
	memset(&k, 0, sizeof(k));
	memset(&v, 0, sizeof(v));
Yoni Fogel's avatar
Yoni Fogel committed
209 210 211 212 213
        u_int32_t c_get_flags = DB_NEXT;
        if (prelockflag && (counter || prelock)) {
            c_get_flags |= lock_flag;
        }
	while (0 == (r = dbc->c_get(dbc, &k, &v, c_get_flags))) {
214 215
	    totalbytes += k.size + v.size;
	    rowcounter++;
216
	    if (limitcount>0 && rowcounter>=limitcount) break;
217
	}
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
218
	assert(r==DB_NOTFOUND); // complain about things like lock-not-found
219
	r = dbc->c_close(dbc);                                      assert(r==0);
220 221
	double thistime = gettime();
	double tdiff = thistime-prevtime;
222
	printf("Scan    %lld bytes (%d rows) in %9.6fs at %9fMB/s\n", totalbytes, rowcounter, tdiff, 1e-6*totalbytes/tdiff);
223 224 225
    }
}

#ifdef TOKUDB

/* Running totals filled in by the counttotalbytes() cursor callback. */
struct extra_count {
    /* Sum of key and value sizes over the rows seen so far. */
    long long totalbytes;
    /* Number of rows seen so far. */
    int rowcounter;
};
/* Cursor callback used by scanscan_lwc(): adds the row's key and value
 * sizes to the struct extra_count passed as extrav and bumps its row count.
 * Always returns 0. */
static int counttotalbytes (DBT const *key, DBT const *data, void *extrav) {
    struct extra_count *e=extrav;
    e->totalbytes += key->size + data->size;
    e->rowcounter++;
    // Disabled debugging aid (note the "&& 0"): reports keys that are not consecutive integers.
    if (do_mysql && 0) {
        static uint64_t expect_key = 0;
        uint64_t k = mysql_get_bigint((unsigned char*)key->data+1);
        if (k != expect_key)
            printf("%s:%d %"PRIu64" %"PRIu64"\n", __FUNCTION__, __LINE__, k, expect_key);
        expect_key = k + 1;
    }
    return 0;
}

static void scanscan_lwc (void) {
248 249
    int r;
    int counter=0;
250
    for (counter=0; counter<n_experiments; counter++) {
251 252 253 254
	struct extra_count e = {0,0};
	double prevtime = gettime();
	DBC *dbc;
	r = db->cursor(db, tid, &dbc, 0);                           assert(r==0);
Yoni Fogel's avatar
Yoni Fogel committed
255 256 257 258
        u_int32_t f_flags = 0;
        if (prelockflag && (counter || prelock)) {
            f_flags |= lock_flag;
        }
259 260 261 262 263
	long rowcounter=0;
	while (0 == (r = dbc->c_getf_next(dbc, f_flags, counttotalbytes, &e))) {
	    rowcounter++;
	    if (limitcount>0 && rowcounter>=limitcount) break;
	}
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
264
	assert(r==DB_NOTFOUND);
265 266 267 268 269 270
	r = dbc->c_close(dbc);                                      assert(r==0);
	double thistime = gettime();
	double tdiff = thistime-prevtime;
	printf("LWC Scan %lld bytes (%d rows) in %9.6fs at %9fMB/s\n", e.totalbytes, e.rowcounter, tdiff, 1e-6*e.totalbytes/tdiff);
    }
}
#endif
/* Range experiment: with a single cursor, n_experiments times pick a
 * random key and position the cursor with c_get(DB_SET_RANGE), then print
 * the iteration count and the total elapsed time.
 *
 * Key generation: (start_range + random() % (end_range - start_range)) * 64,
 * stored as 8 bytes, most significant byte first.  When the range is empty
 * or inverted (end_range <= start_range) the offset is 0, which avoids a
 * modulo by zero.
 *
 * One byte ("s" before, "e" after) is written to /dev/null around each
 * lookup so the lookups can be located in strace output.  Every lookup is
 * asserted to succeed. */
static void scanscan_range (void) {
    int fnull = open("/dev/null", O_WRONLY); assert(fnull >= 0); // use with strace
    int r;
    double tstart = gettime();
    DBC *dbc;
    r = db->cursor(db, tid, &dbc, 0); assert(r==0);

    // Number of distinct offsets available; 0 when the range is empty or inverted.
    u_int64_t span = end_range > start_range ? end_range - start_range : 0;

    int counter;
    for (counter=0; counter<n_experiments; counter++) {

        // generate a random key in the key range
        u_int64_t offset = span ? (u_int64_t)random() % span : 0;
        u_int64_t k = (start_range + offset) * (1<<6);
        unsigned char kv[8];
        int i;
        for (i=0; i<8; i++)
            kv[i] = (unsigned char)(k >> (56-8*i));
        DBT key; memset(&key, 0, sizeof key); key.data = kv; key.size = sizeof kv;
        DBT val; memset(&val, 0, sizeof val);

        // set the cursor to the random key; the writes are only strace markers,
        // so their results are deliberately discarded
        ssize_t marker;
        marker = write(fnull, "s", 1); (void)marker;
        r = dbc->c_get(dbc, &key, &val, DB_SET_RANGE|lock_flag); assert(r==0);
        marker = write(fnull, "e", 1); (void)marker;
    }

    r = dbc->c_close(dbc);
    assert(r==0);

    double tend = gettime();
    double tdiff = tend-tstart;
    printf("Range %d %f\n", n_experiments, tdiff);

    close(fnull);
}

#ifdef TOKUDB
/* State shared by the two callbacks of scanscan_heaviside(). */
struct extra_heavi {
    /* Sum of key and value sizes over the rows visited so far. */
    long long totalbytes;
    /* Number of rows visited so far. */
    int rowcounter;
    /* Copy of the most recently visited pair: heaviside_next() compares
     * candidates against it and copy_and_counttotalbytes() overwrites it. */
    DBT key;
    DBT val;
};

/* Copy source's bytes into target, growing target's buffer when its
 * recorded capacity (ulen) is smaller than source->size.
 *
 * Returns 0 on success and ENOMEM when the buffer cannot be grown; on
 * failure target keeps its previous buffer, ulen and size, so the caller
 * can still free it.  A zero-length source always succeeds, even when
 * target has no buffer yet.  target->data must be NULL or a pointer
 * obtained from malloc/realloc. */
static int
copy_dbt(DBT *target, DBT const *source) {
    if (target->ulen < source->size) {
        // Grow through a temporary so the old buffer is not lost if realloc fails.
        void *grown = realloc(target->data, source->size);
        if (!grown) return ENOMEM;
        target->data = grown;
        target->ulen = source->size;
    }
    if (source->size > 0) {
        // Defensive: ulen claims enough capacity but there is no buffer behind it.
        if (!target->data) return ENOMEM;
        memcpy(target->data, source->data, source->size);
    }
    target->size = source->size;
    return 0;
}

/* NOTE(review): FOO is not referenced anywhere else in the visible source;
 * presumably leftover scaffolding -- confirm before removing. */
typedef struct foo{int a; } FOO;

/* Direction callback for c_getf_heaviside(): orders the candidate
 * (key, val) against the pair remembered in the struct extra_heavi passed
 * as extra_h, using toku_default_compare_fun on the key first and then,
 * when val is non-NULL, on the value.
 *
 * Returns the first nonzero comparison result; when the candidate equals
 * the remembered pair it returns -1, so "equal" counts as "not past it". */
static int
heaviside_next(const DBT *key, const DBT *val, void *extra_h) {
    struct extra_heavi *last = extra_h;

    int key_order = toku_default_compare_fun(db, key, &last->key);
    if (key_order != 0) return key_order;
    if (val) {
        int val_order = toku_default_compare_fun(db, val, &last->val);
        if (val_order != 0) return val_order;
    }
    return -1; //Return negative on <=, positive on >
}

/* Row callback for c_getf_heaviside(): counts the row in the struct
 * extra_heavi passed as extrav and stores a copy of (key, val) there as
 * the new "last visited" pair.
 *
 * r_h is the heaviside result for the row and is asserted to be positive.
 * Returns 0 on success or the error from copy_dbt(); the value is not
 * copied when copying the key fails. */
static int copy_and_counttotalbytes (DBT const *key, DBT const *val, void *extrav, int r_h) {
    struct extra_heavi *acc = extrav;
    assert(r_h>0);
    acc->rowcounter += 1;
    acc->totalbytes += key->size + val->size;
    int err = copy_dbt(&acc->key, key);
    if (err != 0) return err;
    return copy_dbt(&acc->val, val);
}

static void scanscan_heaviside (void) {
    int r;
    int counter=0;
368
    for (counter=0; counter<n_experiments; counter++) {
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
	struct extra_heavi e;
        memset(&e, 0, sizeof(e));
        e.key.flags = DB_DBT_REALLOC;
        e.val.flags = DB_DBT_REALLOC;
	double prevtime = gettime();
	DBC *dbc;
	r = db->cursor(db, tid, &dbc, 0);                           assert(r==0);
        u_int32_t f_flags = 0;
        if (prelockflag && (counter || prelock)) {
            f_flags |= lock_flag;
        }
        //Get first manually.
	long rowcounter=1;
        r = dbc->c_get(dbc, &e.key, &e.val, DB_FIRST | f_flags); assert(r==0);
        e.rowcounter = 1;
        e.totalbytes = e.key.size + e.val.size;

	while (0 == (r = dbc->c_getf_heaviside(dbc, f_flags, 
                        copy_and_counttotalbytes, &e,
                        heaviside_next, &e,
                        1))) {
	    rowcounter++;
	    if (limitcount>0 && rowcounter>=limitcount) break;
	}
        assert(rowcounter==e.rowcounter);
        if (e.key.data) {
            free(e.key.data);
            e.key.data = NULL;
        }
        if (e.val.data) {
            free(e.val.data);
            e.val.data = NULL;
        }
	r = dbc->c_close(dbc);                                      assert(r==0);
	double thistime = gettime();
	double tdiff = thistime-prevtime;
	printf("LWC Scan %lld bytes (%d rows) in %9.6fs at %9fMB/s\n", e.totalbytes, e.rowcounter, tdiff, 1e-6*e.totalbytes/tdiff);
    }
}

/* State shared between scanscan_verify() and its checkbytes() callback. */
struct extra_verify {
    /* Sum of key and value sizes over the rows checked so far. */
    long long totalbytes;
    /* Number of rows checked so far. */
    int rowcounter;
    /* Pair most recently fetched with c_get; checkbytes() compares the
     * callback's pair against it. */
    DBT k,v; // the k and v are gotten using the old cursor
};
/* Cursor callback used by scanscan_verify(): counts the row in the struct
 * extra_verify passed as extrav and asserts that (key, data) matches the
 * pair stored there in k/v -- same sizes, same bytes, but held in
 * different buffers.  Always returns 0. */
static int
checkbytes (DBT const *key, DBT const *data, void *extrav) {
    struct extra_verify *e=extrav;
    e->totalbytes += key->size + data->size;
    e->rowcounter++;
    assert(e->k.size == key->size);
    assert(e->v.size == data->size);
    assert(memcmp(e->k.data, key->data,  key->size)==0);
    assert(memcmp(e->v.data, data->data, data->size)==0);
    // The two cursors must not hand back the very same buffers.
    assert(e->k.data != key->data);
    assert(e->v.data != data->data);
    return 0;
}
    

static void scanscan_verify (void) {
431 432
    int r;
    int counter=0;
433
    for (counter=0; counter<n_experiments; counter++) {
434 435 436 437 438 439 440 441 442
	struct extra_verify v;
	v.totalbytes=0;
	v.rowcounter=0;
	double prevtime = gettime();
	DBC *dbc1, *dbc2;
	r = db->cursor(db, tid, &dbc1, 0);                           assert(r==0);
	r = db->cursor(db, tid, &dbc2, 0);                           assert(r==0);
	memset(&v.k, 0, sizeof(v.k));
	memset(&v.v, 0, sizeof(v.v));
Yoni Fogel's avatar
Yoni Fogel committed
443 444 445 446 447 448
        u_int32_t f_flags = 0;
        u_int32_t c_get_flags = DB_NEXT;
        if (prelockflag && (counter || prelock)) {
            f_flags     |= lock_flag;
            c_get_flags |= lock_flag;
        }
449
	while (1) {
450 451 452
	    int r1,r2;
	    r2 = dbc1->c_get(dbc1, &v.k, &v.v, c_get_flags);
	    r1 = dbc2->c_getf_next(dbc2, f_flags, checkbytes, &v);
453 454 455 456 457 458 459 460 461 462 463
	    assert(r1==r2);
	    if (r1) break;
	}
	r = dbc1->c_close(dbc1);                                      assert(r==0);
	r = dbc2->c_close(dbc2);                                      assert(r==0);
	double thistime = gettime();
	double tdiff = thistime-prevtime;
	printf("verify   %lld bytes (%d rows) in %9.6fs at %9fMB/s\n", v.totalbytes, v.rowcounter, tdiff, 1e-6*v.totalbytes/tdiff);
    }
}

#endif

int main (int argc, const char *argv[]) {

    parse_args(argc,argv);

470
    scanscan_setup();
471
    switch (run_mode) {
472 473 474 475
    case RUN_HWC:    scanscan_hwc();    break;
#ifdef TOKUDB
    case RUN_LWC:    scanscan_lwc();    break;
    case RUN_VERIFY: scanscan_verify(); break;
476
    case RUN_HEAVI:  scanscan_heaviside(); break;
Yoni Fogel's avatar
Yoni Fogel committed
477
#endif
478
    case RUN_RANGE:  scanscan_range();  break;
479
    default:         assert(0);         break;
480
    }
481
    scanscan_shutdown();
482

483
#if defined(TOKUDB)
484
    if (verbose) {
485
	toku_cachetable_print_hash_histogram();
486
    }
487 488 489

    // if tokudb has tracing enabled (see trace_mem.h) then this will dump
    // the trace data
490
    if (0) {
491 492
        toku_print_trace_mem();
    }
493
#endif
494
#if defined(__linux__) && __linux__
495 496 497 498 499 500 501 502 503 504 505 506
    if (verbose) {
        char fname[256];
        sprintf(fname, "/proc/%d/status", toku_os_getpid());
        FILE *f = fopen(fname, "r");
        if (f) {
            char line[256];
            while (fgets(line, sizeof line, f)) {
                int n;
                if (sscanf(line, "VmPeak: %d", &n) || sscanf(line, "VmHWM: %d", &n) || sscanf(line, "VmRSS: %d", &n))
                    fputs(line, stdout);
            }
            fclose(f);
507
        }
508
    }
509
#endif
510 511
    return 0;
}