Commit a7d880f0 authored by Oleksandr Byelkin's avatar Oleksandr Byelkin

MDEV-21916: COM_STMT_BULK_EXECUTE with RETURNING insert wrong values

The problem is that array binding uses net buffer to read parameters for each
execution while each execiting with RETURNING write in the same buffer.

Solution is to allocate new net buffer to avoid changing buffer we are reading
from.
parent 826eab3f
......@@ -179,14 +179,26 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags)
DBUG_RETURN(0);
}
/**
Allocate and assign new net buffer
@note In case of error the old buffer left
@retval TRUE error
@retval FALSE success
*/
my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags)
{
uchar *tmp;
DBUG_ENTER("net_allocate_new_packet");
if (!(net->buff=(uchar*) my_malloc(key_memory_NET_buff,
(size_t) net->max_packet +
NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
MYF(MY_WME | my_flags))))
if (!(tmp= (uchar*) my_malloc(key_memory_NET_buff,
(size_t) net->max_packet +
NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
MYF(MY_WME | my_flags))))
DBUG_RETURN(1);
net->buff= tmp;
net->buff_end=net->buff+net->max_packet;
net->write_pos=net->read_pos = net->buff;
DBUG_RETURN(0);
......
......@@ -598,6 +598,7 @@ void Protocol::end_statement()
thd->get_stmt_da()->get_sqlstate());
break;
case Diagnostics_area::DA_EOF:
case Diagnostics_area::DA_EOF_BULK:
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
break;
......
......@@ -698,7 +698,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
!table->prepare_triggers_for_delete_stmt_or_event())
will_batch= !table->file->start_bulk_delete();
if (returning)
/*
thd->get_stmt_da()->is_set() means first iteration of prepared statement
with array binding operation execution (non optimized so it is not
INSERT)
*/
if (returning && !thd->get_stmt_da()->is_set())
{
if (result->send_result_set_metadata(returning->item_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
......
......@@ -372,7 +372,7 @@ Diagnostics_area::set_eof_status(THD *thd)
{
DBUG_ENTER("set_eof_status");
/* Only allowed to report eof if has not yet reported an error */
DBUG_ASSERT(! is_set());
DBUG_ASSERT(!is_set() || (m_status == DA_EOF_BULK && is_bulk_op()));
/*
In production, refuse to overwrite an error or a custom response
with an EOF packet.
......@@ -385,11 +385,23 @@ Diagnostics_area::set_eof_status(THD *thd)
number of warnings, since they are not available to the client
anyway.
*/
m_statement_warn_count= (thd->spcont ?
0 :
current_statement_warn_count());
if (m_status == DA_EOF_BULK)
{
if (!thd->spcont)
m_statement_warn_count+= current_statement_warn_count();
}
else
{
if (thd->spcont)
{
m_statement_warn_count= 0;
m_affected_rows= 0;
}
else
m_statement_warn_count= current_statement_warn_count();
m_status= (is_bulk_op() ? DA_EOF_BULK : DA_EOF);
}
m_status= DA_EOF;
DBUG_VOID_RETURN;
}
......
......@@ -960,6 +960,8 @@ class Diagnostics_area: public Sql_state_errno,
DA_EOF,
/** Set whenever one calls my_ok() in PS bulk mode. */
DA_OK_BULK,
/** Set whenever one calls my_eof() in PS bulk mode. */
DA_EOF_BULK,
/** Set whenever one calls my_error() or my_message(). */
DA_ERROR,
/** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
......@@ -1019,8 +1021,11 @@ class Diagnostics_area: public Sql_state_errno,
enum_diagnostics_status status() const { return m_status; }
const char *message() const
{ DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
m_status == DA_OK_BULK); return m_message; }
{
DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
m_status == DA_OK_BULK || m_status == DA_EOF_BULK);
return m_message;
}
bool skip_flush() const
{
......@@ -1055,7 +1060,7 @@ class Diagnostics_area: public Sql_state_errno,
uint statement_warn_count() const
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
m_status == DA_EOF);
m_status == DA_EOF ||m_status == DA_EOF_BULK );
return m_statement_warn_count;
}
......
......@@ -710,6 +710,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
Name_resolution_context *context;
Name_resolution_context_state ctx_state;
SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
unsigned char *readbuff= NULL;
#ifndef EMBEDDED_LIBRARY
char *query= thd->query();
......@@ -786,7 +787,25 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
/* Prepares LEX::returing_list if it is not empty */
if (returning)
{
result->prepare(returning->item_list, NULL);
if (thd->is_bulk_op())
{
/*
It is RETURNING which needs network buffer to write result set and
it is array binfing which need network buffer to read parameters.
So we allocate yet another network buffer.
The old buffer will be freed at the end of operation.
*/
DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
readbuff= thd->net.buff; // old buffer
if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
{
readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
goto abort;
}
}
}
context= &thd->lex->first_select_lex()->context;
/*
......@@ -1316,7 +1335,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
thd->lex->current_select->save_leaf_tables(thd);
thd->lex->current_select->first_cond_optimization= 0;
}
if (readbuff)
my_free(readbuff);
DBUG_RETURN(FALSE);
abort:
......@@ -1330,6 +1350,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
if (!joins_freed)
free_underlaid_joins(thd, thd->lex->first_select_lex());
thd->abort_on_warning= 0;
if (readbuff)
my_free(readbuff);
DBUG_RETURN(retval);
}
......
......@@ -2356,7 +2356,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
size_t next_length_length= packet_start - packet;
unsigned char *readbuff= net->buff;
if (net_allocate_new_packet(net, thd, MYF(0)))
if (net_allocate_new_packet(net, thd, MYF(MY_THREAD_SPECIFIC)))
break;
PSI_statement_locker *save_locker= thd->m_statement_psi;
......
......@@ -894,6 +894,9 @@ static bool insert_bulk_params(Prepared_statement *stmt,
case STMT_INDICATOR_IGNORE:
param->set_ignore();
break;
default:
DBUG_ASSERT(0);
DBUG_RETURN(1);
}
}
else
......@@ -4567,6 +4570,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
uchar *packet_end_arg)
{
Reprepare_observer reprepare_observer;
unsigned char *readbuff= NULL;
bool error= 0;
packet= packet_arg;
packet_end= packet_end_arg;
......@@ -4580,24 +4584,37 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
if (state == Query_arena::STMT_ERROR)
{
my_message(last_errno, last_error, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
goto err;
}
/* Check for non zero parameter count*/
if (param_count == 0)
{
DBUG_PRINT("error", ("Statement with no parameters for bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
goto err;
}
if (!(sql_command_flags[lex->sql_command] & CF_PS_ARRAY_BINDING_SAFE))
{
DBUG_PRINT("error", ("Command is not supported in bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
goto err;
}
/*
Here second buffer for not optimized commands,
optimized commands do it inside thier internal loop.
*/
if (!(sql_command_flags[lex->sql_command] & CF_PS_ARRAY_BINDING_OPTIMIZED) &&
this->lex->has_returning())
{
// Above check can be true for SELECT in future
DBUG_ASSERT(lex->sql_command != SQLCOM_SELECT);
readbuff= thd->net.buff; // old buffer
if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
{
readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
goto err;
}
}
#ifndef EMBEDDED_LIBRARY
......@@ -4609,9 +4626,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
my_error(ER_WRONG_ARGUMENTS, MYF(0),
"mysqld_stmt_bulk_execute");
reset_stmt_params(this);
thd->set_bulk_execution(0);
return true;
goto err;
}
read_types= FALSE;
......@@ -4628,8 +4643,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
if (set_bulk_parameters(TRUE))
{
thd->set_bulk_execution(0);
return true;
goto err;
}
}
......@@ -4693,8 +4707,16 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
}
reset_stmt_params(this);
thd->set_bulk_execution(0);
if (readbuff)
my_free(readbuff);
return error;
err:
reset_stmt_params(this);
thd->set_bulk_execution(0);
if (readbuff)
my_free(readbuff);
return true;
}
......
......@@ -5466,6 +5466,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch(da->status())
{
case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
......@@ -5706,6 +5707,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch (da->status())
{
case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
......
......@@ -20547,6 +20547,178 @@ static void test_bulk_replace()
rc= mysql_query(mysql, "DROP TABLE t1");
myquery(rc);
}
static void test_bulk_insert_returning()
{
int rc;
MYSQL_STMT *stmt;
MYSQL_BIND bind[2], res_bind[2];
MYSQL_ROW row;
MYSQL_RES *result;
int i,
id[]= {1, 2, 3, 4},
val[]= {1, 1, 1, 1},
count= sizeof(id)/sizeof(id[0]);
unsigned long length[2];
my_bool is_null[2];
my_bool error[2];
int32 res[2];
rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
myquery(rc);
rc= mysql_query(mysql,
"CREATE TABLE t1 (id int not null primary key, active int)");
myquery(rc);
stmt= mysql_stmt_init(mysql);
rc= mysql_stmt_prepare(stmt,
"insert into t1 values (?, ?) returning id, active",
-1);
check_execute(stmt, rc);
memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_LONG;
bind[0].buffer = (void *)id;
bind[0].buffer_length = 0;
bind[1].buffer_type = MYSQL_TYPE_LONG;
bind[1].buffer = (void *)val;
bind[1].buffer_length = 0;
mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
rc= mysql_stmt_bind_param(stmt, bind);
check_execute(stmt, rc);
rc= mysql_stmt_execute(stmt);
myquery(rc);
memset(res_bind, 0, sizeof(res_bind));
for (i= 0; i < 2; i++)
{
res_bind[i].buffer_type= MYSQL_TYPE_LONG;
res_bind[i].buffer= (char *)&res[i];
res_bind[i].is_null= &is_null[i];
res_bind[i].length= &length[i];
res_bind[i].error= &error[i];
}
rc= mysql_stmt_bind_result(stmt, res_bind);
myquery(rc);
rc= mysql_stmt_store_result(stmt);
myquery(rc);
i= 0;
while (!mysql_stmt_fetch(stmt))
{
i++;
DIE_IF(is_null[0] || is_null[1]);
DIE_IF(res[0] != i);
DIE_IF(res[1] != 1);
}
DIE_IF(i != 4);
mysql_stmt_close(stmt);
rc= mysql_query(mysql, "SELECT id,active FROM t1");
myquery(rc);
result= mysql_store_result(mysql);
mytest(result);
i= 0;
while ((row= mysql_fetch_row(result)))
{
i++;
DIE_IF(atoi(row[0]) != i);
DIE_IF(atoi(row[1]) != 1);
}
DIE_IF(i != 4);
mysql_free_result(result);
rc= mysql_query(mysql, "DROP TABLE t1");
myquery(rc);
}
static void test_bulk_delete_returning()
{
int rc;
MYSQL_STMT *stmt;
MYSQL_BIND bind[2], res_bind[2];
MYSQL_ROW row;
MYSQL_RES *result;
int i,
id[]= {1, 2, 3, 4},
count= sizeof(id)/sizeof(id[0]);
unsigned long length[1];
my_bool is_null[1];
my_bool error[1];
int32 res[1];
rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
myquery(rc);
rc= mysql_query(mysql, "CREATE TABLE t1 (id int not null primary key)");
myquery(rc);
rc= mysql_query(mysql, "insert into t1 values (1), (2), (3), (4)");
myquery(rc);
verify_affected_rows(4);
stmt= mysql_stmt_init(mysql);
rc= mysql_stmt_prepare(stmt, "DELETE FROM t1 WHERE id=? RETURNING id", -1);
check_execute(stmt, rc);
memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_LONG;
bind[0].buffer = (void *)id;
bind[0].buffer_length = 0;
mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
rc= mysql_stmt_bind_param(stmt, bind);
check_execute(stmt, rc);
rc= mysql_stmt_execute(stmt);
myquery(rc);
memset(res_bind, 0, sizeof(res_bind));
res_bind[0].buffer_type= MYSQL_TYPE_LONG;
res_bind[0].buffer= (char *)&res[0];
res_bind[0].is_null= &is_null[0];
res_bind[0].length= &length[0];
res_bind[0].error= &error[0];
rc= mysql_stmt_bind_result(stmt, res_bind);
myquery(rc);
rc= mysql_stmt_store_result(stmt);
myquery(rc);
i= 0;
while (!mysql_stmt_fetch(stmt))
{
i++;
DIE_IF(is_null[0]);
DIE_IF(res[0] != i);
}
DIE_IF(i != 4);
mysql_stmt_close(stmt);
rc= mysql_query(mysql, "SELECT id FROM t1");
myquery(rc);
result= mysql_store_result(mysql);
mytest(result);
i= 0;
while ((row= mysql_fetch_row(result)))
{
i++;
printf("\nResult (SHOULD NOT BE HERE!!!) %d %s \n", i, row[0]);
}
DIE_IF(i != 0 );
mysql_free_result(result);
rc= mysql_query(mysql, "DROP TABLE t1");
myquery(rc);
}
#endif
......@@ -21427,6 +21599,8 @@ static struct my_tests_st my_tests[]= {
{ "test_bulk_autoinc", test_bulk_autoinc},
{ "test_bulk_delete", test_bulk_delete },
{ "test_bulk_replace", test_bulk_replace },
{ "test_bulk_insert_returning", test_bulk_insert_returning },
{ "test_bulk_delete_returning", test_bulk_delete_returning },
#endif
{ "test_ps_params_in_ctes", test_ps_params_in_ctes },
{ "test_explain_meta", test_explain_meta },
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment