Commit 6663f82b authored by joreland@mysql.com's avatar joreland@mysql.com

wl2126 - ndb - Enable read_multi_range batching also for range scan(s)

 +fix mysql code style
 +adopt to interface changes by ingo
parent c9d472d5
...@@ -4715,7 +4715,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4715,7 +4715,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ulong reclength = table->reclength; ulong reclength = table->reclength;
NdbOperation* op; NdbOperation* op;
if(uses_blob_value(m_retrieve_all_fields)) if (uses_blob_value(m_retrieve_all_fields))
{ {
/** /**
* blobs can't be batched currently * blobs can't be batched currently
...@@ -4728,62 +4728,28 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4728,62 +4728,28 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
buffer)); buffer));
} }
switch(index_type){
case UNIQUE_INDEX:
case PRIMARY_KEY_INDEX:
break;
case PRIMARY_KEY_ORDERED_INDEX:
case UNIQUE_ORDERED_INDEX:
/**
* Currently not supported on ordered indexes
*/
for(i= 0; i<range_count; i++)
{
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
continue;
/**
* Mark that we using hander:: implementation
*/
m_disable_multi_read= true;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer));
}
break;
default:
case ORDERED_INDEX:
m_disable_multi_read= true;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer));
}
m_disable_multi_read= false; m_disable_multi_read= false;
multi_ranges= ranges; multi_ranges= ranges;
multi_range_count= range_count; multi_range_count= range_count;
multi_range_sorted= sorted; multi_range_sorted= sorted;
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
multi_range_buffer= buffer; multi_range_buffer= buffer;
multi_range_found_p= found_range_p;
byte* curr = buffer->buffer; byte* curr= (byte*)buffer->buffer;
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table; const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index; const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NdbOperation* lastOp = m_active_trans->getLastDefinedOperation(); const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
NdbIndexScanOperation* scanOp= 0;
switch(index_type){ for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
case PRIMARY_KEY_INDEX: {
case PRIMARY_KEY_ORDERED_INDEX: switch(index_type){
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++) case PRIMARY_KEY_INDEX:
pk:
{ {
ranges[i].range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbOperation(tab)) && if ((op= m_active_trans->getNdbOperation(tab)) &&
!op->readTuple(lm) && !op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) && !set_primary_key(op, ranges[i].start_key.key) &&
...@@ -4792,12 +4758,13 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4792,12 +4758,13 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
curr += reclength; curr += reclength;
else else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
} }
break; break;
case UNIQUE_INDEX: case UNIQUE_INDEX:
case UNIQUE_ORDERED_INDEX: sk:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{ {
ranges[i].range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
!op->readTuple(lm) && !op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) && !set_primary_key(op, ranges[i].start_key.key) &&
...@@ -4806,12 +4773,54 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4806,12 +4773,54 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
curr += reclength; curr += reclength;
else else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
}
case PRIMARY_KEY_ORDERED_INDEX:
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
goto pk;
goto range;
case UNIQUE_ORDERED_INDEX:
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
goto sk;
goto range;
case ORDERED_INDEX:
range:
ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
if (sorted && scanOp != 0 && curr != buffer->buffer)
{
/**
* We currently don't support batching of ordered range scans
*/
curr= (byte*)buffer->buffer_end;
break;
}
if (scanOp == 0)
{
if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) &&
(m_active_cursor= scanOp->readTuples(lm, 0, parallelism, sorted)) &&
!define_read_attrs(curr, scanOp))
curr += reclength;
else
ERR_RETURN(scanOp ?
scanOp->getNdbError() :
m_active_trans->getNdbError());
}
else
{
scanOp->set_new_bound();
}
const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
if ((res= set_bounds(scanOp, keys)))
DBUG_RETURN(res);
break;
} }
} }
if(i != range_count) if (i != range_count)
{ {
buffer->end_of_used_area= buffer->buffer_end; buffer->end_of_used_area= (byte*)buffer->buffer_end;
} }
else else
{ {
...@@ -4823,60 +4832,78 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4823,60 +4832,78 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
*/ */
m_current_multi_operation= m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if(!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans)))
{ {
multi_range_curr= 0; multi_range_curr= 0;
m_multi_range_result_ptr= buffer->buffer; m_multi_range_defined_count= i;
DBUG_RETURN(read_multi_range_next()); m_multi_range_result_ptr= (byte*)buffer->buffer;
DBUG_RETURN(read_multi_range_next(found_range_p));
} }
ERR_RETURN(m_active_trans->getNdbError()); ERR_RETURN(m_active_trans->getNdbError());
} }
int int
ha_ndbcluster::read_multi_range_next() ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
{ {
DBUG_ENTER("ha_ndbcluster::read_multi_range_next"); DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
if(m_disable_multi_read) if (m_disable_multi_read)
DBUG_RETURN(handler::read_multi_range_next()); DBUG_RETURN(handler::read_multi_range_next(multi_range_found_p));
ulong reclength = table->reclength;
const NdbOperation* op = m_current_multi_operation;
while(multi_range_curr < multi_range_count && op && op->getNdbError().code)
{
multi_range_curr++;
op = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
}
if(multi_range_curr < multi_range_count && op) int res;
ulong reclength= table->reclength;
const NdbOperation* op= m_current_multi_operation;
while(multi_range_curr < m_multi_range_defined_count)
{ {
* multi_range_found_p= multi_ranges + multi_range_curr; if(multi_ranges[multi_range_curr].range_flag & UNIQUE_RANGE)
memcpy(table->record[0], m_multi_range_result_ptr, reclength); {
unpack_record(table->record[0]); if(op->getNdbError().code == 0)
table->status= 0; goto found_next;
op= m_active_trans->getNextCompletedOperation(op);
multi_range_curr++;
m_multi_range_result_ptr += reclength;
}
else if(m_active_cursor)
{
if(m_active_cursor->nextResult())
goto found;
/** multi_range_curr++;
* Move to next m_multi_range_result_ptr += reclength;
*/
multi_range_curr++;
m_current_multi_operation = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
DBUG_RETURN(0);
}
if(multi_range_curr == multi_range_count) m_active_cursor->close();
{ m_active_cursor= 0;
}
else
{
multi_range_curr++;
}
}
if (multi_range_curr == multi_range_count)
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
}
/** /**
* Read remaining ranges * Read remaining ranges
*/ */
uint left = multi_range_count - multi_range_curr; uint left= multi_range_count - multi_range_curr;
DBUG_RETURN(read_multi_range_first(multi_range_found_p, DBUG_RETURN(read_multi_range_first(multi_range_found_p,
multi_ranges + multi_range_curr, multi_ranges + multi_range_curr,
left, left,
multi_range_sorted, multi_range_sorted,
multi_range_buffer)); multi_range_buffer));
found_next:
multi_range_curr++;
op= m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
found:
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
unpack_record(table->record[0]);
table->status= 0;
DBUG_RETURN(0);
} }
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -117,7 +117,7 @@ class ha_ndbcluster: public handler ...@@ -117,7 +117,7 @@ class ha_ndbcluster: public handler
int read_multi_range_first(key_multi_range **found_range_p, int read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges, uint range_count, key_multi_range *ranges, uint range_count,
bool sorted, handler_buffer *buffer); bool sorted, handler_buffer *buffer);
int read_multi_range_next(void); int read_multi_range_next(key_multi_range **found_range_p);
bool get_error_message(int error, String *buf); bool get_error_message(int error, String *buf);
void info(uint); void info(uint);
...@@ -256,6 +256,7 @@ class ha_ndbcluster: public handler ...@@ -256,6 +256,7 @@ class ha_ndbcluster: public handler
bool m_disable_multi_read; bool m_disable_multi_read;
byte* m_multi_range_result_ptr; byte* m_multi_range_result_ptr;
uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation; const NdbOperation* m_current_multi_operation;
void set_rec_per_key(); void set_rec_per_key();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment