Commit 3395c53e authored by unknown's avatar unknown

A fix and a test case for

Bug#21483 "Server abort or deadlock on INSERT DELAYED with another
implicit insert"
Also fixes and adds test cases for bugs:
20497 "Trigger with INSERT DELAYED causes Error 1165"
21714 "Wrong NEW.value and server abort on INSERT DELAYED to a
table with a trigger".
Post-review fixes.

Problem:
In MySQL INSERT DELAYED is a way to pipe all inserts into a
given table through a dedicated thread. This is necessary for
simplistic storage engines like MyISAM, which do not have internal
concurrency control or threading and thus can not
achieve efficient INSERT throughput without support from SQL layer.
DELAYED INSERT works as follows:
For every distinct table, which can accept DELAYED inserts and has
pending data to insert, a dedicated thread is created to write data
to disk. All user connection threads that attempt to
delayed-insert into this table interact with the dedicated thread in
producer/consumer fashion: all records to-be inserted are pushed
into a queue of the dedicated thread, which fetches the records and 
writes them.
In this design, client connection threads never open or lock
the delayed insert table.
This functionality was introduced in version 3.23 and does not take 
into account existence of triggers, views, or pre-locking.
E.g. if INSERT DELAYED is called from a stored function, which,
in turn, is called from another stored function that uses the delayed
table, a deadlock can occur, because delayed locking by-passes
pre-locking. Besides:
 * the delayed thread works directly with the subject table through
   the storage engine API and does not invoke triggers
 * even if it was patched to invoke triggers, if triggers,
   in turn, used other tables, the delayed thread would
   have to open and lock involved tables (use pre-locking).
 * even if it was patched to use pre-locking, without deadlock
   detection the delayed thread could easily lock out user 
   connection threads in case when the same table is used both
   in a trigger and on the right side of the insert query: 
   the delayed thread would not release locks until all inserts 
   are complete, and user connection can not complete inserts 
   without having locks on the tables used on the right side of the
   query.

Solution:

These considerations suggest two general alternatives for the
future of INSERT DELAYED:
 * it is considered a full-fledged alternative to normal INSERT
 * it is regarded as an optimisation that is only relevant 
   for simplistic engines.
Since we missed our chance to provide complete support of new
features when 5.0 was in development, the first alternative
currently renders infeasible.
However, even the second alternative, which is to detect
new features and convert DELAYED insert into a normal insert, 
is not easy to implement.
The catch-22 is that we don't know if the subject table has triggers
or is a view before we open it, and we only open it in the
delayed thread. We don't know if the query involves pre-locking
until we have opened all tables, and we always first create
the delayed thread, and only then open the remaining tables.
This patch detects the problematic scenarios and converts
DELAYED INSERT to a normal INSERT using the following approach:
 * if the statement is executed under pre-locking (e.g. from
   within a stored function or trigger) or the right
   side may require pre-locking, we detect the situation
   before creating a delayed insert thread and convert the statement
   to a conventional INSERT.
  * if the subject table is a view or has triggers, we shutdown
   the delayed thread and convert the statement to a conventional
   INSERT.


mysql-test/r/insert.result:
  Update test results.
mysql-test/t/insert.test:
  Add a test case for Bug#21483, Bug#20497, Bug#21714 (INSERT DELAYED
  and stored routines, triggers).
sql/sp_head.cc:
  Upgrade lock type to TL_WRITE when computing the pre-locking set.
sql/sql_base.cc:
  Use a new method.
sql/sql_insert.cc:
  INSERT DELAYED and pre-locking:
  - if  under pre-locking, upgrade the lock type to TL_WRITE
  and proceed as a normal write
  - if DELAYED table has triggers, also request a lock upgrade.
  - make sure errors in the delayed thread are propagated
  correctly
sql/sql_lex.h:
  Add a method to check if a parsed tree refers to stored
  routines.
parent 30184f96
...@@ -346,3 +346,119 @@ f1 f2 ...@@ -346,3 +346,119 @@ f1 f2
12 NULL 12 NULL
drop view v1; drop view v1;
drop table t1,t2; drop table t1,t2;
DROP TABLE IF EXISTS t1;
DROP FUNCTION IF EXISTS f1;
DROP FUNCTION IF EXISTS f2;
CREATE TABLE t1 (i INT);
CREATE FUNCTION f1() RETURNS INT
BEGIN
INSERT INTO t1 VALUES (1);
RETURN 1;
END |
CREATE FUNCTION f2() RETURNS INT
BEGIN
INSERT DELAYED INTO t1 VALUES (2);
RETURN 1;
END |
SELECT f1();
f1()
1
SELECT f2();
f2()
1
INSERT INTO t1 VALUES (3);
INSERT DELAYED INTO t1 VALUES (4);
INSERT INTO t1 VALUES (f1());
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
INSERT DELAYED INTO t1 VALUES (f1());
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
INSERT INTO t1 VALUES (f2());
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
INSERT DELAYED INTO t1 VALUES (f2());
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
INSERT INTO t1 VALUES (NEW.i);
INSERT INTO t1 VALUES (1);
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
INSERT DELAYED INTO t1 VALUES (1);
ERROR HY000: Can't update table 't1' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.
SELECT * FROM t1;
i
1
2
3
4
DROP FUNCTION f2;
DROP FUNCTION f1;
DROP TABLE t1;
DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t1 (i INT);
CREATE TABLE t2 (i INT);
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_bu BEFORE UPDATE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_bd BEFORE DELETE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (OLD.i);
INSERT INTO t1 VALUES (1);
INSERT DELAYED INTO t1 VALUES (2);
SELECT * FROM t1;
i
1
2
UPDATE t1 SET i = 3 WHERE i = 1;
SELECT * FROM t1;
i
3
2
DELETE FROM t1 WHERE i = 3;
SELECT * FROM t1;
i
2
SELECT * FROM t2;
i
1
2
3
3
DROP TABLE t1, t2;
DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t1 (i INT);
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
SET @a= NEW.i;
SET @a= 0;
INSERT DELAYED INTO t1 VALUES (1);
SELECT @a;
@a
1
INSERT DELAYED INTO t1 VALUES (2);
SELECT @a;
@a
2
DROP TABLE t1;
CREATE TABLE t1 (i INT);
CREATE TABLE t2 (i INT);
CREATE TRIGGER t1_ai AFTER INSERT ON t1 FOR EACH ROW
INSERT INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_au AFTER UPDATE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_ad AFTER DELETE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (OLD.i);
INSERT DELAYED INTO t1 VALUES (1);
SELECT * FROM t1;
i
1
UPDATE t1 SET i = 2 WHERE i = 1;
SELECT * FROM t1;
i
2
DELETE FROM t1 WHERE i = 2;
SELECT * FROM t1;
i
SELECT * FROM t2;
i
1
2
2
DROP TABLE t1, t2;
End of 5.0 tests.
...@@ -216,3 +216,142 @@ select * from t1; ...@@ -216,3 +216,142 @@ select * from t1;
drop view v1; drop view v1;
drop table t1,t2; drop table t1,t2;
#
# BUG#21483: Server abort or deadlock on INSERT DELAYED with another
# implicit insert
#
# The solution is to downgrade INSERT DELAYED to normal INSERT if the
# statement uses functions and access tables or triggers, or is called
# from a function or a trigger.
#
--disable_warnings
DROP TABLE IF EXISTS t1;
DROP FUNCTION IF EXISTS f1;
DROP FUNCTION IF EXISTS f2;
--enable_warnings
CREATE TABLE t1 (i INT);
delimiter |;
CREATE FUNCTION f1() RETURNS INT
BEGIN
INSERT INTO t1 VALUES (1);
RETURN 1;
END |
CREATE FUNCTION f2() RETURNS INT
BEGIN
INSERT DELAYED INTO t1 VALUES (2);
RETURN 1;
END |
delimiter ;|
SELECT f1();
SELECT f2();
INSERT INTO t1 VALUES (3);
INSERT DELAYED INTO t1 VALUES (4);
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT INTO t1 VALUES (f1());
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT DELAYED INTO t1 VALUES (f1());
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT INTO t1 VALUES (f2());
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT DELAYED INTO t1 VALUES (f2());
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
INSERT INTO t1 VALUES (NEW.i);
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT INTO t1 VALUES (1);
--error ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG
INSERT DELAYED INTO t1 VALUES (1);
SELECT * FROM t1;
DROP FUNCTION f2;
DROP FUNCTION f1;
DROP TABLE t1;
#
# BUG#20497: Trigger with INSERT DELAYED causes Error 1165
#
# Fixed by the patch for Bug#21483
#
--disable_warnings
DROP TABLE IF EXISTS t1, t2;
--enable_warnings
CREATE TABLE t1 (i INT);
CREATE TABLE t2 (i INT);
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_bu BEFORE UPDATE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_bd BEFORE DELETE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (OLD.i);
INSERT INTO t1 VALUES (1);
INSERT DELAYED INTO t1 VALUES (2);
SELECT * FROM t1;
UPDATE t1 SET i = 3 WHERE i = 1;
SELECT * FROM t1;
DELETE FROM t1 WHERE i = 3;
SELECT * FROM t1;
SELECT * FROM t2;
DROP TABLE t1, t2;
#
# BUG#21714: Wrong NEW.value and server abort on INSERT DELAYED to a
# table with a trigger
#
# Fixed by the patch for Bug#21483
#
--disable_warnings
DROP TABLE IF EXISTS t1, t2;
--enable_warnings
CREATE TABLE t1 (i INT);
CREATE TRIGGER t1_bi BEFORE INSERT ON t1 FOR EACH ROW
SET @a= NEW.i;
SET @a= 0;
INSERT DELAYED INTO t1 VALUES (1);
SELECT @a;
INSERT DELAYED INTO t1 VALUES (2);
SELECT @a;
DROP TABLE t1;
CREATE TABLE t1 (i INT);
CREATE TABLE t2 (i INT);
CREATE TRIGGER t1_ai AFTER INSERT ON t1 FOR EACH ROW
INSERT INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_au AFTER UPDATE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (NEW.i);
CREATE TRIGGER t1_ad AFTER DELETE ON t1 FOR EACH ROW
INSERT DELAYED INTO t2 VALUES (OLD.i);
INSERT DELAYED INTO t1 VALUES (1);
SELECT * FROM t1;
UPDATE t1 SET i = 2 WHERE i = 1;
SELECT * FROM t1;
DELETE FROM t1 WHERE i = 2;
SELECT * FROM t1;
SELECT * FROM t2;
DROP TABLE t1, t2;
--echo End of 5.0 tests.
...@@ -3487,6 +3487,14 @@ sp_head::merge_table_list(THD *thd, TABLE_LIST *table, LEX *lex_for_tmp_check) ...@@ -3487,6 +3487,14 @@ sp_head::merge_table_list(THD *thd, TABLE_LIST *table, LEX *lex_for_tmp_check)
tlen+= alen; tlen+= alen;
tname[tlen]= '\0'; tname[tlen]= '\0';
/*
Upgrade the lock type because this table list will be used
only in pre-locked mode, in which DELAYED inserts are always
converted to normal inserts.
*/
if (table->lock_type == TL_WRITE_DELAYED)
table->lock_type= TL_WRITE;
/* /*
We ignore alias when we check if table was already marked as temporary We ignore alias when we check if table was already marked as temporary
(and therefore should not be prelocked). Otherwise we will erroneously (and therefore should not be prelocked). Otherwise we will erroneously
......
...@@ -2277,7 +2277,7 @@ int open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags) ...@@ -2277,7 +2277,7 @@ int open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags)
*/ */
if (!thd->prelocked_mode && !thd->lex->requires_prelocking() && if (!thd->prelocked_mode && !thd->lex->requires_prelocking() &&
thd->lex->sroutines_list.elements) thd->lex->uses_stored_routines())
{ {
bool first_no_prelocking, need_prelocking, tabs_changed; bool first_no_prelocking, need_prelocking, tabs_changed;
TABLE_LIST **save_query_tables_last= thd->lex->query_tables_last; TABLE_LIST **save_query_tables_last= thd->lex->query_tables_last;
...@@ -2465,7 +2465,7 @@ int open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags) ...@@ -2465,7 +2465,7 @@ int open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags)
*/ */
if (tables->view && !thd->prelocked_mode && if (tables->view && !thd->prelocked_mode &&
!thd->lex->requires_prelocking() && !thd->lex->requires_prelocking() &&
tables->view->sroutines_list.elements) tables->view->uses_stored_routines())
{ {
/* We have at least one table in TL here. */ /* We have at least one table in TL here. */
if (!query_tables_last_own) if (!query_tables_last_own)
......
...@@ -61,7 +61,7 @@ ...@@ -61,7 +61,7 @@
#include "slave.h" #include "slave.h"
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore, static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
char *query, uint query_length, bool log_on); char *query, uint query_length, bool log_on);
static void end_delayed_insert(THD *thd); static void end_delayed_insert(THD *thd);
...@@ -409,6 +409,7 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table, ...@@ -409,6 +409,7 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table,
downgrade the lock in handler::store_lock() method. downgrade the lock in handler::store_lock() method.
*/ */
static
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
enum_duplicates duplic, enum_duplicates duplic,
bool is_multi_insert) bool is_multi_insert)
...@@ -422,29 +423,37 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, ...@@ -422,29 +423,37 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
if (*lock_type == TL_WRITE_DELAYED) if (*lock_type == TL_WRITE_DELAYED)
{ {
#ifdef EMBEDDED_LIBRARY
/* No auxiliary threads in the embedded server. */
*lock_type= TL_WRITE;
return;
#else
/* /*
We do not use delayed threads if: We do not use delayed threads if:
- we're running in the safe mode or skip-new - the feature - we're running in the safe mode or skip-new mode -- the
is disabled in these modes feature is disabled in these modes
- we're running this query in statement level replication, - we're executing this statement on a replication slave --
on a replication slave - because we must ensure serial we need to ensure serial execution of queries on the
execution of queries on the slave slave
- it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
insert cannot be concurrent insert cannot be concurrent
- this statement is directly or indirectly invoked from
a stored function or trigger (under pre-locking) - to
avoid deadlocks, since INSERT DELAYED involves a lock
upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
attempt while keeping other table level locks.
- this statement itself may require pre-locking.
We should upgrade the lock even though in most cases
delayed functionality may work. Unfortunately, we can't
easily identify whether the subject table is not used in
the statement indirectly via a stored function or trigger:
if it is used, that will lead to a deadlock between the
client connection and the delayed thread.
*/ */
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) || if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
thd->slave_thread || thd->slave_thread ||
thd->variables.max_insert_delayed_threads == 0) thd->variables.max_insert_delayed_threads == 0 ||
thd->prelocked_mode ||
thd->lex->uses_stored_routines())
{ {
*lock_type= TL_WRITE; *lock_type= TL_WRITE;
return; return;
} }
#endif
bool log_on= (thd->options & OPTION_BIN_LOG || bool log_on= (thd->options & OPTION_BIN_LOG ||
! (thd->security_ctx->master_access & SUPER_ACL)); ! (thd->security_ctx->master_access & SUPER_ACL));
if (log_on && mysql_bin_log.is_open() && is_multi_insert) if (log_on && mysql_bin_log.is_open() && is_multi_insert)
...@@ -470,6 +479,72 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, ...@@ -470,6 +479,72 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
} }
/**
Find or create a delayed insert thread for the first table in
the table list, then open and lock the remaining tables.
If a table can not be used with insert delayed, upgrade the lock
and open and lock all tables using the standard mechanism.
@param thd thread context
@param table_list list of "descriptors" for tables referenced
directly in statement SQL text.
The first element in the list corresponds to
the destination table for inserts, remaining
tables, if any, are usually tables referenced
by sub-queries in the right part of the
INSERT.
@return Status of the operation. In case of success 'table'
member of every table_list element points to an instance of
class TABLE.
@sa open_and_lock_tables for more information about MySQL table
level locking
*/
static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
DBUG_ENTER("open_and_lock_for_insert_delayed");
#ifndef EMBEDDED_LIBRARY
if (delayed_get_table(thd, table_list))
DBUG_RETURN(TRUE);
if (table_list->table)
{
/*
Open tables used for sub-selects or in stored functions, will also
cache these functions.
*/
if (open_and_lock_tables(thd, table_list->next_global))
{
end_delayed_insert(thd);
DBUG_RETURN(TRUE);
}
/*
First table was not processed by open_and_lock_tables(),
we need to set updatability flag "by hand".
*/
if (!table_list->derived && !table_list->view)
table_list->updatable= 1; // usual table
DBUG_RETURN(FALSE);
}
#endif
/*
* This is embedded library and we don't have auxiliary
threads OR
* a lock upgrade was requested inside delayed_get_table
because
- there are too many delayed insert threads OR
- the table has triggers.
Use a normal insert.
*/
table_list->lock_type= TL_WRITE;
DBUG_RETURN(open_and_lock_tables(thd, table_list));
}
/** /**
INSERT statement implementation INSERT statement implementation
*/ */
...@@ -483,11 +558,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -483,11 +558,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
bool ignore) bool ignore)
{ {
int error, res; int error, res;
/*
log_on is about delayed inserts only.
By default, both logs are enabled (this won't cause problems if the server
runs without --log-update or --log-bin).
*/
bool transactional_table, joins_freed= FALSE; bool transactional_table, joins_freed= FALSE;
bool changed; bool changed;
uint value_count; uint value_count;
...@@ -501,9 +571,14 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -501,9 +571,14 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
Name_resolution_context_state ctx_state; Name_resolution_context_state ctx_state;
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
char *query= thd->query; char *query= thd->query;
#endif /*
log_on is about delayed inserts only.
By default, both logs are enabled (this won't cause problems if the server
runs without --log-update or --log-bin).
*/
bool log_on= (thd->options & OPTION_BIN_LOG) || bool log_on= (thd->options & OPTION_BIN_LOG) ||
(!(thd->security_ctx->master_access & SUPER_ACL)); (!(thd->security_ctx->master_access & SUPER_ACL));
#endif
thr_lock_type lock_type = table_list->lock_type; thr_lock_type lock_type = table_list->lock_type;
Item *unused_conds= 0; Item *unused_conds= 0;
DBUG_ENTER("mysql_insert"); DBUG_ENTER("mysql_insert");
...@@ -514,7 +589,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -514,7 +589,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
*/ */
upgrade_lock_type(thd, &table_list->lock_type, duplic, upgrade_lock_type(thd, &table_list->lock_type, duplic,
values_list.elements > 1); values_list.elements > 1);
lock_type= table_list->lock_type;
/* /*
We can't write-delayed into a table locked with LOCK TABLES: We can't write-delayed into a table locked with LOCK TABLES:
...@@ -522,7 +596,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -522,7 +596,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
never be able to get a lock on the table. QQQ: why not never be able to get a lock on the table. QQQ: why not
upgrade the lock here instead? upgrade the lock here instead?
*/ */
if (lock_type == TL_WRITE_DELAYED && thd->locked_tables && if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
find_locked_table(thd, table_list->db, table_list->table_name)) find_locked_table(thd, table_list->db, table_list->table_name))
{ {
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
...@@ -530,36 +604,16 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -530,36 +604,16 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
#ifndef EMBEDDED_LIBRARY if (table_list->lock_type == TL_WRITE_DELAYED)
if (lock_type == TL_WRITE_DELAYED)
{ {
res= 1; if (open_and_lock_for_insert_delayed(thd, table_list))
if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error) DBUG_RETURN(TRUE);
{
/*
Open tables used for sub-selects or in stored functions, will also
cache these functions.
*/
res= open_and_lock_tables(thd, table_list->next_global);
/*
First is not processed by open_and_lock_tables() => we need set
updateability flags "by hands".
*/
if (!table_list->derived && !table_list->view)
table_list->updatable= 1; // usual table
}
else if (thd->net.last_errno != ER_WRONG_OBJECT)
{
/* Too many delayed insert threads; Use a normal insert */
table_list->lock_type= lock_type= TL_WRITE;
res= open_and_lock_tables(thd, table_list);
}
} }
else else
#endif /* EMBEDDED_LIBRARY */ {
res= open_and_lock_tables(thd, table_list); if (open_and_lock_tables(thd, table_list))
if (res || thd->is_fatal_error) DBUG_RETURN(TRUE);
DBUG_RETURN(TRUE); }
thd->proc_info="init"; thd->proc_info="init";
thd->used_tables=0; thd->used_tables=0;
...@@ -577,6 +631,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -577,6 +631,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
/* mysql_prepare_insert set table_list->table if it was not set */ /* mysql_prepare_insert set table_list->table if it was not set */
table= table_list->table; table= table_list->table;
lock_type= table_list->lock_type;
context= &thd->lex->select_lex.context; context= &thd->lex->select_lex.context;
/* /*
...@@ -1633,19 +1688,32 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) ...@@ -1633,19 +1688,32 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
Attempt to find or create a delayed insert thread to handle inserts Attempt to find or create a delayed insert thread to handle inserts
into this table. into this table.
@return Return a local copy of the table in the delayed thread @return In case of success, table_list->table points to a local copy
@retval NULL too many delayed threads OR of the delayed table or is set to NULL, which indicates a
this thread ran out of resources OR request for lock upgrade. In case of failure, value of
a newly created delayed insert thread ran out of resources OR table_list->table is undefined.
the delayed insert thread failed to open the table. @retval TRUE - this thread ran out of resources OR
In the last three cases an error is set in THD. - a newly created delayed insert thread ran out of
resources OR
- the created thread failed to open and lock the table
(e.g. because it does not exist) OR
- the table opened in the created thread turned out to
be a view
@retval FALSE - table successfully opened OR
- too many delayed insert threads OR
- the table has triggers and we have to fall back to
a normal INSERT
Two latter cases indicate a request for lock upgrade.
XXX: why do we regard INSERT DELAYED into a view as an error and
do not simply a lock upgrade?
*/ */
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
{ {
int error; int error;
Delayed_insert *tmp; Delayed_insert *tmp;
TABLE *table;
DBUG_ENTER("delayed_get_table"); DBUG_ENTER("delayed_get_table");
/* Must be set in the parser */ /* Must be set in the parser */
...@@ -1671,7 +1739,8 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1671,7 +1739,8 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
if (!(tmp=new Delayed_insert())) if (!(tmp=new Delayed_insert()))
{ {
my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert)); my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
goto err1; thd->fatal_error();
goto end_create;
} }
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thread_count++; thread_count++;
...@@ -1680,9 +1749,10 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1680,9 +1749,10 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME)); tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
if (tmp->thd.db == NULL || tmp->thd.query == NULL) if (tmp->thd.db == NULL || tmp->thd.query == NULL)
{ {
/* The error is reported */
delete tmp; delete tmp;
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0)); thd->fatal_error();
goto err1; goto end_create;
} }
tmp->table_list= *table_list; // Needed to open table tmp->table_list= *table_list; // Needed to open table
tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query; tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
...@@ -1698,7 +1768,8 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1698,7 +1768,8 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
tmp->unlock(); tmp->unlock();
delete tmp; delete tmp;
my_error(ER_CANT_CREATE_THREAD, MYF(0), error); my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
goto err1; thd->fatal_error();
goto end_create;
} }
/* Wait until table is open */ /* Wait until table is open */
...@@ -1711,41 +1782,44 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1711,41 +1782,44 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
thd->proc_info="got old table"; thd->proc_info="got old table";
if (tmp->thd.killed) if (tmp->thd.killed)
{ {
if (tmp->thd.is_fatal_error) if (tmp->thd.net.report_error)
{ {
/* Copy error message and abort */ /*
thd->fatal_error(); Copy the error message. Note that we don't treat fatal
strmov(thd->net.last_error,tmp->thd.net.last_error); errors in the delayed thread as fatal errors in the
thd->net.last_errno=tmp->thd.net.last_errno; main thread. Use of my_message will enable stored
procedures continue handlers.
*/
my_message(tmp->thd.net.last_errno, tmp->thd.net.last_error,
MYF(0));
} }
tmp->unlock(); tmp->unlock();
goto err; goto end_create;
} }
if (thd->killed) if (thd->killed)
{ {
tmp->unlock(); tmp->unlock();
goto err; goto end_create;
} }
} }
pthread_mutex_unlock(&LOCK_delayed_create); pthread_mutex_unlock(&LOCK_delayed_create);
} }
pthread_mutex_lock(&tmp->mutex); pthread_mutex_lock(&tmp->mutex);
table= tmp->get_local_table(thd); table_list->table= tmp->get_local_table(thd);
pthread_mutex_unlock(&tmp->mutex); pthread_mutex_unlock(&tmp->mutex);
if (table) if (table_list->table)
{
DBUG_ASSERT(tmp->thd.net.report_error == 0 && thd->net.report_error == 0);
thd->di=tmp; thd->di=tmp;
else if (tmp->thd.is_fatal_error) }
thd->fatal_error();
/* Unlock the delayed insert object after its last access. */ /* Unlock the delayed insert object after its last access. */
tmp->unlock(); tmp->unlock();
DBUG_RETURN((table_list->table=table)); DBUG_RETURN(table_list->table == NULL);
err1: end_create:
thd->fatal_error();
err:
pthread_mutex_unlock(&LOCK_delayed_create); pthread_mutex_unlock(&LOCK_delayed_create);
DBUG_RETURN(0); // Continue with normal insert DBUG_RETURN(thd->net.report_error);
} }
...@@ -1760,6 +1834,9 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1760,6 +1834,9 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
@pre This function is called from the client thread. Delayed @pre This function is called from the client thread. Delayed
insert thread mutex must be acquired before invoking this insert thread mutex must be acquired before invoking this
function. function.
@return Not-NULL table object on success. NULL in case of an error,
which is set in client_thd.
*/ */
TABLE *Delayed_insert::get_local_table(THD* client_thd) TABLE *Delayed_insert::get_local_table(THD* client_thd)
...@@ -1785,8 +1862,7 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) ...@@ -1785,8 +1862,7 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
goto error; goto error;
if (dead) if (dead)
{ {
strmov(client_thd->net.last_error,thd.net.last_error); my_message(thd.net.last_errno, thd.net.last_error, MYF(0));
client_thd->net.last_errno=thd.net.last_errno;
goto error; goto error;
} }
} }
...@@ -1831,7 +1907,7 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) ...@@ -1831,7 +1907,7 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
for (org_field= table->field; *org_field; org_field++, field++) for (org_field= table->field; *org_field; org_field++, field++)
{ {
if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1))) if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
DBUG_RETURN(0); goto error;
(*field)->orig_table= copy; // Remove connection (*field)->orig_table= copy; // Remove connection
(*field)->move_field(adjust_ptrs); // Point at copy->record[0] (*field)->move_field(adjust_ptrs); // Point at copy->record[0]
if (*org_field == found_next_number_field) if (*org_field == found_next_number_field)
...@@ -1871,8 +1947,9 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) ...@@ -1871,8 +1947,9 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
/* Put a question in queue */ /* Put a question in queue */
static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool ignore, static
char *query, uint query_length, bool log_on) int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool ignore,
char *query, uint query_length, bool log_on)
{ {
delayed_row *row=0; delayed_row *row=0;
Delayed_insert *di=thd->di; Delayed_insert *di=thd->di;
...@@ -1939,6 +2016,10 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool igno ...@@ -1939,6 +2016,10 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool igno
DBUG_RETURN(1); DBUG_RETURN(1);
} }
/**
Signal the delayed insert thread that this user connection
is finished using it for this statement.
*/
static void end_delayed_insert(THD *thd) static void end_delayed_insert(THD *thd)
{ {
...@@ -2051,6 +2132,15 @@ pthread_handler_t handle_delayed_insert(void *arg) ...@@ -2051,6 +2132,15 @@ pthread_handler_t handle_delayed_insert(void *arg)
my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name); my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
goto end; goto end;
} }
if (di->table->triggers)
{
/*
Table has triggers. This is not an error, but we do
not support triggers with delayed insert. Terminate the delayed
thread without an error and thus request lock upgrade.
*/
goto end;
}
di->table->copy_blobs=1; di->table->copy_blobs=1;
/* One can now use this */ /* One can now use this */
......
...@@ -886,6 +886,12 @@ class Query_tables_list ...@@ -886,6 +886,12 @@ class Query_tables_list
query_tables_own_last= 0; query_tables_own_last= 0;
} }
} }
/**
true if the parsed tree contains references to stored procedures
or functions, false otherwise
*/
bool uses_stored_routines() const
{ return sroutines_list.elements != 0; }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment