Commit a3f33268 authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE: add support for reading counter type values

parent 82e74d4c
...@@ -64,7 +64,9 @@ create columnfamily cf6 (rowkey uuid primary key, col1 int); ...@@ -64,7 +64,9 @@ create columnfamily cf6 (rowkey uuid primary key, col1 int);
create columnfamily cf7 (rowkey int primary key, boolcol boolean); create columnfamily cf7 (rowkey int primary key, boolcol boolean);
create columnfamily cf8 (rowkey int primary key, countercol counter); create columnfamily cf8 (rowkey varchar primary key, countercol counter);
update cf8 set countercol=countercol+1 where rowkey='cnt1';
update cf8 set countercol=countercol+100 where rowkey='cnt2';
EOF EOF
--error 0,1,2 --error 0,1,2
...@@ -355,7 +357,7 @@ drop table t2; ...@@ -355,7 +357,7 @@ drop table t2;
# create columnfamily cf7 (rowkey int primary key, boolcol boolean); # create columnfamily cf7 (rowkey int primary key, boolcol boolean);
CREATE TABLE t2 (rowkey int PRIMARY KEY, boolcol varchar(12)) ENGINE=CASSANDRA CREATE TABLE t2 (rowkey int PRIMARY KEY, boolcol bool) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7'; thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
insert into t2 values (0, 0); insert into t2 values (0, 0);
insert into t2 values (1, 1); insert into t2 values (1, 1);
...@@ -363,6 +365,13 @@ select * from t2; ...@@ -363,6 +365,13 @@ select * from t2;
delete from t2; delete from t2;
drop table t2; drop table t2;
# Counter type
# create columnfamily cf8 (rowkey int primary key, countercol counter);
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, countercol bigint) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf8';
select * from t2;
############################################################################ ############################################################################
## Cassandra cleanup ## Cassandra cleanup
############################################################################ ############################################################################
......
...@@ -378,21 +378,37 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) ...@@ -378,21 +378,37 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
bool Cassandra_se_impl::get_next_read_column(char **name, char **value, bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
int *value_len) int *value_len)
{ {
bool use_counter=false;
while (1) while (1)
{ {
if (column_data_it == column_data_vec.end()) if (column_data_it == column_data_vec.end())
return true; return true;
if (((*column_data_it).__isset.column)) if ((*column_data_it).__isset.column)
break; /* Ok it's a real column. Should be always the case. */ break; /* Ok it's a real column. Should be always the case. */
if ((*column_data_it).__isset.counter_column)
{
use_counter= true;
break;
}
column_data_it++; column_data_it++;
} }
ColumnOrSuperColumn& cs= *column_data_it; ColumnOrSuperColumn& cs= *column_data_it;
*name= (char*)cs.column.name.c_str(); if (use_counter)
*value= (char*)cs.column.value.c_str(); {
*value_len= cs.column.value.length(); *name= (char*)cs.counter_column.name.c_str();
*value= (char*)&cs.counter_column.value;
*value_len= sizeof(cs.counter_column.value);
}
else
{
*name= (char*)cs.column.name.c_str();
*value= (char*)cs.column.value.c_str();
*value_len= cs.column.value.length();
}
column_data_it++; column_data_it++;
return false; return false;
......
...@@ -501,23 +501,31 @@ static void flip64(const char *from, char* to) ...@@ -501,23 +501,31 @@ static void flip64(const char *from, char* to)
class BigintDataConverter : public ColumnDataConverter class BigintDataConverter : public ColumnDataConverter
{ {
longlong buf; longlong buf;
bool flip; /* is false when reading counter columns */
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
longlong tmp; longlong tmp;
DBUG_ASSERT(cass_data_len == sizeof(longlong)); DBUG_ASSERT(cass_data_len == sizeof(longlong));
flip64(cass_data, (char*)&tmp); if (flip)
flip64(cass_data, (char*)&tmp);
else
memcpy(&tmp, cass_data, sizeof(longlong));
field->store(tmp); field->store(tmp);
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
longlong tmp= field->val_int(); longlong tmp= field->val_int();
flip64((const char*)&tmp, (char*)&buf); if (flip)
flip64((const char*)&tmp, (char*)&buf);
else
memcpy(&buf, &tmp, sizeof(longlong));
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len=sizeof(longlong); *cass_data_len=sizeof(longlong);
return false; return false;
} }
BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
~BigintDataConverter(){} ~BigintDataConverter(){}
}; };
...@@ -723,6 +731,7 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType"; ...@@ -723,6 +731,7 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType"; const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{ {
ColumnDataConverter *res= NULL; ColumnDataConverter *res= NULL;
...@@ -737,10 +746,13 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ ...@@ -737,10 +746,13 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
/* fall through: */ /* fall through: */
case MYSQL_TYPE_SHORT: case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_LONGLONG:
if (!strcmp(validator_name, validator_bigint)) {
res= new BigintDataConverter; bool is_counter= false;
if (!strcmp(validator_name, validator_bigint) ||
(is_counter= !strcmp(validator_name, validator_counter)))
res= new BigintDataConverter(!is_counter);
break; break;
}
case MYSQL_TYPE_FLOAT: case MYSQL_TYPE_FLOAT:
if (!strcmp(validator_name, validator_float)) if (!strcmp(validator_name, validator_float))
res= new FloatDataConverter; res= new FloatDataConverter;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment