/* Copyright (C) 2006, 2007 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
  WL#3072 Maria recovery
  First version written by Guilhem Bichot on 2006-04-27.
*/

/* Here is the implementation of this module */

#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
#include "trnman.h"

struct st_trn_for_recovery /* used only in the REDO phase */
{
  LSN group_start_lsn, undo_lsn, first_undo_lsn;
  TrID long_trid;
};
struct st_dirty_page /* used only in the REDO phase */
{
  uint64 file_and_page_id;
  LSN rec_lsn;
};
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
  MARIA_HA *info;
  File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};
/* Variables used by all functions of this module. Ok as single-threaded */
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn,
  checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */

#define prototype_exec_hook(R)                                          \
  static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R)                                    \
  static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
                               __attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
static void display_record_position(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec,
                                    uint number);
static int display_and_apply_record(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec);
static void prepare_table_for_close(MARIA_HA *info, LSN at_lsn);
static int parse_checkpoint_record(LSN lsn);
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn);
static int new_table(uint16 sid, const char *name,
                     File org_kfile, File org_dfile, LSN lsn);
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page);
static int close_all_tables();

/** @brief global [out] buffer for translog_read_record(); never shrinks */
static LEX_STRING log_record_buffer;
#define enlarge_buffer(rec)                                             \
  if (log_record_buffer.length < (rec)->record_length)                  \
  {                                                                     \
    log_record_buffer.length= (rec)->record_length;                     \
    log_record_buffer.str= my_realloc(log_record_buffer.str,            \
                                      (rec)->record_length, MYF(MY_WME)); \
  }

#define ALERT_USER() DBUG_ASSERT(0)
#define LSN_IN_HEX(L) (ulong)LSN_FILE_NO(L),(ulong)LSN_OFFSET(L)


/**
   @brief Recovers from the last checkpoint.

   Runs the REDO phase using special structures, then sets up the playground
   of runtime: recreates transactions inside trnman, open tables with their
   two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
   tables.
*/

int maria_recover()
{
  int res= 1;
  FILE *trace_file;
  DBUG_ENTER("maria_recover");

  DBUG_ASSERT(!maria_in_recovery);
  maria_in_recovery= TRUE;

  if ((trace_file= fopen("maria_recovery.trace", "w")))
  {
    fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
    DBUG_ASSERT(maria_pagecache->inited);
    res= maria_apply_log(LSN_IMPOSSIBLE, TRUE, trace_file, TRUE);
    if (!res)
      fprintf(trace_file, "SUCCESS\n");
    fclose(trace_file);
  }
  maria_in_recovery= FALSE;
  DBUG_RETURN(res);
}


/**
   @brief Displays and/or applies the log

   @param  from_lsn        LSN from which log reading/applying should start;
                           LSN_IMPOSSIBLE means "use last checkpoint"
   @param  apply           if log records should be applied or not
   @param  trace_file      trace file where progress/debug messages will go

   @todo This trace_file thing is primitive; soon we will make it similar to
   ma_check_print_warning() etc, and a successful recovery does not need to
   create a trace file. But for debugging now it is useful.

   @return Operation status
     @retval 0      OK
     @retval !=0    Error
*/

int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
                    my_bool should_run_undo_phase)
{
  int error= 0;
  DBUG_ENTER("maria_apply_log");

  DBUG_ASSERT(apply || !should_run_undo_phase);
  DBUG_ASSERT(!maria_multi_threaded);
  all_active_trans= (struct st_trn_for_recovery *)
    my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
              MYF(MY_ZEROFILL));
  all_tables= (struct st_table_for_recovery *)
    my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
              MYF(MY_ZEROFILL));
  if (!all_active_trans || !all_tables)
    goto err;

  tracef= trace_file;

  if (from_lsn == LSN_IMPOSSIBLE)
  {
    if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
    {
      from_lsn= translog_first_theoretical_lsn();
      /*
        as far as we have not yet any checkpoint then the very first
        log file should be present.
      */
      if (unlikely((from_lsn == LSN_IMPOSSIBLE) ||
                   (from_lsn == LSN_ERROR)))
        goto err;
    }
    else
    {
      DBUG_ASSERT(0); /* not yet implemented */
      from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
      if (from_lsn == LSN_IMPOSSIBLE)
        goto err;
    }
  }

  if (run_redo_phase(from_lsn, apply))
    goto err;

  uint unfinished_trans= end_of_redo_phase(should_run_undo_phase);
  if (unfinished_trans == (uint)-1)
    goto err;
  if (should_run_undo_phase)
  {
    if (run_undo_phase(unfinished_trans))
      return 1;
  }
  else if (unfinished_trans > 0)
    fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
            " left inconsistent!\n", unfinished_trans);

  /*
    we don't use maria_panic() because it would maria_end(), and Recovery does
    not want that (we want to keep modules initialized for runtime).
  */
  if (close_all_tables())
    goto err;

  /*
    At this stage, end of recovery, trnman is left initialized. This is for
    the future, when we have an online UNDO phase or prepared transactions.
  */
  goto end;
err:
  error= 1;
  fprintf(tracef, "Recovery of tables with transaction logs FAILED\n");
end:
  hash_free(&all_dirty_pages);
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
  dirty_pages_pool= NULL;
  my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
  all_tables= NULL;
  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
  all_active_trans= NULL;
  my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
  log_record_buffer.str= NULL;
  log_record_buffer.length= 0;
  /* we don't cleanly close tables if we hit some error (may corrupt them) */
  DBUG_RETURN(error);
}


/* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec,
                                    uint number)
{
  /*
    if number==0, we're going over records which we had already seen and which
    form a group, so we indent below the group's end record
  */
  fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
          number ? "" : "   ", number, LSN_IN_HEX(rec->lsn),
          rec->short_trid, log_desc->name, rec->type,
         (ulong)rec->record_length);
}


static int display_and_apply_record(const LOG_DESC *log_desc,
                                    const TRANSLOG_HEADER_BUFFER *rec)
{
  int error;
  if (log_desc->record_execute_in_redo_phase == NULL)
  {
    /* die on all not-yet-handled records :) */
    DBUG_ASSERT("one more hook" == "to write");
    return 1;
  }
  if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
    fprintf(tracef, "Got error when executing record\n");
  return error;
}


prototype_exec_hook(LONG_TRANSACTION_ID)
{
  uint16 sid= rec->short_trid;
  TrID long_trid= all_active_trans[sid].long_trid;
  /* abort group of this trn (must be of before a crash) */
  LSN gslsn= all_active_trans[sid].group_start_lsn;
  if (gslsn != LSN_IMPOSSIBLE)
  {
    fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
            LSN_IN_HEX(gslsn), sid);
    all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
  }
  if (long_trid != 0)
  {
    LSN ulsn= all_active_trans[sid].undo_lsn;
    if (ulsn != LSN_IMPOSSIBLE)
    {
      char llbuf[22];
      llstr(long_trid, llbuf);
      fprintf(tracef, "Found an old transaction long_trid %s short_trid %u"
              " with same short id as this new transaction, and has neither"
              " committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
              sid, LSN_IN_HEX(ulsn));
      goto err;
    }
  }
  long_trid= uint6korr(rec->header);
  new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
  goto end;
err:
  ALERT_USER();
  return 1;
end:
  return 0;
}


static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn)
{
  char llbuf[22];
  all_active_trans[sid].long_trid= long_id;
  llstr(long_id, llbuf);
  fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n",
          llbuf, sid);
  all_active_trans[sid].undo_lsn= undo_lsn;
  all_active_trans[sid].first_undo_lsn= first_undo_lsn;
}


prototype_exec_hook_dummy(CHECKPOINT)
{
  /* the only checkpoint we care about was found via control file, ignore */
  return 0;
}


prototype_exec_hook(REDO_CREATE_TABLE)
{
  File dfile= -1, kfile= -1;
  char *linkname_ptr, filename[FN_REFLEN];
  char *name, *ptr;
  myf create_flag;
  uint flags;
  int error= 1, create_mode= O_RDWR | O_TRUNC;
  MARIA_HA *info= NULL;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }
  name= log_record_buffer.str;
  fprintf(tracef, "Table '%s'", name);
  /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    /* check that we're not already using it */
    DBUG_ASSERT(share->reopen == 1);
    DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
    if (!share->base.born_transactional)
    {
      /*
        could be that transactional table was later dropped, and a non-trans
        one was renamed to its name, thus create_rename_lsn is 0 and should
        not be trusted.
      */
      fprintf(tracef, ", is not transactional\n");
      ALERT_USER();
      error= 0;
      goto end;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
              " record, ignoring",
              LSN_IN_HEX(share->state.create_rename_lsn));
      error= 0;
      goto end;
    }
    if (maria_is_crashed(info))
    {
      fprintf(tracef, ", is crashed, overwriting it");
      ALERT_USER();
    }
    maria_close(info);
    info= NULL;
  }
  /* if does not exist, is older, or its header is corrupted, overwrite it */
  /** @todo symlinks */
  ptr= name + strlen(name) + 1;
  if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
    fprintf(tracef, ", we will only touch index file");
  fn_format(filename, name, "", MARIA_NAME_IEXT,
            (MY_UNPACK_FILENAME |
             (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
            MY_APPEND_EXT);
  linkname_ptr= NULL;
  create_flag= MY_DELETE_OLD;
  fprintf(tracef, ", creating as '%s'", filename);
  if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
                                     MYF(MY_WME|create_flag))) < 0)
  {
    fprintf(tracef, "Failed to create index file\n");
    goto end;
  }
  ptr++;
  uint kfile_size_before_extension= uint2korr(ptr);
  ptr+= 2;
  uint keystart= uint2korr(ptr);
  ptr+= 2;
  /* set create_rename_lsn (for maria_read_log to be idempotent) */
  lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
  if (my_pwrite(kfile, ptr,
                kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
      my_chsize(kfile, keystart, 0, MYF(MY_WME)))
  {
    fprintf(tracef, "Failed to write to index file\n");
    goto end;
  }
  if (!(flags & HA_DONT_TOUCH_DATA))
  {
    fn_format(filename,name,"", MARIA_NAME_DEXT,
              MY_UNPACK_FILENAME | MY_APPEND_EXT);
    linkname_ptr= NULL;
    create_flag=MY_DELETE_OLD;
    if (((dfile=
          my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
                                 MYF(MY_WME | create_flag))) < 0) ||
        my_close(dfile, MYF(MY_WME)))
    {
      fprintf(tracef, "Failed to create data file\n");
      goto end;
    }
    /*
      we now have an empty data file. To be able to
      _ma_initialize_data_file() we need some pieces of the share to be
      correctly filled. So we just open the table (fortunately, an empty
      data file does not preclude this).
    */
    if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
        _ma_initialize_data_file(info->s, info->dfile.file))
    {
      fprintf(tracef, "Failed to open new table or write to data file\n");
      goto end;
    }
  }
  error= 0;
end:
  fprintf(tracef, "\n");
  if (kfile >= 0)
    error|= my_close(kfile, MYF(MY_WME));
  if (info != NULL)
    error|= maria_close(info);
  return error;
}


prototype_exec_hook(REDO_DROP_TABLE)
{
  char *name;
  int error= 1;
  MARIA_HA *info= NULL;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
      rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }
  name= log_record_buffer.str;
  fprintf(tracef, "Table '%s'", name);
  info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
  if (info)
  {
    MARIA_SHARE *share= info->s;
    /*
      We may have open instances on this table. But it does not matter, the
      maria_extra() below will take care of them.
    */
    if (!share->base.born_transactional)
    {
      fprintf(tracef, ", is not transactional\n");
      ALERT_USER();
      error= 0;
      goto end;
    }
    if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    {
      fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
              " record, ignoring",
              LSN_IN_HEX(share->state.create_rename_lsn));
      error= 0;
      goto end;
    }
    if (maria_is_crashed(info))
    {
      fprintf(tracef, ", is crashed, dropping it");
      ALERT_USER();
    }
    /*
      This maria_extra() call serves to signal that old open instances of
      this table should not be used anymore, and (only on Windows) to close
      open files so they can be deleted
    */
    if (maria_extra(info, HA_EXTRA_PREPARE_FOR_DELETE, NULL) ||
        maria_close(info))
      goto end;
    info= NULL;
  }
  /* if does not exist, is older, or its header is corrupted, drop it */
  fprintf(tracef, ", dropping '%s'", name);
  if (maria_delete_table(name))
  {
    fprintf(tracef, "Failed to drop table\n");
    goto end;
  }
  error= 0;
end:
  fprintf(tracef, "\n");
  if (info != NULL)
    error|= maria_close(info);
  return error;
}


prototype_exec_hook(FILE_ID)
{
  uint16 sid;
  int error= 1;
  const char *name;
  MARIA_HA *info;

  if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
  {
    fprintf(tracef, "ignoring because before checkpoint\n");
    return 0;
  }

  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }
  sid= fileid_korr(log_record_buffer.str);
  info= all_tables[sid].info;
  if (info != NULL)
  {
    fprintf(tracef, "   Closing table '%s'\n", info->s->open_file_name);
    prepare_table_for_close(info, rec->lsn);
    if (maria_close(info))
    {
      fprintf(tracef, "Failed to close table\n");
      goto end;
    }
    all_tables[sid].info= NULL;
  }
  name= log_record_buffer.str + FILEID_STORE_SIZE;
  if (new_table(sid, name, -1, -1, rec->lsn))
    goto end;
  error= 0;
end:
  return error;
}


static int new_table(uint16 sid, const char *name,
                     File org_kfile, File org_dfile, LSN lsn)
{
  /*
    -1 (skip table): close table and return 0;
    1 (error): close table and return 1;
    0 (success): leave table open and return 0.
  */
  int error= 1;

  fprintf(tracef, "Table '%s', id %u", name, sid);
  MARIA_HA *info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
  if (info == NULL)
  {
    fprintf(tracef, ", is absent (must have been dropped later?)"
           " or its header is so corrupted that we cannot open it;"
           " we skip it\n");
    error= 0;
    goto end;
  }
  if (maria_is_crashed(info))
  {
    fprintf(tracef, "Table is crashed, can't apply log records to it\n");
    goto end;
   /*
      we should make an exception for REDO_REPAIR_TABLE records: if we want to
      execute them, we should not reject the crashed table here.
    */
  }
  MARIA_SHARE *share= info->s;
  /* check that we're not already using it */
  DBUG_ASSERT(share->reopen == 1);
  DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
  if (!share->base.born_transactional)
  {
    fprintf(tracef, ", is not transactional\n");
    ALERT_USER();
    error= -1;
    goto end;
  }
  if (cmp_translog_addr(lsn, share->state.create_rename_lsn) <= 0)
  {
    fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
            " record, ignoring",
            LSN_IN_HEX(share->state.create_rename_lsn));
    error= -1;
    goto end;
  }
  /* don't log any records for this work */
  _ma_tmp_disable_logging_for_table(share);
  /* execution of some REDO records relies on data_file_length */
  my_off_t dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
  my_off_t kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
  if ((dfile_len == MY_FILEPOS_ERROR) ||
      (kfile_len == MY_FILEPOS_ERROR))
  {
    fprintf(tracef, ", length unknown\n");
    goto end;
  }
  share->state.state.data_file_length= dfile_len;
  share->state.state.key_file_length=  kfile_len;
  if ((dfile_len % share->block_size) > 0)
  {
    fprintf(tracef, ", has too short last page\n");
    /* Recovery will fix this, no error */
    ALERT_USER();
  }
  all_tables[sid].info= info;
  all_tables[sid].org_kfile= org_kfile;
  all_tables[sid].org_dfile= org_dfile;
  fprintf(tracef, ", opened\n");
  error= 0;
end:
  if (error)
  {
    if (info != NULL)
      maria_close(info);
    if (error == -1)
      error= 0;
  }
  return error;
}


prototype_exec_hook(REDO_INSERT_ROW_HEAD)
{
  int error= 1;
  uchar *buff= NULL;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  /*
    If REDO's LSN is > page's LSN (read from disk), we are going to modify the
    page and change its LSN. The normal runtime code stores the UNDO's LSN
    into the page. Here storing the REDO's LSN (rec->lsn) would work
    (we are not writing to the log here, so don't have to "flush up to UNDO's
    LSN"). But in a test scenario where we do updates at runtime, then remove
    tables, apply the log and check that this results in the same table as at
    runtime, putting the same LSN as runtime had done will decrease
    differences. So we use the UNDO's LSN which is current_group_end_lsn.
  */
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }
  buff= log_record_buffer.str;
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
                                             HEAD_PAGE,
                                             buff + FILEID_STORE_SIZE,
                                             buff +
                                             FILEID_STORE_SIZE +
                                             PAGE_STORE_SIZE +
                                             DIRPOS_STORE_SIZE,
                                             rec->record_length -
                                             (FILEID_STORE_SIZE +
                                              PAGE_STORE_SIZE +
                                              DIRPOS_STORE_SIZE)))
    goto end;
  error= 0;
end:
  return error;
}


prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
  int error= 1;
  uchar *buff;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  enlarge_buffer(rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }
  buff= log_record_buffer.str;
  if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
                                             TAIL_PAGE,
                                             buff + FILEID_STORE_SIZE,
                                             buff +
                                             FILEID_STORE_SIZE +
                                             PAGE_STORE_SIZE +
                                             DIRPOS_STORE_SIZE,
                                             rec->record_length -
                                             (FILEID_STORE_SIZE +
                                              PAGE_STORE_SIZE +
                                              DIRPOS_STORE_SIZE)))
    goto end;
  error= 0;

end:
  return error;
}


prototype_exec_hook(REDO_PURGE_ROW_HEAD)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
                                            HEAD_PAGE,
                                            rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_exec_hook(REDO_PURGE_ROW_TAIL)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
                                            TAIL_PAGE,
                                            rec->header + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_exec_hook(REDO_PURGE_BLOCKS)
{
  int error= 1;
  uchar *buff;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  enlarge_buffer(rec);

  if (log_record_buffer.str == NULL ||
      translog_read_record(rec->lsn, 0, rec->record_length,
                           log_record_buffer.str, NULL) !=
       rec->record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    goto end;
  }

  buff= log_record_buffer.str;
  if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
                                  buff + FILEID_STORE_SIZE))
    goto end;
  error= 0;
end:
  return error;
}


prototype_exec_hook(REDO_DELETE_ALL)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
  if (info == NULL)
    goto end;
  fprintf(tracef, "   deleting all %lu rows\n",
         (ulong)info->s->state.state.records);
  if (maria_delete_all_rows(info))
    goto end;
  error= 0;
end:
  return error;
}


#define set_undo_lsn_for_active_trans(I, L) do {  \
    all_active_trans[I].undo_lsn= L;                            \
    if (all_active_trans[I].first_undo_lsn == LSN_IMPOSSIBLE)   \
      all_active_trans[I].first_undo_lsn= L; } while (0)

prototype_exec_hook(UNDO_ROW_INSERT)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  if (info == NULL)
    goto end;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  /*
    in an upcoming patch ("recovery of the state"), we introduce
    state.is_of_lsn. For now, we just assume the state is old (true when we
    recreate tables from scratch - but not idempotent).
  */
  {
    fprintf(tracef, "   state older than record, updating rows' count\n");
    info->s->state.state.records++;
    /** @todo RECOVERY BUG Also update the table's checksum */
    /**
       @todo some bits below will rather be set when executing UNDOs related
       to keys
    */
    info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
      STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
  }
  fprintf(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
  error= 0;
end:
  return error;
}


prototype_exec_hook(UNDO_ROW_DELETE)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  if (info == NULL)
    goto end;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  {
    fprintf(tracef, "   state older than record, updating rows' count\n");
    info->s->state.state.records--;
    info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
      STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
  }
  fprintf(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
  error= 0;
end:
  return error;
}


prototype_exec_hook(UNDO_ROW_UPDATE)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  if (info == NULL)
    goto end;
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  {
    info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
      STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
  }
  error= 0;
end:
  return error;
}


prototype_exec_hook(UNDO_ROW_PURGE)
{
  int error= 1;
  MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
  if (info == NULL)
    goto end;
  /* this a bit broken, but this log record type will be deleted soon */
  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  {
    fprintf(tracef, "   state older than record, updating rows' count\n");
    info->s->state.state.records--;
    info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
      STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
  }
  fprintf(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
  error= 0;
end:
  return error;
}


prototype_exec_hook(COMMIT)
{
  uint16 sid= rec->short_trid;
  TrID long_trid= all_active_trans[sid].long_trid;
  LSN gslsn= all_active_trans[sid].group_start_lsn;
  char llbuf[22];
  if (long_trid == 0)
  {
    fprintf(tracef, "We don't know about transaction with short_trid %u;"
           "it probably committed long ago, forget it\n", sid);
    return 0;
  }
  llstr(long_trid, llbuf);
  fprintf(tracef, "Transaction long_trid %s short_trid %u committed", llbuf, sid);
  if (gslsn != LSN_IMPOSSIBLE)
  {
    /*
      It's not an error, it may be that trn got a disk error when writing to a
      table, so an unfinished group staid in the log.
    */
    fprintf(tracef, ", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
           (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
    all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
  }
  else
    fprintf(tracef, "\n");
  bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
#ifdef MARIA_VERSIONING
  /*
    if real recovery:
    transaction was committed, move it to some separate list for later
    purging (but don't purge now! purging may have been started before, we
    may find REDO_PURGE records soon).
  */
#endif
  return 0;
}


static int run_redo_phase(LSN lsn, my_bool apply)
{
  /* install hooks for execution */
#define install_exec_hook(R)                                            \
  log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    exec_LOGREC_ ## R;
  install_exec_hook(LONG_TRANSACTION_ID);
  install_exec_hook(CHECKPOINT);
  install_exec_hook(REDO_CREATE_TABLE);
  install_exec_hook(REDO_DROP_TABLE);
  install_exec_hook(FILE_ID);
  install_exec_hook(REDO_INSERT_ROW_HEAD);
  install_exec_hook(REDO_INSERT_ROW_TAIL);
  install_exec_hook(REDO_PURGE_ROW_HEAD);
  install_exec_hook(REDO_PURGE_ROW_TAIL);
  install_exec_hook(REDO_PURGE_BLOCKS);
  install_exec_hook(REDO_DELETE_ALL);
  install_exec_hook(UNDO_ROW_INSERT);
  install_exec_hook(UNDO_ROW_DELETE);
  install_exec_hook(UNDO_ROW_UPDATE);
  install_exec_hook(UNDO_ROW_PURGE);
  install_exec_hook(COMMIT);

  current_group_end_lsn= LSN_IMPOSSIBLE;

  TRANSLOG_HEADER_BUFFER rec;
  /*
    instead of this block below we will soon use
    translog_first_lsn_in_log()...
  */
  int len= translog_read_record_header(lsn, &rec);

  /** @todo EOF should be detected */
  if (len == RECHEADER_READ_ERROR)
  {
    fprintf(tracef, "Cannot find a first record\n");
    return 1;
  }
  struct st_translog_scanner_data scanner;
  if (translog_init_scanner(lsn, 1, &scanner))
  {
    fprintf(tracef, "Scanner init failed\n");
    return 1;
  }
  uint i;
  for (i= 1;;i++)
  {
    uint16 sid= rec.short_trid;
    const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
    display_record_position(log_desc, &rec, i);

    /*
      A complete group is a set of log records with an "end mark" record
      (e.g. a set of REDOs for an operation, terminated by an UNDO for this
      operation); if there is no "end mark" record the group is incomplete
      and won't be executed.
    */
    if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
        (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
    {
      if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
      {
        if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
        {
          /*
            can happen if the transaction got a table write error, then
            unlocked tables thus wrote a COMMIT record.
          */
          fprintf(tracef, "\nDiscarding unfinished group before this record\n");
          ALERT_USER();
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
        }
        else
        {
          /*
            There is a complete group for this transaction, containing more
            than this event.
          */
          fprintf(tracef, "   ends a group:\n");
          struct st_translog_scanner_data scanner2;
          TRANSLOG_HEADER_BUFFER rec2;
          len=
            translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
          if (len < 0) /* EOF or error */
          {
            fprintf(tracef, "Cannot find record where it should be\n");
            return 1;
          }
          if (translog_init_scanner(rec2.lsn, 1, &scanner2))
          {
            fprintf(tracef, "Scanner2 init failed\n");
            return 1;
          }
          current_group_end_lsn= rec.lsn;
          do
          {
            if (rec2.short_trid == sid) /* it's in our group */
            {
              const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
              display_record_position(log_desc2, &rec2, 0);
              if (apply && display_and_apply_record(log_desc2, &rec2))
                return 1;
            }
            len= translog_read_next_record_header(&scanner2, &rec2);
            if (len < 0) /* EOF or error */
            {
              fprintf(tracef, "Cannot find record where it should be\n");
              return 1;
            }
          }
          while (rec2.lsn < rec.lsn);
          translog_free_record_header(&rec2);
          /* group finished */
          all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
          current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
          display_record_position(log_desc, &rec, 0);
        }
      }
      if (apply && display_and_apply_record(log_desc, &rec))
        return 1;
    }
    else /* record does not end group */
    {
      /* just record the fact, can't know if can execute yet */
      if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
      {
        /* group not yet started */
        all_active_trans[sid].group_start_lsn= rec.lsn;
      }
    }
    len= translog_read_next_record_header(&scanner, &rec);
    if (len < 0)
    {
      switch (len)
      {
      case RECHEADER_READ_EOF:
        fprintf(tracef, "EOF on the log\n");
        break;
      case RECHEADER_READ_ERROR:
        fprintf(stderr, "Error reading log\n");
        return 1;
      }
      break;
    }
  }
  translog_free_record_header(&rec);
  return 0;
}


/**
   @brief Informs about any aborted groups or unfinished transactions,
   prepares for the UNDO phase if needed.

   @param  prepare_for_undo_phase

   @note Observe that it may init trnman.
*/
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
  uint sid, unfinished= 0;

  hash_free(&all_dirty_pages);
  /*
    hash_free() can be called multiple times probably, but be safe it that
    changes
  */
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
  dirty_pages_pool= NULL;

  if (prepare_for_undo_phase && trnman_init())
    return -1;

  for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
  {
    TrID long_trid= all_active_trans[sid].long_trid;
    LSN gslsn= all_active_trans[sid].group_start_lsn;
    TRN *trn;
    if (gslsn != LSN_IMPOSSIBLE)
    {
      fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
             (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
      ALERT_USER();
    }
    if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
    {
      char llbuf[22];
      llstr(long_trid, llbuf);
      fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
             llbuf, sid);
      /* dummy_transaction_object serves only for DDLs */
      DBUG_ASSERT(long_trid != 0);
      if (prepare_for_undo_phase)
      {
        if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
          return -1;
        trn->undo_lsn= all_active_trans[sid].undo_lsn;
      }
      /* otherwise we will just warn about it */
      unfinished++;
    }
#ifdef MARIA_VERSIONING
    /*
      If real recovery: if transaction was committed, move it to some separate
      list for soon purging.
    */
#endif
  }

  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
  all_active_trans= NULL;

  /*
    The UNDO phase uses some normal run-time code of ROLLBACK: generates log
    records, etc; prepare tables for that
  */
  LSN addr= translog_get_horizon();
  for (sid= 0; sid <= SHARE_ID_MAX; sid++)
  {
    MARIA_HA *info= all_tables[sid].info;
    if (info != NULL)
    {
      prepare_table_for_close(info, addr);
      /*
        But we don't close it; we leave it available for the UNDO phase;
        it's likely that the UNDO phase will need it.
      */
      if (prepare_for_undo_phase)
        translog_assign_id_to_share_from_recovery(info->s, sid);
    }
  }

  /* we don't need all_tables anymore, maria_open_list is enough */
  my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
  all_tables= NULL;

  /*
    We could take a checkpoint here, in case of a crash during the UNDO
    phase. The drawback is that a page which got a REDO (thus, flushed
    by this would-be checkpoint) is likely to have an UNDO executed on it
    soon. And so, the flush was probably lost time.
    So for now we prefer to do recovery with maximum speed and take a
    checkpoint only at the end of the UNDO phase.
  */

  return unfinished;
}


static int run_undo_phase(uint unfinished)
{
  if (unfinished > 0)
  {
    fprintf(tracef, "%u transactions will be rolled back\n", unfinished);
    for( ; unfinished-- ; )
    {
      char llbuf[22];
      TRN *trn= trnman_get_any_trn();
      DBUG_ASSERT(trn != NULL);
      llstr(trn->trid, llbuf);
      fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
      /* of course we miss execution of UNDOs here */
      if (trnman_rollback_trn(trn))
        return 1;
      /* We could want to span a few threads (4?) instead of 1 */
      /* In the future, we want to have this phase *online* */
    }
  }
  return 0;
}


static void prepare_table_for_close(MARIA_HA *info,
                                    LSN at_lsn __attribute__ ((unused)))
{
  MARIA_SHARE *share= info->s;
  /* we will soon use at_lsn here */
  _ma_reenable_logging_for_table(share);
}


static MARIA_HA *get_MARIA_HA_from_REDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec)
{
  uint16 sid;
  pgcache_page_no_t page;
  MARIA_HA *info;
  char llbuf[22];

  sid= fileid_korr(rec->header);
  page= page_korr(rec->header + FILEID_STORE_SIZE);
  /**
     @todo RECOVERY BUG
     - for REDO_PURGE_BLOCKS, page is not at this pos
     - for DELETE_ALL, record ends here! buffer overrun!
     Solution: caller should pass a param enum { i_am_about_data_file,
     i_am_about_index_file, none }.
  */
  llstr(page, llbuf);
  fprintf(tracef, "   For page %s of table of short id %u", llbuf, sid);
  info= all_tables[sid].info;
  if (info == NULL)
  {
    fprintf(tracef, ", table skipped, so skipping record\n");
    return NULL;
  }
  fprintf(tracef, ", '%s'", info->s->open_file_name);
  /* detect if an open instance of a dropped table (internal bug) */
  DBUG_ASSERT(info->s->last_version != 0);
  if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
  {
    /**
       @todo RECOVERY BUG always assuming this is REDO for data file, but it
       could soon be index file
    */
    uint64 file_and_page_id=
      (((uint64)all_tables[sid].org_dfile) << 32) | page;
    struct st_dirty_page *dirty_page= (struct st_dirty_page *)
      hash_search(&all_dirty_pages,
                  (uchar *)&file_and_page_id, sizeof(file_and_page_id));
    if ((dirty_page == NULL) ||
        cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
    {
      fprintf(tracef, ", ignoring because of dirty_pages list\n");
      return NULL;
    }
  }

  /*
    So we are going to read the page, and if its LSN is older than the
    record's we will modify the page
  */
  fprintf(tracef, ", applying record\n");
  return info;
}


static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
                                               TRANSLOG_HEADER_BUFFER *rec)
{
  uint16 sid;
  MARIA_HA *info;

  sid= fileid_korr(rec->header + LSN_STORE_SIZE);
  fprintf(tracef, "   For table of short id %u", sid);
  info= all_tables[sid].info;
  if (info == NULL)
  {
    fprintf(tracef, ", table skipped, so skipping record\n");
    return NULL;
  }
  fprintf(tracef, ", '%s'", info->s->open_file_name);
  DBUG_ASSERT(info->s->last_version != 0);
  fprintf(tracef, ", applying record\n");
  return info;
}


static int parse_checkpoint_record(LSN lsn)
{
  uint i;
  TRANSLOG_HEADER_BUFFER rec;

  fprintf(tracef, "Loading data from checkpoint record\n");
  int len= translog_read_record_header(lsn, &rec);

  /** @todo EOF should be detected */
  if (len == RECHEADER_READ_ERROR)
  {
    fprintf(tracef, "Cannot find checkpoint record where it should be\n");
    return 1;
  }

  enlarge_buffer(&rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec.lsn, 0, rec.record_length,
                           log_record_buffer.str, NULL) !=
      rec.record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    return 1;
  }

  char *ptr= log_record_buffer.str;
  checkpoint_start= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;

  /* transactions */
  uint nb_active_transactions= uint2korr(ptr);
  ptr+= 2;
  fprintf(tracef, "%u active transactions\n", nb_active_transactions);
  LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;

  /*
    how much brain juice and discussions there was to come to writing this
    line
  */
  set_if_smaller(checkpoint_start, minimum_rec_lsn_of_active_transactions);

  for (i= 0; i < nb_active_transactions; i++)
  {
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    TrID long_id= uint6korr(ptr);
    ptr+= 6;
    DBUG_ASSERT(sid > 0 && long_id > 0);
    LSN undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    LSN first_undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
  }
  uint nb_committed_transactions= uint4korr(ptr);
  ptr+= 4;
  fprintf(tracef, "%lu committed transactions\n",
          (ulong)nb_committed_transactions);
  /* no purging => committed transactions are not important */
  ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;

  /* tables  */
  uint nb_tables= uint4korr(ptr);
  fprintf(tracef, "%u open tables\n", nb_tables);
  for (i= 0; i< nb_tables; i++)
  {
    char name[FN_REFLEN];
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    DBUG_ASSERT(sid > 0);
    File kfile= uint4korr(ptr);
    ptr+= 4;
    File dfile= uint4korr(ptr);
    ptr+= 4;
    LSN first_log_write_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    uint name_len= strlen(ptr) + 1;
    ptr+= name_len;
    strnmov(name, ptr, sizeof(name));
    if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
      return 1;
  }

  /* dirty pages */
  uint nb_dirty_pages= uint4korr(ptr);
  ptr+= 4;
  if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages,
                offsetof(struct st_dirty_page, file_and_page_id),
                sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
                NULL, NULL, 0))
    return 1;
  dirty_pages_pool=
    (struct st_dirty_page *)my_malloc(nb_dirty_pages *
                                      sizeof(struct st_dirty_page),
                                      MYF(MY_WME));
  if (unlikely(dirty_pages_pool == NULL))
    return 1;
  struct st_dirty_page *next_dirty_page_in_pool= dirty_pages_pool;
  LSN minimum_rec_lsn_of_dirty_pages= LSN_MAX;
  for (i= 0; i < nb_dirty_pages ; i++)
  {
    File fileid= uint4korr(ptr);
    ptr+= 4;
    pgcache_page_no_t pageid= uint4korr(ptr);
    ptr+= 4;
    LSN rec_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
      return 1;
    set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
  }
  /* after that, there will be no insert/delete into the hash */
  /*
    sanity check on record (did we screw up with all those "ptr+=", did the
    checkpoint write code and checkpoint read code go out of sync?).
  */
  /**
     @todo This probably presently and hopefully detects that
     first_log_write_lsn is not written by the checkpoint record; we need
     to add MARIA_SHARE::first_log_write_lsn, fill it with a inwrite-hook of
     LOGREC_FILE_ID (note that when we write this record we hold intern_lock,
     so Checkpoint will read the LSN correctly), and store it in the
     checkpoint record.
  */
  if (ptr != (log_record_buffer.str + log_record_buffer.length))
  {
    fprintf(tracef, "checkpoint record corrupted\n");
    return 1;
  }
  set_if_smaller(checkpoint_start, minimum_rec_lsn_of_dirty_pages);

  return 0;
}

static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page)
{
  /* serves as hash key */
  dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid;
  dirty_page->rec_lsn= rec_lsn;
  return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
}


static int close_all_tables()
{
  int error= 0;
  LIST *list_element, *next_open;
  MARIA_HA *info;
  pthread_mutex_lock(&THR_LOCK_maria);
  if (maria_open_list == NULL)
    goto end;
  fprintf(tracef, "Closing all tables\n");
  for (list_element= maria_open_list ; list_element ; list_element= next_open)
  {
    next_open= list_element->next;
    info= (MARIA_HA*)list_element->data;
    pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
    error|= maria_close(info);
    pthread_mutex_lock(&THR_LOCK_maria);
  }
end:
  pthread_mutex_unlock(&THR_LOCK_maria);
  return error;
}

/* some comments and pseudo-code which we keep for later */
#if 0
  /*
    MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
    after a certain amount of log records have been executed. This helps
    against repeated crashes. Those checkpoints could not be user-requested
    (as engine is not communicating during the REDO phase), so they would be
    automatic: this changes the original assumption that we don't write to the
    log while in the REDO phase, but why not. How often should we checkpoint?
  */

  /*
    We want to have two steps:
    engine->recover_with_max_memory();
    next_engine->recover_with_max_memory();
    engine->init_with_normal_memory();
    next_engine->init_with_normal_memory();
    So: in recover_with_max_memory() allocate a giant page cache, do REDO
    phase, then all page cache is flushed and emptied and freed (only retain
    small structures like TM): take full checkpoint, which is useful if
    next engine crashes in its recovery the next second.
    Destroy all shares (maria_close()), then at init_with_normal_memory() we
    do this:
  */

  /**** UNDO PHASE *****/

  print_information_to_error_log(nb of trans to roll back, nb of prepared trans
  /*
    Launch one or more threads to do the background rollback. Don't wait for
    them to complete their rollback (background rollback; for debugging, we
    can have an option which waits). Set a counter (total_of_rollback_threads)
    to the number of threads to lauch.

    Note that InnoDB's rollback-in-background works as long as InnoDB is the
    last engine to recover, otherwise MySQL will refuse new connections until
    the last engine has recovered so it's not "background" from the user's
    point of view. InnoDB is near top of sys_table_types so all others
    (e.g. BDB) recover after it... So it's really "online rollback" only if
    InnoDB is the only engine.
  */

  /* wake up delete/update handler */
  /* tell the TM that it can now accept new transactions */

  /*
    mark that checkpoint requests are now allowed.
  */
}

pthread_handler_decl rollback_background_thread()
{
  /*
    execute the normal runtime-rollback code for a bunch of transactions.
  */
  while (trans in list_of_trans_to_rollback_by_this_thread)
  {
    while (trans->undo_lsn != 0)
    {
      /* this is the normal runtime-rollback code: */
      record= log_read_record(trans->undo_lsn);
      execute_log_record_in_undo_phase(record);
      trans->undo_lsn= record.prev_undo_lsn;
    }
    /* remove trans from list */
  }
  lock_mutex(rollback_threads); /* or atomic counter */
  if (--total_of_rollback_threads == 0)
  {
    /*
      All rollback threads are done.  Print "rollback finished" to the error
      log and take a full checkpoint.
    */
  }
  unlock_mutex(rollback_threads);
  pthread_exit();
}
#endif