Commit ee92b211 authored by Sergei Golubchik's avatar Sergei Golubchik

assisted discovery in federatedx

parent 32ee15d8
CREATE DATABASE federated;
CREATE DATABASE federated;
CREATE TABLE t1 (
`id` int(20) primary key,
`group` int NOT NULL default 1,
`a\\b` int NOT NULL default 2,
`a\\` int unsigned,
`name` varchar(32) default 'name')
DEFAULT CHARSET=latin1;
CREATE TABLE t1 ENGINE=FEDERATED
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/test/t1';
SHOW CREATE TABLE t1;
Table Create Table
t1 CREATE TABLE `t1` (
`id` int(20) NOT NULL,
`group` int(11) NOT NULL DEFAULT '1',
`a\\b` int(11) NOT NULL DEFAULT '2',
`a\\` int(10) unsigned DEFAULT NULL,
`name` varchar(32) DEFAULT 'name',
PRIMARY KEY (`id`)
) ENGINE=FEDERATED DEFAULT CHARSET=latin1 CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/test/t1'
INSERT INTO t1 (id, name) VALUES (1, 'foo');
INSERT INTO t1 (id, name) VALUES (2, 'fee');
SELECT * FROM t1;
id group a\\b a\\ name
1 1 2 NULL foo
2 1 2 NULL fee
DROP TABLE t1;
SELECT * FROM t1;
id group a\\b a\\ name
1 1 2 NULL foo
2 1 2 NULL fee
DROP TABLE t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
source include/federated.inc;
source have_federatedx.inc;
connection slave;
CREATE TABLE t1 (
`id` int(20) primary key,
`group` int NOT NULL default 1,
`a\\b` int NOT NULL default 2,
`a\\` int unsigned,
`name` varchar(32) default 'name')
DEFAULT CHARSET=latin1;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE t1 ENGINE=FEDERATED
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/test/t1';
--replace_result $SLAVE_MYPORT SLAVE_PORT
SHOW CREATE TABLE t1;
INSERT INTO t1 (id, name) VALUES (1, 'foo');
INSERT INTO t1 (id, name) VALUES (2, 'fee');
--sorted_result
SELECT * FROM t1;
DROP TABLE t1;
connection slave;
--sorted_result
SELECT * FROM t1;
DROP TABLE t1;
source include/federated_cleanup.inc;
...@@ -2091,7 +2091,11 @@ int TABLE_SHARE::init_from_sql_statement_string(THD *thd, bool write, ...@@ -2091,7 +2091,11 @@ int TABLE_SHARE::init_from_sql_statement_string(THD *thd, bool write,
delete file; delete file;
if (frm.str) if (frm.str)
{
option_list= 0; // cleanup existing options ...
option_struct= 0; // ... if it's an assisted discovery
error= init_from_binary_frm_image(thd, write, frm.str, frm.length); error= init_from_binary_frm_image(thd, write, frm.str, frm.length);
}
ret: ret:
my_free(const_cast<uchar*>(frm.str)); my_free(const_cast<uchar*>(frm.str));
......
...@@ -426,6 +426,7 @@ int federatedx_db_init(void *p) ...@@ -426,6 +426,7 @@ int federatedx_db_init(void *p)
federatedx_hton->savepoint_release= ha_federatedx::savepoint_release; federatedx_hton->savepoint_release= ha_federatedx::savepoint_release;
federatedx_hton->commit= ha_federatedx::commit; federatedx_hton->commit= ha_federatedx::commit;
federatedx_hton->rollback= ha_federatedx::rollback; federatedx_hton->rollback= ha_federatedx::rollback;
federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted;
federatedx_hton->create= federatedx_create_handler; federatedx_hton->create= federatedx_create_handler;
federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED; federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED;
...@@ -516,15 +517,16 @@ err: ...@@ -516,15 +517,16 @@ err:
} }
static int parse_url_error(FEDERATEDX_SHARE *share, TABLE *table, int error_num) static int parse_url_error(FEDERATEDX_SHARE *share, TABLE_SHARE *table_s,
int error_num)
{ {
char buf[FEDERATEDX_QUERY_BUFFER_SIZE]; char buf[FEDERATEDX_QUERY_BUFFER_SIZE];
int buf_len; int buf_len;
DBUG_ENTER("ha_federatedx parse_url_error"); DBUG_ENTER("ha_federatedx parse_url_error");
buf_len= min(table->s->connect_string.length, buf_len= min(table_s->connect_string.length,
FEDERATEDX_QUERY_BUFFER_SIZE-1); FEDERATEDX_QUERY_BUFFER_SIZE-1);
strmake(buf, table->s->connect_string.str, buf_len); strmake(buf, table_s->connect_string.str, buf_len);
my_error(error_num, MYF(0), buf); my_error(error_num, MYF(0), buf);
DBUG_RETURN(error_num); DBUG_RETURN(error_num);
} }
...@@ -646,8 +648,8 @@ error: ...@@ -646,8 +648,8 @@ error:
*/ */
static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share,
uint table_create_flag) TABLE_SHARE *table_s, uint table_create_flag)
{ {
uint error_num= (table_create_flag ? uint error_num= (table_create_flag ?
ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
...@@ -657,11 +659,11 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, ...@@ -657,11 +659,11 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table,
share->port= 0; share->port= 0;
share->socket= 0; share->socket= 0;
DBUG_PRINT("info", ("share at %lx", (long unsigned int) share)); DBUG_PRINT("info", ("share at %lx", (long unsigned int) share));
DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length)); DBUG_PRINT("info", ("Length: %u", (uint) table_s->connect_string.length));
DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length, DBUG_PRINT("info", ("String: '%.*s'", (int) table_s->connect_string.length,
table->s->connect_string.str)); table_s->connect_string.str));
share->connection_string= strmake_root(mem_root, table->s->connect_string.str, share->connection_string= strmake_root(mem_root, table_s->connect_string.str,
table->s->connect_string.length); table_s->connect_string.length);
DBUG_PRINT("info",("parse_url alloced share->connection_string %lx", DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
(long unsigned int) share->connection_string)); (long unsigned int) share->connection_string));
...@@ -714,9 +716,9 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, ...@@ -714,9 +716,9 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table,
Connection specifies everything but, resort to Connection specifies everything but, resort to
expecting remote and foreign table names to match expecting remote and foreign table names to match
*/ */
share->table_name= strmake_root(mem_root, table->s->table_name.str, share->table_name= strmake_root(mem_root, table_s->table_name.str,
(share->table_name_length= (share->table_name_length=
table->s->table_name.length)); table_s->table_name.length));
DBUG_PRINT("info", DBUG_PRINT("info",
("internal format, default table_name " ("internal format, default table_name "
"share->connection_string: %s share->table_name: %s", "share->connection_string: %s share->table_name: %s",
...@@ -730,7 +732,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, ...@@ -730,7 +732,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table,
{ {
share->parsed= TRUE; share->parsed= TRUE;
// Add a null for later termination of table name // Add a null for later termination of table name
share->connection_string[table->s->connect_string.length]= 0; share->connection_string[table_s->connect_string.length]= 0;
share->scheme= share->connection_string; share->scheme= share->connection_string;
DBUG_PRINT("info",("parse_url alloced share->scheme: %lx", DBUG_PRINT("info",("parse_url alloced share->scheme: %lx",
(ulong) share->scheme)); (ulong) share->scheme));
...@@ -817,7 +819,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table, ...@@ -817,7 +819,7 @@ static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, TABLE *table,
DBUG_RETURN(0); DBUG_RETURN(0);
error: error:
DBUG_RETURN(parse_url_error(share, table, error_num)); DBUG_RETURN(parse_url_error(share, table_s, error_num));
} }
/***************************************************************************** /*****************************************************************************
...@@ -1583,7 +1585,7 @@ static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -1583,7 +1585,7 @@ static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table)
tmp_share.share_key= table_name; tmp_share.share_key= table_name;
tmp_share.share_key_length= strlen(table_name); tmp_share.share_key_length= strlen(table_name);
if (parse_url(&mem_root, &tmp_share, table, 0)) if (parse_url(&mem_root, &tmp_share, table->s, 0))
goto error; goto error;
/* TODO: change tmp_share.scheme to LEX_STRING object */ /* TODO: change tmp_share.scheme to LEX_STRING object */
...@@ -3350,7 +3352,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, ...@@ -3350,7 +3352,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg,
federatedx_io *tmp_io= NULL; federatedx_io *tmp_io= NULL;
DBUG_ENTER("ha_federatedx::create"); DBUG_ENTER("ha_federatedx::create");
if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1))) if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg->s, 1)))
goto error; goto error;
/* loopback socket connections hang due to LOCK_open mutex */ /* loopback socket connections hang due to LOCK_open mutex */
...@@ -3572,6 +3574,71 @@ int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all) ...@@ -3572,6 +3574,71 @@ int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all)
DBUG_RETURN(return_val); DBUG_RETURN(return_val);
} }
/*
Federated supports assisted discovery, like
CREATE TABLE t1 CONNECTION="mysql://joe:pass@192.168.1.111/federated/t1";
but not a fully automatic discovery where a table magically appear
on any use (like, on SELECT * from t1).
*/
int ha_federatedx::discover_assisted(handlerton *hton, THD* thd,
TABLE_SHARE *table_s, HA_CREATE_INFO *info)
{
int error= HA_ERR_NO_CONNECTION;
FEDERATEDX_SHARE tmp_share;
CHARSET_INFO *cs= system_charset_info;
MYSQL mysql;
char buf[1024];
String query(buf, sizeof(buf), cs);
MYSQL_RES *res;
MYSQL_ROW rdata;
ulong *rlen;
if (parse_url(thd->mem_root, &tmp_share, table_s, 1))
return HA_WRONG_CREATE_OPTION;
mysql_init(&mysql);
mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, cs->csname);
if (!mysql_real_connect(&mysql, tmp_share.hostname, tmp_share.username,
tmp_share.password, tmp_share.database,
tmp_share.port, tmp_share.socket, 0))
goto err1;
if (mysql_real_query(&mysql, STRING_WITH_LEN("SET SQL_MODE=NO_TABLE_OPTIONS")))
goto err1;
query.copy(STRING_WITH_LEN("SHOW CREATE TABLE "), cs);
append_ident(&query, tmp_share.table_name,
tmp_share.table_name_length, ident_quote_char);
if (mysql_real_query(&mysql, query.ptr(), query.length()))
goto err1;
if (!((res= mysql_store_result(&mysql))))
goto err1;
if (!(rdata= mysql_fetch_row(res)) || !((rlen= mysql_fetch_lengths(res))))
goto err2;
query.copy(rdata[1], rlen[1], cs);
query.append(STRING_WITH_LEN(" CONNECTION=\""), cs);
query.append(table_s->connect_string.str, table_s->connect_string.length, cs);
query.append('"');
error= table_s->init_from_sql_statement_string(thd, true,
query.ptr(), query.length());
err2:
mysql_free_result(res);
err1:
if (error)
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), mysql_error(&mysql));
mysql_close(&mysql);
return error;
}
struct st_mysql_storage_engine federatedx_storage_engine= struct st_mysql_storage_engine federatedx_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
...@@ -3585,10 +3652,10 @@ maria_declare_plugin(federatedx) ...@@ -3585,10 +3652,10 @@ maria_declare_plugin(federatedx)
PLUGIN_LICENSE_GPL, PLUGIN_LICENSE_GPL,
federatedx_db_init, /* Plugin Init */ federatedx_db_init, /* Plugin Init */
federatedx_done, /* Plugin Deinit */ federatedx_done, /* Plugin Deinit */
0x0200 /* 2.0 */, 0x0201 /* 2.1 */,
NULL, /* status variables */ NULL, /* status variables */
NULL, /* system variables */ NULL, /* system variables */
"2.0", /* string version */ "2.1", /* string version */
MariaDB_PLUGIN_MATURITY_BETA /* maturity */ MariaDB_PLUGIN_MATURITY_BETA /* maturity */
} }
maria_declare_plugin_end; maria_declare_plugin_end;
...@@ -297,6 +297,8 @@ private: ...@@ -297,6 +297,8 @@ private:
static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
static int commit(handlerton *hton, MYSQL_THD thd, bool all); static int commit(handlerton *hton, MYSQL_THD thd, bool all);
static int rollback(handlerton *hton, MYSQL_THD thd, bool all); static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
HA_CREATE_INFO *);
bool append_stmt_insert(String *query); bool append_stmt_insert(String *query);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment