Commit fc716dc5 authored by Nirbhay Choubey's avatar Nirbhay Choubey

MDEV-8260 : Issues related to concurrent CTAS

* Wait for aborted thd (victim) to release MDL locks
* Skip aborting an already aborted thd
* Defer setting OK status in case of CTAS
* Minor cosmetic changes
* Added a test case
parent 6050ab65
--source include/galera_cluster.inc
--source include/have_innodb.inc
--write_file $MYSQLTEST_VARDIR/tmp/galera_concurrent.sql
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
DROP table t1;
CREATE table t1 as SELECT SLEEP(0);
CREATE table t2 as SELECT SLEEP(0);
CREATE table t3 as SELECT SLEEP(0);
CREATE table t4 as SELECT SLEEP(0);
CREATE table t5 as SELECT SLEEP(0);
CREATE table t6 as SELECT SLEEP(0);
CREATE table t7 as SELECT SLEEP(0);
CREATE table t8 as SELECT SLEEP(0);
CREATE table t9 as SELECT SLEEP(0);
DROP table t1;
DROP table t2;
DROP table t3;
DROP table t4;
DROP table t5;
DROP table t6;
DROP table t7;
DROP table t8;
DROP table t9;
EOF
let $run=10;
while($run)
{
--error 0,1
exec $MYSQL --user=root --host=127.0.0.1 --port=$NODE_MYPORT_1 test
< $MYSQLTEST_VARDIR/tmp/galera_concurrent.sql &
$MYSQL --user=root --host=127.0.0.1 --port=$NODE_MYPORT_2 test
< $MYSQLTEST_VARDIR/tmp/galera_concurrent.sql;
dec $run;
}
--remove_file $MYSQLTEST_VARDIR/tmp/galera_concurrent.sql
--source include/galera_end.inc
--echo # End of test
...@@ -1199,6 +1199,13 @@ MDL_wait::timed_wait(THD *thd, struct timespec *abs_timeout, ...@@ -1199,6 +1199,13 @@ MDL_wait::timed_wait(THD *thd, struct timespec *abs_timeout,
while (!m_wait_status && !thd->killed && while (!m_wait_status && !thd->killed &&
wait_result != ETIMEDOUT && wait_result != ETIME) wait_result != ETIMEDOUT && wait_result != ETIME)
{ {
#ifdef WITH_WSREP
if (wsrep_thd_is_BF(thd, true))
{
wait_result= mysql_cond_wait(&m_COND_wait_status, &m_LOCK_wait_status);
}
else
#endif
wait_result= mysql_cond_timedwait(&m_COND_wait_status, &m_LOCK_wait_status, wait_result= mysql_cond_timedwait(&m_COND_wait_status, &m_LOCK_wait_status,
abs_timeout); abs_timeout);
} }
...@@ -1283,12 +1290,15 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket) ...@@ -1283,12 +1290,15 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket)
WSREP_DEBUG("MDL add_ticket inserted before: %lu %s", WSREP_DEBUG("MDL add_ticket inserted before: %lu %s",
wsrep_thd_thread_id(waiting->get_ctx()->get_thd()), wsrep_thd_thread_id(waiting->get_ctx()->get_thd()),
wsrep_thd_query(waiting->get_ctx()->get_thd())); wsrep_thd_query(waiting->get_ctx()->get_thd()));
/* Insert the ticket before the first non-BF waiting thd. */
m_list.insert_after(prev, ticket); m_list.insert_after(prev, ticket);
added= true; added= true;
} }
prev= waiting; prev= waiting;
} }
if (!added) m_list.push_back(ticket);
/* Otherwise, insert the ticket at the back of the waiting list. */
if (!added) m_list.push_back(ticket);
while ((granted= itg++)) while ((granted= itg++))
{ {
......
...@@ -3486,6 +3486,8 @@ class select_insert :public select_result_interceptor { ...@@ -3486,6 +3486,8 @@ class select_insert :public select_result_interceptor {
virtual void store_values(List<Item> &values); virtual void store_values(List<Item> &values);
virtual bool can_rollback_data() { return 0; } virtual bool can_rollback_data() { return 0; }
void send_error(uint errcode,const char *err); void send_error(uint errcode,const char *err);
bool prepare_eof();
bool send_ok_packet();
bool send_eof(); bool send_eof();
virtual void abort_result_set(); virtual void abort_result_set();
/* not implemented: select_insert is never re-used in prepared statements */ /* not implemented: select_insert is never re-used in prepared statements */
......
...@@ -3650,14 +3650,14 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -3650,14 +3650,14 @@ void select_insert::send_error(uint errcode,const char *err)
} }
bool select_insert::send_eof() bool select_insert::prepare_eof()
{ {
int error; int error;
bool const trans_table= table->file->has_transactions(); bool const trans_table= table->file->has_transactions();
ulonglong id, row_count;
bool changed; bool changed;
killed_state killed_status= thd->killed; killed_state killed_status= thd->killed;
DBUG_ENTER("select_insert::send_eof");
DBUG_ENTER("select_insert::prepare_eof");
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type())); trans_table, table->file->table_type()));
...@@ -3699,11 +3699,10 @@ bool select_insert::send_eof() ...@@ -3699,11 +3699,10 @@ bool select_insert::send_eof()
*/ */
#ifdef WITH_WSREP #ifdef WITH_WSREP
if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
(!error || thd->transaction.stmt.modified_non_trans_table))
#else #else
if (mysql_bin_log.is_open() && if (mysql_bin_log.is_open() &&
(!error || thd->transaction.stmt.modified_non_trans_table))
#endif #endif
(!error || thd->transaction.stmt.modified_non_trans_table))
{ {
int errcode= 0; int errcode= 0;
if (!error) if (!error)
...@@ -3715,7 +3714,7 @@ bool select_insert::send_eof() ...@@ -3715,7 +3714,7 @@ bool select_insert::send_eof()
trans_table, FALSE, FALSE, errcode)) trans_table, FALSE, FALSE, errcode))
{ {
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
DBUG_RETURN(1); DBUG_RETURN(true);
} }
} }
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
...@@ -3723,27 +3722,49 @@ bool select_insert::send_eof() ...@@ -3723,27 +3722,49 @@ bool select_insert::send_eof()
if (error) if (error)
{ {
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
DBUG_RETURN(1); DBUG_RETURN(true);
} }
char buff[160];
DBUG_RETURN(false);
}
bool select_insert::send_ok_packet() {
char message[160]; /* status message */
ulong row_count; /* rows affected */
ulong id; /* last insert-id */
DBUG_ENTER("select_insert::send_ok_packet");
if (info.ignore) if (info.ignore)
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
(ulong) (info.records - info.copied), (ulong) info.records, (ulong) (info.records - info.copied),
(ulong) thd->warning_info->statement_warn_count()); (ulong) thd->warning_info->statement_warn_count());
else else
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
(ulong) (info.deleted+info.updated), (ulong) info.records, (ulong) (info.deleted + info.updated),
(ulong) thd->warning_info->statement_warn_count()); (ulong) thd->warning_info->statement_warn_count());
row_count= info.copied + info.deleted + row_count= info.copied + info.deleted +
((thd->client_capabilities & CLIENT_FOUND_ROWS) ? ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
info.touched : info.updated); info.touched : info.updated);
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ? id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
thd->first_successful_insert_id_in_cur_stmt : thd->first_successful_insert_id_in_cur_stmt :
(thd->arg_of_last_insert_id_function ? (thd->arg_of_last_insert_id_function ?
thd->first_successful_insert_id_in_prev_stmt : thd->first_successful_insert_id_in_prev_stmt :
(info.copied ? autoinc_value_of_last_inserted_row : 0)); (info.copied ? autoinc_value_of_last_inserted_row : 0));
::my_ok(thd, row_count, id, buff);
DBUG_RETURN(0); ::my_ok(thd, row_count, id, message);
DBUG_RETURN(false);
}
bool select_insert::send_eof()
{
bool res;
DBUG_ENTER("select_insert::send_eof");
res= (prepare_eof() || send_ok_packet());
DBUG_RETURN(res);
} }
void select_insert::abort_result_set() { void select_insert::abort_result_set() {
...@@ -4240,9 +4261,12 @@ void select_create::send_error(uint errcode,const char *err) ...@@ -4240,9 +4261,12 @@ void select_create::send_error(uint errcode,const char *err)
bool select_create::send_eof() bool select_create::send_eof()
{ {
bool tmp=select_insert::send_eof(); DBUG_ENTER("select_create::send_eof");
if (tmp) if (prepare_eof())
{
abort_result_set(); abort_result_set();
DBUG_RETURN(true);
}
else else
{ {
/* /*
...@@ -4262,7 +4286,7 @@ bool select_create::send_eof() ...@@ -4262,7 +4286,7 @@ bool select_create::send_eof()
thd->thread_id, thd->wsrep_conflict_state, thd->query()); thd->thread_id, thd->wsrep_conflict_state, thd->query());
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
abort_result_set(); abort_result_set();
return TRUE; DBUG_RETURN(true);
} }
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
...@@ -4270,6 +4294,9 @@ bool select_create::send_eof() ...@@ -4270,6 +4294,9 @@ bool select_create::send_eof()
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
send_ok_packet();
if (m_plock) if (m_plock)
{ {
mysql_unlock_tables(thd, *m_plock); mysql_unlock_tables(thd, *m_plock);
...@@ -4277,7 +4304,7 @@ bool select_create::send_eof() ...@@ -4277,7 +4304,7 @@ bool select_create::send_eof()
m_plock= NULL; m_plock= NULL;
} }
} }
return tmp; DBUG_RETURN(false);
} }
......
...@@ -1476,10 +1476,21 @@ void wsrep_to_isolation_end(THD *thd) ...@@ -1476,10 +1476,21 @@ void wsrep_to_isolation_end(THD *thd)
gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
gra->command, gra->lex->sql_command, gra->query()); gra->command, gra->lex->sql_command, gra->query());
/**
Check if request for the metadata lock should be granted to the requester.
@param requestor_ctx The MDL context of the requestor
@param ticket MDL ticket for the requested lock
@retval TRUE Lock request can be granted
@retval FALSE Lock request cannot be granted
*/
bool bool
wsrep_grant_mdl_exception(MDL_context *requestor_ctx, wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
MDL_ticket *ticket MDL_ticket *ticket
) { ) {
/* Fallback to the non-wsrep behaviour */
if (!WSREP_ON) return FALSE; if (!WSREP_ON) return FALSE;
THD *request_thd = requestor_ctx->get_thd(); THD *request_thd = requestor_ctx->get_thd();
......
...@@ -495,6 +495,17 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) ...@@ -495,6 +495,17 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
victim_thd) victim_thd)
{ {
if ((victim_thd->wsrep_conflict_state == MUST_ABORT) ||
(victim_thd->wsrep_conflict_state == ABORTED) ||
(victim_thd->wsrep_conflict_state == ABORTING))
{
WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already "
"aborted. Ignoring.",
(bf_thd) ? (long long)bf_thd->real_id : 0,
(long long)victim_thd->real_id);
DBUG_RETURN(1);
}
WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
(long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment