Commit c2b840be authored by unknown's avatar unknown

Multigroup record write fixed.

Memory leak fixed.
Some other small cahnges.


mysql-test/include/wait_until_connected_again.inc:
  increased time for recovery (to be able use with --debug)
mysql-test/r/maria-recovery.result:
  Result fixed.
storage/maria/ma_loghandler.c:
  Multigroup record write fixed.
  Function for skipping to the next page while filling buffer with
    unlocked handler added.
  Removed possible memory leaks.
  More debug.
storage/maria/ma_recovery.c:
  Memory leak fixed.
parent 2a9d6a0c
......@@ -4,7 +4,7 @@
# You should have done --enable_reconnect first
--disable_result_log
--disable_query_log
let $counter= 500;
let $counter= 5000;
let $mysql_errno= 1;
while ($mysql_errno)
{
......
......@@ -273,7 +273,7 @@ drop table t1;
* shut down mysqld, removed logs, restarted it
use mysqltest;
set @@max_allowed_packet=32000000;
create table t1 (a int, b longtext) engine=maria;
create table t1 (a int, b longtext) engine=maria table_checksum=1;
* copied t1 for feeding_recovery
insert into t1 values (1,"123456789012345678901234567890"),(2,"09876543210987654321");
flush table t1;
......
......@@ -1588,10 +1588,12 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
cursor->buffer->size+= len;
}
cursor->ptr= ptr;
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu)",
DBUG_PRINT("info", ("NewP buffer #%u: 0x%lx chaser: %d Size: %lu (%lu) "
"Horizon: (%lu,0x%lu)",
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
(ulong) (cursor->ptr - cursor->buffer->buffer),
LSN_IN_PARTS(*horizon)));
translog_check_cursor(cursor);
DBUG_VOID_RETURN;
}
......@@ -2024,10 +2026,12 @@ static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers)
DBUG_ENTER("translog_set_sent_to_disk");
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
"in_buffers_only: (%lu,0x%lx)",
"in_buffers_only: (%lu,0x%lx) "
"sent_to_disk: (%lu,0x%lx)",
LSN_IN_PARTS(lsn),
LSN_IN_PARTS(in_buffers),
LSN_IN_PARTS(log_descriptor.in_buffers_only)));
LSN_IN_PARTS(log_descriptor.in_buffers_only),
LSN_IN_PARTS(log_descriptor.sent_to_disk)));
DBUG_ASSERT(cmp_translog_addr(lsn, log_descriptor.sent_to_disk) >= 0);
log_descriptor.sent_to_disk= lsn;
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
......@@ -2404,7 +2408,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{
DBUG_PRINT("error",
("Can't write page (%lu,0x%lx) to pagecache, error: %d",
(ulong) buffer->file,
(ulong) buffer->file->number,
(ulong) (LSN_OFFSET(buffer->offset)+ i),
my_errno));
translog_stop_writing();
......@@ -3739,7 +3743,10 @@ my_bool translog_init_with_table(const char *directory,
if (readonly)
log_descriptor.horizon= last_lsn;
else if (translog_truncate_log(last_lsn))
{
translog_free_record_header(&rec);
DBUG_RETURN(1);
}
}
else
{
......@@ -3761,10 +3768,14 @@ my_bool translog_init_with_table(const char *directory,
if (readonly)
log_descriptor.horizon= last_lsn;
else if (translog_truncate_log(last_lsn))
{
translog_free_record_header(&rec);
DBUG_RETURN(1);
}
}
}
}
translog_free_record_header(&rec);
}
}
......@@ -4147,6 +4158,36 @@ static void translog_buffer_decrease_writers(struct st_translog_buffer *buffer)
}
/**
@brief Skip to the next page for chaser (thread which advanced horizon
pointer and now feeling the buffer)
@param horizon \ Pointers on file position and buffer
@param cursor /
@retval 1 OK
@retval 0 Error
*/
static my_bool translog_chaser_page_next(TRANSLOG_ADDRESS *horizon,
struct st_buffer_cursor *cursor)
{
struct st_translog_buffer *buffer_to_flush;
my_bool rc;
DBUG_ENTER("translog_chaser_page_next");
DBUG_ASSERT(cursor->chaser);
rc= translog_page_next(horizon, cursor, &buffer_to_flush);
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
}
DBUG_RETURN(rc);
}
/*
Put chunk 2 from new page beginning
......@@ -4166,22 +4207,11 @@ translog_write_variable_record_chunk2_page(struct st_translog_parts *parts,
TRANSLOG_ADDRESS *horizon,
struct st_buffer_cursor *cursor)
{
struct st_translog_buffer *buffer_to_flush;
int rc;
uchar chunk2_header[1];
DBUG_ENTER("translog_write_variable_record_chunk2_page");
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
rc= translog_page_next(horizon, cursor, &buffer_to_flush);
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
}
if (rc)
if (translog_chaser_page_next(horizon, cursor))
DBUG_RETURN(1);
/* Puts chunk type */
......@@ -4214,23 +4244,13 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts,
TRANSLOG_ADDRESS *horizon,
struct st_buffer_cursor *cursor)
{
struct st_translog_buffer *buffer_to_flush;
LEX_STRING *part;
int rc;
uchar chunk3_header[1 + 2];
DBUG_ENTER("translog_write_variable_record_chunk3_page");
rc= translog_page_next(horizon, cursor, &buffer_to_flush);
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
}
if (rc)
if (translog_chaser_page_next(horizon, cursor))
DBUG_RETURN(1);
if (length == 0)
{
/* It was call to write page header only (no data for chunk 3) */
......@@ -4540,7 +4560,7 @@ translog_write_variable_record_1group(LSN *lsn,
*lsn= horizon= log_descriptor.horizon;
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
*lsn, TRUE) ||
*lsn, TRUE) ||
(log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, hook_arg)))
......@@ -5043,6 +5063,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
uint header_fixed_part= header_length + 2;
uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1);
uint file_of_the_first_group;
int pages_to_skip;
DBUG_ENTER("translog_write_variable_record_mgroup");
translog_lock_assert_owner();
......@@ -5117,7 +5138,6 @@ translog_write_variable_record_mgroup(LSN *lsn,
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
......@@ -5156,18 +5176,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
done+= (first_page - 1 + buffer_rest);
/* TODO: make separate function for following */
rc= translog_page_next(&horizon, &cursor, &buffer_to_flush);
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL;
}
if (rc)
if (translog_chaser_page_next(&horizon, &cursor))
{
DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err;
......@@ -5178,9 +5187,18 @@ translog_write_variable_record_mgroup(LSN *lsn,
translog_lock();
/* Check that we have place for chunk type 2 */
first_page= translog_get_current_page_rest();
if (first_page <= 1)
{
if (translog_page_next(&log_descriptor.horizon, &log_descriptor.bc,
&buffer_to_flush))
goto err_unlock;
first_page= translog_get_current_page_rest();
}
buffer_rest= translog_get_current_group_size();
} while (first_page + buffer_rest < (uint) (parts->record_length - done));
} while ((translog_size_t)(first_page + buffer_rest) <
(translog_size_t)(parts->record_length - done));
group.addr= horizon= log_descriptor.horizon;
cursor= log_descriptor.bc;
......@@ -5193,19 +5211,39 @@ translog_write_variable_record_mgroup(LSN *lsn,
}
record_rest= parts->record_length - done;
DBUG_PRINT("info", ("Record rest: %lu", (ulong) record_rest));
if (first_page <= record_rest + 1)
if (first_page > record_rest + 1)
{
/*
We have not so much data to fill all first page
(no speaking about full pages)
so it will be:
<chunk0 <data>>
or
<chunk0>...<chunk0><chunk0 <data>>
or
<chunk3 <data>><chunk0>...<chunk0><chunk0 <possible data of 1 byte>>
*/
chunk2_page= full_pages= 0;
last_page_capacity= first_page;
pages_to_skip= -1;
}
else
{
/*
We will have:
<chunk2 <data>>...<chunk2 <data>><chunk0 <data>>
or
<chunk2 <data>>...<chunk2 <data>><chunk0>...<chunk0><chunk0 <data>>
or
<chunk3 <data>><chunk0>...<chunk0><chunk0 <possible data of 1 byte>>
*/
chunk2_page= 1;
record_rest-= (first_page - 1);
full_pages= record_rest / log_descriptor.page_capacity_chunk_2;
pages_to_skip= full_pages=
record_rest / log_descriptor.page_capacity_chunk_2;
record_rest= (record_rest % log_descriptor.page_capacity_chunk_2);
last_page_capacity= page_capacity;
}
else
{
chunk2_page= full_pages= 0;
last_page_capacity= first_page;
}
chunk3_size= 0;
chunk3_pages= 0;
if (last_page_capacity > record_rest + 1 && record_rest != 0)
......@@ -5218,6 +5256,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
}
else
{
pages_to_skip++;
chunk3_pages= 1;
if (record_rest + 2 == last_page_capacity)
{
......@@ -5253,16 +5292,28 @@ translog_write_variable_record_mgroup(LSN *lsn,
(ulong) full_pages *
log_descriptor.page_capacity_chunk_2,
chunk3_pages, (uint) chunk3_size, (uint) record_rest));
rc= translog_advance_pointer((int)(full_pages + chunk3_pages +
(chunk0_pages - 1)) +
(full_pages + chunk3_pages + chunk0_pages +
chunk2_page == 1? -1 : 0),
rc= translog_advance_pointer(pages_to_skip + (int)(chunk0_pages - 1),
record_rest + header_fixed_part +
(groups.elements -
((page_capacity -
header_fixed_part) / (7 + 1)) *
(chunk0_pages - 1)) * (7 + 1));
translog_unlock();
if (buffer_to_flush != NULL)
{
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL;
}
if (rc)
{
DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err;
}
if (rc)
goto err;
......@@ -5332,23 +5383,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
do
{
int limit;
if (new_page_before_chunk0)
if (new_page_before_chunk0 &&
translog_chaser_page_next(&horizon, &cursor))
{
rc= translog_page_next(&horizon, &cursor, &buffer_to_flush);
if (buffer_to_flush != NULL)
{
translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL;
}
if (rc)
{
DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err;
}
DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err;
}
new_page_before_chunk0= 1;
......@@ -5429,6 +5468,16 @@ translog_write_variable_record_mgroup(LSN *lsn,
translog_unlock();
err:
if (buffer_to_flush != NULL)
{
/* This is to prevent locking buffer forever in case of error */
translog_buffer_decrease_writers(buffer_to_flush);
if (!rc)
rc= translog_buffer_flush(buffer_to_flush);
translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL;
}
translog_mark_file_finished(file_of_the_first_group);
......@@ -6292,6 +6341,7 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
uint16 length= desc->read_header_len;
uint16 buffer_length= length;
uint16 body_len;
int rc;
TRANSLOG_SCANNER_DATA internal_scanner;
DBUG_ENTER("translog_variable_length_header");
......@@ -6376,19 +6426,24 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
DBUG_PRINT("info", ("use internal scanner for header reading"));
scanner= &internal_scanner;
if (translog_scanner_init(buff->lsn, 1, scanner, 0))
DBUG_RETURN(RECHEADER_READ_ERROR);
{
rc= RECHEADER_READ_ERROR;
goto exit_and_free;
}
}
if (translog_get_next_chunk(scanner))
{
if (scanner == &internal_scanner)
translog_destroy_scanner(scanner);
DBUG_RETURN(RECHEADER_READ_ERROR);
rc= RECHEADER_READ_ERROR;
goto exit_and_free;
}
if (scanner->page == END_OF_LOG)
{
if (scanner == &internal_scanner)
translog_destroy_scanner(scanner);
DBUG_RETURN(RECHEADER_READ_EOF);
rc= RECHEADER_READ_EOF;
goto exit_and_free;
}
page= scanner->page;
page_offset= scanner->page_offset;
......@@ -6445,6 +6500,11 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
buff->non_header_data_start_offset,
buff->non_header_data_len, buffer_length));
DBUG_RETURN(buffer_length);
exit_and_free:
my_free(buff->groups, MYF(0));
buff->groups_no= 0; /* prevent try to use of buff->groups */
DBUG_RETURN(rc);
}
......@@ -6808,6 +6868,7 @@ static my_bool translog_init_reader_data(LSN lsn,
static void translog_destroy_reader_data(TRANSLOG_READER_DATA *data)
{
translog_destroy_scanner(&data->scanner);
translog_free_record_header(&data->header);
}
......
......@@ -2335,6 +2335,8 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
{
tprint(tracef, "Cannot read record's body: read %u of"
" %u bytes\n", read_len, rec2.record_length);
translog_destroy_scanner(&scanner2);
translog_free_record_header(&rec2);
goto err;
}
}
......@@ -2342,23 +2344,27 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
display_and_apply_record(log_desc2, &rec2))
{
translog_destroy_scanner(&scanner2);
translog_free_record_header(&rec2);
goto err;
}
}
translog_free_record_header(&rec2);
len= translog_read_next_record_header(&scanner2, &rec2);
if (len < 0) /* EOF or error */
{
tprint(tracef, "Cannot find record where it should be\n");
translog_destroy_scanner(&scanner2);
translog_free_record_header(&rec2);
goto err;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
display_record_position(log_desc, &rec, 0);
translog_destroy_scanner(&scanner2);
translog_free_record_header(&rec2);
}
}
if (apply == MARIA_LOG_APPLY &&
......@@ -2377,6 +2383,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
translog_free_record_header(&rec);
len= translog_read_next_record_header(&scanner, &rec);
if (len < 0)
{
......@@ -2403,6 +2410,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
err:
translog_destroy_scanner(&scanner);
translog_free_record_header(&rec);
return 1;
}
......@@ -2559,8 +2567,10 @@ static int run_undo_phase(uint uncommitted)
{
eprint(tracef, "Got error %d when executing undo %s", my_errno,
log_desc->name);
translog_free_record_header(&rec);
DBUG_RETURN(1);
}
translog_free_record_header(&rec);
}
if (trnman_rollback_trn(trn))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment