Commit 07140f17 authored by Sergey Vojtovich's avatar Sergey Vojtovich

Just move, no code changes otherwise.

Part of MDEV-7974 - backport fix for mysql bug#12161 (XA and binlog)
parent ca7fbcea
...@@ -126,6 +126,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ...@@ -126,6 +126,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/rowid_filter.cc ../sql/rowid_filter.h ../sql/rowid_filter.cc ../sql/rowid_filter.h
../sql/item_vers.cc ../sql/item_vers.cc
../sql/opt_trace.cc ../sql/opt_trace.cc
../sql/xa.cc
${GEN_SOURCES} ${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE} ${MYSYS_LIBWRAP_SOURCE}
) )
......
...@@ -143,7 +143,7 @@ SET (SQL_SOURCE ...@@ -143,7 +143,7 @@ SET (SQL_SOURCE
opt_trace.cc opt_trace.cc
${WSREP_SOURCES} ${WSREP_SOURCES}
table_cache.cc encryption.cc temporary_tables.cc table_cache.cc encryption.cc temporary_tables.cc
proxy_protocol.cc backup.cc proxy_protocol.cc backup.cc xa.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_builtin.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_builtin.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_yacc_ora.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc_ora.cc
......
...@@ -2221,186 +2221,6 @@ int ha_recover(HASH *commit_list) ...@@ -2221,186 +2221,6 @@ int ha_recover(HASH *commit_list)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
return the XID as it appears in the SQL function's arguments.
So this string can be passed to XA START, XA PREPARE etc...
@note
the 'buf' has to have space for at least SQL_XIDSIZE bytes.
*/
/*
'a'..'z' 'A'..'Z', '0'..'9'
and '-' '_' ' ' symbols don't have to be
converted.
*/
static const char xid_needs_conv[128]=
{
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
};
uint get_sql_xid(XID *xid, char *buf)
{
int tot_len= xid->gtrid_length + xid->bqual_length;
int i;
const char *orig_buf= buf;
for (i=0; i<tot_len; i++)
{
uchar c= ((uchar *) xid->data)[i];
if (c >= 128 || xid_needs_conv[c])
break;
}
if (i >= tot_len)
{
/* No need to convert characters to hexadecimals. */
*buf++= '\'';
memcpy(buf, xid->data, xid->gtrid_length);
buf+= xid->gtrid_length;
*buf++= '\'';
if (xid->bqual_length > 0 || xid->formatID != 1)
{
*buf++= ',';
*buf++= '\'';
memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
buf+= xid->bqual_length;
*buf++= '\'';
}
}
else
{
*buf++= 'X';
*buf++= '\'';
for (i= 0; i < xid->gtrid_length; i++)
{
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
}
*buf++= '\'';
if (xid->bqual_length > 0 || xid->formatID != 1)
{
*buf++= ',';
*buf++= 'X';
*buf++= '\'';
for (; i < tot_len; i++)
{
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
}
*buf++= '\'';
}
}
if (xid->formatID != 1)
{
*buf++= ',';
buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
}
return (uint)(buf - orig_buf);
}
/**
return the list of XID's to a client, the same way SHOW commands do.
@note
I didn't find in XA specs that an RM cannot return the same XID twice,
so mysql_xa_recover does not filter XID's to ensure uniqueness.
It can be easily fixed later, if necessary.
*/
static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
char *data, uint data_len, CHARSET_INFO *data_cs)
{
if (xs->xa_state == XA_PREPARED)
{
protocol->prepare_for_resend();
protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
protocol->store(data, data_len, data_cs);
if (protocol->write())
return TRUE;
}
return FALSE;
}
static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
{
return xa_recover_callback(xs, protocol, xs->xid.data,
xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
}
static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
{
char buf[SQL_XIDSIZE];
uint len= get_sql_xid(&xs->xid, buf);
return xa_recover_callback(xs, protocol, buf, len,
&my_charset_utf8_general_ci);
}
bool mysql_xa_recover(THD *thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
MEM_ROOT *mem_root= thd->mem_root;
my_hash_walk_action action;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new (mem_root)
Item_int(thd, "formatID", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
field_list.push_back(new (mem_root)
Item_int(thd, "gtrid_length", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
field_list.push_back(new (mem_root)
Item_int(thd, "bqual_length", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
{
uint len;
CHARSET_INFO *cs;
if (thd->lex->verbose)
{
len= SQL_XIDSIZE;
cs= &my_charset_utf8_general_ci;
action= (my_hash_walk_action) xa_recover_callback_verbose;
}
else
{
len= XIDDATASIZE;
cs= &my_charset_bin;
action= (my_hash_walk_action) xa_recover_callback_short;
}
field_list.push_back(new (mem_root)
Item_empty_string(thd, "data", len, cs), mem_root);
}
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (xid_cache_iterate(thd, action, protocol))
DBUG_RETURN(1);
my_eof(thd);
DBUG_RETURN(0);
}
/* /*
Called by engine to notify TC that a new commit checkpoint has been reached. Called by engine to notify TC that a new commit checkpoint has been reached.
......
...@@ -892,15 +892,6 @@ struct xid_t { ...@@ -892,15 +892,6 @@ struct xid_t {
}; };
typedef struct xid_t XID; typedef struct xid_t XID;
/*
The size of XID string representation in the form
'gtrid', 'bqual', formatID
see xid_t::get_sql_string() for details.
*/
#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
uint get_sql_xid(XID *xid, char *buf);
/* for recover() handlerton call */ /* for recover() handlerton call */
#define MIN_XID_LIST_SIZE 128 #define MIN_XID_LIST_SIZE 128
#define MAX_XID_LIST_SIZE (1024*128) #define MAX_XID_LIST_SIZE (1024*128)
...@@ -4977,7 +4968,6 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht); ...@@ -4977,7 +4968,6 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht);
const char *get_canonical_filename(handler *file, const char *path, const char *get_canonical_filename(handler *file, const char *path,
char *tmp_path); char *tmp_path);
bool mysql_xa_recover(THD *thd);
void commit_checkpoint_notify_ha(handlerton *hton, void *cookie); void commit_checkpoint_notify_ha(handlerton *hton, void *cookie);
inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRING *name) inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRING *name)
......
...@@ -5641,263 +5641,6 @@ void THD::mark_transaction_to_rollback(bool all) ...@@ -5641,263 +5641,6 @@ void THD::mark_transaction_to_rollback(bool all)
is_fatal_sub_stmt_error= true; is_fatal_sub_stmt_error= true;
transaction_rollback_request= all; transaction_rollback_request= all;
} }
/***************************************************************************
Handling of XA id cacheing
***************************************************************************/
class XID_cache_element
{
/*
m_state is used to prevent elements from being deleted while XA RECOVER
iterates xid cache and to prevent recovered elments from being acquired by
multiple threads.
bits 1..29 are reference counter
bit 30 is RECOVERED flag
bit 31 is ACQUIRED flag (thread owns this xid)
bit 32 is unused
Newly allocated and deleted elements have m_state set to 0.
On lock() m_state is atomically incremented. It also creates load-ACQUIRE
memory barrier to make sure m_state is actually updated before furhter
memory accesses. Attempting to lock an element that has neither ACQUIRED
nor RECOVERED flag set returns failure and further accesses to element
memory are forbidden.
On unlock() m_state is decremented. It also creates store-RELEASE memory
barrier to make sure m_state is actually updated after preceding memory
accesses.
ACQUIRED flag is set when thread registers it's xid or when thread acquires
recovered xid.
RECOVERED flag is set for elements found during crash recovery.
ACQUIRED and RECOVERED flags are cleared before element is deleted from
hash in a spin loop, after last reference is released.
*/
std::atomic<int32_t> m_state;
public:
static const int32 ACQUIRED= 1 << 30;
static const int32 RECOVERED= 1 << 29;
XID_STATE *m_xid_state;
bool is_set(int32_t flag)
{ return m_state.load(std::memory_order_relaxed) & flag; }
void set(int32_t flag)
{
DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
m_state.fetch_add(flag, std::memory_order_relaxed);
}
bool lock()
{
int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
if (old & (ACQUIRED | RECOVERED))
return true;
unlock();
return false;
}
void unlock()
{ m_state.fetch_sub(1, std::memory_order_release); }
void mark_uninitialized()
{
int32_t old= ACQUIRED;
while (!m_state.compare_exchange_weak(old, 0,
std::memory_order_relaxed,
std::memory_order_relaxed))
{
old&= ACQUIRED | RECOVERED;
(void) LF_BACKOFF();
}
}
bool acquire_recovered()
{
int32_t old= RECOVERED;
while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
std::memory_order_relaxed,
std::memory_order_relaxed))
{
if (!(old & RECOVERED) || (old & ACQUIRED))
return false;
old= RECOVERED;
(void) LF_BACKOFF();
}
return true;
}
static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
XID_cache_element *element,
XID_STATE *xid_state)
{
DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
element->m_xid_state= xid_state;
xid_state->xid_cache_element= element;
}
static void lf_alloc_constructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
element->m_state= 0;
}
static void lf_alloc_destructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
DBUG_ASSERT(!element->is_set(ACQUIRED));
if (element->is_set(RECOVERED))
my_free(element->m_xid_state);
}
static uchar *key(const XID_cache_element *element, size_t *length,
my_bool not_used __attribute__((unused)))
{
*length= element->m_xid_state->xid.key_length();
return element->m_xid_state->xid.key();
}
};
static LF_HASH xid_cache;
static bool xid_cache_inited;
bool THD::fix_xid_hash_pins()
{
if (!xid_hash_pins)
xid_hash_pins= lf_hash_get_pins(&xid_cache);
return !xid_hash_pins;
}
void xid_cache_init()
{
xid_cache_inited= true;
lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
(my_hash_get_key) XID_cache_element::key, &my_charset_bin);
xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
xid_cache.initializer=
(lf_hash_initializer) XID_cache_element::lf_hash_initializer;
}
void xid_cache_free()
{
if (xid_cache_inited)
{
lf_hash_destroy(&xid_cache);
xid_cache_inited= false;
}
}
/**
Find recovered XA transaction by XID.
*/
XID_STATE *xid_cache_search(THD *thd, XID *xid)
{
XID_STATE *xs= 0;
DBUG_ASSERT(thd->xid_hash_pins);
XID_cache_element *element=
(XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
xid->key(), xid->key_length());
if (element)
{
if (element->acquire_recovered())
xs= element->m_xid_state;
lf_hash_search_unpin(thd->xid_hash_pins);
DEBUG_SYNC(thd, "xa_after_search");
}
return xs;
}
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
XID_STATE *xs;
LF_PINS *pins;
int res= 1;
if (!(pins= lf_hash_get_pins(&xid_cache)))
return true;
if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
{
xs->xa_state=xa_state;
xs->xid.set(xid);
xs->rm_error=0;
if ((res= lf_hash_insert(&xid_cache, pins, xs)))
my_free(xs);
else
xs->xid_cache_element->set(XID_cache_element::RECOVERED);
if (res == 1)
res= 0;
}
lf_hash_put_pins(pins);
return res;
}
bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
{
if (thd->fix_xid_hash_pins())
return true;
int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
switch (res)
{
case 0:
xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
break;
case 1:
my_error(ER_XAER_DUPID, MYF(0));
/* fall through */
default:
xid_state->xid_cache_element= 0;
}
return res;
}
void xid_cache_delete(THD *thd, XID_STATE *xid_state)
{
if (xid_state->xid_cache_element)
{
bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
DBUG_ASSERT(thd->xid_hash_pins);
xid_state->xid_cache_element->mark_uninitialized();
lf_hash_delete(&xid_cache, thd->xid_hash_pins,
xid_state->xid.key(), xid_state->xid.key_length());
xid_state->xid_cache_element= 0;
if (recovered)
my_free(xid_state);
}
}
struct xid_cache_iterate_arg
{
my_hash_walk_action action;
void *argument;
};
static my_bool xid_cache_iterate_callback(XID_cache_element *element,
xid_cache_iterate_arg *arg)
{
my_bool res= FALSE;
if (element->lock())
{
res= arg->action(element->m_xid_state, arg->argument);
element->unlock();
}
return res;
}
int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
{
xid_cache_iterate_arg argument= { action, arg };
return thd->fix_xid_hash_pins() ? -1 :
lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
(my_hash_walk_action) xid_cache_iterate_callback,
&argument);
}
/** /**
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include <mysql_com_server.h> #include <mysql_com_server.h>
#include "session_tracker.h" #include "session_tracker.h"
#include "backup.h" #include "backup.h"
#include "xa.h"
extern "C" extern "C"
void set_thd_stage_info(void *thd, void set_thd_stage_info(void *thd,
...@@ -1276,50 +1277,6 @@ struct st_savepoint { ...@@ -1276,50 +1277,6 @@ struct st_savepoint {
MDL_savepoint mdl_savepoint; MDL_savepoint mdl_savepoint;
}; };
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
class XID_cache_element;
typedef struct st_xid_state {
/* For now, this is only used to catch duplicated external xids */
XID xid; // transaction identifier
enum xa_states xa_state; // used by external XA only
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
XID_cache_element *xid_cache_element;
/**
Check that XA transaction has an uncommitted work. Report an error
to the user in case when there is an uncommitted work for XA transaction.
@return result of check
@retval false XA transaction is NOT in state IDLE, PREPARED
or ROLLBACK_ONLY.
@retval true XA transaction is in state IDLE or PREPARED
or ROLLBACK_ONLY.
*/
bool check_has_uncommitted_xa() const
{
if (xa_state == XA_IDLE ||
xa_state == XA_PREPARED ||
xa_state == XA_ROLLBACK_ONLY)
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
return true;
}
return false;
}
} XID_STATE;
void xid_cache_init(void);
void xid_cache_free(void);
XID_STATE *xid_cache_search(THD *thd, XID *xid);
bool xid_cache_insert(XID *xid, enum xa_states xa_state);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
void xid_cache_delete(THD *thd, XID_STATE *xid_state);
int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *argument);
/** /**
@class Security_context @class Security_context
@brief A set of THD members describing the current authenticated user. @brief A set of THD members describing the current authenticated user.
......
...@@ -395,10 +395,6 @@ const LEX_CSTRING command_name[257]={ ...@@ -395,10 +395,6 @@ const LEX_CSTRING command_name[257]={
{ STRING_WITH_LEN("Error") } // Last command number 255 { STRING_WITH_LEN("Error") } // Last command number 255
}; };
const char *xa_state_names[]={
"NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY"
};
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
/** /**
Returns true if all tables should be ignored. Returns true if all tables should be ignored.
......
...@@ -28,21 +28,19 @@ ...@@ -28,21 +28,19 @@
#include "wsrep_trans_observer.h" #include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
#ifndef EMBEDDED_LIBRARY
/** /**
Helper: Tell tracker (if any) that transaction ended. Helper: Tell tracker (if any) that transaction ended.
*/ */
static void trans_track_end_trx(THD *thd) void trans_track_end_trx(THD *thd)
{ {
#ifndef EMBEDDED_LIBRARY
if (thd->variables.session_track_transaction_info > TX_TRACK_NONE) if (thd->variables.session_track_transaction_info > TX_TRACK_NONE)
{ {
((Transaction_state_tracker *) ((Transaction_state_tracker *)
thd->session_tracker.get_tracker(TRANSACTION_INFO_TRACKER))->end_trx(thd); thd->session_tracker.get_tracker(TRANSACTION_INFO_TRACKER))->end_trx(thd);
} }
}
#else
#define trans_track_end_trx(A) do{}while(0)
#endif //EMBEDDED_LIBRARY #endif //EMBEDDED_LIBRARY
}
/** /**
...@@ -88,65 +86,6 @@ static bool trans_check(THD *thd) ...@@ -88,65 +86,6 @@ static bool trans_check(THD *thd)
} }
/**
Mark a XA transaction as rollback-only if the RM unilaterally
rolled back the transaction branch.
@note If a rollback was requested by the RM, this function sets
the appropriate rollback error code and transits the state
to XA_ROLLBACK_ONLY.
@return TRUE if transaction was rolled back or if the transaction
state is XA_ROLLBACK_ONLY. FALSE otherwise.
*/
static bool xa_trans_rolled_back(XID_STATE *xid_state)
{
if (xid_state->rm_error)
{
switch (xid_state->rm_error) {
case ER_LOCK_WAIT_TIMEOUT:
my_error(ER_XA_RBTIMEOUT, MYF(0));
break;
case ER_LOCK_DEADLOCK:
my_error(ER_XA_RBDEADLOCK, MYF(0));
break;
default:
my_error(ER_XA_RBROLLBACK, MYF(0));
}
xid_state->xa_state= XA_ROLLBACK_ONLY;
}
return (xid_state->xa_state == XA_ROLLBACK_ONLY);
}
/**
Rollback the active XA transaction.
@note Resets rm_error before calling ha_rollback(), so
the thd->transaction.xid structure gets reset
by ha_rollback() / THD::transaction::cleanup().
@return TRUE if the rollback failed, FALSE otherwise.
*/
static bool xa_trans_force_rollback(THD *thd)
{
/*
We must reset rm_error before calling ha_rollback(),
so thd->transaction.xid structure gets reset
by ha_rollback()/THD::transaction::cleanup().
*/
thd->transaction.xid_state.rm_error= 0;
if (ha_rollback_trans(thd, true))
{
my_error(ER_XAER_RMERR, MYF(0));
return true;
}
return false;
}
/** /**
Begin a new transaction. Begin a new transaction.
...@@ -777,268 +716,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) ...@@ -777,268 +716,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
DBUG_RETURN(MY_TEST(res)); DBUG_RETURN(MY_TEST(res));
} }
/**
Starts an XA transaction with the given xid value.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_start(THD *thd)
{
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_start");
if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
{
bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
if (not_equal)
my_error(ER_XAER_NOTA, MYF(0));
else
thd->transaction.xid_state.xa_state= XA_ACTIVE;
DBUG_RETURN(not_equal);
}
/* TODO: JOIN is not supported yet. */
if (thd->lex->xa_opt != XA_NONE)
my_error(ER_XAER_INVAL, MYF(0));
else if (xa_state != XA_NOTR)
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
my_error(ER_XAER_OUTSIDE, MYF(0));
else if (!trans_begin(thd))
{
DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
thd->transaction.xid_state.xa_state= XA_ACTIVE;
thd->transaction.xid_state.rm_error= 0;
thd->transaction.xid_state.xid.set(thd->lex->xid);
if (xid_cache_insert(thd, &thd->transaction.xid_state))
{
thd->transaction.xid_state.xa_state= XA_NOTR;
thd->transaction.xid_state.xid.null();
trans_rollback(thd);
DBUG_RETURN(true);
}
DBUG_RETURN(FALSE);
}
DBUG_RETURN(TRUE);
}
/**
Put a XA transaction in the IDLE state.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_end(THD *thd)
{
DBUG_ENTER("trans_xa_end");
/* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
if (thd->lex->xa_opt != XA_NONE)
my_error(ER_XAER_INVAL, MYF(0));
else if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
thd->transaction.xid_state.xa_state= XA_IDLE;
DBUG_RETURN(thd->is_error() ||
thd->transaction.xid_state.xa_state != XA_IDLE);
}
/**
Put a XA transaction in the PREPARED state.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_prepare(THD *thd)
{
DBUG_ENTER("trans_xa_prepare");
if (thd->transaction.xid_state.xa_state != XA_IDLE)
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (ha_prepare(thd))
{
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
my_error(ER_XA_RBROLLBACK, MYF(0));
}
else
thd->transaction.xid_state.xa_state= XA_PREPARED;
DBUG_RETURN(thd->is_error() ||
thd->transaction.xid_state.xa_state != XA_PREPARED);
}
/**
Commit and terminate the a XA transaction.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_commit(THD *thd)
{
bool res= TRUE;
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_commit");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
else
{
res= xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
xid_cache_delete(thd, xs);
}
DBUG_RETURN(res);
}
if (xa_trans_rolled_back(&thd->transaction.xid_state))
{
xa_trans_force_rollback(thd);
res= thd->is_error();
}
else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
{
int r= ha_commit_trans(thd, TRUE);
if ((res= MY_TEST(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
}
else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
{
MDL_request mdl_request;
/*
Acquire metadata lock which will ensure that COMMIT is blocked
by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
progress blocks FTWRL).
We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
*/
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
MDL_TRANSACTION);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, TRUE);
my_error(ER_XAER_RMERR, MYF(0));
}
else
{
DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
res= MY_TEST(ha_commit_one_phase(thd, 1));
if (res)
my_error(ER_XAER_RMERR, MYF(0));
}
}
else
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
DBUG_RETURN(TRUE);
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
trans_track_end_trx(thd);
DBUG_RETURN(res);
}
/**
Roll back and terminate a XA transaction.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_rollback(THD *thd)
{
bool res= TRUE;
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_rollback");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs)
my_error(ER_XAER_NOTA, MYF(0));
else
{
xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(thd, xs);
}
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY)
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
DBUG_RETURN(TRUE);
}
res= xa_trans_force_rollback(thd);
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
trans_track_end_trx(thd);
DBUG_RETURN(res);
}
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
class THD; class THD;
void trans_track_end_trx(THD *thd);
bool trans_begin(THD *thd, uint flags= 0); bool trans_begin(THD *thd, uint flags= 0);
bool trans_commit(THD *thd); bool trans_commit(THD *thd);
bool trans_commit_implicit(THD *thd); bool trans_commit_implicit(THD *thd);
...@@ -37,12 +39,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name); ...@@ -37,12 +39,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name);
bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name);
bool trans_release_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name);
bool trans_xa_start(THD *thd);
bool trans_xa_end(THD *thd);
bool trans_xa_prepare(THD *thd);
bool trans_xa_commit(THD *thd);
bool trans_xa_rollback(THD *thd);
void trans_reset_one_shot_chistics(THD *thd); void trans_reset_one_shot_chistics(THD *thd);
#endif /* TRANSACTION_H */ #endif /* TRANSACTION_H */
/*
Copyright (c) 2000, 2016, Oracle and/or its affiliates.
Copyright (c) 2009, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include "mariadb.h"
#include "sql_class.h"
#include "transaction.h"
/***************************************************************************
Handling of XA id cacheing
***************************************************************************/
class XID_cache_element
{
/*
m_state is used to prevent elements from being deleted while XA RECOVER
iterates xid cache and to prevent recovered elments from being acquired by
multiple threads.
bits 1..29 are reference counter
bit 30 is RECOVERED flag
bit 31 is ACQUIRED flag (thread owns this xid)
bit 32 is unused
Newly allocated and deleted elements have m_state set to 0.
On lock() m_state is atomically incremented. It also creates load-ACQUIRE
memory barrier to make sure m_state is actually updated before furhter
memory accesses. Attempting to lock an element that has neither ACQUIRED
nor RECOVERED flag set returns failure and further accesses to element
memory are forbidden.
On unlock() m_state is decremented. It also creates store-RELEASE memory
barrier to make sure m_state is actually updated after preceding memory
accesses.
ACQUIRED flag is set when thread registers it's xid or when thread acquires
recovered xid.
RECOVERED flag is set for elements found during crash recovery.
ACQUIRED and RECOVERED flags are cleared before element is deleted from
hash in a spin loop, after last reference is released.
*/
std::atomic<int32_t> m_state;
public:
static const int32 ACQUIRED= 1 << 30;
static const int32 RECOVERED= 1 << 29;
XID_STATE *m_xid_state;
bool is_set(int32_t flag)
{ return m_state.load(std::memory_order_relaxed) & flag; }
void set(int32_t flag)
{
DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
m_state.fetch_add(flag, std::memory_order_relaxed);
}
bool lock()
{
int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
if (old & (ACQUIRED | RECOVERED))
return true;
unlock();
return false;
}
void unlock()
{ m_state.fetch_sub(1, std::memory_order_release); }
void mark_uninitialized()
{
int32_t old= ACQUIRED;
while (!m_state.compare_exchange_weak(old, 0,
std::memory_order_relaxed,
std::memory_order_relaxed))
{
old&= ACQUIRED | RECOVERED;
(void) LF_BACKOFF();
}
}
bool acquire_recovered()
{
int32_t old= RECOVERED;
while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
std::memory_order_relaxed,
std::memory_order_relaxed))
{
if (!(old & RECOVERED) || (old & ACQUIRED))
return false;
old= RECOVERED;
(void) LF_BACKOFF();
}
return true;
}
static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
XID_cache_element *element,
XID_STATE *xid_state)
{
DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
element->m_xid_state= xid_state;
xid_state->xid_cache_element= element;
}
static void lf_alloc_constructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
element->m_state= 0;
}
static void lf_alloc_destructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
DBUG_ASSERT(!element->is_set(ACQUIRED));
if (element->is_set(RECOVERED))
my_free(element->m_xid_state);
}
static uchar *key(const XID_cache_element *element, size_t *length,
my_bool not_used __attribute__((unused)))
{
*length= element->m_xid_state->xid.key_length();
return element->m_xid_state->xid.key();
}
};
static LF_HASH xid_cache;
static bool xid_cache_inited;
const char *xa_state_names[]= {
"NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY"
};
bool THD::fix_xid_hash_pins()
{
if (!xid_hash_pins)
xid_hash_pins= lf_hash_get_pins(&xid_cache);
return !xid_hash_pins;
}
void xid_cache_init()
{
xid_cache_inited= true;
lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
(my_hash_get_key) XID_cache_element::key, &my_charset_bin);
xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
xid_cache.initializer=
(lf_hash_initializer) XID_cache_element::lf_hash_initializer;
}
void xid_cache_free()
{
if (xid_cache_inited)
{
lf_hash_destroy(&xid_cache);
xid_cache_inited= false;
}
}
/**
Find recovered XA transaction by XID.
*/
static XID_STATE *xid_cache_search(THD *thd, XID *xid)
{
XID_STATE *xs= 0;
DBUG_ASSERT(thd->xid_hash_pins);
XID_cache_element *element=
(XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
xid->key(), xid->key_length());
if (element)
{
if (element->acquire_recovered())
xs= element->m_xid_state;
lf_hash_search_unpin(thd->xid_hash_pins);
DEBUG_SYNC(thd, "xa_after_search");
}
return xs;
}
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
XID_STATE *xs;
LF_PINS *pins;
int res= 1;
if (!(pins= lf_hash_get_pins(&xid_cache)))
return true;
if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
{
xs->xa_state=xa_state;
xs->xid.set(xid);
xs->rm_error=0;
if ((res= lf_hash_insert(&xid_cache, pins, xs)))
my_free(xs);
else
xs->xid_cache_element->set(XID_cache_element::RECOVERED);
if (res == 1)
res= 0;
}
lf_hash_put_pins(pins);
return res;
}
bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
{
if (thd->fix_xid_hash_pins())
return true;
int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
switch (res)
{
case 0:
xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
break;
case 1:
my_error(ER_XAER_DUPID, MYF(0));
/* fall through */
default:
xid_state->xid_cache_element= 0;
}
return res;
}
void xid_cache_delete(THD *thd, XID_STATE *xid_state)
{
if (xid_state->xid_cache_element)
{
bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
DBUG_ASSERT(thd->xid_hash_pins);
xid_state->xid_cache_element->mark_uninitialized();
lf_hash_delete(&xid_cache, thd->xid_hash_pins,
xid_state->xid.key(), xid_state->xid.key_length());
xid_state->xid_cache_element= 0;
if (recovered)
my_free(xid_state);
}
}
struct xid_cache_iterate_arg
{
my_hash_walk_action action;
void *argument;
};
static my_bool xid_cache_iterate_callback(XID_cache_element *element,
xid_cache_iterate_arg *arg)
{
my_bool res= FALSE;
if (element->lock())
{
res= arg->action(element->m_xid_state, arg->argument);
element->unlock();
}
return res;
}
static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
{
xid_cache_iterate_arg argument= { action, arg };
return thd->fix_xid_hash_pins() ? -1 :
lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
(my_hash_walk_action) xid_cache_iterate_callback,
&argument);
}
/**
Mark a XA transaction as rollback-only if the RM unilaterally
rolled back the transaction branch.
@note If a rollback was requested by the RM, this function sets
the appropriate rollback error code and transits the state
to XA_ROLLBACK_ONLY.
@return TRUE if transaction was rolled back or if the transaction
state is XA_ROLLBACK_ONLY. FALSE otherwise.
*/
static bool xa_trans_rolled_back(XID_STATE *xid_state)
{
if (xid_state->rm_error)
{
switch (xid_state->rm_error) {
case ER_LOCK_WAIT_TIMEOUT:
my_error(ER_XA_RBTIMEOUT, MYF(0));
break;
case ER_LOCK_DEADLOCK:
my_error(ER_XA_RBDEADLOCK, MYF(0));
break;
default:
my_error(ER_XA_RBROLLBACK, MYF(0));
}
xid_state->xa_state= XA_ROLLBACK_ONLY;
}
return (xid_state->xa_state == XA_ROLLBACK_ONLY);
}
/**
Rollback the active XA transaction.
@note Resets rm_error before calling ha_rollback(), so
the thd->transaction.xid structure gets reset
by ha_rollback() / THD::transaction::cleanup().
@return TRUE if the rollback failed, FALSE otherwise.
*/
static bool xa_trans_force_rollback(THD *thd)
{
/*
We must reset rm_error before calling ha_rollback(),
so thd->transaction.xid structure gets reset
by ha_rollback()/THD::transaction::cleanup().
*/
thd->transaction.xid_state.rm_error= 0;
if (ha_rollback_trans(thd, true))
{
my_error(ER_XAER_RMERR, MYF(0));
return true;
}
return false;
}
/**
Starts an XA transaction with the given xid value.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_start(THD *thd)
{
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_start");
if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
{
bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
if (not_equal)
my_error(ER_XAER_NOTA, MYF(0));
else
thd->transaction.xid_state.xa_state= XA_ACTIVE;
DBUG_RETURN(not_equal);
}
/* TODO: JOIN is not supported yet. */
if (thd->lex->xa_opt != XA_NONE)
my_error(ER_XAER_INVAL, MYF(0));
else if (xa_state != XA_NOTR)
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
my_error(ER_XAER_OUTSIDE, MYF(0));
else if (!trans_begin(thd))
{
DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
thd->transaction.xid_state.xa_state= XA_ACTIVE;
thd->transaction.xid_state.rm_error= 0;
thd->transaction.xid_state.xid.set(thd->lex->xid);
if (xid_cache_insert(thd, &thd->transaction.xid_state))
{
thd->transaction.xid_state.xa_state= XA_NOTR;
thd->transaction.xid_state.xid.null();
trans_rollback(thd);
DBUG_RETURN(true);
}
DBUG_RETURN(FALSE);
}
DBUG_RETURN(TRUE);
}
/**
Put a XA transaction in the IDLE state.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_end(THD *thd)
{
DBUG_ENTER("trans_xa_end");
/* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
if (thd->lex->xa_opt != XA_NONE)
my_error(ER_XAER_INVAL, MYF(0));
else if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
thd->transaction.xid_state.xa_state= XA_IDLE;
DBUG_RETURN(thd->is_error() ||
thd->transaction.xid_state.xa_state != XA_IDLE);
}
/**
Put a XA transaction in the PREPARED state.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_prepare(THD *thd)
{
DBUG_ENTER("trans_xa_prepare");
if (thd->transaction.xid_state.xa_state != XA_IDLE)
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (ha_prepare(thd))
{
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
my_error(ER_XA_RBROLLBACK, MYF(0));
}
else
thd->transaction.xid_state.xa_state= XA_PREPARED;
DBUG_RETURN(thd->is_error() ||
thd->transaction.xid_state.xa_state != XA_PREPARED);
}
/**
Commit and terminate the a XA transaction.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_commit(THD *thd)
{
bool res= TRUE;
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_commit");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
else
{
res= xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
xid_cache_delete(thd, xs);
}
DBUG_RETURN(res);
}
if (xa_trans_rolled_back(&thd->transaction.xid_state))
{
xa_trans_force_rollback(thd);
res= thd->is_error();
}
else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
{
int r= ha_commit_trans(thd, TRUE);
if ((res= MY_TEST(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
}
else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
{
MDL_request mdl_request;
/*
Acquire metadata lock which will ensure that COMMIT is blocked
by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
progress blocks FTWRL).
We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
*/
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
MDL_TRANSACTION);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, TRUE);
my_error(ER_XAER_RMERR, MYF(0));
}
else
{
DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
res= MY_TEST(ha_commit_one_phase(thd, 1));
if (res)
my_error(ER_XAER_RMERR, MYF(0));
}
}
else
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
DBUG_RETURN(TRUE);
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
trans_track_end_trx(thd);
DBUG_RETURN(res);
}
/**
Roll back and terminate a XA transaction.
@param thd Current thread
@retval FALSE Success
@retval TRUE Failure
*/
bool trans_xa_rollback(THD *thd)
{
bool res= TRUE;
enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_xa_rollback");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs)
my_error(ER_XAER_NOTA, MYF(0));
else
{
xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(thd, xs);
}
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY)
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
DBUG_RETURN(TRUE);
}
res= xa_trans_force_rollback(thd);
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
trans_track_end_trx(thd);
DBUG_RETURN(res);
}
/**
return the XID as it appears in the SQL function's arguments.
So this string can be passed to XA START, XA PREPARE etc...
@note
the 'buf' has to have space for at least SQL_XIDSIZE bytes.
*/
/*
'a'..'z' 'A'..'Z', '0'..'9'
and '-' '_' ' ' symbols don't have to be
converted.
*/
static const char xid_needs_conv[128]=
{
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
};
/*
The size of XID string representation in the form
'gtrid', 'bqual', formatID
see xid_t::get_sql_string() for details.
*/
#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
static uint get_sql_xid(XID *xid, char *buf)
{
int tot_len= xid->gtrid_length + xid->bqual_length;
int i;
const char *orig_buf= buf;
for (i=0; i<tot_len; i++)
{
uchar c= ((uchar *) xid->data)[i];
if (c >= 128 || xid_needs_conv[c])
break;
}
if (i >= tot_len)
{
/* No need to convert characters to hexadecimals. */
*buf++= '\'';
memcpy(buf, xid->data, xid->gtrid_length);
buf+= xid->gtrid_length;
*buf++= '\'';
if (xid->bqual_length > 0 || xid->formatID != 1)
{
*buf++= ',';
*buf++= '\'';
memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
buf+= xid->bqual_length;
*buf++= '\'';
}
}
else
{
*buf++= 'X';
*buf++= '\'';
for (i= 0; i < xid->gtrid_length; i++)
{
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
}
*buf++= '\'';
if (xid->bqual_length > 0 || xid->formatID != 1)
{
*buf++= ',';
*buf++= 'X';
*buf++= '\'';
for (; i < tot_len; i++)
{
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
*buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
}
*buf++= '\'';
}
}
if (xid->formatID != 1)
{
*buf++= ',';
buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
}
return (uint)(buf - orig_buf);
}
/**
return the list of XID's to a client, the same way SHOW commands do.
@note
I didn't find in XA specs that an RM cannot return the same XID twice,
so mysql_xa_recover does not filter XID's to ensure uniqueness.
It can be easily fixed later, if necessary.
*/
static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
char *data, uint data_len, CHARSET_INFO *data_cs)
{
if (xs->xa_state == XA_PREPARED)
{
protocol->prepare_for_resend();
protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
protocol->store(data, data_len, data_cs);
if (protocol->write())
return TRUE;
}
return FALSE;
}
static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
{
return xa_recover_callback(xs, protocol, xs->xid.data,
xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
}
static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
{
char buf[SQL_XIDSIZE];
uint len= get_sql_xid(&xs->xid, buf);
return xa_recover_callback(xs, protocol, buf, len,
&my_charset_utf8_general_ci);
}
bool mysql_xa_recover(THD *thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
MEM_ROOT *mem_root= thd->mem_root;
my_hash_walk_action action;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new (mem_root)
Item_int(thd, "formatID", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
field_list.push_back(new (mem_root)
Item_int(thd, "gtrid_length", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
field_list.push_back(new (mem_root)
Item_int(thd, "bqual_length", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
{
uint len;
CHARSET_INFO *cs;
if (thd->lex->verbose)
{
len= SQL_XIDSIZE;
cs= &my_charset_utf8_general_ci;
action= (my_hash_walk_action) xa_recover_callback_verbose;
}
else
{
len= XIDDATASIZE;
cs= &my_charset_bin;
action= (my_hash_walk_action) xa_recover_callback_short;
}
field_list.push_back(new (mem_root)
Item_empty_string(thd, "data", len, cs), mem_root);
}
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (xid_cache_iterate(thd, action, protocol))
DBUG_RETURN(1);
my_eof(thd);
DBUG_RETURN(0);
}
/*
Copyright (c) 2000, 2016, Oracle and/or its affiliates.
Copyright (c) 2009, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
class XID_cache_element;
struct XID_STATE {
/* For now, this is only used to catch duplicated external xids */
XID xid; // transaction identifier
enum xa_states xa_state; // used by external XA only
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
XID_cache_element *xid_cache_element;
/**
Check that XA transaction has an uncommitted work. Report an error
to the user in case when there is an uncommitted work for XA transaction.
@return result of check
@retval false XA transaction is NOT in state IDLE, PREPARED
or ROLLBACK_ONLY.
@retval true XA transaction is in state IDLE or PREPARED
or ROLLBACK_ONLY.
*/
bool check_has_uncommitted_xa() const
{
if (xa_state == XA_IDLE ||
xa_state == XA_PREPARED ||
xa_state == XA_ROLLBACK_ONLY)
{
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
return true;
}
return false;
}
};
void xid_cache_init(void);
void xid_cache_free(void);
bool xid_cache_insert(XID *xid, enum xa_states xa_state);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
void xid_cache_delete(THD *thd, XID_STATE *xid_state);
bool trans_xa_start(THD *thd);
bool trans_xa_end(THD *thd);
bool trans_xa_prepare(THD *thd);
bool trans_xa_commit(THD *thd);
bool trans_xa_rollback(THD *thd);
bool mysql_xa_recover(THD *thd);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment