Commit caf79dac authored by tomas@poseidon.(none)'s avatar tomas@poseidon.(none)

renamed ha_recovery_logging to ha_enable_transaction

added tests to alter table for "large" alter tables and truncates in ndbcluster
added debug printout in restart() in ndbcluster
added flag THD::transaction.on to enable/disable transaction
parent 6ae278a6
......@@ -72,3 +72,14 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 101 3 4 5 PENDING 0000-00-00 00:00:00
2 102 4 3 5 99 PENDING EXTRA 2004-01-01 00:00:00
drop table t1;
DROP TABLE IF EXISTS t2;
create table t2 (a int NOT NULL PRIMARY KEY) engine=myisam;
alter table t2 engine=ndbcluster;
select count(*) from t2;
count(*)
15001
truncate table t2;
select count(*) from t2;
count(*)
0
drop table t2;
......@@ -48,4 +48,21 @@ show table status;
select * from t1 order by col1;
drop table t1;
--disable_warnings
DROP TABLE IF EXISTS t2;
--enable_warnings
create table t2 (a int NOT NULL PRIMARY KEY) engine=myisam;
let $1=15001;
disable_query_log;
while ($1)
{
eval insert into t2 values($1);
dec $1;
}
enable_query_log;
alter table t2 engine=ndbcluster;
select count(*) from t2;
truncate table t2;
select count(*) from t2;
drop table t2;
......@@ -191,6 +191,7 @@ NdbConnection::setErrorCode(int anErrorCode)
int
NdbConnection::restart(){
DBUG_ENTER("NdbConnection::restart");
if(theCompletionStatus == CompletedSuccess){
releaseCompletedOperations();
Uint64 tTransid = theNdb->theFirstTransId;
......@@ -201,9 +202,10 @@ NdbConnection::restart(){
theNdb->theFirstTransId = tTransid + 1;
}
theCompletionStatus = NotCompleted;
return 0;
DBUG_RETURN(0);
}
return -1;
DBUG_PRINT("error",("theCompletionStatus != CompletedSuccess"));
DBUG_RETURN(-1);
}
/*****************************************************************************
......
......@@ -20,7 +20,6 @@
NDB Cluster
*/
#ifdef __GNUC__
#pragma implementation // gcc: Class implementation
#endif
......@@ -1058,8 +1057,17 @@ inline int ha_ndbcluster::next_result(byte *buf)
be sent to NDB
*/
DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
if (current_thd->transaction.on)
{
if (ops_pending && (trans->execute(NoCommit) != 0))
DBUG_RETURN(ndb_err(trans));
}
else
{
if (ops_pending && (trans->execute(Commit) != 0))
DBUG_RETURN(ndb_err(trans));
trans->restart();
}
ops_pending= 0;
contact_ndb= (check == 2);
......@@ -1310,7 +1318,6 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_RETURN(define_read_attrs(buf, op));
}
inline
int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
{
......@@ -1361,7 +1368,6 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
/*
Insert one record into NDB
*/
int ha_ndbcluster::write_row(byte *record)
{
bool has_auto_increment;
......@@ -1427,17 +1433,45 @@ int ha_ndbcluster::write_row(byte *record)
((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0)
{
THD *thd= current_thd;
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
(int)rows_inserted, (int)bulk_insert_rows));
bulk_insert_not_flushed= false;
if (thd->transaction.on) {
if (trans->execute(NoCommit) != 0)
{
skip_auto_increment= true;
DBUG_RETURN(ndb_err(trans));
}
}
else
{
if (trans->execute(Commit) != 0)
{
skip_auto_increment= true;
DBUG_RETURN(ndb_err(trans));
}
#if 0 // this is what we want to use but it is not functional
trans->restart();
#else
m_ndb->closeTransaction(m_active_trans);
m_active_trans= m_ndb->startTransaction();
if (thd->transaction.all.ndb_tid)
thd->transaction.all.ndb_tid= m_active_trans;
else
thd->transaction.stmt.ndb_tid= m_active_trans;
if (m_active_trans == NULL)
{
skip_auto_increment= true;
ERR_RETURN(m_ndb->getNdbError());
}
trans= m_active_trans;
#endif
}
}
if ((has_auto_increment) && (skip_auto_increment))
{
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
......@@ -2494,10 +2528,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
{
thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError());
}
thd->transaction.stmt.ndb_tid= trans;
}
else
......@@ -2510,10 +2541,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
{
thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError());
}
/*
If this is the start of a LOCK TABLE, a table look
......@@ -3128,6 +3156,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_trans(NULL),
m_active_cursor(NULL),
m_ndb(NULL),
m_share(0),
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
......@@ -3177,6 +3206,8 @@ ha_ndbcluster::~ha_ndbcluster()
{
DBUG_ENTER("~ha_ndbcluster");
if (m_share)
free_share(m_share);
release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0;
......@@ -3219,8 +3250,10 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
set_dbname(name);
set_tabname(name);
if (check_ndb_connection())
if (check_ndb_connection()) {
free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(get_metadata(name));
}
......@@ -3234,7 +3267,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int ha_ndbcluster::close(void)
{
DBUG_ENTER("close");
free_share(m_share);
free_share(m_share); m_share= 0;
release_metadata();
m_ndb= NULL;
DBUG_RETURN(0);
......
......@@ -1199,14 +1199,15 @@ int handler::rename_table(const char * from, const char * to)
}
/*
Tell the handler to turn on or off logging to the handler's recovery log
Tell the handler to turn on or off transaction in the handler
*/
int ha_recovery_logging(THD *thd, bool on)
int ha_enable_transaction(THD *thd, bool on)
{
int error=0;
DBUG_ENTER("ha_recovery_logging");
DBUG_ENTER("ha_enable_transaction");
thd->transaction.on= on;
DBUG_RETURN(error);
}
......
......@@ -550,7 +550,7 @@ int ha_savepoint(THD *thd, char *savepoint_name);
int ha_autocommit_or_rollback(THD *thd, int error);
void ha_set_spin_retries(uint retries);
bool ha_flush_logs(void);
int ha_recovery_logging(THD *thd, bool on);
int ha_enable_transaction(THD *thd, bool on);
int ha_change_key_cache(KEY_CACHE *old_key_cache,
KEY_CACHE *new_key_cache);
int ha_discover(const char* dbname, const char* name,
......
......@@ -302,6 +302,7 @@ void THD::init(void)
void THD::init_for_queries()
{
ha_enable_transaction(this,TRUE);
init_sql_alloc(&mem_root,
variables.query_alloc_block_size,
variables.query_prealloc_size);
......
......@@ -768,6 +768,7 @@ public:
#ifdef HAVE_NDBCLUSTER_DB
void* ndb;
#endif
bool on;
/*
Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query
......
......@@ -648,8 +648,11 @@ int mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok)
{
/* Probably InnoDB table */
table_list->lock_type= TL_WRITE;
DBUG_RETURN(mysql_delete(thd, table_list, (COND*) 0, (SQL_LIST*) 0,
HA_POS_ERROR, 0));
ha_enable_transaction(thd, FALSE);
error= mysql_delete(thd, table_list, (COND*) 0, (SQL_LIST*) 0,
HA_POS_ERROR, 0);
ha_enable_transaction(thd, TRUE);
DBUG_RETURN(error);
}
if (lock_and_wait_for_table_name(thd, table_list))
DBUG_RETURN(-1);
......
......@@ -3343,7 +3343,7 @@ copy_data_between_tables(TABLE *from,TABLE *to,
Turn off recovery logging since rollback of an alter table is to
delete the new table so there is no need to log the changes to it.
*/
error= ha_recovery_logging(thd,FALSE);
error= ha_enable_transaction(thd,FALSE);
if (error)
{
error= 1;
......@@ -3405,7 +3405,7 @@ copy_data_between_tables(TABLE *from,TABLE *to,
}
to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
ha_recovery_logging(thd,TRUE);
ha_enable_transaction(thd,TRUE);
/*
Ensure that the new table is saved properly to disk so that we
can do a rename
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment