Commit 4290117b authored by sjaakola's avatar sjaakola Committed by Nirbhay Choubey

Refs MW-252

- enveloped FTWRL processing with wsrep desync/resync calls. This way FTWRL processing node
  will not cause flow control to kick in
- donor servicing thread is unfortunate exception, we must let him to pause provider as part
  of FTWRL phase, but not desync/resync as this is done as part of donor control on higher
  level
parent da9650a3
...@@ -1071,6 +1071,16 @@ void Global_read_lock::unlock_global_read_lock(THD *thd) ...@@ -1071,6 +1071,16 @@ void Global_read_lock::unlock_global_read_lock(THD *thd)
#ifdef WITH_WSREP #ifdef WITH_WSREP
wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
wsrep->resume(wsrep); wsrep->resume(wsrep);
if (!wsrep_desync && !thd->wsrep_donor)
{
int ret = wsrep->resync(wsrep);
if (ret != WSREP_OK)
{
WSREP_WARN("resync failed %d for FTWRL: db: %s, query: %s", ret,
(thd->db ? thd->db : "(null)"), thd->query());
DBUG_VOID_RETURN;
}
}
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
} }
thd->mdl_context.release_lock(m_mdl_global_shared_lock); thd->mdl_context.release_lock(m_mdl_global_shared_lock);
...@@ -1106,7 +1116,7 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd) ...@@ -1106,7 +1116,7 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd)
*/ */
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (m_mdl_blocks_commits_lock) if (WSREP(thd) && m_mdl_blocks_commits_lock)
{ {
WSREP_DEBUG("GRL was in block commit mode when entering " WSREP_DEBUG("GRL was in block commit mode when entering "
"make_global_read_lock_block_commit"); "make_global_read_lock_block_commit");
...@@ -1131,6 +1141,35 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd) ...@@ -1131,6 +1141,35 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd)
m_state= GRL_ACQUIRED_AND_BLOCKS_COMMIT; m_state= GRL_ACQUIRED_AND_BLOCKS_COMMIT;
#ifdef WITH_WSREP #ifdef WITH_WSREP
/* Native threads should bail out before wsrep oprations to follow.
Donor servicing thread is an exception, it should pause provider but not desync,
as it is already desynced in donor state
*/
if (!WSREP(thd) && !thd->wsrep_donor)
{
DBUG_RETURN(FALSE);
}
/* if already desynced or donor, avoid double desyncing */
if (wsrep_desync || thd->wsrep_donor)
{
WSREP_DEBUG("desync set upfont, skipping implicit desync for FTWRL: %d",
wsrep_desync);
}
else
{
int rcode;
WSREP_DEBUG("running implicit desync for node");
rcode = wsrep->desync(wsrep);
if (rcode != WSREP_OK)
{
WSREP_WARN("FTWRL desync failed %d for schema: %s, query: %s",
rcode, (thd->db ? thd->db : "(null)"), thd->query());
my_message(ER_LOCK_DEADLOCK, "wsrep desync failed for FTWRL", MYF(0));
DBUG_RETURN(TRUE);
}
}
long long ret = wsrep->pause(wsrep); long long ret = wsrep->pause(wsrep);
if (ret >= 0) if (ret >= 0)
{ {
......
...@@ -1088,7 +1088,8 @@ THD::THD() ...@@ -1088,7 +1088,8 @@ THD::THD()
wsrep_po_in_trans(FALSE), wsrep_po_in_trans(FALSE),
wsrep_apply_format(0), wsrep_apply_format(0),
wsrep_apply_toi(false), wsrep_apply_toi(false),
wsrep_skip_append_keys(false) wsrep_skip_append_keys(false),
wsrep_donor(false)
#endif #endif
{ {
ulong tmp; ulong tmp;
......
...@@ -3880,6 +3880,7 @@ class THD :public Statement, ...@@ -3880,6 +3880,7 @@ class THD :public Statement,
bool wsrep_apply_toi; /* applier processing in TOI */ bool wsrep_apply_toi; /* applier processing in TOI */
bool wsrep_skip_append_keys; bool wsrep_skip_append_keys;
wsrep_gtid_t wsrep_sync_wait_gtid; wsrep_gtid_t wsrep_sync_wait_gtid;
my_bool wsrep_donor; /* true if thread is SST donor servicing */
ulong wsrep_affected_rows; ulong wsrep_affected_rows;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
}; };
......
...@@ -979,6 +979,7 @@ static void* sst_donor_thread (void* a) ...@@ -979,6 +979,7 @@ static void* sst_donor_thread (void* a)
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF // operate with wsrep_ready == OFF
thd.ptr->wsrep_donor = true;
wsp::process proc(arg->cmd, "r", arg->env); wsp::process proc(arg->cmd, "r", arg->env);
err= proc.error(); err= proc.error();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment