Commit 684651ce authored by marko's avatar marko

branches/zip: Implement the crash recovery of MLOG_ZIP_WRITE_NODE_PTR.

page_zip_parse_write_node_ptr(): New function to apply a redo log of
MLOG_ZIP_WRITE_NODE_PTR.

page_zip_write_node_ptr(): Write all needed information to the redo log.

page_zip_write_header_log(): Write all necessary information to the redo log.
parent f4c776f5
......@@ -134,6 +134,18 @@ page_zip_write_blob_ptr(
or NULL if no logging is needed */
__attribute__((nonnull(1,2,3,4)));
/***************************************************************
Parses a log record of writing the node pointer of a record. */
byte*
page_zip_parse_write_node_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip);/* in/out: compressed page */
/**************************************************************************
Write the node pointer of a record on a non-leaf compressed page. */
......
......@@ -894,6 +894,9 @@ recv_parse_or_apply_log_rec_body(
ULINT_UNDEFINED);
break;
case MLOG_ZIP_WRITE_NODE_PTR:
ptr = page_zip_parse_write_node_ptr(
ptr, end_ptr, page, page_zip);
break;
case MLOG_ZIP_WRITE_HEADER:
ut_error; /* TODO */
break;
......
......@@ -19,6 +19,7 @@ Created June 2005 by Marko Makela
#include "dict0dict.h"
#include "btr0cur.h"
#include "page0types.h"
#include "log0recv.h"
#include "zlib.h"
/* Please refer to ../include/page0zip.ic for a description of the
......@@ -2220,6 +2221,85 @@ page_zip_write_blob_ptr(
}
}
/***************************************************************
Parses a log record of writing the node pointer of a record. */
byte*
page_zip_parse_write_node_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip)/* in/out: compressed page */
{
ulint offset;
ulint z_offset;
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + 4))) {
return(NULL);
}
offset = mach_read_from_2(ptr);
z_offset = mach_read_from_2(ptr + 2);
if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
|| UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
if (page) {
byte* storage_end;
byte* field;
byte* storage;
ulint heap_no;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (UNIV_UNLIKELY(page_is_leaf(page))) {
goto corrupt;
}
field = page + offset;
storage = page_zip->data + z_offset;
storage_end = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE;
heap_no = 1 + (storage_end - storage) / REC_NODE_PTR_SIZE;
if (UNIV_UNLIKELY((storage_end - storage) % REC_NODE_PTR_SIZE)
|| UNIV_UNLIKELY(heap_no < 2)
|| UNIV_UNLIKELY(heap_no >= page_dir_get_n_heap(page))) {
goto corrupt;
}
#if REC_NODE_PTR_SIZE != 4
# error "REC_NODE_PTR_SIZE != 4"
#endif
memcpy(field, ptr + 4, REC_NODE_PTR_SIZE);
memcpy(storage, ptr + 4, REC_NODE_PTR_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
}
return(ptr + 6);
}
/**************************************************************************
Write the node pointer of a record on a non-leaf compressed page. */
......@@ -2263,8 +2343,18 @@ page_zip_write_node_ptr(
memcpy(storage, field, REC_NODE_PTR_SIZE);
if (mtr) {
mlog_write_initial_log_record(
rec, MLOG_ZIP_WRITE_NODE_PTR, mtr);
byte* log_ptr = mlog_open(mtr, 11 + 4 + 2);
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
log_ptr = mlog_write_initial_log_record_fast(field,
MLOG_ZIP_WRITE_NODE_PTR, log_ptr, mtr);
mach_write_to_2(log_ptr, storage - page_zip->data);
log_ptr += 2;
mach_write_to_4(log_ptr, ptr);
log_ptr += 4;
mlog_close(mtr, log_ptr + 6);
}
}
......@@ -2633,7 +2723,7 @@ page_zip_write_header_log(
ulint length, /* in: length of the data */
mtr_t* mtr) /* in: mini-transaction */
{
byte* log_ptr = mlog_open(mtr, 11 + 2 + 1);
byte* log_ptr = mlog_open(mtr, 11 + 1);
ut_ad(offset < PAGE_DATA);
ut_ad(offset + length < PAGE_DATA);
#if PAGE_DATA > 255
......@@ -2649,10 +2739,7 @@ page_zip_write_header_log(
log_ptr = mlog_write_initial_log_record_fast(page_zip->data + offset,
MLOG_ZIP_WRITE_HEADER, log_ptr, mtr);
mach_write_to_2(log_ptr, offset);
log_ptr += 2;
mach_write_to_1(log_ptr, length);
*log_ptr++ = (byte) length;
mlog_close(mtr, log_ptr);
mlog_catenate_string(mtr, page_zip->data + offset, length);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment