Commit 3a53c018 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3163 use the nodesize configured in a db in the brtloader refs[t:3163]

git-svn-id: file:///svn/toku/tokudb@26909 c7de825b-a66e-492c-adef-691d508d4ae1
parent d3d8102d
...@@ -187,6 +187,7 @@ struct fractal_thread_args { ...@@ -187,6 +187,7 @@ struct fractal_thread_args {
uint64_t total_disksize_estimate; uint64_t total_disksize_estimate;
int errno_result; // the final result. int errno_result; // the final result.
int which_db; int which_db;
uint32_t target_nodesize;
}; };
void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows); void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
...@@ -218,7 +219,8 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl, ...@@ -218,7 +219,8 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
int progress_allocation, int progress_allocation,
QUEUE q, QUEUE q,
uint64_t total_disksize_estimate, uint64_t total_disksize_estimate,
int which_db); int which_db,
uint32_t target_nodesize);
int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *); int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
......
...@@ -70,7 +70,7 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea ...@@ -70,7 +70,7 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea
// 1024 is the right size_factor for production. // 1024 is the right size_factor for production.
// Different values for these sizes may be used for testing. // Different values for these sizes may be used for testing.
static uint32_t size_factor = 1024; static uint32_t size_factor = 1024;
static int nodesize = (1<<22); static uint32_t default_loader_nodesize = (1<<22);
enum { EXTRACTOR_QUEUE_DEPTH = 2, enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24, FILE_BUFFER_SIZE = 1<<24,
...@@ -88,7 +88,7 @@ void ...@@ -88,7 +88,7 @@ void
toku_brtloader_set_size_factor(uint32_t factor) { toku_brtloader_set_size_factor(uint32_t factor) {
// For test purposes only // For test purposes only
size_factor = factor; size_factor = factor;
nodesize = (size_factor==1) ? (1<<15) : (1<<22); default_loader_nodesize = (size_factor==1) ? (1<<15) : (1<<22);
} }
uint64_t uint64_t
...@@ -464,7 +464,7 @@ static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) { ...@@ -464,7 +464,7 @@ static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) {
int64_t avail_memory = bl->reserved_memory; int64_t avail_memory = bl->reserved_memory;
if (is_fractal_node) { if (is_fractal_node) {
// reserve space for the fractal writer thread buffers // reserve space for the fractal writer thread buffers
avail_memory -= (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)nodesize * 2; // compressed and uncompressed buffers avail_memory -= (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
} }
return avail_memory; return avail_memory;
} }
...@@ -2215,7 +2215,7 @@ static inline long int loader_random(void) { ...@@ -2215,7 +2215,7 @@ static inline long int loader_random(void) {
return r; return r;
} }
static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid) { static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid, uint32_t target_nodesize) {
invariant(lblocknum < out->n_translations_limit); invariant(lblocknum < out->n_translations_limit);
struct leaf_buf *XMALLOC(lbuf); struct leaf_buf *XMALLOC(lbuf);
lbuf->blocknum = lblocknum; lbuf->blocknum = lblocknum;
...@@ -2227,7 +2227,7 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc) ...@@ -2227,7 +2227,7 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
putbuf_int32(&lbuf->dbuf, layout_version); putbuf_int32(&lbuf->dbuf, layout_version);
putbuf_int32(&lbuf->dbuf, layout_version); // layout_version original putbuf_int32(&lbuf->dbuf, layout_version); // layout_version original
putbuf_int32(&lbuf->dbuf, nodesize); putbuf_int32(&lbuf->dbuf, target_nodesize);
putbuf_int32(&lbuf->dbuf, flags); putbuf_int32(&lbuf->dbuf, flags);
putbuf_int32(&lbuf->dbuf, height); putbuf_int32(&lbuf->dbuf, height);
lbuf->rand4fingerprint = loader_random(); lbuf->rand4fingerprint = loader_random();
...@@ -2253,11 +2253,11 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc) ...@@ -2253,11 +2253,11 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
CILK_BEGIN CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl); static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl);
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor); static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize);
CILK_END CILK_END
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen); static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen);
static int write_translation_table (struct dbout *out, long long *off_of_translation_p); static int write_translation_table (struct dbout *out, long long *off_of_translation_p);
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid); static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid, uint32_t target_nodesize);
static void drain_writer_q(QUEUE q) { static void drain_writer_q(QUEUE q) {
void *item; void *item;
...@@ -2279,7 +2279,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2279,7 +2279,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
int progress_allocation, int progress_allocation,
QUEUE q, QUEUE q,
uint64_t total_disksize_estimate, uint64_t total_disksize_estimate,
int which_db) int which_db,
uint32_t target_nodesize)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd. // Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
{ {
// set the number of fractal tree writer threads so that we can partition memory in the merger // set the number of fractal tree writer threads so that we can partition memory in the merger
...@@ -2344,7 +2345,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2344,7 +2345,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db]) { if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db]) {
le_xid = bl->load_root_xid; le_xid = bl->load_root_xid;
} }
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid); struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
u_int64_t n_rows_remaining = bl->n_rows; u_int64_t n_rows_remaining = bl->n_rows;
u_int64_t old_n_rows_remaining = bl->n_rows; u_int64_t old_n_rows_remaining = bl->n_rows;
...@@ -2375,7 +2376,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2375,7 +2376,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
// b) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount // b) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
int remaining_amount = total_disksize_estimate - used_estimate; int remaining_amount = total_disksize_estimate - used_estimate;
int used_here = lbuf->dbuf.off + 1000; // leave 1000 for various overheads. int used_here = lbuf->dbuf.off + 1000; // leave 1000 for various overheads.
int target_size = (nodesize*7L)/8; // use only 7/8 of the node. int target_size = (target_nodesize*7L)/8; // use only 7/8 of the node.
int used_here_with_next_key = used_here + key.size + val.size + disksize_row_overhead; int used_here_with_next_key = used_here + key.size + val.size + disksize_row_overhead;
if ((used_here_with_next_key >= target_size) if ((used_here_with_next_key >= target_size)
|| (used_here + remaining_amount >= target_size || (used_here + remaining_amount >= target_size
...@@ -2408,7 +2409,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2408,7 +2409,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (result == 0) result = r; if (result == 0) result = r;
break; break;
} }
lbuf = start_leaf(&out, descriptor, lblock, le_xid); lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
} }
add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size); add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size);
...@@ -2453,7 +2454,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2453,7 +2454,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
} }
} }
r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor); r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor, target_nodesize);
if (r) { if (r) {
result = r; goto error; result = r; goto error;
} }
...@@ -2497,7 +2498,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2497,7 +2498,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (bl->root_xids_that_created) { if (bl->root_xids_that_created) {
root_xid_that_created = bl->root_xids_that_created[which_db]; root_xid_that_created = bl->root_xids_that_created[which_db];
} }
r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn, root_xid_that_created); r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn, root_xid_that_created, target_nodesize);
if (r) { if (r) {
result = r; goto error; result = r; goto error;
} }
...@@ -2538,13 +2539,15 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl, ...@@ -2538,13 +2539,15 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
int progress_allocation, int progress_allocation,
QUEUE q, QUEUE q,
uint64_t total_disksize_estimate, uint64_t total_disksize_estimate,
int which_db) int which_db,
uint32_t target_nodesize)
// This is probably only for testing. // This is probably only for testing.
{ {
target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db); return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize);
#else #else
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db); return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize);
#endif #endif
} }
...@@ -2553,9 +2556,9 @@ static void* fractal_thread (void *ftav) { ...@@ -2553,9 +2556,9 @@ static void* fractal_thread (void *ftav) {
BL_TRACE(blt_start_fractal_thread); BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav; struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db); int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize);
#else #else
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db); int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize);
#endif #endif
fta->errno_result = r; fta->errno_result = r;
return NULL; return NULL;
...@@ -2591,7 +2594,11 @@ static int loader_do_i (BRTLOADER bl, ...@@ -2591,7 +2594,11 @@ static int loader_do_i (BRTLOADER bl,
if (fd < 0) { if (fd < 0) {
r = errno; goto error; r = errno; goto error;
} }
uint32_t target_nodesize;
r = dest_db->get_pagesize(dest_db, &target_nodesize);
invariant_zero(r);
// This structure must stay live until the join below. // This structure must stay live until the join below.
struct fractal_thread_args fta = { bl, struct fractal_thread_args fta = { bl,
descriptor, descriptor,
...@@ -2600,7 +2607,9 @@ static int loader_do_i (BRTLOADER bl, ...@@ -2600,7 +2607,9 @@ static int loader_do_i (BRTLOADER bl,
bl->fractal_queues[which_db], bl->fractal_queues[which_db],
bl->extracted_datasizes[which_db], bl->extracted_datasizes[which_db],
0, 0,
which_db }; which_db,
target_nodesize,
};
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
if (r) { if (r) {
...@@ -2848,7 +2857,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2848,7 +2857,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
for (int i = 0; i < n_sub_blocks; i++) for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]); sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block); set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
// allocate space for the compressed bufer // allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block); int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf); unsigned char *MALLOC_N(header_len + bound, compressed_buf);
...@@ -2946,14 +2955,16 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla ...@@ -2946,14 +2955,16 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
} }
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid_that_created) { static int
write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk,
LSN load_lsn, TXNID root_xid_that_created, uint32_t target_nodesize) {
int result = 0; int result = 0;
struct brt_header h; memset(&h, 0, sizeof h); struct brt_header h; memset(&h, 0, sizeof h);
h.layout_version = BRT_LAYOUT_VERSION; h.layout_version = BRT_LAYOUT_VERSION;
h.checkpoint_count = 1; h.checkpoint_count = 1;
h.checkpoint_lsn = load_lsn; h.checkpoint_lsn = load_lsn;
h.nodesize = nodesize; h.nodesize = target_nodesize;
h.root = root_blocknum_on_disk; h.root = root_blocknum_on_disk;
h.flags = 0; h.flags = 0;
h.layout_version_original = BRT_LAYOUT_VERSION; h.layout_version_original = BRT_LAYOUT_VERSION;
...@@ -3081,7 +3092,7 @@ CILK_BEGIN ...@@ -3081,7 +3092,7 @@ CILK_BEGIN
static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children, static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
DBT *pivots, /* must free this array, as well as the things it points t */ DBT *pivots, /* must free this array, as well as the things it points t */
struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc)) struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t target_nodesize)
{ {
//Nodes do not currently touch descriptors //Nodes do not currently touch descriptors
invariant(height>0); invariant(height>0);
...@@ -3089,7 +3100,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -3089,7 +3100,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
int result = 0; int result = 0;
BRTNODE XMALLOC(node); BRTNODE XMALLOC(node);
node->nodesize = nodesize; node->nodesize = target_nodesize;
node->thisnodename = make_blocknum(blocknum_of_new_node); node->thisnodename = make_blocknum(blocknum_of_new_node);
node->layout_version = BRT_LAYOUT_VERSION; node->layout_version = BRT_LAYOUT_VERSION;
node->layout_version_original = BRT_LAYOUT_VERSION; node->layout_version_original = BRT_LAYOUT_VERSION;
...@@ -3174,7 +3185,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -3174,7 +3185,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
brt_loader_set_panic(bl, result, TRUE); brt_loader_set_panic(bl, result, TRUE);
} }
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor) { static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize) {
int result = 0; int result = 0;
int height = 1; int height = 1;
...@@ -3222,7 +3233,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3222,7 +3233,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
result = r; result = r;
break; break;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor); // frees all the data structures that go into making the node. cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize); // frees all the data structures that go into making the node.
n_subtrees_used += n_per_block; n_subtrees_used += n_per_block;
} }
} }
...@@ -3245,7 +3256,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3245,7 +3256,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
if (r) { if (r) {
result = r; result = r;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor); cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize);
n_blocks_left -= n_first; n_blocks_left -= n_first;
n_subtrees_used += n_first; n_subtrees_used += n_first;
} }
...@@ -3264,7 +3275,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3264,7 +3275,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
if (r) { if (r) {
result = r; result = r;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor); cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize);
n_subtrees_used += n_blocks_left; n_subtrees_used += n_blocks_left;
} }
} }
......
...@@ -132,7 +132,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -132,7 +132,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_error_function(&bl.error_callback, NULL, NULL); brt_loader_set_error_function(&bl.error_callback, NULL, NULL);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL); brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0); r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r); // if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0); assert(expect_error ? r != 0 : r == 0);
......
...@@ -163,7 +163,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -163,7 +163,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
assert(fd>=0); assert(fd>=0);
if (verbose) traceit("write to file"); if (verbose) traceit("write to file");
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0); r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0);
assert(r==0); assert(r==0);
r = queue_destroy(q2); r = queue_destroy(q2);
......
...@@ -325,7 +325,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c ...@@ -325,7 +325,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
assert(fd>=0); assert(fd>=0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0); r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0, 0);
assert(r==0); assert(r==0);
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
......
...@@ -19,6 +19,7 @@ struct kv_pair { ...@@ -19,6 +19,7 @@ struct kv_pair {
struct kv_pair kv_pairs[NUM_KV_PAIRS] = {{1,4}, struct kv_pair kv_pairs[NUM_KV_PAIRS] = {{1,4},
{2,5}, {2,5},
{3,6}}; {3,6}};
static uint32_t block_size = 0;
static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val) { static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val) {
...@@ -132,6 +133,9 @@ static void run_test(void) ...@@ -132,6 +133,9 @@ static void run_test(void)
r = db_create(&dbs[i], env, 0); CKERR(r); r = db_create(&dbs[i], env, 0); CKERR(r);
r = dbs[i]->set_descriptor(dbs[i], 1, &desc); CKERR(r); r = dbs[i]->set_descriptor(dbs[i], 1, &desc); CKERR(r);
dbs[i]->app_private = &idx[i]; dbs[i]->app_private = &idx[i];
if (block_size != 0) {
r = dbs[i]->set_pagesize(dbs[i], block_size); CKERR(r);
}
snprintf(name, sizeof(name), "db_%04x", i); snprintf(name, sizeof(name), "db_%04x", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
} }
...@@ -173,6 +177,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -173,6 +177,9 @@ static void do_args(int argc, char * const argv[]) {
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; USE_PUTS = 1;
} else if (strcmp(argv[0], "--block_size") == 0) {
argc--; argv++;
block_size = atoi(argv[0]);
} else { } else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]); fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1; resultcode=1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment