Commit 1f8b0f38 authored by marko's avatar marko

branches/zip: Implement crash recovery of writing BLOB pointers.

page_zip_parse_write_blob_ptr(): New function for applying the redo log
record MLOG_ZIP_WRITE_BLOB_PTR.

page_zip_write_blob_ptr(): Write the necessary information to the redo log.

page0zip.c: Tighten the assertions to ensure that blob_ptr < page_zip->n_blobs.

page_zip_write_node_ptr(): Use memcpy() instead of mach_write_to_4().
parent 817ca61f
...@@ -117,8 +117,20 @@ page_zip_write_rec( ...@@ -117,8 +117,20 @@ page_zip_write_rec(
ulint create) /* in: nonzero=insert, zero=update */ ulint create) /* in: nonzero=insert, zero=update */
__attribute__((nonnull)); __attribute__((nonnull));
/***************************************************************
Parses a log record of writing a BLOB pointer of a record. */
byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip);/* in/out: compressed page */
/************************************************************************** /**************************************************************************
Write the BLOB pointer of a record on the leaf page of a clustered index. Write a BLOB pointer of a record on the leaf page of a clustered index.
The information must already have been updated on the uncompressed page. */ The information must already have been updated on the uncompressed page. */
void void
......
...@@ -897,6 +897,10 @@ recv_parse_or_apply_log_rec_body( ...@@ -897,6 +897,10 @@ recv_parse_or_apply_log_rec_body(
ptr = page_zip_parse_write_node_ptr( ptr = page_zip_parse_write_node_ptr(
ptr, end_ptr, page, page_zip); ptr, end_ptr, page, page_zip);
break; break;
case MLOG_ZIP_WRITE_BLOB_PTR:
ptr = page_zip_parse_write_blob_ptr(
ptr, end_ptr, page, page_zip);
break;
case MLOG_ZIP_WRITE_HEADER: case MLOG_ZIP_WRITE_HEADER:
ptr = page_zip_parse_write_header( ptr = page_zip_parse_write_header(
ptr, end_ptr, page, page_zip); ptr, end_ptr, page, page_zip);
......
...@@ -2043,7 +2043,7 @@ page_zip_write_rec( ...@@ -2043,7 +2043,7 @@ page_zip_write_rec(
page_zip, rec, index); page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE; * BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs); ut_ad(blob_no < page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE; externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) { if (create) {
...@@ -2058,7 +2058,7 @@ page_zip_write_rec( ...@@ -2058,7 +2058,7 @@ page_zip_write_rec(
externs - ext_end); externs - ext_end);
} }
ut_a(blob_no + n_ext <= page_zip->n_blobs); ut_a(blob_no + n_ext < page_zip->n_blobs);
} }
/* Store separately trx_id, roll_ptr and /* Store separately trx_id, roll_ptr and
...@@ -2153,8 +2153,66 @@ page_zip_write_rec( ...@@ -2153,8 +2153,66 @@ page_zip_write_rec(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */ #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
} }
/***************************************************************
Parses a log record of writing a BLOB pointer of a record. */
byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip)/* in/out: compressed page */
{
ulint offset;
ulint z_offset;
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE))) {
return(NULL);
}
offset = mach_read_from_2(ptr);
z_offset = mach_read_from_2(ptr + 2);
if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
|| UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(!page_zip)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
if (page) {
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (UNIV_UNLIKELY(!page_is_leaf(page))) {
goto corrupt;
}
memcpy(page + offset,
ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
memcpy(page_zip->data + z_offset,
ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
}
return(ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE));
}
/************************************************************************** /**************************************************************************
Write the BLOB pointer of a record on the leaf page of a clustered index. Write a BLOB pointer of a record on the leaf page of a clustered index.
The information must already have been updated on the uncompressed page. */ The information must already have been updated on the uncompressed page. */
void void
...@@ -2189,7 +2247,7 @@ page_zip_write_blob_ptr( ...@@ -2189,7 +2247,7 @@ page_zip_write_blob_ptr(
blob_no = page_zip_get_n_prev_extern(page_zip, rec, index) blob_no = page_zip_get_n_prev_extern(page_zip, rec, index)
+ rec_get_n_extern_new(rec, index, n); + rec_get_n_extern_new(rec, index, n);
ut_a(blob_no <= page_zip->n_blobs); ut_a(blob_no < page_zip->n_blobs);
/* The heap number of the first user record is 2. */ /* The heap number of the first user record is 2. */
if (dict_index_is_clust(index)) { if (dict_index_is_clust(index)) {
...@@ -2205,9 +2263,10 @@ page_zip_write_blob_ptr( ...@@ -2205,9 +2263,10 @@ page_zip_write_blob_ptr(
field = rec_get_nth_field((rec_t*) rec, offsets, n, &len); field = rec_get_nth_field((rec_t*) rec, offsets, n, &len);
memcpy(externs - (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE, externs -= (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE;
field + len - BTR_EXTERN_FIELD_REF_SIZE, field += len - BTR_EXTERN_FIELD_REF_SIZE;
BTR_EXTERN_FIELD_REF_SIZE);
memcpy(externs, field, BTR_EXTERN_FIELD_REF_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG #if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, ut_a(page_zip_validate(page_zip,
...@@ -2215,9 +2274,19 @@ page_zip_write_blob_ptr( ...@@ -2215,9 +2274,19 @@ page_zip_write_blob_ptr(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */ #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (mtr) { if (mtr) {
mlog_write_initial_log_record( byte* log_ptr = mlog_open(mtr,
(rec_t*) rec, MLOG_ZIP_WRITE_BLOB_PTR, mtr); 11 + 2 + BTR_EXTERN_FIELD_REF_SIZE);
/* TODO: write n */ if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
log_ptr = mlog_write_initial_log_record_fast((byte*) field,
MLOG_ZIP_WRITE_BLOB_PTR, log_ptr, mtr);
mach_write_to_2(log_ptr, externs - page_zip->data);
log_ptr += 2;
memcpy(log_ptr, externs, BTR_EXTERN_FIELD_REF_SIZE);
log_ptr += BTR_EXTERN_FIELD_REF_SIZE;
mlog_close(mtr, log_ptr);
} }
} }
...@@ -2238,7 +2307,7 @@ page_zip_parse_write_node_ptr( ...@@ -2238,7 +2307,7 @@ page_zip_parse_write_node_ptr(
ut_ad(!page == !page_zip); ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + 4))) { if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + REC_NODE_PTR_SIZE))) {
return(NULL); return(NULL);
} }
...@@ -2287,9 +2356,6 @@ page_zip_parse_write_node_ptr( ...@@ -2287,9 +2356,6 @@ page_zip_parse_write_node_ptr(
goto corrupt; goto corrupt;
} }
#if REC_NODE_PTR_SIZE != 4
# error "REC_NODE_PTR_SIZE != 4"
#endif
memcpy(field, ptr + 4, REC_NODE_PTR_SIZE); memcpy(field, ptr + 4, REC_NODE_PTR_SIZE);
memcpy(storage, ptr + 4, REC_NODE_PTR_SIZE); memcpy(storage, ptr + 4, REC_NODE_PTR_SIZE);
...@@ -2298,7 +2364,7 @@ page_zip_parse_write_node_ptr( ...@@ -2298,7 +2364,7 @@ page_zip_parse_write_node_ptr(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */ #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
} }
return(ptr + 6); return(ptr + (2 + 2 + REC_NODE_PTR_SIZE));
} }
/************************************************************************** /**************************************************************************
...@@ -2344,7 +2410,7 @@ page_zip_write_node_ptr( ...@@ -2344,7 +2410,7 @@ page_zip_write_node_ptr(
memcpy(storage, field, REC_NODE_PTR_SIZE); memcpy(storage, field, REC_NODE_PTR_SIZE);
if (mtr) { if (mtr) {
byte* log_ptr = mlog_open(mtr, 11 + 4 + 2); byte* log_ptr = mlog_open(mtr, 11 + 2 + REC_NODE_PTR_SIZE);
if (UNIV_UNLIKELY(!log_ptr)) { if (UNIV_UNLIKELY(!log_ptr)) {
return; return;
} }
...@@ -2353,8 +2419,7 @@ page_zip_write_node_ptr( ...@@ -2353,8 +2419,7 @@ page_zip_write_node_ptr(
MLOG_ZIP_WRITE_NODE_PTR, log_ptr, mtr); MLOG_ZIP_WRITE_NODE_PTR, log_ptr, mtr);
mach_write_to_2(log_ptr, storage - page_zip->data); mach_write_to_2(log_ptr, storage - page_zip->data);
log_ptr += 2; log_ptr += 2;
mach_write_to_4(log_ptr, ptr); memcpy(log_ptr, field, REC_NODE_PTR_SIZE);
log_ptr += 4;
mlog_close(mtr, log_ptr + 6); mlog_close(mtr, log_ptr + 6);
} }
} }
...@@ -2787,7 +2852,7 @@ page_zip_write_header_log( ...@@ -2787,7 +2852,7 @@ page_zip_write_header_log(
ut_ad(length < 256); ut_ad(length < 256);
/* If no logging is requested, we may return now */ /* If no logging is requested, we may return now */
if (log_ptr == NULL) { if (UNIV_UNLIKELY(!log_ptr)) {
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment