Commit e736324e authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix some error paths in the brtloader extractor.

fix an uninitialized var problem in the ydbloader.
refs[t:2597] refs[t:2598]
merge -r 20145:20165 tokudb.2597 to tokudb



git-svn-id: file:///svn/toku/tokudb@20166 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3e921e65
......@@ -508,11 +508,12 @@ int init_rowset (struct rowset *rows)
rows->n_rows = 0;
rows->n_rows_limit = 100;
MALLOC_N(rows->n_rows_limit, rows->rows);
int err = errno;
rows->n_bytes = 0;
rows->n_bytes_limit = 1024*size_factor*16;
rows->data = (char *) toku_malloc(rows->n_bytes_limit);
if (rows->rows==NULL || rows->data==NULL) {
int r = errno;
int r = errno ? errno : err;
toku_free(rows->rows);
toku_free(rows->data);
rows->rows = NULL;
......@@ -608,14 +609,15 @@ static int finish_primary_rows (BRTLOADER bl) {
static void* extractor_thread (void *blv) {
BL_TRACE(blt_extractor_init);
BRTLOADER bl = (BRTLOADER)blv;
int r = 0;
while (1) {
void *item;
{
BL_TRACE(blt_extractor);
int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
BL_TRACE(blt_extract_deq);
if (r==EOF) break;
assert(r==0); // other errors are arbitrarily bad.
if (rq==EOF) break;
assert(rq==0); // other errors are arbitrarily bad.
}
struct rowset *primary_rowset = (struct rowset *)item;
......@@ -625,21 +627,21 @@ static void* extractor_thread (void *blv) {
// Now we have some rows to output
{
int r = process_primary_rows(bl, primary_rowset);
r = process_primary_rows(bl, primary_rowset);
if (r)
brt_loader_set_panic(bl, r);
}
}
//printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
int r = finish_primary_rows(bl);
if (r == 0) {
r = finish_primary_rows(bl);
if (r)
brt_loader_set_panic(bl, r);
}
BL_TRACE(blt_extractor);
return 0;
return NULL;
}
static void enqueue_for_extraction (BRTLOADER bl) {
......@@ -743,10 +745,9 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
pval.data = primary_rowset->data + prow->off + prow->klen;
pval.size = prow->vlen;
{
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval, NULL);
assert(r==0);
assert(r==0); // LAZY
}
if (row_wont_fit(rows, skey.size + sval.size)) {
......@@ -755,6 +756,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
if (r!=0) {
error_codes[i] = r;
#if defined(__cilkplusplus)
......@@ -764,7 +766,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
#endif
break;
}
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
}
add_row(rows, &skey, &sval);
......@@ -783,20 +784,23 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
}
{
toku_free(skey.data);
toku_free(sval.data);
skey.data = sval.data = NULL; // set to NULL so that the final cleanup won't free them again.
if (skey.flags) {
toku_free(skey.data); skey.data = NULL;
}
if (sval.flags) {
toku_free(sval.data); sval.data = NULL;
}
}
}
destroy_rowset(primary_rowset);
toku_free(primary_rowset);
int r = 0;
if (error_count>0) {
if (error_count > 0) {
for (int i=0; i<bl->N; i++) {
if (error_codes[i]) r = error_codes[i];
}
assert(0); // could not find the error code. This is an error in the program if we get here.
assert(r); // could not find the error code. This is an error in the program if we get here.
}
toku_free(error_codes);
BL_TRACE(blt_extractor);
......
......@@ -206,6 +206,7 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->evals[i].flags = DB_DBT_REALLOC;
}
loader->i->brt_loader = NULL;
rval = 0;
}
else {
char **XMALLOC_N(N, new_inames_in_env);
......
......@@ -18,10 +18,12 @@ static free_fun_t t_free = 0;
static realloc_fun_t t_realloc = 0;
void *toku_malloc(size_t size) {
void *p;
if (t_malloc)
return t_malloc(size);
p = t_malloc(size);
else
return os_malloc(size);
p = os_malloc(size);
return p;
}
void *
......@@ -60,7 +62,6 @@ toku_xrealloc(void *v, size_t size)
void *
toku_tagmalloc(size_t size, enum typ_tag typtag)
{
//printf("%s:%d tagmalloc\n", __FILE__, __LINE__);
void *r = toku_malloc(size);
if (!r) return 0;
assert(size>sizeof(int));
......@@ -71,14 +72,16 @@ toku_tagmalloc(size_t size, enum typ_tag typtag)
void *
toku_realloc(void *p, size_t size)
{
void *q;
if (t_realloc)
return t_realloc(p, size);
q = t_realloc(p, size);
else
return os_realloc(p, size);
q = os_realloc(p, size);
return q;
}
void
toku_free(void* p)
toku_free(void *p)
{
if (t_free)
t_free(p);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment