Commit ecba7868 authored by unknown's avatar unknown

A slave now can optionally register with the master providing the

info on how to connect to the slave for the clients that connect to
the master, but would like to know where the slaves are


include/mysql_com.h:
  register slaves
mysql-test/mysql-test-run.sh:
  parameters to test slave registration
mysql-test/r/rpl000002.result:
  test of slave registration
mysql-test/t/rpl000002.test:
  test slave registration
sql/mysql_priv.h:
  slave registration
sql/mysqld.cc:
  slave registration
sql/slave.cc:
  slave registration
sql/slave.h:
  slave registration
sql/sql_lex.h:
  slave registration
sql/sql_parse.cc:
  slave registration
sql/sql_repl.cc:
  slave registration
sql/sql_repl.h:
  slave registration
sql/sql_yacc.yy:
  slave registration
parent f5e06429
......@@ -42,7 +42,8 @@ enum enum_server_command {COM_SLEEP,COM_QUIT,COM_INIT_DB,COM_QUERY,
COM_PROCESS_INFO,COM_CONNECT,COM_PROCESS_KILL,
COM_DEBUG,COM_PING,COM_TIME,COM_DELAYED_INSERT,
COM_CHANGE_USER, COM_BINLOG_DUMP,
COM_TABLE_DUMP, COM_CONNECT_OUT};
COM_TABLE_DUMP, COM_CONNECT_OUT,
COM_REGISTER_SLAVE};
#define NOT_NULL_FLAG 1 /* Field can't be NULL */
#define PRI_KEY_FLAG 2 /* Field is part of a primary key */
......
......@@ -520,6 +520,8 @@ start_slave()
--tmpdir=$MYSQL_TMP_DIR \
--language=english \
--skip-innodb --skip-slave-start \
--report-host=127.0.0.1 --report-user=root \
--report-port=$SLAVE_MYPORT \
$SMALL_SERVER \
$EXTRA_SLAVE_OPT $EXTRA_SLAVE_MYSQLD_OPT"
if [ x$DO_DDD = x1 ]
......
......@@ -2,6 +2,8 @@ n
2000
2001
2002
Server_id Host User Password Port
2 127.0.0.1 root 9307
id created
1 1970-01-01 06:25:45
id created
......
......@@ -11,6 +11,7 @@ use test;
sync_with_master;
select * from t1;
connection master;
show slave hosts;
drop table t1;
save_master_pos;
connection slave;
......
......@@ -520,7 +520,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status,
LOCK_grant, LOCK_error_log, LOCK_delayed_insert,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_binlog_update, LOCK_slave, LOCK_server_id;
LOCK_binlog_update, LOCK_slave, LOCK_server_id, LOCK_slave_list;
extern pthread_cond_t COND_refresh,COND_thread_count, COND_binlog_update,
COND_slave_stopped, COND_slave_start;
extern pthread_attr_t connection_attrib;
......
......@@ -20,6 +20,7 @@
#include <my_dir.h>
#include "sql_acl.h"
#include "slave.h"
#include "sql_repl.h"
#include "stacktrace.h"
#ifdef HAVE_BERKELEY_DB
#include "ha_berkeley.h"
......@@ -277,9 +278,12 @@ volatile ulong cached_thread_count=0;
// replication parameters, if master_host is not NULL, we are a slave
my_string master_user = (char*) "test", master_password = 0, master_host=0,
master_info_file = (char*) "master.info";
my_string report_user = (char*) "test", report_password = 0, report_host=0;
const char *localhost=LOCAL_HOST;
const char *delayed_user="DELAYED";
uint master_port = MYSQL_PORT, master_connect_retry = 60;
uint report_port = MYSQL_PORT;
ulong max_tmp_tables,max_heap_table_size;
ulong bytes_sent = 0L, bytes_received = 0L;
......@@ -341,7 +345,7 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
LOCK_binlog_update, LOCK_slave, LOCK_server_id,
LOCK_user_conn;
LOCK_user_conn, LOCK_slave_list;
pthread_cond_t COND_refresh,COND_thread_count,COND_binlog_update,
COND_slave_stopped, COND_slave_start;
......@@ -695,6 +699,7 @@ void clean_up(bool print_message)
bitmap_free(&temp_pool);
free_max_user_conn();
end_slave();
end_slave_list();
#ifndef __WIN__
if (!opt_bootstrap)
(void) my_delete(pidfile_name,MYF(0)); // This may not always exist
......@@ -1685,7 +1690,8 @@ int main(int argc, char **argv)
randominit(&sql_rand,(ulong) start_time,(ulong) start_time/2);
reset_floating_point_exceptions();
init_thr_lock();
init_slave_list();
/* Fix varibles that are base 1024*1024 */
myisam_max_temp_length= (my_off_t) min(((ulonglong) myisam_max_sort_file_size)*1024*1024, (ulonglong) MAX_FILE_SIZE);
myisam_max_extra_temp_length= (my_off_t) min(((ulonglong) myisam_max_extra_sort_file_size)*1024*1024, (ulonglong) MAX_FILE_SIZE);
......@@ -2479,7 +2485,8 @@ enum options {
OPT_TEMP_POOL, OPT_DO_PSTACK, OPT_TX_ISOLATION,
OPT_GEMINI_FLUSH_LOG, OPT_GEMINI_RECOVER,
OPT_GEMINI_UNBUFFERED_IO, OPT_SKIP_SAFEMALLOC,
OPT_SKIP_STACK_TRACE
OPT_SKIP_STACK_TRACE, OPT_REPORT_HOST,
OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT
};
static struct option long_options[] = {
......@@ -2587,6 +2594,12 @@ static struct option long_options[] = {
(int) OPT_REPLICATE_WILD_IGNORE_TABLE},
{"replicate-rewrite-db", required_argument, 0,
(int) OPT_REPLICATE_REWRITE_DB},
// In replication, we may need to tell the other servers how to connect
// to us
{"report-host", required_argument, 0, (int) OPT_REPORT_HOST},
{"report-user", required_argument, 0, (int) OPT_REPORT_USER},
{"report-password", required_argument, 0, (int) OPT_REPORT_PASSWORD},
{"report-port", required_argument, 0, (int) OPT_REPORT_PORT},
{"safe-mode", no_argument, 0, (int) OPT_SAFE},
{"safe-show-database", no_argument, 0, (int) OPT_SAFE_SHOW_DB},
{"socket", required_argument, 0, (int) OPT_SOCKET},
......@@ -3712,6 +3725,18 @@ static void get_options(int argc,char **argv)
case OPT_MASTER_PORT:
master_port= atoi(optarg);
break;
case OPT_REPORT_HOST:
report_host=optarg;
break;
case OPT_REPORT_USER:
report_user=optarg;
break;
case OPT_REPORT_PASSWORD:
report_password=optarg;
break;
case OPT_REPORT_PORT:
report_port= atoi(optarg);
break;
case OPT_MASTER_CONNECT_RETRY:
master_connect_retry= atoi(optarg);
break;
......
......@@ -569,6 +569,52 @@ error:
return 1;
}
int register_slave_on_master(MYSQL* mysql)
{
String packet;
uint len;
char buf[4];
if(!report_host)
return 0;
int4store(buf, server_id);
packet.append(buf, 4);
len = strlen(report_host);
packet.append((char)(uchar)len);
packet.append(report_host, len);
len = strlen(report_user);
packet.append((char)(uchar)len);
packet.append(report_user, len);
if(report_password)
{
len = strlen(report_password);
packet.append((char)(uchar)len);
packet.append(report_password, len);
}
else
{
packet.append((char)0);
}
int2store(buf, (uint16)report_port);
packet.append(buf, 2);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
{
sql_print_error("Error on COM_REGISTER_SLAVE: '%s'",
mc_mysql_error(mysql));
return 1;
}
return 0;
}
int show_master_info(THD* thd)
{
DBUG_ENTER("show_master_info");
......@@ -1245,6 +1291,12 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
sql_print_error("Slave thread killed while connecting to master");
goto err;
}
// register ourselves with the master
// if fails, this is not fatal - we just print the error message and go
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
while (!slave_killed(thd))
{
......
......@@ -66,6 +66,7 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_ARR_SIZE 16
int flush_master_info(MASTER_INFO* mi);
int register_slave_on_master(MYSQL* mysql);
int mysql_table_dump(THD* thd, const char* db,
const char* tbl_name, int fd = -1);
......@@ -117,9 +118,9 @@ extern int disconnect_slave_event_count, abort_slave_event_count ;
#endif
// the master variables are defaults read from my.cnf or command line
extern uint master_port, master_connect_retry;
extern uint master_port, master_connect_retry, report_port;
extern my_string master_user, master_password, master_host,
master_info_file;
master_info_file, report_user, report_host, report_password;
extern I_List<i_string> replicate_do_db, replicate_ignore_db;
extern I_List<i_string_pair> replicate_rewrite_db;
......
......@@ -54,7 +54,8 @@ enum enum_sql_command {
SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE,
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS,
SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA,
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ,
SQLCOM_SHOW_SLAVE_HOSTS
};
enum lex_states { STATE_START, STATE_CHAR, STATE_IDENT,
......
......@@ -53,7 +53,7 @@ const char *command_name[]={
"Sleep", "Quit", "Init DB", "Query", "Field List", "Create DB",
"Drop DB", "Refresh", "Shutdown", "Statistics", "Processlist",
"Connect","Kill","Debug","Ping","Time","Delayed_insert","Change user",
"Binlog Dump","Table Dump", "Connect Out"
"Binlog Dump","Table Dump", "Connect Out", "Register Slave"
};
bool volatile abort_slave = 0;
......@@ -766,6 +766,14 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
if (!mysql_change_db(thd,packet))
mysql_log.write(thd,command,"%s",thd->db);
break;
case COM_REGISTER_SLAVE:
{
if(register_slave(thd, (uchar*)packet, packet_length))
send_error(&thd->net);
else
send_ok(&thd->net);
break;
}
case COM_TABLE_DUMP:
{
slow_command = TRUE;
......@@ -1163,6 +1171,13 @@ mysql_execute_command(void)
res = purge_master_logs(thd, lex->to_log);
break;
}
case SQLCOM_SHOW_SLAVE_HOSTS:
{
if(check_access(thd, FILE_ACL, any_db))
goto error;
res = show_slave_hosts(thd);
break;
}
case SQLCOM_BACKUP_TABLE:
{
if (check_db_used(thd,tables) ||
......
......@@ -25,9 +25,39 @@
#include <thr_alarm.h>
#include <my_dir.h>
#define SLAVE_LIST_CHUNK 128
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
HASH slave_list;
static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
my_bool not_used __attribute__((unused)))
{
*len = 4;
return &si->server_id;
}
static void slave_info_free(void *s)
{
my_free((byte*)s, MYF(MY_WME));
}
void init_slave_list()
{
hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
(hash_get_key) slave_list_key, slave_info_free, 0);
pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
void end_slave_list()
{
pthread_mutex_lock(&LOCK_slave_list);
hash_free(&slave_list);
pthread_mutex_unlock(&LOCK_slave_list);
pthread_mutex_destroy(&LOCK_slave_list);
}
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
......@@ -56,6 +86,55 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return 0;
}
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
uint len;
SLAVE_INFO* si, *old_si;
int res = 1;
uchar* p = packet, *p_end = packet + packet_length;
if(check_access(thd, FILE_ACL, any_db))
return 1;
if(!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err;
si->server_id = uint4korr(p);
p += 4;
len = (uint)*p++;
if(p + len > p_end || len > sizeof(si->host) - 1)
goto err;
memcpy(si->host, p, len);
si->host[len] = 0;
p += len;
len = *p++;
if(p + len > p_end || len > sizeof(si->user) - 1)
goto err;
memcpy(si->user, p, len);
si->user[len] = 0;
p += len;
len = *p++;
if(p + len > p_end || len > sizeof(si->password) - 1)
goto err;
memcpy(si->password, p, len);
si->password[len] = 0;
p += len;
si->port = uint2korr(p);
pthread_mutex_lock(&LOCK_slave_list);
if((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&si->server_id, 4)))
hash_delete(&slave_list, (byte*)old_si);
res = hash_insert(&slave_list, (byte*)si);
pthread_mutex_unlock(&LOCK_slave_list);
return res;
err:
if(si)
my_free((byte*)si, MYF(MY_WME));
return res;
}
static int send_file(THD *thd)
{
......@@ -742,6 +821,44 @@ void reset_master()
}
int show_slave_hosts(THD* thd)
{
DBUG_ENTER("show_slave_hosts");
List<Item> field_list;
field_list.push_back(new Item_empty_string("Server_id", 20));
field_list.push_back(new Item_empty_string("Host", 20));
field_list.push_back(new Item_empty_string("User",20));
field_list.push_back(new Item_empty_string("Password",20));
field_list.push_back(new Item_empty_string("Port",20));
if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
uint i;
NET* net = &thd->net;
pthread_mutex_lock(&LOCK_slave_list);
for(i = 0; i < slave_list.records; ++i)
{
SLAVE_INFO* si = (SLAVE_INFO*)hash_element(&slave_list, i);
packet->length(0);
net_store_data(packet, si->server_id);
net_store_data(packet, si->host);
net_store_data(packet, si->user);
net_store_data(packet, si->password);
net_store_data(packet, (uint)si->port);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
DBUG_RETURN(-1);
}
}
pthread_mutex_unlock(&LOCK_slave_list);
send_eof(net);
DBUG_RETURN(0);
}
int show_binlog_info(THD* thd)
{
DBUG_ENTER("show_binlog_info");
......
......@@ -3,6 +3,16 @@
#include "slave.h"
typedef struct st_slave_info
{
uint32 server_id;
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
uint16 port;
} SLAVE_INFO;
extern HASH slave_list;
extern char* master_host;
extern my_string opt_bin_logname, master_info_file;
extern uint32 server_id;
......@@ -17,8 +27,12 @@ int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd);
int show_slave_hosts(THD* thd);
void reset_slave();
void reset_master();
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);
......
......@@ -2246,7 +2246,11 @@ show_param:
| MASTER_SYM LOGS_SYM
{
Lex->sql_command = SQLCOM_SHOW_BINLOGS;
}
}
| SLAVE HOSTS_SYM
{
Lex->sql_command = SQLCOM_SHOW_SLAVE_HOSTS;
}
| keys_or_index FROM table_ident opt_db
{
Lex->sql_command= SQLCOM_SHOW_KEYS;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment