Commit 403dacf6 authored by Monty's avatar Monty

Fixed crash in aria recovery when using bulk insert

MDEV-20578 Got error 126 when executing undo undo_key_delete
upon Aria crash recovery

The crash happens in this scenario:
- Table with unique keys and non unique keys
- Batch insert (LOAD DATA or INSERT ... SELECT) with REPLACE
- Some insert succeeds followed by duplicate key error

In the above scenario the table gets corrupted.

The bug was that we don't generate any undo entry for the
failed insert as the whole insert can be ignored by undo.
The code did however not take into account that when bulk
insert is used, we would write cached keys to the file on
failure and undo would wrongly ignore these.

Fixed by moving the writing of the cache keys after we write
the aborted-insert event to the log.
parent 0c1f97b3
--skip-stack-trace --skip-core-file
--default-storage-engine=Aria
create table t1 (a int primary key, b int, c int, unique key(b), key(c)) engine=aria transactional=1;
insert into t1 values (1000,1000,1000);
insert into t1 select seq,seq+100, seq+200 from seq_1_to_10;
SET GLOBAL debug_dbug="+d,crash_end_bulk_insert";
REPLACE into t1 select seq+20,seq+95, seq + 300 from seq_1_to_10;
ERROR HY000: Lost connection to MySQL server during query
check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
select sum(a),sum(b),sum(c) from t1;
sum(a) sum(b) sum(c)
1055 2055 3055
drop table t1;
--source include/not_embedded.inc
--source include/not_valgrind.inc
# Avoid CrashReporter popup on Mac
--source include/not_crashrep.inc
# Binary must be compiled with debug for crash to occur
--source include/have_debug.inc
--source include/have_sequence.inc
#
# MDEV-20578 Got error 126 when executing undo undo_key_delete upon Aria crash
# recovery
#
# Write file to make mysql-test-run.pl expect crash and restart
--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
create table t1 (a int primary key, b int, c int, unique key(b), key(c)) engine=aria transactional=1;
insert into t1 values (1000,1000,1000);
insert into t1 select seq,seq+100, seq+200 from seq_1_to_10;
# Insert into t1 with batch insert where we get a rows replaced after
# a few sucessful inserts
SET GLOBAL debug_dbug="+d,crash_end_bulk_insert";
--error 2013
REPLACE into t1 select seq+20,seq+95, seq + 300 from seq_1_to_10;
# Wait until restarted
--enable_reconnect
--source include/wait_until_connected_again.inc
--disable_reconnect
check table t1;
select sum(a),sum(b),sum(c) from t1;
drop table t1;
...@@ -4164,6 +4164,19 @@ int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -4164,6 +4164,19 @@ int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt)
} }
/**
End bulk insert
*/
int handler::ha_end_bulk_insert()
{
DBUG_ENTER("handler::ha_end_bulk_insert");
DBUG_EXECUTE_IF("crash_end_bulk_insert",
{ extra(HA_EXTRA_FLUSH) ; DBUG_SUICIDE();});
estimation_rows_to_insert= 0;
DBUG_RETURN(end_bulk_insert());
}
/** /**
Bulk update row: public interface. Bulk update row: public interface.
......
...@@ -2911,13 +2911,7 @@ class handler :public Sql_alloc ...@@ -2911,13 +2911,7 @@ class handler :public Sql_alloc
start_bulk_insert(rows, flags); start_bulk_insert(rows, flags);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
int ha_end_bulk_insert() int ha_end_bulk_insert();
{
DBUG_ENTER("handler::ha_end_bulk_insert");
estimation_rows_to_insert= 0;
int ret= end_bulk_insert();
DBUG_RETURN(ret);
}
int ha_bulk_update_row(const uchar *old_data, uchar *new_data, int ha_bulk_update_row(const uchar *old_data, uchar *new_data,
uint *dup_key_found); uint *dup_key_found);
int ha_delete_all_rows(); int ha_delete_all_rows();
......
...@@ -949,6 +949,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) ...@@ -949,6 +949,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE)
char *old_name, *new_name; char *old_name, *new_name;
int error= 1; int error= 1;
MARIA_HA *info= NULL; MARIA_HA *info= NULL;
my_bool from_table_is_crashed= 0;
DBUG_ENTER("exec_REDO_LOGREC_REDO_RENAME_TABLE"); DBUG_ENTER("exec_REDO_LOGREC_REDO_RENAME_TABLE");
if (skip_DDLs) if (skip_DDLs)
...@@ -1018,15 +1019,15 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) ...@@ -1018,15 +1019,15 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE)
} }
if (maria_is_crashed(info)) if (maria_is_crashed(info))
{ {
tprint(tracef, ", is crashed, can't rename it"); tprint(tracef, "is crashed, can't be used for rename ; new-name table ");
ALERT_USER(); from_table_is_crashed= 1;
goto end;
} }
if (close_one_table(info->s->open_file_name.str, rec->lsn) || if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
maria_close(info)) maria_close(info))
goto end; goto end;
info= NULL; info= NULL;
tprint(tracef, ", is ok for renaming; new-name table "); if (!from_table_is_crashed)
tprint(tracef, "is ok for renaming; new-name table ");
} }
else /* one or two files absent, or header corrupted... */ else /* one or two files absent, or header corrupted... */
{ {
...@@ -1091,11 +1092,19 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE) ...@@ -1091,11 +1092,19 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE)
goto end; goto end;
info= NULL; info= NULL;
/* abnormal situation */ /* abnormal situation */
tprint(tracef, ", exists but is older than record, can't rename it"); tprint(tracef, "exists but is older than record, can't rename it");
goto end; goto end;
} }
else /* one or two files absent, or header corrupted... */ else /* one or two files absent, or header corrupted... */
tprint(tracef, ", can't be opened, probably does not exist"); tprint(tracef, "can't be opened, probably does not exist");
if (from_table_is_crashed)
{
eprint(tracef, "Aborting rename as old table was crashed");
ALERT_USER();
goto end;
}
tprint(tracef, ", renaming '%s'", old_name); tprint(tracef, ", renaming '%s'", old_name);
if (maria_rename(old_name, new_name)) if (maria_rename(old_name, new_name))
{ {
......
...@@ -335,12 +335,6 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -335,12 +335,6 @@ int maria_write(MARIA_HA *info, uchar *record)
my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_NULL_IN_SPATIAL ||
my_errno == HA_ERR_OUT_OF_MEM) my_errno == HA_ERR_OUT_OF_MEM)
{ {
if (info->bulk_insert)
{
uint j;
for (j=0 ; j < share->base.keys ; j++)
maria_flush_bulk_insert(info, j);
}
info->errkey= i < share->base.keys ? (int) i : -1; info->errkey= i < share->base.keys ? (int) i : -1;
/* /*
We delete keys in the reverse order of insertion. This is the order that We delete keys in the reverse order of insertion. This is the order that
...@@ -366,6 +360,7 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -366,6 +360,7 @@ int maria_write(MARIA_HA *info, uchar *record)
{ {
if (_ma_ft_del(info,i,buff,record,filepos)) if (_ma_ft_del(info,i,buff,record,filepos))
{ {
fatal_error= 1;
if (local_lock_tree) if (local_lock_tree)
mysql_rwlock_unlock(&keyinfo->root_lock); mysql_rwlock_unlock(&keyinfo->root_lock);
break; break;
...@@ -380,6 +375,7 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -380,6 +375,7 @@ int maria_write(MARIA_HA *info, uchar *record)
filepos, filepos,
info->trn->trid))) info->trn->trid)))
{ {
fatal_error= 1;
if (local_lock_tree) if (local_lock_tree)
mysql_rwlock_unlock(&keyinfo->root_lock); mysql_rwlock_unlock(&keyinfo->root_lock);
break; break;
...@@ -399,6 +395,13 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -399,6 +395,13 @@ int maria_write(MARIA_HA *info, uchar *record)
fatal_error= 1; fatal_error= 1;
} }
if (info->bulk_insert)
{
uint j;
for (j=0 ; j < share->base.keys ; j++)
maria_flush_bulk_insert(info, j);
}
if (fatal_error) if (fatal_error)
{ {
maria_print_error(info->s, HA_ERR_CRASHED); maria_print_error(info->s, HA_ERR_CRASHED);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment