Commit 8eb0f5ca authored by vinchen's avatar vinchen Committed by Kristian Nielsen

Control the Maximum speed(KB/s) to read binlog from master

parent 27025221
...@@ -939,6 +939,7 @@ void my_time_init(void); ...@@ -939,6 +939,7 @@ void my_time_init(void);
extern my_hrtime_t my_hrtime(void); extern my_hrtime_t my_hrtime(void);
extern ulonglong my_interval_timer(void); extern ulonglong my_interval_timer(void);
extern ulonglong my_getcputime(void); extern ulonglong my_getcputime(void);
extern ulonglong my_micro_time();
#define microsecond_interval_timer() (my_interval_timer()/1000) #define microsecond_interval_timer() (my_interval_timer()/1000)
#define hrtime_to_time(X) ((X).val/HRTIME_RESOLUTION) #define hrtime_to_time(X) ((X).val/HRTIME_RESOLUTION)
......
...@@ -131,3 +131,34 @@ ulonglong my_getcputime() ...@@ -131,3 +131,34 @@ ulonglong my_getcputime()
#endif /* CLOCK_THREAD_CPUTIME_ID */ #endif /* CLOCK_THREAD_CPUTIME_ID */
return 0; return 0;
} }
/**
Return time in microseconds.
@remark This function is to be used to measure performance in
micro seconds. As it's not defined whats the start time
for the clock, this function us only useful to measure
time between two moments.
@retval Value in microseconds from some undefined point in time.
*/
ulonglong my_micro_time()
{
#ifdef _WIN32
ulonglong newtime;
GetSystemTimeAsFileTime((FILETIME*)&newtime);
newtime-= OFFSET_TO_EPOC;
return (newtime/10);
#else
ulonglong newtime;
struct timeval t;
/*
The following loop is here because gettimeofday may fail on some systems
*/
while (gettimeofday(&t, NULL) != 0)
{}
newtime= (ulonglong)t.tv_sec * 1000000 + t.tv_usec;
return newtime;
#endif
}
...@@ -77,6 +77,7 @@ Master_info *active_mi= 0; ...@@ -77,6 +77,7 @@ Master_info *active_mi= 0;
Master_info_index *master_info_index; Master_info_index *master_info_index;
my_bool replicate_same_server_id; my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0; ulonglong relay_log_space_limit = 0;
ulonglong opt_read_binlog_speed_limit = 0;
const char *relay_log_index= 0; const char *relay_log_index= 0;
const char *relay_log_basename= 0; const char *relay_log_basename= 0;
...@@ -4415,6 +4416,8 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4415,6 +4416,8 @@ pthread_handler_t handle_slave_io(void *arg)
mi->slave_running= MYSQL_SLAVE_RUN_READING; mi->slave_running= MYSQL_SLAVE_RUN_READING;
DBUG_ASSERT(mi->last_error().number == 0); DBUG_ASSERT(mi->last_error().number == 0);
ulonglong lastchecktime = my_micro_time()/1000;
ulonglong tokenamount = opt_read_binlog_speed_limit*1024;
while (!io_slave_killed(mi)) while (!io_slave_killed(mi))
{ {
ulong event_len; ulong event_len;
...@@ -4473,8 +4476,34 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4473,8 +4476,34 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err; goto err;
} }
/* Control the binlog read speed of master when read_binlog_speed_limit is non-zero
*/
ulonglong read_binlog_speed_limit = opt_read_binlog_speed_limit;
if (read_binlog_speed_limit) {
/* prevent the tokenamount become a large value,
for example, the IO thread doesn't work for a long time
*/
if (tokenamount > read_binlog_speed_limit * 1024 *2)
{
lastchecktime = my_micro_time()/1000;
tokenamount = read_binlog_speed_limit * 1024 *2;
}
do{
ulonglong currenttime = my_micro_time()/1000;
tokenamount += (currenttime - lastchecktime)*read_binlog_speed_limit*1024/1000;
lastchecktime = currenttime;
if(tokenamount < event_len)
{
ulonglong micro_sleeptime = 1000*1000*(event_len - tokenamount) / (read_binlog_speed_limit * 1024);
my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second
}
}while(tokenamount < event_len);
tokenamount -= event_len;
}
/* XXX: 'synced' should be updated by queue_event to indicate /* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */ whether event has been synced to disk */
bool synced= 0; bool synced= 0;
if (queue_event(mi, event_buf, event_len)) if (queue_event(mi, event_buf, event_len))
{ {
......
...@@ -140,6 +140,7 @@ extern my_bool opt_log_slave_updates; ...@@ -140,6 +140,7 @@ extern my_bool opt_log_slave_updates;
extern char *opt_slave_skip_errors; extern char *opt_slave_skip_errors;
extern my_bool opt_replicate_annotate_row_events; extern my_bool opt_replicate_annotate_row_events;
extern ulonglong relay_log_space_limit; extern ulonglong relay_log_space_limit;
extern ulonglong opt_read_binlog_speed_limit;
extern ulonglong slave_skipped_errors; extern ulonglong slave_skipped_errors;
extern const char *relay_log_index; extern const char *relay_log_index;
extern const char *relay_log_basename; extern const char *relay_log_basename;
......
...@@ -4630,6 +4630,12 @@ static Sys_var_charptr Sys_slave_skip_errors( ...@@ -4630,6 +4630,12 @@ static Sys_var_charptr Sys_slave_skip_errors(
READ_ONLY GLOBAL_VAR(opt_slave_skip_errors), CMD_LINE(REQUIRED_ARG), READ_ONLY GLOBAL_VAR(opt_slave_skip_errors), CMD_LINE(REQUIRED_ARG),
IN_SYSTEM_CHARSET, DEFAULT(0)); IN_SYSTEM_CHARSET, DEFAULT(0));
static Sys_var_ulonglong Sys_read_binlog_speed_limit(
"read_binlog_speed_limit", "Maximum speed(KB/s) to read binlog from"
" master (0 = no limit)",
GLOBAL_VAR(opt_read_binlog_speed_limit), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
static Sys_var_ulonglong Sys_relay_log_space_limit( static Sys_var_ulonglong Sys_relay_log_space_limit(
"relay_log_space_limit", "Maximum space to use for all relay logs", "relay_log_space_limit", "Maximum space to use for all relay logs",
READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG), READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment