Commit 7015003c authored by mskold@mysql.com's avatar mskold@mysql.com

Fix for Bug#17431 INSERT IGNORE INTO returns failed: 1296: err 4350 'Transaction already aborted'

parent a945c58f
...@@ -1242,6 +1242,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info, ...@@ -1242,6 +1242,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
{ {
char unique_index_name[FN_LEN]; char unique_index_name[FN_LEN];
static const char* unique_suffix= "$unique"; static const char* unique_suffix= "$unique";
m_has_unique_index= TRUE;
strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS); strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name)); DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname); const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
...@@ -1268,7 +1269,7 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error) ...@@ -1268,7 +1269,7 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error)
KEY* key_info= tab->key_info; KEY* key_info= tab->key_info;
const char **key_name= tab->s->keynames.type_names; const char **key_name= tab->s->keynames.type_names;
DBUG_ENTER("ha_ndbcluster::open_indexes"); DBUG_ENTER("ha_ndbcluster::open_indexes");
m_has_unique_index= FALSE;
for (i= 0; i < tab->s->keys; i++, key_info++, key_name++) for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
{ {
if ((error= add_index_handle(thd, dict, key_info, *key_name, i))) if ((error= add_index_handle(thd, dict, key_info, *key_name, i)))
...@@ -1570,6 +1571,25 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec ...@@ -1570,6 +1571,25 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ha_ndbcluster::set_index_key_from_record(NdbOperation *op,
const byte *record, uint keyno)
{
KEY* key_info= table->key_info + keyno;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
uint i;
DBUG_ENTER("set_index_key_from_record");
for (i= 0; key_part != end; key_part++, i++)
{
Field* field= key_part->field;
if (set_ndb_key(op, field, m_index[keyno].unique_index_attrid_map[i],
record+key_part->offset))
ERR_RETURN(m_active_trans->getNdbError());
}
DBUG_RETURN(0);
}
int int
ha_ndbcluster::set_index_key(NdbOperation *op, ha_ndbcluster::set_index_key(NdbOperation *op,
const KEY *key_info, const KEY *key_info,
...@@ -1778,46 +1798,154 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, ...@@ -1778,46 +1798,154 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
} }
/* /*
Peek to check if a particular row already exists * Check that all operations between first and last all
* have gotten the errcode
* If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey
* for all succeeding operations
*/
bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans,
const NdbOperation *first,
const NdbOperation *last,
uint errcode)
{
const NdbOperation *op= first;
DBUG_ENTER("ha_ndbcluster::check_all_operations_for_error");
while(op)
{
NdbError err= op->getNdbError();
if (err.status != NdbError::Success)
{
if (ndb_to_mysql_error(&err) != (int) errcode)
DBUG_RETURN(false);
if (op == last) break;
op= trans->getNextCompletedOperation(op);
}
else
{
// We found a duplicate
if (op->getType() == NdbOperation::UniqueIndexAccess)
{
if (errcode == HA_ERR_KEY_NOT_FOUND)
{
NdbIndexOperation *iop= (NdbIndexOperation *) op;
const NDBINDEX *index= iop->getIndex();
// Find the key_no of the index
for(uint i= 0; i<table->s->keys; i++)
{
if (m_index[i].unique_index == index)
{
m_dupkey= i;
break;
}
}
}
}
else
{
// Must have been primary key access
DBUG_ASSERT(op->getType() == NdbOperation::PrimaryKeyAccess);
if (errcode == HA_ERR_KEY_NOT_FOUND)
m_dupkey= table->s->primary_key;
}
DBUG_RETURN(false);
}
}
DBUG_RETURN(true);
}
/*
* Peek to check if any rows already exist with conflicting
* primary key or unique index values
*/ */
int ha_ndbcluster::peek_row(const byte *record) int ha_ndbcluster::peek_indexed_rows(const byte *record)
{ {
NdbTransaction *trans= m_active_trans; NdbTransaction *trans= m_active_trans;
NdbOperation *op; NdbOperation *op;
DBUG_ENTER("peek_row"); const NdbOperation *first, *last;
uint i;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res; int res;
if ((res= set_primary_key_from_record(op, record))) DBUG_ENTER("peek_indexed_rows");
ERR_RETURN(trans->getNdbError());
if (m_use_partition_function) NdbOperation::LockMode lm= NdbOperation::LM_Read;
first= NULL;
if (table->s->primary_key != MAX_KEY)
{ {
uint32 part_id; /*
int error; * Fetch any row with colliding primary key
longlong func_value; */
if ((error= m_part_info->get_partition_id(m_part_info, &part_id, if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
&func_value))) op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
first= op;
if ((res= set_primary_key_from_record(op, record)))
ERR_RETURN(trans->getNdbError());
if (m_use_partition_function)
{ {
DBUG_RETURN(error); uint32 part_id;
int error;
longlong func_value;
if ((error= m_part_info->get_partition_id(m_part_info, &part_id,
&func_value)))
{
DBUG_RETURN(error);
}
op->setPartitionId(part_id);
} }
op->setPartitionId(part_id);
} }
/*
* Fetch any rows with colliding unique indexes
*/
KEY* key_info;
KEY_PART_INFO *key_part, *end;
for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++)
{
if (i != table->s->primary_key &&
key_info->flags & HA_NOSAME)
{
// A unique index is defined on table
NdbIndexOperation *iop;
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
key_part= key_info->key_part;
end= key_part + key_info->key_parts;
if (!(iop= trans->getNdbIndexOperation(unique_index,
(const NDBTAB *) m_table)) ||
iop->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
if (execute_no_commit_ie(this,trans) != 0) if (!first)
first= iop;
if ((res= set_index_key_from_record(iop, record, i)))
ERR_RETURN(trans->getNdbError());
}
}
last= trans->getLastDefinedOperation();
if (first)
res= execute_no_commit_ie(this,trans);
else
{
// Table has no keys
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
}
if (check_all_operations_for_error(trans, first, last,
HA_ERR_KEY_NOT_FOUND))
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
else
{
DBUG_PRINT("info", ("m_dupkey %d", m_dupkey));
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Read one record from NDB using unique secondary index Read one record from NDB using unique secondary index
*/ */
...@@ -2312,13 +2440,33 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2312,13 +2440,33 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_ENTER("ha_ndbcluster::write_row"); DBUG_ENTER("ha_ndbcluster::write_row");
m_write_op= TRUE; m_write_op= TRUE;
if (!m_use_write && m_ignore_dup_key && table_share->primary_key != MAX_KEY) has_auto_increment= (table->next_number_field && record == table->record[0]);
if (table_share->primary_key != MAX_KEY)
{
/*
* Increase any auto_incremented primary key
*/
if (has_auto_increment)
{
THD *thd= table->in_use;
m_skip_auto_increment= FALSE;
update_auto_increment();
/* Ensure that handler is always called for auto_increment values */
thd->next_insert_id= 0;
m_skip_auto_increment= !auto_increment_column_changed;
}
}
/*
* If IGNORE the ignore constraint violations on primary and unique keys
*/
if (!m_use_write && m_ignore_dup_key)
{ {
int peek_res= peek_row(record); int peek_res= peek_indexed_rows(record);
if (!peek_res) if (!peek_res)
{ {
m_dupkey= table_share->primary_key;
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
} }
if (peek_res != HA_ERR_KEY_NOT_FOUND) if (peek_res != HA_ERR_KEY_NOT_FOUND)
...@@ -2328,7 +2476,6 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2328,7 +2476,6 @@ int ha_ndbcluster::write_row(byte *record)
statistic_increment(thd->status_var.ha_write_count, &LOCK_status); statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
has_auto_increment= (table->next_number_field && record == table->record[0]);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table))) if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -2369,17 +2516,6 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2369,17 +2516,6 @@ int ha_ndbcluster::write_row(byte *record)
{ {
int res; int res;
if (has_auto_increment)
{
THD *thd= table->in_use;
m_skip_auto_increment= FALSE;
update_auto_increment();
/* Ensure that handler is always called for auto_increment values */
thd->next_insert_id= 0;
m_skip_auto_increment= !auto_increment_column_changed;
}
if ((res= set_primary_key_from_record(op, record))) if ((res= set_primary_key_from_record(op, record)))
return res; return res;
} }
...@@ -3465,7 +3601,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) ...@@ -3465,7 +3601,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
break; break;
case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
if (current_thd->lex->sql_command == SQLCOM_REPLACE) if (current_thd->lex->sql_command == SQLCOM_REPLACE && !m_has_unique_index)
{ {
DBUG_PRINT("info", ("Turning ON use of write instead of insert")); DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
m_use_write= TRUE; m_use_write= TRUE;
...@@ -5139,6 +5275,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg): ...@@ -5139,6 +5275,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg):
m_sorted(FALSE), m_sorted(FALSE),
m_use_write(FALSE), m_use_write(FALSE),
m_ignore_dup_key(FALSE), m_ignore_dup_key(FALSE),
m_has_unique_index(FALSE),
m_primary_key_update(FALSE), m_primary_key_update(FALSE),
m_ignore_no_key(FALSE), m_ignore_no_key(FALSE),
m_rows_to_insert((ha_rows) 1), m_rows_to_insert((ha_rows) 1),
......
...@@ -738,7 +738,11 @@ private: ...@@ -738,7 +738,11 @@ private:
part_id_range *part_spec); part_id_range *part_spec);
int full_table_scan(byte * buf); int full_table_scan(byte * buf);
int peek_row(const byte *record); bool check_all_operations_for_error(NdbTransaction *trans,
const NdbOperation *first,
const NdbOperation *last,
uint errcode);
int peek_indexed_rows(const byte *record);
int unique_index_read(const byte *key, uint key_len, int unique_index_read(const byte *key, uint key_len,
byte *buf); byte *buf);
int fetch_next(NdbScanOperation* op); int fetch_next(NdbScanOperation* op);
...@@ -766,6 +770,8 @@ private: ...@@ -766,6 +770,8 @@ private:
int get_ndb_blobs_value(NdbBlob *last_ndb_blob); int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_primary_key_from_record(NdbOperation *op, const byte *record);
int set_index_key_from_record(NdbOperation *op, const byte *record,
uint keyno);
int set_bounds(NdbIndexScanOperation*, uint inx, bool rir, int set_bounds(NdbIndexScanOperation*, uint inx, bool rir,
const key_range *keys[2], uint= 0); const key_range *keys[2], uint= 0);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
...@@ -832,6 +838,7 @@ private: ...@@ -832,6 +838,7 @@ private:
bool m_sorted; bool m_sorted;
bool m_use_write; bool m_use_write;
bool m_ignore_dup_key; bool m_ignore_dup_key;
bool m_has_unique_index;
bool m_primary_key_update; bool m_primary_key_update;
bool m_write_op; bool m_write_op;
bool m_ignore_no_key; bool m_ignore_no_key;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment