Commit 198a1b6f authored by unknown's avatar unknown

WL#3072 - Maria recovery.

* Recovery of the table's live checksum (CREATE TABLE ... CHECKSUM=1)
is achieved in this patch. The table's live checksum
(info->s->state.state.checksum) is updated in inwrite_rec_hook's
under the log mutex when writing UNDO_ROW_INSERT|UPDATE|DELETE
and REDO_DELETE_ALL. The checksum variation caused by the operation
is stored in these UNDOs, so that the REDO phase, when it sees such
UNDOs, can update the live checksum if it is older (state.is_of_lsn is
lower) than the record. It is also used, as a nice add-on with no
cost, to do less row checksum computation during the UNDO phase
(as we have it in the record already).
Doing this work, it became pressing to move in-write hooks
(write_hook_for_redo() et al) to ma_blockrec.c.
The 'parts' argument of inwrite_rec_hook is unpredictable (it comes
mangled at this stage, for example by LSN compression) so it is
replaced by a 'void* hook_arg', which is used to pass down information,
currently only to write_hook_for_clr_end() (previous undo_lsn and
type of undone record).
* If from ha_maria, we print to stderr how many seconds (with one
fractional digit) the REDO phase took, same for UNDO phase and for
final table close. Just to give an indication for debugging and maybe
also for Support.


storage/maria/ha_maria.cc:
  question for Monty
storage/maria/ma_blockrec.c:
  * log in-write hooks (write_hook_for_redo() etc) move from
  ma_loghandler.c to here; this is natural: the hooks are coupled
  to their callers (functions in ma_blockrec.c).
  * translog_write_record() now has a new argument "hook_arg";
  using it to pass down to write_hook_for_clr_end() the transaction's
  previous_undo_lsn and the type of the being undone record, and also
  to pass down to all UNDOs the live checksum variation caused by the
  operation.
  * If table has live checksum, store in UNDO_ROW_INSERT|UPDATE|DELETE
  and in CLR_END the checksum variation ("delta") caused by the
  operation. For example if a DELETE caused the table's live checksum
  to change from 123 to 456, we store in the UNDO_ROW_DELETE, in 4 bytes,
  the value 333 (456-123).
  * Instead of hard-coded "1" as length of the place where we store
  the undone record's type in CLR_END, use a symbol CLR_TYPE_STORE_SIZE;
  use macros clr_type_store and clr_type_korr.
  * write_block_record() has a new parameter 'old_record_checksum'
  which is the pre-computed checksum of old_record; that value is used
  to update the table's live checksum when writing UNDO_ROW_UPDATE|CLR_END.
  * In allocate_write_block_record(), if we are executing UNDO_ROW_DELETE
  the row's checksum is already computed.
  * _ma_update_block_record2() now expect the new row's checksum into
  cur_row.checksum (was already true) and the old row's checksum into
  new_row.checksum (that's new). Its two callers, maria_update() and
  _ma_apply_undo_row_update(), honour this.
  * When executing an UNDO_ROW_INSERT|UPDATE|DELETE in UNDO phase, pick
  up the checksum delta from the log record. It is then used to update
  the table's live checksum when writing CLR_END, and saves us a
  computation of record.
storage/maria/ma_blockrec.h:
  in-write hooks move from ma_loghandler.c
storage/maria/ma_check.c:
  more straightforward size of buffer
storage/maria/ma_checkpoint.c:
  <= is enough
storage/maria/ma_commit.c:
  new prototype of translog_write_record()
storage/maria/ma_create.c:
  new prototype of translog_write_record()
storage/maria/ma_delete.c:
  The row's checksum must be computed before calling(*delete_record)(),
  not after, because it must be known inside _ma_delete_block_record()
  (to update the table's live checksum when writing UNDO_ROW_DELETE).
  If deleting from a transactional table, live checksum was already updated
  when writing UNDO_ROW_DELETE.
storage/maria/ma_delete_all.c:
  @todo is now done (in ma_loghandler.c)
storage/maria/ma_delete_table.c:
  new prototype of translog_write_record()
storage/maria/ma_loghandler.c:
  * in-write hooks move to ma_blockrec.c.
  * translog_write_record() gets a new argument 'hook_arg' which is
  passed down to pre|inwrite_rec_hook. It is more useful that 'parts'
  for those hooks, because when those hooks are called, 'parts' has
  possibly been mangled (like with LSN compression) and is so
  unpredictable.
  * fix for compiler warning (unused buffer_start when compiling without
  debug support)
  * Because checksum delta is stored into UNDO_ROW_INSERT|UPDATE|DELETE
  and CLR_END, but only if the table has live checksum, these records
  are not PSEUDOFIXEDLENGTH anymore, they are now VARIABLE_LENGTH (their
  length is X if no live checksum and X+4 otherwise).
  * add an inwrite_rec_hook for UNDO_ROW_UPDATE, which updates the
  table's live checksum. Update it also in hooks of UNDO_ROW_INSERT|
  DELETE and REDO_DELETE_ALL and CLR_END.
  * Bugfix: when reading a record in translog_read_record(), it happened
  that "length" became negative, because the function assumed that
  the record extended beyond the page's end, whereas it may be shorter.
storage/maria/ma_loghandler.h:
  * Instead of hard-coded "1" and "4", use symbols and macros
  to store/retrieve the type of record which the CLR_END corresponds
  to, and the checksum variation caused by the operation which logs the
  record
  * translog_write_record() gets a new argument 'hook_arg' which is
  passed down to pre|inwrite_rec_hook. It is more useful that 'parts'
  for those hooks, because when those hooks are called, 'parts' has
  possibly been mangled (like with LSN compression) and is so
  unpredictable.
storage/maria/ma_open.c:
  fix for "empty body in if() statement" (when compiling without safemutex)
storage/maria/ma_pagecache.c:
  <= is enough
storage/maria/ma_recovery.c:
  * print the time that each recovery phase (REDO/UNDO/flush) took;
  this is enabled only when recovering from ha_maria. Is it printed
  n seconds with a fractional part of one digit (like 123.4 seconds).
  * In the REDO phase, update the table's live checksum by using
  the checksum delta stored in UNDO_ROW_INSERT|DELETE|UPDATE and CLR_END.
  Update it too when seeing REDO_DELETE_ALL.
  * In the UNDO phase, when executing UNDO_ROW_INSERT, if the table does
  not have live checksum then reading the record's header (as done by
  the master loop of run_undo_phase()) is enough; otherwise we
  do a translog_read_record() to have the checksum delta ready
  for _ma_apply_undo_row_insert().
  * When at the end of the REDO phase we notice that there is an unfinished
  group of REDOs, don't assert in debug binaries, as I verified that it
  can happen in real life (with kill -9)
  * removing ' in #error as it confuses gcc3
storage/maria/ma_rename.c:
  new prototype of translog_write_record()
storage/maria/ma_test_recovery.expected:
  Change in output of ma_test_recovery: now all live checksums of
  original tables equal those of tables recreated by the REDO phase
  and those of tables fixed by the UNDO phase. I.e. recovery of
  the live checksum looks like working (which was after all the only
  goal of this changeset).
  I checked by hand that it's not just all live checksums which are
  now 0 and that's why they match. They are the old values like
  3757530372. maria.test has hard-coded checksum values in its result
  file so checks this too.
storage/maria/ma_update.c:
  * It's useless to put up HA_STATE_CHANGED in 'key_changed',
  as we put up HA_STATE_CHANGED in info->update anyway.
  * We need to compute the old and new rows' checksum before calling
  (*update_record)(), as checksum delta must be known when logging
  UNDO_ROW_UPDATE which is done by _ma_update_block_record(). Note that
  some functions change the 'newrec' record (at least _ma_check_unique()
  does) so we cannot move the checksum computation too early in the
  function.
storage/maria/ma_write.c:
  If inserting into a transactional table, live's checksum was
  already updated when writing UNDO_ROW_INSERT. The multiplication
  is a trick to save an if().
storage/maria/unittest/ma_test_loghandler-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  new prototype of translog_write_record()
storage/myisam/sort.c:
  fix for compiler warnings in pushbuild (write_merge_key* functions
  didn't have their declaration match MARIA_HA::write_key).
parent da6db764
......@@ -2185,6 +2185,9 @@ int ha_maria::create(const char *name, register TABLE *table_arg,
error;
?
Why fool the user?
Shouldn't this test be pushed down to maria_create()? Because currently,
ma_test1 -T crashes: it creates a table with DYNAMIC_RECORD but has
born_transactional==1, which confuses some recovery-related code.
*/
#endif
create_info.transactional= (row_type == BLOCK_RECORD &&
......
......@@ -300,6 +300,32 @@ typedef struct st_maria_extent_cursor
} MARIA_EXTENT_CURSOR;
/**
@brief Structure for passing down info to write_hook_for_clr_end().
This hooks needs to know the variation of the live checksum caused by the
current operation to update state.checksum under log's mutex,
needs to know the transaction's previous undo_lsn to set
trn->undo_lsn under log mutex, and needs to know the type of UNDO being
undone now to modify state.records under log mutex.
*/
struct st_msg_to_write_hook_for_clr_end
{
LSN previous_undo_lsn;
enum translog_record_type undone_record_type;
ha_checksum checksum_delta;
};
/** S:share,D:checksum_delta,E:expression,P:pointer_into_record,L:length */
#define store_checksum_in_rec(S,D,E,P,L) do \
{ \
D= 0; \
if ((S)->calc_checksum != NULL) \
{ \
D= (E); \
ha_checksum_store(P, D); \
L+= HA_CHECKSUM_STORE_SIZE; \
} \
} while (0)
static my_bool delete_tails(MARIA_HA *info, MARIA_RECORD_POS *tails);
static my_bool delete_head_or_tail(MARIA_HA *info,
ulonglong page, uint record_number,
......@@ -1387,7 +1413,7 @@ static my_bool write_tail(MARIA_HA *info,
if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_TAIL,
info->trn, info, sizeof(log_data) + length,
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data))
log_data, NULL))
DBUG_RETURN(1);
}
......@@ -1642,7 +1668,7 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, info->trn,
info, sizeof(log_data) + extents_length,
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data))
log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(_ma_bitmap_free_full_pages(info, row->extents,
......@@ -1689,7 +1715,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
log_data, NULL))
res= 1;
}
......@@ -1716,6 +1742,9 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
@param row_pos Position on head page where to put head part of
record
@param undo_lsn <> LSN_ERROR if we are executing an UNDO
@param old_record_checksum Checksum of old_record: ignored if table does
not have live checksum; otherwise if
old_record==NULL it must be 0.
@note
On return all pinned pages are released.
......@@ -1731,7 +1760,8 @@ static my_bool write_block_record(MARIA_HA *info,
MARIA_BITMAP_BLOCKS *bitmap_blocks,
my_bool head_block_is_read,
struct st_row_pos_info *row_pos,
LSN undo_lsn)
LSN undo_lsn,
ha_checksum old_record_checksum)
{
uchar *data, *end_of_data, *tmp_data_used, *tmp_data;
uchar *row_extents_first_part, *row_extents_second_part;
......@@ -1785,7 +1815,10 @@ static my_bool write_block_record(MARIA_HA *info,
if (share->base.pack_fields)
store_key_length_inc(data, row->field_lengths_length);
if (share->calc_checksum)
{
*(data++)= (uchar) (row->checksum); /* store least significant byte */
DBUG_ASSERT(!((old_record_checksum != 0) && (old_record == NULL)));
}
memcpy(data, record, share->base.null_bytes);
data+= share->base.null_bytes;
memcpy(data, row->empty_bits, share->base.pack_bytes);
......@@ -2211,7 +2244,7 @@ static my_bool write_block_record(MARIA_HA *info,
if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_HEAD, info->trn,
info, sizeof(log_data) + data_length,
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data))
log_data, NULL))
goto disk_err;
}
......@@ -2328,7 +2361,7 @@ static my_bool write_block_record(MARIA_HA *info,
error= translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_BLOBS,
info->trn, info, log_entry_length,
(uint) (log_array_pos - log_array),
log_array, log_data);
log_array, log_data, NULL);
if (log_array != tmp_log_array)
my_free((uchar*) log_array, MYF(0));
if (error)
......@@ -2343,31 +2376,44 @@ static my_bool write_block_record(MARIA_HA *info,
if (undo_lsn != LSN_ERROR)
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE + HA_CHECKSUM_STORE_SIZE];
struct st_msg_to_write_hook_for_clr_end msg;
/* undo_lsn must be first for compression to work */
lsn_store(log_data, undo_lsn);
/*
Store if this CLR is about an UNDO_INSERT, UNDO_DELETE or UNDO_UPDATE;
in the first/second case, Recovery, when it sees the CLR_END in the
REDO phase, may decrement/increment the records' count.
Store if this CLR is about UNDO_DELETE or UNDO_UPDATE;
in the first case, Recovery, when it sees the CLR_END in the
REDO phase, may decrement the records' count.
*/
log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]= old_record ?
LOGREC_UNDO_ROW_UPDATE : LOGREC_UNDO_ROW_DELETE;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
msg.undone_record_type=
old_record ? LOGREC_UNDO_ROW_UPDATE : LOGREC_UNDO_ROW_DELETE;
clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
msg.undone_record_type);
msg.previous_undo_lsn= undo_lsn;
store_checksum_in_rec(share, msg.checksum_delta,
row->checksum - old_record_checksum,
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
info->trn, info,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length,
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
log_data + LSN_STORE_SIZE, &msg))
goto disk_err;
}
else
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE +
HA_CHECKSUM_STORE_SIZE];
ha_checksum checksum_delta;
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_UPDATE share same header */
lsn_store(log_data, info->trn->undo_lsn);
page_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
head_block->page);
......@@ -2376,15 +2422,24 @@ static my_bool write_block_record(MARIA_HA *info,
row_pos->rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
store_checksum_in_rec(share, checksum_delta,
row->checksum - old_record_checksum,
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
compile_time_assert(sizeof(ha_checksum) == HA_CHECKSUM_STORE_SIZE);
if (!old_record)
{
/* Write UNDO log record for the INSERT */
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_INSERT,
info->trn, info, sizeof(log_data),
info->trn, info,
log_array[TRANSLOG_INTERNAL_PARTS +
0].length,
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
log_data + LSN_STORE_SIZE, &checksum_delta))
goto disk_err;
}
else
......@@ -2397,10 +2452,11 @@ static my_bool write_block_record(MARIA_HA *info,
TRANSLOG_INTERNAL_PARTS + 1,
&row_parts_count);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn,
info, sizeof(log_data) + row_length,
info, log_array[TRANSLOG_INTERNAL_PARTS +
0].length + row_length,
TRANSLOG_INTERNAL_PARTS + 1 +
row_parts_count,
log_array, log_data + LSN_STORE_SIZE))
row_parts_count, log_array,
log_data + LSN_STORE_SIZE, &checksum_delta))
goto disk_err;
}
}
......@@ -2517,10 +2573,18 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info,
DBUG_RETURN(1);
row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum)
row->checksum= (info->s->calc_checksum)(info, record);
{
if (undo_lsn == LSN_ERROR)
row->checksum= (info->s->calc_checksum)(info, record);
else
{
/* _ma_apply_undo_row_delete() already set row's checksum. Verify it. */
DBUG_ASSERT(row->checksum == (info->s->calc_checksum)(info, record));
}
}
if (write_block_record(info, (uchar*) 0, record, row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos, undo_lsn))
&row_pos, undo_lsn, 0))
DBUG_RETURN(1); /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu (%lu:%u)", (ulong) row->lastpos,
(ulong) ma_recordpos_to_page(row->lastpos),
......@@ -2592,6 +2656,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
MARIA_BITMAP_BLOCK *block, *end;
LSN lsn= LSN_IMPOSSIBLE;
MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_write_abort_block_record");
if (delete_head_or_tail(info,
......@@ -2619,13 +2684,14 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
}
}
if (info->s->now_transactional)
if (share->now_transactional)
{
LSN previous_undo_lsn;
TRANSLOG_HEADER_BUFFER rec;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE + HA_CHECKSUM_STORE_SIZE];
int len;
struct st_msg_to_write_hook_for_clr_end msg;
/*
We do need the code above (delete_head_or_tail() etc) for
non-transactional tables.
......@@ -2644,15 +2710,24 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
goto end;
}
DBUG_ASSERT(rec.type == LOGREC_UNDO_ROW_INSERT);
previous_undo_lsn= lsn_korr(rec.header);
lsn_store(log_data, previous_undo_lsn);
log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]= LOGREC_UNDO_ROW_INSERT;
memcpy(log_data, rec.header, LSN_STORE_SIZE); /* previous UNDO LSN */
msg.previous_undo_lsn= lsn_korr(rec.header);
msg.undone_record_type= LOGREC_UNDO_ROW_INSERT;
clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
LOGREC_UNDO_ROW_INSERT);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
store_checksum_in_rec(share, msg.checksum_delta,
- info->cur_row.checksum,
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
info->trn, info,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length,
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
log_data + LSN_STORE_SIZE, &msg))
res= 1;
}
end:
......@@ -2685,6 +2760,8 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
uchar *dir;
ulonglong page;
struct st_row_pos_info row_pos;
my_bool res;
ha_checksum old_checksum;
MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_update_block_record2");
DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos));
......@@ -2694,7 +2771,11 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
DBUG_DUMP("newrec", record, share->base.reclength);
#endif
/* checksum was computed by maria_update() already and put into cur_row */
/*
Checksums of new and old rows were computed by callers already; new
row's was put into cur_row, old row's was put into new_row.
*/
old_checksum= new_row->checksum;
new_row->checksum= cur_row->checksum;
calc_record_size(info, record, new_row);
page= ma_recordpos_to_page(record_pos);
......@@ -2747,8 +2828,9 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos, undo_lsn));
res= write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos, undo_lsn, old_checksum);
DBUG_RETURN(res);
}
/*
Allocate all size in block for record
......@@ -2781,8 +2863,9 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
row_pos.dir= dir;
row_pos.data= buff + uint2korr(dir);
row_pos.length= head_length;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos, undo_lsn));
res= write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos, undo_lsn, old_checksum);
DBUG_RETURN(res);
err:
_ma_unpin_all_pages_and_finalize_row(info, 0);
......@@ -2949,7 +3032,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
LOGREC_REDO_PURGE_ROW_TAIL),
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
log_data, NULL))
DBUG_RETURN(1);
}
if (pagecache_write(share->pagecache,
......@@ -2976,7 +3059,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
log_data, NULL))
DBUG_RETURN(1);
}
/* Write the empty page (needed only for REPAIR to work) */
......@@ -3044,6 +3127,7 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
{
ulonglong page;
uint record_number;
MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_delete_block_record");
page= ma_recordpos_to_page(info->cur_row.lastpos);
......@@ -3058,13 +3142,14 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
if (info->s->now_transactional)
if (share->now_transactional)
{
LSN lsn;
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIR_COUNT_SIZE];
DIRPOS_STORE_SIZE + HA_CHECKSUM_STORE_SIZE];
size_t row_length;
uint row_parts_count;
ha_checksum checksum_delta;
/* Write UNDO record */
lsn_store(log_data, info->trn->undo_lsn);
......@@ -3073,15 +3158,26 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
PAGE_STORE_SIZE, record_number);
info->log_row_parts[TRANSLOG_INTERNAL_PARTS].str= (char*) log_data;
info->log_row_parts[TRANSLOG_INTERNAL_PARTS].length= sizeof(log_data);
info->log_row_parts[TRANSLOG_INTERNAL_PARTS].length=
sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
store_checksum_in_rec(share, checksum_delta,
- info->cur_row.checksum,
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
info->log_row_parts[TRANSLOG_INTERNAL_PARTS +
0].length);
row_length= fill_insert_undo_parts(info, record, info->log_row_parts +
TRANSLOG_INTERNAL_PARTS + 1,
&row_parts_count);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_DELETE, info->trn,
info, sizeof(log_data) + row_length,
info,
info->log_row_parts[TRANSLOG_INTERNAL_PARTS +
0].length + row_length,
TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count,
info->log_row_parts, log_data + LSN_STORE_SIZE))
info->log_row_parts, log_data + LSN_STORE_SIZE,
&checksum_delta))
goto err;
}
......@@ -3377,7 +3473,8 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
cur_row.extents_counts contains number of extents
cur_row.empty_bits is set to empty bits
cur_row.field_lengths contains packed length of all fields
cur_row.blob_length contains total length of all blobs.
cur_row.blob_length contains total length of all blobs
cur_row.checksum contains checksum of read record.
RETURN
0 ok
......@@ -4576,6 +4673,211 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
DBUG_RETURN(row_length);
}
/***************************************************************************
In-write hooks called under log's lock when log record is written
***************************************************************************/
/**
@brief Sets transaction's rec_lsn if needed
A transaction sometimes writes a REDO even before the page is in the
pagecache (example: brand new head or tail pages; full pages). So, if
Checkpoint happens just after the REDO write, it needs to know that the
REDO phase must start before this REDO. Scanning the pagecache cannot
tell that as the page is not in the cache. So, transaction sets its rec_lsn
to the REDO's LSN or somewhere before, and Checkpoint reads the
transaction's rec_lsn.
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_redo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn, void *hook_arg
__attribute__ ((unused)))
{
/*
Users of dummy_transaction_object must keep this TRN clean as it
is used by many threads (like those manipulating non-transactional
tables). It might be dangerous if one user sets rec_lsn or some other
member and it is picked up by another user (like putting this rec_lsn into
a page of a non-transactional table); it's safer if all members stay 0. So
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
DBUG_ASSERT(trn->trid != 0);
/*
If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
to translog_write_record(), like Monty did in his original code, and not
have a hook. For now we keep it like this.
*/
if (trn->rec_lsn == 0)
trn->rec_lsn= *lsn;
return 0;
}
/**
@brief Sets transaction's undo_lsn, first_undo_lsn if needed
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_undo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn, void *hook_arg
__attribute__ ((unused)))
{
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn=
trn->undo_lsn | LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
DBUG_ASSERT(tbl_info->state == &tbl_info->s->state.state);
return 0;
/*
when we implement purging, we will specialize this hook: UNDO_PURGE
records will additionally set trn->undo_purge_lsn
*/
}
/**
@brief Sets the table's records count and checksum to 0, then calls the
generic REDO hook.
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_redo_delete_all(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn, void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
DBUG_ASSERT(tbl_info->state == &tbl_info->s->state.state);
share->state.state.records= share->state.state.checksum= 0;
return write_hook_for_redo(type, trn, tbl_info, lsn, hook_arg);
}
/**
@brief Upates "records" and "checksum" and calls the generic UNDO hook
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_undo_row_insert(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
share->state.state.records++;
share->state.state.checksum+= *(ha_checksum *)hook_arg;
return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
}
/**
@brief Upates "records" and calls the generic UNDO hook
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_undo_row_delete(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
share->state.state.records--;
share->state.state.checksum+= *(ha_checksum *)hook_arg;
return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
}
/**
@brief Upates "records" and "checksum" and calls the generic UNDO hook
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_undo_row_update(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
share->state.state.checksum+= *(ha_checksum *)hook_arg;
return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
}
/**
@brief Sets transaction's undo_lsn, first_undo_lsn if needed
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_clr_end(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn __attribute__ ((unused)),
void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
struct st_msg_to_write_hook_for_clr_end *msg=
(struct st_msg_to_write_hook_for_clr_end *)hook_arg;
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= msg->previous_undo_lsn;
share->state.state.checksum+= msg->checksum_delta;
switch (msg->undone_record_type) {
case LOGREC_UNDO_ROW_DELETE:
share->state.state.records++;
break;
case LOGREC_UNDO_ROW_INSERT:
share->state.state.records--;
break;
case LOGREC_UNDO_ROW_UPDATE:
break;
default:
DBUG_ASSERT(0);
}
if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
return 0;
}
/**
@brief Updates table's lsn_of_file_id.
@return Operation status, always 0 (success)
*/
my_bool write_hook_for_file_id(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn
__attribute__ ((unused)),
MARIA_HA *tbl_info,
LSN *lsn __attribute__ ((unused)),
void *hook_arg
__attribute__ ((unused)))
{
DBUG_ASSERT(cmp_translog_addr(tbl_info->s->lsn_of_file_id, *lsn) < 0);
tbl_info->s->lsn_of_file_id= *lsn;
return 0;
}
/***************************************************************************
Applying of REDO log records
***************************************************************************/
......@@ -4944,19 +5246,25 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
ulonglong page;
uint rownr;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + 1], *buff;
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE + HA_CHECKSUM_STORE_SIZE],
*buff;
my_bool res= 1;
MARIA_PINNED_PAGE page_link;
LSN lsn;
MARIA_SHARE *share= info->s;
struct st_msg_to_write_hook_for_clr_end msg;
DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header);
rownr= dirpos_korr(header + PAGE_STORE_SIZE);
header+= PAGE_STORE_SIZE;
rownr= dirpos_korr(header);
header+= DIRPOS_STORE_SIZE;
DBUG_PRINT("enter", ("Page: %lu rownr: %u", (ulong) page, rownr));
if (!(buff= pagecache_read(info->s->pagecache,
if (!(buff= pagecache_read(share->pagecache,
&info->dfile, page, 0,
info->buff, info->s->page_type,
info->buff, share->page_type,
PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(1);
......@@ -4977,14 +5285,24 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
/* undo_lsn must be first for compression to work */
lsn_store(log_data, undo_lsn);
log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]= LOGREC_UNDO_ROW_INSERT;
clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
LOGREC_UNDO_ROW_INSERT);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
msg.undone_record_type= LOGREC_UNDO_ROW_INSERT;
msg.previous_undo_lsn= undo_lsn;
store_checksum_in_rec(share, msg.checksum_delta,
- ha_checksum_korr(header),
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
info->trn, info, log_array[TRANSLOG_INTERNAL_PARTS
+ 0].length,
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
log_data + LSN_STORE_SIZE, &msg))
goto err;
res= 0;
......@@ -5014,6 +5332,16 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
some buffers to point directly to 'header'
*/
memcpy(&row, &info->cur_row, sizeof(row));
if (share->calc_checksum)
{
/*
We extract the checksum delta here, saving a recomputation in
allocate_and_write_block_record(). It's only an optimization.
*/
row.checksum= - ha_checksum_korr(header);
header+= HA_CHECKSUM_STORE_SIZE;
}
null_field_lengths= row.null_field_lengths;
blob_lengths= row.blob_lengths;
......@@ -5184,18 +5512,25 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
uchar *current_record, *orig_record;
int error= 1;
MARIA_RECORD_POS record_pos;
ha_checksum checksum_delta;
DBUG_ENTER("_ma_apply_undo_row_update");
page= page_korr(header);
rownr= dirpos_korr(header + PAGE_STORE_SIZE);
header+= PAGE_STORE_SIZE;
rownr= dirpos_korr(header);
header+= DIRPOS_STORE_SIZE;
record_pos= ma_recordpos(page, rownr);
DBUG_PRINT("enter", ("Page: %lu rownr: %u", (ulong) page, rownr));
if (share->calc_checksum)
{
checksum_delta= ha_checksum_korr(header);
header+= HA_CHECKSUM_STORE_SIZE;
}
/*
Set header to point to old field values, generated by
fill_update_undo_parts()
*/
header+= PAGE_STORE_SIZE + DIRPOS_STORE_SIZE;
field_length_header= ma_get_length((uchar**) &header);
field_length_data= header;
header+= field_length_header;
......@@ -5290,14 +5625,14 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
if (share->calc_checksum)
{
info->cur_row.checksum= (*share->calc_checksum)(info, orig_record);
info->state->checksum+= (info->cur_row.checksum -
(*share->calc_checksum)(info, current_record));
info->new_row.checksum= checksum_delta +
(info->cur_row.checksum= (*share->calc_checksum)(info, orig_record));
/* verify that record's content is sane */
DBUG_ASSERT(info->new_row.checksum ==
(*share->calc_checksum)(info, current_record));
}
/*
Now records are up to date, execute the update to original values
*/
/* Now records are up to date, execute the update to original values */
if (_ma_update_block_record2(info, record_pos, current_record, orig_record,
undo_lsn))
goto err;
......
......@@ -193,3 +193,28 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool write_hook_for_redo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
my_bool write_hook_for_undo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
my_bool write_hook_for_redo_delete_all(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool write_hook_for_undo_row_insert(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool write_hook_for_undo_row_delete(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool write_hook_for_undo_row_update(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool write_hook_for_clr_end(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
my_bool write_hook_for_file_id(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
......@@ -5602,11 +5602,10 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
record).
*/
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE];
uchar log_data[FILEID_STORE_SIZE + 4];
LSN lsn;
compile_time_assert(LSN_STORE_SIZE >= (FILEID_STORE_SIZE + 4));
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= FILEID_STORE_SIZE + 4;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
/*
testflag gives an idea of what REPAIR did (in particular T_QUICK
or not: did it touch the data file or not?).
......@@ -5614,10 +5613,9 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
int4store(log_data + FILEID_STORE_SIZE, param->testflag);
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_REPAIR_TABLE,
&dummy_transaction_object, info,
log_array[TRANSLOG_INTERNAL_PARTS +
0].length,
sizeof(log_data),
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data) ||
log_array, log_data, NULL) ||
translog_flush(lsn)))
return 1;
/*
......
......@@ -266,7 +266,7 @@ static int really_execute_checkpoint(void)
&dummy_transaction_object, NULL,
total_rec_length,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
log_array, NULL, NULL) ||
translog_flush(lsn)))
goto err;
......@@ -652,7 +652,7 @@ pthread_handler_t ma_checkpoint_background(void *arg __attribute__((unused)))
break;
#if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */
pthread_mutex_unlock(&LOCK_checkpoint);
my_sleep(100000); // a tenth of a second
my_sleep(100000); /* a tenth of a second */
pthread_mutex_lock(&LOCK_checkpoint);
#else
/* To have a killable sleep, we use timedwait like our SQL GET_LOCK() */
......@@ -893,7 +893,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
filter_param.pages_covered_by_bitmap= share->bitmap.pages_covered;
/* OS file descriptors are ints which we stored in 4 bytes */
compile_time_assert(sizeof(int) == 4);
compile_time_assert(sizeof(int) <= 4);
pthread_mutex_lock(&share->intern_lock);
/*
Tables in a normal state have their two file descriptors open.
......
......@@ -64,7 +64,7 @@ int ma_commit(TRN *trn)
res= (translog_write_record(&commit_lsn, LOGREC_COMMIT,
trn, NULL, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
log_array, NULL, NULL) ||
translog_flush(commit_lsn) ||
trnman_commit_trn(trn));
/*
......
......@@ -997,7 +997,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
&dummy_transaction_object, NULL,
total_rec_length,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
log_array, NULL, NULL) ||
translog_flush(lsn)))
goto err;
/*
......
......@@ -44,10 +44,10 @@ int maria_delete(MARIA_HA *info,const uchar *record)
/* Test if record is in datafile */
DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
maria_print_error(info->s, HA_ERR_CRASHED);
maria_print_error(share, HA_ERR_CRASHED);
DBUG_RETURN(my_errno= HA_ERR_CRASHED););
DBUG_EXECUTE_IF("my_error_test_undefined_error",
maria_print_error(info->s, INT_MAX);
maria_print_error(share, INT_MAX);
DBUG_RETURN(my_errno= INT_MAX););
if (!(info->update & HA_STATE_AKTIV))
{
......@@ -70,17 +70,17 @@ int maria_delete(MARIA_HA *info,const uchar *record)
old_key= info->lastkey2;
for (i=0 ; i < share->base.keys ; i++ )
{
if (maria_is_key_active(info->s->state.key_map, i))
if (maria_is_key_active(share->state.key_map, i))
{
info->s->keyinfo[i].version++;
if (info->s->keyinfo[i].flag & HA_FULLTEXT )
share->keyinfo[i].version++;
if (share->keyinfo[i].flag & HA_FULLTEXT )
{
if (_ma_ft_del(info,i,(char*) old_key,record,info->cur_row.lastpos))
goto err;
}
else
{
if (info->s->keyinfo[i].ck_delete(info,i,old_key,
if (share->keyinfo[i].ck_delete(info,i,old_key,
_ma_make_key(info,i,old_key,record,info->cur_row.lastpos)))
goto err;
}
......@@ -89,19 +89,20 @@ int maria_delete(MARIA_HA *info,const uchar *record)
}
}
if ((*share->delete_record)(info, record))
goto err; /* Remove record from database */
/*
We can't use the row based checksum as this doesn't have enough
precision.
*/
if (info->s->calc_checksum)
if (share->calc_checksum)
{
info->cur_row.checksum= (*info->s->calc_checksum)(info,record);
info->state->checksum-= info->cur_row.checksum;
/*
We can't use the row based checksum as this doesn't have enough
precision.
*/
info->cur_row.checksum= (*share->calc_checksum)(info, record);
}
if ((*share->delete_record)(info, record))
goto err; /* Remove record from database */
info->state->checksum+= - !share->now_transactional *
info->cur_row.checksum;
info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
info->state->records-= !share->now_transactional;
share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
......@@ -111,8 +112,8 @@ int maria_delete(MARIA_HA *info,const uchar *record)
allow_break(); /* Allow SIGHUP & SIGINT */
if (info->invalidator != 0)
{
DBUG_PRINT("info", ("invalidator... '%s' (delete)", info->s->open_file_name));
(*info->invalidator)(info->s->open_file_name);
DBUG_PRINT("info", ("invalidator... '%s' (delete)", share->open_file_name));
(*info->invalidator)(share->open_file_name);
info->invalidator=0;
}
DBUG_RETURN(0);
......@@ -122,7 +123,7 @@ int maria_delete(MARIA_HA *info,const uchar *record)
mi_sizestore(lastpos, info->cur_row.lastpos);
if (save_errno != HA_ERR_RECORD_CHANGED)
{
maria_print_error(info->s, HA_ERR_CRASHED);
maria_print_error(share, HA_ERR_CRASHED);
maria_mark_crashed(info); /* mark table crashed */
}
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
......@@ -131,7 +132,7 @@ int maria_delete(MARIA_HA *info,const uchar *record)
my_errno=save_errno;
if (save_errno == HA_ERR_KEY_NOT_FOUND)
{
maria_print_error(info->s, HA_ERR_CRASHED);
maria_print_error(share, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED;
}
......
......@@ -64,14 +64,15 @@ int maria_delete_all_rows(MARIA_HA *info)
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_DELETE_ALL,
info->trn, info, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data) ||
log_array, log_data, NULL) ||
translog_flush(lsn)))
goto err;
}
/*
For recovery it matters that this is called after writing the log record,
so that resetting state.records actually happens under log's mutex.
so that resetting state.records and state.checksum actually happens under
log's mutex.
*/
_ma_reset_status(info);
......@@ -147,10 +148,6 @@ void _ma_reset_status(MARIA_HA *info)
info->state->key_file_length= share->base.keystart;
info->state->data_file_length= 0;
info->state->empty= info->state->key_empty= 0;
/**
@todo RECOVERY BUG
the line below must happen under log's mutex when writing the REDO
*/
info->state->checksum= 0;
/* Drop the delete key chain. */
......
......@@ -92,7 +92,7 @@ int maria_delete_table(const char *name)
log_array[TRANSLOG_INTERNAL_PARTS +
0].length,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
log_array, NULL, NULL) ||
translog_flush(lsn)))
DBUG_RETURN(1);
}
......
......@@ -14,8 +14,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
#include "ma_blockrec.h"
#include "trnman.h"
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "trnman.h" /* for access to members of TRN */
/**
@file
......@@ -208,31 +208,6 @@ static MARIA_SHARE **id_to_share= NULL;
/* lock for id_to_share */
static my_atomic_rwlock_t LOCK_id_to_share;
static my_bool write_hook_for_redo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_undo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_redo_delete_all(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_undo_row_insert(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_undo_row_delete(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_clr_end(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_file_id(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
struct st_translog_parts *parts);
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr);
/*
......@@ -437,8 +412,8 @@ static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE + FILEID_STORE_SIZE + 1,
LSN_STORE_SIZE + FILEID_STORE_SIZE + 1, NULL, write_hook_for_clr_end, NULL, 1,
{LOGRECTYPE_VARIABLE_LENGTH, 0, LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE, NULL, write_hook_for_clr_end, NULL, 1,
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END=
......@@ -446,8 +421,7 @@ static LOG_DESC INIT_LOGREC_PURGE_END=
"purge_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT=
{LOGRECTYPE_PSEUDOFIXEDLENGTH,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo_row_insert, NULL, 1,
"undo_row_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
......@@ -461,7 +435,7 @@ static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE=
static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 1,
NULL, write_hook_for_undo_row_update, NULL, 1,
"undo_row_update", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT=
......@@ -3678,23 +3652,23 @@ static translog_size_t translog_get_current_group_size()
}
/*
Write variable record in 1 group
/**
@brief Write variable record in 1 group.
SYNOPSIS
translog_write_variable_record_1group()
lsn LSN of the record will be written here
type the log record type
short_trid Short transaction ID or 0 if it has no sense
parts Descriptor of record source parts
buffer_to_flush Buffer which have to be flushed if it is not 0
header_length Calculated header length of chunk type 0
trn Transaction structure pointer for hooks by
record log type, for short_id
@param lsn LSN of the record will be written here
@param type the log record type
@param short_trid Short transaction ID or 0 if it has no sense
@param parts Descriptor of record source parts
@param buffer_to_flush Buffer which have to be flushed if it is not 0
@param header_length Calculated header length of chunk type 0
@param trn Transaction structure pointer for hooks by
record log type, for short_id
@param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record.
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool
......@@ -3705,7 +3679,7 @@ translog_write_variable_record_1group(LSN *lsn,
struct st_translog_parts *parts,
struct st_translog_buffer
*buffer_to_flush, uint16 header_length,
TRN *trn)
TRN *trn, void *hook_arg)
{
TRANSLOG_ADDRESS horizon;
struct st_buffer_cursor cursor;
......@@ -3721,7 +3695,7 @@ translog_write_variable_record_1group(LSN *lsn,
*lsn, TRUE) ||
(log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, parts)))
lsn, hook_arg)))
{
translog_unlock();
DBUG_RETURN(1);
......@@ -3830,23 +3804,23 @@ translog_write_variable_record_1group(LSN *lsn,
}
/*
Write variable record in 1 chunk
/**
@brief Write variable record in 1 chunk.
SYNOPSIS
translog_write_variable_record_1chunk()
lsn LSN of the record will be written here
type the log record type
short_trid Short transaction ID or 0 if it has no sense
parts Descriptor of record source parts
buffer_to_flush Buffer which have to be flushed if it is not 0
header_length Calculated header length of chunk type 0
trn Transaction structure pointer for hooks by
record log type, for short_id
@param lsn LSN of the record will be written here
@param type the log record type
@param short_trid Short transaction ID or 0 if it has no sense
@param parts Descriptor of record source parts
@param buffer_to_flush Buffer which have to be flushed if it is not 0
@param header_length Calculated header length of chunk type 0
@param trn Transaction structure pointer for hooks by
record log type, for short_id
@param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record.
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool
......@@ -3857,7 +3831,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
struct st_translog_parts *parts,
struct st_translog_buffer
*buffer_to_flush, uint16 header_length,
TRN *trn)
TRN *trn, void *hook_arg)
{
int rc;
uchar chunk0_header[1 + 2 + 5 + 2];
......@@ -3871,7 +3845,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
*lsn, TRUE) ||
(log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, parts)))
lsn, hook_arg)))
{
translog_unlock();
DBUG_RETURN(1);
......@@ -4196,24 +4170,24 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts,
}
/*
Write multi-group variable-size record
/**
@brief Write multi-group variable-size record.
SYNOPSIS
translog_write_variable_record_mgroup()
lsn LSN of the record will be written here
type the log record type
short_trid Short transaction ID or 0 if it has no sense
parts Descriptor of record source parts
buffer_to_flush Buffer which have to be flushed if it is not 0
header_length Header length calculated for 1 group
buffer_rest Beginning from which we plan to write in full pages
trn Transaction structure pointer for hooks by
record log type, for short_id
@param lsn LSN of the record will be written here
@param type the log record type
@param short_trid Short transaction ID or 0 if it has no sense
@param parts Descriptor of record source parts
@param buffer_to_flush Buffer which have to be flushed if it is not 0
@param header_length Header length calculated for 1 group
@param buffer_rest Beginning from which we plan to write in full pages
@param trn Transaction structure pointer for hooks by
record log type, for short_id
@param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record.
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool
......@@ -4226,7 +4200,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
*buffer_to_flush,
uint16 header_length,
translog_size_t buffer_rest,
TRN *trn)
TRN *trn, void *hook_arg)
{
TRANSLOG_ADDRESS horizon;
struct st_buffer_cursor cursor;
......@@ -4554,7 +4528,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn,
tbl_info,
lsn, parts))
lsn, hook_arg))
goto err;
}
......@@ -4626,21 +4600,21 @@ translog_write_variable_record_mgroup(LSN *lsn,
}
/*
Write the variable length log record
/**
@brief Write the variable length log record.
SYNOPSIS
translog_write_variable_record()
lsn LSN of the record will be written here
type the log record type
short_trid Short transaction ID or 0 if it has no sense
parts Descriptor of record source parts
trn Transaction structure pointer for hooks by
record log type, for short_id
@param lsn LSN of the record will be written here
@param type the log record type
@param short_trid Short transaction ID or 0 if it has no sense
@param parts Descriptor of record source parts
@param trn Transaction structure pointer for hooks by
record log type, for short_id
@param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record.
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_write_variable_record(LSN *lsn,
......@@ -4648,7 +4622,7 @@ static my_bool translog_write_variable_record(LSN *lsn,
MARIA_HA *tbl_info,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
TRN *trn)
TRN *trn, void *hook_arg)
{
struct st_translog_buffer *buffer_to_flush= NULL;
uint header_length1= 1 + 2 + 2 +
......@@ -4725,7 +4699,7 @@ static my_bool translog_write_variable_record(LSN *lsn,
res= translog_write_variable_record_1chunk(lsn, type, tbl_info,
short_trid,
parts, buffer_to_flush,
header_length1, trn);
header_length1, trn, hook_arg);
DBUG_RETURN(res);
}
......@@ -4737,7 +4711,7 @@ static my_bool translog_write_variable_record(LSN *lsn,
res= translog_write_variable_record_1group(lsn, type, tbl_info,
short_trid,
parts, buffer_to_flush,
header_length1, trn);
header_length1, trn, hook_arg);
DBUG_RETURN(res);
}
/* following function makes translog_unlock(); */
......@@ -4745,26 +4719,26 @@ static my_bool translog_write_variable_record(LSN *lsn,
short_trid,
parts, buffer_to_flush,
header_length1,
buffer_rest, trn);
buffer_rest, trn, hook_arg);
DBUG_RETURN(res);
}
/*
Write the fixed and pseudo-fixed log record
/**
@brief Write the fixed and pseudo-fixed log record.
SYNOPSIS
translog_write_fixed_record()
lsn LSN of the record will be written here
type the log record type
short_trid Short transaction ID or 0 if it has no sense
parts Descriptor of record source parts
trn Transaction structure pointer for hooks by
record log type, for short_id
@param lsn LSN of the record will be written here
@param type the log record type
@param short_trid Short transaction ID or 0 if it has no sense
@param parts Descriptor of record source parts
@param trn Transaction structure pointer for hooks by
record log type, for short_id
@param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record.
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_write_fixed_record(LSN *lsn,
......@@ -4772,7 +4746,7 @@ static my_bool translog_write_fixed_record(LSN *lsn,
MARIA_HA *tbl_info,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
TRN *trn)
TRN *trn, void *hook_arg)
{
struct st_translog_buffer *buffer_to_flush= NULL;
uchar chunk1_header[1 + 2];
......@@ -4824,7 +4798,7 @@ static my_bool translog_write_fixed_record(LSN *lsn,
*lsn, TRUE) ||
(log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, tbl_info,
lsn, parts)))
lsn, hook_arg)))
{
rc= 1;
goto err;
......@@ -4899,6 +4873,9 @@ static my_bool translog_write_fixed_record(LSN *lsn,
@param store_share_id if tbl_info!=NULL then share's id will
automatically be stored in the two first bytes
pointed (so pointer is assumed to be !=NULL)
@param hook_arg argument which will be passed to pre-write and
in-write hooks of this record.
@return Operation status
@retval 0 OK
@retval 1 Error
......@@ -4910,7 +4887,8 @@ my_bool translog_write_record(LSN *lsn,
translog_size_t rec_len,
uint part_no,
LEX_STRING *parts_data,
uchar *store_share_id)
uchar *store_share_id,
void *hook_arg)
{
struct st_translog_parts parts;
LEX_STRING *part;
......@@ -4955,7 +4933,7 @@ my_bool translog_write_record(LSN *lsn,
if (unlikely(translog_write_record(&dummy_lsn, LOGREC_LONG_TRANSACTION_ID,
trn, NULL, sizeof(log_data),
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL)))
log_array, NULL, NULL)))
DBUG_RETURN(1);
}
......@@ -5018,17 +4996,17 @@ my_bool translog_write_record(LSN *lsn,
if (!(rc= (log_record_type_descriptor[type].prewrite_hook &&
(*log_record_type_descriptor[type].prewrite_hook) (type, trn,
tbl_info,
&parts))))
hook_arg))))
{
switch (log_record_type_descriptor[type].class) {
case LOGRECTYPE_VARIABLE_LENGTH:
rc= translog_write_variable_record(lsn, type, tbl_info,
short_trid, &parts, trn);
short_trid, &parts, trn, hook_arg);
break;
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
case LOGRECTYPE_FIXEDLENGTH:
rc= translog_write_fixed_record(lsn, type, tbl_info,
short_trid, &parts, trn);
short_trid, &parts, trn, hook_arg);
break;
case LOGRECTYPE_NOT_ALLOWED:
default:
......@@ -6060,6 +6038,7 @@ translog_size_t translog_read_record(LSN lsn,
if (offset < page_end)
{
uint len= page_end - offset;
set_if_smaller(len, length); /* in case we read beyond record's end */
DBUG_ASSERT(offset >= data->current_offset);
memcpy(buffer,
data->scanner.page + data->body_offset +
......@@ -6339,207 +6318,6 @@ my_bool translog_flush(LSN lsn)
}
/**
@brief Sets transaction's rec_lsn if needed
A transaction sometimes writes a REDO even before the page is in the
pagecache (example: brand new head or tail pages; full pages). So, if
Checkpoint happens just after the REDO write, it needs to know that the
REDO phase must start before this REDO. Scanning the pagecache cannot
tell that as the page is not in the cache. So, transaction sets its rec_lsn
to the REDO's LSN or somewhere before, and Checkpoint reads the
transaction's rec_lsn.
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_redo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
/*
Users of dummy_transaction_object must keep this TRN clean as it
is used by many threads (like those manipulating non-transactional
tables). It might be dangerous if one user sets rec_lsn or some other
member and it is picked up by another user (like putting this rec_lsn into
a page of a non-transactional table); it's safer if all members stay 0. So
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
DBUG_ASSERT(trn->trid != 0);
/*
If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
to translog_write_record(), like Monty did in his original code, and not
have a hook. For now we keep it like this.
*/
if (trn->rec_lsn == 0)
trn->rec_lsn= *lsn;
return 0;
}
/**
@brief Sets transaction's undo_lsn, first_undo_lsn if needed
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_undo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn=
trn->undo_lsn | LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
return 0;
/*
when we implement purging, we will specialize this hook: UNDO_PURGE
records will additionally set trn->undo_purge_lsn
*/
}
/**
@brief Sets the table's records count to 0, then calls the generic REDO
hook.
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_redo_delete_all(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
tbl_info->s->state.state.records= 0;
return write_hook_for_redo(type, trn, tbl_info, lsn, parts);
}
/**
@brief Upates "records" and calls the generic UNDO hook
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_undo_row_insert(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
tbl_info->s->state.state.records++;
return write_hook_for_undo(type, trn, tbl_info, lsn, parts);
}
/**
@brief Upates "records" and calls the generic UNDO hook
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_undo_row_delete(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
tbl_info->s->state.state.records--;
return write_hook_for_undo(type, trn, tbl_info, lsn, parts);
}
/**
@brief Sets transaction's undo_lsn, first_undo_lsn if needed
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_clr_end(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, MARIA_HA *tbl_info
__attribute__ ((unused)),
LSN *lsn
__attribute__ ((unused)),
struct st_translog_parts *parts)
{
char *ptr= parts->parts[TRANSLOG_INTERNAL_PARTS + 0].str;
enum translog_record_type undone_record_type=
ptr[LSN_STORE_SIZE + FILEID_STORE_SIZE];
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= lsn_korr(ptr);
switch (undone_record_type) {
case LOGREC_UNDO_ROW_DELETE:
tbl_info->s->state.state.records++;
break;
case LOGREC_UNDO_ROW_INSERT:
tbl_info->s->state.state.records--;
break;
case LOGREC_UNDO_ROW_UPDATE:
break;
default:
DBUG_ASSERT(0);
}
if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
return 0;
}
/**
@brief Updates table's lsn_of_file_id.
@todo move it to a separate file
@return Operation status, always 0 (success)
*/
static my_bool write_hook_for_file_id(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn
__attribute__ ((unused)),
MARIA_HA *tbl_info,
LSN *lsn
__attribute__ ((unused)),
struct st_translog_parts *parts
__attribute__ ((unused)))
{
DBUG_ASSERT(cmp_translog_addr(tbl_info->s->lsn_of_file_id, *lsn) < 0);
tbl_info->s->lsn_of_file_id= *lsn;
return 0;
}
/**
@brief Gives a 2-byte-id to MARIA_SHARE and logs this fact
......@@ -6610,7 +6388,7 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
log_array[TRANSLOG_INTERNAL_PARTS +
1].length,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data)))
log_array, log_data, NULL)))
return 1;
}
pthread_mutex_unlock(&share->intern_lock);
......
......@@ -48,6 +48,7 @@ typedef uint16 SHORT_TRANSACTION_ID;
struct st_maria_info;
/* Changing one of the "SIZE" below will break backward-compatibility! */
/* Length of CRC at end of pages */
#define CRC_LENGTH 4
/* Size of file id in logs */
......@@ -57,16 +58,23 @@ struct st_maria_info;
/* Size of page ranges in log */
#define PAGERANGE_STORE_SIZE ROW_EXTENT_COUNT_SIZE
#define DIRPOS_STORE_SIZE 1
#define CLR_TYPE_STORE_SIZE 1
/* If table has live checksum we store its changes in UNDOs */
#define HA_CHECKSUM_STORE_SIZE 4
/* Store methods to match the above sizes */
#define fileid_store(T,A) int2store(T,A)
#define page_store(T,A) int5store(T,A)
#define dirpos_store(T,A) ((*(uchar*) (T)) = A)
#define pagerange_store(T,A) int2store(T,A)
#define clr_type_store(T,A) ((*(uchar*) (T)) = A)
#define ha_checksum_store(T,A) int4store(T,A)
#define fileid_korr(P) uint2korr(P)
#define page_korr(P) uint5korr(P)
#define dirpos_korr(P) ((P)[0])
#define pagerange_korr(P) uint2korr(P)
#define clr_type_korr(P) ((P)[0])
#define ha_checksum_korr(P) uint4korr(P)
/*
Length of disk drive sector size (we assume that writing it
......@@ -230,7 +238,8 @@ translog_write_record(LSN *lsn, enum translog_record_type type,
struct st_transaction *trn,
struct st_maria_info *tbl_info,
translog_size_t rec_len, uint part_no,
LEX_STRING *parts_data, uchar *store_share_id);
LEX_STRING *parts_data, uchar *store_share_id,
void *hook_arg);
extern void translog_destroy();
......@@ -299,12 +308,11 @@ struct st_translog_parts
typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type,
TRN *trn, struct st_maria_info *tbl_info,
struct st_translog_parts *parts);
void *hook_arg);
typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type,
TRN *trn, struct st_maria_info *tbl_info,
LSN *lsn,
struct st_translog_parts *parts);
LSN *lsn, void *hook_arg);
typedef uint16(*read_rec_hook) (enum translog_record_type type,
uint16 read_length, uchar *read_buff,
......
......@@ -177,7 +177,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
share->delay_key_write=1;
info.state= &share->state.state; /* Change global values by default */
if (!share->base.born_transactional) /* but for transactional ones ... */
if (!share->base.born_transactional) /* For transactional ones ... */
info.trn= &dummy_transaction_object; /* ... force crash if no trn given */
pthread_mutex_unlock(&share->intern_lock);
......
......@@ -3941,7 +3941,7 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
}
}
compile_time_assert(sizeof(pagecache->blocks == 4));
compile_time_assert(sizeof(pagecache->blocks) <= 4);
str->length= 4 + /* number of dirty pages */
(4 + /* file */
4 + /* pageno */
......@@ -3963,8 +3963,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
{
if (block->type != PAGECACHE_LSN_PAGE)
continue; /* no need to store it in the checkpoint record */
compile_time_assert((4 == sizeof(block->hash_link->file.file)));
compile_time_assert((4 == sizeof(block->hash_link->pageno)));
compile_time_assert(sizeof(block->hash_link->file.file) <= 4);
compile_time_assert(sizeof(block->hash_link->pageno) <= 4);
int4store(ptr, block->hash_link->file.file);
ptr+= 4;
int4store(ptr, block->hash_link->pageno);
......
......@@ -50,6 +50,7 @@ static LSN current_group_end_lsn,
static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
static FILE *tracef; /**< trace file for debugging */
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
static ulonglong now; /**< for tracking execution time of phases */
#define prototype_redo_exec_hook(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
......@@ -312,8 +313,12 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
log_record_buffer.length= 0;
if (tracef != stdout && redo_phase_message_printed)
{
ulonglong old_now= now;
now= my_getsystime();
float previous_phase_took= (now - old_now)/10000000.0;
/** @todo RECOVERY BUG all prints to stderr should go to error log */
fprintf(stderr, "\n");
/** @todo RECOVERY BUG all prints to stderr should go to error log */
fprintf(stderr, " (%.1f seconds)\n", previous_phase_took);
}
/* we don't cleanly close tables if we hit some error (may corrupt them) */
DBUG_RETURN(error);
......@@ -1211,12 +1216,25 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 0;
MARIA_SHARE *share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, info->s->state.is_of_horizon) >= 0)
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records++;
/** @todo RECOVERY BUG Also update the table's checksum */
share->state.state.records++;
if (share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
/**
@todo some bits below will rather be set when executing UNDOs related
to keys
......@@ -1234,15 +1252,29 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 0;
MARIA_SHARE *share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, info->s->state.is_of_horizon) >= 0)
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
share->state.state.records--;
if (share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
return 0;
}
......@@ -1252,10 +1284,24 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 0;
MARIA_SHARE *share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, info->s->state.is_of_horizon) >= 0)
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
if (share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
return 0;
......@@ -1306,33 +1352,46 @@ prototype_redo_exec_hook(CLR_END)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 0;
MARIA_SHARE *share= info->s;
LSN previous_undo_lsn= lsn_korr(rec->header);
enum translog_record_type undone_record_type=
(rec->header)[LSN_STORE_SIZE + FILEID_STORE_SIZE];
clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
const LOG_DESC *log_desc= &log_record_type_descriptor[undone_record_type];
set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
tprint(tracef, " CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n",
log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
if (cmp_translog_addr(rec->lsn, info->s->state.is_of_horizon) >= 0)
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state older than record, updating rows' count\n");
if (share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE, HA_CHECKSUM_STORE_SIZE,
buff, NULL) != HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
switch (undone_record_type) {
case LOGREC_UNDO_ROW_DELETE:
info->s->state.state.records++;
share->state.state.records++;
break;
case LOGREC_UNDO_ROW_INSERT:
info->s->state.state.records--;
share->state.state.records--;
break;
case LOGREC_UNDO_ROW_UPDATE:
break;
default:
DBUG_ASSERT(0);
}
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
return 0;
}
......@@ -1353,12 +1412,33 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
*/
return 1;
}
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
MARIA_SHARE *share= info->s;
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
const uchar *record_ptr= rec->header;
if (share->calc_checksum)
{
/*
We need to read more of the record to put the checksum into the record
buffer used by _ma_apply_undo_row_insert().
If the table has no live checksum, rec->header will be enough.
*/
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
record_ptr= log_record_buffer.str;
}
info->trn= trn;
error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
rec->header + LSN_STORE_SIZE +
record_ptr + LSN_STORE_SIZE +
FILEID_STORE_SIZE);
info->trn= 0;
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
......@@ -1378,7 +1458,8 @@ prototype_undo_exec_hook(UNDO_ROW_DELETE)
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
MARIA_SHARE *share= info->s;
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
......@@ -1405,8 +1486,7 @@ prototype_undo_exec_hook(UNDO_ROW_DELETE)
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0;
tprint(tracef, " rows' count %lu\n undo_lsn now LSN (%lu,0x%lx)\n",
(ulong)info->s->state.state.records,
LSN_IN_PARTS(previous_undo_lsn));
(ulong)share->state.state.records, LSN_IN_PARTS(previous_undo_lsn));
return error;
}
......@@ -1419,8 +1499,8 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
MARIA_SHARE *share= info->s;
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
......@@ -1634,7 +1714,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
hash_free(&all_dirty_pages);
/*
hash_free() can be called multiple times probably, but be safe it that
hash_free() can be called multiple times probably, but be safe if that
changes
*/
bzero(&all_dirty_pages, sizeof(all_dirty_pages));
......@@ -1652,11 +1732,8 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
LSN gslsn= all_active_trans[sid].group_start_lsn;
TRN *trn;
if (gslsn != LSN_IMPOSSIBLE)
{
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
LSN_IN_PARTS(gslsn), sid);
ALERT_USER();
}
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
......@@ -1734,8 +1811,12 @@ static int run_undo_phase(uint unfinished)
{
if (tracef != stdout)
{
ulonglong old_now= now;
now= my_getsystime();
float previous_phase_took= (now - old_now)/10000000.0;
/** @todo RECOVERY BUG all prints to stderr should go to error log */
fprintf(stderr, " 100%%; transactions to roll back:");
fprintf(stderr, " 100%% (%.1f seconds); transactions to roll back:",
previous_phase_took);
}
tprint(tracef, "%u transactions will be rolled back\n", unfinished);
for( ; ; )
......@@ -2070,8 +2151,11 @@ static int close_all_tables(void)
tprint(tracef, "Closing all tables\n");
if (tracef != stdout && redo_phase_message_printed)
{
ulonglong old_now= now;
now= my_getsystime();
float previous_phase_took= (now - old_now)/10000000.0;
/** @todo RECOVERY BUG all prints to stderr should go to error log */
fprintf(stderr, "; flushing tables");
fprintf(stderr, " (%.1f seconds); flushing tables", previous_phase_took);
}
/*
......@@ -2141,6 +2225,7 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
/** @todo RECOVERY BUG all prints to stderr should go to error log */
fprintf(stderr, "Maria engine: starting recovery; recovered pages: 0%%");
redo_phase_message_printed= TRUE;
now= my_getsystime();
}
if (end_logno == FILENO_IMPOSSIBLE)
{
......@@ -2166,7 +2251,7 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
}
#ifdef MARIA_EXTERNAL_LOCKING
#error Maria's Checkpoint and Recovery are really not ready for it
#error Marias Checkpoint and Recovery are really not ready for it
#endif
/*
......
......@@ -85,7 +85,7 @@ int maria_rename(const char *old_name, const char *new_name)
&dummy_transaction_object, NULL,
old_name_len + new_name_len,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
log_array, NULL, NULL) ||
translog_flush(lsn)))
{
maria_close(info);
......
......@@ -4,10 +4,6 @@ TEST WITH ma_test1 -s -M -T -c
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 3757530372
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -21,10 +17,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 3757530372
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -78,10 +70,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -95,10 +83,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -112,10 +96,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -137,10 +117,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 3697324514
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -158,10 +134,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 3026590807
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -175,10 +147,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 3026590807
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -192,10 +160,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -253,10 +217,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -270,10 +230,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -287,10 +243,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -312,10 +264,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 3697324514
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -333,10 +281,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 3026590807
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -350,10 +294,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 3026590807
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -367,10 +307,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -428,10 +364,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -445,10 +377,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -462,10 +390,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 221293111
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -487,10 +411,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 3697324514
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -508,10 +428,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -525,10 +441,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -542,10 +454,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 2428948025
---
> Checksum: 0
11c11
< Datafile length: 16384 Keyfile length: 16384
---
......@@ -603,10 +511,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -620,10 +524,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -637,10 +537,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -662,10 +558,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 4024695312
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -683,10 +575,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 800025671
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -700,10 +588,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 800025671
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -717,10 +601,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -778,10 +658,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -795,10 +671,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -812,10 +684,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -837,10 +705,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 4024695312
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -858,10 +722,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 800025671
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -875,10 +735,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 800025671
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -892,10 +748,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -953,10 +805,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -970,10 +818,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -987,10 +831,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 411409161
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -1012,10 +852,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 4024695312
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -1033,10 +869,6 @@ Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -1050,10 +882,6 @@ testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......@@ -1067,10 +895,6 @@ testing applying of CLRs to recreate table
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
7c7
< Checksum: 529753687
---
> Checksum: 0
11c11
< Datafile length: 49152 Keyfile length: 16384
---
......
......@@ -27,11 +27,9 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
bool auto_key_changed=0;
ulonglong changed;
MARIA_SHARE *share=info->s;
ha_checksum old_checksum;
DBUG_ENTER("maria_update");
LINT_INIT(new_key);
LINT_INIT(changed);
LINT_INIT(old_checksum);
DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
maria_print_error(info->s, HA_ERR_CRASHED);
......@@ -59,16 +57,6 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
goto err_end; /* Record has changed */
}
if (share->calc_checksum)
{
/*
We can't use the row based checksum as this doesn't have enough
precision.
*/
if (info->s->calc_checksum)
old_checksum= (*info->s->calc_checksum)(info, oldrec);
}
/* Calculate and check all unique constraints */
key_changed=0;
for (i=0 ; i < share->state.header.uniques ; i++)
......@@ -137,19 +125,23 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
}
}
}
/*
If we are running with external locking, we must update the index file
that something has changed.
*/
if (changed || !my_disable_locking)
key_changed|= HA_STATE_CHANGED;
if (share->calc_checksum)
{
info->cur_row.checksum= (*share->calc_checksum)(info,newrec);
info->state->checksum+= (info->cur_row.checksum - old_checksum);
/* Store new checksum in index file header */
key_changed|= HA_STATE_CHANGED;
/*
We can't use the row based checksum as this doesn't have enough
precision (one byte, while the table's is more bytes).
At least _ma_check_unique() modifies the 'newrec' record, so checksum
has to be computed _after_ it. Nobody apparently modifies 'oldrec'.
We need to pass the old row's checksum down to (*update_record)(), we do
this via info->new_row.checksum (not intuitive but existing code
mandated that cur_row is the new row).
If (*update_record)() fails, table will be marked corrupted so no need
to revert the live checksum change.
*/
info->state->checksum+= !share->now_transactional *
((info->cur_row.checksum= (*share->calc_checksum)(info, newrec)) -
(info->new_row.checksum= (*share->calc_checksum)(info, oldrec)));
}
{
/*
......@@ -165,14 +157,9 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
org_delete_link= share->state.dellink;
if ((*share->update_record)(info, pos, oldrec, newrec))
goto err;
if (!key_changed &&
(memcmp((char*) &state, (char*) info->state, sizeof(state)) ||
org_split != share->state.split ||
org_delete_link != share->state.dellink))
key_changed|= HA_STATE_CHANGED; /* Must update index file */
}
if (auto_key_changed)
set_if_bigger(info->s->state.auto_increment,
set_if_bigger(share->state.auto_increment,
ma_retrieve_auto_increment(info, newrec));
/*
......@@ -195,8 +182,8 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
allow_break(); /* Allow SIGHUP & SIGINT */
if (info->invalidator != 0)
{
DBUG_PRINT("info", ("invalidator... '%s' (update)", info->s->open_file_name));
(*info->invalidator)(info->s->open_file_name);
DBUG_PRINT("info", ("invalidator... '%s' (update)", share->open_file_name));
(*info->invalidator)(share->open_file_name);
info->invalidator=0;
}
DBUG_RETURN(0);
......@@ -232,7 +219,7 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
}
else
{
maria_print_error(info->s, HA_ERR_CRASHED);
maria_print_error(share, HA_ERR_CRASHED);
maria_mark_crashed(info);
}
info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_ROW_CHANGED |
......@@ -243,7 +230,7 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
allow_break(); /* Allow SIGHUP & SIGINT */
if (save_errno == HA_ERR_KEY_NOT_FOUND)
{
maria_print_error(info->s, HA_ERR_CRASHED);
maria_print_error(share, HA_ERR_CRASHED);
save_errno=HA_ERR_CRASHED;
}
DBUG_RETURN(my_errno=save_errno);
......
......@@ -162,10 +162,6 @@ int maria_write(MARIA_HA *info, uchar *record)
rw_unlock(&share->key_root_lock[i]);
}
}
/**
@todo RECOVERY BUG
this += must happen under log's mutex when writing the UNDO
*/
if (share->calc_write_checksum)
info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
if (filepos != HA_OFFSET_ERROR)
......@@ -176,7 +172,8 @@ int maria_write(MARIA_HA *info, uchar *record)
@todo when we enable multiple writers, we will have to protect
'records' and 'checksum' somehow.
*/
info->state->checksum+= info->cur_row.checksum;
info->state->checksum+= !share->now_transactional *
info->cur_row.checksum;
}
if (share->base.auto_key)
set_if_bigger(info->s->state.auto_increment,
......
......@@ -198,8 +198,8 @@ int main(int argc __attribute__((unused)), char *argv[])
trn->first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
trn, NULL,
6, TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......@@ -220,8 +220,8 @@ int main(int argc __attribute__((unused)), char *argv[])
/* check auto-count feature */
parts[TRANSLOG_INTERNAL_PARTS + 1].str= NULL;
parts[TRANSLOG_INTERNAL_PARTS + 1].length= 0;
if (translog_write_record(&lsn, LOGREC_FIXED_RECORD_1LSN_EXAMPLE,
trn, NULL, LSN_STORE_SIZE, 0, parts, NULL))
if (translog_write_record(&lsn, LOGREC_FIXED_RECORD_1LSN_EXAMPLE, trn,
NULL, LSN_STORE_SIZE, 0, parts, NULL, NULL))
{
fprintf(stderr, "1 Can't write reference defore record #%lu\n",
(ulong) i);
......@@ -241,7 +241,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_1LSN_EXAMPLE,
trn, NULL, 0, TRANSLOG_INTERNAL_PARTS + 2,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "1 Can't write var reference defore record #%lu\n",
(ulong) i);
......@@ -259,8 +259,8 @@ int main(int argc __attribute__((unused)), char *argv[])
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 23;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_2LSN_EXAMPLE,
trn, NULL,
23, TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
trn, NULL, 23, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "0 Can't write reference defore record #%lu\n",
(ulong) i);
......@@ -280,7 +280,8 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE,
trn, NULL, 14 + rec_len,
TRANSLOG_INTERNAL_PARTS + 2, parts, NULL))
TRANSLOG_INTERNAL_PARTS + 2, parts, NULL,
NULL))
{
fprintf(stderr, "0 Can't write var reference defore record #%lu\n",
(ulong) i);
......@@ -297,7 +298,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
trn, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) i);
translog_destroy();
......@@ -316,7 +317,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
trn, NULL, rec_len,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i);
translog_destroy();
......
......@@ -106,7 +106,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......
......@@ -96,7 +96,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......
......@@ -194,8 +194,8 @@ int main(int argc __attribute__((unused)), char *argv[])
trn->short_id= 0;
trn->first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
if (translog_write_record(&lsn, LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
trn, NULL,
6, TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1, parts,
NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......@@ -214,10 +214,9 @@ int main(int argc __attribute__((unused)), char *argv[])
parts[TRANSLOG_INTERNAL_PARTS + 0].length= LSN_STORE_SIZE;
trn->short_id= i % 0xFFFF;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_1LSN_EXAMPLE,
trn, NULL,
LSN_STORE_SIZE,
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
LOGREC_FIXED_RECORD_1LSN_EXAMPLE, trn, NULL,
LSN_STORE_SIZE, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "1 Can't write reference before record #%lu\n",
(ulong) i);
......@@ -237,7 +236,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_VARIABLE_RECORD_1LSN_EXAMPLE,
trn, NULL, LSN_STORE_SIZE + rec_len,
TRANSLOG_INTERNAL_PARTS + 2,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "1 Can't write var reference before record #%lu\n",
(ulong) i);
......@@ -256,9 +255,8 @@ int main(int argc __attribute__((unused)), char *argv[])
trn->short_id= i % 0xFFFF;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_2LSN_EXAMPLE,
trn, NULL, 23,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
trn, NULL, 23, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "0 Can't write reference before record #%lu\n",
(ulong) i);
......@@ -279,7 +277,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE,
trn, NULL, LSN_STORE_SIZE * 2 + rec_len,
TRANSLOG_INTERNAL_PARTS + 2,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "0 Can't write var reference before record #%lu\n",
(ulong) i);
......@@ -296,7 +294,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
trn, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) i);
translog_destroy();
......@@ -314,7 +312,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
trn, NULL, rec_len,
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL, NULL))
{
fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i);
translog_destroy();
......
......@@ -138,7 +138,7 @@ void writer(int num)
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write LOGREC_FIXED_RECORD_0LSN_EXAMPLE record #%lu "
"thread %i\n", (ulong) i, num);
......@@ -155,7 +155,7 @@ void writer(int num)
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
&trn, NULL,
len, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i);
translog_destroy();
......@@ -307,7 +307,7 @@ int main(int argc __attribute__((unused)),
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write the first record\n");
translog_destroy();
......
......@@ -85,7 +85,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......
......@@ -97,7 +97,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......
......@@ -80,7 +80,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......@@ -102,7 +102,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
......@@ -125,7 +125,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, LONG_BUFFER_SIZE,
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL, NULL))
{
fprintf(stderr, "Can't write variable record\n");
translog_destroy();
......@@ -150,7 +150,7 @@ int main(int argc __attribute__((unused)), char *argv[])
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
parts, NULL, NULL))
{
fprintf(stderr, "Can't write last record\n");
translog_destroy();
......
......@@ -78,10 +78,10 @@ static int NEAR_F write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
static uint NEAR_F read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
static int NEAR_F write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
char *key, uint sort_length, uint count);
uchar *key, uint sort_length, uint count);
static int NEAR_F write_merge_key_varlen(MI_SORT_PARAM *info,
IO_CACHE *to_file,
char* key, uint sort_length,
uchar* key, uint sort_length,
uint count);
static inline int
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
......@@ -858,16 +858,15 @@ static uint NEAR_F read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
static int NEAR_F write_merge_key_varlen(MI_SORT_PARAM *info,
IO_CACHE *to_file,char* key,
IO_CACHE *to_file, uchar* key,
uint sort_length, uint count)
{
uint idx;
char *bufs = key;
uchar *bufs = key;
for (idx=1;idx<=count;idx++)
{
int err;
if ((err= my_var_write(info,to_file, (uchar*) bufs)))
if ((err= my_var_write(info, to_file, bufs)))
return (err);
bufs=bufs+sort_length;
}
......@@ -876,10 +875,10 @@ static int NEAR_F write_merge_key_varlen(MI_SORT_PARAM *info,
static int NEAR_F write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
IO_CACHE *to_file, char* key,
IO_CACHE *to_file, uchar* key,
uint sort_length, uint count)
{
return my_b_write(to_file,(uchar*) key,(uint) sort_length*count);
return my_b_write(to_file, key, (uint) sort_length * count);
}
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment