Commit e4252160 authored by Oleksandr Byelkin's avatar Oleksandr Byelkin

MDEV-15113: Hang in Aria loghandler

Added unregistering writers in case of log error.
Added more debugging control about adding/removing writers to the buffers.
parent 147744d4
...@@ -78,6 +78,32 @@ typedef union ...@@ -78,6 +78,32 @@ typedef union
uchar buffer[TRANSLOG_PAGE_SIZE]; uchar buffer[TRANSLOG_PAGE_SIZE];
} TRANSLOG_PAGE_SIZE_BUFF; } TRANSLOG_PAGE_SIZE_BUFF;
#define MAX_TRUNSLOG_USED_BUFFERS 3
typedef struct
{
struct st_translog_buffer *buff[MAX_TRUNSLOG_USED_BUFFERS];
uint8 wrt_ptr;
uint8 unlck_ptr;
} TRUNSLOG_USED_BUFFERS;
static void
used_buffs_init(TRUNSLOG_USED_BUFFERS *buffs)
{
buffs->unlck_ptr= buffs->wrt_ptr= 0;
}
static void
used_buffs_add(TRUNSLOG_USED_BUFFERS *buffs,
struct st_translog_buffer *buff);
static void
used_buffs_register_unlock(TRUNSLOG_USED_BUFFERS *buffs,
struct st_translog_buffer *buff);
static void
used_buffs_urgent_unlock(TRUNSLOG_USED_BUFFERS *buffs);
/* min chunk length */ /* min chunk length */
#define TRANSLOG_MIN_CHUNK 3 #define TRANSLOG_MIN_CHUNK 3
/* /*
...@@ -156,7 +182,28 @@ struct st_translog_buffer ...@@ -156,7 +182,28 @@ struct st_translog_buffer
TRANSLOG_FILE *file; TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */ /* Threads which are waiting for buffer filling/freeing */
mysql_cond_t waiting_filling_buffer; mysql_cond_t waiting_filling_buffer;
/* Number of records which are in copy progress */ /*
Number of records which are in copy progress.
Controlled via translog_buffer_increase_writers() and
translog_buffer_decrease_writers().
1 Simple case: translog_force_current_buffer_to_finish both called in
the same procedure.
2 Simple case: translog_write_variable_record_1group:
translog_advance_pointer() increase writer of the buffer and
translog_buffer_decrease_writers() decrease it.
Usual case:
1) translog_advance_pointer (i.e. reserve place for future writing)
increase writers for all buffers where place reserved.
Simpliest case: just all space reserved in one buffer
complex case: end of the first buffer, all second buffer, beginning
of the third buffer.
2) When we finish with writing translog_chaser_page_next() will be
called and unlock the buffer by decreasing number of writers.
*/
uint copy_to_buffer_in_progress; uint copy_to_buffer_in_progress;
/* list of waiting buffer ready threads */ /* list of waiting buffer ready threads */
struct st_my_thread_var *waiting_flush; struct st_my_thread_var *waiting_flush;
...@@ -214,6 +261,7 @@ struct st_translog_buffer ...@@ -214,6 +261,7 @@ struct st_translog_buffer
struct st_buffer_cursor struct st_buffer_cursor
{ {
TRUNSLOG_USED_BUFFERS buffs;
/* pointer into the buffer */ /* pointer into the buffer */
uchar *ptr; uchar *ptr;
/* current buffer */ /* current buffer */
...@@ -1663,15 +1711,12 @@ static my_bool translog_create_new_file() ...@@ -1663,15 +1711,12 @@ static my_bool translog_create_new_file()
DBUG_PRINT("info", ("file_no: %lu", (ulong)file_no)); DBUG_PRINT("info", ("file_no: %lu", (ulong)file_no));
if (translog_write_file_header()) if (translog_write_file_header())
DBUG_RETURN(1); goto error;
if (ma_control_file_write_and_force(last_checkpoint_lsn, file_no, if (ma_control_file_write_and_force(last_checkpoint_lsn, file_no,
max_trid_in_control_file, max_trid_in_control_file,
recovery_failures)) recovery_failures))
{ goto error;
translog_stop_writing();
DBUG_RETURN(1);
}
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -1711,10 +1756,6 @@ static void translog_buffer_lock(struct st_translog_buffer *buffer) ...@@ -1711,10 +1756,6 @@ static void translog_buffer_lock(struct st_translog_buffer *buffer)
SYNOPSIS SYNOPSIS
translog_buffer_unlock() translog_buffer_unlock()
buffer This buffer which should be unlocked buffer This buffer which should be unlocked
RETURN
0 OK
1 Error
*/ */
static void translog_buffer_unlock(struct st_translog_buffer *buffer) static void translog_buffer_unlock(struct st_translog_buffer *buffer)
...@@ -1908,7 +1949,10 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon, ...@@ -1908,7 +1949,10 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
(ulong) cursor->buffer->size, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr -cursor->buffer->buffer), (ulong) (cursor->ptr -cursor->buffer->buffer),
(uint) cursor->current_page_fill, (uint) left)); (uint) cursor->current_page_fill, (uint) left));
DBUG_ASSERT(LSN_FILE_NO(*horizon) == LSN_FILE_NO(cursor->buffer->offset)); DBUG_ASSERT(LSN_FILE_NO(*horizon) == LSN_FILE_NO(cursor->buffer->offset)
|| translog_status == TRANSLOG_UNINITED);
if ((LSN_FILE_NO(*horizon) != LSN_FILE_NO(cursor->buffer->offset)))
DBUG_VOID_RETURN; // everything wrong do not write to awoid more problems
translog_check_cursor(cursor); translog_check_cursor(cursor);
if (cursor->protected) if (cursor->protected)
{ {
...@@ -4607,6 +4651,7 @@ static my_bool translog_chaser_page_next(TRANSLOG_ADDRESS *horizon, ...@@ -4607,6 +4651,7 @@ static my_bool translog_chaser_page_next(TRANSLOG_ADDRESS *horizon,
{ {
translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
used_buffs_register_unlock(&cursor->buffs, buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
...@@ -4711,7 +4756,8 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts, ...@@ -4711,7 +4756,8 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts,
1 Error 1 Error
*/ */
static my_bool translog_advance_pointer(int pages, uint16 last_page_data) static my_bool translog_advance_pointer(int pages, uint16 last_page_data,
TRUNSLOG_USED_BUFFERS *buffs)
{ {
translog_size_t last_page_offset= (log_descriptor.page_overhead + translog_size_t last_page_offset= (log_descriptor.page_overhead +
last_page_data); last_page_data);
...@@ -4728,6 +4774,8 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4728,6 +4774,8 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
(uint) last_page_data)); (uint) last_page_data));
translog_lock_assert_owner(); translog_lock_assert_owner();
used_buffs_init(buffs);
if (pages == -1) if (pages == -1)
{ {
/* /*
...@@ -4805,8 +4853,10 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4805,8 +4853,10 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
translog_wait_for_buffer_free(new_buffer); translog_wait_for_buffer_free(new_buffer);
#ifndef DBUG_OFF #ifndef DBUG_OFF
/* We keep the handler locked so nobody can start this new buffer */ /* We keep the handler locked so nobody can start this new buffer */
DBUG_ASSERT(offset == new_buffer->offset && new_buffer->file == NULL && DBUG_ASSERT((offset == new_buffer->offset && new_buffer->file == NULL &&
(file == NULL ? ver : (uint8)(ver + 1)) == new_buffer->ver); (file == NULL ? ver : (uint8)(ver + 1)) ==
new_buffer->ver) ||
translog_status == TRANSLOG_READONLY);
} }
#endif #endif
...@@ -4827,6 +4877,8 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4827,6 +4877,8 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no == DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
log_descriptor.bc.buffer_no); log_descriptor.bc.buffer_no);
translog_buffer_increase_writers(log_descriptor.bc.buffer); translog_buffer_increase_writers(log_descriptor.bc.buffer);
// register for case of error
used_buffs_add(buffs, log_descriptor.bc.buffer);
if (file_end_offset <= buffer_end_offset) if (file_end_offset <= buffer_end_offset)
{ {
...@@ -4837,6 +4889,10 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4837,6 +4889,10 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
(ulong) LSN_FILE_NO(log_descriptor.horizon))); (ulong) LSN_FILE_NO(log_descriptor.horizon)));
if (translog_create_new_file()) if (translog_create_new_file())
{ {
struct st_translog_buffer *ob= log_descriptor.bc.buffer;
translog_buffer_unlock(ob);
used_buffs_urgent_unlock(buffs);
translog_buffer_lock(ob);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
...@@ -4858,6 +4914,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4858,6 +4914,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
log_descriptor.bc.ptr+= offset; log_descriptor.bc.ptr+= offset;
log_descriptor.bc.buffer->size+= offset; log_descriptor.bc.buffer->size+= offset;
translog_buffer_increase_writers(log_descriptor.bc.buffer); translog_buffer_increase_writers(log_descriptor.bc.buffer);
used_buffs_add(buffs, log_descriptor.bc.buffer);
log_descriptor.horizon+= offset; /* offset increasing */ log_descriptor.horizon+= offset; /* offset increasing */
log_descriptor.bc.current_page_fill= last_page_offset; log_descriptor.bc.current_page_fill= last_page_offset;
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu) " DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu) "
...@@ -4878,6 +4935,56 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4878,6 +4935,56 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
static void
used_buffs_add(TRUNSLOG_USED_BUFFERS *buffs,
struct st_translog_buffer *buff)
{
DBUG_ENTER("used_buffs_add");
DBUG_PRINT("enter", ("ADD buffs: %p unlk %u (%p) wrt_ptr: %u (%p)"
" buff %p (%u)",
buffs,
buffs->wrt_ptr, buffs->buff[buffs->wrt_ptr],
buffs->unlck_ptr, buffs->buff[buffs->unlck_ptr],
buff, buff->buffer_no));
DBUG_ASSERT(buffs->wrt_ptr < MAX_TRUNSLOG_USED_BUFFERS);
buffs->buff[buffs->wrt_ptr++]= buff;
DBUG_VOID_RETURN;
}
static void
used_buffs_register_unlock(TRUNSLOG_USED_BUFFERS *buffs,
struct st_translog_buffer *buff
__attribute__((unused)) )
{
DBUG_ENTER("used_buffs_register_unlock");
DBUG_PRINT("enter", ("SUB buffs: %p unlk %u (%p) wrt_ptr: %u (%p)"
" buff %p (%u)",
buffs,
buffs->wrt_ptr, buffs->buff[buffs->wrt_ptr],
buffs->unlck_ptr, buffs->buff[buffs->unlck_ptr],
buff, buff->buffer_no));
DBUG_ASSERT(buffs->buff[buffs->unlck_ptr] == buff);
buffs->unlck_ptr++;
DBUG_VOID_RETURN;
}
static void used_buffs_urgent_unlock(TRUNSLOG_USED_BUFFERS *buffs)
{
uint i;
DBUG_ENTER("used_buffs_urgent_unlock");
translog_lock();
translog_stop_writing();
translog_unlock();
for (i= buffs->unlck_ptr; i < buffs->wrt_ptr; i++)
{
struct st_translog_buffer *buf= buffs->buff[i];
translog_buffer_lock(buf);
translog_buffer_decrease_writers(buf);
translog_buffer_unlock(buf);
buffs->buff[i]= NULL;
}
used_buffs_init(buffs);
DBUG_VOID_RETURN;
}
/* /*
Get page rest Get page rest
...@@ -5016,6 +5123,11 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5016,6 +5123,11 @@ translog_write_variable_record_1group(LSN *lsn,
lsn, hook_arg))) lsn, hook_arg)))
{ {
translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL)
{
translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
}
DBUG_RETURN(1); DBUG_RETURN(1);
} }
cursor= log_descriptor.bc; cursor= log_descriptor.bc;
...@@ -5046,8 +5158,9 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5046,8 +5158,9 @@ translog_write_variable_record_1group(LSN *lsn,
(log_descriptor.page_capacity_chunk_2 - 1), (log_descriptor.page_capacity_chunk_2 - 1),
record_rest, parts->record_length)); record_rest, parts->record_length));
/* record_rest + 3 is chunk type 3 overhead + record_rest */ /* record_rest + 3 is chunk type 3 overhead + record_rest */
rc|= translog_advance_pointer((int)(full_pages + additional_chunk3_page), rc= translog_advance_pointer((int)(full_pages + additional_chunk3_page),
(record_rest ? record_rest + 3 : 0)); (record_rest ? record_rest + 3 : 0),
&cursor.buffs);
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx", DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn), LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
...@@ -5066,7 +5179,11 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5066,7 +5179,11 @@ translog_write_variable_record_1group(LSN *lsn,
translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
if (rc) if (rc)
{
//translog_advance_pointer decreased writers so it is OK
DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
DBUG_RETURN(1); DBUG_RETURN(1);
}
translog_write_variable_record_1group_header(parts, type, short_trid, translog_write_variable_record_1group_header(parts, type, short_trid,
header_length, chunk0_header); header_length, chunk0_header);
...@@ -5081,7 +5198,7 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5081,7 +5198,7 @@ translog_write_variable_record_1group(LSN *lsn,
for (i= 0; i < full_pages; i++) for (i= 0; i < full_pages; i++)
{ {
if (translog_write_variable_record_chunk2_page(parts, &horizon, &cursor)) if (translog_write_variable_record_chunk2_page(parts, &horizon, &cursor))
DBUG_RETURN(1); goto error;
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)", DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.horizon), LSN_IN_PARTS(log_descriptor.horizon),
...@@ -5094,7 +5211,7 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5094,7 +5211,7 @@ translog_write_variable_record_1group(LSN *lsn,
log_descriptor. log_descriptor.
page_capacity_chunk_2 - 2, page_capacity_chunk_2 - 2,
&horizon, &cursor)) &horizon, &cursor))
DBUG_RETURN(1); goto error;
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)", DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.horizon), LSN_IN_PARTS(log_descriptor.horizon),
LSN_IN_PARTS(horizon))); LSN_IN_PARTS(horizon)));
...@@ -5104,7 +5221,7 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5104,7 +5221,7 @@ translog_write_variable_record_1group(LSN *lsn,
if (translog_write_variable_record_chunk3_page(parts, if (translog_write_variable_record_chunk3_page(parts,
record_rest, record_rest,
&horizon, &cursor)) &horizon, &cursor))
DBUG_RETURN(1); goto error;
DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)", DBUG_PRINT("info", ("absolute horizon: (%lu,0x%lx) local: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(log_descriptor.horizon), (ulong) LSN_FILE_NO(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon),
...@@ -5113,8 +5230,13 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -5113,8 +5230,13 @@ translog_write_variable_record_1group(LSN *lsn,
translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
used_buffs_register_unlock(&cursor.buffs, cursor.buffer);
translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
DBUG_RETURN(rc); DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
DBUG_RETURN(0);
error:
used_buffs_urgent_unlock(&cursor.buffs);
DBUG_RETURN(1);
} }
...@@ -5168,7 +5290,8 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -5168,7 +5290,8 @@ translog_write_variable_record_1chunk(LSN *lsn,
lsn, hook_arg))) lsn, hook_arg)))
{ {
translog_unlock(); translog_unlock();
DBUG_RETURN(1); rc= 1;
goto err;
} }
rc= translog_write_parts_on_page(&log_descriptor.horizon, rc= translog_write_parts_on_page(&log_descriptor.horizon,
...@@ -5184,6 +5307,7 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -5184,6 +5307,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
check if we switched buffer and need process it (current buffer is check if we switched buffer and need process it (current buffer is
unlocked already => we will not delay other threads unlocked already => we will not delay other threads
*/ */
err:
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
if (!rc) if (!rc)
...@@ -5523,9 +5647,11 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5523,9 +5647,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
uint file_of_the_first_group; uint file_of_the_first_group;
int pages_to_skip; int pages_to_skip;
struct st_translog_buffer *buffer_of_last_lsn; struct st_translog_buffer *buffer_of_last_lsn;
my_bool external_buffer_to_flush= TRUE;
DBUG_ENTER("translog_write_variable_record_mgroup"); DBUG_ENTER("translog_write_variable_record_mgroup");
translog_lock_assert_owner(); translog_lock_assert_owner();
used_buffs_init(&cursor.buffs);
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR; chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
if (my_init_dynamic_array(&groups, if (my_init_dynamic_array(&groups,
...@@ -5533,6 +5659,11 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5533,6 +5659,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
10, 10)) 10, 10))
{ {
translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL)
{
translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
}
DBUG_PRINT("error", ("init array failed")); DBUG_PRINT("error", ("init array failed"));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -5559,6 +5690,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5559,6 +5690,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
translog_mark_file_unfinished(file_of_the_first_group); translog_mark_file_unfinished(file_of_the_first_group);
do do
{ {
DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
group.addr= horizon= log_descriptor.horizon; group.addr= horizon= log_descriptor.horizon;
cursor= log_descriptor.bc; cursor= log_descriptor.bc;
cursor.chaser= 1; cursor.chaser= 1;
...@@ -5591,21 +5723,26 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5591,21 +5723,26 @@ translog_write_variable_record_mgroup(LSN *lsn,
(ulong)(parts->record_length - (first_page - 1 + (ulong)(parts->record_length - (first_page - 1 +
buffer_rest) - buffer_rest) -
done))); done)));
rc|= translog_advance_pointer((int)full_pages, 0); rc= translog_advance_pointer((int)full_pages, 0, &cursor.buffs);
translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
if (!external_buffer_to_flush)
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL; buffer_to_flush= NULL;
} }
external_buffer_to_flush= FALSE;
if (rc) if (rc)
{ {
DBUG_PRINT("error", ("flush of unlock buffer failed")); DBUG_PRINT("error", ("flush of unlock buffer failed"));
//translog_advance_pointer decreased writers so it is OK
DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
goto err; goto err;
} }
...@@ -5642,6 +5779,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5642,6 +5779,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
} }
translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
used_buffs_register_unlock(&cursor.buffs, cursor.buffer);
translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
translog_lock(); translog_lock();
...@@ -5656,6 +5794,11 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5656,6 +5794,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
first_page= translog_get_current_page_rest(); first_page= translog_get_current_page_rest();
} }
buffer_rest= translog_get_current_group_size(); buffer_rest= translog_get_current_group_size();
if (buffer_to_flush)
used_buffs_register_unlock(&cursor.buffs,
buffer_to_flush); // will be unlocked
} while ((translog_size_t)(first_page + buffer_rest) < } while ((translog_size_t)(first_page + buffer_rest) <
(translog_size_t)(parts->record_length - done)); (translog_size_t)(parts->record_length - done));
...@@ -5751,17 +5894,21 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5751,17 +5894,21 @@ translog_write_variable_record_mgroup(LSN *lsn,
(ulong) full_pages * (ulong) full_pages *
log_descriptor.page_capacity_chunk_2, log_descriptor.page_capacity_chunk_2,
chunk3_pages, (uint) chunk3_size, (uint) record_rest)); chunk3_pages, (uint) chunk3_size, (uint) record_rest));
DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
rc= translog_advance_pointer(pages_to_skip + (int)(chunk0_pages - 1), rc= translog_advance_pointer(pages_to_skip + (int)(chunk0_pages - 1),
record_rest + header_fixed_part + record_rest + header_fixed_part +
(groups.elements - (groups.elements -
((page_capacity - ((page_capacity -
header_fixed_part) / (7 + 1)) * header_fixed_part) / (7 + 1)) *
(chunk0_pages - 1)) * (7 + 1)); (chunk0_pages - 1)) * (7 + 1),
&cursor.buffs);
buffer_of_last_lsn= log_descriptor.bc.buffer; buffer_of_last_lsn= log_descriptor.bc.buffer;
translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
DBUG_ASSERT(!external_buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
...@@ -5925,8 +6072,10 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5925,8 +6072,10 @@ translog_write_variable_record_mgroup(LSN *lsn,
} while (chunk0_pages != 0); } while (chunk0_pages != 0);
translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
used_buffs_register_unlock(&cursor.buffs, cursor.buffer);
translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
rc= 0; rc= 0;
DBUG_ASSERT(cursor.buffs.unlck_ptr == cursor.buffs.wrt_ptr);
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn), if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
*lsn, FALSE)) *lsn, FALSE))
...@@ -5935,16 +6084,21 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5935,16 +6084,21 @@ translog_write_variable_record_mgroup(LSN *lsn,
translog_mark_file_finished(file_of_the_first_group); translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups); delete_dynamic(&groups);
DBUG_RETURN(rc); DBUG_RETURN(0);
err_unlock: err_unlock:
translog_unlock(); translog_unlock();
err: err:
if (cursor.buffs.unlck_ptr != cursor.buffs.wrt_ptr)
used_buffs_urgent_unlock(&cursor.buffs);
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
/* This is to prevent locking buffer forever in case of error */ /* This is to prevent locking buffer forever in case of error */
if (!external_buffer_to_flush)
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
...@@ -7500,7 +7654,8 @@ static void translog_force_current_buffer_to_finish() ...@@ -7500,7 +7654,8 @@ static void translog_force_current_buffer_to_finish()
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL); DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) == DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
LSN_FILE_NO(old_buffer->offset)); LSN_FILE_NO(old_buffer->offset) ||
translog_status == TRANSLOG_READONLY );
translog_check_cursor(&log_descriptor.bc); translog_check_cursor(&log_descriptor.bc);
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE); DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
if (left) if (left)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment