Commit f7662a45 authored by unknown's avatar unknown

wl2126 - ndb - Enable read_multi_range batching also for range scan(s)

 +fix mysql code style
 +adopt to interface changes by ingo


sql/ha_ndbcluster.cc:
  Enable read_multi_range batching also for range scan(s)
   +fix mysql code style
   +adopt to interface changes by ingo
sql/ha_ndbcluster.h:
  Enable read_multi_range batching also for range scan(s)
   +fix mysql code style
   +adopt to interface changes by ingo
parent 0a263290
......@@ -4715,7 +4715,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ulong reclength = table->reclength;
NdbOperation* op;
if(uses_blob_value(m_retrieve_all_fields))
if (uses_blob_value(m_retrieve_all_fields))
{
/**
* blobs can't be batched currently
......@@ -4728,62 +4728,28 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
buffer));
}
switch(index_type){
case UNIQUE_INDEX:
case PRIMARY_KEY_INDEX:
break;
case PRIMARY_KEY_ORDERED_INDEX:
case UNIQUE_ORDERED_INDEX:
/**
* Currently not supported on ordered indexes
*/
for(i= 0; i<range_count; i++)
{
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
continue;
/**
* Mark that we using hander:: implementation
*/
m_disable_multi_read= true;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer));
}
break;
default:
case ORDERED_INDEX:
m_disable_multi_read= true;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer));
}
m_disable_multi_read= false;
multi_ranges= ranges;
multi_range_count= range_count;
multi_range_sorted= sorted;
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
multi_range_buffer= buffer;
multi_range_found_p= found_range_p;
byte* curr = buffer->buffer;
byte* curr= (byte*)buffer->buffer;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NdbOperation* lastOp = m_active_trans->getLastDefinedOperation();
const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
NdbIndexScanOperation* scanOp= 0;
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{
switch(index_type){
case PRIMARY_KEY_INDEX:
case PRIMARY_KEY_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
pk:
{
ranges[i].range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbOperation(tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
......@@ -4792,12 +4758,13 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
}
break;
case UNIQUE_INDEX:
case UNIQUE_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
sk:
{
ranges[i].range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
......@@ -4806,12 +4773,54 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
}
case PRIMARY_KEY_ORDERED_INDEX:
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
goto pk;
goto range;
case UNIQUE_ORDERED_INDEX:
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
goto sk;
goto range;
case ORDERED_INDEX:
range:
ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
if (sorted && scanOp != 0 && curr != buffer->buffer)
{
/**
* We currently don't support batching of ordered range scans
*/
curr= (byte*)buffer->buffer_end;
break;
}
if (scanOp == 0)
{
if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) &&
(m_active_cursor= scanOp->readTuples(lm, 0, parallelism, sorted)) &&
!define_read_attrs(curr, scanOp))
curr += reclength;
else
ERR_RETURN(scanOp ?
scanOp->getNdbError() :
m_active_trans->getNdbError());
}
else
{
scanOp->set_new_bound();
}
const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
if ((res= set_bounds(scanOp, keys)))
DBUG_RETURN(res);
break;
}
}
if(i != range_count)
if (i != range_count)
{
buffer->end_of_used_area= buffer->buffer_end;
buffer->end_of_used_area= (byte*)buffer->buffer_end;
}
else
{
......@@ -4823,60 +4832,78 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if(!(res= execute_no_commit_ie(this, m_active_trans)))
if (!(res= execute_no_commit_ie(this, m_active_trans)))
{
multi_range_curr= 0;
m_multi_range_result_ptr= buffer->buffer;
DBUG_RETURN(read_multi_range_next());
m_multi_range_defined_count= i;
m_multi_range_result_ptr= (byte*)buffer->buffer;
DBUG_RETURN(read_multi_range_next(found_range_p));
}
ERR_RETURN(m_active_trans->getNdbError());
}
int
ha_ndbcluster::read_multi_range_next()
ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
if(m_disable_multi_read)
DBUG_RETURN(handler::read_multi_range_next());
if (m_disable_multi_read)
DBUG_RETURN(handler::read_multi_range_next(multi_range_found_p));
ulong reclength = table->reclength;
const NdbOperation* op = m_current_multi_operation;
while(multi_range_curr < multi_range_count && op && op->getNdbError().code)
int res;
ulong reclength= table->reclength;
const NdbOperation* op= m_current_multi_operation;
while(multi_range_curr < m_multi_range_defined_count)
{
if(multi_ranges[multi_range_curr].range_flag & UNIQUE_RANGE)
{
if(op->getNdbError().code == 0)
goto found_next;
op= m_active_trans->getNextCompletedOperation(op);
multi_range_curr++;
op = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
}
if(multi_range_curr < multi_range_count && op)
else if(m_active_cursor)
{
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
unpack_record(table->record[0]);
table->status= 0;
if(m_active_cursor->nextResult())
goto found;
/**
* Move to next
*/
multi_range_curr++;
m_current_multi_operation = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
DBUG_RETURN(0);
}
if(multi_range_curr == multi_range_count)
m_active_cursor->close();
m_active_cursor= 0;
}
else
{
DBUG_RETURN(HA_ERR_END_OF_FILE);
multi_range_curr++;
}
}
if (multi_range_curr == multi_range_count)
DBUG_RETURN(HA_ERR_END_OF_FILE);
/**
* Read remaining ranges
*/
uint left = multi_range_count - multi_range_curr;
uint left= multi_range_count - multi_range_curr;
DBUG_RETURN(read_multi_range_first(multi_range_found_p,
multi_ranges + multi_range_curr,
left,
multi_range_sorted,
multi_range_buffer));
found_next:
multi_range_curr++;
op= m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
found:
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
unpack_record(table->record[0]);
table->status= 0;
DBUG_RETURN(0);
}
#endif /* HAVE_NDBCLUSTER_DB */
......@@ -117,7 +117,7 @@ class ha_ndbcluster: public handler
int read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges, uint range_count,
bool sorted, handler_buffer *buffer);
int read_multi_range_next(void);
int read_multi_range_next(key_multi_range **found_range_p);
bool get_error_message(int error, String *buf);
void info(uint);
......@@ -256,6 +256,7 @@ class ha_ndbcluster: public handler
bool m_disable_multi_read;
byte* m_multi_range_result_ptr;
uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation;
void set_rec_per_key();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment