/* Copyright (C) 2006, 2007 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
  WL#3072 Maria recovery
  First version written by Guilhem Bichot on 2006-04-27.
*/

/* Here is the implementation of this module */

#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
#include "ma_checkpoint.h"
#include "trnman.h"
#include "ma_key_recover.h"

struct st_trn_for_recovery /* used only in the REDO phase */
{
  LSN group_start_lsn, undo_lsn, first_undo_lsn;
  TrID long_trid;
};
struct st_dirty_page /* used only in the REDO phase */
{
  uint64 file_and_page_id;
  LSN rec_lsn;
};
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
  MARIA_HA *info;
  File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};
/* Variables used by all functions of this module. Ok as single-threaded */
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn,
  checkpoint_start= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
/** Current group of REDOs is about this table and only this one */
static MARIA_HA *current_group_table;
#endif
static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
static FILE *tracef; /**< trace file for debugging */
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
/** @brief to avoid writing a checkpoint if recovery did nothing. */
static my_bool checkpoint_useful;
static my_bool procent_printed;
static ulonglong now; /**< for tracking execution time of phases */
uint warnings; /**< count of warnings */

#define prototype_redo_exec_hook(R)                                          \
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)

#define prototype_redo_exec_hook_dummy(R)                                    \
  static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
                               __attribute__ ((unused)))

#define prototype_undo_exec_hook(R)                                          \
  static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)

prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_redo_exec_hook(REDO_RENAME_TABLE);
prototype_redo_exec_hook(REDO_REPAIR_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
prototype_redo_exec_hook(REDO_FREE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(REDO_INDEX);
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_KEY_INSERT);
prototype_redo_exec_hook(UNDO_KEY_DELETE);
prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
prototype_redo_exec_hook(COMMIT);
prototype_redo_exec_hook(CLR_END);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
prototype_undo_exec_hook(UNDO_ROW_DELETE);
prototype_undo_exec_hook(UNDO_ROW_UPDATE);
prototype_undo_exec_hook(UNDO_KEY_INSERT);
prototype_undo_exec_hook(UNDO_KEY_DELETE);
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);

static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint uncommitted);
static void display_record_position(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec,
                                    uint number);
static int display_and_apply_record(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec);
static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
static LSN parse_checkpoint_record(LSN lsn);
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn);
static int new_table(uint16 sid, const char *name,
                     File org_kfile, File org_dfile,
                     LSN lsn_of_file_id);
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page);
static int close_all_tables(void);
static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);

/** @brief global [out] buffer for translog_read_record(); never shrinks */
static struct
{
  uchar *str;
  size_t length;
} log_record_buffer;
static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec)
{
  if (log_record_buffer.length < rec->record_length)
  {
    log_record_buffer.length= rec->record_length;
    log_record_buffer.str= my_realloc(log_record_buffer.str,
                                      rec->record_length,
                                      MYF(MY_WME | MY_ALLOW_ZERO_PTR));
  }
}
/** @brief Tells what kind of progress message was printed to the error log */
static enum recovery_message_type
{
  REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH
} recovery_message_printed;
/** @brief Prints to a trace file if it is not NULL */
void tprint(FILE *trace_file, const char *format, ...)
  ATTRIBUTE_FORMAT(printf, 2, 3);
void tprint(FILE *trace_file __attribute__ ((unused)),
            const char *format __attribute__ ((unused)), ...)
{
  va_list args;
  va_start(args, format);
  DBUG_PRINT("info", ("%s", format));
  if (trace_file != NULL)
  {
    if (procent_printed)
    {
      procent_printed= 0;
      fputc('\n', trace_file ? trace_file : stderr);
    }
    vfprintf(trace_file, format, args);
  }
  va_end(args);
}

void eprint(FILE *trace_file, const char *format, ...)
  ATTRIBUTE_FORMAT(printf, 2, 3);

void eprint(FILE *trace_file __attribute__ ((unused)),
            const char *format __attribute__ ((unused)), ...)
{
  va_list args;
  va_start(args, format);
  DBUG_PRINT("error", ("%s", format));
  if (procent_printed)
  {
    /* In silent mode, print on another line than the 0% 10% 20% line */
    procent_printed= 0;
    fputc('\n', trace_file ? trace_file : stderr);
  }
  vfprintf(trace_file ? trace_file : stderr, format, args);
  va_end(args);
}


#define ALERT_USER() DBUG_ASSERT(0)

static void print_preamble()
{
  ma_message_no_user(ME_JUST_INFO, "starting recovery");
}


/**
   @brief Recovers from the last checkpoint.

   Runs the REDO phase using special structures, then sets up the playground
   of runtime: recreates transactions inside trnman, open tables with their
   two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
   tables.

   @return Operation status
     @retval 0      OK
     @retval !=0    Error
*/

int maria_recover(void)
{
  int res= 1;
  FILE *trace_file;
  uint warnings_count;
  DBUG_ENTER("maria_recover");

  DBUG_ASSERT(!maria_in_recovery);
  maria_in_recovery= TRUE;

#ifdef EXTRA_DEBUG
  trace_file= fopen("maria_recovery.trace", "a+");
#else
  trace_file= NULL; /* no trace file for being fast */
#endif
  tprint(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
  DBUG_ASSERT(maria_pagecache->inited);
  res= maria_apply_log(LSN_IMPOSSIBLE, MARIA_LOG_APPLY, trace_file,
                       TRUE, TRUE, TRUE, &warnings_count);
  if (!res)
  {
    if (warnings_count == 0)
      tprint(trace_file, "SUCCESS\n");
    else
    {
      tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
             warnings_count);
      /*
        We asked for execution of UNDOs, and skipped DDLs, so shouldn't get
        any warnings.
      */
      DBUG_ASSERT(0);
    }
  }
  if (trace_file)
    fclose(trace_file);
  maria_in_recovery= FALSE;
  DBUG_RETURN(res);
}


/**
   @brief Displays and/or applies the log

   @param  from_lsn        LSN from which log reading/applying should start;
                           LSN_IMPOSSIBLE means "use last checkpoint"
   @param  apply           how log records should be applied or not
   @param  trace_file      trace file where progress/debug messages will go
   @param  skip_DDLs_arg   Should DDL records (CREATE/RENAME/DROP/REPAIR)
                           be skipped by the REDO phase or not
   @param  take_checkpoints Should we take checkpoints or not.
   @param[out] warnings_count Count of warnings will be put there

   @todo This trace_file thing is primitive; soon we will make it similar to
   ma_check_print_warning() etc, and a successful recovery does not need to
   create a trace file. But for debugging now it is useful.

   @return Operation status
     @retval 0      OK
     @retval !=0    Error
*/

int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
                    FILE *trace_file,
                    my_bool should_run_undo_phase, my_bool skip_DDLs_arg,
                    my_bool take_checkpoints, uint *warnings_count)
{
  int error= 0;
  uint uncommitted_trans;
  ulonglong old_now;
  DBUG_ENTER("maria_apply_log");

  DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase);
  DBUG_ASSERT(!maria_multi_threaded);
  warnings= 0;
  /* checkpoints can happen only if TRNs have been built */
  DBUG_ASSERT(should_run_undo_phase || !take_checkpoints);
  all_active_trans= (struct st_trn_for_recovery *)
    my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
              MYF(MY_ZEROFILL));
  all_tables= (struct st_table_for_recovery *)
    my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
              MYF(MY_ZEROFILL));
  if (!all_active_trans || !all_tables)
    goto err;

  if (take_checkpoints && ma_checkpoint_init(0))
    goto err;

  recovery_message_printed= REC_MSG_NONE;
  tracef= trace_file;
  skip_DDLs= skip_DDLs_arg;

  if (from_lsn == LSN_IMPOSSIBLE)
  {
    if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
    {
      from_lsn= translog_first_theoretical_lsn();
      /*
        as far as we have not yet any checkpoint then the very first
        log file should be present.
      */
      if (unlikely((from_lsn == LSN_IMPOSSIBLE) ||
                   (from_lsn == LSN_ERROR)))
        goto err;
    }
    else
    {
      from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
      if (from_lsn == LSN_ERROR)
        goto err;
    }
  }

  now= my_getsystime();
  if (run_redo_phase(from_lsn, apply))
  {
    ma_message_no_user(0, "Redo phase failed");
    goto err;
  }

  if ((uncommitted_trans=
       end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
  {
    ma_message_no_user(0, "End of redo phase failed");
    goto err;
  }

  old_now= now;
  now= my_getsystime();
  if (recovery_message_printed == REC_MSG_REDO)
  {
    float phase_took= (now - old_now)/10000000.0;
    /*
      Detailed progress info goes to stderr, because ma_message_no_user()
      cannot put several messages on one line.
    */
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
  }

  /**
     REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
     wrong: if a future recovery used it, the REDO phase would always
     start from the checkpoint and never from before, wrongly skipping REDOs
     (tested). Another problem is that the REDO phase uses
     PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.

     @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
     could make a function which goes through pages at end of REDO phase and
     changes their type.
  */
#ifdef FIX_AND_ENABLE_LATER
  if (take_checkpoints && checkpoint_useful)
  {
    /*
      We take a checkpoint as it can save future recovery work if we crash
      during the UNDO phase. But we don't flush pages, as UNDOs will change
      them again probably.
    */
    if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
      goto err;
  }
#endif

  if (should_run_undo_phase)
  {
    if (run_undo_phase(uncommitted_trans))
    {
      ma_message_no_user(0, "Undo phase failed");
      goto err;
    }
  }
  else if (uncommitted_trans > 0)
  {
    tprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
           " be left inconsistent!***\n", uncommitted_trans);
    warnings++;
  }

  old_now= now;
  now= my_getsystime();
  if (recovery_message_printed == REC_MSG_UNDO)
  {
    float phase_took= (now - old_now)/10000000.0;
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
  }

  /*
    we don't use maria_panic() because it would maria_end(), and Recovery does
    not want that (we want to keep some modules initialized for runtime).
  */
  if (close_all_tables())
  {
    ma_message_no_user(0, "closing of tables failed");
    goto err;
  }

  old_now= now;
  now= my_getsystime();
  if (recovery_message_printed == REC_MSG_FLUSH)
  {
    float phase_took= (now - old_now)/10000000.0;
    procent_printed= 1;
    fprintf(stderr, " (%.1f seconds); ", phase_took);
  }

  if (take_checkpoints && checkpoint_useful)
  {
    /* No dirty pages, all tables are closed, no active transactions, save: */
    if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
      goto err;
  }

  goto end;
err:
  error= 1;
  tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
end:
  hash_free(&all_dirty_pages);
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
  dirty_pages_pool= NULL;
  my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
  all_tables= NULL;
  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
  all_active_trans= NULL;
  my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
  log_record_buffer.str= NULL;
  log_record_buffer.length= 0;
  ma_checkpoint_end();
  *warnings_count= warnings;
  if (recovery_message_printed != REC_MSG_NONE)
  {
    fprintf(stderr, "\n");
    if (!error)
      ma_message_no_user(ME_JUST_INFO, "recovery done");
  }
  if (error)
    my_message(HA_ERR_INITIALIZATION,
               "Maria recovery failed. Please run maria_chk -r on all maria "
               "tables and delete all maria_log.######## files", MYF(0));
  procent_printed= 0;
  /* we don't cleanly close tables if we hit some error (may corrupt them) */
  DBUG_RETURN(error);
}


/* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec,
                                    uint number)
{
  /*
    if number==0, we're going over records which we had already seen and which
    form a group, so we indent below the group's end record
  */
  tprint(tracef,
         "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
         number ? "" : "   ", number, LSN_IN_PARTS(rec->lsn),
         rec->short_trid, log_desc->name, rec->type,
         (ulong)rec->record_length);
}


static int display_and_apply_record(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec)
{
  int error;
  if (log_desc->record_execute_in_redo_phase == NULL)
  {
    /* die on all not-yet-handled records :) */
    DBUG_ASSERT("one more hook" == "to write");
    return 1;
  }
  if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
    eprint(tracef, "Got error %d when executing record\n", my_errno);
  return error;
}


prototype_redo_exec_hook(LONG_TRANSACTION_ID)
{
  uint16 sid= rec->short_trid;
  TrID long_trid= all_active_trans[sid].long_trid;
  /*
    Any incomplete group should be of an old crash which already had a
    recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
  */
  DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
  if (long_trid != 0)
  {
    LSN ulsn= all_active_trans[sid].undo_lsn;
    /*
      If the first record of that transaction is after 'rec', it's probably
      because that transaction was found in the checkpoint record, and then
      it's ok, we can forget about that transaction (we'll meet it later
      again in the REDO phase) and replace it with the one in 'rec'.
    */
    if ((ulsn != LSN_IMPOSSIBLE) &&
        (cmp_translog_addr(ulsn, rec->lsn) < 0))
    {
      char llbuf[22];
      llstr(long_trid, llbuf);
      eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
             " with same short id as this new transaction, and has neither"
             " committed nor rollback (undo_lsn: (%lu,0x%lx))\n",
             llbuf, sid, LSN_IN_PARTS(ulsn));
      goto err;
    }
  }
  long_trid= uint6korr(rec->header);
  new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
  goto end;
err:
  ALERT_USER();
  return 1;
end:
  return 0;
}


static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn)
{
  char llbuf[22];
  all_active_trans[sid].long_trid= long_id;
  llstr(long_id, llbuf);
  tprint(tracef, "Transaction long_trid %s short_trid %u starts\n",
         llbuf, sid);
  all_active_trans[sid].undo_lsn= undo_lsn;
  all_active_trans[sid].first_undo_lsn= first_undo_lsn;
  set_if_bigger(max_long_trid, long_id);
}


prototype_redo_exec_hook_dummy(CHECKPOINT)
{
  /* the only checkpoint we care about was found via control file, ignore */
  return 0;
}


prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
  /* abortion was already made */
  return 0;
}

prototype_redo_exec_hook(INCOMPLETE_LOG)
{
  MARIA_HA *info;
  if (skip_DDLs)
  {
    tprint(tracef, "we skip DDLs\n");
    return 0;
  }
  if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
  {
    /* no such table, don't need to warn */
    return 0;
  }
  /*
    Example of what can go wrong when replaying DDLs:
    CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
    ALTER TABLE t ... which does
    CREATE a temporary table #sql... (logged)
    INSERT data from t into #sql... (not logged)
    RENAME #sql TO t (logged)
    Removing tables by hand and replaying the log will leave in the
    end an empty table "t": missing records. If after the RENAME an INSERT
    into t was done, that row had number 1 in its page, executing the
    REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
    failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
    created whereas rownr is not 0).
    So when the server disables logging for ALTER TABLE or CREATE SELECT, it
    logs LOGREC_INCOMPLETE_LOG to warn maria_read_log and then the user.

    Another issue is that replaying of DDLs is not correct enough to work if
    there was a crash during a DDL (see comment in execution of
    REDO_RENAME_TABLE ).
  */
  tprint(tracef, "***WARNING: MySQL server currently logs no records"
         " about insertion of data by ALTER TABLE and CREATE SELECT,"
         " as they are not necessary for recovery;"
         " present applying of log records may well not work.***\n");
  warnings++;
  return 0;
}


prototype_redo_exec_hook(REDO_CREATE_TABLE)
{
  File dfile= -1, kfile= -1;
  char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *data_file_name,
    *index_file_name;
  uchar *kfile_header;
  myf create_flag;
  uint flags;
  int error= 1, create_mode= O_RDWR | O_TRUNC;
  MARIA_HA *info= NULL;
  uint kfile_size_before_extension, keystart;

  if (skip_DDLs)
  {
    tprint(tracef, "we skip DDLs\n");
    return 0;
  }
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  name= (char *)log_record_buffer.str;
  /*
    TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
    find a REDO_CREATE_TABLE for a table which we have open, that's why we
    need to look for any open instances and close them first.
  */
  if (close_one_table(name, rec->lsn))
  {
    eprint(tracef, "Table '%s' got error %d on close\n", name, my_errno);
    ALERT_USER();
    goto end;
  }
  /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    /* check that we're not already using it */
    if (share->reopen != 1)
    {
      eprint(tracef, "Table '%s is already open (reopen=%u)\n",
             name, share->reopen);
      ALERT_USER();
      goto end;
    }
    DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
    if (!share->base.born_transactional)
    {
      /*
        could be that transactional table was later dropped, and a non-trans
        one was renamed to its name, thus create_rename_lsn is 0 and should
        not be trusted.
      */
      tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
             name);
      ALERT_USER();
      error= 0;
      goto end;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      tprint(tracef, "Table '%s' has create_rename_lsn (%lu,0x%lx) more "
             "recent than record, ignoring creation",
             name, LSN_IN_PARTS(share->state.create_rename_lsn));
      error= 0;
      goto end;
    }
    if (maria_is_crashed(info))
    {
      eprint(tracef, "Table '%s' is crashed, can't recreate it\n", name);
      ALERT_USER();
      goto end;
    }
    maria_close(info);
    info= NULL;
  }
  else /* one or two files absent, or header corrupted... */
    tprint(tracef, "Table '%s' can't be opened, probably does not exist\n",
           name);
  /* if does not exist, or is older, overwrite it */
  ptr= name + strlen(name) + 1;
  if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
    tprint(tracef, ", we will only touch index file");
  ptr++;
  kfile_size_before_extension= uint2korr(ptr);
  ptr+= 2;
  keystart= uint2korr(ptr);
  ptr+= 2;
  kfile_header= (uchar *)ptr;
  ptr+= kfile_size_before_extension;
  /* set create_rename_lsn (for maria_read_log to be idempotent) */
  lsn_store(kfile_header + sizeof(info->s->state.header) + 2, rec->lsn);
  /* we also set is_of_horizon, like maria_create() does */
  lsn_store(kfile_header + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE,
            rec->lsn);
  data_file_name= ptr;
  ptr+= strlen(data_file_name) + 1;
  index_file_name= ptr;
  ptr+= strlen(index_file_name) + 1;
  /** @todo handle symlinks */
  if (data_file_name[0] || index_file_name[0])
  {
    eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled\n",
           name);
    goto end;
  }
  fn_format(filename, name, "", MARIA_NAME_IEXT,
            (MY_UNPACK_FILENAME |
             (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
            MY_APPEND_EXT);
  linkname_ptr= NULL;
  create_flag= MY_DELETE_OLD;
  tprint(tracef, "Table '%s' creating as '%s'", name, filename);
  if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
                                     MYF(MY_WME|create_flag))) < 0)
  {
    eprint(tracef, "Failed to create index file\n");
    goto end;
  }
  if (my_pwrite(kfile, kfile_header,
                kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
      my_chsize(kfile, keystart, 0, MYF(MY_WME)))
  {
    eprint(tracef, "Failed to write to index file\n");
    goto end;
  }
  if (!(flags & HA_DONT_TOUCH_DATA))
  {
    fn_format(filename,name,"", MARIA_NAME_DEXT,
              MY_UNPACK_FILENAME | MY_APPEND_EXT);
    linkname_ptr= NULL;
    create_flag=MY_DELETE_OLD;
    if (((dfile=
          my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
                                 MYF(MY_WME | create_flag))) < 0) ||
        my_close(dfile, MYF(MY_WME)))
    {
      eprint(tracef, "Failed to create data file\n");
      goto end;
    }
    /*
      we now have an empty data file. To be able to
      _ma_initialize_data_file() we need some pieces of the share to be
      correctly filled. So we just open the table (fortunately, an empty
      data file does not preclude this).
    */
    if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
        _ma_initialize_data_file(info->s, info->dfile.file))
    {
      eprint(tracef, "Failed to open new table or write to data file\n");
      goto end;
    }
  }
  error= 0;
end:
  tprint(tracef, "\n");
  if (kfile >= 0)
    error|= my_close(kfile, MYF(MY_WME));
  if (info != NULL)
    error|= maria_close(info);
  return error;
}


prototype_redo_exec_hook(REDO_RENAME_TABLE)
{
  char *old_name, *new_name;
  int error= 1;
  MARIA_HA *info= NULL;
  if (skip_DDLs)
  {
    tprint(tracef, "we skip DDLs\n");
    return 0;
  }
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  old_name= (char *)log_record_buffer.str;
  new_name= old_name + strlen(old_name) + 1;
  tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name,
         new_name);
  /*
    Here is why we skip CREATE/DROP/RENAME when doing a recovery from
    ha_maria (whereas we do when called from maria_read_log). Consider:
    CREATE TABLE t;
    RENAME TABLE t to u;
    DROP TABLE u;
    RENAME TABLE v to u; # crash between index rename and data rename.
    And do a Recovery (not removing tables beforehand).
    Recovery replays CREATE, then RENAME: the maria_open("t") works,
    maria_open("u") does not (no data file) so table "u" is considered
    inexistent and so maria_rename() is done which overwrites u's index file,
    which is lost. Ok, the data file (v.MAD) is still available, but only a
    REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
    So it is preferrable to not execute RENAME, and leave the "mess" of files,
    rather than possibly destroy a file. DBA will manually rename files.
    A safe recovery method would probably require checking the existence of
    the index file and of the data file separately (not via maria_open()), and
    maybe also to store a create_rename_lsn in the data file too
    For now, all we risk is to leave the mess (half-renamed files) left by the
    crash. We however sync files and directories at each file rename. The SQL
    layer is anyway not crash-safe for DDLs (except the repartioning-related
    ones).
    We replay DDLs in maria_read_log to be able to recreate tables from
    scratch. It means that "maria_read_log -a" should not be used on a
    database which just crashed during a DDL. And also ALTER TABLE does not
    log insertions of records into the temporary table, so replaying may
    fail (grep for INCOMPLETE_LOG in files).
  */
  info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    if (!share->base.born_transactional)
    {
      tprint(tracef, ", is not transactional, ignoring renaming\n");
      ALERT_USER();
      error= 0;
      goto end;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
             " record, ignoring renaming",
             LSN_IN_PARTS(share->state.create_rename_lsn));
      error= 0;
      goto end;
    }
    if (maria_is_crashed(info))
    {
      tprint(tracef, ", is crashed, can't rename it");
      ALERT_USER();
      goto end;
    }
    if (close_one_table(info->s->open_file_name, rec->lsn) ||
        maria_close(info))
      goto end;
    info= NULL;
    tprint(tracef, ", is ok for renaming; new-name table ");
  }
  else /* one or two files absent, or header corrupted... */
  {
    tprint(tracef, ", can't be opened, probably does not exist");
    error= 0;
    goto end;
  }
  /*
    We must also check the create_rename_lsn of the 'new_name' table if it
    exists: otherwise we may, with our rename which overwrites, destroy
    another table. For example:
    CREATE TABLE t;
    RENAME t to u;
    DROP TABLE u;
    RENAME v to u; # v is an old table, its creation/insertions not in log
    And start executing the log (without removing tables beforehand): creates
    t, renames it to u (if not testing create_rename_lsn) thus overwriting
    old-named v, drops u, and we are stuck, we have lost data.
  */
  info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    /* We should not have open instances on this table. */
    if (share->reopen != 1)
    {
      tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
      ALERT_USER();
      goto end;
    }
    if (!share->base.born_transactional)
    {
      tprint(tracef, ", is not transactional, ignoring renaming\n");
      ALERT_USER();
      goto drop;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
             " record, ignoring renaming",
             LSN_IN_PARTS(share->state.create_rename_lsn));
      /*
        We have to drop the old_name table. Consider:
        CREATE TABLE t;
        CREATE TABLE v;
        RENAME TABLE t to u;
        DROP TABLE u;
        RENAME TABLE v to u;
        and apply the log without removing tables beforehand. t will be
        created, v too; in REDO_RENAME u will be more recent, but we still
        have to drop t otherwise it stays.
      */
      goto drop;
    }
    if (maria_is_crashed(info))
    {
      tprint(tracef, ", is crashed, can't rename it");
      ALERT_USER();
      goto end;
    }
    if (maria_close(info))
      goto end;
    info= NULL;
    /* abnormal situation */
    tprint(tracef, ", exists but is older than record, can't rename it");
    goto end;
  }
  else /* one or two files absent, or header corrupted... */
    tprint(tracef, ", can't be opened, probably does not exist");
  tprint(tracef, ", renaming '%s'", old_name);
  if (maria_rename(old_name, new_name))
  {
    eprint(tracef, "Failed to rename table\n");
    goto end;
  }
  info= maria_open(new_name, O_RDONLY, 0);
  if (info == NULL)
  {
    eprint(tracef, "Failed to open renamed table\n");
    goto end;
  }
  if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE))
    goto end;
  if (maria_close(info))
    goto end;
  info= NULL;
  error= 0;
  goto end;
drop:
  tprint(tracef, ", only dropping '%s'", old_name);
  if (maria_delete_table(old_name))
  {
    eprint(tracef, "Failed to drop table\n");
    goto end;
  }
  error= 0;
  goto end;
end:
  tprint(tracef, "\n");
  if (info != NULL)
    error|= maria_close(info);
  return error;
}


/*
  The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
*/
prototype_redo_exec_hook(REDO_REPAIR_TABLE)
{
  int error= 1;
  MARIA_HA *info;
  HA_CHECK param;
  char *name;
  uint quick_repair;
  DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");

  if (skip_DDLs)
  {
    /*
      REPAIR is not exactly a DDL, but it manipulates files without logging
      insertions into them.
    */
    tprint(tracef, "we skip DDLs\n");
    DBUG_RETURN(0);
  }
  if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
    DBUG_RETURN(0);

  /*
    Otherwise, the mapping is newer than the table, and our record is newer
    than the mapping, so we can repair.
  */
  tprint(tracef, "   repairing...\n");

  maria_chk_init(&param);
  param.isam_file_name= name= info->s->open_file_name;
  param.testflag= uint4korr(rec->header + FILEID_STORE_SIZE);
  param.tmpdir= maria_tmpdir;
  DBUG_ASSERT(maria_tmpdir);

  info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 4);
  quick_repair= param.testflag & T_QUICK;


  if (param.testflag & T_REP_PARALLEL)
  {
    if (maria_repair_parallel(&param, info, name, quick_repair))
      goto end;
  }
  else if (param.testflag & T_REP_BY_SORT)
  {
    if (maria_repair_by_sort(&param, info, name, quick_repair))
      goto end;
  }
  else if (maria_repair(&param, info, name, quick_repair))
    goto end;

  if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE))
    goto end;
  error= 0;

end:
  DBUG_RETURN(error);
}


prototype_redo_exec_hook(REDO_DROP_TABLE)
{
  char *name;
  int error= 1;
  MARIA_HA *info;
  if (skip_DDLs)
  {
    tprint(tracef, "we skip DDLs\n");
    return 0;
  }
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }
  name= (char *)log_record_buffer.str;
  tprint(tracef, "Table '%s'", name);
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    if (!share->base.born_transactional)
    {
      tprint(tracef, ", is not transactional, ignoring removal\n");
      ALERT_USER();
      error= 0;
      goto end;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
             " record, ignoring removal",
             LSN_IN_PARTS(share->state.create_rename_lsn));
      error= 0;
      goto end;
    }
    if (maria_is_crashed(info))
    {
      tprint(tracef, ", is crashed, can't drop it");
      ALERT_USER();
      goto end;
    }
    if (close_one_table(info->s->open_file_name, rec->lsn) ||
        maria_close(info))
      goto end;
    info= NULL;
    /* if it is older, or its header is corrupted, drop it */
    tprint(tracef, ", dropping '%s'", name);
    if (maria_delete_table(name))
    {
      eprint(tracef, "Failed to drop table\n");
      goto end;
    }
  }
  else /* one or two files absent, or header corrupted... */
    tprint(tracef,", can't be opened, probably does not exist");
  error= 0;
end:
  tprint(tracef, "\n");
  if (info != NULL)
    error|= maria_close(info);
  return error;
}


prototype_redo_exec_hook(FILE_ID)
{
  uint16 sid;
  int error= 1;
  const char *name;
  MARIA_HA *info;
  DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");

  if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
  {
    /*
      If that mapping was still true at checkpoint time, it was found in
      checkpoint record, no need to recreate it. If that mapping had ended at
      checkpoint time (table was closed or repaired), a flush and force
      happened and so mapping is not needed.
    */
    tprint(tracef, "ignoring because before checkpoint\n");
    DBUG_RETURN(0);
  }

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  sid= fileid_korr(log_record_buffer.str);
  info= all_tables[sid].info;
  if (info != NULL)
  {
    tprint(tracef, "   Closing table '%s'\n", info->s->open_file_name);
    prepare_table_for_close(info, rec->lsn);
    if (maria_close(info))
    {
      eprint(tracef, "Failed to close table\n");
      goto end;
    }
    all_tables[sid].info= NULL;
  }
  name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
  if (new_table(sid, name, -1, -1, rec->lsn))
    goto end;
  error= 0;
end:
  DBUG_RETURN(error);
}


static int new_table(uint16 sid, const char *name,
                     File org_kfile, File org_dfile,
                     LSN lsn_of_file_id)
{
  /*
    -1 (skip table): close table and return 0;
    1 (error): close table and return 1;
    0 (success): leave table open and return 0.
  */
  int error= 1;
  MARIA_HA *info;
  MARIA_SHARE *share;
  my_off_t dfile_len, kfile_len;

  checkpoint_useful= TRUE;
  if ((name == NULL) || (name[0] == 0))
  {
    /*
      we didn't use DBUG_ASSERT() because such record corruption could
      silently pass in the "info == NULL" test below.
    */
    tprint(tracef, ", record is corrupted");
    info= NULL;
    goto end;
  }
  tprint(tracef, "Table '%s', id %u", name, sid);
  info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
  if (info == NULL)
  {
    tprint(tracef, ", is absent (must have been dropped later?)"
           " or its header is so corrupted that we cannot open it;"
           " we skip it");
    error= 0;
    goto end;
  }
  if (maria_is_crashed(info))
  {
    /** @todo what should we do? how to continue recovery? */
    tprint(tracef, "Table is crashed, can't apply log records to it\n");
    goto end;
  }
  share= info->s;
  /* check that we're not already using it */
  if (share->reopen != 1)
  {
    tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
    /*
      It could be that we have in the log
      FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
    */
    if (close_one_table(share->open_file_name, lsn_of_file_id))
      goto end;
  }
  DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
  if (!share->base.born_transactional)
  {
    tprint(tracef, ", is not transactional\n");
    ALERT_USER();
    error= -1;
    goto end;
  }
  if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
  {
    tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
           " LOGREC_FILE_ID's LSN (%lu,0x%lx), ignoring open request",
           LSN_IN_PARTS(share->state.create_rename_lsn),
           LSN_IN_PARTS(lsn_of_file_id));
    error= -1;
    goto end;
  }
  /* don't log any records for this work */
  _ma_tmp_disable_logging_for_table(info, FALSE);
  /* _ma_unpin_all_pages() reads info->trn: */
  info->trn= &dummy_transaction_object;
  /* execution of some REDO records relies on data_file_length */
  dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
  kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
  if ((dfile_len == MY_FILEPOS_ERROR) ||
      (kfile_len == MY_FILEPOS_ERROR))
  {
    tprint(tracef, ", length unknown\n");
    goto end;
  }
  if (share->state.state.data_file_length != dfile_len)
  {
    tprint(tracef, ", has wrong state.data_file_length (fixing it)");
    share->state.state.data_file_length= dfile_len;
  }
  if (share->state.state.key_file_length != kfile_len)
  {
    tprint(tracef, ", has wrong state.key_file_length (fixing it)");
    share->state.state.key_file_length= kfile_len;
  }
  if ((dfile_len % share->block_size) || (kfile_len % share->block_size))
  {
    tprint(tracef, ", has too short last page\n");
    /* Recovery will fix this, no error */
    ALERT_USER();
  }
  /*
    This LSN serves in this situation; assume log is:
    FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
    then crash, checkpoint record is parsed and opens "t1" with id 6; assume
    REDO phase starts from the REDO_INSERT above: it will wrongly try to
    update a page of "t1". With this LSN below, REDO_INSERT can realize the
    mapping is newer than itself, and not execute.
    Same example is possible with UNDO_INSERT (update of the state).
  */
  info->s->lsn_of_file_id= lsn_of_file_id;
  all_tables[sid].info= info;
  all_tables[sid].org_kfile= org_kfile;
  all_tables[sid].org_dfile= org_dfile;
  /*
    We don't set info->s->id, it would be useless (no logging in REDO phase);
    if you change that, know that some records in REDO phase call
    _ma_update_create_rename_lsn() which resets info->s->id.
  */
  tprint(tracef, ", opened");
  error= 0;
end:
  tprint(tracef, "\n");
  if (error)
  {
    if (info != NULL)
      maria_close(info);
    if (error == -1)
      error= 0;
  }
  return error;
}


prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
{
  int error= 1;
  uchar *buff= NULL;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
  {
    /*
      Table was skipped at open time (because later dropped/renamed, not
      transactional, or create_rename_lsn newer than LOGREC_FILE_ID); it is
      not an error.
    */
    return 0;
  }
  /*
    If REDO's LSN is > page's LSN (read from disk), we are going to modify the
    page and change its LSN. The normal runtime code stores the UNDO's LSN
    into the page. Here storing the REDO's LSN (rec->lsn) would work
    (we are not writing to the log here, so don't have to "flush up to UNDO's
    LSN"). But in a test scenario where we do updates at runtime, then remove
    tables, apply the log and check that this results in the same table as at
    runtime, putting the same LSN as runtime had done will decrease
    differences. So we use the UNDO's LSN which is current_group_end_lsn.
  */
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL)
  {
    eprint(tracef, "Failed to read allocate buffer for record\n");
    goto end;
  }
  if (translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  buff= log_record_buffer.str;
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
                                             HEAD_PAGE,
                                             buff + FILEID_STORE_SIZE,
                                             buff +
                                             FILEID_STORE_SIZE +
                                             PAGE_STORE_SIZE +
                                             DIRPOS_STORE_SIZE,
                                             rec->record_length -
                                             (FILEID_STORE_SIZE +
                                              PAGE_STORE_SIZE +
                                              DIRPOS_STORE_SIZE)))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
{
  int error= 1;
  uchar *buff;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  buff= log_record_buffer.str;
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
                                             TAIL_PAGE,
                                             buff + FILEID_STORE_SIZE,
                                             buff +
                                             FILEID_STORE_SIZE +
                                             PAGE_STORE_SIZE +
                                             DIRPOS_STORE_SIZE,
                                             rec->record_length -
                                             (FILEID_STORE_SIZE +
                                              PAGE_STORE_SIZE +
                                              DIRPOS_STORE_SIZE)))
    goto end;
  error= 0;

end:
  return error;
}


prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
{
  int error= 1;
  uchar *buff;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }
  buff= log_record_buffer.str;
  if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
                                      buff + FILEID_STORE_SIZE))
    goto end;
  error= 0;

end:
  return error;
}


prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
                                            HEAD_PAGE,
                                            rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
                                            TAIL_PAGE,
                                            rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_FREE_BLOCKS)
{
  int error= 1;
  uchar *buff;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  enlarge_buffer(rec);

  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }

  buff= log_record_buffer.str;
  if (_ma_apply_redo_free_blocks(info, current_group_end_lsn,
                                 buff + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;

  if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
                                       rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_DELETE_ALL)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  tprint(tracef, "   deleting all %lu rows\n",
         (ulong)info->s->state.state.records);
  if (maria_delete_all_rows(info))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_INDEX)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  enlarge_buffer(rec);

  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }

  if (_ma_apply_redo_index(info, current_group_end_lsn,
                           log_record_buffer.str + FILEID_STORE_SIZE,
                           rec->record_length - FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}

prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;
  enlarge_buffer(rec);

  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    goto end;
  }

  if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
                                    log_record_buffer.str + FILEID_STORE_SIZE,
                                    rec->record_length - FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    return 0;

  if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
                                     rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


#define set_undo_lsn_for_active_trans(TRID, LSN) do {  \
    all_active_trans[TRID].undo_lsn= LSN;                            \
    if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE)   \
      all_active_trans[TRID].first_undo_lsn= LSN; } while (0)

prototype_redo_exec_hook(UNDO_ROW_INSERT)
{
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  MARIA_SHARE *share;

  if (info == NULL)
    return 0;
  share= info->s;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    tprint(tracef, "   state has LSN (%lu,0x%lx) older than record, updating"
           " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon));
    share->state.state.records++;
    if (share->calc_checksum)
    {
      uchar buff[HA_CHECKSUM_STORE_SIZE];
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
          HA_CHECKSUM_STORE_SIZE)
      {
        eprint(tracef, "Failed to read record\n");
        return 1;
      }
      share->state.state.checksum+= ha_checksum_korr(buff);
    }
    info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
  }
  tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
  /* Unpin all pages, stamp them with UNDO's LSN */
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(UNDO_ROW_DELETE)
{
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  MARIA_SHARE *share;

  if (info == NULL)
    return 0;
  share= info->s;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    tprint(tracef, "   state older than record\n");
    share->state.state.records--;
    if (share->calc_checksum)
    {
      uchar buff[HA_CHECKSUM_STORE_SIZE];
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
          HA_CHECKSUM_STORE_SIZE)
      {
        eprint(tracef, "Failed to read record\n");
        return 1;
      }
      share->state.state.checksum+= ha_checksum_korr(buff);
    }
    share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
      STATE_NOT_OPTIMIZED_ROWS;
  }
  tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(UNDO_ROW_UPDATE)
{
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  MARIA_SHARE *share;
  if (info == NULL)
    return 0;
  share= info->s;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    if (share->calc_checksum)
    {
      uchar buff[HA_CHECKSUM_STORE_SIZE];
      if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
                               PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
                               HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
          HA_CHECKSUM_STORE_SIZE)
      {
        eprint(tracef, "Failed to read record\n");
        return 1;
      }
      share->state.state.checksum+= ha_checksum_korr(buff);
    }
    share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
  }
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(UNDO_KEY_INSERT)
{
  MARIA_HA *info;
  MARIA_SHARE *share;
  if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
    return 0;
  share= info->s;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
    uint keynr= key_nr_korr(ptr);
    if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
    {
      const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
      ulonglong value;
      char llbuf[22];
      uchar *to;
      tprint(tracef, "   state older than record\n");
      /* we read the record to find the auto_increment value */
      enlarge_buffer(rec);
      if (log_record_buffer.str == NULL ||
          translog_read_record(rec->lsn, 0, rec->record_length,
                               log_record_buffer.str, NULL) !=
          rec->record_length)
      {
        eprint(tracef, "Failed to read record\n");
        return 1;
      }
      to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
        KEY_NR_STORE_SIZE;
      if (keyseg->flag & HA_SWAP_KEY)
      {
        /* We put key from log record to "data record" packing format... */
        uchar reversed[HA_MAX_KEY_BUFF];
        uchar *key_ptr= to;
        uchar *key_end= key_ptr + keyseg->length;
        to= reversed + keyseg->length;
        do
        {
          *--to= *key_ptr++;
        } while (key_ptr != key_end);
        /* ... so that we can read it with: */
      }
      value= ma_retrieve_auto_increment(to, keyseg->type);
      set_if_bigger(share->state.auto_increment, value);
      llstr(share->state.auto_increment, llbuf);
      tprint(tracef, "   auto-inc %s\n", llbuf);
    }
  }
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(UNDO_KEY_DELETE)
{
  MARIA_HA *info;
  if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
    return 0;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
{
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  MARIA_SHARE *share;
  if (info == NULL)
    return 0;
  share= info->s;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    uint key_nr;
    my_off_t page;
    key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
    page=  page_korr(rec->header +  LSN_STORE_SIZE + FILEID_STORE_SIZE +
                     KEY_NR_STORE_SIZE);
    share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
                                    HA_OFFSET_ERROR :
                                    page * share->block_size);
  }
  _ma_unpin_all_pages(info, rec->lsn);
  return 0;
}


prototype_redo_exec_hook(COMMIT)
{
  uint16 sid= rec->short_trid;
  TrID long_trid= all_active_trans[sid].long_trid;
  char llbuf[22];
  if (long_trid == 0)
  {
    tprint(tracef, "We don't know about transaction with short_trid %u;"
           "it probably committed long ago, forget it\n", sid);
    bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
    return 0;
  }
  llstr(long_trid, llbuf);
  tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
         llbuf, sid);
  bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
#ifdef MARIA_VERSIONING
  /*
    if real recovery:
    transaction was committed, move it to some separate list for later
    purging (but don't purge now! purging may have been started before, we
    may find REDO_PURGE records soon).
  */
#endif
  return 0;
}


/*
  Set position for next active record that will have key inserted
*/

static void set_lastpos(MARIA_HA *info, uchar *pos)
{
  ulonglong page;
  uint dir_entry;

  /* If we have checksum, it's before rowid */
  if (info->s->calc_checksum)
    pos+= HA_CHECKSUM_STORE_SIZE;
  page= page_korr(pos);
  dir_entry= dirpos_korr(pos + PAGE_STORE_SIZE);
  info->cur_row.lastpos= ma_recordpos(page, dir_entry);
}


prototype_redo_exec_hook(CLR_END)
{
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  MARIA_SHARE *share;
  LSN previous_undo_lsn;
  enum translog_record_type undone_record_type;
  const LOG_DESC *log_desc;
  my_bool row_entry= 0;
  uchar *logpos;
  DBUG_ENTER("exec_REDO_LOGREC_CLR_END");

  if (info == NULL)
    DBUG_RETURN(0);
  share= info->s;
  previous_undo_lsn= lsn_korr(rec->header);
  undone_record_type=
    clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
  log_desc= &log_record_type_descriptor[undone_record_type];

  set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
  tprint(tracef, "   CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n",
         log_desc->name, LSN_IN_PARTS(previous_undo_lsn));

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }
  logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
           CLR_TYPE_STORE_SIZE);

  if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
  {
    tprint(tracef, "   state older than record\n");
    switch (undone_record_type) {
    case LOGREC_UNDO_ROW_DELETE:
      row_entry= 1;
      share->state.state.records++;
      set_lastpos(info, logpos);
      break;
    case LOGREC_UNDO_ROW_INSERT:
      share->state.state.records--;
      share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
      row_entry= 1;
      break;
    case LOGREC_UNDO_ROW_UPDATE:
      row_entry= 1;
      set_lastpos(info, logpos);
      break;
    case LOGREC_UNDO_KEY_INSERT:
    case LOGREC_UNDO_KEY_DELETE:
      break;
    case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
    case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
    {
      uint key_nr;
      my_off_t page;
      key_nr= key_nr_korr(logpos);
      page=  page_korr(logpos + KEY_NR_STORE_SIZE);
      share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
                                      HA_OFFSET_ERROR :
                                      page * share->block_size);
      break;
    }
    default:
      DBUG_ASSERT(0);
    }
    if (row_entry && share->calc_checksum)
      share->state.state.checksum+= ha_checksum_korr(logpos);
    share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
  }
  else
  {
    /* We must set lastpos for upcoming undo delete keys */
    switch (undone_record_type) {
    case LOGREC_UNDO_ROW_DELETE:
    case LOGREC_UNDO_ROW_UPDATE:
      set_lastpos(info, logpos);
      break;
    default:
      break;
    }
  }
  if (row_entry)
    tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
  _ma_unpin_all_pages(info, rec->lsn);
  DBUG_RETURN(0);
}


prototype_undo_exec_hook(UNDO_ROW_INSERT)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;
  const uchar *record_ptr;

  if (info == NULL)
  {
    /*
      Unlike for REDOs, if the table was skipped it is abnormal; we have a
      transaction to rollback which used this table, as it is not rolled back
      it was supposed to hold this table and so the table should still be
      there.
    */
    return 1;
  }
  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
    STATE_NOT_OPTIMIZED_ROWS;

  record_ptr= rec->header;
  if (share->calc_checksum)
  {
    /*
      We need to read more of the record to put the checksum into the record
      buffer used by _ma_apply_undo_row_insert().
      If the table has no live checksum, rec->header will be enough.
    */
    enlarge_buffer(rec);
    if (log_record_buffer.str == NULL ||
        translog_read_record(rec->lsn, 0, rec->record_length,
                             log_record_buffer.str, NULL) !=
        rec->record_length)
    {
      eprint(tracef, "Failed to read record\n");
      return 1;
    }
    record_ptr= log_record_buffer.str;
  }

  info->trn= trn;
  error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
                                   record_ptr + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE);
  info->trn= 0;
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
  tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(previous_undo_lsn));
  return error;
}


prototype_undo_exec_hook(UNDO_ROW_DELETE)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;

  if (info == NULL)
    return 1;

  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }

  info->trn= trn;
  /*
    For now we skip the page and directory entry. This is to be used
    later when we mark rows as deleted.
  */
  error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
                                   log_record_buffer.str + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE + PAGE_STORE_SIZE +
                                   DIRPOS_STORE_SIZE,
                                   rec->record_length -
                                   (LSN_STORE_SIZE + FILEID_STORE_SIZE +
                                    PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
  info->trn= 0;
  tprint(tracef, "   rows' count %lu\n   undo_lsn now LSN (%lu,0x%lx)\n",
         (ulong)share->state.state.records, LSN_IN_PARTS(previous_undo_lsn));
  return error;
}


prototype_undo_exec_hook(UNDO_ROW_UPDATE)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;

  if (info == NULL)
    return 1;

  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }

  info->trn= trn;
  error= _ma_apply_undo_row_update(info, previous_undo_lsn,
                                   log_record_buffer.str + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE,
                                   rec->record_length -
                                   (LSN_STORE_SIZE + FILEID_STORE_SIZE));
  info->trn= 0;
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(previous_undo_lsn));
  return error;
}


prototype_undo_exec_hook(UNDO_KEY_INSERT)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;

  if (info == NULL)
  {
    /*
      Unlike for REDOs, if the table was skipped it is abnormal; we have a
      transaction to rollback which used this table, as it is not rolled back
      it was supposed to hold this table and so the table should still be
      there.
    */
    return 1;
  }

  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
        rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }

  info->trn= trn;
  error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
                                   log_record_buffer.str + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE,
                                   rec->record_length - LSN_STORE_SIZE -
                                   FILEID_STORE_SIZE);
  info->trn= 0;
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(previous_undo_lsn));
  return error;
}


prototype_undo_exec_hook(UNDO_KEY_DELETE)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;

  if (info == NULL)
  {
    /*
      Unlike for REDOs, if the table was skipped it is abnormal; we have a
      transaction to rollback which used this table, as it is not rolled back
      it was supposed to hold this table and so the table should still be
      there.
    */
    return 1;
  }

  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
        rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }

  info->trn= trn;
  error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
                                   log_record_buffer.str + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE,
                                   rec->record_length - LSN_STORE_SIZE -
                                   FILEID_STORE_SIZE);
  info->trn= 0;
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(previous_undo_lsn));
  return error;
}


prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
{
  my_bool error;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  LSN previous_undo_lsn= lsn_korr(rec->header);
  MARIA_SHARE *share;

  if (info == NULL)
  {
    /*
      Unlike for REDOs, if the table was skipped it is abnormal; we have a
      transaction to rollback which used this table, as it is not rolled back
      it was supposed to hold this table and so the table should still be
      there.
    */
    return 1;
  }

  share= info->s;
  share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
        rec->record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return 1;
  }

  info->trn= trn;
  error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
                                   log_record_buffer.str + LSN_STORE_SIZE +
                                   FILEID_STORE_SIZE + PAGE_STORE_SIZE,
                                   rec->record_length - LSN_STORE_SIZE -
                                   FILEID_STORE_SIZE - PAGE_STORE_SIZE);
  info->trn= 0;
  /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
  tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(previous_undo_lsn));
  return error;
}



static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
{
  TRANSLOG_HEADER_BUFFER rec;
  struct st_translog_scanner_data scanner;
  int len;
  uint i;

  /* install hooks for execution */
#define install_redo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_REDO_LOGREC_ ## R;
#define install_undo_exec_hook(R)                                        \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
    exec_UNDO_LOGREC_ ## R;
  install_redo_exec_hook(LONG_TRANSACTION_ID);
  install_redo_exec_hook(CHECKPOINT);
  install_redo_exec_hook(REDO_CREATE_TABLE);
  install_redo_exec_hook(REDO_RENAME_TABLE);
  install_redo_exec_hook(REDO_REPAIR_TABLE);
  install_redo_exec_hook(REDO_DROP_TABLE);
  install_redo_exec_hook(FILE_ID);
  install_redo_exec_hook(INCOMPLETE_LOG);
  install_redo_exec_hook(INCOMPLETE_GROUP);
  install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
  install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
  install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
  install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
  install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
  install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
  install_redo_exec_hook(REDO_FREE_BLOCKS);
  install_redo_exec_hook(REDO_DELETE_ALL);
  install_redo_exec_hook(REDO_INDEX);
  install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
  install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
  install_redo_exec_hook(UNDO_ROW_INSERT);
  install_redo_exec_hook(UNDO_ROW_DELETE);
  install_redo_exec_hook(UNDO_ROW_UPDATE);
  install_redo_exec_hook(UNDO_KEY_INSERT);
  install_redo_exec_hook(UNDO_KEY_DELETE);
  install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
  install_redo_exec_hook(COMMIT);
  install_redo_exec_hook(CLR_END);
  install_undo_exec_hook(UNDO_ROW_INSERT);
  install_undo_exec_hook(UNDO_ROW_DELETE);
  install_undo_exec_hook(UNDO_ROW_UPDATE);
  install_undo_exec_hook(UNDO_KEY_INSERT);
  install_undo_exec_hook(UNDO_KEY_DELETE);
  install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);

  current_group_end_lsn= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
  current_group_table= NULL;
#endif

  if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
  {
    tprint(tracef, "checkpoint address refers to the log end log or "
           "log is empty, nothing to do.\n");
    return 0;
  }

  len= translog_read_record_header(lsn, &rec);

  if (len == RECHEADER_READ_ERROR)
  {
    eprint(tracef, "Failed to read header of the first record.\n");
    return 1;
  }
  if (translog_scanner_init(lsn, 1, &scanner, 1))
  {
    tprint(tracef, "Scanner init failed\n");
    return 1;
  }
  for (i= 1;;i++)
  {
    uint16 sid= rec.short_trid;
    const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
    display_record_position(log_desc, &rec, i);
    /*
      A complete group is a set of log records with an "end mark" record
      (e.g. a set of REDOs for an operation, terminated by an UNDO for this
      operation); if there is no "end mark" record the group is incomplete and
      won't be executed.
    */
    if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
        (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
    {
      if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
      {
        if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
        {
          /*
            Can happen if the transaction got a table write error, then
            unlocked tables thus wrote a COMMIT record. Or can be an
            INCOMPLETE_GROUP record written by a previous recovery.
          */
          tprint(tracef, "\nDiscarding incomplete group before this record\n");
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
        }
        else
        {
          struct st_translog_scanner_data scanner2;
          TRANSLOG_HEADER_BUFFER rec2;
          /*
            There is a complete group for this transaction, containing more
            than this event.
          */
          tprint(tracef, "   ends a group:\n");
          len=
            translog_read_record_header(all_active_trans[sid].group_start_lsn,
                                        &rec2);
          if (len < 0) /* EOF or error */
          {
            tprint(tracef, "Cannot find record where it should be\n");
            goto err;
          }
          if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
          {
            tprint(tracef, "Scanner2 init failed\n");
            goto err;
          }
          current_group_end_lsn= rec.lsn;
          do
          {
            if (rec2.short_trid == sid) /* it's in our group */
            {
              const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
              display_record_position(log_desc2, &rec2, 0);
              if (apply == MARIA_LOG_CHECK)
              {
                translog_size_t read_len;
                enlarge_buffer(&rec2);
                read_len=
                  translog_read_record(rec2.lsn, 0, rec2.record_length,
                                       log_record_buffer.str, NULL);
                if (read_len != rec2.record_length)
                {
                  tprint(tracef, "Cannot read record's body: read %u of"
                         " %u bytes\n", read_len, rec2.record_length);
                  goto err;
                }
              }
              if (apply == MARIA_LOG_APPLY &&
                  display_and_apply_record(log_desc2, &rec2))
              {
                translog_destroy_scanner(&scanner2);
                goto err;
              }
            }
            len= translog_read_next_record_header(&scanner2, &rec2);
            if (len < 0) /* EOF or error */
            {
              tprint(tracef, "Cannot find record where it should be\n");
              goto err;
            }
          }
          while (rec2.lsn < rec.lsn);
          translog_free_record_header(&rec2);
          /* group finished */
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
          current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
          display_record_position(log_desc, &rec, 0);
          translog_destroy_scanner(&scanner2);
        }
      }
      if (apply == MARIA_LOG_APPLY &&
          display_and_apply_record(log_desc, &rec))
        goto err;
#ifndef DBUG_OFF
      current_group_table= NULL;
#endif
    }
    else /* record does not end group */
    {
      /* just record the fact, can't know if can execute yet */
      if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
      {
        /* group not yet started */
        all_active_trans[sid].group_start_lsn= rec.lsn;
      }
    }
    len= translog_read_next_record_header(&scanner, &rec);
    if (len < 0)
    {
      switch (len)
      {
      case RECHEADER_READ_EOF:
        tprint(tracef, "EOF on the log\n");
        break;
      case RECHEADER_READ_ERROR:
        tprint(tracef, "Error reading log\n");
        goto err;
      }
      break;
    }
  }
  translog_destroy_scanner(&scanner);
  translog_free_record_header(&rec);
  if (recovery_message_printed == REC_MSG_REDO)
  {
    fprintf(stderr, " 100%%");
    procent_printed= 1;
  }
  return 0;

err:
  translog_destroy_scanner(&scanner);
  return 1;
}


/**
   @brief Informs about any aborted groups or uncommitted transactions,
   prepares for the UNDO phase if needed.

   @note Observe that it may init trnman.
*/
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
  uint sid, uncommitted= 0;
  char llbuf[22];
  LSN addr;

  hash_free(&all_dirty_pages);
  /*
    hash_free() can be called multiple times probably, but be safe if that
    changes
  */
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
  dirty_pages_pool= NULL;

  llstr(max_long_trid, llbuf);
  tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
  if (prepare_for_undo_phase && trnman_init(max_long_trid))
    return -1;

  for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
  {
    TrID long_trid= all_active_trans[sid].long_trid;
    LSN gslsn= all_active_trans[sid].group_start_lsn;
    TRN *trn;
    if (gslsn != LSN_IMPOSSIBLE)
    {
      tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
             LSN_IN_PARTS(gslsn), sid);
      all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
    }
    if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
    {
      llstr(long_trid, llbuf);
      tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
             llbuf, sid);
      /* dummy_transaction_object serves only for DDLs */
      DBUG_ASSERT(long_trid != 0);
      if (prepare_for_undo_phase)
      {
        if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
          return -1;
        trn->undo_lsn= all_active_trans[sid].undo_lsn;
        trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
          TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
        if (gslsn != LSN_IMPOSSIBLE)
        {
          /*
            UNDO phase will log some records. So, a future recovery may see:
            REDO(from incomplete group) - REDO(from rollback) - CLR_END
            and thus execute the first REDO (finding it in "a complete
            group"). To prevent that:
          */
          LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS];
          LSN lsn;
          if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
                                    trn, NULL, 0,
                                    TRANSLOG_INTERNAL_PARTS, log_array,
                                    NULL, NULL))
            return -1;
        }
      }
      uncommitted++;
    }
#ifdef MARIA_VERSIONING
    /*
      If real recovery: if transaction was committed, move it to some separate
      list for soon purging.
    */
#endif
  }

  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
  all_active_trans= NULL;

  /*
    The UNDO phase uses some normal run-time code of ROLLBACK: generates log
    records, etc; prepare tables for that
  */
  addr= translog_get_horizon();
  for (sid= 0; sid <= SHARE_ID_MAX; sid++)
  {
    MARIA_HA *info= all_tables[sid].info;
    if (info != NULL)
    {
      prepare_table_for_close(info, addr);
      /*
        But we don't close it; we leave it available for the UNDO phase;
        it's likely that the UNDO phase will need it.
      */
      if (prepare_for_undo_phase)
        translog_assign_id_to_share_from_recovery(info->s, sid);
    }
  }
  return uncommitted;
}


static int run_undo_phase(uint uncommitted)
{
  DBUG_ENTER("run_undo_phase");

  if (uncommitted > 0)
  {
    checkpoint_useful= TRUE;
    if (tracef != stdout)
    {
      if (recovery_message_printed == REC_MSG_NONE)
        print_preamble();
      fprintf(stderr, "transactions to roll back:");
      recovery_message_printed= REC_MSG_UNDO;
    }
    tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
    for( ; ; )
    {
      char llbuf[22];
      TRN *trn;
      if (recovery_message_printed == REC_MSG_UNDO)
        fprintf(stderr, " %u", uncommitted);
      if ((uncommitted--) == 0)
        break;
      trn= trnman_get_any_trn();
      DBUG_ASSERT(trn != NULL);
      llstr(trn->trid, llbuf);
      tprint(tracef, "Rolling back transaction of long id %s\n", llbuf);

      /* Execute all undo entries */
      while (trn->undo_lsn)
      {
        TRANSLOG_HEADER_BUFFER rec;
        LOG_DESC *log_desc;
        if (translog_read_record_header(trn->undo_lsn, &rec) ==
            RECHEADER_READ_ERROR)
          DBUG_RETURN(1);
        log_desc= &log_record_type_descriptor[rec.type];
        display_record_position(log_desc, &rec, 0);
        if (log_desc->record_execute_in_undo_phase(&rec, trn))
        {
          tprint(tracef, "Got error %d when executing undo\n", my_errno);
          DBUG_RETURN(1);
        }
      }

      if (trnman_rollback_trn(trn))
        DBUG_RETURN(1);
      /* We could want to span a few threads (4?) instead of 1 */
      /* In the future, we want to have this phase *online* */
    }
  }
  DBUG_RETURN(0);
}


/**
   @brief re-enables transactionality, updates is_of_horizon

   @param  info                table
   @param  horizon             address to set is_of_horizon
*/

static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
{
  MARIA_SHARE *share= info->s;
  /*
    In a fully-forward REDO phase (no checkpoint record),
    state is now at least as new as the LSN of the current record. It may be
    newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
    table, but that table was later modified further in the log.
    But if we parsed a checkpoint record, it may be this way in the log:
    FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
    Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to
    make t1 close; the first condition below is however false (when checkpoint
    was taken it increased is_of_horizon) and so it works. For safety we
    add the second condition.
  */
  if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 &&
      cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
  {
    share->state.is_of_horizon= horizon;
    _ma_state_info_write_sub(share->kfile.file, &share->state, 1);
  }
  _ma_reenable_logging_for_table(share);
  info->trn= NULL; /* safety */
}


static MARIA_HA *get_MARIA_HA_from_REDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec)
{
  uint16 sid;
  pgcache_page_no_t page;
  MARIA_HA *info;
  char llbuf[22];
  my_bool index_page_redo_entry= 0;

  print_redo_phase_progress(rec->lsn);
  sid= fileid_korr(rec->header);
  page= page_korr(rec->header + FILEID_STORE_SIZE);
  switch (rec->type) {
    /* not all REDO records have a page: */
  case LOGREC_REDO_INDEX_NEW_PAGE:
  case LOGREC_REDO_INDEX:
  case LOGREC_REDO_INDEX_FREE_PAGE:
    index_page_redo_entry= 1;
    /* Fall trough*/
  case LOGREC_REDO_INSERT_ROW_HEAD:
  case LOGREC_REDO_INSERT_ROW_TAIL:
  case LOGREC_REDO_PURGE_ROW_HEAD:
  case LOGREC_REDO_PURGE_ROW_TAIL:
    llstr(page, llbuf);
    tprint(tracef, "   For page %s of table of short id %u", llbuf, sid);
    break;
    /* other types could print their info here too */
  default:
    break;
  }
  info= all_tables[sid].info;
#ifndef DBUG_OFF
  DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
  current_group_table= info;
#endif
  if (info == NULL)
  {
    tprint(tracef, ", table skipped, so skipping record\n");
    return NULL;
  }
  tprint(tracef, ", '%s'", info->s->open_file_name);
  if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0)
  {
    /*
      This can happen only if processing a record before the checkpoint
      record.
      id->name mapping is newer than REDO record: for sure the table subject
      of the REDO has been flushed and forced (id re-assignment implies this);
      REDO can be ignored (and must be, as we don't know what this subject
      table was).
    */
    DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
    tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
           " than record, skipping record",
           LSN_IN_PARTS(info->s->lsn_of_file_id));
    return NULL;
  }
  /* detect if an open instance of a dropped table (internal bug) */
  DBUG_ASSERT(info->s->last_version != 0);
  if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
  {
    uint64 file_and_page_id=
      (((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile :
                  all_tables[sid].org_dfile)) << 32) | page;
    struct st_dirty_page *dirty_page= (struct st_dirty_page *)
      hash_search(&all_dirty_pages,
                  (uchar *)&file_and_page_id, sizeof(file_and_page_id));
    if ((dirty_page == NULL) ||
        cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
    {
      tprint(tracef, ", ignoring because of dirty_pages list\n");
      return NULL;
    }
  }

  /*
    So we are going to read the page, and if its LSN is older than the
    record's we will modify the page
  */
  tprint(tracef, ", applying record\n");
  _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
  return info;
}


static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec)
{
  uint16 sid;
  MARIA_HA *info;

  sid= fileid_korr(rec->header + LSN_STORE_SIZE);
  tprint(tracef, "   For table of short id %u", sid);
  info= all_tables[sid].info;
#ifndef DBUG_OFF
  DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
  current_group_table= info;
#endif
  if (info == NULL)
  {
    tprint(tracef, ", table skipped, so skipping record\n");
    return NULL;
  }
  tprint(tracef, ", '%s'", info->s->open_file_name);
  if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0)
  {
    tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
           " than record, skipping record",
           LSN_IN_PARTS(info->s->lsn_of_file_id));
    return NULL;
  }
  DBUG_ASSERT(info->s->last_version != 0);
  _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
  tprint(tracef, ", applying record\n");
  return info;
}


/**
   @brief Parses checkpoint record.

   Builds from it the dirty_pages list (a hash), opens tables and maps them to
   their 2-byte IDs, recreates transactions (not real TRNs though).

   @return LSN from where in the log the REDO phase should start
     @retval LSN_ERROR error
     @retval other     ok
*/

static LSN parse_checkpoint_record(LSN lsn)
{
  ulong i, nb_dirty_pages;
  TRANSLOG_HEADER_BUFFER rec;
  TRANSLOG_ADDRESS start_address;
  int len;
  uint nb_active_transactions, nb_committed_transactions, nb_tables;
  uchar *ptr;
  LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
  struct st_dirty_page *next_dirty_page_in_pool;

  tprint(tracef, "Loading data from checkpoint record at LSN (%lu,0x%lx)\n",
         LSN_IN_PARTS(lsn));
  if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR)
  {
    tprint(tracef, "Cannot find checkpoint record where it should be\n");
    return LSN_ERROR;
  }

  enlarge_buffer(&rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec.lsn, 0, rec.record_length,
                           log_record_buffer.str, NULL) !=
      rec.record_length)
  {
    eprint(tracef, "Failed to read record\n");
    return LSN_ERROR;
  }

  ptr= log_record_buffer.str;
  start_address= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;

  /* transactions */
  nb_active_transactions= uint2korr(ptr);
  ptr+= 2;
  tprint(tracef, "%u active transactions\n", nb_active_transactions);
  minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;
  max_long_trid= transid_korr(ptr);
  ptr+= TRANSID_SIZE;

  /*
    how much brain juice and discussions there was to come to writing this
    line
  */
  set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);

  for (i= 0; i < nb_active_transactions; i++)
  {
    uint16 sid= uint2korr(ptr);
    TrID long_id;
    LSN undo_lsn, first_undo_lsn;
    ptr+= 2;
    long_id= uint6korr(ptr);
    ptr+= 6;
    DBUG_ASSERT(sid > 0 && long_id > 0);
    undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    first_undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
  }
  nb_committed_transactions= uint4korr(ptr);
  ptr+= 4;
  tprint(tracef, "%lu committed transactions\n",
         (ulong)nb_committed_transactions);
  /* no purging => committed transactions are not important */
  ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;

  /* tables  */
  nb_tables= uint4korr(ptr);
  ptr+= 4;
  tprint(tracef, "%u open tables\n", nb_tables);
  for (i= 0; i< nb_tables; i++)
  {
    char name[FN_REFLEN];
    File kfile, dfile;
    LSN first_log_write_lsn;
    uint name_len;
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    DBUG_ASSERT(sid > 0);
    kfile= uint4korr(ptr);
    ptr+= 4;
    dfile= uint4korr(ptr);
    ptr+= 4;
    first_log_write_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    name_len= strlen((char *)ptr) + 1;
    strmake(name, (char *)ptr, sizeof(name)-1);
    ptr+= name_len;
    if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
      return LSN_ERROR;
  }

  /* dirty pages */
  nb_dirty_pages= uint8korr(ptr);
  ptr+= 8;
  tprint(tracef, "%lu dirty pages\n", nb_dirty_pages);
  if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages,
                offsetof(struct st_dirty_page, file_and_page_id),
                sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
                NULL, NULL, 0))
    return LSN_ERROR;
  dirty_pages_pool=
    (struct st_dirty_page *)my_malloc(nb_dirty_pages *
                                      sizeof(struct st_dirty_page),
                                      MYF(MY_WME));
  if (unlikely(dirty_pages_pool == NULL))
    return LSN_ERROR;
  next_dirty_page_in_pool= dirty_pages_pool;
  minimum_rec_lsn_of_dirty_pages= LSN_MAX;
  for (i= 0; i < nb_dirty_pages ; i++)
  {
    pgcache_page_no_t pageid;
    LSN rec_lsn;
    File fileid= uint4korr(ptr);
    ptr+= 4;
    pageid= uint4korr(ptr);
    ptr+= 4;
    rec_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
      return LSN_ERROR;
    set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
  }
  /* after that, there will be no insert/delete into the hash */
  /*
    sanity check on record (did we screw up with all those "ptr+=", did the
    checkpoint write code and checkpoint read code go out of sync?).
  */
  if (ptr != (log_record_buffer.str + log_record_buffer.length))
  {
    eprint(tracef, "checkpoint record corrupted\n");
    return LSN_ERROR;
  }
  set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);

  /*
    Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
    translog_read_record() functions
  */
  checkpoint_start= translog_next_LSN(start_address, LSN_IMPOSSIBLE);
  if (checkpoint_start == LSN_IMPOSSIBLE)
  {
    /*
      There must be a problem, as our checkpoint record exists and is >= the
      address which is stored in its first bytes, which is >= start_address.
    */
    return LSN_ERROR;
  }
  return checkpoint_start;
}

static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page)
{
  /* serves as hash key */
  dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid;
  dirty_page->rec_lsn= rec_lsn;
  return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
}


static int close_all_tables(void)
{
  int error= 0;
  uint count= 0;
  LIST *list_element, *next_open;
  MARIA_HA *info;
  TRANSLOG_ADDRESS addr;
  DBUG_ENTER("close_all_tables");

  pthread_mutex_lock(&THR_LOCK_maria);
  if (maria_open_list == NULL)
    goto end;
  tprint(tracef, "Closing all tables\n");
  if (tracef != stdout)
  {
    if (recovery_message_printed == REC_MSG_NONE)
      print_preamble();
    for (count= 0, list_element= maria_open_list ;
         list_element ; count++, (list_element= list_element->next))
      fprintf(stderr, "tables to flush:");
    recovery_message_printed= REC_MSG_FLUSH;
  }
  /*
    Since the end of end_of_redo_phase(), we may have written new records
    (if UNDO phase ran)  and thus the state is newer than at
    end_of_redo_phase(), we need to bump is_of_horizon again.
  */
  addr= translog_get_horizon();
  for (list_element= maria_open_list ; ; list_element= next_open)
  {
    if (recovery_message_printed == REC_MSG_FLUSH)
      fprintf(stderr, " %u", count--);
    if (list_element == NULL)
      break;
    next_open= list_element->next;
    info= (MARIA_HA*)list_element->data;
    pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
    /*
      Tables which we see here are exactly those which were open at time of
      crash. They might have open_count>0 as Checkpoint maybe flushed their
      state while they were used. As Recovery corrected them, don't alarm the
      user, don't ask for a table check:
    */
    info->s->state.open_count= 0;
    prepare_table_for_close(info, addr);
    error|= maria_close(info);
    pthread_mutex_lock(&THR_LOCK_maria);
  }
end:
  pthread_mutex_unlock(&THR_LOCK_maria);
  DBUG_RETURN(error);
}


/**
   @brief Close all table instances with a certain name which are present in
   all_tables.

   @param  name                Name of table
   @param  addr                Log address passed to prepare_table_for_close()
*/

static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
{
  my_bool res= 0;
  /* There are no other threads using the tables, so we don't need any locks */
  struct st_table_for_recovery *internal_table, *end;
  for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
       internal_table < end ;
       internal_table++)
  {
    MARIA_HA *info= internal_table->info;
    if ((info != NULL) && !strcmp(info->s->open_file_name, name))
    {
      prepare_table_for_close(info, addr);
      if (maria_close(info))
        res= 1;
      internal_table->info= NULL;
    }
  }
  return res;
}


/**
   Temporarily disables logging for this table.

   If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
   to warn log readers.

   @param  info            table
   @param  log_incomplete  if that disabling makes the log incomplete

   @note for example in the REDO phase we disable logging but that does not
   make the log incomplete.
*/
void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
                                       my_bool log_incomplete)
{
  MARIA_SHARE *share= info->s;
  if (log_incomplete)
  {
    uchar log_data[FILEID_STORE_SIZE];
    LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
    LSN lsn;
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    (char*) log_data;
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
    translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
                          info->trn, info, sizeof(log_data),
                          TRANSLOG_INTERNAL_PARTS + 1, log_array,
                          log_data, NULL);
  }
  /* if we disabled before writing the record, record wouldn't reach log */
  share->now_transactional= FALSE;
  share->page_type= PAGECACHE_PLAIN_PAGE;
}

static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
  static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0;
  static ulonglong initial_remainder= -1;
  int cur_logno, cur_offset;
  ulonglong local_remainder;
  int percentage_done;

  if (tracef == stdout)
    return;
  if (recovery_message_printed == REC_MSG_NONE)
  {
    print_preamble();
    fprintf(stderr, "recovered pages: 0%%");
    procent_printed= 1;
    recovery_message_printed= REC_MSG_REDO;
  }
  if (end_logno == FILENO_IMPOSSIBLE)
  {
    LSN end_addr= translog_get_horizon();
    end_logno= LSN_FILE_NO(end_addr);
    end_offset= LSN_OFFSET(end_addr);
  }
  cur_logno= LSN_FILE_NO(addr);
  cur_offset= LSN_OFFSET(addr);
  local_remainder= (cur_logno == end_logno) ? (end_offset - cur_offset) :
    (((longlong)log_file_size) - cur_offset +
     max(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) +
     end_offset);
  if (initial_remainder == (ulonglong)(-1))
    initial_remainder= local_remainder;
  percentage_done= ((initial_remainder - local_remainder) * ULL(100) /
                    initial_remainder);
  if ((percentage_done - percentage_printed) >= 10)
  {
    percentage_printed= percentage_done;
    fprintf(stderr, " %d%%", percentage_done);
    procent_printed= 1;
  }
}

#ifdef MARIA_EXTERNAL_LOCKING
#error Marias Checkpoint and Recovery are really not ready for it
#endif

/*
Recovery of the state :  how it works
=====================================

Here we ignore Checkpoints for a start.

The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
memory frequently (at least at every row write/update/delete) but goes
to disk at few moments: maria_close() when closing the last open
instance, and a few rare places like CHECK/REPAIR/ALTER
(non-transactional tables also do it at maria_lock_database() but we
needn't cover them here).

In case of crash, state on disk is likely to be older than what it was
in memory, the REDO phase needs to recreate the state as it was in
memory at the time of crash. When we say Recovery here we will always
mean "REDO phase".

For example MARIA_STATUS_INFO::records (count of records). It is updated at
the end of every row write/update/delete/delete_all. When Recovery sees the
sign of such row operation (UNDO or REDO), it may need to update the records'
count if that count does not reflect that operation (is older). How to know
the age of the state compared to the log record: every time the state
goes to disk at runtime, its member "is_of_horizon" is updated to the
current end-of-log horizon. So Recovery just needs to compare is_of_horizon
and the record's LSN to know if it should modify "records".

Other operations like ALTER TABLE DISABLE KEYS update the state but
don't write log records, thus the REDO phase cannot repeat their
effect on the state in case of crash. But we make them sync the state
as soon as they have finished. This reduces the window for a problem.

It looks like only one thread at a time updates the state in memory or
on disk. We assume that the upper level (normally MySQL) has protection
against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
are not issued while there are any running transactions on the given table.
If this is not done, we may write a corrupted state to disk.

With checkpoints
================

Checkpoint module needs to read the state in memory and write it to
disk. This may happen while some other thread is modifying the state
in memory or on disk. Checkpoint thus may be reading changing data, it
needs a mutex to not have it corrupted, and concurrent modifiers of
the state need that mutex too for the same reason.
"records" is modified for every row write/update/delete, we don't want
to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
which is already present in these moments, namely the log's mutex which is
taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
under-log-mutex hooks when writing these records (thus "records" is
not updated at the end of maria_write/update/delete() anymore).
Thus Checkpoint takes the log's lock and can read "records" from
memory an write it to disk and release log's lock.
We however want to avoid having the disk write under the log's
lock. So it has to be under another mutex, natural choice is
intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
and as maria_close() takes it too). All state writes to disk are
changed to be protected with intern_lock.
So Checkpoint takes intern_lock, log's lock, reads "records" from
memory, releases log's lock, updates is_of_horizon and writes "records" to
disk, release intern_lock.
In practice, not only "records" needs to be written but the full
state. So, Checkpoint reads the full state from memory. Some other
thread may at this moment be modifying in memory some pieces of the
state which are not protected by the lock's log (see ma_extra.c
HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
from memory; to guard against that we extend the intern_lock-zone to
changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
also any change made in memory to create_rename_lsn/state_is_of_horizon.
Last, we don't want in Checkpoint to do
 log lock; read state from memory; release log lock;
for each table, it may hold the log's lock too much in total.
So, we instead do
 log lock; read N states from memory; release log lock;
Thus, the sequence above happens outside of any intern_lock.
But this re-introduces the problem that some other thread may be changing the
state in memory and on disk under intern_lock, without log's lock, like
HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
comes to handling the table under intern_lock, which is serialized with
HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher then when the state
was read from memory under log's lock, and thus can decide to not flush the
obsolete state it has, knowing that the other thread flushed a more recent
state already. If on the other hand is_of_horizon is not higher, the read
state is current and can be flushed. So we have a per-table sequence:
 lock intern_lock; test if is_of_horizon is higher than when we read the state
 under log's lock; if no then flush the read state to disk.
*/

/* some comments and pseudo-code which we keep for later */
#if 0
  /*
    MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
    after a certain amount of log records have been executed. This helps
    against repeated crashes. Those checkpoints could not be user-requested
    (as engine is not communicating during the REDO phase), so they would be
    automatic: this changes the original assumption that we don't write to the
    log while in the REDO phase, but why not. How often should we checkpoint?
  */

  /*
    We want to have two steps:
    engine->recover_with_max_memory();
    next_engine->recover_with_max_memory();
    engine->init_with_normal_memory();
    next_engine->init_with_normal_memory();
    So: in recover_with_max_memory() allocate a giant page cache, do REDO
    phase, then all page cache is flushed and emptied and freed (only retain
    small structures like TM): take full checkpoint, which is useful if
    next engine crashes in its recovery the next second.
    Destroy all shares (maria_close()), then at init_with_normal_memory() we
    do this:
  */

  /**** UNDO PHASE *****/

  /*
    Launch one or more threads to do the background rollback. Don't wait for
    them to complete their rollback (background rollback; for debugging, we
    can have an option which waits). Set a counter (total_of_rollback_threads)
    to the number of threads to lauch.

    Note that InnoDB's rollback-in-background works as long as InnoDB is the
    last engine to recover, otherwise MySQL will refuse new connections until
    the last engine has recovered so it's not "background" from the user's
    point of view. InnoDB is near top of sys_table_types so all others
    (e.g. BDB) recover after it... So it's really "online rollback" only if
    InnoDB is the only engine.
  */

  /* wake up delete/update handler */
  /* tell the TM that it can now accept new transactions */

  /*
    mark that checkpoint requests are now allowed.
  */
#endif