Commit 0fc32fb9 authored by Barry Perlman's avatar Barry Perlman Committed by Yoni Fogel

[t:2449] Export test function (override loader's fwrite) via env. Fix bug in...

[t:2449] Export test function (override loader's fwrite) via env. Fix bug in brtloader's handling of error from overridden fwrite().
Fix bug in ydb's handling of ephemeral loader when unable to write to disk.
Improve loader-cleanup-test.

git-svn-id: file:///svn/toku/tokudb@19825 c7de825b-a66e-492c-adef-691d508d4ae1
parent fa396255
...@@ -457,6 +457,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -457,6 +457,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -473,6 +473,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -473,6 +473,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -481,6 +481,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -481,6 +481,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -481,6 +481,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -481,6 +481,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -486,6 +486,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -486,6 +486,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -641,6 +641,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ ...@@ -641,6 +641,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf("int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) %s;\n", VISIBLE); printf("int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) %s;\n", VISIBLE); printf("int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE); printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE);
printf("void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) %s;\n", VISIBLE);
printf("void db_env_set_checkpoint_callback (void (*)(void*), void*) %s;\n", VISIBLE); printf("void db_env_set_checkpoint_callback (void (*)(void*), void*) %s;\n", VISIBLE);
printf("void db_env_set_checkpoint_callback2 (void (*)(void*), void*) %s;\n", VISIBLE); printf("void db_env_set_checkpoint_callback2 (void (*)(void*), void*) %s;\n", VISIBLE);
printf("void db_env_set_recover_callback (void (*)(void*), void*) %s;\n", VISIBLE); printf("void db_env_set_recover_callback (void (*)(void*), void*) %s;\n", VISIBLE);
......
...@@ -418,6 +418,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -418,6 +418,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -418,6 +418,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de ...@@ -418,6 +418,7 @@ int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("de
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default"))); int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default"))); void db_env_set_recover_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
......
...@@ -247,7 +247,11 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD ...@@ -247,7 +247,11 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
FILE *stream = bl_fidx2file(bl, streami); FILE *stream = bl_fidx2file(bl, streami);
size_t r = do_fwrite(ptr, size, nmemb, stream); size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) { if (r!=nmemb) {
int e = ferror(stream); int e;
if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ...
e = errno; // ... then there is no error in the stream, but there is one in errno
else
e = ferror(stream);
assert(e!=0); assert(e!=0);
bl->panic = 1; bl->panic = 1;
bl->panic_errno = e; bl->panic_errno = e;
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
db_env_set_func_free; db_env_set_func_free;
db_env_set_func_pwrite; db_env_set_func_pwrite;
db_env_set_func_write; db_env_set_func_write;
db_env_set_func_loader_fwrite;
db_env_set_checkpoint_callback; db_env_set_checkpoint_callback;
db_env_set_checkpoint_callback2; db_env_set_checkpoint_callback2;
db_env_set_recover_callback; db_env_set_recover_callback;
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
* *
* Improve enospc testing * Improve enospc testing
* - fail only after n calls to write() * - fail only after n calls to write()
* - capture stream write() calls
* *
* Decide how to test on recovery (using checkpoint_stress technique?), implement. * Decide how to test on recovery (using checkpoint_stress technique?), implement.
* *
...@@ -53,8 +52,12 @@ ...@@ -53,8 +52,12 @@
#include "ydb-internal.h" #include "ydb-internal.h"
enum test_type {commit, // close loader, commit txn
enum test_type {commit, abort_loader, abort_via_poll, enospc, abort_txn}; abort_txn, // close loader, abort txn
abort_loader, // abort loader, abort txn
abort_via_poll, // close loader, but poll function returns non-zero, abort txn
enospc_w, // close loader, but close fails due to enospc return from toku_os_write
enospc_f}; // either loader->put() or loader->close() fails due to enospc return from do_fwrite()
int abort_on_poll = 0; // set when test_loader() called with test_type of abort_via_poll int abort_on_poll = 0; // set when test_loader() called with test_type of abort_via_poll
...@@ -79,6 +82,22 @@ int verify_file(char * dirname, char * filename); ...@@ -79,6 +82,22 @@ int verify_file(char * dirname, char * filename);
void assert_inames_missing(DBT* inames); void assert_inames_missing(DBT* inames);
ssize_t bad_write(int, const void *, size_t); ssize_t bad_write(int, const void *, size_t);
int fwrite_count = 0;
int fwrite_count_nominal = 0; // number of fwrite calls for normal operation, initially zero
int fwrite_count_trigger = 0; // sequence number of fwrite call that will fail (zero disables induced failure)
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
fwrite_count++;
size_t r;
if (fwrite_count_trigger == fwrite_count) {
errno = ENOSPC;
r = -1;
} else {
r = fwrite(ptr, size, nmemb, stream);
}
return r;
}
// return number of temp files // return number of temp files
...@@ -148,10 +167,15 @@ assert_inames_missing(DBT* inames) { ...@@ -148,10 +167,15 @@ assert_inames_missing(DBT* inames) {
} }
} }
int write_calls = 0;
ssize_t ssize_t
bad_write(int UU(fd), const void * UU(src), size_t UU(n)) { bad_write(int UU(fd), const void * UU(bp), size_t UU(len)) {
ssize_t r;
write_calls++;
errno= ENOSPC; errno= ENOSPC;
return -1; r = -1;
return r;
} }
...@@ -362,6 +386,8 @@ static int poll_function (void *extra, float progress) { ...@@ -362,6 +386,8 @@ static int poll_function (void *extra, float progress) {
static void test_loader(enum test_type t, DB **dbs) static void test_loader(enum test_type t, DB **dbs)
{ {
int failed_put = 0;
if (t == abort_via_poll) if (t == abort_via_poll)
abort_on_poll = 1; abort_on_poll = 1;
else else
...@@ -398,13 +424,16 @@ static void test_loader(enum test_type t, DB **dbs) ...@@ -398,13 +424,16 @@ static void test_loader(enum test_type t, DB **dbs)
// using loader->put, put values into DB // using loader->put, put values into DB
DBT key, val; DBT key, val;
unsigned int k, v; unsigned int k, v;
for(int i=1;i<=NUM_ROWS;i++) { for(int i=1;i<=NUM_ROWS && !failed_put;i++) {
k = i; k = i;
v = generate_val(i, 0); v = generate_val(i, 0);
dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (t == enospc_f)
failed_put = r;
else
CKERR(r);
if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
} }
if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);} if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);}
...@@ -429,12 +458,21 @@ static void test_loader(enum test_type t, DB **dbs) ...@@ -429,12 +458,21 @@ static void test_loader(enum test_type t, DB **dbs)
r = loader->close(loader); r = loader->close(loader);
assert(r); // not defined what close() returns when poll function returns non-zero assert(r); // not defined what close() returns when poll function returns non-zero
} }
else if (t == enospc) { else if (t == enospc_w) {
r = db_env_set_func_write(bad_write); r = db_env_set_func_write(bad_write);
CKERR(r); CKERR(r);
printf("closing, but expecting failure from enospc\n"); printf("closing, but expecting failure from enospc\n");
r = loader->close(loader); r = loader->close(loader);
assert(r); printf("write_calls = %d\n", write_calls);
// assert(r);
}
else if (t == enospc_f && !failed_put) {
printf("closing, but expecting failure from enospc\n");
r = loader->close(loader);
if (!USE_PUTS)
assert(r);
else
CKERR(r); // if using puts, "outer" loader should close just fine
} }
else { else {
printf("aborting loader"); fflush(stdout); printf("aborting loader"); fflush(stdout);
...@@ -449,6 +487,8 @@ static void test_loader(enum test_type t, DB **dbs) ...@@ -449,6 +487,8 @@ static void test_loader(enum test_type t, DB **dbs)
printf(" done\n"); printf(" done\n");
if (t == commit) { if (t == commit) {
fwrite_count_nominal = fwrite_count; // capture how many fwrites were required for normal operation
if (verbose) printf("Calls to fwrite nominal: %d\n", fwrite_count_nominal);
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
CKERR(r); CKERR(r);
if (!USE_PUTS) { if (!USE_PUTS) {
...@@ -457,6 +497,7 @@ static void test_loader(enum test_type t, DB **dbs) ...@@ -457,6 +497,7 @@ static void test_loader(enum test_type t, DB **dbs)
if ( CHECK_RESULTS ) { if ( CHECK_RESULTS ) {
check_results(dbs); check_results(dbs);
} }
} }
else { else {
r = txn->abort(txn); r = txn->abort(txn);
...@@ -469,9 +510,10 @@ static void test_loader(enum test_type t, DB **dbs) ...@@ -469,9 +510,10 @@ static void test_loader(enum test_type t, DB **dbs)
} }
static void run_test(enum test_type t) static void run_test(enum test_type t, int trigger)
{ {
int r; int r;
r = system("rm -rf " ENVDIR); CKERR(r); r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -505,6 +547,11 @@ static void run_test(enum test_type t) ...@@ -505,6 +547,11 @@ static void run_test(enum test_type t)
generate_permute_tables(); generate_permute_tables();
fwrite_count = 0;
fwrite_count_trigger = trigger;
db_env_set_func_loader_fwrite(bad_fwrite);
test_loader(t, dbs); test_loader(t, dbs);
for(int i=0;i<NUM_DBS;i++) { for(int i=0;i<NUM_DBS;i++) {
...@@ -521,18 +568,26 @@ static void do_args(int argc, char * const argv[]); ...@@ -521,18 +568,26 @@ static void do_args(int argc, char * const argv[]);
int test_main(int argc, char * const *argv) { int test_main(int argc, char * const *argv) {
do_args(argc, argv); do_args(argc, argv);
if (verbose) printf("\n\nTesting loader with close and commit (normal)\n"); if (verbose) printf("\n\nTesting loader with close and commit (normal)\n");
run_test(commit); run_test(commit, 0);
if (verbose) printf("\n\nTesting loader with loader abort and txn abort\n"); if (verbose) printf("\n\nTesting loader with loader abort and txn abort\n");
run_test(abort_loader); run_test(abort_loader, 0);
if (!USE_PUTS) { if (!USE_PUTS) {
if (verbose) printf("\n\nTesting loader with loader abort_via_poll and txn abort\n"); if (verbose) printf("\n\nTesting loader with loader abort_via_poll and txn abort\n");
run_test(abort_via_poll); run_test(abort_via_poll, 0);
} }
if (verbose) printf("\n\nTesting loader with loader close and txn abort\n"); if (verbose) printf("\n\nTesting loader with loader close and txn abort\n");
run_test(abort_txn); run_test(abort_txn, 0);
if (INDUCE_ENOSPC) { if (INDUCE_ENOSPC) {
if (verbose) printf("\n\nTesting loader with enospc induced during loader close\n"); if (verbose) printf("\n\nTesting loader with enospc induced during loader close\n");
run_test(enospc); run_test(enospc_w, 0);
}
{
int i;
for (i = 1; i < 5; i++) {
int trigger = fwrite_count_nominal / i;
if (verbose) printf("\n\nTesting loader with enospc induced at fwrite count %d\n", trigger);
run_test(enospc_f, trigger);
}
} }
return 0; return 0;
} }
...@@ -575,7 +630,7 @@ static void do_args(int argc, char * const argv[]) { ...@@ -575,7 +630,7 @@ static void do_args(int argc, char * const argv[]) {
printf("Using puts\n"); printf("Using puts\n");
} else if (strcmp(argv[0], "-e")==0) { } else if (strcmp(argv[0], "-e")==0) {
INDUCE_ENOSPC = 1; INDUCE_ENOSPC = 1;
printf("Using puts\n"); printf("Using enospc\n");
} else { } else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]); fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1; resultcode=1;
......
...@@ -31,7 +31,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r ...@@ -31,7 +31,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r
#include "key.h" #include "key.h"
#include "loader.h" #include "loader.h"
#include "ydb_load.h" #include "ydb_load.h"
#include "brtloader.h"
#ifdef TOKUTRACE #ifdef TOKUTRACE
#define DB_ENV_CREATE_FUN db_env_create_toku10 #define DB_ENV_CREATE_FUN db_env_create_toku10
...@@ -5054,20 +5054,27 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { ...@@ -5054,20 +5054,27 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
uint32_t dbt_flags[1] = {0}; uint32_t dbt_flags[1] = {0};
uint32_t loader_flags = DB_PRELOCKED_WRITE; //Don't recursively prelock uint32_t loader_flags = DB_PRELOCKED_WRITE; //Don't recursively prelock
DB_ENV *env = db->dbenv; DB_ENV *env = db->dbenv;
DB_TXN *child = NULL;
{
// begin child
int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1);
assert(rt==0);
}
toku_ydb_unlock(); //Cannot hold ydb lock when creating loader toku_ydb_unlock(); //Cannot hold ydb lock when creating loader
int r_loader;
r_loader = env->create_loader(env, txn, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags); int r_loader = env->create_loader(env, child, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags);
if (r_loader==0) { if (r_loader==0) {
int r2; r_loader = loader->set_error_callback(loader, NULL, NULL);
r2 = loader->set_error_callback(loader, NULL, NULL); assert(r_loader==0);
assert(r2==0); r_loader = loader->set_poll_function(loader, NULL, NULL);
r2 = loader->set_poll_function(loader, NULL, NULL); assert(r_loader==0);
assert(r2==0);
// close the loader // close the loader
r2 = loader->close(loader); r_loader = loader->close(loader);
assert(r2==0); if (r_loader==0) {
toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(txn)->tokutxn); toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(child)->tokutxn);
}
} }
else if (r_loader != DB_LOCK_NOTGRANTED) { else if (r_loader != DB_LOCK_NOTGRANTED) {
//Lock not granted is not an error. //Lock not granted is not an error.
...@@ -5075,6 +5082,16 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { ...@@ -5075,6 +5082,16 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
assert(r==0); assert(r==0);
r = r_loader; r = r_loader;
} }
if (r_loader == 0) { // commit
r = locked_txn_commit(child, 0);
assert(r==0);
}
else { // abort
r = locked_txn_abort(child);
assert(r==0);
}
toku_ydb_lock(); //Reaquire ydb lock. toku_ydb_lock(); //Reaquire ydb lock.
} }
...@@ -5518,6 +5535,12 @@ int db_env_set_func_write (ssize_t (*write_function)(int, const void *, size_t)) ...@@ -5518,6 +5535,12 @@ int db_env_set_func_write (ssize_t (*write_function)(int, const void *, size_t))
return toku_set_func_write(write_function); return toku_set_func_write(write_function);
} }
void
db_env_set_func_loader_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
brtloader_set_os_fwrite(fwrite_fun);
}
int db_env_set_func_malloc (void *(*f)(size_t)) { int db_env_set_func_malloc (void *(*f)(size_t)) {
return toku_set_func_malloc(f); return toku_set_func_malloc(f);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment