Commit aa930f78 authored by Matthias Leich's avatar Matthias Leich

Merge last changesets, no conflicts

parents 2e36bc3d 7aec6cce
...@@ -159,7 +159,8 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count); ...@@ -159,7 +159,8 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count);
void thr_abort_locks(THR_LOCK *lock, my_bool upgrade_lock); void thr_abort_locks(THR_LOCK *lock, my_bool upgrade_lock);
my_bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread); my_bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread);
void thr_print_locks(void); /* For debugging */ void thr_print_locks(void); /* For debugging */
my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data); my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
enum thr_lock_type new_lock_type);
void thr_downgrade_write_lock(THR_LOCK_DATA *data, void thr_downgrade_write_lock(THR_LOCK_DATA *data,
enum thr_lock_type new_lock_type); enum thr_lock_type new_lock_type);
my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data); my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data);
......
...@@ -284,4 +284,30 @@ ERROR 22007: Incorrect date value: '0000-00-00' for column 'f1' at row 1 ...@@ -284,4 +284,30 @@ ERROR 22007: Incorrect date value: '0000-00-00' for column 'f1' at row 1
INSERT DELAYED INTO t2 VALUES (0,'2007-00-00'); INSERT DELAYED INTO t2 VALUES (0,'2007-00-00');
ERROR 22007: Incorrect date value: '2007-00-00' for column 'f1' at row 1 ERROR 22007: Incorrect date value: '2007-00-00' for column 'f1' at row 1
DROP TABLE t1,t2; DROP TABLE t1,t2;
set @old_delayed_updates = @@global.low_priority_updates;
set global low_priority_updates = 1;
select @@global.low_priority_updates;
@@global.low_priority_updates
1
drop table if exists t1;
create table t1 (a int, b int);
insert into t1 values (1,1);
lock table t1 read;
connection: update
insert delayed into t1 values (2,2);;
connection: select
select * from t1;
a b
1 1
connection: default
select * from t1;
a b
1 1
unlock tables;
select * from t1;
a b
1 1
2 2
drop table t1;
set global low_priority_updates = @old_delayed_updates;
End of 5.1 tests End of 5.1 tests
...@@ -10,7 +10,4 @@ ...@@ -10,7 +10,4 @@
# #
############################################################################## ##############################################################################
rpl_ndb_circular : Bug#41183 rpl_ndb_circular, rpl_ndb_circular_simplex need maintenance, crash
rpl_ndb_circular_simplex : Bug#41183 rpl_ndb_circular, rpl_ndb_circular_simplex need maintenance, crash
# the below testcase have been reworked to avoid the bug, test contains comment, keep bug open # the below testcase have been reworked to avoid the bug, test contains comment, keep bug open
...@@ -285,4 +285,47 @@ INSERT DELAYED INTO t2 VALUES (0,'0000-00-00'); ...@@ -285,4 +285,47 @@ INSERT DELAYED INTO t2 VALUES (0,'0000-00-00');
INSERT DELAYED INTO t2 VALUES (0,'2007-00-00'); INSERT DELAYED INTO t2 VALUES (0,'2007-00-00');
DROP TABLE t1,t2; DROP TABLE t1,t2;
#
# Bug#40536: SELECT is blocked by INSERT DELAYED waiting on upgrading lock,
# even with low_priority_updates
#
set @old_delayed_updates = @@global.low_priority_updates;
set global low_priority_updates = 1;
select @@global.low_priority_updates;
--disable_warnings
drop table if exists t1;
--enable_warnings
create table t1 (a int, b int);
insert into t1 values (1,1);
lock table t1 read;
connect (update,localhost,root,,);
connection update;
--echo connection: update
--send insert delayed into t1 values (2,2);
connection default;
let $wait_condition=
select count(*) = 1 from information_schema.processlist
where command = "Delayed insert" and state = "upgrading lock";
--source include/wait_condition.inc
connect (select,localhost,root,,);
--echo connection: select
select * from t1;
connection default;
--echo connection: default
select * from t1;
connection default;
disconnect update;
disconnect select;
unlock tables;
let $wait_condition=
select count(*) = 1 from information_schema.processlist
where command = "Delayed insert" and state = "Waiting for INSERT";
--source include/wait_condition.inc
select * from t1;
drop table t1;
set global low_priority_updates = @old_delayed_updates;
--echo End of 5.1 tests --echo End of 5.1 tests
...@@ -1359,7 +1359,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *in_data, ...@@ -1359,7 +1359,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */ /* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
enum thr_lock_type new_lock_type)
{ {
THR_LOCK *lock=data->lock; THR_LOCK *lock=data->lock;
DBUG_ENTER("thr_upgrade_write_delay_lock"); DBUG_ENTER("thr_upgrade_write_delay_lock");
...@@ -1372,7 +1373,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) ...@@ -1372,7 +1373,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
} }
check_locks(lock,"before upgrading lock",0); check_locks(lock,"before upgrading lock",0);
/* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */ /* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
data->type=TL_WRITE; /* Upgrade lock */ data->type= new_lock_type; /* Upgrade lock */
/* Check if someone has given us the lock */ /* Check if someone has given us the lock */
if (!data->cond) if (!data->cond)
...@@ -1411,6 +1412,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) ...@@ -1411,6 +1412,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
{ {
THR_LOCK *lock=data->lock; THR_LOCK *lock=data->lock;
enum thr_lock_type write_lock_type;
DBUG_ENTER("thr_reschedule_write_lock"); DBUG_ENTER("thr_reschedule_write_lock");
pthread_mutex_lock(&lock->mutex); pthread_mutex_lock(&lock->mutex);
...@@ -1420,6 +1422,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) ...@@ -1420,6 +1422,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
write_lock_type= data->type;
data->type=TL_WRITE_DELAYED; data->type=TL_WRITE_DELAYED;
if (lock->update_status) if (lock->update_status)
(*lock->update_status)(data->status_param); (*lock->update_status)(data->status_param);
...@@ -1438,7 +1441,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) ...@@ -1438,7 +1441,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
free_all_read_locks(lock,0); free_all_read_locks(lock,0);
pthread_mutex_unlock(&lock->mutex); pthread_mutex_unlock(&lock->mutex);
DBUG_RETURN(thr_upgrade_write_delay_lock(data)); DBUG_RETURN(thr_upgrade_write_delay_lock(data, write_lock_type));
} }
......
...@@ -53,6 +53,8 @@ ...@@ -53,6 +53,8 @@
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd);
static const char *HA_ERR(int i) static const char *HA_ERR(int i)
{ {
switch (i) { switch (i) {
...@@ -2894,7 +2896,37 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, ...@@ -2894,7 +2896,37 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock)
{
/*
Cleaning-up the last statement context:
the terminal event of the current statement flagged with
STMT_END_F got filtered out in ndb circular replication.
*/
int error;
char llbuff[22];
if ((error= rows_event_stmt_cleanup(const_cast<Relay_log_info*>(rli), thd)))
{
const_cast<Relay_log_info*>(rli)->report(ERROR_LEVEL, error,
"Error in cleaning up after an event preceeding the commit; "
"the group log file/position: %s %s",
const_cast<Relay_log_info*>(rli)->group_master_log_name,
llstr(const_cast<Relay_log_info*>(rli)->group_master_log_pos,
llbuff));
}
/*
Executing a part of rli->stmt_done() logics that does not deal
with group position change. The part is redundant now but is
future-change-proof addon, e.g if COMMIT handling will start checking
invariants like IN_STMT flag must be off at committing the transaction.
*/
const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos();
const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT);
}
else
{
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
}
/* /*
Note: We do not need to execute reset_one_shot_variables() if this Note: We do not need to execute reset_one_shot_variables() if this
...@@ -7403,16 +7435,20 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli) ...@@ -7403,16 +7435,20 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli)
return Log_event::do_shall_skip(rli); return Log_event::do_shall_skip(rli);
} }
int /**
Rows_log_event::do_update_pos(Relay_log_info *rli) The function is called at Rows_log_event statement commit time,
{ normally from Rows_log_event::do_update_pos() and possibly from
DBUG_ENTER("Rows_log_event::do_update_pos"); Query_log_event::do_apply_event() of the COMMIT.
int error= 0; The function commits the last statement for engines, binlog and
releases resources have been allocated for the statement.
DBUG_PRINT("info", ("flags: %s",
get_flags(STMT_END_F) ? "STMT_END_F " : "")); @retval 0 Ok.
@retval non-zero Error at the commit.
*/
if (get_flags(STMT_END_F)) static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
{
int error;
{ {
/* /*
This is the end of a statement or transaction, so close (and This is the end of a statement or transaction, so close (and
...@@ -7454,14 +7490,39 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) ...@@ -7454,14 +7490,39 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
thd->reset_current_stmt_binlog_row_based(); thd->reset_current_stmt_binlog_row_based();
rli->cleanup_context(thd, 0); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
if (error == 0) }
return error;
}
/**
The method either increments the relay log position or
commits the current statement and increments the master group
possition if the event is STMT_END_F flagged and
the statement corresponds to the autocommit query (i.e replicated
without wrapping in BEGIN/COMMIT)
@retval 0 Success
@retval non-zero Error in the statement commit
*/
int
Rows_log_event::do_update_pos(Relay_log_info *rli)
{
DBUG_ENTER("Rows_log_event::do_update_pos");
int error= 0;
DBUG_PRINT("info", ("flags: %s",
get_flags(STMT_END_F) ? "STMT_END_F " : ""));
if (get_flags(STMT_END_F))
{
if ((error= rows_event_stmt_cleanup(rli, thd)) == 0)
{ {
/* /*
Indicate that a statement is finished. Indicate that a statement is finished.
Step the group log position if we are not in a transaction, Step the group log position if we are not in a transaction,
otherwise increase the event log position. otherwise increase the event log position.
*/ */
rli->stmt_done(log_pos, when); rli->stmt_done(log_pos, when);
/* /*
...@@ -7475,11 +7536,13 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) ...@@ -7475,11 +7536,13 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
thd->clear_error(); thd->clear_error();
} }
else else
{
rli->report(ERROR_LEVEL, error, rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, " "Error in %s event: commit of row events failed, "
"table `%s`.`%s`", "table `%s`.`%s`",
get_type_str(), m_table->s->db.str, get_type_str(), m_table->s->db.str,
m_table->s->table_name.str); m_table->s->table_name.str);
}
} }
else else
{ {
......
...@@ -1690,6 +1690,7 @@ class delayed_row :public ilink { ...@@ -1690,6 +1690,7 @@ class delayed_row :public ilink {
class Delayed_insert :public ilink { class Delayed_insert :public ilink {
uint locks_in_memory; uint locks_in_memory;
thr_lock_type delayed_lock;
public: public:
THD thd; THD thd;
TABLE *table; TABLE *table;
...@@ -1731,6 +1732,8 @@ class Delayed_insert :public ilink { ...@@ -1731,6 +1732,8 @@ class Delayed_insert :public ilink {
pthread_cond_init(&cond_client,NULL); pthread_cond_init(&cond_client,NULL);
VOID(pthread_mutex_lock(&LOCK_thread_count)); VOID(pthread_mutex_lock(&LOCK_thread_count));
delayed_insert_threads++; delayed_insert_threads++;
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
VOID(pthread_mutex_unlock(&LOCK_thread_count)); VOID(pthread_mutex_unlock(&LOCK_thread_count));
} }
~Delayed_insert() ~Delayed_insert()
...@@ -2540,7 +2543,7 @@ bool Delayed_insert::handle_inserts(void) ...@@ -2540,7 +2543,7 @@ bool Delayed_insert::handle_inserts(void)
table->use_all_columns(); table->use_all_columns();
thd_proc_info(&thd, "upgrading lock"); thd_proc_info(&thd, "upgrading lock");
if (thr_upgrade_write_delay_lock(*thd.lock->locks)) if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock))
{ {
/* /*
This can happen if thread is killed either by a shutdown This can happen if thread is killed either by a shutdown
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment