Commit 348ccb6f authored by Igor Babaev

Fixed bug mdev-11674.

1. The rows of a recursive CTE may at some point overflow
the HEAP temporary table containing them. At this point
the table is converted to a MyISAM temporary table and the
newly added rows are placed into this MyISAM table.
A bug in the code of select_union_recursive::send_data()
prevented the server from writing the row that caused the
overflow into the temporary table used for the result of
the iteration steps. This could lead, in particular, to a
premature end of the iterations.
2. The method TABLE::insert_all_rows_into() that was used
to copy all rows of one temporary table into another
did not take into account that the destination temporary
table may have to be converted to a MyISAM table at some
point. This patch fixed this problem. It also renamed the
method to TABLE::insert_all_rows_into_tmp_table() and added
an extra parameter needed for the conversion.
parent a758479c
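Both parts of the fix revolve around one requirement stated above: the row whose write causes the HEAP temporary table to overflow must not be lost; the table has to be converted to an on-disk one and that row still has to end up in it. Below is a minimal standalone C++ sketch of one way to express that convert-and-retry idea, added here only for illustration; the names TmpTable, write_row and convert_to_disk are hypothetical and are not MariaDB APIs (the actual patch performs the conversion with create_internal_tmp_table_from_heap(), as seen in the TABLE::insert_all_rows_into_tmp_table() hunk below).

// Standalone sketch (not MariaDB code) of the "convert on overflow, then
// retry the rejected row" idea behind the fix. All names are hypothetical.
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

struct TmpTable
{
  std::size_t heap_limit;            // rows the in-memory ("HEAP") stage may hold
  bool converted= false;             // true once spilled to the on-disk stage
  std::vector<std::string> rows;

  // Returns false when the in-memory stage is full and a conversion is needed.
  bool write_row(const std::string &row)
  {
    if (!converted && rows.size() >= heap_limit)
      return false;                  // overflow: caller must convert and retry
    rows.push_back(row);
    return true;
  }

  void convert_to_disk() { converted= true; }  // stand-in for the real conversion
};

// Mirrors the fixed behaviour: the row that triggered the overflow is written
// again after the conversion instead of being silently dropped.
static bool insert_all_rows(const std::vector<std::string> &src, TmpTable *dst)
{
  for (const std::string &row : src)
  {
    if (!dst->write_row(row))
    {
      dst->convert_to_disk();
      if (!dst->write_row(row))      // retry the very row that overflowed
        return true;                 // genuine write error
    }
  }
  return false;
}

int main()
{
  std::vector<std::string> src= {"r1", "r2", "r3", "r4", "r5"};
  TmpTable dst{3};                   // the in-memory stage holds at most 3 rows
  bool err= insert_all_rows(src, &dst);
  std::printf("error=%d rows=%zu converted=%d\n",
              err, dst.rows.size(), (int) dst.converted);
  return 0;
}

In the patch itself the idea appears twice: the guard in select_union_recursive::send_data() is relaxed from !write_err to write_err != HA_ERR_FOUND_DUPP_KEY so the overflow-causing row is still written into the incremental table, and TABLE::insert_all_rows_into_tmp_table() converts the destination table on a fatal (non-duplicate) write error and then continues copying.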
@@ -2327,3 +2327,19 @@ a b dist
 7 6 3
 DROP VIEW edges2;
 DROP TABLE edges;
+#
+# MDEV-11674: recursive CTE table that cannot be stored
+# in a heap table
+#
+create table t1 (id int, test_data varchar(36));
+insert into t1(id, test_data)
+select id, test_data
+from (
+with recursive data_generator(id, test_data) as (
+select 1 as id, uuid() as test_data
+union all
+select id + 1, uuid() from data_generator where id < 150000
+)
+select * from data_generator
+) as a;
+drop table t1;
@@ -1484,3 +1484,24 @@ ORDER BY a, dist, b;
 DROP VIEW edges2;
 DROP TABLE edges;
+--echo #
+--echo # MDEV-11674: recursive CTE table that cannot be stored
+--echo # in a heap table
+--echo #
+create table t1 (id int, test_data varchar(36));
+insert into t1(id, test_data)
+select id, test_data
+from (
+with recursive data_generator(id, test_data) as (
+select 1 as id, uuid() as test_data
+union all
+select id + 1, uuid() from data_generator where id < 150000
+)
+select * from data_generator
+) as a;
+drop table t1;
@@ -969,7 +969,10 @@ bool TABLE_LIST::fill_recursive(THD *thd)
   if (!rc)
   {
     TABLE *src= with->rec_result->table;
-    rc =src->insert_all_rows_into(thd, table, true);
+    rc =src->insert_all_rows_into_tmp_table(thd,
+                                            table,
+                                            &with->rec_result->tmp_table_param,
+                                            true);
   }
   return rc;
 }
@@ -105,7 +105,7 @@ int select_union_recursive::send_data(List<Item> &values)
 {
   int rc= select_union::send_data(values);
-  if (!write_err)
+  if (write_err != HA_ERR_FOUND_DUPP_KEY)
   {
     int err;
     if ((err= incr_table->file->ha_write_tmp_row(table->record[0])))
@@ -1192,6 +1192,7 @@ bool st_select_lex_unit::exec_recursive()
   st_select_lex *end= NULL;
   bool is_unrestricted= with_element->is_unrestricted();
   List_iterator_fast<TABLE> li(with_element->rec_result->rec_tables);
+  TMP_TABLE_PARAM *tmp_table_param= &with_element->rec_result->tmp_table_param;
   ha_rows examined_rows= 0;
   bool was_executed= executed;
   TABLE *rec_table;
@@ -1247,7 +1248,9 @@ bool st_select_lex_unit::exec_recursive()
   while ((rec_table= li++))
   {
     saved_error=
-      incr_table->insert_all_rows_into(thd, rec_table, !is_unrestricted);
+      incr_table->insert_all_rows_into_tmp_table(thd, rec_table,
+                                                 tmp_table_param,
+                                                 !is_unrestricted);
     if (!with_element->rec_result->first_rec_table_to_update)
       with_element->rec_result->first_rec_table_to_update= rec_table;
     if (with_element->level == 1)
@@ -7573,46 +7573,58 @@ bool TABLE::validate_default_values_of_unset_fields(THD *thd) const
 }
-bool TABLE::insert_all_rows_into(THD *thd, TABLE *dest, bool with_cleanup)
+bool TABLE::insert_all_rows_into_tmp_table(THD *thd,
+                                           TABLE *tmp_table,
+                                           TMP_TABLE_PARAM *tmp_table_param,
+                                           bool with_cleanup)
 {
   int write_err= 0;
-  DBUG_ENTER("TABLE::insert_all_rows_into");
+  DBUG_ENTER("TABLE::insert_all_rows_into_tmp_table");
   if (with_cleanup)
   {
-    if ((write_err= dest->file->ha_delete_all_rows()))
+    if ((write_err= tmp_table->file->ha_delete_all_rows()))
       goto err;
   }
   if (file->indexes_are_disabled())
-    dest->file->ha_disable_indexes(HA_KEY_SWITCH_ALL);
+    tmp_table->file->ha_disable_indexes(HA_KEY_SWITCH_ALL);
   file->ha_index_or_rnd_end();
   if (file->ha_rnd_init_with_error(1))
     DBUG_RETURN(1);
-  if (dest->no_rows)
-    dest->file->extra(HA_EXTRA_NO_ROWS);
+  if (tmp_table->no_rows)
+    tmp_table->file->extra(HA_EXTRA_NO_ROWS);
   else
   {
     /* update table->file->stats.records */
     file->info(HA_STATUS_VARIABLE);
-    dest->file->ha_start_bulk_insert(file->stats.records);
+    tmp_table->file->ha_start_bulk_insert(file->stats.records);
   }
-  while (!file->ha_rnd_next(dest->record[1]))
+  while (!file->ha_rnd_next(tmp_table->record[0]))
   {
-    write_err= dest->file->ha_write_tmp_row(dest->record[1]);
+    write_err= tmp_table->file->ha_write_tmp_row(tmp_table->record[0]);
     if (write_err)
-      goto err;
+    {
+      bool is_duplicate;
+      if (tmp_table->file->is_fatal_error(write_err, HA_CHECK_DUP) &&
+          create_internal_tmp_table_from_heap(thd, tmp_table,
+                                              tmp_table_param->start_recinfo,
+                                              &tmp_table_param->recinfo,
+                                              write_err, 1, &is_duplicate))
+        DBUG_RETURN(1);
+    }
     if (thd->check_killed())
     {
       thd->send_kill_message();
       goto err_killed;
     }
   }
-  if (!dest->no_rows && dest->file->ha_end_bulk_insert())
+  if (!tmp_table->no_rows && tmp_table->file->ha_end_bulk_insert())
     goto err;
   DBUG_RETURN(0);
@@ -53,6 +53,7 @@ class With_element;
 struct TDC_element;
 class Virtual_column_info;
 class Table_triggers_list;
+class TMP_TABLE_PARAM;
 /*
   Used to identify NESTED_JOIN structures within a join (applicable only to
@@ -1447,7 +1448,10 @@ struct TABLE
   inline Field **field_to_fill();
   bool validate_default_values_of_unset_fields(THD *thd) const;
-  bool insert_all_rows_into(THD *thd, TABLE *dest, bool with_cleanup);
+  bool insert_all_rows_into_tmp_table(THD *thd,
+                                      TABLE *tmp_table,
+                                      TMP_TABLE_PARAM *tmp_table_param,
+                                      bool with_cleanup);
 };