Commit d0d8b6c9 authored by pekka@mysql.com's avatar pekka@mysql.com

Merge mysql.com:/space/pekka/ndb/version/my50-bug14509

into  mysql.com:/space/pekka/ndb/version/my51-bug14509
parents a70bfd6c e554f9b3
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
CREATE TABLE t1 (
a INT NOT NULL,
......@@ -328,3 +328,24 @@ select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
no_copy
no_copy
DROP TABLE t1, ndb_show_tables;
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
a b
3 103
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
a b
6 203
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
a b c
9 303 NULL
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
a b c
12 403 NULL
drop table t1;
......@@ -3,7 +3,7 @@
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
--enable_warnings
......@@ -325,6 +325,21 @@ on t1 (c010, c011, c012, c013);
drop table t1;
# simple test that auto incr is not lost at rename or alter
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
drop table t1;
# End of 4.1 tests
# On-line alter table
......
......@@ -986,6 +986,7 @@ class NdbBlob;
class NdbReceiver;
class TransporterFacade;
class PollGuard;
class Ndb_local_table_info;
template <class T> struct Ndb_free_list_t;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
......@@ -1482,15 +1483,12 @@ public:
bool increase = false);
bool setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val,
bool increase = false);
Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId,
Uint32 cacheSize = 1000);
Uint64 readTupleIdFromNdb(Uint32 aTableId);
bool setTupleIdInNdb(const char* aTableName, Uint64 val,
bool increase);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op);
private:
Uint64 getTupleIdFromNdb(Ndb_local_table_info* info, Uint32 cacheSize);
Uint64 readTupleIdFromNdb(Ndb_local_table_info* info);
bool setTupleIdInNdb(Ndb_local_table_info* info, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 opValue, Uint32 op);
public:
/**
*/
......@@ -1693,12 +1691,8 @@ private:
Uint64 the_last_check_time;
Uint64 theFirstTransId;
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
// The tupleId is retrieved from DB
const NdbDictionary::Table *m_sys_tab_0;
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
Uint32 theRestartGCI; // the Restart GCI used by DIHNDBTAMPER
......
......@@ -1645,10 +1645,9 @@ void Ndbcntr::systemErrorLab(Signal* signal, int line)
/* |-2048| # 1 00000001 | */
/* | : | : | */
/* | -1 | # 1 00000001 | */
/* | 0 | 0 | */
/* | 1 | 0 | */
/* | : | : | */
/* | 2047| 0 | */
/* | 1 | 0 | tupleid sequence now created on first use */
/* | : | : | v */
/* | 2048| 0 | v */
/*---------------------------------------------------------------------------*/
void Ndbcntr::createSystableLab(Signal* signal, unsigned index)
{
......@@ -1859,8 +1858,7 @@ void Ndbcntr::crSystab8Lab(Signal* signal)
jam();
ckey = 1;
ctransidPhase = ZFALSE;
crSystab7Lab(signal);
return;
// skip 2nd loop - tupleid sequence now created on first use
}//if
signal->theData[0] = ctcConnectionP;
signal->theData[1] = reference();
......
......@@ -47,6 +47,8 @@ Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
assert(! is_ndb_blob_table(table_impl));
m_table_impl= table_impl;
m_first_tuple_id = ~(Uint64)0;
m_last_tuple_id = ~(Uint64)0;
}
Ndb_local_table_info::~Ndb_local_table_info()
......
......@@ -33,6 +33,11 @@ public:
static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0);
static void destroy(Ndb_local_table_info *);
NdbTableImpl *m_table_impl;
// range of cached tuple ids per thread
Uint64 m_first_tuple_id;
Uint64 m_last_tuple_id;
Uint64 m_local_data[1]; // Must be last member. Used to access extra space.
private:
Ndb_local_table_info(NdbTableImpl *table_impl);
......
......@@ -755,11 +755,12 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0)
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl *table= info->m_table_impl;
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
}
Uint64 tupleId = getTupleIdFromNdb(info, cacheSize);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
......@@ -770,50 +771,58 @@ Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
}
const BaseString& internal_tabname = table->m_internalName;
Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return ~(Uint64)0;
return getTupleIdFromNdb(table->m_id, cacheSize);
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = getTupleIdFromNdb(info, cacheSize);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
Ndb::getTupleIdFromNdb(Ndb_local_table_info* info, Uint32 cacheSize)
{
DBUG_ENTER("getTupleIdFromNdb");
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
Uint64 tupleId;
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
theFirstTupleId[aTableId]++;
DBUG_PRINT("info", ("next cached value %ul",
(ulong) theFirstTupleId[aTableId]));
DBUG_RETURN(theFirstTupleId[aTableId]);
assert(info->m_first_tuple_id < info->m_last_tuple_id);
tupleId = ++info->m_first_tuple_id;
DBUG_PRINT("info", ("next cached value %llu", (ulonglong)tupleId));
}
else // theFirstTupleId == theLastTupleId
else
{
DBUG_PRINT("info",("reading %u values from database",
(cacheSize == 0) ? 1 : cacheSize));
DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
if (cacheSize == 0)
cacheSize = 1;
DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
/*
* reserve next cacheSize entries in db. adds cacheSize to NEXTID
* and returns first tupleId in the new range.
*/
tupleId = opTupleIdOnNdb(info, cacheSize, 0);
}
DBUG_RETURN(tupleId);
}
Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
{
DBUG_ENTER("readAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = readTupleIdFromNdb(table->m_id);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
Uint64 tupleId = readTupleIdFromNdb(info);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
......@@ -824,19 +833,38 @@ Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_id);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = readTupleIdFromNdb(info);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
Ndb::readTupleIdFromNdb(Ndb_local_table_info* info)
{
if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
// Cache is empty, check next in database
return opTupleIdOnNdb(aTableId, 0, 3);
return theFirstTupleId[aTableId] + 1;
DBUG_ENTER("Ndb::readTupleIdFromNdb");
Uint64 tupleId;
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
assert(info->m_first_tuple_id < info->m_last_tuple_id);
tupleId = info->m_first_tuple_id + 1;
}
else
{
/*
* peek at NEXTID. does not reserve it so the value is valid
* only if no other transactions are allowed.
*/
tupleId = opTupleIdOnNdb(info, 0, 3);
}
DBUG_RETURN(tupleId);
}
bool
......@@ -848,11 +876,10 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError= theDictionary->getNdbError();
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(false);
}
const NdbTableImpl* table= info->m_table_impl;
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
DBUG_RETURN(setTupleIdInNdb(info, val, increase));
}
bool
......@@ -860,45 +887,47 @@ Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool
{
DBUG_ENTER("setAutoIncrementValue");
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
DBUG_RETURN(false);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
}
const BaseString& internal_tabname = table->m_internalName;
bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
DBUG_ENTER("setTupleIdInNdb(const char*, ...)");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(false);
}
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase));
DBUG_RETURN(setTupleIdInNdb(info, val, increase));
}
bool
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
Ndb::setTupleIdInNdb(Ndb_local_table_info* info, Uint64 val, bool increase)
{
DBUG_ENTER("setTupleIdInNdb(Uint32, ...)");
DBUG_ENTER("setTupleIdInNdb");
if (increase)
{
if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
// We have a cache sequence
if (val <= theFirstTupleId[aTableId]+1)
assert(info->m_first_tuple_id < info->m_last_tuple_id);
if (val <= info->m_first_tuple_id + 1)
DBUG_RETURN(false);
if (val <= theLastTupleId[aTableId])
if (val <= info->m_last_tuple_id)
{
theFirstTupleId[aTableId] = val - 1;
info->m_first_tuple_id = val - 1;
DBUG_RETURN(true);
}
// else continue;
}
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 2) == val));
/*
* if value <= NEXTID, do nothing. otherwise update NEXTID to
* value and set cached range to first = last = value - 1.
*/
DBUG_RETURN((opTupleIdOnNdb(info, val, 2) == val));
}
else
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
/*
* update NEXTID to given value. reset cached range.
*/
DBUG_RETURN((opTupleIdOnNdb(info, val, 1) == val));
}
int Ndb::initAutoIncrement()
......@@ -923,9 +952,10 @@ int Ndb::initAutoIncrement()
}
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 opValue, Uint32 op)
{
DBUG_ENTER("Ndb::opTupleIdOnNdb");
Uint32 aTableId = info->m_table_impl->m_tableId;
DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op));
NdbTransaction* tConnection;
......@@ -961,20 +991,21 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tValue = tRecAttrResult->u_64_value();
theFirstTupleId[aTableId] = tValue - opValue;
theLastTupleId[aTableId] = tValue - 1;
ret = theFirstTupleId[aTableId];
info->m_first_tuple_id = tValue - opValue;
info->m_last_tuple_id = tValue - 1;
ret = info->m_first_tuple_id;
break;
case 1:
tOperation->updateTuple();
// create on first use
tOperation->writeTuple();
tOperation->equal("SYSKEY_0", aTableId );
tOperation->setValue("NEXTID", opValue);
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
theFirstTupleId[aTableId] = ~(Uint64)0;
theLastTupleId[aTableId] = ~(Uint64)0;
info->m_first_tuple_id = ~(Uint64)0;
info->m_last_tuple_id = ~(Uint64)0;
ret = opValue;
break;
case 2:
......@@ -982,6 +1013,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tOperation->equal("SYSKEY_0", aTableId );
tOperation->load_const_u64(1, opValue);
tOperation->read_attr("NEXTID", 2);
// compare NEXTID >= opValue
tOperation->branch_le(2, 1, 0);
tOperation->write_attr("NEXTID", 1);
tOperation->interpret_exit_ok();
......@@ -989,13 +1021,17 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tOperation->interpret_exit_nok(9999);
if ( (result = tConnection->execute( Commit )) == -1 )
goto error_handler;
if (result == 9999)
{
if (tConnection->theError.code != 9999)
goto error_handler;
// NEXTID >= opValue, return ~(Uint64)0 for now since
// there is no error check...
ret = ~(Uint64)0;
}
else
{
theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
ret = opValue;
}
break;
......
......@@ -100,10 +100,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theConnectionArray[i] = NULL;
}//forg
m_sys_tab_0 = NULL;
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
}//for
theImpl->m_dbname.assign(aDataBase);
theImpl->m_schemaname.assign(aSchema);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment