Commit 5018a660 authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup: Log_event::read_log_event()

There are three Log_event::read_log_event() methods:
1. read the event image from IO_CACHE into String
2. create Log_event from the in-memory event image
3. read the event image from IO_CACHE and create Log_event

The 3rd was reading event image into memory and invoking the 2nd to
create Log_event. Now the 3rd also uses the 1st to read the event image
from IO_CACHE into memory, instead of duplicating its functionality.
parent 08687f7e
...@@ -2240,7 +2240,7 @@ static Exit_status check_header(IO_CACHE* file, ...@@ -2240,7 +2240,7 @@ static Exit_status check_header(IO_CACHE* file,
Format_description_log_event *new_description_event; Format_description_log_event *new_description_event;
my_b_seek(file, tmp_pos); /* seek back to event's start */ my_b_seek(file, tmp_pos); /* seek back to event's start */
if (!(new_description_event= (Format_description_log_event*) if (!(new_description_event= (Format_description_log_event*)
Log_event::read_log_event(file, glob_description_event, Log_event::read_log_event(file, 0, glob_description_event,
opt_verify_binlog_checksum))) opt_verify_binlog_checksum)))
/* EOF can't be hit here normally, so it's a real error */ /* EOF can't be hit here normally, so it's a real error */
{ {
...@@ -2274,7 +2274,7 @@ static Exit_status check_header(IO_CACHE* file, ...@@ -2274,7 +2274,7 @@ static Exit_status check_header(IO_CACHE* file,
{ {
Log_event *ev; Log_event *ev;
my_b_seek(file, tmp_pos); /* seek back to event's start */ my_b_seek(file, tmp_pos); /* seek back to event's start */
if (!(ev= Log_event::read_log_event(file, glob_description_event, if (!(ev= Log_event::read_log_event(file, 0, glob_description_event,
opt_verify_binlog_checksum))) opt_verify_binlog_checksum)))
{ {
/* EOF can't be hit here normally, so it's a real error */ /* EOF can't be hit here normally, so it's a real error */
...@@ -2388,7 +2388,7 @@ static Exit_status dump_local_log_entries(PRINT_EVENT_INFO *print_event_info, ...@@ -2388,7 +2388,7 @@ static Exit_status dump_local_log_entries(PRINT_EVENT_INFO *print_event_info,
char llbuff[21]; char llbuff[21];
my_off_t old_off = my_b_tell(file); my_off_t old_off = my_b_tell(file);
Log_event* ev = Log_event::read_log_event(file, glob_description_event, Log_event* ev = Log_event::read_log_event(file, 0, glob_description_event,
opt_verify_binlog_checksum); opt_verify_binlog_checksum);
if (!ev) if (!ev)
{ {
......
...@@ -1283,29 +1283,26 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ...@@ -1283,29 +1283,26 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
DBUG_RETURN( ret); DBUG_RETURN( ret);
} }
#endif /* !MYSQL_CLIENT */
/** /**
This needn't be format-tolerant, because we only read This needn't be format-tolerant, because we only parse the first
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length). LOG_EVENT_MINIMAL_HEADER_LEN bytes (just need the event's length).
*/ */
int Log_event::read_log_event(IO_CACHE* file, String* packet, int Log_event::read_log_event(IO_CACHE* file, String* packet,
mysql_mutex_t* log_lock, uint8 checksum_alg_arg)
uint8 checksum_alg_arg,
const char *log_file_name_arg,
bool* is_binlog_active)
{ {
ulong data_len; ulong data_len;
int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN]; char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
uchar ev_offset= packet->length(); uchar ev_offset= packet->length();
DBUG_ENTER("Log_event::read_log_event"); #ifndef max_allowed_packet
THD *thd=current_thd;
if (log_lock) ulong max_allowed_packet= thd ? thd->slave_thread ? slave_max_allowed_packet
mysql_mutex_lock(log_lock); : thd->variables.max_allowed_packet
: ~(uint)0;
if (log_file_name_arg) #endif
*is_binlog_active= mysql_bin_log.is_active(log_file_name_arg); DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,String*...)");
if (my_b_read(file, (uchar*) buf, sizeof(buf))) if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{ {
...@@ -1315,41 +1312,32 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ...@@ -1315,41 +1312,32 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
update to the log. update to the log.
*/ */
DBUG_PRINT("error",("file->error: %d", file->error)); DBUG_PRINT("error",("file->error: %d", file->error));
if (!file->error) DBUG_RETURN(file->error == 0 ? LOG_READ_EOF :
result= LOG_READ_EOF; file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
else
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
goto end;
} }
data_len= uint4korr(buf + EVENT_LEN_OFFSET); data_len= uint4korr(buf + EVENT_LEN_OFFSET);
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
data_len > max(current_thd->variables.max_allowed_packet,
opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
{
DBUG_PRINT("error",("data_len: %lu", data_len));
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
LOG_READ_TOO_LARGE);
goto end;
}
/* Append the log event header to packet */ /* Append the log event header to packet */
if (packet->append(buf, sizeof(buf))) if (packet->append(buf, sizeof(buf)))
{ DBUG_RETURN(LOG_READ_MEM);
/* Failed to allocate packet */
result= LOG_READ_MEM; if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN)
goto end; DBUG_RETURN(LOG_READ_BOGUS);
}
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN; if (data_len > max(max_allowed_packet,
if (data_len) opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
DBUG_RETURN(LOG_READ_TOO_LARGE);
if (data_len > LOG_EVENT_MINIMAL_HEADER_LEN)
{ {
/* Append rest of event, read directly from file into packet */ /* Append rest of event, read directly from file into packet */
if (packet->append(file, data_len)) if (packet->append(file, data_len - LOG_EVENT_MINIMAL_HEADER_LEN))
{ {
/* /*
Fatal error occured when appending rest of the event Fatal error occured when appending rest of the event
to packet, possible failures: to packet, possible failures:
1. EOF occured when reading from file, it's really an error 1. EOF occured when reading from file, it's really an error
as data_len is >=0 there's supposed to be more bytes available. as there's supposed to be more bytes available.
file->error will have been set to number of bytes left to read file->error will have been set to number of bytes left to read
2. Read was interrupted, file->error would normally be set to -1 2. Read was interrupted, file->error would normally be set to -1
3. Failed to allocate memory for packet, my_errno 3. Failed to allocate memory for packet, my_errno
...@@ -1357,18 +1345,16 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ...@@ -1357,18 +1345,16 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
memory allocation occurs before the call to read it might memory allocation occurs before the call to read it might
be uninitialized) be uninitialized)
*/ */
result= (my_errno == ENOMEM ? LOG_READ_MEM : DBUG_RETURN(my_errno == ENOMEM ? LOG_READ_MEM :
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO)); (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
/* Implicit goto end; */
} }
else
{
/* Corrupt the event for Dump thread*/ /* Corrupt the event for Dump thread*/
DBUG_EXECUTE_IF("corrupt_read_log_event2", DBUG_EXECUTE_IF("corrupt_read_log_event2",
uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset; uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{ {
int debug_cor_pos = rand() % (data_len + sizeof(buf) - BINLOG_CHECKSUM_LEN); int debug_cor_pos = rand() % (data_len - BINLOG_CHECKSUM_LEN);
debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos)); DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
DBUG_SET("-d,corrupt_read_log_event2"); DBUG_SET("-d,corrupt_read_log_event2");
...@@ -1377,33 +1363,13 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ...@@ -1377,33 +1363,13 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
/* /*
CRC verification of the Dump thread CRC verification of the Dump thread
*/ */
if (opt_master_verify_checksum && if (event_checksum_test((uchar*) packet->ptr() + ev_offset,
event_checksum_test((uchar*) packet->ptr() + ev_offset, data_len, checksum_alg_arg))
data_len + sizeof(buf), DBUG_RETURN(LOG_READ_CHECKSUM_FAILURE);
checksum_alg_arg))
{
result= LOG_READ_CHECKSUM_FAILURE;
goto end;
}
} }
} DBUG_RETURN(0);
end:
if (log_lock)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(result);
} }
#endif /* !MYSQL_CLIENT */
#ifndef MYSQL_CLIENT
#define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock);
#define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock);
#else
#define UNLOCK_MUTEX
#define LOCK_MUTEX
#endif
#ifndef MYSQL_CLIENT
/** /**
@note @note
Allocates memory; The caller is responsible for clean-up. Allocates memory; The caller is responsible for clean-up.
...@@ -1413,87 +1379,61 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, ...@@ -1413,87 +1379,61 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event const Format_description_log_event
*description_event, *description_event,
my_bool crc_check) my_bool crc_check)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event
*description_event,
my_bool crc_check)
#endif
{ {
DBUG_ENTER("Log_event::read_log_event"); DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,Format_description_log_event*...)");
DBUG_ASSERT(description_event != 0); DBUG_ASSERT(description_event != 0);
char head[LOG_EVENT_MINIMAL_HEADER_LEN]; String event;
/*
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
check the event for sanity and to know its length; no need to really parse
it. We say "at most" because this could be a 3.23 master, which has header
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
"minimal" over the set {MySQL >=4.0}).
*/
uint header_size= MY_MIN(description_event->common_header_len,
LOG_EVENT_MINIMAL_HEADER_LEN);
LOCK_MUTEX;
DBUG_PRINT("info", ("my_b_tell: %lu", (ulong) my_b_tell(file)));
if (my_b_read(file, (uchar *) head, header_size))
{
DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \
failed my_b_read"));
UNLOCK_MUTEX;
/*
No error here; it could be that we are at the file's end. However
if the next my_b_read() fails (below), it will be an error as we
were able to read the first bytes.
*/
DBUG_RETURN(0);
}
ulong data_len = uint4korr(head + EVENT_LEN_OFFSET);
char *buf= 0;
const char *error= 0; const char *error= 0;
Log_event *res= 0; Log_event *res= 0;
#ifndef max_allowed_packet
THD *thd=current_thd;
uint max_allowed_packet= thd ? slave_max_allowed_packet:~(uint)0;
#endif
if (data_len > max<ulong>(max_allowed_packet, if (log_lock)
opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER)) mysql_mutex_lock(log_lock);
{
error = "Event too big";
goto err;
}
if (data_len < header_size) switch (read_log_event(file, &event, (uint8)BINLOG_CHECKSUM_ALG_OFF))
{ {
error = "Event too small"; case 0:
break;
case LOG_READ_EOF: // no error here; we are at the file's end
goto err; goto err;
} case LOG_READ_BOGUS:
error= "Event too small";
// some events use the extra byte to null-terminate strings
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
{
error = "Out of memory";
goto err; goto err;
} case LOG_READ_IO:
buf[data_len] = 0; error= "read error";
memcpy(buf, head, header_size); goto err;
if (my_b_read(file, (uchar*) buf + header_size, data_len - header_size)) case LOG_READ_MEM:
{ error= "Out of memory";
error = "read error"; goto err;
case LOG_READ_TRUNC:
error= "Event truncated";
goto err;
case LOG_READ_TOO_LARGE:
error= "Event too big";
goto err;
case LOG_READ_CHECKSUM_FAILURE:
default:
DBUG_ASSERT(0);
error= "internal error";
goto err; goto err;
} }
if ((res= read_log_event(buf, data_len, &error, description_event, crc_check)))
res->register_temp_buf(buf, TRUE); if ((res= read_log_event(event.ptr(), event.length(),
&error, description_event, crc_check)))
res->register_temp_buf(event.release(), true);
err: err:
UNLOCK_MUTEX; if (log_lock)
if (!res) mysql_mutex_unlock(log_lock);
if (error)
{ {
DBUG_ASSERT(error != 0); DBUG_ASSERT(!res);
sql_print_error("Error in Log_event::read_log_event(): " if (event.length() >= OLD_HEADER_LEN)
"'%s', data_len: %lu, event_type: %d", sql_print_error("Error in Log_event::read_log_event(): '%s',"
error,data_len,(uchar)(head[EVENT_TYPE_OFFSET])); " data_len: %lu, event_type: %d", error,
my_free(buf); uint4korr(event.ptr() + EVENT_LEN_OFFSET),
(uchar)(event.ptr()[EVENT_TYPE_OFFSET]));
else
sql_print_error("Error in Log_event::read_log_event(): '%s'", error);
/* /*
The SQL slave thread will check if file->error<0 to know The SQL slave thread will check if file->error<0 to know
if there was an I/O error. Even if there is no "low-level" I/O errors if there was an I/O error. Even if there is no "low-level" I/O errors
......
...@@ -1127,6 +1127,39 @@ public: ...@@ -1127,6 +1127,39 @@ public:
Log_event(); Log_event();
Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional); Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional);
/*
init_show_field_list() prepares the column names and types for the
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
EVENTS.
*/
static void init_show_field_list(THD *thd, List<Item>* field_list);
#ifdef HAVE_REPLICATION
int net_send(THD *thd, Protocol *protocol, const char* log_name,
my_off_t pos);
/*
pack_info() is used by SHOW BINLOG EVENTS; as print() it prepares and sends
a string to display to the user, so it resembles print().
*/
virtual void pack_info(THD *thd, Protocol *protocol);
#endif /* HAVE_REPLICATION */
virtual const char* get_db()
{
return thd ? thd->db : 0;
}
#else
Log_event() : temp_buf(0), flags(0) {}
/* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(IO_CACHE* file, time_t *ts = 0);
void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
#endif
/* /*
read_log_event() functions read an event from a binlog or relay read_log_event() functions read an event from a binlog or relay
log; used by SHOW BINLOG EVENTS, the binlog_dump thread on the log; used by SHOW BINLOG EVENTS, the binlog_dump thread on the
...@@ -1155,9 +1188,9 @@ public: ...@@ -1155,9 +1188,9 @@ public:
@param[in] file log file to be read @param[in] file log file to be read
@param[out] packet packet to hold the event @param[out] packet packet to hold the event
@param[in] lock the lock to be used upon read @param[in] checksum_alg_arg verify the event checksum using this
@param[in] log_file_name_arg the log's file name algorithm (or don't if it's
@param[out] is_binlog_active is the current log still active use BINLOG_CHECKSUM_ALG_OFF)
@retval 0 success @retval 0 success
@retval LOG_READ_EOF end of file, nothing was read @retval LOG_READ_EOF end of file, nothing was read
...@@ -1168,46 +1201,7 @@ public: ...@@ -1168,46 +1201,7 @@ public:
@retval LOG_READ_TOO_LARGE event too large @retval LOG_READ_TOO_LARGE event too large
*/ */
static int read_log_event(IO_CACHE* file, String* packet, static int read_log_event(IO_CACHE* file, String* packet,
mysql_mutex_t* log_lock, uint8 checksum_alg_arg);
uint8 checksum_alg_arg,
const char *log_file_name_arg = NULL,
bool* is_binlog_active = NULL);
/*
init_show_field_list() prepares the column names and types for the
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
EVENTS.
*/
static void init_show_field_list(THD *thd, List<Item>* field_list);
#ifdef HAVE_REPLICATION
int net_send(THD *thd, Protocol *protocol, const char* log_name,
my_off_t pos);
/*
pack_info() is used by SHOW BINLOG EVENTS; as print() it prepares and sends
a string to display to the user, so it resembles print().
*/
virtual void pack_info(THD *thd, Protocol *protocol);
#endif /* HAVE_REPLICATION */
virtual const char* get_db()
{
return thd ? thd->db : 0;
}
#else
Log_event() : temp_buf(0), flags(0) {}
/* avoid having to link mysqlbinlog against libpthread */
static Log_event* read_log_event(IO_CACHE* file,
const Format_description_log_event
*description_event, my_bool crc_check);
/* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(IO_CACHE* file, time_t *ts = 0);
void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
#endif
/* /*
The value is set by caller of FD constructor and The value is set by caller of FD constructor and
Log_event::write_header() for the rest. Log_event::write_header() for the rest.
......
...@@ -1200,9 +1200,9 @@ bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg) ...@@ -1200,9 +1200,9 @@ bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg)
DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT); DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
event_buf[FLAGS_OFFSET]= (uchar) flags; event_buf[FLAGS_OFFSET]= (uchar) flags;
} }
res= !(computed == incoming); res= DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, computed != incoming);
} }
return DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, res); return res;
} }
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
......
...@@ -1436,8 +1436,9 @@ gtid_state_from_pos(const char *name, uint32 offset, ...@@ -1436,8 +1436,9 @@ gtid_state_from_pos(const char *name, uint32 offset,
break; break;
packet.length(0); packet.length(0);
err= Log_event::read_log_event(&cache, &packet, NULL, err= Log_event::read_log_event(&cache, &packet,
current_checksum_alg); opt_master_verify_checksum
? current_checksum_alg : 0);
if (err) if (err)
{ {
errormsg= "Could not read binlog while searching for slave start " errormsg= "Could not read binlog while searching for slave start "
...@@ -2230,8 +2231,9 @@ static int send_format_descriptor_event(binlog_send_info *info, ...@@ -2230,8 +2231,9 @@ static int send_format_descriptor_event(binlog_send_info *info,
the binlog the binlog
*/ */
info->last_pos= my_b_tell(log); info->last_pos= my_b_tell(log);
error= Log_event::read_log_event(log, packet, /* LOCK_log */ NULL, error= Log_event::read_log_event(log, packet,
info->current_checksum_alg); opt_master_verify_checksum
? info->current_checksum_alg : 0);
linfo->pos= my_b_tell(log); linfo->pos= my_b_tell(log);
if (error) if (error)
...@@ -2566,9 +2568,9 @@ static int send_events(binlog_send_info *info, ...@@ -2566,9 +2568,9 @@ static int send_events(binlog_send_info *info,
return 1; return 1;
info->last_pos= linfo->pos; info->last_pos= linfo->pos;
error = Log_event::read_log_event(log, packet, /* LOCK_log */ NULL, error= Log_event::read_log_event(log, packet,
info->current_checksum_alg, opt_master_verify_checksum
NULL, NULL); ? info->current_checksum_alg : 0);
linfo->pos= my_b_tell(log); linfo->pos= my_b_tell(log);
if (error) if (error)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment