Commit a1c3a274 authored by Sergey Petrunya's avatar Sergey Petrunya

Read records in batches when doing full table scan.

parent 80502f47
......@@ -68,6 +68,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
std::string rowkey; /* key of the record we're returning now */
SlicePredicate slice_pred;
bool get_slices_returned_less;
public:
Cassandra_se_impl() : cass(NULL) {}
virtual ~Cassandra_se_impl(){ delete cass; }
......@@ -92,9 +93,9 @@ class Cassandra_se_impl: public Cassandra_se_interface
void get_read_rowkey(char **value, int *value_len);
/* Reads, multi-row scans */
bool get_range_slices();
bool get_range_slices(bool last_key_as_start_key);
void finish_reading_range_slices();
bool get_next_range_slice_row();
bool get_next_range_slice_row(bool *eof);
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns();
......@@ -369,7 +370,7 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
}
bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters
bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
{
bool res= true;
......@@ -380,11 +381,16 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
// Try passing nothing...
KeyRange key_range; // Try passing nothing, too.
key_range.__isset.start_key=true;
key_range.__isset.end_key=true;
key_range.start_key.assign("", 0);
key_range.end_key.assign("", 0);
key_range.__isset.start_key= true;
key_range.__isset.end_key= true;
if (last_key_as_start_key)
key_range.start_key= rowkey;
else
key_range.start_key.assign("", 0);
key_range.end_key.assign("", 0);
key_range.count= read_batch_size;
try {
cass->get_range_slices(key_slice_vec,
......@@ -392,6 +398,11 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
cur_consistency_level);
res= false;
if (key_slice_vec.size() < (uint)read_batch_size)
get_slices_returned_less= true;
else
get_slices_returned_less= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
......@@ -405,11 +416,32 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
}
bool Cassandra_se_impl::get_next_range_slice_row()
/* Switch to next row. This may produce an error */
bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
{
if (key_slice_it == key_slice_vec.end())
return true;
{
if (get_slices_returned_less)
{
*eof= true;
return false;
}
/*
We have read through all columns in this batch. Try getting the next
batch.
*/
if (get_range_slices(true))
return true;
if (key_slice_vec.empty())
{
*eof= true;
return false;
}
}
*eof= false;
column_data_vec= key_slice_it->columns;
rowkey= key_slice_it->key;
column_data_it= column_data_vec.begin();
......
......@@ -39,9 +39,11 @@ class Cassandra_se_interface
virtual void get_read_rowkey(char **value, int *value_len)=0;
/* Reads, multi-row scans */
virtual bool get_range_slices()=0;
int read_batch_size;
virtual bool get_range_slices(bool last_key_as_start_key)=0;
virtual void finish_reading_range_slices()=0;
virtual bool get_next_range_slice_row()=0;
virtual bool get_next_range_slice_row(bool *eof)=0;
/* read_set setup */
virtual void clear_read_columns()=0;
......
......@@ -212,7 +212,8 @@ static handler* cassandra_create_handler(handlerton *hton,
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
:handler(hton, table_arg),
se(NULL), field_converters(NULL),rowkey_converter(NULL)
se(NULL), field_converters(NULL), rowkey_converter(NULL),
rnd_batch_size(10*1000)
{}
......@@ -760,7 +761,8 @@ int ha_cassandra::rnd_init(bool scan)
for (uint i= 1; i < table->s->fields; i++)
se->add_read_column(table->field[i]->field_name);
bres= se->get_range_slices();
se->read_batch_size= rnd_batch_size;
bres= se->get_range_slices(false);
if (bres)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
......@@ -780,17 +782,23 @@ int ha_cassandra::rnd_end()
int ha_cassandra::rnd_next(uchar *buf)
{
int rc;
bool reached_eof;
DBUG_ENTER("ha_cassandra::rnd_next");
// Unpack and return the next record.
if (se->get_next_range_slice_row())
if (se->get_next_range_slice_row(&reached_eof))
{
rc= HA_ERR_END_OF_FILE;
rc= HA_ERR_INTERNAL_ERROR;
}
else
{
read_cassandra_columns(true);
rc= 0;
if (reached_eof)
rc= HA_ERR_END_OF_FILE;
else
{
read_cassandra_columns(true);
rc= 0;
}
}
DBUG_RETURN(rc);
......
......@@ -45,6 +45,8 @@ class ha_cassandra: public handler
void free_field_converters();
void read_cassandra_columns(bool unpack_pk);
ha_rows rnd_batch_size;
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment