Commit c6105e20 authored by Rich Prohaska, committed by Yoni Fogel

#2821 #2829 refs[t:2821] refs[t:2829] fix the loader's merger vmsize and remove an fsync bubble

git-svn-id: file:///svn/toku/tokudb@22413 c7de825b-a66e-492c-adef-691d508d4ae1
parent 98f50f88
...@@ -163,6 +163,8 @@ struct brtloader_s { ...@@ -163,6 +163,8 @@ struct brtloader_s {
toku_pthread_t *fractal_threads; toku_pthread_t *fractal_threads;
BOOL *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately). BOOL *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).
unsigned fractal_workers; // number of fractal tree writer threads
toku_pthread_mutex_t mutex; toku_pthread_mutex_t mutex;
BOOL mutex_init; BOOL mutex_init;
}; };
......
...@@ -71,11 +71,12 @@ static uint32_t size_factor = 1024; ...@@ -71,11 +71,12 @@ static uint32_t size_factor = 1024;
static int nodesize = (1<<22); static int nodesize = (1<<22);
enum { EXTRACTOR_QUEUE_DEPTH = 2, enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24, FILE_BUFFER_SIZE = 1<<24,
MIN_ROWSET_MEMORY = 1<<23, MIN_ROWSET_MEMORY = 1<<23,
MIN_MERGE_FANIN = 2, MIN_MERGE_FANIN = 2,
MERGE_QUEUE_DEPTH = 3, FRACTAL_WRITER_QUEUE_DEPTH = 3,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big. DBUFIO_DEPTH = 2,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
}; };
...@@ -94,6 +95,14 @@ toku_brtloader_get_rowset_budget_for_testing (void) ...@@ -94,6 +95,14 @@ toku_brtloader_get_rowset_budget_for_testing (void)
return 16ULL*size_factor*1024ULL; return 16ULL*size_factor*1024ULL;
} }
// Acquire the loader's mutex (bl->mutex).
// The return code of the lock call is checked with resource_assert.
static void brt_loader_lock(BRTLOADER bl) {
    int r = toku_pthread_mutex_lock(&bl->mutex);
    resource_assert(r == 0);
}
// Release the loader's mutex (bl->mutex).
// The return code of the unlock call is checked with resource_assert.
static void brt_loader_unlock(BRTLOADER bl) {
    int r = toku_pthread_mutex_unlock(&bl->mutex);
    resource_assert(r == 0);
}
static int add_big_buffer(struct file_info *file) { static int add_big_buffer(struct file_info *file) {
int result = 0; int result = 0;
BOOL newbuffer = FALSE; BOOL newbuffer = FALSE;
...@@ -307,6 +316,7 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -307,6 +316,7 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0); int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0);
bl->mutex_init = FALSE; bl->mutex_init = FALSE;
} }
// These frees rely on the fact that if you free a NULL pointer then nothing bad happens. // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
toku_free(bl->dbs); toku_free(bl->dbs);
toku_free(bl->descriptors); toku_free(bl->descriptors);
...@@ -383,35 +393,61 @@ static uint64_t memory_per_rowset_during_extract (BRTLOADER bl) ...@@ -383,35 +393,61 @@ static uint64_t memory_per_rowset_during_extract (BRTLOADER bl)
} }
} }
// Return bl->fractal_workers once it is nonzero.
// The field is read under the loader mutex.  While it is still zero the
// calling thread yields and polls again, so this function does not return
// until some other thread has stored a nonzero count.
static unsigned brt_loader_get_fractal_workers_count(BRTLOADER bl) {
    for (;;) {
        brt_loader_lock(bl);
        unsigned count = bl->fractal_workers;
        brt_loader_unlock(bl);
        if (count != 0) {
            return count;
        }
        toku_pthread_yield(); // maybe use a cond var instead
    }
}
// Record the number of fractal tree writer workers in bl->fractal_workers.
// Only the first store takes effect: if the field is already nonzero it is
// left alone.  The check and the store both happen under the loader mutex.
static void brt_loader_set_fractal_workers_count(BRTLOADER bl) {
    brt_loader_lock(bl);
    if (bl->fractal_workers == 0) {
        bl->fractal_workers = cilk_worker_count;
    }
    brt_loader_unlock(bl);
}
// To compute a merge, we have a certain amount of memory to work with. // To compute a merge, we have a certain amount of memory to work with.
// We perform only one fanin at a time. // We perform only one fanin at a time.
// If the fanout is F then we are using // If the fanout is F then we are using
// F merges. Each merge uses // F merges. Each merge uses
// MERGE_QUEUE_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE // DBUFIO_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE
// so the memory is // so the memory is
// F*MERGE_BUF_SIZE*MERGE_QUEUE_DEPTH storage. // F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
// We use some additional space to buffer the outputs. // We use some additional space to buffer the outputs.
// That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile. // That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
// And we have MERGE_QUEUE_DEPTH*MERGE_BUF_SIZE*bl->N buffers for queue // And we have FRACTAL_WRITER_QUEUE_DEPTH*MERGE_BUF_SIZE per queue
// And if we are doing a fractal, each worker could have a fractal tree that it's working on. // And if we are doing a fractal, each worker could have a fractal tree that it's working on.
static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) { static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) {
int64_t extra_reserved_memory_for_queues = (int64_t)MERGE_QUEUE_DEPTH*(int64_t)TARGET_MERGE_BUF_SIZE* (int64_t)bl->N; int64_t extra_reserved_memory = 0;
int64_t extra_reserved_memory_for_fractal = (is_fractal_node ? cilk_worker_count*(int64_t)nodesize : 0); if (is_fractal_node) {
int64_t extra_reserved_memory = extra_reserved_memory_for_queues + extra_reserved_memory_for_fractal; int64_t extra_reserved_memory_for_queues = (int64_t)FRACTAL_WRITER_QUEUE_DEPTH * (int64_t)TARGET_MERGE_BUF_SIZE; // * bl->N if we run multiple merges in parallel
return bl->reserved_memory-extra_reserved_memory; extra_reserved_memory += extra_reserved_memory_for_queues;
int64_t extra_reserved_memory_for_fractal = (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)nodesize * 2; // compressed and uncompressed buffers
extra_reserved_memory += extra_reserved_memory_for_fractal;
}
return bl->reserved_memory - extra_reserved_memory;
} }
static int merge_fanin (BRTLOADER bl, BOOL is_fractal_node) { static int merge_fanin (BRTLOADER bl, BOOL is_fractal_node) {
// return number of temp files to read in this pass
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node); int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
int64_t min_buffers_needed = (int64_t)MIN_MERGE_FANIN*(int64_t)TARGET_MERGE_BUF_SIZE*(int64_t)MERGE_QUEUE_DEPTH; int fanin = memory_avail / ((int64_t)DBUFIO_DEPTH * (int64_t)TARGET_MERGE_BUF_SIZE);
if (memory_avail<min_buffers_needed) return MIN_MERGE_FANIN; if (fanin < MIN_MERGE_FANIN)
else return memory_avail/((int64_t)TARGET_MERGE_BUF_SIZE*(int64_t)MERGE_QUEUE_DEPTH); fanin = MIN_MERGE_FANIN;
return fanin;
} }
static uint64_t memory_per_rowset_during_merge (BRTLOADER bl, int merge_factor, BOOL is_fractal_node // if it is being sent to a q static uint64_t memory_per_rowset_during_merge (BRTLOADER bl, int merge_factor, BOOL is_fractal_node // if it is being sent to a q
) { ) {
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node); int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
return MAX(memory_avail/merge_factor, (int64_t)MIN_MERGE_BUF_SIZE); return MAX(memory_avail/(DBUFIO_DEPTH * merge_factor), (int64_t)MIN_MERGE_BUF_SIZE);
} }
int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
...@@ -1797,7 +1833,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1797,7 +1833,7 @@ int merge_files (struct merge_fileset *fs,
int n_passes_left = (fs->n_temp_files<=final_mergelimit) int n_passes_left = (fs->n_temp_files<=final_mergelimit)
? 1 ? 1
: 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit); : 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
//printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left); // printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left);
int result = 0; int result = 0;
while (fs->n_temp_files > 0) { while (fs->n_temp_files > 0) {
int progress_allocation_for_this_pass = progress_allocation/n_passes_left; int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
...@@ -1814,7 +1850,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1814,7 +1850,7 @@ int merge_files (struct merge_fileset *fs,
// We are about to do n_to_merge/n_temp_files of the remaining for this pass. // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files; int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
//printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files); // printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d b=%llu\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files, (long long unsigned) memory_per_rowset_during_merge(bl, n_to_merge, to_queue));
progress_allocation_for_this_pass -= progress_allocation_for_this_subpass; progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
//printf("%s:%d merging\n", __FILE__, __LINE__); //printf("%s:%d merging\n", __FILE__, __LINE__);
...@@ -2187,6 +2223,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2187,6 +2223,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
uint64_t total_disksize_estimate) uint64_t total_disksize_estimate)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd. // Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
{ {
// set the number of fractal tree writer threads so that we can partition memory in the merger
brt_loader_set_fractal_workers_count(bl);
int result = 0; int result = 0;
int r; int r;
...@@ -2475,7 +2514,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -2475,7 +2514,7 @@ static int loader_do_i (BRTLOADER bl,
progress_allocation -= allocation_for_merge; progress_allocation -= allocation_for_merge;
int r; int r;
r = queue_create(&bl->fractal_queues[which_db], MERGE_QUEUE_DEPTH); r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
if (r) goto error; if (r) goto error;
{ {
...@@ -2485,9 +2524,6 @@ static int loader_do_i (BRTLOADER bl, ...@@ -2485,9 +2524,6 @@ static int loader_do_i (BRTLOADER bl,
r = errno; goto error; r = errno; goto error;
} }
r = toku_fsync_directory(new_fname);
if (r != 0) goto error;
// This structure must stay live until the join below. // This structure must stay live until the join below.
struct fractal_thread_args fta = { bl, struct fractal_thread_args fta = { bl,
descriptor, descriptor,
...@@ -2565,6 +2601,14 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -2565,6 +2601,14 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
invariant(0<=bl->progress && bl->progress <= PROGRESS_MAX); invariant(0<=bl->progress && bl->progress <= PROGRESS_MAX);
} }
if (result==0) invariant(remaining_progress==0); if (result==0) invariant(remaining_progress==0);
// fsync the directory containing the new tokudb files.
char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]);
int r = toku_fsync_directory(fname0);
toku_free(fname0);
if (r != 0) {
result = r; goto error;
}
} }
invariant(bl->file_infos.n_files_open == 0); invariant(bl->file_infos.n_files_open == 0);
invariant(bl->file_infos.n_files_extant == 0); invariant(bl->file_infos.n_files_extant == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment