Commit cb6d2aff authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE

- Add support for Cassandra's 'varint' datatype, mappable to VARBINARY.
parent 4d06cb8a
...@@ -326,3 +326,20 @@ set cassandra_write_consistency='ANY'; ...@@ -326,3 +326,20 @@ set cassandra_write_consistency='ANY';
set cassandra_write_consistency='TWO'; set cassandra_write_consistency='TWO';
set cassandra_write_consistency='THREE'; set cassandra_write_consistency='THREE';
set cassandra_write_consistency=@tmp; set cassandra_write_consistency=@tmp;
#
# varint datatype support
#
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
select rowkey, hex(varint_col) from t2;
rowkey hex(varint_col)
val-01 01
val-0x123456 123456
val-0x12345678 12345678
drop table t2;
# now, let's check what happens when MariaDB's column is not wide enough:
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
select rowkey, hex(varint_col) from t2;
ERROR HY000: Internal error: 'Unable to convert value of field `varint_col` from cassandra's data format. Source has 4 bytes, data: 12345678'
drop table t2;
...@@ -68,6 +68,11 @@ create columnfamily cf8 (rowkey varchar primary key, countercol counter); ...@@ -68,6 +68,11 @@ create columnfamily cf8 (rowkey varchar primary key, countercol counter);
update cf8 set countercol=countercol+1 where rowkey='cnt1'; update cf8 set countercol=countercol+1 where rowkey='cnt1';
update cf8 set countercol=countercol+100 where rowkey='cnt2'; update cf8 set countercol=countercol+100 where rowkey='cnt2';
create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
insert into cf9 (rowkey, varint_col) values ('val-01', 1);
insert into cf9 (rowkey, varint_col) values ('val-0x123456', 1193046);
insert into cf9 (rowkey, varint_col) values ('val-0x12345678', 305419896);
EOF EOF
--error 0,1,2 --error 0,1,2
--system cqlsh -3 -f $MYSQLTEST_VARDIR/cassandra_test_init.cql --system cqlsh -3 -f $MYSQLTEST_VARDIR/cassandra_test_init.cql
...@@ -413,6 +418,25 @@ set cassandra_write_consistency='THREE'; ...@@ -413,6 +418,25 @@ set cassandra_write_consistency='THREE';
set cassandra_write_consistency=@tmp; set cassandra_write_consistency=@tmp;
--echo #
--echo # varint datatype support
--echo #
# create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
--sorted_result
select rowkey, hex(varint_col) from t2;
drop table t2;
--echo # now, let's check what happens when MariaDB's column is not wide enough:
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
--sorted_result
--error ER_INTERNAL_ERROR
select rowkey, hex(varint_col) from t2;
drop table t2;
############################################################################ ############################################################################
## Cassandra cleanup ## Cassandra cleanup
############################################################################ ############################################################################
......
...@@ -531,7 +531,7 @@ class ColumnDataConverter ...@@ -531,7 +531,7 @@ class ColumnDataConverter
Field *field; Field *field;
/* This will save Cassandra's data in the Field */ /* This will save Cassandra's data in the Field */
virtual void cassandra_to_mariadb(const char *cass_data, virtual int cassandra_to_mariadb(const char *cass_data,
int cass_data_len)=0; int cass_data_len)=0;
/* /*
...@@ -552,11 +552,12 @@ class DoubleDataConverter : public ColumnDataConverter ...@@ -552,11 +552,12 @@ class DoubleDataConverter : public ColumnDataConverter
{ {
double buf; double buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
DBUG_ASSERT(cass_data_len == sizeof(double)); DBUG_ASSERT(cass_data_len == sizeof(double));
double *pdata= (double*) cass_data; double *pdata= (double*) cass_data;
field->store(*pdata); field->store(*pdata);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -574,11 +575,12 @@ class FloatDataConverter : public ColumnDataConverter ...@@ -574,11 +575,12 @@ class FloatDataConverter : public ColumnDataConverter
{ {
float buf; float buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
DBUG_ASSERT(cass_data_len == sizeof(float)); DBUG_ASSERT(cass_data_len == sizeof(float));
float *pdata= (float*) cass_data; float *pdata= (float*) cass_data;
field->store(*pdata); field->store(*pdata);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -608,7 +610,7 @@ class BigintDataConverter : public ColumnDataConverter ...@@ -608,7 +610,7 @@ class BigintDataConverter : public ColumnDataConverter
longlong buf; longlong buf;
bool flip; /* is false when reading counter columns */ bool flip; /* is false when reading counter columns */
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
longlong tmp; longlong tmp;
DBUG_ASSERT(cass_data_len == sizeof(longlong)); DBUG_ASSERT(cass_data_len == sizeof(longlong));
...@@ -617,6 +619,7 @@ class BigintDataConverter : public ColumnDataConverter ...@@ -617,6 +619,7 @@ class BigintDataConverter : public ColumnDataConverter
else else
memcpy(&tmp, cass_data, sizeof(longlong)); memcpy(&tmp, cass_data, sizeof(longlong));
field->store(tmp); field->store(tmp);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -647,10 +650,11 @@ class TinyintDataConverter : public ColumnDataConverter ...@@ -647,10 +650,11 @@ class TinyintDataConverter : public ColumnDataConverter
{ {
char buf; char buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
DBUG_ASSERT(cass_data_len == 1); DBUG_ASSERT(cass_data_len == 1);
field->store(cass_data[0]); field->store(cass_data[0]);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -668,12 +672,13 @@ class Int32DataConverter : public ColumnDataConverter ...@@ -668,12 +672,13 @@ class Int32DataConverter : public ColumnDataConverter
{ {
int32_t buf; int32_t buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
int32_t tmp; int32_t tmp;
DBUG_ASSERT(cass_data_len == sizeof(int32_t)); DBUG_ASSERT(cass_data_len == sizeof(int32_t));
flip32(cass_data, (char*)&tmp); flip32(cass_data, (char*)&tmp);
field->store(tmp); field->store(tmp);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -691,10 +696,14 @@ class Int32DataConverter : public ColumnDataConverter ...@@ -691,10 +696,14 @@ class Int32DataConverter : public ColumnDataConverter
class StringCopyConverter : public ColumnDataConverter class StringCopyConverter : public ColumnDataConverter
{ {
String buf; String buf;
size_t max_length;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
if ((size_t)cass_data_len > max_length)
return 1;
field->store(cass_data, cass_data_len,field->charset()); field->store(cass_data, cass_data_len,field->charset());
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -704,6 +713,7 @@ class StringCopyConverter : public ColumnDataConverter ...@@ -704,6 +713,7 @@ class StringCopyConverter : public ColumnDataConverter
*cass_data_len= pstr->length(); *cass_data_len= pstr->length();
return false; return false;
} }
StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
~StringCopyConverter(){} ~StringCopyConverter(){}
}; };
...@@ -712,7 +722,7 @@ class TimestampDataConverter : public ColumnDataConverter ...@@ -712,7 +722,7 @@ class TimestampDataConverter : public ColumnDataConverter
{ {
int64_t buf; int64_t buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
/* Cassandra data is milliseconds-since-epoch in network byte order */ /* Cassandra data is milliseconds-since-epoch in network byte order */
int64_t tmp; int64_t tmp;
...@@ -724,6 +734,7 @@ class TimestampDataConverter : public ColumnDataConverter ...@@ -724,6 +734,7 @@ class TimestampDataConverter : public ColumnDataConverter
- microsecond fraction of a second. - microsecond fraction of a second.
*/ */
((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000); ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -768,7 +779,7 @@ class UuidDataConverter : public ColumnDataConverter ...@@ -768,7 +779,7 @@ class UuidDataConverter : public ColumnDataConverter
char buf[16]; /* Binary UUID representation */ char buf[16]; /* Binary UUID representation */
String str_buf; String str_buf;
public: public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len) int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{ {
DBUG_ASSERT(cass_data_len==16); DBUG_ASSERT(cass_data_len==16);
char str[37]; char str[37];
...@@ -783,6 +794,7 @@ class UuidDataConverter : public ColumnDataConverter ...@@ -783,6 +794,7 @@ class UuidDataConverter : public ColumnDataConverter
} }
*ptr= 0; *ptr= 0;
field->store(str, 36,field->charset()); field->store(str, 36,field->charset());
return 0;
} }
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
...@@ -836,6 +848,11 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType"; ...@@ -836,6 +848,11 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType"; const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
/*
VARINTs are stored as little-endian big numbers.
*/
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{ {
...@@ -885,14 +902,20 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ ...@@ -885,14 +902,20 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
/* fall through: */ /* fall through: */
case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_VAR_STRING:
{
bool is_varint;
if (!strcmp(validator_name, validator_blob) || if (!strcmp(validator_name, validator_blob) ||
!strcmp(validator_name, validator_ascii) || !strcmp(validator_name, validator_ascii) ||
!strcmp(validator_name, validator_text)) !strcmp(validator_name, validator_text) ||
(is_varint= !strcmp(validator_name, validator_varint)))
{ {
res= new StringCopyConverter; size_t max_size= (size_t)-1;
if (is_varint)
max_size= field->field_length;
res= new StringCopyConverter(max_size);
} }
break; break;
}
case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONG:
if (!strcmp(validator_name, validator_int)) if (!strcmp(validator_name, validator_int))
res= new Int32DataConverter; res= new Int32DataConverter;
...@@ -1041,24 +1064,43 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, ...@@ -1041,24 +1064,43 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
/* TODO: what if we're not reading all columns?? */ /* TODO: what if we're not reading all columns?? */
if (!found) if (!found)
{
rc= HA_ERR_KEY_NOT_FOUND; rc= HA_ERR_KEY_NOT_FOUND;
}
else else
rc= read_cassandra_columns(false);
DBUG_RETURN(rc);
}
void ha_cassandra::print_conversion_error(const char *field_name,
char *cass_value,
int cass_value_len)
{
char buf[32];
char *p= cass_value;
size_t i= 0;
for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
{ {
read_cassandra_columns(false); buf[i++]= map2number[(*p >> 4) & 0xF];
buf[i++]= map2number[*p & 0xF];
} }
buf[i]=0;
DBUG_RETURN(rc); se->print_error("Unable to convert value for field `%s` from Cassandra's data"
" format. Source data is %d bytes, 0x%s%s",
field_name, cass_value_len, buf,
(i == sizeof(buf) - 1)? "..." : "");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
} }
void ha_cassandra::read_cassandra_columns(bool unpack_pk) int ha_cassandra::read_cassandra_columns(bool unpack_pk)
{ {
char *cass_name; char *cass_name;
char *cass_value; char *cass_value;
int cass_value_len; int cass_value_len;
Field **field; Field **field;
int res= 0;
/* /*
cassandra_to_mariadb() calls will use field->store(...) methods, which cassandra_to_mariadb() calls will use field->store(...) methods, which
...@@ -1082,7 +1124,14 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk) ...@@ -1082,7 +1124,14 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
{ {
int fieldnr= (*field)->field_index; int fieldnr= (*field)->field_index;
(*field)->set_notnull(); (*field)->set_notnull();
field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len); if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
cass_value_len))
{
print_conversion_error((*field)->field_name, cass_value,
cass_value_len);
res=1;
goto err;
}
break; break;
} }
} }
...@@ -1094,10 +1143,17 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk) ...@@ -1094,10 +1143,17 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
field= table->field; field= table->field;
(*field)->set_notnull(); (*field)->set_notnull();
se->get_read_rowkey(&cass_value, &cass_value_len); se->get_read_rowkey(&cass_value, &cass_value_len);
rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len); if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
{
print_conversion_error((*field)->field_name, cass_value, cass_value_len);
res=1;
goto err;
}
} }
err:
dbug_tmp_restore_column_map(table->write_set, old_map); dbug_tmp_restore_column_map(table->write_set, old_map);
return res;
} }
...@@ -1234,10 +1290,7 @@ int ha_cassandra::rnd_next(uchar *buf) ...@@ -1234,10 +1290,7 @@ int ha_cassandra::rnd_next(uchar *buf)
if (reached_eof) if (reached_eof)
rc= HA_ERR_END_OF_FILE; rc= HA_ERR_END_OF_FILE;
else else
{ rc= read_cassandra_columns(true);
read_cassandra_columns(true);
rc= 0;
}
} }
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -1422,8 +1475,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info) ...@@ -1422,8 +1475,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{ {
if (!se->get_next_multiget_row()) if (!se->get_next_multiget_row())
{ {
read_cassandra_columns(true); res= read_cassandra_columns(true);
res= 0;
break; break;
} }
else else
......
...@@ -58,7 +58,7 @@ class ha_cassandra: public handler ...@@ -58,7 +58,7 @@ class ha_cassandra: public handler
bool setup_field_converters(Field **field, uint n_fields); bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters(); void free_field_converters();
void read_cassandra_columns(bool unpack_pk); int read_cassandra_columns(bool unpack_pk);
int check_table_options(struct ha_table_option_struct* options); int check_table_options(struct ha_table_option_struct* options);
bool doing_insert_batch; bool doing_insert_batch;
...@@ -66,6 +66,8 @@ class ha_cassandra: public handler ...@@ -66,6 +66,8 @@ class ha_cassandra: public handler
/* Used to produce 'wrong column %s at row %lu' warnings */ /* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno; ha_rows insert_lineno;
void print_conversion_error(const char *field_name,
char *cass_value, int cass_value_len);
public: public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra() ~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment