Commit ef5d0be3 authored by unknown's avatar unknown

bug#6775 - ndb

Queue scan on real fragment.
  Index fragment for range scans
  Table fragment for table scans


ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Document meaning of fragPtrI and how it differs
    from scanTcRec->fragmentptr
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Queue scan on real fragment.
    Index fragment for range scans
    Table fragment for table scans
ndb/src/ndbapi/NdbConnection.cpp:
  Check tOp before assigning
parent 6f07c1bb
......@@ -550,6 +550,11 @@ public:
UintR scanErrorCounter;
UintR scanLocalFragid;
UintR scanSchemaVersion;
/**
* This is _always_ main table, even in range scan
* in which case scanTcrec->fragmentptr is different
*/
Uint32 fragPtrI;
UintR scanStoredProcId;
ScanState scanState;
......
......@@ -7703,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){
jam();
scanptr.i = scan_ptr_i;
c_scanRecordPool.getPtr(scanptr);
fragptr.i = tcConnectptr.p->fragmentptr;
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
finishScanrec(signal);
releaseScanrec(signal);
tcConnectptr.p->transactionState = TcConnectionrec::IDLE;
......@@ -8570,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
/**
* Used for scan take over
*/
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
scanptr.p->fragPtrI = fragptr.p->tableFragptr;
{
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
scanptr.p->fragPtrI = fragptr.p->tableFragptr;
}
/**
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
......@@ -8582,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
Uint32 free = fragptr.p->m_scanNumberMask.find(start);
if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){
jam();
......@@ -8597,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
*/
scanptr.p->scanState = ScanRecord::IN_QUEUE;
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans);
fragptr.p->m_queuedScans);
queue.add(scanptr);
return ZOK;
}
scanptr.p->scanNumber = free;
tFragPtr.p->m_scanNumberMask.clear(free);// Update mask
fragptr.p->m_scanNumberMask.clear(free);// Update mask
LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans);
LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans);
active.add(scanptr);
if(scanptr.p->scanKeyinfoFlag){
jam();
......@@ -8666,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal)
{
release_acc_ptr_list(scanptr.p);
FragrecordPtr tFragPtr;
tFragPtr.i = scanptr.p->fragPtrI;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans);
fragptr.p->m_queuedScans);
if(scanptr.p->scanState == ScanRecord::IN_QUEUE){
jam();
......@@ -8689,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal)
ndbrequire(tmp.p == scanptr.p);
}
LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans);
LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
scans.release(scanptr);
const Uint32 scanNumber = scanptr.p->scanNumber;
ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber));
ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber));
ScanRecordPtr restart;
/**
......@@ -8701,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal)
*/
if(scanNumber == NR_ScanNo || !queue.first(restart)){
jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber);
fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
if(ERROR_INSERTED(5034)){
jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber);
fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
......@@ -8718,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
restart.p->scanNumber = scanNumber;
restart.p->scanState = ScanRecord::WAIT_ACC_SCAN;
queue.remove(restart);
scans.add(restart);
if(restart.p->scanKeyinfoFlag){
......
......@@ -1086,8 +1086,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index,
if (indexTable != 0){
NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable);
tOp->m_currentTable = table;
if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
if(tOp)
{
tOp->m_currentTable = table;
tOp->m_cursor_type = NdbScanOperation::IndexCursor;
}
return tOp;
} else {
setOperationErrorCodeAbort(theNdb->theError.code);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment