WL#1727 Implement read_range_first and read_range_next

parent d4ee7e7e
......@@ -634,25 +634,80 @@ inline int ha_ndbcluster::next_result(byte *buf)
}
/*
Set bounds for a ordered index scan, use key_range
*/
int ha_ndbcluster::set_bounds(NdbOperation *op,
const key_range *key,
int bound)
{
uint i, tot_len;
byte *key_ptr;
KEY* key_info= table->key_info + active_index;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
DBUG_ENTER("set_bounds");
DBUG_PRINT("enter", ("bound: %d", bound));
DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
DBUG_PRINT("enter", ("key->length: %d", key->length));
DBUG_PRINT("enter", ("key->flag: %d", key->flag));
// Set bounds using key data
tot_len= 0;
key_ptr= (byte *) key->key;
for (; key_part != end; key_part++)
{
Field* field= key_part->field;
uint32 field_len= field->pack_length();
tot_len+= field_len;
const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
DBUG_ASSERT(bound >= 0 && bound <= 4);
DBUG_PRINT("info", ("Set Bound%s on %s",
bounds[bound],
field->field_name));
DBUG_DUMP("key", (char*)key_ptr, field_len);
if (op->setBound(field->field_name,
bound,
key_ptr,
field_len) != 0)
ERR_RETURN(op->getNdbError());
key_ptr+= field_len;
if (tot_len >= key->length)
break;
/*
Only one bound which is not EQ can be set
so if this bound was not EQ, bail out and make
a best effort attempt
*/
if (bound != NdbOperation::BoundEQ)
break;
}
DBUG_RETURN(0);
}
/*
Read record(s) from NDB using ordered index scan
*/
int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
byte *buf,
enum ha_rkey_function find_flag)
int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
uint no_fields= table->fields;
uint tot_len, i;
uint i;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor;
NdbScanOperation *op;
const char *bound_str= NULL;
const char *index_name;
NdbOperation::BoundType bound_type = NdbOperation::BoundEQ;
bool can_be_handled_by_ndb= FALSE;
byte *key_ptr;
KEY *key_info;
THD* thd = current_thd;
DBUG_ENTER("ordered_index_scan");
DBUG_PRINT("enter", ("index: %u", active_index));
......@@ -664,77 +719,27 @@ int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
if (!(cursor= op->readTuples(parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
switch (find_flag) {
case HA_READ_KEY_EXACT: /* Find first record else error */
bound_str= "HA_READ_KEY_EXACT";
bound_type= NdbOperation::BoundEQ;
can_be_handled_by_ndb= TRUE;
break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */
bound_str= "HA_READ_KEY_OR_NEXT";
bound_type= NdbOperation::BoundLE;
can_be_handled_by_ndb= TRUE;
break;
case HA_READ_KEY_OR_PREV: /* Record or previous */
bound_str= "HA_READ_KEY_OR_PREV";
bound_type= NdbOperation::BoundGE;
can_be_handled_by_ndb= TRUE;
break;
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
bound_str= "HA_READ_AFTER_KEY";
bound_type= NdbOperation::BoundLT;
can_be_handled_by_ndb= TRUE;
break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
bound_str= "HA_READ_BEFORE_KEY";
bound_type= NdbOperation::BoundGT;
can_be_handled_by_ndb= TRUE;
break;
case HA_READ_PREFIX: /* Key which as same prefix */
bound_str= "HA_READ_PREFIX";
break;
case HA_READ_PREFIX_LAST: /* Last key with the same prefix */
bound_str= "HA_READ_PREFIX_LAST";
break;
case HA_READ_PREFIX_LAST_OR_PREV:
/* Last or prev key with the same prefix */
bound_str= "HA_READ_PREFIX_LAST_OR_PREV";
break;
default:
bound_str= "UNKNOWN";
break;
}
DBUG_PRINT("info", ("find_flag: %s, bound_type: %d,"
"can_be_handled_by_ndb: %d",
bound_str, bound_type, can_be_handled_by_ndb));
if (!can_be_handled_by_ndb)
if (start_key &&
set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ?
NdbOperation::BoundEQ :
(start_key->flag == HA_READ_AFTER_KEY) ?
NdbOperation::BoundLT :
NdbOperation::BoundLE))
DBUG_RETURN(1);
if (end_key &&
(start_key && start_key->flag != HA_READ_KEY_EXACT) &&
// MASV Is it a bug that end_key is not 0
// if start flag is HA_READ_KEY_EXACT
// Set bounds using key data
tot_len= 0;
key_ptr= (byte *) key;
key_info= table->key_info + active_index;
for (i= 0; i < key_info->key_parts; i++)
{
Field* field= key_info->key_part[i].field;
uint32 field_len= field->pack_length();
DBUG_PRINT("info", ("Set index bound on %s",
field->field_name));
DBUG_DUMP("key", (char*)key_ptr, field_len);
if (op->setBound(field->field_name,
bound_type,
key_ptr,
field_len) != 0)
ERR_RETURN(op->getNdbError());
key_ptr+= field_len;
tot_len+= field_len;
if (tot_len >= key_len)
break;
}
set_bounds(op, end_key,
(end_key->flag == HA_READ_AFTER_KEY) ?
NdbOperation::BoundGE :
NdbOperation::BoundGT))
DBUG_RETURN(1);
// Define attributes to read
for (i= 0; i < no_fields; i++)
{
......@@ -1360,9 +1365,13 @@ int ha_ndbcluster::index_read(byte *buf,
case UNIQUE_INDEX:
error= unique_index_read(key, key_len, buf);
break;
case ORDERED_INDEX:
error= ordered_index_scan(key, key_len, buf, find_flag);
key_range start_key;
start_key.key= key;
start_key.length= key_len;
start_key.flag= find_flag;
error= ordered_index_scan(&start_key, 0, false, buf);
break;
default:
......@@ -1423,6 +1432,44 @@ int ha_ndbcluster::index_last(byte *buf)
}
int ha_ndbcluster::read_range_first(const key_range *start_key,
const key_range *end_key,
bool sorted)
{
int error= 1;
DBUG_ENTER("ha_ndbcluster::read_range_first");
switch (get_index_type(active_index)){
case PRIMARY_KEY_INDEX:
error= pk_read(start_key->key, start_key->length,
table->record[0]);
break;
case UNIQUE_INDEX:
error= unique_index_read(start_key->key, start_key->length,
table->record[0]);
break;
case ORDERED_INDEX:
// Start the ordered index scan and fetch the first row
error= ordered_index_scan(start_key, end_key, sorted,
table->record[0]);
break;
default:
case UNDEFINED_INDEX:
break;
}
DBUG_RETURN(error);
}
int ha_ndbcluster::read_range_next(bool eq_range)
{
DBUG_ENTER("ha_ndbcluster::read_range_next");
DBUG_RETURN(next_result(table->record[0]));
}
int ha_ndbcluster::rnd_init(bool scan)
{
NdbResultSet *cursor= m_active_cursor;
......
......@@ -76,6 +76,11 @@ class ha_ndbcluster: public handler
int rnd_next(byte *buf);
int rnd_pos(byte *buf, byte *pos);
void position(const byte *record);
int read_range_first(const key_range *start_key,
const key_range *end_key,
bool sorted);
int read_range_next(bool eq_range);
void info(uint);
int extra(enum ha_extra_function operation);
......@@ -147,9 +152,9 @@ class ha_ndbcluster: public handler
byte *buf);
int unique_index_read(const byte *key, uint key_len,
byte *buf);
int ordered_index_scan(const byte *key, uint key_len,
byte *buf,
enum ha_rkey_function find_flag);
int ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf);
int full_table_scan(byte * buf);
int next_result(byte *buf);
#if 0
......@@ -172,6 +177,8 @@ class ha_ndbcluster: public handler
int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op);
int set_bounds(NdbOperation *ndb_op, const key_range *key,
int bound);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
void print_results();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment