Commit 5c62e940 authored by Dmitry Shulga's avatar Dmitry Shulga

MDEV-34551: Column list in the trigger definition

Added support of the clause `update of <columns>` by the parser
and initial implementation of UPDATE statement processing.
parent 10b88590
......@@ -9247,7 +9247,7 @@ fill_record_n_invoke_before_triggers(THD *thd, TABLE *table,
if (!result && triggers)
{
if (triggers->process_triggers(thd, event, TRG_ACTION_BEFORE,
TRUE) ||
true, &fields) ||
not_null_fields_have_null_values(table))
return TRUE;
......
......@@ -1590,6 +1590,11 @@ struct st_trg_chistics: public st_trg_execution_order
const char *ordering_clause_begin;
const char *ordering_clause_end;
/*
List of column names of a table on that the ON UPDATE trigger
must be fired.
*/
List<LEX_CSTRING> *on_update_col_names;
};
enum xa_option_words {XA_NONE, XA_JOIN, XA_RESUME, XA_ONE_PHASE,
......
......@@ -1220,6 +1220,48 @@ bool Trigger::add_to_file_list(void* param_arg)
}
/**
Check that there is a column in ON UPDATE trigger matching with some of
the table's column from UPDATE statement.
@param fields to be updated by the UPDATE statement
@return true in case there is a column in the target table that matches one
of columns specified by a trigger definition or no columns were
specified for the trigger at all, else return false.
*/
bool Trigger::match_updatable_columns(List<Item> &fields)
{
DBUG_ASSERT(event == TRG_EVENT_UPDATE);
/*
No table columns were specified in OF col1, col2 ... colN of
the statement CREATE TRIGGER BEFORE/AFTER UPDATE. It means that this
ON UPDATE trigger can't be fired on every UPDATE statement involving
the target table.
*/
if (!updatable_columns || updatable_columns->is_empty())
return true;
List_iterator_fast<Item> fields_it(fields);
List_iterator_fast<LEX_CSTRING> columns_it(*updatable_columns);
LEX_CSTRING *column_name;
Item_field *field;
while ((field= (Item_field*)fields_it++))
{
while ((column_name= columns_it++))
{
if (field->field_name.streq(*column_name))
return true;
}
}
return false;
}
/**
Deletes the .TRG file for a table.
......@@ -1538,6 +1580,54 @@ bool Table_triggers_list::prepare_record_accessors(TABLE *table)
}
/**
Deep copy of on update columns list created on parsing a trigger definition.
The destination list and its elements are allocated on table's memory root.
@param table_mem_root table mem_root from where a memory is allocated.
@param [out] dst_col_names destination list where to copy an original one
@param src_col_names source list that has to be copied
@return false on success, true on OOM error
*/
static bool
copy_on_update_columns_list(MEM_ROOT *table_mem_root,
List<LEX_CSTRING> **dst_col_names,
List<LEX_CSTRING> *src_col_names)
{
if (!src_col_names || src_col_names->is_empty())
{
*dst_col_names= nullptr;
return false;
}
List<LEX_CSTRING> *result= new (table_mem_root) List<LEX_CSTRING>();
if (!result)
return true; // OOM
List_iterator_fast<LEX_CSTRING> columns_it(*src_col_names);
LEX_CSTRING *column_name;
while ((column_name= columns_it++))
{
LEX_CSTRING *cname= (LEX_CSTRING*)alloc_root(table_mem_root,
sizeof(LEX_CSTRING));
if (!cname)
return true; // OOM
*cname= safe_lexcstrdup_root(table_mem_root, *column_name);
if (!cname->str ||
result->push_back(cname, table_mem_root))
return true; // OOM
}
*dst_col_names= result;
return false;
}
/**
Check whenever .TRG file for table exist and load all triggers it contains.
......@@ -1733,6 +1823,11 @@ bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db,
trigger->connection_cl_name= creation_ctx->get_connection_cl()->coll_name;
trigger->db_cl_name= creation_ctx->get_db_cl()->coll_name;
if (copy_on_update_columns_list(&table->mem_root,
&trigger->updatable_columns,
lex.trg_chistics.on_update_col_names))
goto err_with_lex_cleanup;
/* event can only be TRG_EVENT_MAX in case of fatal parse errors */
if (lex.trg_chistics.event != TRG_EVENT_MAX)
trigger_list->add_trigger(lex.trg_chistics.event,
......@@ -2481,7 +2576,8 @@ bool Table_triggers_list::change_table_name(THD *thd,
bool Table_triggers_list::process_triggers(THD *thd,
trg_event_type event,
trg_action_time_type time_type,
bool old_row_is_record1)
bool old_row_is_record1,
List<Item> *fields_in_update_stmt)
{
bool err_status;
Sub_statement_state statement_state;
......@@ -2522,6 +2618,17 @@ bool Table_triggers_list::process_triggers(THD *thd,
do {
thd->lex->current_select= NULL;
/*
For BEFORE UPDATE trigger check that table fields specified by
the UPDATE statement matches with column names defined in FOR UPDATE
trigger definition, if any.
*/
if (event == TRG_EVENT_UPDATE &&
fields_in_update_stmt &&
!trigger->match_updatable_columns(*fields_in_update_stmt))
continue;
err_status=
trigger->body->execute_trigger(thd,
&trigger_table->s->db,
......
......@@ -30,6 +30,8 @@ struct TABLE_LIST;
class Query_tables_list;
typedef struct st_ddl_log_state DDL_LOG_STATE;
#include <sql_list.h>
/** Event on which trigger is invoked. */
enum trg_event_type
{
......@@ -115,7 +117,13 @@ class Trigger :public Sql_alloc
{
public:
Trigger(Table_triggers_list *base_arg, sp_head *code):
base(base_arg), body(code), next(0), action_order(0)
base(base_arg), body(code), next(0),
sql_mode{0},
hr_create_time{(unsigned long long)-1},
event{TRG_EVENT_MAX},
action_time{TRG_ACTION_MAX},
action_order(0),
updatable_columns{nullptr}
{
bzero((char *)&subject_table_grants, sizeof(subject_table_grants));
}
......@@ -141,6 +149,7 @@ class Trigger :public Sql_alloc
trg_event_type event;
trg_action_time_type action_time;
uint action_order;
List<LEX_CSTRING> *updatable_columns;
void get_trigger_info(LEX_CSTRING *stmt, LEX_CSTRING *body,
LEX_STRING *definer);
......@@ -148,6 +157,8 @@ class Trigger :public Sql_alloc
bool change_on_table_name(void* param_arg);
bool change_table_name(void* param_arg);
bool add_to_file_list(void* param_arg);
bool match_updatable_columns(List<Item> &fields);
};
typedef bool (Trigger::*Triggers_processor)(void *arg);
......@@ -248,7 +259,8 @@ class Table_triggers_list: public Sql_alloc
String *stmt_query, DDL_LOG_STATE *ddl_log_state);
bool process_triggers(THD *thd, trg_event_type event,
trg_action_time_type time_type,
bool old_row_is_record1);
bool old_row_is_record1,
List<Item> *fields_in_update_stmt= nullptr);
void empty_lists();
bool create_lists_needed_for_files(MEM_ROOT *root);
bool save_trigger_file(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_name);
......
......@@ -1065,7 +1065,8 @@ bool Sql_cmd_update::update_single_table(THD *thd)
if (table->triggers &&
unlikely(table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
TRG_ACTION_AFTER, TRUE)))
TRG_ACTION_AFTER, true,
fields)))
{
error= 1;
break;
......
......@@ -8309,7 +8309,7 @@ persistent_index_stat_spec:
table_column_list:
/* empty */
{}
| ident
| ident
{
Lex->column_list->push_back((LEX_STRING*)
thd->memdup(&$1, sizeof(LEX_STRING)), thd->mem_root);
......@@ -18189,6 +18189,52 @@ trigger_follows_precedes_clause:
}
;
opt_on_update_cols:
/* empty */
{
Lex->trg_chistics.on_update_col_names= NULL;
}
| OF_SYM on_update_cols
{
if (Lex->trg_chistics.event != TRG_EVENT_UPDATE)
{
thd->parse_error(ER_SYNTAX_ERROR, $1.pos());
MYSQL_YYABORT;
}
}
;
on_update_cols:
ident
{
List<LEX_CSTRING> *col_names_list;
LEX_CSTRING *col_name;
col_names_list=
new (thd->mem_root) List<LEX_CSTRING>;
col_name= (LEX_CSTRING *) thd->memdup(&$1, sizeof(LEX_CSTRING));
if (unlikely(col_names_list == NULL) ||
unlikely(col_name == NULL) ||
unlikely(col_names_list->push_back(col_name, thd->mem_root)))
MYSQL_YYABORT;
Lex->trg_chistics.on_update_col_names= col_names_list;
}
| on_update_cols ',' ident
{
LEX_CSTRING *col_name;
col_name= (LEX_CSTRING *) thd->memdup(&$3, sizeof(LEX_CSTRING));
if (unlikely(col_name == NULL) ||
unlikely(Lex->trg_chistics.on_update_col_names->push_back
(col_name, thd->mem_root)))
MYSQL_YYABORT;
}
;
trigger_tail:
remember_name
opt_if_not_exists
......@@ -18199,15 +18245,16 @@ trigger_tail:
sp_name
trg_action_time
trg_event
opt_on_update_cols
ON
remember_name /* $8 */
{ /* $9 */
remember_name /* $9 */
{ /* $10 */
Lex->raw_trg_on_table_name_begin= YYLIP->get_tok_start();
}
table_ident /* $10 */
table_ident /* $11 */
FOR_SYM
remember_name /* $12 */
{ /* $13 */
remember_name /* $13 */
{ /* $14 */
Lex->raw_trg_on_table_name_end= YYLIP->get_tok_start();
}
EACH_SYM
......@@ -18215,8 +18262,8 @@ trigger_tail:
{
Lex->trg_chistics.ordering_clause_begin= YYLIP->get_cpp_ptr();
}
trigger_follows_precedes_clause /* $17 */
{ /* $18 */
trigger_follows_precedes_clause /* $18 */
{ /* $19 */
LEX *lex= thd->lex;
Lex_input_stream *lip= YYLIP;
......@@ -18224,10 +18271,10 @@ trigger_tail:
my_yyabort_error((ER_SP_NO_RECURSIVE_CREATE, MYF(0), "TRIGGER"));
lex->stmt_definition_begin= $1;
lex->ident.str= $8;
lex->ident.length= $12 - $8;
lex->ident.str= $9;
lex->ident.length= $13 - $9;
lex->spname= $4;
(*static_cast<st_trg_execution_order*>(&lex->trg_chistics))= ($17);
(*static_cast<st_trg_execution_order*>(&lex->trg_chistics))= ($18);
lex->trg_chistics.ordering_clause_end= lip->get_cpp_ptr();
if (unlikely(!lex->make_sp_head(thd, $4, &sp_handler_trigger,
......@@ -18236,8 +18283,8 @@ trigger_tail:
lex->sphead->set_body_start(thd, lip->get_cpp_tok_start());
}
sp_proc_stmt /* $19 */ force_lookahead /* $20 */
{ /* $21 */
sp_proc_stmt /* $20 */ force_lookahead /* $21 */
{ /* $22 */
LEX *lex= Lex;
lex->sql_command= SQLCOM_CREATE_TRIGGER;
......@@ -18250,7 +18297,7 @@ trigger_tail:
lex->query_tables can be wiped out.
*/
if (!lex->first_select_lex()->
add_table_to_list(thd, $10, (LEX_CSTRING*) 0,
add_table_to_list(thd, $11, (LEX_CSTRING*) 0,
TL_OPTION_UPDATING, TL_READ_NO_INSERT,
MDL_SHARED_NO_WRITE))
MYSQL_YYABORT;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment