Commit adac9798 authored by unknown's avatar unknown

WL#3072 Maria Recovery

- new program maria_read_log to display and apply log records
found in a Maria log (see file's revision comment)
- minor, misc fixes


storage/maria/Makefile.am:
  new program maria_read_log
storage/maria/ha_maria.cc:
  create control file if missing
storage/maria/ma_blockrec.c:
  0 -> LSN_IMPOSSIBLE; comments
storage/maria/ma_checkpoint.h:
  preparations for Checkpoint module
storage/maria/ma_close.c:
  comment
storage/maria/ma_control_file.c:
  renaming constants.
  Possibility to say "open control file but don't create it if it's
  missing" (used by maria_read_log which does not want to create
  anything)
storage/maria/ma_control_file.h:
  renaming constants
storage/maria/ma_create.c:
  I had duplicated "linkname" and "linkname_ptr", now I see it's not
  needed, reverting. Indeed those variables don't contain interesting
  information; fixing log record accordingly (the links are in
  ci->data/index_file_name). Storing keystart in log record is needed,
  to know at which size we must extend the file if we replay
  LOGREC_CREATE_TABLE.
storage/maria/ma_loghandler.c:
  some structures need to be known to maria_read_log.c, taking
  them to ma_loghandler.h
storage/maria/ma_loghandler.h:
  we have page_store, adding page_korr.
  translog_lock() made public, because Checkpoint will need it (to
  write to control file).
  Some structures moved from ma_loghandler.c because maria_read_log.c
  needs them (needs to know the execute-in-REDO-phase hooks of each
  record).
storage/maria/ma_loghandler_lsn.h:
  constants defined in ma_control_file.h serve everywhere,
  and they relate to LSNs, so putting them in ma_loghandler_lsn.h.
  Stronger constraints in LSN_VALID().
storage/maria/ma_pagecache.c:
  renaming constants
storage/maria/ma_recovery.h:
  copyright
storage/maria/ma_test1.c:
  new prototype
storage/maria/ma_test2.c:
  new prototype
storage/maria/trnman_public.h:
  double-inclusion safe
storage/maria/unittest/ma_control_file-t.c:
  constants renamed, new prototype
storage/maria/unittest/ma_test_loghandler-t.c:
  constants renamed, new prototype
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  constants renamed, new prototype
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  constants renamed, new prototype
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  constants renamed, new prototype
storage/myisam/mi_close.c:
  comment
storage/maria/maria_read_log.c:
  program to read and print log records from a Maria transaction log,
  and optionally apply them to tables. Very basic, early version.
  Should serve as a base for Recovery's code. Designed to be idempotent.
  Create a log by running maria.test, then cd to var/master-data
  and run "maria_read_log --only-display" to see info about records;
  run "maria_read_log --display-and-apply" to also apply the records
  to tables (it's more interesting if you first wipe out the
  tables in var/master-data/test, to see how they get re-created).
  Only a few records are handled by now: LONG_TRANSACTION_ID,
  COMMIT, FILE_ID, REDO_CREATE_TABLE; place is ready for
  REDO_INSERT_ROW_HEAD where I could use Monty's help (search for
  "Monty" in the file). Note: changes to the index pages, index's header
  and bitmap pages are not properly logged yet, so don't expect
  the program to work with that.
parent bb8bde8f
......@@ -33,7 +33,7 @@ SUBDIRS = . unittest
EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in
pkgdata_DATA = ma_test_all ma_test_all.res
pkglib_LIBRARIES = libmaria.a
bin_PROGRAMS = maria_chk maria_pack maria_ftdump
bin_PROGRAMS = maria_chk maria_pack maria_ftdump maria_read_log
maria_chk_DEPENDENCIES= $(LIBRARIES)
# Only reason to link with libmyisam.a here is that it's where some fulltext
# pieces are (but soon we'll remove fulltext dependencies from Maria).
......@@ -49,6 +49,12 @@ maria_pack_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
maria_read_log_DEPENDENCIES=$(LIBRARIES)
maria_read_log_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test
noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h \
......
......@@ -2241,7 +2241,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home;
res= maria_init() || ma_control_file_create_or_open() ||
res= maria_init() || ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
......
......@@ -557,7 +557,8 @@ static my_bool check_if_zero(byte *pos, uint length)
SYNOPSIS
_ma_unpin_all_pages()
info Maria handler
undo_lsn LSN for undo pages. 0 if we shouldn't write undo (error)
undo_lsn LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write undo
(error)
NOTE
We unpin pages in the reverse order as they where pinned; This may not
......@@ -580,14 +581,15 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
/* True if not disk error */
DBUG_ASSERT(undo_lsn != 0 || !info->s->base.transactional);
DBUG_ASSERT((undo_lsn != LSN_IMPOSSIBLE) || !info->s->base.transactional);
if (!info->s->base.transactional)
{
/*
If this is a transactional table but with transactionality temporarily
disabled (like in ALTER TABLE) we need to give a sensible LSN to pages
and not 0. If this is not a transactional table it will reduce to 0.
and not LSN_IMPOSSIBLE. If this is not a transactional table it will
reduce to LSN_IMPOSSIBLE.
*/
undo_lsn= info->s->state.create_rename_lsn;
}
......@@ -1958,8 +1960,8 @@ static my_bool write_block_record(MARIA_HA *info,
size_t data_length= (size_t) (data - row_pos->data);
/* Log REDO changes of head page */
page_store(log_data+ FILEID_STORE_SIZE, head_block->page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE,
page_store(log_data + FILEID_STORE_SIZE, head_block->page);
dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
row_pos->rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
......@@ -2183,12 +2185,22 @@ static my_bool write_block_record(MARIA_HA *info,
disk_err:
/**
@todo RECOVERY we are going to let dirty pages go to disk while we have
logged UNDO, this violates WAL. If we have not written any full pages,
all dirty pages are pinned so we could just delete them from the
pagecache. Moreover, we have written some REDOs without a closing UNDO,
logged UNDO, this violates WAL. We must mark the table corrupted!
@todo RECOVERY we have written some REDOs without a closing UNDO,
it's possible that a next operation by this transaction succeeds and then
Recovery would glue the "orphan REDOs" to the succeeded operation and
execute the failed REDOs.
execute the failed REDOs. We need some mark "abort this group" in the
log, or mark the table corrupted (then user will repair it and thus REDOs
will be skipped).
@todo RECOVERY to not let write errors go unnoticed, pagecache_write()
should take a MARIA_HA* in argument, and it it
fails when flushing a page to disk it should call
(*the_maria_ha->write_error_func)(the_maria_ha)
and this hook will mark the table corrupted.
Maybe hook should be stored in the pagecache's block structure, or in a
hash "file->maria_ha*".
*/
/* Unpin all pinned pages to not cause problems for disk cache */
_ma_unpin_all_pages(info, 0);
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
/* Copyright (C) 2006,2007 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -21,14 +21,61 @@
/* This is the interface of this module. */
typedef enum enum_checkpoint_level {
NONE=-1,
INDIRECT, /* just write dirty_pages, transactions table and sync files */
MEDIUM, /* also flush all dirty pages which were already dirty at prev checkpoint*/
FULL /* also flush all dirty pages */
typedef enum enum_ma_checkpoint_level {
CHECKPOINT_NONE= 0,
/* just write dirty_pages, transactions table and sync files */
CHECKPOINT_INDIRECT,
/* also flush all dirty pages which were already dirty at prev checkpoint */
CHECKPOINT_MEDIUM,
/* also flush all dirty pages */
CHECKPOINT_FULL
} CHECKPOINT_LEVEL;
void request_asynchronous_checkpoint(CHECKPOINT_LEVEL level);
my_bool execute_synchronous_checkpoint(CHECKPOINT_LEVEL level);
my_bool execute_asynchronous_checkpoint_if_any();
/* that's all that's needed in the interface */
C_MODE_START
int ma_checkpoint_init();
void ma_checkpoint_end();
int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait);
C_MODE_END
/**
@brief reads some LSNs with special trickery
If a 64-bit variable transitions between both halves being zero to both
halves being non-zero, and back, this function can be used to do a read of
it (without mutex, without atomic load) which always produces a correct
(though maybe slightly old) value (even on 32-bit CPUs). The value is at
least as new as the latest mutex unlock done by the calling thread.
The assumption is that the system sets both 4-byte halves either at the
same time, or one after the other (in any order), but NOT some bytes of the
first half then some bytes of the second half then the rest of bytes of the
first half. With this assumption, the function can detect when it is
seeing an inconsistent value.
@param LSN pointer to the LSN variable to read
@return LSN part (most significant byte always 0)
*/
#if ( SIZEOF_CHARP >= 8 )
/* 64-bit CPU, 64-bit reads are atomic */
#define lsn_read_non_atomic LSN_WITH_FLAGS_TO_LSN
#else
static inline LSN lsn_read_non_atomic_32(const volatile LSN *x)
{
/*
32-bit CPU, 64-bit reads may give a mixed of old half and new half (old
low bits and new high bits, or the contrary).
*/
for (;;) /* loop until no atomicity problems */
{
/*
Remove most significant byte in case this is a LSN_WITH_FLAGS object.
Those flags in TRN::first_undo_lsn break the condition on transitions so
they must be removed below.
*/
LSN y= LSN_WITH_FLAGS_TO_LSN(*x);
if (likely((y == LSN_IMPOSSIBLE) || LSN_VALID(y)))
return y;
}
}
#define lsn_read_non_atomic(x) lsn_read_non_atomic_32(&x)
#endif
......@@ -85,6 +85,7 @@ int maria_close(register MARIA_HA *info)
not change the crashed state.
We can NOT write the state in other cases as other threads
may be using the file at this point
IF using --external-locking, which does not apply to Maria.
*/
if (share->mode != O_RDONLY && maria_is_crashed(info))
_ma_state_info_write(share->kfile.file, &share->state, 1);
......
......@@ -40,15 +40,9 @@
#define CONTROL_FILE_FILENO_SIZE 4
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/*
This module owns these two vars.
uint32 is always atomically updated, but LSN is 8 bytes, we will need
provisions to ensure that it's updated atomically in
ma_control_file_write_and_force(). Probably the log mutex could be
used. TODO.
*/
LSN last_checkpoint_lsn;
uint32 last_logno;
/* This module owns these two vars. */
LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 last_logno= FILENO_IMPOSSIBLE;
/**
@brief If log's lock should be asserted when writing to control file.
......@@ -65,16 +59,16 @@ my_bool maria_multi_threaded= FALSE;
static int control_file_fd= -1;
/*
Initialize control file subsystem
SYNOPSIS
ma_control_file_create_or_open()
@brief Initialize control file subsystem
Looks for the control file. If absent, it's a fresh start, creates file.
Looks for the control file. If none and creation is requested, creates file.
If present, reads it to find out last checkpoint's LSN and last log, updates
the last_checkpoint_lsn and last_logno global variables.
Called at engine's start.
@param create_if_missing
@note
The format of the control file is:
4 bytes: magic string
4 bytes: checksum of the following bytes
......@@ -82,11 +76,11 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is
4 bytes: number of last log
RETURN
0 - OK
1 - Error (in which case the file is left closed)
@return Operation status
@retval 0 OK
@retval 1 Error (in which case the file is left closed)
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open()
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
{
char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN];
......@@ -115,6 +109,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
if (create_file)
{
if (!create_if_missing)
DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0)
DBUG_RETURN(CONTROL_FILE_UNKNOWN_ERROR);
......@@ -136,8 +132,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
*/
/* init the file with these "undefined" values */
DBUG_RETURN(ma_control_file_write_and_force(CONTROL_FILE_IMPOSSIBLE_LSN,
CONTROL_FILE_IMPOSSIBLE_FILENO,
DBUG_RETURN(ma_control_file_write_and_force(LSN_IMPOSSIBLE,
FILENO_IMPOSSIBLE,
CONTROL_FILE_UPDATE_ALL));
}
......@@ -315,8 +311,8 @@ int ma_control_file_end()
As this module owns these variables, closing the module forbids access to
them (just a safety):
*/
last_checkpoint_lsn= CONTROL_FILE_IMPOSSIBLE_LSN;
last_logno= CONTROL_FILE_IMPOSSIBLE_FILENO;
last_checkpoint_lsn= LSN_IMPOSSIBLE;
last_logno= FILENO_IMPOSSIBLE;
DBUG_RETURN(close_error);
}
......@@ -19,27 +19,17 @@
*/
#define CONTROL_FILE_BASE_NAME "maria_control"
/*
indicate absence of the log file number; first log is always number 1, 0 is
impossible.
*/
#define CONTROL_FILE_IMPOSSIBLE_FILENO 0
/* logs always have a header */
#define CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET 0
/* indicate absence of LSN. */
#define CONTROL_FILE_IMPOSSIBLE_LSN ((LSN)0)
/* Here is the interface of this module */
/*
LSN of the last checkoint
(if last_checkpoint_lsn == CONTROL_FILE_IMPOSSIBLE_LSN
then there was never a checkpoint)
(if last_checkpoint_lsn == LSN_IMPOSSIBLE then there was never a checkpoint)
*/
extern LSN last_checkpoint_lsn;
/*
Last log number (if last_logno ==
CONTROL_FILE_IMPOSSIBLE_FILENO then there is no log file yet)
Last log number (if last_logno == FILENO_IMPOSSIBLE then there is no log
file yet)
*/
extern uint32 last_logno;
......@@ -51,6 +41,7 @@ typedef enum enum_control_file_error {
CONTROL_FILE_TOO_BIG,
CONTROL_FILE_BAD_MAGIC_STRING,
CONTROL_FILE_BAD_CHECKSUM,
CONTROL_FILE_MISSING,
CONTROL_FILE_UNKNOWN_ERROR /* any other error */
} CONTROL_FILE_ERROR;
......@@ -63,11 +54,11 @@ extern "C" {
#endif
/*
Looks for the control file. If absent, it's a fresh start, create file.
If present, read it to find out last checkpoint's LSN and last log.
Looks for the control file. If none and creation was requested, creates file.
If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start.
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open();
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool);
/*
Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation)
......
......@@ -52,8 +52,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
unique_key_parts,fulltext_keys,offset, not_block_record_extra_length;
uint max_field_lengths, extra_header_size;
ulong reclength, real_reclength,min_pack_length;
char filename[FN_REFLEN], dlinkname[FN_REFLEN], *dlinkname_ptr= NULL,
klinkname[FN_REFLEN], *klinkname_ptr= NULL;
char filename[FN_REFLEN], linkname[FN_REFLEN], *linkname_ptr;
ulong pack_reclength;
ulonglong tot_length,max_rows, tmp;
enum en_fieldtype type;
......@@ -628,7 +627,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.state.dellink = HA_OFFSET_ERROR;
share.state.first_bitmap_with_space= 0;
share.state.create_rename_lsn= 0;
share.state.create_rename_lsn= LSN_IMPOSSIBLE;
share.state.process= (ulong) getpid();
share.state.unique= (ulong) 0;
share.state.update_count=(ulong) 0;
......@@ -721,9 +720,9 @@ int maria_create(const char *name, enum data_file_type datafile_type,
MY_UNPACK_FILENAME | (have_iext ? MY_REPLACE_EXT :
MY_APPEND_EXT));
}
fn_format(klinkname, name, "", MARIA_NAME_IEXT,
fn_format(linkname, name, "", MARIA_NAME_IEXT,
MY_UNPACK_FILENAME|MY_APPEND_EXT);
klinkname_ptr= klinkname;
linkname_ptr= linkname;
/*
Don't create the table if the link or file exists to ensure that one
doesn't accidently destroy another table.
......@@ -739,6 +738,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
(MY_UNPACK_FILENAME |
(flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
MY_APPEND_EXT);
linkname_ptr= NULL;
/*
Replace the current file.
Don't sync dir now if the data file has the same path.
......@@ -761,7 +761,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err;
}
if ((file= my_create_with_symlink(klinkname_ptr, filename, 0, create_mode,
if ((file= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0)
goto err;
errpos=1;
......@@ -788,19 +788,20 @@ int maria_create(const char *name, enum data_file_type datafile_type,
MY_UNPACK_FILENAME |
(have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
}
fn_format(dlinkname, name, "",MARIA_NAME_DEXT,
fn_format(linkname, name, "",MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
dlinkname_ptr= dlinkname;
linkname_ptr= linkname;
create_flag=0;
}
else
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
}
if ((dfile=
my_create_with_symlink(dlinkname_ptr, filename, 0, create_mode,
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag | sync_dir))) < 0)
goto err;
errpos=3;
......@@ -948,15 +949,15 @@ int maria_create(const char *name, enum data_file_type datafile_type,
not log 1 KB of mostly zeroes if this is a small table.
*/
char empty_string[]= "";
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
uint total_rec_length= 0;
uint i;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= 1 + 2 +
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= 1 + 2 + 2 +
kfile_size_before_extension;
/* we are needing maybe 64 kB, so don't use the stack */
log_data= my_malloc(log_array[TRANSLOG_INTERNAL_PARTS + 0].length, MYF(0));
log_data= my_malloc(log_array[TRANSLOG_INTERNAL_PARTS + 1].length, MYF(0));
if ((log_data == NULL) ||
my_pread(file, 1 + 2 + log_data, kfile_size_before_extension,
my_pread(file, 1 + 2 + 2 + log_data, kfile_size_before_extension,
0, MYF(MY_NABP)))
goto err_no_lock;
/*
......@@ -965,16 +966,21 @@ int maria_create(const char *name, enum data_file_type datafile_type,
*/
log_data[0]= test(flags & HA_DONT_TOUCH_DATA);
int2store(log_data + 1, kfile_size_before_extension);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
int2store(log_data + 1 + 2, share.base.keystart);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char *)name;
/* we store the end-zero, for Recovery to just pass it to my_create() */
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
strlen(log_array[TRANSLOG_INTERNAL_PARTS + 0].str) + 1;
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_data;
/* symlink description is also needed for re-creation by Recovery: */
log_array[TRANSLOG_INTERNAL_PARTS + 1].str=
dlinkname_ptr ? dlinkname : empty_string;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length=
strlen(log_array[TRANSLOG_INTERNAL_PARTS + 1].str);
log_array[TRANSLOG_INTERNAL_PARTS + 2].str=
klinkname_ptr ? klinkname : empty_string;
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (char *)
(ci->data_file_name ? ci->data_file_name : empty_string);
log_array[TRANSLOG_INTERNAL_PARTS + 2].length=
strlen(log_array[TRANSLOG_INTERNAL_PARTS + 2].str);
strlen(log_array[TRANSLOG_INTERNAL_PARTS + 2].str) + 1;
log_array[TRANSLOG_INTERNAL_PARTS + 3].str= (char *)
(ci->index_file_name ? ci->index_file_name : empty_string);
log_array[TRANSLOG_INTERNAL_PARTS + 3].length=
strlen(log_array[TRANSLOG_INTERNAL_PARTS + 3].str) + 1;
for (i= TRANSLOG_INTERNAL_PARTS;
i < (sizeof(log_array)/sizeof(log_array[0])); i++)
total_rec_length+= log_array[i].length;
......
......@@ -61,21 +61,6 @@
#define COMPRESSED_LSN_MAX_STORE_SIZE (2 + LSN_STORE_SIZE)
#define MAX_NUMBER_OF_LSNS_PER_RECORD 2
/* record parts descriptor */
struct st_translog_parts
{
/* full record length */
translog_size_t record_length;
/* full record length with chunk headers */
translog_size_t total_record_length;
/* current part index */
uint current;
/* total number of elements in parts */
uint elements;
/* array of parts (LEX_STRING) */
LEX_STRING *parts;
};
/* log write buffer descriptor */
struct st_translog_buffer
{
......@@ -176,15 +161,6 @@ static byte end_of_log= 0;
my_bool translog_inited= 0;
/* record classes */
enum record_class
{
LOGRECTYPE_NOT_ALLOWED,
LOGRECTYPE_VARIABLE_LENGTH,
LOGRECTYPE_PSEUDOFIXEDLENGTH,
LOGRECTYPE_FIXEDLENGTH
};
/* chunk types */
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */
#define TRANSLOG_CHUNK_FIXED (1 << 6) /* 1 (pseudo)fixed record (also LSN) */
......@@ -196,46 +172,6 @@ enum record_class
/* compressed (relative) LSN constants */
#define TRANSLOG_CLSN_LEN_BITS 0xC0 /* Mask to get compressed LSN length */
typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type,
TRN *trn, struct st_maria_share *share,
struct st_translog_parts *parts);
typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type,
TRN *trn,
LSN *lsn,
struct st_translog_parts *parts);
typedef uint16(*read_rec_hook) (enum translog_record_type type,
uint16 read_length, uchar *read_buff,
byte *decoded_buff);
/*
Descriptor of log record type
Note: Don't reorder because of constructs later...
*/
struct st_log_record_type_descriptor
{
/* internal class of the record */
enum record_class class;
/*
length for fixed-size record, pseudo-fixed record
length with uncompressed LSNs
*/
uint16 fixed_length;
/* how much record body (belonged to headers too) read with headers */
uint16 read_header_len;
/* HOOK for writing the record called before lock */
prewrite_rec_hook prewrite_hook;
/* HOOK for writing the record called when LSN is known, inside lock */
inwrite_rec_hook inwrite_hook;
/* HOOK for reading headers */
read_rec_hook read_hook;
/*
For pseudo fixed records number of compressed LSNs followed by
system header
*/
int16 compressed_LSN;
};
#include <my_atomic.h>
......@@ -257,27 +193,32 @@ static my_bool write_hook_for_undo(enum translog_record_type type,
NOTE that after first public Maria release, these can NOT be changed
*/
typedef struct st_log_record_type_descriptor LOG_DESC;
static LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
static LOG_DESC INIT_LOGREC_FIXED_RECORD_0LSN_EXAMPLE=
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0};
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0,
"fixed0example", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0,
"variable0example", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_FIXED_RECORD_1LSN_EXAMPLE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 7, 7, NULL, NULL, NULL, 1};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 7, 7, NULL, NULL, NULL, 1,
"fixed1example", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_1LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 12, NULL, NULL, NULL, 1};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 12, NULL, NULL, NULL, 1,
"variable1example", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_FIXED_RECORD_2LSN_EXAMPLE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 23, 23, NULL, NULL, NULL, 2};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 23, 23, NULL, NULL, NULL, 2,
"fixed2example", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 19, NULL, NULL, NULL, 2};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 19, NULL, NULL, NULL, 2,
"variable2example", FALSE, NULL, NULL};
void example_loghandler_init()
......@@ -298,126 +239,158 @@ void example_loghandler_init()
static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23=
{LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0 };
{LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0,
"reserved", FALSE, NULL, NULL };
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_HEAD=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0};
write_hook_for_redo, NULL, 0,
"redo_insert_row_head", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_TAIL=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0};
write_hook_for_redo, NULL, 0,
"redo_insert_row_tail", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOB=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0,
"redo_insert_row_blob", FALSE, NULL, NULL};
/*QQQ:TODO:header???*/
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0,
"redo_insert_row_blobs", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_HEAD=
{LOGRECTYPE_FIXEDLENGTH,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0};
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_head", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL=
{LOGRECTYPE_FIXEDLENGTH,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0};
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_tail", FALSE, NULL, NULL};
/* QQQ: TODO: variable and fixed size??? */
static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS=
{LOGRECTYPE_VARIABLE_LENGTH,
0,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + PAGE_STORE_SIZE +
PAGERANGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0};
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_blocks", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW=
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
"redo_delete_row", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_UPDATE_ROW_HEAD=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
"redo_update_row_head", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INDEX=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
"redo_index", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0};
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
"redo_undelete_row", FALSE, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1,
"clr_end", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
"purge_end", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT=
{LOGRECTYPE_FIXEDLENGTH,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 0};
NULL, write_hook_for_undo, NULL, 0,
"undo_row_insert", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 0};
NULL, write_hook_for_undo, NULL, 0,
"undo_row_delete", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 1};
NULL, write_hook_for_undo, NULL, 1,
"undo_row_update", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_PURGE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE, LSN_STORE_SIZE,
NULL, NULL, NULL, 1};
NULL, NULL, NULL, 1,
"undo_row_purge", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, write_hook_for_undo, NULL, 1};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, write_hook_for_undo, NULL, 1,
"undo_key_insert", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, write_hook_for_undo, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, write_hook_for_undo, NULL, 0,
"undo_key_delete", TRUE, NULL, NULL}; // QQ: why not compressed?
static LOG_DESC INIT_LOGREC_PREPARE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"prepare", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_PREPARE_WITH_UNDO_PURGE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1,
"prepare_with_undo_purge", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_COMMIT=
{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0,
"commit", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_COMMIT_WITH_UNDO_PURGE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
"commit_with_undo_purge", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_CHECKPOINT=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"checkpoint", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_CREATE_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 1 + 2, NULL, NULL, NULL, 0,
"redo_create_table", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_RENAME_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"redo_rename_table", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"redo_drop_table", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE,
NULL, NULL, NULL, 0};
NULL, NULL, NULL, 0,
"redo_delete_all", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + 4, FILEID_STORE_SIZE + 4,
NULL, NULL, NULL, 0};
NULL, NULL, NULL, 0,
"redo_repair_table", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_FILE_ID=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 4, NULL, NULL, NULL, 0};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 2, NULL, NULL, NULL, 0,
"file_id", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_LONG_TRANSACTION_ID=
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0};
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0,
"long_transaction_id", TRUE, NULL, NULL};
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
......@@ -701,7 +674,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
{
DBUG_ENTER("translog_buffer_init");
/* This buffer offset */
buffer->last_lsn= CONTROL_FILE_IMPOSSIBLE_LSN;
buffer->last_lsn= LSN_IMPOSSIBLE;
/* This Buffer File */
buffer->file= -1;
buffer->overlay= 0;
......@@ -779,7 +752,7 @@ static my_bool translog_create_new_file()
translog_write_file_header())
DBUG_RETURN(1);
if (ma_control_file_write_and_force(CONTROL_FILE_IMPOSSIBLE_LSN, file_no,
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no,
CONTROL_FILE_UPDATE_ONLY_LOGNO))
DBUG_RETURN(1);
......@@ -1206,7 +1179,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->last_lsn= CONTROL_FILE_IMPOSSIBLE_LSN;
buffer->last_lsn= LSN_IMPOSSIBLE;
buffer->offset= log_descriptor.horizon;
buffer->file= log_descriptor.log_file_num[0];
buffer->overlay= 0;
......@@ -2088,7 +2061,7 @@ my_bool translog_init(const char *directory,
i, (ulong) log_descriptor.buffers + i));
}
logs_found= (last_logno != CONTROL_FILE_IMPOSSIBLE_FILENO);
logs_found= (last_logno != FILENO_IMPOSSIBLE);
if (logs_found)
{
......@@ -2100,7 +2073,7 @@ my_bool translog_init(const char *directory,
find the log end
*/
if (LSN_FILE_NO(last_checkpoint_lsn) == CONTROL_FILE_IMPOSSIBLE_FILENO)
if (LSN_FILE_NO(last_checkpoint_lsn) == FILENO_IMPOSSIBLE)
{
DBUG_ASSERT(LSN_OFFSET(last_checkpoint_lsn) == 0);
/* there was no checkpoints we will read from the beginning */
......@@ -2138,7 +2111,7 @@ my_bool translog_init(const char *directory,
/* TODO: check page size */
last_valid_page= CONTROL_FILE_IMPOSSIBLE_LSN;
last_valid_page= LSN_IMPOSSIBLE;
/* scan and validate pages */
do
{
......@@ -2186,7 +2159,7 @@ my_bool translog_init(const char *directory,
current_page= LSN_REPLACE_OFFSET(current_page, TRANSLOG_PAGE_SIZE);
} while (LSN_FILE_NO(current_page) <= LSN_FILE_NO(last_page) &&
!old_log_was_recovered);
if (last_valid_page == CONTROL_FILE_IMPOSSIBLE_LSN)
if (last_valid_page == LSN_IMPOSSIBLE)
{
/* Panic!!! Even page which should be valid is invalid */
/* TODO: issue error */
......@@ -2272,7 +2245,7 @@ my_bool translog_init(const char *directory,
open_logfile_by_number_no_cache(1)) == -1 ||
translog_write_file_header())
DBUG_RETURN(1);
if (ma_control_file_write_and_force(CONTROL_FILE_IMPOSSIBLE_LSN, 1,
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, 1,
CONTROL_FILE_UPDATE_ONLY_LOGNO))
DBUG_RETURN(1);
/* assign buffer 0 */
......@@ -2405,7 +2378,7 @@ void translog_destroy()
1 Error
*/
static my_bool translog_lock()
my_bool translog_lock()
{
struct st_translog_buffer *current_buffer;
DBUG_ENTER("translog_lock");
......@@ -2438,7 +2411,7 @@ static my_bool translog_lock()
1 Error
*/
static inline my_bool translog_unlock()
my_bool translog_unlock()
{
DBUG_ENTER("translog_unlock");
translog_buffer_unlock(log_descriptor.bc.buffer);
......@@ -4312,14 +4285,14 @@ my_bool translog_write_record(LSN *lsn,
}
if (unlikely(!(trn->first_undo_lsn & TRANSACTION_LOGGED_LONG_ID)))
{
LSN lsn;
LSN dummy_lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[6];
int6store(log_data, trn->trid);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; /* no recursion */
if (unlikely(translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID,
if (unlikely(translog_write_record(&dummy_lsn, LOGREC_LONG_TRANSACTION_ID,
trn, NULL, sizeof(log_data),
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL)))
......@@ -4404,6 +4377,8 @@ my_bool translog_write_record(LSN *lsn,
}
}
DBUG_PRINT("info", ("LSN: (%lu,0x%lx)", (ulong) LSN_FILE_NO(*lsn),
(ulong) LSN_OFFSET(*lsn)));
DBUG_RETURN(rc);
}
......@@ -5093,7 +5068,7 @@ translog_read_record_header_scan(TRANSLOG_SCANNER_DATA
- it is like translog_read_record_header, but read next record, so see
its NOTES.
- in case of end of the log buff->lsn will be set to
(CONTROL_FILE_IMPOSSIBLE_LSN)
(LSN_IMPOSSIBLE)
RETURN
0 error
......@@ -5138,7 +5113,7 @@ translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA
if (scanner->page[scanner->page_offset] == 0)
{
/* Last record was read */
buff->lsn= CONTROL_FILE_IMPOSSIBLE_LSN;
buff->lsn= LSN_IMPOSSIBLE;
/* Return 'end of log' marker */
DBUG_RETURN(TRANSLOG_RECORD_HEADER_MAX_SIZE + 1);
}
......@@ -5300,7 +5275,7 @@ translog_size_t translog_read_record(LSN lsn,
if (data == NULL)
{
DBUG_ASSERT(lsn != CONTROL_FILE_IMPOSSIBLE_LSN);
DBUG_ASSERT(lsn != LSN_IMPOSSIBLE);
data= &internal_data;
}
if (lsn ||
......@@ -5739,7 +5714,7 @@ int translog_assign_id_to_share(MARIA_SHARE *share, TRN *trn)
strlen()
*/
log_array[TRANSLOG_INTERNAL_PARTS + 1].length=
strlen(share->open_file_name);
strlen(share->open_file_name) + 1;
if (unlikely(translog_write_record(&lsn, LOGREC_FILE_ID, trn, share,
sizeof(log_data) +
log_array[TRANSLOG_INTERNAL_PARTS +
......@@ -5773,3 +5748,15 @@ void translog_deassign_id_from_share(MARIA_SHARE *share)
my_atomic_storeptr((void **)&id_to_share[share->id], 0);
my_atomic_rwlock_rdunlock(&LOCK_id_to_share);
}
/**
@brief returns the LSN of the first record starting in this log
@note so far works only for the very first log created on this system
*/
LSN first_lsn_in_log()
{
return MAKE_LSN(1, TRANSLOG_PAGE_SIZE + log_descriptor.page_overhead);
}
// TODO copyright
#ifndef _ma_loghandler_h
#define _ma_loghandler_h
/* transaction log default cache size (TODO: make it global variable) */
#define TRANSLOG_PAGECACHE_SIZE 1024*1024*2
/* transaction log default file size (TODO: make it global variable) */
......@@ -20,6 +25,7 @@
#define TRANSLOG_PAGE_SIZE (8*1024)
#include "ma_loghandler_lsn.h"
#include "trnman_public.h"
/* short transaction ID type */
typedef uint16 SHORT_TRANSACTION_ID;
......@@ -41,6 +47,10 @@ struct st_maria_share;
#define page_store(T,A) int5store(T,A)
#define dirpos_store(T,A) ((*(uchar*) (T)) = A)
#define pagerange_store(T,A) int2store(T,A)
#define fileid_korr(P) uint2korr(P)
#define page_korr(P) uint5korr(P)
#define dirpos_korr(P) (P[0])
#define pagerange_korr(P) uint2korr(P)
/*
Length of disk drive sector size (we assume that writing it
......@@ -228,10 +238,99 @@ extern translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA
*scanner,
TRANSLOG_HEADER_BUFFER
*buff);
extern my_bool translog_lock();
extern my_bool translog_unlock();
extern void translog_lock_assert_owner();
extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern my_bool translog_inited;
/*
all the rest added because of recovery; should we make
ma_loghandler_for_recovery.h ?
*/
extern LSN first_lsn_in_log();
/* record parts descriptor */
struct st_translog_parts
{
/* full record length */
translog_size_t record_length;
/* full record length with chunk headers */
translog_size_t total_record_length;
/* current part index */
uint current;
/* total number of elements in parts */
uint elements;
/* array of parts (LEX_STRING) */
LEX_STRING *parts;
};
typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type,
TRN *trn, struct st_maria_share *share,
struct st_translog_parts *parts);
typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type,
TRN *trn,
LSN *lsn,
struct st_translog_parts *parts);
typedef uint16(*read_rec_hook) (enum translog_record_type type,
uint16 read_length, uchar *read_buff,
byte *decoded_buff);
/* record classes */
enum record_class
{
LOGRECTYPE_NOT_ALLOWED,
LOGRECTYPE_VARIABLE_LENGTH,
LOGRECTYPE_PSEUDOFIXEDLENGTH,
LOGRECTYPE_FIXEDLENGTH
};
/* C++ can't bear that a variable's name is "class" */
#ifndef __cplusplus
/*
Descriptor of log record type
Note: Don't reorder because of constructs later...
*/
typedef struct st_log_record_type_descriptor
{
/* internal class of the record */
enum record_class class;
/*
length for fixed-size record, pseudo-fixed record
length with uncompressed LSNs
*/
uint16 fixed_length;
/* how much record body (belonged to headers too) read with headers */
uint16 read_header_len;
/* HOOK for writing the record called before lock */
prewrite_rec_hook prewrite_hook;
/* HOOK for writing the record called when LSN is known, inside lock */
inwrite_rec_hook inwrite_hook;
/* HOOK for reading headers */
read_rec_hook read_hook;
/*
For pseudo fixed records number of compressed LSNs followed by
system header
*/
int16 compressed_LSN;
/* the rest is for maria_read_log & Recovery */
/** @brief for debug error messages or "maria_read_log" command-line tool */
const char *name;
my_bool record_ends_group;
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *);
} LOG_DESC;
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
#endif
C_MODE_END
#endif
......@@ -24,7 +24,7 @@ typedef TRANSLOG_ADDRESS LSN;
#define LSN_FILE_NO(L) ((L) >> 32)
/* Gets raw file number part of a LSN/log address */
#define LSN_FINE_NO_PART(L) ((L) & ((int64)0xFFFFFF00000000LL))
#define LSN_FILE_NO_PART(L) ((L) & ((int64)0xFFFFFF00000000LL))
/* Gets record offset of a LSN/log address */
#define LSN_OFFSET(L) ((L) & 0xFFFFFFFFL)
......@@ -33,7 +33,9 @@ typedef TRANSLOG_ADDRESS LSN;
#define MAKE_LSN(F,S) ((((uint64)(F)) << 32) | (S))
/* checks LSN */
#define LSN_VALID(L) DBUG_ASSERT((L) >= 0 && (L) < (uint64)0xFFFFFFFFFFFFFFLL)
#define LSN_VALID(L) \
((LSN_FILE_NO_PART(L) != FILENO_IMPOSSIBLE) && \
(LSN_OFFSET(L) != LOG_OFFSET_IMPOSSIBLE))
/* size of stored LSN on a disk, don't change it! */
#define LSN_STORE_SIZE 7
......@@ -51,7 +53,7 @@ typedef TRANSLOG_ADDRESS LSN;
/* what we need to add to LSN to increase it on one file */
#define LSN_ONE_FILE ((int64)0x100000000LL)
#define LSN_REPLACE_OFFSET(L, S) (LSN_FINE_NO_PART(L) | (S))
#define LSN_REPLACE_OFFSET(L, S) (LSN_FILE_NO_PART(L) | (S))
/*
an 8-byte type whose most significant byte is used for "flags"; 7
......@@ -61,4 +63,7 @@ typedef LSN LSN_WITH_FLAGS;
#define LSN_WITH_FLAGS_TO_LSN(x) (x & ULL(0x00FFFFFFFFFFFFFF))
#define LSN_WITH_FLAGS_TO_FLAGS(x) (x & ULL(0xFF00000000000000))
#define FILENO_IMPOSSIBLE 0 /**< log file's numbering starts at 1 */
#define LOG_OFFSET_IMPOSSIBLE 0 /**< log always has a header */
#define LSN_IMPOSSIBLE 0
#endif
......@@ -587,11 +587,7 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
DBUG_PRINT("info", ("Log handler call"));
/* TODO: integrate with page format */
lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
/*
check CONTROL_FILE_IMPOSSIBLE_FILENO &
CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET
*/
DBUG_ASSERT(lsn != 0);
DBUG_ASSERT(LSN_VALID(lsn));
translog_flush(lsn);
}
DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
......@@ -2474,7 +2470,7 @@ static void check_and_set_lsn(LSN lsn, PAGECACHE_BLOCK_LINK *block)
lock lock change
pin pin page
first_REDO_LSN_for_page do not set it if it is zero
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it
lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on
the page
......@@ -2566,7 +2562,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
pagecache pointer to a page cache data structure
file handler for the file for the block of data to be read
pageno number of the block of data in the file
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it
lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on
the page
*/
......@@ -2635,10 +2631,9 @@ void pagecache_unpin(PAGECACHE *pagecache,
link direct link to page (returned by read or write)
lock lock change
pin pin page
first_REDO_LSN_for_page do not set it if it is zero
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it
is bigger then LSN on the page it will be written on
the page
first_REDO_LSN_for_page do not set it if it is LSN_IMPOSSIBLE (0)
lsn if it is not LSN_IMPOSSIBLE and it is bigger then
LSN on the page it will be written on the page
*/
void pagecache_unlock_by_link(PAGECACHE *pagecache,
......@@ -2681,7 +2676,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
DBUG_ASSERT(pagecache->can_be_used);
inc_counter_for_resize_op(pagecache);
if (first_REDO_LSN_for_page)
if (first_REDO_LSN_for_page != LSN_IMPOSSIBLE)
{
/*
LOCK_READ_UNLOCK is ok here as the page may have first locked
......@@ -2694,10 +2689,8 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
if (block->rec_lsn == 0)
block->rec_lsn= first_REDO_LSN_for_page;
}
if (lsn != 0)
{
if (lsn != LSN_IMPOSSIBLE)
check_and_set_lsn(lsn, block);
}
if (make_lock_and_pin(pagecache, block, lock, pin))
DBUG_ASSERT(0); /* should not happend */
......@@ -2726,7 +2719,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
pagecache_unpin_by_link()
pagecache pointer to a page cache data structure
link direct link to page (returned by read or write)
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it
lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on
the page
*/
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
/* Copyright (C) 2006,2007 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......
......@@ -60,7 +60,7 @@ int main(int argc,char *argv[])
if (maria_init() ||
(init_pagecache(maria_pagecache, IO_SIZE*16, 0, 0,
maria_block_size) == 0) ||
ma_control_file_create_or_open() ||
ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
......
......@@ -224,7 +224,7 @@ int main(int argc, char *argv[])
/* Maria requires that we always have a page cache */
if ((init_pagecache(maria_pagecache, pagecache_size, 0, 0,
maria_block_size) == 0) ||
ma_control_file_create_or_open() ||
ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
......
/* Copyright (C) 2007 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
#include <my_getopt.h>
#define PCACHE_SIZE (1024*1024*10)
#define LOG_FLAGS 0
#define LOG_FILE_SIZE (1024L*1024L)
static PAGECACHE pagecache;
static const char *load_default_groups[]= { "maria_read_log",0 };
static void get_options(int *argc,char * * *argv);
#ifndef DBUG_OFF
static const char *default_dbug_option;
#endif
static my_bool opt_only_display, opt_display_and_apply;
struct TRN_FOR_RECOVERY
{
LSN group_start_lsn, undo_lsn;
TrID long_trid;
};
struct TRN_FOR_RECOVERY all_active_trans[SHORT_TRID_MAX + 1];
MARIA_HA *all_tables[SHORT_TRID_MAX + 1];
static void end_of_redo_phase();
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec);
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
prototype_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(COMMIT);
/*
To implement REDO_DROP_TABLE and REDO_RENAME_TABLE, we would need to go
through the all_tables[] array, find all open instances of the
table-to-drop-or-rename, and remove them from the array.
We however know that in real Recovery, we don't have to handle those log
records at all, same for REDO_CREATE_TABLE.
So for now, we can use this program to replay/debug a sequence of CREATE +
DMLs, but not DROP/RENAME; it is probably enough for a start.
*/
int main(int argc, char **argv)
{
LSN lsn;
char **default_argv;
MY_INIT(argv[0]);
load_defaults("my", load_default_groups, &argc, &argv);
default_argv= argv;
get_options(&argc, &argv);
maria_data_root= ".";
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\maria_read_log.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if (maria_init())
{
fprintf(stderr, "Can't init Maria engine (%d)\n", errno);
goto err;
}
/* we don't want to create a control file, it MUST exist */
if (ma_control_file_create_or_open(FALSE))
{
fprintf(stderr, "Can't open control file (%d)\n", errno);
goto err;
}
if (last_logno == FILENO_IMPOSSIBLE)
{
fprintf(stderr, "Can't find any log\n");
goto err;
}
if (init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0)
{
fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno);
goto err;
}
/*
If log handler does not find the "last_logno" log it will return error,
which is good.
But if it finds a log and this log was crashed, it will create a new log,
which is useless. TODO: start log handler in read-only mode.
*/
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
TRANSLOG_DEFAULT_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
goto err;
}
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(CHECKPOINT);
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(COMMIT);
if (opt_only_display)
printf("You are using --only-display, NOTHING will be written to disk\n");
lsn= first_lsn_in_log(); /*could also be last_checkpoint_lsn */
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
uint i= 1;
translog_size_t len= translog_read_record_header(lsn, &rec);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
printf("EOF on the log\n");
goto end;
}
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(stderr, "Scanner init failed\n");
goto err;
}
for (;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
*/
if (log_desc->record_ends_group)
{
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
/*
There is a complete group for this transaction, containing more than
this event.
*/
printf(" ends a group:\n");
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(stderr, "Cannot find record where it should be\n");
goto err;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(stderr, "Scanner2 init failed\n");
goto err;
}
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (display_and_apply_record(log_desc2, &rec2))
goto err;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(stderr, "Cannot find record where it should be\n");
goto err;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (display_and_apply_record(log_desc, &rec))
goto err;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
printf("EOF on the log\n");
goto end;
}
}
translog_free_record_header(&rec);
/*
So we have applied all REDOs.
We may now have unfinished transactions.
I don't think it's this program's job to roll them back:
to roll back and at the same time stay idempotent, it needs to write log
records (without CLRs, 2nd rollback would hit the effects of first
rollback and fail). But this standalone tool is not allowed to write to
the server's transaction log. So we do not roll back anything.
In the real Recovery code, or the code to do "recover after online
backup", yes we will roll back.
*/
end_of_redo_phase();
goto end;
err:
/* don't touch anything more, in case we hit a bug */
exit(1);
end:
maria_end();
free_defaults(default_argv);
my_end(0);
exit(0);
return 0; /* No compiler warning */
}
static struct my_option my_long_options[] =
{
{"only-display", 'o', "display brief info about records's header",
(gptr*) &opt_only_display, (gptr*) &opt_only_display, 0, GET_BOOL, NO_ARG,
0, 0, 0, 0, 0, 0},
{"display-and-apply", 'a',
"like --only-display but displays more info and modifies tables",
(gptr*) &opt_display_and_apply, (gptr*) &opt_display_and_apply, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DBUG_OFF
{"debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
#include <help_start.h>
static void print_version(void)
{
VOID(printf("%s Ver 1.0 for %s on %s\n",
my_progname, SYSTEM_TYPE, MACHINE_TYPE));
NETWARE_SET_SCREEN_MODE(1);
}
static void usage(void)
{
print_version();
puts("Copyright (C) 2007 MySQL AB");
puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,");
puts("and you are welcome to modify and redistribute it under the GPL license\n");
puts("Display and apply log records from a MARIA transaction log");
puts("found in the current directory (for now)");
VOID(printf("\nUsage: %s OPTIONS\n", my_progname));
puts("You need to use one of -o or -a");
my_print_help(my_long_options);
print_defaults("my", load_default_groups);
my_print_variables(my_long_options);
}
#include <help_end.h>
static my_bool
get_one_option(int optid __attribute__((unused)),
const struct my_option *opt __attribute__((unused)),
char *argument __attribute__((unused)))
{
/* for now there is nothing special with our options */
return 0;
}
static void get_options(int *argc,char ***argv)
{
int ho_error;
my_progname= argv[0][0];
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error);
if ((opt_only_display + opt_display_and_apply) != 1)
{
usage();
exit(1);
}
}
/* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number)
{
/*
if number==0, we're going over records which we had already seen and which
form a group, so we indent below the group's end record
*/
printf("%sRecord #%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u)\n",
number ? "" : " ", number,
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
rec->short_trid, log_desc->name, rec->type);
}
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec)
{
int error;
if (opt_only_display)
return 0;
if (log_desc->record_execute_in_redo_phase == NULL)
{
/* die on all not-yet-handled records :) */
DBUG_ASSERT("one more hook" == "to write");
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(stderr, "Got error when executing record\n");
return error;
}
prototype_exec_hook(LONG_TRANSACTION_ID)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (gslsn != LSN_IMPOSSIBLE)
{
printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
{
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
fprintf(stderr, "Found an old transaction long_trid %llu short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", long_trid,
sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
all_active_trans[sid].long_trid= long_trid;
printf("Transaction long_trid %lu short_trid %u starts\n", long_trid, sid);
goto end;
err:
DBUG_ASSERT(0);
return 1;
end:
return 0;
}
prototype_exec_hook(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
prototype_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN];
char *name, *ptr;
myf create_flag;
uint flags;
int error, create_mode= O_RDWR | O_TRUNC;
MARIA_HA *info= NULL;
if (((name= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) ||
(translog_read_record(rec->lsn, 0, rec->record_length, name, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto err;
}
printf("Table '%s'", name);
/* we try hard to get create_rename_lsn, to avoid mistakes if possible */
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info)
{
if (!info->s->base.transactional)
{
/*
could be that transactional table was later dropped, and a non-trans
one was renamed to its name, thus create_rename_lsn is 0 and should
not be trusted.
*/
printf(", is not transactional\n");
DBUG_ASSERT(0); /* I want to know this */
goto end;
}
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
goto end;
}
if (maria_is_crashed(info))
{
printf(", is crashed, overwriting it");
DBUG_ASSERT(0); /* I want to know this */
}
maria_close(info);
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
// TODO symlinks
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
printf(", we will only touch index file");
fn_format(filename, name, "", MARIA_NAME_IEXT,
(MY_UNPACK_FILENAME |
(flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag= MY_DELETE_OLD;
printf(", creating as '%s'", filename);
if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0)
{
fprintf(stderr, "Failed to create index file\n");
goto err;
}
ptr++;
uint kfile_size_before_extension= uint2korr(ptr);
ptr+= 2;
uint keystart= uint2korr(ptr);
ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
if (my_pwrite(kfile, ptr,
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME)))
{
fprintf(stderr, "Failed to write to index file\n");
goto err;
}
if (!(flags & HA_DONT_TOUCH_DATA))
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
if ((dfile=
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag))) < 0)
{
fprintf(stderr, "Failed to create data file\n");
goto err;
}
/*
we now have an empty data file. To be able to
_ma_initialize_data_file() we need some pieces of the share to be
correctly filled. So we just open the table (fortunately, an empty
data file does not preclude this).
*/
if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
_ma_initialize_data_file(dfile, info->s))
{
fprintf(stderr, "Failed to open new table or write to data file\n");
goto err;
}
}
error= 0;
goto end;
err:
DBUG_ASSERT(0);
error= 1;
end:
printf("\n");
if (kfile >= 0)
error|= my_close(kfile, MYF(MY_WME));
if (dfile >= 0)
error|= my_close(dfile, MYF(MY_WME));
if (info != NULL)
error|= maria_close(info);
my_free(name, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error;
char *name, *buff;
MARIA_HA *info= NULL;
if (((buff= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto err;
}
sid= fileid_korr(buff);
name= buff + FILEID_STORE_SIZE;
printf("Table '%s', id %u", name, sid);
info= all_tables[sid];
if (info != NULL)
{
printf(", closing table '%s'", info->s->open_file_name);
all_tables[sid]= NULL;
info->s->base.transactional= TRUE; /* put back the truth */
if (maria_close(info))
{
fprintf(stderr, "Failed to close table\n");
goto err;
}
}
info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
printf(", is absent (must have been dropped later?)"
" or its header is so corrupted that we cannot open it;"
" we skip it\n");
goto end;
}
if (maria_is_crashed(info))
{
fprintf(stderr, "Table is crashed, can't apply log records to it\n");
goto err;
}
if (!info->s->base.transactional)
{
printf(", is not transactional\n");
DBUG_ASSERT(0); /* I want to know this */
goto end;
}
all_tables[sid]= info;
/*
don't log any records for this work. TODO make sure this variable does not
go to disk before we restore it to its true value.
*/
info->s->base.transactional= FALSE;
printf(", opened\n");
error= 0;
goto end;
err:
DBUG_ASSERT(0);
error= 1;
if (info != NULL)
error|= maria_close(info);
end:
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
prototype_exec_hook(REDO_INSERT_ROW_HEAD)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
printf("For page %llu of table of short id %u", page, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
DBUG_ASSERT("Monty" == "this is the place");
end:
/* as we don't have apply working: */
return 1;
}
prototype_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (long_trid == 0)
{
printf("We don't know about transaction short_trid %u;"
"it probably committed long ago, forget it\n", sid);
return 0;
}
printf("Transaction long_trid %lu short_trid %u committed", long_trid, sid);
if (gslsn != LSN_IMPOSSIBLE)
{
/*
It's not an error, it may be that trn got a disk error when writing to a
table, so an unfinished group staid in the log.
*/
printf(", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
printf("\n");
all_active_trans[sid].long_trid= 0;
#ifdef MARIA_VERSIONING
/*
if real recovery:
transaction was committed, move it to some separate list for later
purging (but don't purge now! purging may have been started before, we
may find REDO_PURGE records soon).
*/
#endif
return 0;
}
/* Just to inform about any aborted groups or unfinished transactions */
static void end_of_redo_phase()
{
uint sid;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (long_trid == 0)
continue;
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
printf("Transaction long_trid %lu short_trid %u unfinished\n",
long_trid, sid);
if (gslsn != LSN_IMPOSSIBLE)
{
printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
}
/* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
If real recovery: transaction was committed, move it to some separate
list for soon purging.
*/
#endif
}
}
......@@ -20,6 +20,9 @@
to include my_atomic.h in C++ code.
*/
#ifndef _trnman_public_h
#define _trnman_public_h
#include "ma_loghandler_lsn.h"
C_MODE_START
......@@ -52,3 +55,4 @@ my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn);
C_MODE_END
#endif
......@@ -121,8 +121,8 @@ static int delete_file(myf my_flags)
The error will however be printed on stderr.
*/
my_delete(file_name, my_flags);
expect_checkpoint_lsn= CONTROL_FILE_IMPOSSIBLE_LSN;
expect_logno= CONTROL_FILE_IMPOSSIBLE_FILENO;
expect_checkpoint_lsn= LSN_IMPOSSIBLE;
expect_logno= FILENO_IMPOSSIBLE;
return 0;
}
......@@ -146,9 +146,9 @@ static int verify_module_values_match_expected()
*/
static int verify_module_values_are_impossible()
{
RET_ERR_UNLESS(last_logno == CONTROL_FILE_IMPOSSIBLE_FILENO);
RET_ERR_UNLESS(last_logno == FILENO_IMPOSSIBLE);
RET_ERR_UNLESS(last_checkpoint_lsn ==
CONTROL_FILE_IMPOSSIBLE_LSN);
LSN_IMPOSSIBLE);
return 0;
}
......@@ -164,7 +164,7 @@ static int close_file()
static int create_or_open_file()
{
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_OK);
RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) == CONTROL_FILE_OK);
/* Check that the module reports expected information */
RET_ERR_UNLESS(verify_module_values_match_expected() == 0);
return 0;
......@@ -188,7 +188,7 @@ static int test_one_log()
RET_ERR_UNLESS(create_or_open_file() == CONTROL_FILE_OK);
objs_to_write= CONTROL_FILE_UPDATE_ONLY_LOGNO;
expect_logno= 123;
RET_ERR_UNLESS(write_file(CONTROL_FILE_IMPOSSIBLE_LSN,
RET_ERR_UNLESS(write_file(LSN_IMPOSSIBLE,
expect_logno,
objs_to_write) == 0);
RET_ERR_UNLESS(close_file() == 0);
......@@ -206,7 +206,7 @@ static int test_five_logs()
for (i= 0; i<5; i++)
{
expect_logno*= 3;
RET_ERR_UNLESS(write_file(CONTROL_FILE_IMPOSSIBLE_LSN, expect_logno,
RET_ERR_UNLESS(write_file(LSN_IMPOSSIBLE, expect_logno,
objs_to_write) == 0);
}
RET_ERR_UNLESS(close_file() == 0);
......@@ -320,7 +320,7 @@ static int test_bad_magic_string()
RET_ERR_UNLESS(my_pwrite(fd, "papa", 4, 0, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() ==
RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_BAD_MAGIC_STRING);
/* Restore magic string */
RET_ERR_UNLESS(my_pwrite(fd, buffer, 4, 0, MYF(MY_FNABP | MY_WME)) == 0);
......@@ -346,7 +346,7 @@ static int test_bad_checksum()
buffer[0]+= 3; /* mangle checksum */
RET_ERR_UNLESS(my_pwrite(fd, buffer, 1, 8, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() ==
RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_BAD_CHECKSUM);
/* Restore checksum */
buffer[0]-= 3;
......@@ -369,10 +369,11 @@ static int test_bad_size()
MYF(MY_WME))) >= 0);
RET_ERR_UNLESS(my_write(fd, buffer, 10, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_TOO_SMALL);
RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_TOO_SMALL);
RET_ERR_UNLESS(my_write(fd, buffer, 30, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_TOO_BIG);
RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) == CONTROL_FILE_TOO_BIG);
RET_ERR_UNLESS(my_close(fd, MYF(MY_WME)) == 0);
/* Leave a correct control file */
......
......@@ -164,7 +164,7 @@ int main(int argc __attribute__((unused)), char *argv[])
}
#endif
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
......@@ -336,7 +336,7 @@ int main(int argc __attribute__((unused)), char *argv[])
ma_control_file_end();
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "pass2: Can't init control file (%d)\n", errno);
exit(1);
......@@ -398,7 +398,7 @@ int main(int argc __attribute__((unused)), char *argv[])
i, errno);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
if (i != ITERATIONS)
{
......@@ -477,7 +477,7 @@ int main(int argc __attribute__((unused)), char *argv[])
"failed (%d)\n", i, errno);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
fprintf(stderr, "EOL met at the middle of iteration (first var) %u "
"instead of beginning of %u\n", i, ITERATIONS);
......@@ -572,7 +572,7 @@ int main(int argc __attribute__((unused)), char *argv[])
i, errno);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
fprintf(stderr, "EOL met at the middle of iteration %u "
"instead of beginning of %u\n", i, ITERATIONS);
......
......@@ -161,7 +161,7 @@ int main(int argc __attribute__((unused)), char *argv[])
}
#endif
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
......@@ -325,7 +325,7 @@ int main(int argc __attribute__((unused)), char *argv[])
end_pagecache(&pagecache, 1);
ma_control_file_end();
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "pass2: Can't init control file (%d)\n", errno);
exit(1);
......@@ -390,7 +390,7 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_free_record_header(&rec);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
if (i != ITERATIONS)
{
......@@ -470,7 +470,7 @@ int main(int argc __attribute__((unused)), char *argv[])
"failed (%d)\n", i, errno);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
fprintf(stderr, "EOL met at the middle of iteration (first var) %u "
"instead of beginning of %u\n", i, ITERATIONS);
......@@ -568,7 +568,7 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_free_record_header(&rec);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
fprintf(stderr, "EOL met at the middle of iteration %u "
"instead of beginning of %u\n", i, ITERATIONS);
......
......@@ -270,7 +270,7 @@ int main(int argc __attribute__((unused)),
my_thread_global_init();
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
......@@ -384,7 +384,7 @@ int main(int argc __attribute__((unused)),
translog_free_record_header(&rec);
goto err;
}
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN)
if (rec.lsn == LSN_IMPOSSIBLE)
{
if (i != WRITERS * ITERATIONS * 2)
{
......
......@@ -56,7 +56,7 @@ int main(int argc __attribute__((unused)), char *argv[])
}
#endif
if (ma_control_file_create_or_open())
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
......
......@@ -75,6 +75,7 @@ int mi_close(register MI_INFO *info)
not change the crashed state.
We can NOT write the state in other cases as other threads
may be using the file at this point
IF using --external-locking.
*/
if (share->mode != O_RDONLY && mi_is_crashed(info))
mi_state_info_write(share->kfile, &share->state, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment