Commit 6b530a16 authored by osku's avatar osku

Sync array optimizations, most importantly signal_object changed to not

call os_event_set while holding the sync array mutex.

Idea by Heikki, implementation by Osku, bug-fixes and other changes by
Heikki, and final review and cleanup by Osku.
parent 6234dd64
......@@ -271,6 +271,9 @@ it is read or written. */
/* Tell the compiler that cond is unlikely to hold */
#define UNIV_UNLIKELY(cond) UNIV_EXPECT(cond, FALSE)
/* Compile-time constant of the given array's size. */
#define UT_ARR_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#include <stdio.h>
#include "ut0dbg.h"
#include "ut0ut.h"
......
......@@ -78,11 +78,6 @@ struct sync_cell_struct {
the wait cell */
};
/* NOTE: It is allowed for a thread to wait
for an event allocated for the array without owning the
protecting mutex (depending on the case: OS or database mutex), but
all changes (set or reset) to the state of the event must be made
while owning the mutex. */
struct sync_array_struct {
ulint n_reserved; /* number of currently reserved
cells in the wait array */
......@@ -302,31 +297,6 @@ sync_array_validate(
sync_array_exit(arr);
}
/***********************************************************************
Puts the cell event in set state. Sync array mutex has to be reserved. */
static
void
sync_cell_event_set(
/*================*/
sync_cell_t* cell) /* in: array cell */
{
cell->state = SC_WAKING_UP;
os_event_set(cell->event);
cell->event_set = TRUE;
}
/***********************************************************************
Puts the cell event in reset state. */
static
void
sync_cell_event_reset(
/*==================*/
sync_cell_t* cell) /* in: array cell */
{
os_event_reset(cell->event);
cell->event_set = FALSE;
}
/**********************************************************************
Reserves a wait array cell for waiting for an object.
The event of the cell is reset to nonsignalled state. */
......@@ -357,10 +327,14 @@ sync_array_reserve_cell(
if (cell->state == SC_FREE) {
/* Make sure the event is reset */
if (cell->event_set) {
sync_cell_event_reset(cell);
}
/* We do not check cell->event_set because it is
set outside the protection of the sync array mutex
and we had a bug regarding it, and since resetting
an event when it is not needed does no harm it is
safer always to do it. */
cell->event_set = FALSE;
os_event_reset(cell->event);
cell->state = SC_RESERVED;
cell->reservation_time = time(NULL);
......@@ -439,6 +413,17 @@ sync_array_free_cell_protected(
if (cell->state == SC_RESERVED) {
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
} else if (cell->state == SC_WAKING_UP) {
/* This is tricky; if we don't wait for the event to be
signaled, signal_object can set the state of a cell to
SC_WAKING_UP, mutex_spin_wait can call this and set the
state to SC_FREE, and then signal_object gets around to
calling os_set_event for the cell but since it's already
been freed things break horribly. */
sync_array_exit(arr);
os_event_wait(cell->event);
sync_array_enter(arr);
}
cell->state = SC_FREE;
......@@ -463,8 +448,6 @@ sync_array_wait_event(
ut_a(arr);
sync_array_enter(arr);
cell = sync_array_get_nth_cell(arr, index);
ut_a((cell->state == SC_RESERVED) || (cell->state == SC_WAKING_UP));
......@@ -482,6 +465,7 @@ sync_array_wait_event(
recursively sync_array routines, leading to trouble.
rw_lock_debug_mutex freezes the debug lists. */
sync_array_enter(arr);
rw_lock_debug_mutex_enter();
if (TRUE == sync_array_detect_deadlock(arr, cell, cell, 0)) {
......@@ -491,9 +475,8 @@ sync_array_wait_event(
}
rw_lock_debug_mutex_exit();
#endif
sync_array_exit(arr);
#endif
os_event_wait(event);
sync_array_free_cell(arr, index);
......@@ -520,11 +503,17 @@ sync_array_cell_print(
(ulong) os_thread_pf(cell->thread), cell->file,
(ulong) cell->line,
difftime(time(NULL), cell->reservation_time));
fprintf(file, "Wait array cell state %lu\n", (ulong)cell->state);
/* If the memory area pointed to by old_wait_mutex /
old_wait_rw_lock has been freed, this can crash. */
if (type == SYNC_MUTEX) {
if (cell->state != SC_RESERVED) {
/* If cell has this state, then even if we are holding the sync
array mutex, the wait object may get freed meanwhile. Do not
print the wait object then. */
} else if (type == SYNC_MUTEX) {
/* We use old_wait_mutex in case the cell has already
been freed meanwhile */
mutex = cell->old_wait_mutex;
......@@ -575,10 +564,6 @@ sync_array_cell_print(
ut_error;
}
if (!cell->waiting) {
fputs("wait has ended\n", file);
}
if (cell->event_set) {
fputs("wait is ending\n", file);
}
......@@ -718,7 +703,8 @@ sync_array_detect_deadlock(
if (ret) {
fprintf(stderr,
"Mutex %p owned by thread %lu file %s line %lu\n",
mutex, (ulong) os_thread_pf(mutex->thread_id),
mutex,
(ulong) os_thread_pf(mutex->thread_id),
mutex->file_name, (ulong) mutex->line);
sync_array_cell_print(stderr, cell);
......@@ -870,6 +856,20 @@ sync_array_signal_object(
ulint i;
ulint res_count;
/* We store the addresses of cells we need to signal and signal
them only after we have released the sync array's mutex (for
performance reasons). cell_count is the number of such cells, and
cell_ptr points to the first one. If there are less than
UT_ARR_SIZE(cells) of them, cell_ptr == &cells[0], otherwise
cell_ptr points to malloc'd memory that we must free. */
sync_cell_t* cells[100];
sync_cell_t** cell_ptr = &cells[0];
ulint cell_count = 0;
ulint cell_max_count = UT_ARR_SIZE(cells);
ut_a(100 == cell_max_count);
sync_array_enter(arr);
arr->sg_count++;
......@@ -877,7 +877,7 @@ sync_array_signal_object(
i = 0;
count = 0;
/* we need to store this to a local variable because it's modified
/* We need to store this to a local variable because it is modified
inside the loop */
res_count = arr->n_reserved;
......@@ -889,11 +889,32 @@ sync_array_signal_object(
count++;
if (cell->wait_object == object) {
sync_cell_event_set(cell);
cell->state = SC_WAKING_UP;
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
if (cell_count == cell_max_count) {
sync_cell_t** old_cell_ptr = cell_ptr;
size_t old_size = cell_max_count *
sizeof(sync_cell_t*);
cell_max_count *= 2;
size_t new_size = cell_max_count *
sizeof(sync_cell_t*);
cell_ptr = malloc(new_size);
ut_a(cell_ptr);
memcpy(cell_ptr, old_cell_ptr,
old_size);
if (old_cell_ptr != &cells[0]) {
free(old_cell_ptr);
}
}
cell_ptr[cell_count] = cell;
cell_count++;
}
}
......@@ -901,6 +922,17 @@ sync_array_signal_object(
}
sync_array_exit(arr);
for (i = 0; i < cell_count; i++) {
cell = cell_ptr[i];
cell->event_set = TRUE;
os_event_set(cell->event);
}
if (cell_ptr != &cells[0]) {
free(cell_ptr);
}
}
/**************************************************************************
......@@ -927,8 +959,9 @@ sync_arr_wake_threads_if_sema_free(void)
i = 0;
count = 0;
/* we need to store this to a local variable because it's modified
/* We need to store this to a local variable because it is modified
inside the loop */
res_count = arr->n_reserved;
while (count < res_count) {
......@@ -940,7 +973,9 @@ sync_arr_wake_threads_if_sema_free(void)
count++;
if (sync_arr_cell_can_wake_up(cell)) {
sync_cell_event_set(cell);
cell->state = SC_WAKING_UP;
cell->event_set = TRUE;
os_event_set(cell->event);
ut_a(arr->n_reserved > 0);
arr->n_reserved--;
......@@ -1027,25 +1062,19 @@ sync_array_output_info(
mutex */
{
sync_cell_t* cell;
ulint count;
ulint i;
fprintf(file,
"OS WAIT ARRAY INFO: reservation count %ld, signal count %ld\n",
(long) arr->res_count, (long) arr->sg_count);
i = 0;
count = 0;
while (count < arr->n_reserved) {
(long) arr->res_count,
(long) arr->sg_count);
for (i = 0; i < arr->n_cells; i++) {
cell = sync_array_get_nth_cell(arr, i);
if (cell->state != SC_FREE) {
count++;
sync_array_cell_print(file, cell);
}
i++;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment