Commit 82b974a1 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge MDEV-11064: "Restrict the speed of reading binlog from Master" into 10.2

parents 27025221 9970e81f
...@@ -109,6 +109,8 @@ my_bool net_write_command(NET *net,unsigned char command, ...@@ -109,6 +109,8 @@ my_bool net_write_command(NET *net,unsigned char command,
const unsigned char *packet, size_t len); const unsigned char *packet, size_t len);
int net_real_write(NET *net,const unsigned char *packet, size_t len); int net_real_write(NET *net,const unsigned char *packet, size_t len);
unsigned long my_net_read_packet(NET *net, my_bool read_from_server); unsigned long my_net_read_packet(NET *net, my_bool read_from_server);
ulong my_net_read_packet_reallen(NET *net, my_bool read_from_server,
ulong* reallen);
struct sockaddr; struct sockaddr;
int my_connect(my_socket s, const struct sockaddr *name, unsigned int namelen, int my_connect(my_socket s, const struct sockaddr *name, unsigned int namelen,
unsigned int timeout); unsigned int timeout);
......
...@@ -585,6 +585,8 @@ my_bool net_write_command(NET *net,unsigned char command, ...@@ -585,6 +585,8 @@ my_bool net_write_command(NET *net,unsigned char command,
const unsigned char *packet, size_t len); const unsigned char *packet, size_t len);
int net_real_write(NET *net,const unsigned char *packet, size_t len); int net_real_write(NET *net,const unsigned char *packet, size_t len);
unsigned long my_net_read_packet(NET *net, my_bool read_from_server); unsigned long my_net_read_packet(NET *net, my_bool read_from_server);
ulong my_net_read_packet_reallen(NET *net, my_bool read_from_server,
ulong* reallen);
#define my_net_read(A) my_net_read_packet((A), 0) #define my_net_read(A) my_net_read_packet((A), 0)
#ifdef MY_GLOBAL_INCLUDED #ifdef MY_GLOBAL_INCLUDED
......
...@@ -104,6 +104,7 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command, ...@@ -104,6 +104,7 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command,
const unsigned char *arg, ulong arg_length, const unsigned char *arg, ulong arg_length,
my_bool skip_check, MYSQL_STMT *stmt); my_bool skip_check, MYSQL_STMT *stmt);
unsigned long cli_safe_read(MYSQL *mysql); unsigned long cli_safe_read(MYSQL *mysql);
unsigned long cli_safe_read_reallen(MYSQL *mysql, ulong* reallen);
void net_clear_error(NET *net); void net_clear_error(NET *net);
void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net); void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net);
void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate, void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate,
......
...@@ -774,6 +774,9 @@ The following options may be given as the first argument: ...@@ -774,6 +774,9 @@ The following options may be given as the first argument:
--range-alloc-block-size=# --range-alloc-block-size=#
Allocation block size for storing ranges during Allocation block size for storing ranges during
optimization optimization
--read-binlog-speed-limit=#
Maximum speed(KB/s) to read binlog from master (0 = no
limit)
--read-buffer-size=# --read-buffer-size=#
Each thread that does a sequential scan allocates a Each thread that does a sequential scan allocates a
buffer of this size for each table it scans. If you do buffer of this size for each table it scans. If you do
...@@ -1399,6 +1402,7 @@ query-cache-type OFF ...@@ -1399,6 +1402,7 @@ query-cache-type OFF
query-cache-wlock-invalidate FALSE query-cache-wlock-invalidate FALSE
query-prealloc-size 24576 query-prealloc-size 24576
range-alloc-block-size 4096 range-alloc-block-size 4096
read-binlog-speed-limit 0
read-buffer-size 131072 read-buffer-size 131072
read-only FALSE read-only FALSE
read-rnd-buffer-size 262144 read-rnd-buffer-size 262144
......
...@@ -3439,6 +3439,20 @@ NUMERIC_BLOCK_SIZE 1024 ...@@ -3439,6 +3439,20 @@ NUMERIC_BLOCK_SIZE 1024
ENUM_VALUE_LIST NULL ENUM_VALUE_LIST NULL
READ_ONLY NO READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME READ_BINLOG_SPEED_LIMIT
SESSION_VALUE NULL
GLOBAL_VALUE 0
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Maximum speed(KB/s) to read binlog from master (0 = no limit)
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 18446744073709551615
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME READ_BUFFER_SIZE VARIABLE_NAME READ_BUFFER_SIZE
SESSION_VALUE 131072 SESSION_VALUE 131072
GLOBAL_VALUE 131072 GLOBAL_VALUE 131072
......
...@@ -569,16 +569,22 @@ HANDLE create_shared_memory(MYSQL *mysql,NET *net, uint connect_timeout) ...@@ -569,16 +569,22 @@ HANDLE create_shared_memory(MYSQL *mysql,NET *net, uint connect_timeout)
Error message is set. Error message is set.
@retval @retval
*/ */
ulong ulong
cli_safe_read(MYSQL *mysql) cli_safe_read(MYSQL *mysql)
{
ulong reallen = 0;
return cli_safe_read_reallen(mysql, &reallen);
}
ulong
cli_safe_read_reallen(MYSQL *mysql, ulong* reallen)
{ {
NET *net= &mysql->net; NET *net= &mysql->net;
ulong len=0; ulong len=0;
restart: restart:
if (net->vio != 0) if (net->vio != 0)
len= my_net_read_packet(net, 0); len= my_net_read_packet_reallen(net, 0, reallen);
if (len == packet_error || len == 0) if (len == packet_error || len == 0)
{ {
......
...@@ -1133,15 +1133,22 @@ ulong my_net_read(NET *net) ...@@ -1133,15 +1133,22 @@ ulong my_net_read(NET *net)
The function returns the length of the found packet or packet_error. The function returns the length of the found packet or packet_error.
net->read_pos points to the read data. net->read_pos points to the read data.
*/ */
ulong
my_net_read_packet(NET *net, my_bool read_from_server)
{
ulong reallen = 0;
return my_net_read_packet_reallen(net, read_from_server, &reallen);
}
ulong ulong
my_net_read_packet(NET *net, my_bool read_from_server) my_net_read_packet_reallen(NET *net, my_bool read_from_server, ulong* reallen)
{ {
size_t len, complen; size_t len, complen;
MYSQL_NET_READ_START(); MYSQL_NET_READ_START();
*reallen = 0;
#ifdef HAVE_COMPRESS #ifdef HAVE_COMPRESS
if (!net->compress) if (!net->compress)
{ {
...@@ -1164,7 +1171,10 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1164,7 +1171,10 @@ my_net_read_packet(NET *net, my_bool read_from_server)
} }
net->read_pos = net->buff + net->where_b; net->read_pos = net->buff + net->where_b;
if (len != packet_error) if (len != packet_error)
{
net->read_pos[len]=0; /* Safeguard for mysql_use_result */ net->read_pos[len]=0; /* Safeguard for mysql_use_result */
*reallen = len;
}
MYSQL_NET_READ_DONE(0, len); MYSQL_NET_READ_DONE(0, len);
return len; return len;
#ifdef HAVE_COMPRESS #ifdef HAVE_COMPRESS
...@@ -1265,6 +1275,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1265,6 +1275,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
return packet_error; return packet_error;
} }
buf_length+= complen; buf_length+= complen;
*reallen += packet_len;
} }
net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE; net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
......
...@@ -77,6 +77,7 @@ Master_info *active_mi= 0; ...@@ -77,6 +77,7 @@ Master_info *active_mi= 0;
Master_info_index *master_info_index; Master_info_index *master_info_index;
my_bool replicate_same_server_id; my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0; ulonglong relay_log_space_limit = 0;
ulonglong opt_read_binlog_speed_limit = 0;
const char *relay_log_index= 0; const char *relay_log_index= 0;
const char *relay_log_basename= 0; const char *relay_log_basename= 0;
...@@ -3307,13 +3308,15 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, ...@@ -3307,13 +3308,15 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
try a reconnect. We do not want to print anything to try a reconnect. We do not want to print anything to
the error log in this case because this a anormal the error log in this case because this a anormal
event in an idle server. event in an idle server.
network_read_len get the real network read length in VIO, especially using compressed protocol
RETURN VALUES RETURN VALUES
'packet_error' Error 'packet_error' Error
number Length of packet number Length of packet
*/ */
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
ulong* network_read_len)
{ {
ulong len; ulong len;
DBUG_ENTER("read_event"); DBUG_ENTER("read_event");
...@@ -3328,7 +3331,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) ...@@ -3328,7 +3331,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
DBUG_RETURN(packet_error); DBUG_RETURN(packet_error);
#endif #endif
len = cli_safe_read(mysql); len = cli_safe_read_reallen(mysql, network_read_len);
if (len == packet_error || (long) len < 1) if (len == packet_error || (long) len < 1)
{ {
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
...@@ -4415,9 +4418,11 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4415,9 +4418,11 @@ pthread_handler_t handle_slave_io(void *arg)
mi->slave_running= MYSQL_SLAVE_RUN_READING; mi->slave_running= MYSQL_SLAVE_RUN_READING;
DBUG_ASSERT(mi->last_error().number == 0); DBUG_ASSERT(mi->last_error().number == 0);
ulonglong lastchecktime = my_hrtime().val;
ulonglong tokenamount = opt_read_binlog_speed_limit*1024;
while (!io_slave_killed(mi)) while (!io_slave_killed(mi))
{ {
ulong event_len; ulong event_len, network_read_len = 0;
/* /*
We say "waiting" because read_event() will wait if there's nothing to We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The read. But if there's something to read, it will not wait. The
...@@ -4425,7 +4430,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4425,7 +4430,7 @@ pthread_handler_t handle_slave_io(void *arg)
we're in fact receiving nothing. we're in fact receiving nothing.
*/ */
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
event_len= read_event(mysql, mi, &suppress_warnings); event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len);
if (check_io_slave_killed(mi, NullS)) if (check_io_slave_killed(mi, NullS))
goto err; goto err;
...@@ -4473,6 +4478,47 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4473,6 +4478,47 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err; goto err;
} }
/* Control the binlog read speed of master
when read_binlog_speed_limit is non-zero
*/
ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024;
if (speed_limit_in_bytes)
{
/* Prevent the tokenamount become a large value,
for example, the IO thread doesn't work for a long time
*/
if (tokenamount > speed_limit_in_bytes * 2)
{
lastchecktime = my_hrtime().val;
tokenamount = speed_limit_in_bytes * 2;
}
do
{
ulonglong currenttime = my_hrtime().val;
tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000*1000);
lastchecktime = currenttime;
if(tokenamount < network_read_len)
{
ulonglong micro_time = 1000*1000 * (network_read_len - tokenamount) / speed_limit_in_bytes ;
ulonglong second_time = micro_time / (1000 * 1000);
micro_time = micro_time % (1000 * 1000);
// at least sleep 1000 micro second
my_sleep(micro_time > 1000 ? micro_time : 1000);
/*
If it sleep more than one second,
it should use slave_sleep() to avoid the STOP SLAVE hang.
*/
if (second_time)
slave_sleep(thd, second_time, io_slave_killed, mi);
}
}while(tokenamount < network_read_len);
tokenamount -= network_read_len;
}
/* XXX: 'synced' should be updated by queue_event to indicate /* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */ whether event has been synced to disk */
bool synced= 0; bool synced= 0;
......
...@@ -140,6 +140,7 @@ extern my_bool opt_log_slave_updates; ...@@ -140,6 +140,7 @@ extern my_bool opt_log_slave_updates;
extern char *opt_slave_skip_errors; extern char *opt_slave_skip_errors;
extern my_bool opt_replicate_annotate_row_events; extern my_bool opt_replicate_annotate_row_events;
extern ulonglong relay_log_space_limit; extern ulonglong relay_log_space_limit;
extern ulonglong opt_read_binlog_speed_limit;
extern ulonglong slave_skipped_errors; extern ulonglong slave_skipped_errors;
extern const char *relay_log_index; extern const char *relay_log_index;
extern const char *relay_log_basename; extern const char *relay_log_basename;
......
...@@ -4630,6 +4630,12 @@ static Sys_var_charptr Sys_slave_skip_errors( ...@@ -4630,6 +4630,12 @@ static Sys_var_charptr Sys_slave_skip_errors(
READ_ONLY GLOBAL_VAR(opt_slave_skip_errors), CMD_LINE(REQUIRED_ARG), READ_ONLY GLOBAL_VAR(opt_slave_skip_errors), CMD_LINE(REQUIRED_ARG),
IN_SYSTEM_CHARSET, DEFAULT(0)); IN_SYSTEM_CHARSET, DEFAULT(0));
static Sys_var_ulonglong Sys_read_binlog_speed_limit(
"read_binlog_speed_limit", "Maximum speed(KB/s) to read binlog from"
" master (0 = no limit)",
GLOBAL_VAR(opt_read_binlog_speed_limit), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
static Sys_var_ulonglong Sys_relay_log_space_limit( static Sys_var_ulonglong Sys_relay_log_space_limit(
"relay_log_space_limit", "Maximum space to use for all relay logs", "relay_log_space_limit", "Maximum space to use for all relay logs",
READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG), READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment