Commit 813eaa05 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

get the loader-cleanup-test to run to completion refs[t:2578]

git-svn-id: file:///svn/toku/tokudb@20019 c7de825b-a66e-492c-adef-691d508d4ae1
parent d0587d12
......@@ -70,7 +70,7 @@ int brt_loader_call_error_function(brtloader_error_callback loader_error) {
int r;
error_callback_lock(loader_error);
r = loader_error->error;
if (r && !loader_error->did_callback) {
if (r && loader_error->error_callback && !loader_error->did_callback) {
loader_error->did_callback = 1;
loader_error->error_callback(loader_error->db,
loader_error->which_db,
......
......@@ -343,6 +343,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
static void brt_loader_set_panic(BRTLOADER bl, int error) {
bl->panic = 1;
bl->panic_errno = error;
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) {
......@@ -620,7 +621,7 @@ static void* extractor_thread (void *blv) {
//printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
int r = finish_primary_rows(bl);
r = r; // RFP 2535 assert(r==0); // !!! should deal with this.
r = r; // RFP 2578 assert(r==0); // !!! should deal with this.
for (int i=0; i<bl->N; i++) {
destroy_rowset(&bl->rows[i]); // destroy the rowset here so that if we spawn the sort_and_write, there won't be a race on destroying the rows (thanks to the cilk_sync 2 lines up)
......@@ -1439,6 +1440,15 @@ struct subtrees_info {
struct subtree_info *subtrees;
};
static void subtrees_info_init(struct subtrees_info *p) {
memset(p, 0, sizeof *p);
}
static void subtrees_info_destroy(struct subtrees_info *p) {
toku_free(p->subtrees);
memset(p, 0, sizeof *p);
}
static void allocate_node (struct subtrees_info *sts, int64_t b, const struct subtree_estimates est, const int fingerprint) {
if (sts->n_subtrees >= sts->n_subtrees_limit) {
sts->n_subtrees_limit *= 2;
......@@ -1480,10 +1490,19 @@ struct dbout {
int64_t n_translations_limit;
struct translation *translation;
#ifndef CILK_STUB
cilk::mutex mutex;
cilk::mutex mutex; // RFP how is this initialized?
#endif
};
static inline void dbout_init(struct dbout *out) {
memset(out, 0, sizeof *out);
}
static inline void dbout_destroy(struct dbout *out) {
toku_free(out->translation);
memset(out, 0, sizeof *out);
}
static inline void dbout_lock(struct dbout *out) {
#ifndef CILK_STUB
out->mutex.lock();
......@@ -1527,7 +1546,7 @@ static void dbuf_init (struct dbuf *dbuf) {
dbuf->off=0;
}
static void dbuf_destroy (struct dbuf *dbuf) {
toku_free(dbuf->buf);
toku_free(dbuf->buf); dbuf->buf = NULL;
}
static int64_t allocate_block (struct dbout *out)
......@@ -1670,20 +1689,27 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
brtloader_open_temp_file (bl, &pivots_file);
// The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
struct subtrees_info sts; memset(&sts, 0, sizeof sts);
sts.next_free_block = 3;
struct subtrees_info sts;
subtrees_info_init(&sts);
sts.next_free_block = 3;
sts.n_subtrees = 0;
sts.n_subtrees_limit = 1;
XMALLOC_N(sts.n_subtrees_limit, sts.subtrees);
if (sts.subtrees == NULL) {
r = errno; goto error;
}
struct dbout out; memset(&out, 0, sizeof out);
struct dbout out;
dbout_init(&out);
out.fd = fd;
out.current_off = 8192; // leave 8K reserved at beginning
out.n_translations = 3; // 3 translations reserved at the beginning
out.n_translations_limit = 4;
MALLOC_N(out.n_translations_limit, out.translation);
if (out.translation == NULL) {
r = errno; goto error;
}
out.translation[0].off = -2LL; out.translation[0].size = 0; // block 0 is NULL
assert(1==RESERVED_BLOCKNUM_TRANSLATION);
assert(2==RESERVED_BLOCKNUM_DESCRIPTOR);
......@@ -1701,8 +1727,10 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
void *item;
{
int rr = queue_deq(q, &item, NULL, NULL);
if (rr==EOF) break;
if (rr!=0) { r=rr; goto error; }
if (rr == EOF) break;
if (rr != 0) {
r=rr; goto error;
}
}
struct rowset *output_rowset = (struct rowset *)item;
......@@ -1754,17 +1782,17 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
{
DBT key=make_dbt(0,0); // must write an extra DBT into the pivots file.
r=bl_write_dbt(&key, pivots_file, NULL, bl);
assert(r==0); //???!!!
r = bl_write_dbt(&key, pivots_file, NULL, bl);
if (r) goto error; // RFP2578
}
r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor);
assert(r==0); //???!!!
assert(r==0); // RFP2578
{
assert(sts.n_subtrees==1);
BLOCKNUM root_block = make_blocknum(sts.subtrees[0].block);
toku_free(sts.subtrees);
toku_free(sts.subtrees); sts.subtrees = NULL;
// write the descriptor
{
......@@ -1783,16 +1811,15 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
wbuf_int(&wbuf, checksum);
assert(wbuf.ndone==desc_size);
r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
assert(r==0);
assert(r==0); // RFP2578
out.current_off += desc_size;
toku_free(buf);
}
long long off_of_translation;
r = write_translation_table(&out, &off_of_translation);
assert(r==0);
write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn);
if (out.translation) toku_free(out.translation);
assert(r==0); // RFP2578
write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn); // RFP2578
r = update_progress(progress_allocation, bl, "wrote tdb file");
}
......@@ -1804,8 +1831,11 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error:
return 0;
error:
subtrees_info_destroy(&sts);
dbout_destroy(&out);
return r; // RFP2578
}
CILK_END
......@@ -1865,14 +1895,13 @@ static int loader_do_i (BRTLOADER bl,
int fd = open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode);
assert(fd>=0);
struct fractal_thread_args fta = {bl,
descriptor,
fd,
progress_allocation,
bl->fractal_queues[which_db],
0 // result
};
struct fractal_thread_args fta = {bl,
descriptor,
fd,
progress_allocation,
bl->fractal_queues[which_db],
0 // result
};
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
if (r) {
......@@ -1882,11 +1911,9 @@ static int loader_do_i (BRTLOADER bl,
}
assert(bl->fractal_threads_live[which_db]==FALSE);
bl->fractal_threads_live[which_db] = TRUE;
}
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
{
void *toku_pthread_retval;
BL_TRACE("do_i");
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
......@@ -1894,6 +1921,7 @@ static int loader_do_i (BRTLOADER bl,
assert(r2==0 && toku_pthread_retval==NULL);
assert(bl->fractal_threads_live[which_db]);
bl->fractal_threads_live[which_db] = FALSE;
if (r == 0) r = fta.errno_result;
}
{
......@@ -1984,6 +2012,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
int r = finish_extractor(bl);
assert(r == 0);
}
assert(!bl->extractor_live);
for (int i = 0; i < bl->N; i++)
assert(!bl->fractal_threads_live[i]);
......@@ -2021,7 +2050,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
static void write_literal(struct dbout *out, void*data, size_t len) {
assert(out->current_off%4096==0);
int r = toku_os_write(out->fd, data, len);
assert(r==0);
assert(r==0); // RFP2578
out->current_off+=len;
}
......@@ -2104,7 +2133,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
}
write_literal(out, compressed_buf, size);
write_literal(out, compressed_buf, size); // RFP2578
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size;
......@@ -2120,6 +2149,8 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
int r = update_progress(progress_allocation, bl, "wrote node");
if (r!=0) bl->user_said_stop = r;
// RFP2578 return value
}
CILK_END
......@@ -2141,7 +2172,7 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
unsigned int checksum = x1764_memory(ttable.buf, ttable.off);
putbuf_int32(&ttable, checksum);
assert(bt_size_on_disk==ttable.off);
toku_os_full_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation); /* use a bare pwrite and check error codes. ??? */
toku_os_full_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation); /* RFP2578 use a bare pwrite and check error codes. ??? */
dbuf_destroy(&ttable);
*off_of_translation_p = off_of_translation;
return 0;
......@@ -2164,7 +2195,7 @@ static void write_header (struct dbout *out, long long translation_location_on_d
wbuf_init(&wbuf, buf, size);
toku_serialize_brt_header_to_wbuf(&wbuf, &h, translation_location_on_disk, translation_size_on_disk);
assert(wbuf.ndone==size);
toku_os_full_pwrite(out->fd, wbuf.buf, wbuf.ndone, 0); // ??? use the version that returns error codes?
toku_os_full_pwrite(out->fd, wbuf.buf, wbuf.ndone, 0); // RFP2578 use the version that returns error codes?
toku_free(buf);
}
......@@ -2273,14 +2304,14 @@ static void write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node,
size_t n_bytes;
char *bytes;
int r = toku_serialize_brtnode_to_memory(node, 1, 1, &n_bytes, &bytes);
assert(r==0);
assert(r==0);
dbout_lock(out);
out->translation[blocknum_of_new_node].off = out->current_off;
out->translation[blocknum_of_new_node].size = n_bytes;
//fprintf(stderr, "Wrote internal node at %ld (%ld bytes)\n", out->current_off, n_bytes);
//for (uint32_t i=0; i<n_bytes; i++) { unsigned char b = bytes[i]; printf("%d:%02x (%d) ('%c')\n", i, b, b, (b>=' ' && b<128) ? b : '*'); }
write_literal(out, bytes, n_bytes);
write_literal(out, bytes, n_bytes); // RFP2578
seek_align_locked(out);
dbout_unlock(out);
......@@ -2323,10 +2354,10 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
FIDX next_pivots_file;
brtloader_open_temp_file (bl, &next_pivots_file);
struct subtrees_info next_sts; memset(&next_sts, 0, sizeof next_sts);
struct subtrees_info next_sts;
subtrees_info_init(&next_sts);
next_sts.n_subtrees = 0;
next_sts.n_subtrees_limit = 1;
XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);
const int n_per_block = 16;
......@@ -2343,6 +2374,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor); // frees all the data structures that go into making the node.
// RFP2578
n_subtrees_used += n_per_block;
}
// Now we have a one or two blocks at the end to handle.
......@@ -2361,6 +2393,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor);
// RFP2578
n_blocks_left -= n_first;
n_subtrees_used += n_first;
}
......@@ -2376,6 +2409,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor);
// RFP2578
n_subtrees_used += n_blocks_left;
}
assert(n_subtrees_used == sts->n_subtrees);
......@@ -2386,7 +2420,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); assert(r==0);
r = brtloader_fi_unlink(&bl->file_infos, pivots_fidx); assert(r==0);
pivots_fidx = next_pivots_file;
toku_free(sts->subtrees);
toku_free(sts->subtrees); sts->subtrees = NULL;
*sts = next_sts;
height++;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment