Commit d0587d12 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

shutdown the extractor in loader abort. closes[t:2586] #2586

git-svn-id: file:///svn/toku/tokudb@20013 c7de825b-a66e-492c-adef-691d508d4ae1
parent 21c4c3a4
...@@ -126,6 +126,7 @@ struct brtloader_s { ...@@ -126,6 +126,7 @@ struct brtloader_s {
QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file. QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file.
toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file. toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
BOOL extractor_live;
struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file. struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
u_int64_t n_rows; // how many rows have been put? u_int64_t n_rows; // how many rows have been put?
......
...@@ -333,6 +333,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -333,6 +333,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
{ int r = queue_create(&bl->primary_rowset_queue, 1); if (r!=0) return r; } { int r = queue_create(&bl->primary_rowset_queue, 1); if (r!=0) return r; }
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; } { int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; }
bl->extractor_live = TRUE;
*blp = bl; *blp = bl;
BL_TRACE("open"); BL_TRACE("open");
...@@ -674,6 +675,7 @@ static int finish_extractor (BRTLOADER bl) { ...@@ -674,6 +675,7 @@ static int finish_extractor (BRTLOADER bl) {
void *toku_pthread_retval; void *toku_pthread_retval;
int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval); int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
assert(r==0 && toku_pthread_retval==NULL); assert(r==0 && toku_pthread_retval==NULL);
bl->extractor_live = FALSE;
BL_TRACE("join_on_extractor"); BL_TRACE("join_on_extractor");
} }
{ {
...@@ -1977,6 +1979,15 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -1977,6 +1979,15 @@ int toku_brt_loader_close (BRTLOADER bl,
int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
/* Effect : Abort the bulk loader, free brtloader resources */ /* Effect : Abort the bulk loader, free brtloader resources */
{ {
// cleanup the extractor thread
if (bl->extractor_live) {
int r = finish_extractor(bl);
assert(r == 0);
}
assert(!bl->extractor_live);
for (int i = 0; i < bl->N; i++)
assert(!bl->fractal_threads_live[i]);
int result = 0; int result = 0;
brtloader_destroy(bl, is_error); brtloader_destroy(bl, is_error);
return result; return result;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment