Commit 8ec02bb8 authored by Nirbhay Choubey's avatar Nirbhay Choubey
parent 857abf14
......@@ -652,6 +652,7 @@ typedef struct system_variables
#ifdef WITH_WSREP
my_bool wsrep_on;
my_bool wsrep_causal_reads;
uint wsrep_sync_wait;
ulong wsrep_retry_autocommit;
#endif
double long_query_time_double;
......
......@@ -2766,7 +2766,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_STATUS:
{
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
#endif /* WITH_WSREP */
execute_show_status(thd, all_tables);
break;
......@@ -2801,7 +2801,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_STATUS_PROC:
case SQLCOM_SHOW_STATUS_FUNC:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
#endif /* WITH_WSREP */
case SQLCOM_SHOW_DATABASES:
......@@ -2825,7 +2825,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_INDEX_STATS:
case SQLCOM_SELECT:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
case SQLCOM_SHOW_VARIABLES:
case SQLCOM_SHOW_CHARSETS:
case SQLCOM_SHOW_COLLATIONS:
......@@ -3516,7 +3516,7 @@ case SQLCOM_PREPARE:
#else
{
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
#endif /* WITH_WSREP */
/*
......@@ -3582,7 +3582,7 @@ case SQLCOM_PREPARE:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
#endif /* WITH_WSREP */
if (check_table_access(thd, SELECT_ACL, all_tables,
......@@ -3593,6 +3593,10 @@ case SQLCOM_PREPARE:
break;
}
case SQLCOM_UPDATE:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error;
#endif /* WITH_WSREP */
{
ha_rows found= 0, updated= 0;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
......@@ -3632,6 +3636,10 @@ case SQLCOM_PREPARE:
/* if we switched from normal update, rights are checked */
if (up_result != 2)
{
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error;
#endif /* WITH_WSREP */
if ((res= multi_update_precheck(thd, all_tables)))
break;
}
......@@ -3701,6 +3709,10 @@ case SQLCOM_PREPARE:
break;
}
case SQLCOM_REPLACE:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error;
#endif /* WITH_WSREP */
#ifndef DBUG_OFF
if (mysql_bin_log.is_open())
{
......@@ -3736,6 +3748,10 @@ case SQLCOM_PREPARE:
}
#endif
case SQLCOM_INSERT:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error;
#endif /* WITH_WSREP */
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
......@@ -3789,6 +3805,10 @@ case SQLCOM_PREPARE:
}
case SQLCOM_REPLACE_SELECT:
case SQLCOM_INSERT_SELECT:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error;
#endif /* WITH_WSREP */
{
select_result *sel_result;
bool explain= MY_TEST(lex->describe);
......@@ -3890,6 +3910,10 @@ case SQLCOM_PREPARE:
break;
}
case SQLCOM_DELETE:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error;
#endif /* WITH_WSREP */
{
select_result *sel_result=lex->result;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
......@@ -3910,6 +3934,10 @@ case SQLCOM_PREPARE:
break;
}
case SQLCOM_DELETE_MULTI:
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) &&
wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error;
#endif /* WITH_WSREP */
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
TABLE_LIST *aux_tables= thd->lex->auxiliary_table_list.first;
......
......@@ -4664,9 +4664,21 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK(
CMD_LINE(OPT_ARG), DEFAULT(TRUE));
static Sys_var_mybool Sys_wsrep_causal_reads(
"wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
"wsrep_causal_reads", "(DEPRECATED) setting this variable is equivalent to setting wsrep_sync_wait READ flag",
SESSION_VAR(wsrep_causal_reads),
CMD_LINE(OPT_ARG), DEFAULT(FALSE));
CMD_LINE(OPT_ARG), DEFAULT(FALSE),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(wsrep_causal_reads_update));
static Sys_var_uint Sys_wsrep_sync_wait(
"wsrep_sync_wait", "Ensure \"synchronous\" read view before executing an operation of the type specified by bitmask: 1 - READ(includes SELECT, SHOW and BEGIN/START TRANSACTION); 2 - UPDATE and DELETE; 4 - INSERT and REPLACE",
SESSION_VAR(wsrep_sync_wait),
CMD_LINE(OPT_ARG),
VALID_RANGE(WSREP_SYNC_WAIT_NONE, WSREP_SYNC_WAIT_MAX),
DEFAULT(WSREP_SYNC_WAIT_NONE),
BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(wsrep_sync_wait_update));
static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
static Sys_var_enum Sys_wsrep_OSU_method(
......
......@@ -192,7 +192,7 @@ bool trans_begin(THD *thd, uint flags)
#ifdef WITH_WSREP
thd->wsrep_PA_safe= true;
if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd))
if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd))
DBUG_RETURN(TRUE);
#endif /* WITH_WSREP */
......
......@@ -879,13 +879,15 @@ bool wsrep_start_replication()
return true;
}
bool
wsrep_causal_wait (THD* thd)
bool wsrep_sync_wait (THD* thd, uint mask)
{
if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on &&
if ((thd->variables.wsrep_sync_wait & mask) &&
thd->variables.wsrep_on &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_conflict_state != REPLAYING)
{
WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
thd->variables.wsrep_sync_wait, mask);
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows.
wsrep_gtid_t gtid;
......@@ -909,7 +911,7 @@ wsrep_causal_wait (THD* thd)
err= ER_NOT_SUPPORTED_YET;
break;
default:
msg= "Causal wait failed.";
msg= "Synchronous wait failed.";
err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
// with ER_LOCK_WAIT_TIMEOUT
}
......
......@@ -102,6 +102,14 @@ extern my_bool wsrep_slave_FK_checks;
extern my_bool wsrep_slave_UK_checks;
enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU };
enum enum_wsrep_sync_wait {
WSREP_SYNC_WAIT_NONE = 0x0,
// show, select, begin
WSREP_SYNC_WAIT_BEFORE_READ = 0x1,
WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE = 0x2,
WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE = 0x4,
WSREP_SYNC_WAIT_MAX = 0x7
};
// MySQL status variables
extern my_bool wsrep_connected;
......@@ -175,9 +183,10 @@ extern void wsrep_kill_mysql(THD *thd);
/* new defines */
extern void wsrep_stop_replication(THD *thd);
extern bool wsrep_start_replication();
extern bool wsrep_causal_wait(THD* thd);
extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ);
extern int wsrep_check_opts (int argc, char* const* argv);
extern void wsrep_prepend_PATH (const char* path);
/* some inline functions are defined in wsrep_mysqld_inl.h */
/* Other global variables */
extern wsrep_seqno_t wsrep_locked_seqno;
......
......@@ -60,11 +60,29 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
return false;
}
void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
{
if (var_type == OPT_GLOBAL) {
thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
bool wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
{
// global setting should not affect session setting.
// if (var_type == OPT_GLOBAL) {
// thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
// }
if (thd->variables.wsrep_causal_reads) {
thd->variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ;
} else {
thd->variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ;
}
return false;
}
bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type)
{
// global setting should not affect session setting.
// if (var_type == OPT_GLOBAL) {
// thd->variables.wsrep_sync_wait = global_system_variables.wsrep_sync_wait;
// }
thd->variables.wsrep_causal_reads = thd->variables.wsrep_sync_wait &
WSREP_SYNC_WAIT_BEFORE_READ;
return false;
}
static int wsrep_start_position_verify (const char* start_str)
......
......@@ -35,7 +35,8 @@ int wsrep_init_vars();
#define INIT_ARGS (const char* opt)
extern bool wsrep_on_update UPDATE_ARGS;
extern void wsrep_causal_reads_update UPDATE_ARGS;
extern bool wsrep_causal_reads_update UPDATE_ARGS;
extern bool wsrep_sync_wait_update UPDATE_ARGS;
extern bool wsrep_start_position_check CHECK_ARGS;
extern bool wsrep_start_position_update UPDATE_ARGS;
extern void wsrep_start_position_init INIT_ARGS;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment