Commit c9a41abb authored by jonas@perch.ndb.mysql.com's avatar jonas@perch.ndb.mysql.com

Merge perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb
parents ed42379c 0c3127cf
...@@ -64,17 +64,26 @@ pk u o ...@@ -64,17 +64,26 @@ pk u o
insert into t1 values (1,1,1); insert into t1 values (1,1,1);
drop table t1; drop table t1;
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1);
begin; begin;
select * from t1 where x = 1 for update; select * from t1 where x = 1 for update;
x y z x y z
1 one 1 1 one 1
begin; begin;
select * from t1 where x = 2 for update; select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
rollback;
insert into t1 values (2,'two',2),(3,"three",3);
begin;
select * from t1 where x = 1 for update;
x y z x y z
2 two 2 1 one 1
select * from t1 where x = 1 for update; select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction ERROR HY000: Lock wait timeout exceeded; try restarting transaction
select * from t1 where x = 2 for update;
x y z
2 two 2
rollback; rollback;
commit; commit;
begin; begin;
......
...@@ -286,10 +286,10 @@ INSERT INTO t1 VALUES ...@@ -286,10 +286,10 @@ INSERT INTO t1 VALUES
(406990,67,'2006-02-23 18:01:45'),(148815,67,'2005-10-25 15:34:17'), (406990,67,'2006-02-23 18:01:45'),(148815,67,'2005-10-25 15:34:17'),
(148812,67,'2005-10-25 15:30:01'),(245651,67,'2005-12-08 15:58:27'), (148812,67,'2005-10-25 15:30:01'),(245651,67,'2005-12-08 15:58:27'),
(154503,67,'2005-10-28 11:52:38'); (154503,67,'2005-10-28 11:52:38');
create table t11 select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 asc; create table t11 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 asc;
create table t12 select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 desc; create table t12 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 desc;
create table t21 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 asc; create table t21 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 asc;
create table t22 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 desc; create table t22 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 desc;
select * from t11 order by 1,2,3; select * from t11 order by 1,2,3;
a b c a b c
254 67 NULL 254 67 NULL
...@@ -366,4 +366,65 @@ a b c ...@@ -366,4 +366,65 @@ a b c
406993 67 2006-02-27 11:20:57 406993 67 2006-02-27 11:20:57
406994 67 2006-02-27 11:26:46 406994 67 2006-02-27 11:26:46
406995 67 2006-02-28 11:55:00 406995 67 2006-02-28 11:55:00
select t12.a from t11, t12 where t11.a in(255,256) and t11.a = t12.a and t11.c is null order by t12.a;
a
255
256
update t22 set c = '2005-12-08 15:58:27' where a = 255;
select * from t22 order by 1,2,3;
a b c
1 67 2006-02-23 15:01:35
254 67 NULL
255 67 2005-12-08 15:58:27
256 67 NULL
1120 67 NULL
1133 67 NULL
4101 67 NULL
9199 67 NULL
223456 67 NULL
245651 67 2005-12-08 15:58:27
245652 67 2005-12-08 15:58:27
245653 67 2005-12-08 15:59:07
245654 67 2005-12-08 15:59:08
245655 67 2005-12-08 15:59:08
398340 67 2006-02-20 04:38:53
398341 67 2006-02-20 04:48:44
398545 67 2006-02-20 04:53:13
406631 67 2006-02-23 10:49:42
406988 67 2006-02-23 17:07:22
406989 67 2006-02-23 17:08:46
406990 67 2006-02-23 18:01:45
406991 67 2006-02-24 16:42:32
406992 67 2006-02-24 16:47:18
406993 67 2006-02-27 11:20:57
406994 67 2006-02-27 11:26:46
406995 67 2006-02-28 11:55:00
select t21.* from t21,t22 where t21.a = t22.a and
t22.a in (select t12.a from t11, t12 where t11.a in(255,256) and t11.a = t12.a and t11.c is null) and t22.c is null order by t21.a;
a b c
256 67 NULL
delete from t22 where a > 245651;
update t22 set b = a + 1;
select * from t22 order by 1,2,3;
a b c
1 2 2006-02-23 15:01:35
254 255 NULL
255 256 2005-12-08 15:58:27
256 257 NULL
1120 1121 NULL
1133 1134 NULL
4101 4102 NULL
9199 9200 NULL
223456 223457 NULL
245651 245652 2005-12-08 15:58:27
select c, count(*)
from t21
inner join t22 using (a)
where t22.b in (2,256,257,1121,1134,4102,9200,223457,245652)
group by c
order by c;
c count(*)
NULL 7
2005-12-08 15:58:27 1
2006-02-23 15:01:35 1
DROP TABLE t1, t11, t12, t21, t22; DROP TABLE t1, t11, t12, t21, t22;
...@@ -20,7 +20,6 @@ ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t ...@@ -20,7 +20,6 @@ ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t
ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
ndb_binlog_ignore_db : BUG#21279 2006-07-25 ingo Randomly throws a warning ndb_binlog_ignore_db : BUG#21279 2006-07-25 ingo Randomly throws a warning
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
ndb_restore_compat : BUG#21283 2006-07-26 ingo Test fails randomly
partition_03ndb : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table partition_03ndb : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table
ps : BUG#21524 2006-08-08 pgalbraith 'ps' test fails in --ps-protocol test AMD64 bit ps : BUG#21524 2006-08-08 pgalbraith 'ps' test fails in --ps-protocol test AMD64 bit
ps_7ndb : BUG#18950 2006-02-16 jmiller create table like does not obtain LOCK_open ps_7ndb : BUG#18950 2006-02-16 jmiller create table like does not obtain LOCK_open
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
-- source include/have_multi_ndb.inc -- source include/have_multi_ndb.inc
-- source include/not_embedded.inc -- source include/not_embedded.inc
# see bug#21563
-- source include/have_binlog_format_row.inc
--disable_warnings --disable_warnings
drop table if exists t1, t2; drop table if exists t1, t2;
--enable_warnings --enable_warnings
......
...@@ -73,7 +73,7 @@ drop table t1; ...@@ -73,7 +73,7 @@ drop table t1;
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1);
# PK access # PK access
connection con1; connection con1;
...@@ -82,11 +82,22 @@ select * from t1 where x = 1 for update; ...@@ -82,11 +82,22 @@ select * from t1 where x = 1 for update;
connection con2; connection con2;
begin; begin;
select * from t1 where x = 2 for update;
--error 1205 --error 1205
select * from t1 where x = 1 for update; select * from t1 where x = 1 for update;
rollback; rollback;
connection con1;
rollback;
insert into t1 values (2,'two',2),(3,"three",3);
begin;
select * from t1 where x = 1 for update;
connection con2;
--error 1205
select * from t1 where x = 1 for update;
select * from t1 where x = 2 for update;
rollback;
connection con1; connection con1;
commit; commit;
......
...@@ -228,13 +228,32 @@ INSERT INTO t1 VALUES ...@@ -228,13 +228,32 @@ INSERT INTO t1 VALUES
(148812,67,'2005-10-25 15:30:01'),(245651,67,'2005-12-08 15:58:27'), (148812,67,'2005-10-25 15:30:01'),(245651,67,'2005-12-08 15:58:27'),
(154503,67,'2005-10-28 11:52:38'); (154503,67,'2005-10-28 11:52:38');
create table t11 select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 asc; create table t11 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 asc;
create table t12 select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 desc; create table t12 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > NOW()) order by 3 desc;
create table t21 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 asc; create table t21 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 asc;
create table t22 select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 desc; create table t22 engine = ndbcluster select * from t1 where b = 67 AND (c IS NULL OR c > '2005-12-08') order by 3 desc;
select * from t11 order by 1,2,3; select * from t11 order by 1,2,3;
select * from t12 order by 1,2,3; select * from t12 order by 1,2,3;
select * from t21 order by 1,2,3; select * from t21 order by 1,2,3;
select * from t22 order by 1,2,3; select * from t22 order by 1,2,3;
# join tests
select t12.a from t11, t12 where t11.a in(255,256) and t11.a = t12.a and t11.c is null order by t12.a;
update t22 set c = '2005-12-08 15:58:27' where a = 255;
select * from t22 order by 1,2,3;
select t21.* from t21,t22 where t21.a = t22.a and
t22.a in (select t12.a from t11, t12 where t11.a in(255,256) and t11.a = t12.a and t11.c is null) and t22.c is null order by t21.a;
delete from t22 where a > 245651;
update t22 set b = a + 1;
select * from t22 order by 1,2,3;
select c, count(*)
from t21
inner join t22 using (a)
where t22.b in (2,256,257,1121,1134,4102,9200,223457,245652)
group by c
order by c;
DROP TABLE t1, t11, t12, t21, t22; DROP TABLE t1, t11, t12, t21, t22;
...@@ -256,13 +256,15 @@ int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) ...@@ -256,13 +256,15 @@ int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
} }
inline inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{ {
#ifdef NOT_USED #ifdef NOT_USED
int m_batch_execute= 0; int m_batch_execute= 0;
if (m_batch_execute) if (m_batch_execute)
return 0; return 0;
#endif #endif
h->release_completed_operations(trans, force_release);
return h->m_ignore_no_key ? return h->m_ignore_no_key ?
execute_no_commit_ignore_no_key(h,trans) : execute_no_commit_ignore_no_key(h,trans) :
trans->execute(NdbTransaction::NoCommit, trans->execute(NdbTransaction::NoCommit,
...@@ -297,13 +299,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) ...@@ -297,13 +299,15 @@ int execute_commit(THD *thd, NdbTransaction *trans)
} }
inline inline
int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{ {
#ifdef NOT_USED #ifdef NOT_USED
int m_batch_execute= 0; int m_batch_execute= 0;
if (m_batch_execute) if (m_batch_execute)
return 0; return 0;
#endif #endif
h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit, return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError, NdbTransaction::AO_IgnoreError,
h->m_force_send); h->m_force_send);
...@@ -328,6 +332,7 @@ Thd_ndb::Thd_ndb() ...@@ -328,6 +332,7 @@ Thd_ndb::Thd_ndb()
all= NULL; all= NULL;
stmt= NULL; stmt= NULL;
error= 0; error= 0;
query_state&= NDB_QUERY_NORMAL;
options= 0; options= 0;
(void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0, (void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
(hash_get_key)thd_ndb_share_get_key, 0, 0); (hash_get_key)thd_ndb_share_get_key, 0, 0);
...@@ -1696,7 +1701,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf, ...@@ -1696,7 +1701,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
} }
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -1761,7 +1766,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, ...@@ -1761,7 +1766,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
} }
} }
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -1914,7 +1919,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) ...@@ -1914,7 +1919,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
} }
last= trans->getLastDefinedOperation(); last= trans->getLastDefinedOperation();
if (first) if (first)
res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false);
else else
{ {
// Table has no keys // Table has no keys
...@@ -1963,7 +1968,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1963,7 +1968,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((res= define_read_attrs(buf, op))) if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res); DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -2011,7 +2016,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -2011,7 +2016,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
*/ */
if (m_ops_pending && m_blobs_pending) if (m_ops_pending && m_blobs_pending)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
m_ops_pending= 0; m_ops_pending= 0;
m_blobs_pending= FALSE; m_blobs_pending= FALSE;
...@@ -2043,7 +2048,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -2043,7 +2048,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{ {
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
else else
...@@ -2370,7 +2375,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -2370,7 +2375,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
} }
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
...@@ -2440,7 +2445,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -2440,7 +2445,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if ((res= define_read_attrs(buf, op))) if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res); DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully")); DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
...@@ -2603,7 +2608,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2603,7 +2608,7 @@ int ha_ndbcluster::write_row(byte *record)
m_bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
{ {
m_skip_auto_increment= TRUE; m_skip_auto_increment= TRUE;
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
...@@ -2840,7 +2845,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2840,7 +2845,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
op->setValue(no_fields, part_func_value); op->setValue(no_fields, part_func_value);
} }
// Execute update operation // Execute update operation
if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -2926,7 +2931,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -2926,7 +2931,7 @@ int ha_ndbcluster::delete_row(const byte *record)
} }
// Execute delete operation // Execute delete operation
if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -3392,6 +3397,26 @@ int ha_ndbcluster::close_scan() ...@@ -3392,6 +3397,26 @@ int ha_ndbcluster::close_scan()
NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
if (m_lock_tuple)
{
/*
Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
(SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
LOCK WITH SHARE MODE) and row was not explictly unlocked
with unlock_row() call
*/
NdbOperation *op;
// Lock row
DBUG_PRINT("info", ("Keeping lock on scanned row"));
if (!(op= cursor->lockCurrentTuple()))
{
m_lock_tuple= false;
ERR_RETURN(trans->getNdbError());
}
m_ops_pending++;
}
m_lock_tuple= false;
if (m_ops_pending) if (m_ops_pending)
{ {
/* /*
...@@ -3399,7 +3424,7 @@ int ha_ndbcluster::close_scan() ...@@ -3399,7 +3424,7 @@ int ha_ndbcluster::close_scan()
deleteing/updating transaction before closing the scan deleteing/updating transaction before closing the scan
*/ */
DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -3793,7 +3818,7 @@ int ha_ndbcluster::end_bulk_insert() ...@@ -3793,7 +3818,7 @@ int ha_ndbcluster::end_bulk_insert()
m_bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0)
{ {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
my_errno= error= ndb_err(trans); my_errno= error= ndb_err(trans);
...@@ -3968,6 +3993,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3968,6 +3993,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError()); ERR_RETURN(ndb->getNdbError());
thd_ndb->init_open_tables(); thd_ndb->init_open_tables();
thd_ndb->stmt= trans; thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, FALSE, &ndbcluster_hton); trans_register_ha(thd, FALSE, &ndbcluster_hton);
} }
else else
...@@ -3983,6 +4009,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3983,6 +4009,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError()); ERR_RETURN(ndb->getNdbError());
thd_ndb->init_open_tables(); thd_ndb->init_open_tables();
thd_ndb->all= trans; thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, TRUE, &ndbcluster_hton); trans_register_ha(thd, TRUE, &ndbcluster_hton);
/* /*
...@@ -4139,6 +4166,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) ...@@ -4139,6 +4166,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
thd_ndb->stmt= trans; thd_ndb->stmt= trans;
trans_register_ha(thd, FALSE, &ndbcluster_hton); trans_register_ha(thd, FALSE, &ndbcluster_hton);
} }
thd_ndb->query_state&= NDB_QUERY_NORMAL;
m_active_trans= trans; m_active_trans= trans;
// Start of statement // Start of statement
...@@ -7572,6 +7600,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -7572,6 +7600,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
NDB_INDEX_TYPE index_type= get_index_type(active_index); NDB_INDEX_TYPE index_type= get_index_type(active_index);
ulong reclength= table_share->reclength; ulong reclength= table_share->reclength;
NdbOperation* op; NdbOperation* op;
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
if (uses_blob_value()) if (uses_blob_value())
{ {
...@@ -7585,7 +7614,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -7585,7 +7614,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
sorted, sorted,
buffer)); buffer));
} }
thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE;
m_disable_multi_read= FALSE; m_disable_multi_read= FALSE;
/** /**
...@@ -7757,7 +7786,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -7757,7 +7786,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
*/ */
m_current_multi_operation= m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans,true)))
{ {
m_multi_range_defined= multi_range_curr; m_multi_range_defined= multi_range_curr;
multi_range_curr= ranges; multi_range_curr= ranges;
...@@ -9558,6 +9587,24 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack, ...@@ -9558,6 +9587,24 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void
ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
bool force_release)
{
if (!force_release)
{
if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE)
{
/* We are batching reads and have not consumed all fetched
rows yet, releasing operation records is unsafe
*/
return;
}
}
trans->releaseCompletedOperations();
}
/* /*
get table space info for SHOW CREATE TABLE get table space info for SHOW CREATE TABLE
*/ */
......
...@@ -534,6 +534,12 @@ class Ndb_cond_traverse_context ...@@ -534,6 +534,12 @@ class Ndb_cond_traverse_context
Ndb_rewrite_context *rewrite_stack; Ndb_rewrite_context *rewrite_stack;
}; };
typedef enum ndb_query_state_bits {
NDB_QUERY_NORMAL = 0,
NDB_QUERY_MULTI_READ_RANGE = 1
} NDB_QUERY_STATE_BITS;
/* /*
Place holder for ha_ndbcluster thread specific data Place holder for ha_ndbcluster thread specific data
*/ */
...@@ -571,6 +577,7 @@ class Thd_ndb ...@@ -571,6 +577,7 @@ class Thd_ndb
int error; int error;
uint32 options; uint32 options;
List<NDB_SHARE> changed_tables; List<NDB_SHARE> changed_tables;
uint query_state;
HASH open_tables; HASH open_tables;
}; };
...@@ -833,6 +840,8 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -833,6 +840,8 @@ static void set_tabname(const char *pathname, char *tabname);
void no_uncommitted_rows_update(int); void no_uncommitted_rows_update(int);
void no_uncommitted_rows_reset(THD *); void no_uncommitted_rows_reset(THD *);
void release_completed_operations(NdbTransaction*, bool);
/* /*
Condition pushdown Condition pushdown
*/ */
...@@ -849,8 +858,8 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -849,8 +858,8 @@ static void set_tabname(const char *pathname, char *tabname);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
NdbTransaction *m_active_trans; NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor; NdbScanOperation *m_active_cursor;
......
...@@ -3858,12 +3858,37 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3858,12 +3858,37 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
close_thread_tables(thd); close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */ /* don't mess with the injector_ndb anymore from other threads */
int ndb_obj_cnt= 1; // g_ndb
ndb_obj_cnt+= injector_ndb == 0 ? 0 : 1;
ndb_obj_cnt+= schema_ndb == 0 ? 0 : 1;
ndb_obj_cnt+= ndbcluster_util_inited ? 1 : 0;
injector_thd= 0; injector_thd= 0;
injector_ndb= 0; injector_ndb= 0;
schema_ndb= 0; schema_ndb= 0;
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory thd->db= 0; // as not to try to free memory
if (!ndb_extra_logging)
sql_print_information("Stopping Cluster Binlog"); sql_print_information("Stopping Cluster Binlog");
else
sql_print_information("Stopping Cluster Binlog: %u(%u)",
g_ndb_cluster_connection->get_active_ndb_objects(),
ndb_obj_cnt);
/**
* Add extra wait loop to make user "user" ndb-object go away...
* otherwise user thread can have ongoing SUB_DATA
*/
int sleep_cnt= 0;
while (sleep_cnt < 300 && g_ndb_cluster_connection->get_active_ndb_objects() > ndb_obj_cnt)
{
my_sleep(10000); // 10ms
sleep_cnt++;
}
if (ndb_extra_logging)
sql_print_information("Stopping Cluster Binlog: waited %ums %u(%u)",
10*sleep_cnt, g_ndb_cluster_connection->get_active_ndb_objects(),
ndb_obj_cnt);
if (apply_status_share) if (apply_status_share)
{ {
......
...@@ -140,6 +140,7 @@ class NdbTransaction ...@@ -140,6 +140,7 @@ class NdbTransaction
friend class NdbIndexOperation; friend class NdbIndexOperation;
friend class NdbIndexScanOperation; friend class NdbIndexScanOperation;
friend class NdbBlob; friend class NdbBlob;
friend class ha_ndbcluster;
#endif #endif
public: public:
......
...@@ -114,6 +114,7 @@ public: ...@@ -114,6 +114,7 @@ public:
void init_get_next_node(Ndb_cluster_connection_node_iter &iter); void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter); unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter);
unsigned get_active_ndb_objects() const;
#endif #endif
private: private:
......
...@@ -742,9 +742,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -742,9 +742,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
const Uint32 oid = sdata->senderData; const Uint32 oid = sdata->senderData;
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid); NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
if (op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER) if (unlikely(op == 0 || op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER))
{
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic " g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
"number"); "number");
return ;
}
// Accumulate DIC_TAB_INFO for TE_ALTER events // Accumulate DIC_TAB_INFO for TE_ALTER events
if (sdata->operation == NdbDictionary::Event::_TE_ALTER && if (sdata->operation == NdbDictionary::Event::_TE_ALTER &&
......
...@@ -1265,6 +1265,7 @@ TransporterFacade::get_an_alive_node() ...@@ -1265,6 +1265,7 @@ TransporterFacade::get_an_alive_node()
} }
TransporterFacade::ThreadData::ThreadData(Uint32 size){ TransporterFacade::ThreadData::ThreadData(Uint32 size){
m_use_cnt = 0;
m_firstFree = END_OF_LIST; m_firstFree = END_OF_LIST;
expand(size); expand(size);
} }
...@@ -1302,6 +1303,7 @@ TransporterFacade::ThreadData::open(void* objRef, ...@@ -1302,6 +1303,7 @@ TransporterFacade::ThreadData::open(void* objRef,
nextFree = m_firstFree; nextFree = m_firstFree;
} }
m_use_cnt++;
m_firstFree = m_statusNext[nextFree]; m_firstFree = m_statusNext[nextFree];
Object_Execute oe = { objRef , fun }; Object_Execute oe = { objRef , fun };
...@@ -1318,6 +1320,8 @@ TransporterFacade::ThreadData::close(int number){ ...@@ -1318,6 +1320,8 @@ TransporterFacade::ThreadData::close(int number){
number= numberToIndex(number); number= numberToIndex(number);
assert(getInUse(number)); assert(getInUse(number));
m_statusNext[number] = m_firstFree; m_statusNext[number] = m_firstFree;
assert(m_use_cnt);
m_use_cnt--;
m_firstFree = number; m_firstFree = number;
Object_Execute oe = { 0, 0 }; Object_Execute oe = { 0, 0 };
m_objectExecute[number] = oe; m_objectExecute[number] = oe;
...@@ -1325,6 +1329,12 @@ TransporterFacade::ThreadData::close(int number){ ...@@ -1325,6 +1329,12 @@ TransporterFacade::ThreadData::close(int number){
return 0; return 0;
} }
Uint32
TransporterFacade::get_active_ndb_objects() const
{
return m_threads.m_use_cnt;
}
PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
Uint32 block_no) Uint32 block_no)
{ {
......
...@@ -68,6 +68,7 @@ public: ...@@ -68,6 +68,7 @@ public:
// Close this block number // Close this block number
int close(BlockNumber blockNumber, Uint64 trans_id); int close(BlockNumber blockNumber, Uint64 trans_id);
Uint32 get_active_ndb_objects() const;
// Only sends to nodes which are alive // Only sends to nodes which are alive
int sendSignal(NdbApiSignal * signal, NodeId nodeId); int sendSignal(NdbApiSignal * signal, NodeId nodeId);
...@@ -240,6 +241,7 @@ private: ...@@ -240,6 +241,7 @@ private:
NodeStatusFunction m_statusFunction; NodeStatusFunction m_statusFunction;
}; };
Uint32 m_use_cnt;
Uint32 m_firstFree; Uint32 m_firstFree;
Vector<Uint32> m_statusNext; Vector<Uint32> m_statusNext;
Vector<Object_Execute> m_objectExecute; Vector<Object_Execute> m_objectExecute;
......
...@@ -599,5 +599,10 @@ Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter) ...@@ -599,5 +599,10 @@ Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
return m_impl.get_next_node(iter); return m_impl.get_next_node(iter);
} }
unsigned
Ndb_cluster_connection::get_active_ndb_objects() const
{
return m_impl.m_transporter_facade->get_active_ndb_objects();
}
template class Vector<Ndb_cluster_connection_impl::Node>; template class Vector<Ndb_cluster_connection_impl::Node>;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment