Commit d817267a authored by Vicențiu Ciorbaru's avatar Vicențiu Ciorbaru

[MDEV-6877] Change replication event loop to account for empty events

When writing rows with a minimal row image, it is possible to receive
empty events. In that case m_curr_row and m_rows_end are the same,
however the event implies an insert into the table with the default
values associated for that table.
parent 50955075
...@@ -9907,7 +9907,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -9907,7 +9907,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
rgi->set_row_stmt_start_timestamp(); rgi->set_row_stmt_start_timestamp();
THD_STAGE_INFO(thd, stage_executing); THD_STAGE_INFO(thd, stage_executing);
while (error == 0 && m_curr_row < m_rows_end) do
{ {
/* in_use can have been set to NULL in close_tables_for_reopen */ /* in_use can have been set to NULL in close_tables_for_reopen */
THD* old_thd= table->in_use; THD* old_thd= table->in_use;
...@@ -9956,17 +9956,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -9956,17 +9956,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (!m_curr_row_end && !error) if (!m_curr_row_end && !error)
error= unpack_current_row(rgi); error= unpack_current_row(rgi);
// at this moment m_curr_row_end should be set
DBUG_ASSERT(error || m_curr_row_end != NULL);
DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
m_curr_row= m_curr_row_end; m_curr_row= m_curr_row_end;
if (error == 0 && !transactional_table) if (error == 0 && !transactional_table)
thd->transaction.all.modified_non_trans_table= thd->transaction.all.modified_non_trans_table=
thd->transaction.stmt.modified_non_trans_table= TRUE; thd->transaction.stmt.modified_non_trans_table= TRUE;
} // row processing loop } // row processing loop
while (error == 0 && (m_curr_row != m_rows_end));
/* /*
Restore the sql_mode after the rows event is processed. Restore the sql_mode after the rows event is processed.
...@@ -11395,7 +11391,16 @@ Rows_log_event::write_row(rpl_group_info *rgi, ...@@ -11395,7 +11391,16 @@ Rows_log_event::write_row(rpl_group_info *rgi,
the size of the first row and use that value to initialize the size of the first row and use that value to initialize
storage engine for bulk insertion. storage engine for bulk insertion.
*/ */
ulong estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row); /* this is the first row to be inserted, we estimate the rows with
the size of the first row and use that value to initialize
storage engine for bulk insertion */
DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
ulong estimated_rows= 0;
if (m_curr_row < m_curr_row_end)
estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
else if (m_curr_row == m_curr_row_end)
estimated_rows= 1;
table->file->ha_start_bulk_insert(estimated_rows); table->file->ha_start_bulk_insert(estimated_rows);
} }
......
...@@ -4409,20 +4409,23 @@ protected: ...@@ -4409,20 +4409,23 @@ protected:
int find_row(rpl_group_info *); int find_row(rpl_group_info *);
int write_row(rpl_group_info *, const bool); int write_row(rpl_group_info *, const bool);
// Unpack the current row into m_table->record[0] // Unpack the current row into m_table->record[0], but with
// a different columns bitmap.
int unpack_current_row(rpl_group_info *rgi, MY_BITMAP const *cols) int unpack_current_row(rpl_group_info *rgi, MY_BITMAP const *cols)
{ {
DBUG_ASSERT(m_table); DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); ASSERT_OR_RETURN_ERROR(m_curr_row <= m_rows_end, HA_ERR_CORRUPT_EVENT);
return ::unpack_row(rgi, m_table, m_width, m_curr_row, cols, return ::unpack_row(rgi, m_table, m_width, m_curr_row, cols,
&m_curr_row_end, &m_master_reclength, m_rows_end); &m_curr_row_end, &m_master_reclength, m_rows_end);
} }
// Unpack the current row into m_table->record[0]
int unpack_current_row(rpl_group_info *rgi) int unpack_current_row(rpl_group_info *rgi)
{ {
DBUG_ASSERT(m_table); DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); ASSERT_OR_RETURN_ERROR(m_curr_row <= m_rows_end, HA_ERR_CORRUPT_EVENT);
return ::unpack_row(rgi, m_table, m_width, m_curr_row, &m_cols, return ::unpack_row(rgi, m_table, m_width, m_curr_row, &m_cols,
&m_curr_row_end, &m_master_reclength, m_rows_end); &m_curr_row_end, &m_master_reclength, m_rows_end);
} }
......
...@@ -7521,7 +7521,6 @@ struct my_option my_long_options[]= ...@@ -7521,7 +7521,6 @@ struct my_option my_long_options[]=
MYSQL_COMPATIBILITY_OPTION("log-bin-use-v1-row-events"), MYSQL_COMPATIBILITY_OPTION("log-bin-use-v1-row-events"),
MYSQL_TO_BE_IMPLEMENTED_OPTION("default-authentication-plugin"), MYSQL_TO_BE_IMPLEMENTED_OPTION("default-authentication-plugin"),
MYSQL_COMPATIBILITY_OPTION("binlog-max-flush-queue-time"), MYSQL_COMPATIBILITY_OPTION("binlog-max-flush-queue-time"),
MYSQL_TO_BE_IMPLEMENTED_OPTION("binlog-row-image"),
MYSQL_TO_BE_IMPLEMENTED_OPTION("explicit-defaults-for-timestamp"), MYSQL_TO_BE_IMPLEMENTED_OPTION("explicit-defaults-for-timestamp"),
MYSQL_COMPATIBILITY_OPTION("master-info-repository"), MYSQL_COMPATIBILITY_OPTION("master-info-repository"),
MYSQL_COMPATIBILITY_OPTION("relay-log-info-repository"), MYSQL_COMPATIBILITY_OPTION("relay-log-info-repository"),
......
...@@ -209,6 +209,16 @@ unpack_row(rpl_group_info *rgi, ...@@ -209,6 +209,16 @@ unpack_row(rpl_group_info *rgi,
Field **field_ptr; Field **field_ptr;
Field **const end_ptr= begin_ptr + colcnt; Field **const end_ptr= begin_ptr + colcnt;
if (bitmap_is_clear_all(cols))
{
/**
There was no data sent from the master, so there is
nothing to unpack.
*/
*current_row_end= pack_ptr;
*master_reclength= 0;
DBUG_RETURN(error);
}
DBUG_ASSERT(null_ptr < row_data + master_null_byte_count); DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
// Mask to mask out the correct bit among the null bits // Mask to mask out the correct bit among the null bits
......
...@@ -2132,7 +2132,7 @@ public: ...@@ -2132,7 +2132,7 @@ public:
const uchar *buf); const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional, int binlog_update_row(TABLE* table, bool is_transactional,
const uchar *old_data, const uchar *new_data); const uchar *old_data, const uchar *new_data);
void binlog_prepare_row_images(TABLE* table); static void binlog_prepare_row_images(TABLE* table);
void set_server_id(uint32 sid) { variables.server_id = sid; } void set_server_id(uint32 sid) { variables.server_id = sid; }
......
...@@ -5975,7 +5975,7 @@ void TABLE::mark_columns_needed_for_insert() ...@@ -5975,7 +5975,7 @@ void TABLE::mark_columns_needed_for_insert()
the read_set at binlogging time (for those cases that the read_set at binlogging time (for those cases that
we only want to log a PK and we needed other fields for we only want to log a PK and we needed other fields for
execution). execution).
*/ */
void TABLE::mark_columns_per_binlog_row_image() void TABLE::mark_columns_per_binlog_row_image()
{ {
DBUG_ENTER("mark_columns_per_binlog_row_image"); DBUG_ENTER("mark_columns_per_binlog_row_image");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment