Ndb.hpp, Ndb.cpp, ha_ndbcluster.cc:

  Bug#26342 auto_increment_increment AND auto_increment_offset REALLY REALLY anger NDB cluster, implemented support for auto_increment_offset and auto_increment
parent 70210623
......@@ -6056,7 +6056,7 @@ void ha_ndbcluster::get_auto_increment(ulonglong offset, ulonglong increment,
ulonglong nb_desired_values,
ulonglong *first_value,
ulonglong *nb_reserved_values)
{
{
int cache_size;
Uint64 auto_value;
DBUG_ENTER("get_auto_increment");
......@@ -6080,7 +6080,8 @@ void ha_ndbcluster::get_auto_increment(ulonglong offset, ulonglong increment,
ret=
m_skip_auto_increment ?
ndb->readAutoIncrementValue(m_table, g.range, auto_value) :
ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size);
ndb->getAutoIncrementValue(m_table, g.range,
auto_value, cache_size, increment, offset);
} while (ret == -1 &&
--retries &&
ndb->getNdbError().status == NdbError::TemporaryError);
......
......@@ -1488,12 +1488,15 @@ public:
int initAutoIncrement();
int getAutoIncrementValue(const char* aTableName,
Uint64 & tupleId, Uint32 cacheSize);
Uint64 & tupleId, Uint32 cacheSize,
Uint64 step = 1, Uint64 start = 1);
int getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId, Uint32 cacheSize);
Uint64 & tupleId, Uint32 cacheSize,
Uint64 step = 1, Uint64 start = 1);
int getAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize);
Uint32 cacheSize,
Uint64 step = 1, Uint64 start = 1);
int readAutoIncrementValue(const char* aTableName,
Uint64 & tupleId);
int readAutoIncrementValue(const NdbDictionary::Table * aTable,
......@@ -1510,7 +1513,7 @@ public:
private:
int getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize);
Uint32 cacheSize, Uint64 step = 1, Uint64 start = 1);
int readTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId);
int setTupleIdInNdb(const NdbTableImpl* table,
......
......@@ -754,17 +754,27 @@ Ndb::getNodeId()
}
/****************************************************************************
Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );
Parameters: aTableId : The TableId.
cacheSize: Prefetch this many values
Remark: Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
Uint64 getAutoIncrementValue( const char* aTableName,
Uint64 & autoValue,
Uint32 cacheSize,
Uint64 step,
Uint64 start);
Parameters: aTableName (IN) : The table name.
autoValue (OUT) : Returns new autoincrement value
cacheSize (IN) : Prefetch this many values
step (IN) : Specifies the step between the
autoincrement values.
start (IN) : Start value for first value
Remark: Returns a new autoincrement value to the application.
The autoincrement values can be increased by steps
(default 1) and a number of values can be prefetched
by specifying cacheSize (default 10).
****************************************************************************/
int
Ndb::getAutoIncrementValue(const char* aTableName,
Uint64 & tupleId, Uint32 cacheSize)
Uint64 & autoValue, Uint32 cacheSize,
Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
......@@ -778,15 +788,16 @@ Ndb::getAutoIncrementValue(const char* aTableName,
}
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
DBUG_PRINT("info", ("value %lu", (ulong) autoValue));
DBUG_RETURN(0);
}
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId, Uint32 cacheSize)
Uint64 & autoValue, Uint32 cacheSize,
Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
......@@ -801,51 +812,86 @@ Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize)
TupleIdRange & range, Uint64 & autoValue,
Uint32 cacheSize, Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize, Uint64 step, Uint64 start)
{
/*
Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
In most cases step= start= 1, in which case we get:
1,2,3,4,5,...
If step=10 and start=5 and first number is 1, we get:
5,15,25,35,...
*/
DBUG_ENTER("Ndb::getTupleIdFromNdb");
if (range.m_first_tuple_id != range.m_last_tuple_id)
/*
Check if the next value can be taken from the pre-fetched
sequence.
*/
if (range.m_first_tuple_id != range.m_last_tuple_id &&
range.m_first_tuple_id + step <= range.m_last_tuple_id)
{
assert(range.m_first_tuple_id < range.m_last_tuple_id);
tupleId = ++range.m_first_tuple_id;
DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
range.m_first_tuple_id += step;
tupleId = range.m_first_tuple_id;
DBUG_PRINT("info", ("Next cached value %lu", (ulong) tupleId));
}
else
{
/*
If start value is greater than step it is ignored
*/
Uint64 offset = (start > step) ? 1 : start;
/*
Pre-fetch a number of values depending on cacheSize
*/
if (cacheSize == 0)
cacheSize = 1;
DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
/*
* reserve next cacheSize entries in db. adds cacheSize to NEXTID
* and returns first tupleId in the new range.
* and returns first tupleId in the new range. If tupleId's are
* incremented in steps then multiply the cacheSize with step size.
*/
Uint64 opValue = cacheSize;
Uint64 opValue = cacheSize * step;
if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
DBUG_PRINT("info", ("Next value fetched from database %lu", (ulong) opValue));
DBUG_PRINT("info", ("Increasing %lu by offset %lu, increment is %lu", (ulong) (ulong) opValue, (ulong) offset, (ulong) step));
Uint64 current, next;
next = ((Uint64) (opValue + step - offset)) / step;
next = next * step + offset;
current = (next < step) ? next : next - step;
tupleId = (opValue <= current) ? current : next;
DBUG_PRINT("info", ("Returning %lu", (ulong) tupleId));
range.m_first_tuple_id = tupleId;
}
DBUG_RETURN(0);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment