BUG#26257 New Federated Server Functionality Doesn't support differently named tables

* Modified Federated memory allocation to use MEM_ROOT
* Modified sql_servers and federated to allocate share connection
  parameters to use MEM_ROOT
* Modified Federated to allow tablename in addition to server name
* Implicit flushing of tables using altered/dropped server name
* Added tests to prove new functionality works

Contributors to this patch: Patrick Galbraith, Antony Curtis
parent 14ccc659
...@@ -20,6 +20,14 @@ CREATE TABLE first_db.t1 ( ...@@ -20,6 +20,14 @@ CREATE TABLE first_db.t1 (
`name` varchar(64) NOT NULL default '' `name` varchar(64) NOT NULL default ''
) )
DEFAULT CHARSET=latin1; DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS first_db.t2;
Warnings:
Note 1051 Unknown table 't2'
CREATE TABLE first_db.t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
use second_db; use second_db;
DROP TABLE IF EXISTS second_db.t1; DROP TABLE IF EXISTS second_db.t1;
Warnings: Warnings:
...@@ -29,6 +37,14 @@ CREATE TABLE second_db.t1 ( ...@@ -29,6 +37,14 @@ CREATE TABLE second_db.t1 (
`name` varchar(64) NOT NULL default '' `name` varchar(64) NOT NULL default ''
) )
DEFAULT CHARSET=latin1; DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS second_db.t2;
Warnings:
Note 1051 Unknown table 't2'
CREATE TABLE second_db.t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
drop server if exists 'server_one'; drop server if exists 'server_one';
create server 'server_one' foreign data wrapper 'mysql' options create server 'server_one' foreign data wrapper 'mysql' options
(HOST '127.0.0.1', (HOST '127.0.0.1',
...@@ -60,10 +76,10 @@ CREATE TABLE federated.old ( ...@@ -60,10 +76,10 @@ CREATE TABLE federated.old (
) )
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/first_db/t1'; CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/first_db/t1';
INSERT INTO federated.old (id, name) values (1, 'federated.old url'); INSERT INTO federated.old (id, name) values (1, 'federated.old-> first_db.t1, url format');
SELECT * FROM federated.old; SELECT * FROM federated.old;
id name id name
1 federated.old url 1 federated.old-> first_db.t1, url format
DROP TABLE IF EXISTS federated.old2; DROP TABLE IF EXISTS federated.old2;
Warnings: Warnings:
Note 1051 Unknown table 'old2' Note 1051 Unknown table 'old2'
...@@ -72,8 +88,37 @@ CREATE TABLE federated.old2 ( ...@@ -72,8 +88,37 @@ CREATE TABLE federated.old2 (
`name` varchar(64) NOT NULL default '' `name` varchar(64) NOT NULL default ''
) )
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/first_db/t2';
INSERT INTO federated.old2 (id, name) values (1, 'federated.old2-> first_db.t2, url format');
SELECT * FROM federated.old2;
id name
1 federated.old2-> first_db.t2, url format
DROP TABLE IF EXISTS federated.urldb2t1;
Warnings:
Note 1051 Unknown table 'urldb2t1'
CREATE TABLE federated.urldb2t1 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/second_db/t1'; CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/second_db/t1';
INSERT INTO federated.old2 (id, name) values (1, 'federated.old2 url'); INSERT INTO federated.urldb2t1 (id, name) values (1, 'federated.urldb2t1 -> second_db.t1, url format');
SELECT * FROM federated.urldb2t1;
id name
1 federated.urldb2t1 -> second_db.t1, url format
DROP TABLE IF EXISTS federated.urldb2t2;
Warnings:
Note 1051 Unknown table 'urldb2t2'
CREATE TABLE federated.urldb2t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/second_db/t2';
INSERT INTO federated.urldb2t2 (id, name) values (1, 'federated.urldb2t2 -> second_db.t2, url format');
SELECT * FROM federated.urldb2t2;
id name
1 federated.urldb2t2 -> second_db.t2, url format
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
Warnings: Warnings:
Note 1051 Unknown table 't1' Note 1051 Unknown table 't1'
...@@ -83,18 +128,38 @@ CREATE TABLE federated.t1 ( ...@@ -83,18 +128,38 @@ CREATE TABLE federated.t1 (
) )
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='server_one'; CONNECTION='server_one';
INSERT INTO federated.t1 (id, name) values (1, 'server_one, new scheme'); INSERT INTO federated.t1 (id, name) values (1, 'server_one, new scheme, first_db.t1');
SELECT * FROM federated.t1; SELECT * FROM federated.t1;
id name id name
1 federated.old url 1 federated.old-> first_db.t1, url format
1 server_one, new scheme 1 server_one, new scheme, first_db.t1
DROP TABLE IF EXISTS federated.whatever;
Warnings:
Note 1051 Unknown table 'whatever'
CREATE TABLE federated.whatever (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='server_one/t1';
INSERT INTO federated.whatever (id, name) values (1, 'server_one, new scheme, whatever, first_db.t1');
SELECT * FROM federated.whatever;
id name
1 federated.old-> first_db.t1, url format
1 server_one, new scheme, first_db.t1
1 server_one, new scheme, whatever, first_db.t1
ALTER SERVER 'server_one' options(DATABASE 'second_db'); ALTER SERVER 'server_one' options(DATABASE 'second_db');
flush tables; INSERT INTO federated.t1 (id, name) values (1, 'server_two, new scheme, second_db.t1');
INSERT INTO federated.t1 (id, name) values (1, 'server_two, new scheme');
SELECT * FROM federated.t1; SELECT * FROM federated.t1;
id name id name
1 federated.old2 url 1 federated.urldb2t1 -> second_db.t1, url format
1 server_two, new scheme 1 server_two, new scheme, second_db.t1
INSERT INTO federated.whatever (id, name) values (1, 'server_two, new scheme, whatever, second_db.t1');
SELECT * FROM federated.whatever;
id name
1 federated.urldb2t1 -> second_db.t1, url format
1 server_two, new scheme, second_db.t1
1 server_two, new scheme, whatever, second_db.t1
drop table federated.t1; drop table federated.t1;
drop server 'server_one'; drop server 'server_one';
drop server 'server_two'; drop server 'server_two';
......
...@@ -17,6 +17,13 @@ CREATE TABLE first_db.t1 ( ...@@ -17,6 +17,13 @@ CREATE TABLE first_db.t1 (
) )
DEFAULT CHARSET=latin1; DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS first_db.t2;
CREATE TABLE first_db.t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
use second_db; use second_db;
DROP TABLE IF EXISTS second_db.t1; DROP TABLE IF EXISTS second_db.t1;
CREATE TABLE second_db.t1 ( CREATE TABLE second_db.t1 (
...@@ -25,6 +32,13 @@ CREATE TABLE second_db.t1 ( ...@@ -25,6 +32,13 @@ CREATE TABLE second_db.t1 (
) )
DEFAULT CHARSET=latin1; DEFAULT CHARSET=latin1;
DROP TABLE IF EXISTS second_db.t2;
CREATE TABLE second_db.t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
connection master; connection master;
drop server if exists 'server_one'; drop server if exists 'server_one';
...@@ -61,7 +75,7 @@ eval CREATE TABLE federated.old ( ...@@ -61,7 +75,7 @@ eval CREATE TABLE federated.old (
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/first_db/t1'; CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/first_db/t1';
INSERT INTO federated.old (id, name) values (1, 'federated.old url'); INSERT INTO federated.old (id, name) values (1, 'federated.old-> first_db.t1, url format');
SELECT * FROM federated.old; SELECT * FROM federated.old;
...@@ -72,9 +86,32 @@ eval CREATE TABLE federated.old2 ( ...@@ -72,9 +86,32 @@ eval CREATE TABLE federated.old2 (
`name` varchar(64) NOT NULL default '' `name` varchar(64) NOT NULL default ''
) )
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/first_db/t2';
INSERT INTO federated.old2 (id, name) values (1, 'federated.old2-> first_db.t2, url format');
SELECT * FROM federated.old2;
DROP TABLE IF EXISTS federated.urldb2t1;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE federated.urldb2t1 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/second_db/t1'; CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/second_db/t1';
INSERT INTO federated.urldb2t1 (id, name) values (1, 'federated.urldb2t1 -> second_db.t1, url format');
SELECT * FROM federated.urldb2t1;
INSERT INTO federated.old2 (id, name) values (1, 'federated.old2 url'); DROP TABLE IF EXISTS federated.urldb2t2;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE federated.urldb2t2 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/second_db/t2';
INSERT INTO federated.urldb2t2 (id, name) values (1, 'federated.urldb2t2 -> second_db.t2, url format');
SELECT * FROM federated.urldb2t2;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
CREATE TABLE federated.t1 ( CREATE TABLE federated.t1 (
...@@ -84,17 +121,30 @@ CREATE TABLE federated.t1 ( ...@@ -84,17 +121,30 @@ CREATE TABLE federated.t1 (
ENGINE="FEDERATED" DEFAULT CHARSET=latin1 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='server_one'; CONNECTION='server_one';
INSERT INTO federated.t1 (id, name) values (1, 'server_one, new scheme'); INSERT INTO federated.t1 (id, name) values (1, 'server_one, new scheme, first_db.t1');
SELECT * FROM federated.t1; SELECT * FROM federated.t1;
DROP TABLE IF EXISTS federated.whatever;
CREATE TABLE federated.whatever (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='server_one/t1';
INSERT INTO federated.whatever (id, name) values (1, 'server_one, new scheme, whatever, first_db.t1');
SELECT * FROM federated.whatever;
ALTER SERVER 'server_one' options(DATABASE 'second_db'); ALTER SERVER 'server_one' options(DATABASE 'second_db');
flush tables; # FLUSH TABLES is now unneccessary
INSERT INTO federated.t1 (id, name) values (1, 'server_two, new scheme'); INSERT INTO federated.t1 (id, name) values (1, 'server_two, new scheme, second_db.t1');
SELECT * FROM federated.t1; SELECT * FROM federated.t1;
INSERT INTO federated.whatever (id, name) values (1, 'server_two, new scheme, whatever, second_db.t1');
SELECT * FROM federated.whatever;
drop table federated.t1; drop table federated.t1;
drop server 'server_one'; drop server 'server_one';
......
...@@ -1430,6 +1430,9 @@ void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt); ...@@ -1430,6 +1430,9 @@ void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table); void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table);
bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables, bool have_lock = FALSE); bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables, bool have_lock = FALSE);
bool close_cached_connection_tables(THD *thd, bool wait_for_refresh,
LEX_STRING *connect_string,
bool have_lock = FALSE);
void copy_field_from_tmp_record(Field *field,int offset); void copy_field_from_tmp_record(Field *field,int offset);
bool fill_record(THD *thd, Field **field, List<Item> &values, bool fill_record(THD *thd, Field **field, List<Item> &values,
bool ignore_errors); bool ignore_errors);
......
...@@ -858,6 +858,7 @@ void free_io_cache(TABLE *table) ...@@ -858,6 +858,7 @@ void free_io_cache(TABLE *table)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
Close all tables which aren't in use by any thread Close all tables which aren't in use by any thread
...@@ -969,6 +970,71 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh, ...@@ -969,6 +970,71 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh,
} }
/*
Close all tables which match specified connection string or
if specified string is NULL, then any table with a connection string.
*/
bool close_cached_connection_tables(THD *thd, bool if_wait_for_refresh,
LEX_STRING *connection, bool have_lock)
{
uint idx;
TABLE_LIST tmp, *tables= NULL;
bool result= FALSE;
DBUG_ENTER("close_cached_connections");
DBUG_ASSERT(thd);
bzero(&tmp, sizeof(TABLE_LIST));
if (!have_lock)
VOID(pthread_mutex_lock(&LOCK_open));
for (idx= 0; idx < table_def_cache.records; idx++)
{
TABLE_SHARE *share= (TABLE_SHARE *) hash_element(&table_def_cache, idx);
/* Ignore if table is not open or does not have a connect_string */
if (!share->connect_string.length || !share->ref_count)
continue;
/* Compare the connection string */
if (connection &&
(connection->length > share->connect_string.length ||
(connection->length < share->connect_string.length &&
(share->connect_string.str[connection->length] != '/' &&
share->connect_string.str[connection->length] != '\\')) ||
strncasecmp(connection->str, share->connect_string.str,
connection->length)))
continue;
/* close_cached_tables() only uses these elements */
tmp.db= share->db.str;
tmp.table_name= share->table_name.str;
tmp.next_local= tables;
tables= (TABLE_LIST *) memdup_root(thd->mem_root, (char*)&tmp,
sizeof(TABLE_LIST));
}
if (tables)
result= close_cached_tables(thd, FALSE, tables, TRUE);
if (!have_lock)
VOID(pthread_mutex_unlock(&LOCK_open));
if (if_wait_for_refresh)
{
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= 0;
thd->mysys_var->current_cond= 0;
thd->proc_info=0;
pthread_mutex_unlock(&thd->mysys_var->mutex);
}
DBUG_RETURN(result);
}
/* /*
Mark all tables in the list which were used by current substatement Mark all tables in the list which were used by current substatement
as free for reuse. as free for reuse.
......
...@@ -16,6 +16,21 @@ ...@@ -16,6 +16,21 @@
/* /*
The servers are saved in the system table "servers" The servers are saved in the system table "servers"
Currently, when the user performs an ALTER SERVER or a DROP SERVER
operation, it will cause all open tables which refer to the named
server connection to be flushed. This may cause some undesirable
behaviour with regard to currently running transactions. It is
expected that the DBA knows what s/he is doing when s/he performs
the ALTER SERVER or DROP SERVER operation.
TODO:
It is desirable for us to implement a callback mechanism instead where
callbacks can be registered for specific server protocols. The callback
will be fired when such a server name has been created/altered/dropped
or when statistics are to be gathered such as how many actual connections.
Storage engines etc will be able to make use of the callback so that
currently running transactions etc will not be disrupted.
*/ */
#include "mysql_priv.h" #include "mysql_priv.h"
...@@ -557,6 +572,8 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -557,6 +572,8 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
int error; int error;
TABLE_LIST tables; TABLE_LIST tables;
TABLE *table; TABLE *table;
LEX_STRING name= { server_options->server_name,
server_options->server_name_length };
DBUG_ENTER("drop_server"); DBUG_ENTER("drop_server");
DBUG_PRINT("info", ("server name server->server_name %s", DBUG_PRINT("info", ("server name server->server_name %s",
...@@ -578,14 +595,16 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -578,14 +595,16 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
goto end; goto end;
} }
error= delete_server_record(table, error= delete_server_record(table, name.str, name.length);
server_options->server_name,
server_options->server_name_length);
/* /* close the servers table before we call closed_cached_connection_tables */
Perform a reload so we don't have a 'hole' in our mem_root close_thread_tables(thd);
*/
servers_load(thd, &tables); if (close_cached_connection_tables(thd, TRUE, &name))
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR, "Server connection in use");
}
end: end:
rw_unlock(&THR_LOCK_servers); rw_unlock(&THR_LOCK_servers);
...@@ -975,6 +994,8 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -975,6 +994,8 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
{ {
int error= ER_FOREIGN_SERVER_DOESNT_EXIST; int error= ER_FOREIGN_SERVER_DOESNT_EXIST;
FOREIGN_SERVER *altered, *existing; FOREIGN_SERVER *altered, *existing;
LEX_STRING name= { server_options->server_name,
server_options->server_name_length };
DBUG_ENTER("alter_server"); DBUG_ENTER("alter_server");
DBUG_PRINT("info", ("server_options->server_name %s", DBUG_PRINT("info", ("server_options->server_name %s",
server_options->server_name)); server_options->server_name));
...@@ -982,8 +1003,8 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -982,8 +1003,8 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
rw_wrlock(&THR_LOCK_servers); rw_wrlock(&THR_LOCK_servers);
if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache, if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache,
(byte*) server_options->server_name, (byte*) name.str,
server_options->server_name_length))) name.length)))
goto end; goto end;
altered= (FOREIGN_SERVER *)alloc_root(&mem, altered= (FOREIGN_SERVER *)alloc_root(&mem,
...@@ -993,6 +1014,15 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -993,6 +1014,15 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
error= update_server(thd, existing, altered); error= update_server(thd, existing, altered);
/* close the servers table before we call closed_cached_connection_tables */
close_thread_tables(thd);
if (close_cached_connection_tables(thd, FALSE, &name))
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR, "Server connection in use");
}
end: end:
DBUG_PRINT("info", ("error returned %d", error)); DBUG_PRINT("info", ("error returned %d", error));
rw_unlock(&THR_LOCK_servers); rw_unlock(&THR_LOCK_servers);
...@@ -1143,6 +1173,12 @@ void servers_free(bool end) ...@@ -1143,6 +1173,12 @@ void servers_free(bool end)
DBUG_ENTER("servers_free"); DBUG_ENTER("servers_free");
if (!hash_inited(&servers_cache)) if (!hash_inited(&servers_cache))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
if (!end)
{
free_root(&mem, MYF(MY_MARK_BLOCKS_FREE));
my_hash_reset(&servers_cache);
DBUG_VOID_RETURN;
}
rwlock_destroy(&THR_LOCK_servers); rwlock_destroy(&THR_LOCK_servers);
free_root(&mem,MYF(0)); free_root(&mem,MYF(0));
hash_free(&servers_cache); hash_free(&servers_cache);
...@@ -1150,6 +1186,51 @@ void servers_free(bool end) ...@@ -1150,6 +1186,51 @@ void servers_free(bool end)
} }
/*
SYNOPSIS
clone_server(MEM_ROOT *mem_root, FOREIGN_SERVER *orig, FOREIGN_SERVER *buff)
Create a clone of FOREIGN_SERVER. If the supplied mem_root is of
thd->mem_root then the copy is automatically disposed at end of statement.
NOTES
ARGS
MEM_ROOT pointer (strings are copied into this mem root)
FOREIGN_SERVER pointer (made a copy of)
FOREIGN_SERVER buffer (if not-NULL, this pointer is returned)
RETURN VALUE
FOREIGN_SEVER pointer (copy of one supplied FOREIGN_SERVER)
*/
static FOREIGN_SERVER *clone_server(MEM_ROOT *mem, const FOREIGN_SERVER *server,
FOREIGN_SERVER *buffer)
{
DBUG_ENTER("sql_server.cc:clone_server");
if (!buffer)
buffer= (FOREIGN_SERVER *) alloc_root(mem, sizeof(FOREIGN_SERVER));
buffer->server_name= strmake_root(mem, server->server_name,
server->server_name_length);
buffer->port= server->port;
buffer->server_name_length= server->server_name_length;
/* TODO: We need to examine which of these can really be NULL */
buffer->db= server->db ? strdup_root(mem, server->db) : NULL;
buffer->scheme= server->scheme ? strdup_root(mem, server->scheme) : NULL;
buffer->username= server->username? strdup_root(mem, server->username): NULL;
buffer->password= server->password? strdup_root(mem, server->password): NULL;
buffer->socket= server->socket ? strdup_root(mem, server->socket) : NULL;
buffer->owner= server->owner ? strdup_root(mem, server->owner) : NULL;
buffer->host= server->host ? strdup_root(mem, server->host) : NULL;
DBUG_RETURN(buffer);
}
/* /*
SYNOPSIS SYNOPSIS
...@@ -1163,11 +1244,11 @@ void servers_free(bool end) ...@@ -1163,11 +1244,11 @@ void servers_free(bool end)
*/ */
FOREIGN_SERVER *get_server_by_name(const char *server_name) FOREIGN_SERVER *get_server_by_name(MEM_ROOT *mem, const char *server_name,
FOREIGN_SERVER *buff)
{ {
ulong error_num=0;
uint server_name_length; uint server_name_length;
FOREIGN_SERVER *server= 0; FOREIGN_SERVER *server;
DBUG_ENTER("get_server_by_name"); DBUG_ENTER("get_server_by_name");
DBUG_PRINT("info", ("server_name %s", server_name)); DBUG_PRINT("info", ("server_name %s", server_name));
...@@ -1176,7 +1257,6 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name) ...@@ -1176,7 +1257,6 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name)
if (! server_name || !strlen(server_name)) if (! server_name || !strlen(server_name))
{ {
DBUG_PRINT("info", ("server_name not defined!")); DBUG_PRINT("info", ("server_name not defined!"));
error_num= 1;
DBUG_RETURN((FOREIGN_SERVER *)NULL); DBUG_RETURN((FOREIGN_SERVER *)NULL);
} }
...@@ -1190,6 +1270,10 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name) ...@@ -1190,6 +1270,10 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name)
server_name, server_name_length)); server_name, server_name_length));
server= (FOREIGN_SERVER *) NULL; server= (FOREIGN_SERVER *) NULL;
} }
/* otherwise, make copy of server */
else
server= clone_server(mem, server, buff);
DBUG_PRINT("info", ("unlocking servers_cache")); DBUG_PRINT("info", ("unlocking servers_cache"));
rw_unlock(&THR_LOCK_servers); rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(server); DBUG_RETURN(server);
......
...@@ -39,4 +39,5 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options); ...@@ -39,4 +39,5 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options); int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
/* lookup functions */ /* lookup functions */
FOREIGN_SERVER *get_server_by_name(const char *server_name); FOREIGN_SERVER *get_server_by_name(MEM_ROOT *mem, const char *server_name,
FOREIGN_SERVER *server_buffer);
...@@ -43,23 +43,55 @@ ...@@ -43,23 +43,55 @@
The create table will simply create the .frm file, and within the The create table will simply create the .frm file, and within the
"CREATE TABLE" SQL, there SHALL be any of the following : "CREATE TABLE" SQL, there SHALL be any of the following :
comment=scheme://username:password@hostname:port/database/tablename connection=scheme://username:password@hostname:port/database/tablename
comment=scheme://username@hostname/database/tablename connection=scheme://username@hostname/database/tablename
comment=scheme://username:password@hostname/database/tablename connection=scheme://username:password@hostname/database/tablename
comment=scheme://username:password@hostname/database/tablename connection=scheme://username:password@hostname/database/tablename
- OR -
As of 5.1 (See worklog #3031), federated now allows you to use a non-url
format, taking advantage of mysql.servers:
connection="connection_one"
connection="connection_one/table_foo"
An example would be: An example would be:
comment=mysql://username:password@hostname:port/database/tablename connection=mysql://username:password@hostname:port/database/tablename
***IMPORTANT*** or, if we had:
create server 'server_one' foreign data wrapper 'mysql' options
(HOST '127.0.0.1',
DATABASE 'db1',
USER 'root',
PASSWORD '',
PORT 3306,
SOCKET '',
OWNER 'root');
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(64) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='server_one';
So, this will have been the equivalent of
CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
This is a first release, conceptual release Then, we can also change the server to point to a new schema:
Only 'mysql://' is supported at this release.
ALTER SERVER 'server_one' options(DATABASE 'db2');
This comment connection string is necessary for the handler to be All subsequent calls will now be against db2.t1! Guess what? You don't
able to connect to the foreign server. have to perform an alter table!
This connecton="connection string" is necessary for the handler to be
able to connect to the foreign server, either by URL, or by server
name.
The basic flow is this: The basic flow is this:
...@@ -166,7 +198,7 @@ ...@@ -166,7 +198,7 @@
KEY other_key (other)) KEY other_key (other))
ENGINE="FEDERATED" ENGINE="FEDERATED"
DEFAULT CHARSET=latin1 DEFAULT CHARSET=latin1
COMMENT='root@127.0.0.1:9306/federated/test_federated'; CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
Notice the "COMMENT" and "ENGINE" field? This is where you Notice the "COMMENT" and "ENGINE" field? This is where you
respectively set the engine type, "FEDERATED" and foreign respectively set the engine type, "FEDERATED" and foreign
...@@ -263,7 +295,7 @@ ...@@ -263,7 +295,7 @@
To run these tests, go into ./mysql-test (based in the directory you To run these tests, go into ./mysql-test (based in the directory you
built the server in) built the server in)
./mysql-test-run federatedd ./mysql-test-run federated
To run the test, or if you want to run the test and have debug info: To run the test, or if you want to run the test and have debug info:
...@@ -311,7 +343,7 @@ ...@@ -311,7 +343,7 @@
------------- -------------
These were the files that were modified or created for this These were the files that were modified or created for this
Federated handler to work: Federated handler to work, in 5.0:
./configure.in ./configure.in
./sql/Makefile.am ./sql/Makefile.am
...@@ -329,6 +361,13 @@ ...@@ -329,6 +361,13 @@
./sql/ha_federated.cc ./sql/ha_federated.cc
./sql/ha_federated.h ./sql/ha_federated.h
In 5.1
my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
CMakeLists.txt Makefile.in ha_federated.h plug.in
Makefile SCCS libfederated.a
Makefile.am ha_federated.cc libfederated_a-ha_federated.o
*/ */
...@@ -547,42 +586,39 @@ static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num) ...@@ -547,42 +586,39 @@ static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
int buf_len; int buf_len;
DBUG_ENTER("ha_federated parse_url_error"); DBUG_ENTER("ha_federated parse_url_error");
if (share->connection_string)
{
DBUG_PRINT("info",
("error: parse_url. Returning error code %d \
freeing share->connection_string %lx",
error_num, (long unsigned int) share->connection_string));
my_free((gptr) share->connection_string, MYF(0));
share->connection_string= 0;
}
buf_len= min(table->s->connect_string.length, buf_len= min(table->s->connect_string.length,
FEDERATED_QUERY_BUFFER_SIZE-1); FEDERATED_QUERY_BUFFER_SIZE-1);
strmake(buf, table->s->connect_string.str, buf_len); strmake(buf, table->s->connect_string.str, buf_len);
my_error(error_num, MYF(0), buf); my_error(error_num, MYF(0), buf);
DBUG_RETURN(error_num); DBUG_RETURN(error_num);
} }
/* /*
retrieve server object which contains server meta-data retrieve server object which contains server meta-data
from the system table given a server's name, set share from the system table given a server's name, set share
connection parameter members connection parameter members
*/ */
int get_connection(FEDERATED_SHARE *share) int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
{ {
int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST; int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
FOREIGN_SERVER *server; FOREIGN_SERVER *server, server_buffer;
DBUG_ENTER("ha_federated::get_connection"); DBUG_ENTER("ha_federated::get_connection");
/*
get_server_by_name() clones the server if exists and allocates
copies of strings in the supplied mem_root
*/
if (!(server= if (!(server=
get_server_by_name(share->connection_string))) get_server_by_name(mem_root, share->connection_string, &server_buffer)))
{ {
DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!")); DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
/* need to come up with error handling */ /* need to come up with error handling */
error_num=1; error_num=1;
goto error; goto error;
} }
DBUG_PRINT("info", ("get_server_by_name returned server at %lx", (long unsigned int) server)); DBUG_PRINT("info", ("get_server_by_name returned server at %lx",
(long unsigned int) server));
/* /*
Most of these should never be empty strings, error handling will Most of these should never be empty strings, error handling will
...@@ -591,29 +627,22 @@ int get_connection(FEDERATED_SHARE *share) ...@@ -591,29 +627,22 @@ int get_connection(FEDERATED_SHARE *share)
except there are errors in the trace file of the share being overrun except there are errors in the trace file of the share being overrun
at the address of the share. at the address of the share.
*/ */
if (server->server_name) share->server_name_length= server->server_name_length;
share->server_name= server->server_name; share->server_name= server->server_name;
share->server_name_length= server->server_name_length ?
server->server_name_length : 0;
if (server->username)
share->username= server->username; share->username= server->username;
if (server->password)
share->password= server->password; share->password= server->password;
if (server->db)
share->database= server->db; share->database= server->db;
#ifndef I_AM_PARANOID
share->port= server->port ? (ushort) server->port : MYSQL_PORT; share->port= server->port > 0 && server->port < 65536 ?
#else
if (server->host) share->port= server->port > 1023 && server->port < 65536 ?
#endif
(ushort) server->port : MYSQL_PORT;
share->hostname= server->host; share->hostname= server->host;
if (server->socket) if (!(share->socket= server->socket) &&
share->socket= server->socket; !strcmp(share->hostname, my_localhost))
else if (strcmp(share->hostname, my_localhost) == 0) share->socket= (char *) MYSQL_UNIX_ADDR;
share->socket= my_strdup(MYSQL_UNIX_ADDR, MYF(0));
if (server->scheme)
share->scheme= server->scheme; share->scheme= server->scheme;
else
share->scheme= NULL;
DBUG_PRINT("info", ("share->username %s", share->username)); DBUG_PRINT("info", ("share->username %s", share->username));
DBUG_PRINT("info", ("share->password %s", share->password)); DBUG_PRINT("info", ("share->password %s", share->password));
...@@ -636,6 +665,7 @@ error: ...@@ -636,6 +665,7 @@ error:
SYNOPSIS SYNOPSIS
parse_url() parse_url()
mem_root MEM_ROOT pointer for memory allocation
share pointer to FEDERATED share share pointer to FEDERATED share
table pointer to current TABLE class table pointer to current TABLE class
table_create_flag determines what error to throw table_create_flag determines what error to throw
...@@ -685,7 +715,7 @@ error: ...@@ -685,7 +715,7 @@ error:
*/ */
static int parse_url(FEDERATED_SHARE *share, TABLE *table, static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
uint table_create_flag) uint table_create_flag)
{ {
uint error_num= (table_create_flag ? uint error_num= (table_create_flag ?
...@@ -699,20 +729,19 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, ...@@ -699,20 +729,19 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,
DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length)); DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length));
DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length, DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length,
table->s->connect_string.str)); table->s->connect_string.str));
share->connection_string= my_strndup(table->s->connect_string.str, share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
table->s->connect_string.length, table->s->connect_string.length);
MYF(0));
// Add a null for later termination of table name
share->connection_string[table->s->connect_string.length]= 0;
DBUG_PRINT("info",("parse_url alloced share->connection_string %lx", DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
(long unsigned int) share->connection_string)); (long unsigned int) share->connection_string));
DBUG_PRINT("info",("share->connection_string %s",share->connection_string)); DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
/* No delimiters, must be a straight connection name */ /*
if ( (!strchr(share->connection_string, '/')) && No :// or @ in connection string. Must be a straight connection name of
(!strchr(share->connection_string, '@')) && either "servername" or "servername/tablename"
(!strchr(share->connection_string, ';'))) */
if ( (!strstr(share->connection_string, "://") &&
(!strchr(share->connection_string, '@'))))
{ {
DBUG_PRINT("info", DBUG_PRINT("info",
...@@ -721,17 +750,51 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, ...@@ -721,17 +750,51 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,
share->connection_string, share->connection_string,
(long unsigned int) share->connection_string)); (long unsigned int) share->connection_string));
/* ok, so we do a little parsing, but not completely! */
share->parsed= FALSE; share->parsed= FALSE;
if ((error_num= get_connection(share))) /*
If there is a single '/' in the connection string, this means the user is
specifying a table name
*/
if ((share->table_name= strchr(share->connection_string, '/')))
{
share->connection_string[share->table_name - share->connection_string]= '\0';
share->table_name++;
share->table_name_length= strlen(share->table_name);
DBUG_PRINT("info",
("internal format, parsed table_name share->connection_string \
%s share->table_name %s",
share->connection_string, share->table_name));
/*
there better not be any more '/'s !
*/
if (strchr(share->table_name, '/'))
goto error; goto error;
}
/*
otherwise, straight server name, use tablename of federated table
as remote table name
*/
else
{
/* /*
connection specifies everything but, resort to connection specifies everything but, resort to
expecting remote and foreign table names to match expecting remote and foreign table names to match
*/ */
share->table_name= table->s->table_name.str; share->table_name= strmake_root(mem_root, table->s->table_name.str,
share->table_name_length= table->s->table_name.length; (share->table_name_length= table->s->table_name.length));
share->table_name[share->table_name_length]= '\0'; DBUG_PRINT("info",
("internal format, default table_name share->connection_string \
%s share->table_name %s",
share->connection_string, share->table_name));
}
if ((error_num= get_connection(mem_root, share)))
goto error;
} }
else else
{ {
...@@ -817,7 +880,7 @@ Then password is a null string, so set to NULL ...@@ -817,7 +880,7 @@ Then password is a null string, so set to NULL
if (!share->port) if (!share->port)
{ {
if (strcmp(share->hostname, my_localhost) == 0) if (strcmp(share->hostname, my_localhost) == 0)
share->socket= my_strdup(MYSQL_UNIX_ADDR, MYF(0)); share->socket= (char *) MYSQL_UNIX_ADDR;
else else
share->port= MYSQL_PORT; share->port= MYSQL_PORT;
} }
...@@ -1421,22 +1484,26 @@ err: ...@@ -1421,22 +1484,26 @@ err:
static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
{ {
char *select_query;
char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
Field **field; Field **field;
String query(query_buffer, sizeof(query_buffer), &my_charset_bin); String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
FEDERATED_SHARE *share= NULL, tmp_share; FEDERATED_SHARE *share= NULL, tmp_share;
MEM_ROOT mem_root;
DBUG_ENTER("ha_federated.cc::get_share");
/* /*
In order to use this string, we must first zero it's length, In order to use this string, we must first zero it's length,
or it will contain garbage or it will contain garbage
*/ */
query.length(0); query.length(0);
init_alloc_root(&mem_root, 256, 0);
pthread_mutex_lock(&federated_mutex); pthread_mutex_lock(&federated_mutex);
tmp_share.share_key= table_name; tmp_share.share_key= table_name;
tmp_share.share_key_length= strlen(table_name); tmp_share.share_key_length= strlen(table_name);
if (parse_url(&tmp_share, table, 0)) if (parse_url(&mem_root, &tmp_share, table, 0))
goto error; goto error;
/* TODO: change tmp_share.scheme to LEX_STRING object */ /* TODO: change tmp_share.scheme to LEX_STRING object */
...@@ -1457,24 +1524,17 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -1457,24 +1524,17 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
query.length(query.length() - sizeof_trailing_comma); query.length(query.length() - sizeof_trailing_comma);
query.append(STRING_WITH_LEN(" FROM `")); query.append(STRING_WITH_LEN(" FROM `"));
query.append(tmp_share.table_name, tmp_share.table_name_length);
query.append(STRING_WITH_LEN("`"));
DBUG_PRINT("info", ("calling alloc_root"));
if (!(share= (FEDERATED_SHARE *) if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
my_multi_malloc(MYF(MY_WME), !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length())))
&share, sizeof(*share),
&select_query,
query.length()+table->s->connect_string.length+1,
NullS)))
goto error; goto error;
memcpy(share, &tmp_share, sizeof(tmp_share));
share->table_name_length= strlen(share->table_name);
/* TODO: share->table_name to LEX_STRING object */
query.append(share->table_name, share->table_name_length);
query.append(STRING_WITH_LEN("`"));
share->select_query= select_query;
strmov(share->select_query, query.ptr());
share->use_count= 0; share->use_count= 0;
share->mem_root= mem_root;
DBUG_PRINT("info", DBUG_PRINT("info",
("share->select_query %s", share->select_query)); ("share->select_query %s", share->select_query));
...@@ -1483,17 +1543,18 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -1483,17 +1543,18 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
thr_lock_init(&share->lock); thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
} }
else
free_root(&mem_root, MYF(0)); /* prevents memory leak */
share->use_count++; share->use_count++;
pthread_mutex_unlock(&federated_mutex); pthread_mutex_unlock(&federated_mutex);
return share; DBUG_RETURN(share);
error: error:
pthread_mutex_unlock(&federated_mutex); pthread_mutex_unlock(&federated_mutex);
my_free((gptr) tmp_share.connection_string, MYF(MY_ALLOW_ZERO_PTR)); free_root(&mem_root, MYF(0));
tmp_share.connection_string= 0; DBUG_RETURN(NULL);
my_free((gptr) share, MYF(MY_ALLOW_ZERO_PTR));
return NULL;
} }
...@@ -1505,23 +1566,16 @@ error: ...@@ -1505,23 +1566,16 @@ error:
static int free_share(FEDERATED_SHARE *share) static int free_share(FEDERATED_SHARE *share)
{ {
MEM_ROOT mem_root= share->mem_root;
DBUG_ENTER("free_share"); DBUG_ENTER("free_share");
pthread_mutex_lock(&federated_mutex); pthread_mutex_lock(&federated_mutex);
if (!--share->use_count) if (!--share->use_count)
{ {
hash_delete(&federated_open_tables, (byte*) share); hash_delete(&federated_open_tables, (byte*) share);
if (share->parsed)
my_free((gptr) share->socket, MYF(MY_ALLOW_ZERO_PTR));
/*if (share->connection_string)
{
*/
my_free((gptr) share->connection_string, MYF(MY_ALLOW_ZERO_PTR));
share->connection_string= 0;
/*}*/
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
VOID(pthread_mutex_destroy(&share->mutex)); VOID(pthread_mutex_destroy(&share->mutex));
my_free((gptr) share, MYF(0)); free_root(&mem_root, MYF(0));
} }
pthread_mutex_unlock(&federated_mutex); pthread_mutex_unlock(&federated_mutex);
...@@ -1590,6 +1644,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) ...@@ -1590,6 +1644,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked)
mysql_options(mysql,MYSQL_SET_CHARSET_NAME, mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
this->table->s->table_charset->csname); this->table->s->table_charset->csname);
DBUG_PRINT("info", ("calling mysql_real_connect hostname %s user %s",
share->hostname, share->username));
if (!mysql || !mysql_real_connect(mysql, if (!mysql || !mysql_real_connect(mysql,
share->hostname, share->hostname,
share->username, share->username,
...@@ -2832,15 +2888,13 @@ int ha_federated::create(const char *name, TABLE *table_arg, ...@@ -2832,15 +2888,13 @@ int ha_federated::create(const char *name, TABLE *table_arg,
HA_CREATE_INFO *create_info) HA_CREATE_INFO *create_info)
{ {
int retval; int retval;
THD *thd= current_thd;
FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
DBUG_ENTER("ha_federated::create"); DBUG_ENTER("ha_federated::create");
if (!(retval= parse_url(&tmp_share, table_arg, 1))) if (!(retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1)))
retval= check_foreign_data_source(&tmp_share, 1); retval= check_foreign_data_source(&tmp_share, 1);
/* free this because strdup created it in parse_url */
my_free((gptr) tmp_share.connection_string, MYF(MY_ALLOW_ZERO_PTR));
tmp_share.connection_string= 0;
DBUG_RETURN(retval); DBUG_RETURN(retval);
} }
......
...@@ -43,6 +43,8 @@ ...@@ -43,6 +43,8 @@
The example implements the minimum of what you will probably need. The example implements the minimum of what you will probably need.
*/ */
typedef struct st_federated_share { typedef struct st_federated_share {
MEM_ROOT mem_root;
bool parsed; bool parsed;
/* this key is unique db/tablename */ /* this key is unique db/tablename */
const char *share_key; const char *share_key;
...@@ -67,6 +69,7 @@ typedef struct st_federated_share { ...@@ -67,6 +69,7 @@ typedef struct st_federated_share {
char *sport; char *sport;
int share_key_length; int share_key_length;
ushort port; ushort port;
uint table_name_length, server_name_length, connect_string_length, use_count; uint table_name_length, server_name_length, connect_string_length, use_count;
pthread_mutex_t mutex; pthread_mutex_t mutex;
THR_LOCK lock; THR_LOCK lock;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment