Commit fa1f214c authored by Sergei Golubchik's avatar Sergei Golubchik

MDEV-12725 select on federated table crashes server

NET can only store current_thd if this NET (or its MYSQL) is not
moved between threads. In FederatedX MYSQL is part of the TABLE,
and a TABLE can migrate between threads.

Fix: associate NET with THD in txn->acquire() , and dissociate
in txn->release()
parent 5099d6de
SET GLOBAL query_cache_size= 16*1024*1024;
SET GLOBAL query_cache_type= 1;
CREATE TABLE t1 (i INT);
CREATE TABLE t2 (i INT) ENGINE=FEDERATED CONNECTION="mysql://root@localhost:MASTER_MYPORT/test/t1";
ALTER TABLE t2 DISABLE KEYS;
ERROR HY000: Storage engine FEDERATED of the table `test`.`t2` doesn't have this option
CREATE TABLE t3 (i INT) ENGINE=FEDERATED CONNECTION="mysql://root@localhost:MASTER_MYPORT/test/t1";
SET GLOBAL query_cache_size= default;
SET GLOBAL query_cache_type= default;
drop table t1, t2, t3;
#
# MDEV-12725 select on federated table crashes server
#
#
SET GLOBAL query_cache_size= 16*1024*1024;
SET GLOBAL query_cache_type= 1;
CREATE TABLE t1 (i INT);
--replace_result $MASTER_MYPORT MASTER_MYPORT
eval CREATE TABLE t2 (i INT) ENGINE=FEDERATED CONNECTION="mysql://root@localhost:$MASTER_MYPORT/test/t1";
--error ER_ILLEGAL_HA
ALTER TABLE t2 DISABLE KEYS;
--replace_result $MASTER_MYPORT MASTER_MYPORT
eval CREATE TABLE t3 (i INT) ENGINE=FEDERATED CONNECTION="mysql://root@localhost:$MASTER_MYPORT/test/t1";
source include/restart_mysqld.inc;
SET GLOBAL query_cache_size= default;
SET GLOBAL query_cache_type= default;
drop table t1, t2, t3;
...@@ -120,6 +120,7 @@ class federatedx_io_mysql :public federatedx_io ...@@ -120,6 +120,7 @@ class federatedx_io_mysql :public federatedx_io
void *ref); void *ref);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref); const void *ref);
virtual void set_thd(void *thd);
}; };
...@@ -648,3 +649,7 @@ int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result, ...@@ -648,3 +649,7 @@ int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
return 0; return 0;
} }
void federatedx_io_mysql::set_thd(void *thd)
{
mysql.net.thd= thd;
}
...@@ -93,8 +93,8 @@ void federatedx_txn::close(FEDERATEDX_SERVER *server) ...@@ -93,8 +93,8 @@ void federatedx_txn::close(FEDERATEDX_SERVER *server)
} }
int federatedx_txn::acquire(FEDERATEDX_SHARE *share, bool readonly, int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd,
federatedx_io **ioptr) bool readonly, federatedx_io **ioptr)
{ {
federatedx_io *io; federatedx_io *io;
FEDERATEDX_SERVER *server= share->s; FEDERATEDX_SERVER *server= share->s;
...@@ -131,6 +131,7 @@ int federatedx_txn::acquire(FEDERATEDX_SHARE *share, bool readonly, ...@@ -131,6 +131,7 @@ int federatedx_txn::acquire(FEDERATEDX_SHARE *share, bool readonly,
io->busy= TRUE; io->busy= TRUE;
io->owner_ptr= ioptr; io->owner_ptr= ioptr;
io->set_thd(thd);
} }
DBUG_ASSERT(io->busy && io->server == server); DBUG_ASSERT(io->busy && io->server == server);
...@@ -157,8 +158,11 @@ void federatedx_txn::release(federatedx_io **ioptr) ...@@ -157,8 +158,11 @@ void federatedx_txn::release(federatedx_io **ioptr)
io->active, io->is_autocommit())); io->active, io->is_autocommit()));
if (io->is_autocommit()) if (io->is_autocommit())
{
io->set_thd(NULL);
io->active= FALSE; io->active= FALSE;
} }
}
release_scan(); release_scan();
......
...@@ -1764,7 +1764,7 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked) ...@@ -1764,7 +1764,7 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
txn= get_txn(thd); txn= get_txn(thd);
if ((error= txn->acquire(share, TRUE, &io))) if ((error= txn->acquire(share, thd, TRUE, &io)))
{ {
free_share(txn, share); free_share(txn, share);
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -2049,7 +2049,7 @@ int ha_federatedx::write_row(uchar *buf) ...@@ -2049,7 +2049,7 @@ int ha_federatedx::write_row(uchar *buf)
/* we always want to append this, even if there aren't any fields */ /* we always want to append this, even if there aren't any fields */
values_string.append(STRING_WITH_LEN(") ")); values_string.append(STRING_WITH_LEN(") "));
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (use_bulk_insert) if (use_bulk_insert)
...@@ -2138,7 +2138,7 @@ void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags) ...@@ -2138,7 +2138,7 @@ void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags)
Make sure we have an open connection so that we know the Make sure we have an open connection so that we know the
maximum packet size. maximum packet size.
*/ */
if (txn->acquire(share, FALSE, &io)) if (txn->acquire(share, ha_thd(), FALSE, &io))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
page_size= (uint) my_getpagesize(); page_size= (uint) my_getpagesize();
...@@ -2169,7 +2169,7 @@ int ha_federatedx::end_bulk_insert() ...@@ -2169,7 +2169,7 @@ int ha_federatedx::end_bulk_insert()
if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted) if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted)
{ {
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(bulk_insert.str, bulk_insert.length)) if (io->query(bulk_insert.str, bulk_insert.length))
error= stash_remote_error(); error= stash_remote_error();
...@@ -2221,7 +2221,7 @@ int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -2221,7 +2221,7 @@ int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt)
DBUG_ASSERT(txn == get_txn(thd)); DBUG_ASSERT(txn == get_txn(thd));
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(query.ptr(), query.length())) if (io->query(query.ptr(), query.length()))
...@@ -2253,7 +2253,7 @@ int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -2253,7 +2253,7 @@ int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt)
DBUG_ASSERT(txn == get_txn(thd)); DBUG_ASSERT(txn == get_txn(thd));
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(query.ptr(), query.length())) if (io->query(query.ptr(), query.length()))
...@@ -2412,7 +2412,7 @@ int ha_federatedx::update_row(const uchar *old_data, uchar *new_data) ...@@ -2412,7 +2412,7 @@ int ha_federatedx::update_row(const uchar *old_data, uchar *new_data)
if (!has_a_primary_key) if (!has_a_primary_key)
update_string.append(STRING_WITH_LEN(" LIMIT 1")); update_string.append(STRING_WITH_LEN(" LIMIT 1"));
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(update_string.ptr(), update_string.length())) if (io->query(update_string.ptr(), update_string.length()))
...@@ -2490,7 +2490,7 @@ int ha_federatedx::delete_row(const uchar *buf) ...@@ -2490,7 +2490,7 @@ int ha_federatedx::delete_row(const uchar *buf)
DBUG_PRINT("info", DBUG_PRINT("info",
("Delete sql: %s", delete_string.c_ptr_quick())); ("Delete sql: %s", delete_string.c_ptr_quick()));
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(delete_string.ptr(), delete_string.length())) if (io->query(delete_string.ptr(), delete_string.length()))
...@@ -2599,7 +2599,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, ...@@ -2599,7 +2599,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
NULL, 0, 0); NULL, 0, 0);
sql_query.append(index_string); sql_query.append(index_string);
if ((retval= txn->acquire(share, TRUE, &io))) if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval); DBUG_RETURN(retval);
if (io->query(sql_query.ptr(), sql_query.length())) if (io->query(sql_query.ptr(), sql_query.length()))
...@@ -2679,7 +2679,7 @@ int ha_federatedx::read_range_first(const key_range *start_key, ...@@ -2679,7 +2679,7 @@ int ha_federatedx::read_range_first(const key_range *start_key,
&table->key_info[active_index], &table->key_info[active_index],
start_key, end_key, 0, eq_range_arg); start_key, end_key, 0, eq_range_arg);
if ((retval= txn->acquire(share, TRUE, &io))) if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval); DBUG_RETURN(retval);
if (stored_result) if (stored_result)
...@@ -2779,7 +2779,7 @@ int ha_federatedx::rnd_init(bool scan) ...@@ -2779,7 +2779,7 @@ int ha_federatedx::rnd_init(bool scan)
{ {
int error; int error;
if ((error= txn->acquire(share, TRUE, &io))) if ((error= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (stored_result) if (stored_result)
...@@ -2826,7 +2826,7 @@ int ha_federatedx::free_result() ...@@ -2826,7 +2826,7 @@ int ha_federatedx::free_result()
else else
{ {
federatedx_io *tmp_io= 0, **iop; federatedx_io *tmp_io= 0, **iop;
if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io)))) if (!*(iop= &io) && (error= txn->acquire(share, ha_thd(), TRUE, (iop= &tmp_io))))
{ {
DBUG_ASSERT(0); // Fail when testing DBUG_ASSERT(0); // Fail when testing
insert_dynamic(&results, (uchar*) &stored_result); insert_dynamic(&results, (uchar*) &stored_result);
...@@ -2906,7 +2906,7 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) ...@@ -2906,7 +2906,7 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
FEDERATEDX_IO_ROW *row; FEDERATEDX_IO_ROW *row;
DBUG_ENTER("ha_federatedx::read_next"); DBUG_ENTER("ha_federatedx::read_next");
if ((retval= txn->acquire(share, TRUE, &io))) if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
DBUG_RETURN(retval); DBUG_RETURN(retval);
/* Fetch a row, insert it back in a row format. */ /* Fetch a row, insert it back in a row format. */
...@@ -2951,7 +2951,7 @@ void ha_federatedx::position(const uchar *record __attribute__ ((unused))) ...@@ -2951,7 +2951,7 @@ void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (txn->acquire(share, TRUE, &io)) if (txn->acquire(share, ha_thd(), TRUE, &io))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
io->mark_position(stored_result, ref); io->mark_position(stored_result, ref);
...@@ -2980,7 +2980,7 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) ...@@ -2980,7 +2980,7 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
/* We have to move this to 'ref' to get things aligned */ /* We have to move this to 'ref' to get things aligned */
bmove(ref, pos, ref_length); bmove(ref, pos, ref_length);
if ((retval= txn->acquire(share, TRUE, &io))) if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
goto error; goto error;
if ((retval= io->seek_position(&result, ref))) if ((retval= io->seek_position(&result, ref)))
...@@ -3054,7 +3054,7 @@ int ha_federatedx::info(uint flag) ...@@ -3054,7 +3054,7 @@ int ha_federatedx::info(uint flag)
/* we want not to show table status if not needed to do so */ /* we want not to show table status if not needed to do so */
if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO)) if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
{ {
if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, TRUE, (iop= &tmp_io)))) if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
goto fail; goto fail;
} }
...@@ -3155,6 +3155,7 @@ int ha_federatedx::extra(ha_extra_function operation) ...@@ -3155,6 +3155,7 @@ int ha_federatedx::extra(ha_extra_function operation)
int ha_federatedx::reset(void) int ha_federatedx::reset(void)
{ {
THD *thd= ha_thd();
int error = 0; int error = 0;
insert_dup_update= FALSE; insert_dup_update= FALSE;
...@@ -3172,9 +3173,9 @@ int ha_federatedx::reset(void) ...@@ -3172,9 +3173,9 @@ int ha_federatedx::reset(void)
federatedx_io *tmp_io= 0, **iop; federatedx_io *tmp_io= 0, **iop;
// external_lock may not have been called so txn may not be set // external_lock may not have been called so txn may not be set
tmp_txn= get_txn(ha_thd()); tmp_txn= get_txn(thd);
if (!*(iop= &io) && (error= tmp_txn->acquire(share, TRUE, (iop= &tmp_io)))) if (!*(iop= &io) && (error= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
{ {
DBUG_ASSERT(0); // Fail when testing DBUG_ASSERT(0); // Fail when testing
return error; return error;
...@@ -3208,6 +3209,7 @@ int ha_federatedx::reset(void) ...@@ -3208,6 +3209,7 @@ int ha_federatedx::reset(void)
int ha_federatedx::delete_all_rows() int ha_federatedx::delete_all_rows()
{ {
THD *thd= ha_thd();
char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
String query(query_buffer, sizeof(query_buffer), &my_charset_bin); String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
int error; int error;
...@@ -3221,14 +3223,14 @@ int ha_federatedx::delete_all_rows() ...@@ -3221,14 +3223,14 @@ int ha_federatedx::delete_all_rows()
ident_quote_char); ident_quote_char);
/* no need for savepoint in autocommit mode */ /* no need for savepoint in autocommit mode */
if (!(ha_thd()->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
txn->stmt_autocommit(); txn->stmt_autocommit();
/* /*
TRUNCATE won't return anything in mysql_affected_rows TRUNCATE won't return anything in mysql_affected_rows
*/ */
if ((error= txn->acquire(share, FALSE, &io))) if ((error= txn->acquire(share, thd, FALSE, &io)))
DBUG_RETURN(error); DBUG_RETURN(error);
if (io->query(query.ptr(), query.length())) if (io->query(query.ptr(), query.length()))
...@@ -3373,7 +3375,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, ...@@ -3373,7 +3375,7 @@ int ha_federatedx::create(const char *name, TABLE *table_arg,
if (tmp_share.s) if (tmp_share.s)
{ {
tmp_txn= get_txn(thd); tmp_txn= get_txn(thd);
if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io))) if (!(retval= tmp_txn->acquire(&tmp_share, thd, TRUE, &tmp_io)))
{ {
retval= test_connection(thd, tmp_io, &tmp_share); retval= test_connection(thd, tmp_io, &tmp_share);
tmp_txn->release(&tmp_io); tmp_txn->release(&tmp_io);
...@@ -3470,7 +3472,7 @@ int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type) ...@@ -3470,7 +3472,7 @@ int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type)
{ {
table_will_be_deleted = FALSE; table_will_be_deleted = FALSE;
txn= get_txn(thd); txn= get_txn(thd);
if (!(error= txn->acquire(share, lock_type == F_RDLCK, &io)) && if (!(error= txn->acquire(share, ha_thd(), lock_type == F_RDLCK, &io)) &&
(lock_type == F_WRLCK || !io->is_autocommit())) (lock_type == F_WRLCK || !io->is_autocommit()))
{ {
if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
......
...@@ -215,6 +215,7 @@ class federatedx_io ...@@ -215,6 +215,7 @@ class federatedx_io
void *ref)=0; void *ref)=0;
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)=0; const void *ref)=0;
virtual void set_thd(void *thd) { }
}; };
...@@ -233,7 +234,7 @@ class federatedx_txn ...@@ -233,7 +234,7 @@ class federatedx_txn
bool has_connections() const { return txn_list != NULL; } bool has_connections() const { return txn_list != NULL; }
bool in_transaction() const { return savepoint_next != 0; } bool in_transaction() const { return savepoint_next != 0; }
int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io); int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
void release(federatedx_io **io); void release(federatedx_io **io);
void close(FEDERATEDX_SERVER *); void close(FEDERATEDX_SERVER *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment