Commit 0a8bbe4d authored by joreland@mysql.com's avatar joreland@mysql.com

wl2135 - index restart

parent e5f1db69
......@@ -37,6 +37,14 @@ a b c
1 2 3
2 3 5
3 4 6
select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
a b c
6 7 2
5 6 2
1 2 3
2 3 5
3 4 6
4 5 8
update t1 set c = 3 where b = 3;
select * from t1 order by a;
a b c
......
......@@ -23,6 +23,9 @@ select * from t1 where b > 4 order by b;
select * from t1 where b < 4 order by b;
select * from t1 where b <= 4 order by b;
# Test of reset_bounds
select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
#
# Here we should add some "explain select" to verify that the ordered index is
# used for these queries.
......
......@@ -74,6 +74,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo);
static Uint8 getKeyinfoFlag(const UintR & requestInfo);
static Uint16 getScanBatch(const UintR & requestInfo);
/**
......@@ -85,6 +86,7 @@ private:
static void setHoldLockFlag(UintR & requestInfo, Uint32 flag);
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
static void setScanBatch(Uint32& requestInfo, Uint32 sz);
};
......@@ -95,12 +97,13 @@ private:
l = Lock mode - 1 Bit 8
h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11
k = Keyinfo - 1 Bit 12
x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 10 Bit 16-25 (max 1023)
1111111111222222222233
01234567890123456789012345678901
ppppppppl hc xbbbbbbbbbb
ppppppppl hck xbbbbbbbbbb
*/
#define PARALLELL_SHIFT (0)
......@@ -112,6 +115,9 @@ private:
#define HOLD_LOCK_SHIFT (10)
#define HOLD_LOCK_MASK (1)
#define KEYINFO_SHIFT (12)
#define KEYINFO_MASK (1)
#define READ_COMMITTED_SHIFT (11)
#define READ_COMMITTED_MASK (1)
......@@ -206,6 +212,20 @@ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
requestInfo |= (flag << SCAN_BATCH_SHIFT);
}
inline
Uint8
ScanTabReq::getKeyinfoFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> KEYINFO_SHIFT) & KEYINFO_MASK);
}
inline
void
ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){
ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag");
requestInfo |= (flag << KEYINFO_SHIFT);
}
/**
*
* SENDER: Dbtc
......
......@@ -125,6 +125,7 @@ public:
*/
int reset_bounds();
bool getSorted() const { return m_ordered; }
private:
NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation();
......
......@@ -717,6 +717,8 @@ public:
NotDefined ///< Internal for debugging
};
LockMode getLockMode() const { return theLockMode; }
protected:
/******************************************************************************
* These are the methods used to create and delete the NdbOperation objects.
......@@ -893,7 +895,7 @@ protected:
// currently defined
OperationType theOperationType; // Read Request, Update Req......
Uint8 theLockMode; // Can be set to WRITE if read operation
LockMode theLockMode; // Can be set to WRITE if read operation
OperationStatus theStatus; // The status of the operation.
Uint32 theMagicNumber; // Magic number to verify that object
// is correct
......
......@@ -139,6 +139,10 @@ public:
int deleteTuple();
int deleteTuple(NdbConnection* takeOverTransaction);
/**
* Get underlying operation
*/
NdbOperation* getOperation();
private:
NdbResultSet(NdbScanOperation*);
......@@ -149,4 +153,10 @@ private:
NdbScanOperation* m_operation;
};
inline
NdbOperation*
NdbResultSet::getOperation(){
return m_operation;
}
#endif
......@@ -30,12 +30,13 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n",
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n",
sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo),
sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo));
sig->getRangeScanFlag(requestInfo),
sig->getKeyinfoFlag(requestInfo));
Uint32 keyLen = (sig->attrLenKeyLen >> 16);
Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
......
......@@ -1197,18 +1197,7 @@ public:
Uint16 first_batch_size;
Uint32 batch_byte_size;
// Shall the locks be held until the application have read the
// records
Uint8 scanLockHold;
// Shall the locks be read or write locks
Uint8 scanLockMode;
// Skip locks by other transactions and read latest committed
Uint8 readCommitted;
// Scan is on ordered index
Uint8 rangeScan;
Uint32 scanRequestInfo; // ScanFrag format
// Close is ordered
bool m_close_scan_req;
......
......@@ -8599,8 +8599,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
UintR scanParallel,
UintR noOprecPerFrag)
{
const UintR reqinfo = scanTabReq->requestInfo;
scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanApiRec = apiConnectptr.i;
scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF;
......@@ -8611,10 +8609,17 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->noOprecPerFrag = noOprecPerFrag;
scanptr.p->first_batch_size= scanTabReq->first_batch_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo);
scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo);
scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo);
scanptr.p->rangeScan = ScanTabReq::getRangeScanFlag(reqinfo);
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri));
ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri));
ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
scanptr.p->scanRequestInfo = tmp;
scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
scanptr.p->scanState = ScanRecord::RUNNING;
scanptr.p->m_queued_count = 0;
......@@ -8631,7 +8636,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ptr.p->m_apiPtr = cdata[i];
}//for
(* (scanptr.p->rangeScan ?
(* (ScanTabReq::getRangeScanFlag(ri) ?
&c_counters.c_range_scan_count :
&c_counters.c_scan_count))++;
}//Dbtc::initScanrec()
......@@ -9491,17 +9496,8 @@ void Dbtc::sendScanFragReq(Signal* signal,
ScanRecord* scanP,
ScanFragRec* scanFragP)
{
Uint32 requestInfo = 0;
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode);
ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
if(scanP->scanLockMode == 1){ // Not read -> keyinfo
jam();
ScanFragReq::setKeyinfoFlag(requestInfo, 1);
}
ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted);
ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan);
ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
Uint32 requestInfo = scanP->scanRequestInfo;
ScanFragReq::setScanPrio(requestInfo, 1);
apiConnectptr.i = scanP->scanApiRec;
req->tableId = scanP->scanTableref;
......@@ -10470,9 +10466,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanSchemaVersion,
sp.p->scanTableref,
sp.p->scanStoredProcId);
infoEvent(" lhold=%d, lmode=%d",
sp.p->scanLockHold,
sp.p->scanLockMode);
infoEvent(" apiRec=%d, next=%d",
sp.p->scanApiRec, sp.p->nextScan);
......
......@@ -55,6 +55,7 @@ NdbOperation::insertTuple()
theOperationType = InsertRequest;
tNdbCon->theSimpleState = 0;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
......@@ -74,6 +75,7 @@ NdbOperation::updateTuple()
tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
......@@ -93,6 +95,7 @@ NdbOperation::writeTuple()
tNdbCon->theSimpleState = 0;
theOperationType = WriteRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
......@@ -115,6 +118,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm)
case LM_CommittedRead:
return readTuple();
break;
default:
return -1;
};
}
/******************************************************************************
......@@ -130,6 +135,7 @@ NdbOperation::readTuple()
tNdbCon->theSimpleState = 0;
theOperationType = ReadRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Read;
return 0;
} else {
setErrorCode(4200);
......@@ -150,6 +156,7 @@ NdbOperation::deleteTuple()
tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
......@@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive()
tNdbCon->theSimpleState = 0;
theOperationType = ReadExclusive;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
......@@ -189,6 +197,7 @@ NdbOperation::simpleRead()
theOperationType = ReadRequest;
theSimpleIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
......@@ -218,6 +227,7 @@ NdbOperation::committedRead()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
......@@ -240,6 +250,7 @@ NdbOperation::dirtyUpdate()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
......@@ -262,6 +273,7 @@ NdbOperation::dirtyWrite()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
......@@ -282,7 +294,7 @@ NdbOperation::interpretedUpdateTuple()
tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest;
theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
theErrorLine = tErrorLine++;
initInterpreter();
return 0;
......@@ -307,7 +319,7 @@ NdbOperation::interpretedDeleteTuple()
theErrorLine = tErrorLine++;
theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
initInterpreter();
return 0;
} else {
......
......@@ -144,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
theNdbCon->theScanningOp = this;
theLockMode = lm;
bool lockExcl, lockHoldMode, readCommitted;
switch(lm){
......@@ -167,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return 0;
}
m_keyInfo = lockExcl;
m_keyInfo = lockExcl ? 1 : 0;
bool range = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
......@@ -743,20 +744,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size;
/**
* Set keyinfo flag
* (Always keyinfo when using blobs)
*/
Uint32 reqInfo = req->requestInfo;
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
req->requestInfo = reqInfo;
for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
}
return 0;
}
/******************************************************************************
/*****************************************************************************
int doSend()
Return Value: Return >0 : send was succesful, returns number of signals sent
Return -1: In all other case.
Parameters: aProcessorId: Receiving processor node
Remark: Sends the ATTRINFO signal(s)
******************************************************************************/
*****************************************************************************/
int
NdbScanOperation::doSendScan(int aProcessorId)
{
......@@ -842,7 +851,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
return tSignalCount;
}//NdbOperation::doSendScan()
/******************************************************************************
/*****************************************************************************
* NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
*
* Parameters: The update transactions NdbConnection pointer.
......@@ -861,7 +870,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
* This means that the updating transactions can be placed
* in separate threads and thus increasing the parallelism during
* the scan process.
*****************************************************************************/
****************************************************************************/
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
......@@ -970,6 +979,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrName));
}
......@@ -977,6 +987,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName)
NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrId));
}
......
......@@ -1341,6 +1341,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
bool restart;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbIndexScanOperation *op;
......@@ -1352,7 +1353,9 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(start_key, "start_key"););
DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
if(m_active_cursor == 0)
{
restart= false;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
......@@ -1361,6 +1364,16 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
} else {
restart= true;
op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
DBUG_ASSERT(op->getSorted() == sorted);
DBUG_ASSERT(op->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(op->reset_bounds())
DBUG_RETURN(ndb_err(m_active_trans));
}
if (start_key &&
set_bounds(op, start_key,
......@@ -1383,9 +1396,18 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1);
}
if(!restart)
{
DBUG_RETURN(define_read_attrs(buf, op));
}
}
else
{
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
}
}
/*
Start a filtered scan in NDB.
......@@ -2225,9 +2247,6 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
if (m_active_cursor)
close_scan();
switch (get_index_type(active_index)){
case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX:
......@@ -2255,11 +2274,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
break;
}
// Start the ordered index scan and fetch the first row
error= ordered_index_scan(start_key, end_key, sorted,
buf);
error= ordered_index_scan(start_key, end_key, sorted, buf);
DBUG_RETURN(error);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment