Commit b15e793a authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge 2578b to main refs[t:2578]

git-svn-id: file:///svn/toku/tokudb@20143 c7de825b-a66e-492c-adef-691d508d4ae1
parent 21c769b9
......@@ -103,7 +103,7 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
}
void
toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) {
toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) {
const char *bp = (const char *) buf;
while (len > 0) {
ssize_t r;
......@@ -124,6 +124,28 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) {
assert(len == 0);
}
int
toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off) {
const char *bp = (const char *) buf;
int result = 0;
while (len > 0) {
ssize_t r;
if (t_pwrite) {
r = t_pwrite(fd, bp, len, off);
} else {
r = pwrite(fd, bp, len, off);
}
if (r < 0) {
result = errno;
break;
}
len -= r;
bp += r;
off += r;
}
return result;
}
static ssize_t (*t_write)(int, const void *, size_t) = 0;
int
......@@ -156,6 +178,7 @@ toku_os_full_write (int fd, const void *buf, size_t len) {
int
toku_os_write (int fd, const void *buf, size_t len) {
const char *bp = (const char *) buf;
int result = 0;
while (len > 0) {
ssize_t r;
if (t_write) {
......@@ -163,12 +186,14 @@ toku_os_write (int fd, const void *buf, size_t len) {
} else {
r = write(fd, bp, len);
}
if (r < 0)
return errno;
if (r < 0) {
result = errno;
break;
}
len -= r;
bp += r;
}
return 0;
return result;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
......@@ -110,9 +110,6 @@ int brt_loader_call_error_function(brtloader_error_callback);
int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
struct brtloader_s {
int panic;
int panic_errno;
generate_row_for_put_func generate_row_for_put;
brt_compare_func *bt_compare_funs;
......@@ -129,6 +126,9 @@ struct brtloader_s {
toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
BOOL extractor_live;
BOOL panic;
int panic_errno;
struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
u_int64_t n_rows; // how many rows have been put?
struct merge_fileset *fs;
......@@ -153,6 +153,8 @@ struct brtloader_s {
QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
toku_pthread_t *fractal_threads;
BOOL *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).
pthread_mutex_t mutex;
};
// Set the number of rows in the loader. Used for test.
......
......@@ -302,7 +302,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done);
#endif
bl->panic = 0;
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g;
......@@ -359,9 +359,12 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
}
static void brt_loader_set_panic(BRTLOADER bl, int error) {
bl->panic = 1;
// RFP2578 fix races on bl->panic etc
if (!bl->panic) {
bl->panic = TRUE;
bl->panic_errno = error;
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
}
// One of the tests uses this.
......@@ -374,7 +377,7 @@ FILE *toku_bl_fidx2file (BRTLOADER bl, FIDX i) {
return result;
}
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER UU(bl))
/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
* Arguments:
* ptr the data to be writen.
......@@ -393,13 +396,12 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD
else
e = ferror(stream);
assert(e!=0);
brt_loader_set_panic(bl, e);
return e;
}
return 0;
}
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER UU(bl))
/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number.
* Arguments:
* ptr read data into here.
......@@ -416,7 +418,6 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD
else {
do_error: ;
int e = ferror(stream);
brt_loader_set_panic(bl, e);
return e;
}
} else if (r<nmemb) {
......@@ -625,15 +626,16 @@ static void* extractor_thread (void *blv) {
// Now we have some rows to output
{
int r = process_primary_rows(bl, primary_rowset);
// !!! need to handle this error better.
if (r!=0) abort();
if (r)
brt_loader_set_panic(bl, r);
}
}
//printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
int r = finish_primary_rows(bl);
r = r; // RFP 2578 assert(r==0); // !!! should deal with this.
if (r)
brt_loader_set_panic(bl, r);
BL_TRACE(blt_extractor);
......@@ -1493,12 +1495,13 @@ struct subtrees_info {
};
static void subtrees_info_init(struct subtrees_info *p) {
memset(p, 0, sizeof *p);
p->next_free_block = p->n_subtrees = p->n_subtrees_limit = 0;
p->subtrees = NULL;
}
static void subtrees_info_destroy(struct subtrees_info *p) {
toku_free(p->subtrees);
memset(p, 0, sizeof *p);
p->subtrees = NULL;
}
static void allocate_node (struct subtrees_info *sts, int64_t b, const struct subtree_estimates est, const int fingerprint) {
......@@ -1541,17 +1544,21 @@ struct dbout {
int64_t n_translations_limit;
struct translation *translation;
#ifndef CILK_STUB
cilk::mutex mutex; // RFP how is this initialized?
cilk::mutex mutex; // the mutex is initialized by the dbout constructor
#endif
};
static inline void dbout_init(struct dbout *out) {
memset(out, 0, sizeof *out);
out->fd = -1;
out->current_off = 0;
out->n_translations = out->n_translations_limit = 0;
out->translation = NULL;
}
static inline void dbout_destroy(struct dbout *out) {
assert(out->fd == -1);
toku_free(out->translation);
memset(out, 0, sizeof *out);
out->translation = NULL;
}
static inline void dbout_lock(struct dbout *out) {
......@@ -1729,7 +1736,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
QUEUE q)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
{
int result = 0;
int r;
// The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>)
......@@ -1737,21 +1744,12 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
// Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree.
int64_t n_pivots=0; // number of pivots in pivots_file
FIDX pivots_file; // the file
brtloader_open_temp_file (bl, &pivots_file);
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
// The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
struct subtrees_info sts;
subtrees_info_init(&sts);
sts.next_free_block = 3;
sts.n_subtrees = 0;
sts.n_subtrees_limit = 1;
XMALLOC_N(sts.n_subtrees_limit, sts.subtrees);
if (sts.subtrees == NULL) {
r = errno;
subtrees_info_destroy(&sts);
return r;
r = brtloader_open_temp_file (bl, &pivots_file);
if (r) {
result = r; return result; // RFP2578 goto error?
}
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
struct dbout out;
dbout_init(&out);
......@@ -1761,10 +1759,23 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
out.n_translations_limit = 4;
MALLOC_N(out.n_translations_limit, out.translation);
if (out.translation == NULL) {
r = errno;
result = errno;
dbout_destroy(&out);
return result; // RFP2578 goto error?
}
// The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
struct subtrees_info sts;
subtrees_info_init(&sts);
sts.next_free_block = 3;
sts.n_subtrees = 0;
sts.n_subtrees_limit = 1;
MALLOC_N(sts.n_subtrees_limit, sts.subtrees);
if (sts.subtrees == NULL) {
result = errno;
subtrees_info_destroy(&sts);
dbout_destroy(&out);
return r;
return result; // RFP2578 goto error?
}
out.translation[0].off = -2LL; out.translation[0].size = 0; // block 0 is NULL
......@@ -1788,12 +1799,13 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break;
if (rr != 0) {
r=rr; goto error;
brt_loader_set_panic(bl, rr); // error after cilk sync
break;
}
}
struct rowset *output_rowset = (struct rowset *)item;
for (unsigned int i=0; i<output_rowset->n_rows; i++) {
for (unsigned int i = 0; i < output_rowset->n_rows; i++) {
DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off, output_rowset->rows[i].klen);
DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);
......@@ -1806,10 +1818,10 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
n_pivots++;
if ((r=bl_write_dbt(&key, pivots_stream, NULL, bl))) {
// how to handle errors in fractal thread?
assert(r==0); // this always fails.
return r;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r); // error after cilk sync
break;
}
cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl);
......@@ -1824,6 +1836,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
est.dsize+=key.size + val.size;
n_rows_remaining--;
}
destroy_rowset(output_rowset);
toku_free(output_rowset);
}
......@@ -1837,16 +1850,24 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync;
if (bl->panic) { // if there were any prior errors then exit
result = bl->panic_errno; goto error;
}
n_pivots++;
{
DBT key=make_dbt(0,0); // must write an extra DBT into the pivots file.
r=bl_write_dbt(&key, pivots_stream, NULL, bl);
if (r) goto error; // RFP2578
if (r) {
result = r; goto error;
}
}
r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor);
assert(r==0); // RFP2578
if (r) {
result = r; goto error;
}
{
assert(sts.n_subtrees==1);
......@@ -1870,33 +1891,50 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
wbuf_int(&wbuf, checksum);
assert(wbuf.ndone==desc_size);
r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
assert(r==0); // RFP2578
out.current_off += desc_size;
toku_free(buf);
toku_free(buf); // wbuf_destroy
if (r) {
result = r; goto error;
}
}
long long off_of_translation;
r = write_translation_table(&out, &off_of_translation);
assert(r == 0); // RFP2578
if (r) {
result = r; goto error;
}
r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn);
assert(r == 0); // RFP2578
if (r) {
result = r; goto error;
}
r = update_progress(progress_allocation, bl, "wrote tdb file");
if (r) {
result = r; goto error;
}
}
if (r) goto error;
r = fsync(fd);
if (r) { r=errno; goto error; }
r = close(fd);
if (r) { r=errno; goto error; }
r = fsync(out.fd);
if (r) {
result = errno; goto error;
}
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error:
{
int rr = close(out.fd);
if (rr)
result = errno;
}
out.fd = -1;
subtrees_info_destroy(&sts);
dbout_destroy(&out);
BL_TRACE(blt_fractal_thread);
return r; // RFP2578
return result;
}
CILK_END
......@@ -2010,7 +2048,6 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
BL_TRACE(blt_do_put);
int result = 0;
int remaining_progress = PROGRESS_MAX;
// RFP cilk_for, no breaks in the loop
for (int i=0; i<bl->N; i++) {
char * fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]);
// Take the unallocated progress and divide it among the unfinished jobs.
......@@ -2111,11 +2148,12 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
}
}
static void write_literal(struct dbout *out, void*data, size_t len) {
static int write_literal(struct dbout *out, void*data, size_t len) {
assert(out->current_off%4096==0);
int r = toku_os_write(out->fd, data, len);
assert(r==0); // RFP2578
int result = toku_os_write(out->fd, data, len);
if (result == 0)
out->current_off+=len;
return result;
}
CILK_BEGIN
......@@ -2155,6 +2193,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// initialize the sub blocks
// struct sub_block sub_block[n_sub_blocks]; RFP cilk++ dynamic array bug, use malloc instead
struct sub_block *MALLOC_N(n_sub_blocks, sub_block);
assert(sub_block);
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
......@@ -2162,6 +2201,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
assert(compressed_buf);
// compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
......@@ -2197,11 +2237,14 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
}
write_literal(out, compressed_buf, size); // RFP2578
int result = write_literal(out, compressed_buf, size);
if (result == 0) {
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size;
seek_align_locked(out);
}
dbout_unlock(out);
toku_free(sub_block); // RFP cilk++ bug
......@@ -2211,10 +2254,14 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
toku_free(lbuf);
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
int r = update_progress(progress_allocation, bl, "wrote node");
if (r!=0) bl->user_said_stop = r;
if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
}
// RFP2578 return value
if (result)
brt_loader_set_panic(bl, result);
}
CILK_END
......@@ -2236,14 +2283,16 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
unsigned int checksum = x1764_memory(ttable.buf, ttable.off);
putbuf_int32(&ttable, checksum);
assert(bt_size_on_disk==ttable.off);
toku_os_full_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation); /* RFP2578 use a bare pwrite and check error codes. ??? */
int result = toku_os_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation);
dbuf_destroy(&ttable);
*off_of_translation_p = off_of_translation;
return 0;
return result;
}
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn) {
int result = 0;
struct brt_header h; memset(&h, 0, sizeof h);
h.layout_version = BRT_LAYOUT_VERSION;
h.checkpoint_count = 1;
......@@ -2256,25 +2305,44 @@ static int write_header (struct dbout *out, long long translation_location_on_di
unsigned int size = toku_serialize_brt_header_size (&h);
struct wbuf wbuf;
char *MALLOC_N(size, buf);
if (buf == NULL) {
result = errno;
} else {
wbuf_init(&wbuf, buf, size);
toku_serialize_brt_header_to_wbuf(&wbuf, &h, translation_location_on_disk, translation_size_on_disk);
assert(wbuf.ndone==size);
toku_os_full_pwrite(out->fd, wbuf.buf, wbuf.ndone, 0); // RFP2578 use the version that returns error codes?
if (wbuf.ndone != size)
result = EINVAL;
else
result = toku_os_pwrite(out->fd, wbuf.buf, wbuf.ndone, 0);
toku_free(buf);
return 0;
}
return result;
}
static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl,
/*out*/ DBT pivots[/*n_to_read*/])
// pivots is an array to be filled in. The pivots array is uninitialized.
{
for (int i = 0; i < n_to_read; i++)
pivots[i] = zero_dbt;
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
for (int i=0; i<n_to_read; i++) {
memset(&pivots[i], 0, sizeof pivots[i]);
int result = 0;
for (int i = 0; i < n_to_read; i++) {
int r = bl_read_dbt(&pivots[i], pivots_stream, bl);
if (r!=0) return r;
};
return 0;
if (r != 0) {
result = r;
break;
}
}
return result;
}
static void delete_pivots(DBT pivots[], int n) {
for (int i = 0; i < n; i++)
toku_free(pivots[i].data);
toku_free(pivots);
}
static int setup_nonleaf_block (int n_children,
......@@ -2296,13 +2364,30 @@ static int setup_nonleaf_block (int n_children,
{
//printf("Nonleaf has children :"); for(int i=0; i<n_children; i++) printf(" %ld", subtrees->subtrees[i].block); printf("\n");
int result = 0;
DBT *MALLOC_N(n_children, pivots);
if (pivots == NULL)
result = errno;
if (result == 0) {
int r = read_some_pivots(pivots_file, n_children, bl, pivots);
assert(r==0);
if (r) {
delete_pivots(pivots, n_children);
result = r;
}
}
if (result == 0) {
FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl);
if (r) {
delete_pivots(pivots, n_children);
result = r;
}
}
if ((r=bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl))) return r;
if (result == 0) {
// The last pivot was written to the next_pivots file, so we free it now instead of returning it.
toku_free(pivots[n_children-1].data);
pivots[n_children-1] = zero_dbt;
......@@ -2312,8 +2397,9 @@ static int setup_nonleaf_block (int n_children,
new_subtree_estimates.exact = TRUE;
struct subtree_info *MALLOC_N(n_children, subtrees_array);
assert(subtrees_array);
int32_t fingerprint = 0;
for (int i=0; i<n_children; i++) {
for (int i = 0; i < n_children; i++) {
int64_t from_blocknum = first_child_offset_in_subtrees + i;
subtrees_array[i] = subtrees->subtrees[from_blocknum];
add_estimates(&new_subtree_estimates, &subtrees->subtrees[from_blocknum].subtree_estimates);
......@@ -2325,16 +2411,21 @@ static int setup_nonleaf_block (int n_children,
*pivots_p = pivots;
*subtrees_info_p = subtrees_array;
return 0;
}
return result;
}
CILK_BEGIN
static void write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node, int n_children,
static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
DBT *pivots, /* must free this array, as well as the things it points t */
struct subtree_info *subtree_info, int height, const struct descriptor *desc)
{
assert(height>0);
int result = 0;
BRTNODE XMALLOC(node);
node->desc =(struct descriptor *)desc;
node->nodesize = nodesize;
......@@ -2346,6 +2437,7 @@ static void write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node,
node->flags = 0;
node->local_fingerprint = 0;
node->rand4fingerprint = loader_random();
XMALLOC_N(n_children-1, node->u.n.childkeys);
unsigned int totalchildkeylens = 0;
for (int i=0; i<n_children-1; i++) {
......@@ -2371,17 +2463,23 @@ static void write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node,
size_t n_bytes;
char *bytes;
int r = toku_serialize_brtnode_to_memory(node, 1, 1, &n_bytes, &bytes);
assert(r==0);
int r;
r = toku_serialize_brtnode_to_memory(node, 1, 1, &n_bytes, &bytes);
if (r) {
result = r;
} else {
dbout_lock(out);
out->translation[blocknum_of_new_node].off = out->current_off;
out->translation[blocknum_of_new_node].size = n_bytes;
//fprintf(stderr, "Wrote internal node at %ld (%ld bytes)\n", out->current_off, n_bytes);
//for (uint32_t i=0; i<n_bytes; i++) { unsigned char b = bytes[i]; printf("%d:%02x (%d) ('%c')\n", i, b, b, (b>=' ' && b<128) ? b : '*'); }
write_literal(out, bytes, n_bytes); // RFP2578
r = write_literal(out, bytes, n_bytes);
if (r)
result = r;
else
seek_align_locked(out);
dbout_unlock(out);
}
toku_free(bytes);
for (int i=0; i<n_children-1; i++) {
......@@ -2398,11 +2496,15 @@ static void write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node,
toku_free(subtree_info);
blocknum_of_new_node = blocknum_of_new_node;
if (result != 0)
brt_loader_set_panic(bl, result);
}
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
int result = 0;
int height = 1;
int height=1;
// Watch out for the case where we saved the last pivot but didn't write any more nodes out.
// The trick is not to look at n_pivots, but to look at blocks.n_blocks
while (sts->n_subtrees > 1) {
......@@ -2440,13 +2542,18 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&next_sts, next_pivots_file,
out, bl,
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor); // frees all the data structures that go into making the node.
// RFP2578
if (r) {
result = r;
break;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor); // frees all the data structures that go into making the node.
n_subtrees_used += n_per_block;
}
// Now we have a one or two blocks at the end to handle.
}
int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used;
if (result == 0) {
// Now we have a one or two blocks at the end to handle.
assert(n_blocks_left>=2);
if (n_blocks_left > n_per_block) {
// Write half the remaining blocks
......@@ -2459,13 +2566,16 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&next_sts, next_pivots_file,
out, bl,
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor);
// RFP2578
if (r) {
result = r;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor);
n_blocks_left -= n_first;
n_subtrees_used += n_first;
}
{
}
}
if (result == 0) {
// Write the last block.
DBT *pivots;
int64_t blocknum_of_new_node;
......@@ -2475,26 +2585,35 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
&next_sts, next_pivots_file,
out, bl,
&blocknum_of_new_node, &subtree_info, &pivots);
assert(r==0);
cilk_spawn write_nonleaf_node(out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor);
// RFP2578
if (r) {
result = r;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor);
n_subtrees_used += n_blocks_left;
}
}
if (result == 0)
assert(n_subtrees_used == sts->n_subtrees);
cilk_sync;
// Now set things up for the next iteration.
if (result == 0 && bl->panic) // pick up write_nonleaf_node errors
result = bl->panic_errno;
// Now set things up for the nexxt iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); assert(r==0);
r = brtloader_fi_unlink(&bl->file_infos, pivots_fidx); assert(r==0);
pivots_fidx = next_pivots_file;
toku_free(sts->subtrees); sts->subtrees = NULL;
*sts = next_sts;
height++;
if (result)
break;
}
{ int r = brtloader_fi_close (&bl->file_infos, pivots_fidx); assert(r==0); }
{ int r = brtloader_fi_unlink(&bl->file_infos, pivots_fidx); assert(r==0); }
return 0;
return result;
}
CILK_END
......
......@@ -94,17 +94,11 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL)
check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.brtloader-test
check_brtloader-test-write-dbfile$(BINSUF): EXTRA_ARGS=-n 1000000 dir.brtloader-test-write-dbfile
check_brtloader-test-write-dbfile$(BINSUF): EXTRA_ARGS=-s -r 100000 dir.brtloader-test-write-dbfile
brtloader-test$(BINSUF): brtloader-test.$(OEXT)
ifeq ($(BRTLOADER),cilk)
$(CILKPP) $(CILKFLAGS) $< -o $@ $(LDFLAGS)
endif
ifeq ($(BRTLOADER),cxx)
$(CXX) $(CXXFLAGS) $< -o $@ $(LDFLAGS)
endif
check_brtloader-test-write-dbfile-enospc$(BINSUF): EXTRA_ARGS=-s -r 10000 dir.brtloader-test-write-dbfile-enospc
brtloader-test-write-dbfile$(BINSUF): brtloader-test-write-dbfile.$(OEXT)
brtloader-%$(BINSUF): brtloader-%.$(OEXT)
ifeq ($(BRTLOADER),cilk)
$(CILKPP) $(CILKFLAGS) $< -o $@ $(LDFLAGS)
endif
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// test the loader write dbfile function
#define DONT_DEPRECATE_WRITES
#include "includes.h"
#include "test.h"
#include "brtloader-internal.h"
#if defined(__cplusplus)
extern "C" {
#endif
static int write_count, write_count_trigger, write_enospc;
static void reset_write_counts(void) {
write_count = write_count_trigger = write_enospc = 0;
}
static void count_enospc(void) {
write_enospc++;
}
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
write_count++;
size_t r;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
errno = ferror(stream);
}
}
return r;
}
static ssize_t bad_write(int fd, const void * bp, size_t len) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = write(fd, bp, len);
}
return r;
}
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
ssize_t r;
write_count++;
if (write_count_trigger == write_count) {
count_enospc();
errno = ENOSPC;
r = -1;
} else {
r = pwrite(fd, bp, len, off);
}
return r;
}
static int qsort_compare_ints (const void *a, const void *b) {
int avalue = *(int*)a;
int bvalue = *(int*)b;
if (avalue<bvalue) return -1;
if (avalue>bvalue) return +1;
return 0;
}
static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) {
assert(dest_db==NULL);
assert(akey->size==sizeof(int));
assert(bkey->size==sizeof(int));
return qsort_compare_ints(akey->data, bkey->data);
}
static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
fprintf(stderr, "error in test");
abort();
}
static void verify_dbfile(int n, const char *name) {
if (verbose) printf("verify\n");
int r;
CACHETABLE ct;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
TOKUTXN const null_txn = NULL;
BRT t = NULL;
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints); assert(r == 0);
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0);
BRT_CURSOR cursor = NULL;
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
int i;
for (i=0; ; i++) {
int kk = i;
int vv = i;
struct check_pair pair = {sizeof kk, &kk, sizeof vv, &vv, 0};
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_NEXT);
if (r != 0) {
assert(pair.call_count ==0);
break;
}
assert(pair.call_count==1);
}
assert(i == n);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
if (verbose) printf("verify done\n");
}
static void write_dbfile (char *template, int n, char *output_name, BOOL expect_error) {
if (verbose) printf("test start %d %d\n", n, expect_error);
DB *dest_db = NULL;
struct brtloader_s bl = {.panic = 0,
.temp_file_template = template};
int r = brtloader_init_file_infos(&bl.file_infos);
CKERR(r);
struct merge_fileset fs;
init_merge_fileset(&fs);
// put rows in the row set
struct rowset aset;
init_rowset(&aset);
for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i,
.data=&i};
DBT val = {.size=sizeof i,
.data=&i};
add_row(&aset, &key, &val);
}
toku_brt_loader_set_n_rows(&bl, n);
brt_loader_init_error_callback(&bl.error_callback);
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
// destroy_rowset(&aset);
QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0);
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
assert(fs.n_temp_files==0);
QUEUE q2;
r = queue_create(&q2, 0xFFFFFFFF); // infinite queue.
assert(r==0);
size_t num_found = 0;
while (1) {
void *v;
r = queue_deq(q, &v, NULL, NULL);
if (r==EOF) break;
struct rowset *rs = (struct rowset *)v;
if (verbose) printf("v=%p\n", v);
for (size_t i=num_found; i<rs->n_rows; i++) {
struct row *row = &rs->rows[i];
assert(row->klen==sizeof(int));
assert(row->vlen==sizeof(int));
assert((int)i==*(int*)(rs->data+row->off));
}
num_found += rs->n_rows;
r = queue_enq(q2, v, 0, NULL);
assert(r==0);
}
assert((int)num_found == n);
r = queue_eof(q2);
assert(r==0);
r = queue_destroy(q);
assert(r==0);
struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}};
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
assert(fd>=0);
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
assert(expect_error ? r != 0 : r == 0);
brtloader_set_os_fwrite(NULL);
toku_set_func_write(NULL);
toku_set_func_pwrite(NULL);
r = queue_destroy(q2);
assert(r==0);
destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, expect_error);
brt_loader_destroy_error_callback(&bl.error_callback);
}
int test_main (int argc, const char *argv[]) {
const char *progname=argv[0];
int n = 1;
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
verbose=1;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else if (strcmp(argv[0],"-r") == 0) {
argc--; argv++;
n = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (argc!=1) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n);
exit(1);
}
else {
break;
}
argc--; argv++;
}
assert(argc==1); // argv[1] is the directory in which to do the test.
const char* directory = argv[0];
char unlink_all[strlen(directory)+20];
snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
int templen = strlen(directory)+15;
char template[templen];
int tlen = snprintf(template, templen, "%s/tempXXXXXX", directory);
assert (tlen>0 && tlen<templen);
char output_name[templen];
int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
assert (olen>0 && olen<templen);
// callibrate
int r;
r = system(unlink_all); CKERR(r);
r = toku_os_mkdir(directory, 0755); CKERR(r);
write_dbfile(template, n, output_name, FALSE);
if (0) verify_dbfile(n, output_name);
int write_error_limit = write_count;
if (verbose) printf("write_error_limit=%d\n", write_error_limit);
for (int i = 1; i <= write_error_limit; i++) {
reset_write_counts();
write_count_trigger = i;
r = system(unlink_all); CKERR(r);
r = toku_os_mkdir(directory, 0755); CKERR(r);
write_dbfile(template, n, output_name, TRUE);
}
return 0;
}
#if defined(__cplusplus)
}
#endif
......@@ -123,7 +123,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
r = queue_deq(q, &v, NULL, NULL);
if (r==EOF) break;
struct rowset *rs = (struct rowset *)v;
printf("v=%p\n", v);
if (verbose) printf("v=%p\n", v);
for (size_t i=num_found; i<rs->n_rows; i++) {
struct row *row = &rs->rows[i];
......@@ -166,7 +166,6 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
brt_loader_destroy_error_callback(&bl.error_callback);
}
/* Test to see if we can open temporary files. */
int test_main (int argc, const char *argv[]) {
const char *progname=argv[0];
int n = 1;
......@@ -176,11 +175,13 @@ int test_main (int argc, const char *argv[]) {
verbose=1;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else if (strcmp(argv[0],"-n") == 0) {
} else if (strcmp(argv[0],"-r") == 0) {
argc--; argv++;
n = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (argc!=1) {
fprintf(stderr, "Usage:\n %s [-v] [-q] directory\n", progname);
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n);
exit(1);
}
else {
......@@ -213,5 +214,5 @@ int test_main (int argc, const char *argv[]) {
}
#if defined(__cplusplus)
};
}
#endif
......@@ -373,5 +373,5 @@ int test_main (int argc, const char *argv[]) {
}
#if defined(__cplusplus)
};
}
#endif
......@@ -147,10 +147,13 @@ extern void *realloc(void*, size_t) __THROW __attribute__((__deprecat
void *os_malloc(size_t);
void *os_realloc(void*,size_t);
void os_free(void*);
// full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. */
void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) __attribute__((__visibility__("default")));
void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// os_write returns 0 on success, otherwise an errno.
int toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off) __attribute__((__visibility__("default")));
int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// wrapper around fsync
......
......@@ -251,6 +251,30 @@ toku_os_full_pwrite (int fd, const void *org_buf, size_t len, toku_off_t off)
}
assert(len == 0);
}
int
toku_os_pwrite (int fd, const void *org_buf, size_t len, toku_off_t off)
{
const uint8_t *buf = org_buf;
int result = 0;
while (len > 0) {
ssize_t r;
if (t_pwrite) {
r = t_pwrite(fd, buf, len, off);
} else {
r = pwrite(fd, buf, len, off);
}
if (r < 0) {
result = errno;
break;
}
len -= r;
buf += r;
off += r;
}
return result;
}
/*
{
ssize_t r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment