Commit 42f8548f authored by Aleksey Midenkov's avatar Aleksey Midenkov

MDEV-25091 CREATE TABLE: field references qualified by a wrong table name succeed

Before FRM is written walk vcol expressions through
check_table_name_processor() and check if field items match (db,
table_name) qualifier.

We cannot do this in check_vcol_func_processor() as there is already
no table name qualifiers in expressions of written and loaded FRM.
parent 4649ba74
...@@ -50,3 +50,41 @@ t1 CREATE TABLE "t1" ( ...@@ -50,3 +50,41 @@ t1 CREATE TABLE "t1" (
) )
drop table t1; drop table t1;
set session sql_mode=@OLD_SQL_MODE; set session sql_mode=@OLD_SQL_MODE;
#
# MDEV-25091 CREATE TABLE: field references qualified by a wrong table name succeed
#
create table t2 (x int);
create table t1 (x int, y int generated always as (t2.x));
ERROR 42S22: Unknown column '`t2`.`x`' in 'GENERATED ALWAYS'
create table t1 (x int, y int check (y > t2.x));
ERROR 42S22: Unknown column '`t2`.`x`' in 'CHECK'
create table t1 (x int, y int default t2.x);
ERROR 42S22: Unknown column '`t2`.`x`' in 'DEFAULT'
create table t1 (x int, check (t2.x > 0));
ERROR 42S22: Unknown column '`t2`.`x`' in 'CHECK'
create table t1 (x int);
alter table t1 add column y int generated always as (t2.x);
ERROR 42S22: Unknown column '`t2`.`x`' in 'GENERATED ALWAYS'
alter table t1 add column y int check (z > t2.x);
ERROR 42S22: Unknown column '`t2`.`x`' in 'CHECK'
alter table t1 add column y int default t2.x;
ERROR 42S22: Unknown column '`t2`.`x`' in 'DEFAULT'
alter table t1 add constraint check (t2.x > 0);
ERROR 42S22: Unknown column '`t2`.`x`' in 'CHECK'
create or replace table t1 (x int, y int generated always as (t1.x));
create or replace table t1 (x int, y int check (y > t1.x));
create or replace table t1 (x int, y int default t1.x);
create or replace table t1 (x int, check (t1.x > 0));
create or replace table t1 (x int, y int generated always as (test.t1.x));
create or replace table t1 (x int, y int check (y > test.t1.x));
create or replace table t1 (x int, y int default test.t1.x);
create or replace table t1 (x int, check (test.t1.x > 0));
drop tables t1, t2;
create table t1 (x int, y int generated always as (test2.t1.x));
ERROR 42S22: Unknown column '`test2`.`t1`.`x`' in 'GENERATED ALWAYS'
create table t1 (x int, y int check (y > test2.t1.x));
ERROR 42S22: Unknown column '`test2`.`t1`.`x`' in 'CHECK'
create table t1 (x int, y int default test2.t1.x);
ERROR 42S22: Unknown column '`test2`.`t1`.`x`' in 'DEFAULT'
create table t1 (x int, check (test2.t1.x > 0));
ERROR 42S22: Unknown column '`test2`.`t1`.`x`' in 'CHECK'
...@@ -28,3 +28,47 @@ show create table t1; ...@@ -28,3 +28,47 @@ show create table t1;
drop table t1; drop table t1;
set session sql_mode=@OLD_SQL_MODE; set session sql_mode=@OLD_SQL_MODE;
--echo #
--echo # MDEV-25091 CREATE TABLE: field references qualified by a wrong table name succeed
--echo #
create table t2 (x int);
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int generated always as (t2.x));
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int check (y > t2.x));
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int default t2.x);
--error ER_BAD_FIELD_ERROR
create table t1 (x int, check (t2.x > 0));
create table t1 (x int);
--error ER_BAD_FIELD_ERROR
alter table t1 add column y int generated always as (t2.x);
--error ER_BAD_FIELD_ERROR
alter table t1 add column y int check (z > t2.x);
--error ER_BAD_FIELD_ERROR
alter table t1 add column y int default t2.x;
--error ER_BAD_FIELD_ERROR
alter table t1 add constraint check (t2.x > 0);
create or replace table t1 (x int, y int generated always as (t1.x));
create or replace table t1 (x int, y int check (y > t1.x));
create or replace table t1 (x int, y int default t1.x);
create or replace table t1 (x int, check (t1.x > 0));
create or replace table t1 (x int, y int generated always as (test.t1.x));
create or replace table t1 (x int, y int check (y > test.t1.x));
create or replace table t1 (x int, y int default test.t1.x);
create or replace table t1 (x int, check (test.t1.x > 0));
drop tables t1, t2;
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int generated always as (test2.t1.x));
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int check (y > test2.t1.x));
--error ER_BAD_FIELD_ERROR
create table t1 (x int, y int default test2.t1.x);
--error ER_BAD_FIELD_ERROR
create table t1 (x int, check (test2.t1.x > 0));
...@@ -1719,6 +1719,15 @@ class Item: public Value_source, ...@@ -1719,6 +1719,15 @@ class Item: public Value_source,
return 0; return 0;
} }
/**
Check db/table_name if they defined in item and match arg values
@param arg Pointer to Check_table_name_prm structure
@retval true Match failed
@retval false Match succeeded
*/
virtual bool check_table_name_processor(void *arg) { return false; }
/* /*
TRUE if the expression depends only on the table indicated by tab_map TRUE if the expression depends only on the table indicated by tab_map
or can be converted to such an exression using equalities. or can be converted to such an exression using equalities.
...@@ -1850,6 +1859,15 @@ class Item: public Value_source, ...@@ -1850,6 +1859,15 @@ class Item: public Value_source,
bool collect; bool collect;
}; };
struct Check_table_name_prm
{
LEX_CSTRING db;
LEX_CSTRING table_name;
String field;
Check_table_name_prm(LEX_CSTRING _db, LEX_CSTRING _table_name) :
db(_db), table_name(_table_name) {}
};
/* /*
For SP local variable returns pointer to Item representing its For SP local variable returns pointer to Item representing its
current value and pointer to current Item otherwise. current value and pointer to current Item otherwise.
...@@ -2820,6 +2838,24 @@ class Item_field :public Item_ident, ...@@ -2820,6 +2838,24 @@ class Item_field :public Item_ident,
} }
return mark_unsupported_function(field_name, arg, VCOL_FIELD_REF); return mark_unsupported_function(field_name, arg, VCOL_FIELD_REF);
} }
bool check_table_name_processor(void *arg)
{
Check_table_name_prm &p= *(Check_table_name_prm *) arg;
if (p.table_name.length && table_name)
{
DBUG_ASSERT(p.db.length);
if ((db_name &&
my_strcasecmp(table_alias_charset, p.db.str, db_name)) ||
my_strcasecmp(table_alias_charset, p.table_name.str, table_name))
{
print(&p.field, (enum_query_type) (QT_ITEM_ORIGINAL_FUNC_NULLIF |
QT_NO_DATA_EXPANSION |
QT_TO_SYSTEM_CHARSET));
return true;
}
}
return false;
}
void cleanup(); void cleanup();
Item_equal *get_item_equal() { return item_equal; } Item_equal *get_item_equal() { return item_equal; }
void set_item_equal(Item_equal *item_eq) { item_equal= item_eq; } void set_item_equal(Item_equal *item_eq) { item_equal= item_eq; }
......
...@@ -3207,6 +3207,23 @@ struct LEX: public Query_tables_list ...@@ -3207,6 +3207,23 @@ struct LEX: public Query_tables_list
} }
return false; return false;
} }
bool create_like() const
{
DBUG_ASSERT(!create_info.like() || !select_lex.item_list.elements);
return create_info.like();
}
bool create_select() const
{
DBUG_ASSERT(!create_info.like() || !select_lex.item_list.elements);
return select_lex.item_list.elements;
}
bool create_simple() const
{
return !create_like() && !create_select();
}
}; };
......
...@@ -76,7 +76,9 @@ static int copy_data_between_tables(THD *thd, TABLE *from,TABLE *to, ...@@ -76,7 +76,9 @@ static int copy_data_between_tables(THD *thd, TABLE *from,TABLE *to,
static bool prepare_blob_field(THD *thd, Column_definition *sql_field); static bool prepare_blob_field(THD *thd, Column_definition *sql_field);
static int mysql_prepare_create_table(THD *, HA_CREATE_INFO *, Alter_info *, static int mysql_prepare_create_table(THD *, HA_CREATE_INFO *, Alter_info *,
uint *, handler *, KEY **, uint *, int); uint *, handler *, KEY **, uint *, int,
const LEX_CSTRING db,
const LEX_CSTRING table_name);
static uint blob_length_by_type(enum_field_types type); static uint blob_length_by_type(enum_field_types type);
static bool fix_constraints_names(THD *thd, List<Virtual_column_info> static bool fix_constraints_names(THD *thd, List<Virtual_column_info>
*check_constraint_list); *check_constraint_list);
...@@ -1809,10 +1811,12 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags) ...@@ -1809,10 +1811,12 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
strxmov(shadow_frm_name, shadow_path, reg_ext, NullS); strxmov(shadow_frm_name, shadow_path, reg_ext, NullS);
if (flags & WFRM_WRITE_SHADOW) if (flags & WFRM_WRITE_SHADOW)
{ {
LEX_CSTRING db= { lpt->db, strlen(lpt->db) };
LEX_CSTRING table_name= { lpt->table_name, strlen(lpt->table_name) };
if (mysql_prepare_create_table(lpt->thd, lpt->create_info, lpt->alter_info, if (mysql_prepare_create_table(lpt->thd, lpt->create_info, lpt->alter_info,
&lpt->db_options, lpt->table->file, &lpt->db_options, lpt->table->file,
&lpt->key_info_buffer, &lpt->key_count, &lpt->key_info_buffer, &lpt->key_count,
C_ALTER_TABLE)) C_ALTER_TABLE, db, table_name))
{ {
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
...@@ -3231,7 +3235,8 @@ static int ...@@ -3231,7 +3235,8 @@ static int
mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info, mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
Alter_info *alter_info, uint *db_options, Alter_info *alter_info, uint *db_options,
handler *file, KEY **key_info_buffer, handler *file, KEY **key_info_buffer,
uint *key_count, int create_table_mode) uint *key_count, int create_table_mode,
const LEX_CSTRING db, const LEX_CSTRING table_name)
{ {
const char *key_name; const char *key_name;
Create_field *sql_field,*dup_field; Create_field *sql_field,*dup_field;
...@@ -3246,6 +3251,7 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info, ...@@ -3246,6 +3251,7 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
uint total_uneven_bit_length= 0; uint total_uneven_bit_length= 0;
int select_field_count= C_CREATE_SELECT(create_table_mode); int select_field_count= C_CREATE_SELECT(create_table_mode);
bool tmp_table= create_table_mode == C_ALTER_TABLE; bool tmp_table= create_table_mode == C_ALTER_TABLE;
const bool create_simple= thd->lex->create_simple();
DBUG_ENTER("mysql_prepare_create_table"); DBUG_ENTER("mysql_prepare_create_table");
LEX_STRING* connect_string = &create_info->connect_string; LEX_STRING* connect_string = &create_info->connect_string;
...@@ -4138,6 +4144,7 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info, ...@@ -4138,6 +4144,7 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
create_info->null_bits= null_fields; create_info->null_bits= null_fields;
/* Check fields. */ /* Check fields. */
Item::Check_table_name_prm walk_prm(db, table_name);
it.rewind(); it.rewind();
while ((sql_field=it++)) while ((sql_field=it++))
{ {
...@@ -4182,6 +4189,37 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info, ...@@ -4182,6 +4189,37 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name); my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
if (create_simple)
{
/*
NOTE: we cannot do this in check_vcol_func_processor() as there is already
no table name qualifier in expression.
*/
if (sql_field->vcol_info &&
sql_field->vcol_info->expr->walk(&Item::check_table_name_processor,
false, (void *) &walk_prm))
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), walk_prm.field.c_ptr(), "GENERATED ALWAYS");
DBUG_RETURN(TRUE);
}
if (sql_field->default_value &&
sql_field->default_value->expr->walk(&Item::check_table_name_processor,
false, (void *) &walk_prm))
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), walk_prm.field.c_ptr(), "DEFAULT");
DBUG_RETURN(TRUE);
}
if (sql_field->check_constraint &&
sql_field->check_constraint->expr->walk(&Item::check_table_name_processor,
false, (void *) &walk_prm))
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), walk_prm.field.c_ptr(), "CHECK");
DBUG_RETURN(TRUE);
}
}
} }
/* Check table level constraints */ /* Check table level constraints */
...@@ -4191,6 +4229,12 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info, ...@@ -4191,6 +4229,12 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
Virtual_column_info *check; Virtual_column_info *check;
while ((check= c_it++)) while ((check= c_it++))
{ {
if (create_simple && check->expr->walk(&Item::check_table_name_processor, false,
(void *) &walk_prm))
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), walk_prm.field.c_ptr(), "CHECK");
DBUG_RETURN(TRUE);
}
if (!check->name.length || check->automatic_name) if (!check->name.length || check->automatic_name)
continue; continue;
...@@ -4477,6 +4521,8 @@ handler *mysql_create_frm_image(THD *thd, ...@@ -4477,6 +4521,8 @@ handler *mysql_create_frm_image(THD *thd,
{ {
uint db_options; uint db_options;
handler *file; handler *file;
const LEX_CSTRING new_db= { db, strlen(db) };
const LEX_CSTRING new_table_name= { table_name, strlen(table_name) };
DBUG_ENTER("mysql_create_frm_image"); DBUG_ENTER("mysql_create_frm_image");
if (!alter_info->create_list.elements) if (!alter_info->create_list.elements)
...@@ -4700,7 +4746,7 @@ handler *mysql_create_frm_image(THD *thd, ...@@ -4700,7 +4746,7 @@ handler *mysql_create_frm_image(THD *thd,
if (mysql_prepare_create_table(thd, create_info, alter_info, &db_options, if (mysql_prepare_create_table(thd, create_info, alter_info, &db_options,
file, key_info, key_count, file, key_info, key_count,
create_table_mode)) create_table_mode, new_db, new_table_name))
goto err; goto err;
create_info->table_options=db_options; create_info->table_options=db_options;
...@@ -7024,13 +7070,15 @@ bool mysql_compare_tables(TABLE *table, ...@@ -7024,13 +7070,15 @@ bool mysql_compare_tables(TABLE *table,
Alter_info tmp_alter_info(*alter_info, thd->mem_root); Alter_info tmp_alter_info(*alter_info, thd->mem_root);
uint db_options= 0; /* not used */ uint db_options= 0; /* not used */
KEY *key_info_buffer= NULL; KEY *key_info_buffer= NULL;
LEX_CSTRING db= { table->s->db.str, table->s->db.length };
LEX_CSTRING table_name= { table->s->db.str, table->s->table_name.length };
/* Create the prepared information. */ /* Create the prepared information. */
int create_table_mode= table->s->tmp_table == NO_TMP_TABLE ? int create_table_mode= table->s->tmp_table == NO_TMP_TABLE ?
C_ORDINARY_CREATE : C_ALTER_TABLE; C_ORDINARY_CREATE : C_ALTER_TABLE;
if (mysql_prepare_create_table(thd, create_info, &tmp_alter_info, if (mysql_prepare_create_table(thd, create_info, &tmp_alter_info,
&db_options, table->file, &key_info_buffer, &db_options, table->file, &key_info_buffer,
&key_count, create_table_mode)) &key_count, create_table_mode, db, table_name))
DBUG_RETURN(1); DBUG_RETURN(1);
/* Some very basic checks. */ /* Some very basic checks. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment