Commit 4242beb3 authored by mskold@mysql.com's avatar mskold@mysql.com

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.0

into  mysql.com:/home/marty/MySQL/mysql-5.0
parents a590e5dc d17c3d3c
...@@ -63,3 +63,83 @@ pk u o ...@@ -63,3 +63,83 @@ pk u o
5 5 5 5 5 5
insert into t1 values (1,1,1); insert into t1 values (1,1,1);
drop table t1; drop table t1;
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3);
begin;
select * from t1 where x = 1 for update;
x y z
1 one 1
begin;
select * from t1 where x = 2 for update;
x y z
2 two 2
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
x y z
3 three 3
1 one 1
begin;
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where z > 1 and z < 3 for update;
x y z
2 two 2
begin;
select * from t1 where x = 1 for update;
x y z
1 one 1
select * from t1 where x = 2 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where x = 1 lock in share mode;
x y z
1 one 1
begin;
select * from t1 where x = 1 lock in share mode;
x y z
1 one 1
select * from t1 where x = 2 for update;
x y z
2 two 2
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
x y z
3 three 3
1 one 1
begin;
select * from t1 where y = 'one' lock in share mode;
x y z
1 one 1
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where z > 1 and z < 3 lock in share mode;
x y z
2 two 2
begin;
select * from t1 where z = 1 lock in share mode;
x y z
1 one 1
select * from t1 where x = 1 for update;
x y z
1 one 1
select * from t1 where x = 2 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
drop table t1;
DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t2 ( CREATE TABLE t1 (
a bigint unsigned NOT NULL PRIMARY KEY, a bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,
b int unsigned not null, b int unsigned not null,
c int unsigned c int unsigned
) engine=ndbcluster; ) engine=ndbcluster;
select count(*) from t2; select count(*) from t1;
count(*) count(*)
5000 5000
truncate table t2; select * from t1 order by a limit 2;
select count(*) from t2; a b c
1 509 2500
2 510 7
truncate table t1;
select count(*) from t1;
count(*) count(*)
0 0
drop table t2; insert into t1 values(NULL,1,1),(NULL,2,2);
select * from t1 order by a;
a b c
1 1 1
2 2 2
drop table t1;
...@@ -69,4 +69,117 @@ insert into t1 values (1,1,1); ...@@ -69,4 +69,117 @@ insert into t1 values (1,1,1);
drop table t1; drop table t1;
# Lock for update
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3);
# PK access
connection con1;
begin;
select * from t1 where x = 1 for update;
connection con2;
begin;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# table scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
connection con2;
begin;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
# Bug #20390 SELECT FOR UPDATE does not release locks of untouched rows in full table scans
#select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# index scan
connection con1;
begin;
select * from t1 where z > 1 and z < 3 for update;
connection con2;
begin;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 1 for update;
--error 1205
select * from t1 where x = 2 for update;
rollback;
connection con1;
commit;
# share locking
# PK access
connection con1;
begin;
select * from t1 where x = 1 lock in share mode;
connection con2;
begin;
select * from t1 where x = 1 lock in share mode;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# table scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
connection con2;
begin;
select * from t1 where y = 'one' lock in share mode;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
# Bug #20390 SELECT FOR UPDATE does not release locks of untouched rows in full table scans
#select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# index scan
connection con1;
begin;
select * from t1 where z > 1 and z < 3 lock in share mode;
connection con2;
begin;
select * from t1 where z = 1 lock in share mode;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 1 for update;
--error 1205
select * from t1 where x = 2 for update;
rollback;
connection con1;
commit;
drop table t1;
# End of 4.1 tests # End of 4.1 tests
...@@ -2,12 +2,11 @@ ...@@ -2,12 +2,11 @@
-- source include/not_embedded.inc -- source include/not_embedded.inc
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t1, t2;
--enable_warnings --enable_warnings
CREATE TABLE t1 (
CREATE TABLE t2 ( a bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null, b int unsigned not null,
c int unsigned c int unsigned
) engine=ndbcluster; ) engine=ndbcluster;
...@@ -20,17 +19,23 @@ let $1=500; ...@@ -20,17 +19,23 @@ let $1=500;
disable_query_log; disable_query_log;
while ($1) while ($1)
{ {
eval insert into t2 values($1*10, $1+9, 5*$1), ($1*10+1, $1+10, 7),($1*10+2, $1+10, 7*$1), ($1*10+3, $1+10, 10+$1), ($1*10+4, $1+10, 70*$1), ($1*10+5, $1+10, 7), ($1*10+6, $1+10, 9), ($1*10+7, $1+299, 899), ($1*10+8, $1+10, 12), ($1*10+9, $1+10, 14*$1); eval insert into t1 values(NULL, $1+9, 5*$1), (NULL, $1+10, 7),(NULL, $1+10, 7*$1), (NULL, $1+10, 10+$1), (NULL, $1+10, 70*$1), (NULL, $1+10, 7), (NULL, $1+10, 9), (NULL, $1+299, 899), (NULL, $1+10, 12), (NULL, $1+10, 14*$1);
dec $1; dec $1;
} }
enable_query_log; enable_query_log;
select count(*) from t2; select count(*) from t1;
select * from t1 order by a limit 2;
truncate table t1;
select count(*) from t1;
truncate table t2; insert into t1 values(NULL,1,1),(NULL,2,2);
select count(*) from t2; select * from t1 order by a;
drop table t2; drop table t1;
# End of 4.1 tests # End of 4.1 tests
...@@ -61,11 +61,14 @@ public: ...@@ -61,11 +61,14 @@ public:
Uint32 parallel, Uint32 parallel,
bool order_by, bool order_by,
bool order_desc = false, bool order_desc = false,
bool read_range_no = false) { bool read_range_no = false,
bool keyinfo = false) {
Uint32 scan_flags = Uint32 scan_flags =
(SF_OrderBy & -(Int32)order_by) | (SF_OrderBy & -(Int32)order_by) |
(SF_Descending & -(Int32)order_desc) | (SF_Descending & -(Int32)order_desc) |
(SF_ReadRangeNo & -(Int32)read_range_no); (SF_ReadRangeNo & -(Int32)read_range_no) |
(SF_KeyInfo & -(Int32)keyinfo);
return readTuples(lock_mode, scan_flags, parallel); return readTuples(lock_mode, scan_flags, parallel);
} }
#endif #endif
......
...@@ -44,7 +44,8 @@ public: ...@@ -44,7 +44,8 @@ public:
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead
SF_OrderBy = (1 << 24), // index scan in order SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order SF_Descending = (2 << 24), // index scan in descending order
SF_ReadRangeNo = (4 << 24) // enable @ref get_range_no SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
SF_KeyInfo = 1 // request KeyInfo to be sent back
}; };
/** /**
...@@ -61,15 +62,14 @@ public: ...@@ -61,15 +62,14 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
/** /**
* readTuples * readTuples
*
* @param lock_mode Lock mode * @param lock_mode Lock mode
* @param batch No of rows to fetch from each fragment at a time * @param batch No of rows to fetch from each fragment at a time
* @param parallel No of fragments to scan in parallell * @param parallel No of fragments to scan in parallell
* @note specifying 0 for batch and parallall means max performance * @note specifying 0 for batch and parallell means max performance
*/ */
#ifdef ndb_readtuples_impossible_overload #ifdef ndb_readtuples_impossible_overload
int readTuples(LockMode lock_mode = LM_Read, int readTuples(LockMode lock_mode = LM_Read,
Uint32 batch = 0, Uint32 parallel = 0); Uint32 batch = 0, Uint32 parallel = 0, bool keyinfo = false);
#endif #endif
inline int readTuples(int parallell){ inline int readTuples(int parallell){
...@@ -141,6 +141,20 @@ public: ...@@ -141,6 +141,20 @@ public:
*/ */
void close(bool forceSend = false, bool releaseOp = false); void close(bool forceSend = false, bool releaseOp = false);
/**
* Lock current tuple
*
* @return an NdbOperation or NULL.
*/
NdbOperation* lockCurrentTuple();
/**
* Lock current tuple
*
* @param lockTrans Transaction that should perform the lock
*
* @return an NdbOperation or NULL.
*/
NdbOperation* lockCurrentTuple(NdbTransaction* lockTrans);
/** /**
* Update current tuple * Update current tuple
* *
...@@ -249,6 +263,19 @@ protected: ...@@ -249,6 +263,19 @@ protected:
NdbRecAttr *m_curr_row; // Pointer to last returned row NdbRecAttr *m_curr_row; // Pointer to last returned row
}; };
inline
NdbOperation*
NdbScanOperation::lockCurrentTuple(){
return lockCurrentTuple(m_transConnection);
}
inline
NdbOperation*
NdbScanOperation::lockCurrentTuple(NdbTransaction* takeOverTrans){
return takeOverScanOp(NdbOperation::ReadRequest,
takeOverTrans);
}
inline inline
NdbOperation* NdbOperation*
NdbScanOperation::updateCurrentTuple(){ NdbScanOperation::updateCurrentTuple(){
......
...@@ -160,7 +160,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -160,7 +160,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return -1; return -1;
} }
m_keyInfo = lockExcl ? 1 : 0; m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
bool rangeScan = false; bool rangeScan = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex) if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
...@@ -924,18 +924,28 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans) ...@@ -924,18 +924,28 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
if (newOp == NULL){ if (newOp == NULL){
return NULL; return NULL;
} }
if (!m_keyInfo)
{
// Cannot take over lock if no keyinfo was requested
setErrorCodeAbort(4604);
return NULL;
}
pTrans->theSimpleState = 0; pTrans->theSimpleState = 0;
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
newOp->theTupKeyLen = len; newOp->theTupKeyLen = len;
newOp->theOperationType = opType; newOp->theOperationType = opType;
if (opType == DeleteRequest) { switch (opType) {
newOp->theStatus = GetValue; case (ReadRequest):
} else { newOp->theLockMode = theLockMode;
newOp->theStatus = SetValue; // Fall through
case (DeleteRequest):
newOp->theStatus = GetValue;
break;
default:
newOp->theStatus = SetValue;
} }
const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 * src = (Uint32*)tRecAttr->aRef();
const Uint32 tScanInfo = src[len] & 0x3FFFF; const Uint32 tScanInfo = src[len] & 0x3FFFF;
const Uint32 tTakeOverFragment = src[len] >> 20; const Uint32 tTakeOverFragment = src[len] >> 20;
......
...@@ -290,7 +290,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -290,7 +290,7 @@ ErrorBundle ErrorCodes[] = {
{ 4601, AE, "Transaction is not started"}, { 4601, AE, "Transaction is not started"},
{ 4602, AE, "You must call getNdbOperation before executeScan" }, { 4602, AE, "You must call getNdbOperation before executeScan" },
{ 4603, AE, "There can only be ONE operation in a scan transaction" }, { 4603, AE, "There can only be ONE operation in a scan transaction" },
{ 4604, AE, "takeOverScanOp, opType must be UpdateRequest or DeleteRequest" }, { 4604, AE, "takeOverScanOp, to take over a scanned row one must explicitly request keyinfo in readTuples call" },
{ 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"}, { 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
{ 4607, AE, "There may only be one operation in a scan transaction"}, { 4607, AE, "There may only be one operation in a scan transaction"},
{ 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"}, { 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
......
...@@ -70,7 +70,7 @@ handlerton ndbcluster_hton = { ...@@ -70,7 +70,7 @@ handlerton ndbcluster_hton = {
NULL, /* create_cursor_read_view */ NULL, /* create_cursor_read_view */
NULL, /* set_cursor_read_view */ NULL, /* set_cursor_read_view */
NULL, /* close_cursor_read_view */ NULL, /* close_cursor_read_view */
HTON_NO_FLAGS HTON_CAN_RECREATE
}; };
#define NDB_AUTO_INCREMENT_RETRIES 10 #define NDB_AUTO_INCREMENT_RETRIES 10
...@@ -1174,12 +1174,23 @@ void ha_ndbcluster::release_metadata() ...@@ -1174,12 +1174,23 @@ void ha_ndbcluster::release_metadata()
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{ {
DBUG_ENTER("ha_ndbcluster::get_ndb_lock_type");
if (type >= TL_WRITE_ALLOW_WRITE) if (type >= TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive; {
else if (uses_blob_value(m_retrieve_all_fields)) DBUG_PRINT("info", ("Using exclusive lock"));
return NdbOperation::LM_Read; DBUG_RETURN(NdbOperation::LM_Exclusive);
}
else if (type == TL_READ_WITH_SHARED_LOCKS ||
uses_blob_value(m_retrieve_all_fields))
{
DBUG_PRINT("info", ("Using read lock"));
DBUG_RETURN(NdbOperation::LM_Read);
}
else else
return NdbOperation::LM_CommittedRead; {
DBUG_PRINT("info", ("Using committed read"));
DBUG_RETURN(NdbOperation::LM_CommittedRead);
}
} }
static const ulong index_type_flags[]= static const ulong index_type_flags[]=
...@@ -1679,7 +1690,30 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -1679,7 +1690,30 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
int check; int check;
NdbTransaction *trans= m_active_trans; NdbTransaction *trans= m_active_trans;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE; if (m_lock_tuple)
{
/*
Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
(SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
LOCK WITH SHARE MODE) and row was not explictly unlocked
with unlock_row() call
*/
NdbConnection *trans= m_active_trans;
NdbOperation *op;
// Lock row
DBUG_PRINT("info", ("Keeping lock on scanned row"));
if (!(op= m_active_cursor->lockCurrentTuple()))
{
m_lock_tuple= false;
ERR_RETURN(trans->getNdbError());
}
m_ops_pending++;
}
m_lock_tuple= false;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
m_lock.type != TL_READ_WITH_SHARED_LOCKS;
do { do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/* /*
...@@ -1695,6 +1729,13 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -1695,6 +1729,13 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0) if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
{ {
/*
Explicitly lock tuple if "select for update" or
"select lock in share mode"
*/
m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
||
m_lock.type == TL_READ_WITH_SHARED_LOCKS);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
else if (check == 1 || check == 2) else if (check == 1 || check == 2)
...@@ -1983,10 +2024,11 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1983,10 +2024,11 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
restart= FALSE; restart= FALSE;
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index, m_index[active_index].index,
(const NDBTAB *) m_table)) || (const NDBTAB *) m_table)) ||
op->readTuples(lm, 0, parallelism, sorted, descending)) op->readTuples(lm, 0, parallelism, sorted, descending, false, need_pk))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= op; m_active_cursor= op;
} else { } else {
...@@ -2036,8 +2078,11 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -2036,8 +2078,11 @@ int ha_ndbcluster::full_table_scan(byte *buf)
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) || if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
op->readTuples(lm, 0, parallelism)) op->readTuples(lm,
(need_pk)?NdbScanOperation::SF_KeyInfo:0,
parallelism))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= op; m_active_cursor= op;
if (generate_scan_filter(m_cond_stack, op)) if (generate_scan_filter(m_cond_stack, op))
...@@ -2327,6 +2372,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2327,6 +2372,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_PRINT("info", ("Calling updateTuple on cursor")); DBUG_PRINT("info", ("Calling updateTuple on cursor"));
if (!(op= cursor->updateCurrentTuple())) if (!(op= cursor->updateCurrentTuple()))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++; m_ops_pending++;
if (uses_blob_value(FALSE)) if (uses_blob_value(FALSE))
m_blobs_pending= TRUE; m_blobs_pending= TRUE;
...@@ -2406,6 +2452,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -2406,6 +2452,7 @@ int ha_ndbcluster::delete_row(const byte *record)
DBUG_PRINT("info", ("Calling deleteTuple on cursor")); DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
if (cursor->deleteCurrentTuple() != 0) if (cursor->deleteCurrentTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++; m_ops_pending++;
no_uncommitted_rows_update(-1); no_uncommitted_rows_update(-1);
...@@ -2529,9 +2576,9 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -2529,9 +2576,9 @@ void ha_ndbcluster::unpack_record(byte* buf)
const NdbRecAttr* rec= m_value[hidden_no].rec; const NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec); DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value())); hidden_col->getName(), rec->u_64_value()));
} }
//print_results(); print_results();
#endif #endif
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2605,6 +2652,12 @@ int ha_ndbcluster::index_init(uint index) ...@@ -2605,6 +2652,12 @@ int ha_ndbcluster::index_init(uint index)
{ {
DBUG_ENTER("ha_ndbcluster::index_init"); DBUG_ENTER("ha_ndbcluster::index_init");
DBUG_PRINT("enter", ("index: %u", index)); DBUG_PRINT("enter", ("index: %u", index));
/*
Locks are are explicitly released in scan
unless m_lock.type == TL_READ_HIGH_PRIORITY
and no sub-sequent call to unlock_row()
*/
m_lock_tuple= false;
DBUG_RETURN(handler::index_init(index)); DBUG_RETURN(handler::index_init(index));
} }
...@@ -3613,6 +3666,22 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3613,6 +3666,22 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/*
Unlock the last row read in an open scan.
Rows are unlocked by default in ndb, but
for SELECT FOR UPDATE and SELECT LOCK WIT SHARE MODE
locks are kept if unlock_row() is not called.
*/
void ha_ndbcluster::unlock_row()
{
DBUG_ENTER("unlock_row");
DBUG_PRINT("info", ("Unlocking row"));
m_lock_tuple= false;
DBUG_VOID_RETURN;
}
/* /*
Start a transaction for running a statement if one is not Start a transaction for running a statement if one is not
already running in a transaction. This will be the case in already running in a transaction. This will be the case in
...@@ -4085,6 +4154,12 @@ int ha_ndbcluster::create(const char *name, ...@@ -4085,6 +4154,12 @@ int ha_ndbcluster::create(const char *name,
set_dbname(name2); set_dbname(name2);
set_tabname(name2); set_tabname(name2);
if (current_thd->lex->sql_command == SQLCOM_TRUNCATE)
{
DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
if ((my_errno= delete_table(name)))
DBUG_RETURN(my_errno);
}
if (create_from_engine) if (create_from_engine)
{ {
/* /*
...@@ -5891,6 +5966,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -5891,6 +5966,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
byte *end_of_buffer= (byte*)buffer->buffer_end; byte *end_of_buffer= (byte*)buffer->buffer_end;
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
const NDBTAB *tab= (const NDBTAB *) m_table; const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index; const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index; const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
...@@ -5957,7 +6033,8 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -5957,7 +6033,8 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
end_of_buffer -= reclength; end_of_buffer -= reclength;
} }
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&&!scanOp->readTuples(lm, 0, parallelism, sorted, FALSE, TRUE) &&!scanOp->readTuples(lm, 0, parallelism, sorted,
FALSE, TRUE, need_pk)
&&!generate_scan_filter(m_cond_stack, scanOp) &&!generate_scan_filter(m_cond_stack, scanOp)
&&!define_read_attrs(end_of_buffer-reclength, scanOp)) &&!define_read_attrs(end_of_buffer-reclength, scanOp))
{ {
......
...@@ -503,6 +503,7 @@ class ha_ndbcluster: public handler ...@@ -503,6 +503,7 @@ class ha_ndbcluster: public handler
int extra(enum ha_extra_function operation); int extra(enum ha_extra_function operation);
int extra_opt(enum ha_extra_function operation, ulong cache_size); int extra_opt(enum ha_extra_function operation, ulong cache_size);
int external_lock(THD *thd, int lock_type); int external_lock(THD *thd, int lock_type);
void unlock_row();
int start_stmt(THD *thd, thr_lock_type lock_type); int start_stmt(THD *thd, thr_lock_type lock_type);
const char * table_type() const; const char * table_type() const;
const char ** bas_ext() const; const char ** bas_ext() const;
...@@ -684,6 +685,7 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -684,6 +685,7 @@ static void set_tabname(const char *pathname, char *tabname);
char m_tabname[FN_HEADLEN]; char m_tabname[FN_HEADLEN];
ulong m_table_flags; ulong m_table_flags;
THR_LOCK_DATA m_lock; THR_LOCK_DATA m_lock;
bool m_lock_tuple;
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY]; NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob // NdbRecAttr has no reference to blob
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment