Commit 2fbd1c73 authored by unknown's avatar unknown

MDEV-4506: Parallel replication.

MDEV-5189: Error handling in parallel replication.

Fix error handling in parallel worker threads when a query fails:

 - Report the error to the error log.

 - Return the error back, and set rli->abort_slave.

 - Stop executing more events after the error.
parent 6a38b594
...@@ -9,11 +9,6 @@ ...@@ -9,11 +9,6 @@
ToDo list: ToDo list:
- Error handling. If we fail in one of multiple parallel executions, we
need to make a best effort to complete prior transactions and roll back
following transactions, so slave binlog position will be correct.
And all the retry logic for temporary errors like deadlock.
- Retry of failed transactions is not yet implemented for the parallel case. - Retry of failed transactions is not yet implemented for the parallel case.
- All the waits (eg. in struct wait_for_commit and in - All the waits (eg. in struct wait_for_commit and in
...@@ -212,7 +207,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -212,7 +207,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly. everything is stopped and cleaned up correctly.
*/ */
if (!sql_worker_killed(thd, rgi, in_event_group)) if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
err= rpt_handle_event(events, rpt); err= rpt_handle_event(events, rpt);
else else
err= thd->wait_for_prior_commit(); err= thd->wait_for_prior_commit();
...@@ -228,6 +223,13 @@ handle_rpl_parallel_thread(void *arg) ...@@ -228,6 +223,13 @@ handle_rpl_parallel_thread(void *arg)
delete_or_keep_event_post_apply(rgi, event_type, events->ev); delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events); my_free(events);
if (err)
{
rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
}
if (end_of_group) if (end_of_group)
{ {
in_event_group= false; in_event_group= false;
...@@ -785,6 +787,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -785,6 +787,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
} }
else if (!is_group_event || !current) else if (!is_group_event || !current)
{ {
int err;
/* /*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally, Same for events not preceeded by GTID (we should not see those normally,
...@@ -802,11 +805,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -802,11 +805,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
rev->new_log_ident, rev->ident_len+1); rev->new_log_ident, rev->ident_len+1);
} }
rpt_handle_event(qev, NULL); err= rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
my_free(qev); my_free(qev);
return false; return (err != 0);
} }
else else
{ {
......
...@@ -1274,12 +1274,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, ...@@ -1274,12 +1274,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBA aware of the problem in the error log. DBA aware of the problem in the error log.
*/ */
} }
if (mi->using_gtid == Master_info::USE_GTID_NO)
{
DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
if (mi->using_gtid == Master_info::USE_GTID_NO)
flush_relay_log_info(this); flush_relay_log_info(this);
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
}
/* /*
Note that Rotate_log_event::do_apply_event() does not call this Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets function, so there is no chance that a fake rotate event resets
...@@ -1453,7 +1451,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_) ...@@ -1453,7 +1451,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
is_parallel_exec(false), is_parallel_exec(false), is_error(false),
row_stmt_start_timestamp(0), long_find_row_note_printed(false) row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{ {
bzero(&current_gtid, sizeof(current_gtid)); bzero(&current_gtid, sizeof(current_gtid));
......
...@@ -558,6 +558,7 @@ struct rpl_group_info ...@@ -558,6 +558,7 @@ struct rpl_group_info
*/ */
char future_event_master_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec; bool is_parallel_exec;
bool is_error;
private: private:
/* /*
......
...@@ -4078,6 +4078,92 @@ int check_temp_dir(char* tmp_file) ...@@ -4078,6 +4078,92 @@ int check_temp_dir(char* tmp_file)
} }
void
slave_output_error_info(Relay_log_info *rli, THD *thd)
{
/*
retrieve as much info as possible from the thd and, error
codes and warnings and print this to the error log as to
allow the user to locate the error
*/
uint32 const last_errno= rli->last_error().number;
char llbuff[22];
if (thd->is_error())
{
char const *const errmsg= thd->stmt_da->message();
DBUG_PRINT("info",
("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
thd->stmt_da->sql_errno(), last_errno));
if (last_errno == 0)
{
/*
This function is reporting an error which was not reported
while executing exec_relay_log_event().
*/
rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
}
else if (last_errno != thd->stmt_da->sql_errno())
{
/*
* An error was reported while executing exec_relay_log_event()
* however the error code differs from what is in the thread.
* This function prints out more information to help finding
* what caused the problem.
*/
sql_print_error("Slave (additional info): %s Error_code: %d",
errmsg, thd->stmt_da->sql_errno());
}
}
/* Print any warnings issued */
List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
/*
Added controlled slave thread cancel for replication
of user-defined variables.
*/
bool udf_error = false;
while ((err= it++))
{
if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
udf_error = true;
sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
if (udf_error)
{
String tmp;
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
rpl_append_gtid_state(&tmp, false);
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_error("Error loading user-defined library, slave SQL "
"thread aborted. Install the missing library, and restart the "
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
"position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
llbuff), tmp.c_ptr_safe());
}
else
{
String tmp;
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
rpl_append_gtid_state(&tmp, false);
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
tmp.c_ptr_safe());
}
}
/** /**
Slave SQL thread entry point. Slave SQL thread entry point.
...@@ -4335,87 +4421,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4335,87 +4421,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
DBUG_PRINT("info", ("exec_relay_log_event() failed")); DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped // do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(serial_rgi)) if (!sql_slave_killed(serial_rgi))
{ slave_output_error_info(rli, thd);
/*
retrieve as much info as possible from the thd and, error
codes and warnings and print this to the error log as to
allow the user to locate the error
*/
uint32 const last_errno= rli->last_error().number;
if (thd->is_error())
{
char const *const errmsg= thd->stmt_da->message();
DBUG_PRINT("info",
("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
thd->stmt_da->sql_errno(), last_errno));
if (last_errno == 0)
{
/*
This function is reporting an error which was not reported
while executing exec_relay_log_event().
*/
rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
}
else if (last_errno != thd->stmt_da->sql_errno())
{
/*
* An error was reported while executing exec_relay_log_event()
* however the error code differs from what is in the thread.
* This function prints out more information to help finding
* what caused the problem.
*/
sql_print_error("Slave (additional info): %s Error_code: %d",
errmsg, thd->stmt_da->sql_errno());
}
}
/* Print any warnings issued */
List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
/*
Added controlled slave thread cancel for replication
of user-defined variables.
*/
bool udf_error = false;
while ((err= it++))
{
if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
udf_error = true;
sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
if (udf_error)
{
String tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
rpl_append_gtid_state(&tmp, false);
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_error("Error loading user-defined library, slave SQL "
"thread aborted. Install the missing library, and restart the "
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
"position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
llbuff), tmp.c_ptr_safe());
}
else
{
String tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
rpl_append_gtid_state(&tmp, false);
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
tmp.c_ptr_safe());
}
}
goto err; goto err;
} }
} }
......
...@@ -233,6 +233,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, ...@@ -233,6 +233,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
rpl_parallel_thread *rpt); rpl_parallel_thread *rpt);
pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(Relay_log_info *rli, THD *thd);
pthread_handler_t handle_slave_sql(void *arg); pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname); bool net_request_file(NET* net, const char* fname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment