Commit 441c29e9 authored by marko's avatar marko

branches/zip: Fix some crash recovery bugs.

dict_load_table(): Initialize table->flags with zip_size.

mlog_parse_nbytes(), mlog_parse_string(): Add parameter page_zip and
write the changes also to the compressed page if one is specified.
Assert that these functions are not called on FIL_PAGE_INDEX pages.

buf_page_io_complete(): Replace block->frame with frame where appropriate.

recv_parse_or_apply_log_rec_body(): Add ut_a(!page_zip) where appropriate.

page_parse_delete_rec_list(): Add parameter page_zip.
parent e4077d56
......@@ -2002,15 +2002,14 @@ buf_page_io_complete(
/* If this page is not uninitialized and not in the
doublewrite buffer, then the page number and space id
should be the same as in block. */
read_page_no = mach_read_from_4((block->frame)
+ FIL_PAGE_OFFSET);
read_page_no = mach_read_from_4(frame + FIL_PAGE_OFFSET);
switch (fil_page_get_type(frame)) {
case FIL_PAGE_TYPE_ZBLOB:
read_space_id = mach_read_from_4(block->frame
read_space_id = mach_read_from_4(frame
+ FIL_PAGE_ZBLOB_SPACE_ID);
break;
default:
read_space_id = mach_read_from_4(block->frame
read_space_id = mach_read_from_4(frame
+ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
}
......
......@@ -837,7 +837,7 @@ dict_load_table(
/* Check if the tablespace exists and has the right name */
if (space != 0) {
ulint zip_size = dict_sys_tables_get_zip_size(rec);
zip_size = dict_sys_tables_get_zip_size(rec);
ut_a(zip_size != ULINT_UNDEFINED);
if (fil_space_for_table_exists_in_mem(space, name, FALSE,
......@@ -863,6 +863,8 @@ dict_load_table(
ibd_file_missing = TRUE;
}
}
} else {
zip_size = 0;
}
ut_a(0 == ut_strcmp("N_COLS",
......@@ -872,7 +874,7 @@ dict_load_table(
field = rec_get_nth_field_old(rec, 4, &len);
n_cols = mach_read_from_4(field);
flags = 0;
flags = zip_size << DICT_TF_COMPRESSED_SHIFT;
/* The high-order bit of N_COLS is the "compact format" flag. */
if (n_cols & 0x80000000UL) {
......
......@@ -170,7 +170,8 @@ mlog_parse_nbytes(
ulint type, /* in: log record type: MLOG_1BYTE, ... */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
byte* page); /* in: page where to apply the log record, or NULL */
byte* page, /* in: page where to apply the log record, or NULL */
void* page_zip);/* in/out: compressed page, or NULL */
/************************************************************
Parses a log record written by mlog_write_string. */
......@@ -181,7 +182,8 @@ mlog_parse_string(
record */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
byte* page); /* in: page where to apply the log record, or NULL */
byte* page, /* in: page where to apply the log record, or NULL */
void* page_zip);/* in/out: compressed page, or NULL */
/************************************************************
......
......@@ -792,7 +792,8 @@ page_parse_delete_rec_list(
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
dict_index_t* index, /* in: record descriptor */
page_t* page, /* in: page or NULL */
page_t* page, /* in/out: page or NULL */
page_zip_des_t* page_zip,/* in/out: compressed page or NULL */
mtr_t* mtr); /* in: mtr or NULL */
/***************************************************************
Parses a redo log record of creating a page. */
......
......@@ -764,8 +764,7 @@ recv_parse_or_apply_log_rec_body(
switch (type) {
case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
ut_a(!page_zip || fil_page_get_type(page) != FIL_PAGE_INDEX);
ptr = mlog_parse_nbytes(type, ptr, end_ptr, page);
ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
break;
case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
if (NULL != (ptr = mlog_parse_index(ptr, end_ptr,
......@@ -791,6 +790,7 @@ recv_parse_or_apply_log_rec_body(
/* This log record type is obsolete, but we process it for
backward compatibility with MySQL 5.0.3 and 5.0.4. */
ut_a(!page || page_is_comp(page));
ut_a(!page_zip);
ptr = mlog_parse_index(ptr, end_ptr, TRUE, &index);
if (!ptr) {
break;
......@@ -819,7 +819,7 @@ recv_parse_or_apply_log_rec_body(
|| (ibool)!!page_is_comp(page)
== dict_table_is_comp(index->table));
ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
index, page, mtr);
index, page, page_zip, mtr);
}
break;
case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
......@@ -843,6 +843,7 @@ recv_parse_or_apply_log_rec_body(
}
break;
case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
ut_a(!page_zip);
ptr = page_parse_create(ptr, end_ptr,
type == MLOG_COMP_PAGE_CREATE,
page, mtr);
......@@ -865,6 +866,7 @@ recv_parse_or_apply_log_rec_body(
page, mtr);
break;
case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
ut_a(!page_zip);
ptr = btr_parse_set_min_rec_mark(ptr, end_ptr,
type == MLOG_COMP_REC_MIN_MARK, page, mtr);
break;
......@@ -886,8 +888,7 @@ recv_parse_or_apply_log_rec_body(
ptr = fsp_parse_init_file_page(ptr, end_ptr, page);
break;
case MLOG_WRITE_STRING:
ut_a(!page_zip || fil_page_get_type(page) != FIL_PAGE_INDEX);
ptr = mlog_parse_string(ptr, end_ptr, page);
ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
break;
case MLOG_FILE_CREATE:
case MLOG_FILE_RENAME:
......
......@@ -128,13 +128,15 @@ mlog_parse_nbytes(
ulint type, /* in: log record type: MLOG_1BYTE, ... */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
byte* page) /* in: page where to apply the log record, or NULL */
byte* page, /* in: page where to apply the log record, or NULL */
void* page_zip)/* in/out: compressed page, or NULL */
{
ulint offset;
ulint val;
dulint dval;
ut_a(type <= MLOG_8BYTES);
ut_a(!page || !page_zip || fil_page_get_type(page) != FIL_PAGE_INDEX);
if (end_ptr < ptr + 2) {
......@@ -159,6 +161,11 @@ mlog_parse_nbytes(
}
if (page) {
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_8(
((page_zip_des_t*) page_zip)->data
+ offset, dval);
}
mach_write_to_8(page + offset, dval);
}
......@@ -172,35 +179,47 @@ mlog_parse_nbytes(
return(NULL);
}
if (type == MLOG_1BYTE) {
if (val > 0xFFUL) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
switch (type) {
case MLOG_1BYTE:
if (UNIV_UNLIKELY(val > 0xFFUL)) {
goto corrupt;
}
} else if (type == MLOG_2BYTES) {
if (val > 0xFFFFUL) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
if (page) {
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_1(
((page_zip_des_t*) page_zip)->data
+ offset, val);
}
} else {
if (type != MLOG_4BYTES) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
mach_write_to_1(page + offset, val);
}
break;
case MLOG_2BYTES:
if (UNIV_UNLIKELY(val > 0xFFFFUL)) {
goto corrupt;
}
if (page) {
if (type == MLOG_1BYTE) {
mach_write_to_1(page + offset, val);
} else if (type == MLOG_2BYTES) {
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_2(
((page_zip_des_t*) page_zip)->data
+ offset, val);
}
mach_write_to_2(page + offset, val);
} else {
ut_a(type == MLOG_4BYTES);
}
break;
case MLOG_4BYTES:
if (page) {
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_4(
((page_zip_des_t*) page_zip)->data
+ offset, val);
}
mach_write_to_4(page + offset, val);
}
break;
default:
corrupt:
recv_sys->found_corrupt_log = TRUE;
ptr = NULL;
}
return(ptr);
......@@ -374,11 +393,14 @@ mlog_parse_string(
record */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
byte* page) /* in: page where to apply the log record, or NULL */
byte* page, /* in: page where to apply the log record, or NULL */
void* page_zip)/* in/out: compressed page, or NULL */
{
ulint offset;
ulint len;
ut_a(!page || !page_zip || fil_page_get_type(page) != FIL_PAGE_INDEX);
if (end_ptr < ptr + 4) {
return(NULL);
......@@ -386,25 +408,27 @@ mlog_parse_string(
offset = mach_read_from_2(ptr);
ptr += 2;
len = mach_read_from_2(ptr);
ptr += 2;
if (offset >= UNIV_PAGE_SIZE) {
if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(len + offset) > UNIV_PAGE_SIZE) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
len = mach_read_from_2(ptr);
ptr += 2;
ut_a(len + offset <= UNIV_PAGE_SIZE);
if (end_ptr < ptr + len) {
return(NULL);
}
if (page) {
ut_memcpy(page + offset, ptr, len);
if (UNIV_LIKELY_NULL(page_zip)) {
memcpy(((page_zip_des_t*) page_zip)->data
+ offset, ptr, len);
}
memcpy(page + offset, ptr, len);
}
return(ptr + len);
......
......@@ -800,11 +800,11 @@ page_parse_delete_rec_list(
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
dict_index_t* index, /* in: record descriptor */
page_t* page, /* in: page or NULL */
page_t* page, /* in/out: page or NULL */
page_zip_des_t* page_zip,/* in/out: compressed page or NULL */
mtr_t* mtr) /* in: mtr or NULL */
{
ulint offset;
page_zip_des_t* page_zip;
ut_ad(type == MLOG_LIST_END_DELETE
|| type == MLOG_LIST_START_DELETE
......@@ -828,8 +828,6 @@ page_parse_delete_rec_list(
ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
page_zip = buf_block_get_page_zip(buf_block_align(page));
if (type == MLOG_LIST_END_DELETE
|| type == MLOG_COMP_LIST_END_DELETE) {
page_delete_rec_list_end(page + offset, index,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment