Commit e8c4af12 authored by unknown's avatar unknown

set correct lockmode in all reads...

    move lockmode from scan operation to operation
    added read tuple with lock mode


ndb/include/ndbapi/NdbIndexOperation.hpp:
  added read tuple with lock mode
ndb/include/ndbapi/NdbOperation.hpp:
  move lockmode from scan operation to operation
ndb/include/ndbapi/NdbScanOperation.hpp:
  move lockmode from scan operation to operation
ndb/src/ndbapi/NdbIndexOperation.cpp:
  added read tuple with lock mode
ndb/src/ndbapi/NdbOperationDefine.cpp:
  added read tuple with lock mode
sql/ha_ndbcluster.cc:
  set correct lockmode in all reads...
  moved lockmode from scan operatoin to operation
parent 4888de1d
......@@ -49,6 +49,15 @@ public:
* @{
*/
/**
* Define the NdbIndexOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
* reads a tuple.
*
* @return 0 if successful otherwise -1.
*/
int readTuple(LockMode);
/**
* Define the NdbIndexOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
......
......@@ -51,6 +51,19 @@ public:
* @{
*/
/**
* Lock when performing read
*/
enum LockMode {
LM_Read = 0,
LM_Exclusive = 1,
LM_CommittedRead = 2,
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
LM_Dirty = 2
#endif
};
/**
* Define the NdbOperation to be a standard operation of type insertTuple.
* When calling NdbConnection::execute, this operation
......@@ -88,6 +101,15 @@ public:
*/
virtual int deleteTuple();
/**
* Define the NdbOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
* reads a tuple.
*
* @return 0 if successful otherwise -1.
*/
virtual int readTuple(LockMode);
/**
* Define the NdbOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation
......
......@@ -53,18 +53,6 @@ public:
IndexCursor = 2
};
/**
* Lock when performing scan
*/
enum LockMode {
LM_Read = 0,
LM_Exclusive = 1,
LM_CommittedRead = 2,
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
LM_Dirty = 2
#endif
};
/**
* Type of cursor
*/
......
......@@ -85,6 +85,11 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex,
return 0;
}
int NdbIndexOperation::readTuple(NdbOperation::LockMode lm)
{
return NdbOperation::readTuple(lm);
}
int NdbIndexOperation::readTuple()
{
// First check that index is unique
......
......@@ -103,6 +103,24 @@ NdbOperation::writeTuple()
* int readTuple();
*****************************************************************************/
int
NdbOperation::readTuple(NdbOperation::LockMode lm)
{
switch(lm) {
case LM_Read:
return readTuple();
break;
case LM_Exclusive:
return readTupleExclusive();
break;
case LM_CommittedRead:
return readTuple();
break;
};
}
/******************************************************************************
* int readTuple();
*****************************************************************************/
int
NdbOperation::readTuple()
{
NdbConnection* tNdbCon = theNdbCon;
......
......@@ -803,14 +803,14 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
int lm;
if (type == TL_WRITE_ALLOW_WRITE)
lm= NdbScanOperation::LM_Exclusive;
lm= NdbOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
/*
TODO use a new scan mode to read + lock + keyinfo
*/
lm= NdbScanOperation::LM_Exclusive;
lm= NdbOperation::LM_Exclusive;
else
lm= NdbScanOperation::LM_CommittedRead;
lm= NdbOperation::LM_CommittedRead;
return lm;
}
......@@ -952,8 +952,10 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple() != 0)
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
......@@ -1021,8 +1023,10 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// We have allready retrieved all fields, nothing to complement
DBUG_RETURN(0);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple() != 0)
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res;
......@@ -1073,10 +1077,12 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
m_index[active_index].unique_index,
(const NDBTAB *) m_table)) ||
op->readTuple() != 0)
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
// Set secondary index key(s)
......@@ -1335,14 +1341,13 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if ((op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -1400,11 +1405,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -1471,11 +1475,10 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment