Commit fdab0300 authored by Sergey Petrunya

Cassandra storage engine: bulk INSERT support

- bulk inserts themselves
- a control variable (cassandra_insert_batch_size) and status counters
parent 38a3df4a
@@ -67,3 +67,28 @@ thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1;
#
# Batched INSERT
#
show variables like 'cassandra_insert_batch_size';
Variable_name Value
cassandra_insert_batch_size 100
show status like 'cassandra_row_insert%';
Variable_name Value
Cassandra_row_inserts 8
Cassandra_row_insert_batches 7
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1;
show status like 'cassandra_row_insert%';
Variable_name Value
Cassandra_row_inserts 10
Cassandra_row_insert_batches 8
# FLUSH STATUS doesn't work for our variables, just like with InnoDB.
flush status;
show status like 'cassandra_row_insert%';
Variable_name Value
Cassandra_row_inserts 10
Cassandra_row_insert_batches 8
@@ -109,6 +109,24 @@ DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1;
--echo #
--echo # Batched INSERT
--echo #
show variables like 'cassandra_insert_batch_size';
show status like 'cassandra_row_insert%';
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1;
show status like 'cassandra_row_insert%';
--echo # FLUSH STATUS doesn't work for our variables, just like with InnoDB.
flush status;
show status like 'cassandra_row_insert%';
############################################################################
## Cassandra cleanup
############################################################################
......
@@ -57,7 +57,6 @@ class Cassandra_se_impl: public Cassandra_se_interface
typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;
KeyToCfMutationMap batch_mutation; /* Prepare operation here */
std::string key_to_insert;
int64_t insert_timestamp;
std::vector<Mutation>* insert_list;
@@ -83,7 +82,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
void get_rowkey_type(char **name, char **type);
/* Writes */
void start_prepare_insert(const char *key, int key_len);
void clear_insert_buffer();
void start_row_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len);
bool do_insert();
@@ -233,10 +233,17 @@ int64_t Cassandra_se_impl::get_i64_timestamp()
return ms;
}
void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
void Cassandra_se_impl::clear_insert_buffer()
{
key_to_insert.assign(key, key_len);
batch_mutation.clear();
}
void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
{
std::string key_to_insert;
key_to_insert.assign(key, key_len);
batch_mutation[key_to_insert]= ColumnFamilyToMutation();
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
@@ -270,6 +277,10 @@ bool Cassandra_se_impl::do_insert()
try {
cass->batch_mutate(batch_mutation, cur_consistency_level);
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
res= false;
} catch (InvalidRequestException ire) {
......
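For reference, here is a minimal standalone sketch of the buffering pattern the cassandra_se changes above introduce: rows are collected into a per-row-key map by start_row_insert()/add_insert_column()-style calls and sent as one batch by a do_insert()-style flush, which also bumps the two counters. InsertBuffer and ColumnValue are invented names for this sketch, not the engine's types, and the real code sends the map through Thrift's batch_mutate(); the stub below only pretends the call succeeded.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct ColumnValue {                 // hypothetical stand-in for a Thrift Mutation
  std::string name;
  std::string value;
};

class InsertBuffer {
  // Analogous to KeyToCfMutationMap: one entry per row key being inserted.
  std::map<std::string, std::vector<ColumnValue>> batch_;
  std::vector<ColumnValue>* current_row_ = nullptr;

public:
  uint64_t row_inserts = 0;          // counters, like Cassandra_status_vars
  uint64_t row_insert_batches = 0;

  void clear_insert_buffer() { batch_.clear(); current_row_ = nullptr; }

  void start_row_insert(const std::string& key) {
    current_row_ = &batch_[key];     // creates the per-key slot on first use
  }

  void add_insert_column(const std::string& name, const std::string& value) {
    current_row_->push_back({name, value});
  }

  bool do_insert() {                 // flush: one round-trip per batch
    if (batch_.empty()) return false;
    // The real engine calls cass->batch_mutate(batch_mutation, ...) here;
    // this sketch just assumes the call succeeded.
    row_inserts += batch_.size();
    row_insert_batches++;
    return false;                    // false == success, matching do_insert()
  }
};

int main() {
  InsertBuffer buf;
  buf.start_row_insert("1"); buf.add_insert_column("a", "1");
  buf.start_row_insert("2"); buf.add_insert_column("a", "2");
  buf.do_insert();                   // two rows, one batch
  buf.clear_insert_buffer();         // the caller clears, as write_row()/end_bulk_insert() do
  std::cout << buf.row_inserts << " rows in "
            << buf.row_insert_batches << " batch\n";
}
```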
@@ -28,7 +28,8 @@ class Cassandra_se_interface
virtual void get_rowkey_type(char **name, char **type)=0;
/* Writes */
virtual void start_prepare_insert(const char *key, int key_len)=0;
virtual void clear_insert_buffer()=0;
virtual void start_row_insert(const char *key, int key_len)=0;
virtual void add_insert_column(const char *name, const char *value,
int value_len)=0;
virtual bool do_insert()=0;
@@ -58,4 +59,14 @@ class Cassandra_se_interface
void print_error(const char *format, ...);
};
/* A structure with global counters */
class Cassandra_status_vars
{
public:
ulong row_inserts;
ulong row_insert_batches;
};
extern Cassandra_status_vars cassandra_counters;
Cassandra_se_interface *get_cassandra_se();
@@ -56,6 +56,23 @@ ha_create_table_option cassandra_table_option_list[]=
};
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an INSERT batch",
NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(insert_batch_size),
// MYSQL_SYSVAR(enum_var),
// MYSQL_SYSVAR(ulong_var),
NULL
};
Cassandra_status_vars cassandra_counters;
Cassandra_status_vars cassandra_counters_copy;
/**
@brief
Function we use in the creation of our hash to get key.
@@ -727,13 +744,16 @@ int ha_cassandra::write_row(uchar *buf)
my_bitmap_map *old_map;
DBUG_ENTER("ha_cassandra::write_row");
if (!doing_insert_batch)
se->clear_insert_buffer();
old_map= dbug_tmp_use_all_columns(table, table->read_set);
/* Convert the key */
char *cass_key;
int cass_key_len;
rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
se->start_prepare_insert(cass_key, cass_key_len);
se->start_row_insert(cass_key, cass_key_len);
/* Convert other fields */
for (uint i= 1; i < table->s->fields; i++)
@@ -747,7 +767,20 @@ int ha_cassandra::write_row(uchar *buf)
dbug_tmp_restore_column_map(table->read_set, old_map);
bool res= se->do_insert();
bool res;
if (doing_insert_batch)
{
res= 0;
if (++insert_rows_batched >= /*insert_batch_size*/
THDVAR(table->in_use, insert_batch_size))
{
res= se->do_insert();
insert_rows_batched= 0;
}
}
else
res= se->do_insert();
if (res)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
@@ -756,6 +789,28 @@ int ha_cassandra::write_row(uchar *buf)
}
void ha_cassandra::start_bulk_insert(ha_rows rows)
{
doing_insert_batch= true;
insert_rows_batched= 0;
se->clear_insert_buffer();
}
int ha_cassandra::end_bulk_insert()
{
DBUG_ENTER("ha_cassandra::end_bulk_insert");
/* Flush out the insert buffer */
doing_insert_batch= false;
bool bres= se->do_insert();
se->clear_insert_buffer();
DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
}
int ha_cassandra::rnd_init(bool scan)
{
bool bres;
@@ -893,19 +948,14 @@ int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
DBUG_RETURN(rc);
}
#if 0
void ha_cassandra::start_bulk_insert(ha_rows rows)
{
/* Do nothing? */
}
int ha_cassandra::end_bulk_insert()
int ha_cassandra::reset()
{
// TODO!
doing_insert_batch= false;
return 0;
}
#endif
/////////////////////////////////////////////////////////////////////////////
// Dummy implementations start
/////////////////////////////////////////////////////////////////////////////
@@ -1023,20 +1073,32 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
// Dummy implementations end
/////////////////////////////////////////////////////////////////////////////
static struct st_mysql_sys_var* cassandra_system_variables[]= {
// MYSQL_SYSVAR(enum_var),
// MYSQL_SYSVAR(ulong_var),
NULL
static SHOW_VAR cassandra_status_variables[]= {
{"row_inserts",
(char*) &cassandra_counters.row_inserts, SHOW_LONG},
{"row_insert_batches",
(char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
{
//innodb_export_status();
cassandra_counters_copy= cassandra_counters;
var->type= SHOW_ARRAY;
var->value= (char *) &cassandra_status_variables;
return 0;
}
struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
static struct st_mysql_show_var func_status[]=
{
// {"example_func_example", (char *)show_func_example, SHOW_FUNC},
{"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC},
{0,0,SHOW_UNDEF}
};
......
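The write_row()/end_bulk_insert() changes above reduce to a small state machine: outside a bulk insert every row is flushed immediately, inside one rows are buffered until insert_rows_batched reaches the per-session cassandra_insert_batch_size (read with THDVAR in the real code), and end_bulk_insert() flushes whatever is left. Below is a minimal standalone model of that flow; MockStorage and BatchingHandler are invented names for illustration, not the engine's classes.

```cpp
#include <cstddef>
#include <iostream>

struct MockStorage {                 // stands in for the Cassandra_se_interface
  std::size_t pending = 0;
  std::size_t batches_sent = 0;
  void buffer_row()  { ++pending; }
  bool do_insert()   { if (pending) { ++batches_sent; pending = 0; } return false; }
  void clear_insert_buffer() { pending = 0; }
};

class BatchingHandler {
  MockStorage se_;
  bool doing_insert_batch_ = false;  // mirrors ha_cassandra::doing_insert_batch
  std::size_t insert_rows_batched_ = 0;
  std::size_t batch_size_;           // mirrors the cassandra_insert_batch_size sysvar

public:
  explicit BatchingHandler(std::size_t batch_size) : batch_size_(batch_size) {}

  void start_bulk_insert() {
    doing_insert_batch_ = true;
    insert_rows_batched_ = 0;
    se_.clear_insert_buffer();
  }

  bool write_row() {
    if (!doing_insert_batch_)
      se_.clear_insert_buffer();     // single-row insert: fresh buffer each call
    se_.buffer_row();
    if (!doing_insert_batch_)
      return se_.do_insert();        // flush immediately outside bulk mode
    if (++insert_rows_batched_ >= batch_size_) {
      insert_rows_batched_ = 0;
      return se_.do_insert();        // flush a full batch
    }
    return false;                    // keep buffering
  }

  bool end_bulk_insert() {
    doing_insert_batch_ = false;
    bool err = se_.do_insert();      // flush the leftover partial batch
    se_.clear_insert_buffer();
    return err;
  }

  std::size_t batches_sent() const { return se_.batches_sent; }
};

int main() {
  BatchingHandler h(/*batch_size=*/3);
  h.start_bulk_insert();
  for (int i = 0; i < 7; ++i) h.write_row();
  h.end_bulk_insert();
  std::cout << h.batches_sent() << " batches for 7 rows\n";  // 3 + 3 + 1 -> 3 batches
}
```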
@@ -47,6 +47,9 @@ class ha_cassandra: public handler
void read_cassandra_columns(bool unpack_pk);
ha_rows rnd_batch_size;
bool doing_insert_batch;
ha_rows insert_rows_batched;
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()
@@ -145,10 +148,12 @@ class ha_cassandra: public handler
*/
virtual double read_time(uint, uint, ha_rows rows)
{ return (double) rows / 20.0+1; }
#if 0
virtual void start_bulk_insert(ha_rows rows);
virtual int end_bulk_insert();
#endif
virtual int reset();
/*
Everything below are methods that we implement in ha_example.cc.
......