Commit 9596982f authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Fix #2643. (Make progress reporting work right again in the loader). close[t:2643]

git-svn-id: file:///svn/toku/tokudb@20470 c7de825b-a66e-492c-adef-691d508d4ae1
parent 811bd1c5
...@@ -190,8 +190,7 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int ...@@ -190,8 +190,7 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int progress_allocation, QUEUE); int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int progress_allocation, QUEUE);
CILK_BEGIN CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func);
int progress_allocation);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *); int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
...@@ -200,8 +199,7 @@ CILK_END ...@@ -200,8 +199,7 @@ CILK_END
//int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation); //int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation); int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation);
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func);
int progress_allocation);
// This is probably only for testing. // This is probably only for testing.
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl, int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
......
...@@ -329,6 +329,8 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -329,6 +329,8 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
brt_loader_destroy_error_callback(&bl->error_callback); brt_loader_destroy_error_callback(&bl->error_callback);
brt_loader_destroy_poll_callback(&bl->poll_callback); brt_loader_destroy_poll_callback(&bl->poll_callback);
//printf("Progress=%d/%d\n", bl->progress, PROGRESS_MAX);
toku_free(bl); toku_free(bl);
} }
...@@ -826,8 +828,7 @@ static int finish_primary_rows_internal (BRTLOADER bl) ...@@ -826,8 +828,7 @@ static int finish_primary_rows_internal (BRTLOADER bl)
cilk_for (int i = 0; i < bl->N; i++) { cilk_for (int i = 0; i < bl->N; i++) {
struct rowset *rows = &(bl->rows[i]); struct rowset *rows = &(bl->rows[i]);
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows); //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
int progress_this_sort = 0; // fix? ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort);
zero_rowset(rows); zero_rowset(rows);
} }
// Implicit cilk_sync after that cilk_for loop. // Implicit cilk_sync after that cilk_for loop.
...@@ -1017,8 +1018,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -1017,8 +1018,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
if (row_wont_fit(rows, skey.size + sval.size)) { if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes); //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
int progress_this_sort = 0; // fix? int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function. // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
BL_TRACE(blt_sort_and_write_rows); BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows. init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows.
...@@ -1371,8 +1371,7 @@ static int update_progress (int N, ...@@ -1371,8 +1371,7 @@ static int update_progress (int N,
} }
CILK_BEGIN CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare)
int progress_allocation)
/* Effect: Given a rowset, sort it and write it to a temporary file. /* Effect: Given a rowset, sort it and write it to a temporary file.
* Arguments: * Arguments:
* rows the rowset * rows the rowset
...@@ -1382,6 +1381,7 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER ...@@ -1382,6 +1381,7 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
* compare The comparison function. * compare The comparison function.
* Returns 0 on success, otherwise an error number. * Returns 0 on success, otherwise an error number.
* Destroy the rowset after finishing it. * Destroy the rowset after finishing it.
* Note: There is no sense in trying to calculate progress by this function since it's done concurrently with the loader->put operation.
*/ */
{ {
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
...@@ -1397,11 +1397,6 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER ...@@ -1397,11 +1397,6 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
if (r != 0) result = r; if (r != 0) result = r;
//bl_time_t after_sort = bl_time_now(); //bl_time_t after_sort = bl_time_now();
if (result == 0) {
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r != 0) result = r;
}
if (result == 0) { if (result == 0) {
r = extend_fileset(bl, fs, &sfile); r = extend_fileset(bl, fs, &sfile);
...@@ -1429,22 +1424,17 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER ...@@ -1429,22 +1424,17 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
destroy_rowset(&rows); destroy_rowset(&rows);
//bl_time_t after_write = bl_time_now(); //bl_time_t after_write = bl_time_now();
if (result == 0) {
r = update_progress(progress_allocation, bl, "wrote sorted");
if (r != 0) result = r;
}
return result; return result;
} }
CILK_END CILK_END
// C function for testing sort_and_write_rows // C function for testing sort_and_write_rows
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare) {
int progress_allocation) {
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare, progress_allocation); return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare);
#else #else
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare, progress_allocation); return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
#endif #endif
} }
...@@ -1705,6 +1695,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1705,6 +1695,7 @@ int merge_files (struct merge_fileset *fs,
while (fs->n_temp_files > 0) { while (fs->n_temp_files > 0) {
int progress_allocation_for_this_pass = progress_allocation/n_passes_left; int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
progress_allocation -= progress_allocation_for_this_pass; progress_allocation -= progress_allocation_for_this_pass;
//printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass);
invariant(fs->n_temp_files>0); invariant(fs->n_temp_files>0);
struct merge_fileset next_file_set; struct merge_fileset next_file_set;
...@@ -1716,9 +1707,9 @@ int merge_files (struct merge_fileset *fs, ...@@ -1716,9 +1707,9 @@ int merge_files (struct merge_fileset *fs,
// We are about to do n_to_merge/n_temp_files of the remaining for this pass. // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files; int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
//printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files);
progress_allocation_for_this_pass -= progress_allocation_for_this_subpass; progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
//printf("%s:%d merging\n", __FILE__, __LINE__); //printf("%s:%d merging\n", __FILE__, __LINE__);
FIDX merged_data = FIDX_NULL; FIDX merged_data = FIDX_NULL;
...@@ -1776,10 +1767,8 @@ int merge_files (struct merge_fileset *fs, ...@@ -1776,10 +1767,8 @@ int merge_files (struct merge_fileset *fs,
// Update the progress // Update the progress
n_passes_left--; n_passes_left--;
{
int r = update_progress(progress_allocation_for_this_pass, bl, "merging files"); if (result==0) { invariant(progress_allocation_for_this_pass==0); }
if (r!=0 && result==0) result = r;
}
if (result!=0) break; if (result!=0) break;
} }
...@@ -1787,6 +1776,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1787,6 +1776,7 @@ int merge_files (struct merge_fileset *fs,
int r = queue_eof(output_q); int r = queue_eof(output_q);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
// It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
{ {
int r = update_progress(progress_allocation, bl, "did merge_files"); int r = update_progress(progress_allocation, bl, "did merge_files");
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
...@@ -2471,9 +2461,8 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -2471,9 +2461,8 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
toku_free((void*)bl->new_fnames_in_env[i]); toku_free((void*)bl->new_fnames_in_env[i]);
bl->new_fnames_in_env[i] = NULL; bl->new_fnames_in_env[i] = NULL;
invariant(0<=bl->progress && bl->progress <= PROGRESS_MAX); invariant(0<=bl->progress && bl->progress <= PROGRESS_MAX);
result = update_progress(0, bl, "did index");
if (result) goto error;
} }
if (result==0) invariant(remaining_progress==0);
} }
invariant(bl->file_infos.n_files_open == 0); invariant(bl->file_infos.n_files_open == 0);
invariant(bl->file_infos.n_files_extant == 0); invariant(bl->file_infos.n_files_extant == 0);
......
...@@ -208,17 +208,30 @@ static void check_results(DB **dbs) ...@@ -208,17 +208,30 @@ static void check_results(DB **dbs)
static void *expect_poll_void = &expect_poll_void; static void *expect_poll_void = &expect_poll_void;
static uint64_t poll_count=0; static uint64_t poll_count=0;
static uint64_t bomb_after_poll_count=UINT64_MAX; static uint64_t bomb_after_poll_count=UINT64_MAX;
static struct progress_info {
double time;
double progress;
} *progress_infos=NULL;
static int progress_infos_count=0;
static int progress_infos_limit=0;
// timing
static BOOL did_start=FALSE;
static struct timeval start;
static int poll_function (void *extra, float progress) { static int poll_function (void *extra, float progress) {
if (0) { if (verbose>=2) {
static int did_one=0; assert(did_start);
static struct timeval start;
struct timeval now; struct timeval now;
gettimeofday(&now, 0); gettimeofday(&now, 0);
if (!did_one) { double elapsed = now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec);
start=now; printf("Progress: %6.6fs %5.1f%%\n", elapsed, progress*100);
did_one=1; if (progress_infos_count>=progress_infos_limit) {
progress_infos_limit = 2*progress_infos_limit + 1;
XREALLOC_N(progress_infos_limit, progress_infos);
} }
printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100); progress_infos[progress_infos_count++] = (struct progress_info){elapsed, progress};
} }
assert(extra==expect_poll_void); assert(extra==expect_poll_void);
assert(0.0<=progress && progress<=1.0); assert(0.0<=progress && progress<=1.0);
...@@ -282,6 +295,9 @@ static void test_loader(DB **dbs) ...@@ -282,6 +295,9 @@ static void test_loader(DB **dbs)
int n = count_temp(env->i->real_data_dir); int n = count_temp(env->i->real_data_dir);
if (verbose) printf("Num temp files = %d\n", n); if (verbose) printf("Num temp files = %d\n", n);
did_start = TRUE;
gettimeofday(&start, 0);
// close the loader // close the loader
printf("%9.6fs closing\n", elapsed_time()); printf("%9.6fs closing\n", elapsed_time());
r = loader->close(loader); r = loader->close(loader);
...@@ -374,6 +390,17 @@ int test_main(int argc, char * const *argv) { ...@@ -374,6 +390,17 @@ int test_main(int argc, char * const *argv) {
do_args(argc, argv); do_args(argc, argv);
run_test(); run_test();
if (free_me) toku_free(free_me); if (free_me) toku_free(free_me);
if (progress_infos) {
if (verbose>=2) {
double ratio=progress_infos[progress_infos_count-1].time/progress_infos[progress_infos_count-1].progress;
printf("Progress ratios:\n");
for (int i=0; i<progress_infos_count; i++) {
printf(" %5.3f\n", (progress_infos[i].time/progress_infos[i].progress)/ratio);
}
}
toku_free(progress_infos);
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment