Commit 2ef2752c authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

cleanup merge_some_files error path [t:2642]

git-svn-id: file:///svn/toku/tokudb@20533 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4b9a5ce3
......@@ -121,17 +121,18 @@ static void cleanup_big_buffer(struct file_info *file) {
}
int brtloader_init_file_infos (struct file_infos *fi) {
int result = 0;
int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0);
fi->n_files = 0;
fi->n_files_limit = 1;
fi->n_files_open = 0;
fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos == NULL) {
result = errno;
if (fi->file_infos) return 0;
else {
int result = errno;
toku_pthread_mutex_destroy(&fi->lock); // lazy no error check and maybe done elsewhere
return result;
}
return result;
}
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
......@@ -391,6 +392,9 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done);
#endif
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g;
bl->cachetable = cachetable;
if (bl->cachetable)
......@@ -461,6 +465,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
}
bl->extractor_live = TRUE;
*blp = bl;
return 0;
......@@ -498,14 +504,12 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bt_compare_functions,
temp_file_template,
load_lsn);
if (r != 0) result = r;
if (r!=0) result = r;
}
if (result == 0) {
if (result==0) {
BRTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r == 0) {
bl->extractor_live = TRUE;
} else {
if (r!=0) {
result = r;
toku_pthread_mutex_destroy(&bl->mutex);
toku_brtloader_internal_destroy(bl, TRUE);
......@@ -515,10 +519,17 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
return result;
}
static void brt_loader_set_panic(BRTLOADER bl, int error, BOOL callback) {
int r = brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
if (r == 0 && callback)
brt_loader_call_error_function(&bl->error_callback);
static void brt_loader_set_panic(BRTLOADER bl, int error) {
int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0);
BOOL is_panic = bl->panic;
if (!is_panic) {
bl->panic = TRUE;
bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
}
// One of the tests uses this.
......@@ -866,7 +877,7 @@ static void* extractor_thread (void *blv) {
{
r = process_primary_rows(bl, primary_rowset);
if (r)
brt_loader_set_panic(bl, r, FALSE);
brt_loader_set_panic(bl, r);
}
}
......@@ -874,7 +885,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) {
r = finish_primary_rows(bl);
if (r)
brt_loader_set_panic(bl, r, FALSE);
brt_loader_set_panic(bl, r);
}
BL_TRACE(blt_extractor);
......@@ -1084,7 +1095,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise.
*/
{
if (brt_loader_get_error(&bl->error_callback))
if (bl->panic || brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic
bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
......@@ -1466,48 +1477,55 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
}
pqueue_t *pq;
pqueue_node_t *pq_nodes = (pqueue_node_t *)toku_malloc(n_sources * sizeof(pqueue_node_t));
pqueue_node_t *pq_nodes = (pqueue_node_t *)toku_malloc(n_sources * sizeof(pqueue_node_t)); // freed in cleanup
invariant(pq_nodes != NULL);
{
int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
lazy_assert(r == 0);
if (r) return r;
result = r;
}
u_int64_t n_rows = 0;
// load pqueue with first value from each source
for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
lazy_assert(r == 0);
if (r!=0) return r;
if ( result == 0 ) {
// load pqueue with first value from each source
for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
lazy_assert(r == 0);
if (r!=0) {
result = r;
break;
}
pq_nodes[i].key = &keys[i];
pq_nodes[i].val = &vals[i];
pq_nodes[i].i = i;
r = pqueue_insert(pq, &pq_nodes[i]);
if (r!=0) {
result = r;
// path tested by loader-dup-test5.tdbrun
// printf("%s:%d returning\n", __FILE__, __LINE__);
break;
}
pq_nodes[i].key = &keys[i];
pq_nodes[i].val = &vals[i];
pq_nodes[i].i = i;
r = pqueue_insert(pq, &pq_nodes[i]);
if (r!=0) {
result = r;
// path tested by loader-dup-test5.tdbrun
// printf("%s:%d returning\n", __FILE__, __LINE__);
break;
}
dataoff[i] = 0;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); resource_assert(r2==0); }
n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); resource_assert(r2==0); }
dataoff[i] = 0;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); resource_assert(r2==0); }
n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); resource_assert(r2==0); }
}
}
u_int64_t n_rows_done = 0;
struct rowset *output_rowset = NULL;
if (result==0 && to_q) {
XMALLOC(output_rowset);
XMALLOC(output_rowset); // freed in cleanup
int r = init_rowset(output_rowset, memory_per_rowset(bl));
lazy_assert(r==0);
if (r!=0) result = r;
}
//printf(" n_rows=%ld\n", n_rows);
......@@ -1531,18 +1549,28 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
lazy_assert(r==0);
MALLOC(output_rowset);
assert(output_rowset);
XMALLOC(output_rowset); // freed in cleanup
r = init_rowset(output_rowset, memory_per_rowset(bl));
lazy_assert(r==0);
if (r!=0) {
result = r;
break;
}
}
r = add_row(output_rowset, &keys[mini], &vals[mini]);
lazy_assert(r == 0);
if (r!=0) {
return = r;
break;
}
} else {
// write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
lazy_assert(r==0);
if (r!=0) return r;
if (r!=0) {
return = r;
break;
}
}
{
......@@ -1558,7 +1586,8 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} else {
printf("%s:%d returning\n", __FILE__, __LINE__);
lazy_assert(0);
return r;
result = r;
break;
}
}
else {
......@@ -1573,7 +1602,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
}
}
}
n_rows_done++;
const u_int64_t rows_per_report = size_factor*1024;
if (n_rows_done%rows_per_report==0) {
......@@ -1589,14 +1618,17 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
result=r;
break;
}
}
}
}
if (result==0 && to_q) {
BL_TRACE(blt_do_i);
int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0); // if (r!=0) result = r;
output_rowset = NULL;
lazy_assert(r==0); //
if (r!=0)
result = r;
else
output_rowset = NULL;
}
// cleanup
......@@ -1773,8 +1805,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break;
}
if (result)
brt_loader_set_panic(bl, result, TRUE);
if (result) brt_loader_set_panic(bl, result);
{
int r = queue_eof(output_q);
if (r!=0 && result==0) result = r;
......@@ -2143,7 +2174,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
seek_align(&out);
int64_t lblock;
result = allocate_block(&out, &lblock);
invariant(result == 0); // can not fail since the first block is reserved above
lazy_assert(result == 0); // can not fail since translations reserved above
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock);
struct subtree_estimates est = zero_estimates;
est.exact = TRUE;
......@@ -2160,7 +2191,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break;
if (rr != 0) {
brt_loader_set_panic(bl, rr, TRUE); // error after cilk sync
brt_loader_set_panic(bl, rr); // error after cilk sync
break;
}
}
......@@ -2196,9 +2227,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0)
result = r;
brt_loader_set_panic(bl, r); // error after cilk sync
if (result == 0) result = r;
break;
}
......@@ -2207,9 +2237,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
r = allocate_block(&out, &lblock);
if (r != 0) {
brt_loader_set_panic(bl, r, TRUE);
if (result == 0)
result = r;
brt_loader_set_panic(bl, r);
if (result == 0) result = r;
break;
}
lbuf = start_leaf(&out, descriptor, lblock);
......@@ -2237,9 +2266,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync;
if (result == 0) {
result = brt_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
if (result) goto error;
if (bl->panic) { // if there were any prior errors then exit
result = bl->panic_errno; goto error;
}
// We haven't paniced, so the sum should add up.
......@@ -2313,6 +2341,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = errno; goto error;
}
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error:
{
int rr = toku_os_close(fd);
......@@ -2549,7 +2579,11 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
}
int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = brt_loader_get_error(&bl->error_callback);
*error = 0;
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
return 0;
}
......@@ -2696,10 +2730,12 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
}
if (result)
brt_loader_set_panic(bl, result, TRUE);
brt_loader_set_panic(bl, result);
}
CILK_END
......@@ -2952,7 +2988,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
blocknum_of_new_node = blocknum_of_new_node;
if (result != 0)
brt_loader_set_panic(bl, result, TRUE);
brt_loader_set_panic(bl, result);
}
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
......@@ -3054,8 +3090,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
cilk_sync;
if (result == 0) // pick up write_nonleaf_node errors
result = brt_loader_get_error(&bl->error_callback);
if (result == 0 && bl->panic) // pick up write_nonleaf_node errors
result = bl->panic_errno;
// Now set things up for the next iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment