Commit 245298f2 authored by unknown's avatar unknown

MDEV-506 Cassandra dynamic columns access

parent 7327cd97
......@@ -39,6 +39,12 @@
*/
#define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL
/*
Limits of implementation
*/
#define MAX_NAME_LENGTH 255
#define MAX_TOTAL_NAME_LENGTH 65535
/* NO and OK is the same used just to show semantics */
#define ER_DYNCOL_NO ER_DYNCOL_OK
......@@ -50,7 +56,8 @@ enum enum_dyncol_func_result
ER_DYNCOL_LIMIT= -2, /* Some limit reached */
ER_DYNCOL_RESOURCE= -3, /* Out of resourses */
ER_DYNCOL_DATA= -4, /* Incorrect input data */
ER_DYNCOL_UNKNOWN_CHARSET= -5 /* Unknown character set */
ER_DYNCOL_UNKNOWN_CHARSET= -5, /* Unknown character set */
ER_DYNCOL_TRUNCATED= 2 /* OK, but data was truncated */
};
typedef DYNAMIC_STRING DYNAMIC_COLUMN;
......@@ -81,6 +88,7 @@ struct st_dynamic_column_value
struct {
LEX_STRING value;
CHARSET_INFO *charset;
my_bool nonfreeable;
} string;
struct {
decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
......@@ -108,6 +116,13 @@ dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str,
uchar *column_keys,
DYNAMIC_COLUMN_VALUE *values,
my_bool names);
enum enum_dyncol_func_result
dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
uint column_count,
void *column_keys,
DYNAMIC_COLUMN_VALUE *values,
my_bool new_str,
my_bool string_keys);
enum enum_dyncol_func_result
dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
......@@ -163,6 +178,21 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json);
#define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
#define dynamic_column_column_free(V) dynstr_free(V)
/* conversion of values to 3 base types */
enum enum_dyncol_func_result
dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
CHARSET_INFO *cs, my_bool quote);
enum enum_dyncol_func_result
dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val);
enum enum_dyncol_func_result
dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val);
enum enum_dyncol_func_result
dynamic_column_vals(DYNAMIC_COLUMN *str,
DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
char **free_names);
/***************************************************************************
Internal functions, don't use if you don't know what you are doing...
***************************************************************************/
......
......@@ -365,8 +365,9 @@ CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
select * from t2;
rowkey datecol
1 1346189025000
10 1346189026000
1 1346192625000
10 1346192626000
delete from t2;
drop table t2;
#
# Check whether changing parameters with ALTER TABLE works.
......@@ -407,3 +408,141 @@ new-rowkey12 data1-value3 454
rowkey11 updated-1 34543
delete from t1;
drop table t1;
#
# Dynamic columns support
#
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
drop table t2;
#error: dynamic column is not a blob
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
ERROR 42000: Incorrect column specifier for column 'uuidcol'
#error: double dynamic column
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
ERROR 42000: Incorrect column specifier for column 'textcol'
#
# Dynamic column read
#
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
drop table t2;
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
rowkey column_list(dyn) column_get(dyn, 'uuidcol' as char)
1 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f09
2 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f0a
drop table t2;
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
drop table t2;
#
# Dynamic column insert
#
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two"));
select rowkey, column_json(dyn) from t2;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn2":"two"}]
delete from t2;
drop table t2;
# bigint
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543));
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":254324},{"dyn1":"1"},{"dyn2":"two"}]
2 [{"a":2543},{"dyn1":"1"},{"dyn2":"two"}]
delete from t1;
drop table t1;
# int
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543));
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":254324}]
2 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":2543}]
delete from t1;
drop table t1;
# timestamp
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543));
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":254324}]
2 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":2543}]
delete from t1;
drop table t1;
# boolean
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0));
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
2 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
2 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3");
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":1}]
2 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"dyn3":"3"},{"boolcol":1}]
2 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":"ddd"},{"boolcol":1}]
2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":"ddd"},{"boolcol":1},{"12345678901234":"ddd"}]
2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0},{"12345678901234":"ddd"}]
update t1 set dyn=column_add(dyn, "12345678901234", null);
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":"ddd"},{"boolcol":1}]
2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":"ddd"},{"boolcol":1}]
2 [{"a":"ddd"},{"dyn1":"1"}]
update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2;
select rowkey, column_json(dyn) from t1;
rowkey column_json(dyn)
1 [{"a":"ddd"},{"boolcol":1}]
3 [{"a":"ddd"},{"boolcol":0}]
delete from t1;
drop table t1;
CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
select * from t1;
ERROR HY000: Internal error: 'Unable to convert value for field `dyn` from Cassandra's data format. Name length exceed limit of 255: 'very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_ver'
drop table t1;
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes)
ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
DELETE FROM t1;
insert into t1 values (1, column_create("dyn", 1));
select rowkey, column_list(dyn) from t1;
rowkey column_list(dyn)
1 `dyn`
delete from t1;
DROP TABLE t1;
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes)
ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
ERROR HY000: Encountered illegal format of dynamic column string
delete from t1;
DROP TABLE t1;
......@@ -89,6 +89,24 @@ The following options may be given as the first argument:
--bulk-insert-buffer-size=#
Size of tree cache used in bulk insert optimisation. Note
that this is a limit per thread!
--cassandra[=name] Enable or disable CASSANDRA plugin. Possible values are
ON, OFF, FORCE (don't start if the plugin fails to load).
--cassandra-default-thrift-host=name
Default host for Cassandra thrift connections
--cassandra-failure-retries=#
Number of times to retry Cassandra calls that failed due
to timeouts or network communication problems. The
default, 0, means not to retry.
--cassandra-insert-batch-size=#
Number of rows in an INSERT batch
--cassandra-multiget-batch-size=#
Number of rows in a multiget(MRR) batch
--cassandra-read-consistency=name
Cassandra consistency level to use for read operations
--cassandra-rnd-batch-size=#
Number of rows in an rnd_read (full scan) batch
--cassandra-write-consistency=name
Cassandra consistency level to use for write operations
--character-set-client-handshake
Don't ignore client side character set value sent during
handshake.
......@@ -863,6 +881,14 @@ binlog-optimize-thread-scheduling TRUE
binlog-row-event-max-size 1024
binlog-stmt-cache-size 32768
bulk-insert-buffer-size 8388608
cassandra ON
cassandra-default-thrift-host (No default value)
cassandra-failure-retries 0
cassandra-insert-batch-size 100
cassandra-multiget-batch-size 100
cassandra-read-consistency ONE
cassandra-rnd-batch-size 10000
cassandra-write-consistency ONE
character-set-client-handshake TRUE
character-set-filesystem binary
character-set-server latin1
......
......@@ -94,6 +94,18 @@ CREATE COLUMN FAMILY cf10
WITH comparator = UTF8Type
AND key_validation_class=UTF8Type
AND default_validation_class = UTF8Type;
CREATE COLUMN FAMILY cfd1
WITH comparator = UTF8Type
AND key_validation_class=UTF8Type
AND default_validation_class = UTF8Type;
SET cfd1['1']['very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_long_name']='1';
CREATE COLUMN FAMILY cfd2
WITH comparator = UTF8Type
AND key_validation_class=Int32Type
AND default_validation_class = UTF8Type;
EOF
--error 0,1,2
......@@ -463,7 +475,7 @@ drop table t2;
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
select * from t2;
delete from t2;
drop table t2;
--echo #
......@@ -511,6 +523,118 @@ select * from t1;
delete from t1;
drop table t1;
--echo #
--echo # Dynamic columns support
--echo #
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
drop table t2;
--echo #error: dynamic column is not a blob
--error ER_WRONG_FIELD_SPEC
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
--echo #error: double dynamic column
--error ER_WRONG_FIELD_SPEC
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
--echo #
--echo # Dynamic column read
--echo #
#prepare data
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
drop table t2;
#test dynamic column read
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
drop table t2;
#cleanup data
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
drop table t2;
--echo #
--echo # Dynamic column insert
--echo #
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two"));
select rowkey, column_json(dyn) from t2;
delete from t2;
drop table t2;
--echo # bigint
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543));
select rowkey, column_json(dyn) from t1;
delete from t1;
drop table t1;
--echo # int
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543));
select rowkey, column_json(dyn) from t1;
delete from t1;
drop table t1;
--echo # timestamp
CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543));
select rowkey, column_json(dyn) from t1;
delete from t1;
drop table t1;
--echo # boolean
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324));
insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0));
select rowkey, column_json(dyn) from t1;
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3");
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, "12345678901234", null);
select rowkey, column_json(dyn) from t1;
update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
select rowkey, column_json(dyn) from t1;
update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2;
select rowkey, column_json(dyn) from t1;
delete from t1;
drop table t1;
CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
--error ER_INTERNAL_ERROR
select * from t1;
drop table t1;
# MDEV-560
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes)
ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
DELETE FROM t1;
insert into t1 values (1, column_create("dyn", 1));
select rowkey, column_list(dyn) from t1;
# Cleanup
delete from t1;
DROP TABLE t1;
# MDEV-561 (incorrect format data to dynamic column)
CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes)
ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
--error ER_DYN_COL_WRONG_FORMAT
insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
delete from t1;
DROP TABLE t1;
############################################################################
## Cassandra cleanup
############################################################################
......
......@@ -68,6 +68,8 @@ uint32 copy_and_convert(char *to, uint32 to_length, CHARSET_INFO *to_cs,
#define MAX_OFFSET_LENGTH 5
#define DYNCOL_NUM_CHAR 6
my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
{
if (str->length < 1)
......@@ -211,7 +213,7 @@ static my_bool check_limit_num(const void *val)
static my_bool check_limit_str(const void *val)
{
return (*((LEX_STRING **)val))->length > 255;
return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH;
}
......@@ -288,7 +290,7 @@ my_bool put_header_entry_str(DYN_HEADER *hdr,
size_t offset)
{
LEX_STRING *column_name= (LEX_STRING *)column_key;
DBUG_ASSERT(column_name->length <= 255);
DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH);
hdr->entry[0]= column_name->length;
DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
......@@ -1381,6 +1383,9 @@ dynamic_new_column_store(DYNAMIC_COLUMN *str,
DYNCOL_SYZERESERVE))
goto err;
}
if (!column_count)
return ER_DYNCOL_OK;
bzero(str->str, fmt->fixed_hdr);
str->length= fmt->fixed_hdr;
......@@ -1501,7 +1506,7 @@ calc_var_sizes(DYN_HEADER *hdr,
@return ER_DYNCOL_* return code
*/
static enum enum_dyncol_func_result
enum enum_dyncol_func_result
dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
uint column_count,
void *column_keys,
......@@ -1761,7 +1766,7 @@ static my_bool
find_column(DYN_HEADER *hdr, uint numkey, LEX_STRING *strkey)
{
LEX_STRING nmkey;
char nmkeybuff[6]; /* to fit max 2 bytes number */
char nmkeybuff[DYNCOL_NUM_CHAR]; /* to fit max 2 bytes number */
DBUG_ASSERT(hdr->header != NULL);
if (hdr->header + hdr->header_size > hdr->data_end)
......@@ -2169,10 +2174,10 @@ dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr)
if (header.format == DYNCOL_FMT_NUM)
{
uint nm= uint2korr(read);
tmp.str= my_malloc(6, MYF(0));
tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0));
if (!tmp.str)
return ER_DYNCOL_RESOURCE;
tmp.length= snprintf(tmp.str, 6, "%u", nm);
tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm);
}
else
{
......@@ -2208,7 +2213,7 @@ find_place(DYN_HEADER *hdr, void *key, my_bool string_keys)
uint mid, start, end, val;
int flag;
LEX_STRING str;
char buff[6];
char buff[DYNCOL_NUM_CHAR];
my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
hdr->format);
LINT_INIT(flag); /* 100 % safe */
......@@ -2425,7 +2430,7 @@ dynamic_column_update_copy(DYNAMIC_COLUMN *str, PLAN *plan,
size_t offs;
uint nm;
DYNAMIC_COLUMN_TYPE tp;
char buff[6];
char buff[DYNCOL_NUM_CHAR];
if (hdr->format == DYNCOL_FMT_NUM)
{
......@@ -3438,7 +3443,7 @@ dynamic_column_check(DYNAMIC_COLUMN *str)
enum enum_dyncol_func_result
dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
my_bool quote)
CHARSET_INFO *cs, my_bool quote)
{
char buff[40];
int len;
......@@ -3468,24 +3473,22 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
char *alloc= NULL;
char *from= val->x.string.value.str;
uint bufflen;
my_bool conv= !my_charset_same(val->x.string.charset,
&my_charset_utf8_general_ci);
my_bool conv= !my_charset_same(val->x.string.charset, cs);
my_bool rc;
len= val->x.string.value.length;
bufflen= (len * (conv ? my_charset_utf8_general_ci.mbmaxlen : 1));
bufflen= (len * (conv ? cs->mbmaxlen : 1));
if (dynstr_realloc(str, bufflen))
return ER_DYNCOL_RESOURCE;
// guaranty UTF-8 string for value
if (!my_charset_same(val->x.string.charset,
&my_charset_utf8_general_ci))
if (!my_charset_same(val->x.string.charset, cs))
{
uint dummy_errors;
if (!quote)
{
/* convert to the destination */
str->length+= copy_and_convert_extended(str->str, bufflen,
&my_charset_utf8_general_ci,
cs,
from, len,
val->x.string.charset,
&dummy_errors);
......@@ -3494,8 +3497,7 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
if ((alloc= (char *)my_malloc(bufflen, MYF(0))))
{
len=
copy_and_convert_extended(alloc, bufflen,
&my_charset_utf8_general_ci,
copy_and_convert_extended(alloc, bufflen, cs,
from, len, val->x.string.charset,
&dummy_errors);
from= alloc;
......@@ -3543,6 +3545,155 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
return(ER_DYNCOL_OK);
}
enum enum_dyncol_func_result
dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val)
{
enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
*ll= 0;
switch (val->type) {
case DYN_COL_INT:
*ll= val->x.long_value;
break;
case DYN_COL_UINT:
*ll= (longlong)val->x.ulong_value;
if (val->x.ulong_value > ULONGLONG_MAX)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_DOUBLE:
*ll= (longlong)val->x.double_value;
if (((double) *ll) != val->x.double_value)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_STRING:
{
longlong i= 0, sign= 1;
char *src= val->x.string.value.str;
uint len= val->x.string.value.length;
while (len && my_isspace(&my_charset_latin1, *src)) src++,len--;
if (len)
{
if (*src == '-')
{
sign= -1;
src++;
} else if (*src == '-')
src++;
while(len && my_isdigit(&my_charset_latin1, *src))
{
i= i * 10 + (*src - '0');
src++;
}
}
else
rc= ER_DYNCOL_TRUNCATED;
if (len)
rc= ER_DYNCOL_TRUNCATED;
*ll= i * sign;
break;
}
case DYN_COL_DECIMAL:
if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_DATETIME:
*ll= (val->x.time_value.year * 10000000000L +
val->x.time_value.month * 100000000L +
val->x.time_value.day * 1000000 +
val->x.time_value.hour * 10000 +
val->x.time_value.minute * 100 +
val->x.time_value.second) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_DATE:
*ll= (val->x.time_value.year * 10000 +
val->x.time_value.month * 100 +
val->x.time_value.day) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_TIME:
*ll= (val->x.time_value.hour * 10000 +
val->x.time_value.minute * 100 +
val->x.time_value.second) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_NULL:
rc= ER_DYNCOL_TRUNCATED;
break;
default:
return(ER_DYNCOL_FORMAT);
}
return(rc);
}
enum enum_dyncol_func_result
dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val)
{
enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
*dbl= 0;
switch (val->type) {
case DYN_COL_INT:
*dbl= (double)val->x.long_value;
if (((longlong) *dbl) != val->x.long_value)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_UINT:
*dbl= (double)val->x.ulong_value;
if (((ulonglong) *dbl) != val->x.ulong_value)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_DOUBLE:
*dbl= val->x.double_value;
break;
case DYN_COL_STRING:
{
char *str, *end;
if ((str= malloc(val->x.string.value.length + 1)))
return ER_DYNCOL_RESOURCE;
memcpy(str, val->x.string.value.str, val->x.string.value.length);
str[val->x.string.value.length]= '\0';
*dbl= strtod(str, &end);
if (*end != '\0')
rc= ER_DYNCOL_TRUNCATED;
}
case DYN_COL_DECIMAL:
if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK)
rc= ER_DYNCOL_TRUNCATED;
break;
case DYN_COL_DATETIME:
*dbl= (double)(val->x.time_value.year * 10000000000L +
val->x.time_value.month * 100000000L +
val->x.time_value.day * 1000000 +
val->x.time_value.hour * 10000 +
val->x.time_value.minute * 100 +
val->x.time_value.second) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_DATE:
*dbl= (double)(val->x.time_value.year * 10000 +
val->x.time_value.month * 100 +
val->x.time_value.day) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_TIME:
*dbl= (double)(val->x.time_value.hour * 10000 +
val->x.time_value.minute * 100 +
val->x.time_value.second) *
(val->x.time_value.neg ? -1 : 1);
break;
case DYN_COL_NULL:
rc= ER_DYNCOL_TRUNCATED;
break;
default:
return(ER_DYNCOL_FORMAT);
}
return(rc);
}
/**
Convert to JSON
......@@ -3602,10 +3753,11 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json)
if (header.format == DYNCOL_FMT_NUM)
{
uint nm= uint2korr(header.entry);
if (dynstr_realloc(json, 6 + 3))
if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3))
goto err;
json->str[json->length++]= '"';
json->length+= (snprintf(json->str + json->length, 6, "%u", nm));
json->length+= (snprintf(json->str + json->length,
DYNCOL_NUM_CHAR, "%u", nm));
}
else
{
......@@ -3619,7 +3771,8 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json)
}
json->str[json->length++]= '"';
json->str[json->length++]= ':';
if ((rc= dynamic_column_val_str(json, &val, TRUE)) < 0 ||
if ((rc= dynamic_column_val_str(json, &val,
&my_charset_utf8_general_ci, TRUE)) < 0 ||
dynstr_append_mem(json, "}", 1))
goto err;
}
......@@ -3631,3 +3784,99 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json)
json->length= 0;
return rc;
}
/**
Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array
@param str The packed string
@param names Where to put names
@param vals Where to put values
@param free_names pointer to free names buffer if there is it.
@return ER_DYNCOL_* return code
*/
enum enum_dyncol_func_result
dynamic_column_vals(DYNAMIC_COLUMN *str,
DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
char **free_names)
{
DYN_HEADER header;
char *nm;
uint i;
enum enum_dyncol_func_result rc;
*free_names= 0;
bzero(names, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
bzero(vals, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
if (str->length == 0)
return ER_DYNCOL_OK; /* no columns */
if ((rc= init_read_hdr(&header, str)) < 0)
return rc;
if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
str->length)
return ER_DYNCOL_FORMAT;
if (init_dynamic_array(names, sizeof(LEX_STRING),
header.column_count, 0) ||
init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE),
header.column_count, 0) ||
(header.format == DYNCOL_FMT_NUM &&
!(*free_names= (char *)malloc(DYNCOL_NUM_CHAR * header.column_count))))
{
rc= ER_DYNCOL_RESOURCE;
goto err;
}
nm= *free_names;
for (i= 0, header.entry= header.header;
i < header.column_count;
i++, header.entry+= header.entry_size)
{
DYNAMIC_COLUMN_VALUE val;
LEX_STRING name;
header.length=
hdr_interval_length(&header, header.entry + header.entry_size);
header.data= header.dtpool + header.offset;
/*
Check that the found data is withing the ranges. This can happen if
we get data with wrong offsets.
*/
if (header.length == DYNCOL_OFFSET_ERROR ||
header.length > INT_MAX || header.offset > header.data_size)
{
rc= ER_DYNCOL_FORMAT;
goto err;
}
if ((rc= dynamic_column_get_value(&header, &val)) < 0)
goto err;
if (header.format == DYNCOL_FMT_NUM)
{
uint num= uint2korr(header.entry);
name.str= nm;
name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num);
nm+= name.length + 1;
}
else
{
name.length= header.entry[0];
name.str= (char *)header.nmpool + uint2korr(header.entry + 1);
}
/* following is preallocated and so do not fail */
(void) insert_dynamic(names, (uchar *)&name);
(void) insert_dynamic(vals, (uchar *)&val);
}
return ER_DYNCOL_OK;
err:
delete_dynamic(names);
delete_dynamic(vals);
if (*free_names)
my_free(*free_names);
*free_names= 0;
return rc;
}
......@@ -9773,6 +9773,7 @@ int dynamic_column_error_message(enum_dyncol_func_result rc)
switch (rc) {
case ER_DYNCOL_YES:
case ER_DYNCOL_OK:
case ER_DYNCOL_TRUNCATED:
break; // it is not an error
case ER_DYNCOL_FORMAT:
my_error(ER_DYN_COL_WRONG_FORMAT, MYF(0));
......
......@@ -272,6 +272,7 @@ bool rename_temporary_table(THD* thd, TABLE *table, const char *new_db,
const char *table_name);
bool is_equal(const LEX_STRING *a, const LEX_STRING *b);
class Open_tables_backup;
/* Functions to work with system tables. */
bool open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
Open_tables_backup *backup);
......
......@@ -12,7 +12,7 @@ SET(cassandra_sources
gen-cpp/Cassandra.h)
#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
INCLUDE_DIRECTORIES(AFTER /home/psergey/cassandra/thrift/include/thrift/)
INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift)
#
STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
......
......@@ -17,6 +17,12 @@
#include "cassandra_se.h"
struct st_mysql_lex_string
{
char *str;
size_t length;
};
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::transport;
......@@ -74,6 +80,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
std::string rowkey; /* key of the record we're returning now */
SlicePredicate slice_pred;
SliceRange slice_pred_sr;
bool get_slices_returned_less;
bool get_slice_found_rows;
public:
......@@ -91,6 +98,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
void first_ddl_column();
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
void get_rowkey_type(char **name, char **type);
size_t get_ddl_size();
const char* get_default_validator();
/* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
......@@ -98,15 +107,19 @@ class Cassandra_se_impl: public Cassandra_se_interface
/* Writes */
void clear_insert_buffer();
void start_row_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len);
void add_insert_column(const char *name, int name_len,
const char *value, int value_len);
void add_insert_delete_column(const char *name, int name_len);
void add_row_deletion(const char *key, int key_len,
Column_name_enumerator *col_names);
Column_name_enumerator *col_names,
LEX_STRING *names, uint nnames);
bool do_insert();
/* Reads, point lookups */
bool get_slice(char *key, size_t key_len, bool *found);
bool get_next_read_column(char **name, char **value, int *value_len);
bool get_next_read_column(char **name, int *name_len,
char **value, int *value_len );
void get_read_rowkey(char **value, int *value_len);
/* Reads, multi-row scans */
......@@ -122,6 +135,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns();
void clear_read_all_columns();
void add_read_column(const char *name);
/* Reads, MRR scans */
......@@ -277,6 +291,16 @@ void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
*name= NULL;
}
size_t Cassandra_se_impl::get_ddl_size()
{
return cf_def.column_metadata.size();
}
const char* Cassandra_se_impl::get_default_validator()
{
return cf_def.default_validation_class.c_str();
}
/////////////////////////////////////////////////////////////////////////////
// Data writes
......@@ -316,7 +340,8 @@ void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
Column_name_enumerator *col_names)
Column_name_enumerator *col_names,
LEX_STRING *names, uint nnames)
{
std::string key_to_delete;
key_to_delete.assign(key, key_len);
......@@ -344,6 +369,9 @@ void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
const char *col_name;
while ((col_name= col_names->get_next_name()))
slice_pred.column_names.push_back(std::string(col_name));
for (uint i= 0; i < nnames; i++)
slice_pred.column_names.push_back(std::string(names[i].str,
names[i].length));
mut.deletion.predicate= slice_pred;
......@@ -351,7 +379,9 @@ void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
}
void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
void Cassandra_se_impl::add_insert_column(const char *name,
int name_len,
const char *value,
int value_len)
{
Mutation mut;
......@@ -359,6 +389,9 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
mut.column_or_supercolumn.__isset.column= true;
Column& col=mut.column_or_supercolumn.column;
if (name_len)
col.name.assign(name, name_len);
else
col.name.assign(name);
col.value.assign(value, value_len);
col.timestamp= insert_timestamp;
......@@ -367,6 +400,23 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
insert_list->push_back(mut);
}
void Cassandra_se_impl::add_insert_delete_column(const char *name,
int name_len)
{
Mutation mut;
mut.__isset.deletion= true;
mut.deletion.__isset.timestamp= true;
mut.deletion.timestamp= insert_timestamp;
mut.deletion.__isset.predicate= true;
SlicePredicate slice_pred;
slice_pred.__isset.column_names= true;
slice_pred.column_names.push_back(std::string(name, name_len));
mut.deletion.predicate= slice_pred;
insert_list->push_back(mut);
}
bool Cassandra_se_impl::retryable_do_insert()
{
......@@ -444,8 +494,8 @@ bool Cassandra_se_impl::retryable_get_slice()
}
bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
int *value_len)
bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
char **value, int *value_len)
{
bool use_counter=false;
while (1)
......@@ -468,12 +518,14 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
ColumnOrSuperColumn& cs= *column_data_it;
if (use_counter)
{
*name_len= cs.counter_column.name.size();
*name= (char*)cs.counter_column.name.c_str();
*value= (char*)&cs.counter_column.value;
*value_len= sizeof(cs.counter_column.value);
}
else
{
*name_len= cs.column.name.size();
*name= (char*)cs.column.name.c_str();
*value= (char*)cs.column.value.c_str();
*value_len= cs.column.value.length();
......@@ -601,6 +653,13 @@ void Cassandra_se_impl::clear_read_columns()
slice_pred.column_names.clear();
}
void Cassandra_se_impl::clear_read_all_columns()
{
slice_pred_sr.start = "";
slice_pred_sr.finish = "";
slice_pred.__set_slice_range(slice_pred_sr);
}
void Cassandra_se_impl::add_read_column(const char *name_arg)
{
......
......@@ -6,6 +6,8 @@
both together causes compile errors due to conflicts).
*/
struct st_mysql_lex_string;
typedef struct st_mysql_lex_string LEX_STRING;
/* We need to define this here so that ha_cassandra.cc also has access to it */
typedef enum
......@@ -50,19 +52,25 @@ class Cassandra_se_interface
virtual bool next_ddl_column(char **name, int *name_len, char **value,
int *value_len)=0;
virtual void get_rowkey_type(char **name, char **type)=0;
virtual size_t get_ddl_size()=0;
virtual const char* get_default_validator()=0;
/* Writes */
virtual void clear_insert_buffer()=0;
virtual void add_row_deletion(const char *key, int key_len,
Column_name_enumerator *col_names)=0;
Column_name_enumerator *col_names,
LEX_STRING *names, uint nnames)=0;
virtual void start_row_insert(const char *key, int key_len)=0;
virtual void add_insert_column(const char *name, const char *value,
virtual void add_insert_delete_column(const char *name, int name_len)= 0;
virtual void add_insert_column(const char *name, int name_len,
const char *value,
int value_len)=0;
virtual bool do_insert()=0;
/* Reads */
virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
virtual bool get_next_read_column(char **name, int *name_len,
char **value, int *value_len)=0;
virtual void get_read_rowkey(char **value, int *value_len)=0;
/* Reads, multi-row scans */
......@@ -79,6 +87,7 @@ class Cassandra_se_interface
/* read_set setup */
virtual void clear_read_columns()=0;
virtual void clear_read_all_columns()=0;
virtual void add_read_column(const char *name)=0;
virtual bool truncate()=0;
......
......@@ -11,7 +11,7 @@ namespace org { namespace apache { namespace cassandra {
const cassandraConstants g_cassandra_constants;
cassandraConstants::cassandraConstants() {
cassandra_const_VERSION = "19.32.0";
cassandra_const_VERSION = (char *)"19.32.0";
}
}}} // namespace
......
......@@ -22,10 +22,16 @@
#include "ha_cassandra.h"
#include "sql_class.h"
#define DYNCOL_USUAL 20
#define DYNCOL_DELTA 100
#define DYNCOL_USUAL_REC 1024
#define DYNCOL_DELTA_REC 1024
static handler *cassandra_create_handler(handlerton *hton,
TABLE_SHARE *table,
MEM_ROOT *mem_root);
extern int dynamic_column_error_message(enum_dyncol_func_result rc);
handlerton *cassandra_hton;
......@@ -69,6 +75,25 @@ ha_create_table_option cassandra_table_option_list[]=
HA_TOPTION_END
};
/**
Structure for CREATE TABLE options (field options).
*/
struct ha_field_option_struct
{
bool dyncol_field;
};
ha_create_table_option cassandra_field_option_list[]=
{
/*
Collect all other columns as dynamic here,
the valid values are YES/NO, ON/OFF, 1/0.
The default is 0, that is true, yes, on.
*/
HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
HA_FOPTION_END
};
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an INSERT batch",
......@@ -252,8 +277,7 @@ static int cassandra_init_func(void *p)
*/
cassandra_hton->flags= 0;
cassandra_hton->table_options= cassandra_table_option_list;
//cassandra_hton->field_options= example_field_option_list;
cassandra_hton->field_options= NULL;
cassandra_hton->field_options= cassandra_field_option_list;
mysql_mutex_init(0 /* no instrumentation */,
&cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
......@@ -361,7 +385,11 @@ static handler* cassandra_create_handler(handlerton *hton,
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
:handler(hton, table_arg),
se(NULL), field_converters(NULL), rowkey_converter(NULL)
se(NULL), field_converters(NULL),
special_type_field_converters(NULL),
special_type_field_names(NULL), n_special_type_fields(0),
rowkey_converter(NULL),
dyncol_field(0), dyncol_set(0)
{}
......@@ -381,7 +409,8 @@ int ha_cassandra::connect_and_check_options(TABLE *table_arg)
int res;
DBUG_ENTER("ha_cassandra::connect_and_check_options");
if ((res= check_table_options(options)))
if ((res= check_field_options(table_arg->s->field)) ||
(res= check_table_options(options)))
DBUG_RETURN(res);
se= create_cassandra_se();
......@@ -403,6 +432,32 @@ int ha_cassandra::connect_and_check_options(TABLE *table_arg)
}
int ha_cassandra::check_field_options(Field **fields)
{
Field **field;
uint i;
DBUG_ENTER("ha_cassandra::check_field_options");
for (field= fields, i= 0; *field; field++, i++)
{
ha_field_option_struct *field_options= (*field)->option_struct;
if (field_options && field_options->dyncol_field)
{
if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
{
my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
DBUG_RETURN(HA_WRONG_CREATE_OPTION);
}
dyncol_set= 1;
dyncol_field= i;
bzero(&dynamic_values, sizeof(dynamic_values));
bzero(&dynamic_names, sizeof(dynamic_names));
bzero(&dynamic_rec, sizeof(dynamic_rec));
}
}
DBUG_RETURN(0);
}
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
DBUG_ENTER("ha_cassandra::open");
......@@ -770,15 +825,8 @@ static int convert_hex_digit(const char c)
const char map2number[]="0123456789abcdef";
class UuidDataConverter : public ColumnDataConverter
static void convert_uuid2string(char *str, const char *cass_data)
{
char buf[16]; /* Binary UUID representation */
String str_buf;
public:
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
char *ptr= str;
/* UUID arrives as 16-byte number in network byte order */
for (uint i=0; i < 16; i++)
......@@ -789,36 +837,54 @@ class UuidDataConverter : public ColumnDataConverter
*(ptr++)= '-';
}
*ptr= 0;
field->store(str, 36,field->charset());
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
String *uuid_str= field->val_str(&str_buf);
char *pstr= (char*)uuid_str->c_ptr();
if (uuid_str->length() != 36)
return true;
}
static bool convert_string2uuid(char *buf, const char *str)
{
int lower, upper;
for (uint i=0; i < 16; i++)
for (uint i= 0; i < 16; i++)
{
if ((upper= convert_hex_digit(pstr[0])) == -1 ||
(lower= convert_hex_digit(pstr[1])) == -1)
if ((upper= convert_hex_digit(str[0])) == -1 ||
(lower= convert_hex_digit(str[1])) == -1)
{
return true;
}
buf[i]= lower | (upper << 4);
pstr += 2;
str += 2;
if (i == 3 || i == 5 || i == 7 || i == 9)
{
if (pstr[0] != '-')
if (str[0] != '-')
return true;
pstr++;
str++;
}
}
return false;
}
class UuidDataConverter : public ColumnDataConverter
{
char buf[16]; /* Binary UUID representation */
String str_buf;
public:
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
convert_uuid2string(str, cass_data);
field->store(str, 36,field->charset());
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
String *uuid_str= field->val_str(&str_buf);
if (uuid_str->length() != 36)
return true;
if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
return true;
*cass_data= buf;
*cass_data_len= 16;
return false;
......@@ -826,6 +892,302 @@ class UuidDataConverter : public ColumnDataConverter
~UuidDataConverter(){}
};
/**
Converting dynamic columns types to/from casandra types
*/
bool cassandra_to_dyncol_intLong(const char *cass_data,
int cass_data_len __attribute__((unused)),
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
value->x.long_value= (longlong *)*cass_data;
#else
flip64(cass_data, (char *)&value->x.long_value);
#endif
return 0;
}
bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
longlong *tmp= (longlong *) buff;
enum enum_dyncol_func_result rc=
dynamic_column_val_long(tmp, value);
if (rc < 0)
return true;
*cass_data_len= sizeof(longlong);
#ifdef WORDS_BIGENDIAN
*cass_data= (char *)buff;
#else
flip64((char *)buff, (char *)buff + sizeof(longlong));
*cass_data= (char *)buff + sizeof(longlong);
#endif
*freemem= NULL;
return false;
}
bool cassandra_to_dyncol_intInt32(const char *cass_data,
int cass_data_len __attribute__((unused)),
DYNAMIC_COLUMN_VALUE *value)
{
int32 tmp;
value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
tmp= *((int32 *)cass_data);
#else
flip32(cass_data, (char *)&tmp);
#endif
value->x.long_value= tmp;
return 0;
}
bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
enum enum_dyncol_func_result rc=
dynamic_column_val_long(tmp, value);
if (rc < 0)
return true;
*cass_data_len= sizeof(int32);
*cass_data= (char *)buff;
#ifdef WORDS_BIGENDIAN
*((int32 *) buff) = (int32) *tmp;
#else
{
int32 tmp2= (int32) *tmp;
flip32((char *)&tmp2, (char *)buff);
}
#endif
*freemem= NULL;
return false;
}
bool cassandra_to_dyncol_intCounter(const char *cass_data,
int cass_data_len __attribute__((unused)),
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_INT;
value->x.long_value= *((longlong *)cass_data);
return 0;
}
bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
longlong *tmp= (longlong *)buff;
enum enum_dyncol_func_result rc=
dynamic_column_val_long(tmp, value);
if (rc < 0)
return true;
*cass_data_len= sizeof(longlong);
*cass_data= (char *)buff;
*freemem= NULL;
return false;
}
bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
int cass_data_len __attribute__((unused)),
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_DOUBLE;
value->x.double_value= *((float *)cass_data);
return 0;
}
bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
double tmp;
enum enum_dyncol_func_result rc=
dynamic_column_val_double(&tmp, value);
if (rc < 0)
return true;
*((float *)buff)= (float) tmp;
*cass_data_len= sizeof(float);
*cass_data= (char *)buff;
*freemem= NULL;
return false;
}
bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
int cass_data_len __attribute__((unused)),
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_DOUBLE;
value->x.double_value= *((double *)cass_data);
return 0;
}
bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
double *tmp= (double *)buff;
enum enum_dyncol_func_result rc=
dynamic_column_val_double(tmp, value);
if (rc < 0)
return true;
*cass_data_len= sizeof(double);
*cass_data= (char *)buff;
*freemem= NULL;
return false;
}
bool cassandra_to_dyncol_strStr(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value,
CHARSET_INFO *cs)
{
value->type= DYN_COL_STRING;
value->x.string.charset= cs;
value->x.string.value.str= (char *)cass_data;
value->x.string.value.length= cass_data_len;
value->x.string.nonfreeable= TRUE; // do not try to free
return 0;
}
bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem, CHARSET_INFO *cs)
{
DYNAMIC_STRING tmp;
if (init_dynamic_string(&tmp, NULL, 1024, 1024))
return 1;
enum enum_dyncol_func_result rc=
dynamic_column_val_str(&tmp, value, cs, FALSE);
if (rc < 0)
{
dynstr_free(&tmp);
return 1;
}
*cass_data_len= tmp.length;
*(cass_data)= tmp.str;
*freemem= tmp.str;
return 0;
}
bool cassandra_to_dyncol_strBytes(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value)
{
return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
&my_charset_bin);
}
bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
buff, freemem, &my_charset_bin);
}
bool cassandra_to_dyncol_strAscii(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value)
{
return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
&my_charset_latin1_bin);
}
bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
buff, freemem, &my_charset_latin1_bin);
}
bool cassandra_to_dyncol_strUTF8(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value)
{
return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
&my_charset_utf8_unicode_ci);
}
bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
buff, freemem, &my_charset_utf8_unicode_ci);
}
bool cassandra_to_dyncol_strUUID(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_STRING;
value->x.string.charset= &my_charset_bin;
value->x.string.value.str= (char *)my_malloc(37, MYF(0));
if (!value->x.string.value.str)
{
value->x.string.value.length= 0;
value->x.string.nonfreeable= TRUE;
return 1;
}
convert_uuid2string(value->x.string.value.str, cass_data);
value->x.string.value.length= 36;
value->x.string.nonfreeable= FALSE;
return 0;
}
bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
DYNAMIC_STRING tmp;
if (init_dynamic_string(&tmp, NULL, 1024, 1024))
return true;
enum enum_dyncol_func_result rc=
dynamic_column_val_str(&tmp, value, &my_charset_latin1_bin, FALSE);
if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
{
dynstr_free(&tmp);
return true;
}
*cass_data_len= tmp.length;
*(cass_data)= tmp.str;
*freemem= tmp.str;
return 0;
}
bool cassandra_to_dyncol_intBool(const char *cass_data,
int cass_data_len,
DYNAMIC_COLUMN_VALUE *value)
{
value->type= DYN_COL_INT;
value->x.long_value= (cass_data[0] ? 1 : 0);
return 0;
}
bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
char **cass_data, int *cass_data_len,
void* buff, void **freemem)
{
longlong tmp;
enum enum_dyncol_func_result rc=
dynamic_column_val_long(&tmp, value);
if (rc < 0)
return true;
((char *)buff)[0]= (tmp ? 1 : 0);
*cass_data_len= 1;
*(cass_data)= (char *)buff;
*freemem= 0;
return 0;
}
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
......@@ -849,6 +1211,126 @@ const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerTyp
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
static CASSANDRA_TYPE_DEF cassandra_types[]=
{
{
validator_bigint,
&cassandra_to_dyncol_intLong,
&dyncol_to_cassandraLong
},
{
validator_int,
&cassandra_to_dyncol_intInt32,
&dyncol_to_cassandraInt32
},
{
validator_counter,
cassandra_to_dyncol_intCounter,
&dyncol_to_cassandraCounter
},
{
validator_float,
&cassandra_to_dyncol_doubleFloat,
&dyncol_to_cassandraFloat
},
{
validator_double,
&cassandra_to_dyncol_doubleDouble,
&dyncol_to_cassandraDouble
},
{
validator_blob,
&cassandra_to_dyncol_strBytes,
&dyncol_to_cassandraBytes
},
{
validator_ascii,
&cassandra_to_dyncol_strAscii,
&dyncol_to_cassandraAscii
},
{
validator_text,
&cassandra_to_dyncol_strUTF8,
&dyncol_to_cassandraUTF8
},
{
validator_timestamp,
&cassandra_to_dyncol_intLong,
&dyncol_to_cassandraLong
},
{
validator_uuid,
&cassandra_to_dyncol_strUUID,
&dyncol_to_cassandraUUID
},
{
validator_boolean,
&cassandra_to_dyncol_intBool,
&dyncol_to_cassandraBool
},
{
validator_varint,
&cassandra_to_dyncol_strBytes,
&dyncol_to_cassandraBytes
},
{
validator_decimal,
&cassandra_to_dyncol_strBytes,
&dyncol_to_cassandraBytes
}
};
CASSANDRA_TYPE get_cassandra_type(const char *validator)
{
CASSANDRA_TYPE rc;
switch(validator[32])
{
case 'L':
rc= CT_BIGINT;
break;
case 'I':
rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
rc= CT_INT;
break;
case 'C':
rc= CT_COUNTER;
break;
case 'F':
rc= CT_FLOAT;
break;
case 'D':
switch (validator[33])
{
case 'o':
rc= CT_DOUBLE;
break;
case 'a':
rc= CT_TIMESTAMP;
break;
case 'e':
rc= CT_DECIMAL;
break;
default:
rc= CT_BLOB;
break;
}
break;
case 'B':
rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
break;
case 'A':
rc= CT_ASCII;
break;
case 'U':
rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
break;
default:
rc= CT_BLOB;
}
DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
return rc;
}
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
ColumnDataConverter *res= NULL;
......@@ -943,23 +1425,87 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
int col_name_len;
char *col_type;
int col_type_len;
size_t ddl_fields= se->get_ddl_size();
const char *default_type= se->get_default_validator();
uint max_non_default_fields;
DBUG_ENTER("ha_cassandra::setup_field_converters");
DBUG_ASSERT(default_type);
DBUG_ASSERT(!field_converters);
size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
/*
We always should take into account that in case of using dynamic columns
sql description contain one field which does not described in
Cassandra DDL also key field is described separately. So that
is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
*/
max_non_default_fields= ddl_fields + 2 - n_fields;
if (ddl_fields < (n_fields - dyncol_set - 1))
{
se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
DBUG_RETURN(true);
}
/* allocate memory in one chunk */
size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
(sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
(dyncol_set ? max_non_default_fields : 0);
if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
return true;
DBUG_RETURN(true);
bzero(field_converters, memsize);
n_field_converters= n_fields;
if (dyncol_set)
{
special_type_field_converters=
(CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
special_type_field_names=
((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
}
if (dyncol_set)
{
if (init_dynamic_array(&dynamic_values,
sizeof(DYNAMIC_COLUMN_VALUE),
DYNCOL_USUAL, DYNCOL_DELTA))
DBUG_RETURN(true);
else
if (init_dynamic_array(&dynamic_names,
sizeof(LEX_STRING),
DYNCOL_USUAL, DYNCOL_DELTA))
{
delete_dynamic(&dynamic_values);
DBUG_RETURN(true);
}
else
if (init_dynamic_string(&dynamic_rec, NULL,
DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
{
delete_dynamic(&dynamic_values);
delete_dynamic(&dynamic_names);
DBUG_RETURN(true);
}
/* Dynamic column field has special processing */
field_converters[dyncol_field]= NULL;
default_type_def= cassandra_types + get_cassandra_type(default_type);
}
se->first_ddl_column();
uint n_mapped= 0;
while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
&col_type_len))
{
Field **field;
uint i;
/* Mapping for the 1st field is already known */
for (Field **field= field_arg + 1; *field; field++)
for (field= field_arg + 1, i= 1; *field; field++, i++)
{
if (!strcmp((*field)->field_name, col_name))
if ((!dyncol_set || dyncol_field != i) &&
!strcmp((*field)->field_name, col_name))
{
n_mapped++;
ColumnDataConverter **conv= field_converters + (*field)->field_index;
......@@ -968,14 +1514,28 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
se->print_error("Failed to map column %s to datatype %s",
(*field)->field_name, col_type);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
(*conv)->field= *field;
}
}
if (dyncol_set && !(*field)) // is needed and not found
{
DBUG_PRINT("info",("Field not found: %s", col_name));
if (strcmp(col_type, default_type))
{
DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
col_name, col_type));
special_type_field_names[n_special_type_fields].length= col_name_len;
special_type_field_names[n_special_type_fields].str= col_name;
special_type_field_converters[n_special_type_fields]=
cassandra_types[get_cassandra_type(col_type)];
n_special_type_fields++;
}
}
}
if (n_mapped != n_fields - 1)
if (n_mapped != n_fields - 1 - dyncol_set)
{
Field *first_unmapped= NULL;
/* Find the first field */
......@@ -992,7 +1552,7 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
se->print_error("Field `%s` could not be mapped to any field in Cassandra",
first_unmapped->field_name);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
/*
......@@ -1001,16 +1561,17 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
se->get_rowkey_type(&col_name, &col_type);
if (col_name && strcmp(col_name, (*field_arg)->field_name))
{
se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", col_name);
se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
col_name);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
{
se->print_error("target column family has no key_alias defined, "
"PRIMARY KEY column must be named 'rowkey'");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
if (col_type != NULL)
......@@ -1019,7 +1580,7 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
rowkey_converter->field= *field_arg;
}
......@@ -1027,10 +1588,10 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true;
DBUG_RETURN(true);
}
return false;
DBUG_RETURN(false);
}
......@@ -1039,10 +1600,20 @@ void ha_cassandra::free_field_converters()
delete rowkey_converter;
rowkey_converter= NULL;
if (dyncol_set)
{
delete_dynamic(&dynamic_values);
delete_dynamic(&dynamic_names);
dynstr_free(&dynamic_rec);
}
if (field_converters)
{
for (uint i=0; i < n_field_converters; i++)
if (field_converters[i])
{
DBUG_ASSERT(!dyncol_set || i == dyncol_field);
delete field_converters[i];
}
my_free(field_converters);
field_converters= NULL;
}
......@@ -1081,6 +1652,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
{
/* We get here when making lookups like uuid_column='not-an-uuid' */
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
}
......@@ -1125,13 +1697,40 @@ void ha_cassandra::print_conversion_error(const char *field_name,
}
void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
{
for (uint i= 0; i < num; i++)
if (vals[i].type == DYN_COL_STRING &&
!vals[i].x.string.nonfreeable)
my_free(vals[i].x.string.value.str);
}
CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
int cass_name_len)
{
CASSANDRA_TYPE_DEF *type= default_type_def;
for(uint i= 0; i < n_special_type_fields; i++)
{
if (cass_name_len == (int)special_type_field_names[i].length &&
memcmp(cass_name, special_type_field_names[i].str,
cass_name_len) == 0)
{
type= special_type_field_converters + i;
break;
}
}
return type;
}
int ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
char *cass_name;
char *cass_value;
int cass_value_len;
int cass_value_len, cass_name_len;
Field **field;
int res= 0;
ulong total_name_len= 0;
/*
cassandra_to_mariadb() calls will use field->store(...) methods, which
......@@ -1144,16 +1743,18 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
for (field= table->field + 1; *field; field++)
(*field)->set_null();
while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
while (!se->get_next_read_column(&cass_name, &cass_name_len,
&cass_value, &cass_value_len))
{
// map to our column. todo: use hash or something..
int idx=1;
bool found= 0;
for (field= table->field + 1; *field; field++)
{
idx++;
if (!strcmp((*field)->field_name, cass_name))
uint fieldnr= (*field)->field_index;
if ((!dyncol_set || dyncol_field != fieldnr) &&
!strcmp((*field)->field_name, cass_name))
{
int fieldnr= (*field)->field_index;
found= 1;
(*field)->set_notnull();
if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
cass_value_len))
......@@ -1166,6 +1767,84 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
break;
}
}
if (dyncol_set && !found)
{
DYNAMIC_COLUMN_VALUE val;
LEX_STRING nm;
CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
cass_name_len);
nm.str= cass_name;
nm.length= cass_name_len;
if (nm.length > MAX_NAME_LENGTH)
{
se->print_error("Unable to convert value for field `%s`"
" from Cassandra's data format. Name"
" length exceed limit of %u: '%s'",
table->field[dyncol_field]->field_name,
(uint)MAX_NAME_LENGTH, cass_name);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
res=1;
goto err;
}
total_name_len+= cass_name_len;
if (nm.length > MAX_TOTAL_NAME_LENGTH)
{
se->print_error("Unable to convert value for field `%s`"
" from Cassandra's data format. Sum of all names"
" length exceed limit of %lu",
table->field[dyncol_field]->field_name,
cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
res=1;
goto err;
}
if ((res= (*(type->cassandra_to_dynamic))(cass_value,
cass_value_len, &val)) ||
insert_dynamic(&dynamic_names, (uchar *) &nm) ||
insert_dynamic(&dynamic_values, (uchar *) &val))
{
if (res)
{
print_conversion_error(cass_name, cass_value, cass_value_len);
}
free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
dynamic_values.elements);
// EOM shouldm be already reported if happened
res=1;
goto err;
}
}
}
dynamic_rec.length= 0;
if (dyncol_set)
{
if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
dynamic_names.elements,
dynamic_names.buffer,
(DYNAMIC_COLUMN_VALUE *)
dynamic_values.buffer,
FALSE,
TRUE) < 0)
dynamic_rec.length= 0;
free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
dynamic_values.elements);
dynamic_values.elements= dynamic_names.elements= 0;
}
if (dyncol_set)
{
if (dynamic_rec.length == 0)
table->field[dyncol_field]->set_null();
else
{
Field_blob *blob= (Field_blob *)table->field[dyncol_field];
blob->set_notnull();
blob->store_length(dynamic_rec.length);
*((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
dynamic_rec.str;
}
}
if (unpack_pk)
......@@ -1187,6 +1866,82 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
return res;
}
int ha_cassandra::read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
String *valcol, char **freenames)
{
String *strcol;
DYNAMIC_COLUMN col;
enum enum_dyncol_func_result rc;
DBUG_ENTER("ha_cassandra::read_dyncol");
Field *field= table->field[dyncol_field];
DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
/* It is blob and it does not use buffer */
strcol= field->val_str(NULL, valcol);
if (field->is_null())
{
bzero(vals, sizeof(DYNAMIC_ARRAY));
bzero(names, sizeof(DYNAMIC_ARRAY));
DBUG_RETURN(0); // nothing to write
}
/*
dynamic_column_vals only read the string so we can
cheat here with assignment
*/
bzero(&col, sizeof(col));
col.str= (char *)strcol->ptr();
col.length= strcol->length();
if ((rc= dynamic_column_vals(&col, names, vals, freenames)) < 0)
{
dynamic_column_error_message(rc);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
DBUG_RETURN(0);
}
int ha_cassandra::write_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names)
{
uint i;
DBUG_ENTER("ha_cassandra::write_dynamic_row");
DBUG_ASSERT(dyncol_set);
DBUG_ASSERT(names->elements == vals->elements);
for (i= 0; i < names->elements; i++)
{
char buff[16];
CASSANDRA_TYPE_DEF *type;
void *freemem= NULL;
char *cass_data;
int cass_data_len;
LEX_STRING *name= dynamic_element(names, i, LEX_STRING*);
DYNAMIC_COLUMN_VALUE *val= dynamic_element(vals, i, DYNAMIC_COLUMN_VALUE*);
DBUG_PRINT("info", ("field %*s", (int)name->length, name->str));
type= get_cassandra_field_def(name->str, (int) name->length);
if ((*type->dynamic_to_cassandra)(val, &cass_data, &cass_data_len,
buff, &freemem))
{
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
name->str, insert_lineno);
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
}
se->add_insert_column(name->str, name->length,
cass_data, cass_data_len);
if (freemem)
my_free(freemem);
}
DBUG_RETURN(0);
}
void ha_cassandra::free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
char *free_names)
{
delete_dynamic(names);
delete_dynamic(vals);
if (free_names)
my_free(free_names);
}
int ha_cassandra::write_row(uchar *buf)
{
......@@ -1221,16 +1976,36 @@ int ha_cassandra::write_row(uchar *buf)
{
char *cass_data;
int cass_data_len;
if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
if (dyncol_set && dyncol_field == i)
{
String valcol;
DYNAMIC_ARRAY vals, names;
char *free_names;
int rc;
DBUG_ASSERT(field_converters[i] == NULL);
if (!(rc= read_dyncol(&vals, &names, &valcol, &free_names)))
rc= write_dynamic_row(&vals, &names);
free_dynamic_row(&vals, &names, free_names);
if (rc)
{
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(rc);
}
}
else
{
if (field_converters[i]->mariadb_to_cassandra(&cass_data,
&cass_data_len))
{
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
field_converters[i]->field->field_name, insert_lineno);
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
}
se->add_insert_column(field_converters[i]->field->field_name,
se->add_insert_column(field_converters[i]->field->field_name, 0,
cass_data, cass_data_len);
}
}
dbug_tmp_restore_column_map(table->read_set, old_map);
......@@ -1296,9 +2071,16 @@ int ha_cassandra::rnd_init(bool scan)
DBUG_RETURN(0);
}
if (dyncol_set)
{
se->clear_read_all_columns();
}
else
{
se->clear_read_columns();
for (uint i= 1; i < table->s->fields; i++)
se->add_read_column(table->field[i]->field_name);
}
se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
bres= se->get_range_slices(false);
......@@ -1633,14 +2415,17 @@ class Column_name_enumerator_impl : public Column_name_enumerator
int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
{
DYNAMIC_ARRAY oldvals, oldnames, vals, names;
String oldvalcol, valcol;
char *oldfree_names= NULL, *free_names= NULL;
my_bitmap_map *old_map;
int res;
DBUG_ENTER("ha_cassandra::update_row");
/* Currently, it is guaranteed that new_data == table->record[0] */
DBUG_ASSERT(new_data == table->record[0]);
/* For now, just rewrite the full record */
se->clear_insert_buffer();
old_map= dbug_tmp_use_all_columns(table, table->read_set);
char *old_key;
......@@ -1668,6 +2453,22 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
else
new_primary_key= false;
if (dyncol_set)
{
Field *field= table->field[dyncol_field];
/* move to get old_data */
my_ptrdiff_t diff;
diff= (my_ptrdiff_t) (old_data - new_data);
field->move_field_offset(diff); // Points now at old_data
if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
DBUG_RETURN(res);
field->move_field_offset(-diff); // back to new_data
if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
{
free_dynamic_row(&oldnames, &oldvals, oldfree_names);
DBUG_RETURN(res);
}
}
if (new_primary_key)
{
......@@ -1676,7 +2477,10 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
Add a DELETE operation into the batch
*/
Column_name_enumerator_impl name_enumerator(this);
se->add_row_deletion(old_key, old_key_len, &name_enumerator);
se->add_row_deletion(old_key, old_key_len, &name_enumerator,
(LEX_STRING *)oldnames.buffer,
(dyncol_set ? oldnames.elements : 0));
oldnames.elements= oldvals.elements= 0; // they will be deleted
}
se->start_row_insert(new_key, new_key_len);
......@@ -1686,6 +2490,14 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
{
char *cass_data;
int cass_data_len;
if (dyncol_set && dyncol_field == i)
{
DBUG_ASSERT(field_converters[i] == NULL);
if ((res= write_dynamic_row(&vals, &names)))
goto err;
}
else
{
if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
{
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
......@@ -1693,16 +2505,49 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
}
se->add_insert_column(field_converters[i]->field->field_name,
se->add_insert_column(field_converters[i]->field->field_name, 0,
cass_data, cass_data_len);
}
}
if (dyncol_set)
{
/* find removed fields */
uint i= 0, j= 0;
LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
LEX_STRING *nnames= (LEX_STRING *)names.buffer;
/* both array are sorted */
for(; i < oldnames.elements; i++)
{
int scmp= 0;
while (j < names.elements &&
(nnames[j].length < onames[i].length ||
(nnames[j].length == onames[i].length &&
(scmp= memcmp(nnames[j].str, onames[i].str,
onames[i].length)) < 0)))
j++;
if (j < names.elements &&
nnames[j].length == onames[i].length &&
scmp == 0)
j++;
else
se->add_insert_delete_column(onames[i].str, onames[i].length);
}
}
dbug_tmp_restore_column_map(table->read_set, old_map);
bool res= se->do_insert();
res= se->do_insert();
if (res)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
err:
if (dyncol_set)
{
free_dynamic_row(&oldnames, &oldvals, oldfree_names);
free_dynamic_row(&names, &vals, free_names);
}
DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
}
......
......@@ -40,6 +40,33 @@ class ColumnDataConverter;
struct ha_table_option_struct;
struct st_dynamic_column_value;
typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
int cass_data_len,
struct st_dynamic_column_value *value);
typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
char **cass_data,
int *cass_data_len,
void *buf, void **freemem);
struct cassandra_type_def
{
const char *name;
CAS2DYN_CONVERTER cassandra_to_dynamic;
DYN2CAS_CONVERTER dynamic_to_cassandra;
};
typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
CT_DECIMAL};
typedef enum cassandtra_type_enum CASSANDRA_TYPE;
/** @brief
Class definition for the storage engine
*/
......@@ -51,9 +78,18 @@ class ha_cassandra: public handler
Cassandra_se_interface *se;
/* description of static part of the table definition */
ColumnDataConverter **field_converters;
uint n_field_converters;
CASSANDRA_TYPE_DEF *default_type_def;
/* description of dynamic columns part */
CASSANDRA_TYPE_DEF *special_type_field_converters;
LEX_STRING *special_type_field_names;
uint n_special_type_fields;
DYNAMIC_ARRAY dynamic_values, dynamic_names;
DYNAMIC_STRING dynamic_rec;
ColumnDataConverter *rowkey_converter;
bool setup_field_converters(Field **field, uint n_fields);
......@@ -65,6 +101,9 @@ class ha_cassandra: public handler
bool doing_insert_batch;
ha_rows insert_rows_batched;
uint dyncol_field;
bool dyncol_set;
/* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno;
void print_conversion_error(const char *field_name,
......@@ -191,6 +230,14 @@ class ha_cassandra: public handler
private:
bool source_exhausted;
bool mrr_start_read();
int check_field_options(Field **fields);
int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
String *valcol, char **freenames);
int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
char *free_names);
CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
int cass_name_length);
public:
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment