Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
2a419f58
Commit
2a419f58
authored
Aug 19, 2012
by
Sergey Petrunya
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
MDEV-431: Cassandra storage engine
- Descriptive error messages - Unpack PK column on range scans
parent
0b52b22b
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
147 additions
and
31 deletions
+147
-31
mysql-test/r/cassandra.result
mysql-test/r/cassandra.result
+15
-4
mysql-test/t/cassandra.test
mysql-test/t/cassandra.test
+8
-2
storage/cassandra/cassandra_se.cc
storage/cassandra/cassandra_se.cc
+31
-3
storage/cassandra/cassandra_se.h
storage/cassandra/cassandra_se.h
+2
-0
storage/cassandra/ha_cassandra.cc
storage/cassandra/ha_cassandra.cc
+87
-21
storage/cassandra/ha_cassandra.h
storage/cassandra/ha_cassandra.h
+4
-1
No files found.
mysql-test/r/cassandra.result
View file @
2a419f58
...
@@ -13,15 +13,26 @@ thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam';
...
@@ -13,15 +13,26 @@ thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam';
ERROR HY000: Unable to connect to foreign data source: Default TException. [Keyspace no_such_keyspace does not exist]
ERROR HY000: Unable to connect to foreign data source: Default TException. [Keyspace no_such_keyspace does not exist]
create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
thrift_host='localhost' keyspace='no_such_keyspace';
thrift_host='localhost' keyspace='no_such_keyspace';
ERROR HY000:
Can't create table 'test.t1' (errno: 140)
ERROR HY000:
Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s
create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
create table t1 (rowkey
var
char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
select * from t1;
rowkey data1 data2
insert into t1 values ('rowkey10', 'data1-value', 123456);
insert into t1 values ('rowkey10', 'data1-value', 123456);
insert into t1 values ('rowkey11', 'data1-value2', 34543);
insert into t1 values ('rowkey11', 'data1-value2', 34543);
insert into t1 values ('rowkey12', 'data1-value3', 454);
select * from t1;
select * from t1;
rowkey data1 data2
rowkey data1 data2
data1-value 123456
rowkey12 data1-value3 454
data1-value2 34543
rowkey10 data1-value 123456
rowkey11 data1-value2 34543
explain
select * from t1 where rowkey='rowkey11';
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 const PRIMARY PRIMARY 38 const 1
select * from t1 where rowkey='rowkey11';
rowkey data1 data2
rowkey11 data1-value2 34543
delete from t1;
delete from t1;
select * from t1;
select * from t1;
rowkey data1 data2
rowkey data1 data2
...
...
mysql-test/t/cassandra.test
View file @
2a419f58
...
@@ -25,7 +25,7 @@ create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
...
@@ -25,7 +25,7 @@ create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
thrift_host
=
'localhost'
keyspace
=
'no_such_keyspace'
column_family
=
'colfam'
;
thrift_host
=
'localhost'
keyspace
=
'no_such_keyspace'
column_family
=
'colfam'
;
# No column family specified
# No column family specified
--
error
ER_C
ANT_CREATE_TABL
E
--
error
ER_C
ONNECT_TO_FOREIGN_DATA_SOURC
E
create
table
t1
(
rowkey
char
(
10
)
primary
key
,
column1
char
(
10
))
engine
=
cassandra
create
table
t1
(
rowkey
char
(
10
)
primary
key
,
column1
char
(
10
))
engine
=
cassandra
thrift_host
=
'localhost'
keyspace
=
'no_such_keyspace'
;
thrift_host
=
'localhost'
keyspace
=
'no_such_keyspace'
;
...
@@ -49,13 +49,19 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint);
...
@@ -49,13 +49,19 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint);
############################################################################
############################################################################
# Now, create a table for real and insert data
# Now, create a table for real and insert data
create
table
t1
(
rowkey
char
(
36
)
primary
key
,
data1
varchar
(
60
),
data2
bigint
)
engine
=
cassandra
create
table
t1
(
rowkey
var
char
(
36
)
primary
key
,
data1
varchar
(
60
),
data2
bigint
)
engine
=
cassandra
thrift_host
=
'localhost'
keyspace
=
'mariadbtest2'
column_family
=
'cf1'
;
thrift_host
=
'localhost'
keyspace
=
'mariadbtest2'
column_family
=
'cf1'
;
select
*
from
t1
;
insert
into
t1
values
(
'rowkey10'
,
'data1-value'
,
123456
);
insert
into
t1
values
(
'rowkey10'
,
'data1-value'
,
123456
);
insert
into
t1
values
(
'rowkey11'
,
'data1-value2'
,
34543
);
insert
into
t1
values
(
'rowkey11'
,
'data1-value2'
,
34543
);
insert
into
t1
values
(
'rowkey12'
,
'data1-value3'
,
454
);
select
*
from
t1
;
select
*
from
t1
;
explain
select
*
from
t1
where
rowkey
=
'rowkey11'
;
select
*
from
t1
where
rowkey
=
'rowkey11'
;
# Check if deletion works
# Check if deletion works
delete
from
t1
;
delete
from
t1
;
select
*
from
t1
;
select
*
from
t1
;
...
...
storage/cassandra/cassandra_se.cc
View file @
2a419f58
...
@@ -64,6 +64,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
...
@@ -64,6 +64,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
/* Resultset we're reading */
/* Resultset we're reading */
std
::
vector
<
KeySlice
>
key_slice_vec
;
std
::
vector
<
KeySlice
>
key_slice_vec
;
std
::
vector
<
KeySlice
>::
iterator
key_slice_it
;
std
::
vector
<
KeySlice
>::
iterator
key_slice_it
;
std
::
string
rowkey
;
/* key of the record we're returning now */
SlicePredicate
slice_pred
;
SlicePredicate
slice_pred
;
public:
public:
...
@@ -77,6 +79,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
...
@@ -77,6 +79,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
bool
setup_ddl_checks
();
bool
setup_ddl_checks
();
void
first_ddl_column
();
void
first_ddl_column
();
bool
next_ddl_column
(
char
**
name
,
int
*
name_len
,
char
**
value
,
int
*
value_len
);
bool
next_ddl_column
(
char
**
name
,
int
*
name_len
,
char
**
value
,
int
*
value_len
);
void
get_rowkey_type
(
char
**
name
,
char
**
type
);
/* Writes */
/* Writes */
void
start_prepare_insert
(
const
char
*
key
,
int
key_len
);
void
start_prepare_insert
(
const
char
*
key
,
int
key_len
);
...
@@ -86,6 +89,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
...
@@ -86,6 +89,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
/* Reads, point lookups */
/* Reads, point lookups */
bool
get_slice
(
char
*
key
,
size_t
key_len
,
bool
*
found
);
bool
get_slice
(
char
*
key
,
size_t
key_len
,
bool
*
found
);
bool
get_next_read_column
(
char
**
name
,
char
**
value
,
int
*
value_len
);
bool
get_next_read_column
(
char
**
name
,
char
**
value
,
int
*
value_len
);
void
get_read_rowkey
(
char
**
value
,
int
*
value_len
);
/* Reads, multi-row scans */
/* Reads, multi-row scans */
bool
get_range_slices
();
bool
get_range_slices
();
...
@@ -193,6 +197,21 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
...
@@ -193,6 +197,21 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
return
false
;
return
false
;
}
}
void
Cassandra_se_impl
::
get_rowkey_type
(
char
**
name
,
char
**
type
)
{
if
(
cf_def
.
__isset
.
key_validation_class
)
*
type
=
(
char
*
)
cf_def
.
key_validation_class
.
c_str
();
else
*
type
=
NULL
;
if
(
cf_def
.
__isset
.
key_alias
)
*
name
=
(
char
*
)
cf_def
.
key_alias
.
c_str
();
else
*
name
=
NULL
;
}
/////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
// Data writes
// Data writes
/////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
...
@@ -269,8 +288,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
...
@@ -269,8 +288,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
ColumnParent
cparent
;
ColumnParent
cparent
;
cparent
.
column_family
=
column_family
;
cparent
.
column_family
=
column_family
;
std
::
string
rowkey_str
;
rowkey
.
assign
(
key
,
key_len
);
rowkey_str
.
assign
(
key
,
key_len
);
SlicePredicate
slice_pred
;
SlicePredicate
slice_pred
;
SliceRange
sr
;
SliceRange
sr
;
...
@@ -279,7 +297,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
...
@@ -279,7 +297,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
slice_pred
.
__set_slice_range
(
sr
);
slice_pred
.
__set_slice_range
(
sr
);
try
{
try
{
cass
->
get_slice
(
column_data_vec
,
rowkey
_str
,
cparent
,
slice_pred
,
cass
->
get_slice
(
column_data_vec
,
rowkey
,
cparent
,
slice_pred
,
cur_consistency_level
);
cur_consistency_level
);
if
(
column_data_vec
.
size
()
==
0
)
if
(
column_data_vec
.
size
()
==
0
)
...
@@ -333,6 +351,15 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
...
@@ -333,6 +351,15 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
}
}
/* Return the rowkey for the record that was read */
void
Cassandra_se_impl
::
get_read_rowkey
(
char
**
value
,
int
*
value_len
)
{
*
value
=
(
char
*
)
rowkey
.
c_str
();
*
value_len
=
rowkey
.
length
();
}
bool
Cassandra_se_impl
::
get_range_slices
()
//todo: start_range/end_range as parameters
bool
Cassandra_se_impl
::
get_range_slices
()
//todo: start_range/end_range as parameters
{
{
bool
res
=
true
;
bool
res
=
true
;
...
@@ -375,6 +402,7 @@ bool Cassandra_se_impl::get_next_range_slice_row()
...
@@ -375,6 +402,7 @@ bool Cassandra_se_impl::get_next_range_slice_row()
return
true
;
return
true
;
column_data_vec
=
key_slice_it
->
columns
;
column_data_vec
=
key_slice_it
->
columns
;
rowkey
=
key_slice_it
->
key
;
column_data_it
=
column_data_vec
.
begin
();
column_data_it
=
column_data_vec
.
begin
();
key_slice_it
++
;
key_slice_it
++
;
return
false
;
return
false
;
...
...
storage/cassandra/cassandra_se.h
View file @
2a419f58
...
@@ -25,6 +25,7 @@ class Cassandra_se_interface
...
@@ -25,6 +25,7 @@ class Cassandra_se_interface
virtual
void
first_ddl_column
()
=
0
;
virtual
void
first_ddl_column
()
=
0
;
virtual
bool
next_ddl_column
(
char
**
name
,
int
*
name_len
,
char
**
value
,
virtual
bool
next_ddl_column
(
char
**
name
,
int
*
name_len
,
char
**
value
,
int
*
value_len
)
=
0
;
int
*
value_len
)
=
0
;
virtual
void
get_rowkey_type
(
char
**
name
,
char
**
type
)
=
0
;
/* Writes */
/* Writes */
virtual
void
start_prepare_insert
(
const
char
*
key
,
int
key_len
)
=
0
;
virtual
void
start_prepare_insert
(
const
char
*
key
,
int
key_len
)
=
0
;
...
@@ -35,6 +36,7 @@ class Cassandra_se_interface
...
@@ -35,6 +36,7 @@ class Cassandra_se_interface
/* Reads */
/* Reads */
virtual
bool
get_slice
(
char
*
key
,
size_t
key_len
,
bool
*
found
)
=
0
;
virtual
bool
get_slice
(
char
*
key
,
size_t
key_len
,
bool
*
found
)
=
0
;
virtual
bool
get_next_read_column
(
char
**
name
,
char
**
value
,
int
*
value_len
)
=
0
;
virtual
bool
get_next_read_column
(
char
**
name
,
char
**
value
,
int
*
value_len
)
=
0
;
virtual
void
get_read_rowkey
(
char
**
value
,
int
*
value_len
)
=
0
;
/* Reads, multi-row scans */
/* Reads, multi-row scans */
virtual
bool
get_range_slices
()
=
0
;
virtual
bool
get_range_slices
()
=
0
;
...
...
storage/cassandra/ha_cassandra.cc
View file @
2a419f58
...
@@ -212,7 +212,7 @@ static handler* cassandra_create_handler(handlerton *hton,
...
@@ -212,7 +212,7 @@ static handler* cassandra_create_handler(handlerton *hton,
ha_cassandra
::
ha_cassandra
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
)
ha_cassandra
::
ha_cassandra
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
)
:
handler
(
hton
,
table_arg
),
:
handler
(
hton
,
table_arg
),
se
(
NULL
),
field_converters
(
NULL
)
se
(
NULL
),
field_converters
(
NULL
)
,
rowkey_converter
(
NULL
)
{}
{}
...
@@ -251,7 +251,6 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
...
@@ -251,7 +251,6 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
if
(
setup_field_converters
(
table
->
field
,
table
->
s
->
fields
))
if
(
setup_field_converters
(
table
->
field
,
table
->
s
->
fields
))
{
{
my_error
(
ER_CONNECT_TO_FOREIGN_DATA_SOURCE
,
MYF
(
0
),
"setup_field_converters"
);
DBUG_RETURN
(
HA_ERR_NO_CONNECTION
);
DBUG_RETURN
(
HA_ERR_NO_CONNECTION
);
}
}
...
@@ -341,8 +340,12 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
...
@@ -341,8 +340,12 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
*/
*/
#endif
#endif
DBUG_ASSERT
(
!
se
);
DBUG_ASSERT
(
!
se
);
if
(
!
options
->
host
||
!
options
->
keyspace
||
!
options
->
column_family
)
if
(
!
options
->
host
||
!
options
->
keyspace
||
!
options
->
column_family
)
{
my_error
(
ER_CONNECT_TO_FOREIGN_DATA_SOURCE
,
MYF
(
0
),
"thrift_host, keyspace, and column_family table options must be specified"
);
DBUG_RETURN
(
HA_WRONG_CREATE_OPTION
);
DBUG_RETURN
(
HA_WRONG_CREATE_OPTION
);
}
se
=
get_cassandra_se
();
se
=
get_cassandra_se
();
se
->
set_column_family
(
options
->
column_family
);
se
->
set_column_family
(
options
->
column_family
);
if
(
se
->
connect
(
options
->
host
,
options
->
keyspace
))
if
(
se
->
connect
(
options
->
host
,
options
->
keyspace
))
...
@@ -515,6 +518,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
...
@@ -515,6 +518,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
case
MYSQL_TYPE_VAR_STRING
:
case
MYSQL_TYPE_VAR_STRING
:
case
MYSQL_TYPE_VARCHAR
:
case
MYSQL_TYPE_VARCHAR
:
//case MYSQL_TYPE_STRING: <-- todo: should we allow end-padded 'CHAR(N)'?
if
(
!
strcmp
(
validator_name
,
validator_blob
)
||
if
(
!
strcmp
(
validator_name
,
validator_blob
)
||
!
strcmp
(
validator_name
,
validator_ascii
)
||
!
strcmp
(
validator_name
,
validator_ascii
)
||
!
strcmp
(
validator_name
,
validator_text
))
!
strcmp
(
validator_name
,
validator_text
))
...
@@ -560,7 +564,12 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
...
@@ -560,7 +564,12 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
n_mapped
++
;
n_mapped
++
;
ColumnDataConverter
**
conv
=
field_converters
+
(
*
field
)
->
field_index
;
ColumnDataConverter
**
conv
=
field_converters
+
(
*
field
)
->
field_index
;
if
(
!
(
*
conv
=
map_field_to_validator
(
*
field
,
col_type
)))
if
(
!
(
*
conv
=
map_field_to_validator
(
*
field
,
col_type
)))
{
se
->
print_error
(
"Failed to map column %s to datatype %s"
,
(
*
field
)
->
field_name
,
col_type
);
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
return
true
;
return
true
;
}
(
*
conv
)
->
field
=
*
field
;
(
*
conv
)
->
field
=
*
field
;
}
}
}
}
...
@@ -568,6 +577,28 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
...
@@ -568,6 +577,28 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
if
(
n_mapped
!=
n_fields
-
1
)
if
(
n_mapped
!=
n_fields
-
1
)
return
true
;
return
true
;
/*
Setup type conversion for row_key. It may also have a name, but we ignore
it currently
*/
se
->
get_rowkey_type
(
&
col_name
,
&
col_type
);
if
(
col_type
!=
NULL
)
{
if
(
!
(
rowkey_converter
=
map_field_to_validator
(
*
field_arg
,
col_type
)))
{
se
->
print_error
(
"Failed to map PRIMARY KEY to datatype %s"
,
col_type
);
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
return
true
;
}
rowkey_converter
->
field
=
*
field_arg
;
}
else
{
se
->
print_error
(
"Cassandra's rowkey has no defined datatype (todo: support this)"
);
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
return
true
;
}
return
false
;
return
false
;
}
}
...
@@ -575,6 +606,9 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
...
@@ -575,6 +606,9 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
void
ha_cassandra
::
free_field_converters
()
void
ha_cassandra
::
free_field_converters
()
{
{
delete
rowkey_converter
;
rowkey_converter
=
NULL
;
if
(
field_converters
)
if
(
field_converters
)
{
{
for
(
uint
i
=
0
;
i
<
n_field_converters
;
i
++
)
for
(
uint
i
=
0
;
i
<
n_field_converters
;
i
++
)
...
@@ -588,8 +622,8 @@ void ha_cassandra::free_field_converters()
...
@@ -588,8 +622,8 @@ void ha_cassandra::free_field_converters()
void
store_key_image_to_rec
(
Field
*
field
,
uchar
*
ptr
,
uint
len
);
void
store_key_image_to_rec
(
Field
*
field
,
uchar
*
ptr
,
uint
len
);
int
ha_cassandra
::
index_read_map
(
uchar
*
buf
,
const
uchar
*
key
,
int
ha_cassandra
::
index_read_map
(
uchar
*
buf
,
const
uchar
*
key
,
key_part_map
keypart_map
,
key_part_map
keypart_map
,
enum
ha_rkey_function
find_flag
)
enum
ha_rkey_function
find_flag
)
{
{
int
rc
;
int
rc
;
DBUG_ENTER
(
"ha_cassandra::index_read_map"
);
DBUG_ENTER
(
"ha_cassandra::index_read_map"
);
...
@@ -597,19 +631,26 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
...
@@ -597,19 +631,26 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
if
(
find_flag
!=
HA_READ_KEY_EXACT
)
if
(
find_flag
!=
HA_READ_KEY_EXACT
)
DBUG_RETURN
(
HA_ERR_WRONG_COMMAND
);
DBUG_RETURN
(
HA_ERR_WRONG_COMMAND
);
// todo: decode the search key.
uint
key_len
=
calculate_key_len
(
table
,
active_index
,
key
,
keypart_map
);
uint
key_len
=
calculate_key_len
(
table
,
active_index
,
key
,
keypart_map
);
store_key_image_to_rec
(
table
->
field
[
0
],
(
uchar
*
)
key
,
key_len
);
store_key_image_to_rec
(
table
->
field
[
0
],
(
uchar
*
)
key
,
key_len
);
#if 0
char buff[256];
char buff[256];
String tmp(buff,sizeof(buff), &my_charset_bin);
String tmp(buff,sizeof(buff), &my_charset_bin);
tmp.length(0);
tmp.length(0);
String *str;
String *str;
str= table->field[0]->val_str(&tmp);
str= table->field[0]->val_str(&tmp);
#endif
char
*
cass_key
;
int
cass_key_len
;
rowkey_converter
->
mariadb_to_cassandra
(
&
cass_key
,
&
cass_key_len
);
bool
found
;
bool
found
;
if
(
se
->
get_slice
((
char
*
)
str
->
ptr
(),
str
->
length
(),
&
found
))
if
(
se
->
get_slice
(
cass_key
,
cass_key_len
,
&
found
))
{
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
rc
=
HA_ERR_INTERNAL_ERROR
;
rc
=
HA_ERR_INTERNAL_ERROR
;
}
/* TODO: what if we're not reading all columns?? */
/* TODO: what if we're not reading all columns?? */
if
(
!
found
)
if
(
!
found
)
...
@@ -618,14 +659,14 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
...
@@ -618,14 +659,14 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
}
}
else
else
{
{
read_cassandra_columns
();
read_cassandra_columns
(
false
);
}
}
DBUG_RETURN
(
rc
);
DBUG_RETURN
(
rc
);
}
}
void
ha_cassandra
::
read_cassandra_columns
()
void
ha_cassandra
::
read_cassandra_columns
(
bool
unpack_pk
)
{
{
char
*
cass_name
;
char
*
cass_name
;
char
*
cass_value
;
char
*
cass_value
;
...
@@ -659,6 +700,15 @@ void ha_cassandra::read_cassandra_columns()
...
@@ -659,6 +700,15 @@ void ha_cassandra::read_cassandra_columns()
}
}
}
}
}
}
if
(
unpack_pk
)
{
/* Unpack rowkey to primary key */
field
=
table
->
field
;
(
*
field
)
->
set_notnull
();
se
->
get_read_rowkey
(
&
cass_value
,
&
cass_value_len
);
rowkey_converter
->
cassandra_to_mariadb
(
cass_value
,
cass_value_len
);
}
dbug_tmp_restore_column_map
(
table
->
write_set
,
old_map
);
dbug_tmp_restore_column_map
(
table
->
write_set
,
old_map
);
}
}
...
@@ -667,12 +717,13 @@ void ha_cassandra::read_cassandra_columns()
...
@@ -667,12 +717,13 @@ void ha_cassandra::read_cassandra_columns()
int
ha_cassandra
::
write_row
(
uchar
*
buf
)
int
ha_cassandra
::
write_row
(
uchar
*
buf
)
{
{
my_bitmap_map
*
old_map
;
my_bitmap_map
*
old_map
;
char
buff
[
512
];
//
char buff[512];
DBUG_ENTER
(
"ha_cassandra::write_row"
);
DBUG_ENTER
(
"ha_cassandra::write_row"
);
old_map
=
dbug_tmp_use_all_columns
(
table
,
table
->
read_set
);
old_map
=
dbug_tmp_use_all_columns
(
table
,
table
->
read_set
);
/* Convert the key (todo: unify with the rest of the processing) */
/* Convert the key (todo: unify with the rest of the processing) */
#if 0
{
{
Field *pk_col= table->field[0];
Field *pk_col= table->field[0];
String tmp(buff,sizeof(buff), &my_charset_bin);
String tmp(buff,sizeof(buff), &my_charset_bin);
...
@@ -682,6 +733,11 @@ int ha_cassandra::write_row(uchar *buf)
...
@@ -682,6 +733,11 @@ int ha_cassandra::write_row(uchar *buf)
se->start_prepare_insert(str->ptr(), str->length());
se->start_prepare_insert(str->ptr(), str->length());
}
}
#endif
char
*
cass_key
;
int
cass_key_len
;
rowkey_converter
->
mariadb_to_cassandra
(
&
cass_key
,
&
cass_key_len
);
se
->
start_prepare_insert
(
cass_key
,
cass_key_len
);
/* Convert other fields */
/* Convert other fields */
for
(
uint
i
=
1
;
i
<
table
->
s
->
fields
;
i
++
)
for
(
uint
i
=
1
;
i
<
table
->
s
->
fields
;
i
++
)
...
@@ -697,6 +753,9 @@ int ha_cassandra::write_row(uchar *buf)
...
@@ -697,6 +753,9 @@ int ha_cassandra::write_row(uchar *buf)
bool
res
=
se
->
do_insert
();
bool
res
=
se
->
do_insert
();
if
(
res
)
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
DBUG_RETURN
(
res
?
HA_ERR_INTERNAL_ERROR
:
0
);
DBUG_RETURN
(
res
?
HA_ERR_INTERNAL_ERROR
:
0
);
}
}
...
@@ -713,6 +772,8 @@ int ha_cassandra::rnd_init(bool scan)
...
@@ -713,6 +772,8 @@ int ha_cassandra::rnd_init(bool scan)
se
->
add_read_column
(
table
->
field
[
i
]
->
field_name
);
se
->
add_read_column
(
table
->
field
[
i
]
->
field_name
);
bres
=
se
->
get_range_slices
();
bres
=
se
->
get_range_slices
();
if
(
bres
)
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
DBUG_RETURN
(
bres
?
HA_ERR_INTERNAL_ERROR
:
0
);
DBUG_RETURN
(
bres
?
HA_ERR_INTERNAL_ERROR
:
0
);
}
}
...
@@ -739,7 +800,7 @@ int ha_cassandra::rnd_next(uchar *buf)
...
@@ -739,7 +800,7 @@ int ha_cassandra::rnd_next(uchar *buf)
}
}
else
else
{
{
read_cassandra_columns
();
read_cassandra_columns
(
true
);
rc
=
0
;
rc
=
0
;
}
}
...
@@ -753,11 +814,22 @@ int ha_cassandra::delete_all_rows()
...
@@ -753,11 +814,22 @@ int ha_cassandra::delete_all_rows()
DBUG_ENTER
(
"ha_cassandra::delete_all_rows"
);
DBUG_ENTER
(
"ha_cassandra::delete_all_rows"
);
bres
=
se
->
truncate
();
bres
=
se
->
truncate
();
if
(
bres
)
my_error
(
ER_INTERNAL_ERROR
,
MYF
(
0
),
se
->
error_str
());
DBUG_RETURN
(
bres
?
HA_ERR_INTERNAL_ERROR
:
0
);
DBUG_RETURN
(
bres
?
HA_ERR_INTERNAL_ERROR
:
0
);
}
}
int
ha_cassandra
::
delete_row
(
const
uchar
*
buf
)
{
DBUG_ENTER
(
"ha_cassandra::delete_row"
);
// todo: delete the row we've just read.
DBUG_RETURN
(
HA_ERR_WRONG_COMMAND
);
}
/////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
// Dummy implementations start
// Dummy implementations start
/////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
...
@@ -815,7 +887,8 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
...
@@ -815,7 +887,8 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
key_range
*
max_key
)
key_range
*
max_key
)
{
{
DBUG_ENTER
(
"ha_cassandra::records_in_range"
);
DBUG_ENTER
(
"ha_cassandra::records_in_range"
);
DBUG_RETURN
(
10
);
// low number to force index usage
//DBUG_RETURN(10); // low number to force index usage
DBUG_RETURN
(
HA_POS_ERROR
);
}
}
...
@@ -866,13 +939,6 @@ int ha_cassandra::delete_table(const char *name)
...
@@ -866,13 +939,6 @@ int ha_cassandra::delete_table(const char *name)
}
}
int
ha_cassandra
::
delete_row
(
const
uchar
*
buf
)
{
DBUG_ENTER
(
"ha_cassandra::delete_row"
);
DBUG_RETURN
(
HA_ERR_WRONG_COMMAND
);
}
/**
/**
check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
if new and old definition are compatible
if new and old definition are compatible
...
...
storage/cassandra/ha_cassandra.h
View file @
2a419f58
...
@@ -38,10 +38,13 @@ class ha_cassandra: public handler
...
@@ -38,10 +38,13 @@ class ha_cassandra: public handler
ColumnDataConverter
**
field_converters
;
ColumnDataConverter
**
field_converters
;
uint
n_field_converters
;
uint
n_field_converters
;
ColumnDataConverter
*
rowkey_converter
;
bool
setup_field_converters
(
Field
**
field
,
uint
n_fields
);
bool
setup_field_converters
(
Field
**
field
,
uint
n_fields
);
void
free_field_converters
();
void
free_field_converters
();
void
read_cassandra_columns
();
void
read_cassandra_columns
(
bool
unpack_pk
);
public:
public:
ha_cassandra
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
ha_cassandra
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
~
ha_cassandra
()
~
ha_cassandra
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment