Commit 70dc08b9 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - fix bug#18075 varsize PK + blobs

parent 3c1f4e3c
...@@ -292,6 +292,7 @@ private: ...@@ -292,6 +292,7 @@ private:
}; };
Buf theKeyBuf; Buf theKeyBuf;
Buf theAccessKeyBuf; Buf theAccessKeyBuf;
Buf thePackKeyBuf;
Buf theHeadInlineBuf; Buf theHeadInlineBuf;
Buf theHeadInlineCopyBuf; // for writeTuple Buf theHeadInlineCopyBuf; // for writeTuple
Buf thePartBuf; Buf thePartBuf;
...@@ -328,6 +329,9 @@ private: ...@@ -328,6 +329,9 @@ private:
Uint32 getPartNumber(Uint64 pos); Uint32 getPartNumber(Uint64 pos);
Uint32 getPartCount(); Uint32 getPartCount();
Uint32 getDistKey(Uint32 part); Uint32 getDistKey(Uint32 part);
// pack / unpack
int packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf);
int unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf);
// getters and setters // getters and setters
int getTableKeyValue(NdbOperation* anOp); int getTableKeyValue(NdbOperation* anOp);
int setTableKeyValue(NdbOperation* anOp); int setTableKeyValue(NdbOperation* anOp);
......
...@@ -881,7 +881,7 @@ protected: ...@@ -881,7 +881,7 @@ protected:
Uint32 ptr2int() { return theReceiver.getId(); }; Uint32 ptr2int() { return theReceiver.getId(); };
// get table or index key from prepared signals // get table or index key from prepared signals
int getKeyFromTCREQ(Uint32* data, unsigned size); int getKeyFromTCREQ(Uint32* data, Uint32 & size);
/****************************************************************************** /******************************************************************************
* These are the private variables that are defined in the operation objects. * These are the private variables that are defined in the operation objects.
......
...@@ -240,7 +240,7 @@ protected: ...@@ -240,7 +240,7 @@ protected:
void receiver_completed(NdbReceiver*); void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(); void execCLOSE_SCAN_REP();
int getKeyFromKEYINFO20(Uint32* data, unsigned size); int getKeyFromKEYINFO20(Uint32* data, Uint32 & size);
NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*); NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*);
bool m_ordered; bool m_ordered;
......
...@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n) ...@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n)
void void
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src) NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
{ {
assert(size == src.size); size = src.size;
memcpy(data, src.data, size); memcpy(data, src.data, size);
} }
...@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part) ...@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part)
return (part / theStripeSize) % theStripeSize; return (part / theStripeSize) % theStripeSize;
} }
// pack/unpack table/index key XXX support routines, shortcuts
int
NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
{
DBUG_ENTER("NdbBlob::packKeyValue");
const Uint32* data = (const Uint32*)srcBuf.data;
unsigned pos = 0;
Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
unsigned pack_pos = 0;
for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
NdbColumnImpl* c = aTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
Uint32 pack_len;
bool ok = c->get_var_length(&data[pos], pack_len);
if (! ok) {
setErrorCode(NdbBlobImpl::ErrCorruptPK);
DBUG_RETURN(-1);
}
memcpy(&pack_data[pack_pos], &data[pos], pack_len);
while (pack_len % 4 != 0) {
char* p = (char*)&pack_data[pack_pos] + pack_len++;
*p = 0;
}
pos += (len + 3) / 4;
pack_pos += pack_len / 4;
}
}
assert(4 * pos == srcBuf.size);
assert(4 * pack_pos <= thePackKeyBuf.maxsize);
thePackKeyBuf.size = 4 * pack_pos;
DBUG_RETURN(0);
}
int
NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
{
DBUG_ENTER("NdbBlob::unpackKeyValue");
Uint32* data = (Uint32*)dstBuf.data;
unsigned pos = 0;
const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
unsigned pack_pos = 0;
for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
NdbColumnImpl* c = aTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
Uint32 pack_len;
bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
if (! ok) {
setErrorCode(NdbBlobImpl::ErrCorruptPK);
DBUG_RETURN(-1);
}
memcpy(&data[pos], &pack_data[pack_pos], pack_len);
while (pack_len % 4 != 0) {
char* p = (char*)&data[pos] + pack_len++;
*p = 0;
}
pos += (len + 3) / 4;
pack_pos += pack_len / 4;
}
}
assert(4 * pos == dstBuf.size);
assert(4 * pack_pos == thePackKeyBuf.size);
DBUG_RETURN(0);
}
// getters and setters // getters and setters
int int
...@@ -489,12 +558,10 @@ int ...@@ -489,12 +558,10 @@ int
NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
{ {
DBUG_ENTER("NdbBlob::setPartKeyValue"); DBUG_ENTER("NdbBlob::setPartKeyValue");
DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part)); DBUG_PRINT("info", ("dist=%u part=%u packkey=", getDistKey(part), part));
DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords); DBUG_DUMP("info", thePackKeyBuf.data, 4 * thePackKeyBuf.size);
//Uint32* data = (Uint32*)theKeyBuf.data;
//unsigned size = theTable->m_keyLenInWords;
// TODO use attr ids after compatibility with 4.1.7 not needed // TODO use attr ids after compatibility with 4.1.7 not needed
if (anOp->equal("PK", theKeyBuf.data) == -1 || if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
anOp->equal("DIST", getDistKey(part)) == -1 || anOp->equal("DIST", getDistKey(part)) == -1 ||
anOp->equal("PART", part) == -1) { anOp->equal("PART", part) == -1) {
setErrorCode(anOp); setErrorCode(anOp);
...@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
if (isKeyOp()) { if (isKeyOp()) {
if (isTableOp()) { if (isTableOp()) {
// get table key // get table key
Uint32* data = (Uint32*)theKeyBuf.data; Uint32* data = (Uint32*)thePackKeyBuf.data;
unsigned size = theTable->m_keyLenInWords; Uint32 size = theTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theTable, theKeyBuf) == -1)
DBUG_RETURN(-1);
} }
if (isIndexOp()) { if (isIndexOp()) {
// get index key // get index key
Uint32* data = (Uint32*)theAccessKeyBuf.data; Uint32* data = (Uint32*)thePackKeyBuf.data;
unsigned size = theAccessTable->m_keyLenInWords; Uint32 size = theAccessTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
DBUG_RETURN(-1);
} }
if (isReadOp()) { if (isReadOp()) {
// add read of head+inline in this op // add read of head+inline in this op
...@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
theEventOp = anOp; theEventOp = anOp;
theBlobEventOp = aBlobOp; theBlobEventOp = aBlobOp;
theTable = anOp->m_eventImpl->m_tableImpl; theTable = anOp->m_eventImpl->m_tableImpl;
theAccessTable = theTable;
theColumn = aColumn; theColumn = aColumn;
// prepare blob column and table // prepare blob column and table
if (prepareColumn() == -1) if (prepareColumn() == -1)
...@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
if (theBlobEventOp != NULL) { if (theBlobEventOp != NULL) {
if ((theBlobEventPkRecAttr = if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0), theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL || thePackKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr = (theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1), theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL || (char*)0, version)) == NULL ||
...@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn() ...@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn()
} }
// these buffers are always used // these buffers are always used
theKeyBuf.alloc(theTable->m_keyLenInWords << 2); theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
thePackKeyBuf.alloc(max(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHead = (Head*)theHeadInlineBuf.data; theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head); theInlineData = theHeadInlineBuf.data + sizeof(Head);
...@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch) ...@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
if (tOp == NULL || if (tOp == NULL ||
tOp->readTuple() == -1 || tOp->readTuple() == -1 ||
setAccessKeyValue(tOp) == -1 || setAccessKeyValue(tOp) == -1 ||
tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) { tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
setErrorCode(tOp); setErrorCode(tOp);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType) ...@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
assert(isKeyOp()); assert(isKeyOp());
if (isIndexOp()) { if (isIndexOp()) {
NdbBlob* tFirstBlob = theNdbOp->theBlobList; NdbBlob* tFirstBlob = theNdbOp->theBlobList;
if (this != tFirstBlob) { if (this == tFirstBlob) {
packKeyValue(theTable, theKeyBuf);
} else {
// copy key from first blob // copy key from first blob
assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size); theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size); thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
} }
} }
if (isReadOp()) { if (isReadOp()) {
...@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult() ...@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult()
DBUG_RETURN(-1); DBUG_RETURN(-1);
assert(isScanOp()); assert(isScanOp());
// get primary key // get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data; { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
unsigned size = theTable->m_keyLenInWords; Uint32* data = (Uint32*)thePackKeyBuf.data;
if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) { unsigned size = theTable->m_keyLenInWords; // in-out
if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theTable, theKeyBuf) == -1)
DBUG_RETURN(-1);
} }
getHeadFromRecAttr(); getHeadFromRecAttr();
if (setPos(0) == -1) if (setPos(0) == -1)
......
...@@ -34,6 +34,8 @@ public: ...@@ -34,6 +34,8 @@ public:
STATIC_CONST( ErrAbort = 4268 ); STATIC_CONST( ErrAbort = 4268 );
// "Unknown blob error" // "Unknown blob error"
STATIC_CONST( ErrUnknown = 4269 ); STATIC_CONST( ErrUnknown = 4269 );
// "Corrupted main table PK in blob operation"
STATIC_CONST( ErrCorruptPK = 4274 );
}; };
#endif #endif
...@@ -472,7 +472,8 @@ void ...@@ -472,7 +472,8 @@ void
NdbOperation::reorderKEYINFO() NdbOperation::reorderKEYINFO()
{ {
Uint32 data[4000]; Uint32 data[4000];
getKeyFromTCREQ(data, 4000); Uint32 size = 4000;
getKeyFromTCREQ(data, size);
Uint32 pos = 1; Uint32 pos = 1;
Uint32 k; Uint32 k;
for (k = 0; k < m_accessTable->m_noOfKeys; k++) { for (k = 0; k < m_accessTable->m_noOfKeys; k++) {
...@@ -501,7 +502,7 @@ NdbOperation::reorderKEYINFO() ...@@ -501,7 +502,7 @@ NdbOperation::reorderKEYINFO()
} }
int int
NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) NdbOperation::getKeyFromTCREQ(Uint32* data, Uint32 & size)
{ {
assert(size >= theTupKeyLen && theTupKeyLen > 0); assert(size >= theTupKeyLen && theTupKeyLen > 0);
size = theTupKeyLen; size = theTupKeyLen;
......
...@@ -199,7 +199,7 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r) ...@@ -199,7 +199,7 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
out << hex << "H'" << r.u_32_value() << dec; out << hex << "H'" << r.u_32_value() << dec;
break; break;
case NdbDictionary::Column::Unsigned: case NdbDictionary::Column::Unsigned:
out << r.u_32_value(); out << *((Uint32*)r.aRef() + j);
break; break;
case NdbDictionary::Column::Smallunsigned: case NdbDictionary::Column::Smallunsigned:
out << r.u_short_value(); out << r.u_short_value();
......
...@@ -912,13 +912,20 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -912,13 +912,20 @@ NdbScanOperation::doSendScan(int aProcessorId)
* the scan process. * the scan process.
****************************************************************************/ ****************************************************************************/
int int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
{ {
NdbRecAttr * tRecAttr = m_curr_row; NdbRecAttr * tRecAttr = m_curr_row;
if(tRecAttr) if(tRecAttr)
{ {
const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 * src = (Uint32*)tRecAttr->aRef();
memcpy(data, src, 4*size);
assert(tRecAttr->get_size_in_bytes() > 0);
assert(tRecAttr->get_size_in_bytes() < 65536);
const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
assert(size >= len);
memcpy(data, src, 4*len);
size = len;
return 0; return 0;
} }
return -1; return -1;
......
...@@ -599,7 +599,8 @@ ErrorBundle ErrorCodes[] = { ...@@ -599,7 +599,8 @@ ErrorBundle ErrorCodes[] = {
{ 4336, DMEC, AE, "Auto-increment value set below current value" }, { 4336, DMEC, AE, "Auto-increment value set below current value" },
{ 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" }, { 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4272, DMEC, AE, "Table definition has undefined column" }, { 4272, DMEC, AE, "Table definition has undefined column" },
{ 4273, DMEC, IE, "No blob table in dict cache" } { 4273, DMEC, IE, "No blob table in dict cache" },
{ 4274, DMEC, IE, "Corrupted main table PK in blob operation" }
}; };
static static
......
...@@ -223,7 +223,7 @@ dropTable() ...@@ -223,7 +223,7 @@ dropTable()
{ {
NdbDictionary::Table tab(g_opt.m_tname); NdbDictionary::Table tab(g_opt.m_tname);
if (g_dic->getTable(g_opt.m_tname) != 0) if (g_dic->getTable(g_opt.m_tname) != 0)
CHK(g_dic->dropTable(tab) == 0); CHK(g_dic->dropTable(g_opt.m_tname) == 0);
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment