Commit 0cf96a32 authored by unknown's avatar unknown

WL#3072 - Maria recovery

For this scenario: server crashes (could be because a table is
corrupted) and Recovery repeatedly crashes on this table. User repairs
it with maria_chk (as REPAIR TABLE is not possible), restarts the
server, Recovery runs: for Recovery to not apply old REDOs to this
repaired table (which would fail: rows have moved), maria_chk sets
create_rename_lsn to the max value. Later when the server opens
the table via ha_maria, it sets the LSN to the correct current value.


storage/maria/ma_check.c:
  using helper function
storage/maria/ma_create.c:
  A new helper function which stores the create_rename_lsn 
  into the table's header on disk when we cannot wait for this to happen
  naturally at a later _ma_state_info_write().
storage/maria/ma_delete_all.c:
  using helper function; so log_data now can be FILEID_STORE_SIZE.
storage/maria/ma_open.c:
  When opening a transactional table in the server, we discover
  if it has been repaired with maria_chk and if yes, give it a correct
  create_rename_lsn.
storage/maria/ma_rename.c:
  using helper function
storage/maria/maria_chk.c:
  By setting create_rename_lsn to the maximum possible LSN, maria_chk
  ensures that old REDOs are not applied to the new table it is
  going to produce.
storage/maria/maria_def.h:
  new helper function
parent 10bce560
......@@ -5200,12 +5200,9 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info)
and to not apply old REDOs to the new table. The table's existence was
made durable earlier (MY_SYNC_DIR passed to maria_change_to_newfile()).
*/
lsn_store(log_data, share->state.create_rename_lsn);
DBUG_ASSERT(info->dfile.file >= 0);
DBUG_ASSERT(share->kfile.file >= 0);
return (my_pwrite(share->kfile.file, log_data, sizeof(log_data),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
_ma_sync_table_files(info));
return _ma_update_create_rename_lsn_on_disk(share, FALSE) ||
_ma_sync_table_files(info);
}
return 0;
}
......@@ -1009,9 +1009,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
If such direct my_pwrite() to a fixed offset is too "hackish", I can
call ma_state_info_write() again but it will be less efficient.
*/
lsn_store(log_data, share.state.create_rename_lsn);
if (my_pwrite(file, log_data, LSN_STORE_SIZE,
sizeof(share.state.header) + 2, MYF(MY_NABP)))
share.kfile.file= file;
if (_ma_update_create_rename_lsn_on_disk(&share, FALSE))
goto err_no_lock;
my_free(log_data, MYF(0));
}
......@@ -1163,3 +1162,32 @@ int _ma_initialize_data_file(File dfile, MARIA_SHARE *share)
}
return 0;
}
/**
@brief Writes create_rename_lsn to disk, optionally forces
This is for special cases where:
- we don't want to write the full state to disk (so, not call
_ma_state_info_write()) because some parts of the state may be
currently inconsistent, or because it would be overkill
- we must sync this LSN immediately for correctness.
@param share table's share
@param do_sync if the write should be forced to disk
@return Operation status
@retval 0 ok
@retval 1 error (disk problem)
*/
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, my_bool do_sync)
{
char buf[LSN_STORE_SIZE];
File file= share->kfile.file;
DBUG_ASSERT(file >= 0);
lsn_store(buf, share->state.create_rename_lsn);
return (my_pwrite(file, buf, sizeof(buf),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
(do_sync && my_sync(file, MYF(0))));
}
......@@ -89,9 +89,9 @@ int maria_delete_all_rows(MARIA_HA *info)
{
/* For now this record is only informative */
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE];
uchar log_data[FILEID_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= FILEID_STORE_SIZE;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (unlikely(translog_write_record(&share->state.create_rename_lsn,
LOGREC_REDO_DELETE_ALL,
info->trn, share, 0,
......@@ -106,9 +106,7 @@ int maria_delete_all_rows(MARIA_HA *info)
Note that storing the LSN could not be done by _ma_writeinfo() above as
the table is locked at this moment. So we need to do it by ourselves.
*/
lsn_store(log_data, share->state.create_rename_lsn);
if (my_pwrite(share->kfile.file, log_data, sizeof(log_data),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
if (_ma_update_create_rename_lsn_on_disk(share, FALSE) ||
_ma_sync_table_files(info))
goto err;
/**
......
......@@ -587,7 +587,19 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
share->base.pack_bytes +
test(share->options & HA_OPTION_CHECKSUM));
if (share->base.transactional)
{
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
{
/*
This table was repaired with maria_chk. Past log records should be
ignored, future log records should not: we define the present.
*/
share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE);
}
}
share->base.default_rec_buff_size= max(share->base.pack_reclength,
share->base.max_key_length);
share->page_type= (share->base.transactional ? PAGECACHE_LSN_PAGE :
......
......@@ -60,13 +60,13 @@ int maria_rename(const char *old_name, const char *new_name)
MY_SYNC_DIR : 0;
if (sync_dir)
{
uchar log_data[LSN_STORE_SIZE];
uchar log_data[2 + 2];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
uint old_name_len= strlen(old_name), new_name_len= strlen(new_name);
int2store(log_data, old_name_len);
int2store(log_data + 2, new_name_len);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= 2 + 2;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char *)old_name;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= old_name_len;
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (char *)new_name;
......@@ -93,10 +93,7 @@ int maria_rename(const char *old_name, const char *new_name)
store LSN into file, needed for Recovery to not be confused if a
RENAME happened (applying REDOs to the wrong table).
*/
lsn_store(log_data, share->state.create_rename_lsn);
if (my_pwrite(share->kfile.file, log_data, sizeof(log_data),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
my_sync(share->kfile.file, MYF(MY_WME)))
if (_ma_update_create_rename_lsn_on_disk(share, TRUE))
{
maria_close(info);
DBUG_RETURN(1);
......
......@@ -1026,6 +1026,13 @@ static int maria_chk(HA_CHECK *param, my_string filename)
}
if (!error)
{
/*
Tell the server's Recovery to ignore old REDOs on this table; we don't
know what the log's end LSN is now, so we just let the server know
that it will have to find and store it.
*/
if (share->base.transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) &&
......
......@@ -886,13 +886,13 @@ void _ma_remap_file(MARIA_HA *info, my_off_t size);
MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, const byte *record);
my_bool _ma_write_abort_default(MARIA_HA *info);
/* Functions needed by _ma_check (are overrided in MySQL) */
C_MODE_START
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info);
/* Functions needed by _ma_check (are overrided in MySQL) */
volatile int *_ma_killed_ptr(HA_CHECK *param);
void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...));
void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...));
void _ma_check_print_info _VARARGS((HA_CHECK *param, const char *fmt, ...));
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info);
C_MODE_END
int _ma_flush_pending_blocks(MARIA_SORT_PARAM *param);
......@@ -909,6 +909,7 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
ulong);
int _ma_sync_table_files(const MARIA_HA *info);
int _ma_initialize_data_file(File dfile, MARIA_SHARE *share);
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, my_bool do_sync);
void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment