Commit 876f0fbd authored by osku's avatar osku

Forward port r121 from branches/5.0:

Optimize sync array behavior by not having the woken threads reserve the
sync array mutex.
parent 6915755d
...@@ -66,12 +66,12 @@ sync_array_wait_event( ...@@ -66,12 +66,12 @@ sync_array_wait_event(
sync_array_t* arr, /* in: wait array */ sync_array_t* arr, /* in: wait array */
ulint index); /* in: index of the reserved cell */ ulint index); /* in: index of the reserved cell */
/********************************************************************** /**********************************************************************
Frees the cell. NOTE! sync_array_wait_event frees the cell Frees the cell safely by reserving the sync array mutex and decrementing
automatically! */ n_reserved if necessary. Should only be called from mutex_spin_wait. */
void void
sync_array_free_cell( sync_array_free_cell_protected(
/*=================*/ /*===========================*/
sync_array_t* arr, /* in: wait array */ sync_array_t* arr, /* in: wait array */
ulint index); /* in: index of the cell in array */ ulint index); /* in: index of the cell in array */
/************************************************************************** /**************************************************************************
......
...@@ -46,9 +46,18 @@ algorithm, because 10 000 events are created fast, but ...@@ -46,9 +46,18 @@ algorithm, because 10 000 events are created fast, but
until a resource is released. The suspending is implemented until a resource is released. The suspending is implemented
using an operating system event semaphore. */ using an operating system event semaphore. */
struct sync_cell_struct { struct sync_cell_struct {
/* State of the cell. SC_WAKING_UP means
sync_array_struct->n_reserved has been decremented, but the thread
in this cell has not waken up yet. When it does, it will set the
state to SC_FREE. Note that this is done without the protection of
any mutex. */
enum { SC_FREE, SC_RESERVED, SC_WAKING_UP } state;
void* wait_object; /* pointer to the object the void* wait_object; /* pointer to the object the
thread is waiting for; if NULL thread is waiting for; like state,
the cell is free for use */ this can change from non-NULL to
NULL without any mutex protection */
mutex_t* old_wait_mutex; /* the latest wait mutex in cell */ mutex_t* old_wait_mutex; /* the latest wait mutex in cell */
rw_lock_t* old_wait_rw_lock;/* the latest wait rw-lock in cell */ rw_lock_t* old_wait_rw_lock;/* the latest wait rw-lock in cell */
ulint request_type; /* lock type requested on the ulint request_type; /* lock type requested on the
...@@ -217,6 +226,7 @@ sync_array_create( ...@@ -217,6 +226,7 @@ sync_array_create(
for (i = 0; i < n_cells; i++) { for (i = 0; i < n_cells; i++) {
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
cell->state = SC_FREE;
cell->wait_object = NULL; cell->wait_object = NULL;
/* Create an operating system event semaphore with no name */ /* Create an operating system event semaphore with no name */
...@@ -282,7 +292,7 @@ sync_array_validate( ...@@ -282,7 +292,7 @@ sync_array_validate(
for (i = 0; i < arr->n_cells; i++) { for (i = 0; i < arr->n_cells; i++) {
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object != NULL) { if (cell->state == SC_RESERVED) {
count++; count++;
} }
} }
...@@ -293,13 +303,14 @@ sync_array_validate( ...@@ -293,13 +303,14 @@ sync_array_validate(
} }
/*********************************************************************** /***********************************************************************
Puts the cell event in set state. */ Puts the cell event in set state. Sync array mutex has to be reserved. */
static static
void void
sync_cell_event_set( sync_cell_event_set(
/*================*/ /*================*/
sync_cell_t* cell) /* in: array cell */ sync_cell_t* cell) /* in: array cell */
{ {
cell->state = SC_WAKING_UP;
os_event_set(cell->event); os_event_set(cell->event);
cell->event_set = TRUE; cell->event_set = TRUE;
} }
...@@ -344,13 +355,14 @@ sync_array_reserve_cell( ...@@ -344,13 +355,14 @@ sync_array_reserve_cell(
for (i = 0; i < arr->n_cells; i++) { for (i = 0; i < arr->n_cells; i++) {
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object == NULL) { if (cell->state == SC_FREE) {
/* Make sure the event is reset */ /* Make sure the event is reset */
if (cell->event_set) { if (cell->event_set) {
sync_cell_event_reset(cell); sync_cell_event_reset(cell);
} }
cell->state = SC_RESERVED;
cell->reservation_time = time(NULL); cell->reservation_time = time(NULL);
cell->thread = os_thread_get_curr_id(); cell->thread = os_thread_get_curr_id();
...@@ -383,6 +395,59 @@ sync_array_reserve_cell( ...@@ -383,6 +395,59 @@ sync_array_reserve_cell(
return; return;
} }
/**********************************************************************
Frees the cell. Note that we don't have any mutex reserved when calling
this. */
static
void
sync_array_free_cell(
/*=================*/
sync_array_t* arr, /* in: wait array */
ulint index) /* in: index of the cell in array */
{
sync_cell_t* cell;
cell = sync_array_get_nth_cell(arr, index);
ut_a(cell->state == SC_WAKING_UP);
ut_a(cell->wait_object != NULL);
cell->state = SC_FREE;
cell->wait_object = NULL;
}
/**********************************************************************
Frees the cell safely by reserving the sync array mutex and decrementing
n_reserved if necessary. Should only be called from mutex_spin_wait. */
void
sync_array_free_cell_protected(
/*===========================*/
sync_array_t* arr, /* in: wait array */
ulint index) /* in: index of the cell in array */
{
sync_cell_t* cell;
sync_array_enter(arr);
cell = sync_array_get_nth_cell(arr, index);
ut_a(cell->state != SC_FREE);
ut_a(cell->wait_object != NULL);
/* We only need to decrement n_reserved if it has not already been
done by sync_array_signal_object. */
if (cell->state == SC_RESERVED) {
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
}
cell->state = SC_FREE;
cell->wait_object = NULL;
sync_array_exit(arr);
}
/********************************************************************** /**********************************************************************
This function should be called when a thread starts to wait on This function should be called when a thread starts to wait on
a wait array cell. In the debug version this function checks a wait array cell. In the debug version this function checks
...@@ -404,6 +469,7 @@ sync_array_wait_event( ...@@ -404,6 +469,7 @@ sync_array_wait_event(
cell = sync_array_get_nth_cell(arr, index); cell = sync_array_get_nth_cell(arr, index);
ut_a((cell->state == SC_RESERVED) || (cell->state == SC_WAKING_UP));
ut_a(cell->wait_object); ut_a(cell->wait_object);
ut_a(!cell->waiting); ut_a(!cell->waiting);
ut_ad(os_thread_get_curr_id() == cell->thread); ut_ad(os_thread_get_curr_id() == cell->thread);
...@@ -436,7 +502,8 @@ sync_array_wait_event( ...@@ -436,7 +502,8 @@ sync_array_wait_event(
} }
/********************************************************************** /**********************************************************************
Reports info of a wait array cell. */ Reports info of a wait array cell. Note: sync_array_print_long_waits()
calls this without mutex protection. */
static static
void void
sync_array_cell_print( sync_array_cell_print(
...@@ -456,6 +523,9 @@ sync_array_cell_print( ...@@ -456,6 +523,9 @@ sync_array_cell_print(
(ulong) cell->line, (ulong) cell->line,
difftime(time(NULL), cell->reservation_time)); difftime(time(NULL), cell->reservation_time));
/* If the memory area pointed to by old_wait_mutex /
old_wait_rw_lock has been freed, this can crash. */
if (type == SYNC_MUTEX) { if (type == SYNC_MUTEX) {
/* We use old_wait_mutex in case the cell has already /* We use old_wait_mutex in case the cell has already
been freed meanwhile */ been freed meanwhile */
...@@ -535,7 +605,7 @@ sync_array_find_thread( ...@@ -535,7 +605,7 @@ sync_array_find_thread(
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object != NULL if ((cell->state == SC_RESERVED)
&& os_thread_eq(cell->thread, thread)) { && os_thread_eq(cell->thread, thread)) {
return(cell); /* Found */ return(cell); /* Found */
...@@ -617,6 +687,7 @@ sync_array_detect_deadlock( ...@@ -617,6 +687,7 @@ sync_array_detect_deadlock(
rw_lock_debug_t*debug; rw_lock_debug_t*debug;
ut_a(arr && start && cell); ut_a(arr && start && cell);
ut_ad(cell->state == SC_RESERVED);
ut_ad(cell->wait_object); ut_ad(cell->wait_object);
ut_ad(os_thread_get_curr_id() == start->thread); ut_ad(os_thread_get_curr_id() == start->thread);
ut_ad(depth < 100); ut_ad(depth < 100);
...@@ -784,32 +855,6 @@ sync_arr_cell_can_wake_up( ...@@ -784,32 +855,6 @@ sync_arr_cell_can_wake_up(
return(FALSE); return(FALSE);
} }
/**********************************************************************
Frees the cell. NOTE! sync_array_wait_event frees the cell
automatically! */
void
sync_array_free_cell(
/*=================*/
sync_array_t* arr, /* in: wait array */
ulint index) /* in: index of the cell in array */
{
sync_cell_t* cell;
sync_array_enter(arr);
cell = sync_array_get_nth_cell(arr, index);
ut_a(cell->wait_object != NULL);
cell->wait_object = NULL;
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
sync_array_exit(arr);
}
/************************************************************************** /**************************************************************************
Looks for the cells in the wait array which refer to the wait object Looks for the cells in the wait array which refer to the wait object
specified, and sets their corresponding events to the signaled state. In this specified, and sets their corresponding events to the signaled state. In this
...@@ -825,6 +870,7 @@ sync_array_signal_object( ...@@ -825,6 +870,7 @@ sync_array_signal_object(
sync_cell_t* cell; sync_cell_t* cell;
ulint count; ulint count;
ulint i; ulint i;
ulint res_count;
sync_array_enter(arr); sync_array_enter(arr);
...@@ -833,16 +879,23 @@ sync_array_signal_object( ...@@ -833,16 +879,23 @@ sync_array_signal_object(
i = 0; i = 0;
count = 0; count = 0;
while (count < arr->n_reserved) { /* we need to store this to a local variable because it's modified
inside the loop */
res_count = arr->n_reserved;
while (count < res_count) {
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object != NULL) { if (cell->state == SC_RESERVED) {
count++; count++;
if (cell->wait_object == object) { if (cell->wait_object == object) {
sync_cell_event_set(cell); sync_cell_event_set(cell);
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
} }
} }
...@@ -865,23 +918,35 @@ sync_arr_wake_threads_if_sema_free(void) ...@@ -865,23 +918,35 @@ sync_arr_wake_threads_if_sema_free(void)
sync_cell_t* cell; sync_cell_t* cell;
ulint count; ulint count;
ulint i; ulint i;
ulint res_count;
sync_array_enter(arr); sync_array_enter(arr);
i = 0; i = 0;
count = 0; count = 0;
while (count < arr->n_reserved) { /* we need to store this to a local variable because it's modified
inside the loop */
res_count = arr->n_reserved;
while (count < res_count) {
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object != NULL) { if (cell->state == SC_RESERVED) {
count++; count++;
if (sync_arr_cell_can_wake_up(cell)) { if (sync_arr_cell_can_wake_up(cell)) {
ut_print_timestamp(stderr);
fprintf(stderr,
" InnoDB: warning: sync_arr_wake_threads_if_sema_free()\n"
"InnoDB: found a thread to wake up.\n");
sync_cell_event_set(cell); sync_cell_event_set(cell);
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
} }
} }
...@@ -911,7 +976,7 @@ sync_array_print_long_waits(void) ...@@ -911,7 +976,7 @@ sync_array_print_long_waits(void)
cell = sync_array_get_nth_cell(sync_primary_wait_array, i); cell = sync_array_get_nth_cell(sync_primary_wait_array, i);
if (cell->wait_object != NULL if ((cell->state != SC_FREE)
&& difftime(time(NULL), cell->reservation_time) > 240) { && difftime(time(NULL), cell->reservation_time) > 240) {
fputs("InnoDB: Warning: a long semaphore wait:\n", fputs("InnoDB: Warning: a long semaphore wait:\n",
stderr); stderr);
...@@ -919,7 +984,7 @@ sync_array_print_long_waits(void) ...@@ -919,7 +984,7 @@ sync_array_print_long_waits(void)
noticed = TRUE; noticed = TRUE;
} }
if (cell->wait_object != NULL if ((cell->state != SC_FREE)
&& difftime(time(NULL), cell->reservation_time) && difftime(time(NULL), cell->reservation_time)
> fatal_timeout) { > fatal_timeout) {
fatal = TRUE; fatal = TRUE;
...@@ -978,7 +1043,7 @@ sync_array_output_info( ...@@ -978,7 +1043,7 @@ sync_array_output_info(
cell = sync_array_get_nth_cell(arr, i); cell = sync_array_get_nth_cell(arr, i);
if (cell->wait_object != NULL) { if (cell->state != SC_FREE) {
count++; count++;
sync_array_cell_print(file, cell); sync_array_cell_print(file, cell);
} }
......
...@@ -482,7 +482,7 @@ spin_loop: ...@@ -482,7 +482,7 @@ spin_loop:
{ {
/* Succeeded! Free the reserved wait cell */ /* Succeeded! Free the reserved wait cell */
sync_array_free_cell(sync_primary_wait_array, index); sync_array_free_cell_protected(sync_primary_wait_array, index);
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
mutex_set_debug_info(mutex, file_name, line); mutex_set_debug_info(mutex, file_name, line);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment