Commit ca51c21f authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE:

- added option thrift_port which allows to specify which port to connect to
- not adding username/password - it turns out, there are no authentication
  schemes in stock cassandra distribution.
parent d5e2b5ea
...@@ -73,7 +73,7 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -73,7 +73,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
virtual ~Cassandra_se_impl(){ delete cass; } virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */ /* Connection and DDL checks */
bool connect(const char *host, const char *keyspace); bool connect(const char *host, int port, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); } void set_column_family(const char *cfname) { column_family.assign(cfname); }
bool setup_ddl_checks(); bool setup_ddl_checks();
...@@ -135,7 +135,7 @@ Cassandra_se_interface *get_cassandra_se() ...@@ -135,7 +135,7 @@ Cassandra_se_interface *get_cassandra_se()
} }
bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg) bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
{ {
bool res= true; bool res= true;
...@@ -143,7 +143,7 @@ bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg) ...@@ -143,7 +143,7 @@ bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
try { try {
boost::shared_ptr<TTransport> socket = boost::shared_ptr<TTransport> socket =
boost::shared_ptr<TSocket>(new TSocket(host, 9160)); boost::shared_ptr<TSocket>(new TSocket(host, port));
boost::shared_ptr<TTransport> tr = boost::shared_ptr<TTransport> tr =
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket)); boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
boost::shared_ptr<TProtocol> p = boost::shared_ptr<TProtocol> p =
...@@ -680,3 +680,4 @@ bool Cassandra_se_impl::get_next_multiget_row() ...@@ -680,3 +680,4 @@ bool Cassandra_se_impl::get_next_multiget_row()
} }
...@@ -17,7 +17,7 @@ class Cassandra_se_interface ...@@ -17,7 +17,7 @@ class Cassandra_se_interface
virtual ~Cassandra_se_interface(){}; virtual ~Cassandra_se_interface(){};
/* Init */ /* Init */
virtual bool connect(const char *host, const char *port)=0; virtual bool connect(const char *host, int port, const char *keyspace)=0;
virtual void set_column_family(const char *cfname) = 0; virtual void set_column_family(const char *cfname) = 0;
/* Check underlying DDL */ /* Check underlying DDL */
...@@ -82,3 +82,4 @@ extern Cassandra_status_vars cassandra_counters; ...@@ -82,3 +82,4 @@ extern Cassandra_status_vars cassandra_counters;
Cassandra_se_interface *get_cassandra_se(); Cassandra_se_interface *get_cassandra_se();
...@@ -38,7 +38,8 @@ mysql_mutex_t cassandra_mutex; ...@@ -38,7 +38,8 @@ mysql_mutex_t cassandra_mutex;
struct ha_table_option_struct struct ha_table_option_struct
{ {
const char *host; const char *thrift_host;
int thrift_port;
const char *keyspace; const char *keyspace;
const char *column_family; const char *column_family;
}; };
...@@ -49,7 +50,8 @@ ha_create_table_option cassandra_table_option_list[]= ...@@ -49,7 +50,8 @@ ha_create_table_option cassandra_table_option_list[]=
/* /*
one option that takes an arbitrary string one option that takes an arbitrary string
*/ */
HA_TOPTION_STRING("thrift_host", host), HA_TOPTION_STRING("thrift_host", thrift_host),
HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
HA_TOPTION_STRING("keyspace", keyspace), HA_TOPTION_STRING("keyspace", keyspace),
HA_TOPTION_STRING("column_family", column_family), HA_TOPTION_STRING("column_family", column_family),
HA_TOPTION_END HA_TOPTION_END
...@@ -283,14 +285,14 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked) ...@@ -283,14 +285,14 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
ha_table_option_struct *options= table->s->option_struct; ha_table_option_struct *options= table->s->option_struct;
fprintf(stderr, "ha_cass: open thrift_host=%s keyspace=%s column_family=%s\n", fprintf(stderr, "ha_cass: open thrift_host=%s keyspace=%s column_family=%s\n",
options->host, options->keyspace, options->column_family); options->thrift_host, options->keyspace, options->column_family);
DBUG_ASSERT(!se); DBUG_ASSERT(!se);
if (!options->host || !options->keyspace || !options->column_family) if (!options->thrift_host || !options->keyspace || !options->column_family)
DBUG_RETURN(HA_WRONG_CREATE_OPTION); DBUG_RETURN(HA_WRONG_CREATE_OPTION);
se= get_cassandra_se(); se= get_cassandra_se();
se->set_column_family(options->column_family); se->set_column_family(options->column_family);
if (se->connect(options->host, options->keyspace)) if (se->connect(options->thrift_host, options->thrift_port, options->keyspace))
{ {
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str()); my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
...@@ -389,7 +391,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, ...@@ -389,7 +391,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
*/ */
#endif #endif
DBUG_ASSERT(!se); DBUG_ASSERT(!se);
if (!options->host || !options->keyspace || !options->column_family) if (!options->thrift_host || !options->keyspace || !options->column_family)
{ {
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
"thrift_host, keyspace, and column_family table options must be specified"); "thrift_host, keyspace, and column_family table options must be specified");
...@@ -397,7 +399,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, ...@@ -397,7 +399,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
} }
se= get_cassandra_se(); se= get_cassandra_se();
se->set_column_family(options->column_family); se->set_column_family(options->column_family);
if (se->connect(options->host, options->keyspace)) if (se->connect(options->thrift_host, options->thrift_port, options->keyspace))
{ {
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str()); my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment