Commit 3f3ea303 authored by unknown's avatar unknown

Fixed bugs in ordered scan discovered by mysql-test-run

Enabled ordered scan in handler



ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Moved saveBound to NdbIndexScanOperation
ndb/include/ndbapi/NdbScanOperation.hpp:
  Moved saveBound to NdbIndexScanOperation
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Introduced map for
  index attributes (keys) -> real attr id (and back)
ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  Introduced map for
  index attributes (keys) -> real attr id (and back)
ndb/src/ndbapi/NdbOperationDefine.cpp:
  Moved saveBound to NdbIndexScanOperation
ndb/src/ndbapi/NdbOperationInt.cpp:
  Moved saveBound to NdbIndexScanOperation
ndb/src/ndbapi/NdbScanOperation.cpp:
  Moved saveBound to NdbIndexScanOperation
  Fixed bugs in handling of setBounds w.r.t getValues and index keys
    (use new reverse map)
  Fixed bugs in next_result_ordered
sql/ha_ndbcluster.cc:
  Use sorted scan when requested
parent 8aac8515
......@@ -125,6 +125,7 @@ private:
virtual ~NdbIndexScanOperation();
int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
int saveBoundATTRINFO();
virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
......
......@@ -128,7 +128,6 @@ protected:
NdbApiSignal* theSCAN_TABREQ;
int getFirstATTRINFOScan();
int saveBoundATTRINFO();
int doSendScan(int ProcessorId);
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
......
......@@ -1737,8 +1737,8 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
return 0;
}
NdbTableImpl* primTab = getTable(tab->m_primaryTable.c_str());
if(primTab == 0){
NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
if(prim == 0){
m_error.code = 4243;
return 0;
}
......@@ -1752,7 +1752,7 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
idx->m_indexId = tab->m_tableId;
idx->m_internalName.assign(internalName);
idx->m_externalName.assign(externalName);
idx->m_tableName.assign(primTab->m_externalName);
idx->m_tableName.assign(prim->m_externalName);
idx->m_type = tab->m_indexType;
// skip last attribute (NDB$PK or NDB$TNODE)
for(unsigned i = 0; i+1<tab->m_columns.size(); i++){
......@@ -1760,6 +1760,14 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
// Copy column definition
*col = *tab->m_columns[i];
idx->m_columns.push_back(col);
/**
* reverse map
*/
int key_id = prim->getColumn(col->getName())->getColumnNo();
int fill = -1;
idx->m_key_ids.fill(key_id, fill);
idx->m_key_ids[key_id] = i;
col->m_keyInfoPos = key_id;
}
idx->m_table = tab;
......
......@@ -176,6 +176,7 @@ public:
BaseString m_externalName;
BaseString m_tableName;
Vector<NdbColumnImpl *> m_columns;
Vector<int> m_key_ids;
NdbDictionary::Index::Type m_type;
bool m_logging;
......
......@@ -34,7 +34,7 @@
#include "NdbUtil.hpp"
#include "NdbOut.hpp"
#include "NdbImpl.hpp"
#include <NdbScanOperation.hpp>
#include <NdbIndexScanOperation.hpp>
#include "NdbBlob.hpp"
#include <Interpreter.hpp>
......@@ -317,7 +317,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
(!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){
if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus != GetValue) {
......
......@@ -33,7 +33,7 @@ Adjust: 991029 UABRONM First version.
#include "NdbRecAttr.hpp"
#include "NdbUtil.hpp"
#include "Interpreter.hpp"
#include <NdbScanOperation.hpp>
#include <NdbIndexScanOperation.hpp>
#ifdef VM_TRACE
#include <NdbEnv.h>
......@@ -217,7 +217,7 @@ NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus == ExecInterpretedValue) {
......
......@@ -375,7 +375,7 @@ NdbScanOperation::getFirstATTRINFOScan()
* a separate list. Then continue with normal scan.
*/
int
NdbScanOperation::saveBoundATTRINFO()
NdbIndexScanOperation::saveBoundATTRINFO()
{
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
theBoundATTRINFO = theFirstATTRINFO;
......@@ -395,18 +395,27 @@ NdbScanOperation::saveBoundATTRINFO()
* unless the one's with EqBound
*/
if(!res && m_ordered){
Uint32 idx = 0;
Uint32 cnt = m_currentTable->getNoOfPrimaryKeys();
while(!theTupleKeyDefined[idx][0] && idx < cnt){
NdbColumnImpl* col = m_currentTable->getColumn(idx);
/**
* If setBound EQ
*/
Uint32 i = 0;
while(theTupleKeyDefined[i][0] == SETBOUND_EQ)
i++;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt - i;
for(; i<cnt; i++){
NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
UintPtr newVal = UintPtr(tmp);
theTupleKeyDefined[idx][0] = FAKE_PTR;
theTupleKeyDefined[idx][1] = (newVal & 0xFFFFFFFF);
theTupleKeyDefined[i][0] = FAKE_PTR;
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
theTupleKeyDefined[idx][2] = (newVal >> 32);
theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
idx++;
}
}
return res;
......@@ -753,7 +762,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
}
if (theStatus == SetBound) {
saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
......@@ -1049,19 +1058,33 @@ NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
char* aValue){
if(!attrInfo->getPrimaryKey() || !m_ordered){
if(!m_ordered){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
if (theStatus == SetBound) {
saveBoundATTRINFO();
theStatus = GetValue;
}
Uint32 id = attrInfo->m_attrId;
int id = attrInfo->m_attrId; // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 marker = theTupleKeyDefined[id][0];
if(marker == SETBOUND_EQ){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
} else if(marker == API_PTR){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
assert(marker == FAKE_PTR);
UintPtr oldVal;
oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8)
......@@ -1071,6 +1094,7 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
tmp->setup(attrInfo, aValue);
return tmp;
}
......@@ -1121,10 +1145,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
* so it's safe to use [tIndexAttrId]
* (instead of looping as is NdbOperation::equal_impl)
*/
if(!theTupleKeyDefined[tIndexAttrId][0]){
if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){
theNoOfTupKeyDefined++;
theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
m_sort_columns -= m_ordered;
}
return 0;
......@@ -1142,8 +1165,9 @@ NdbIndexScanOperation::readTuples(LockMode lm,
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(rs && order_by){
m_ordered = 1;
m_sort_columns = m_accessTable->getNoOfPrimaryKeys();
m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
}
return rs;
}
......@@ -1154,33 +1178,29 @@ NdbIndexScanOperation::fix_get_values(){
* Loop through all getValues and set buffer pointer to "API" pointer
*/
NdbRecAttr * curr = theReceiver.theFirstRecAttr;
Uint32 cnt = m_sort_columns;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 idx = 0;
NdbIndexImpl * idx = m_accessTable->m_index;
NdbTableImpl * tab = m_currentTable;
while(cnt > 0){ // To MAXNROFTUPLEKEY loops
NdbColumnImpl * col = tab->getColumn(idx);
if(col->getPrimaryKey()){
Uint32 val = theTupleKeyDefined[idx][0];
switch(val){
case FAKE_PTR:
curr->setup(col, 0);
// Fall-through
case API_PTR:
cnt--;
break;
case SETBOUND_EQ:
(void)1;
for(Uint32 i = 0; i<cnt; i++){
Uint32 val = theTupleKeyDefined[i][0];
switch(val){
case FAKE_PTR:{
NdbColumnImpl * key = idx->m_columns[i];
NdbColumnImpl * col = tab->getColumn(key->m_keyInfoPos);
curr->setup(col, 0);
}
break;
case API_PTR:
case SETBOUND_EQ:
break;
#ifdef VM_TRACE
break;
default:
abort();
default:
abort();
#endif
}
}
idx++;
curr = curr->next();
}
}
......@@ -1222,24 +1242,23 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 u_idx = m_current_api_receiver; // start of unsorted
Uint32 u_last = u_idx + 1; // last unsorted
Uint32 s_idx = u_last; // start of sorted
Uint32 u_idx, u_last;
Uint32 s_idx = m_current_api_receiver; // first sorted
Uint32 s_last = theParallelism; // last sorted
NdbReceiver** arr = m_api_receivers;
NdbReceiver* tRec = arr[u_idx];
NdbReceiver* tRec = arr[s_idx];
if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
fetchAllowed,
(u_idx < s_last ? tRec->nextResult() : 0));
(s_idx < s_last ? tRec->nextResult() : 0));
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
u_idx, u_last,
s_idx, s_last);
bool fetchNeeded = (u_idx == s_last) || !tRec->nextResult();
bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
if(fetchNeeded){
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
......@@ -1247,8 +1266,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
......@@ -1256,24 +1276,30 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
u_idx = 0;
u_last = m_conf_receivers_count;
s_idx = (u_last > 1 ? s_last : s_idx);
m_conf_receivers_count = 0;
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
if(theError.code){
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
}
} else {
if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
return 2;
}
} else {
u_idx = s_idx;
u_last = s_idx + 1;
s_idx++;
}
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
......@@ -1319,6 +1345,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
tRec = m_api_receivers[s_idx];
if(s_idx < s_last && tRec->nextResult()){
tRec->copyout(theReceiver);
if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
return 0;
}
......@@ -1329,9 +1356,11 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1;
}
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
......@@ -1363,7 +1392,8 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tRec->prepareSend();
m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
......
......@@ -871,7 +871,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment