Commit 5c5a2043 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge in

 * bradley's benchmarking changes, and
 * the big speedup in the sort-and-write-rows (slowness caused the lock in fidx2file).
Refs #2571. [t:2571]
{{{
svn merge -r 19909:20027 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20029 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9d0ff467
...@@ -103,6 +103,7 @@ endif ...@@ -103,6 +103,7 @@ endif
NEWBRT_O_FILES += brtloader.$(OEXT) NEWBRT_O_FILES += brtloader.$(OEXT)
brtloader.$(OEXT): $(DEPEND_COMPILE)
ifeq ($(BRTLOADER),cilk) ifeq ($(BRTLOADER),cilk)
brtloader.$(OEXT): brtloader.c brtloader.$(OEXT): brtloader.c
$(CILKPP) -DTOKU_ALLOW_DEPRECATED $(CILKFLAGS) -c $< $(CILKPP) -DTOKU_ALLOW_DEPRECATED $(CILKFLAGS) -c $<
......
...@@ -37,6 +37,7 @@ typedef struct fidx { int idx; } FIDX; ...@@ -37,6 +37,7 @@ typedef struct fidx { int idx; } FIDX;
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1}; static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};
static int fidx_is_null (const FIDX f) __attribute__((__unused__)); static int fidx_is_null (const FIDX f) __attribute__((__unused__));
static int fidx_is_null (const FIDX f) { return f.idx==-1; } static int fidx_is_null (const FIDX f) { return f.idx==-1; }
FILE *toku_bl_fidx2file (BRTLOADER bl, FIDX i);
int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx); int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx);
...@@ -56,7 +57,7 @@ int init_rowset (struct rowset *rows); ...@@ -56,7 +57,7 @@ int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows); void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val); void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl); int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl); int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
struct merge_fileset { struct merge_fileset {
...@@ -183,7 +184,7 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_ ...@@ -183,7 +184,7 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
extern "Cilk++" { extern "Cilk++" {
#endif #endif
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation); int progress_allocation);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *); int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
......
...@@ -278,11 +278,11 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -278,11 +278,11 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
BRTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC) BRTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
if (!bl) return errno; if (!bl) return errno;
BL_TRACE("calibrate begin");
#if BL_DO_TRACE #if BL_DO_TRACE
BL_TRACE(blt_calibrate_begin);
sleep(1); sleep(1);
BL_TRACE(blt_calibrate_done);
#endif #endif
BL_TRACE("calibrate done");
bl->panic = 0; bl->panic = 0;
bl->panic_errno = 0; bl->panic_errno = 0;
...@@ -336,7 +336,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -336,7 +336,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
*blp = bl; *blp = bl;
BL_TRACE("open"); BL_TRACE(blt_open);
return 0; return 0;
} }
...@@ -346,7 +346,8 @@ static void brt_loader_set_panic(BRTLOADER bl, int error) { ...@@ -346,7 +346,8 @@ static void brt_loader_set_panic(BRTLOADER bl, int error) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL); brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
} }
static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) { // One of the tests uses this.
FILE *toku_bl_fidx2file (BRTLOADER bl, FIDX i) {
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); } { int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
assert(i.idx >=0 && i.idx < bl->file_infos.n_files); assert(i.idx >=0 && i.idx < bl->file_infos.n_files);
assert(bl->file_infos.file_infos[i.idx].is_open); assert(bl->file_infos.file_infos[i.idx].is_open);
...@@ -355,7 +356,7 @@ static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) { ...@@ -355,7 +356,7 @@ static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) {
return result; return result;
} }
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl) static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number. /* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
* Arguments: * Arguments:
* ptr the data to be writen. * ptr the data to be writen.
...@@ -366,7 +367,6 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD ...@@ -366,7 +367,6 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
FILE *stream = bl_fidx2file(bl, streami);
size_t r = do_fwrite(ptr, size, nmemb, stream); size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) { if (r!=nmemb) {
int e; int e;
...@@ -392,7 +392,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD ...@@ -392,7 +392,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
FILE *stream = bl_fidx2file(bl, streami); FILE *stream = toku_bl_fidx2file(bl, streami);
size_t r = fread(ptr, size, nmemb, stream); size_t r = fread(ptr, size, nmemb, stream);
if (r==0) { if (r==0) {
if (feof(stream)) return EOF; if (feof(stream)) return EOF;
...@@ -409,7 +409,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD ...@@ -409,7 +409,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
} }
} }
static int bl_write_dbt (DBT *dbt, FIDX datafile, uint64_t *dataoff, BRTLOADER bl) static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER bl)
{ {
int r; int r;
int dlen = dbt->size; int dlen = dbt->size;
...@@ -437,7 +437,7 @@ static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl) ...@@ -437,7 +437,7 @@ static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl)
return 0; return 0;
} }
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl) int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *dataoff, BRTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date. /* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date.
* Arguments: * Arguments:
* key, val write these. * key, val write these.
...@@ -451,8 +451,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADE ...@@ -451,8 +451,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADE
//int vlen = val->size; //int vlen = val->size;
int r; int r;
// we have a chance to handle the errors because when we close we can delete all the files. // we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_write_dbt(key, data, dataoff, bl))) return r; if ((r=bl_write_dbt(key, dataf, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, data, dataoff, bl))) return r; if ((r=bl_write_dbt(val, dataf, dataoff, bl))) return r;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); } { int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
bl->file_infos.file_infos[data.idx].n_rows++; bl->file_infos.file_infos[data.idx].n_rows++;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); } { int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); }
...@@ -527,13 +527,6 @@ static int row_wont_fit (struct rowset *rows, size_t size) ...@@ -527,13 +527,6 @@ static int row_wont_fit (struct rowset *rows, size_t size)
return (data_buffer_limit < rows->n_bytes + size); return (data_buffer_limit < rows->n_bytes + size);
} }
static void reset_rows (struct rowset *rows)
/* Effect: Reset the rows to an empty collection (but reuse any allocated space) */
{
rows->n_bytes = 0;
rows->n_rows = 0;
}
void add_row (struct rowset *rows, DBT *key, DBT *val) void add_row (struct rowset *rows, DBT *key, DBT *val)
/* Effect: add a row to a collection. */ /* Effect: add a row to a collection. */
{ {
...@@ -563,17 +556,20 @@ void add_row (struct rowset *rows, DBT *key, DBT *val) ...@@ -563,17 +556,20 @@ void add_row (struct rowset *rows, DBT *key, DBT *val)
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset); static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset);
CILK_BEGIN CILK_BEGIN
static int finish_primary_rows_internal (BRTLOADER bl) { static int finish_primary_rows_internal (BRTLOADER bl)
// now we have been asked to finish up. // now we have been asked to finish up.
// Be sure to destroy the rowsets.
{
int ra[bl->N]; int ra[bl->N];
cilk_for (int i = 0; i < bl->N; i++) { cilk_for (int i = 0; i < bl->N; i++) {
struct rowset *rows = &(bl->rows[i]); struct rowset *rows = &(bl->rows[i]);
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows); //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
int progress_this_sort = 0; // fix? int progress_this_sort = 0; // fix?
ra[i] = /*cilk_spawn*/sort_and_write_rows(rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort); ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort);
zero_rowset(rows);
} }
// cilk_sync; // Implicit cilk_sync after that cilk_for loop.
int r = 0; int r = 0;
for (int i = 0; i < bl->N; i++) for (int i = 0; i < bl->N; i++)
...@@ -593,14 +589,14 @@ static int finish_primary_rows (BRTLOADER bl) { ...@@ -593,14 +589,14 @@ static int finish_primary_rows (BRTLOADER bl) {
} }
static void* extractor_thread (void *blv) { static void* extractor_thread (void *blv) {
BL_TRACE("extractor_init"); BL_TRACE(blt_extractor_init);
BRTLOADER bl = (BRTLOADER)blv; BRTLOADER bl = (BRTLOADER)blv;
while (1) { while (1) {
void *item; void *item;
{ {
BL_TRACE("extractor"); BL_TRACE(blt_extractor);
int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL); int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
BL_TRACE("extractor_deq"); BL_TRACE(blt_extract_deq);
if (r==EOF) break; if (r==EOF) break;
assert(r==0); // other errors are arbitrarily bad. assert(r==0); // other errors are arbitrarily bad.
} }
...@@ -623,10 +619,7 @@ static void* extractor_thread (void *blv) { ...@@ -623,10 +619,7 @@ static void* extractor_thread (void *blv) {
int r = finish_primary_rows(bl); int r = finish_primary_rows(bl);
r = r; // RFP 2578 assert(r==0); // !!! should deal with this. r = r; // RFP 2578 assert(r==0); // !!! should deal with this.
for (int i=0; i<bl->N; i++) { BL_TRACE(blt_extractor);
destroy_rowset(&bl->rows[i]); // destroy the rowset here so that if we spawn the sort_and_write, there won't be a race on destroying the rows (thanks to the cilk_sync 2 lines up)
}
BL_TRACE("extractor");
return 0; return 0;
} }
...@@ -648,9 +641,9 @@ static int loader_do_put(BRTLOADER bl, ...@@ -648,9 +641,9 @@ static int loader_do_put(BRTLOADER bl,
if (row_wont_fit(&bl->primary_rowset, 0)) { if (row_wont_fit(&bl->primary_rowset, 0)) {
// queue the rows for further processing by the extractor thread. // queue the rows for further processing by the extractor thread.
//printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows); //printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
BL_TRACE("do_put"); BL_TRACE(blt_do_put);
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
BL_TRACE("extract_enq"); BL_TRACE(blt_extract_enq);
init_rowset(&bl->primary_rowset); init_rowset(&bl->primary_rowset);
} }
return 0; return 0;
...@@ -659,10 +652,10 @@ static int loader_do_put(BRTLOADER bl, ...@@ -659,10 +652,10 @@ static int loader_do_put(BRTLOADER bl,
static int finish_extractor (BRTLOADER bl) { static int finish_extractor (BRTLOADER bl) {
//printf("%s:%d now finishing extraction\n", __FILE__, __LINE__); //printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);
BL_TRACE("do_put"); BL_TRACE(blt_do_put);
if (bl->primary_rowset.n_rows>0) { if (bl->primary_rowset.n_rows>0) {
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
BL_TRACE("extract_enqeue"); BL_TRACE(blt_extract_enq);
} else { } else {
destroy_rowset(&bl->primary_rowset); destroy_rowset(&bl->primary_rowset);
} }
...@@ -677,7 +670,7 @@ static int finish_extractor (BRTLOADER bl) { ...@@ -677,7 +670,7 @@ static int finish_extractor (BRTLOADER bl) {
int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval); int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
assert(r==0 && toku_pthread_retval==NULL); assert(r==0 && toku_pthread_retval==NULL);
bl->extractor_live = FALSE; bl->extractor_live = FALSE;
BL_TRACE("join_on_extractor"); BL_TRACE(blt_join_on_extractor);
} }
{ {
int r = queue_destroy(bl->primary_rowset_queue); int r = queue_destroy(bl->primary_rowset_queue);
...@@ -702,7 +695,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -702,7 +695,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
// if FLUSH is true then write all the buffered rows out. // if FLUSH is true then write all the buffered rows out.
// if primary_rowset is NULL then treat it as empty. // if primary_rowset is NULL then treat it as empty.
{ {
BL_TRACE("cilk_call");
int error_count = 0; int error_count = 0;
int *MALLOC_N(bl->N, error_codes); int *MALLOC_N(bl->N, error_codes);
...@@ -741,10 +733,10 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -741,10 +733,10 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
if (row_wont_fit(rows, skey.size + sval.size)) { if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("rows.n_rows=%ld\n", rows.n_rows); //printf("rows.n_rows=%ld\n", rows.n_rows);
BL_TRACE("puts"); BL_TRACE(blt_extractor);
int progress_this_sort = 0; // fix? int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however. int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
BL_TRACE("sort_and_write_rows"); BL_TRACE(blt_sort_and_write_rows);
if (r!=0) { if (r!=0) {
error_codes[i] = r; error_codes[i] = r;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
...@@ -754,7 +746,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -754,7 +746,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
#endif #endif
break; break;
} }
reset_rows(rows); init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
} }
add_row(rows, &skey, &sval); add_row(rows, &skey, &sval);
...@@ -789,20 +781,20 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -789,20 +781,20 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
assert(0); // could not find the error code. This is an error in the program if we get here. assert(0); // could not find the error code. This is an error in the program if we get here.
} }
toku_free(error_codes); toku_free(error_codes);
BL_TRACE("extractor"); BL_TRACE(blt_extractor);
return r; return r;
} }
CILK_END CILK_END
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset) { static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset) {
BL_TRACE("extractor"); BL_TRACE(blt_extractor);
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
int r = cilk::run(process_primary_rows_internal, bl, primary_rowset); int r = cilk::run(process_primary_rows_internal, bl, primary_rowset);
BL_TRACE("cilk_return");
return r;
#else #else
return process_primary_rows_internal (bl, primary_rowset); int r = process_primary_rows_internal (bl, primary_rowset);
#endif #endif
BL_TRACE(blt_extractor);
return r;
} }
...@@ -1090,7 +1082,7 @@ static int update_progress (int N, ...@@ -1090,7 +1082,7 @@ static int update_progress (int N,
} }
CILK_BEGIN CILK_BEGIN
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation) int progress_allocation)
/* Effect: Given a rowset, sort it and write it to a temporary file. /* Effect: Given a rowset, sort it and write it to a temporary file.
* Arguments: * Arguments:
...@@ -1100,32 +1092,38 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE ...@@ -1100,32 +1092,38 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
* dest_db the DB, needed for the comparison function. * dest_db the DB, needed for the comparison function.
* compare The comparison function. * compare The comparison function.
* Returns 0 on success, otherwise an error number. * Returns 0 on success, otherwise an error number.
* Destroy the rowset after finishing it.
*/ */
{ {
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile; FIDX sfile = FIDX_NULL;
u_int64_t soffset=0; u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths // TODO: erase the files, and deal with all the cleanup on error paths
//printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows); //printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
int r = sort_rows(rows, which_db, dest_db, compare, bl); //bl_time_t before_sort = bl_time_now();
int r = sort_rows(&rows, which_db, dest_db, compare, bl);
if (r!=0) { if (r!=0) {
return r; return r;
} }
//bl_time_t after_sort = bl_time_now();
r = update_progress(progress_allocation/2, bl, "sorted rows"); r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2; progress_allocation -= progress_allocation/2;
if (r!=0) return r; if (r!=0) return r;
r = extend_fileset(bl, fs, &sfile); r = extend_fileset(bl, fs, &sfile);
FILE *sstream = toku_bl_fidx2file(bl, sfile);
if (r!=0) return r; if (r!=0) return r;
for (size_t i=0; i<rows->n_rows; i++) { for (size_t i=0; i<rows.n_rows; i++) {
DBT skey; memset(&skey, 0, sizeof skey); skey.data = rows->data + rows->rows[i].off; skey.size=rows->rows[i].klen; DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval; memset(&sval, 0, sizeof sval); sval.data = rows->data + rows->rows[i].off + rows->rows[i].klen; sval.size=rows->rows[i].vlen; DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
r = loader_write_row(&skey, &sval, sfile, &soffset, bl); r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r!=0) return r; if (r!=0) return r;
} }
r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r; r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r;
destroy_rowset(&rows);
//bl_time_t after_write = bl_time_now();
return update_progress(progress_allocation, bl, "wrote sorted"); return update_progress(progress_allocation, bl, "wrote sorted");
} }
CILK_END CILK_END
...@@ -1134,9 +1132,9 @@ CILK_END ...@@ -1134,9 +1132,9 @@ CILK_END
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation) { int progress_allocation) {
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
return cilk::run(sort_and_write_rows, rows, fs, bl, which_db, dest_db, compare, progress_allocation); return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare, progress_allocation);
#else #else
return sort_and_write_rows (rows, fs, bl, which_db, dest_db, compare, progress_allocation); return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare, progress_allocation);
#endif #endif
} }
...@@ -1158,6 +1156,8 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1158,6 +1156,8 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
* Return value: 0 on success, otherwise an error number. * Return value: 0 on success, otherwise an error number.
*/ */
{ {
FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data);
//printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation); //printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
DBT keys[n_sources]; DBT keys[n_sources];
DBT vals[n_sources]; DBT vals[n_sources];
...@@ -1179,7 +1179,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1179,7 +1179,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
u_int64_t n_rows = 0; u_int64_t n_rows = 0;
// load pqueue with first value from each source // load pqueue with first value from each source
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row(srcs_data[i], &keys[i], &vals[i], bl); int r = loader_read_row(srcs_data[i], &keys[i], &vals[i], bl);
BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue. if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) return r; if (r!=0) return r;
...@@ -1233,7 +1235,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1233,7 +1235,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
} }
if (to_q) { if (to_q) {
if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) { if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
BL_TRACE(blt_do_i);
r = queue_enq(q, (void*)output_rowset, 1, NULL); r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0); assert(r==0);
MALLOC(output_rowset); MALLOC(output_rowset);
assert(output_rowset); assert(output_rowset);
...@@ -1243,20 +1247,22 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1243,20 +1247,22 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
add_row(output_rowset, &keys[mini], &vals[mini]); add_row(output_rowset, &keys[mini], &vals[mini]);
} else { } else {
// write it to the dest file // write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, &dataoff[mini], bl); r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
if (r!=0) return r; if (r!=0) return r;
} }
{ {
// read next row from file that just sourced min value // read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i);
r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl); r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl);
BL_TRACE_QUIET(blt_read_row);
if (r!=0) { if (r!=0) {
if (feof(bl_fidx2file(bl, srcs_data[mini]))) { if (feof(toku_bl_fidx2file(bl, srcs_data[mini]))) {
// on feof, queue size permanently smaller // on feof, queue size permanently smaller
toku_free(keys[mini].data); toku_free(keys[mini].data);
toku_free(vals[mini].data); toku_free(vals[mini].data);
} else { } else {
r = ferror(bl_fidx2file(bl, srcs_data[mini])); r = ferror(toku_bl_fidx2file(bl, srcs_data[mini]));
assert(r!=0); assert(r!=0);
return r; return r;
} }
...@@ -1290,8 +1296,10 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1290,8 +1296,10 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
} }
} }
if (to_q) { if (to_q) {
int r = queue_enq(q, (void*)output_rowset, 1, NULL); BL_TRACE(blt_do_i);
assert(r==0); int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0);
} }
pqueue_free(pq); pqueue_free(pq);
toku_free(pq_nodes); toku_free(pq_nodes);
...@@ -1687,6 +1695,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -1687,6 +1695,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
int64_t n_pivots=0; // number of pivots in pivots_file int64_t n_pivots=0; // number of pivots in pivots_file
FIDX pivots_file; // the file FIDX pivots_file; // the file
brtloader_open_temp_file (bl, &pivots_file); brtloader_open_temp_file (bl, &pivots_file);
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
// The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot. // The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
struct subtrees_info sts; struct subtrees_info sts;
...@@ -1731,7 +1740,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -1731,7 +1740,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
while (1) { while (1) {
void *item; void *item;
{ {
BL_TRACE(blt_fractal_thread);
int rr = queue_deq(q, &item, NULL, NULL); int rr = queue_deq(q, &item, NULL, NULL);
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
r=rr; goto error; r=rr; goto error;
...@@ -1752,7 +1763,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -1752,7 +1763,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
allocate_node(&sts, lblock, est, lbuf->local_fingerprint); allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
n_pivots++; n_pivots++;
if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) { if ((r=bl_write_dbt(&key, pivots_stream, NULL, bl))) {
// how to handle errors in fractal thread? // how to handle errors in fractal thread?
assert(r==0); // this always fails. assert(r==0); // this always fails.
return r; return r;
...@@ -1787,7 +1798,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -1787,7 +1798,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
{ {
DBT key=make_dbt(0,0); // must write an extra DBT into the pivots file. DBT key=make_dbt(0,0); // must write an extra DBT into the pivots file.
r = bl_write_dbt(&key, pivots_file, NULL, bl); r=bl_write_dbt(&key, pivots_stream, NULL, bl);
if (r) goto error; // RFP2578 if (r) goto error; // RFP2578
} }
...@@ -1840,6 +1851,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -1840,6 +1851,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
error: error:
subtrees_info_destroy(&sts); subtrees_info_destroy(&sts);
dbout_destroy(&out); dbout_destroy(&out);
BL_TRACE(blt_fractal_thread);
return r; // RFP2578 return r; // RFP2578
} }
...@@ -1861,7 +1873,7 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl, ...@@ -1861,7 +1873,7 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
static void* fractal_thread (void *ftav) { static void* fractal_thread (void *ftav) {
BL_TRACE("start_fractal_thread"); BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav; struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q); int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
...@@ -1869,7 +1881,6 @@ static void* fractal_thread (void *ftav) { ...@@ -1869,7 +1881,6 @@ static void* fractal_thread (void *ftav) {
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q); int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
#endif #endif
fta->errno_result = r; fta->errno_result = r;
BL_TRACE("fractal_thread");
return NULL; return NULL;
} }
...@@ -1886,7 +1897,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -1886,7 +1897,7 @@ static int loader_do_i (BRTLOADER bl,
//printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
struct merge_fileset *fs = &(bl->fs[which_db]); struct merge_fileset *fs = &(bl->fs[which_db]);
struct rowset *rows = &(bl->rows[which_db]); struct rowset *rows = &(bl->rows[which_db]);
assert(rows->data==NULL); // the rows should be all cleaned up alrea assert(rows->data==NULL); // the rows should be all cleaned up already
// a better allocation would be to figure out roughly how many merge passes we'll need. // a better allocation would be to figure out roughly how many merge passes we'll need.
int allocation_for_merge = (2*progress_allocation)/3; int allocation_for_merge = (2*progress_allocation)/3;
...@@ -1901,6 +1912,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -1901,6 +1912,7 @@ static int loader_do_i (BRTLOADER bl,
int fd = open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); int fd = open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode);
assert(fd>=0); assert(fd>=0);
// This structure must stay live until the join below.
struct fractal_thread_args fta = {bl, struct fractal_thread_args fta = {bl,
descriptor, descriptor,
fd, fd,
...@@ -1918,22 +1930,25 @@ static int loader_do_i (BRTLOADER bl, ...@@ -1918,22 +1930,25 @@ static int loader_do_i (BRTLOADER bl,
assert(bl->fractal_threads_live[which_db]==FALSE); assert(bl->fractal_threads_live[which_db]==FALSE);
bl->fractal_threads_live[which_db] = TRUE; bl->fractal_threads_live[which_db] = TRUE;
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]); r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
void *toku_pthread_retval; {
BL_TRACE("do_i"); void *toku_pthread_retval;
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval); BL_TRACE(blt_do_i);
BL_TRACE("join_on_fractal"); int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
assert(r2==0 && toku_pthread_retval==NULL); assert(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement.
assert(bl->fractal_threads_live[which_db]); BL_TRACE(blt_join_on_fractal);
bl->fractal_threads_live[which_db] = FALSE; assert(r2==0 && toku_pthread_retval==NULL);
if (r == 0) r = fta.errno_result; assert(bl->fractal_threads_live[which_db]);
} bl->fractal_threads_live[which_db] = FALSE;
if (r == 0) r = fta.errno_result;
{ }
int r2 = queue_destroy(bl->fractal_queues[which_db]);
assert(r2==0); {
bl->fractal_queues[which_db]=NULL; int r2 = queue_destroy(bl->fractal_queues[which_db]);
assert(r2==0);
bl->fractal_queues[which_db]=NULL;
}
} }
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
...@@ -1941,7 +1956,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -1941,7 +1956,7 @@ static int loader_do_i (BRTLOADER bl,
toku_free(rows->data); rows->data = NULL; toku_free(rows->data); rows->data = NULL;
toku_free(rows->rows); rows->rows = NULL; toku_free(rows->rows); rows->rows = NULL;
toku_free(fs->data_fidxs); fs->data_fidxs = NULL; toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
BL_TRACE("do_i"); BL_TRACE(blt_do_i);
return r; return r;
} }
...@@ -1949,7 +1964,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -1949,7 +1964,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
/* Effect: Close the bulk loader. /* Effect: Close the bulk loader.
* Return all the file descriptors in the array fds. */ * Return all the file descriptors in the array fds. */
{ {
BL_TRACE("puts"); BL_TRACE(blt_do_put);
int result = 0; int result = 0;
int remaining_progress = PROGRESS_MAX; int remaining_progress = PROGRESS_MAX;
// RFP cilk_for, no breaks in the loop // RFP cilk_for, no breaks in the loop
...@@ -1960,7 +1975,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -1960,7 +1975,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
int allocate_here = remaining_progress/(bl->N - i); int allocate_here = remaining_progress/(bl->N - i);
remaining_progress -= allocate_here; remaining_progress -= allocate_here;
//printf("%s:%d do_i(%d)\n", __FILE__, __LINE__, i); //printf("%s:%d do_i(%d)\n", __FILE__, __LINE__, i);
BL_TRACE("close"); BL_TRACE(blt_close);
result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd,
allocate_here allocate_here
); );
...@@ -1977,7 +1992,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -1977,7 +1992,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
assert(bl->progress == PROGRESS_MAX); assert(bl->progress == PROGRESS_MAX);
error: error:
brtloader_destroy(bl, (BOOL)(result!=0)); brtloader_destroy(bl, (BOOL)(result!=0));
BL_TRACE("close"); BL_TRACE(blt_close);
BL_TRACE_END; BL_TRACE_END;
return result; return result;
} }
...@@ -2241,7 +2256,9 @@ static int setup_nonleaf_block (int n_children, ...@@ -2241,7 +2256,9 @@ static int setup_nonleaf_block (int n_children,
int r = read_some_pivots(pivots_file, n_children, bl, pivots); int r = read_some_pivots(pivots_file, n_children, bl, pivots);
assert(r==0); assert(r==0);
if ((r=bl_write_dbt(&pivots[n_children-1], next_pivots_file, NULL, bl))) return r; FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
if ((r=bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl))) return r;
// The last pivot was written to the next_pivots file, so we free it now instead of returning it. // The last pivot was written to the next_pivots file, so we free it now instead of returning it.
toku_free(pivots[n_children-1].data); toku_free(pivots[n_children-1].data);
pivots[n_children-1] = zero_dbt; pivots[n_children-1] = zero_dbt;
...@@ -2354,7 +2371,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -2354,7 +2371,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
// 2) We put the 15 pivots and 16 blocks into an non-leaf node. // 2) We put the 15 pivots and 16 blocks into an non-leaf node.
// 3) We put the 16th pivot into the next pivots file. // 3) We put the 16th pivot into the next pivots file.
{ {
int r = fseek(bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET); int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET);
if (r!=0) { assert(errno!=0); return errno; } if (r!=0) { assert(errno!=0); return errno; }
} }
......
...@@ -185,7 +185,7 @@ static void test_read_write_rows (char *template) { ...@@ -185,7 +185,7 @@ static void test_read_write_rows (char *template) {
for (int i=0; i<3; i++) { for (int i=0; i<3; i++) {
DBT key = {.size=strlen(keystrings[i]), .data=keystrings[i]}; DBT key = {.size=strlen(keystrings[i]), .data=keystrings[i]};
DBT val = {.size=strlen(valstrings[i]), .data=valstrings[i]}; DBT val = {.size=strlen(valstrings[i]), .data=valstrings[i]};
r = loader_write_row(&key, &val, file, &dataoff, &bl); r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, &bl);
CKERR(r); CKERR(r);
actual_size+=key.size + val.size + 8; actual_size+=key.size + val.size + 8;
} }
......
...@@ -9,6 +9,54 @@ ...@@ -9,6 +9,54 @@
#include "rdtsc.h" #include "rdtsc.h"
#include "trace_mem.h" #include "trace_mem.h"
#if BL_DO_TRACE && BL_SIMPLE_TRACE
static double saved_scale_factor = 1e-9; // approximate until we get the actual scale factor
static unsigned long long first_time = 0;
static __thread unsigned long long prev_time = 0;
static unsigned long long trace_hist[BLT_LIMIT];
unsigned long long bl_trace (const BL_TRACE_ENUM l, const int quiet) {
assert(l<BLT_LIMIT);
unsigned long long t = rdtsc();
if (first_time==0) {
first_time = t;
}
if (prev_time != 0) {
unsigned long long diff = t-prev_time;
if (l==blt_calibrate_done) {
saved_scale_factor = 1/(double)diff;
}
if (!quiet)
printf("-> %30s %21llu %21llu %13.6fs\n", blt_to_string(l), trace_hist[l], t, (t-first_time)*saved_scale_factor);
trace_hist[l] += diff;
}
prev_time = t;
return t;
}
bl_time_t bl_time_now(void) {
return rdtsc();
}
double bl_time_diff(const bl_time_t a, const bl_time_t b) {
return (a-b)*saved_scale_factor;
}
void bl_trace_end(void) {
double scale_factor = trace_hist[blt_calibrate_done];
unsigned long long total = 0;
for (BL_TRACE_ENUM i=0; i<BLT_LIMIT; i++) {
total+=trace_hist[i];
}
for (BL_TRACE_ENUM i=0; i<BLT_LIMIT; i++) {
printf("%25s %20lld %8.3fs %5.1f%%\n", blt_to_string(i), trace_hist[i], trace_hist[i]/scale_factor, 100.0*trace_hist[i]/(double)total);
}
}
#elif BL_DO_TRACE && !BL_SIMPLE_TRACE
// customize this as required // customize this as required
#define NTRACE 0 #define NTRACE 0
#if NTRACE #if NTRACE
...@@ -94,6 +142,7 @@ void bl_trace(const char *func __attribute__((unused)), ...@@ -94,6 +142,7 @@ void bl_trace(const char *func __attribute__((unused)),
void bl_trace_end(void) void bl_trace_end(void)
{ {
#error
#if BL_DO_TRACE #if BL_DO_TRACE
char bltracefile[128]; char bltracefile[128];
sprintf(bltracefile, "brtloader_%d.trace", toku_os_getpid());; sprintf(bltracefile, "brtloader_%d.trace", toku_os_getpid());;
...@@ -113,3 +162,4 @@ void bl_trace_end(void) ...@@ -113,3 +162,4 @@ void bl_trace_end(void)
#endif #endif
} }
#endif
...@@ -9,6 +9,17 @@ ...@@ -9,6 +9,17 @@
extern "C" { extern "C" {
#endif #endif
// Define this even when traces are compiled out so we don't have to recompile things like scanscan.c
// print the trace
void toku_print_trace_mem(void) __attribute__((__visibility__("default")));
#define BL_DO_TRACE 1
// BL_SIMPLE_TRACE 1 is Bradley's in-memory trace analysis.
// BL_SIMPLE_TRACE 0 is Dave's post-processing analysis.
#define BL_SIMPLE_TRACE 1
#define BL_TRACE_PRINT 0
#if BL_DO_TRACE && !BL_SIMPLE_TRACE
// a circular log of trace entries is maintained in memory. the trace // a circular log of trace entries is maintained in memory. the trace
// entry consists of a string pointer, an integer, and the processor // entry consists of a string pointer, an integer, and the processor
// timestamp. there are functions to add an entry to the end of the // timestamp. there are functions to add an entry to the end of the
...@@ -22,9 +33,6 @@ extern "C" { ...@@ -22,9 +33,6 @@ extern "C" {
// pointer, a number, and the processor timestamp // pointer, a number, and the processor timestamp
void toku_add_trace_mem(const char *str, int n) __attribute__((__visibility__("default"))); void toku_add_trace_mem(const char *str, int n) __attribute__((__visibility__("default")));
// print the trace
void toku_print_trace_mem(void) __attribute__((__visibility__("default")));
// some trace functions added for the bulk loader // some trace functions added for the bulk loader
void bl_trace(const char *func __attribute__((unused)), void bl_trace(const char *func __attribute__((unused)),
int line __attribute__ ((unused)), int line __attribute__ ((unused)),
...@@ -32,15 +40,94 @@ void bl_trace(const char *func __attribute__((unused)), ...@@ -32,15 +40,94 @@ void bl_trace(const char *func __attribute__((unused)),
__attribute__((unused)); __attribute__((unused));
void bl_trace_end(void) __attribute__((unused)); void bl_trace_end(void) __attribute__((unused));
#define BL_DO_TRACE 0 #define BL_TRACE(sym) bl_trace(__FUNCTION__, __LINE__, #sym)
#define BL_TRACE_PRINT 0
#if BL_DO_TRACE
#define BL_TRACE(str) bl_trace(__FUNCTION__, __LINE__, str)
#define BL_TRACE_END bl_trace_end() #define BL_TRACE_END bl_trace_end()
#elif BL_DO_TRACE && BL_SIMPLE_TRACE
typedef enum bl_trace_enum {BLT_START,
blt_calibrate_begin,
blt_calibrate_done,
blt_open,
// Time spent in the extractor
blt_extractor_init,
blt_extractor,
blt_extract_deq,
blt_sort_and_write_rows,
// Time spent by the main thread in parallel to the extractor
blt_do_put,
blt_extract_enq,
blt_join_on_extractor,
// Time spent in the fractal thread
blt_fractal_thread,
blt_fractal_deq,
// Time spent by the main thread in parallel to the fractal thread
blt_start_fractal_thread,
blt_do_i,
blt_read_row,
blt_fractal_enq,
blt_join_on_fractal,
blt_close,
//
BLT_LIMIT}
BL_TRACE_ENUM;
static const char * blt_to_string (BL_TRACE_ENUM) __attribute__((__unused__));
#define BLSCASE(s) case s: return #s
static const char * blt_to_string (BL_TRACE_ENUM i) {
switch(i) {
BLSCASE(BLT_START);
BLSCASE(blt_calibrate_begin);
BLSCASE(blt_calibrate_done);
BLSCASE(blt_close);
BLSCASE(blt_do_i);
BLSCASE(blt_do_put);
BLSCASE(blt_extract_deq);
BLSCASE(blt_extract_enq);
BLSCASE(blt_extractor);
BLSCASE(blt_extractor_init);
BLSCASE(blt_fractal_deq);
BLSCASE(blt_fractal_enq);
BLSCASE(blt_fractal_thread);
BLSCASE(blt_join_on_extractor);
BLSCASE(blt_join_on_fractal);
BLSCASE(blt_open);
BLSCASE(blt_read_row);
BLSCASE(blt_sort_and_write_rows);
BLSCASE(blt_start_fractal_thread);
BLSCASE(BLT_LIMIT);
}
return NULL;
}
typedef unsigned long long bl_time_t;
bl_time_t bl_trace(const BL_TRACE_ENUM, const int quiet);
bl_time_t bl_time_now(void);
double bl_time_diff(const bl_time_t a, const bl_time_t b);
void bl_trace_end(void);
#define BL_TRACE(sym) bl_trace(sym, 0)
#if 0
#define BL_TRACE_QUIET(sym) bl_trace(sym, 1)
#else
#define BL_TRACE_QUIET(sym)
#endif
#define BL_TRACE_END bl_trace_end()
#else #else
#define BL_TRACE(str)
#define BL_TRACE(sym)
#define BL_TRACE_FROM(sym,q,t)
#define BL_TRACE_QUIET(sym)
#define BL_TRACE_FROM_QUIET(sym,q,t)
#define BL_TRACE_END #define BL_TRACE_END
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
......
...@@ -257,9 +257,9 @@ static void test_loader(DB **dbs) ...@@ -257,9 +257,9 @@ static void test_loader(DB **dbs)
poll_count=0; poll_count=0;
// close the loader // close the loader
printf("%9.6fs closing", elapsed_time()); fflush(stdout); printf("%9.6fs closing\n", elapsed_time());
r = loader->close(loader); r = loader->close(loader);
printf(" done\n"); printf("%9.6fs done\n", elapsed_time());
CKERR2s(r,0,TOKUDB_CANCELED); CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) { if (r==0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment