Commit d7cfbff5 authored by unknown's avatar unknown

relay_log_space_limit

DBUG_ macro cleanup
buffer boundary cleanup
This changeset, although not fully tested, works for me better than 
anything I've had so far, including what is in the repository. I will 
push it unless something crashes while I am writing this :-)


mysql-test/r/rpl000014.result:
  updated result
mysql-test/r/rpl000015.result:
  updated result
mysql-test/r/rpl000016.result:
  updated result
mysql-test/r/rpl_log.result:
  new result
mysys/mf_iocache.c:
  DBUG_ cleanup
mysys/mf_iocache2.c:
  DBUG_ fix
sql/log.cc:
  added relay_log_space_limit
sql/mysqld.cc:
  relay_log_space_limit
sql/slave.cc:
  relay_log_space_limit
sql/slave.h:
  relay_log_space_limit
sql/sql_class.h:
  relay_log_space_limit
sql/sql_repl.cc:
  fixed buffer overrun bug
parent 869e671f
...@@ -7,22 +7,22 @@ show master status; ...@@ -7,22 +7,22 @@ show master status;
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79 master-bin.001 79
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 1 master-bin.001 79 slave-relay-bin.002 120 master-bin.001 Yes Yes 0 0 79 127.0.0.1 root MASTER_PORT 1 master-bin.001 79 slave-relay-bin.002 120 master-bin.001 Yes Yes 0 0 79 124
change master to master_log_pos=73; change master to master_log_pos=73;
slave stop; slave stop;
change master to master_log_pos=73; change master to master_log_pos=73;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 1 master-bin.001 73 slave-relay-bin.001 4 master-bin.001 No No 0 0 73 127.0.0.1 root MASTER_PORT 1 master-bin.001 73 slave-relay-bin.001 4 master-bin.001 No No 0 0 73 4
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 1 master-bin.001 73 slave-relay-bin.001 4 master-bin.001 Yes Yes 0 0 73 127.0.0.1 root MASTER_PORT 1 master-bin.001 73 slave-relay-bin.001 4 master-bin.001 Yes Yes 0 0 73 4
change master to master_log_pos=173; change master to master_log_pos=173;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 1 master-bin.001 173 slave-relay-bin.001 4 master-bin.001 Yes Yes 0 0 173 127.0.0.1 root MASTER_PORT 1 master-bin.001 173 slave-relay-bin.001 4 master-bin.001 Yes Yes 0 0 173 4
show master status; show master status;
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79 master-bin.001 79
......
...@@ -4,21 +4,21 @@ File Position Binlog_do_db Binlog_ignore_db ...@@ -4,21 +4,21 @@ File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79 master-bin.001 79
reset slave; reset slave;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
0 0 0 0 No No 0 0 0 0 0 0 0 No No 0 0 0 0
change master to master_host='127.0.0.1'; change master to master_host='127.0.0.1';
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 test MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0 127.0.0.1 test MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0 4
change master to master_host='127.0.0.1',master_user='root', change master to master_host='127.0.0.1',master_user='root',
master_password='',master_port=MASTER_PORT; master_password='',master_port=MASTER_PORT;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0 127.0.0.1 root MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0 4
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 7 master-bin.001 79 slave-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79 127.0.0.1 root MASTER_PORT 7 master-bin.001 79 slave-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79 120
drop table if exists t1; drop table if exists t1;
create table t1 (n int); create table t1 (n int);
insert into t1 values (10),(45),(90); insert into t1 values (10),(45),(90);
......
...@@ -14,8 +14,8 @@ drop table if exists t1; ...@@ -14,8 +14,8 @@ drop table if exists t1;
create table t1 (s text); create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard'); insert into t1 values('Could not break slave'),('Tried hard');
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 60 master-bin.001 234 slave-relay-bin.001 275 master-bin.001 Yes Yes 0 0 234 127.0.0.1 root MASTER_PORT 60 master-bin.001 234 slave-relay-bin.001 275 master-bin.001 Yes Yes 0 0 234 275
select * from t1; select * from t1;
s s
Could not break slave Could not break slave
...@@ -41,8 +41,8 @@ Log_name ...@@ -41,8 +41,8 @@ Log_name
master-bin.003 master-bin.003
insert into t2 values (65); insert into t2 values (65);
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 60 master-bin.003 155 slave-relay-bin.001 755 master-bin.003 Yes Yes 0 0 155 127.0.0.1 root MASTER_PORT 60 master-bin.003 155 slave-relay-bin.001 755 master-bin.003 Yes Yes 0 0 155 755
select * from t2; select * from t2;
m m
34 34
...@@ -64,8 +64,8 @@ master-bin.006 445 ...@@ -64,8 +64,8 @@ master-bin.006 445
slave stop; slave stop;
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 slave-relay-bin.004 1229 master-bin.006 Yes Yes 0 0 445 127.0.0.1 root MASTER_PORT 60 master-bin.006 445 slave-relay-bin.004 1229 master-bin.006 Yes Yes 0 0 445 1229
lock tables t3 read; lock tables t3 read;
select count(*) from t3 where n >= 4; select count(*) from t3 where n >= 4;
count(*) count(*)
......
...@@ -74,8 +74,8 @@ slave-bin.002 57 Query 1 4 use test; create table t1 (n int) ...@@ -74,8 +74,8 @@ slave-bin.002 57 Query 1 4 use test; create table t1 (n int)
slave-bin.002 115 Query 1 62 use test; insert into t1 values (1) slave-bin.002 115 Query 1 62 use test; insert into t1 values (1)
slave-bin.002 175 Query 1 122 use test; drop table t1 slave-bin.002 175 Query 1 122 use test; drop table t1
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Relay_log_space
127.0.0.1 root MASTER_PORT 1 master-bin.002 170 slave-relay-bin.002 916 master-bin.002 Yes Yes 0 0 170 127.0.0.1 root MASTER_PORT 1 master-bin.002 170 slave-relay-bin.002 916 master-bin.002 Yes Yes 0 0 170 920
show new master for slave with master_log_file='master-bin.001' and show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_server_id=1; master_log_pos=4 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
......
...@@ -906,7 +906,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) ...@@ -906,7 +906,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
uint length; uint length;
my_bool append_cache; my_bool append_cache;
my_off_t pos_in_file; my_off_t pos_in_file;
DBUG_ENTER("flush_io_cache"); DBUG_ENTER("_flush_io_cache");
if (!(append_cache = (info->type == SEQ_READ_APPEND))) if (!(append_cache = (info->type == SEQ_READ_APPEND)))
need_append_buffer_lock=0; need_append_buffer_lock=0;
......
...@@ -117,6 +117,7 @@ void my_b_seek(IO_CACHE *info,my_off_t pos) ...@@ -117,6 +117,7 @@ void my_b_seek(IO_CACHE *info,my_off_t pos)
} }
info->pos_in_file=pos; info->pos_in_file=pos;
info->seek_not_done=1; info->seek_not_done=1;
DBUG_VOID_RETURN;
} }
......
...@@ -83,7 +83,7 @@ static int find_uniq_filename(char *name) ...@@ -83,7 +83,7 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0), name(0), log_type(LOG_CLOSED),write_error(0),
inited(0), file_id(1),no_rotate(0), inited(0), file_id(1),no_rotate(0),
need_start_event(1) need_start_event(1),bytes_written(0)
{ {
/* /*
We don't want to intialize LOCK_Log here as the thread system may We don't want to intialize LOCK_Log here as the thread system may
...@@ -99,6 +99,7 @@ MYSQL_LOG::~MYSQL_LOG() ...@@ -99,6 +99,7 @@ MYSQL_LOG::~MYSQL_LOG()
{ {
(void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index); (void) pthread_mutex_destroy(&LOCK_index);
(void) pthread_cond_destroy(&update_cond);
} }
} }
...@@ -234,17 +235,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -234,17 +235,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
else if (log_type == LOG_BIN) else if (log_type == LOG_BIN)
{ {
bool error; bool error;
/* if (do_magic)
Explanation of the boolean black magic: {
if we are supposed to write magic number try write if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) ||
clean
up if failed
then if index_file has not been previously opened, try to open it
clean up if failed
*/
if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) ||
open_index(O_APPEND | O_RDWR | O_CREAT)) open_index(O_APPEND | O_RDWR | O_CREAT))
goto err; goto err;
bytes_written += 4;
}
if (need_start_event && !no_auto_events) if (need_start_event && !no_auto_events)
{ {
...@@ -462,12 +459,30 @@ err: ...@@ -462,12 +459,30 @@ err:
my_delete(fname, MYF(0)); // do not report error if the file is not there my_delete(fname, MYF(0)); // do not report error if the file is not there
else else
{ {
MY_STAT s;
my_close(index_file, MYF(MY_WME)); my_close(index_file, MYF(MY_WME));
if (!my_stat(rli->relay_log_name,&s,MYF(0)))
{
sql_print_error("The first log %s failed to stat during purge",
rli->relay_log_name);
error=1;
goto err;
}
if (my_rename(fname,index_file_name,MYF(MY_WME)) || if (my_rename(fname,index_file_name,MYF(MY_WME)) ||
(index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND, (index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND,
MYF(MY_WME)))<0 || MYF(MY_WME)))<0 ||
my_delete(rli->relay_log_name, MYF(MY_WME))) my_delete(rli->relay_log_name, MYF(MY_WME)))
error=1; error=1;
pthread_mutex_lock(&rli->log_space_lock);
rli->log_space_total -= s.st_size;
fprintf(stderr,"purge_first_log: %ld\n", rli->log_space_total);
pthread_mutex_unlock(&rli->log_space_lock);
// ok to broadcast after the critical region as there is no risk of
// the mutex being destroyed by this thread later - this helps save
// context switches
pthread_cond_broadcast(&rli->log_space_cond);
if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/))) if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/)))
{ {
char buff[22]; char buff[22];
...@@ -695,6 +710,7 @@ void MYSQL_LOG::new_file(bool inside_mutex) ...@@ -695,6 +710,7 @@ void MYSQL_LOG::new_file(bool inside_mutex)
if (thd && thd->slave_thread) if (thd && thd->slave_thread)
r.flags |= LOG_EVENT_FORCED_ROTATE_F; r.flags |= LOG_EVENT_FORCED_ROTATE_F;
r.write(&log_file); r.write(&log_file);
bytes_written += r.get_event_len();
} }
// update needs to be signaled even if there is no rotate event // update needs to be signaled even if there is no rotate event
// log rotation should give the waiting thread a signal to // log rotation should give the waiting thread a signal to
...@@ -728,6 +744,7 @@ bool MYSQL_LOG::append(Log_event* ev) ...@@ -728,6 +744,7 @@ bool MYSQL_LOG::append(Log_event* ev)
error=1; error=1;
goto err; goto err;
} }
bytes_written += ev->get_event_len();
if ((uint)my_b_append_tell(&log_file) > max_binlog_size) if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
{ {
new_file(1); new_file(1);
...@@ -754,6 +771,7 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...) ...@@ -754,6 +771,7 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...)
error = 1; error = 1;
break; break;
} }
bytes_written += len;
} while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
if ((uint) my_b_append_tell(&log_file) > max_binlog_size) if ((uint) my_b_append_tell(&log_file) > max_binlog_size)
......
...@@ -3042,6 +3042,8 @@ CHANGEABLE_VAR changeable_vars[] = { ...@@ -3042,6 +3042,8 @@ CHANGEABLE_VAR changeable_vars[] = {
128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE }, 128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE },
{ "record_rnd_buffer", (long*) &record_rnd_cache_size, { "record_rnd_buffer", (long*) &record_rnd_cache_size,
0, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE }, 0, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE },
{ "relay_log_space_limit", (long*) &relay_log_space_limit, 0L, 0L,ULONG_MAX,
0, 1},
{ "slave_net_timeout", (long*) &slave_net_timeout, { "slave_net_timeout", (long*) &slave_net_timeout,
SLAVE_NET_TIMEOUT, 1, LONG_TIMEOUT, 0, 1 }, SLAVE_NET_TIMEOUT, 1, LONG_TIMEOUT, 0, 1 },
{ "slow_launch_time", (long*) &slow_launch_time, { "slow_launch_time", (long*) &slow_launch_time,
......
...@@ -42,6 +42,8 @@ bool do_table_inited = 0, ignore_table_inited = 0; ...@@ -42,6 +42,8 @@ bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0; bool table_rules_on = 0;
static TABLE* save_temporary_tables = 0; static TABLE* save_temporary_tables = 0;
ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong
values and make it ulonglong */
// when slave thread exits, we need to remember the temporary tables so we // when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start // can re-use them on slave start
...@@ -60,8 +62,10 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); ...@@ -60,8 +62,10 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev); static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf, static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len); uint event_len);
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi); static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli); static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
static int count_relay_log_space(RELAY_LOG_INFO* rli);
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
...@@ -264,8 +268,9 @@ void init_slave_skip_errors(char* arg) ...@@ -264,8 +268,9 @@ void init_slave_skip_errors(char* arg)
// are not running // are not running
int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
{ {
DBUG_ENTER("purge_relay_logs");
if (!rli->inited) if (!rli->inited)
return 0; /* successfully do nothing */ DBUG_RETURN(0); /* successfully do nothing */
DBUG_ASSERT(rli->slave_running == 0); DBUG_ASSERT(rli->slave_running == 0);
DBUG_ASSERT(rli->mi->slave_running == 0); DBUG_ASSERT(rli->mi->slave_running == 0);
int error=0; int error=0;
...@@ -282,14 +287,20 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) ...@@ -282,14 +287,20 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
goto err; goto err;
} }
strnmov(rli->relay_log_name,rli->linfo.log_file_name, strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name)); sizeof(rli->relay_log_name)-1);
rli->log_space_total=4; //just first log with magic number and nothing else
rli->relay_log_pos=4; rli->relay_log_pos=4;
rli->relay_log.reset_bytes_written();
rli->log_pos_current=0; rli->log_pos_current=0;
if (!just_reset) if (!just_reset)
error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
err: err:
#ifndef DBUG_OFF
char buf[22];
#endif
DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf)));
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
return error; DBUG_RETURN(error);
} }
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
...@@ -953,8 +964,9 @@ void end_master_info(MASTER_INFO* mi) ...@@ -953,8 +964,9 @@ void end_master_info(MASTER_INFO* mi)
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{ {
DBUG_ENTER("init_relay_log_info");
if (rli->inited) if (rli->inited)
return 0; DBUG_RETURN(0);
MY_STAT stat_area; MY_STAT stat_area;
char fname[FN_REFLEN+128]; char fname[FN_REFLEN+128];
int info_fd; int info_fd;
...@@ -970,6 +982,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -970,6 +982,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->log_pos_current=0; rli->log_pos_current=0;
rli->abort_pos_wait=0; rli->abort_pos_wait=0;
rli->skip_log_purge=0; rli->skip_log_purge=0;
rli->log_space_limit = relay_log_space_limit;
rli->log_space_total = 0;
// TODO: make this work with multi-master // TODO: make this work with multi-master
if (!opt_relay_logname) if (!opt_relay_logname)
{ {
...@@ -1001,7 +1015,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1001,7 +1015,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
my_close(info_fd, MYF(0)); my_close(info_fd, MYF(0));
rli->info_fd=-1; rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
return 1; DBUG_RETURN(1);
} }
if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
goto err; goto err;
...@@ -1021,7 +1035,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1021,7 +1035,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
my_close(info_fd, MYF(0)); my_close(info_fd, MYF(0));
rli->info_fd=-1; rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
return 1; DBUG_RETURN(1);
} }
rli->info_fd = info_fd; rli->info_fd = info_fd;
...@@ -1052,8 +1066,13 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1052,8 +1066,13 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
// before flush_relay_log_info // before flush_relay_log_info
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
error=test(flush_relay_log_info(rli)); error=test(flush_relay_log_info(rli));
if (count_relay_log_space(rli))
{
msg="Error counting relay log space";
goto err;
}
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
return error; DBUG_RETURN(error);
err: err:
sql_print_error(msg); sql_print_error(msg);
...@@ -1061,9 +1080,66 @@ err: ...@@ -1061,9 +1080,66 @@ err:
my_close(info_fd, MYF(0)); my_close(info_fd, MYF(0));
rli->info_fd=-1; rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
return 1; DBUG_RETURN(1);
} }
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
MY_STAT s;
DBUG_ENTER("add_relay_log");
if (!my_stat(linfo->log_file_name,&s,MYF(0)))
{
sql_print_error("log %s listed in the index, but failed to stat",
linfo->log_file_name);
DBUG_RETURN(1);
}
rli->log_space_total += s.st_size;
#ifndef DBUG_OFF
char buf[22];
#endif
DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
DBUG_RETURN(0);
}
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
bool slave_killed;
MASTER_INFO* mi = rli->mi;
const char* save_proc_info;
THD* thd = mi->io_thd;
DBUG_ENTER("wait_for_relay_log_space");
pthread_mutex_lock(&rli->log_space_lock);
save_proc_info = thd->proc_info;
thd->proc_info = "Waiting for relay log space to free";
while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi)))
{
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
}
thd->proc_info = save_proc_info;
pthread_mutex_unlock(&rli->log_space_lock);
DBUG_RETURN(slave_killed);
}
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
rli->log_space_total = 0;
if (rli->relay_log.find_first_log(&linfo,""))
{
sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1);
}
if (add_relay_log(rli,&linfo))
DBUG_RETURN(1);
while (!rli->relay_log.find_next_log(&linfo))
{
if (add_relay_log(rli,&linfo))
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
int init_master_info(MASTER_INFO* mi, const char* master_info_fname, int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname) const char* slave_info_fname)
...@@ -1242,6 +1318,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) ...@@ -1242,6 +1318,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12)); field_list.push_back(new Item_empty_string("Skip_counter", 12));
field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
field_list.push_back(new Item_empty_string("Relay_log_space", 12));
if(send_fields(thd, field_list, 1)) if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -1268,6 +1345,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) ...@@ -1268,6 +1345,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
net_store_data(packet, mi->rli.last_slave_error); net_store_data(packet, mi->rli.last_slave_error);
net_store_data(packet, mi->rli.slave_skip_counter); net_store_data(packet, mi->rli.slave_skip_counter);
net_store_data(packet, (longlong) mi->rli.master_log_pos); net_store_data(packet, (longlong) mi->rli.master_log_pos);
net_store_data(packet, (longlong) mi->rli.log_space_total);
pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
...@@ -1783,6 +1861,14 @@ from master"); ...@@ -1783,6 +1861,14 @@ from master");
goto err; goto err;
} }
flush_master_info(mi); flush_master_info(mi);
if (mi->rli.log_space_limit && mi->rli.log_space_limit <
mi->rli.log_space_total)
if (wait_for_relay_log_space(&mi->rli))
{
sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
goto err;
}
// TODO: check debugging abort code // TODO: check debugging abort code
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (abort_slave_event_count && !--events_till_abort) if (abort_slave_event_count && !--events_till_abort)
...@@ -1986,7 +2072,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) ...@@ -1986,7 +2072,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
goto err; goto err;
} }
/* this dummy block is so we could insantiate Append_block_log_event /* this dummy block is so we could instantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times once and then modify it slightly instead of doing it multiple times
in the loop in the loop
*/ */
...@@ -2012,6 +2098,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) ...@@ -2012,6 +2098,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
relay log"); relay log");
goto err; goto err;
} }
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
break; break;
} }
if (unlikely(cev_not_written)) if (unlikely(cev_not_written))
...@@ -2026,6 +2113,7 @@ relay log"); ...@@ -2026,6 +2113,7 @@ relay log");
goto err; goto err;
} }
cev_not_written=0; cev_not_written=0;
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
} }
else else
{ {
...@@ -2038,6 +2126,7 @@ relay log"); ...@@ -2038,6 +2126,7 @@ relay log");
relay log"); relay log");
goto err; goto err;
} }
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
} }
} }
} }
...@@ -2145,6 +2234,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -2145,6 +2234,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_ASSERT(!tmp_buf); DBUG_ASSERT(!tmp_buf);
return 1; return 1;
} }
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
} }
delete ev; delete ev;
if (likely(inc_pos)) if (likely(inc_pos))
...@@ -2198,6 +2288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) ...@@ -2198,6 +2288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{ {
if (likely(inc_pos)) if (likely(inc_pos))
mi->master_log_pos += event_len; mi->master_log_pos += event_len;
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
} }
if (unlikely(processed_stop_event)) if (unlikely(processed_stop_event))
mi->ignore_stop_event=1; mi->ignore_stop_event=1;
......
...@@ -31,6 +31,7 @@ extern char* slave_load_tmpdir; ...@@ -31,6 +31,7 @@ extern char* slave_load_tmpdir;
extern my_string master_info_file,relay_log_info_file; extern my_string master_info_file,relay_log_info_file;
extern my_string opt_relay_logname, opt_relaylog_index_name; extern my_string opt_relay_logname, opt_relaylog_index_name;
extern bool opt_skip_slave_start; extern bool opt_skip_slave_start;
extern ulong relay_log_space_limit;
struct st_master_info; struct st_master_info;
/* /*
...@@ -153,6 +154,9 @@ typedef struct st_relay_log_info ...@@ -153,6 +154,9 @@ typedef struct st_relay_log_info
bool log_pos_current; bool log_pos_current;
bool abort_pos_wait; bool abort_pos_wait;
bool skip_log_purge; bool skip_log_purge;
ulonglong log_space_limit,log_space_total;
pthread_mutex_t log_space_lock;
pthread_cond_t log_space_cond;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0), st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
cur_log_init_count(0), cur_log_init_count(0),
...@@ -163,17 +167,21 @@ typedef struct st_relay_log_info ...@@ -163,17 +167,21 @@ typedef struct st_relay_log_info
bzero(&info_file,sizeof(info_file)); bzero(&info_file,sizeof(info_file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
pthread_cond_init(&data_cond, NULL); pthread_cond_init(&data_cond, NULL);
pthread_cond_init(&start_cond, NULL); pthread_cond_init(&start_cond, NULL);
pthread_cond_init(&stop_cond, NULL); pthread_cond_init(&stop_cond, NULL);
pthread_cond_init(&log_space_cond, NULL);
} }
~st_relay_log_info() ~st_relay_log_info()
{ {
pthread_mutex_destroy(&run_lock); pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock); pthread_mutex_destroy(&data_lock);
pthread_mutex_destroy(&log_space_lock);
pthread_cond_destroy(&data_cond); pthread_cond_destroy(&data_cond);
pthread_cond_destroy(&start_cond); pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond); pthread_cond_destroy(&stop_cond);
pthread_cond_destroy(&log_space_cond);
} }
inline void inc_pending(ulonglong val) inline void inc_pending(ulonglong val)
{ {
......
...@@ -78,12 +78,29 @@ class MYSQL_LOG { ...@@ -78,12 +78,29 @@ class MYSQL_LOG {
bool need_start_event; bool need_start_event;
pthread_cond_t update_cond; pthread_cond_t update_cond;
bool no_auto_events; // for relay binlog bool no_auto_events; // for relay binlog
ulonglong bytes_written;
friend class Log_event; friend class Log_event;
public: public:
MYSQL_LOG(); MYSQL_LOG();
~MYSQL_LOG(); ~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; } pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void reset_bytes_written()
{
bytes_written = 0;
}
void harvest_bytes_written(ulonglong* counter)
{
#ifndef DBUG_OFF
char buf1[22],buf2[22];
#endif
DBUG_ENTER("harvest_bytes_written");
(*counter)+=bytes_written;
DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1),
llstr(bytes_written,buf2)));
bytes_written=0;
DBUG_VOID_RETURN;
}
IO_CACHE* get_log_file() { return &log_file; } IO_CACHE* get_log_file() { return &log_file; }
void signal_update() { pthread_cond_broadcast(&update_cond);} void signal_update() { pthread_cond_broadcast(&update_cond);}
void wait_for_update(THD* thd); void wait_for_update(THD* thd);
......
...@@ -751,7 +751,7 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -751,7 +751,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
need_relay_log_purge = 0; need_relay_log_purge = 0;
mi->rli.skip_log_purge=1; mi->rli.skip_log_purge=1;
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name, strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name)); sizeof(mi->rli.relay_log_name)-1);
} }
if (lex_mi->relay_log_pos) if (lex_mi->relay_log_pos)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment