Commit 776c9ef0 authored by unknown's avatar unknown

Bug#25817 UPDATE IGNORE doesn't check write_set when checking unique indexes: Added checks

parent b0d289ca
...@@ -39,4 +39,12 @@ pk1 b c ...@@ -39,4 +39,12 @@ pk1 b c
10 0 0 10 0 0
12 2 2 12 2 2
14 1 1 14 1 1
create unique index ib on t1(b);
update t1 set c = 4 where pk1 = 12;
update ignore t1 set b = 55 where pk1 = 14;
select * from t1 order by pk1;
pk1 b c
10 0 0
12 2 4
14 55 1
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
...@@ -33,6 +33,11 @@ UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4; ...@@ -33,6 +33,11 @@ UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4;
select * from t1 order by pk1; select * from t1 order by pk1;
UPDATE t1 set pk1 = pk1 + 10; UPDATE t1 set pk1 = pk1 + 10;
select * from t1 order by pk1; select * from t1 order by pk1;
# bug#25817
create unique index ib on t1(b);
update t1 set c = 4 where pk1 = 12;
update ignore t1 set b = 55 where pk1 = 14;
select * from t1 order by pk1;
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
......
...@@ -1356,6 +1356,30 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec ...@@ -1356,6 +1356,30 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec
DBUG_RETURN(0); DBUG_RETURN(0);
} }
bool ha_ndbcluster::check_index_fields_in_write_set(uint keyno)
{
KEY* key_info= table->key_info + keyno;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
uint i;
DBUG_ENTER("check_index_fields_in_write_set");
if (m_retrieve_all_fields)
{
DBUG_RETURN(true);
}
for (i= 0; key_part != end; key_part++, i++)
{
Field* field= key_part->field;
if (field->query_id != current_thd->query_id)
{
DBUG_RETURN(false);
}
}
DBUG_RETURN(true);
}
int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno) int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno)
{ {
KEY* key_info= table->key_info + keyno; KEY* key_info= table->key_info + keyno;
...@@ -1643,7 +1667,8 @@ check_null_in_record(const KEY* key_info, const byte *record) ...@@ -1643,7 +1667,8 @@ check_null_in_record(const KEY* key_info, const byte *record)
* primary key or unique index values * primary key or unique index values
*/ */
int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) int ha_ndbcluster::peek_indexed_rows(const byte *record,
NDB_WRITE_OP write_op)
{ {
NdbTransaction *trans= m_active_trans; NdbTransaction *trans= m_active_trans;
NdbOperation *op; NdbOperation *op;
...@@ -1656,7 +1681,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) ...@@ -1656,7 +1681,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
first= NULL; first= NULL;
if (check_pk && table->s->primary_key != MAX_KEY) if (write_op != NDB_UPDATE && table->s->primary_key != MAX_KEY)
{ {
/* /*
* Fetch any row with colliding primary key * Fetch any row with colliding primary key
...@@ -1690,6 +1715,12 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) ...@@ -1690,6 +1715,12 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
DBUG_PRINT("info", ("skipping check for key with NULL")); DBUG_PRINT("info", ("skipping check for key with NULL"));
continue; continue;
} }
if (write_op != NDB_INSERT && !check_index_fields_in_write_set(i))
{
DBUG_PRINT("info", ("skipping check for key %u not in write_set", i));
continue;
}
NdbIndexOperation *iop; NdbIndexOperation *iop;
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index; NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
key_part= key_info->key_part; key_part= key_info->key_part;
...@@ -2268,7 +2299,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2268,7 +2299,7 @@ int ha_ndbcluster::write_row(byte *record)
start_bulk_insert will set parameters to ensure that each start_bulk_insert will set parameters to ensure that each
write_row is committed individually write_row is committed individually
*/ */
int peek_res= peek_indexed_rows(record, true); int peek_res= peek_indexed_rows(record, NDB_INSERT);
if (!peek_res) if (!peek_res)
{ {
...@@ -2456,7 +2487,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2456,7 +2487,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE || if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI)) thd->lex->sql_command == SQLCOM_UPDATE_MULTI))
{ {
int peek_res= peek_indexed_rows(new_data, pk_update); NDB_WRITE_OP write_op= (pk_update) ? NDB_PK_UPDATE : NDB_UPDATE;
int peek_res= peek_indexed_rows(new_data, write_op);
if (!peek_res) if (!peek_res)
{ {
......
...@@ -59,6 +59,12 @@ typedef struct ndb_index_data { ...@@ -59,6 +59,12 @@ typedef struct ndb_index_data {
bool null_in_unique_index; bool null_in_unique_index;
} NDB_INDEX_DATA; } NDB_INDEX_DATA;
typedef enum ndb_write_op {
NDB_INSERT = 0,
NDB_UPDATE = 1,
NDB_PK_UPDATE = 2
} NDB_WRITE_OP;
typedef struct st_ndbcluster_share { typedef struct st_ndbcluster_share {
THR_LOCK lock; THR_LOCK lock;
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -251,7 +257,7 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -251,7 +257,7 @@ static void set_tabname(const char *pathname, char *tabname);
const NdbOperation *first, const NdbOperation *first,
const NdbOperation *last, const NdbOperation *last,
uint errcode); uint errcode);
int peek_indexed_rows(const byte *record, bool check_pk); int peek_indexed_rows(const byte *record, NDB_WRITE_OP write_op);
int unique_index_read(const byte *key, uint key_len, int unique_index_read(const byte *key, uint key_len,
byte *buf); byte *buf);
int ordered_index_scan(const key_range *start_key, int ordered_index_scan(const key_range *start_key,
...@@ -286,6 +292,7 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -286,6 +292,7 @@ static void set_tabname(const char *pathname, char *tabname);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff); int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff);
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_primary_key_from_record(NdbOperation *op, const byte *record);
bool check_index_fields_in_write_set(uint keyno);
int set_index_key_from_record(NdbOperation *op, const byte *record, int set_index_key_from_record(NdbOperation *op, const byte *record,
uint keyno); uint keyno);
int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0); int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment