Commit 819c40d6 authored by Alexey Yurchenko's avatar Alexey Yurchenko Committed by Jan Lindström

- wsrep-lib update (SR cleanups and voting support) (#1359)

- TOI error ignoring fix (wsrep_ignore_apply_errors)
parent 1f54b662
...@@ -5827,7 +5827,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query()); ...@@ -5827,7 +5827,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
thd->get_db(), query_arg); thd->get_db(), query_arg);
thd->is_slave_error= 1; thd->is_slave_error= 1;
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (thd->wsrep_apply_toi && wsrep_must_ignore_error(thd)) if (wsrep_thd_is_toi(thd) && wsrep_must_ignore_error(thd))
{ {
thd->clear_error(1); thd->clear_error(1);
thd->killed= NOT_KILLED; thd->killed= NOT_KILLED;
......
...@@ -667,7 +667,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) ...@@ -667,7 +667,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_handle(WSREP_PO_INITIALIZER),
wsrep_po_cnt(0), wsrep_po_cnt(0),
wsrep_apply_format(0), wsrep_apply_format(0),
wsrep_apply_toi(false),
wsrep_rbr_buf(NULL), wsrep_rbr_buf(NULL),
wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED),
wsrep_affected_rows(0), wsrep_affected_rows(0),
......
...@@ -4817,7 +4817,6 @@ class THD: public THD_count, /* this must be first */ ...@@ -4817,7 +4817,6 @@ class THD: public THD_count, /* this must be first */
rpl_sid wsrep_po_sid; rpl_sid wsrep_po_sid;
#endif /* GTID_SUPPORT */ #endif /* GTID_SUPPORT */
void *wsrep_apply_format; void *wsrep_apply_format;
bool wsrep_apply_toi; /* applier processing in TOI */
uchar* wsrep_rbr_buf; uchar* wsrep_rbr_buf;
wsrep_gtid_t wsrep_sync_wait_gtid; wsrep_gtid_t wsrep_sync_wait_gtid;
// wsrep_gtid_t wsrep_last_written_gtid; // wsrep_gtid_t wsrep_last_written_gtid;
......
...@@ -45,13 +45,13 @@ class Wsrep_non_trans_mode ...@@ -45,13 +45,13 @@ class Wsrep_non_trans_mode
{ {
m_thd->variables.option_bits&= ~OPTION_BEGIN; m_thd->variables.option_bits&= ~OPTION_BEGIN;
m_thd->server_status&= ~SERVER_STATUS_IN_TRANS; m_thd->server_status&= ~SERVER_STATUS_IN_TRANS;
m_thd->wsrep_cs().enter_toi(ws_meta); m_thd->wsrep_cs().enter_toi_mode(ws_meta);
} }
~Wsrep_non_trans_mode() ~Wsrep_non_trans_mode()
{ {
m_thd->variables.option_bits= m_option_bits; m_thd->variables.option_bits= m_option_bits;
m_thd->server_status= m_server_status; m_thd->server_status= m_server_status;
m_thd->wsrep_cs().leave_toi(); m_thd->wsrep_cs().leave_toi_mode();
} }
private: private:
Wsrep_non_trans_mode(const Wsrep_non_trans_mode&); Wsrep_non_trans_mode(const Wsrep_non_trans_mode&);
...@@ -343,7 +343,8 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, ...@@ -343,7 +343,8 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
} }
int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data,
wsrep::mutable_buffer&)
{ {
DBUG_ENTER("Wsrep_high_priority_service::apply_toi"); DBUG_ENTER("Wsrep_high_priority_service::apply_toi");
THD* thd= m_thd; THD* thd= m_thd;
...@@ -404,21 +405,33 @@ void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_ ...@@ -404,21 +405,33 @@ void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_
} }
int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_handle, int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) const wsrep::ws_meta& ws_meta,
wsrep::mutable_buffer& err)
{ {
DBUG_ENTER("Wsrep_high_priority_service::log_dummy_write_set"); DBUG_ENTER("Wsrep_high_priority_service::log_dummy_write_set");
int ret= 0; int ret= 0;
DBUG_PRINT("info", DBUG_PRINT("info",
("Wsrep_high_priority_service::log_dummy_write_set: seqno=%lld", ("Wsrep_high_priority_service::log_dummy_write_set: seqno=%lld",
ws_meta.seqno().get())); ws_meta.seqno().get()));
m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta); if (ws_meta.ordered())
WSREP_DEBUG("Log dummy write set %lld", ws_meta.seqno().get());
if (!(opt_log_slave_updates && wsrep_gtid_mode && m_thd->variables.gtid_seq_no))
{ {
m_thd->wsrep_cs().before_rollback(); wsrep::client_state& cs(m_thd->wsrep_cs());
m_thd->wsrep_cs().after_rollback(); if (!cs.transaction().active())
{
cs.start_transaction(ws_handle, ws_meta);
}
adopt_apply_error(err);
WSREP_DEBUG("Log dummy write set %lld", ws_meta.seqno().get());
ret= cs.provider().commit_order_enter(ws_handle, ws_meta);
if (!(ret && opt_log_slave_updates && wsrep_gtid_mode &&
m_thd->variables.gtid_seq_no))
{
cs.before_rollback();
cs.after_rollback();
}
ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err);
cs.after_applying();
} }
m_thd->wsrep_cs().after_applying();
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -452,7 +465,8 @@ Wsrep_applier_service::~Wsrep_applier_service() ...@@ -452,7 +465,8 @@ Wsrep_applier_service::~Wsrep_applier_service()
} }
int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data,
wsrep::mutable_buffer&)
{ {
DBUG_ENTER("Wsrep_applier_service::apply_write_set"); DBUG_ENTER("Wsrep_applier_service::apply_write_set");
THD* thd= m_thd; THD* thd= m_thd;
...@@ -606,7 +620,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service() ...@@ -606,7 +620,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service()
} }
int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data,
wsrep::mutable_buffer&)
{ {
DBUG_ENTER("Wsrep_replayer_service::apply_write_set"); DBUG_ENTER("Wsrep_replayer_service::apply_write_set");
THD* thd= m_thd; THD* thd= m_thd;
......
...@@ -37,19 +37,23 @@ class Wsrep_high_priority_service : ...@@ -37,19 +37,23 @@ class Wsrep_high_priority_service :
const wsrep::ws_meta&); const wsrep::ws_meta&);
const wsrep::transaction& transaction() const; const wsrep::transaction& transaction() const;
int adopt_transaction(const wsrep::transaction&); int adopt_transaction(const wsrep::transaction&);
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) = 0;
int append_fragment_and_commit(const wsrep::ws_handle&, int append_fragment_and_commit(const wsrep::ws_handle&,
const wsrep::ws_meta&, const wsrep::ws_meta&,
const wsrep::const_buffer&); const wsrep::const_buffer&);
int remove_fragments(const wsrep::ws_meta&); int remove_fragments(const wsrep::ws_meta&);
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&); int commit(const wsrep::ws_handle&, const wsrep::ws_meta&);
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&); int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&);
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&); int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&);
void store_globals(); void store_globals();
void reset_globals(); void reset_globals();
void switch_execution_context(wsrep::high_priority_service&); void switch_execution_context(wsrep::high_priority_service&);
int log_dummy_write_set(const wsrep::ws_handle&, int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&); const wsrep::ws_meta&,
wsrep::mutable_buffer&);
void adopt_apply_error(wsrep::mutable_buffer& err) {}
virtual bool check_exit_status() const = 0; virtual bool check_exit_status() const = 0;
void debug_crash(const char*); void debug_crash(const char*);
...@@ -78,7 +82,8 @@ class Wsrep_applier_service : public Wsrep_high_priority_service ...@@ -78,7 +82,8 @@ class Wsrep_applier_service : public Wsrep_high_priority_service
public: public:
Wsrep_applier_service(THD*); Wsrep_applier_service(THD*);
~Wsrep_applier_service(); ~Wsrep_applier_service();
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&);
void after_apply(); void after_apply();
bool is_replaying() const { return false; } bool is_replaying() const { return false; }
bool check_exit_status() const; bool check_exit_status() const;
...@@ -89,7 +94,8 @@ class Wsrep_replayer_service : public Wsrep_high_priority_service ...@@ -89,7 +94,8 @@ class Wsrep_replayer_service : public Wsrep_high_priority_service
public: public:
Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd); Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd);
~Wsrep_replayer_service(); ~Wsrep_replayer_service();
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&);
void after_apply() { } void after_apply() { }
bool is_replaying() const { return true; } bool is_replaying() const { return true; }
void replay_status(enum wsrep::provider::status status) void replay_status(enum wsrep::provider::status status)
......
...@@ -1782,7 +1782,7 @@ static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */) ...@@ -1782,7 +1782,7 @@ static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */)
if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; } if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; }
wsrep::client_state& cs(thd->wsrep_cs()); wsrep::client_state& cs(thd->wsrep_cs());
int const ret= cs.leave_toi(); int const ret= cs.leave_toi_local(wsrep::mutable_buffer());
if (ret) if (ret)
{ {
WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, " WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, "
...@@ -1850,10 +1850,10 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, ...@@ -1850,10 +1850,10 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
thd_proc_info(thd, "acquiring total order isolation"); thd_proc_info(thd, "acquiring total order isolation");
wsrep::client_state& cs(thd->wsrep_cs()); wsrep::client_state& cs(thd->wsrep_cs());
int ret= cs.enter_toi(key_array, int ret= cs.enter_toi_local(key_array,
wsrep::const_buffer(buff.ptr, buff.len), wsrep::const_buffer(buff.ptr, buff.len),
wsrep::provider::flag::start_transaction | wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit); wsrep::provider::flag::commit);
if (ret) if (ret)
{ {
...@@ -1909,7 +1909,7 @@ static void wsrep_TOI_end(THD *thd) { ...@@ -1909,7 +1909,7 @@ static void wsrep_TOI_end(THD *thd) {
if (wsrep_thd_is_local_toi(thd)) if (wsrep_thd_is_local_toi(thd))
{ {
wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
int ret= client_state.leave_toi(); int ret= client_state.leave_toi_local(wsrep::mutable_buffer());
if (!ret) if (!ret)
{ {
WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get());
...@@ -2400,8 +2400,7 @@ int wsrep_must_ignore_error(THD* thd) ...@@ -2400,8 +2400,7 @@ int wsrep_must_ignore_error(THD* thd)
const uint flags= sql_command_flags[thd->lex->sql_command]; const uint flags= sql_command_flags[thd->lex->sql_command];
DBUG_ASSERT(error); DBUG_ASSERT(error);
DBUG_ASSERT((wsrep_thd_is_toi(thd)) || DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd));
(wsrep_thd_is_applying(thd) && thd->wsrep_apply_toi));
if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL)) if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL))
goto ignore_error; goto ignore_error;
......
...@@ -1343,7 +1343,8 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1343,7 +1343,8 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
ws_meta); ws_meta);
} }
applier->store_globals(); applier->store_globals();
applier->apply_write_set(ws_meta, data); wsrep::mutable_buffer unused;
applier->apply_write_set(ws_meta, data, unused);
applier->after_apply(); applier->after_apply();
storage_service.store_globals(); storage_service.store_globals();
} }
......
Subproject commit fd66bdef0bbcdeb3a5189c7f93319cb5f9d77ea7 Subproject commit 0f676bd89378c7c823cff7ae7cdaef3cafcca231
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment