Commit b3e29fbd authored by antony@ppcg5.local's avatar antony@ppcg5.local

Bug#29019

  "REPLACE/INSERT IGNORE/UPDATE IGNORE doesn't work"
  Federated does not record neccessary HA_EXTRA flags in order to
  support REPLACE/INSERT IGNORE/UPDATE IGNORE.
  Implement ::extra() to capture flags neccessary for functionality.
New function append_ident() to better escape identifiers consistantly.
parent f8b7669e
...@@ -1843,6 +1843,30 @@ C3A4C3B6C3BCC39F ...@@ -1843,6 +1843,30 @@ C3A4C3B6C3BCC39F
D18DD184D184D0B5D0BAD182D0B8D0B2D0BDD183D18E D18DD184D184D0B5D0BAD182D0B8D0B2D0BDD183D18E
drop table federated.t1; drop table federated.t1;
drop table federated.t1; drop table federated.t1;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'
DEFAULT CHARSET=utf8;
insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
a b
1 Larry
2 Curly
truncate federated.t1;
replace into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
a b
1 Moe
2 Curly
update ignore federated.t1 set a=a+1;
select * from federated.t1;
a b
1 Moe
3 Curly
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated; DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
......
...@@ -1576,4 +1576,31 @@ connection slave; ...@@ -1576,4 +1576,31 @@ connection slave;
drop table federated.t1; drop table federated.t1;
#
# BUG#21019 Federated Engine does not support REPLACE/INSERT IGNORE/UPDATE IGNORE
#
connection slave;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'
DEFAULT CHARSET=utf8;
insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
truncate federated.t1;
replace into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
update ignore federated.t1 set a=a+1;
select * from federated.t1;
drop table federated.t1;
connection slave;
drop table federated.t1;
source include/federated_cleanup.inc; source include/federated_cleanup.inc;
...@@ -348,6 +348,10 @@ pthread_mutex_t federated_mutex; // This is the mutex we use to ...@@ -348,6 +348,10 @@ pthread_mutex_t federated_mutex; // This is the mutex we use to
// init the hash // init the hash
static int federated_init= FALSE; // Variable for checking the static int federated_init= FALSE; // Variable for checking the
// init state of hash // init state of hash
static char ident_quote_char= '`'; // Character for quoting
// identifiers
static char value_quote_char= '\''; // Character for quoting
// literals
/* Federated storage engine handlerton */ /* Federated storage engine handlerton */
...@@ -440,6 +444,58 @@ bool federated_db_end() ...@@ -440,6 +444,58 @@ bool federated_db_end()
return FALSE; return FALSE;
} }
/**
@brief Append identifiers to the string.
@param[in,out] string The target string.
@param[in] name Identifier name
@param[in] length Length of identifier name in bytes
@param[in] quote_char Quote char to use for quoting identifier.
@return Operation Status
@retval FALSE OK
@retval TRUE There was an error appending to the string.
@note This function is based upon the append_identifier() function
in sql_show.cc except that quoting always occurs.
*/
static bool append_ident(String *string, const char *name, uint length,
const char quote_char)
{
bool result;
uint clen;
const char *name_end;
DBUG_ENTER("append_ident");
if (quote_char)
{
string->reserve(length * 2 + 2);
if ((result= string->append(&quote_char, 1, system_charset_info)))
goto err;
for (name_end= name+length; name < name_end; name+= clen)
{
uchar c= *(uchar *) name;
if (!(clen= my_mbcharlen(system_charset_info, c)))
clen= 1;
if (clen == 1 && c == (uchar) quote_char &&
(result= string->append(&quote_char, 1, system_charset_info)))
goto err;
if ((result= string->append(name, clen, string->charset())))
goto err;
}
result= string->append(&quote_char, 1, system_charset_info);
}
else
result= string->append(name, length, system_charset_info);
err:
DBUG_RETURN(result);
}
/* /*
Check (in create) whether the tables exists, and that it can be connected to Check (in create) whether the tables exists, and that it can be connected to
...@@ -458,7 +514,6 @@ bool federated_db_end() ...@@ -458,7 +514,6 @@ bool federated_db_end()
static int check_foreign_data_source(FEDERATED_SHARE *share, static int check_foreign_data_source(FEDERATED_SHARE *share,
bool table_create_flag) bool table_create_flag)
{ {
char escaped_table_name[NAME_LEN*2];
char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
uint error_code; uint error_code;
...@@ -499,7 +554,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, ...@@ -499,7 +554,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share,
} }
else else
{ {
int escaped_table_name_length= 0;
/* /*
Since we do not support transactions at this version, we can let the Since we do not support transactions at this version, we can let the
client API silently reconnect. For future versions, we will need more client API silently reconnect. For future versions, we will need more
...@@ -517,14 +571,8 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, ...@@ -517,14 +571,8 @@ static int check_foreign_data_source(FEDERATED_SHARE *share,
query.append(FEDERATED_SELECT); query.append(FEDERATED_SELECT);
query.append(FEDERATED_STAR); query.append(FEDERATED_STAR);
query.append(FEDERATED_FROM); query.append(FEDERATED_FROM);
query.append(FEDERATED_BTICK); append_ident(&query, share->table_name, share->table_name_length,
escaped_table_name_length= ident_quote_char);
escape_string_for_mysql(&my_charset_bin, (char*)escaped_table_name,
sizeof(escaped_table_name),
share->table_name,
share->table_name_length);
query.append(escaped_table_name, escaped_table_name_length);
query.append(FEDERATED_BTICK);
query.append(FEDERATED_WHERE); query.append(FEDERATED_WHERE);
query.append(FEDERATED_FALSE); query.append(FEDERATED_FALSE);
...@@ -784,9 +832,8 @@ uint ha_federated::convert_row_to_internal_format(byte *record, ...@@ -784,9 +832,8 @@ uint ha_federated::convert_row_to_internal_format(byte *record,
static bool emit_key_part_name(String *to, KEY_PART_INFO *part) static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
{ {
DBUG_ENTER("emit_key_part_name"); DBUG_ENTER("emit_key_part_name");
if (to->append(FEDERATED_BTICK) || if (append_ident(to, part->field->field_name,
to->append(part->field->field_name) || strlen(part->field->field_name), ident_quote_char))
to->append(FEDERATED_BTICK))
DBUG_RETURN(1); // Out of memory DBUG_RETURN(1); // Out of memory
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1309,31 +1356,28 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -1309,31 +1356,28 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
query.append(FEDERATED_SELECT); query.append(FEDERATED_SELECT);
for (field= table->field; *field; field++) for (field= table->field; *field; field++)
{ {
query.append(FEDERATED_BTICK); append_ident(&query, (*field)->field_name,
query.append((*field)->field_name); strlen((*field)->field_name), ident_quote_char);
query.append(FEDERATED_BTICK);
query.append(FEDERATED_COMMA); query.append(FEDERATED_COMMA);
} }
query.length(query.length()- strlen(FEDERATED_COMMA)); query.length(query.length()- strlen(FEDERATED_COMMA));
query.append(FEDERATED_FROM); query.append(FEDERATED_FROM);
query.append(FEDERATED_BTICK);
tmp_share.table_name_length= strlen(tmp_share.table_name);
append_ident(&query, tmp_share.table_name,
tmp_share.table_name_length, ident_quote_char);
if (!(share= (FEDERATED_SHARE *) if (!(share= (FEDERATED_SHARE *)
my_multi_malloc(MYF(MY_WME), my_multi_malloc(MYF(MY_WME),
&share, sizeof(*share), &share, sizeof(*share),
&select_query, &select_query, query.length()+1,
query.length()+table->s->connect_string.length+1,
NullS))) NullS)))
goto error; goto error;
memcpy(share, &tmp_share, sizeof(tmp_share)); memcpy(share, &tmp_share, sizeof(tmp_share));
memcpy(select_query, query.ptr(), query.length()+1);
share->table_name_length= strlen(share->table_name);
/* TODO: share->table_name to LEX_STRING object */
query.append(share->table_name, share->table_name_length);
query.append(FEDERATED_BTICK);
share->select_query= select_query; share->select_query= select_query;
strmov(share->select_query, query.ptr());
share->use_count= 0; share->use_count= 0;
DBUG_PRINT("info", DBUG_PRINT("info",
("share->select_query %s", share->select_query)); ("share->select_query %s", share->select_query));
...@@ -1467,6 +1511,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) ...@@ -1467,6 +1511,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked)
table->s->reclength); table->s->reclength);
DBUG_PRINT("info", ("ref_length: %u", ref_length)); DBUG_PRINT("info", ("ref_length: %u", ref_length));
reset();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1579,10 +1625,14 @@ int ha_federated::write_row(byte *buf) ...@@ -1579,10 +1625,14 @@ int ha_federated::write_row(byte *buf)
/* /*
start both our field and field values strings start both our field and field values strings
*/ */
insert_string.append(FEDERATED_INSERT); if (replace_duplicates)
insert_string.append(FEDERATED_BTICK); insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
insert_string.append(share->table_name, share->table_name_length); else if (ignore_duplicates)
insert_string.append(FEDERATED_BTICK); insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
else
insert_string.append(STRING_WITH_LEN("INSERT INTO "));
append_ident(&insert_string, share->table_name,
share->table_name_length, ident_quote_char);
insert_string.append(FEDERATED_OPENPAREN); insert_string.append(FEDERATED_OPENPAREN);
values_string.append(FEDERATED_VALUES); values_string.append(FEDERATED_VALUES);
...@@ -1599,14 +1649,15 @@ int ha_federated::write_row(byte *buf) ...@@ -1599,14 +1649,15 @@ int ha_federated::write_row(byte *buf)
else else
{ {
(*field)->val_str(&insert_field_value_string); (*field)->val_str(&insert_field_value_string);
values_string.append('\''); values_string.append(value_quote_char);
insert_field_value_string.print(&values_string); insert_field_value_string.print(&values_string);
values_string.append('\''); values_string.append(value_quote_char);
insert_field_value_string.length(0); insert_field_value_string.length(0);
} }
/* append the field name */ /* append the field name */
insert_string.append((*field)->field_name); append_ident(&insert_string, (*field)->field_name,
strlen((*field)->field_name), ident_quote_char);
/* append the value */ /* append the value */
values_string.append(insert_field_value_string); values_string.append(insert_field_value_string);
...@@ -1688,9 +1739,8 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1688,9 +1739,8 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
query.set_charset(system_charset_info); query.set_charset(system_charset_info);
query.append(FEDERATED_OPTIMIZE); query.append(FEDERATED_OPTIMIZE);
query.append(FEDERATED_BTICK); append_ident(&query, share->table_name, share->table_name_length,
query.append(share->table_name, share->table_name_length); ident_quote_char);
query.append(FEDERATED_BTICK);
if (mysql_real_query(mysql, query.ptr(), query.length())) if (mysql_real_query(mysql, query.ptr(), query.length()))
{ {
...@@ -1711,9 +1761,8 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1711,9 +1761,8 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
query.set_charset(system_charset_info); query.set_charset(system_charset_info);
query.append(FEDERATED_REPAIR); query.append(FEDERATED_REPAIR);
query.append(FEDERATED_BTICK); append_ident(&query, share->table_name, share->table_name_length,
query.append(share->table_name, share->table_name_length); ident_quote_char);
query.append(FEDERATED_BTICK);
if (check_opt->flags & T_QUICK) if (check_opt->flags & T_QUICK)
query.append(FEDERATED_QUICK); query.append(FEDERATED_QUICK);
if (check_opt->flags & T_EXTEND) if (check_opt->flags & T_EXTEND)
...@@ -1788,10 +1837,12 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) ...@@ -1788,10 +1837,12 @@ int ha_federated::update_row(const byte *old_data, byte *new_data)
update_string.length(0); update_string.length(0);
where_string.length(0); where_string.length(0);
update_string.append(FEDERATED_UPDATE); if (ignore_duplicates)
update_string.append(FEDERATED_BTICK); update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
update_string.append(share->table_name); else
update_string.append(FEDERATED_BTICK); update_string.append(STRING_WITH_LEN("UPDATE "));
append_ident(&update_string, share->table_name,
share->table_name_length, ident_quote_char);
update_string.append(FEDERATED_SET); update_string.append(FEDERATED_SET);
/* /*
...@@ -1806,8 +1857,11 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) ...@@ -1806,8 +1857,11 @@ int ha_federated::update_row(const byte *old_data, byte *new_data)
for (Field **field= table->field; *field; field++) for (Field **field= table->field; *field; field++)
{ {
where_string.append((*field)->field_name); uint field_name_length= strlen((*field)->field_name);
update_string.append((*field)->field_name); append_ident(&where_string, (*field)->field_name, field_name_length,
ident_quote_char);
append_ident(&update_string, (*field)->field_name, field_name_length,
ident_quote_char);
update_string.append(FEDERATED_EQ); update_string.append(FEDERATED_EQ);
if ((*field)->is_null()) if ((*field)->is_null())
...@@ -1816,9 +1870,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) ...@@ -1816,9 +1870,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data)
{ {
/* otherwise = */ /* otherwise = */
(*field)->val_str(&field_value); (*field)->val_str(&field_value);
update_string.append('\''); update_string.append(value_quote_char);
field_value.print(&update_string); field_value.print(&update_string);
update_string.append('\''); update_string.append(value_quote_char);
field_value.length(0); field_value.length(0);
} }
...@@ -1829,9 +1883,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) ...@@ -1829,9 +1883,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data)
where_string.append(FEDERATED_EQ); where_string.append(FEDERATED_EQ);
(*field)->val_str(&field_value, (*field)->val_str(&field_value,
(char*) (old_data + (*field)->offset())); (char*) (old_data + (*field)->offset()));
where_string.append('\''); where_string.append(value_quote_char);
field_value.print(&where_string); field_value.print(&where_string);
where_string.append('\''); where_string.append(value_quote_char);
field_value.length(0); field_value.length(0);
} }
...@@ -1888,16 +1942,16 @@ int ha_federated::delete_row(const byte *buf) ...@@ -1888,16 +1942,16 @@ int ha_federated::delete_row(const byte *buf)
delete_string.length(0); delete_string.length(0);
delete_string.append(FEDERATED_DELETE); delete_string.append(FEDERATED_DELETE);
delete_string.append(FEDERATED_FROM); delete_string.append(FEDERATED_FROM);
delete_string.append(FEDERATED_BTICK); append_ident(&delete_string, share->table_name,
delete_string.append(share->table_name); share->table_name_length, ident_quote_char);
delete_string.append(FEDERATED_BTICK);
delete_string.append(FEDERATED_WHERE); delete_string.append(FEDERATED_WHERE);
for (Field **field= table->field; *field; field++) for (Field **field= table->field; *field; field++)
{ {
Field *cur_field= *field; Field *cur_field= *field;
data_string.length(0); data_string.length(0);
delete_string.append(cur_field->field_name); append_ident(&delete_string, (*field)->field_name,
strlen((*field)->field_name), ident_quote_char);
if (cur_field->is_null()) if (cur_field->is_null())
{ {
...@@ -1907,9 +1961,9 @@ int ha_federated::delete_row(const byte *buf) ...@@ -1907,9 +1961,9 @@ int ha_federated::delete_row(const byte *buf)
{ {
delete_string.append(FEDERATED_EQ); delete_string.append(FEDERATED_EQ);
cur_field->val_str(&data_string); cur_field->val_str(&data_string);
delete_string.append('\''); delete_string.append(value_quote_char);
data_string.print(&delete_string); data_string.print(&delete_string);
delete_string.append('\''); delete_string.append(value_quote_char);
} }
delete_string.append(FEDERATED_AND); delete_string.append(FEDERATED_AND);
...@@ -2411,14 +2465,8 @@ int ha_federated::info(uint flag) ...@@ -2411,14 +2465,8 @@ int ha_federated::info(uint flag)
{ {
status_query_string.length(0); status_query_string.length(0);
status_query_string.append(FEDERATED_INFO); status_query_string.append(FEDERATED_INFO);
status_query_string.append(FEDERATED_SQUOTE); append_ident(&status_query_string, share->table_name,
share->table_name_length, value_quote_char);
escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name,
sizeof(escaped_table_name),
share->table_name,
share->table_name_length);
status_query_string.append(escaped_table_name);
status_query_string.append(FEDERATED_SQUOTE);
if (mysql_real_query(mysql, status_query_string.ptr(), if (mysql_real_query(mysql, status_query_string.ptr(),
status_query_string.length())) status_query_string.length()))
...@@ -2484,6 +2532,42 @@ int ha_federated::info(uint flag) ...@@ -2484,6 +2532,42 @@ int ha_federated::info(uint flag)
} }
/**
@brief Handles extra signals from MySQL server
@param[in] operation Hint for storage engine
@return Operation Status
@retval 0 OK
*/
int ha_federated::extra(ha_extra_function operation)
{
DBUG_ENTER("ha_federated::extra");
switch (operation) {
case HA_EXTRA_IGNORE_DUP_KEY:
ignore_duplicates= TRUE;
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
ignore_duplicates= FALSE;
break;
case HA_EXTRA_WRITE_CAN_REPLACE:
replace_duplicates= TRUE;
break;
case HA_EXTRA_WRITE_CANNOT_REPLACE:
replace_duplicates= FALSE;
break;
case HA_EXTRA_RESET:
ignore_duplicates= FALSE;
replace_duplicates= FALSE;
break;
default:
/* do nothing */
DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
}
DBUG_RETURN(0);
}
/* /*
Used to delete all rows in a table. Both for cases of truncate and Used to delete all rows in a table. Both for cases of truncate and
for cases where the optimizer realizes that all rows will be for cases where the optimizer realizes that all rows will be
...@@ -2506,9 +2590,8 @@ int ha_federated::delete_all_rows() ...@@ -2506,9 +2590,8 @@ int ha_federated::delete_all_rows()
query.set_charset(system_charset_info); query.set_charset(system_charset_info);
query.append(FEDERATED_TRUNCATE); query.append(FEDERATED_TRUNCATE);
query.append(FEDERATED_BTICK); append_ident(&query, share->table_name, share->table_name_length,
query.append(share->table_name); ident_quote_char);
query.append(FEDERATED_BTICK);
/* /*
TRUNCATE won't return anything in mysql_affected_rows TRUNCATE won't return anything in mysql_affected_rows
......
...@@ -157,6 +157,7 @@ class ha_federated: public handler ...@@ -157,6 +157,7 @@ class ha_federated: public handler
MYSQL_ROW_OFFSET current_position; // Current position used by ::position() MYSQL_ROW_OFFSET current_position; // Current position used by ::position()
int remote_error_number; int remote_error_number;
char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates;
private: private:
/* /*
...@@ -284,6 +285,7 @@ class ha_federated: public handler ...@@ -284,6 +285,7 @@ class ha_federated: public handler
int rnd_pos(byte *buf, byte *pos); //required int rnd_pos(byte *buf, byte *pos); //required
void position(const byte *record); //required void position(const byte *record); //required
int info(uint); //required int info(uint); //required
int extra(ha_extra_function operation);
void update_auto_increment(void); void update_auto_increment(void);
int repair(THD* thd, HA_CHECK_OPT* check_opt); int repair(THD* thd, HA_CHECK_OPT* check_opt);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment