Commit 513cb26e authored by unknown's avatar unknown

Merge work:/home/bk/mysql-4.0

into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
parents 25214075 a23f2a3a
...@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */ ...@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */
const char **type_names; const char **type_names;
} TYPELIB; } TYPELIB;
enum cache_type {READ_CACHE,WRITE_CACHE,READ_FIFO,READ_NET,WRITE_NET}; enum cache_type {READ_CACHE,WRITE_CACHE,
SEQ_READ_APPEND /* sequential read or append */,
READ_FIFO,
READ_NET,WRITE_NET};
enum flush_type { FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED, enum flush_type { FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED,
FLUSH_FORCE_WRITE}; FLUSH_FORCE_WRITE};
...@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */
{ {
my_off_t pos_in_file,end_of_file; my_off_t pos_in_file,end_of_file;
byte *rc_pos,*rc_end,*buffer,*rc_request_pos; byte *rc_pos,*rc_end,*buffer,*rc_request_pos;
my_bool alloced_buffer; /* currented READ_NET is the only one
that will use a buffer allocated somewhere
else
*/
byte *append_buffer, *append_pos, *append_end;
/* for append buffer used in READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
#endif
int (*read_function)(struct st_io_cache *,byte *,uint); int (*read_function)(struct st_io_cache *,byte *,uint);
/* callbacks when the actual read I/O happens */ /* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK pre_read;
...@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type, ...@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t seek_offset,pbool use_async_io, my_off_t seek_offset,pbool use_async_io,
pbool clear_cache); pbool clear_cache);
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info); extern int _my_b_get(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
# This file is public domain and comes with NO WARRANTY of any kind # This file is public domain and comes with NO WARRANTY of any kind
target = libmysqlclient.la target = libmysqlclient.la
target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID -DMYSQL_CLIENT
LIBS = @CLIENT_LIBS@ LIBS = @CLIENT_LIBS@
INCLUDES = -I$(srcdir)/../include -I../include \ INCLUDES = -I$(srcdir)/../include -I../include \
-I$(srcdir)/.. -I$(top_srcdir) -I.. $(openssl_includes) -I$(srcdir)/.. -I$(top_srcdir) -I.. $(openssl_includes)
......
...@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql) ...@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
MYSQL_ROW row; MYSQL_ROW row;
int error = 1; int error = 1;
int has_auth_info; int has_auth_info;
int port_ind;
if (!mysql->net.vio && !mysql_real_connect(mysql,0,0,0,0,0,0,0)) if (!mysql->net.vio && !mysql_real_connect(mysql,0,0,0,0,0,0,0))
{ {
expand_error(mysql, CR_PROBE_MASTER_CONNECT); expand_error(mysql, CR_PROBE_MASTER_CONNECT);
...@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql) ...@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql)
switch (mysql_num_fields(res)) switch (mysql_num_fields(res))
{ {
case 3: has_auth_info = 0; break; case 5:
case 5: has_auth_info = 1; break; has_auth_info = 0;
port_ind=2;
break;
case 7:
has_auth_info = 1;
port_ind=4;
break;
default: default:
goto err; goto err;
} }
...@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql) ...@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
if (has_auth_info) if (has_auth_info)
{ {
tmp_user = row[3]; tmp_user = row[2];
tmp_pass = row[4]; tmp_pass = row[3];
} }
else else
{ {
...@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql) ...@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql)
tmp_pass = mysql->passwd; tmp_pass = mysql->passwd;
} }
if (!(slave = spawn_init(mysql, row[1], atoi(row[2]), if (!(slave = spawn_init(mysql, row[1], atoi(row[port_ind]),
tmp_user, tmp_pass))) tmp_user, tmp_pass)))
goto err; goto err;
......
...@@ -7,6 +7,29 @@ use test; ...@@ -7,6 +7,29 @@ use test;
drop table if exists t1,t3; drop table if exists t1,t3;
create table t1 (word char(20) not null); create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1; load data infile '../../std_data/words.dat' into table t1;
load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
set password = password('foo'); set password = password('foo');
set password = password(''); set password = password('');
create table t3(n int); create table t3(n int);
...@@ -18,7 +41,7 @@ n ...@@ -18,7 +41,7 @@ n
2 2
select sum(length(word)) from t1; select sum(length(word)) from t1;
sum(length(word)) sum(length(word))
71 141
drop table t1,t3; drop table t1,t3;
reset master; reset master;
reset slave; reset slave;
......
...@@ -15,8 +15,8 @@ n ...@@ -15,8 +15,8 @@ n
2001 2001
2002 2002
show slave hosts; show slave hosts;
Server_id Host Port Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 $SLAVE_MYPORT 2 127.0.0.1 9307 2 1
drop table t1; drop table t1;
slave stop; slave stop;
drop table if exists t2; drop table if exists t2;
......
...@@ -4,6 +4,8 @@ use test; ...@@ -4,6 +4,8 @@ use test;
drop table if exists t1,t3; drop table if exists t1,t3;
create table t1 (word char(20) not null); create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1; load data infile '../../std_data/words.dat' into table t1;
eval load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1;
set password = password('foo'); set password = password('foo');
set password = password(''); set password = password('');
create table t3(n int); create table t3(n int);
......
This diff is collapsed.
This diff is collapsed.
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include "repl_failsafe.h" #include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "mini_client.h"
#include <mysql.h>
RPL_STATUS rpl_status=RPL_NULL; RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status; pthread_mutex_t LOCK_rpl_status;
...@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", ...@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"", TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type}; rpl_status_type};
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
thd->system_thread = thd->bootstrap = 1;
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->net.timeout = slave_net_timeout;
thd->max_packet_length=thd->net.max_packet;
thd->master_access= ~0;
thd->priv_user = 0;
thd->system_thread = 1;
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() ||
my_pthread_setspecific_ptr(THR_THD, thd) ||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
thd->mysys_var=my_thread_var;
thd->dbug_thread_id=my_thread_id();
#if !defined(__WIN__) && !defined(OS2)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0;
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
thd->proc_info="Thread initialized";
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status) void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{ {
pthread_mutex_lock(&LOCK_rpl_status); pthread_mutex_lock(&LOCK_rpl_status);
if (rpl_status == from_status || rpl_status == RPL_ANY) if (rpl_status == from_status || rpl_status == RPL_ANY)
rpl_status = to_status; rpl_status = to_status;
pthread_cond_signal(&COND_rpl_status);
pthread_mutex_unlock(&LOCK_rpl_status);
}
int update_slave_list(MYSQL* mysql)
{
MYSQL_RES* res=0;
MYSQL_ROW row;
const char* error=0;
bool have_auth_info;
int port_ind;
if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
!(res = mc_mysql_store_result(mysql)))
{
error = "Query error";
goto err;
}
switch (mc_mysql_num_fields(res))
{
case 5:
have_auth_info = 0;
port_ind=2;
break;
case 7:
have_auth_info = 1;
port_ind=4;
break;
default:
error = "Invalid number of fields in SHOW SLAVE HOSTS";
goto err;
}
pthread_mutex_lock(&LOCK_slave_list);
while ((row = mc_mysql_fetch_row(res)))
{
uint32 server_id;
SLAVE_INFO* si, *old_si;
server_id = atoi(row[0]);
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&server_id,4)))
si = old_si;
else
{
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
{
error = "Out of memory";
pthread_mutex_unlock(&LOCK_slave_list);
goto err;
}
si->server_id = server_id;
}
strnmov(si->host, row[1], sizeof(si->host));
si->port = atoi(row[port_ind]);
si->rpl_recovery_rank = atoi(row[port_ind+1]);
si->master_id = atoi(row[port_ind+2]);
if (have_auth_info)
{
strnmov(si->user, row[2], sizeof(si->user));
strnmov(si->password, row[3], sizeof(si->password));
}
}
pthread_mutex_unlock(&LOCK_slave_list);
err:
if (res)
mc_mysql_free_result(res);
if (error)
{
sql_print_error("Error updating slave list:",error);
return 1;
}
return 0;
}
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
return 0;
}
pthread_handler_decl(handle_failsafe_rpl,arg)
{
DBUG_ENTER("handle_failsafe_rpl");
THD *thd = new THD;
thd->thread_stack = (char*)&thd;
MYSQL* recovery_captain = 0;
pthread_detach_this_thread();
if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
{
sql_print_error("Could not initialize failsafe replication thread");
goto err;
}
pthread_mutex_lock(&LOCK_rpl_status);
while (!thd->killed && !abort_loop)
{
bool break_req_chain = 0;
const char* msg = thd->enter_cond(&COND_rpl_status,
&LOCK_rpl_status, "Waiting for request");
pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
thd->proc_info="Processling request";
while (!break_req_chain)
{
switch (rpl_status)
{
case RPL_LOST_SOLDIER:
if (find_recovery_captain(thd, recovery_captain))
rpl_status=RPL_TROOP_SOLDIER;
else
rpl_status=RPL_RECOVERY_CAPTAIN;
break_req_chain=1; /* for now until other states are implemented */
break;
default:
break_req_chain=1;
break;
}
}
thd->exit_cond(msg);
}
pthread_mutex_unlock(&LOCK_rpl_status); pthread_mutex_unlock(&LOCK_rpl_status);
err:
if (recovery_captain)
mc_mysql_close(recovery_captain);
delete thd;
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
} }
#ifndef REPL_FAILSAFE_H #ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H #define REPL_FAILSAFE_H
#include "mysql.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE, typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER, RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */, RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
...@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status; ...@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
extern pthread_mutex_t LOCK_rpl_status; extern pthread_mutex_t LOCK_rpl_status;
extern pthread_cond_t COND_rpl_status; extern pthread_cond_t COND_rpl_status;
extern TYPELIB rpl_role_typelib, rpl_status_typelib; extern TYPELIB rpl_role_typelib, rpl_status_typelib;
extern uint rpl_recovery_rank;
extern const char* rpl_role_type[], *rpl_status_type[]; extern const char* rpl_role_type[], *rpl_status_type[];
pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status); void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql);
#endif #endif
...@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd); ...@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd); static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
static int safe_sleep(THD* thd, int sec); static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
...@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql) ...@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
int2store(buf, (uint16)report_port); int2store(buf, (uint16)report_port);
packet.append(buf, 2); packet.append(buf, 2);
int4store(buf, rpl_recovery_rank);
packet.append(buf, 4);
int4store(buf, 0); /* tell the master will fill in master_id */
packet.append(buf, 4);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0)) packet.length(), 0))
...@@ -868,7 +874,7 @@ command"); ...@@ -868,7 +874,7 @@ command");
} }
static uint read_event(MYSQL* mysql, MASTER_INFO *mi) static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{ {
ulong len = packet_error; ulong len = packet_error;
// for convinience lets think we start by // for convinience lets think we start by
...@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init(); my_thread_init();
slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ ! slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
thd->set_time();
DBUG_ENTER("handle_slave"); DBUG_ENTER("handle_slave");
pthread_detach_this_thread(); pthread_detach_this_thread();
...@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// on with life // on with life
thd->proc_info = "Registering slave on master"; thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql); register_slave_on_master(mysql);
update_slave_list(mysql);
while (!slave_killed(thd)) while (!slave_killed(thd))
{ {
...@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME, ...@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
while(!slave_killed(thd)) while(!slave_killed(thd))
{ {
thd->proc_info = "Reading master update"; thd->proc_info = "Reading master update";
uint event_len = read_event(mysql, &glob_mi); ulong event_len = read_event(mysql, &glob_mi);
if(slave_killed(thd)) if(slave_killed(thd))
{ {
sql_print_error("Slave thread killed while reading event"); sql_print_error("Slave thread killed while reading event");
...@@ -1244,30 +1250,7 @@ position %s", ...@@ -1244,30 +1250,7 @@ position %s",
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{ {
int slave_was_killed; return connect_to_master(thd, mysql, mi, 0);
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
while(!(slave_was_killed = slave_killed(thd)) &&
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Slave thread: error connecting to master: %s (%d),\
retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
safe_sleep(thd, mi->connect_retry);
}
if(!slave_was_killed)
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
}
return slave_was_killed;
} }
/* /*
...@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) ...@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
master_retry_count times master_retry_count times
*/ */
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{ {
int slave_was_killed; int slave_was_killed;
int last_errno= -2; // impossible error int last_errno= -2; // impossible error
...@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) ...@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
#ifndef DBUG_OFF #ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count; events_till_disconnect = disconnect_slave_event_count;
#endif #endif
while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql)) while (!(slave_was_killed = slave_killed(thd)) &&
(reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
{ {
/* Don't repeat last error */ /* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno) if (mc_mysql_errno(mysql) != last_errno)
{ {
sql_print_error("Slave thread: error re-connecting to master: \ sql_print_error("Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec", %s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry); mi->connect_retry);
...@@ -1309,18 +1296,26 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) ...@@ -1309,18 +1296,26 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if (master_retry_count && err_count++ == master_retry_count) if (master_retry_count && err_count++ == master_retry_count)
{ {
slave_was_killed=1; slave_was_killed=1;
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER); if (reconnect)
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break; break;
} }
} }
if (!slave_was_killed) if (!slave_was_killed)
{ {
sql_print_error("Slave: reconnected to master '%s@%s:%d',\ if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", glob_mi.user, replication resumed in log '%s' at position %s", glob_mi.user,
glob_mi.host, glob_mi.port, glob_mi.host, glob_mi.port,
RPL_LOG_NAME, RPL_LOG_NAME,
llstr(glob_mi.pos,llbuff)); llstr(glob_mi.pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
}
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio); thd->set_active_vio(mysql->net.vio);
#endif #endif
...@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user, ...@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
return slave_was_killed; return slave_was_killed;
} }
/*
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 1);
}
#ifdef __GNUC__ #ifdef __GNUC__
template class I_List_iterator<i_string>; template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>; template class I_List_iterator<i_string_pair>;
......
...@@ -278,8 +278,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -278,8 +278,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
ha_autocommit_or_rollback(thd,error); ha_autocommit_or_rollback(thd,error);
if (!opt_old_rpl_compat && mysql_bin_log.is_open()) if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{ {
Delete_file_log_event d(thd); if (lf_info.wrote_create_file)
mysql_bin_log.write(&d); {
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
}
} }
DBUG_RETURN(-1); // Error on read DBUG_RETURN(-1); // Error on read
} }
...@@ -303,8 +306,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -303,8 +306,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!opt_old_rpl_compat) if (!opt_old_rpl_compat)
{ {
read_info.end_io_cache(); // make sure last block gets logged read_info.end_io_cache(); // make sure last block gets logged
Execute_load_log_event e(thd); if (lf_info.wrote_create_file)
mysql_bin_log.write(&e); {
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
}
} }
} }
if (using_transactions) if (using_transactions)
...@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, ...@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
} }
else else
{ {
/* init_io_cache() will not initialize read_function member
if the cache is READ_NET. The reason is explained in
mysys/mf_iocache.c. So we work around the problem with a
manual assignment
*/
if (get_it_from_net)
cache.read_function = _my_b_net_read;
need_end_io_cache = 1; need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open()) if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close = cache.pre_read = cache.pre_close =
......
...@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) ...@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
get_object(p,si->user); get_object(p,si->user);
get_object(p,si->password); get_object(p,si->password);
si->port = uint2korr(p); si->port = uint2korr(p);
p += 2;
si->rpl_recovery_rank = uint4korr(p);
p += 4;
if (!(si->master_id = uint4korr(p)))
si->master_id = server_id;
si->thd = thd; si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list); pthread_mutex_lock(&LOCK_slave_list);
...@@ -534,6 +539,7 @@ impossible position"; ...@@ -534,6 +539,7 @@ impossible position";
DBUG_PRINT("wait",("waiting for data on binary log")); DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed) if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock); pthread_cond_wait(&COND_binlog_update, log_lock);
DBUG_PRINT("wait",("binary log received update"));
break; break;
default: default:
...@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd) ...@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
field_list.push_back(new Item_empty_string("Password",20)); field_list.push_back(new Item_empty_string("Password",20));
} }
field_list.push_back(new Item_empty_string("Port",20)); field_list.push_back(new Item_empty_string("Port",20));
field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1)) if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd) ...@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
net_store_data(packet, si->password); net_store_data(packet, si->password);
} }
net_store_data(packet, (uint32) si->port); net_store_data(packet, (uint32) si->port);
net_store_data(packet, si->rpl_recovery_rank);
net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length())) if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{ {
pthread_mutex_unlock(&LOCK_slave_list); pthread_mutex_unlock(&LOCK_slave_list);
...@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file) ...@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
{ {
LOAD_FILE_INFO* lf_info; LOAD_FILE_INFO* lf_info;
uint block_len ; uint block_len ;
if (!(block_len = file->rc_end - file->buffer)) char* buffer = (char*)file->buffer;
if (!(block_len = file->rc_end - buffer))
return 0; return 0;
lf_info = (LOAD_FILE_INFO*)file->arg; lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR && if (lf_info->last_pos_in_file != HA_POS_ERROR &&
...@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file) ...@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file; lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file) if (lf_info->wrote_create_file)
{ {
Append_block_log_event a(lf_info->thd, (char*) file->buffer, block_len); Append_block_log_event a(lf_info->thd, buffer, block_len);
mysql_bin_log.write(&a); mysql_bin_log.write(&a);
} }
else else
{ {
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db, Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields, lf_info->table_name, *lf_info->fields,
lf_info->handle_dup, (char*) file->buffer, lf_info->handle_dup, buffer,
block_len); block_len);
mysql_bin_log.write(&c); mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1; lf_info->wrote_create_file = 1;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
typedef struct st_slave_info typedef struct st_slave_info
{ {
uint32 server_id; uint32 server_id;
uint32 rpl_recovery_rank, master_id;
char host[HOSTNAME_LENGTH+1]; char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1]; char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1]; char password[HASH_PASSWORD_LENGTH+1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment