Commit ace86a23 authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Nirbhay Choubey

refs codership/mysql-wsrep#201

- Fixes query cache so that it is aware of wsrep_sync_wait.
  Query cache would return (possibly stale) results to the
  client, regardless of the value of wsrep_sync_wait.
- Includes the test case that reproduced the issue.
parent c1ea0570
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (DEFAULT);
SET GLOBAL query_cache_size=1355776;
SET SESSION wsrep_sync_wait = 7;
--source include/galera_cluster.inc
--source include/have_innodb.inc
--source include/have_query_cache.inc
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (DEFAULT);
--connection node_2
--let $qcache_size_orig = `SELECT @@GLOBAL.query_cache_size`
SET GLOBAL query_cache_size=1355776;
SET SESSION wsrep_sync_wait = 7;
--disable_query_log
--let $count = 10000
while ($count)
{
--connection node_1
INSERT INTO t1 VALUES (DEFAULT);
--let $val1 = `SELECT LAST_INSERT_ID()`
--connection node_2
--let $val2 = `SELECT MAX(id) FROM t1`
--let $val3 = `SELECT $val1 != $val2`
if ($val3)
{
--echo $val1 $val2
--die wsrep_sync_wait failed
}
--dec $count
}
--eval SET GLOBAL query_cache_size = $qcache_size_orig
DROP TABLE t1;
...@@ -1846,6 +1846,7 @@ Query_cache::send_result_to_client(THD *thd, char *org_sql, uint query_length) ...@@ -1846,6 +1846,7 @@ Query_cache::send_result_to_client(THD *thd, char *org_sql, uint query_length)
goto err; goto err;
} }
} }
/* /*
Try to obtain an exclusive lock on the query cache. If the cache is Try to obtain an exclusive lock on the query cache. If the cache is
disabled or if a full cache flush is in progress, the attempt to disabled or if a full cache flush is in progress, the attempt to
...@@ -1957,6 +1958,25 @@ def_week_frmt: %lu, in_trans: %d, autocommit: %d", ...@@ -1957,6 +1958,25 @@ def_week_frmt: %lu, in_trans: %d, autocommit: %d",
} }
DBUG_PRINT("qcache", ("Query in query hash 0x%lx", (ulong)query_block)); DBUG_PRINT("qcache", ("Query in query hash 0x%lx", (ulong)query_block));
#ifdef WITH_WSREP
if (WSREP_CLIENT(thd) && wsrep_must_sync_wait(thd)) {
unlock();
if (wsrep_sync_wait(thd))
goto err;
if (try_lock(TRUE))
goto err;
query_block = (Query_cache_block *) my_hash_search(&queries,
(uchar*) sql,
tot_length);
if (query_block == 0 ||
query_block->query()->result() == 0 ||
query_block->query()->result()->type != Query_cache_block::RESULT)
{
goto err_unlock;
}
}
#endif /* WITH_WSREP */
/* Now lock and test that nothing changed while blocks was unlocked */ /* Now lock and test that nothing changed while blocks was unlocked */
BLOCK_LOCK_RD(query_block); BLOCK_LOCK_RD(query_block);
......
...@@ -1213,6 +1213,7 @@ THD::THD() ...@@ -1213,6 +1213,7 @@ THD::THD()
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED;
#endif #endif
/* Call to init() below requires fully initialized Open_tables_state. */ /* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this); reset_open_tables_state(this);
...@@ -1628,6 +1629,7 @@ void THD::init(void) ...@@ -1628,6 +1629,7 @@ void THD::init(void)
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED;
/* /*
@@wsrep_causal_reads is now being handled via wsrep_sync_wait, update it @@wsrep_causal_reads is now being handled via wsrep_sync_wait, update it
...@@ -2361,6 +2363,10 @@ void THD::cleanup_after_query() ...@@ -2361,6 +2363,10 @@ void THD::cleanup_after_query()
rgi_slave->cleanup_after_query(); rgi_slave->cleanup_after_query();
#endif #endif
#ifdef WITH_WSREP
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED;
#endif /* WITH_WSREP */
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -2787,7 +2787,6 @@ class THD :public Statement, ...@@ -2787,7 +2787,6 @@ class THD :public Statement,
query_id_t first_query_id; query_id_t first_query_id;
} binlog_evt_union; } binlog_evt_union;
/** /**
Internal parser state. Internal parser state.
Note that since the parser is not re-entrant, we keep only one parser Note that since the parser is not re-entrant, we keep only one parser
...@@ -3865,6 +3864,7 @@ class THD :public Statement, ...@@ -3865,6 +3864,7 @@ class THD :public Statement,
void* wsrep_apply_format; void* wsrep_apply_format;
bool wsrep_apply_toi; /* applier processing in TOI */ bool wsrep_apply_toi; /* applier processing in TOI */
bool wsrep_skip_append_keys; bool wsrep_skip_append_keys;
wsrep_gtid_t wsrep_sync_wait_gtid;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
}; };
......
...@@ -810,19 +810,24 @@ bool wsrep_start_replication() ...@@ -810,19 +810,24 @@ bool wsrep_start_replication()
return true; return true;
} }
bool wsrep_must_sync_wait (THD* thd, uint mask)
{
return (thd->variables.wsrep_sync_wait & mask) &&
thd->variables.wsrep_on &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_conflict_state != REPLAYING &&
thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
}
bool wsrep_sync_wait (THD* thd, uint mask) bool wsrep_sync_wait (THD* thd, uint mask)
{ {
if ((thd->variables.wsrep_sync_wait & mask) && if (wsrep_must_sync_wait(thd, mask))
thd->variables.wsrep_on &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_conflict_state != REPLAYING)
{ {
WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u", WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
thd->variables.wsrep_sync_wait, mask); thd->variables.wsrep_sync_wait, mask);
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows. // TODO: modify to check if thd has locked any rows.
wsrep_gtid_t gtid; wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
if (unlikely(WSREP_OK != ret)) if (unlikely(WSREP_OK != ret))
{ {
......
...@@ -192,6 +192,7 @@ extern void wsrep_kill_mysql(THD *thd); ...@@ -192,6 +192,7 @@ extern void wsrep_kill_mysql(THD *thd);
/* new defines */ /* new defines */
extern void wsrep_stop_replication(THD *thd); extern void wsrep_stop_replication(THD *thd);
extern bool wsrep_start_replication(); extern bool wsrep_start_replication();
extern bool wsrep_must_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ);
extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ); extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ);
extern int wsrep_check_opts (int argc, char* const* argv); extern int wsrep_check_opts (int argc, char* const* argv);
extern void wsrep_prepend_PATH (const char* path); extern void wsrep_prepend_PATH (const char* path);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment