Commit c00a37d1 authored by Sergey Petrunya's avatar Sergey Petrunya

MDEV-4443: Cassandra SE: ERROR 1928 (HY000): Internal error: 'Thrift...

MDEV-4443: Cassandra SE: ERROR 1928 (HY000): Internal error: 'Thrift exception: Called write on non-open socket'
- Made call re-try system also handle network disconnects (it will reconnect before retrying)
- Added Cassandra_network_exceptions counter.
- @@cassandra_failure_retries is now always honored.
parent 08ce9bfe
drop table if exists t0, t1; drop table if exists t0, t1;
#
# Check variables and status counters
#
show status like 'cassandra%';
Variable_name Value
Cassandra_row_inserts 0
Cassandra_row_insert_batches 0
Cassandra_multiget_keys_scanned 0
Cassandra_multiget_reads 0
Cassandra_multiget_rows_read 0
Cassandra_network_exceptions 0
Cassandra_timeout_exceptions 0
Cassandra_unavailable_exceptions 0
show variables like 'cassandra%';
Variable_name Value
cassandra_default_thrift_host
cassandra_failure_retries 3
cassandra_insert_batch_size 100
cassandra_multiget_batch_size 100
cassandra_read_consistency ONE
cassandra_rnd_batch_size 10000
cassandra_write_consistency ONE
#
# Test various errors on table creation.
#
create table t1 (a int) engine=cassandra create table t1 (a int) engine=cassandra
thrift_host='localhost' keyspace='foo' column_family='colfam'; thrift_host='localhost' keyspace='foo' column_family='colfam';
ERROR 42000: This table type requires a primary key ERROR 42000: This table type requires a primary key
......
...@@ -10,7 +10,15 @@ if (`SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.ENGINES WHERE engine = 'cassand ...@@ -10,7 +10,15 @@ if (`SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.ENGINES WHERE engine = 'cassand
drop table if exists t0, t1; drop table if exists t0, t1;
--enable_warnings --enable_warnings
# Test various errors on table creation. --echo #
--echo # Check variables and status counters
--echo #
show status like 'cassandra%';
show variables like 'cassandra%';
--echo #
--echo # Test various errors on table creation.
--echo #
--error ER_REQUIRES_PRIMARY_KEY --error ER_REQUIRES_PRIMARY_KEY
create table t1 (a int) engine=cassandra create table t1 (a int) engine=cassandra
thrift_host='localhost' keyspace='foo' column_family='colfam'; thrift_host='localhost' keyspace='foo' column_family='colfam';
......
...@@ -43,9 +43,13 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -43,9 +43,13 @@ class Cassandra_se_impl: public Cassandra_se_interface
ConsistencyLevel::type write_consistency; ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency; ConsistencyLevel::type read_consistency;
/* Connection data */
std::string host;
int port;
/* How many times to retry an operation before giving up */ /* How many times to retry an operation before giving up */
int thrift_call_retries_to_do; int thrift_call_retries_to_do;
bool inside_try_operation;
/* DDL data */ /* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
...@@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface
SliceRange slice_pred_sr; SliceRange slice_pred_sr;
bool get_slices_returned_less; bool get_slices_returned_less;
bool get_slice_found_rows; bool get_slice_found_rows;
bool reconnect();
public: public:
Cassandra_se_impl() : cass(NULL), Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE), write_consistency(ConsistencyLevel::ONE),
read_consistency(ConsistencyLevel::ONE), read_consistency(ConsistencyLevel::ONE),
thrift_call_retries_to_do(0) {} thrift_call_retries_to_do(1),
inside_try_operation(false)
{}
virtual ~Cassandra_se_impl(){ delete cass; } virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */ /* Connection and DDL checks */
bool connect(const char *host, int port, const char *keyspace); bool connect(const char *host_arg, int port_arg, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); } void set_column_family(const char *cfname) { column_family.assign(cfname); }
bool setup_ddl_checks(); bool setup_ddl_checks();
...@@ -94,6 +102,9 @@ public: ...@@ -94,6 +102,9 @@ public:
/* Settings */ /* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level); void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
virtual void set_n_retries(uint retries_arg) {
thrift_call_retries_to_do= retries_arg;
}
/* Writes */ /* Writes */
void clear_insert_buffer(); void clear_insert_buffer();
...@@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se() ...@@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se()
} }
bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg) bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg)
{ {
bool res= true;
keyspace.assign(keyspace_arg); keyspace.assign(keyspace_arg);
host.assign(host_arg);
port= port_arg;
return reconnect();
}
bool Cassandra_se_impl::reconnect()
{
delete cass;
cass= NULL;
bool res= true;
try { try {
boost::shared_ptr<TTransport> socket = boost::shared_ptr<TTransport> socket =
boost::shared_ptr<TSocket>(new TSocket(host, port)); boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port));
boost::shared_ptr<TTransport> tr = boost::shared_ptr<TTransport> tr =
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket)); boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
boost::shared_ptr<TProtocol> p = boost::shared_ptr<TProtocol> p =
...@@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace ...@@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace
cass= new CassandraClient(p); cass= new CassandraClient(p);
tr->open(); tr->open();
cass->set_keyspace(keyspace_arg); cass->set_keyspace(keyspace.c_str());
res= false; // success res= false; // success
}catch(TTransportException te){ }catch(TTransportException te){
...@@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row() ...@@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row()
bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
{ {
bool res; bool res;
int n_retries= thrift_call_retries_to_do; int n_attempts= thrift_call_retries_to_do;
bool was_inside_try_operation= inside_try_operation;
inside_try_operation= true;
do do
{ {
...@@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) ...@@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
This is supposedly a failure (or "not found" or other negative This is supposedly a failure (or "not found" or other negative
result). We need to return this to the caller. result). We need to return this to the caller.
*/ */
n_retries= 0; n_attempts= 0;
} }
} catch (InvalidRequestException ire) { } catch (InvalidRequestException ire) {
n_retries= 0; /* there is no point in retrying this operation */ n_attempts= 0; /* there is no point in retrying this operation */
print_error("%s [%s]", ire.what(), ire.why.c_str()); print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) { } catch (UnavailableException ue) {
cassandra_counters.unavailable_exceptions++; cassandra_counters.unavailable_exceptions++;
if (!--n_retries) if (!--n_attempts)
print_error("UnavailableException: %s", ue.what()); print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) { } catch (TimedOutException te) {
/*
Note: this is a timeout generated *inside Cassandra cluster*.
Connection between us and the cluster is ok, but something went wrong
within the cluster.
*/
cassandra_counters.timeout_exceptions++; cassandra_counters.timeout_exceptions++;
if (!--n_retries) if (!--n_attempts)
print_error("TimedOutException: %s", te.what()); print_error("TimedOutException: %s", te.what());
} catch (TTransportException tte) {
/* Something went wrong in communication between us and Cassandra */
cassandra_counters.network_exceptions++;
switch (tte.getType())
{
case TTransportException::NOT_OPEN:
case TTransportException::TIMED_OUT:
case TTransportException::END_OF_FILE:
case TTransportException::INTERRUPTED:
{
if (!was_inside_try_operation && reconnect())
{
/* Failed to reconnect, no point to retry the operation */
n_attempts= 0;
print_error("%s", tte.what());
}
else
{
n_attempts--;
}
break;
}
default:
{
/*
We assume it doesn't make sense to retry for
unknown kinds of TTransportException-s
*/
n_attempts= 0;
print_error("%s", tte.what());
}
}
}catch(TException e){ }catch(TException e){
/* todo: we may use retry for certain kinds of Thrift errors */ /* todo: we may use retry for certain kinds of Thrift errors */
n_retries= 0; n_attempts= 0;
print_error("Thrift exception: %s", e.what()); print_error("Thrift exception: %s", e.what());
} catch (...) { } catch (...) {
n_retries= 0; /* Don't retry */ n_attempts= 0; /* Don't retry */
print_error("Unknown exception"); print_error("Unknown exception");
} }
} while (res && n_retries > 0); } while (res && n_attempts > 0);
inside_try_operation= was_inside_try_operation;
return res; return res;
} }
......
...@@ -45,6 +45,7 @@ public: ...@@ -45,6 +45,7 @@ public:
/* Settings */ /* Settings */
virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0; virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0;
virtual void set_n_retries(uint retries_arg)=0;
/* Check underlying DDL */ /* Check underlying DDL */
virtual bool setup_ddl_checks()=0; virtual bool setup_ddl_checks()=0;
...@@ -113,6 +114,7 @@ public: ...@@ -113,6 +114,7 @@ public:
ulong timeout_exceptions; ulong timeout_exceptions;
ulong unavailable_exceptions; ulong unavailable_exceptions;
ulong network_exceptions;
}; };
......
...@@ -110,7 +110,7 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, ...@@ -110,7 +110,7 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
"Number of times to retry Cassandra calls that failed due to timeouts or " "Number of times to retry Cassandra calls that failed due to timeouts or "
"network communication problems. The default, 0, means not to retry.", "network communication problems. The default, 0, means not to retry.",
NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
/* These match values in enum_cassandra_consistency_level */ /* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] = const char *cassandra_consistency_level[] =
...@@ -2210,6 +2210,7 @@ int ha_cassandra::reset() ...@@ -2210,6 +2210,7 @@ int ha_cassandra::reset()
{ {
se->set_consistency_levels(THDVAR(table->in_use, read_consistency), se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
THDVAR(table->in_use, write_consistency)); THDVAR(table->in_use, write_consistency));
se->set_n_retries(THDVAR(table->in_use, failure_retries));
} }
return 0; return 0;
} }
...@@ -2581,6 +2582,8 @@ static SHOW_VAR cassandra_status_variables[]= { ...@@ -2581,6 +2582,8 @@ static SHOW_VAR cassandra_status_variables[]= {
{"multiget_rows_read", {"multiget_rows_read",
(char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
{"network_exceptions",
(char*) &cassandra_counters.network_exceptions, SHOW_LONG},
{"timeout_exceptions", {"timeout_exceptions",
(char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
{"unavailable_exceptions", {"unavailable_exceptions",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment