Commit f70af1da authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE: Add capability to retry failed API calls

- Add capability to retry calls that have failed with UnavailableException or
  [Cassandra's] TimedOutException. 
- We don't retry for Thrift errors yet, although could easily do, now.
parent d94689d3
...@@ -46,6 +46,9 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -46,6 +46,9 @@ class Cassandra_se_impl: public Cassandra_se_interface
ConsistencyLevel::type write_consistency; ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency; ConsistencyLevel::type read_consistency;
/* How many times to retry an operation before giving up */
int thrift_call_retries_to_do;
/* DDL data */ /* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
...@@ -72,10 +75,12 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -72,10 +75,12 @@ class Cassandra_se_impl: public Cassandra_se_interface
SlicePredicate slice_pred; SlicePredicate slice_pred;
bool get_slices_returned_less; bool get_slices_returned_less;
bool get_slice_found_rows;
public: public:
Cassandra_se_impl() : cass(NULL), Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE), write_consistency(ConsistencyLevel::ONE),
read_consistency(ConsistencyLevel::ONE) {} read_consistency(ConsistencyLevel::ONE),
thrift_call_retries_to_do(0) {}
virtual ~Cassandra_se_impl(){ delete cass; } virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */ /* Connection and DDL checks */
...@@ -94,6 +99,7 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -94,6 +99,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
void clear_insert_buffer(); void clear_insert_buffer();
void start_row_insert(const char *key, int key_len); void start_row_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len); void add_insert_column(const char *name, const char *value, int value_len);
bool do_insert(); bool do_insert();
/* Reads, point lookups */ /* Reads, point lookups */
...@@ -105,6 +111,8 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -105,6 +111,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
private: private:
bool have_rowkey_to_skip; bool have_rowkey_to_skip;
std::string rowkey_to_skip; std::string rowkey_to_skip;
bool get_range_slices_param_last_key_as_start_key;
public: public:
bool get_range_slices(bool last_key_as_start_key); bool get_range_slices(bool last_key_as_start_key);
void finish_reading_range_slices(); void finish_reading_range_slices();
...@@ -119,19 +127,30 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -119,19 +127,30 @@ class Cassandra_se_impl: public Cassandra_se_interface
int add_lookup_key(const char *key, size_t key_len); int add_lookup_key(const char *key, size_t key_len);
bool multiget_slice(); bool multiget_slice();
private:
std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? */
std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
public:
bool get_next_multiget_row(); bool get_next_multiget_row();
bool truncate(); bool truncate();
bool remove_row(); bool remove_row();
private: private:
bool retryable_truncate();
bool retryable_do_insert();
bool retryable_remove_row();
bool retryable_setup_ddl_checks();
bool retryable_multiget_slice();
bool retryable_get_range_slices();
bool retryable_get_slice();
std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
/* Non-inherited utility functions: */ /* Non-inherited utility functions: */
int64_t get_i64_timestamp(); int64_t get_i64_timestamp();
typedef bool (Cassandra_se_impl::*retryable_func_t)();
bool try_operation(retryable_func_t func);
}; };
...@@ -189,11 +208,17 @@ void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level, ...@@ -189,11 +208,17 @@ void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level,
} }
bool Cassandra_se_impl::setup_ddl_checks() bool Cassandra_se_impl::retryable_setup_ddl_checks()
{ {
try { try {
cass->describe_keyspace(ks_def, keyspace); cass->describe_keyspace(ks_def, keyspace);
} catch (NotFoundException nfe) {
print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
return true;
}
std::vector<CfDef>::iterator it; std::vector<CfDef>::iterator it;
for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++) for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
{ {
...@@ -202,21 +227,17 @@ bool Cassandra_se_impl::setup_ddl_checks() ...@@ -202,21 +227,17 @@ bool Cassandra_se_impl::setup_ddl_checks()
return false; return false;
} }
print_error("describe_keyspace() didn't return our column family"); print_error("Column family %s not found in keyspace %s",
column_family.c_str(),
} catch (InvalidRequestException ire) { keyspace.c_str());
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (NotFoundException nfe) {
print_error("keyspace not found: %s", nfe.what());
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
}
return true; return true;
} }
bool Cassandra_se_impl::setup_ddl_checks()
{
return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
}
void Cassandra_se_impl::first_ddl_column() void Cassandra_se_impl::first_ddl_column()
{ {
...@@ -309,10 +330,20 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value, ...@@ -309,10 +330,20 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
} }
bool Cassandra_se_impl::do_insert() bool Cassandra_se_impl::retryable_do_insert()
{ {
bool res= true; cass->batch_mutate(batch_mutation, write_consistency);
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
clear_insert_buffer();
return 0;
}
bool Cassandra_se_impl::do_insert()
{
/* /*
zero-size mutations are allowed by Cassandra's batch_mutate but lets not zero-size mutations are allowed by Cassandra's batch_mutate but lets not
do them (we may attempt to do it if there is a bulk insert that stores do them (we may attempt to do it if there is a bulk insert that stores
...@@ -321,29 +352,7 @@ bool Cassandra_se_impl::do_insert() ...@@ -321,29 +352,7 @@ bool Cassandra_se_impl::do_insert()
if (batch_mutation.empty()) if (batch_mutation.empty())
return false; return false;
try { return try_operation(&Cassandra_se_impl::retryable_do_insert);
cass->batch_mutate(batch_mutation, write_consistency);
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
clear_insert_buffer();
res= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
}
return res;
} }
...@@ -357,19 +366,27 @@ bool Cassandra_se_impl::do_insert() ...@@ -357,19 +366,27 @@ bool Cassandra_se_impl::do_insert()
*/ */
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
{
bool res;
rowkey.assign(key, key_len);
if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
*found= get_slice_found_rows;
return res;
}
bool Cassandra_se_impl::retryable_get_slice()
{ {
ColumnParent cparent; ColumnParent cparent;
cparent.column_family= column_family; cparent.column_family= column_family;
rowkey.assign(key, key_len);
SlicePredicate slice_pred; SlicePredicate slice_pred;
SliceRange sr; SliceRange sr;
sr.start = ""; sr.start = "";
sr.finish = ""; sr.finish = "";
slice_pred.__set_slice_range(sr); slice_pred.__set_slice_range(sr);
try {
cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
read_consistency); read_consistency);
...@@ -379,26 +396,10 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) ...@@ -379,26 +396,10 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
No columns found. Cassandra doesn't allow records without any column => No columns found. Cassandra doesn't allow records without any column =>
this means the seach key doesn't exist this means the seach key doesn't exist
*/ */
*found= false; get_slice_found_rows= false;
return false; return false;
} }
*found= true; get_slice_found_rows= true;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
return true;
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
return true;
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
return true;
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
return true;
}
column_data_it= column_data_vec.begin(); column_data_it= column_data_vec.begin();
return false; return false;
...@@ -456,7 +457,15 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) ...@@ -456,7 +457,15 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
{ {
bool res= true; get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
}
bool Cassandra_se_impl::retryable_get_range_slices()
{
bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;
ColumnParent cparent; ColumnParent cparent;
cparent.column_family= column_family; cparent.column_family= column_family;
...@@ -482,32 +491,17 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) ...@@ -482,32 +491,17 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
key_range.end_key.assign("", 0); key_range.end_key.assign("", 0);
key_range.count= read_batch_size; key_range.count= read_batch_size;
try {
cass->get_range_slices(key_slice_vec, cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
cparent, slice_pred, key_range,
read_consistency); read_consistency);
res= false;
if (key_slice_vec.size() < (uint)read_batch_size) if (key_slice_vec.size() < (uint)read_batch_size)
get_slices_returned_less= true; get_slices_returned_less= true;
else else
get_slices_returned_less= false; get_slices_returned_less= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
}
key_slice_it= key_slice_vec.begin(); key_slice_it= key_slice_vec.begin();
return res; return false;
} }
...@@ -574,51 +568,79 @@ void Cassandra_se_impl::add_read_column(const char *name_arg) ...@@ -574,51 +568,79 @@ void Cassandra_se_impl::add_read_column(const char *name_arg)
bool Cassandra_se_impl::truncate() bool Cassandra_se_impl::truncate()
{ {
bool res= true; return try_operation(&Cassandra_se_impl::retryable_truncate);
try { }
cass->truncate(column_family);
res= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
}
return res; bool Cassandra_se_impl::retryable_truncate()
{
cass->truncate(column_family);
return 0;
} }
bool Cassandra_se_impl::remove_row() bool Cassandra_se_impl::remove_row()
{ {
bool res= true; return try_operation(&Cassandra_se_impl::retryable_remove_row);
}
bool Cassandra_se_impl::retryable_remove_row()
{
ColumnPath column_path; ColumnPath column_path;
column_path.column_family= column_family; column_path.column_family= column_family;
cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
return 0;
}
/*
This function will try a Cassandra operation, and handle errors.
*/
bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
{
bool res;
int n_retries= thrift_call_retries_to_do;
do
{
res= true;
try { try {
cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency); if ((res= (this->*func_to_call)()))
res= false; {
/*
The function call was made successfully (without timeouts, etc),
but something inside it returned 'true'.
This is supposedly a failure (or "not found" or other negative
result). We need to return this to the caller.
*/
n_retries= 0;
}
} catch (InvalidRequestException ire) { } catch (InvalidRequestException ire) {
n_retries= 0; /* there is no point in retrying this operation */
print_error("%s [%s]", ire.what(), ire.why.c_str()); print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) { } catch (UnavailableException ue) {
cassandra_counters.unavailable_exceptions++;
if (!--n_retries)
print_error("UnavailableException: %s", ue.what()); print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) { } catch (TimedOutException te) {
cassandra_counters.timeout_exceptions++;
if (!--n_retries)
print_error("TimedOutException: %s", te.what()); print_error("TimedOutException: %s", te.what());
}catch(TException e){ }catch(TException e){
/* todo: we may use retry for certain kinds of Thrift errors */
n_retries= 0;
print_error("Thrift exception: %s", e.what()); print_error("Thrift exception: %s", e.what());
} catch (...) { } catch (...) {
n_retries= 0; /* Don't retry */
print_error("Unknown exception"); print_error("Unknown exception");
} }
} while (res && n_retries > 0);
return res; return res;
} }
...@@ -638,8 +660,13 @@ int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len) ...@@ -638,8 +660,13 @@ int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
return mrr_keys.size(); return mrr_keys.size();
} }
bool Cassandra_se_impl::multiget_slice() bool Cassandra_se_impl::multiget_slice()
{
return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
}
bool Cassandra_se_impl::retryable_multiget_slice()
{ {
ColumnParent cparent; ColumnParent cparent;
cparent.column_family= column_family; cparent.column_family= column_family;
...@@ -650,34 +677,15 @@ bool Cassandra_se_impl::multiget_slice() ...@@ -650,34 +677,15 @@ bool Cassandra_se_impl::multiget_slice()
sr.finish = ""; sr.finish = "";
slice_pred.__set_slice_range(sr); slice_pred.__set_slice_range(sr);
bool res= true;
try {
cassandra_counters.multiget_reads++; cassandra_counters.multiget_reads++;
cassandra_counters.multiget_keys_scanned += mrr_keys.size(); cassandra_counters.multiget_keys_scanned += mrr_keys.size();
cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
read_consistency); read_consistency);
cassandra_counters.multiget_rows_read += mrr_result.size(); cassandra_counters.multiget_rows_read += mrr_result.size();
res= false;
mrr_result_it= mrr_result.begin(); mrr_result_it= mrr_result.begin();
} catch (InvalidRequestException ire) { return false;
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}catch(TException e){
print_error("Thrift exception: %s", e.what());
} catch (...) {
print_error("Unknown exception");
}
return res;
} }
......
...@@ -91,6 +91,9 @@ class Cassandra_status_vars ...@@ -91,6 +91,9 @@ class Cassandra_status_vars
ulong multiget_reads; ulong multiget_reads;
ulong multiget_keys_scanned; ulong multiget_keys_scanned;
ulong multiget_rows_read; ulong multiget_rows_read;
ulong timeout_exceptions;
ulong unavailable_exceptions;
}; };
......
...@@ -82,6 +82,11 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, ...@@ -82,6 +82,11 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an rnd_read (full scan) batch", "Number of rows in an rnd_read (full scan) batch",
NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0); NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
"Number of times to retry Cassandra calls that failed due to timeouts or "
"network communication problems. The default, 0, means not to retry.",
NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
/* These match values in enum_cassandra_consistency_level */ /* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] = const char *cassandra_consistency_level[] =
{ {
...@@ -161,6 +166,7 @@ static struct st_mysql_sys_var* cassandra_system_variables[]= { ...@@ -161,6 +166,7 @@ static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(default_thrift_host), MYSQL_SYSVAR(default_thrift_host),
MYSQL_SYSVAR(write_consistency), MYSQL_SYSVAR(write_consistency),
MYSQL_SYSVAR(read_consistency), MYSQL_SYSVAR(read_consistency),
MYSQL_SYSVAR(failure_retries),
NULL NULL
}; };
...@@ -177,6 +183,11 @@ static SHOW_VAR cassandra_status_variables[]= { ...@@ -177,6 +183,11 @@ static SHOW_VAR cassandra_status_variables[]= {
(char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG}, (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
{"multiget_rows_read", {"multiget_rows_read",
(char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
{"timeout_exceptions",
(char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
{"unavailable_exceptions",
(char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
{NullS, NullS, SHOW_LONG} {NullS, NullS, SHOW_LONG}
}; };
...@@ -1678,7 +1689,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, ...@@ -1678,7 +1689,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
{ {
//innodb_export_status();
cassandra_counters_copy= cassandra_counters; cassandra_counters_copy= cassandra_counters;
var->type= SHOW_ARRAY; var->type= SHOW_ARRAY;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment