Commit 9bf479b0 authored by Monty's avatar Monty

Update galera to work with independent sub transactions

parent 4102f158
......@@ -61,6 +61,7 @@
#include "wsrep_xid.h"
#include "wsrep_thd.h"
#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
#include "wsrep_var.h" /* wsrep_hton_check() */
#endif /* WITH_WSREP */
/**
......@@ -6582,6 +6583,9 @@ int handler::ha_reset()
static int wsrep_after_row(THD *thd)
{
DBUG_ENTER("wsrep_after_row");
if (thd->internal_transaction())
DBUG_RETURN(0);
/* enforce wsrep_max_ws_rows */
thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
......
......@@ -5808,6 +5808,8 @@ start_new_trans::start_new_trans(THD *thd)
server_status= thd->server_status;
m_transaction_psi= thd->m_transaction_psi;
thd->m_transaction_psi= 0;
wsrep_on= thd->variables.wsrep_on;
thd->variables.wsrep_on= 0;
thd->server_status&= ~(SERVER_STATUS_IN_TRANS |
SERVER_STATUS_IN_TRANS_READONLY);
thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
......@@ -5826,6 +5828,7 @@ void start_new_trans::restore_old_transaction()
if (org_thd->m_transaction_psi)
MYSQL_COMMIT_TRANSACTION(org_thd->m_transaction_psi);
org_thd->m_transaction_psi= m_transaction_psi;
org_thd->variables.wsrep_on= wsrep_on;
org_thd= 0;
}
......
......@@ -5170,6 +5170,7 @@ class start_new_trans
THD *org_thd;
uint in_sub_stmt;
uint server_status;
my_bool wsrep_on;
public:
start_new_trans(THD *thd);
......
......@@ -3075,6 +3075,7 @@ void wsrep_commit_empty(THD* thd, bool all)
if (wsrep_is_real(thd, all) &&
wsrep_thd_is_local(thd) &&
thd->wsrep_trx().active() &&
!thd->internal_transaction() &&
thd->wsrep_trx().state() != wsrep::transaction::s_committed)
{
/* @todo CTAS with STATEMENT binlog format and empty result set
......
......@@ -34,7 +34,8 @@ void wsrep_commit_empty(THD* thd, bool all);
static inline bool wsrep_is_active(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
thd->wsrep_cs().transaction().active());
thd->wsrep_cs().transaction().active() &&
!thd->internal_transaction());
}
/*
......@@ -340,6 +341,8 @@ static inline int wsrep_after_commit(THD* thd, bool all)
(long long)wsrep_thd_trx_seqno(thd),
wsrep_has_changes(thd));
DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
if (thd->internal_transaction())
DBUG_RETURN(0);
int ret= 0;
if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
{
......@@ -409,7 +412,8 @@ static inline int wsrep_after_rollback(THD* thd, bool all)
static inline int wsrep_before_statement(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction() ?
thd->wsrep_cs().before_statement() : 0);
}
......@@ -417,7 +421,8 @@ static inline
int wsrep_after_statement(THD* thd)
{
DBUG_ENTER("wsrep_after_statement");
DBUG_RETURN(thd->wsrep_cs().state() != wsrep::client_state::s_none ?
DBUG_RETURN(thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction() ?
thd->wsrep_cs().after_statement() : 0);
}
......@@ -425,7 +430,8 @@ static inline void wsrep_after_apply(THD* thd)
{
DBUG_ASSERT(wsrep_thd_is_applying(thd));
WSREP_DEBUG("wsrep_after_apply %lld", thd->thread_id);
thd->wsrep_cs().after_applying();
if (!thd->internal_transaction())
thd->wsrep_cs().after_applying();
}
static inline void wsrep_open(THD* thd)
......@@ -448,7 +454,8 @@ static inline void wsrep_open(THD* thd)
static inline void wsrep_close(THD* thd)
{
DBUG_ENTER("wsrep_close");
if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction())
{
thd->wsrep_cs().close();
}
......@@ -459,7 +466,8 @@ static inline void
wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
{
DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction())
{
thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
}
......@@ -468,8 +476,9 @@ wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
static inline int wsrep_before_command(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
thd->wsrep_cs().before_command() : 0);
return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction() ?
thd->wsrep_cs().before_command() : 0);
}
/*
Called after each command.
......@@ -478,7 +487,8 @@ static inline int wsrep_before_command(THD* thd)
*/
static inline void wsrep_after_command_before_result(THD* thd)
{
if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction())
{
thd->wsrep_cs().after_command_before_result();
}
......@@ -486,7 +496,8 @@ static inline void wsrep_after_command_before_result(THD* thd)
static inline void wsrep_after_command_after_result(THD* thd)
{
if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
!thd->internal_transaction())
{
thd->wsrep_cs().after_command_after_result();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment