Commit 407e9b78 authored by Monty's avatar Monty Committed by Sergei Golubchik

MDEV-24395 Atomic DROP TRIGGER

The purpose of this task is to ensure that DROP TRIGGER is atomic.

Description of how atomic drop trigger works:

Logging of DROP TRIGGER
    Log the following information:
    db
    table name
    trigger name
    xid /* Used to check if query was already logged to binary log */
    initial length of the .TRG file
    query if there is space for it, if not log a zero length query.

Recovery operations:
- Delete if exists 'database/trigger_name.TRN~'
  - If this file existed, it means that we crashed before the trigger
    was deleted and there is nothing else to do.
- Get length of .TRG file
  - If file length is unchanged, trigger was not dropped. Nothing else to
    do.
- Log original query to binary, if it was stored in the ddl log. If it was
  not stored (long query string), log the following query to binary log:
  use `database` ; DROP TRIGGER IF EXISTS `trigger_name`
  /* generated by ddl log */;

Other things:
- Added trigger name and DDL_LOG_STATE to drop_trigger()
  Trigger name was added to make the interface more consistent and
  more general.
parent e3cfb7c8
"engine: aria crash point: ddl_log_drop_before_drop_trigger position: 1"
t1.TRG
t1_trg.TRN
t2_trg.TRN
"engine: aria crash point: ddl_log_drop_before_drop_trigger position: 2"
t1.TRG
t2_trg.TRN
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
"engine: aria crash point: ddl_log_drop_before_drop_trn position: 1"
t1.TRG
t2_trg.TRN
master-bin.000002 # Query # # use `test`; DROP TRIGGER t1_trg
"engine: aria crash point: ddl_log_drop_before_drop_trn position: 2"
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
master-bin.000002 # Query # # use `test`; DROP TRIGGER t2_trg /* some comment */
"engine: aria crash point: ddl_log_drop_after_drop_trigger position: 1"
t1.TRG
t2_trg.TRN
master-bin.000002 # Query # # use `test`; DROP TRIGGER t1_trg
"engine: aria crash point: ddl_log_drop_after_drop_trigger position: 2"
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
master-bin.000002 # Query # # use `test`; DROP TRIGGER t2_trg /* some comment */
"engine: aria crash point: definition_file_after_create position: 1"
t1.TRG
t1_trg.TRN
t2_trg.TRN
"engine: aria crash point: definition_file_after_create position: 2"
"No crash!"
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
master-bin.000001 # Query # # use `test`; DROP TRIGGER t2_trg /* some comment */
"engine: aria crash point: ddl_log_drop_before_binlog position: 1"
t1.TRG
t2_trg.TRN
master-bin.000002 # Query # # use `test`; DROP TRIGGER t1_trg
"engine: aria crash point: ddl_log_drop_before_binlog position: 2"
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
master-bin.000002 # Query # # use `test`; DROP TRIGGER t2_trg /* some comment */
"engine: aria crash point: ddl_log_drop_after_binlog position: 1"
t1.TRG
t2_trg.TRN
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
"engine: aria crash point: ddl_log_drop_after_binlog position: 2"
master-bin.000001 # Query # # use `test`; DROP TRIGGER t1_trg
master-bin.000001 # Query # # use `test`; DROP TRIGGER t2_trg /* some comment */
# Test deleting not existing trigger
#
Warnings:
Note 1360 Trigger does not exist
--source include/have_debug.inc
--source include/have_log_bin.inc
--source include/not_valgrind.inc
#
# Testing of atomic drop with crashes in a lot of different places
#
let $MYSQLD_DATADIR= `SELECT @@datadir`;
let long_comment=`select repeat('a',16384)`;
let $engine_count=1;
let $engines='aria';
let $crash_count=6;
let $crash_points='ddl_log_drop_before_drop_trigger', 'ddl_log_drop_before_drop_trn','ddl_log_drop_after_drop_trigger', 'definition_file_after_create', 'ddl_log_drop_before_binlog', 'ddl_log_drop_after_binlog';
# Number of drops in the tested statement
let $drops=2;
let $old_debug=`select @@debug_dbug`;
let $e=0;
let $keep_include_silent=1;
let $grep_script=DROP TRIGGER;
--disable_query_log
while ($e < $engine_count)
{
inc $e;
let $engine=`select ELT($e, $engines)`;
let $default_engine=$engine;
let $extra_option=;
if ($engine == "aria")
{
let $extra_option=transactional=1;
}
if ($engine == "aria_notrans")
{
let $default_engine="aria";
let $extra_option=transactional=0;
}
let $c=0;
while ($c < $crash_count)
{
inc $c;
let $crash=`select ELT($c, $crash_points)`;
let $r=0;
while ($r < $drops)
{
inc $r;
--eval set @@default_storage_engine=$default_engine
--eval create table t1 (a int not null, b int not null) $extra_option;
insert into t1 values(1,1);
flush tables;
delimiter |;
create trigger t1_trg before insert on t1 for each row
begin
if isnull(new.a) then
set new.a:= 1000;
end if;
end|
create trigger t2_trg before insert on t1 for each row
begin
if isnull(new.b) then
set new.b:= 2000;
end if;
end|
delimiter ;|
RESET MASTER;
echo "engine: $engine crash point: $crash position: $r";
--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
--disable_reconnect
--eval set @@debug_dbug="+d,$crash",@debug_crash_counter=$r
let $errno=0;
let $crashed=1;
--error 0,2013
DROP TRIGGER t1_trg;
let $error=$errno;
if ($error == 0)
{
--error 0,2013
--eval DROP TRIGGER t2_trg /* $long_comment */;
let $error=$errno;
}
--enable_reconnect
--source include/wait_until_connected_again.inc
--disable_query_log
--eval set @@debug_dbug="$old_debug"
if ($error == 0)
{
echo "No crash!";
}
# Check which tables still exists
--list_files $MYSQLD_DATADIR/test *TR*
--let $binlog_file=master-bin.000001
--source include/show_binlog_events.inc
if ($error)
{
--let $binlog_file=master-bin.000002
--source include/show_binlog_events.inc
}
--disable_warnings
drop table if exists t1;
--enable_warnings
}
let long_comment=some comment;
}
}
--echo
--echo # Test deleting not existing trigger
--echo #
drop trigger if exists `t1_trg`;
--enable_query_log
......@@ -82,19 +82,19 @@ uchar ddl_log_file_magic[]=
/* Action names for ddl_log_action_code */
const char *ddl_log_action_name[]=
const char *ddl_log_action_name[DDL_LOG_LAST_ACTION]=
{
"Unknown", "partitioning delete", "partitioning rename",
"partitioning replace", "partitioning exchange",
"rename table", "rename view",
"initialize drop table", "drop table",
"initialize drop view", "drop view"
"initialize drop view", "drop view", "drop trigger",
};
/* Number of phases per entry */
const uchar ddl_log_entry_phases[DDL_LOG_LAST_ACTION]=
{
0, 1, 1, 2, 3, 4, 1, 1, 3, 1, 1
0, 1, 1, 2, 3, 4, 1, 1, 3, 1, 1, 1
};
......@@ -528,6 +528,27 @@ static void set_global_from_ddl_log_entry(const DDL_LOG_ENTRY *ddl_log_entry)
}
/*
Calculate how much space we have left in the log entry for one string
This can be used to check if we have space to store the query string
in the block.
*/
static size_t ddl_log_free_space_in_entry(const DDL_LOG_ENTRY *ddl_log_entry)
{
size_t length= global_ddl_log.name_pos + 3*7; // 3 byte per string below
length+= ddl_log_entry->handler_name.length;
length+= ddl_log_entry->db.length;
length+= ddl_log_entry->name.length;
length+= ddl_log_entry->from_handler_name.length;
length+= ddl_log_entry->from_db.length;
length+= ddl_log_entry->from_name.length;
length+= ddl_log_entry->tmp_name.length;
return global_ddl_log.io_size - length - 3; // 3 is for storing next string
}
/**
Convert from file_entry_buf binary blob to ddl_log_entry struct.
......@@ -805,21 +826,25 @@ class ddl_log_error_handler : public Internal_error_handler
/*
Build a filename for a table, trigger file or .frm
Delete also any temporary file suffixed with ~
@return 0 Temporary file deleted
@return 1 No temporary file found
*/
static void build_filename_and_delete_tmp_file(char *path, size_t path_length,
static bool build_filename_and_delete_tmp_file(char *path, size_t path_length,
const LEX_CSTRING *db,
const LEX_CSTRING *name,
const char *ext,
PSI_file_key psi_key)
{
bool deleted;
uint length= build_table_filename(path, path_length-1,
db->str, name->str, ext, 0);
path[length]= '~';
path[length+1]= 0;
(void) mysql_file_delete(psi_key, path, MYF(0));
deleted= mysql_file_delete(psi_key, path, MYF(0)) != 0;
path[length]= 0;
return deleted;
}
......@@ -833,6 +858,10 @@ static void build_filename_and_delete_tmp_file(char *path, size_t path_length,
@retval FALSE Success
*/
static LEX_CSTRING end_comment=
{ STRING_WITH_LEN(" /* generated by ddl log */")};
static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root,
DDL_LOG_ENTRY *ddl_log_entry)
{
......@@ -1210,8 +1239,6 @@ static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root,
if (!ddl_log_entry->next_entry && mysql_bin_log.is_open())
{
/* Last drop table. Write query to binlog */
LEX_CSTRING end_comment=
{ STRING_WITH_LEN(" /* generated by ddl recovery */")};
ddl_drop_query.length(ddl_drop_query.length()-1);
ddl_drop_query.append(&end_comment);
......@@ -1247,11 +1274,9 @@ static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root,
append_identifier(thd, &ddl_drop_query, &table);
ddl_drop_query.append(',');
if (!ddl_log_entry->next_entry)
if (!ddl_log_entry->next_entry && mysql_bin_log.is_open())
{
/* Last drop view. Write query to binlog */
LEX_CSTRING end_comment=
{ STRING_WITH_LEN(" /* generated by ddl recovery */")};
ddl_drop_query.length(ddl_drop_query.length()-1);
ddl_drop_query.append(&end_comment);
......@@ -1263,6 +1288,70 @@ static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root,
}
break;
}
case DDL_LOG_DROP_TRIGGER_ACTION:
{
MY_STAT stat_info;
off_t frm_length= 1; // Impossible length
LEX_CSTRING thd_db= thd->db;
/* Delete trigger temporary file if it still exists */
if (!build_filename_and_delete_tmp_file(to_path, sizeof(to_path) - 1,
&ddl_log_entry->db,
&ddl_log_entry->name,
TRG_EXT,
key_file_fileparser))
{
/* Temporary file existed and was deleted, nothing left to do */
(void) update_phase(entry_pos, DDL_LOG_FINAL_PHASE);
break;
}
/*
We can use length of TRG file as an indication if trigger was removed.
If there is no file, then it means that this was the last trigger
and the file was removed.
*/
if (my_stat(to_path, &stat_info, MYF(0)))
frm_length= (off_t) stat_info.st_size;
if (frm_length != (off_t) ddl_log_entry->unique_id &&
mysql_bin_log.is_open())
{
/*
File size changed and it was not binlogged (as this entry was
executed)
*/
(void) rm_trigname_file(to_path, &ddl_log_entry->db,
&ddl_log_entry->from_name,
MYF(0));
ddl_drop_query.length(0);
ddl_drop_query.set_charset(system_charset_info);
if (ddl_log_entry->tmp_name.length)
{
/* We can use the original query */
ddl_drop_query.append(&ddl_log_entry->tmp_name);
}
else
{
/* Generate new query */
ddl_drop_query.append(STRING_WITH_LEN("DROP TRIGGER IF EXISTS "));
append_identifier(thd, &ddl_drop_query, &ddl_log_entry->from_name);
ddl_drop_query.append(&end_comment);
}
if (mysql_bin_log.is_open())
{
mysql_mutex_unlock(&LOCK_gdl);
thd->db= ddl_log_entry->db;
(void) thd->binlog_query(THD::STMT_QUERY_TYPE,
ddl_drop_query.ptr(),
ddl_drop_query.length(), TRUE, FALSE,
FALSE, 0);
thd->db= thd_db;
mysql_mutex_lock(&LOCK_gdl);
}
}
(void) update_phase(entry_pos, DDL_LOG_FINAL_PHASE);
break;
}
default:
DBUG_ASSERT(0);
break;
......@@ -2113,3 +2202,43 @@ bool ddl_log_drop_view(THD *thd, DDL_LOG_STATE *ddl_state,
DDL_LOG_DROP_VIEW_ACTION, 0,
(handlerton*) 0, path, db, table));
}
bool ddl_log_drop_trigger(THD *thd, DDL_LOG_STATE *ddl_state,
const LEX_CSTRING *db,
const LEX_CSTRING *table,
const LEX_CSTRING *trigger_name,
const LEX_CSTRING *query)
{
DDL_LOG_ENTRY ddl_log_entry;
MY_STAT stat_info;
char path[FN_REFLEN+1];
off_t frm_length= 0;
size_t max_query_length;
DBUG_ENTER("ddl_log_drop");
build_table_filename(path, sizeof(path)-1, db->str, table->str, TRG_EXT, 0);
/* We can use length of frm file as an indication if trigger was removed */
if (my_stat(path, &stat_info, MYF(MY_WME | ME_WARNING)))
frm_length= (off_t) stat_info.st_size;
bzero(&ddl_log_entry, sizeof(ddl_log_entry));
ddl_log_entry.action_type= DDL_LOG_DROP_TRIGGER_ACTION;
ddl_log_entry.unique_id= (ulonglong) frm_length;
ddl_log_entry.db= *const_cast<LEX_CSTRING*>(db);
ddl_log_entry.name= *const_cast<LEX_CSTRING*>(table);
ddl_log_entry.from_name= *const_cast<LEX_CSTRING*>(trigger_name);
/*
If we can store query as is, we store it. Otherwise it will be
re-generated on recovery
*/
max_query_length= ddl_log_free_space_in_entry(&ddl_log_entry);
if (max_query_length >= query->length)
ddl_log_entry.tmp_name= *const_cast<LEX_CSTRING*>(query);
DBUG_RETURN(ddl_log_write(ddl_state, &ddl_log_entry));
}
......@@ -81,6 +81,7 @@ enum ddl_log_action_code
DDL_LOG_DROP_TABLE_ACTION= 8,
DDL_LOG_DROP_VIEW_INIT_ACTION= 9,
DDL_LOG_DROP_VIEW_ACTION= 10,
DDL_LOG_DROP_TRIGGER_ACTION= 11,
DDL_LOG_LAST_ACTION /* End marker */
};
......@@ -228,5 +229,10 @@ bool ddl_log_drop_view(THD *thd, DDL_LOG_STATE *ddl_state,
const LEX_CSTRING *path,
const LEX_CSTRING *db,
const LEX_CSTRING *table);
bool ddl_log_drop_trigger(THD *thd, DDL_LOG_STATE *ddl_state,
const LEX_CSTRING *db,
const LEX_CSTRING *table,
const LEX_CSTRING *trigger_name,
const LEX_CSTRING *query);
extern mysql_mutex_t LOCK_gdl;
#endif /* DDL_LOG_INCLUDED */
......@@ -26,13 +26,14 @@
#include "parse_file.h"
#include "sp.h"
#include "sql_base.h"
#include "sql_show.h" // append_definer, append_identifier
#include "sql_table.h" // build_table_filename,
// check_n_cut_mysql50_prefix
#include "sql_db.h" // get_default_db_collation
#include "sql_handler.h" // mysql_ha_rm_tables
#include "sql_show.h" // append_definer, append_identifier
#include "sql_table.h" // build_table_filename,
// check_n_cut_mysql50_prefix
#include "sql_db.h" // get_default_db_collation
#include "sql_handler.h" // mysql_ha_rm_tables
#include "sp_cache.h" // sp_invalidate_cache
#include <mysys_err.h>
#include <ddl_log.h> // ddl_log_state
#include "debug_sync.h"
#include "mysql/psi/mysql_sp.h"
......@@ -401,10 +402,12 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
MDL_ticket *mdl_ticket= NULL;
MDL_request mdl_request_for_trn;
Query_tables_list backup;
DDL_LOG_STATE ddl_log_state;
DBUG_ENTER("mysql_create_or_drop_trigger");
/* Charset of the buffer for statement must be system one. */
stmt_query.set_charset(system_charset_info);
bzero(&ddl_log_state, sizeof(ddl_log_state));
/*
QQ: This function could be merged in mysql_alter_table() function
......@@ -604,9 +607,15 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
};);
#endif /* WITH_WSREP */
result= (create ?
table->triggers->create_trigger(thd, tables, &stmt_query):
table->triggers->drop_trigger(thd, tables, &stmt_query));
if (create)
result= table->triggers->create_trigger(thd, tables, &stmt_query);
else
{
result= table->triggers->drop_trigger(thd, tables,
&thd->lex->spname->m_name,
&stmt_query,
&ddl_log_state);
}
close_all_tables_for_name(thd, table->s, HA_EXTRA_NOT_USED, NULL);
......@@ -626,7 +635,15 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
end:
if (!result)
{
debug_crash_here("ddl_log_drop_before_binlog");
thd->binlog_xid= thd->query_id;
ddl_log_update_xid(&ddl_log_state, thd->binlog_xid);
result= write_bin_log(thd, TRUE, stmt_query.ptr(), stmt_query.length());
thd->binlog_xid= 0;
debug_crash_here("ddl_log_drop_after_binlog");
}
ddl_log_complete(&ddl_log_state);
/*
If we are under LOCK TABLES we should restore original state of
......@@ -869,13 +886,13 @@ bool Table_triggers_list::create_trigger(THD *thd, TABLE_LIST *tables,
{
if (lex->create_info.or_replace())
{
String drop_trg_query;
LEX_CSTRING *sp_name= &thd->lex->spname->m_name; // alias
/*
The following can fail if the trigger is for another table or
there exists a .TRN file but there was no trigger for it in
the .TRG file
*/
if (unlikely(drop_trigger(thd, tables, &drop_trg_query)))
if (unlikely(drop_trigger(thd, tables, sp_name, 0, 0)))
DBUG_RETURN(true);
}
else if (lex->create_info.if_not_exists())
......@@ -1069,8 +1086,8 @@ static bool rm_trigger_file(char *path, const LEX_CSTRING *db,
True error
*/
static bool rm_trigname_file(char *path, const LEX_CSTRING *db,
const LEX_CSTRING *trigger_name, myf MyFlags)
bool rm_trigname_file(char *path, const LEX_CSTRING *db,
const LEX_CSTRING *trigger_name, myf MyFlags)
{
build_table_filename(path, FN_REFLEN - 1, db->str, trigger_name->str,
TRN_EXT, 0);
......@@ -1168,13 +1185,15 @@ Trigger *Table_triggers_list::find_trigger(const LEX_CSTRING *name,
*/
bool Table_triggers_list::drop_trigger(THD *thd, TABLE_LIST *tables,
String *stmt_query)
LEX_CSTRING *sp_name,
String *stmt_query,
DDL_LOG_STATE *ddl_log_state)
{
const LEX_CSTRING *sp_name= &thd->lex->spname->m_name; // alias
char path[FN_REFLEN];
Trigger *trigger;
stmt_query->set(thd->query(), thd->query_length(), stmt_query->charset());
if (stmt_query)
stmt_query->set(thd->query(), thd->query_length(), stmt_query->charset());
/* Find and delete trigger from list */
if (!(trigger= find_trigger(sp_name, true)))
......@@ -1184,6 +1203,22 @@ bool Table_triggers_list::drop_trigger(THD *thd, TABLE_LIST *tables,
return 1;
}
{
LEX_CSTRING query= {0,0};
if (stmt_query)
{
/* This code is executed in case of DROP TRIGGER */
lex_string_set3(&query, thd->query(), thd->query_length());
}
if (ddl_log_state)
if (ddl_log_drop_trigger(thd, ddl_log_state,
&tables->db, &tables->table_name,
sp_name, &query))
return 1;
}
debug_crash_here("ddl_log_drop_before_drop_trigger");
if (!count) // If no more triggers
{
/*
......@@ -1201,9 +1236,13 @@ bool Table_triggers_list::drop_trigger(THD *thd, TABLE_LIST *tables,
return 1;
}
debug_crash_here("ddl_log_drop_before_drop_trn");
if (rm_trigname_file(path, &tables->db, sp_name, MYF(MY_WME)))
return 1;
debug_crash_here("ddl_log_drop_after_drop_trigger");
delete trigger;
return 0;
}
......
......@@ -28,6 +28,7 @@ class sp_name;
class Query_tables_list;
struct TABLE_LIST;
class Query_tables_list;
typedef struct st_ddl_log_state DDL_LOG_STATE;
/** Event on which trigger is invoked. */
enum trg_event_type
......@@ -220,7 +221,9 @@ class Table_triggers_list: public Sql_alloc
~Table_triggers_list();
bool create_trigger(THD *thd, TABLE_LIST *table, String *stmt_query);
bool drop_trigger(THD *thd, TABLE_LIST *table, String *stmt_query);
bool drop_trigger(THD *thd, TABLE_LIST *table,
LEX_CSTRING *sp_name,
String *stmt_query, DDL_LOG_STATE *ddl_log_state);
bool process_triggers(THD *thd, trg_event_type event,
trg_action_time_type time_type,
bool old_row_is_record1);
......@@ -324,6 +327,8 @@ bool load_table_name_for_trigger(THD *thd,
const LEX_CSTRING *trn_path,
LEX_CSTRING *tbl_name);
bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create);
bool rm_trigname_file(char *path, const LEX_CSTRING *db,
const LEX_CSTRING *trigger_name, myf MyFlags);
extern const char * const TRG_EXT;
extern const char * const TRN_EXT;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment