Commit c690398a authored by joreland@mysql.com's avatar joreland@mysql.com

correct distr key handling

parent 8d82c130
...@@ -920,6 +920,9 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) ...@@ -920,6 +920,9 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
out << " NOT NULL"; out << " NOT NULL";
else else
out << " NULL"; out << " NULL";
if(col.getDistributionKey())
out << " DISTRIBUTION KEY";
return out; return out;
} }
......
...@@ -70,6 +70,7 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, ...@@ -70,6 +70,7 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex,
} }
m_theIndex = anIndex; m_theIndex = anIndex;
m_accessTable = anIndex->m_table; m_accessTable = anIndex->m_table;
theNoOfTupKeyLeft = m_accessTable->getNoOfPrimaryKeys();
return 0; return 0;
} }
......
...@@ -229,7 +229,14 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -229,7 +229,14 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theNoOfTupKeyLeft = tNoKeysDef; theNoOfTupKeyLeft = tNoKeysDef;
tErrorLine++; tErrorLine++;
theErrorLine = tErrorLine; theErrorLine = tErrorLine;
if (tNoKeysDef == 0) { if (tNoKeysDef == 0) {
if (tDistrKey &&
handle_distribution_key((Uint64*)aValue, totalSizeInWords))
{
return -1;
}
if (tOpType == UpdateRequest) { if (tOpType == UpdateRequest) {
if (tInterpretInd == 1) { if (tInterpretInd == 1) {
theStatus = GetValue; theStatus = GetValue;
...@@ -259,18 +266,12 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -259,18 +266,12 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4005); setErrorCodeAbort(4005);
return -1; return -1;
}//if }//if
}//if
if (!tDistrKey)
{
return 0; return 0;
} }//if
else
{
return handle_distribution_key((Uint64*)aValue, totalSizeInWords);
}
} else { } else {
return -1; return -1;
}//if }//if
return 0;
} }
if (aValue == NULL) { if (aValue == NULL) {
...@@ -290,6 +291,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -290,6 +291,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4225); setErrorCodeAbort(4225);
return -1; return -1;
}//if }//if
ndbout_c("theStatus: %d", theStatus);
// If we come here, set a general errorcode // If we come here, set a general errorcode
// and exit // and exit
...@@ -515,17 +518,22 @@ NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) ...@@ -515,17 +518,22 @@ NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size)
int int
NdbOperation::handle_distribution_key(const Uint64* value, Uint32 len) NdbOperation::handle_distribution_key(const Uint64* value, Uint32 len)
{ {
if(theNoOfTupKeyLeft > 0) if(theDistrKeyIndicator_ == 1 ||
(theNoOfTupKeyLeft > 0 && m_accessTable->m_noOfDistributionKeys > 1))
{ {
ndbout_c("handle_distribution_key - exit");
return 0; return 0;
} }
if(m_accessTable->m_noOfDistributionKeys == 1) if(m_accessTable->m_noOfDistributionKeys == 1)
{ {
setPartitionHash(value, len); setPartitionHash(value, len);
ndbout_c("handle_distribution_key - 1 key");
} }
else else
{ {
ndbout_c("handle_distribution_key - long");
/** /**
* Copy distribution key to linear memory * Copy distribution key to linear memory
*/ */
......
...@@ -1028,6 +1028,12 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1028,6 +1028,12 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 remaining = KeyInfo::DataLength - currLen;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
bool tDistrKey = tAttrInfo->m_distributionKey; bool tDistrKey = tAttrInfo->m_distributionKey;
len = aValue != NULL ? sizeInBytes : 0;
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
return -1;
}
// normalize char bound // normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs; CHARSET_INFO* cs = tAttrInfo->m_cs;
...@@ -1035,26 +1041,23 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1035,26 +1041,23 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
if (cs != NULL && aValue != NULL) { if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length // current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1); assert(cs->strxfrm_multiply == 1);
((Uint32*)xfrmData)[len >> 2] = 0;
unsigned n = unsigned n =
(*cs->coll->strnxfrm)(cs, (*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData), (uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes); (const uchar*)aValue, len);
((Uint32*)xfrmData)[sizeInBytes >> 2] = 0;
while (n < sizeInBytes) while (n < len)
((uchar*)xfrmData)[n++] = 0x20; ((uchar*)xfrmData)[n++] = 0x20;
if(sizeInBytes & 3) if(len & 3)
sizeInBytes += (4 - sizeInBytes & 3); {
len += (4 - (len & 3));
}
aValue = (char*)xfrmData; aValue = (char*)xfrmData;
} }
len = aValue != NULL ? sizeInBytes : 0;
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
return -1;
}
// insert attribute header // insert attribute header
Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4; Uint32 sizeInWords = (len + 3) / 4;
...@@ -1062,7 +1065,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1062,7 +1065,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
const Uint32 ahValue = ah.m_value; const Uint32 ahValue = ah.m_value;
const Uint32 align = (UintPtr(aValue) & 7); const Uint32 align = (UintPtr(aValue) & 7);
const bool aligned = (type == BoundEQ) ? (align & 3) == 0 : (align == 0); const bool aligned = (tDistrKey && type == BoundEQ) ?
(align == 0) : (align & 3) == 0;
const bool nobytes = (len & 0x3) == 0; const bool nobytes = (len & 0x3) == 0;
const Uint32 totalLen = 2 + sizeInWords; const Uint32 totalLen = 2 + sizeInWords;
Uint32 tupKeyLen = theTupKeyLen; Uint32 tupKeyLen = theTupKeyLen;
...@@ -1077,8 +1082,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1077,8 +1082,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
Uint32 *tempData = (Uint32*)xfrmData; Uint32 *tempData = (Uint32*)xfrmData;
tempData[0] = type; tempData[0] = type;
tempData[1] = ahValue; tempData[1] = ahValue;
tempData[sizeInBytes >> 2] = 0; tempData[2 + (len >> 2)] = 0;
memcpy(tempData+2, aValue, len); memcpy(tempData+2, aValue, len);
insertBOUNDS(tempData, 2+sizeInWords); insertBOUNDS(tempData, 2+sizeInWords);
} else { } else {
Uint32 buf[2] = { type, ahValue }; Uint32 buf[2] = { type, ahValue };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment