Commit 29e9406a authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE: fix batched insert to flush its buffers after insert operation.

parent 6a827daf
......@@ -210,4 +210,19 @@ select * from t2;
rowkey datecol
1 2012-08-29 01:23:45
delete from t2;
#
# (no MDEV#) Check that insert counters work correctly
#
create table t0 (a int);
insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
set cassandra_insert_batch_size=10;
insert into t2 select A.a+10*B.a, now() from t0 A, t0 B;
inserts insert_batches
100 10
set cassandra_insert_batch_size=1;
insert into t2 select A.a+10*B.a+100, now() from t0 A, t0 B;
inserts insert_batches
100 100
delete from t2;
drop table t2;
drop table t0;
......@@ -232,7 +232,51 @@ insert into t2 values (1, '2012-08-29 01:23:45');
select * from t2;
delete from t2;
--echo #
--echo # (no MDEV#) Check that insert counters work correctly
--echo #
create table t0 (a int);
insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
let $start_inserts=`select variable_value from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_inserts'`;
let $start_insert_batches=`select variable_value from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_insert_batches'`;
set cassandra_insert_batch_size=10;
insert into t2 select A.a+10*B.a, now() from t0 A, t0 B;
--disable_query_log
eval select
(select variable_value - $start_inserts from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_inserts')
AS 'inserts',
(select variable_value - $start_insert_batches from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_insert_batches')
AS 'insert_batches';
--enable_query_log
let $start_inserts=`select variable_value from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_inserts'`;
let $start_insert_batches=`select variable_value from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_insert_batches'`;
set cassandra_insert_batch_size=1;
insert into t2 select A.a+10*B.a+100, now() from t0 A, t0 B;
--disable_query_log
eval select
(select variable_value - $start_inserts from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_inserts')
AS 'inserts',
(select variable_value - $start_insert_batches from information_schema.SESSION_STATUS
where variable_name ='Cassandra_row_insert_batches')
AS 'insert_batches';
--enable_query_log
delete from t2;
drop table t2;
drop table t0;
############################################################################
## Cassandra cleanup
############################################################################
......
......@@ -291,6 +291,15 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
bool Cassandra_se_impl::do_insert()
{
bool res= true;
/*
zero-size mutations are allowed by Cassandra's batch_mutate but lets not
do them (we may attempt to do it if there is a bulk insert that stores
exactly @@cassandra_insert_batch_size*n elements.
*/
if (batch_mutation.empty())
return false;
try {
cass->batch_mutate(batch_mutation, cur_consistency_level);
......@@ -298,6 +307,7 @@ bool Cassandra_se_impl::do_insert()
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
clear_insert_buffer();
res= false;
} catch (InvalidRequestException ire) {
......@@ -623,3 +633,4 @@ bool Cassandra_se_impl::get_next_multiget_row()
return false;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment