Commit 38a445fe authored by mskold@mysql.com's avatar mskold@mysql.com

sql_delete.cc:

  Bug #18864  TRUNCATE TABLE doesn't reset AUTO_INCREMENT value on ndb table: locked lock_OPEN mutex to support TRUNCATE with re-create and cluster binlog
Many files:
  Bug #18864  TRUNCATE TABLE doesn't reset AUTO_INCREMENT value on ndb table: adaption to MySQ Cluster replication
ndb_lock.result, ha_ndbcluster.cc:
  Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: Adaption to 5.1 code
NdbDictionaryImpl.hpp:
  Fix of bad merge
parent be608984
......@@ -63,6 +63,86 @@ pk u o
5 5 5
insert into t1 values (1,1,1);
drop table t1;
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3);
begin;
select * from t1 where x = 1 for update;
x y z
1 one 1
begin;
select * from t1 where x = 2 for update;
x y z
2 two 2
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
x y z
3 three 3
1 one 1
begin;
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where z > 1 and z < 3 for update;
x y z
2 two 2
begin;
select * from t1 where x = 1 for update;
x y z
1 one 1
select * from t1 where x = 2 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where x = 1 lock in share mode;
x y z
1 one 1
begin;
select * from t1 where x = 1 lock in share mode;
x y z
1 one 1
select * from t1 where x = 2 for update;
x y z
2 two 2
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
x y z
3 three 3
1 one 1
begin;
select * from t1 where y = 'one' lock in share mode;
x y z
1 one 1
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where z > 1 and z < 3 lock in share mode;
x y z
2 two 2
begin;
select * from t1 where z = 1 lock in share mode;
x y z
1 one 1
select * from t1 where x = 1 for update;
x y z
1 one 1
select * from t1 where x = 2 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
drop table t1;
create table t3 (id2 int) engine=ndb;
lock tables t3 write;
unlock tables;
......
......@@ -146,4 +146,19 @@ c1 c2
3 NULL
4 NULL
5 NULL
TRUNCATE t1;
SELECT count(*) FROM t1;
count(*)
0
INSERT INTO t1 VALUES (101,NULL),(102,NULL),(103,NULL),(104,NULL),(105,NULL),(106,NULL),(107,NULL),(108,NULL),(109,NULL),(1010,NULL);
SELECT count(*) FROM t1;
count(*)
10
SELECT c1 FROM t1 ORDER BY c1 LIMIT 5;
c1
101
102
103
104
105
DROP TABLE t1;
......@@ -172,7 +172,14 @@ connection slave;
# here we would get error 1412 prior to bug
SELECT * FROM t1 ORDER BY c1 LIMIT 5;
--connection master
TRUNCATE t1;
SELECT count(*) FROM t1;
INSERT INTO t1 VALUES (101,NULL),(102,NULL),(103,NULL),(104,NULL),(105,NULL),(106,NULL),(107,NULL),(108,NULL),(109,NULL),(1010,NULL);
--sync_slave_with_master
connection slave;
SELECT count(*) FROM t1;
SELECT c1 FROM t1 ORDER BY c1 LIMIT 5;
# cleanup
--connection master
......
......@@ -1402,7 +1402,8 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
if (type >= TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive;
if (uses_blob_value())
if (type == TL_READ_WITH_SHARED_LOCKS ||
uses_blob_value())
return NdbOperation::LM_Read;
return NdbOperation::LM_CommittedRead;
}
......@@ -1947,7 +1948,30 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
int check;
NdbTransaction *trans= m_active_trans;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
if (m_lock_tuple)
{
/*
Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
(SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
LOCK WITH SHARE MODE) and row was not explictly unlocked
with unlock_row() call
*/
NdbConnection *trans= m_active_trans;
NdbOperation *op;
// Lock row
DBUG_PRINT("info", ("Keeping lock on scanned row"));
if (!(op= m_active_cursor->lockCurrentTuple()))
{
m_lock_tuple= false;
ERR_RETURN(trans->getNdbError());
}
m_ops_pending++;
}
m_lock_tuple= false;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
m_lock.type != TL_READ_WITH_SHARED_LOCKS;;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
......@@ -1963,6 +1987,13 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
{
/*
Explicitly lock tuple if "select for update" or
"select lock in share mode"
*/
m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
||
m_lock.type == TL_READ_WITH_SHARED_LOCKS);
DBUG_RETURN(0);
}
else if (check == 1 || check == 2)
......@@ -2258,9 +2289,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
restart= FALSE;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op= trans->getNdbIndexScanOperation(m_index[active_index].index,
m_table)) ||
op->readTuples(lm, 0, parallelism, sorted, descending))
op->readTuples(lm, 0, parallelism, sorted, descending, false, need_pk))
ERR_RETURN(trans->getNdbError());
if (m_use_partition_function && part_spec != NULL &&
part_spec->start_part == part_spec->end_part)
......@@ -2329,8 +2361,11 @@ int ha_ndbcluster::full_table_scan(byte *buf)
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op=trans->getNdbScanOperation(m_table)) ||
op->readTuples(lm, 0, parallelism))
op->readTuples(lm,
(need_pk)?NdbScanOperation::SF_KeyInfo:0,
parallelism))
ERR_RETURN(trans->getNdbError());
m_active_cursor= op;
......@@ -2706,6 +2741,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
if (!(op= cursor->updateCurrentTuple()))
ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++;
if (uses_blob_value())
m_blobs_pending= TRUE;
......@@ -2814,6 +2850,7 @@ int ha_ndbcluster::delete_row(const byte *record)
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
if (cursor->deleteCurrentTuple() != 0)
ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++;
if (m_use_partition_function)
......@@ -3073,6 +3110,13 @@ int ha_ndbcluster::index_init(uint index, bool sorted)
DBUG_PRINT("enter", ("index: %u sorted: %d", index, sorted));
active_index= index;
m_sorted= sorted;
/*
Locks are are explicitly released in scan
unless m_lock.type == TL_READ_HIGH_PRIORITY
and no sub-sequent call to unlock_row()
*/
m_lock_tuple= false;
m_lock_tuple= false;
DBUG_RETURN(0);
}
......@@ -3996,6 +4040,22 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_RETURN(error);
}
/*
Unlock the last row read in an open scan.
Rows are unlocked by default in ndb, but
for SELECT FOR UPDATE and SELECT LOCK WIT SHARE MODE
locks are kept if unlock_row() is not called.
*/
void ha_ndbcluster::unlock_row()
{
DBUG_ENTER("unlock_row");
DBUG_PRINT("info", ("Unlocking row"));
m_lock_tuple= false;
DBUG_VOID_RETURN;
}
/*
Start a transaction for running a statement if one is not
already running in a transaction. This will be the case in
......@@ -4413,6 +4473,7 @@ int ha_ndbcluster::create(const char *name,
uint pack_length, length, i, pk_length= 0;
const void *data, *pack_data;
bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
bool is_truncate= (current_thd->lex->sql_command == SQLCOM_TRUNCATE);
DBUG_ENTER("ha_ndbcluster::create");
DBUG_PRINT("enter", ("name: %s", name));
......@@ -4421,6 +4482,12 @@ int ha_ndbcluster::create(const char *name,
set_dbname(name);
set_tabname(name);
if (is_truncate)
{
DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
if ((my_errno= delete_table(name)))
DBUG_RETURN(my_errno);
}
table= form;
if (create_from_engine)
{
......@@ -4665,7 +4732,9 @@ int ha_ndbcluster::create(const char *name,
share->db, share->table_name,
m_table->getObjectId(),
m_table->getObjectVersion(),
SOT_CREATE_TABLE, 0, 0, 1);
(is_truncate) ?
SOT_TRUNCATE_TABLE : SOT_CREATE_TABLE,
0, 0, 1);
break;
}
}
......@@ -5200,7 +5269,8 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
*/
int table_dropped= dict->getNdbError().code != 709;
if (!IS_TMP_PREFIX(table_name) && share)
if (!IS_TMP_PREFIX(table_name) && share &&
current_thd->lex->sql_command != SQLCOM_TRUNCATE)
{
ndbcluster_log_schema_op(thd, share,
thd->query, thd->query_length,
......@@ -6183,7 +6253,7 @@ static int ndbcluster_init()
#ifdef HAVE_NDB_BINLOG
ndbcluster_binlog_init_handlerton();
#endif
h.flags= HTON_TEMPORARY_NOT_SUPPORTED;
h.flags= HTON_CAN_RECREATE | HTON_TEMPORARY_NOT_SUPPORTED;
}
if (have_ndbcluster != SHOW_OPTION_YES)
......@@ -7430,6 +7500,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
byte *end_of_buffer= (byte*)buffer->buffer_end;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
const NDBTAB *tab= m_table;
const NDBINDEX *unique_idx= m_index[active_index].unique_index;
const NDBINDEX *idx= m_index[active_index].index;
......@@ -7520,7 +7591,8 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
end_of_buffer -= reclength;
}
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&&!scanOp->readTuples(lm, 0, parallelism, sorted, FALSE, TRUE)
&&!scanOp->readTuples(lm, 0, parallelism, sorted,
FALSE, TRUE, need_pk)
&&!generate_scan_filter(m_cond_stack, scanOp)
&&!define_read_attrs(end_of_buffer-reclength, scanOp))
{
......
......@@ -1232,6 +1232,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
case SOT_LOGFILE_GROUP:
type_str= "logfile group";
break;
case SOT_TRUNCATE_TABLE:
type_str= "truncate table";
break;
default:
abort(); /* should not happen, programming error */
}
......@@ -1765,6 +1768,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
post_epoch_unlock= 1;
break;
case SOT_CREATE_TABLE:
case SOT_TRUNCATE_TABLE:
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{
......
......@@ -49,7 +49,8 @@ enum SCHEMA_OP_TYPE
SOT_CLEAR_SLOCK= 7,
SOT_TABLESPACE= 8,
SOT_LOGFILE_GROUP= 9,
SOT_RENAME_TABLE= 10
SOT_RENAME_TABLE= 10,
SOT_TRUNCATE_TABLE= 11
};
const uint max_ndb_nodes= 64; /* multiple of 32 */
......
......@@ -954,8 +954,10 @@ bool mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok)
// crashes, replacement works. *(path + path_length - reg_ext_length)=
// '\0';
path[path_length - reg_ext_length] = 0;
VOID(pthread_mutex_lock(&LOCK_open));
error= ha_create_table(thd, path, table_list->db, table_list->table_name,
&create_info, 1);
VOID(pthread_mutex_unlock(&LOCK_open));
query_cache_invalidate3(thd, table_list, 0);
end:
......
......@@ -581,7 +581,7 @@ public:
int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
int dropIndex(const char * indexName,
const char * tableName);
int dropIndex(NdbIndexImpl &);
int dropIndex(NdbIndexImpl &, const char * tableName);
NdbTableImpl * getIndexTable(NdbIndexImpl * index,
NdbTableImpl * table);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment