Commit cb5b56c8 authored by unknown's avatar unknown

Bug#25721

  "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  Deadlock caused by inconsistant use of mutexes in sql_server.cc
  One mutex has been removed to resolve deadlock.
  Many functions were made private which should not be exported.
  Unused variables and function removed.


mysql-test/r/federated_server.result:
  Bug 25721 "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  
  New test results
mysql-test/t/federated_server.test:
  Bug 25721 "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  
  Test for bug by using stored procedure. Unpatched server would deadlock frequently.
sql/sql_parse.cc:
  Bug 25721 "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  
  check for correct error code when dropping server
sql/sql_servers.cc:
  Bug 25721 "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  
  Removed unneccessary mutex, only need THR_LOCK_servers rwlock to
  guard data structures against race conditions. Misuse of other mutex
  caused deadlock by inconsistant ordering of mutex lock operations.
  Alter order of some operations to hit memory before disk.
  Removed unused function.
  Removed servers_version and servers_cache_initialised variables.
  Made many internal functions static.
sql/sql_servers.h:
  Bug 25721 "Concurrent ALTER/CREATE SERVER can lead to deadlock"
  
  remove internal functions from being exported.
parent 09e4d027
...@@ -189,6 +189,31 @@ drop user guest_select@localhost; ...@@ -189,6 +189,31 @@ drop user guest_select@localhost;
drop table federated.t1; drop table federated.t1;
drop server 's1'; drop server 's1';
# End of 5.1 tests # End of 5.1 tests
use test;
create procedure p1 ()
begin
DECLARE v INT DEFAULT 0;
DECLARE e INT DEFAULT 0;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET e = e + 1;
WHILE v < 10000 do
CREATE SERVER s
FOREIGN DATA WRAPPER mysql
OPTIONS (USER 'Remote', HOST '192.168.1.106', DATABASE 'test');
ALTER SERVER s OPTIONS (USER 'Remote');
DROP SERVER s;
SET v = v + 1;
END WHILE;
SELECT e > 0;
END//
use test;
call p1();
call p1();
e > 0
1
e > 0
1
drop procedure p1;
drop server if exists s;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated; DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
......
...@@ -234,4 +234,39 @@ drop server 's1'; ...@@ -234,4 +234,39 @@ drop server 's1';
--echo # End of 5.1 tests --echo # End of 5.1 tests
#
# Bug#25721 - deadlock with ALTER/CREATE SERVER
#
connect (other,localhost,root,,);
connection master;
use test;
delimiter //;
create procedure p1 ()
begin
DECLARE v INT DEFAULT 0;
DECLARE e INT DEFAULT 0;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET e = e + 1;
WHILE v < 10000 do
CREATE SERVER s
FOREIGN DATA WRAPPER mysql
OPTIONS (USER 'Remote', HOST '192.168.1.106', DATABASE 'test');
ALTER SERVER s OPTIONS (USER 'Remote');
DROP SERVER s;
SET v = v + 1;
END WHILE;
SELECT e > 0;
END//
delimiter ;//
connection other;
use test;
send call p1();
connection master;
call p1();
connection other;
reap;
drop procedure p1;
drop server if exists s;
source include/federated_cleanup.inc; source include/federated_cleanup.inc;
...@@ -4320,7 +4320,7 @@ mysql_execute_command(THD *thd) ...@@ -4320,7 +4320,7 @@ mysql_execute_command(THD *thd)
if ((err_code= drop_server(thd, &lex->server_options))) if ((err_code= drop_server(thd, &lex->server_options)))
{ {
if (! lex->drop_if_exists && err_code == ER_FOREIGN_SERVER_EXISTS) if (! lex->drop_if_exists && err_code == ER_FOREIGN_SERVER_DOESNT_EXIST)
{ {
DBUG_PRINT("info", ("problem dropping server %s", DBUG_PRINT("info", ("problem dropping server %s",
lex->server_options.server_name)); lex->server_options.server_name));
......
...@@ -25,14 +25,43 @@ ...@@ -25,14 +25,43 @@
#include "sp_head.h" #include "sp_head.h"
#include "sp.h" #include "sp.h"
HASH servers_cache; /*
pthread_mutex_t servers_cache_mutex; // To init the hash We only use 1 mutex to guard the data structures - THR_LOCK_servers.
uint servers_cache_initialised=FALSE; Read locked when only reading data and write-locked for all other access.
/* Version of server table. incremented by servers_load */ */
static uint servers_version=0;
static HASH servers_cache;
static MEM_ROOT mem; static MEM_ROOT mem;
static rw_lock_t THR_LOCK_servers; static rw_lock_t THR_LOCK_servers;
static bool get_server_from_table_to_cache(TABLE *table);
/* insert functions */
static int insert_server(THD *thd, FOREIGN_SERVER *server_options);
static int insert_server_record(TABLE *table, FOREIGN_SERVER *server);
static int insert_server_record_into_cache(FOREIGN_SERVER *server);
static void prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *server);
/* drop functions */
static int delete_server_record(TABLE *table,
char *server_name,
int server_name_length);
static int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options);
/* update functions */
static void prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered);
static int update_server(THD *thd, FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered);
static int update_server_record(TABLE *table, FOREIGN_SERVER *server);
static int update_server_record_in_cache(FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered);
/* utility functions */
static void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to);
static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length, static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length,
my_bool not_used __attribute__((unused))) my_bool not_used __attribute__((unused)))
{ {
...@@ -45,6 +74,7 @@ static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length, ...@@ -45,6 +74,7 @@ static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length,
DBUG_RETURN((byte*) server->server_name); DBUG_RETURN((byte*) server->server_name);
} }
/* /*
Initialize structures responsible for servers used in federated Initialize structures responsible for servers used in federated
server scheme information for them from the server server scheme information for them from the server
...@@ -64,35 +94,27 @@ static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length, ...@@ -64,35 +94,27 @@ static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length,
1 Could not initialize servers 1 Could not initialize servers
*/ */
my_bool servers_init(bool dont_read_servers_table) bool servers_init(bool dont_read_servers_table)
{ {
THD *thd; THD *thd;
my_bool return_val= 0; bool return_val= FALSE;
DBUG_ENTER("servers_init"); DBUG_ENTER("servers_init");
/* init the mutex */ /* init the mutex */
if (pthread_mutex_init(&servers_cache_mutex, MY_MUTEX_INIT_FAST))
DBUG_RETURN(1);
if (my_rwlock_init(&THR_LOCK_servers, NULL)) if (my_rwlock_init(&THR_LOCK_servers, NULL))
DBUG_RETURN(1); DBUG_RETURN(TRUE);
/* initialise our servers cache */ /* initialise our servers cache */
if (hash_init(&servers_cache, system_charset_info, 32, 0, 0, if (hash_init(&servers_cache, system_charset_info, 32, 0, 0,
(hash_get_key) servers_cache_get_key, 0, 0)) (hash_get_key) servers_cache_get_key, 0, 0))
{ {
return_val= 1; /* we failed, out of memory? */ return_val= TRUE; /* we failed, out of memory? */
goto end; goto end;
} }
/* Initialize the mem root for data */ /* Initialize the mem root for data */
init_alloc_root(&mem, ACL_ALLOC_BLOCK_SIZE, 0); init_alloc_root(&mem, ACL_ALLOC_BLOCK_SIZE, 0);
/*
at this point, the cache is initialised, let it be known
*/
servers_cache_initialised= TRUE;
if (dont_read_servers_table) if (dont_read_servers_table)
goto end; goto end;
...@@ -100,7 +122,7 @@ my_bool servers_init(bool dont_read_servers_table) ...@@ -100,7 +122,7 @@ my_bool servers_init(bool dont_read_servers_table)
To be able to run this from boot, we allocate a temporary THD To be able to run this from boot, we allocate a temporary THD
*/ */
if (!(thd=new THD)) if (!(thd=new THD))
DBUG_RETURN(1); DBUG_RETURN(TRUE);
thd->thread_stack= (char*) &thd; thd->thread_stack= (char*) &thd;
thd->store_globals(); thd->store_globals();
/* /*
...@@ -130,19 +152,13 @@ my_bool servers_init(bool dont_read_servers_table) ...@@ -130,19 +152,13 @@ my_bool servers_init(bool dont_read_servers_table)
TRUE Error TRUE Error
*/ */
static my_bool servers_load(THD *thd, TABLE_LIST *tables) static bool servers_load(THD *thd, TABLE_LIST *tables)
{ {
TABLE *table; TABLE *table;
READ_RECORD read_record_info; READ_RECORD read_record_info;
my_bool return_val= TRUE; bool return_val= TRUE;
DBUG_ENTER("servers_load"); DBUG_ENTER("servers_load");
if (!servers_cache_initialised)
DBUG_RETURN(0);
/* need to figure out how to utilise this variable */
servers_version++; /* servers updated */
/* first, send all cached rows to sleep with the fishes, oblivion! /* first, send all cached rows to sleep with the fishes, oblivion!
I expect this crappy comment replaced */ I expect this crappy comment replaced */
free_root(&mem, MYF(MY_MARK_BLOCKS_FREE)); free_root(&mem, MYF(MY_MARK_BLOCKS_FREE));
...@@ -156,7 +172,7 @@ static my_bool servers_load(THD *thd, TABLE_LIST *tables) ...@@ -156,7 +172,7 @@ static my_bool servers_load(THD *thd, TABLE_LIST *tables)
goto end; goto end;
} }
return_val=0; return_val= FALSE;
end: end:
end_read_record(&read_record_info); end_read_record(&read_record_info);
...@@ -183,10 +199,10 @@ static my_bool servers_load(THD *thd, TABLE_LIST *tables) ...@@ -183,10 +199,10 @@ static my_bool servers_load(THD *thd, TABLE_LIST *tables)
TRUE Failure TRUE Failure
*/ */
my_bool servers_reload(THD *thd) bool servers_reload(THD *thd)
{ {
TABLE_LIST tables[1]; TABLE_LIST tables[1];
my_bool return_val= 1; bool return_val= TRUE;
DBUG_ENTER("servers_reload"); DBUG_ENTER("servers_reload");
if (thd->locked_tables) if (thd->locked_tables)
...@@ -196,10 +212,9 @@ my_bool servers_reload(THD *thd) ...@@ -196,10 +212,9 @@ my_bool servers_reload(THD *thd)
close_thread_tables(thd); close_thread_tables(thd);
} }
/* DBUG_PRINT("info", ("locking servers_cache"));
To avoid deadlocks we should obtain table locks before rw_wrlock(&THR_LOCK_servers);
obtaining servers_cache->lock mutex.
*/
bzero((char*) tables, sizeof(tables)); bzero((char*) tables, sizeof(tables));
tables[0].alias= tables[0].table_name= (char*) "servers"; tables[0].alias= tables[0].table_name= (char*) "servers";
tables[0].db= (char*) "mysql"; tables[0].db= (char*) "mysql";
...@@ -212,12 +227,6 @@ my_bool servers_reload(THD *thd) ...@@ -212,12 +227,6 @@ my_bool servers_reload(THD *thd)
goto end; goto end;
} }
DBUG_PRINT("info", ("locking servers_cache"));
VOID(pthread_mutex_lock(&servers_cache_mutex));
//old_servers_cache= servers_cache;
//old_mem=mem;
if ((return_val= servers_load(thd, tables))) if ((return_val= servers_load(thd, tables)))
{ // Error. Revert to old list { // Error. Revert to old list
/* blast, for now, we have no servers, discuss later way to preserve */ /* blast, for now, we have no servers, discuss later way to preserve */
...@@ -226,14 +235,14 @@ my_bool servers_reload(THD *thd) ...@@ -226,14 +235,14 @@ my_bool servers_reload(THD *thd)
servers_free(); servers_free();
} }
DBUG_PRINT("info", ("unlocking servers_cache"));
VOID(pthread_mutex_unlock(&servers_cache_mutex));
end: end:
close_thread_tables(thd); close_thread_tables(thd);
DBUG_PRINT("info", ("unlocking servers_cache"));
rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(return_val); DBUG_RETURN(return_val);
} }
/* /*
Initialize structures responsible for servers used in federated Initialize structures responsible for servers used in federated
server scheme information for them from the server server scheme information for them from the server
...@@ -260,7 +269,8 @@ my_bool servers_reload(THD *thd) ...@@ -260,7 +269,8 @@ my_bool servers_reload(THD *thd)
1 could not insert server struct into global servers cache 1 could not insert server struct into global servers cache
*/ */
my_bool get_server_from_table_to_cache(TABLE *table) static bool
get_server_from_table_to_cache(TABLE *table)
{ {
/* alloc a server struct */ /* alloc a server struct */
char *ptr; char *ptr;
...@@ -308,69 +318,6 @@ my_bool get_server_from_table_to_cache(TABLE *table) ...@@ -308,69 +318,6 @@ my_bool get_server_from_table_to_cache(TABLE *table)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
/*
SYNOPSIS
server_exists_in_table()
THD *thd - thread pointer
LEX_SERVER_OPTIONS *server_options - pointer to Lex->server_options
NOTES
This function takes a LEX_SERVER_OPTIONS struct, which is very much the
same type of structure as a FOREIGN_SERVER, it contains the values parsed
in any one of the [CREATE|DELETE|DROP] SERVER statements. Using the
member "server_name", index_read_idx either founds the record and returns
1, or doesn't find the record, and returns 0
RETURN VALUES
0 record not found
1 record found
*/
my_bool server_exists_in_table(THD *thd, LEX_SERVER_OPTIONS *server_options)
{
int result= 1;
int error= 0;
TABLE_LIST tables;
TABLE *table;
DBUG_ENTER("server_exists");
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.alias= tables.table_name= (char*) "servers";
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
DBUG_RETURN(TRUE);
table->use_all_columns();
rw_wrlock(&THR_LOCK_servers);
VOID(pthread_mutex_lock(&servers_cache_mutex));
/* set the field that's the PK to the value we're looking for */
table->field[0]->store(server_options->server_name,
server_options->server_name_length,
system_charset_info);
if ((error= table->file->index_read_idx(table->record[0], 0,
(byte *)table->field[0]->ptr,
table->key_info[0].key_length,
HA_READ_KEY_EXACT)))
{
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
{
table->file->print_error(error, MYF(0));
result= -1;
}
result= 0;
DBUG_PRINT("info",("record for server '%s' not found!",
server_options->server_name));
}
VOID(pthread_mutex_unlock(&servers_cache_mutex));
rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(result);
}
/* /*
SYNOPSIS SYNOPSIS
...@@ -382,15 +329,18 @@ my_bool server_exists_in_table(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -382,15 +329,18 @@ my_bool server_exists_in_table(THD *thd, LEX_SERVER_OPTIONS *server_options)
This function takes a server object that is has all members properly This function takes a server object that is has all members properly
prepared, ready to be inserted both into the mysql.servers table and prepared, ready to be inserted both into the mysql.servers table and
the servers cache. the servers cache.
THR_LOCK_servers must be write locked.
RETURN VALUES RETURN VALUES
0 - no error 0 - no error
other - error code other - error code
*/ */
int insert_server(THD *thd, FOREIGN_SERVER *server) static int
insert_server(THD *thd, FOREIGN_SERVER *server)
{ {
int error= 0; int error= -1;
TABLE_LIST tables; TABLE_LIST tables;
TABLE *table; TABLE *table;
...@@ -402,13 +352,7 @@ int insert_server(THD *thd, FOREIGN_SERVER *server) ...@@ -402,13 +352,7 @@ int insert_server(THD *thd, FOREIGN_SERVER *server)
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */ /* need to open before acquiring THR_LOCK_plugin or it will deadlock */
if (! (table= open_ltable(thd, &tables, TL_WRITE))) if (! (table= open_ltable(thd, &tables, TL_WRITE)))
DBUG_RETURN(TRUE); goto end;
/* lock mutex to make sure no changes happen */
VOID(pthread_mutex_lock(&servers_cache_mutex));
/* lock table */
rw_wrlock(&THR_LOCK_servers);
/* insert the server into the table */ /* insert the server into the table */
if ((error= insert_server_record(table, server))) if ((error= insert_server_record(table, server)))
...@@ -419,12 +363,10 @@ int insert_server(THD *thd, FOREIGN_SERVER *server) ...@@ -419,12 +363,10 @@ int insert_server(THD *thd, FOREIGN_SERVER *server)
goto end; goto end;
end: end:
/* unlock the table */
rw_unlock(&THR_LOCK_servers);
VOID(pthread_mutex_unlock(&servers_cache_mutex));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
int insert_server_record_into_cache() int insert_server_record_into_cache()
...@@ -434,13 +376,16 @@ int insert_server(THD *thd, FOREIGN_SERVER *server) ...@@ -434,13 +376,16 @@ int insert_server(THD *thd, FOREIGN_SERVER *server)
This function takes a FOREIGN_SERVER pointer to an allocated (root mem) This function takes a FOREIGN_SERVER pointer to an allocated (root mem)
and inserts it into the global servers cache and inserts it into the global servers cache
THR_LOCK_servers must be write locked.
RETURN VALUE RETURN VALUE
0 - no error 0 - no error
>0 - error code >0 - error code
*/ */
int insert_server_record_into_cache(FOREIGN_SERVER *server) static int
insert_server_record_into_cache(FOREIGN_SERVER *server)
{ {
int error=0; int error=0;
DBUG_ENTER("insert_server_record_into_cache"); DBUG_ENTER("insert_server_record_into_cache");
...@@ -461,6 +406,7 @@ int insert_server_record_into_cache(FOREIGN_SERVER *server) ...@@ -461,6 +406,7 @@ int insert_server_record_into_cache(FOREIGN_SERVER *server)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
store_server_fields() store_server_fields()
...@@ -478,7 +424,8 @@ int insert_server_record_into_cache(FOREIGN_SERVER *server) ...@@ -478,7 +424,8 @@ int insert_server_record_into_cache(FOREIGN_SERVER *server)
*/ */
void store_server_fields(TABLE *table, FOREIGN_SERVER *server) static void
store_server_fields(TABLE *table, FOREIGN_SERVER *server)
{ {
table->use_all_columns(); table->use_all_columns();
...@@ -539,6 +486,7 @@ void store_server_fields(TABLE *table, FOREIGN_SERVER *server) ...@@ -539,6 +486,7 @@ void store_server_fields(TABLE *table, FOREIGN_SERVER *server)
*/ */
static
int insert_server_record(TABLE *table, FOREIGN_SERVER *server) int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
{ {
int error; int error;
...@@ -606,7 +554,7 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server) ...@@ -606,7 +554,7 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options) int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
{ {
int error= 0; int error;
TABLE_LIST tables; TABLE_LIST tables;
TABLE *table; TABLE *table;
...@@ -618,28 +566,33 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -618,28 +566,33 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
tables.db= (char*) "mysql"; tables.db= (char*) "mysql";
tables.alias= tables.table_name= (char*) "servers"; tables.alias= tables.table_name= (char*) "servers";
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
DBUG_RETURN(TRUE);
rw_wrlock(&THR_LOCK_servers); rw_wrlock(&THR_LOCK_servers);
VOID(pthread_mutex_lock(&servers_cache_mutex));
/* hit the memory hit first */
if ((error= delete_server_record_in_cache(server_options)))
goto end;
if ((error= delete_server_record(table, if (! (table= open_ltable(thd, &tables, TL_WRITE)))
server_options->server_name, {
server_options->server_name_length))) error= my_errno;
goto end; goto end;
}
error= delete_server_record(table,
server_options->server_name,
server_options->server_name_length);
if ((error= delete_server_record_in_cache(server_options))) /*
goto end; Perform a reload so we don't have a 'hole' in our mem_root
*/
servers_load(thd, &tables);
end: end:
VOID(pthread_mutex_unlock(&servers_cache_mutex));
rw_unlock(&THR_LOCK_servers); rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -658,10 +611,10 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -658,10 +611,10 @@ int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
*/ */
int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options) static int
delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options)
{ {
int error= ER_FOREIGN_SERVER_DOESNT_EXIST;
int error= 0;
FOREIGN_SERVER *server; FOREIGN_SERVER *server;
DBUG_ENTER("delete_server_record_in_cache"); DBUG_ENTER("delete_server_record_in_cache");
...@@ -677,7 +630,7 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options) ...@@ -677,7 +630,7 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options)
DBUG_PRINT("info", ("server_name %s length %d not found!", DBUG_PRINT("info", ("server_name %s length %d not found!",
server_options->server_name, server_options->server_name,
server_options->server_name_length)); server_options->server_name_length));
// what should be done if not found in the cache? goto end;
} }
/* /*
We succeded in deletion of the server to the table, now delete We succeded in deletion of the server to the table, now delete
...@@ -687,14 +640,15 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options) ...@@ -687,14 +640,15 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options)
server->server_name, server->server_name,
server->server_name_length)); server->server_name_length));
if (server) VOID(hash_delete(&servers_cache, (byte*) server));
VOID(hash_delete(&servers_cache, (byte*) server));
error= 0;
servers_version++; /* servers updated */
end:
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -714,6 +668,8 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options) ...@@ -714,6 +668,8 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options)
table for the particular server via the call to update_server_record, table for the particular server via the call to update_server_record,
and in the servers_cache via update_server_record_in_cache. and in the servers_cache via update_server_record_in_cache.
THR_LOCK_servers must be write locked.
RETURN VALUE RETURN VALUE
0 - no error 0 - no error
>0 - error code >0 - error code
...@@ -722,7 +678,7 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options) ...@@ -722,7 +678,7 @@ int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options)
int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered) int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered)
{ {
int error= 0; int error;
TABLE *table; TABLE *table;
TABLE_LIST tables; TABLE_LIST tables;
DBUG_ENTER("update_server"); DBUG_ENTER("update_server");
...@@ -732,19 +688,26 @@ int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered) ...@@ -732,19 +688,26 @@ int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered)
tables.alias= tables.table_name= (char*)"servers"; tables.alias= tables.table_name= (char*)"servers";
if (!(table= open_ltable(thd, &tables, TL_WRITE))) if (!(table= open_ltable(thd, &tables, TL_WRITE)))
DBUG_RETURN(1); {
error= my_errno;
goto end;
}
rw_wrlock(&THR_LOCK_servers);
if ((error= update_server_record(table, altered))) if ((error= update_server_record(table, altered)))
goto end; goto end;
update_server_record_in_cache(existing, altered); error= update_server_record_in_cache(existing, altered);
/*
Perform a reload so we don't have a 'hole' in our mem_root
*/
servers_load(thd, &tables);
end: end:
rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -761,6 +724,8 @@ int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered) ...@@ -761,6 +724,8 @@ int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered)
HASH, then the updated record inserted, in essence replacing the old HASH, then the updated record inserted, in essence replacing the old
record. record.
THR_LOCK_servers must be write locked.
RETURN VALUE RETURN VALUE
0 - no error 0 - no error
1 - error 1 - error
...@@ -791,13 +756,13 @@ int update_server_record_in_cache(FOREIGN_SERVER *existing, ...@@ -791,13 +756,13 @@ int update_server_record_in_cache(FOREIGN_SERVER *existing,
{ {
DBUG_PRINT("info", ("had a problem inserting server %s at %lx", DBUG_PRINT("info", ("had a problem inserting server %s at %lx",
altered->server_name, (long unsigned int) altered)); altered->server_name, (long unsigned int) altered));
error= 1; error= ER_OUT_OF_RESOURCES;
} }
servers_version++; /* servers updated */
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -830,9 +795,9 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to) ...@@ -830,9 +795,9 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to)
to->password= strdup_root(&mem, from->password); to->password= strdup_root(&mem, from->password);
if (to->port == -1) if (to->port == -1)
to->port= from->port; to->port= from->port;
if (!to->socket) if (!to->socket && from->socket)
to->socket= strdup_root(&mem, from->socket); to->socket= strdup_root(&mem, from->socket);
if (!to->scheme) if (!to->scheme && from->scheme)
to->scheme= strdup_root(&mem, from->scheme); to->scheme= strdup_root(&mem, from->scheme);
if (!to->owner) if (!to->owner)
to->owner= strdup_root(&mem, from->owner); to->owner= strdup_root(&mem, from->owner);
...@@ -840,6 +805,7 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to) ...@@ -840,6 +805,7 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -862,7 +828,9 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to) ...@@ -862,7 +828,9 @@ void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to)
*/ */
int update_server_record(TABLE *table, FOREIGN_SERVER *server)
static int
update_server_record(TABLE *table, FOREIGN_SERVER *server)
{ {
int error=0; int error=0;
DBUG_ENTER("update_server_record"); DBUG_ENTER("update_server_record");
...@@ -878,10 +846,7 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server) ...@@ -878,10 +846,7 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server)
HA_READ_KEY_EXACT))) HA_READ_KEY_EXACT)))
{ {
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE) if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
{
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
error= 1;
}
DBUG_PRINT("info",("server not found!")); DBUG_PRINT("info",("server not found!"));
error= ER_FOREIGN_SERVER_DOESNT_EXIST; error= ER_FOREIGN_SERVER_DOESNT_EXIST;
} }
...@@ -901,6 +866,7 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server) ...@@ -901,6 +866,7 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -916,11 +882,11 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server) ...@@ -916,11 +882,11 @@ int update_server_record(TABLE *table, FOREIGN_SERVER *server)
*/ */
int delete_server_record(TABLE *table, static int
char *server_name, delete_server_record(TABLE *table,
int server_name_length) char *server_name, int server_name_length)
{ {
int error= 0; int error;
DBUG_ENTER("delete_server_record"); DBUG_ENTER("delete_server_record");
table->use_all_columns(); table->use_all_columns();
...@@ -933,10 +899,7 @@ int delete_server_record(TABLE *table, ...@@ -933,10 +899,7 @@ int delete_server_record(TABLE *table,
HA_READ_KEY_EXACT))) HA_READ_KEY_EXACT)))
{ {
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE) if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
{
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
error= 1;
}
DBUG_PRINT("info",("server not found!")); DBUG_PRINT("info",("server not found!"));
error= ER_FOREIGN_SERVER_DOESNT_EXIST; error= ER_FOREIGN_SERVER_DOESNT_EXIST;
} }
...@@ -965,28 +928,35 @@ int delete_server_record(TABLE *table, ...@@ -965,28 +928,35 @@ int delete_server_record(TABLE *table,
int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options) int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
{ {
int error; int error= ER_FOREIGN_SERVER_EXISTS;
FOREIGN_SERVER *server; FOREIGN_SERVER *server;
DBUG_ENTER("create_server"); DBUG_ENTER("create_server");
DBUG_PRINT("info", ("server_options->server_name %s", DBUG_PRINT("info", ("server_options->server_name %s",
server_options->server_name)); server_options->server_name));
rw_wrlock(&THR_LOCK_servers);
/* hit the memory first */
if (hash_search(&servers_cache, (byte*) server_options->server_name,
server_options->server_name_length))
goto end;
server= (FOREIGN_SERVER *)alloc_root(&mem, server= (FOREIGN_SERVER *)alloc_root(&mem,
sizeof(FOREIGN_SERVER)); sizeof(FOREIGN_SERVER));
if ((error= prepare_server_struct_for_insert(server_options, server))) prepare_server_struct_for_insert(server_options, server);
goto end;
if ((error= insert_server(thd, server))) error= insert_server(thd, server);
goto end;
DBUG_PRINT("info", ("error returned %d", error)); DBUG_PRINT("info", ("error returned %d", error));
end: end:
rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -1003,37 +973,33 @@ int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -1003,37 +973,33 @@ int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options) int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
{ {
int error= 0; int error= ER_FOREIGN_SERVER_DOESNT_EXIST;
FOREIGN_SERVER *altered, *existing; FOREIGN_SERVER *altered, *existing;
DBUG_ENTER("alter_server"); DBUG_ENTER("alter_server");
DBUG_PRINT("info", ("server_options->server_name %s", DBUG_PRINT("info", ("server_options->server_name %s",
server_options->server_name)); server_options->server_name));
altered= (FOREIGN_SERVER *)alloc_root(&mem, rw_wrlock(&THR_LOCK_servers);
sizeof(FOREIGN_SERVER));
VOID(pthread_mutex_lock(&servers_cache_mutex));
if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache, if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache,
(byte*) server_options->server_name, (byte*) server_options->server_name,
server_options->server_name_length))) server_options->server_name_length)))
{
error= ER_FOREIGN_SERVER_DOESNT_EXIST;
goto end; goto end;
}
if ((error= prepare_server_struct_for_update(server_options, existing, altered))) altered= (FOREIGN_SERVER *)alloc_root(&mem,
goto end; sizeof(FOREIGN_SERVER));
if ((error= update_server(thd, existing, altered))) prepare_server_struct_for_update(server_options, existing, altered);
goto end;
error= update_server(thd, existing, altered);
end: end:
DBUG_PRINT("info", ("error returned %d", error)); DBUG_PRINT("info", ("error returned %d", error));
VOID(pthread_mutex_unlock(&servers_cache_mutex)); rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -1044,19 +1010,17 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options) ...@@ -1044,19 +1010,17 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
NOTES NOTES
RETURN VALUE RETURN VALUE
0 - no error none
*/ */
int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options, static void
FOREIGN_SERVER *server) prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *server)
{ {
int error;
char *unset_ptr= (char*)""; char *unset_ptr= (char*)"";
DBUG_ENTER("prepare_server_struct"); DBUG_ENTER("prepare_server_struct");
error= 0;
/* these two MUST be set */ /* these two MUST be set */
server->server_name= strdup_root(&mem, server_options->server_name); server->server_name= strdup_root(&mem, server_options->server_name);
server->server_name_length= server_options->server_name_length; server->server_name_length= server_options->server_name_length;
...@@ -1086,7 +1050,7 @@ int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options, ...@@ -1086,7 +1050,7 @@ int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
server->owner= server_options->owner ? server->owner= server_options->owner ?
strdup_root(&mem, server_options->owner) : unset_ptr; strdup_root(&mem, server_options->owner) : unset_ptr;
DBUG_RETURN(error); DBUG_VOID_RETURN;
} }
/* /*
...@@ -1102,13 +1066,12 @@ int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options, ...@@ -1102,13 +1066,12 @@ int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
*/ */
int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options, static void
FOREIGN_SERVER *existing, prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *altered) FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered)
{ {
int error;
DBUG_ENTER("prepare_server_struct_for_update"); DBUG_ENTER("prepare_server_struct_for_update");
error= 0;
altered->server_name= strdup_root(&mem, server_options->server_name); altered->server_name= strdup_root(&mem, server_options->server_name);
altered->server_name_length= server_options->server_name_length; altered->server_name_length= server_options->server_name_length;
...@@ -1159,7 +1122,7 @@ int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options, ...@@ -1159,7 +1122,7 @@ int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
(strcmp(server_options->owner, existing->owner))) ? (strcmp(server_options->owner, existing->owner))) ?
strdup_root(&mem, server_options->owner) : 0; strdup_root(&mem, server_options->owner) : 0;
DBUG_RETURN(error); DBUG_VOID_RETURN;
} }
/* /*
...@@ -1178,17 +1141,15 @@ int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options, ...@@ -1178,17 +1141,15 @@ int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
void servers_free(bool end) void servers_free(bool end)
{ {
DBUG_ENTER("servers_free"); DBUG_ENTER("servers_free");
if (!servers_cache_initialised) if (!hash_inited(&servers_cache))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
VOID(pthread_mutex_destroy(&servers_cache_mutex)); rwlock_destroy(&THR_LOCK_servers);
servers_cache_initialised=0;
free_root(&mem,MYF(0)); free_root(&mem,MYF(0));
hash_free(&servers_cache); hash_free(&servers_cache);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
SYNOPSIS SYNOPSIS
...@@ -1220,7 +1181,7 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name) ...@@ -1220,7 +1181,7 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name)
} }
DBUG_PRINT("info", ("locking servers_cache")); DBUG_PRINT("info", ("locking servers_cache"));
VOID(pthread_mutex_lock(&servers_cache_mutex)); rw_rdlock(&THR_LOCK_servers);
if (!(server= (FOREIGN_SERVER *) hash_search(&servers_cache, if (!(server= (FOREIGN_SERVER *) hash_search(&servers_cache,
(byte*) server_name, (byte*) server_name,
server_name_length))) server_name_length)))
...@@ -1230,7 +1191,7 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name) ...@@ -1230,7 +1191,7 @@ FOREIGN_SERVER *get_server_by_name(const char *server_name)
server= (FOREIGN_SERVER *) NULL; server= (FOREIGN_SERVER *) NULL;
} }
DBUG_PRINT("info", ("unlocking servers_cache")); DBUG_PRINT("info", ("unlocking servers_cache"));
VOID(pthread_mutex_unlock(&servers_cache_mutex)); rw_unlock(&THR_LOCK_servers);
DBUG_RETURN(server); DBUG_RETURN(server);
} }
...@@ -25,40 +25,18 @@ typedef struct st_federated_server ...@@ -25,40 +25,18 @@ typedef struct st_federated_server
} FOREIGN_SERVER; } FOREIGN_SERVER;
/* cache handlers */ /* cache handlers */
my_bool servers_init(bool dont_read_server_table); bool servers_init(bool dont_read_server_table);
my_bool servers_reload(THD *thd); bool servers_reload(THD *thd);
my_bool get_server_from_table_to_cache(TABLE *table);
void servers_free(bool end=0); void servers_free(bool end=0);
/* insert functions */ /* insert functions */
int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options); int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
int insert_server(THD *thd, FOREIGN_SERVER *server_options);
int insert_server_record(TABLE *table, FOREIGN_SERVER *server);
int insert_server_record_into_cache(FOREIGN_SERVER *server);
void store_server_fields_for_insert(TABLE *table, FOREIGN_SERVER *server);
void store_server_fields_for_insert(TABLE *table,
FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered);
int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *server);
/* drop functions */ /* drop functions */
int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options); int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
int delete_server_record(TABLE *table,
char *server_name,
int server_name_length);
int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options);
/* update functions */ /* update functions */
int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options); int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
FOREIGN_SERVER *existing, /* lookup functions */
FOREIGN_SERVER *altered);
int update_server(THD *thd, FOREIGN_SERVER *existing, FOREIGN_SERVER *altered);
int update_server_record(TABLE *table, FOREIGN_SERVER *server);
int update_server_record_in_cache(FOREIGN_SERVER *existing,
FOREIGN_SERVER *altered);
/* utility functions */
void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to);
FOREIGN_SERVER *get_server_by_name(const char *server_name); FOREIGN_SERVER *get_server_by_name(const char *server_name);
my_bool server_exists_in_table(THD *thd, char *server_name);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment