Commit 38238355 authored by unknown's avatar unknown

BUG#4775 "Duplicate key requires rollback of transaction" - Improved error...

BUG#4775 "Duplicate key requires rollback of transaction" - Improved error message telling that transaction is aborted
BUG#4312 "wrong behaviour on insert .. on duplicate key" functionality disabled


mysql-test/r/ndb_insert.result:
  New tests for fduplicate inserts in combination with transaction
  New tests for INSERT IGNORE and REPLACE
mysql-test/t/ndb_insert.test:
  New tests for fduplicate inserts in combination with transaction
  New tests for INSERT IGNORE and REPLACE
ndb/src/ndbapi/NdbConnection.cpp:
  Return error 4350 "Transaction already aborted" if execute(Commit) is called when theCommitStatus==Aborted
  Add DBUG_PRINT's
ndb/src/ndbapi/ndberror.c:
  Add new error message indicating that the transaction already has been aborted.
sql/ha_ndbcluster.cc:
  Map all error code 0 to 1 in order to catch errors caused by NdbApi returning -1 without having set an error code.
  Use ndb object in THD in get_error_message
  BUG# 4312 Return HA_ERR_WRONG_COMMAND if extra(HA_EXTRA_IGNORE_DUP_KEY) is called
  Only use writeTuple if command is REPLACE
sql/ha_ndbcluster.h:
  Added member variable to keep track of when HA_EXTRA_IGNORE_DUP_KEY is used, but NDB can't support it.
parent 373f1b70
......@@ -420,12 +420,130 @@ INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
select count(*) from t1;
count(*)
2000
begin;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
commit;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
select * from t1 where pk1=1;
pk1 b c
1 1 1
select * from t1 where pk1=10;
pk1 b c
10 10 10
select count(*) from t1 where pk1 <= 10 order by pk1;
count(*)
11
select count(*) from t1;
count(*)
2000
begin;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
rollback;
select * from t1 where pk1=1;
pk1 b c
1 1 1
select * from t1 where pk1=10;
pk1 b c
10 10 10
select count(*) from t1 where pk1 <= 10 order by pk1;
count(*)
11
select count(*) from t1;
count(*)
2000
begin;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
SELECT * FROM t1 WHERE pk1=10;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
rollback;
select * from t1 where pk1=1;
pk1 b c
1 1 1
select * from t1 where pk1=10;
pk1 b c
10 10 10
select count(*) from t1 where pk1 <= 10 order by pk1;
count(*)
11
select count(*) from t1;
count(*)
2000
begin;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
SELECT * FROM t1 WHERE pk1=10;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
SELECT * FROM t1 WHERE pk1=10;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
commit;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
select * from t1 where pk1=1;
pk1 b c
1 1 1
select * from t1 where pk1=10;
pk1 b c
10 10 10
select count(*) from t1 where pk1 <= 10 order by pk1;
count(*)
11
select count(*) from t1;
count(*)
2000
begin;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
ERROR 23000: Duplicate entry '10' for key 1
INSERT INTO t1 values (4000, 40, 44);
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
rollback;
select * from t1 where pk1=1;
pk1 b c
1 1 1
select * from t1 where pk1=10;
pk1 b c
10 10 10
select count(*) from t1 where pk1 <= 10 order by pk1;
count(*)
11
select count(*) from t1;
count(*)
2000
insert into t1 select * from t1 where b < 10 order by pk1;
ERROR 23000: Duplicate entry '9' for key 1
begin;
INSERT IGNORE INTO t1 VALUES(1,2,3);
ERROR HY000: Table storage engine for 't1' doesn't have this option
commit;
select * from t1 where pk1=1;
pk1 b c
1 1 1
INSERT IGNORE INTO t1 VALUES(1,2,3);
ERROR HY000: Table storage engine for 't1' doesn't have this option
select * from t1 where pk1=1;
pk1 b c
1 1 1
REPLACE INTO t1 values(1, 2, 3);
select * from t1 where pk1=1;
pk1 b c
1 2 3
INSERT INTO t1 VALUES(1,1,1) ON DUPLICATE KEY UPDATE b=79;
ERROR HY000: Table storage engine for 't1' doesn't have this option
select * from t1 where pk1=1;
pk1 b c
1 2 3
DROP TABLE t1;
......@@ -437,20 +437,117 @@ INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
select count(*) from t1;
#
# Insert duplicate rows, inside transaction
# try to commit
#
begin;
--error 1062
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
--error 1296
commit;
select * from t1 where pk1=1;
select * from t1 where pk1=10;
select count(*) from t1 where pk1 <= 10 order by pk1;
select count(*) from t1;
#
# Insert duplicate rows, inside transaction
# rollback
#
begin;
--error 1062
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
rollback;
select * from t1 where pk1=1;
select * from t1 where pk1=10;
select count(*) from t1 where pk1 <= 10 order by pk1;
select count(*) from t1;
#
# Insert duplicate rows, inside transaction
# then try to select, finally rollback
#
begin;
--error 1062
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
--error 1296
SELECT * FROM t1 WHERE pk1=10;
rollback;
select * from t1 where pk1=1;
select * from t1 where pk1=10;
select count(*) from t1 where pk1 <= 10 order by pk1;
select count(*) from t1;
#
# Insert duplicate rows, inside transaction
# then try to select, finally commit
#
begin;
--error 1062
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
--error 1296
SELECT * FROM t1 WHERE pk1=10;
--error 1296
SELECT * FROM t1 WHERE pk1=10;
--error 1296
commit;
select * from t1 where pk1=1;
select * from t1 where pk1=10;
select count(*) from t1 where pk1 <= 10 order by pk1;
select count(*) from t1;
#
# Insert duplicate rows, inside transaction
# then try to do another insert
#
begin;
--error 1062
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
--error 1296
INSERT INTO t1 values (4000, 40, 44);
rollback;
select * from t1 where pk1=1;
select * from t1 where pk1=10;
select count(*) from t1 where pk1 <= 10 order by pk1;
select count(*) from t1;
#
# Insert duplicate rows using "insert .. select"
......@@ -459,4 +556,21 @@ commit;
insert into t1 select * from t1 where b < 10 order by pk1;
begin;
--error 1031
INSERT IGNORE INTO t1 VALUES(1,2,3);
commit;
select * from t1 where pk1=1;
--error 1031
INSERT IGNORE INTO t1 VALUES(1,2,3);
select * from t1 where pk1=1;
REPLACE INTO t1 values(1, 2, 3);
select * from t1 where pk1=1;
--error 1031
INSERT INTO t1 VALUES(1,1,1) ON DUPLICATE KEY UPDATE b=79;
select * from t1 where pk1=1;
DROP TABLE t1;
......@@ -145,27 +145,29 @@ NdbConnection::init()
}//NdbConnection::init()
/*****************************************************************************
setOperationErrorCode(int anErrorCode);
setOperationErrorCode(int error);
Remark: Sets an error code on the connection object from an
operation object.
*****************************************************************************/
void
NdbConnection::setOperationErrorCode(int anErrorCode)
NdbConnection::setOperationErrorCode(int error)
{
if (theError.code == 0)
theError.code = anErrorCode;
}//NdbConnection::setOperationErrorCode()
DBUG_ENTER("NdbConnection::setOperationErrorCode");
setErrorCode(error);
DBUG_VOID_RETURN;
}
/*****************************************************************************
setOperationErrorCodeAbort(int anErrorCode);
setOperationErrorCodeAbort(int error);
Remark: Sets an error code on the connection object from an
operation object.
*****************************************************************************/
void
NdbConnection::setOperationErrorCodeAbort(int anErrorCode)
NdbConnection::setOperationErrorCodeAbort(int error)
{
DBUG_ENTER("NdbConnection::setOperationErrorCodeAbort");
if (theTransactionIsStarted == false) {
theCommitStatus = Aborted;
} else if ((m_abortOption == AbortOnError) &&
......@@ -173,9 +175,9 @@ NdbConnection::setOperationErrorCodeAbort(int anErrorCode)
(theCommitStatus != Aborted)) {
theCommitStatus = NeedAbort;
}//if
if (theError.code == 0)
theError.code = anErrorCode;
}//NdbConnection::setOperationErrorCodeAbort()
setErrorCode(error);
DBUG_VOID_RETURN;
}
/*****************************************************************************
setErrorCode(int anErrorCode);
......@@ -183,10 +185,15 @@ setErrorCode(int anErrorCode);
Remark: Sets an error indication on the connection object.
*****************************************************************************/
void
NdbConnection::setErrorCode(int anErrorCode)
NdbConnection::setErrorCode(int error)
{
DBUG_ENTER("NdbConnection::setErrorCode");
DBUG_PRINT("enter", ("error: %d, theError.code: %d", error, theError.code));
if (theError.code == 0)
theError.code = anErrorCode;
theError.code = error;
DBUG_VOID_RETURN;
}//NdbConnection::setErrorCode()
int
......@@ -262,8 +269,12 @@ NdbConnection::execute(ExecType aTypeOfExec,
AbortOption abortOption,
int forceSend)
{
DBUG_ENTER("NdbConnection::execute");
DBUG_PRINT("enter", ("aTypeOfExec: %d, abortOption: %d",
aTypeOfExec, abortOption));
if (! theBlobFlag)
return executeNoBlobs(aTypeOfExec, abortOption, forceSend);
DBUG_RETURN(executeNoBlobs(aTypeOfExec, abortOption, forceSend));
/*
* execute prepared ops in batches, as requested by blobs
......@@ -346,7 +357,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
}
} while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
return ret;
DBUG_RETURN(ret);
}
int
......@@ -354,6 +365,10 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec,
AbortOption abortOption,
int forceSend)
{
DBUG_ENTER("NdbConnection::executeNoBlobs");
DBUG_PRINT("enter", ("aTypeOfExec: %d, abortOption: %d",
aTypeOfExec, abortOption));
//------------------------------------------------------------------------
// We will start by preparing all operations in the transaction defined
// since last execute or since beginning. If this works ok we will continue
......@@ -376,7 +391,7 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec,
*/
ndbout << "This timeout should never occur, execute(..)" << endl;
setOperationErrorCodeAbort(4012); // Error code for "Cluster Failure"
return -1;
DBUG_RETURN(-1);
}//if
/*
......@@ -400,13 +415,13 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec,
}
#endif
if (theReturnStatus == ReturnFailure) {
return -1;
DBUG_RETURN(-1);
}//if
break;
}
}
thePendingBlobOps = 0;
return 0;
DBUG_RETURN(0);
}//NdbConnection::execute()
/*****************************************************************************
......@@ -430,9 +445,15 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
void* anyObject,
AbortOption abortOption)
{
DBUG_ENTER("NdbConnection::executeAsynchPrepare");
DBUG_PRINT("enter", ("aTypeOfExec: %d, aCallback: %x, anyObject: %x",
aTypeOfExec, aCallback, anyObject));
/**
* Reset error.code on execute
*/
if (theError.code != 0)
DBUG_PRINT("enter", ("Resetting error %d on execute", theError.code));
theError.code = 0;
NdbScanOperation* tcOp = m_theFirstScanOperation;
if (tcOp != 0){
......@@ -441,7 +462,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
int tReturnCode;
tReturnCode = tcOp->executeCursor(theDBnode);
if (tReturnCode == -1) {
return;
DBUG_VOID_RETURN;
}//if
tcOp = (NdbScanOperation*)tcOp->next();
} // while
......@@ -463,17 +484,6 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
theCallbackFunction = aCallback;
theCallbackObject = anyObject;
m_abortOption = abortOption;
// SendStatusType tSendStatus = theSendStatus;
// if (tSendStatus != InitState) {
/****************************************************************************
* The application is obviously doing strange things. We should probably
* report to the application the problem in some manner. Since we don't have
* a good way of handling the problem we avoid discovering the problem.
* Should be handled at some point in time.
****************************************************************************/
// return;
// }
m_waitForReply = true;
tNdb->thePreparedTransactionsArray[tnoOfPreparedTransactions] = this;
theTransArrayIndex = tnoOfPreparedTransactions;
......@@ -502,7 +512,11 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
} else {
theSendStatus = sendABORTfail;
}//if
return;
if (theCommitStatus == Aborted){
DBUG_PRINT("exit", ("theCommitStatus: Aborted"));
setErrorCode(4350);
}
DBUG_VOID_RETURN;
}//if
if (tTransactionIsStarted == true) {
if (tLastOp != NULL) {
......@@ -520,7 +534,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
* We will use the commit method.
*********************************************************************/
theSendStatus = sendCOMMITstate;
return;
DBUG_VOID_RETURN;
} else {
/**********************************************************************
* We need to put it into the array of completed transactions to
......@@ -532,7 +546,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
* put it into the completed array.
**********************************************************************/
theSendStatus = sendCompleted;
return; // No Commit with no operations is OK
DBUG_VOID_RETURN; // No Commit with no operations is OK
}//if
}//if
} else if (tTransactionIsStarted == false) {
......@@ -560,7 +574,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
* will put it into the completed array.
***********************************************************************/
theSendStatus = sendCompleted;
return;
DBUG_VOID_RETURN;
}//if
}
......@@ -573,7 +587,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId);
if (tReturnCode == -1) {
theSendStatus = sendABORTfail;
return;
DBUG_VOID_RETURN;
}//if
/*************************************************************************
......@@ -596,7 +610,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
theNoOfOpSent = 0;
theNoOfOpCompleted = 0;
theSendStatus = sendOperations;
return;
DBUG_VOID_RETURN;
}//NdbConnection::executeAsynchPrepare()
void NdbConnection::close()
......@@ -665,6 +679,8 @@ Remark: Send all operations belonging to this connection.
int
NdbConnection::doSend()
{
DBUG_ENTER("NdbConnection::doSend");
/*
This method assumes that at least one operation have been defined. This
is ensured by the caller of this routine (=execute).
......@@ -687,7 +703,7 @@ NdbConnection::doSend()
theSendStatus = sendTC_OP;
theTransactionIsStarted = true;
tNdb->insert_sent_list(this);
return 0;
DBUG_RETURN(0);
}//case
case sendABORT:
case sendABORTfail:{
......@@ -699,18 +715,18 @@ NdbConnection::doSend()
theReturnStatus = ReturnFailure;
}//if
if (sendROLLBACK() == 0) {
return 0;
DBUG_RETURN(0);
}//if
break;
}//case
case sendCOMMITstate:
if (sendCOMMIT() == 0) {
return 0;
DBUG_RETURN(0);
}//if
break;
case sendCompleted:
theNdb->insert_completed_list(this);
return 0;
DBUG_RETURN(0);
default:
ndbout << "Inconsistent theSendStatus = " << theSendStatus << endl;
abort();
......@@ -720,7 +736,7 @@ NdbConnection::doSend()
theReleaseOnClose = true;
theTransactionIsStarted = false;
theCommitStatus = Aborted;
return -1;
DBUG_RETURN(-1);
}//NdbConnection::doSend()
/**************************************************************************
......
......@@ -228,6 +228,7 @@ ErrorBundle ErrorCodes[] = {
{ 4347, IE, "Bad state at alter index" },
{ 4348, IE, "Inconsistency detected at alter index" },
{ 4349, IE, "Inconsistency detected at index usage" },
{ 4350, IE, "Transaction already aborted" },
/**
* Application error
......
......@@ -121,6 +121,8 @@ static const err_code_mapping err_map[]=
{ 827, HA_ERR_RECORD_FILE_FULL },
{ 832, HA_ERR_RECORD_FILE_FULL },
{ 0, 1 },
{ -1, -1 }
};
......@@ -149,8 +151,6 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
int res;
const NdbError err= trans->getNdbError();
if (!err.code)
return 0; // Don't log things to DBUG log if no error
DBUG_ENTER("ndb_err");
ERR_PRINT(err);
......@@ -186,10 +186,11 @@ bool ha_ndbcluster::get_error_message(int error,
DBUG_ENTER("ha_ndbcluster::get_error_message");
DBUG_PRINT("enter", ("error: %d", error));
if (!m_ndb)
Ndb* ndb = (Ndb*)current_thd->transaction.ndb;
if (!ndb)
DBUG_RETURN(false);
const NdbError err= m_ndb->getNdbError(error);
const NdbError err= ndb->getNdbError(error);
bool temporary= err.status==NdbError::TemporaryError;
buf->set(err.message, strlen(err.message), &my_charset_bin);
DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
......@@ -1429,6 +1430,11 @@ int ha_ndbcluster::write_row(byte *record)
int res;
DBUG_ENTER("write_row");
if(m_ignore_dup_key_not_supported)
{
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
statistic_increment(ha_write_count,&LOCK_status);
if (table->timestamp_default_now)
update_timestamp(record+table->timestamp_default_now-1);
......@@ -2352,14 +2358,20 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
break;
case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
if (current_thd->lex->sql_command == SQLCOM_REPLACE)
{
DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
m_use_write= TRUE;
} else
{
m_ignore_dup_key_not_supported= TRUE;
}
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
m_use_write= false;
m_ignore_dup_key_not_supported= false;
break;
case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
where field->query_id is the same as
......@@ -3228,6 +3240,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
HA_NO_PREFIX_CHAR_KEYS),
m_share(0),
m_use_write(false),
m_ignore_dup_key_not_supported(false),
retrieve_all_fields(FALSE),
rows_to_insert(1),
rows_inserted(0),
......
......@@ -226,6 +226,7 @@ class ha_ndbcluster: public handler
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool m_ignore_dup_key_not_supported;
bool retrieve_all_fields;
ha_rows rows_to_insert;
ha_rows rows_inserted;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment