Commit f6ad3258 authored by Jan Lindström's avatar Jan Lindström

Code cleanup. Removed those questions that are now addressed.

parent 184e302a
...@@ -108,7 +108,7 @@ typedef struct wrk_itm ...@@ -108,7 +108,7 @@ typedef struct wrk_itm
wr_tsk_t wr; /*!< Flush page list */ wr_tsk_t wr; /*!< Flush page list */
rd_tsk_t rd; /*!< Decompress page list */ rd_tsk_t rd; /*!< Decompress page list */
ulint n_flushed; /*!< Flushed pages count */ ulint n_flushed; /*!< Flushed pages count */
os_thread_t id_usr; /*!< Thread-id currently working */ os_thread_id_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */ wrk_status_t wi_status; /*!< Work item status */
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
} wrk_t; } wrk_t;
...@@ -125,12 +125,12 @@ typedef struct thread_sync ...@@ -125,12 +125,12 @@ typedef struct thread_sync
wthr_status_t wt_status; /*!< Worker thread status */ wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
wrk_t* work_item; /*!< Work items to be processed */ wrk_t* work_item; /*!< Array of work-items that are
individually accessed by multiple
threads. Items are accessed in a
thread safe manner.*/
} thread_sync_t; } thread_sync_t;
/* QUESTION: Is this array used from several threads concurrently ? */
// static wrk_t work_items[MTFLUSH_MAX_WORKER];
/* TODO: REALLY NEEDED ? */ /* TODO: REALLY NEEDED ? */
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx; static os_fast_mutex_t mtflush_mtx;
...@@ -203,9 +203,7 @@ buf_mtflu_flush_pool_instance( ...@@ -203,9 +203,7 @@ buf_mtflu_flush_pool_instance(
help in the retry which will follow the help in the retry which will follow the
failure. */ failure. */
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/* QUESTION: is this a really failure ? */ fprintf(stderr, "flush start failed.\n");
fprintf(stderr, "flush_start Failed, flush_type:%d\n",
work_item->wr.flush_type);
#endif #endif
return 0; return 0;
} }
...@@ -230,7 +228,7 @@ buf_mtflu_flush_pool_instance( ...@@ -230,7 +228,7 @@ buf_mtflu_flush_pool_instance(
buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
buf_flush_common(work_item->wr.flush_type, work_item->n_flushed); buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
return 0; return work_item->n_flushed;
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -287,23 +285,30 @@ mtflush_service_io( ...@@ -287,23 +285,30 @@ mtflush_service_io(
return; return;
} }
work_item->id_usr = mtflush_io->wthread; work_item->id_usr = os_thread_get_curr_id();
/* This works as a producer/consumer model, where in tasks are
* inserted into the work-queue (wq) and completions are based
* on the type of operations performed and as a result the WRITE/
* compression/flush operation completions get posted to wr_cq.
* And READ/decompress operations completions get posted to rd_cq.
* in future we may have others.
*/
switch(work_item->tsk) { switch(work_item->tsk) {
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
/* QUESTION: Why completed work items are inserted to
completion queue ? */
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap);
break; break;
case MT_WRK_WRITE: case MT_WRK_WRITE:
work_item->wi_status = WRK_ITEM_START; work_item->wi_status = WRK_ITEM_START;
/* Process work item */ /* Process work item */
/* QUESTION: Is this a really a error ? */ if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
if (0 != (n_flushed = buf_mtflu_flush_pool_instance(work_item))) { #ifdef UNIV_DEBUG
fprintf(stderr, "FLUSH op failed ret:%lu\n", n_flushed); fprintf(stderr, "No pages flushed\n");
#endif
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
...@@ -551,7 +556,7 @@ buf_mtflu_flush_work_items( ...@@ -551,7 +556,7 @@ buf_mtflu_flush_work_items(
"**Set/Unused work_item[%lu] flush_type=%d\n", "**Set/Unused work_item[%lu] flush_type=%d\n",
i, i,
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_a(0); ut_ad(0);
} }
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
...@@ -598,7 +603,7 @@ buf_mtflu_flush_list( ...@@ -598,7 +603,7 @@ buf_mtflu_flush_list(
/ srv_buf_pool_instances; / srv_buf_pool_instances;
} }
/* QUESTION: What is procted by below mutex ? */ /* This lock is to safequard against re-entry if any. */
os_fast_mutex_lock(&mtflush_mtx); os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances, buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST, cnt_flush, BUF_FLUSH_LIST,
...@@ -641,7 +646,7 @@ buf_mtflu_flush_LRU_tail(void) ...@@ -641,7 +646,7 @@ buf_mtflu_flush_LRU_tail(void)
ut_a(buf_mtflu_init_done()); ut_a(buf_mtflu_init_done());
/* QUESTION: What is protected by below mutex ? */ /* This lock is to safeguard against re-entry if any */
os_fast_mutex_lock(&mtflush_mtx); os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances, buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
......
...@@ -108,7 +108,7 @@ typedef struct wrk_itm ...@@ -108,7 +108,7 @@ typedef struct wrk_itm
wr_tsk_t wr; /*!< Flush page list */ wr_tsk_t wr; /*!< Flush page list */
rd_tsk_t rd; /*!< Decompress page list */ rd_tsk_t rd; /*!< Decompress page list */
ulint n_flushed; /*!< Flushed pages count */ ulint n_flushed; /*!< Flushed pages count */
os_thread_t id_usr; /*!< Thread-id currently working */ os_thread_id_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */ wrk_status_t wi_status; /*!< Work item status */
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
} wrk_t; } wrk_t;
...@@ -125,12 +125,12 @@ typedef struct thread_sync ...@@ -125,12 +125,12 @@ typedef struct thread_sync
wthr_status_t wt_status; /*!< Worker thread status */ wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
wrk_t* work_item; /*!< Work items to be processed */ wrk_t* work_item; /*!< Array of work-items that are
individually accessed by multiple
threads. Items are accessed in a
thread safe manner.*/
} thread_sync_t; } thread_sync_t;
/* QUESTION: Is this array used from several threads concurrently ? */
// static wrk_t work_items[MTFLUSH_MAX_WORKER];
/* TODO: REALLY NEEDED ? */ /* TODO: REALLY NEEDED ? */
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx; static os_fast_mutex_t mtflush_mtx;
...@@ -205,9 +205,7 @@ buf_mtflu_flush_pool_instance( ...@@ -205,9 +205,7 @@ buf_mtflu_flush_pool_instance(
help in the retry which will follow the help in the retry which will follow the
failure. */ failure. */
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/* QUESTION: is this a really failure ? */ fprintf(stderr, "flush start failed.\n");
fprintf(stderr, "flush_start Failed, flush_type:%d\n",
work_item->wr.flush_type);
#endif #endif
return 0; return 0;
} }
...@@ -235,7 +233,7 @@ buf_mtflu_flush_pool_instance( ...@@ -235,7 +233,7 @@ buf_mtflu_flush_pool_instance(
buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
buf_flush_common(work_item->wr.flush_type, work_item->n_flushed); buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
return 0; return work_item->n_flushed;
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -293,23 +291,30 @@ mtflush_service_io( ...@@ -293,23 +291,30 @@ mtflush_service_io(
return; return;
} }
work_item->id_usr = mtflush_io->wthread; work_item->id_usr = os_thread_get_curr_id();
/* This works as a producer/consumer model, where in tasks are
* inserted into the work-queue (wq) and completions are based
* on the type of operations performed and as a result the WRITE/
* compression/flush operation completions get posted to wr_cq.
* And READ/decompress operations completions get posted to rd_cq.
* in future we may have others.
*/
switch(work_item->tsk) { switch(work_item->tsk) {
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
/* QUESTION: Why completed work items are inserted to
completion queue ? */
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap);
break; break;
case MT_WRK_WRITE: case MT_WRK_WRITE:
work_item->wi_status = WRK_ITEM_START; work_item->wi_status = WRK_ITEM_START;
/* Process work item */ /* Process work item */
/* QUESTION: Is this a really a error ? */ if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
if (0 != (n_flushed = buf_mtflu_flush_pool_instance(work_item))) { #ifdef UNIV_DEBUG
fprintf(stderr, "FLUSH op failed ret:%lu\n", n_flushed); fprintf(stderr, "No pages flushed\n");
#endif
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
...@@ -557,7 +562,7 @@ buf_mtflu_flush_work_items( ...@@ -557,7 +562,7 @@ buf_mtflu_flush_work_items(
"**Set/Unused work_item[%lu] flush_type=%d\n", "**Set/Unused work_item[%lu] flush_type=%d\n",
i, i,
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_a(0); ut_ad(0);
} }
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
...@@ -604,7 +609,7 @@ buf_mtflu_flush_list( ...@@ -604,7 +609,7 @@ buf_mtflu_flush_list(
/ srv_buf_pool_instances; / srv_buf_pool_instances;
} }
/* QUESTION: What is procted by below mutex ? */ /* This lock is to safequard against re-entry if any. */
os_fast_mutex_lock(&mtflush_mtx); os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances, buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST, cnt_flush, BUF_FLUSH_LIST,
...@@ -647,7 +652,7 @@ buf_mtflu_flush_LRU_tail(void) ...@@ -647,7 +652,7 @@ buf_mtflu_flush_LRU_tail(void)
ut_a(buf_mtflu_init_done()); ut_a(buf_mtflu_init_done());
/* QUESTION: What is protected by below mutex ? */ /* This lock is to safeguard against re-entry if any */
os_fast_mutex_lock(&mtflush_mtx); os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances, buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment