Commit d749d905 authored by unknown's avatar unknown

Merge mysql.com:/space/pekka/ndb/version/my51

into  mysql.com:/space/pekka/ndb/version/my51-bug14509


mysql-test/r/ndb_basic.result:
  Auto merged
sql/ha_ndbcluster.cc:
  Auto merged
sql/ha_ndbcluster.h:
  Auto merged
storage/ndb/include/ndbapi/Ndb.hpp:
  Auto merged
storage/ndb/src/ndbapi/DictCache.cpp:
  Auto merged
storage/ndb/src/ndbapi/DictCache.hpp:
  Auto merged
storage/ndb/src/ndbapi/Ndb.cpp:
  Auto merged
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  Auto merged
parents 52078846 2aa3ff8e
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest; drop database if exists mysqltest;
CREATE TABLE t1 ( CREATE TABLE t1 (
a INT NOT NULL, a INT NOT NULL,
...@@ -328,3 +328,24 @@ select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%'; ...@@ -328,3 +328,24 @@ select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
no_copy no_copy
no_copy no_copy
DROP TABLE t1, ndb_show_tables; DROP TABLE t1, ndb_show_tables;
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
a b
3 103
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
a b
6 203
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
a b c
9 303 NULL
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
a b c
12 403 NULL
drop table t1;
...@@ -642,30 +642,30 @@ counter datavalue ...@@ -642,30 +642,30 @@ counter datavalue
6 newval 6 newval
7 newval 7 newval
8 newval 8 newval
35 newval 9 newval
36 newval 10 newval
37 newval 11 newval
38 newval 12 newval
39 newval 13 newval
40 newval 14 newval
41 newval 15 newval
42 newval 16 newval
43 newval 17 newval
44 newval 18 newval
45 newval 19 newval
46 newval 20 newval
47 newval 21 newval
48 newval 22 newval
49 newval 23 newval
50 newval 24 newval
51 newval 25 newval
52 newval 26 newval
53 newval 27 newval
54 newval 28 newval
55 newval 29 newval
56 newval 30 newval
57 newval 31 newval
58 newval 32 newval
drop table t1; drop table t1;
CREATE TABLE t1 ( b INT ) PACK_KEYS = 0 ENGINE = ndb; CREATE TABLE t1 ( b INT ) PACK_KEYS = 0 ENGINE = ndb;
select * from t1; select * from t1;
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
-- source include/not_embedded.inc -- source include/not_embedded.inc
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest; drop database if exists mysqltest;
--enable_warnings --enable_warnings
...@@ -383,3 +383,18 @@ LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables; ...@@ -383,3 +383,18 @@ LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%'; select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
DROP TABLE t1, ndb_show_tables; DROP TABLE t1, ndb_show_tables;
# simple test that auto incr is not lost at rename or alter
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
drop table t1;
...@@ -106,7 +106,6 @@ static uint ndbcluster_alter_table_flags(uint flags) ...@@ -106,7 +106,6 @@ static uint ndbcluster_alter_table_flags(uint flags)
} }
#define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
#define NDB_AUTO_INCREMENT_RETRIES 10 #define NDB_AUTO_INCREMENT_RETRIES 10
#define ERR_PRINT(err) \ #define ERR_PRINT(err) \
...@@ -2465,14 +2464,16 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2465,14 +2464,16 @@ int ha_ndbcluster::write_row(byte *record)
{ {
// Table has hidden primary key // Table has hidden primary key
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT; int ret;
Uint64 auto_value;
uint retries= NDB_AUTO_INCREMENT_RETRIES; uint retries= NDB_AUTO_INCREMENT_RETRIES;
do { do {
auto_value= ndb->getAutoIncrementValue(m_table); Ndb_tuple_id_range_guard g(m_share);
} while (auto_value == NDB_FAILED_AUTO_INCREMENT && ret= ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1);
} while (ret == -1 &&
--retries && --retries &&
ndb->getNdbError().status == NdbError::TemporaryError); ndb->getNdbError().status == NdbError::TemporaryError);
if (auto_value == NDB_FAILED_AUTO_INCREMENT) if (ret == -1)
ERR_RETURN(ndb->getNdbError()); ERR_RETURN(ndb->getNdbError());
if (set_hidden_key(op, table_share->fields, (const byte*)&auto_value)) if (set_hidden_key(op, table_share->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
...@@ -2567,11 +2568,12 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2567,11 +2568,12 @@ int ha_ndbcluster::write_row(byte *record)
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
DBUG_PRINT("info", DBUG_PRINT("info",
("Trying to set next auto increment value to %lu", ("Trying to set next auto increment value to %llu",
(ulong) next_val)); (ulonglong) next_val));
if (ndb->setAutoIncrementValue(m_table, next_val, TRUE)) Ndb_tuple_id_range_guard g(m_share);
DBUG_PRINT("info", if (ndb->setAutoIncrementValue(m_table, g.range, next_val, TRUE)
("Setting next auto increment value to %u", next_val)); == -1)
ERR_RETURN(ndb->getNdbError());
} }
m_skip_auto_increment= TRUE; m_skip_auto_increment= TRUE;
...@@ -3542,9 +3544,16 @@ void ha_ndbcluster::info(uint flag) ...@@ -3542,9 +3544,16 @@ void ha_ndbcluster::info(uint flag)
if (m_table) if (m_table)
{ {
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
Ndb_tuple_id_range_guard g(m_share);
auto_increment_value= if (ndb->readAutoIncrementValue(m_table, g.range,
ndb->readAutoIncrementValue(m_table); auto_increment_value) == -1)
{
const NdbError err= ndb->getNdbError();
sql_print_error("Error %lu in readAutoIncrementValue(): %s",
(ulong) err.code, err.message);
auto_increment_value= ~(Uint64)0;
}
} }
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -5236,17 +5245,18 @@ ulonglong ha_ndbcluster::get_auto_increment() ...@@ -5236,17 +5245,18 @@ ulonglong ha_ndbcluster::get_auto_increment()
m_rows_to_insert - m_rows_inserted : m_rows_to_insert - m_rows_inserted :
((m_rows_to_insert > m_autoincrement_prefetch) ? ((m_rows_to_insert > m_autoincrement_prefetch) ?
m_rows_to_insert : m_autoincrement_prefetch)); m_rows_to_insert : m_autoincrement_prefetch));
auto_value= NDB_FAILED_AUTO_INCREMENT; int ret;
uint retries= NDB_AUTO_INCREMENT_RETRIES; uint retries= NDB_AUTO_INCREMENT_RETRIES;
do { do {
auto_value= Ndb_tuple_id_range_guard g(m_share);
(m_skip_auto_increment) ? ret=
ndb->readAutoIncrementValue(m_table) m_skip_auto_increment ?
: ndb->getAutoIncrementValue(m_table, cache_size); ndb->readAutoIncrementValue(m_table, g.range, auto_value) :
} while (auto_value == NDB_FAILED_AUTO_INCREMENT && ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size);
} while (ret == -1 &&
--retries && --retries &&
ndb->getNdbError().status == NdbError::TemporaryError); ndb->getNdbError().status == NdbError::TemporaryError);
if (auto_value == NDB_FAILED_AUTO_INCREMENT) if (ret == -1)
{ {
const NdbError err= ndb->getNdbError(); const NdbError err= ndb->getNdbError();
sql_print_error("Error %lu in ::get_auto_increment(): %s", sql_print_error("Error %lu in ::get_auto_increment(): %s",
......
...@@ -106,6 +106,7 @@ typedef struct st_ndbcluster_share { ...@@ -106,6 +106,7 @@ typedef struct st_ndbcluster_share {
ulonglong commit_count; ulonglong commit_count;
char *db; char *db;
char *table_name; char *table_name;
Ndb::TupleIdRange tuple_id_range;
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
uint32 flags; uint32 flags;
NdbEventOperation *op; NdbEventOperation *op;
...@@ -139,6 +140,19 @@ set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state) ...@@ -139,6 +140,19 @@ set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state)
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
} }
struct Ndb_tuple_id_range_guard {
Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
share(_share),
range(share->tuple_id_range) {
pthread_mutex_lock(&share->mutex);
}
~Ndb_tuple_id_range_guard() {
pthread_mutex_unlock(&share->mutex);
}
NDB_SHARE* share;
Ndb::TupleIdRange& range;
};
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* NDB_SHARE.flags */ /* NDB_SHARE.flags */
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */ #define NSF_HIDDEN_PK 1 /* table has hidden primary key */
...@@ -726,7 +740,6 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -726,7 +740,6 @@ static void set_tabname(const char *pathname, char *tabname);
int drop_indexes(Ndb *ndb, TABLE *tab); int drop_indexes(Ndb *ndb, TABLE *tab);
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict, int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
KEY *key_info, const char *index_name, uint index_no); KEY *key_info, const char *index_name, uint index_no);
int initialize_autoincrement(const void *table);
int get_metadata(const char* path); int get_metadata(const char* path);
void release_metadata(THD *thd, Ndb *ndb); void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type(uint idx_no) const;
......
...@@ -986,6 +986,7 @@ class NdbBlob; ...@@ -986,6 +986,7 @@ class NdbBlob;
class NdbReceiver; class NdbReceiver;
class TransporterFacade; class TransporterFacade;
class PollGuard; class PollGuard;
class Ndb_local_table_info;
template <class T> struct Ndb_free_list_t; template <class T> struct Ndb_free_list_t;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
...@@ -1463,35 +1464,58 @@ public: ...@@ -1463,35 +1464,58 @@ public:
/** /**
* Return a unique tuple id for a table. The id sequence is * Return a unique tuple id for a table. The id sequence is
* ascending but may contain gaps. * ascending but may contain gaps. Methods which have no
* TupleIdRange argument use NDB API dict cache. They may
* not be called from mysqld.
* *
* @param aTableName table name * @param aTableName table name
* *
* @param cacheSize number of values to cache in this Ndb object * @param cacheSize number of values to cache in this Ndb object
* *
* @return tuple id or 0 on error * @return 0 or -1 on error, and tupleId in out parameter
*/ */
struct TupleIdRange {
Uint64 m_first_tuple_id;
Uint64 m_last_tuple_id;
void reset() {
m_first_tuple_id = ~(Uint64)0;
m_last_tuple_id = ~(Uint64)0;
};
};
int initAutoIncrement(); int initAutoIncrement();
Uint64 getAutoIncrementValue(const char* aTableName, int getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1); Uint64 & tupleId, Uint32 cacheSize);
Uint64 getAutoIncrementValue(const NdbDictionary::Table * aTable, int getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint32 cacheSize = 1); Uint64 & tupleId, Uint32 cacheSize);
Uint64 readAutoIncrementValue(const char* aTableName); int getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 readAutoIncrementValue(const NdbDictionary::Table * aTable); TupleIdRange & range, Uint64 & tupleId,
bool setAutoIncrementValue(const char* aTableName, Uint64 val, Uint32 cacheSize);
bool increase = false); int readAutoIncrementValue(const char* aTableName,
bool setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, Uint64 & tupleId);
bool increase = false); int readAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 getTupleIdFromNdb(const char* aTableName, Uint64 & tupleId);
Uint32 cacheSize = 1000); int readAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 getTupleIdFromNdb(Uint32 aTableId, TupleIdRange & range, Uint64 & tupleId);
Uint32 cacheSize = 1000); int setAutoIncrementValue(const char* aTableName,
Uint64 readTupleIdFromNdb(Uint32 aTableId); Uint64 tupleId, bool increase);
bool setTupleIdInNdb(const char* aTableName, Uint64 val, int setAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 tupleId, bool increase);
int setAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 tupleId,
bool increase); bool increase);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase); private:
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op); int getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize);
int readTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId);
int setTupleIdInNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 tupleId, bool increase);
int opTupleIdOnNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & opValue, Uint32 op);
public:
/** /**
*/ */
...@@ -1694,12 +1718,8 @@ private: ...@@ -1694,12 +1718,8 @@ private:
Uint64 the_last_check_time; Uint64 the_last_check_time;
Uint64 theFirstTransId; Uint64 theFirstTransId;
// The tupleId is retrieved from DB
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
const NdbDictionary::Table *m_sys_tab_0; const NdbDictionary::Table *m_sys_tab_0;
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
Uint32 theRestartGCI; // the Restart GCI used by DIHNDBTAMPER Uint32 theRestartGCI; // the Restart GCI used by DIHNDBTAMPER
......
...@@ -1645,10 +1645,9 @@ void Ndbcntr::systemErrorLab(Signal* signal, int line) ...@@ -1645,10 +1645,9 @@ void Ndbcntr::systemErrorLab(Signal* signal, int line)
/* |-2048| # 1 00000001 | */ /* |-2048| # 1 00000001 | */
/* | : | : | */ /* | : | : | */
/* | -1 | # 1 00000001 | */ /* | -1 | # 1 00000001 | */
/* | 0 | 0 | */ /* | 1 | 0 | tupleid sequence now created on first use */
/* | 1 | 0 | */ /* | : | : | v */
/* | : | : | */ /* | 2048| 0 | v */
/* | 2047| 0 | */
/*---------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------*/
void Ndbcntr::createSystableLab(Signal* signal, unsigned index) void Ndbcntr::createSystableLab(Signal* signal, unsigned index)
{ {
...@@ -1859,8 +1858,7 @@ void Ndbcntr::crSystab8Lab(Signal* signal) ...@@ -1859,8 +1858,7 @@ void Ndbcntr::crSystab8Lab(Signal* signal)
jam(); jam();
ckey = 1; ckey = 1;
ctransidPhase = ZFALSE; ctransidPhase = ZFALSE;
crSystab7Lab(signal); // skip 2nd loop - tupleid sequence now created on first use
return;
}//if }//if
signal->theData[0] = ctcConnectionP; signal->theData[0] = ctcConnectionP;
signal->theData[1] = reference(); signal->theData[1] = reference();
......
...@@ -47,6 +47,7 @@ Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl) ...@@ -47,6 +47,7 @@ Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{ {
assert(! is_ndb_blob_table(table_impl)); assert(! is_ndb_blob_table(table_impl));
m_table_impl= table_impl; m_table_impl= table_impl;
m_tuple_id_range.reset();
} }
Ndb_local_table_info::~Ndb_local_table_info() Ndb_local_table_info::~Ndb_local_table_info()
......
...@@ -33,6 +33,10 @@ public: ...@@ -33,6 +33,10 @@ public:
static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0); static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0);
static void destroy(Ndb_local_table_info *); static void destroy(Ndb_local_table_info *);
NdbTableImpl *m_table_impl; NdbTableImpl *m_table_impl;
// range of cached tuple ids per thread
Ndb::TupleIdRange m_tuple_id_range;
Uint64 m_local_data[1]; // Must be last member. Used to access extra space. Uint64 m_local_data[1]; // Must be last member. Used to access extra space.
private: private:
Ndb_local_table_info(NdbTableImpl *table_impl); Ndb_local_table_info(NdbTableImpl *table_impl);
......
...@@ -747,158 +747,271 @@ Remark: Returns a new TupleId to the application. ...@@ -747,158 +747,271 @@ Remark: Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId. The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp. It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
****************************************************************************/ ****************************************************************************/
Uint64 int
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize) Ndb::getAutoIncrementValue(const char* aTableName,
Uint64 & tupleId, Uint32 cacheSize)
{ {
DBUG_ENTER("getAutoIncrementValue"); DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName)); BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info= Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname); theDictionary->get_local_table_info(internal_tabname);
if (info == 0) if (info == 0) {
DBUG_RETURN(~(Uint64)0); theError.code = theDictionary->getNdbError().code;
const NdbTableImpl *table= info->m_table_impl; DBUG_RETURN(-1);
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize); }
DBUG_PRINT("info", ("value %ul", (ulong) tupleId)); const NdbTableImpl* table = info->m_table_impl;
DBUG_RETURN(tupleId); TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
Uint64 int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize) Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId, Uint32 cacheSize)
{ {
DBUG_ENTER("getAutoIncrementValue"); DBUG_ENTER("Ndb::getAutoIncrementValue");
if (aTable == 0) ASSERT_NOT_MYSQLD;
DBUG_RETURN(~(Uint64)0); assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_id, cacheSize); const BaseString& internal_tabname = table->m_internalName;
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId); Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
Uint64 int
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize) Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId,
Uint32 cacheSize)
{ {
const NdbTableImpl* table = theDictionary->getTable(aTableName); DBUG_ENTER("Ndb::getAutoIncrementValue");
if (table == 0) assert(aTable != 0);
return ~(Uint64)0; const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
return getTupleIdFromNdb(table->m_id, cacheSize);
if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
Uint64 int
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize) Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
{ {
DBUG_ENTER("getTupleIdFromNdb"); DBUG_ENTER("Ndb::getTupleIdFromNdb");
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] ) if (range.m_first_tuple_id != range.m_last_tuple_id)
{ {
theFirstTupleId[aTableId]++; assert(range.m_first_tuple_id < range.m_last_tuple_id);
DBUG_PRINT("info", ("next cached value %ul", tupleId = ++range.m_first_tuple_id;
(ulong) theFirstTupleId[aTableId])); DBUG_PRINT("info", ("next cached value %llu", (ulonglong)tupleId));
DBUG_RETURN(theFirstTupleId[aTableId]);
} }
else // theFirstTupleId == theLastTupleId else
{ {
DBUG_PRINT("info",("reading %u values from database", if (cacheSize == 0)
(cacheSize == 0) ? 1 : cacheSize)); cacheSize = 1;
DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0)); DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
/*
* reserve next cacheSize entries in db. adds cacheSize to NEXTID
* and returns first tupleId in the new range.
*/
Uint64 opValue = cacheSize;
if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
} }
DBUG_RETURN(0);
} }
Uint64 int
Ndb::readAutoIncrementValue(const char* aTableName) Ndb::readAutoIncrementValue(const char* aTableName,
Uint64 & tupleId)
{ {
DBUG_ENTER("readAutoIncrementValue"); DBUG_ENTER("Ndb::readAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName); ASSERT_NOT_MYSQLD;
if (table == 0) { BaseString internal_tabname(internalize_table_name(aTableName));
theError= theDictionary->getNdbError();
DBUG_RETURN(~(Uint64)0); Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
} }
Uint64 tupleId = readTupleIdFromNdb(table->m_id); const NdbTableImpl* table = info->m_table_impl;
DBUG_PRINT("info", ("value %ul", (ulong) tupleId)); TupleIdRange & range = info->m_tuple_id_range;
DBUG_RETURN(tupleId); if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
Uint64 int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable) Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 & tupleId)
{ {
DBUG_ENTER("readAutoIncrementValue"); DBUG_ENTER("Ndb::readAutoIncrementValue");
if (aTable == 0) ASSERT_NOT_MYSQLD;
DBUG_RETURN(~(Uint64)0); assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_id); const BaseString& internal_tabname = table->m_internalName;
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId); Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
Uint64 int
Ndb::readTupleIdFromNdb(Uint32 aTableId) Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 & tupleId)
{ {
if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] ) DBUG_ENTER("Ndb::readAutoIncrementValue");
// Cache is empty, check next in database assert(aTable != 0);
return opTupleIdOnNdb(aTableId, 0, 3); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
return theFirstTupleId[aTableId] + 1; if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(0);
} }
bool int
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase) Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & tupleId)
{ {
DBUG_ENTER("setAutoIncrementValue"); DBUG_ENTER("Ndb::readTupleIdFromNdb");
if (range.m_first_tuple_id != range.m_last_tuple_id)
{
assert(range.m_first_tuple_id < range.m_last_tuple_id);
tupleId = range.m_first_tuple_id + 1;
}
else
{
/*
* peek at NEXTID. does not reserve it so the value is valid
* only if no other transactions are allowed.
*/
Uint64 opValue = 0;
if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
}
DBUG_RETURN(0);
}
int
Ndb::setAutoIncrementValue(const char* aTableName,
Uint64 tupleId, bool increase)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName)); BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info= Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname); theDictionary->get_local_table_info(internal_tabname);
if (info == 0) { if (info == 0) {
theError= theDictionary->getNdbError(); theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(false); DBUG_RETURN(-1);
} }
const NdbTableImpl* table= info->m_table_impl; const NdbTableImpl* table = info->m_table_impl;
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase)); TupleIdRange & range = info->m_tuple_id_range;
if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
} }
bool int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool increase) Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
Uint64 tupleId, bool increase)
{ {
DBUG_ENTER("setAutoIncrementValue"); DBUG_ENTER("Ndb::setAutoIncrementValue");
if (aTable == 0) ASSERT_NOT_MYSQLD;
DBUG_RETURN(~(Uint64)0); assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase)); const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
} }
bool int
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase ) Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
TupleIdRange & range, Uint64 tupleId,
bool increase)
{ {
DBUG_ENTER("setTupleIdInNdb(const char*, ...)"); DBUG_ENTER("Ndb::setAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName); assert(aTable != 0);
if (table == 0) { const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
theError= theDictionary->getNdbError();
DBUG_RETURN(false); if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
} DBUG_RETURN(-1);
DBUG_RETURN(setTupleIdInNdb(table->m_id, val, increase)); DBUG_RETURN(0);
} }
bool int
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase ) Ndb::setTupleIdInNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 tupleId, bool increase)
{ {
DBUG_ENTER("setTupleIdInNdb(Uint32, ...)"); DBUG_ENTER("Ndb::setTupleIdInNdb");
if (increase) if (increase)
{ {
if (theFirstTupleId[aTableId] != theLastTupleId[aTableId]) if (range.m_first_tuple_id != range.m_last_tuple_id)
{ {
// We have a cache sequence assert(range.m_first_tuple_id < range.m_last_tuple_id);
if (val <= theFirstTupleId[aTableId]+1) if (tupleId <= range.m_first_tuple_id + 1)
DBUG_RETURN(false); DBUG_RETURN(0);
if (val <= theLastTupleId[aTableId]) if (tupleId <= range.m_last_tuple_id)
{ {
theFirstTupleId[aTableId] = val - 1; range.m_first_tuple_id = tupleId - 1;
DBUG_RETURN(true); DBUG_PRINT("info",
("Setting next auto increment cached value to %llu",
(ulonglong)tupleId));
DBUG_RETURN(0);
} }
// else continue;
} }
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 2) == val)); /*
* if tupleId <= NEXTID, do nothing. otherwise update NEXTID to
* tupleId and set cached range to first = last = tupleId - 1.
*/
if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
DBUG_RETURN(-1);
} }
else else
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val)); {
/*
* update NEXTID to given value. reset cached range.
*/
if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
} }
int Ndb::initAutoIncrement() int Ndb::initAutoIncrement()
...@@ -922,18 +1035,18 @@ int Ndb::initAutoIncrement() ...@@ -922,18 +1035,18 @@ int Ndb::initAutoIncrement()
return (m_sys_tab_0 == NULL); return (m_sys_tab_0 == NULL);
} }
Uint64 int
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & opValue, Uint32 op)
{ {
DBUG_ENTER("Ndb::opTupleIdOnNdb"); DBUG_ENTER("Ndb::opTupleIdOnNdb");
Uint32 aTableId = table->m_id;
DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op)); DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op));
NdbTransaction* tConnection; NdbTransaction* tConnection;
NdbOperation* tOperation= 0; // Compiler warning if not initialized NdbOperation* tOperation= 0; // Compiler warning if not initialized
Uint64 tValue; Uint64 tValue;
NdbRecAttr* tRecAttrResult; NdbRecAttr* tRecAttrResult;
int result;
Uint64 ret;
CHECK_STATUS_MACRO_ZERO; CHECK_STATUS_MACRO_ZERO;
...@@ -961,42 +1074,44 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -961,42 +1074,44 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tValue = tRecAttrResult->u_64_value(); tValue = tRecAttrResult->u_64_value();
theFirstTupleId[aTableId] = tValue - opValue; range.m_first_tuple_id = tValue - opValue;
theLastTupleId[aTableId] = tValue - 1; range.m_last_tuple_id = tValue - 1;
ret = theFirstTupleId[aTableId]; opValue = range.m_first_tuple_id; // out
break; break;
case 1: case 1:
tOperation->updateTuple(); // create on first use
tOperation->writeTuple();
tOperation->equal("SYSKEY_0", aTableId ); tOperation->equal("SYSKEY_0", aTableId );
tOperation->setValue("NEXTID", opValue); tOperation->setValue("NEXTID", opValue);
if (tConnection->execute( Commit ) == -1 ) if (tConnection->execute( Commit ) == -1 )
goto error_handler; goto error_handler;
theFirstTupleId[aTableId] = ~(Uint64)0; range.reset();
theLastTupleId[aTableId] = ~(Uint64)0;
ret = opValue;
break; break;
case 2: case 2:
tOperation->interpretedUpdateTuple(); tOperation->interpretedUpdateTuple();
tOperation->equal("SYSKEY_0", aTableId ); tOperation->equal("SYSKEY_0", aTableId );
tOperation->load_const_u64(1, opValue); tOperation->load_const_u64(1, opValue);
tOperation->read_attr("NEXTID", 2); tOperation->read_attr("NEXTID", 2);
// compare NEXTID >= opValue
tOperation->branch_le(2, 1, 0); tOperation->branch_le(2, 1, 0);
tOperation->write_attr("NEXTID", 1); tOperation->write_attr("NEXTID", 1);
tOperation->interpret_exit_ok(); tOperation->interpret_exit_ok();
tOperation->def_label(0); tOperation->def_label(0);
tOperation->interpret_exit_nok(9999); tOperation->interpret_exit_nok(9999);
if ( (result = tConnection->execute( Commit )) == -1 ) if (tConnection->execute( Commit ) == -1)
{
if (tConnection->theError.code != 9999)
goto error_handler; goto error_handler;
}
if (result == 9999)
ret = ~(Uint64)0;
else else
{ {
theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1; DBUG_PRINT("info",
ret = opValue; ("Setting next auto increment value (db) to %llu",
(ulonglong)opValue));
range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
} }
break; break;
case 3: case 3:
...@@ -1005,7 +1120,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -1005,7 +1120,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tRecAttrResult = tOperation->getValue("NEXTID"); tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( Commit ) == -1 ) if (tConnection->execute( Commit ) == -1 )
goto error_handler; goto error_handler;
ret = tRecAttrResult->u_64_value(); opValue = tRecAttrResult->u_64_value(); // out
break; break;
default: default:
goto error_handler; goto error_handler;
...@@ -1013,7 +1128,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -1013,7 +1128,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
this->closeTransaction(tConnection); this->closeTransaction(tConnection);
DBUG_RETURN(ret); DBUG_RETURN(0);
error_handler: error_handler:
theError.code = tConnection->theError.code; theError.code = tConnection->theError.code;
...@@ -1023,7 +1138,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -1023,7 +1138,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
theError.code, theError.code,
tConnection ? tConnection->theError.code : -1, tConnection ? tConnection->theError.code : -1,
tOperation ? tOperation->theError.code : -1)); tOperation ? tOperation->theError.code : -1));
DBUG_RETURN(~(Uint64)0); DBUG_RETURN(-1);
} }
Uint32 Uint32
......
...@@ -1387,9 +1387,6 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl) ...@@ -1387,9 +1387,6 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
Ndb_local_table_info::create(impl, m_local_table_data_size); Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(impl->m_internalName.c_str(), info); m_localHash.put(impl->m_internalName.c_str(), info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
} }
int int
...@@ -2249,11 +2246,11 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t) ...@@ -2249,11 +2246,11 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t)
} }
if (autoIncrement) { if (autoIncrement) {
// XXX unlikely race condition - t.m_id may no longer be same table // XXX unlikely race condition - t.m_id may no longer be same table
if (! m_ndb.setTupleIdInNdb(t.m_id, initialValue, false)) { // the tuple id range is not used on input
if (m_ndb.theError.code) Ndb::TupleIdRange range;
if (m_ndb.setTupleIdInNdb(&t, range, initialValue, false) == -1) {
assert(m_ndb.theError.code != 0);
m_error.code = m_ndb.theError.code; m_error.code = m_ndb.theError.code;
else
m_error.code = 4336;
delete t2; delete t2;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
......
...@@ -951,8 +951,6 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName) ...@@ -951,8 +951,6 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
if (info) if (info)
{ {
m_localHash.put(internalTableName.c_str(), info); m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[tab->getTableId()] = ~0;
m_ndb.theLastTupleId[tab->getTableId()] = ~0;
} }
} }
} }
......
...@@ -100,10 +100,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -100,10 +100,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theConnectionArray[i] = NULL; theConnectionArray[i] = NULL;
}//forg }//forg
m_sys_tab_0 = NULL; m_sys_tab_0 = NULL;
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
}//for
theImpl->m_dbname.assign(aDataBase); theImpl->m_dbname.assign(aDataBase);
theImpl->m_schemaname.assign(aSchema); theImpl->m_schemaname.assign(aSchema);
......
...@@ -600,7 +600,6 @@ ErrorBundle ErrorCodes[] = { ...@@ -600,7 +600,6 @@ ErrorBundle ErrorCodes[] = {
{ 4269, DMEC, IE, "No connection to ndb management server" }, { 4269, DMEC, IE, "No connection to ndb management server" },
{ 4270, DMEC, IE, "Unknown blob error" }, { 4270, DMEC, IE, "Unknown blob error" },
{ 4335, DMEC, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" }, { 4335, DMEC, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" },
{ 4336, DMEC, AE, "Auto-increment value set below current value" },
{ 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" }, { 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4272, DMEC, AE, "Table definition has undefined column" }, { 4272, DMEC, AE, "Table definition has undefined column" },
{ 4273, DMEC, IE, "No blob table in dict cache" }, { 4273, DMEC, IE, "No blob table in dict cache" },
......
...@@ -1139,9 +1139,13 @@ runCreateAutoincrementTable(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -1139,9 +1139,13 @@ runCreateAutoincrementTable(NDBT_Context* ctx, NDBT_Step* step){
for (int i = 0; i < 16; i++) { for (int i = 0; i < 16; i++) {
Uint64 value = myNdb->getAutoIncrementValue(tabname, 1); Uint64 value;
if (myNdb->getAutoIncrementValue(tabname, value, 1) == -1) {
if (value != (startvalue+i)) { g_err << "getAutoIncrementValue failed on " << tabname << endl;
APIERROR(myNdb->getNdbError());
return NDBT_FAILED;
}
else if (value != (startvalue+i)) {
g_err << "value = " << value << " expected " << startvalue+i << endl;; g_err << "value = " << value << " expected " << startvalue+i << endl;;
APIERROR(myNdb->getNdbError()); APIERROR(myNdb->getNdbError());
// ret = NDBT_FAILED; // ret = NDBT_FAILED;
......
...@@ -151,9 +151,12 @@ BackupRestore::finalize_table(const TableS & table){ ...@@ -151,9 +151,12 @@ BackupRestore::finalize_table(const TableS & table){
if (table.have_auto_inc()) if (table.have_auto_inc())
{ {
Uint64 max_val= table.get_max_auto_val(); Uint64 max_val= table.get_max_auto_val();
Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable)); Uint64 auto_val;
if (max_val+1 > auto_val || auto_val == ~(Uint64)0) int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false); if (r == -1 && m_ndb->getNdbError().code != 626)
ret= false;
else if (r == -1 || max_val+1 > auto_val)
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false) != -1;
} }
return ret; return ret;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment