Commit c06bc668 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-11065: Compressed binary log

Minor review comments/changes:

 - A bunch of style-fixes.

 - Change macros to static inline functions.

 - Update check_event_type() with compressed event types.

 - Small .result file update.
parent e1c50287
...@@ -79,7 +79,7 @@ connection slave; ...@@ -79,7 +79,7 @@ connection slave;
set @@global.debug_dbug='d,simulate_slave_unaware_checksum'; set @@global.debug_dbug='d,simulate_slave_unaware_checksum';
start slave; start slave;
include/wait_for_slave_io_error.inc [errno=1236] include/wait_for_slave_io_error.inc [errno=1236]
Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 368, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 249.'' Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 375, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 256.''
select count(*) as zero from t1; select count(*) as zero from t1;
zero zero
0 0
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
# #
# #
# mysqlbinlog: comprssed row event # mysqlbinlog: compressed row event
# #
# #
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
# #
# #
# mysqlbinlog: comprssed query event # mysqlbinlog: compressed query event
# #
# #
......
This diff is collapsed.
...@@ -694,6 +694,11 @@ enum Log_event_type ...@@ -694,6 +694,11 @@ enum Log_event_type
/* /*
Compressed binlog event. Compressed binlog event.
Note that the order between WRITE/UPDATE/DELETE events is significant;
this is so that we can convert from the compressed to the uncompressed
event type with (type-WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT)
and similar for _V1.
*/ */
QUERY_COMPRESSED_EVENT = 165, QUERY_COMPRESSED_EVENT = 165,
WRITE_ROWS_COMPRESSED_EVENT_V1 = 166, WRITE_ROWS_COMPRESSED_EVENT_V1 = 166,
...@@ -708,15 +713,52 @@ enum Log_event_type ...@@ -708,15 +713,52 @@ enum Log_event_type
ENUM_END_EVENT /* end marker */ ENUM_END_EVENT /* end marker */
}; };
#define LOG_EVENT_IS_QUERY(type) (type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT) static inline bool LOG_EVENT_IS_QUERY(enum Log_event_type type)
#define LOG_EVENT_IS_WRITE_ROW(type) (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1) {
#define LOG_EVENT_IS_UPDATE_ROW(type) (type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1) return type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT;
#define LOG_EVENT_IS_DELETE_ROW(type) (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1) }
#define LOG_EVENT_IS_ROW_COMPRESSED(type) (type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\
type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\
type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1) static inline bool LOG_EVENT_IS_WRITE_ROW(enum Log_event_type type)
#define LOG_EVENT_IS_ROW_V2(type) (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT || \ {
type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT ) return type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 ||
type == WRITE_ROWS_COMPRESSED_EVENT ||
type == WRITE_ROWS_COMPRESSED_EVENT_V1;
}
static inline bool LOG_EVENT_IS_UPDATE_ROW(enum Log_event_type type)
{
return type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 ||
type == UPDATE_ROWS_COMPRESSED_EVENT ||
type == UPDATE_ROWS_COMPRESSED_EVENT_V1;
}
static inline bool LOG_EVENT_IS_DELETE_ROW(enum Log_event_type type)
{
return type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 ||
type == DELETE_ROWS_COMPRESSED_EVENT ||
type == DELETE_ROWS_COMPRESSED_EVENT_V1;
}
static inline bool LOG_EVENT_IS_ROW_COMPRESSED(enum Log_event_type type)
{
return type == WRITE_ROWS_COMPRESSED_EVENT ||
type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||
type == UPDATE_ROWS_COMPRESSED_EVENT ||
type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||
type == DELETE_ROWS_COMPRESSED_EVENT ||
type == DELETE_ROWS_COMPRESSED_EVENT_V1;
}
static inline bool LOG_EVENT_IS_ROW_V2(enum Log_event_type type)
{
return (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT) ||
(type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT);
}
/* /*
......
...@@ -645,7 +645,7 @@ is_group_ending(Log_event *ev, Log_event_type event_type) ...@@ -645,7 +645,7 @@ is_group_ending(Log_event *ev, Log_event_type event_type)
{ {
if (event_type == XID_EVENT) if (event_type == XID_EVENT)
return 1; return 1;
if (event_type == QUERY_EVENT) if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed
{ {
Query_log_event *qev = (Query_log_event *)ev; Query_log_event *qev = (Query_log_event *)ev;
if (qev->is_commit()) if (qev->is_commit())
...@@ -2511,7 +2511,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -2511,7 +2511,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{ {
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
if (typ == XID_EVENT || if (typ == XID_EVENT ||
(typ == QUERY_EVENT && (typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed
(((Query_log_event *)ev)->is_commit() || (((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()))) ((Query_log_event *)ev)->is_rollback())))
rli->gtid_skip_flag= GTID_SKIP_NOT; rli->gtid_skip_flag= GTID_SKIP_NOT;
......
...@@ -6153,8 +6153,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6153,8 +6153,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/ */
case QUERY_COMPRESSED_EVENT: case QUERY_COMPRESSED_EVENT:
inc_pos= event_len; inc_pos= event_len;
if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, if (query_event_uncompress(rli->relay_log.description_event_for_queue,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len)) checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr),
&is_malloc, (char **)&new_buf, &event_len))
{ {
char llbuf[22]; char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR; error = ER_BINLOG_UNCOMPRESS_ERROR;
...@@ -6175,8 +6177,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6175,8 +6177,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case DELETE_ROWS_COMPRESSED_EVENT_V1: case DELETE_ROWS_COMPRESSED_EVENT_V1:
inc_pos = event_len; inc_pos = event_len;
{ {
if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, if (row_log_event_uncompress(rli->relay_log.description_event_for_queue,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len)) checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr),
&is_malloc, (char **)&new_buf, &event_len))
{ {
char llbuf[22]; char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR; error = ER_BINLOG_UNCOMPRESS_ERROR;
...@@ -6207,7 +6211,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6207,7 +6211,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_after_2_events", DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
{ {
if (mi->dbug_do_disconnect && if (mi->dbug_do_disconnect &&
(LOG_EVENT_IS_QUERY((uchar)buf[EVENT_TYPE_OFFSET]) || (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) ||
((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
&& (--mi->dbug_event_counter == 0)) && (--mi->dbug_event_counter == 0))
{ {
......
...@@ -82,6 +82,12 @@ static int check_event_type(int type, Relay_log_info *rli) ...@@ -82,6 +82,12 @@ static int check_event_type(int type, Relay_log_info *rli)
case PRE_GA_WRITE_ROWS_EVENT: case PRE_GA_WRITE_ROWS_EVENT:
case PRE_GA_UPDATE_ROWS_EVENT: case PRE_GA_UPDATE_ROWS_EVENT:
case PRE_GA_DELETE_ROWS_EVENT: case PRE_GA_DELETE_ROWS_EVENT:
case WRITE_ROWS_COMPRESSED_EVENT_V1:
case UPDATE_ROWS_COMPRESSED_EVENT_V1:
case DELETE_ROWS_COMPRESSED_EVENT_V1:
case WRITE_ROWS_COMPRESSED_EVENT:
case UPDATE_ROWS_COMPRESSED_EVENT:
case DELETE_ROWS_COMPRESSED_EVENT:
/* /*
Row events are only allowed if a Format_description_event has Row events are only allowed if a Format_description_event has
already been seen. already been seen.
......
...@@ -6961,7 +6961,6 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, ...@@ -6961,7 +6961,6 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
flush the pending rows event if necessary. flush the pending rows event if necessary.
*/ */
{ {
Log_event* ev = NULL;
int error = 0; int error = 0;
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment