diff --git a/storage/ndb/include/kernel/signaldata/ScanTab.hpp b/storage/ndb/include/kernel/signaldata/ScanTab.hpp index 571fc374eab789621fbcc2a3c0965850ba6c3fcd..0074078533f97f4a8bc4b9513ca2d74762f94d72 100644 --- a/storage/ndb/include/kernel/signaldata/ScanTab.hpp +++ b/storage/ndb/include/kernel/signaldata/ScanTab.hpp @@ -115,16 +115,16 @@ private: z = Descending (TUX) - 1 Bit 14 x = Range Scan (TUX) - 1 Bit 15 b = Scan batch - 10 Bit 16-25 (max 1023) - d = Distribution key flag - n = No disk flag + d = Distribution key flag - 1 Bit 26 + n = No disk flag - 1 Bit 9 1111111111222222222233 01234567890123456789012345678901 - pppppppplnhcktzxbbbbbbbbbb + pppppppplnhcktzxbbbbbbbbbbd */ -#define PARALLELL_SHIFT (0) -#define PARALLELL_MASK (255) +#define PARALLEL_SHIFT (0) +#define PARALLEL_MASK (255) #define LOCK_MODE_SHIFT (8) #define LOCK_MODE_MASK (1) @@ -151,13 +151,15 @@ private: #define SCAN_BATCH_MASK (1023) #define SCAN_DISTR_KEY_SHIFT (26) +#define SCAN_DISTR_KEY_MASK (1) #define SCAN_NODISK_SHIFT (9) +#define SCAN_NODISK_MASK (1) inline Uint8 ScanTabReq::getParallelism(const UintR & requestInfo){ - return (Uint8)((requestInfo >> PARALLELL_SHIFT) & PARALLELL_MASK); + return (Uint8)((requestInfo >> PARALLEL_SHIFT) & PARALLEL_MASK); } inline @@ -211,58 +213,65 @@ ScanTabReq::clearRequestInfo(UintR & requestInfo){ inline void ScanTabReq::setParallelism(UintR & requestInfo, Uint32 type){ - ASSERT_MAX(type, PARALLELL_MASK, "ScanTabReq::setParallellism"); - requestInfo |= (type << PARALLELL_SHIFT); + ASSERT_MAX(type, PARALLEL_MASK, "ScanTabReq::setParallelism"); + requestInfo= (requestInfo & ~(PARALLEL_MASK << PARALLEL_SHIFT)) | + ((type & PARALLEL_MASK) << PARALLEL_SHIFT); } inline void ScanTabReq::setLockMode(UintR & requestInfo, Uint32 mode){ ASSERT_MAX(mode, LOCK_MODE_MASK, "ScanTabReq::setLockMode"); - requestInfo |= (mode << LOCK_MODE_SHIFT); + requestInfo= (requestInfo & ~(LOCK_MODE_MASK << LOCK_MODE_SHIFT)) | + ((mode & LOCK_MODE_MASK) << LOCK_MODE_SHIFT); } inline void ScanTabReq::setHoldLockFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setHoldLockFlag"); - requestInfo |= (flag << HOLD_LOCK_SHIFT); + requestInfo= (requestInfo & ~(HOLD_LOCK_MASK << HOLD_LOCK_SHIFT)) | + ((flag & HOLD_LOCK_MASK) << HOLD_LOCK_SHIFT); } inline void ScanTabReq::setReadCommittedFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setReadCommittedFlag"); - requestInfo |= (flag << READ_COMMITTED_SHIFT); + requestInfo= (requestInfo & ~(READ_COMMITTED_MASK << READ_COMMITTED_SHIFT)) | + ((flag & READ_COMMITTED_MASK) << READ_COMMITTED_SHIFT); } inline void ScanTabReq::setRangeScanFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setRangeScanFlag"); - requestInfo |= (flag << RANGE_SCAN_SHIFT); + requestInfo= (requestInfo & ~(RANGE_SCAN_MASK << RANGE_SCAN_SHIFT)) | + ((flag & RANGE_SCAN_MASK) << RANGE_SCAN_SHIFT); } inline void ScanTabReq::setDescendingFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setDescendingFlag"); - requestInfo |= (flag << DESCENDING_SHIFT); + requestInfo= (requestInfo & ~(DESCENDING_MASK << DESCENDING_SHIFT)) | + ((flag & DESCENDING_MASK) << DESCENDING_SHIFT); } inline void ScanTabReq::setTupScanFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setTupScanFlag"); - requestInfo |= (flag << TUP_SCAN_SHIFT); + requestInfo= (requestInfo & ~(TUP_SCAN_MASK << TUP_SCAN_SHIFT)) | + ((flag & TUP_SCAN_MASK) << TUP_SCAN_SHIFT); } inline void ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch"); - requestInfo &= ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT); - requestInfo |= (flag << SCAN_BATCH_SHIFT); + requestInfo= (requestInfo & ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT)) | + ((flag & SCAN_BATCH_MASK) << SCAN_BATCH_SHIFT); } inline @@ -275,33 +284,36 @@ inline void ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag"); - requestInfo |= (flag << KEYINFO_SHIFT); + requestInfo= (requestInfo & ~(KEYINFO_MASK << KEYINFO_SHIFT)) | + ((flag & KEYINFO_MASK) << KEYINFO_SHIFT); } inline Uint8 ScanTabReq::getDistributionKeyFlag(const UintR & requestInfo){ - return (Uint8)((requestInfo >> SCAN_DISTR_KEY_SHIFT) & 1); + return (Uint8)((requestInfo >> SCAN_DISTR_KEY_SHIFT) & SCAN_DISTR_KEY_MASK); } inline void ScanTabReq::setDistributionKeyFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag"); - requestInfo |= (flag << SCAN_DISTR_KEY_SHIFT); + requestInfo= (requestInfo & ~(SCAN_DISTR_KEY_MASK << SCAN_DISTR_KEY_SHIFT)) | + ((flag & SCAN_DISTR_KEY_MASK) << SCAN_DISTR_KEY_SHIFT); } inline UintR ScanTabReq::getNoDiskFlag(const UintR & requestInfo){ - return (requestInfo >> SCAN_NODISK_SHIFT) & 1; + return (requestInfo >> SCAN_NODISK_SHIFT) & SCAN_NODISK_MASK; } inline void ScanTabReq::setNoDiskFlag(UintR & requestInfo, Uint32 flag){ ASSERT_BOOL(flag, "TcKeyReq::setNoDiskFlag"); - requestInfo |= (flag << SCAN_NODISK_SHIFT); + requestInfo= (requestInfo & ~(SCAN_NODISK_MASK << SCAN_NODISK_SHIFT)) | + ((flag & SCAN_NODISK_MASK) << SCAN_NODISK_SHIFT); } /** diff --git a/storage/ndb/include/ndbapi/NdbOperation.hpp b/storage/ndb/include/ndbapi/NdbOperation.hpp index 61ea71e7021ce9fe3c4a13b6297ed272947a7cb5..0fa2cac0a32a50994569e724560b616470d1524b 100644 --- a/storage/ndb/include/ndbapi/NdbOperation.hpp +++ b/storage/ndb/include/ndbapi/NdbOperation.hpp @@ -949,6 +949,8 @@ protected: // get table or index key from prepared signals int getKeyFromTCREQ(Uint32* data, Uint32 & size); + virtual void setReadLockMode(LockMode lockMode); + /****************************************************************************** * These are the private variables that are defined in the operation objects. *****************************************************************************/ diff --git a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp index 1897a11c809021fd1a70dc4b58dfdf1d13a86d40..bc24b782addc627f69274eaec698baaaabbdcdd6 100644 --- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp @@ -214,6 +214,7 @@ protected: int init(const NdbTableImpl* tab, NdbTransaction*); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int doSend(int ProcessorId); + virtual void setReadLockMode(LockMode lockMode); virtual void setErrorCode(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode); diff --git a/storage/ndb/src/ndbapi/NdbBlob.cpp b/storage/ndb/src/ndbapi/NdbBlob.cpp index b3664107cced15c41021d272225c925446757491..25dcafdef5325972a36a3ceb3a41bd1cb466afa4 100644 --- a/storage/ndb/src/ndbapi/NdbBlob.cpp +++ b/storage/ndb/src/ndbapi/NdbBlob.cpp @@ -1390,7 +1390,7 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl if (isReadOp()) { // upgrade lock mode if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead) - theNdbOp->theLockMode = NdbOperation::LM_Read; + theNdbOp->setReadLockMode(NdbOperation::LM_Read); // add read of head+inline in this op if (getHeadInlineValue(theNdbOp) == -1) DBUG_RETURN(-1); @@ -1411,7 +1411,7 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl if (isScanOp()) { // upgrade lock mode if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead) - theNdbOp->theLockMode = NdbOperation::LM_Read; + theNdbOp->setReadLockMode(NdbOperation::LM_Read); // add read of head+inline in this op if (getHeadInlineValue(theNdbOp) == -1) DBUG_RETURN(-1); diff --git a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp index f818564642b69946b0bd705756f3bfd8596782e0..ced9b18bd5566fe3c7ba95266b477d7d1b4eca49 100644 --- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -333,6 +333,36 @@ NdbOperation::interpretedDeleteTuple() }//if }//NdbOperation::interpretedDeleteTuple() +void +NdbOperation::setReadLockMode(LockMode lockMode) +{ + /* We only support changing lock mode for read operations at this time. */ + assert(theOperationType == ReadRequest || theOperationType == ReadExclusive); + switch (lockMode) + { + case LM_CommittedRead: + theOperationType= ReadRequest; + theSimpleIndicator= 1; + theDirtyIndicator= 1; + break; + case LM_Read: + theNdbCon->theSimpleState= 0; + theOperationType= ReadRequest; + theSimpleIndicator= 0; + theDirtyIndicator= 0; + break; + case LM_Exclusive: + theNdbCon->theSimpleState= 0; + theOperationType= ReadExclusive; + theSimpleIndicator= 0; + theDirtyIndicator= 0; + break; + default: + /* Not supported / invalid. */ + assert(false); + } + theLockMode= lockMode; +} /****************************************************************************** diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp index c253dca8c61b3a4f801ef2e4abcf4d2bed0b7bf6..dc9a74ae11c558ccdd7af986f0bfda1997b0ef19 100644 --- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp @@ -138,34 +138,9 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } theNdbCon->theScanningOp = this; - theLockMode = lm; - - bool lockExcl, lockHoldMode, readCommitted; - switch(lm){ - case NdbScanOperation::LM_Read: - lockExcl = false; - lockHoldMode = true; - readCommitted = false; - break; - case NdbScanOperation::LM_Exclusive: - lockExcl = true; - lockHoldMode = true; - readCommitted = false; - break; - case NdbScanOperation::LM_CommittedRead: - lockExcl = false; - lockHoldMode = false; - readCommitted = true; - break; - default: - setErrorCode(4003); - return -1; - } - - m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0; bool tupScan = (scan_flags & SF_TupScan); -#if 1 // XXX temp for testing +#if 0 // XXX temp for testing { char* p = getenv("NDB_USE_TUPSCAN"); if (p != 0) { unsigned n = atoi(p); // 0-10 @@ -225,13 +200,13 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); ScanTabReq::setScanBatch(reqInfo, 0); - ScanTabReq::setLockMode(reqInfo, lockExcl); - ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); - ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); ScanTabReq::setRangeScanFlag(reqInfo, rangeScan); ScanTabReq::setTupScanFlag(reqInfo, tupScan); req->requestInfo = reqInfo; + m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0; + setReadLockMode(lm); + Uint64 transId = theNdbCon->getTransactionId(); req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); @@ -251,6 +226,41 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, return 0; } +void +NdbScanOperation::setReadLockMode(LockMode lockMode) +{ + bool lockExcl, lockHoldMode, readCommitted; + switch (lockMode) + { + case LM_CommittedRead: + lockExcl= false; + lockHoldMode= false; + readCommitted= true; + break; + case LM_Read: + lockExcl= false; + lockHoldMode= true; + readCommitted= false; + break; + case LM_Exclusive: + lockExcl= true; + lockHoldMode= true; + readCommitted= false; + m_keyInfo= 1; + break; + default: + /* Not supported / invalid. */ + assert(false); + } + theLockMode= lockMode; + ScanTabReq *req= CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + Uint32 reqInfo= req->requestInfo; + ScanTabReq::setLockMode(reqInfo, lockExcl); + ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); + ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); + req->requestInfo= reqInfo; +} + int NdbScanOperation::fix_receivers(Uint32 parallel){ assert(parallel > 0); diff --git a/storage/ndb/test/ndbapi/testBlobs.cpp b/storage/ndb/test/ndbapi/testBlobs.cpp index e86714c4fcb11c48d35805bf4a4cad6a20295388..d9c657a0a296b9bcbbc4ca097f844f9523d49a57 100644 --- a/storage/ndb/test/ndbapi/testBlobs.cpp +++ b/storage/ndb/test/ndbapi/testBlobs.cpp @@ -141,6 +141,7 @@ printusage() << "bug tests" << endl << " -bug 4088 ndb api hang with mixed ops on index table" << endl << " -bug 27018 middle partial part write clobbers rest of part" << endl + << " -bug 27370 Potential inconsistent blob reads for ReadCommitted reads" << endl ; } @@ -1886,12 +1887,210 @@ bugtest_27018() return 0; } + +struct bug27370_data { + Ndb *m_ndb; + char m_current_write_value; + char *m_writebuf; + Uint32 m_blob1_size; + Uint32 m_pk1; + char m_pk2[g_max_pk2len + 1]; + bool m_thread_stop; +}; + +void *bugtest_27370_thread(void *arg) +{ + bug27370_data *data= (bug27370_data *)arg; + + while (!data->m_thread_stop) + { + memset(data->m_writebuf, data->m_current_write_value, data->m_blob1_size); + data->m_current_write_value++; + + NdbConnection *con; + if ((con= data->m_ndb->startTransaction()) == 0) + return (void *)"Failed to create transaction"; + NdbOperation *opr; + if ((opr= con->getNdbOperation(g_opt.m_tname)) == 0) + return (void *)"Failed to create operation"; + if (opr->writeTuple() != 0) + return (void *)"writeTuple() failed"; + if (opr->equal("PK1", data->m_pk1) != 0) + return (void *)"equal(PK1) failed"; + if (g_opt.m_pk2len != 0) + if (opr->equal("PK2", data->m_pk2) != 0) + return (void *)"equal(PK2) failed"; + NdbBlob *bh; + if ((bh= opr->getBlobHandle("BL1")) == 0) + return (void *)"getBlobHandle() failed"; + if (bh->setValue(data->m_writebuf, data->m_blob1_size) != 0) + return (void *)"setValue() failed"; + if (con->execute(Commit, AbortOnError, 1) != 0) + return (void *)"execute() failed"; + data->m_ndb->closeTransaction(con); + } + + return NULL; // Success +} + +static int +bugtest_27370() +{ + DBG("bug test 27370 - Potential inconsistent blob reads for ReadCommitted reads"); + + bug27370_data data; + + data.m_ndb= new Ndb(g_ncc, "TEST_DB"); + CHK(data.m_ndb->init(20) == 0); + CHK(data.m_ndb->waitUntilReady() == 0); + + data.m_current_write_value= 0; + data.m_blob1_size= g_opt.m_blob1.m_inline + 10 * g_opt.m_blob1.m_partsize; + CHK((data.m_writebuf= new char [data.m_blob1_size]) != 0); + data.m_pk1= 27370; + memset(data.m_pk2, 'x', g_max_pk2len); + data.m_pk2[g_max_pk2len]= '\0'; + data.m_thread_stop= false; + + memset(data.m_writebuf, data.m_current_write_value, data.m_blob1_size); + data.m_current_write_value++; + + CHK((g_con= g_ndb->startTransaction()) != 0); + CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0); + CHK(g_opr->writeTuple() == 0); + CHK(g_opr->equal("PK1", data.m_pk1) == 0); + if (g_opt.m_pk2len != 0) + CHK(g_opr->equal("PK2", data.m_pk2) == 0); + CHK((g_bh1= g_opr->getBlobHandle("BL1")) != 0); + CHK(g_bh1->setValue(data.m_writebuf, data.m_blob1_size) == 0); + CHK(g_con->execute(Commit) == 0); + g_ndb->closeTransaction(g_con); + g_con= NULL; + + pthread_t thread_handle; + CHK(pthread_create(&thread_handle, NULL, bugtest_27370_thread, &data) == 0); + + DBG("bug test 27370 - PK blob reads"); + Uint32 seen_updates= 0; + while (seen_updates < 50) + { + CHK((g_con= g_ndb->startTransaction()) != 0); + CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0); + CHK(g_opr->readTuple(NdbOperation::LM_CommittedRead) == 0); + CHK(g_opr->equal("PK1", data.m_pk1) == 0); + if (g_opt.m_pk2len != 0) + CHK(g_opr->equal("PK2", data.m_pk2) == 0); + CHK((g_bh1= g_opr->getBlobHandle("BL1")) != 0); + CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0); + + const Uint32 loop_max= 10; + char read_char; + char original_read_char= 0; + Uint32 readloop; + for (readloop= 0;; readloop++) + { + if (readloop > 0) + { + if (readloop > 1) + { + /* Compare against first read. */ + CHK(read_char == original_read_char); + } + else + { + /* + We count the number of times we see the other thread had the + chance to update, so that we can be sure it had the opportunity + to run a reasonable number of times before we stop. + */ + if (original_read_char != read_char) + seen_updates++; + original_read_char= read_char; + } + } + if (readloop > loop_max) + break; + Uint32 readSize= 1; + CHK(g_bh1->setPos(urandom(data.m_blob1_size)) == 0); + CHK(g_bh1->readData(&read_char, readSize) == 0); + CHK(readSize == 1); + ExecType commitType= readloop == loop_max ? Commit : NoCommit; + CHK(g_con->execute(commitType, AbortOnError, 1) == 0); + } + g_ndb->closeTransaction(g_con); + g_con= NULL; + } + + DBG("bug test 27370 - table scan blob reads"); + seen_updates= 0; + while (seen_updates < 50) + { + CHK((g_con= g_ndb->startTransaction()) != 0); + CHK((g_ops= g_con->getNdbScanOperation(g_opt.m_tname)) != 0); + CHK(g_ops->readTuples(NdbOperation::LM_CommittedRead) == 0); + CHK((g_bh1= g_ops->getBlobHandle("BL1")) != 0); + CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0); + CHK(g_ops->nextResult(true) == 0); + + const Uint32 loop_max= 10; + char read_char; + char original_read_char= 0; + Uint32 readloop; + for (readloop= 0;; readloop++) + { + if (readloop > 0) + { + if (readloop > 1) + { + /* Compare against first read. */ + CHK(read_char == original_read_char); + } + else + { + /* + We count the number of times we see the other thread had the + chance to update, so that we can be sure it had the opportunity + to run a reasonable number of times before we stop. + */ + if (original_read_char != read_char) + seen_updates++; + original_read_char= read_char; + } + } + if (readloop > loop_max) + break; + Uint32 readSize= 1; + CHK(g_bh1->setPos(urandom(data.m_blob1_size)) == 0); + CHK(g_bh1->readData(&read_char, readSize) == 0); + CHK(readSize == 1); + CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0); + } + + CHK(g_ops->nextResult(true) == 1); + g_ndb->closeTransaction(g_con); + g_con= NULL; + } + + data.m_thread_stop= true; + void *thread_return; + CHK(pthread_join(thread_handle, &thread_return) == 0); + DBG("bug 27370 - thread return status: " << + (thread_return ? (char *)thread_return : "<null>")); + CHK(thread_return == 0); + + g_con= NULL; + g_opr= NULL; + g_bh1= NULL; + return 0; +} + static struct { int m_bug; int (*m_test)(); } g_bugtest[] = { { 4088, bugtest_4088 }, - { 27018, bugtest_27018 } + { 27018, bugtest_27018 }, + { 27370, bugtest_27370 } }; NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) diff --git a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt index 181efc0ddc588b074c4a7b8475bdd6137763ac9a..ba55755cdd85c048791070f34702bee315203a8d 100644 --- a/storage/ndb/test/run-test/daily-basic-tests.txt +++ b/storage/ndb/test/run-test/daily-basic-tests.txt @@ -708,6 +708,14 @@ max-time: 1500 cmd: testBlobs args: +max-time: 600 +cmd: testBlobs +args: -bug 27018 + +max-time: 600 +cmd: testBlobs +args: -bug 27370 + max-time: 5000 cmd: testOIBasic args: -case abcdefz