/* Copyright (C) 2006, 2007 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

/*
  WL#3072 Maria recovery
  First version written by Guilhem Bichot on 2006-04-27.
*/

/* Here is the implementation of this module */

#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
#include "ma_checkpoint.h"
#include "trnman.h"
#include "ma_key_recover.h"

/*
  State remembered for one transaction while the log is replayed.
  The all_active_trans[] array holds one such element per short
  transaction id (it is indexed with rec->short_trid).
*/
struct st_trn_for_recovery /* used only in the REDO phase */
{
  /*
    group_start_lsn: expected to be LSN_IMPOSSIBLE when a new
    LONG_TRANSACTION_ID record is met (presumably the start of a group of
    records still in progress - TODO confirm);
    undo_lsn, first_undo_lsn: stored by new_transaction().
  */
  LSN group_start_lsn, undo_lsn, first_undo_lsn;
  /* long transaction id; 0 is treated as "no transaction in this slot" */
  TrID long_trid;
};

/* Element type of dirty_pages_pool[] */
struct st_dirty_page /* used only in the REDO phase */
{
  /* presumably a file id and a page number combined - TODO confirm */
  uint64 file_and_page_id;
  LSN rec_lsn;
};

/*
  One element per table short id in all_tables[]; 'info' is NULL when no
  table is currently mapped to that id.
*/
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
  MARIA_HA *info;
  File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};

/*
  Variables used by all functions of this module.
Ok as single-threaded */
/* Indexed by short transaction id; allocated and freed by maria_apply_log() */
static struct st_trn_for_recovery *all_active_trans;
/* Indexed by table short id; allocated and freed by maria_apply_log() */
static struct st_table_for_recovery *all_tables;
/* Freed and zeroed at the end of maria_apply_log() */
static HASH all_dirty_pages;
/* Freed at the end of maria_apply_log() */
static struct st_dirty_page *dirty_pages_pool;
/*
  FILE_ID records whose LSN is before checkpoint_start are ignored by the
  REDO phase.
*/
static LSN current_group_end_lsn, checkpoint_start= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
/** Current group of REDOs is about this table and only this one */
static MARIA_HA *current_group_table;
#endif
static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
static FILE *tracef; /**< trace file for debugging */
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
/** @brief to avoid writing a checkpoint if recovery did nothing. */
static my_bool checkpoint_useful;
/*
  Non-zero after progress text was written to stderr without a trailing
  newline; tprint()/eprint() then emit a newline before their own output.
*/
static my_bool procent_printed;
static ulonglong now; /**< for tracking execution time of phases */
uint warnings; /**< count of warnings */

/*
  Helper macros which build the declaration (and, reused at the definition
  site, the header) of the per-record-type hooks. R is the record type name;
  it is pasted after the exec_REDO_LOGREC_ / exec_UNDO_LOGREC_ prefix.
*/

/* REDO-phase hook which uses its 'rec' argument */
#define prototype_redo_exec_hook(R) \
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)

/* REDO-phase hook which ignores its 'rec' argument */
#define prototype_redo_exec_hook_dummy(R) \
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
  __attribute__ ((unused)))

/* UNDO-phase hook; additionally receives the transaction */
#define prototype_undo_exec_hook(R) \
  static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)

/* Forward declarations of the REDO-phase hooks (list continues below) */
prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_redo_exec_hook(REDO_RENAME_TABLE);
prototype_redo_exec_hook(REDO_REPAIR_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
prototype_redo_exec_hook(REDO_FREE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(REDO_INDEX); prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE); prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE); prototype_redo_exec_hook(UNDO_ROW_INSERT); prototype_redo_exec_hook(UNDO_ROW_DELETE); prototype_redo_exec_hook(UNDO_ROW_UPDATE); prototype_redo_exec_hook(UNDO_KEY_INSERT); prototype_redo_exec_hook(UNDO_KEY_DELETE); prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); prototype_redo_exec_hook(COMMIT); prototype_redo_exec_hook(CLR_END); prototype_undo_exec_hook(UNDO_ROW_INSERT); prototype_undo_exec_hook(UNDO_ROW_DELETE); prototype_undo_exec_hook(UNDO_ROW_UPDATE); prototype_undo_exec_hook(UNDO_KEY_INSERT); prototype_undo_exec_hook(UNDO_KEY_DELETE); prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply); static uint end_of_redo_phase(my_bool prepare_for_undo_phase); static int run_undo_phase(uint uncommitted); static void display_record_position(const LOG_DESC *log_desc, const TRANSLOG_HEADER_BUFFER *rec, uint number); static int display_and_apply_record(const LOG_DESC *log_desc, const TRANSLOG_HEADER_BUFFER *rec); static MARIA_HA *get_MARIA_HA_from_REDO_record(const TRANSLOG_HEADER_BUFFER *rec); static MARIA_HA *get_MARIA_HA_from_UNDO_record(const TRANSLOG_HEADER_BUFFER *rec); static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon); static LSN parse_checkpoint_record(LSN lsn); static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, LSN first_undo_lsn); static int new_table(uint16 sid, const char *name, File org_kfile, File org_dfile, LSN lsn_of_file_id); static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, struct st_dirty_page *dirty_page); static int close_all_tables(void); static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr); static void print_redo_phase_progress(TRANSLOG_ADDRESS addr); /** @brief global [out] buffer for translog_read_record(); never shrinks */ static struct { uchar *str; size_t 
length; } log_record_buffer; static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec) { if (log_record_buffer.length < rec->record_length) { log_record_buffer.length= rec->record_length; log_record_buffer.str= my_realloc(log_record_buffer.str, rec->record_length, MYF(MY_WME | MY_ALLOW_ZERO_PTR)); } } /** @brief Tells what kind of progress message was printed to the error log */ static enum recovery_message_type { REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH } recovery_message_printed; /** @brief Prints to a trace file if it is not NULL */ void tprint(FILE *trace_file, const char *format, ...) ATTRIBUTE_FORMAT(printf, 2, 3); void tprint(FILE *trace_file __attribute__ ((unused)), const char *format __attribute__ ((unused)), ...) { va_list args; va_start(args, format); DBUG_PRINT("info", ("%s", format)); if (trace_file != NULL) { if (procent_printed) { procent_printed= 0; fputc('\n', trace_file ? trace_file : stderr); } vfprintf(trace_file, format, args); } va_end(args); } void eprint(FILE *trace_file, const char *format, ...) ATTRIBUTE_FORMAT(printf, 2, 3); void eprint(FILE *trace_file __attribute__ ((unused)), const char *format __attribute__ ((unused)), ...) { va_list args; va_start(args, format); DBUG_PRINT("error", ("%s", format)); if (procent_printed) { /* In silent mode, print on another line than the 0% 10% 20% line */ procent_printed= 0; fputc('\n', trace_file ? trace_file : stderr); } vfprintf(trace_file ? trace_file : stderr, format, args); va_end(args); } #define ALERT_USER() DBUG_ASSERT(0) static void print_preamble() { ma_message_no_user(ME_JUST_INFO, "starting recovery"); } /** @brief Recovers from the last checkpoint. Runs the REDO phase using special structures, then sets up the playground of runtime: recreates transactions inside trnman, open tables with their two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all tables. 
@return Operation status @retval 0 OK @retval !=0 Error */ int maria_recover(void) { int res= 1; FILE *trace_file; uint warnings_count; DBUG_ENTER("maria_recover"); DBUG_ASSERT(!maria_in_recovery); maria_in_recovery= TRUE; #ifdef EXTRA_DEBUG trace_file= fopen("maria_recovery.trace", "a+"); #else trace_file= NULL; /* no trace file for being fast */ #endif tprint(trace_file, "TRACE of the last MARIA recovery from mysqld\n"); DBUG_ASSERT(maria_pagecache->inited); res= maria_apply_log(LSN_IMPOSSIBLE, MARIA_LOG_APPLY, trace_file, TRUE, TRUE, TRUE, &warnings_count); if (!res) { if (warnings_count == 0) tprint(trace_file, "SUCCESS\n"); else { tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n", warnings_count); /* We asked for execution of UNDOs, and skipped DDLs, so shouldn't get any warnings. */ DBUG_ASSERT(0); } } if (trace_file) fclose(trace_file); maria_in_recovery= FALSE; DBUG_RETURN(res); } /** @brief Displays and/or applies the log @param from_lsn LSN from which log reading/applying should start; LSN_IMPOSSIBLE means "use last checkpoint" @param apply how log records should be applied or not @param trace_file trace file where progress/debug messages will go @param skip_DDLs_arg Should DDL records (CREATE/RENAME/DROP/REPAIR) be skipped by the REDO phase or not @param take_checkpoints Should we take checkpoints or not. @param[out] warnings_count Count of warnings will be put there @todo This trace_file thing is primitive; soon we will make it similar to ma_check_print_warning() etc, and a successful recovery does not need to create a trace file. But for debugging now it is useful. 
@return Operation status @retval 0 OK @retval !=0 Error */ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, FILE *trace_file, my_bool should_run_undo_phase, my_bool skip_DDLs_arg, my_bool take_checkpoints, uint *warnings_count) { int error= 0; uint uncommitted_trans; ulonglong old_now; DBUG_ENTER("maria_apply_log"); DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase); DBUG_ASSERT(!maria_multi_threaded); warnings= 0; /* checkpoints can happen only if TRNs have been built */ DBUG_ASSERT(should_run_undo_phase || !take_checkpoints); all_active_trans= (struct st_trn_for_recovery *) my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery), MYF(MY_ZEROFILL)); all_tables= (struct st_table_for_recovery *) my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery), MYF(MY_ZEROFILL)); if (!all_active_trans || !all_tables) goto err; if (take_checkpoints && ma_checkpoint_init(0)) goto err; recovery_message_printed= REC_MSG_NONE; tracef= trace_file; skip_DDLs= skip_DDLs_arg; if (from_lsn == LSN_IMPOSSIBLE) { if (last_checkpoint_lsn == LSN_IMPOSSIBLE) { from_lsn= translog_first_theoretical_lsn(); /* as far as we have not yet any checkpoint then the very first log file should be present. */ if (unlikely((from_lsn == LSN_IMPOSSIBLE) || (from_lsn == LSN_ERROR))) goto err; } else { from_lsn= parse_checkpoint_record(last_checkpoint_lsn); if (from_lsn == LSN_ERROR) goto err; } } now= my_getsystime(); if (run_redo_phase(from_lsn, apply)) { ma_message_no_user(0, "Redo phase failed"); goto err; } if ((uncommitted_trans= end_of_redo_phase(should_run_undo_phase)) == (uint)-1) { ma_message_no_user(0, "End of redo phase failed"); goto err; } old_now= now; now= my_getsystime(); if (recovery_message_printed == REC_MSG_REDO) { float phase_took= (now - old_now)/10000000.0; /* Detailed progress info goes to stderr, because ma_message_no_user() cannot put several messages on one line. 
*/ procent_printed= 1; fprintf(stderr, " (%.1f seconds); ", phase_took); } /** REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be wrong: if a future recovery used it, the REDO phase would always start from the checkpoint and never from before, wrongly skipping REDOs (tested). Another problem is that the REDO phase uses PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE. @todo fix this. pagecache_write() now can have a rec_lsn argument. And we could make a function which goes through pages at end of REDO phase and changes their type. */ #ifdef FIX_AND_ENABLE_LATER if (take_checkpoints && checkpoint_useful) { /* We take a checkpoint as it can save future recovery work if we crash during the UNDO phase. But we don't flush pages, as UNDOs will change them again probably. */ if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE)) goto err; } #endif if (should_run_undo_phase) { if (run_undo_phase(uncommitted_trans)) { ma_message_no_user(0, "Undo phase failed"); goto err; } } else if (uncommitted_trans > 0) { tprint(tracef, "***WARNING: %u uncommitted transactions; some tables may" " be left inconsistent!***\n", uncommitted_trans); warnings++; } old_now= now; now= my_getsystime(); if (recovery_message_printed == REC_MSG_UNDO) { float phase_took= (now - old_now)/10000000.0; procent_printed= 1; fprintf(stderr, " (%.1f seconds); ", phase_took); } /* we don't use maria_panic() because it would maria_end(), and Recovery does not want that (we want to keep some modules initialized for runtime). 
*/ if (close_all_tables()) { ma_message_no_user(0, "closing of tables failed"); goto err; } old_now= now; now= my_getsystime(); if (recovery_message_printed == REC_MSG_FLUSH) { float phase_took= (now - old_now)/10000000.0; procent_printed= 1; fprintf(stderr, " (%.1f seconds); ", phase_took); } if (take_checkpoints && checkpoint_useful) { /* No dirty pages, all tables are closed, no active transactions, save: */ if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE)) goto err; } goto end; err: error= 1; tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n"); end: hash_free(&all_dirty_pages); bzero(&all_dirty_pages, sizeof(all_dirty_pages)); my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR)); dirty_pages_pool= NULL; my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR)); all_tables= NULL; my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); all_active_trans= NULL; my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR)); log_record_buffer.str= NULL; log_record_buffer.length= 0; ma_checkpoint_end(); *warnings_count= warnings; if (recovery_message_printed != REC_MSG_NONE) { fprintf(stderr, "\n"); if (!error) ma_message_no_user(ME_JUST_INFO, "recovery done"); } if (error) my_message(HA_ERR_INITIALIZATION, "Maria recovery failed. Please run maria_chk -r on all maria " "tables and delete all maria_log.######## files", MYF(0)); procent_printed= 0; /* we don't cleanly close tables if we hit some error (may corrupt them) */ DBUG_RETURN(error); } /* very basic info about the record's header */ static void display_record_position(const LOG_DESC *log_desc, const TRANSLOG_HEADER_BUFFER *rec, uint number) { /* if number==0, we're going over records which we had already seen and which form a group, so we indent below the group's end record */ tprint(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n", number ? 
"" : " ", number, LSN_IN_PARTS(rec->lsn), rec->short_trid, log_desc->name, rec->type, (ulong)rec->record_length); } static int display_and_apply_record(const LOG_DESC *log_desc, const TRANSLOG_HEADER_BUFFER *rec) { int error; if (log_desc->record_execute_in_redo_phase == NULL) { /* die on all not-yet-handled records :) */ DBUG_ASSERT("one more hook" == "to write"); return 1; } if ((error= (*log_desc->record_execute_in_redo_phase)(rec))) eprint(tracef, "Got error %d when executing record\n", my_errno); return error; } prototype_redo_exec_hook(LONG_TRANSACTION_ID) { uint16 sid= rec->short_trid; TrID long_trid= all_active_trans[sid].long_trid; /* Any incomplete group should be of an old crash which already had a recovery and thus has logged INCOMPLETE_GROUP which we must have seen. */ DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE); if (long_trid != 0) { LSN ulsn= all_active_trans[sid].undo_lsn; /* If the first record of that transaction is after 'rec', it's probably because that transaction was found in the checkpoint record, and then it's ok, we can forget about that transaction (we'll meet it later again in the REDO phase) and replace it with the one in 'rec'. 
*/ if ((ulsn != LSN_IMPOSSIBLE) && (cmp_translog_addr(ulsn, rec->lsn) < 0)) { char llbuf[22]; llstr(long_trid, llbuf); eprint(tracef, "Found an old transaction long_trid %s short_trid %u" " with same short id as this new transaction, and has neither" " committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf, sid, LSN_IN_PARTS(ulsn)); goto err; } } long_trid= uint6korr(rec->header); new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE); goto end; err: ALERT_USER(); return 1; end: return 0; } static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, LSN first_undo_lsn) { char llbuf[22]; all_active_trans[sid].long_trid= long_id; llstr(long_id, llbuf); tprint(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid); all_active_trans[sid].undo_lsn= undo_lsn; all_active_trans[sid].first_undo_lsn= first_undo_lsn; set_if_bigger(max_long_trid, long_id); } prototype_redo_exec_hook_dummy(CHECKPOINT) { /* the only checkpoint we care about was found via control file, ignore */ return 0; } prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP) { /* abortion was already made */ return 0; } prototype_redo_exec_hook(INCOMPLETE_LOG) { MARIA_HA *info; if (skip_DDLs) { tprint(tracef, "we skip DDLs\n"); return 0; } if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL) { /* no such table, don't need to warn */ return 0; } /* Example of what can go wrong when replaying DDLs: CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged); ALTER TABLE t ... which does CREATE a temporary table #sql... (logged) INSERT data from t into #sql... (not logged) RENAME #sql TO t (logged) Removing tables by hand and replaying the log will leave in the end an empty table "t": missing records. If after the RENAME an INSERT into t was done, that row had number 1 in its page, executing the REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is created whereas rownr is not 0). 
So when the server disables logging for ALTER TABLE or CREATE SELECT, it logs LOGREC_INCOMPLETE_LOG to warn maria_read_log and then the user. Another issue is that replaying of DDLs is not correct enough to work if there was a crash during a DDL (see comment in execution of REDO_RENAME_TABLE ). */ tprint(tracef, "***WARNING: MySQL server currently logs no records" " about insertion of data by ALTER TABLE and CREATE SELECT," " as they are not necessary for recovery;" " present applying of log records may well not work.***\n"); warnings++; return 0; } prototype_redo_exec_hook(REDO_CREATE_TABLE) { File dfile= -1, kfile= -1; char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *data_file_name, *index_file_name; uchar *kfile_header; myf create_flag; uint flags; int error= 1, create_mode= O_RDWR | O_TRUNC; MARIA_HA *info= NULL; uint kfile_size_before_extension, keystart; if (skip_DDLs) { tprint(tracef, "we skip DDLs\n"); return 0; } enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } name= (char *)log_record_buffer.str; /* TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can find a REDO_CREATE_TABLE for a table which we have open, that's why we need to look for any open instances and close them first. 
*/ if (close_one_table(name, rec->lsn)) { eprint(tracef, "Table '%s' got error %d on close\n", name, my_errno); ALERT_USER(); goto end; } /* we try hard to get create_rename_lsn, to avoid mistakes if possible */ info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); if (info) { MARIA_SHARE *share= info->s; /* check that we're not already using it */ if (share->reopen != 1) { eprint(tracef, "Table '%s is already open (reopen=%u)\n", name, share->reopen); ALERT_USER(); goto end; } DBUG_ASSERT(share->now_transactional == share->base.born_transactional); if (!share->base.born_transactional) { /* could be that transactional table was later dropped, and a non-trans one was renamed to its name, thus create_rename_lsn is 0 and should not be trusted. */ tprint(tracef, "Table '%s' is not transactional, ignoring creation\n", name); ALERT_USER(); error= 0; goto end; } if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) { tprint(tracef, "Table '%s' has create_rename_lsn (%lu,0x%lx) more " "recent than record, ignoring creation", name, LSN_IN_PARTS(share->state.create_rename_lsn)); error= 0; goto end; } if (maria_is_crashed(info)) { eprint(tracef, "Table '%s' is crashed, can't recreate it\n", name); ALERT_USER(); goto end; } maria_close(info); info= NULL; } else /* one or two files absent, or header corrupted... */ tprint(tracef, "Table '%s' can't be opened, probably does not exist\n", name); /* if does not exist, or is older, overwrite it */ ptr= name + strlen(name) + 1; if ((flags= ptr[0] ? 
HA_DONT_TOUCH_DATA : 0)) tprint(tracef, ", we will only touch index file"); ptr++; kfile_size_before_extension= uint2korr(ptr); ptr+= 2; keystart= uint2korr(ptr); ptr+= 2; kfile_header= (uchar *)ptr; ptr+= kfile_size_before_extension; /* set create_rename_lsn (for maria_read_log to be idempotent) */ lsn_store(kfile_header + sizeof(info->s->state.header) + 2, rec->lsn); /* we also set is_of_horizon, like maria_create() does */ lsn_store(kfile_header + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE, rec->lsn); data_file_name= ptr; ptr+= strlen(data_file_name) + 1; index_file_name= ptr; ptr+= strlen(index_file_name) + 1; /** @todo handle symlinks */ if (data_file_name[0] || index_file_name[0]) { eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled\n", name); goto end; } fn_format(filename, name, "", MARIA_NAME_IEXT, (MY_UNPACK_FILENAME | (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) | MY_APPEND_EXT); linkname_ptr= NULL; create_flag= MY_DELETE_OLD; tprint(tracef, "Table '%s' creating as '%s'", name, filename); if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, MYF(MY_WME|create_flag))) < 0) { eprint(tracef, "Failed to create index file\n"); goto end; } if (my_pwrite(kfile, kfile_header, kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) || my_chsize(kfile, keystart, 0, MYF(MY_WME))) { eprint(tracef, "Failed to write to index file\n"); goto end; } if (!(flags & HA_DONT_TOUCH_DATA)) { fn_format(filename,name,"", MARIA_NAME_DEXT, MY_UNPACK_FILENAME | MY_APPEND_EXT); linkname_ptr= NULL; create_flag=MY_DELETE_OLD; if (((dfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, MYF(MY_WME | create_flag))) < 0) || my_close(dfile, MYF(MY_WME))) { eprint(tracef, "Failed to create data file\n"); goto end; } /* we now have an empty data file. To be able to _ma_initialize_data_file() we need some pieces of the share to be correctly filled. 
So we just open the table (fortunately, an empty data file does not preclude this). */ if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || _ma_initialize_data_file(info->s, info->dfile.file)) { eprint(tracef, "Failed to open new table or write to data file\n"); goto end; } } error= 0; end: tprint(tracef, "\n"); if (kfile >= 0) error|= my_close(kfile, MYF(MY_WME)); if (info != NULL) error|= maria_close(info); return error; } prototype_redo_exec_hook(REDO_RENAME_TABLE) { char *old_name, *new_name; int error= 1; MARIA_HA *info= NULL; if (skip_DDLs) { tprint(tracef, "we skip DDLs\n"); return 0; } enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } old_name= (char *)log_record_buffer.str; new_name= old_name + strlen(old_name) + 1; tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name, new_name); /* Here is why we skip CREATE/DROP/RENAME when doing a recovery from ha_maria (whereas we do when called from maria_read_log). Consider: CREATE TABLE t; RENAME TABLE t to u; DROP TABLE u; RENAME TABLE v to u; # crash between index rename and data rename. And do a Recovery (not removing tables beforehand). Recovery replays CREATE, then RENAME: the maria_open("t") works, maria_open("u") does not (no data file) so table "u" is considered inexistent and so maria_rename() is done which overwrites u's index file, which is lost. Ok, the data file (v.MAD) is still available, but only a REPAIR USE_FRM can rebuild the index, which is unsafe and downtime. So it is preferrable to not execute RENAME, and leave the "mess" of files, rather than possibly destroy a file. DBA will manually rename files. 
A safe recovery method would probably require checking the existence of the index file and of the data file separately (not via maria_open()), and maybe also to store a create_rename_lsn in the data file too For now, all we risk is to leave the mess (half-renamed files) left by the crash. We however sync files and directories at each file rename. The SQL layer is anyway not crash-safe for DDLs (except the repartioning-related ones). We replay DDLs in maria_read_log to be able to recreate tables from scratch. It means that "maria_read_log -a" should not be used on a database which just crashed during a DDL. And also ALTER TABLE does not log insertions of records into the temporary table, so replaying may fail (grep for INCOMPLETE_LOG in files). */ info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR); if (info) { MARIA_SHARE *share= info->s; if (!share->base.born_transactional) { tprint(tracef, ", is not transactional, ignoring renaming\n"); ALERT_USER(); error= 0; goto end; } if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) { tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" " record, ignoring renaming", LSN_IN_PARTS(share->state.create_rename_lsn)); error= 0; goto end; } if (maria_is_crashed(info)) { tprint(tracef, ", is crashed, can't rename it"); ALERT_USER(); goto end; } if (close_one_table(info->s->open_file_name, rec->lsn) || maria_close(info)) goto end; info= NULL; tprint(tracef, ", is ok for renaming; new-name table "); } else /* one or two files absent, or header corrupted... */ { tprint(tracef, ", can't be opened, probably does not exist"); error= 0; goto end; } /* We must also check the create_rename_lsn of the 'new_name' table if it exists: otherwise we may, with our rename which overwrites, destroy another table. 
For example: CREATE TABLE t; RENAME t to u; DROP TABLE u; RENAME v to u; # v is an old table, its creation/insertions not in log And start executing the log (without removing tables beforehand): creates t, renames it to u (if not testing create_rename_lsn) thus overwriting old-named v, drops u, and we are stuck, we have lost data. */ info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR); if (info) { MARIA_SHARE *share= info->s; /* We should not have open instances on this table. */ if (share->reopen != 1) { tprint(tracef, ", is already open (reopen=%u)\n", share->reopen); ALERT_USER(); goto end; } if (!share->base.born_transactional) { tprint(tracef, ", is not transactional, ignoring renaming\n"); ALERT_USER(); goto drop; } if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) { tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" " record, ignoring renaming", LSN_IN_PARTS(share->state.create_rename_lsn)); /* We have to drop the old_name table. Consider: CREATE TABLE t; CREATE TABLE v; RENAME TABLE t to u; DROP TABLE u; RENAME TABLE v to u; and apply the log without removing tables beforehand. t will be created, v too; in REDO_RENAME u will be more recent, but we still have to drop t otherwise it stays. */ goto drop; } if (maria_is_crashed(info)) { tprint(tracef, ", is crashed, can't rename it"); ALERT_USER(); goto end; } if (maria_close(info)) goto end; info= NULL; /* abnormal situation */ tprint(tracef, ", exists but is older than record, can't rename it"); goto end; } else /* one or two files absent, or header corrupted... 
*/ tprint(tracef, ", can't be opened, probably does not exist"); tprint(tracef, ", renaming '%s'", old_name); if (maria_rename(old_name, new_name)) { eprint(tracef, "Failed to rename table\n"); goto end; } info= maria_open(new_name, O_RDONLY, 0); if (info == NULL) { eprint(tracef, "Failed to open renamed table\n"); goto end; } if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE)) goto end; if (maria_close(info)) goto end; info= NULL; error= 0; goto end; drop: tprint(tracef, ", only dropping '%s'", old_name); if (maria_delete_table(old_name)) { eprint(tracef, "Failed to drop table\n"); goto end; } error= 0; goto end; end: tprint(tracef, "\n"); if (info != NULL) error|= maria_close(info); return error; } /* The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE. */ prototype_redo_exec_hook(REDO_REPAIR_TABLE) { int error= 1; MARIA_HA *info; HA_CHECK param; char *name; uint quick_repair; DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE"); if (skip_DDLs) { /* REPAIR is not exactly a DDL, but it manipulates files without logging insertions into them. */ tprint(tracef, "we skip DDLs\n"); DBUG_RETURN(0); } if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL) DBUG_RETURN(0); /* Otherwise, the mapping is newer than the table, and our record is newer than the mapping, so we can repair. 
*/ tprint(tracef, " repairing...\n"); maria_chk_init(¶m); param.isam_file_name= name= info->s->open_file_name; param.testflag= uint4korr(rec->header + FILEID_STORE_SIZE); param.tmpdir= maria_tmpdir; DBUG_ASSERT(maria_tmpdir); info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 4); quick_repair= param.testflag & T_QUICK; if (param.testflag & T_REP_PARALLEL) { if (maria_repair_parallel(¶m, info, name, quick_repair)) goto end; } else if (param.testflag & T_REP_BY_SORT) { if (maria_repair_by_sort(¶m, info, name, quick_repair)) goto end; } else if (maria_repair(¶m, info, name, quick_repair)) goto end; if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE)) goto end; error= 0; end: DBUG_RETURN(error); } prototype_redo_exec_hook(REDO_DROP_TABLE) { char *name; int error= 1; MARIA_HA *info; if (skip_DDLs) { tprint(tracef, "we skip DDLs\n"); return 0; } enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } name= (char *)log_record_buffer.str; tprint(tracef, "Table '%s'", name); info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); if (info) { MARIA_SHARE *share= info->s; if (!share->base.born_transactional) { tprint(tracef, ", is not transactional, ignoring removal\n"); ALERT_USER(); error= 0; goto end; } if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) { tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" " record, ignoring removal", LSN_IN_PARTS(share->state.create_rename_lsn)); error= 0; goto end; } if (maria_is_crashed(info)) { tprint(tracef, ", is crashed, can't drop it"); ALERT_USER(); goto end; } if (close_one_table(info->s->open_file_name, rec->lsn) || maria_close(info)) goto end; info= NULL; /* if it is older, or its header is corrupted, drop it */ tprint(tracef, ", dropping '%s'", name); if (maria_delete_table(name)) { eprint(tracef, "Failed to 
drop table\n"); goto end; } } else /* one or two files absent, or header corrupted... */ tprint(tracef,", can't be opened, probably does not exist"); error= 0; end: tprint(tracef, "\n"); if (info != NULL) error|= maria_close(info); return error; } prototype_redo_exec_hook(FILE_ID) { uint16 sid; int error= 1; const char *name; MARIA_HA *info; DBUG_ENTER("exec_REDO_LOGREC_FILE_ID"); if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0) { /* If that mapping was still true at checkpoint time, it was found in checkpoint record, no need to recreate it. If that mapping had ended at checkpoint time (table was closed or repaired), a flush and force happened and so mapping is not needed. */ tprint(tracef, "ignoring because before checkpoint\n"); DBUG_RETURN(0); } enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } sid= fileid_korr(log_record_buffer.str); info= all_tables[sid].info; if (info != NULL) { tprint(tracef, " Closing table '%s'\n", info->s->open_file_name); prepare_table_for_close(info, rec->lsn); if (maria_close(info)) { eprint(tracef, "Failed to close table\n"); goto end; } all_tables[sid].info= NULL; } name= (char *)log_record_buffer.str + FILEID_STORE_SIZE; if (new_table(sid, name, -1, -1, rec->lsn)) goto end; error= 0; end: DBUG_RETURN(error); } static int new_table(uint16 sid, const char *name, File org_kfile, File org_dfile, LSN lsn_of_file_id) { /* -1 (skip table): close table and return 0; 1 (error): close table and return 1; 0 (success): leave table open and return 0. */ int error= 1; MARIA_HA *info; MARIA_SHARE *share; my_off_t dfile_len, kfile_len; checkpoint_useful= TRUE; if ((name == NULL) || (name[0] == 0)) { /* we didn't use DBUG_ASSERT() because such record corruption could silently pass in the "info == NULL" test below. 
*/ tprint(tracef, ", record is corrupted"); info= NULL; goto end; } tprint(tracef, "Table '%s', id %u", name, sid); info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR); if (info == NULL) { tprint(tracef, ", is absent (must have been dropped later?)" " or its header is so corrupted that we cannot open it;" " we skip it"); error= 0; goto end; } if (maria_is_crashed(info)) { /** @todo what should we do? how to continue recovery? */ tprint(tracef, "Table is crashed, can't apply log records to it\n"); goto end; } share= info->s; /* check that we're not already using it */ if (share->reopen != 1) { tprint(tracef, ", is already open (reopen=%u)\n", share->reopen); /* It could be that we have in the log FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12); */ if (close_one_table(share->open_file_name, lsn_of_file_id)) goto end; } DBUG_ASSERT(share->now_transactional == share->base.born_transactional); if (!share->base.born_transactional) { tprint(tracef, ", is not transactional\n"); ALERT_USER(); error= -1; goto end; } if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0) { tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than" " LOGREC_FILE_ID's LSN (%lu,0x%lx), ignoring open request", LSN_IN_PARTS(share->state.create_rename_lsn), LSN_IN_PARTS(lsn_of_file_id)); error= -1; goto end; } /* don't log any records for this work */ _ma_tmp_disable_logging_for_table(info, FALSE); /* _ma_unpin_all_pages() reads info->trn: */ info->trn= &dummy_transaction_object; /* execution of some REDO records relies on data_file_length */ dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME)); kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME)); if ((dfile_len == MY_FILEPOS_ERROR) || (kfile_len == MY_FILEPOS_ERROR)) { tprint(tracef, ", length unknown\n"); goto end; } if (share->state.state.data_file_length != dfile_len) { tprint(tracef, ", has wrong state.data_file_length (fixing it)"); share->state.state.data_file_length= dfile_len; } 
if (share->state.state.key_file_length != kfile_len) { tprint(tracef, ", has wrong state.key_file_length (fixing it)"); share->state.state.key_file_length= kfile_len; } if ((dfile_len % share->block_size) || (kfile_len % share->block_size)) { tprint(tracef, ", has too short last page\n"); /* Recovery will fix this, no error */ ALERT_USER(); } /* This LSN serves in this situation; assume log is: FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1") then crash, checkpoint record is parsed and opens "t1" with id 6; assume REDO phase starts from the REDO_INSERT above: it will wrongly try to update a page of "t1". With this LSN below, REDO_INSERT can realize the mapping is newer than itself, and not execute. Same example is possible with UNDO_INSERT (update of the state). */ info->s->lsn_of_file_id= lsn_of_file_id; all_tables[sid].info= info; all_tables[sid].org_kfile= org_kfile; all_tables[sid].org_dfile= org_dfile; /* We don't set info->s->id, it would be useless (no logging in REDO phase); if you change that, know that some records in REDO phase call _ma_update_create_rename_lsn() which resets info->s->id. */ tprint(tracef, ", opened"); error= 0; end: tprint(tracef, "\n"); if (error) { if (info != NULL) maria_close(info); if (error == -1) error= 0; } return error; } prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD) { int error= 1; uchar *buff= NULL; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) { /* Table was skipped at open time (because later dropped/renamed, not transactional, or create_rename_lsn newer than LOGREC_FILE_ID); it is not an error. */ return 0; } /* If REDO's LSN is > page's LSN (read from disk), we are going to modify the page and change its LSN. The normal runtime code stores the UNDO's LSN into the page. Here storing the REDO's LSN (rec->lsn) would work (we are not writing to the log here, so don't have to "flush up to UNDO's LSN"). 
But in a test scenario where we do updates at runtime, then remove tables, apply the log and check that this results in the same table as at runtime, putting the same LSN as runtime had done will decrease differences. So we use the UNDO's LSN which is current_group_end_lsn. */ enlarge_buffer(rec); if (log_record_buffer.str == NULL) { eprint(tracef, "Failed to read allocate buffer for record\n"); goto end; } if (translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } buff= log_record_buffer.str; if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, HEAD_PAGE, buff + FILEID_STORE_SIZE, buff + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, rec->record_length - (FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE))) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL) { int error= 1; uchar *buff; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } buff= log_record_buffer.str; if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, TAIL_PAGE, buff + FILEID_STORE_SIZE, buff + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, rec->record_length - (FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE))) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS) { int error= 1; uchar *buff; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } buff= 
log_record_buffer.str; if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn, buff + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, HEAD_PAGE, rec->header + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, TAIL_PAGE, rec->header + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_FREE_BLOCKS) { int error= 1; uchar *buff; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } buff= log_record_buffer.str; if (_ma_apply_redo_free_blocks(info, current_group_end_lsn, buff + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn, rec->header + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_DELETE_ALL) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; tprint(tracef, " deleting all %lu rows\n", (ulong)info->s->state.state.records); if (maria_delete_all_rows(info)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_INDEX) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; 
enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } if (_ma_apply_redo_index(info, current_group_end_lsn, log_record_buffer.str + FILEID_STORE_SIZE, rec->record_length - FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); goto end; } if (_ma_apply_redo_index_new_page(info, current_group_end_lsn, log_record_buffer.str + FILEID_STORE_SIZE, rec->record_length - FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE) { int error= 1; MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); if (info == NULL) return 0; if (_ma_apply_redo_index_free_page(info, current_group_end_lsn, rec->header + FILEID_STORE_SIZE)) goto end; error= 0; end: return error; } #define set_undo_lsn_for_active_trans(TRID, LSN) do { \ all_active_trans[TRID].undo_lsn= LSN; \ if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \ all_active_trans[TRID].first_undo_lsn= LSN; } while (0) prototype_redo_exec_hook(UNDO_ROW_INSERT) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; if (info == NULL) return 0; share= info->s; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { tprint(tracef, " state has LSN (%lu,0x%lx) older than record, updating" " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon)); share->state.state.records++; if (share->calc_checksum) { uchar buff[HA_CHECKSUM_STORE_SIZE]; if (translog_read_record(rec->lsn, 
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, HA_CHECKSUM_STORE_SIZE, buff, NULL) != HA_CHECKSUM_STORE_SIZE) { eprint(tracef, "Failed to read record\n"); return 1; } share->state.state.checksum+= ha_checksum_korr(buff); } info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; } tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); /* Unpin all pages, stamp them with UNDO's LSN */ _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(UNDO_ROW_DELETE) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; if (info == NULL) return 0; share= info->s; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { tprint(tracef, " state older than record\n"); share->state.state.records--; if (share->calc_checksum) { uchar buff[HA_CHECKSUM_STORE_SIZE]; if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, HA_CHECKSUM_STORE_SIZE, buff, NULL) != HA_CHECKSUM_STORE_SIZE) { eprint(tracef, "Failed to read record\n"); return 1; } share->state.state.checksum+= ha_checksum_korr(buff); } share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED | STATE_NOT_OPTIMIZED_ROWS; } tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records); _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(UNDO_ROW_UPDATE) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; if (info == NULL) return 0; share= info->s; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { if (share->calc_checksum) { uchar buff[HA_CHECKSUM_STORE_SIZE]; if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, HA_CHECKSUM_STORE_SIZE, buff, NULL) != HA_CHECKSUM_STORE_SIZE) { eprint(tracef, "Failed to read record\n"); return 1; } 
share->state.state.checksum+= ha_checksum_korr(buff); } share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; } _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(UNDO_KEY_INSERT) { MARIA_HA *info; MARIA_SHARE *share; if (!(info= get_MARIA_HA_from_UNDO_record(rec))) return 0; share= info->s; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE; uint keynr= key_nr_korr(ptr); if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */ { const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg; ulonglong value; char llbuf[22]; uchar *to; tprint(tracef, " state older than record\n"); /* we read the record to find the auto_increment value */ enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE; if (keyseg->flag & HA_SWAP_KEY) { /* We put key from log record to "data record" packing format... */ uchar reversed[HA_MAX_KEY_BUFF]; uchar *key_ptr= to; uchar *key_end= key_ptr + keyseg->length; to= reversed + keyseg->length; do { *--to= *key_ptr++; } while (key_ptr != key_end); /* ... 
so that we can read it with: */ } value= ma_retrieve_auto_increment(to, keyseg->type); set_if_bigger(share->state.auto_increment, value); llstr(share->state.auto_increment, llbuf); tprint(tracef, " auto-inc %s\n", llbuf); } } _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(UNDO_KEY_DELETE) { MARIA_HA *info; if (!(info= get_MARIA_HA_from_UNDO_record(rec))) return 0; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; if (info == NULL) return 0; share= info->s; set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { uint key_nr; my_off_t page; key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); page= page_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE); share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? HA_OFFSET_ERROR : page * share->block_size); } _ma_unpin_all_pages(info, rec->lsn); return 0; } prototype_redo_exec_hook(COMMIT) { uint16 sid= rec->short_trid; TrID long_trid= all_active_trans[sid].long_trid; char llbuf[22]; if (long_trid == 0) { tprint(tracef, "We don't know about transaction with short_trid %u;" "it probably committed long ago, forget it\n", sid); bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); return 0; } llstr(long_trid, llbuf); tprint(tracef, "Transaction long_trid %s short_trid %u committed\n", llbuf, sid); bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); #ifdef MARIA_VERSIONING /* if real recovery: transaction was committed, move it to some separate list for later purging (but don't purge now! purging may have been started before, we may find REDO_PURGE records soon). 
*/ #endif return 0; } /* Set position for next active record that will have key inserted */ static void set_lastpos(MARIA_HA *info, uchar *pos) { ulonglong page; uint dir_entry; /* If we have checksum, it's before rowid */ if (info->s->calc_checksum) pos+= HA_CHECKSUM_STORE_SIZE; page= page_korr(pos); dir_entry= dirpos_korr(pos + PAGE_STORE_SIZE); info->cur_row.lastpos= ma_recordpos(page, dir_entry); } prototype_redo_exec_hook(CLR_END) { MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_SHARE *share; LSN previous_undo_lsn; enum translog_record_type undone_record_type; const LOG_DESC *log_desc; my_bool row_entry= 0; uchar *logpos; DBUG_ENTER("exec_REDO_LOGREC_CLR_END"); if (info == NULL) DBUG_RETURN(0); share= info->s; previous_undo_lsn= lsn_korr(rec->header); undone_record_type= clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); log_desc= &log_record_type_descriptor[undone_record_type]; set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn); tprint(tracef, " CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n", log_desc->name, LSN_IN_PARTS(previous_undo_lsn)); enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE); if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) { tprint(tracef, " state older than record\n"); switch (undone_record_type) { case LOGREC_UNDO_ROW_DELETE: row_entry= 1; share->state.state.records++; set_lastpos(info, logpos); break; case LOGREC_UNDO_ROW_INSERT: share->state.state.records--; share->state.changed|= STATE_NOT_OPTIMIZED_ROWS; row_entry= 1; break; case LOGREC_UNDO_ROW_UPDATE: row_entry= 1; set_lastpos(info, logpos); break; case LOGREC_UNDO_KEY_INSERT: case LOGREC_UNDO_KEY_DELETE: break; case LOGREC_UNDO_KEY_INSERT_WITH_ROOT: case 
LOGREC_UNDO_KEY_DELETE_WITH_ROOT: { uint key_nr; my_off_t page; key_nr= key_nr_korr(logpos); page= page_korr(logpos + KEY_NR_STORE_SIZE); share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? HA_OFFSET_ERROR : page * share->block_size); break; } default: DBUG_ASSERT(0); } if (row_entry && share->calc_checksum) share->state.state.checksum+= ha_checksum_korr(logpos); share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; } else { /* We must set lastpos for upcoming undo delete keys */ switch (undone_record_type) { case LOGREC_UNDO_ROW_DELETE: case LOGREC_UNDO_ROW_UPDATE: set_lastpos(info, logpos); break; default: break; } } if (row_entry) tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records); _ma_unpin_all_pages(info, rec->lsn); DBUG_RETURN(0); } prototype_undo_exec_hook(UNDO_ROW_INSERT) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; const uchar *record_ptr; if (info == NULL) { /* Unlike for REDOs, if the table was skipped it is abnormal; we have a transaction to rollback which used this table, as it is not rolled back it was supposed to hold this table and so the table should still be there. */ return 1; } share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED | STATE_NOT_OPTIMIZED_ROWS; record_ptr= rec->header; if (share->calc_checksum) { /* We need to read more of the record to put the checksum into the record buffer used by _ma_apply_undo_row_insert(). If the table has no live checksum, rec->header will be enough. 
*/ enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } record_ptr= log_record_buffer.str; } info->trn= trn; error= _ma_apply_undo_row_insert(info, previous_undo_lsn, record_ptr + LSN_STORE_SIZE + FILEID_STORE_SIZE); info->trn= 0; /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", LSN_IN_PARTS(previous_undo_lsn)); return error; } prototype_undo_exec_hook(UNDO_ROW_DELETE) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; if (info == NULL) return 1; share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } info->trn= trn; /* For now we skip the page and directory entry. This is to be used later when we mark rows as deleted. 
*/ error= _ma_apply_undo_row_delete(info, previous_undo_lsn, log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, rec->record_length - (LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE)); info->trn= 0; tprint(tracef, " rows' count %lu\n undo_lsn now LSN (%lu,0x%lx)\n", (ulong)share->state.state.records, LSN_IN_PARTS(previous_undo_lsn)); return error; } prototype_undo_exec_hook(UNDO_ROW_UPDATE) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; if (info == NULL) return 1; share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } info->trn= trn; error= _ma_apply_undo_row_update(info, previous_undo_lsn, log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE, rec->record_length - (LSN_STORE_SIZE + FILEID_STORE_SIZE)); info->trn= 0; tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", LSN_IN_PARTS(previous_undo_lsn)); return error; } prototype_undo_exec_hook(UNDO_KEY_INSERT) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; if (info == NULL) { /* Unlike for REDOs, if the table was skipped it is abnormal; we have a transaction to rollback which used this table, as it is not rolled back it was supposed to hold this table and so the table should still be there. 
*/ return 1; } share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } info->trn= trn; error= _ma_apply_undo_key_insert(info, previous_undo_lsn, log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE, rec->record_length - LSN_STORE_SIZE - FILEID_STORE_SIZE); info->trn= 0; /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", LSN_IN_PARTS(previous_undo_lsn)); return error; } prototype_undo_exec_hook(UNDO_KEY_DELETE) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; if (info == NULL) { /* Unlike for REDOs, if the table was skipped it is abnormal; we have a transaction to rollback which used this table, as it is not rolled back it was supposed to hold this table and so the table should still be there. 
*/ return 1; } share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } info->trn= trn; error= _ma_apply_undo_key_delete(info, previous_undo_lsn, log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE, rec->record_length - LSN_STORE_SIZE - FILEID_STORE_SIZE); info->trn= 0; /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", LSN_IN_PARTS(previous_undo_lsn)); return error; } prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) { my_bool error; MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); LSN previous_undo_lsn= lsn_korr(rec->header); MARIA_SHARE *share; if (info == NULL) { /* Unlike for REDOs, if the table was skipped it is abnormal; we have a transaction to rollback which used this table, as it is not rolled back it was supposed to hold this table and so the table should still be there. 
*/ return 1; } share= info->s; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; enlarge_buffer(rec); if (log_record_buffer.str == NULL || translog_read_record(rec->lsn, 0, rec->record_length, log_record_buffer.str, NULL) != rec->record_length) { eprint(tracef, "Failed to read record\n"); return 1; } info->trn= trn; error= _ma_apply_undo_key_delete(info, previous_undo_lsn, log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE, rec->record_length - LSN_STORE_SIZE - FILEID_STORE_SIZE - PAGE_STORE_SIZE); info->trn= 0; /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */ tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n", LSN_IN_PARTS(previous_undo_lsn)); return error; } static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) { TRANSLOG_HEADER_BUFFER rec; struct st_translog_scanner_data scanner; int len; uint i; /* install hooks for execution */ #define install_redo_exec_hook(R) \ log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ exec_REDO_LOGREC_ ## R; #define install_undo_exec_hook(R) \ log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \ exec_UNDO_LOGREC_ ## R; install_redo_exec_hook(LONG_TRANSACTION_ID); install_redo_exec_hook(CHECKPOINT); install_redo_exec_hook(REDO_CREATE_TABLE); install_redo_exec_hook(REDO_RENAME_TABLE); install_redo_exec_hook(REDO_REPAIR_TABLE); install_redo_exec_hook(REDO_DROP_TABLE); install_redo_exec_hook(FILE_ID); install_redo_exec_hook(INCOMPLETE_LOG); install_redo_exec_hook(INCOMPLETE_GROUP); install_redo_exec_hook(REDO_INSERT_ROW_HEAD); install_redo_exec_hook(REDO_INSERT_ROW_TAIL); install_redo_exec_hook(REDO_INSERT_ROW_BLOBS); install_redo_exec_hook(REDO_PURGE_ROW_HEAD); install_redo_exec_hook(REDO_PURGE_ROW_TAIL); install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL); install_redo_exec_hook(REDO_FREE_BLOCKS); install_redo_exec_hook(REDO_DELETE_ALL); install_redo_exec_hook(REDO_INDEX); install_redo_exec_hook(REDO_INDEX_NEW_PAGE); 
install_redo_exec_hook(REDO_INDEX_FREE_PAGE); install_redo_exec_hook(UNDO_ROW_INSERT); install_redo_exec_hook(UNDO_ROW_DELETE); install_redo_exec_hook(UNDO_ROW_UPDATE); install_redo_exec_hook(UNDO_KEY_INSERT); install_redo_exec_hook(UNDO_KEY_DELETE); install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); install_redo_exec_hook(COMMIT); install_redo_exec_hook(CLR_END); install_undo_exec_hook(UNDO_ROW_INSERT); install_undo_exec_hook(UNDO_ROW_DELETE); install_undo_exec_hook(UNDO_ROW_UPDATE); install_undo_exec_hook(UNDO_KEY_INSERT); install_undo_exec_hook(UNDO_KEY_DELETE); install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); current_group_end_lsn= LSN_IMPOSSIBLE; #ifndef DBUG_OFF current_group_table= NULL; #endif if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon())) { tprint(tracef, "checkpoint address refers to the log end log or " "log is empty, nothing to do.\n"); return 0; } len= translog_read_record_header(lsn, &rec); if (len == RECHEADER_READ_ERROR) { eprint(tracef, "Failed to read header of the first record.\n"); return 1; } if (translog_scanner_init(lsn, 1, &scanner, 1)) { tprint(tracef, "Scanner init failed\n"); return 1; } for (i= 1;;i++) { uint16 sid= rec.short_trid; const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type]; display_record_position(log_desc, &rec, i); /* A complete group is a set of log records with an "end mark" record (e.g. a set of REDOs for an operation, terminated by an UNDO for this operation); if there is no "end mark" record the group is incomplete and won't be executed. */ if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) || (log_desc->record_in_group == LOGREC_LAST_IN_GROUP)) { if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE) { if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) { /* Can happen if the transaction got a table write error, then unlocked tables thus wrote a COMMIT record. Or can be an INCOMPLETE_GROUP record written by a previous recovery. 
*/ tprint(tracef, "\nDiscarding incomplete group before this record\n"); all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; } else { struct st_translog_scanner_data scanner2; TRANSLOG_HEADER_BUFFER rec2; /* There is a complete group for this transaction, containing more than this event. */ tprint(tracef, " ends a group:\n"); len= translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2); if (len < 0) /* EOF or error */ { tprint(tracef, "Cannot find record where it should be\n"); goto err; } if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1)) { tprint(tracef, "Scanner2 init failed\n"); goto err; } current_group_end_lsn= rec.lsn; do { if (rec2.short_trid == sid) /* it's in our group */ { const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type]; display_record_position(log_desc2, &rec2, 0); if (apply == MARIA_LOG_CHECK) { translog_size_t read_len; enlarge_buffer(&rec2); read_len= translog_read_record(rec2.lsn, 0, rec2.record_length, log_record_buffer.str, NULL); if (read_len != rec2.record_length) { tprint(tracef, "Cannot read record's body: read %u of" " %u bytes\n", read_len, rec2.record_length); goto err; } } if (apply == MARIA_LOG_APPLY && display_and_apply_record(log_desc2, &rec2)) { translog_destroy_scanner(&scanner2); goto err; } } len= translog_read_next_record_header(&scanner2, &rec2); if (len < 0) /* EOF or error */ { tprint(tracef, "Cannot find record where it should be\n"); goto err; } } while (rec2.lsn < rec.lsn); translog_free_record_header(&rec2); /* group finished */ all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */ display_record_position(log_desc, &rec, 0); translog_destroy_scanner(&scanner2); } } if (apply == MARIA_LOG_APPLY && display_and_apply_record(log_desc, &rec)) goto err; #ifndef DBUG_OFF current_group_table= NULL; #endif } else /* record does not end group */ { /* just record the fact, can't know if can execute yet */ if 
(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE) { /* group not yet started */ all_active_trans[sid].group_start_lsn= rec.lsn; } } len= translog_read_next_record_header(&scanner, &rec); if (len < 0) { switch (len) { case RECHEADER_READ_EOF: tprint(tracef, "EOF on the log\n"); break; case RECHEADER_READ_ERROR: tprint(tracef, "Error reading log\n"); goto err; } break; } } translog_destroy_scanner(&scanner); translog_free_record_header(&rec); if (recovery_message_printed == REC_MSG_REDO) { fprintf(stderr, " 100%%"); procent_printed= 1; } return 0; err: translog_destroy_scanner(&scanner); return 1; } /** @brief Informs about any aborted groups or uncommitted transactions, prepares for the UNDO phase if needed. @note Observe that it may init trnman. */ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) { uint sid, uncommitted= 0; char llbuf[22]; LSN addr; hash_free(&all_dirty_pages); /* hash_free() can be called multiple times probably, but be safe if that changes */ bzero(&all_dirty_pages, sizeof(all_dirty_pages)); my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR)); dirty_pages_pool= NULL; llstr(max_long_trid, llbuf); tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf); if (prepare_for_undo_phase && trnman_init(max_long_trid)) return -1; for (sid= 0; sid <= SHORT_TRID_MAX; sid++) { TrID long_trid= all_active_trans[sid].long_trid; LSN gslsn= all_active_trans[sid].group_start_lsn; TRN *trn; if (gslsn != LSN_IMPOSSIBLE) { tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n", LSN_IN_PARTS(gslsn), sid); all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; } if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) { llstr(long_trid, llbuf); tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n", llbuf, sid); /* dummy_transaction_object serves only for DDLs */ DBUG_ASSERT(long_trid != 0); if (prepare_for_undo_phase) { if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL) return -1; trn->undo_lsn= 
all_active_trans[sid].undo_lsn; trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn | TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */ if (gslsn != LSN_IMPOSSIBLE) { /* UNDO phase will log some records. So, a future recovery may see: REDO(from incomplete group) - REDO(from rollback) - CLR_END and thus execute the first REDO (finding it in "a complete group"). To prevent that: */ LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS]; LSN lsn; if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP, trn, NULL, 0, TRANSLOG_INTERNAL_PARTS, log_array, NULL, NULL)) return -1; } } uncommitted++; } #ifdef MARIA_VERSIONING /* If real recovery: if transaction was committed, move it to some separate list for soon purging. */ #endif } my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); all_active_trans= NULL; /* The UNDO phase uses some normal run-time code of ROLLBACK: generates log records, etc; prepare tables for that */ addr= translog_get_horizon(); for (sid= 0; sid <= SHARE_ID_MAX; sid++) { MARIA_HA *info= all_tables[sid].info; if (info != NULL) { prepare_table_for_close(info, addr); /* But we don't close it; we leave it available for the UNDO phase; it's likely that the UNDO phase will need it. 
*/ if (prepare_for_undo_phase) translog_assign_id_to_share_from_recovery(info->s, sid); } } return uncommitted; } static int run_undo_phase(uint uncommitted) { DBUG_ENTER("run_undo_phase"); if (uncommitted > 0) { checkpoint_useful= TRUE; if (tracef != stdout) { if (recovery_message_printed == REC_MSG_NONE) print_preamble(); fprintf(stderr, "transactions to roll back:"); recovery_message_printed= REC_MSG_UNDO; } tprint(tracef, "%u transactions will be rolled back\n", uncommitted); for( ; ; ) { char llbuf[22]; TRN *trn; if (recovery_message_printed == REC_MSG_UNDO) fprintf(stderr, " %u", uncommitted); if ((uncommitted--) == 0) break; trn= trnman_get_any_trn(); DBUG_ASSERT(trn != NULL); llstr(trn->trid, llbuf); tprint(tracef, "Rolling back transaction of long id %s\n", llbuf); /* Execute all undo entries */ while (trn->undo_lsn) { TRANSLOG_HEADER_BUFFER rec; LOG_DESC *log_desc; if (translog_read_record_header(trn->undo_lsn, &rec) == RECHEADER_READ_ERROR) DBUG_RETURN(1); log_desc= &log_record_type_descriptor[rec.type]; display_record_position(log_desc, &rec, 0); if (log_desc->record_execute_in_undo_phase(&rec, trn)) { tprint(tracef, "Got error %d when executing undo\n", my_errno); DBUG_RETURN(1); } } if (trnman_rollback_trn(trn)) DBUG_RETURN(1); /* We could want to span a few threads (4?) instead of 1 */ /* In the future, we want to have this phase *online* */ } } DBUG_RETURN(0); } /** @brief re-enables transactionality, updates is_of_horizon @param info table @param horizon address to set is_of_horizon */ static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon) { MARIA_SHARE *share= info->s; /* In a fully-forward REDO phase (no checkpoint record), state is now at least as new as the LSN of the current record. It may be newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a table, but that table was later modified further in the log. But if we parsed a checkpoint record, it may be this way in the log: FILE_ID(6->t2)... 
FILE_ID(6->t1)... CHECKPOINT(6->t1) Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to make t1 close; the first condition below is however false (when checkpoint was taken it increased is_of_horizon) and so it works. For safety we add the second condition. */ if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 && cmp_translog_addr(share->lsn_of_file_id, horizon) < 0) { share->state.is_of_horizon= horizon; _ma_state_info_write_sub(share->kfile.file, &share->state, 1); } _ma_reenable_logging_for_table(share); info->trn= NULL; /* safety */ } static MARIA_HA *get_MARIA_HA_from_REDO_record(const TRANSLOG_HEADER_BUFFER *rec) { uint16 sid; pgcache_page_no_t page; MARIA_HA *info; char llbuf[22]; my_bool index_page_redo_entry= 0; print_redo_phase_progress(rec->lsn); sid= fileid_korr(rec->header); page= page_korr(rec->header + FILEID_STORE_SIZE); switch (rec->type) { /* not all REDO records have a page: */ case LOGREC_REDO_INDEX_NEW_PAGE: case LOGREC_REDO_INDEX: case LOGREC_REDO_INDEX_FREE_PAGE: index_page_redo_entry= 1; /* Fall trough*/ case LOGREC_REDO_INSERT_ROW_HEAD: case LOGREC_REDO_INSERT_ROW_TAIL: case LOGREC_REDO_PURGE_ROW_HEAD: case LOGREC_REDO_PURGE_ROW_TAIL: llstr(page, llbuf); tprint(tracef, " For page %s of table of short id %u", llbuf, sid); break; /* other types could print their info here too */ default: break; } info= all_tables[sid].info; #ifndef DBUG_OFF DBUG_ASSERT(current_group_table == NULL || current_group_table == info); current_group_table= info; #endif if (info == NULL) { tprint(tracef, ", table skipped, so skipping record\n"); return NULL; } tprint(tracef, ", '%s'", info->s->open_file_name); if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0) { /* This can happen only if processing a record before the checkpoint record. 
id->name mapping is newer than REDO record: for sure the table subject of the REDO has been flushed and forced (id re-assignment implies this); REDO can be ignored (and must be, as we don't know what this subject table was). */ DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0); tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent" " than record, skipping record", LSN_IN_PARTS(info->s->lsn_of_file_id)); return NULL; } /* detect if an open instance of a dropped table (internal bug) */ DBUG_ASSERT(info->s->last_version != 0); if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0) { uint64 file_and_page_id= (((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile : all_tables[sid].org_dfile)) << 32) | page; struct st_dirty_page *dirty_page= (struct st_dirty_page *) hash_search(&all_dirty_pages, (uchar *)&file_and_page_id, sizeof(file_and_page_id)); if ((dirty_page == NULL) || cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0) { tprint(tracef, ", ignoring because of dirty_pages list\n"); return NULL; } } /* So we are going to read the page, and if its LSN is older than the record's we will modify the page */ tprint(tracef, ", applying record\n"); _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */ return info; } static MARIA_HA *get_MARIA_HA_from_UNDO_record(const TRANSLOG_HEADER_BUFFER *rec) { uint16 sid; MARIA_HA *info; sid= fileid_korr(rec->header + LSN_STORE_SIZE); tprint(tracef, " For table of short id %u", sid); info= all_tables[sid].info; #ifndef DBUG_OFF DBUG_ASSERT(current_group_table == NULL || current_group_table == info); current_group_table= info; #endif if (info == NULL) { tprint(tracef, ", table skipped, so skipping record\n"); return NULL; } tprint(tracef, ", '%s'", info->s->open_file_name); if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0) { tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent" " than record, skipping record", 
LSN_IN_PARTS(info->s->lsn_of_file_id)); return NULL; } DBUG_ASSERT(info->s->last_version != 0); _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */ tprint(tracef, ", applying record\n"); return info; } /** @brief Parses checkpoint record. Builds from it the dirty_pages list (a hash), opens tables and maps them to their 2-byte IDs, recreates transactions (not real TRNs though). @return LSN from where in the log the REDO phase should start @retval LSN_ERROR error @retval other ok */ static LSN parse_checkpoint_record(LSN lsn) { ulong i, nb_dirty_pages; TRANSLOG_HEADER_BUFFER rec; TRANSLOG_ADDRESS start_address; int len; uint nb_active_transactions, nb_committed_transactions, nb_tables; uchar *ptr; LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages; struct st_dirty_page *next_dirty_page_in_pool; tprint(tracef, "Loading data from checkpoint record at LSN (%lu,0x%lx)\n", LSN_IN_PARTS(lsn)); if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR) { tprint(tracef, "Cannot find checkpoint record where it should be\n"); return LSN_ERROR; } enlarge_buffer(&rec); if (log_record_buffer.str == NULL || translog_read_record(rec.lsn, 0, rec.record_length, log_record_buffer.str, NULL) != rec.record_length) { eprint(tracef, "Failed to read record\n"); return LSN_ERROR; } ptr= log_record_buffer.str; start_address= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; /* transactions */ nb_active_transactions= uint2korr(ptr); ptr+= 2; tprint(tracef, "%u active transactions\n", nb_active_transactions); minimum_rec_lsn_of_active_transactions= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; max_long_trid= transid_korr(ptr); ptr+= TRANSID_SIZE; /* how much brain juice and discussions there was to come to writing this line */ set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions); for (i= 0; i < nb_active_transactions; i++) { uint16 sid= uint2korr(ptr); TrID long_id; LSN undo_lsn, first_undo_lsn; ptr+= 2; long_id= uint6korr(ptr); ptr+= 
6; DBUG_ASSERT(sid > 0 && long_id > 0); undo_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; first_undo_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; new_transaction(sid, long_id, undo_lsn, first_undo_lsn); } nb_committed_transactions= uint4korr(ptr); ptr+= 4; tprint(tracef, "%lu committed transactions\n", (ulong)nb_committed_transactions); /* no purging => committed transactions are not important */ ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions; /* tables */ nb_tables= uint4korr(ptr); ptr+= 4; tprint(tracef, "%u open tables\n", nb_tables); for (i= 0; i< nb_tables; i++) { char name[FN_REFLEN]; File kfile, dfile; LSN first_log_write_lsn; uint name_len; uint16 sid= uint2korr(ptr); ptr+= 2; DBUG_ASSERT(sid > 0); kfile= uint4korr(ptr); ptr+= 4; dfile= uint4korr(ptr); ptr+= 4; first_log_write_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; name_len= strlen((char *)ptr) + 1; strmake(name, (char *)ptr, sizeof(name)-1); ptr+= name_len; if (new_table(sid, name, kfile, dfile, first_log_write_lsn)) return LSN_ERROR; } /* dirty pages */ nb_dirty_pages= uint8korr(ptr); ptr+= 8; tprint(tracef, "%lu dirty pages\n", nb_dirty_pages); if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages, offsetof(struct st_dirty_page, file_and_page_id), sizeof(((struct st_dirty_page *)NULL)->file_and_page_id), NULL, NULL, 0)) return LSN_ERROR; dirty_pages_pool= (struct st_dirty_page *)my_malloc(nb_dirty_pages * sizeof(struct st_dirty_page), MYF(MY_WME)); if (unlikely(dirty_pages_pool == NULL)) return LSN_ERROR; next_dirty_page_in_pool= dirty_pages_pool; minimum_rec_lsn_of_dirty_pages= LSN_MAX; for (i= 0; i < nb_dirty_pages ; i++) { pgcache_page_no_t pageid; LSN rec_lsn; File fileid= uint4korr(ptr); ptr+= 4; pageid= uint4korr(ptr); ptr+= 4; rec_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++)) return LSN_ERROR; set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn); } /* after that, there will be no insert/delete into the hash */ /* 
sanity check on record (did we screw up with all those "ptr+=", did the checkpoint write code and checkpoint read code go out of sync?). */ if (ptr != (log_record_buffer.str + log_record_buffer.length)) { eprint(tracef, "checkpoint record corrupted\n"); return LSN_ERROR; } set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages); /* Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for translog_read_record() functions */ checkpoint_start= translog_next_LSN(start_address, LSN_IMPOSSIBLE); if (checkpoint_start == LSN_IMPOSSIBLE) { /* There must be a problem, as our checkpoint record exists and is >= the address which is stored in its first bytes, which is >= start_address. */ return LSN_ERROR; } return checkpoint_start; } static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, struct st_dirty_page *dirty_page) { /* serves as hash key */ dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid; dirty_page->rec_lsn= rec_lsn; return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page); } static int close_all_tables(void) { int error= 0; uint count= 0; LIST *list_element, *next_open; MARIA_HA *info; TRANSLOG_ADDRESS addr; DBUG_ENTER("close_all_tables"); pthread_mutex_lock(&THR_LOCK_maria); if (maria_open_list == NULL) goto end; tprint(tracef, "Closing all tables\n"); if (tracef != stdout) { if (recovery_message_printed == REC_MSG_NONE) print_preamble(); for (count= 0, list_element= maria_open_list ; list_element ; count++, (list_element= list_element->next)) fprintf(stderr, "tables to flush:"); recovery_message_printed= REC_MSG_FLUSH; } /* Since the end of end_of_redo_phase(), we may have written new records (if UNDO phase ran) and thus the state is newer than at end_of_redo_phase(), we need to bump is_of_horizon again. 
*/ addr= translog_get_horizon(); for (list_element= maria_open_list ; ; list_element= next_open) { if (recovery_message_printed == REC_MSG_FLUSH) fprintf(stderr, " %u", count--); if (list_element == NULL) break; next_open= list_element->next; info= (MARIA_HA*)list_element->data; pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */ /* Tables which we see here are exactly those which were open at time of crash. They might have open_count>0 as Checkpoint maybe flushed their state while they were used. As Recovery corrected them, don't alarm the user, don't ask for a table check: */ info->s->state.open_count= 0; prepare_table_for_close(info, addr); error|= maria_close(info); pthread_mutex_lock(&THR_LOCK_maria); } end: pthread_mutex_unlock(&THR_LOCK_maria); DBUG_RETURN(error); } /** @brief Close all table instances with a certain name which are present in all_tables. @param name Name of table @param addr Log address passed to prepare_table_for_close() */ static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr) { my_bool res= 0; /* There are no other threads using the tables, so we don't need any locks */ struct st_table_for_recovery *internal_table, *end; for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1; internal_table < end ; internal_table++) { MARIA_HA *info= internal_table->info; if ((info != NULL) && !strcmp(info->s->open_file_name, name)) { prepare_table_for_close(info, addr); if (maria_close(info)) res= 1; internal_table->info= NULL; } } return res; } /** Temporarily disables logging for this table. If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log to warn log readers. @param info table @param log_incomplete if that disabling makes the log incomplete @note for example in the REDO phase we disable logging but that does not make the log incomplete. 
*/ void _ma_tmp_disable_logging_for_table(MARIA_HA *info, my_bool log_incomplete) { MARIA_SHARE *share= info->s; if (log_incomplete) { uchar log_data[FILEID_STORE_SIZE]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LSN lsn; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG, info->trn, info, sizeof(log_data), TRANSLOG_INTERNAL_PARTS + 1, log_array, log_data, NULL); } /* if we disabled before writing the record, record wouldn't reach log */ share->now_transactional= FALSE; share->page_type= PAGECACHE_PLAIN_PAGE; } static void print_redo_phase_progress(TRANSLOG_ADDRESS addr) { static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0; static ulonglong initial_remainder= -1; int cur_logno, cur_offset; ulonglong local_remainder; int percentage_done; if (tracef == stdout) return; if (recovery_message_printed == REC_MSG_NONE) { print_preamble(); fprintf(stderr, "recovered pages: 0%%"); procent_printed= 1; recovery_message_printed= REC_MSG_REDO; } if (end_logno == FILENO_IMPOSSIBLE) { LSN end_addr= translog_get_horizon(); end_logno= LSN_FILE_NO(end_addr); end_offset= LSN_OFFSET(end_addr); } cur_logno= LSN_FILE_NO(addr); cur_offset= LSN_OFFSET(addr); local_remainder= (cur_logno == end_logno) ? 
(end_offset - cur_offset) : (((longlong)log_file_size) - cur_offset + max(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) + end_offset); if (initial_remainder == (ulonglong)(-1)) initial_remainder= local_remainder; percentage_done= ((initial_remainder - local_remainder) * ULL(100) / initial_remainder); if ((percentage_done - percentage_printed) >= 10) { percentage_printed= percentage_done; fprintf(stderr, " %d%%", percentage_done); procent_printed= 1; } } #ifdef MARIA_EXTERNAL_LOCKING #error Marias Checkpoint and Recovery are really not ready for it #endif /* Recovery of the state : how it works ===================================== Here we ignore Checkpoints for a start. The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in memory frequently (at least at every row write/update/delete) but goes to disk at few moments: maria_close() when closing the last open instance, and a few rare places like CHECK/REPAIR/ALTER (non-transactional tables also do it at maria_lock_database() but we needn't cover them here). In case of crash, state on disk is likely to be older than what it was in memory, the REDO phase needs to recreate the state as it was in memory at the time of crash. When we say Recovery here we will always mean "REDO phase". For example MARIA_STATUS_INFO::records (count of records). It is updated at the end of every row write/update/delete/delete_all. When Recovery sees the sign of such row operation (UNDO or REDO), it may need to update the records' count if that count does not reflect that operation (is older). How to know the age of the state compared to the log record: every time the state goes to disk at runtime, its member "is_of_horizon" is updated to the current end-of-log horizon. So Recovery just needs to compare is_of_horizon and the record's LSN to know if it should modify "records". 
Other operations like ALTER TABLE DISABLE KEYS update the state but don't write log records, thus the REDO phase cannot repeat their effect on the state in case of crash. But we make them sync the state as soon as they have finished. This reduces the window for a problem. It looks like only one thread at a time updates the state in memory or on disk. We assume that the upper level (normally MySQL) has protection against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these are not issued while there are any running transactions on the given table. If this is not done, we may write a corrupted state to disk. With checkpoints ================ Checkpoint module needs to read the state in memory and write it to disk. This may happen while some other thread is modifying the state in memory or on disk. Checkpoint thus may be reading changing data, it needs a mutex to not have it corrupted, and concurrent modifiers of the state need that mutex too for the same reason. "records" is modified for every row write/update/delete, we don't want to add a mutex lock/unlock there. So we re-use the mutex lock/unlock which is already present in these moments, namely the log's mutex which is taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in under-log-mutex hooks when writing these records (thus "records" is not updated at the end of maria_write/update/delete() anymore). Thus Checkpoint takes the log's lock and can read "records" from memory and write it to disk and release log's lock. We however want to avoid having the disk write under the log's lock. So it has to be under another mutex, natural choice is intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile, and as maria_close() takes it too). All state writes to disk are changed to be protected with intern_lock. So Checkpoint takes intern_lock, log's lock, reads "records" from memory, releases log's lock, updates is_of_horizon and writes "records" to disk, releases intern_lock. 
In practice, not only "records" needs to be written but the full state. So, Checkpoint reads the full state from memory. Some other thread may at this moment be modifying in memory some pieces of the state which are not protected by the log's lock (see ma_extra.c HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state from memory; to guard against that we extend the intern_lock-zone to changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and also any change made in memory to create_rename_lsn/state_is_of_horizon. Last, we don't want in Checkpoint to do log lock; read state from memory; release log lock; for each table, it may hold the log's lock too much in total. So, we instead do log lock; read N states from memory; release log lock; Thus, the sequence above happens outside of any intern_lock. But this re-introduces the problem that some other thread may be changing the state in memory and on disk under intern_lock, without log's lock, like HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later comes to handling the table under intern_lock, which is serialized with HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher than when the state was read from memory under log's lock, and thus can decide to not flush the obsolete state it has, knowing that the other thread flushed a more recent state already. If on the other hand is_of_horizon is not higher, the read state is current and can be flushed. So we have a per-table sequence: lock intern_lock; test if is_of_horizon is higher than when we read the state under log's lock; if no then flush the read state to disk. */ /* some comments and pseudo-code which we keep for later */ #if 0 /* MikaelR suggests: support checkpoints during REDO phase too: do checkpoint after a certain amount of log records have been executed. This helps against repeated crashes. 
Those checkpoints could not be user-requested (as engine is not communicating during the REDO phase), so they would be automatic: this changes the original assumption that we don't write to the log while in the REDO phase, but why not. How often should we checkpoint? */ /* We want to have two steps: engine->recover_with_max_memory(); next_engine->recover_with_max_memory(); engine->init_with_normal_memory(); next_engine->init_with_normal_memory(); So: in recover_with_max_memory() allocate a giant page cache, do REDO phase, then all page cache is flushed and emptied and freed (only retain small structures like TM): take full checkpoint, which is useful if next engine crashes in its recovery the next second. Destroy all shares (maria_close()), then at init_with_normal_memory() we do this: */ /**** UNDO PHASE *****/ /* Launch one or more threads to do the background rollback. Don't wait for them to complete their rollback (background rollback; for debugging, we can have an option which waits). Set a counter (total_of_rollback_threads) to the number of threads to launch. Note that InnoDB's rollback-in-background works as long as InnoDB is the last engine to recover, otherwise MySQL will refuse new connections until the last engine has recovered so it's not "background" from the user's point of view. InnoDB is near top of sys_table_types so all others (e.g. BDB) recover after it... So it's really "online rollback" only if InnoDB is the only engine. */ /* wake up delete/update handler */ /* tell the TM that it can now accept new transactions */ /* mark that checkpoint requests are now allowed. */ #endif