Bug#28158 - table->read_set is set incorrectly,

        causing wrong error message in Falcon

An error message about a duplicate key could show a wrong key
value when not all columns of the key were used to select the
rows for update.

Some storage engines return a record with only the selected
columns filled.

This is fixed by re-reading the record with a read_set which
includes all columns of the duplicate key after a duplicate key
error happens and before the error message is printed.
parent 4b766950
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
CREATE TABLE t1 ( CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY, pk1 INT NOT NULL PRIMARY KEY,
b INT NOT NULL, b INT NOT NULL,
...@@ -40,3 +42,47 @@ pk1 b c ...@@ -40,3 +42,47 @@ pk1 b c
12 2 2 12 2 2
14 1 1 14 1 1
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a int, b int, KEY (a, b)) ENGINE=ndbcluster;
CREATE TABLE t2 (a int, b int, UNIQUE KEY (a, b)) ENGINE=ndbcluster;
CREATE TABLE t3 (a int, b int, PRIMARY KEY (a, b)) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (1, 2);
INSERT INTO t1 VALUES (2, 2);
INSERT INTO t2 VALUES (1, 2);
INSERT INTO t2 VALUES (2, 2);
INSERT INTO t3 VALUES (1, 2);
INSERT INTO t3 VALUES (2, 2);
UPDATE t1 SET a = 1;
UPDATE t1 SET a = 1 ORDER BY a;
UPDATE t2 SET a = 1;
ERROR 23000: Duplicate entry '' for key '*UNKNOWN*'
UPDATE t2 SET a = 1 ORDER BY a;
ERROR 23000: Duplicate entry '' for key '*UNKNOWN*'
UPDATE t3 SET a = 1;
ERROR 23000: Duplicate entry '1-2' for key 'PRIMARY'
UPDATE t3 SET a = 1 ORDER BY a;
ERROR 23000: Duplicate entry '1-2' for key 'PRIMARY'
SELECT count(*) FROM t1;
count(*)
2
SELECT count(*) FROM t2;
count(*)
2
SELECT count(*) FROM t3;
count(*)
2
SELECT * FROM t1 ORDER by a;
a b
1 2
1 2
SELECT * FROM t2 ORDER by a;
a b
1 2
2 2
SELECT * FROM t3 ORDER by a;
a b
1 2
2 2
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
End of 5.1 tests
...@@ -3,10 +3,12 @@ ...@@ -3,10 +3,12 @@
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
--enable_warnings --enable_warnings
# #
# Basic test of INSERT in NDB # Basic test of UPDATE in NDB
# #
# #
...@@ -39,3 +41,49 @@ DROP TABLE IF EXISTS t1; ...@@ -39,3 +41,49 @@ DROP TABLE IF EXISTS t1;
--enable_warnings --enable_warnings
# End of 4.1 tests # End of 4.1 tests
#
# Bug#28158: table->read_set is set incorrectly,
# causing wrong error message in Falcon
#
CREATE TABLE t1 (a int, b int, KEY (a, b)) ENGINE=ndbcluster;
CREATE TABLE t2 (a int, b int, UNIQUE KEY (a, b)) ENGINE=ndbcluster;
CREATE TABLE t3 (a int, b int, PRIMARY KEY (a, b)) ENGINE=ndbcluster;
#
INSERT INTO t1 VALUES (1, 2);
INSERT INTO t1 VALUES (2, 2);
#
INSERT INTO t2 VALUES (1, 2);
INSERT INTO t2 VALUES (2, 2);
#
INSERT INTO t3 VALUES (1, 2);
INSERT INTO t3 VALUES (2, 2);
#
UPDATE t1 SET a = 1;
UPDATE t1 SET a = 1 ORDER BY a;
#
--error ER_DUP_ENTRY
UPDATE t2 SET a = 1;
--error ER_DUP_ENTRY
UPDATE t2 SET a = 1 ORDER BY a;
#
--error ER_DUP_ENTRY
UPDATE t3 SET a = 1;
--error ER_DUP_ENTRY
UPDATE t3 SET a = 1 ORDER BY a;
#
SELECT count(*) FROM t1;
SELECT count(*) FROM t2;
SELECT count(*) FROM t3;
SELECT * FROM t1 ORDER by a;
SELECT * FROM t2 ORDER by a;
SELECT * FROM t3 ORDER by a;
#
--disable_warnings
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
--enable_warnings
--echo End of 5.1 tests
...@@ -83,6 +83,75 @@ static bool check_fields(THD *thd, List<Item> &items) ...@@ -83,6 +83,75 @@ static bool check_fields(THD *thd, List<Item> &items)
} }
/**
@brief Re-read record if more columns are needed for error message.
@detail If we got a duplicate key error, we want to write an error
message containing the value of the duplicate key. If we do not have
all fields of the key value in record[0], we need to re-read the
record with a proper read_set.
@param[in] error error number
@param[in] table table
*/
static void prepare_record_for_error_message(int error, TABLE *table)
{
Field **field_p;
Field *field;
uint keynr;
MY_BITMAP unique_map; /* Fields in offended unique. */
my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
DBUG_ENTER("prepare_record_for_error_message");
/*
Only duplicate key errors print the key value.
If storage engine does always read all columns, we have the value alraedy.
*/
if ((error != HA_ERR_FOUND_DUPP_KEY) ||
!(table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ))
DBUG_VOID_RETURN;
/*
Get the number of the offended index.
We will see MAX_KEY if the engine cannot determine the affected index.
*/
if ((keynr= table->file->get_dup_key(error)) >= MAX_KEY)
DBUG_VOID_RETURN;
/* Create unique_map with all fields used by that index. */
bitmap_init(&unique_map, unique_map_buf, table->s->fields, FALSE);
table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
/* Subtract read_set and write_set. */
bitmap_subtract(&unique_map, table->read_set);
bitmap_subtract(&unique_map, table->write_set);
/*
If the unique index uses columns that are neither in read_set
nor in write_set, we must re-read the record.
Otherwise no need to do anything.
*/
if (bitmap_is_clear_all(&unique_map))
DBUG_VOID_RETURN;
/* Get identifier of last read record into table->file->ref. */
table->file->position(table->record[0]);
/* Add all fields used by unique index to read_set. */
bitmap_union(table->read_set, &unique_map);
/* Tell the engine about the new set. */
table->file->column_bitmaps_signal();
/* Read record that is identified by table->file->ref. */
(void) table->file->rnd_pos(table->record[1], table->file->ref);
/* Copy the newly read columns into the new record. */
for (field_p= table->field; (field= *field_p); field_p++)
if (bitmap_is_set(&unique_map, field->field_index))
field->copy_from_tmp(table->s->rec_buff_length);
DBUG_VOID_RETURN;
}
/* /*
Process usual UPDATE Process usual UPDATE
...@@ -470,6 +539,13 @@ int mysql_update(THD *thd, ...@@ -470,6 +539,13 @@ int mysql_update(THD *thd,
else else
will_batch= !table->file->start_bulk_update(); will_batch= !table->file->start_bulk_update();
/*
Assure that we can use position()
if we need to create an error message.
*/
if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ)
table->prepare_for_position();
/* /*
We can use compare_record() to optimize away updates if We can use compare_record() to optimize away updates if
the table handler is returning all columns OR if the table handler is returning all columns OR if
...@@ -573,6 +649,8 @@ int mysql_update(THD *thd, ...@@ -573,6 +649,8 @@ int mysql_update(THD *thd,
*/ */
if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY)) if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
thd->fatal_error(); /* Other handler errors are fatal */ thd->fatal_error(); /* Other handler errors are fatal */
prepare_record_for_error_message(error, table);
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
error= 1; error= 1;
break; break;
...@@ -596,13 +674,16 @@ int mysql_update(THD *thd, ...@@ -596,13 +674,16 @@ int mysql_update(THD *thd,
{ {
if (error) if (error)
{ {
/* purecov: begin inspected */
/* /*
The handler should not report error of duplicate keys if they The handler should not report error of duplicate keys if they
are ignored. This is a requirement on batching handlers. are ignored. This is a requirement on batching handlers.
*/ */
prepare_record_for_error_message(error, table);
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
error= 1; error= 1;
break; break;
/* purecov: end */
} }
/* /*
Either an error was found and we are ignoring errors or there Either an error was found and we are ignoring errors or there
...@@ -668,9 +749,12 @@ int mysql_update(THD *thd, ...@@ -668,9 +749,12 @@ int mysql_update(THD *thd,
in the batched update. in the batched update.
*/ */
{ {
/* purecov: begin inspected */
thd->fatal_error(); thd->fatal_error();
prepare_record_for_error_message(loc_error, table);
table->file->print_error(loc_error,MYF(0)); table->file->print_error(loc_error,MYF(0));
error= 1; error= 1;
/* purecov: end */
} }
else else
updated-= dup_key_found; updated-= dup_key_found;
...@@ -1540,6 +1624,8 @@ bool multi_update::send_data(List<Item> &not_used_values) ...@@ -1540,6 +1624,8 @@ bool multi_update::send_data(List<Item> &not_used_values)
*/ */
if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY)) if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
thd->fatal_error(); /* Other handler errors are fatal */ thd->fatal_error(); /* Other handler errors are fatal */
prepare_record_for_error_message(error, table);
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -1676,7 +1762,7 @@ int multi_update::do_updates(bool from_send_error) ...@@ -1676,7 +1762,7 @@ int multi_update::do_updates(bool from_send_error)
ha_rows org_updated; ha_rows org_updated;
TABLE *table, *tmp_table; TABLE *table, *tmp_table;
List_iterator_fast<TABLE> check_opt_it(unupdated_check_opt_tables); List_iterator_fast<TABLE> check_opt_it(unupdated_check_opt_tables);
DBUG_ENTER("do_updates"); DBUG_ENTER("multi_update::do_updates");
do_update= 0; // Don't retry this function do_update= 0; // Don't retry this function
if (!found) if (!found)
...@@ -1819,6 +1905,7 @@ int multi_update::do_updates(bool from_send_error) ...@@ -1819,6 +1905,7 @@ int multi_update::do_updates(bool from_send_error)
if (!from_send_error) if (!from_send_error)
{ {
thd->fatal_error(); thd->fatal_error();
prepare_record_for_error_message(local_error, table);
table->file->print_error(local_error,MYF(0)); table->file->print_error(local_error,MYF(0));
} }
...@@ -1849,6 +1936,7 @@ bool multi_update::send_eof() ...@@ -1849,6 +1936,7 @@ bool multi_update::send_eof()
{ {
char buff[STRING_BUFFER_USUAL_SIZE]; char buff[STRING_BUFFER_USUAL_SIZE];
ulonglong id; ulonglong id;
DBUG_ENTER("multi_update::send_eof");
thd->proc_info="updating reference tables"; thd->proc_info="updating reference tables";
/* Does updates for the last n - 1 tables, returns 0 if ok */ /* Does updates for the last n - 1 tables, returns 0 if ok */
...@@ -1904,7 +1992,7 @@ bool multi_update::send_eof() ...@@ -1904,7 +1992,7 @@ bool multi_update::send_eof()
/* Safety: If we haven't got an error before (can happen in do_updates) */ /* Safety: If we haven't got an error before (can happen in do_updates) */
my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update", my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
MYF(0)); MYF(0));
return TRUE; DBUG_RETURN(TRUE);
} }
id= thd->arg_of_last_insert_id_function ? id= thd->arg_of_last_insert_id_function ?
...@@ -1914,5 +2002,5 @@ bool multi_update::send_eof() ...@@ -1914,5 +2002,5 @@ bool multi_update::send_eof()
thd->row_count_func= thd->row_count_func=
(thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated; (thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
::send_ok(thd, (ulong) thd->row_count_func, id, buff); ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
return FALSE; DBUG_RETURN(FALSE);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment