Commit 7c867a7a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge some changes back to main line (Refs #2623, #2631). [t:2623] [t:2631]

{{{
svn merge -r 20381:20412 https://svn.tokutek.com/tokudb/toku/tokudb.2623
}}}
.


git-svn-id: file:///svn/toku/tokudb@20413 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2063b40c
...@@ -5842,3 +5842,19 @@ toku_brt_get_fragmentation(BRT brt, TOKU_DB_FRAGMENTATION report) { ...@@ -5842,3 +5842,19 @@ toku_brt_get_fragmentation(BRT brt, TOKU_DB_FRAGMENTATION report) {
return r; return r;
} }
int toku_brt_strerror_r(int error, char *buf, size_t buflen)
{
if (error>=0) {
return strerror_r(error, buf, buflen);
} else {
switch (error) {
case DB_KEYEXIST:
snprintf(buf, buflen, "Key exists");
return 0;
default:
snprintf(buf, buflen, "Unknown error %d", error);
errno = EINVAL;
return -1;
}
}
}
...@@ -229,6 +229,11 @@ BOOL toku_brt_is_recovery_logging_suppressed (BRT); ...@@ -229,6 +229,11 @@ BOOL toku_brt_is_recovery_logging_suppressed (BRT);
#define TOKU_MULTIPLE_MAIN_THREADS 0 #define TOKU_MULTIPLE_MAIN_THREADS 0
#endif #endif
int toku_brt_strerror_r(int error, char *buf, size_t buflen);
// Effect: LIke the XSI-compliant strerorr_r, extended to db_strerror().
// If error>=0 then the result is to do strerror_r(error, buf, buflen), that is fill buf with a descriptive error message.
// If error<0 then return a TokuDB-specific error code. For unknown cases, we return -1 and set errno=EINVAL, even for cases that *should* be known. (Not all DB errors are known by this function which is a bug.)
C_END C_END
#endif #endif
...@@ -129,8 +129,9 @@ int brtloader_init_file_infos (struct file_infos *fi) { ...@@ -129,8 +129,9 @@ int brtloader_init_file_infos (struct file_infos *fi) {
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos) return 0; if (fi->file_infos) return 0;
else { else {
int result = errno;
toku_pthread_mutex_destroy(&fi->lock); toku_pthread_mutex_destroy(&fi->lock);
return errno; return result;
} }
} }
...@@ -1447,6 +1448,8 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1447,6 +1448,8 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
* The fidxs are not closed by this function. * The fidxs are not closed by this function.
*/ */
{ {
int result = 0;
FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data); FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data);
//printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation); //printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
...@@ -1487,6 +1490,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1487,6 +1490,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} }
pqueue_free(pq); pqueue_free(pq);
toku_free(pq_nodes); toku_free(pq_nodes);
printf("%s:%d returning %d\n", __FILE__, __LINE__, r); // remove this printf when we know that this path is tested.
return r; return r;
} }
...@@ -1519,6 +1523,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1519,6 +1523,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} }
pqueue_free(pq); pqueue_free(pq);
toku_free(pq_nodes); toku_free(pq_nodes);
printf("%s:%d returning\n", __FILE__, __LINE__);
return r; return r;
} }
mini = node->i; mini = node->i;
...@@ -1553,6 +1558,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1553,6 +1558,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
toku_free(keys[mini].data); toku_free(keys[mini].data);
toku_free(vals[mini].data); toku_free(vals[mini].data);
} else { } else {
printf("%s:%d returning\n", __FILE__, __LINE__);
return r; return r;
} }
} }
...@@ -1561,13 +1567,13 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1561,13 +1567,13 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
pq_nodes[mini].key = &keys[mini]; pq_nodes[mini].key = &keys[mini];
r = pqueue_insert(pq, &pq_nodes[mini]); r = pqueue_insert(pq, &pq_nodes[mini]);
if (r!=0) { if (r!=0) {
// Note: This error path tested by loader-dup-test1.tdbrun
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
toku_free(keys[i].data); toku_free(keys[i].data);
toku_free(vals[i].data); toku_free(vals[i].data);
} }
pqueue_free(pq); result = r;
toku_free(pq_nodes); break;
return r;
} }
} }
} }
...@@ -1584,15 +1590,26 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1584,15 +1590,26 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
if (r!=0) return r; if (r!=0) return r;
} }
} }
if (to_q) { if (result==0 && to_q) {
BL_TRACE(blt_do_i); BL_TRACE(blt_do_i);
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq); BL_TRACE(blt_fractal_enq);
assert(r==0); assert(r==0); // if (r!=0) result = r;
output_rowset = NULL;
}
// cleanup
if (output_rowset) {
destroy_rowset(output_rowset);
toku_free(output_rowset);
} }
pqueue_free(pq); pqueue_free(pq);
toku_free(pq_nodes); toku_free(pq_nodes);
return update_progress(progress_allocation, bl, "end of merge_some_files"); {
int r = update_progress(progress_allocation, bl, "end of merge_some_files");
if (r!=0 && result==0) result = r;
}
return result;
} }
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation) static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
......
...@@ -122,8 +122,15 @@ static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) { ...@@ -122,8 +122,15 @@ static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) {
return qsort_compare_ints(akey->data, bkey->data); return qsort_compare_ints(akey->data, bkey->data);
} }
static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) { static char *errorstr_static (int err) {
fprintf(stderr, "error in test"); static char errorstr[100];
toku_brt_strerror_r(err, errorstr, sizeof(errorstr));
return errorstr;
}
static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(), void *extra UU()) {
fprintf(stderr, "error in test dbn=%d err=%d (%s)\n", dbn, err, errorstr_static(err));
abort(); abort();
} }
...@@ -132,7 +139,7 @@ enum { N_SOURCES = 2, N_RECORDS=10, N_DEST_DBS=1 }; ...@@ -132,7 +139,7 @@ enum { N_SOURCES = 2, N_RECORDS=10, N_DEST_DBS=1 };
static char *make_fname(const char *directory, const char *fname, int idx) { static char *make_fname(const char *directory, const char *fname, int idx) {
int len = strlen(directory)+strlen(fname)+20; int len = strlen(directory)+strlen(fname)+20;
char *XMALLOC_N(len, result); char *XMALLOC_N(len, result);
int r = snprintf(result, len, "%s%s%d", directory, fname, idx); int r = snprintf(result, len, "%s/%s%d", directory, fname, idx);
assert(r<len); assert(r<len);
return result; // don't care that it's a little too long. return result; // don't care that it's a little too long.
} }
...@@ -146,7 +153,7 @@ static void test (const char *directory) { ...@@ -146,7 +153,7 @@ static void test (const char *directory) {
char **XMALLOC_N(N_SOURCES, fnames); char **XMALLOC_N(N_SOURCES, fnames);
for (int i=0; i<N_SOURCES; i++) { for (int i=0; i<N_SOURCES; i++) {
fnames[i] = make_fname(directory, "temp", i); fnames[i] = make_fname(directory, "temp", i);
fds[i] = open(fnames[0], O_CREAT|O_RDWR, S_IRWXU); fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU);
assert(fds[i]>=0); assert(fds[i]>=0);
} }
for (int i=0; i<N_RECORDS; i++) { for (int i=0; i<N_RECORDS; i++) {
...@@ -172,6 +179,12 @@ static void test (const char *directory) { ...@@ -172,6 +179,12 @@ static void test (const char *directory) {
DB **XMALLOC_N(N_DEST_DBS, dbs); DB **XMALLOC_N(N_DEST_DBS, dbs);
const struct descriptor **XMALLOC_N(N_DEST_DBS, descriptors); const struct descriptor **XMALLOC_N(N_DEST_DBS, descriptors);
const char **XMALLOC_N(N_DEST_DBS, new_fnames_in_env); const char **XMALLOC_N(N_DEST_DBS, new_fnames_in_env);
for (int i=0; i<N_DEST_DBS; i++) {
char s[100];
snprintf(s, sizeof(s), "db%d.db", i);
new_fnames_in_env[i] = toku_strdup(s);
assert(new_fnames_in_env[i]);
}
brt_compare_func *XMALLOC_N(N_DEST_DBS, bt_compare_functions); brt_compare_func *XMALLOC_N(N_DEST_DBS, bt_compare_functions);
bt_compare_functions[0] = compare_ints; bt_compare_functions[0] = compare_ints;
CACHETABLE ct; CACHETABLE ct;
...@@ -190,7 +203,7 @@ static void test (const char *directory) { ...@@ -190,7 +203,7 @@ static void test (const char *directory) {
descriptors, descriptors,
new_fnames_in_env, new_fnames_in_env,
bt_compare_functions, bt_compare_functions,
(const char *)NULL, // temp_filetemplate "tempxxxxxx",
*lsnp); *lsnp);
assert(r==0); assert(r==0);
} }
...@@ -207,8 +220,34 @@ static void test (const char *directory) { ...@@ -207,8 +220,34 @@ static void test (const char *directory) {
const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues. const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
{ int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE); assert(r==0); } { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE); assert(r==0); }
FIDX *XMALLOC_N(N_SOURCES, src_fidxs); FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
assert(bl->file_infos.n_files==0);
bl->file_infos.n_files = N_SOURCES;
bl->file_infos.n_files_limit = N_SOURCES;
bl->file_infos.n_files_open = 0;
bl->file_infos.n_files_extant = N_SOURCES;
XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos);
const int BUFFER_SIZE = 100;
for (int i=0; i<N_SOURCES; i++) {
bl->file_infos.file_infos[i] = (struct file_info){ .is_open = FALSE,
.is_extant = TRUE,
.fname = toku_strdup(fnames[i]),
.file = (FILE*)NULL,
.n_rows = N_RECORDS,
.buffer_size = BUFFER_SIZE,
.buffer = toku_xmalloc(BUFFER_SIZE)};
src_fidxs[i].idx = i;
}
{ {
int r = toku_merge_some_files_using_dbufio(TRUE, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000); int r = toku_merge_some_files_using_dbufio(TRUE, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r));
assert(r==0);
}
{
int r = toku_brtloader_destroy(bl);
assert(r==0);
}
{
int r = toku_cachetable_close(&ct);
assert(r==0); assert(r==0);
} }
} }
...@@ -272,6 +311,7 @@ int test_main (int argc, const char *argv[]) { ...@@ -272,6 +311,7 @@ int test_main (int argc, const char *argv[]) {
if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count); if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
if (0) {
int event_limit = event_count; int event_limit = event_count;
if (verbose) printf("event_limit=%d\n", event_limit); if (verbose) printf("event_limit=%d\n", event_limit);
...@@ -284,6 +324,7 @@ int test_main (int argc, const char *argv[]) { ...@@ -284,6 +324,7 @@ int test_main (int argc, const char *argv[]) {
test(directory); test(directory);
} }
}
return 0; return 0;
} }
......
...@@ -651,3 +651,8 @@ test_thread_stack.%run: test_thread_stack.%$(BINSUF) $(PTHREAD_LOCAL) ...@@ -651,3 +651,8 @@ test_thread_stack.%run: test_thread_stack.%$(BINSUF) $(PTHREAD_LOCAL)
./$< -a -thread_stack 16384 && \ ./$< -a -thread_stack 16384 && \
./$< -a -thread_stack 16384 -resume $(SUMMARIZE_CMD) ./$< -a -thread_stack 16384 -resume $(SUMMARIZE_CMD)
loader-dup-test.tdbrun: loader-dup-test1.tdbrun
loader-dup-test1.tdbrun: loader-dup-test.tdb$(BINSUF)
$(VGRIND) ./loader-dup-test.tdb -d 1 -r 500000 -e 1 $(SUMMARIZE_CMD)
...@@ -269,12 +269,20 @@ static void test_loader(DB **dbs) ...@@ -269,12 +269,20 @@ static void test_loader(DB **dbs)
} }
} }
char *free_me = NULL;
char *env_dir = ENVDIR; // the default env_dir
static void run_test(void) static void run_test(void)
{ {
int r; int r;
r = system("rm -rf " ENVDIR); CKERR(r); {
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); int len = strlen(env_dir) + 20;
char syscmd[len];
r = snprintf(syscmd, len, "rm -rf %s", env_dir);
assert(r<len);
r = system(syscmd); CKERR(r);
}
r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r); r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r);
...@@ -282,7 +290,7 @@ static void run_test(void) ...@@ -282,7 +290,7 @@ static void run_test(void)
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE; int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr); env->set_errfile(env, stderr);
//Disable auto-checkpointing //Disable auto-checkpointing
r = env->checkpointing_set_period(env, 0); CKERR(r); r = env->checkpointing_set_period(env, 0); CKERR(r);
...@@ -338,6 +346,7 @@ int test_main(int argc, char * const *argv) { ...@@ -338,6 +346,7 @@ int test_main(int argc, char * const *argv) {
run_test(); run_test();
} }
} }
if (free_me) toku_free(free_me);
return 0; return 0;
} }
...@@ -349,8 +358,17 @@ static void do_args(int argc, char * const argv[]) { ...@@ -349,8 +358,17 @@ static void do_args(int argc, char * const argv[]) {
if (strcmp(argv[0], "-h")==0) { if (strcmp(argv[0], "-h")==0) {
resultcode=0; resultcode=0;
do_usage: do_usage:
fprintf(stderr, "Usage: %s -h -c -d %d -r %d\n", cmd, NUM_DBS, NUM_ROWS); fprintf(stderr, "Usage: %s -h -c -d %d -r %d [ -e <envdir> ]\n", cmd, NUM_DBS, NUM_ROWS);
fprintf(stderr, " where -e <env> uses <env> to construct the directory (so that different tests of loader-stress-test can run concurrently)\n");
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-e")==0) {
argc--; argv++;
if (free_me) toku_free(free_me);
int len = strlen(ENVDIR) + strlen(argv[0]) + 2;
char full_env_dir[len];
int r = snprintf(full_env_dir, len, "%s.%s", ENVDIR, argv[0]);
assert(r<len);
free_me = env_dir = toku_strdup(full_env_dir);
} else if (strcmp(argv[0], "-v")==0) { } else if (strcmp(argv[0], "-v")==0) {
verbose++; verbose++;
} else if (strcmp(argv[0],"-q")==0) { } else if (strcmp(argv[0],"-q")==0) {
......
...@@ -330,7 +330,7 @@ static void run_test(void) ...@@ -330,7 +330,7 @@ static void run_test(void)
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr); env->set_errfile(env, stderr);
r = env->checkpointing_set_period(env, 60); CKERR(r); r = env->checkpointing_set_period(env, 60); CKERR(r);
...@@ -391,7 +391,7 @@ static void do_args(int argc, char * const argv[]) { ...@@ -391,7 +391,7 @@ static void do_args(int argc, char * const argv[]) {
do_usage: do_usage:
fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ] [-m <megabytes>] [-M]\n%s\n", cmd); fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ] [-m <megabytes>] [-M]\n%s\n", cmd);
fprintf(stderr, " where -b <num_calls> causes the poll function to return nonzero after <num_calls>\n"); fprintf(stderr, " where -b <num_calls> causes the poll function to return nonzero after <num_calls>\n");
fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests of loader-stress-test can run concurrently)\n"); fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests can run concurrently)\n");
fprintf(stderr, " -m <m> use m MB of memeory for the cachetable (defualt is %d MB)\n", default_cachesize); fprintf(stderr, " -m <m> use m MB of memeory for the cachetable (defualt is %d MB)\n", default_cachesize);
fprintf(stderr, " -M use half of physical memory for the cachetable\n"); fprintf(stderr, " -M use half of physical memory for the cachetable\n");
fprintf(stderr, " -s use size factor of 1 and count temporary files\n"); fprintf(stderr, " -s use size factor of 1 and count temporary files\n");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment