Commit 76e85d0b authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix the loader-dup-test mem leaks closes[t:2594] #2594

git-svn-id: file:///svn/toku/tokudb@20064 c7de825b-a66e-492c-adef-691d508d4ae1
parent 350ae692
...@@ -1098,33 +1098,54 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER ...@@ -1098,33 +1098,54 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile = FIDX_NULL; FIDX sfile = FIDX_NULL;
u_int64_t soffset=0; u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths // TODO: erase the files, and deal with all the cleanup on error paths
//printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows); //printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
//bl_time_t before_sort = bl_time_now(); //bl_time_t before_sort = bl_time_now();
int r = sort_rows(&rows, which_db, dest_db, compare, bl);
if (r!=0) {
return r;
}
//bl_time_t after_sort = bl_time_now();
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r!=0) return r;
r = extend_fileset(bl, fs, &sfile); int result = 0;
FILE *sstream = toku_bl_fidx2file(bl, sfile); int r = sort_rows(&rows, which_db, dest_db, compare, bl);
if (r != 0) result = r;
if (r!=0) return r;
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl); //bl_time_t after_sort = bl_time_now();
if (r!=0) return r; if (result == 0) {
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r != 0) result = r;
}
if (result == 0) {
r = extend_fileset(bl, fs, &sfile);
if (r != 0)
result = r;
else {
FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) {
result = r;
break;
}
}
r = brtloader_fi_close(&bl->file_infos, sfile);
if (r != 0) result = r;
}
} }
r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r;
destroy_rowset(&rows); destroy_rowset(&rows);
//bl_time_t after_write = bl_time_now(); //bl_time_t after_write = bl_time_now();
return update_progress(progress_allocation, bl, "wrote sorted"); if (result == 0) {
r = update_progress(progress_allocation, bl, "wrote sorted");
if (r != 0) result = r;
}
return result;
} }
CILK_END CILK_END
......
...@@ -255,8 +255,10 @@ static void test_loader(DB **dbs) ...@@ -255,8 +255,10 @@ static void test_loader(DB **dbs)
if (verbose) { printf("closing"); fflush(stdout); } if (verbose) { printf("closing"); fflush(stdout); }
r = loader->close(loader); r = loader->close(loader);
if (verbose) { printf(" done\n"); } if (verbose) { printf(" done\n"); }
assert(r==DB_KEYEXIST); if (NUM_ROWS > 0) {
assert(error_extra.error_count==1); assert(r==DB_KEYEXIST);
assert(error_extra.error_count==1);
}
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
CKERR(r); CKERR(r);
...@@ -303,11 +305,11 @@ static void run_test(void) ...@@ -303,11 +305,11 @@ static void run_test(void)
generate_permute_tables(); generate_permute_tables();
// printf("running test_loader()\n"); if (verbose) printf("running test_loader()\n");
// -------------------------- // // -------------------------- //
test_loader(dbs); test_loader(dbs);
// -------------------------- // // -------------------------- //
// printf("done test_loader()\n"); if (verbose) printf("done test_loader()\n");
for(int i=0;i<NUM_DBS;i++) { for(int i=0;i<NUM_DBS;i++) {
dbs[i]->close(dbs[i], 0); CKERR(r); dbs[i]->close(dbs[i], 0); CKERR(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment