ndb - bug#19285 : document and check what blob ops are allowed

parent df358201
...@@ -74,19 +74,41 @@ class NdbColumnImpl; ...@@ -74,19 +74,41 @@ class NdbColumnImpl;
* NdbBlob methods return -1 on error and 0 on success, and use output * NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary. * parameters when necessary.
* *
* Operation types: * Usage notes for different operation types:
* - insertTuple must use setValue if blob column is non-nullable *
* - readTuple with exclusive lock can also update existing value * - insertTuple must use setValue if blob attribute is non-nullable
* - updateTuple can overwrite with setValue or update existing value *
* - writeTuple always overwrites and must use setValue if non-nullable * - readTuple or scan readTuples with lock mode LM_CommittedRead is
* automatically upgraded to lock mode LM_Read if any blob attributes
* are accessed (to guarantee consistent view)
*
* - readTuple (with any lock mode) can only read blob value
*
* - updateTuple can either overwrite existing value with setValue or
* update it in active phase
*
* - writeTuple always overwrites blob value and must use setValue if
* blob attribute is non-nullable
*
* - deleteTuple creates implicit non-accessible blob handles * - deleteTuple creates implicit non-accessible blob handles
* - scan with exclusive lock can also update existing value *
* - scan "lock takeover" update op must do its own getBlobHandle * - scan readTuples (any lock mode) can use its blob handles only
* to read blob value
*
* - scan readTuples with lock mode LM_Exclusive can update row and blob
* value using updateCurrentTuple, where the operation returned must
* create its own blob handles explicitly
*
* - scan readTuples with lock mode LM_Exclusive can delete row (and
* therefore blob values) using deleteCurrentTuple, which creates
* implicit non-accessible blob handles
*
* - the operation returned by lockCurrentTuple cannot update blob value
* *
* Bugs / limitations: * Bugs / limitations:
* - lock mode upgrade should be handled automatically *
* - lock mode vs allowed operation is not checked
* - too many pending blob ops can blow up i/o buffers * - too many pending blob ops can blow up i/o buffers
*
* - table and its blob part tables are not created atomically * - table and its blob part tables are not created atomically
*/ */
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
...@@ -194,6 +216,9 @@ public: ...@@ -194,6 +216,9 @@ public:
/** /**
* Return error object. The error may be blob specific (below) or may * Return error object. The error may be blob specific (below) or may
* be copied from a failed implicit operation. * be copied from a failed implicit operation.
*
* The error code is copied back to the operation unless the operation
* already has a non-zero error code.
*/ */
const NdbError& getNdbError() const; const NdbError& getNdbError() const;
/** /**
...@@ -290,6 +315,7 @@ private: ...@@ -290,6 +315,7 @@ private:
bool isWriteOp(); bool isWriteOp();
bool isDeleteOp(); bool isDeleteOp();
bool isScanOp(); bool isScanOp();
bool isReadOnlyOp();
bool isTakeOverOp(); bool isTakeOverOp();
// computations // computations
Uint32 getPartNumber(Uint64 pos); Uint32 getPartNumber(Uint64 pos);
...@@ -323,9 +349,9 @@ private: ...@@ -323,9 +349,9 @@ private:
int preCommit(); int preCommit();
int atNextResult(); int atNextResult();
// errors // errors
void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(int anErrorCode, bool invalidFlag = false);
void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = false);
void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = false);
#ifdef VM_TRACE #ifdef VM_TRACE
int getOperationType() const; int getOperationType() const;
friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
......
...@@ -41,7 +41,7 @@ public: ...@@ -41,7 +41,7 @@ public:
* readTuples. * readTuples.
*/ */
enum ScanFlag { enum ScanFlag {
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead SF_TupScan = (1 << 16), // scan TUP
SF_OrderBy = (1 << 24), // index scan in order SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order SF_Descending = (2 << 24), // index scan in descending order
SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
......
...@@ -265,6 +265,16 @@ NdbBlob::isScanOp() ...@@ -265,6 +265,16 @@ NdbBlob::isScanOp()
theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest; theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
} }
inline bool
NdbBlob::isReadOnlyOp()
{
return ! (
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
theNdbOp->theOperationType == NdbOperation::WriteRequest
);
}
inline bool inline bool
NdbBlob::isTakeOverOp() NdbBlob::isTakeOverOp()
{ {
...@@ -438,12 +448,12 @@ NdbBlob::getValue(void* data, Uint32 bytes) ...@@ -438,12 +448,12 @@ NdbBlob::getValue(void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::getValue"); DBUG_ENTER("NdbBlob::getValue");
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes)); DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
if (theGetFlag || theState != Prepared) { if (! isReadOp() && ! isScanOp()) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (! isReadOp() && ! isScanOp()) { if (theGetFlag || theState != Prepared) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (data == NULL && bytes != 0) { if (data == NULL && bytes != 0) {
...@@ -461,12 +471,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes) ...@@ -461,12 +471,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::setValue"); DBUG_ENTER("NdbBlob::setValue");
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes)); DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
if (theSetFlag || theState != Prepared) { if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) { if (theSetFlag || theState != Prepared) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (data == NULL && bytes != 0) { if (data == NULL && bytes != 0) {
...@@ -533,6 +543,10 @@ int ...@@ -533,6 +543,10 @@ int
NdbBlob::setNull() NdbBlob::setNull()
{ {
DBUG_ENTER("NdbBlob::setNull"); DBUG_ENTER("NdbBlob::setNull");
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theNullFlag == -1) { if (theNullFlag == -1) {
if (theState == Prepared) { if (theState == Prepared) {
DBUG_RETURN(setValue(0, 0)); DBUG_RETURN(setValue(0, 0));
...@@ -571,6 +585,10 @@ NdbBlob::truncate(Uint64 length) ...@@ -571,6 +585,10 @@ NdbBlob::truncate(Uint64 length)
{ {
DBUG_ENTER("NdbBlob::truncate"); DBUG_ENTER("NdbBlob::truncate");
DBUG_PRINT("info", ("length=%llu", length)); DBUG_PRINT("info", ("length=%llu", length));
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theNullFlag == -1) { if (theNullFlag == -1) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -628,12 +646,14 @@ NdbBlob::setPos(Uint64 pos) ...@@ -628,12 +646,14 @@ NdbBlob::setPos(Uint64 pos)
int int
NdbBlob::readData(void* data, Uint32& bytes) NdbBlob::readData(void* data, Uint32& bytes)
{ {
DBUG_ENTER("NdbBlob::readData");
if (theState != Active) { if (theState != Active) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
return -1; DBUG_RETURN(-1);
} }
char* buf = static_cast<char*>(data); char* buf = static_cast<char*>(data);
return readDataPrivate(buf, bytes); int ret = readDataPrivate(buf, bytes);
DBUG_RETURN(ret);
} }
int int
...@@ -722,12 +742,18 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes) ...@@ -722,12 +742,18 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
int int
NdbBlob::writeData(const void* data, Uint32 bytes) NdbBlob::writeData(const void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::writeData");
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theState != Active) { if (theState != Active) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
return -1; DBUG_RETURN(-1);
} }
const char* buf = static_cast<const char*>(data); const char* buf = static_cast<const char*>(data);
return writeDataPrivate(buf, bytes); int ret = writeDataPrivate(buf, bytes);
DBUG_RETURN(0);
} }
int int
...@@ -1130,6 +1156,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1130,6 +1156,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
} }
} }
if (isReadOp()) { if (isReadOp()) {
// upgrade lock mode
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
theNdbOp->theLockMode = NdbOperation::LM_Read;
// add read of head+inline in this op // add read of head+inline in this op
if (getHeadInlineValue(theNdbOp) == -1) if (getHeadInlineValue(theNdbOp) == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -1148,6 +1177,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1148,6 +1177,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
supportedOp = true; supportedOp = true;
} }
if (isScanOp()) { if (isScanOp()) {
// upgrade lock mode
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
theNdbOp->theLockMode = NdbOperation::LM_Read;
// add read of head+inline in this op // add read of head+inline in this op
if (getHeadInlineValue(theNdbOp) == -1) if (getHeadInlineValue(theNdbOp) == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
......
...@@ -24,7 +24,7 @@ public: ...@@ -24,7 +24,7 @@ public:
STATIC_CONST( ErrTable = 4263 ); STATIC_CONST( ErrTable = 4263 );
// "Invalid usage of blob attribute" // "Invalid usage of blob attribute"
STATIC_CONST( ErrUsage = 4264 ); STATIC_CONST( ErrUsage = 4264 );
// "Method is not valid in current blob state" // "The blob method is not valid in current blob state"
STATIC_CONST( ErrState = 4265 ); STATIC_CONST( ErrState = 4265 );
// "Invalid blob seek position" // "Invalid blob seek position"
STATIC_CONST( ErrSeek = 4266 ); STATIC_CONST( ErrSeek = 4266 );
...@@ -33,7 +33,9 @@ public: ...@@ -33,7 +33,9 @@ public:
// "Error in blob head update forced rollback of transaction" // "Error in blob head update forced rollback of transaction"
STATIC_CONST( ErrAbort = 4268 ); STATIC_CONST( ErrAbort = 4268 );
// "Unknown blob error" // "Unknown blob error"
STATIC_CONST( ErrUnknown = 4269 ); STATIC_CONST( ErrUnknown = 4270 );
// "The blob method is incompatible with operation type or lock mode"
STATIC_CONST( ErrCompat = 4275 );
}; };
#endif #endif
...@@ -513,14 +513,15 @@ ErrorBundle ErrorCodes[] = { ...@@ -513,14 +513,15 @@ ErrorBundle ErrorCodes[] = {
{ 4262, UD, "NdbScanFilter: Condition is out of bounds"}, { 4262, UD, "NdbScanFilter: Condition is out of bounds"},
{ 4263, IE, "Invalid blob attributes or invalid blob parts table" }, { 4263, IE, "Invalid blob attributes or invalid blob parts table" },
{ 4264, AE, "Invalid usage of blob attribute" }, { 4264, AE, "Invalid usage of blob attribute" },
{ 4265, AE, "Method is not valid in current blob state" }, { 4265, AE, "The blob method is not valid in current blob state" },
{ 4266, AE, "Invalid blob seek position" }, { 4266, AE, "Invalid blob seek position" },
{ 4267, IE, "Corrupted blob value" }, { 4267, IE, "Corrupted blob value" },
{ 4268, IE, "Error in blob head update forced rollback of transaction" }, { 4268, IE, "Error in blob head update forced rollback of transaction" },
{ 4269, IE, "No connection to ndb management server" }, { 4269, IE, "No connection to ndb management server" },
{ 4270, IE, "Unknown blob error" }, { 4270, IE, "Unknown blob error" },
{ 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" }, { 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" },
{ 4271, AE, "Invalid index object, not retrieved via getIndex()" } { 4271, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4275, IE, "The blob method is incompatible with operation type or lock mode" }
}; };
static static
......
...@@ -226,7 +226,7 @@ dropTable() ...@@ -226,7 +226,7 @@ dropTable()
{ {
NdbDictionary::Table tab(g_opt.m_tname); NdbDictionary::Table tab(g_opt.m_tname);
if (g_dic->getTable(g_opt.m_tname) != 0) if (g_dic->getTable(g_opt.m_tname) != 0)
CHK(g_dic->dropTable(tab) == 0); CHK(g_dic->dropTable(g_opt.m_tname) == 0);
return 0; return 0;
} }
...@@ -297,13 +297,15 @@ createTable() ...@@ -297,13 +297,15 @@ createTable()
struct Bval { struct Bval {
char* m_val; char* m_val;
unsigned m_len; unsigned m_len;
char* m_buf; char* m_buf; // read/write buffer
unsigned m_buflen; unsigned m_buflen;
int m_error_code; // for testing expected error code
Bval() : Bval() :
m_val(0), m_val(0),
m_len(0), m_len(0),
m_buf(0), // read/write buffer m_buf(0),
m_buflen(0) m_buflen(0),
m_error_code(0)
{} {}
~Bval() { delete [] m_val; delete [] m_buf; } ~Bval() { delete [] m_val; delete [] m_buf; }
void alloc(unsigned buflen) { void alloc(unsigned buflen) {
...@@ -459,19 +461,23 @@ getBlobLength(NdbBlob* h, unsigned& len) ...@@ -459,19 +461,23 @@ getBlobLength(NdbBlob* h, unsigned& len)
// setValue / getValue // setValue / getValue
static int static int
setBlobValue(NdbBlob* h, const Bval& v) setBlobValue(NdbBlob* h, const Bval& v, int error_code = 0)
{ {
bool null = (v.m_val == 0); bool null = (v.m_val == 0);
bool isNull; bool isNull;
unsigned len; unsigned len;
DBG("setValue " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null); DBG("setValue " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
if (null) { if (null) {
CHK(h->setNull() == 0); CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
if (error_code)
return 0;
isNull = false; isNull = false;
CHK(h->getNull(isNull) == 0 && isNull == true); CHK(h->getNull(isNull) == 0 && isNull == true);
CHK(getBlobLength(h, len) == 0 && len == 0); CHK(getBlobLength(h, len) == 0 && len == 0);
} else { } else {
CHK(h->setValue(v.m_val, v.m_len) == 0); CHK(h->setValue(v.m_val, v.m_len) == 0 || h->getNdbError().code == error_code);
if (error_code)
return 0;
CHK(h->getNull(isNull) == 0 && isNull == false); CHK(h->getNull(isNull) == 0 && isNull == false);
CHK(getBlobLength(h, len) == 0 && len == v.m_len); CHK(getBlobLength(h, len) == 0 && len == v.m_len);
} }
...@@ -479,11 +485,11 @@ setBlobValue(NdbBlob* h, const Bval& v) ...@@ -479,11 +485,11 @@ setBlobValue(NdbBlob* h, const Bval& v)
} }
static int static int
setBlobValue(const Tup& tup) setBlobValue(const Tup& tup, int error_code = 0)
{ {
CHK(setBlobValue(g_bh1, tup.m_blob1) == 0); CHK(setBlobValue(g_bh1, tup.m_blob1, error_code) == 0);
if (! g_opt.m_oneblob) if (! g_opt.m_oneblob)
CHK(setBlobValue(g_bh2, tup.m_blob2) == 0); CHK(setBlobValue(g_bh2, tup.m_blob2, error_code) == 0);
return 0; return 0;
} }
...@@ -543,13 +549,18 @@ writeBlobData(NdbBlob* h, const Bval& v) ...@@ -543,13 +549,18 @@ writeBlobData(NdbBlob* h, const Bval& v)
bool isNull; bool isNull;
unsigned len; unsigned len;
DBG("write " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null); DBG("write " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
int error_code = v.m_error_code;
if (null) { if (null) {
CHK(h->setNull() == 0); CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
if (error_code)
return 0;
isNull = false; isNull = false;
CHK(h->getNull(isNull) == 0 && isNull == true); CHK(h->getNull(isNull) == 0 && isNull == true);
CHK(getBlobLength(h, len) == 0 && len == 0); CHK(getBlobLength(h, len) == 0 && len == 0);
} else { } else {
CHK(h->truncate(v.m_len) == 0); CHK(h->truncate(v.m_len) == 0 || h->getNdbError().code == error_code);
if (error_code)
return 0;
unsigned n = 0; unsigned n = 0;
do { do {
unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1); unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
...@@ -568,11 +579,14 @@ writeBlobData(NdbBlob* h, const Bval& v) ...@@ -568,11 +579,14 @@ writeBlobData(NdbBlob* h, const Bval& v)
} }
static int static int
writeBlobData(const Tup& tup) writeBlobData(Tup& tup, int error_code = 0)
{ {
tup.m_blob1.m_error_code = error_code;
CHK(writeBlobData(g_bh1, tup.m_blob1) == 0); CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob) if (! g_opt.m_oneblob) {
tup.m_blob2.m_error_code = error_code;
CHK(writeBlobData(g_bh2, tup.m_blob2) == 0); CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
}
return 0; return 0;
} }
...@@ -635,19 +649,20 @@ blobWriteHook(NdbBlob* h, void* arg) ...@@ -635,19 +649,20 @@ blobWriteHook(NdbBlob* h, void* arg)
} }
static int static int
setBlobWriteHook(NdbBlob* h, Bval& v) setBlobWriteHook(NdbBlob* h, Bval& v, int error_code = 0)
{ {
DBG("setBlobWriteHook"); DBG("setBlobWriteHook");
v.m_error_code = error_code;
CHK(h->setActiveHook(blobWriteHook, &v) == 0); CHK(h->setActiveHook(blobWriteHook, &v) == 0);
return 0; return 0;
} }
static int static int
setBlobWriteHook(Tup& tup) setBlobWriteHook(Tup& tup, int error_code = 0)
{ {
CHK(setBlobWriteHook(g_bh1, tup.m_blob1) == 0); CHK(setBlobWriteHook(g_bh1, tup.m_blob1, error_code) == 0);
if (! g_opt.m_oneblob) if (! g_opt.m_oneblob)
CHK(setBlobWriteHook(g_bh2, tup.m_blob2) == 0); CHK(setBlobWriteHook(g_bh2, tup.m_blob2, error_code) == 0);
return 0; return 0;
} }
...@@ -869,7 +884,10 @@ readPk(int style) ...@@ -869,7 +884,10 @@ readPk(int style)
DBG("readPk pk1=" << hex << tup.m_pk1); DBG("readPk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->readTuple() == 0); if (urandom(2) == 0)
CHK(g_opr->readTuple() == 0);
else
CHK(g_opr->readTuple(NdbOperation::LM_CommittedRead) == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0); CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0); CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
...@@ -883,6 +901,8 @@ readPk(int style) ...@@ -883,6 +901,8 @@ readPk(int style)
CHK(readBlobData(tup) == 0); CHK(readBlobData(tup) == 0);
} }
CHK(g_con->execute(Commit) == 0); CHK(g_con->execute(Commit) == 0);
// verify lock mode upgrade
CHK(g_opr->getLockMode() == NdbOperation::LM_Read);
if (style == 0 || style == 1) { if (style == 0 || style == 1) {
CHK(verifyBlobValue(tup) == 0); CHK(verifyBlobValue(tup) == 0);
} }
...@@ -900,23 +920,40 @@ updatePk(int style) ...@@ -900,23 +920,40 @@ updatePk(int style)
for (unsigned k = 0; k < g_opt.m_rows; k++) { for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k]; Tup& tup = g_tups[k];
DBG("updatePk pk1=" << hex << tup.m_pk1); DBG("updatePk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0); while (1) {
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0); int mode = urandom(3);
CHK(g_opr->updateTuple() == 0); int error_code = mode == 0 ? 0 : 4275;
CHK(g_opr->equal("PK1", tup.m_pk1) == 0); CHK((g_con = g_ndb->startTransaction()) != 0);
if (g_opt.m_pk2len != 0) CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->equal("PK2", tup.m_pk2) == 0); if (mode == 0) {
CHK(getBlobHandles(g_opr) == 0); DBG("using updateTuple");
if (style == 0) { CHK(g_opr->updateTuple() == 0);
CHK(setBlobValue(tup) == 0); } else if (mode == 1) {
} else if (style == 1) { DBG("using readTuple exclusive");
CHK(setBlobWriteHook(tup) == 0); CHK(g_opr->readTuple(NdbOperation::LM_Exclusive) == 0);
} else { } else {
CHK(g_con->execute(NoCommit) == 0); DBG("using readTuple - will fail and retry");
CHK(writeBlobData(tup) == 0); CHK(g_opr->readTuple() == 0);
}
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(setBlobValue(tup, error_code) == 0);
} else if (style == 1) {
CHK(setBlobWriteHook(tup, error_code) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(tup, error_code) == 0);
}
if (error_code == 0) {
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
break;
}
g_ndb->closeTransaction(g_con);
} }
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opr = 0; g_opr = 0;
g_con = 0; g_con = 0;
tup.m_exists = true; tup.m_exists = true;
...@@ -1002,7 +1039,10 @@ readIdx(int style) ...@@ -1002,7 +1039,10 @@ readIdx(int style)
DBG("readIdx pk1=" << hex << tup.m_pk1); DBG("readIdx pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->readTuple() == 0); if (urandom(2) == 0)
CHK(g_opx->readTuple() == 0);
else
CHK(g_opx->readTuple(NdbOperation::LM_CommittedRead) == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0); CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opx) == 0); CHK(getBlobHandles(g_opx) == 0);
if (style == 0) { if (style == 0) {
...@@ -1014,6 +1054,8 @@ readIdx(int style) ...@@ -1014,6 +1054,8 @@ readIdx(int style)
CHK(readBlobData(tup) == 0); CHK(readBlobData(tup) == 0);
} }
CHK(g_con->execute(Commit) == 0); CHK(g_con->execute(Commit) == 0);
// verify lock mode upgrade (already done by NdbIndexOperation)
CHK(g_opx->getLockMode() == NdbOperation::LM_Read);
if (style == 0 || style == 1) { if (style == 0 || style == 1) {
CHK(verifyBlobValue(tup) == 0); CHK(verifyBlobValue(tup) == 0);
} }
...@@ -1031,6 +1073,7 @@ updateIdx(int style) ...@@ -1031,6 +1073,7 @@ updateIdx(int style)
for (unsigned k = 0; k < g_opt.m_rows; k++) { for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k]; Tup& tup = g_tups[k];
DBG("updateIdx pk1=" << hex << tup.m_pk1); DBG("updateIdx pk1=" << hex << tup.m_pk1);
// skip 4275 testing
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0); CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->updateTuple() == 0); CHK(g_opx->updateTuple() == 0);
...@@ -1128,7 +1171,10 @@ readScan(int style, bool idx) ...@@ -1128,7 +1171,10 @@ readScan(int style, bool idx)
} else { } else {
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
} }
CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0); if (urandom(2) == 0)
CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
else
CHK(g_ops->readTuples(NdbOperation::LM_CommittedRead) == 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
...@@ -1139,6 +1185,8 @@ readScan(int style, bool idx) ...@@ -1139,6 +1185,8 @@ readScan(int style, bool idx)
CHK(setBlobReadHook(tup) == 0); CHK(setBlobReadHook(tup) == 0);
} }
CHK(g_con->execute(NoCommit) == 0); CHK(g_con->execute(NoCommit) == 0);
// verify lock mode upgrade
CHK(g_ops->getLockMode() == NdbOperation::LM_Read);
unsigned rows = 0; unsigned rows = 0;
while (1) { while (1) {
int ret; int ret;
...@@ -1180,7 +1228,7 @@ updateScan(int style, bool idx) ...@@ -1180,7 +1228,7 @@ updateScan(int style, bool idx)
} else { } else {
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
} }
CHK(g_ops->readTuples(NdbScanOperation::LM_Exclusive) == 0); CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
...@@ -1199,6 +1247,7 @@ updateScan(int style, bool idx) ...@@ -1199,6 +1247,7 @@ updateScan(int style, bool idx)
// calculate new blob values // calculate new blob values
calcBval(g_tups[k], false); calcBval(g_tups[k], false);
tup.copyfrom(g_tups[k]); tup.copyfrom(g_tups[k]);
// cannot do 4275 testing, scan op error code controls execution
CHK((g_opr = g_ops->updateCurrentTuple()) != 0); CHK((g_opr = g_ops->updateCurrentTuple()) != 0);
CHK(getBlobHandles(g_opr) == 0); CHK(getBlobHandles(g_opr) == 0);
if (style == 0) { if (style == 0) {
...@@ -1232,7 +1281,7 @@ deleteScan(bool idx) ...@@ -1232,7 +1281,7 @@ deleteScan(bool idx)
} else { } else {
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
} }
CHK(g_ops->readTuples(NdbScanOperation::LM_Exclusive) == 0); CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
...@@ -1651,7 +1700,7 @@ testperf() ...@@ -1651,7 +1700,7 @@ testperf()
char b[20]; char b[20];
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0); CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0); CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
CHK(g_ops->getValue(cA, (char*)&a) != 0); CHK(g_ops->getValue(cA, (char*)&a) != 0);
CHK(g_ops->getValue(cB, b) != 0); CHK(g_ops->getValue(cB, b) != 0);
CHK(g_con->execute(NoCommit) == 0); CHK(g_con->execute(NoCommit) == 0);
...@@ -1680,7 +1729,7 @@ testperf() ...@@ -1680,7 +1729,7 @@ testperf()
char c[20]; char c[20];
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0); CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0); CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
CHK(g_ops->getValue(cA, (char*)&a) != 0); CHK(g_ops->getValue(cA, (char*)&a) != 0);
CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0); CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0);
CHK(g_con->execute(NoCommit) == 0); CHK(g_con->execute(NoCommit) == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment