Commit 0f18ff33 authored by mikael@dator6.(none)'s avatar mikael@dator6.(none)

BUG#30996: Committed too early when autocommit and lock table

Moved out a lot of code into functions from external_lock and
start_stmt
Fixed a crashing bug at memory alloc failure
Merged the stmt and all variables into one trans variable
Always register start of statement as according to the
interface of the handlers.
Also register for start of transaction when not statement commit
== not autocommit AND no begin - commit ongoing
Now that we registered in a proper manner we also needed to handle
the commit call when end of statement and transaction is ongoing
Added start_stmt_count to know when we have start of statement
for first table
parent d4e84e13
drop table if exists t1;
create table t1 (a int) engine ndb;
set autocommit=1;
lock table t1 write;
set autocommit=0;
insert into t1 values (0);
rollback;
select * from t1;
a
unlock tables;
drop table t1;
-- source include/have_ndb.inc
# BUG 30996
--disable_warnings
drop table if exists t1;
--enable_warnings
create table t1 (a int) engine ndb;
set autocommit=1;
lock table t1 write;
set autocommit=0;
insert into t1 values (0);
rollback;
select * from t1;
unlock tables;
drop table t1;
...@@ -325,9 +325,9 @@ Thd_ndb::Thd_ndb() ...@@ -325,9 +325,9 @@ Thd_ndb::Thd_ndb()
{ {
ndb= new Ndb(g_ndb_cluster_connection, ""); ndb= new Ndb(g_ndb_cluster_connection, "");
lock_count= 0; lock_count= 0;
start_stmt_count= 0;
count= 0; count= 0;
all= NULL; trans= NULL;
stmt= NULL;
m_error= FALSE; m_error= FALSE;
m_error_code= 0; m_error_code= 0;
query_state&= NDB_QUERY_NORMAL; query_state&= NDB_QUERY_NORMAL;
...@@ -382,6 +382,11 @@ Thd_ndb::get_open_table(THD *thd, const void *key) ...@@ -382,6 +382,11 @@ Thd_ndb::get_open_table(THD *thd, const void *key)
{ {
thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root, thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root,
sizeof(THD_NDB_SHARE)); sizeof(THD_NDB_SHARE));
if (!thd_ndb_share)
{
mem_alloc_error(sizeof(THD_NDB_SHARE));
DBUG_RETURN(NULL);
}
thd_ndb_share->key= key; thd_ndb_share->key= key;
thd_ndb_share->stat.last_count= count; thd_ndb_share->stat.last_count= count;
thd_ndb_share->stat.no_uncommitted_rows_count= 0; thd_ndb_share->stat.no_uncommitted_rows_count= 0;
...@@ -4327,7 +4332,7 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update) ...@@ -4327,7 +4332,7 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update)
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab; const NDBTAB *ndbtab;
NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; NdbTransaction *trans= thd_ndb->trans;
ndb->setDatabaseName(NDB_REP_DB); ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE); Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
if (!(ndbtab= ndbtab_g.get_table())) if (!(ndbtab= ndbtab_g.get_table()))
...@@ -4371,10 +4376,110 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update) ...@@ -4371,10 +4376,110 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update)
} }
#endif /* HAVE_NDB_BINLOG */ #endif /* HAVE_NDB_BINLOG */
void ha_ndbcluster::transaction_checks(THD *thd)
{
if (thd->lex->sql_command == SQLCOM_LOAD)
{
m_transaction_on= FALSE;
/* Would be simpler if has_transactions() didn't always say "yes" */
thd->transaction.all.modified_non_trans_table=
thd->transaction.stmt.modified_non_trans_table= TRUE;
}
else if (!thd->transaction.on)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
}
int ha_ndbcluster::start_statement(THD *thd,
Thd_ndb *thd_ndb,
Ndb *ndb)
{
DBUG_ENTER("ha_ndbcluster::start_statement");
PRINT_OPTION_FLAGS(thd);
trans_register_ha(thd, FALSE, ndbcluster_hton);
if (!thd_ndb->trans)
{
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, ndbcluster_hton);
DBUG_PRINT("trans",("Starting transaction"));
thd_ndb->trans= ndb->startTransaction();
if (thd_ndb->trans == NULL)
ERR_RETURN(ndb->getNdbError());
thd_ndb->init_open_tables();
thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
thd_ndb->m_slow_path= FALSE;
if (!(thd->options & OPTION_BIN_LOG) ||
thd->variables.binlog_format == BINLOG_FORMAT_STMT)
{
thd_ndb->trans_options|= TNTO_NO_LOGGING;
thd_ndb->m_slow_path= TRUE;
}
else if (thd->slave_thread)
thd_ndb->m_slow_path= TRUE;
}
/*
If this is the start of a LOCK TABLE, a table look
should be taken on the table in NDB
Check if it should be read or write lock
*/
if (thd->options & (OPTION_TABLE_LOCK))
{
//lockThisTable();
DBUG_PRINT("info", ("Locking the table..." ));
}
DBUG_RETURN(0);
}
int ha_ndbcluster::init_handler_for_statement(THD *thd, Thd_ndb *thd_ndb)
{
/*
This is the place to make sure this handler instance
has a started transaction.
The transaction is started by the first handler on which
MySQL Server calls external lock
Other handlers in the same stmt or transaction should use
the same NDB transaction. This is done by setting up the m_active_trans
pointer to point to the NDB transaction.
*/
DBUG_ENTER("ha_ndbcluster::init_handler_for_statement");
// store thread specific data first to set the right context
m_force_send= thd->variables.ndb_force_send;
m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
m_autoincrement_prefetch=
(ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
m_active_trans= thd_ndb->trans;
DBUG_ASSERT(m_active_trans);
// Start of transaction
m_rows_changed= 0;
m_ops_pending= 0;
m_slow_path= thd_ndb->m_slow_path;
#ifdef HAVE_NDB_BINLOG
if (unlikely(m_slow_path))
{
if (m_share == ndb_apply_status_share && thd->slave_thread)
thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
}
#endif
// TODO remove double pointers...
if (!(m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table)))
{
DBUG_RETURN(1);
}
m_table_info= &m_thd_ndb_share->stat;
DBUG_RETURN(0);
}
int ha_ndbcluster::external_lock(THD *thd, int lock_type) int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{ {
int error=0; int error=0;
NdbTransaction* trans= NULL;
DBUG_ENTER("external_lock"); DBUG_ENTER("external_lock");
/* /*
...@@ -4395,124 +4500,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4395,124 +4500,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK)
{ {
DBUG_PRINT("info", ("lock_type != F_UNLCK")); DBUG_PRINT("info", ("lock_type != F_UNLCK"));
if (thd->lex->sql_command == SQLCOM_LOAD) transaction_checks(thd);
{
m_transaction_on= FALSE;
/* Would be simpler if has_transactions() didn't always say "yes" */
thd->transaction.all.modified_non_trans_table= thd->transaction.stmt.modified_non_trans_table= TRUE;
}
else if (!thd->transaction.on)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
if (!thd_ndb->lock_count++) if (!thd_ndb->lock_count++)
{ {
PRINT_OPTION_FLAGS(thd); if ((error= start_statement(thd, thd_ndb, ndb)))
if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) goto error;
{
// Autocommit transaction
DBUG_ASSERT(!thd_ndb->stmt);
DBUG_PRINT("trans",("Starting transaction stmt"));
trans= ndb->startTransaction();
if (trans == NULL)
{
thd_ndb->lock_count= 0;
ERR_RETURN(ndb->getNdbError());
}
thd_ndb->init_open_tables();
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
thd_ndb->m_slow_path= FALSE;
if (!(thd->options & OPTION_BIN_LOG) ||
thd->variables.binlog_format == BINLOG_FORMAT_STMT)
{
thd_ndb->trans_options|= TNTO_NO_LOGGING;
thd_ndb->m_slow_path= TRUE;
}
else if (thd->slave_thread)
thd_ndb->m_slow_path= TRUE;
trans_register_ha(thd, FALSE, ndbcluster_hton);
}
else
{
if (!thd_ndb->all)
{
// Not autocommit transaction
// A "master" transaction ha not been started yet
DBUG_PRINT("trans",("starting transaction, all"));
trans= ndb->startTransaction();
if (trans == NULL)
{
thd_ndb->lock_count= 0;
ERR_RETURN(ndb->getNdbError());
}
thd_ndb->init_open_tables();
thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
thd_ndb->m_slow_path= FALSE;
if (!(thd->options & OPTION_BIN_LOG) ||
thd->variables.binlog_format == BINLOG_FORMAT_STMT)
{
thd_ndb->trans_options|= TNTO_NO_LOGGING;
thd_ndb->m_slow_path= TRUE;
}
else if (thd->slave_thread)
thd_ndb->m_slow_path= TRUE;
trans_register_ha(thd, TRUE, ndbcluster_hton);
/*
If this is the start of a LOCK TABLE, a table look
should be taken on the table in NDB
Check if it should be read or write lock
*/
if (thd->options & (OPTION_TABLE_LOCK))
{
//lockThisTable();
DBUG_PRINT("info", ("Locking the table..." ));
}
}
}
}
/*
This is the place to make sure this handler instance
has a started transaction.
The transaction is started by the first handler on which
MySQL Server calls external lock
Other handlers in the same stmt or transaction should use
the same NDB transaction. This is done by setting up the m_active_trans
pointer to point to the NDB transaction.
*/
// store thread specific data first to set the right context
m_force_send= thd->variables.ndb_force_send;
m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
m_autoincrement_prefetch=
(ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
DBUG_ASSERT(m_active_trans);
// Start of transaction
m_rows_changed= 0;
m_ops_pending= 0;
m_slow_path= thd_ndb->m_slow_path;
#ifdef HAVE_NDB_BINLOG
if (unlikely(m_slow_path))
{
if (m_share == ndb_apply_status_share && thd->slave_thread)
thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
} }
#endif if ((error= init_handler_for_statement(thd, thd_ndb)))
// TODO remove double pointers... goto error;
m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); DBUG_RETURN(0);
m_table_info= &m_thd_ndb_share->stat;
} }
else else
{ {
...@@ -4540,16 +4536,19 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4540,16 +4536,19 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_PRINT("trans", ("Last external_lock")); DBUG_PRINT("trans", ("Last external_lock"));
PRINT_OPTION_FLAGS(thd); PRINT_OPTION_FLAGS(thd);
if (thd_ndb->stmt) if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{ {
/* if (thd_ndb->trans)
Unlock is done without a transaction commit / rollback. {
This happens if the thread didn't update any rows /*
We must in this case close the transaction to release resources Unlock is done without a transaction commit / rollback.
*/ This happens if the thread didn't update any rows
DBUG_PRINT("trans",("ending non-updating transaction")); We must in this case close the transaction to release resources
ndb->closeTransaction(m_active_trans); */
thd_ndb->stmt= NULL; DBUG_PRINT("trans",("ending non-updating transaction"));
ndb->closeTransaction(thd_ndb->trans);
thd_ndb->trans= NULL;
}
} }
} }
m_table_info= NULL; m_table_info= NULL;
...@@ -4578,7 +4577,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4578,7 +4577,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (m_ops_pending) if (m_ops_pending)
DBUG_PRINT("warning", ("ops_pending != 0L")); DBUG_PRINT("warning", ("ops_pending != 0L"));
m_ops_pending= 0; m_ops_pending= 0;
DBUG_RETURN(0);
} }
error:
thd_ndb->lock_count--;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -4610,25 +4612,20 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) ...@@ -4610,25 +4612,20 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
{ {
int error=0; int error=0;
DBUG_ENTER("start_stmt"); DBUG_ENTER("start_stmt");
PRINT_OPTION_FLAGS(thd);
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
NdbTransaction *trans= (thd_ndb->stmt)?thd_ndb->stmt:thd_ndb->all; transaction_checks(thd);
if (!trans){ if (!thd_ndb->start_stmt_count++)
{
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
DBUG_PRINT("trans",("Starting transaction stmt")); if ((error= start_statement(thd, thd_ndb, ndb)))
trans= ndb->startTransaction(); goto error;
if (trans == NULL)
ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, FALSE, ndbcluster_hton);
} }
m_active_trans= trans; if ((error= init_handler_for_statement(thd, thd_ndb)))
// Start of statement goto error;
m_ops_pending= 0; DBUG_RETURN(0);
error:
thd_ndb->start_stmt_count--;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -4642,15 +4639,27 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) ...@@ -4642,15 +4639,27 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
int res= 0; int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt; NdbTransaction *trans= thd_ndb->trans;
DBUG_ENTER("ndbcluster_commit"); DBUG_ENTER("ndbcluster_commit");
DBUG_PRINT("transaction",("%s",
trans == thd_ndb->stmt ?
"stmt" : "all"));
DBUG_ASSERT(ndb); DBUG_ASSERT(ndb);
if (trans == NULL) thd_ndb->start_stmt_count= 0;
if (trans == NULL || (!all &&
thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
/*
An odditity in the handler interface is that commit on handlerton
is called to indicate end of statement only in cases where
autocommit isn't used and the all flag isn't set.
We also leave quickly when a transaction haven't even been started,
in this case we are safe that no clean up is needed. In this case
the MySQL Server could handle the query without contacting the
NDB kernel.
*/
DBUG_PRINT("info", ("Commit before start or end-of-statement only"));
DBUG_RETURN(0); DBUG_RETURN(0);
}
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
if (unlikely(thd_ndb->m_slow_path)) if (unlikely(thd_ndb->m_slow_path))
...@@ -4671,11 +4680,7 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) ...@@ -4671,11 +4680,7 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
ndbcluster_print_error(res, error_op); ndbcluster_print_error(res, error_op);
} }
ndb->closeTransaction(trans); ndb->closeTransaction(trans);
thd_ndb->trans= NULL;
if (all)
thd_ndb->all= NULL;
else
thd_ndb->stmt= NULL;
/* Clear commit_count for tables changed by transaction */ /* Clear commit_count for tables changed by transaction */
NDB_SHARE* share; NDB_SHARE* share;
...@@ -4704,13 +4709,15 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all) ...@@ -4704,13 +4709,15 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all)
int res= 0; int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt; NdbTransaction *trans= thd_ndb->trans;
DBUG_ENTER("ndbcluster_rollback"); DBUG_ENTER("ndbcluster_rollback");
DBUG_PRINT("transaction",("%s", DBUG_ASSERT(ndb);
trans == thd_ndb->stmt ? thd_ndb->start_stmt_count= 0;
"stmt" : "all")); if (!trans)
DBUG_ASSERT(ndb && trans); {
DBUG_RETURN(0);
}
if (trans->execute(NdbTransaction::Rollback) != 0) if (trans->execute(NdbTransaction::Rollback) != 0)
{ {
...@@ -4722,11 +4729,7 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all) ...@@ -4722,11 +4729,7 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all)
ndbcluster_print_error(res, error_op); ndbcluster_print_error(res, error_op);
} }
ndb->closeTransaction(trans); ndb->closeTransaction(trans);
thd_ndb->trans= NULL;
if (all)
thd_ndb->all= NULL;
else
thd_ndb->stmt= NULL;
/* Clear list of tables changed by transaction */ /* Clear list of tables changed by transaction */
thd_ndb->changed_tables.empty(); thd_ndb->changed_tables.empty();
......
...@@ -204,8 +204,8 @@ class Thd_ndb ...@@ -204,8 +204,8 @@ class Thd_ndb
Ndb *ndb; Ndb *ndb;
ulong count; ulong count;
uint lock_count; uint lock_count;
NdbTransaction *all; uint start_stmt_count;
NdbTransaction *stmt; NdbTransaction *trans;
bool m_error; bool m_error;
bool m_slow_path; bool m_slow_path;
int m_error_code; int m_error_code;
...@@ -496,6 +496,10 @@ private: ...@@ -496,6 +496,10 @@ private:
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
void transaction_checks(THD *thd);
int start_statement(THD *thd, Thd_ndb *thd_ndb, Ndb* ndb);
int init_handler_for_statement(THD *thd, Thd_ndb *thd_ndb);
NdbTransaction *m_active_trans; NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor; NdbScanOperation *m_active_cursor;
const NdbDictionary::Table *m_table; const NdbDictionary::Table *m_table;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment