Commit eaaf39a1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Implement {{{db_env_set_func_fsync}}} and write a test that can discern the...

Implement {{{db_env_set_func_fsync}}} and write a test that can discern the use of group commit.  Fixes #499, #496.

git-svn-id: file:///svn/tokudb@2655 c7de825b-a66e-492c-adef-691d508d4ae1
parent b2c1ad2c
...@@ -279,6 +279,7 @@ int db_create(DB **, DB_ENV *, u_int32_t) __attribute__((__visibility__("default ...@@ -279,6 +279,7 @@ int db_create(DB **, DB_ENV *, u_int32_t) __attribute__((__visibility__("default
char *db_strerror(int) __attribute__((__visibility__("default"))); char *db_strerror(int) __attribute__((__visibility__("default")));
const char *db_version(int*,int *,int *) __attribute__((__visibility__("default"))); const char *db_version(int*,int *,int *) __attribute__((__visibility__("default")));
int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("default"))); int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("default")));
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -187,4 +187,6 @@ int toku_testsetup_get_sersize(BRT brt, DISKOFF diskoff); // Return the size on ...@@ -187,4 +187,6 @@ int toku_testsetup_get_sersize(BRT brt, DISKOFF diskoff); // Return the size on
int toku_testsetup_insert_to_leaf (BRT brt, DISKOFF diskoff, char *key, int keylen, char *val, int vallen, u_int32_t *leaf_fingerprint); int toku_testsetup_insert_to_leaf (BRT brt, DISKOFF diskoff, char *key, int keylen, char *val, int vallen, u_int32_t *leaf_fingerprint);
int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_type, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint); int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_type, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint);
int toku_set_func_fsync (int (*fsync_function)(int));
#endif #endif
...@@ -184,6 +184,8 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) { ...@@ -184,6 +184,8 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
return 0; return 0;
} }
static int (*toku_os_fsync_function)(int)=fsync;
int toku_logger_close(TOKULOGGER *loggerp) { int toku_logger_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp; TOKULOGGER logger = *loggerp;
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
...@@ -195,7 +197,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -195,7 +197,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
r = write(logger->fd, logger->buf, logger->n_in_buf); r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno; if (r==-1) return errno;
} }
r = fsync(logger->fd); r = toku_os_fsync_function(logger->fd);
if (r!=0) close(logger->fd); if (r!=0) close(logger->fd);
else r = close(logger->fd); else r = close(logger->fd);
} }
...@@ -224,7 +226,7 @@ int toku_logger_fsync (TOKULOGGER logger) { ...@@ -224,7 +226,7 @@ int toku_logger_fsync (TOKULOGGER logger) {
int r=flush(logger, 0); int r=flush(logger, 0);
if (r!=0) return r; if (r!=0) return r;
if (logger->fd>=0) { if (logger->fd>=0) {
r = fsync(logger->fd); r = toku_os_fsync_function(logger->fd);
if (r!=0) return errno; if (r!=0) return errno;
} }
return 0; return 0;
...@@ -661,3 +663,8 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) { ...@@ -661,3 +663,8 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
*result = 0; *result = 0;
return 0; return 0;
} }
int toku_set_func_fsync (int (*fsync_function)(int)) {
toku_os_fsync_function = fsync_function;
return 0;
}
...@@ -5,7 +5,8 @@ ...@@ -5,7 +5,8 @@
db_strerror; db_strerror;
db_version; db_version;
log_compare; log_compare;
db_env_set_func_fsync;
toku_ydb_error_all_cases; toku_ydb_error_all_cases;
local: *; local: *;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
/* Test by counting the fsyncs, to see if group commit is working. */
#include <db.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include "test.h"
DB_ENV *env;
DB *db;
#define NITER 100
void *start_a_thread (void *i_p) {
int *which_thread_p = i_p;
int i,r;
for (i=0; i<NITER; i++) {
DB_TXN *tid;
char keystr[100];
DBT key,data;
snprintf(keystr, sizeof(key), "%ld.%d.%d", random(), *which_thread_p, i);
r=env->txn_begin(env, 0, &tid, 0); CKERR(r);
r=db->put(db, tid,
dbt_init(&key, keystr, 1+strlen(keystr)),
dbt_init(&data, keystr, 1+strlen(keystr)),
0);
r=tid->commit(tid, 0); CKERR(r);
}
return 0;
}
void test_groupcommit (int nthreads) {
int r;
DB_TXN *tid;
r=db_env_create(&env, 0); assert(r==0);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, 0777); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
r=tid->commit(tid, 0); assert(r==0);
int i;
pthread_t threads[nthreads];
int whichthread[nthreads];
for (i=0; i<nthreads; i++) {
whichthread[i]=i;
r=pthread_create(&threads[i], 0, start_a_thread, &whichthread[i]);
}
for (i=0; i<nthreads; i++) {
pthread_join(threads[i], 0);
}
#if 0
r=env->txn_begin(env, 0, &tid, 0); CKERR(r);
char there[1000];
memset(there, 'a',sizeof(there));
there[999]=0;
for (i=0; sum<(effective_max*3)/2; i++) {
DBT key,data;
char hello[20];
snprintf(hello, 20, "hello%d", i);
r=db->put(db, tid,
dbt_init(&key, hello, strlen(hello)+1),
dbt_init(&data, there, sizeof(there)),
0);
assert(r==0);
sum+=strlen(hello)+1+sizeof(there);
if ((i+1)%10==0) {
r=tid->commit(tid, 0); assert(r==0);
r=env->txn_begin(env, 0, &tid, 0); CKERR(r);
}
}
if (verbose) printf("i=%d sum=%d effmax=%d\n", i, sum, effective_max);
r=tid->commit(tid, 0); assert(r==0);
#endif
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
}
int fsync_count=0;
int do_fsync (int fd) {
fsync_count++;
return fsync(fd);
}
const char *progname;
struct timeval prevtime;
int prev_count;
void printtdiff (char *str) {
struct timeval thistime;
gettimeofday(&thistime, 0);
printf("%s: %10.6fs %d fsyncs for %s\n", progname, thistime.tv_sec-prevtime.tv_sec+1e-6*(thistime.tv_usec-prevtime.tv_usec), fsync_count-prev_count, str);
prevtime=thistime;
prev_count=fsync_count;
}
int main (int argc, const char *argv[]) {
progname=argv[0];
parse_args(argc, argv);
gettimeofday(&prevtime, 0);
prev_count=0;
{ int r = db_env_set_func_fsync(do_fsync); CKERR(r); }
system("rm -rf " ENVDIR);
{ int r=mkdir(ENVDIR, 0777); assert(r==0); }
test_groupcommit(1); printtdiff("1 thread");
test_groupcommit(2); printtdiff("2 threads");
test_groupcommit(10); printtdiff("10 threads");
int count_before_20 = fsync_count;
test_groupcommit(20); printtdiff("20 threads");
if (fsync_count-count_before_20 >= 20*NITER) {
printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n");
exit(1);
}
return 0;
}
...@@ -546,12 +546,14 @@ static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags ...@@ -546,12 +546,14 @@ static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
void toku_default_errcall(const char *errpfx, char *msg) { void toku_default_errcall(const char *errpfx, char *msg) {
fprintf(stderr, "YDB: %s: %s", errpfx, msg);
}
#else #else
void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg) { void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg) {
env = env; env = env;
#endif
fprintf(stderr, "YDB: %s: %s", errpfx, msg); fprintf(stderr, "YDB: %s: %s", errpfx, msg);
} }
#endif
static int locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { static int locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
toku_ydb_lock(); int r = toku_env_open(env, home, flags, mode); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_env_open(env, home, flags, mode); toku_ydb_unlock(); return r;
...@@ -2489,3 +2491,7 @@ const char *db_version(int *major, int *minor, int *patch) { ...@@ -2489,3 +2491,7 @@ const char *db_version(int *major, int *minor, int *patch) {
*patch = DB_VERSION_PATCH; *patch = DB_VERSION_PATCH;
return DB_VERSION_STRING; return DB_VERSION_STRING;
} }
int db_env_set_func_fsync (int (*fsync_function)(int)) {
return toku_set_func_fsync(fsync_function);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment