Commit 0ca9bf51 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge tokudb.2603 to tokudb refs[t:2603]

git-svn-id: file:///svn/toku/tokudb@20244 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7b2ebbe8
...@@ -104,14 +104,13 @@ static void cleanup_big_buffer(struct file_info *file) { ...@@ -104,14 +104,13 @@ static void cleanup_big_buffer(struct file_info *file) {
} }
int brtloader_init_file_infos (struct file_infos *fi) { int brtloader_init_file_infos (struct file_infos *fi) {
int r = toku_pthread_mutex_init(&fi->lock, NULL); int r = toku_pthread_mutex_init(&fi->lock, NULL); assert(r == 0);
assert(r==0);
fi->n_files = 0; fi->n_files = 0;
fi->n_files_limit = 1; fi->n_files_limit = 1;
fi->n_files_open = 0; fi->n_files_open = 0;
fi->n_files_extant = 0; fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->n_files_limit) return 0; if (fi->file_infos) return 0;
else return errno; else return errno;
} }
...@@ -121,7 +120,7 @@ void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) ...@@ -121,7 +120,7 @@ void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
// If !is_error then requires that all the temp files have been closed and destroyed // If !is_error then requires that all the temp files have been closed and destroyed
// No error codes are returned. If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care. // No error codes are returned. If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care.
{ {
toku_pthread_mutex_destroy(&fi->lock); int r = toku_pthread_mutex_destroy(&fi->lock); assert(r == 0);
if (!is_error) { if (!is_error) {
assert(fi->n_files_open==0); assert(fi->n_files_open==0);
assert(fi->n_files_extant==0); assert(fi->n_files_extant==0);
...@@ -239,6 +238,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) ...@@ -239,6 +238,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
} }
static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
int r = toku_pthread_mutex_destroy(&bl->mutex); assert(r == 0);
// These frees rely on the fact that if you free a NULL pointer then nothing bad happens. // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
toku_free(bl->dbs); toku_free(bl->dbs);
toku_free(bl->descriptors); toku_free(bl->descriptors);
...@@ -259,7 +259,6 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -259,7 +259,6 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
destroy_merge_fileset(&bl->fs[i]); destroy_merge_fileset(&bl->fs[i]);
toku_free(bl->fs); toku_free(bl->fs);
//
for (int i=0; i<bl->N; i++) { for (int i=0; i<bl->N; i++) {
assert(bl->fractal_queues[i]==NULL); // !!! If this isn't true, we may have to kill the pthreads and destroy the fractal trees. For now just barf. assert(bl->fractal_queues[i]==NULL); // !!! If this isn't true, we may have to kill the pthreads and destroy the fractal trees. For now just barf.
} }
...@@ -267,6 +266,7 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -267,6 +266,7 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
toku_free(bl->fractal_queues); toku_free(bl->fractal_queues);
toku_free(bl->fractal_threads_live); toku_free(bl->fractal_threads_live);
if (bl->cachetable)
toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory); toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory);
brt_loader_destroy_error_callback(&bl->error_callback); brt_loader_destroy_error_callback(&bl->error_callback);
...@@ -314,7 +314,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -314,7 +314,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->generate_row_for_put = g; bl->generate_row_for_put = g;
bl->cachetable = cachetable; bl->cachetable = cachetable;
bl->reserved_memory = toku_cachetable_reserve_memory(cachetable, 0.5); if (bl->cachetable)
bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 0.5);
bl->src_db = src_db; bl->src_db = src_db;
bl->N = N; bl->N = N;
...@@ -345,8 +346,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -345,8 +346,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->n_rows = 0; bl->n_rows = 0;
bl->progress = 0; bl->progress = 0;
bl->rows = (struct rowset *) toku_malloc(N*sizeof(struct rowset)); if (bl->rows == NULL) return ENOSPC; bl->rows = (struct rowset *) toku_malloc(N*sizeof(struct rowset)); if (bl->rows == NULL) return errno;
bl->fs = (struct merge_fileset *) toku_malloc(N*sizeof(struct merge_fileset)); if (bl->rows == NULL) return ENOSPC; bl->fs = (struct merge_fileset *) toku_malloc(N*sizeof(struct merge_fileset)); if (bl->rows == NULL) return errno;
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
{ int r = init_rowset(&bl->rows[i]); if (r!=0) return r; } { int r = init_rowset(&bl->rows[i]); if (r!=0) return r; }
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
...@@ -358,6 +359,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -358,6 +359,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
{ int r = init_rowset(&bl->primary_rowset); if (r!=0) return r; } { int r = init_rowset(&bl->primary_rowset); if (r!=0) return r; }
{ int r = queue_create(&bl->primary_rowset_queue, 2); if (r!=0) return r; } { int r = queue_create(&bl->primary_rowset_queue, 2); if (r!=0) return r; }
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ int r = toku_pthread_mutex_init(&bl->mutex, NULL); if (r != 0) return r; }
{ int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; } { int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; }
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
...@@ -367,10 +369,14 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -367,10 +369,14 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
} }
static void brt_loader_set_panic(BRTLOADER bl, int error) { static void brt_loader_set_panic(BRTLOADER bl, int error) {
// RFP2578 fix races on bl->panic etc int r = toku_pthread_mutex_lock(&bl->mutex); assert(r == 0);
if (!bl->panic) { BOOL is_panic = bl->panic;
if (!is_panic) {
bl->panic = TRUE; bl->panic = TRUE;
bl->panic_errno = error; bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL); brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
} }
} }
...@@ -569,26 +575,28 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke ...@@ -569,26 +575,28 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke
int init_rowset (struct rowset *rows) int init_rowset (struct rowset *rows)
/* Effect: Initialize a collection of rows to be empty. */ /* Effect: Initialize a collection of rows to be empty. */
{ {
int result = 0;
rows->rows = NULL; rows->rows = NULL;
rows->data = NULL; rows->data = NULL;
rows->n_rows = 0; rows->n_rows = 0;
rows->n_rows_limit = 100; rows->n_rows_limit = 100;
MALLOC_N(rows->n_rows_limit, rows->rows); MALLOC_N(rows->n_rows_limit, rows->rows);
int err = errno; if (rows->rows == NULL)
result = errno;
rows->n_bytes = 0; rows->n_bytes = 0;
rows->n_bytes_limit = 1024*size_factor*16; rows->n_bytes_limit = 1024*size_factor*16;
rows->data = (char *) toku_malloc(rows->n_bytes_limit); rows->data = (char *) toku_malloc(rows->n_bytes_limit);
if (rows->rows==NULL || rows->data==NULL) { if (rows->rows==NULL || rows->data==NULL) {
int r = errno ? errno : err; if (result == 0)
result = errno;
toku_free(rows->rows); toku_free(rows->rows);
toku_free(rows->data); toku_free(rows->data);
rows->rows = NULL; rows->rows = NULL;
rows->data = NULL; rows->data = NULL;
return r;
} else {
return 0;
} }
return result;
} }
static void zero_rowset (struct rowset *rows) { static void zero_rowset (struct rowset *rows) {
...@@ -690,8 +698,6 @@ static void* extractor_thread (void *blv) { ...@@ -690,8 +698,6 @@ static void* extractor_thread (void *blv) {
//printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows); //printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows);
assert(primary_rowset->n_rows>0);
// Now we have some rows to output // Now we have some rows to output
{ {
r = process_primary_rows(bl, primary_rowset); r = process_primary_rows(bl, primary_rowset);
...@@ -783,6 +789,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -783,6 +789,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
// if primary_rowset is NULL then treat it as empty. // if primary_rowset is NULL then treat it as empty.
{ {
int error_count = 0; int error_count = 0;
// cilk++ bug int error_codes[bl-N];
int *MALLOC_N(bl->N, error_codes); int *MALLOC_N(bl->N, error_codes);
// Do parallelize this loop with cilk_grainsize = 1 so that every iteration will run in parallel. // Do parallelize this loop with cilk_grainsize = 1 so that every iteration will run in parallel.
...@@ -867,7 +874,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -867,7 +874,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
for (int i=0; i<bl->N; i++) { for (int i=0; i<bl->N; i++) {
if (error_codes[i]) r = error_codes[i]; if (error_codes[i]) r = error_codes[i];
} }
assert(r); // could not find the error code. This is an error in the program if we get here. assert(r); // found the error
} }
toku_free(error_codes); toku_free(error_codes);
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
...@@ -1161,13 +1168,13 @@ static int update_progress (int N, ...@@ -1161,13 +1168,13 @@ static int update_progress (int N,
{ {
// Need a lock here because of cilk and also the various pthreads. // Need a lock here because of cilk and also the various pthreads.
// Must protect the increment and the call to the poll_function. // Must protect the increment and the call to the poll_function.
toku_pthread_mutex_lock(&update_progress_lock); { int r = toku_pthread_mutex_lock(&update_progress_lock); assert(r == 0); }
bl->progress+=N; bl->progress+=N;
//printf(" %20s: %d ", message, bl->progress); //printf(" %20s: %d ", message, bl->progress);
int r = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX); int result = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
toku_pthread_mutex_unlock(&update_progress_lock); { int r = toku_pthread_mutex_unlock(&update_progress_lock); assert(r == 0); }
return r; return result;
} }
CILK_BEGIN CILK_BEGIN
...@@ -2190,8 +2197,11 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -2190,8 +2197,11 @@ int toku_brt_loader_close (BRTLOADER bl,
brt_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra); brt_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
if (bl->extractor_live) {
r = finish_extractor(bl); r = finish_extractor(bl);
assert(r==0); // !!! should check this error code and cleanup if needed. assert(r == 0); // LAZY !!! should check this error code and cleanup if needed.
assert(!bl->extractor_live);
}
// check for an error during extraction // check for an error during extraction
r = brt_loader_call_error_function(&bl->error_callback); r = brt_loader_call_error_function(&bl->error_callback);
...@@ -2205,6 +2215,18 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -2205,6 +2215,18 @@ int toku_brt_loader_close (BRTLOADER bl,
return r; return r;
} }
int toku_brt_loader_finish_extractor(BRTLOADER bl) {
int result = 0;
if (!bl->extractor_live)
result = EINVAL;
else {
int r = finish_extractor(bl);
if (r)
result = r;
}
return result;
}
int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
/* Effect : Abort the bulk loader, free brtloader resources */ /* Effect : Abort the bulk loader, free brtloader resources */
{ {
...@@ -2212,9 +2234,9 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) ...@@ -2212,9 +2234,9 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
if (bl->extractor_live) { if (bl->extractor_live) {
int r = finish_extractor(bl); int r = finish_extractor(bl);
assert(r == 0); assert(r == 0);
assert(!bl->extractor_live);
} }
assert(!bl->extractor_live);
for (int i = 0; i < bl->N; i++) for (int i = 0; i < bl->N; i++)
assert(!bl->fractal_threads_live[i]); assert(!bl->fractal_threads_live[i]);
...@@ -2223,6 +2245,15 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) ...@@ -2223,6 +2245,15 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
return result; return result;
} }
int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = 0;
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
return 0;
}
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen) { static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen) {
lbuf->n_in_buf++; lbuf->n_in_buf++;
lbuf->nkeys++; // assume NODUP lbuf->nkeys++; // assume NODUP
......
...@@ -44,9 +44,12 @@ void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL ...@@ -44,9 +44,12 @@ void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL
// For test purposes only // For test purposes only
void toku_brtloader_set_size_factor (uint32_t factor); void toku_brtloader_set_size_factor (uint32_t factor);
int toku_brt_loader_finish_extractor(BRTLOADER bl);
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; }
#endif #endif
#endif // BRTLOADER_H #endif // BRTLOADER_H
...@@ -94,9 +94,9 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL) ...@@ -94,9 +94,9 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL)
check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.brtloader-test check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.brtloader-test
check_brtloader-test-write-dbfile$(BINSUF): EXTRA_ARGS=-s -r 100000 dir.brtloader-test-write-dbfile check_brtloader-test-writer$(BINSUF): EXTRA_ARGS=-s -r 100000 dir.brtloader-test-writer
check_brtloader-test-write-dbfile-enospc$(BINSUF): EXTRA_ARGS=-s -r 10000 dir.brtloader-test-write-dbfile-enospc check_brtloader-test-writer-errors$(BINSUF): EXTRA_ARGS=-s -r 10000 dir.brtloader-test-writer-errors
brtloader-%$(BINSUF): brtloader-%.$(OEXT) brtloader-%$(BINSUF): brtloader-%.$(OEXT)
ifeq ($(BRTLOADER),cilk) ifeq ($(BRTLOADER),cilk)
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// The purpose of this test is to test the extractor component of the brt loader
#define DONT_DEPRECATE_MALLOC
#define DONT_DEPRECATE_WRITES
#include "test.h"
#include "brtloader.h"
#include "brtloader-internal.h"
#include "memory.h"
#if defined(__cplusplus)
extern "C" {
#endif
#if 0
static int my_malloc_count = 0;
static int my_malloc_trigger = 0;
static void set_my_malloc_trigger(int n) {
my_malloc_count = 0;
my_malloc_trigger = n;
}
static void *my_malloc(size_t n) {
my_malloc_count++;
if (my_malloc_count == my_malloc_trigger) {
errno = ENOSPC;
return NULL;
} else
return malloc(n);
}
#endif
static int write_count, write_count_trigger, write_enospc;
static void reset_write_counts(void) {
write_count = write_count_trigger = write_enospc = 0;
}
static void count_enospc(void) {
write_enospc++;
}
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
write_count++;
size_t r;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
errno = ferror(stream);
}
}
return r;
}
static ssize_t bad_write(int fd, const void * bp, size_t len) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = write(fd, bp, len);
}
return r;
}
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = pwrite(fd, bp, len, off);
}
return r;
}
static int generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_val = dest_val; src_key = src_key; src_val = src_val; extra = extra;
return EINVAL;
}
static int qsort_compare_ints (const void *a, const void *b) {
int avalue = *(int*)a;
int bvalue = *(int*)b;
if (avalue<bvalue) return -1;
if (avalue>bvalue) return +1;
return 0;
}
static int compare_int(DB *dest_db, const DBT *akey, const DBT *bkey) {
assert(dest_db == NULL);
assert(akey->size == sizeof (int));
assert(bkey->size == sizeof (int));
return qsort_compare_ints(akey->data, bkey->data);
}
static void populate_rowset(struct rowset *rowset, int seq, int nrows) {
for (int i = 0; i < nrows; i++) {
int k = htonl(seq + i);
int v = seq + i;
DBT key = { .size = sizeof k, .data = &k };
DBT val = { .size = sizeof v, .data = &v };
add_row(rowset, &key, &val);
}
}
static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
if (verbose) printf("%s %d %d\n", __FUNCTION__, nrows, nrowsets);
int r;
// open the brtloader. this runs the extractor.
const int N = 1;
DB *dbs[N];
const struct descriptor *descriptors[N];
const char *fnames[N];
brt_compare_func compares[N];
for (int i = 0; i < N; i++) {
dbs[i] = NULL;
descriptors[i] = NULL;
fnames[i] = "";
compares[i] = compare_int;
}
BRTLOADER loader;
r = toku_brt_loader_open(&loader, NULL, generate, NULL, N, dbs, descriptors, fnames, compares, "tempXXXXXX", ZERO_LSN);
assert(r == 0);
struct rowset *rowset[nrowsets];
for (int i = 0 ; i < nrowsets; i++) {
rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
assert(rowset[i]);
init_rowset(rowset[i]);
populate_rowset(rowset[i], i, nrows);
}
// setup error injection
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
// feed rowsets to the extractor
for (int i = 0; i < nrowsets; i++) {
r = queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
assert(r == 0);
}
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
// verify the temp files
// abort the brtloader. this ends the test
r = toku_brt_loader_abort(loader, TRUE);
assert(r == 0);
expect_fail = expect_fail;
}
static int nrows = 1;
static int nrowsets = 2;
static int usage(const char *progname) {
fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-s] [-r %d] [--rowsets %d]\n", progname, nrows, nrowsets);
return 1;
}
int test_main (int argc, const char *argv[]) {
const char *progname=argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-h")==0) {
return usage(progname);
} else if (strcmp(argv[0],"-v")==0) {
verbose=1;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else if (strcmp(argv[0],"-r") == 0 && argc >= 1) {
argc--; argv++;
nrows = atoi(argv[0]);
} else if (strcmp(argv[0],"--nrowsets") == 0 && argc >= 1) {
argc--; argv++;
nrowsets = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (argc!=1) {
return usage(progname);
exit(1);
}
else {
break;
}
argc--; argv++;
}
// callibrate
test_extractor(nrows, nrowsets, FALSE);
// run tests
int write_error_limit = write_count;
if (verbose) printf("write_error_limit=%d\n", write_error_limit);
for (int i = 1; i <= write_error_limit; i++) {
reset_write_counts();
write_count_trigger = i;
test_extractor(nrows, nrowsets, TRUE);
}
return 0;
}
#if defined(__cplusplus)
}
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// The purpose of this test is to test the extractor component of the brt loader
#define DONT_DEPRECATE_MALLOC
#define DONT_DEPRECATE_WRITES
#include "test.h"
#include "brtloader.h"
#include "brtloader-internal.h"
#include "memory.h"
#if defined(__cplusplus)
extern "C" {
#endif
#if 0
static int my_malloc_count = 0;
static int my_malloc_trigger = 0;
static void set_my_malloc_trigger(int n) {
my_malloc_count = 0;
my_malloc_trigger = n;
}
static void *my_malloc(size_t n) {
my_malloc_count++;
if (my_malloc_count == my_malloc_trigger) {
errno = ENOSPC;
return NULL;
} else
return malloc(n);
}
#endif
static int write_count, write_count_trigger, write_enospc;
static void reset_write_counts(void) {
write_count = write_count_trigger = write_enospc = 0;
}
static void count_enospc(void) {
write_enospc++;
}
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
write_count++;
size_t r;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
errno = ferror(stream);
}
}
return r;
}
static ssize_t bad_write(int fd, const void * bp, size_t len) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = write(fd, bp, len);
}
return r;
}
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = pwrite(fd, bp, len, off);
}
return r;
}
static void copy_dbt(DBT *dest, const DBT *src) {
assert(dest->flags & DB_DBT_REALLOC);
dest->data = toku_realloc(dest->data, src->size);
dest->size = src->size;
memcpy(dest->data, src->data, src->size);
}
static int generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_val = dest_val; src_key = src_key; src_val = src_val; extra = extra;
copy_dbt(dest_key, src_key);
copy_dbt(dest_val, src_val);
return 0;
}
static int qsort_compare_ints (const void *a, const void *b) {
int avalue = *(int*)a;
int bvalue = *(int*)b;
if (avalue<bvalue) return -1;
if (avalue>bvalue) return +1;
return 0;
}
static int compare_int(DB *dest_db, const DBT *akey, const DBT *bkey) {
assert(dest_db == NULL);
assert(akey->size == sizeof (int));
assert(bkey->size == sizeof (int));
return qsort_compare_ints(akey->data, bkey->data);
}
static void populate_rowset(struct rowset *rowset, int seq, int nrows) {
for (int i = 0; i < nrows; i++) {
int k = htonl(seq * nrows + i);
int v = seq * nrows + i;
DBT key = { .size = sizeof k, .data = &k };
DBT val = { .size = sizeof v, .data = &v };
add_row(rowset, &key, &val);
}
}
static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
if (verbose) printf("%s %d %d\n", __FUNCTION__, nrows, nrowsets);
int r;
// open the brtloader. this runs the extractor.
const int N = 1;
DB *dbs[N];
const struct descriptor *descriptors[N];
const char *fnames[N];
brt_compare_func compares[N];
for (int i = 0; i < N; i++) {
dbs[i] = NULL;
descriptors[i] = NULL;
fnames[i] = "";
compares[i] = compare_int;
}
BRTLOADER loader;
r = toku_brt_loader_open(&loader, NULL, generate, NULL, N, dbs, descriptors, fnames, compares, "tempXXXXXX", ZERO_LSN);
assert(r == 0);
struct rowset *rowset[nrowsets];
for (int i = 0 ; i < nrowsets; i++) {
rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
assert(rowset[i]);
init_rowset(rowset[i]);
populate_rowset(rowset[i], i, nrows);
}
// setup error injection
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
// feed rowsets to the extractor
for (int i = 0; i < nrowsets; i++) {
r = queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
assert(r == 0);
}
r = toku_brt_loader_finish_extractor(loader);
assert(r == 0);
brtloader_set_os_fwrite(NULL);
toku_set_func_write(NULL);
toku_set_func_pwrite(NULL);
int error;
r = toku_brt_loader_get_error(loader, &error);
assert(r == 0);
assert(expect_fail ? error != 0 : error == 0);
// verify the temp files
// abort the brtloader. this ends the test
r = toku_brt_loader_abort(loader, TRUE);
assert(r == 0);
expect_fail = expect_fail;
}
static int nrows = 1;
static int nrowsets = 2;
static int usage(const char *progname) {
fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-s] [-r %d] [--rowsets %d]\n", progname, nrows, nrowsets);
return 1;
}
int test_main (int argc, const char *argv[]) {
const char *progname=argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-h")==0) {
return usage(progname);
} else if (strcmp(argv[0],"-v")==0) {
verbose=1;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else if (strcmp(argv[0],"-r") == 0 && argc >= 1) {
argc--; argv++;
nrows = atoi(argv[0]);
} else if (strcmp(argv[0],"--rowsets") == 0 && argc >= 1) {
argc--; argv++;
nrowsets = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (argc!=1) {
return usage(progname);
exit(1);
}
else {
break;
}
argc--; argv++;
}
// callibrate
test_extractor(nrows, nrowsets, FALSE);
// run tests
int write_error_limit = write_count;
if (verbose) printf("write_error_limit=%d\n", write_error_limit);
for (int i = 1; i <= write_error_limit; i++) {
reset_write_counts();
write_count_trigger = i;
test_extractor(nrows, nrowsets, TRUE);
}
return 0;
}
#if defined(__cplusplus)
}
#endif
...@@ -217,12 +217,19 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -217,12 +217,19 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_destroy_error_callback(&bl.error_callback); brt_loader_destroy_error_callback(&bl.error_callback);
} }
static int usage(const char *progname, int n) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n);
return 1;
}
int test_main (int argc, const char *argv[]) { int test_main (int argc, const char *argv[]) {
const char *progname=argv[0]; const char *progname=argv[0];
int n = 1; int n = 1;
argc--; argv++; argc--; argv++;
while (argc>0) { while (argc>0) {
if (strcmp(argv[0],"-v")==0) { if (strcmp(argv[0],"-h")==0) {
return usage(progname, n);
} else if (strcmp(argv[0],"-v")==0) {
verbose=1; verbose=1;
} else if (strcmp(argv[0],"-q")==0) { } else if (strcmp(argv[0],"-q")==0) {
verbose=0; verbose=0;
...@@ -232,8 +239,7 @@ int test_main (int argc, const char *argv[]) { ...@@ -232,8 +239,7 @@ int test_main (int argc, const char *argv[]) {
} else if (strcmp(argv[0],"-s") == 0) { } else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1); toku_brtloader_set_size_factor(1);
} else if (argc!=1) { } else if (argc!=1) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n); return usage(progname, n);
exit(1);
} }
else { else {
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment