Commit acfa0b00 authored by Sergey Petrunya's avatar Sergey Petrunya

MDEV-431: Cassandra storage engine

- Introduce type converters (so far rather trivial)
- switch INSERT to using batch_mutate()
parent 15f429d6
......@@ -49,7 +49,7 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar);
############################################################################
# Now, create a table for real and insert data
create table t1 (rowkey char(36) primary key, column1 char(60)) engine=cassandra
create table t1 (rowkey char(36) primary key, data1 varchar(60)) engine=cassandra
thrift_host='localhost' keyspace='mariadbtest' column_family='cf1';
insert into t1 values ('key0', 'data1');
......
......@@ -43,53 +43,56 @@ class Cassandra_se_impl: public Cassandra_se_interface
std::string column_family;
std::string keyspace;
/* DDL checks */
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
std::vector<ColumnDef>::iterator column_ddl_it;
/* The list that was returned by the last key lookup */
std::vector<ColumnOrSuperColumn> col_supercol_vec;
std::vector<ColumnOrSuperColumn> column_data_vec;
std::vector<ColumnOrSuperColumn>::iterator column_data_it;
/* Insert preparation */
typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;
KeyToCfMutationMap batch_mutation; /* Prepare operation here */
std::string key_to_insert;
int64_t insert_timestamp;
std::vector<Mutation>* insert_list;
public:
Cassandra_se_impl() : cass(NULL) {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
bool connect(const char *host, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); }
virtual void set_column_family(const char *cfname)
{
column_family.assign(cfname);
}
virtual bool insert(NameAndValue *fields);
virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found);
/* Functions to enumerate ColumnFamily's DDL data */
bool setup_ddl_checks();
void first_ddl_column();
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
/* Writes */
void start_prepare_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len);
bool do_insert();
/* Reads */
bool get_slice(char *key, size_t key_len, bool *found);
bool get_next_read_column(char **name, char **value, int *value_len);
};
/////////////////////////////////////////////////////////////////////////////
// Connection and setup
/////////////////////////////////////////////////////////////////////////////
Cassandra_se_interface *get_cassandra_se()
{
return new Cassandra_se_impl;
}
#define CASS_TRY(x) try { \
x; \
}catch(TTransportException te){ \
print_error("%s [%d]", te.what(), te.getType()); \
}catch(InvalidRequestException ire){ \
print_error("%s [%s]", ire.what(), ire.why.c_str()); \
}catch(NotFoundException nfe){ \
print_error("%s", nfe.what()); \
} catch(...) { \
print_error("Unknown Exception"); \
}
bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
{
......@@ -121,10 +124,9 @@ bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
print_error("Unknown Exception");
}
// For now:
cur_consistency_level= ConsistencyLevel::ONE;
if (setup_ddl_checks())
if (!res && setup_ddl_checks())
res= true;
return res;
}
......@@ -176,13 +178,20 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
return false;
}
/////////////////////////////////////////////////////////////////////////////
// Data writes
/////////////////////////////////////////////////////////////////////////////
bool Cassandra_se_impl::insert(NameAndValue *fields)
void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
{
ColumnParent cparent;
cparent.column_family= column_family;
Column c;
key_to_insert.assign(key, key_len);
batch_mutation.clear();
batch_mutation[key_to_insert]= ColumnFamilyToMutation();
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
cf_mut[column_family]= std::vector<Mutation>();
insert_list= &cf_mut[column_family];
struct timeval td;
gettimeofday(&td, NULL);
int64_t ms = td.tv_sec;
......@@ -190,37 +199,57 @@ bool Cassandra_se_impl::insert(NameAndValue *fields)
int64_t usec = td.tv_usec;
usec = usec / 1000;
ms += usec;
c.timestamp = ms;
c.__isset.timestamp = true;
insert_timestamp= ms;
}
void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
int value_len)
{
Mutation mut;
mut.__isset.column_or_supercolumn= true;
mut.column_or_supercolumn.__isset.column= true;
Column& col=mut.column_or_supercolumn.column;
col.name.assign(name);
col.value.assign(value, value_len);
col.timestamp= insert_timestamp;
col.__isset.value= true;
col.__isset.timestamp= true;
insert_list->push_back(mut);
}
std::string key;
key.assign(fields->value, fields->value_len);
fields++;
bool res= false;
bool Cassandra_se_impl::do_insert()
{
bool res= true;
try {
/* TODO: switch to batch_mutate(). Or, even to CQL? */
// TODO: what should INSERT table (co1, col2) VALUES ('foo', 'bar') mean?
// in SQL, it sets all columns.. what should it mean here? can we have
// it to work only for specified columns? (if yes, what do for
// VALUES()?)
c.__isset.value= true;
for(;fields->name; fields++)
{
c.name.assign(fields->name);
c.value.assign(fields->value, fields->value_len);
cass->insert(key, cparent, c, ConsistencyLevel::ONE);
}
cass->batch_mutate(batch_mutation, cur_consistency_level);
res= false;
} catch (...) {
res= true;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}
return res;
}
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)
/////////////////////////////////////////////////////////////////////////////
// Reading data
/////////////////////////////////////////////////////////////////////////////
/*
Make one key lookup. If the record is found, the result is stored locally and
the caller should iterate over it.
*/
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
{
ColumnParent cparent;
cparent.column_family= column_family;
......@@ -235,37 +264,57 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row,
slice_pred.__set_slice_range(sr);
try {
std::vector<ColumnOrSuperColumn> &res= col_supercol_vec;
cass->get_slice(res, rowkey_str, cparent, slice_pred, ConsistencyLevel::ONE);
*found= true;
cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred,
ConsistencyLevel::ONE);
std::vector<ColumnOrSuperColumn>::iterator it;
if (res.size() == 0)
if (column_data_vec.size() == 0)
{
/*
/*
No columns found. Cassandra doesn't allow records without any column =>
this means the seach key doesn't exist
*/
*found= false;
return false;
}
for (it= res.begin(); it < res.end(); it++)
{
ColumnOrSuperColumn cs= *it;
if (!cs.__isset.column)
return true;
row->name= (char*)cs.column.name.c_str();
row->value= (char*)cs.column.value.c_str();
row->value_len= cs.column.value.length();
row++;
}
row->name= NULL;
*found= true;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
return true;
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
return true;
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
return true;
}
column_data_it= column_data_vec.begin();
return false;
}
bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
int *value_len)
{
while (1)
{
if (column_data_it == column_data_vec.end())
return true;
if (((*column_data_it).__isset.column))
break; /* Ok it's a real column. Should be always the case. */
column_data_it++;
}
ColumnOrSuperColumn& cs= *column_data_it;
*name= (char*)cs.column.name.c_str();
*value= (char*)cs.column.value.c_str();
*value_len= cs.column.value.length();
column_data_it++;
return false;
}
......@@ -43,10 +43,14 @@ class Cassandra_se_interface
int *value_len)=0;
/* Writes */
virtual bool insert(NameAndValue *fields)=0;
virtual void start_prepare_insert(const char *key, int key_len)=0;
virtual void add_insert_column(const char *name, const char *value,
int value_len)=0;
virtual bool do_insert()=0;
/* Reads */
virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)=0 ;
virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
/* Passing error messages up to ha_cassandra */
char err_buffer[512];
......
This diff is collapsed.
......@@ -24,6 +24,7 @@ typedef struct st_cassandra_share {
THR_LOCK lock;
} CASSANDRA_SHARE;
class ColumnDataConverter;
/** @brief
Class definition for the storage engine
......@@ -38,6 +39,12 @@ class ha_cassandra: public handler
/* pre-allocated array of #fields elements */
NameAndValue *names_and_vals;
NameAndValue *get_names_and_vals();
ColumnDataConverter **field_converters;
uint n_field_converters;
bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters();
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment