Commit f0068111 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

closes[t:2644] combine the brtloader panic and error callback state

git-svn-id: file:///svn/toku/tokudb@20526 c7de825b-a66e-492c-adef-691d508d4ae1
parent bddfac9a
...@@ -84,14 +84,14 @@ void brt_loader_set_poll_function(brtloader_poll_callback, brt_loader_poll_func ...@@ -84,14 +84,14 @@ void brt_loader_set_poll_function(brtloader_poll_callback, brt_loader_poll_func
int brt_loader_call_poll_function(brtloader_poll_callback, float progress); int brt_loader_call_poll_function(brtloader_poll_callback, float progress);
struct error_callback_s { struct error_callback_s {
int error;
brt_loader_error_func error_callback; brt_loader_error_func error_callback;
void *extra; void *extra;
int did_callback;
int error;
DB *db; DB *db;
int which_db; int which_db;
DBT key; DBT key;
DBT val; DBT val;
BOOL did_callback;
toku_pthread_mutex_t mutex; toku_pthread_mutex_t mutex;
}; };
typedef struct error_callback_s *brtloader_error_callback; typedef struct error_callback_s *brtloader_error_callback;
...@@ -111,8 +111,9 @@ int brt_loader_call_error_function(brtloader_error_callback); ...@@ -111,8 +111,9 @@ int brt_loader_call_error_function(brtloader_error_callback);
int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
struct brtloader_s { struct brtloader_s {
BOOL panic; // These two are set in the close function, and used while running close
int panic_errno; struct error_callback_s error_callback;
struct poll_callback_s poll_callback;
generate_row_for_put_func generate_row_for_put; generate_row_for_put_func generate_row_for_put;
brt_compare_func *bt_compare_funs; brt_compare_func *bt_compare_funs;
...@@ -148,11 +149,6 @@ struct brtloader_s { ...@@ -148,11 +149,6 @@ struct brtloader_s {
int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0 int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
// We use an integer so that we can add to the progress using a fetch-and-add instruction. // We use an integer so that we can add to the progress using a fetch-and-add instruction.
// These two are set in the close function, and used while running close
struct error_callback_s error_callback;
struct poll_callback_s poll_callback;
int user_said_stop; // 0 if the poll_function always returned zero. If it ever returns nonzero, then store that value here.
LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader. LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
QUEUE *fractal_queues; // an array of work queues, one for each secondary index. QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
...@@ -234,6 +230,13 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error); ...@@ -234,6 +230,13 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error);
enum { disksize_row_overhead = 9 }; // how much overhead for a row in the fractal tree enum { disksize_row_overhead = 9 }; // how much overhead for a row in the fractal tree
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
uint64_t toku_brtloader_get_rowset_budget_for_testing (void);
int toku_brt_loader_finish_extractor(BRTLOADER bl);
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
C_END C_END
#endif #endif
...@@ -121,18 +121,17 @@ static void cleanup_big_buffer(struct file_info *file) { ...@@ -121,18 +121,17 @@ static void cleanup_big_buffer(struct file_info *file) {
} }
int brtloader_init_file_infos (struct file_infos *fi) { int brtloader_init_file_infos (struct file_infos *fi) {
int result = 0;
int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0); int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0);
fi->n_files = 0; fi->n_files = 0;
fi->n_files_limit = 1; fi->n_files_limit = 1;
fi->n_files_open = 0; fi->n_files_open = 0;
fi->n_files_extant = 0; fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos) return 0; if (fi->file_infos == NULL) {
else { result = errno;
int result = errno;
toku_pthread_mutex_destroy(&fi->lock); // lazy no error check and maybe done elsewhere
return result;
} }
return result;
} }
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
...@@ -392,9 +391,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -392,9 +391,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done); BL_TRACE(blt_calibrate_done);
#endif #endif
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g; bl->generate_row_for_put = g;
bl->cachetable = cachetable; bl->cachetable = cachetable;
if (bl->cachetable) if (bl->cachetable)
...@@ -465,8 +461,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -465,8 +461,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
bl->extractor_live = TRUE;
*blp = bl; *blp = bl;
return 0; return 0;
...@@ -504,12 +498,14 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -504,12 +498,14 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bt_compare_functions, bt_compare_functions,
temp_file_template, temp_file_template,
load_lsn); load_lsn);
if (r!=0) result = r; if (r != 0) result = r;
} }
if (result==0) { if (result == 0) {
BRTLOADER bl = *blp; BRTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r!=0) { if (r == 0) {
bl->extractor_live = TRUE;
} else {
result = r; result = r;
toku_pthread_mutex_destroy(&bl->mutex); toku_pthread_mutex_destroy(&bl->mutex);
toku_brtloader_internal_destroy(bl, TRUE); toku_brtloader_internal_destroy(bl, TRUE);
...@@ -519,17 +515,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -519,17 +515,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
return result; return result;
} }
static void brt_loader_set_panic(BRTLOADER bl, int error) { static void brt_loader_set_panic(BRTLOADER bl, int error, BOOL callback) {
int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0); int r = brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
BOOL is_panic = bl->panic; if (r == 0 && callback)
if (!is_panic) { brt_loader_call_error_function(&bl->error_callback);
bl->panic = TRUE;
bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
} }
// One of the tests uses this. // One of the tests uses this.
...@@ -877,7 +866,7 @@ static void* extractor_thread (void *blv) { ...@@ -877,7 +866,7 @@ static void* extractor_thread (void *blv) {
{ {
r = process_primary_rows(bl, primary_rowset); r = process_primary_rows(bl, primary_rowset);
if (r) if (r)
brt_loader_set_panic(bl, r); brt_loader_set_panic(bl, r, FALSE);
} }
} }
...@@ -885,7 +874,7 @@ static void* extractor_thread (void *blv) { ...@@ -885,7 +874,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) { if (r == 0) {
r = finish_primary_rows(bl); r = finish_primary_rows(bl);
if (r) if (r)
brt_loader_set_panic(bl, r); brt_loader_set_panic(bl, r, FALSE);
} }
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
...@@ -1095,7 +1084,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val) ...@@ -1095,7 +1084,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
if (bl->panic || brt_loader_get_error(&bl->error_callback)) if (brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic return EINVAL; // previous panic
bl->n_rows++; bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl); // return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
...@@ -1784,7 +1773,8 @@ int merge_files (struct merge_fileset *fs, ...@@ -1784,7 +1773,8 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break; if (result!=0) break;
} }
if (result) brt_loader_set_panic(bl, result); if (result)
brt_loader_set_panic(bl, result, TRUE);
{ {
int r = queue_eof(output_q); int r = queue_eof(output_q);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
...@@ -2153,7 +2143,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2153,7 +2143,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
seek_align(&out); seek_align(&out);
int64_t lblock; int64_t lblock;
result = allocate_block(&out, &lblock); result = allocate_block(&out, &lblock);
lazy_assert(result == 0); // can not fail since translations reserved above invariant(result == 0); // can not fail since the first block is reserved above
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock); struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock);
struct subtree_estimates est = zero_estimates; struct subtree_estimates est = zero_estimates;
est.exact = TRUE; est.exact = TRUE;
...@@ -2170,7 +2160,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2170,7 +2160,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq); BL_TRACE(blt_fractal_deq);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
brt_loader_set_panic(bl, rr); // error after cilk sync brt_loader_set_panic(bl, rr, TRUE); // error after cilk sync
break; break;
} }
} }
...@@ -2206,8 +2196,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2206,8 +2196,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++; n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) { if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r); // error after cilk sync brt_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0) result = r; if (result == 0)
result = r;
break; break;
} }
...@@ -2216,8 +2207,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2216,8 +2207,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
r = allocate_block(&out, &lblock); r = allocate_block(&out, &lblock);
if (r != 0) { if (r != 0) {
brt_loader_set_panic(bl, r); brt_loader_set_panic(bl, r, TRUE);
if (result == 0) result = r; if (result == 0)
result = r;
break; break;
} }
lbuf = start_leaf(&out, descriptor, lblock); lbuf = start_leaf(&out, descriptor, lblock);
...@@ -2245,8 +2237,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2245,8 +2237,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync; cilk_sync;
if (bl->panic) { // if there were any prior errors then exit if (result == 0) {
result = bl->panic_errno; goto error; result = brt_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
if (result) goto error;
} }
// We haven't paniced, so the sum should add up. // We haven't paniced, so the sum should add up.
...@@ -2320,8 +2313,6 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2320,8 +2313,6 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = errno; goto error; result = errno; goto error;
} }
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error: error:
{ {
int rr = toku_os_close(fd); int rr = toku_os_close(fd);
...@@ -2558,11 +2549,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) ...@@ -2558,11 +2549,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
} }
int toku_brt_loader_get_error(BRTLOADER bl, int *error) { int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = 0; *error = brt_loader_get_error(&bl->error_callback);
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
return 0; return 0;
} }
...@@ -2709,12 +2696,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2709,12 +2696,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX); //printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
if (result == 0) { if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node"); result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
} }
if (result) if (result)
brt_loader_set_panic(bl, result); brt_loader_set_panic(bl, result, TRUE);
} }
CILK_END CILK_END
...@@ -2967,7 +2952,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -2967,7 +2952,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
blocknum_of_new_node = blocknum_of_new_node; blocknum_of_new_node = blocknum_of_new_node;
if (result != 0) if (result != 0)
brt_loader_set_panic(bl, result); brt_loader_set_panic(bl, result, TRUE);
} }
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) { static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
...@@ -3069,8 +3054,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3069,8 +3054,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
cilk_sync; cilk_sync;
if (result == 0 && bl->panic) // pick up write_nonleaf_node errors if (result == 0) // pick up write_nonleaf_node errors
result = bl->panic_errno; result = brt_loader_get_error(&bl->error_callback);
// Now set things up for the next iteration. // Now set things up for the next iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
......
...@@ -39,17 +39,10 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -39,17 +39,10 @@ int toku_brt_loader_close (BRTLOADER bl,
int toku_brt_loader_abort(BRTLOADER bl, int toku_brt_loader_abort(BRTLOADER bl,
BOOL is_error); BOOL is_error);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
// For test purposes only // For test purposes only
void toku_brtloader_set_size_factor (uint32_t factor); void toku_brtloader_set_size_factor (uint32_t factor);
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
uint64_t toku_brtloader_get_rowset_budget_for_testing (void);
int toku_brt_loader_finish_extractor(BRTLOADER bl); void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
C_END C_END
......
...@@ -163,7 +163,6 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -163,7 +163,6 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
DB *dest_db = NULL; DB *dest_db = NULL;
struct brtloader_s bl = { struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
...@@ -244,6 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -244,6 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brtloader_set_os_fwrite(bad_fwrite); brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write); toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite); toku_set_func_pwrite(bad_pwrite);
brt_loader_set_error_function(&bl.error_callback, NULL, NULL);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL); brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est); r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est);
......
...@@ -83,7 +83,6 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -83,7 +83,6 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
DB *dest_db = NULL; DB *dest_db = NULL;
struct brtloader_s bl = { struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
......
...@@ -169,8 +169,7 @@ static void test_mergesort_row_array (void) { ...@@ -169,8 +169,7 @@ static void test_mergesort_row_array (void) {
} }
static void test_read_write_rows (char *template) { static void test_read_write_rows (char *template) {
struct brtloader_s bl = {.panic = 0, struct brtloader_s bl = { .temp_file_template = template};
.temp_file_template = template};
int r = brtloader_init_file_infos(&bl.file_infos); int r = brtloader_init_file_infos(&bl.file_infos);
CKERR(r); CKERR(r);
FIDX file; FIDX file;
...@@ -280,7 +279,6 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c ...@@ -280,7 +279,6 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
static void test_merge_files (const char *template, const char *output_name) { static void test_merge_files (const char *template, const char *output_name) {
DB *dest_db = NULL; DB *dest_db = NULL;
struct brtloader_s bl = { struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment