Commit 7d655f8f authored by holyfoot@mysql.com's avatar holyfoot@mysql.com

Merge bk@192.168.21.1:mysql-5.1-new

into mysql.com:/home/hf/work/mysql-5.1.mrg
parents 5ec2d9b5 135f4ab7
......@@ -286,3 +286,45 @@ unique key tx1 (c002, c003, c004, c005)) engine=ndb;
create index tx2
on t1 (c010, c011, c012, c013);
drop table t1;
CREATE TABLE t1 (
auto int(5) unsigned NOT NULL auto_increment,
string char(10),
vstring varchar(10),
bin binary(2),
vbin varbinary(7),
tiny tinyint(4) DEFAULT '0' NOT NULL ,
short smallint(6) DEFAULT '1' NOT NULL ,
medium mediumint(8) DEFAULT '0' NOT NULL,
long_int int(11) DEFAULT '0' NOT NULL,
longlong bigint(13) DEFAULT '0' NOT NULL,
real_float float(13,1) DEFAULT 0.0 NOT NULL,
real_double double(16,4),
real_decimal decimal(16,4),
utiny tinyint(3) unsigned DEFAULT '0' NOT NULL,
ushort smallint(5) unsigned zerofill DEFAULT '00000' NOT NULL,
umedium mediumint(8) unsigned DEFAULT '0' NOT NULL,
ulong int(11) unsigned DEFAULT '0' NOT NULL,
ulonglong bigint(13) unsigned DEFAULT '0' NOT NULL,
bits bit(3),
options enum('zero','one','two','three','four') not null,
flags set('zero','one','two','three','four') not null,
date_field date,
year_field year,
time_field time,
date_time datetime,
time_stamp timestamp,
PRIMARY KEY (auto)
) engine=ndb;
CREATE TEMPORARY TABLE ndb_show_tables (id INT, type VARCHAR(20), state VARCHAR(20), logging VARCHAR(20), _database VARCHAR(255), _schema VARCHAR(20), name VARCHAR(255));
LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
set @t1_id = (select id from ndb_show_tables where name like '%t1%');
truncate ndb_show_tables;
alter table t1 change tiny new_tiny tinyint(4) DEFAULT '0' NOT NULL;
create index i1 on t1(medium);
alter table t1 add index i2(long_int);
drop index i1 on t1;
LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
no_copy
no_copy
DROP TABLE t1, ndb_show_tables;
DROP TABLE IF EXISTS t1;
create table t1 ( a int primary key, b varchar(10), c varchar(10), index (b) )
engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c);
select * from t1 where c = 'two';
a b c
2 two two
alter table t1 drop index c;
select * from t1 where c = 'two';
select * from t1 where c = 'two';
a b c
2 two two
drop table t1;
create table t3 (a int primary key) engine=ndbcluster;
begin;
insert into t3 values (1);
alter table t3 rename t4;
delete from t3;
insert into t3 values (1);
commit;
select * from t3;
ERROR HY000: Can't lock file (errno: 155)
select * from t4;
a
1
drop table t4;
show tables;
Tables_in_test
......@@ -62,4 +62,6 @@ t4
drop table t1, t2, t3, t4;
drop table if exists t1, t3, t4;
Warnings:
Error 155 Table 'test.t1' doesn't exist
Error 155 Table 'test.t3' doesn't exist
Error 155 Table 'test.t4' doesn't exist
......@@ -18,12 +18,14 @@ ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t
ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
#ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
#ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
partition_03ndb : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table
ps_7ndb : BUG#18950 2006-02-16 jmiller create table like does not obtain LOCK_open
rpl_ndb_2innodb : BUG#19227 2006-04-20 pekka pk delete apparently not replicated
rpl_ndb_2myisam : BUG#19227 2006-04-20 pekka pk delete apparently not replicated
rpl_ndb_auto_inc : BUG#17086 2006-02-16 jmiller CR: auto_increment_increment and auto_increment_offset produce duplicate key er
rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on solaris
rpl_ndb_commit_afterflush : BUG#19328 2006-05-04 tomas Slave timeout with COM_REGISTER_SLAVE error causing stop
rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD
rpl_ndb_ddl : BUG#18946 result file needs update + test needs to checked
rpl_ndb_innodb2ndb : BUG#17400 2006-04-19 tomas Cluster Replication: delete & update of rows in table without pk fails on slave.
rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
......
......@@ -327,3 +327,59 @@ drop table t1;
# End of 4.1 tests
# On-line alter table
CREATE TABLE t1 (
auto int(5) unsigned NOT NULL auto_increment,
string char(10),
vstring varchar(10),
bin binary(2),
vbin varbinary(7),
tiny tinyint(4) DEFAULT '0' NOT NULL ,
short smallint(6) DEFAULT '1' NOT NULL ,
medium mediumint(8) DEFAULT '0' NOT NULL,
long_int int(11) DEFAULT '0' NOT NULL,
longlong bigint(13) DEFAULT '0' NOT NULL,
real_float float(13,1) DEFAULT 0.0 NOT NULL,
real_double double(16,4),
real_decimal decimal(16,4),
utiny tinyint(3) unsigned DEFAULT '0' NOT NULL,
ushort smallint(5) unsigned zerofill DEFAULT '00000' NOT NULL,
umedium mediumint(8) unsigned DEFAULT '0' NOT NULL,
ulong int(11) unsigned DEFAULT '0' NOT NULL,
ulonglong bigint(13) unsigned DEFAULT '0' NOT NULL,
bits bit(3),
options enum('zero','one','two','three','four') not null,
flags set('zero','one','two','three','four') not null,
date_field date,
year_field year,
time_field time,
date_time datetime,
time_stamp timestamp,
PRIMARY KEY (auto)
) engine=ndb;
CREATE TEMPORARY TABLE ndb_show_tables (id INT, type VARCHAR(20), state VARCHAR(20), logging VARCHAR(20), _database VARCHAR(255), _schema VARCHAR(20), name VARCHAR(255));
--disable_warnings
--exec $NDB_TOOLS_DIR/ndb_show_tables --p > $MYSQLTEST_VARDIR/master-data/test/tmp.dat
LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
--enable_warnings
set @t1_id = (select id from ndb_show_tables where name like '%t1%');
truncate ndb_show_tables;
alter table t1 change tiny new_tiny tinyint(4) DEFAULT '0' NOT NULL;
create index i1 on t1(medium);
alter table t1 add index i2(long_int);
drop index i1 on t1;
--disable_warnings
--exec $NDB_TOOLS_DIR/ndb_show_tables --p > $MYSQLTEST_VARDIR/master-data/test/tmp.dat
LOAD DATA INFILE 'tmp.dat' INTO TABLE ndb_show_tables;
--enable_warnings
select 'no_copy' from ndb_show_tables where id = @t1_id and name like '%t1%';
DROP TABLE t1, ndb_show_tables;
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/not_embedded.inc
-- source include/have_binlog_format_row.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
......
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/not_embedded.inc
-- source include/have_binlog_format_statement.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
--enable_warnings
connection server1;
create table t1 ( a int primary key, b varchar(10), c varchar(10), index (b) )
engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c);
connection server2;
select * from t1 where c = 'two';
connection server1;
alter table t1 drop index c;
connection server2;
--disable_result_log
--error 0,1412
select * from t1 where c = 'two';
--enable_result_log
select * from t1 where c = 'two';
connection server1;
drop table t1;
connection server1;
create table t3 (a int primary key) engine=ndbcluster;
connection server2;
begin;
insert into t3 values (1);
connection server1;
alter table t3 rename t4;
connection server2;
# with rbr the below will not work as the "alter" event
# explicitly invalidates the dictionary cache.
# This should work as transaction is ongoing...
delete from t3;
insert into t3 values (1);
commit;
# This should fail as its a new transaction
--error 1015
select * from t3;
select * from t4;
drop table t4;
show tables;
connection server1;
This diff is collapsed.
......@@ -70,8 +70,8 @@ typedef enum ndb_index_status {
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
NDB_INDEX_STATUS status;
void *index;
void *unique_index;
const NdbDictionary::Index *index;
const NdbDictionary::Index *unique_index;
unsigned char *unique_index_attrid_map;
// In this version stats are not shared between threads
NdbIndexStat* index_stat;
......@@ -560,6 +560,7 @@ class ha_ndbcluster: public handler
ha_ndbcluster(TABLE_SHARE *table);
~ha_ndbcluster();
int ha_initialise();
int open(const char *name, int mode, uint test_if_locked);
int close(void);
......@@ -708,23 +709,17 @@ static void set_tabname(const char *pathname, char *tabname);
Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share);
int alter_table_name(const char *to);
static int delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *path,
const char *db,
const char *table_name);
int drop_ndb_table();
int create_ndb_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
int create_index(const char *name, KEY *key_info,
NDB_INDEX_TYPE idx_type, uint idx_no);
int drop_ndb_index(const char *name);
int table_changed(const void *pack_frm_data, uint pack_frm_len);
// Index list management
int create_indexes(Ndb *ndb, TABLE *tab);
void clear_index(int i);
void clear_indexes();
int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
void renumber_indexes(Ndb *ndb, TABLE *tab);
int drop_indexes(Ndb *ndb, TABLE *tab);
......@@ -732,7 +727,7 @@ static void set_tabname(const char *pathname, char *tabname);
KEY *key_info, const char *index_name, uint index_no);
int initialize_autoincrement(const void *table);
int get_metadata(const char* path);
void release_metadata();
void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
......@@ -795,8 +790,6 @@ static void set_tabname(const char *pathname, char *tabname);
void print_results();
ulonglong get_auto_increment();
int invalidate_dictionary_cache(bool global,
const NdbDictionary::Table *ndbtab);
int ndb_err(NdbTransaction*);
bool uses_blob_value();
......@@ -834,7 +827,6 @@ static void set_tabname(const char *pathname, char *tabname);
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
const NdbDictionary::Table *m_table;
int m_table_version;
struct Ndb_local_table_statistics *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
......
This diff is collapsed.
......@@ -41,14 +41,15 @@ enum SCHEMA_OP_TYPE
{
SOT_DROP_TABLE= 0,
SOT_CREATE_TABLE= 1,
SOT_RENAME_TABLE= 2,
SOT_RENAME_TABLE_NEW= 2,
SOT_ALTER_TABLE= 3,
SOT_DROP_DB= 4,
SOT_CREATE_DB= 5,
SOT_ALTER_DB= 6,
SOT_CLEAR_SLOCK= 7,
SOT_TABLESPACE= 8,
SOT_LOGFILE_GROUP= 9
SOT_LOGFILE_GROUP= 9,
SOT_RENAME_TABLE= 10
};
const uint max_ndb_nodes= 64; /* multiple of 32 */
......@@ -56,6 +57,45 @@ const uint max_ndb_nodes= 64; /* multiple of 32 */
static const char *ha_ndb_ext=".ndb";
static const char share_prefix[]= "./";
class Ndb_table_guard
{
public:
Ndb_table_guard(NDBDICT *dict, const char *tabname)
: m_dict(dict)
{
DBUG_ENTER("Ndb_table_guard");
m_ndbtab= m_dict->getTableGlobal(tabname);
m_invalidate= 0;
DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab));
DBUG_VOID_RETURN;
}
~Ndb_table_guard()
{
DBUG_ENTER("~Ndb_table_guard");
if (m_ndbtab)
{
DBUG_PRINT("info", ("m_ndbtab: %p m_invalidate: %d",
m_ndbtab, m_invalidate));
m_dict->removeTableGlobal(*m_ndbtab, m_invalidate);
}
DBUG_VOID_RETURN;
}
const NDBTAB *get_table() { return m_ndbtab; }
void invalidate() { m_invalidate= 1; }
const NDBTAB *release()
{
DBUG_ENTER("Ndb_table_guard::release");
const NDBTAB *tmp= m_ndbtab;
DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab));
m_ndbtab = 0;
DBUG_RETURN(tmp);
}
private:
const NDBTAB *m_ndbtab;
NDBDICT *m_dict;
int m_invalidate;
};
#ifdef HAVE_NDB_BINLOG
extern pthread_t ndb_binlog_thread;
extern pthread_mutex_t injector_mutex;
......@@ -98,8 +138,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *old_db= 0,
const char *old_table_name= 0);
const char *new_db= 0,
const char *new_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share,
const char *type_str);
......
......@@ -3012,49 +3012,7 @@ static int init_server_components()
unireg_abort(1);
}
/* We have to initialize the storage engines before CSV logging */
if (ha_init())
{
sql_print_error("Can't init databases");
unireg_abort(1);
}
#ifdef WITH_CSV_STORAGE_ENGINE
if (opt_bootstrap)
log_output_options= LOG_FILE;
else
logger.init_log_tables();
if (log_output_options & LOG_NONE)
{
/*
Issue a warining if there were specified additional options to the
log-output along with NONE. Probably this wasn't what user wanted.
*/
if ((log_output_options & LOG_NONE) && (log_output_options & ~LOG_NONE))
sql_print_warning("There were other values specified to "
"log-output besides NONE. Disabling slow "
"and general logs anyway.");
logger.set_handlers(LOG_FILE, LOG_NONE, LOG_NONE);
}
else
{
/* fall back to the log files if tables are not present */
if (have_csv_db == SHOW_OPTION_NO)
{
sql_print_error("CSV engine is not present, falling back to the "
"log files");
log_output_options= log_output_options & ~LOG_TABLE | LOG_FILE;
}
logger.set_handlers(LOG_FILE, opt_slow_log ? log_output_options:LOG_NONE,
opt_log ? log_output_options:LOG_NONE);
}
#else
logger.set_handlers(LOG_FILE, opt_slow_log ? LOG_FILE:LOG_NONE,
opt_log ? LOG_FILE:LOG_NONE);
#endif
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
/*
......@@ -3182,6 +3140,49 @@ server.");
using_update_log=1;
}
/* We have to initialize the storage engines before CSV logging */
if (ha_init())
{
sql_print_error("Can't init databases");
unireg_abort(1);
}
#ifdef WITH_CSV_STORAGE_ENGINE
if (opt_bootstrap)
log_output_options= LOG_FILE;
else
logger.init_log_tables();
if (log_output_options & LOG_NONE)
{
/*
Issue a warining if there were specified additional options to the
log-output along with NONE. Probably this wasn't what user wanted.
*/
if ((log_output_options & LOG_NONE) && (log_output_options & ~LOG_NONE))
sql_print_warning("There were other values specified to "
"log-output besides NONE. Disabling slow "
"and general logs anyway.");
logger.set_handlers(LOG_FILE, LOG_NONE, LOG_NONE);
}
else
{
/* fall back to the log files if tables are not present */
if (have_csv_db == SHOW_OPTION_NO)
{
sql_print_error("CSV engine is not present, falling back to the "
"log files");
log_output_options= log_output_options & ~LOG_TABLE | LOG_FILE;
}
logger.set_handlers(LOG_FILE, opt_slow_log ? log_output_options:LOG_NONE,
opt_log ? log_output_options:LOG_NONE);
}
#else
logger.set_handlers(LOG_FILE, opt_slow_log ? LOG_FILE:LOG_NONE,
opt_log ? LOG_FILE:LOG_NONE);
#endif
/*
Check that the default storage engine is actually available.
*/
......
......@@ -64,6 +64,7 @@
#define CFG_DB_BACKUP_DATA_BUFFER_MEM 134
#define CFG_DB_BACKUP_LOG_BUFFER_MEM 135
#define CFG_DB_BACKUP_WRITE_SIZE 136
#define CFG_DB_BACKUP_MAX_WRITE_SIZE 139
#define CFG_LOG_DESTINATION 147
......
......@@ -1470,6 +1470,8 @@ public:
*
* @return tuple id or 0 on error
*/
int initAutoIncrement();
Uint64 getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1);
Uint64 getAutoIncrementValue(const NdbDictionary::Table * aTable,
......@@ -1694,6 +1696,7 @@ private:
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
const NdbDictionary::Table *m_sys_tab_0;
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
......
......@@ -798,6 +798,7 @@ public:
* Get object status
*/
virtual Object::Status getObjectStatus() const;
void setStatusInvalid() const;
/**
* Get object version
......@@ -1734,6 +1735,7 @@ public:
* @return 0 if successful otherwise -1.
*/
int createIndex(const Index &index);
int createIndex(const Index &index, const Table &table);
/**
* Drop index with given name
......@@ -1805,6 +1807,15 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
const Table * getTable(const char * name, void **data) const;
void set_local_table_data_size(unsigned sz);
const Index * getIndexGlobal(const char * indexName,
const Table &ndbtab) const;
const Table * getTableGlobal(const char * tableName) const;
int alterTableGlobal(const Table &f, const Table &t);
int dropTableGlobal(const Table &ndbtab);
int dropIndexGlobal(const Index &index);
int removeIndexGlobal(const Index &ndbidx, int invalidate) const;
int removeTableGlobal(const Table &ndbtab, int invalidate) const;
#endif
};
};
......
......@@ -56,7 +56,7 @@ public:
* multiplied by a percentage obtained from the cache (result zero is
* returned as 1).
*/
int records_in_range(NdbDictionary::Index* index,
int records_in_range(const NdbDictionary::Index* index,
NdbIndexScanOperation* op,
Uint64 table_rows,
Uint64* count,
......
......@@ -75,7 +75,7 @@ public:
/**
* Constructor / Destructor
*/
SocketServer(int maxSessions = 32);
SocketServer(unsigned maxSessions = ~(unsigned)0);
~SocketServer();
/**
......
......@@ -27,7 +27,7 @@
#define DEBUG(x) ndbout << x << endl;
SocketServer::SocketServer(int maxSessions) :
SocketServer::SocketServer(unsigned maxSessions) :
m_sessions(10),
m_services(5)
{
......@@ -136,7 +136,7 @@ SocketServer::setup(SocketServer::Service * service,
}
DBUG_PRINT("info",("bound to %u",ntohs(servaddr.sin_port)));
if (listen(sock, m_maxSessions) == -1){
if (listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1){
DBUG_PRINT("error",("listen() - %d - %s",
errno, strerror(errno)));
NDB_CLOSE_SOCKET(sock);
......
......@@ -163,15 +163,16 @@ Backup::execREAD_CONFIG_REQ(Signal* signal)
Uint32 szDataBuf = (2 * 1024 * 1024);
Uint32 szLogBuf = (2 * 1024 * 1024);
Uint32 szWrite = 32768;
Uint32 szWrite = 32768, maxWriteSize = (256 * 1024);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MAX_WRITE_SIZE, &maxWriteSize);
c_defaults.m_logBufferSize = szLogBuf;
c_defaults.m_dataBufferSize = szDataBuf;
c_defaults.m_minWriteSize = szWrite;
c_defaults.m_maxWriteSize = 256*1024;
c_defaults.m_maxWriteSize = maxWriteSize;
c_defaults.m_lcp_buffer_size = szDataBuf;
......
......@@ -483,6 +483,14 @@ Dbtup::load_diskpage(Signal* signal,
req.m_callback.m_callbackData= opRec;
req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_callback);
#ifdef ERROR_INSERT
if (ERROR_INSERTED(4022))
{
flags |= Page_cache_client::DELAY_REQ;
req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
}
#endif
if((res= m_pgman.get_page(signal, req, flags)) > 0)
{
......@@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_page_callback);
int flags = Page_cache_client::COMMIT_REQ;
#ifdef ERROR_INSERT
if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
{
int rnd = rand() % 100;
int slp = 0;
if (ERROR_INSERTED(4024))
{
slp = 3000;
}
else if (rnd > 90)
{
slp = 3000;
}
else if (rnd > 70)
{
slp = 100;
}
ndbout_c("rnd: %d slp: %d", rnd, slp);
if (slp)
{
flags |= Page_cache_client::DELAY_REQ;
preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
}
}
#endif
res = m_pgman.get_page(signal, preq, flags);
if (res == 0)
{
......@@ -3130,6 +3167,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
}
PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
disk_page_set_dirty(disk_page);
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_logbuffer_callback);
......@@ -3164,7 +3202,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
PagePtr pagePtr= *(PagePtr*)&gpage;
disk_page_set_dirty(pagePtr);
Dblqh::Nr_op_info op;
op.m_ptr_i = userpointer;
op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
......
......@@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal)
int max_count = 1;
Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK];
while (! pl_callback.isEmpty() && --max_count >= 0)
Ptr<Page_entry> ptr;
pl_callback.first(ptr);
while (! ptr.isNull() && --max_count >= 0)
{
jam();
Ptr<Page_entry> ptr;
pl_callback.first(ptr);
if (! process_callback(signal, ptr))
Ptr<Page_entry> curr = ptr;
pl_callback.next(ptr);
if (! process_callback(signal, curr))
{
jam();
break;
......@@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
#ifdef VM_TRACE
debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl;
#endif
#ifdef ERROR_INSERT
if (req_ptr.p->m_flags & Page_request::DELAY_REQ)
{
Uint64 now = NdbTick_CurrentMillisecond();
if (now < req_ptr.p->m_delay_until_time)
{
break;
}
}
#endif
b = globalData.getBlock(req_ptr.p->m_block);
callback = req_ptr.p->m_callback;
......@@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
state |= Page_entry::MAPPED;
set_page_state(ptr, state);
{
/**
* Update lsn record on page
* as it can be modified/flushed wo/ update_lsn has been called
* (e.g. prealloc) and it then would get lsn 0, which is bad
* when running undo and following SR
*/
Ptr<GlobalPage> pagePtr;
m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
File_formats::Datafile::Data_page* page =
(File_formats::Datafile::Data_page*)pagePtr.p;
Uint64 lsn = 0;
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
ptr.p->m_lsn = lsn;
}
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
......@@ -1575,7 +1609,13 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
}
bool only_request = ptr.p->m_requests.isEmpty();
#ifdef ERROR_INSERT
if (req_flags & Page_request::DELAY_REQ)
{
jam();
only_request = false;
}
#endif
if (only_request &&
state & Page_entry::MAPPED)
{
......@@ -1623,7 +1663,10 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
req_ptr.p->m_block = page_req.m_block;
req_ptr.p->m_flags = page_req.m_flags;
req_ptr.p->m_callback = page_req.m_callback;
#ifdef ERROR_INSERT
req_ptr.p->m_delay_until_time = page_req.m_delay_until_time;
#endif
state |= Page_entry::REQUEST;
if (only_request && req_flags & Page_request::EMPTY_PAGE)
{
......
......@@ -256,12 +256,18 @@ private:
,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn
,UNLOCK_PAGE = 0x0400
,CORR_REQ = 0x0800 // correlated request (no LIRS update)
#ifdef ERROR_INSERT
,DELAY_REQ = 0x1000 // Force request to be delayed
#endif
};
Uint16 m_block;
Uint16 m_flags;
SimulatedBlock::Callback m_callback;
#ifdef ERROR_INSERT
Uint64 m_delay_until_time;
#endif
Uint32 nextList;
Uint32 m_magic;
};
......@@ -508,6 +514,10 @@ public:
struct Request {
Local_key m_page;
SimulatedBlock::Callback m_callback;
#ifdef ERROR_INSERT
Uint64 m_delay_until_time;
#endif
};
Ptr<GlobalPage> m_ptr; // TODO remove
......@@ -520,6 +530,9 @@ public:
,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ
,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE
,CORR_REQ = Pgman::Page_request::CORR_REQ
#ifdef ERROR_INSERT
,DELAY_REQ = Pgman::Page_request::DELAY_REQ
#endif
};
/**
......@@ -588,7 +601,10 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags)
page_req.m_block = m_block;
page_req.m_flags = flags;
page_req.m_callback = req.m_callback;
#ifdef ERROR_INSERT
page_req.m_delay_until_time = req.m_delay_until_time;
#endif
int i = m_pgman->get_page(signal, entry_ptr, page_req);
if (i > 0)
{
......
......@@ -1235,7 +1235,19 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false,
ConfigInfo::CI_INT,
"32K",
"0",
"2K",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_BACKUP_MAX_WRITE_SIZE,
"BackupMaxWriteSize",
DB_TOKEN,
"Max size of filesystem writes made by backup (in bytes)",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"256K",
"2K",
STR_VALUE(MAX_INT_RNIL) },
{
......
......@@ -63,6 +63,7 @@ LocalDictCache::~LocalDictCache(){
Ndb_local_table_info *
LocalDictCache::get(const char * name){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
return m_tableHash.getData(name, len);
......@@ -70,6 +71,7 @@ LocalDictCache::get(const char * name){
void
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
const Uint32 id = tab_info->m_table_impl->m_id;
m_tableHash.insertKey(name, strlen(name), id, tab_info);
......@@ -77,6 +79,7 @@ LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
void
LocalDictCache::drop(const char * name){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
DBUG_ASSERT(info != 0);
......@@ -100,8 +103,15 @@ GlobalDictCache::~GlobalDictCache(){
Vector<TableVersion> * vers = curr->theData;
const unsigned sz = vers->size();
for(unsigned i = 0; i<sz ; i++){
if((* vers)[i].m_impl != 0)
TableVersion tv= (*vers)[i];
DBUG_PRINT(" ", ("vers[%d]: ver: %d, refCount: %d, status: %d",
i, tv.m_version, tv.m_refCount, tv.m_status));
if(tv.m_impl != 0)
{
DBUG_PRINT(" ", ("m_impl: internalname: %s",
tv.m_impl->m_internalName.c_str()));
delete (* vers)[i].m_impl;
}
}
delete curr->theData;
curr->theData= NULL;
......@@ -164,11 +174,18 @@ GlobalDictCache::get(const char * name)
TableVersion * ver = & versions->back();
switch(ver->m_status){
case OK:
if (ver->m_impl->m_status == NdbDictionary::Object::Invalid)
{
ver->m_status = DROPPED;
retreive = true; // Break loop
break;
}
ver->m_refCount++;
DBUG_PRINT("info", ("Table OK version=%x.%x refCount=%u",
ver->m_impl->m_version & 0xFFFFFF,
ver->m_impl->m_version >> 24,
ver->m_refCount));
DBUG_PRINT("info", ("Table OK tab: %p version=%x.%x refCount=%u",
ver->m_impl,
ver->m_impl->m_version & 0xFFFFFF,
ver->m_impl->m_version >> 24,
ver->m_refCount));
DBUG_RETURN(ver->m_impl);
case DROPPED:
retreive = true; // Break loop
......@@ -197,8 +214,8 @@ NdbTableImpl *
GlobalDictCache::put(const char * name, NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::put");
DBUG_PRINT("enter", ("name: %s, internal_name: %s version: %x.%x",
name,
DBUG_PRINT("enter", ("tab: %p name: %s, internal_name: %s version: %x.%x",
tab, name,
tab ? tab->m_internalName.c_str() : "tab NULL",
tab ? tab->m_version & 0xFFFFFF : 0,
tab ? tab->m_version >> 24 : 0));
......@@ -264,66 +281,11 @@ GlobalDictCache::put(const char * name, NdbTableImpl * tab)
}
void
GlobalDictCache::drop(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::drop");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
Vector<TableVersion> * vers =
m_tableHash.getData(tab->m_internalName.c_str(), len);
if(vers == 0){
// Should always tried to retreive it first
// and thus there should be a record
abort();
}
const Uint32 sz = vers->size();
if(sz == 0){
// Should always tried to retreive it first
// and thus there should be a record
abort();
}
for(i = 0; i < sz; i++){
TableVersion & ver = (* vers)[i];
if(ver.m_impl == tab){
if(ver.m_refCount == 0 || ver.m_status == RETREIVING ||
ver.m_version != tab->m_version){
DBUG_PRINT("info", ("Dropping with refCount=%d status=%d impl=%p",
ver.m_refCount, ver.m_status, ver.m_impl));
break;
}
DBUG_PRINT("info", ("Found table to drop, i: %d, name: %s",
i, ver.m_impl->m_internalName.c_str()));
ver.m_refCount--;
ver.m_status = DROPPED;
if(ver.m_refCount == 0){
DBUG_PRINT("info", ("refCount is zero, deleting m_impl"));
delete ver.m_impl;
vers->erase(i);
}
DBUG_VOID_RETURN;
}
}
for(i = 0; i<sz; i++){
TableVersion & ver = (* vers)[i];
ndbout_c("%d: version: %d refCount: %d status: %d impl: %p",
i, ver.m_version, ver.m_refCount,
ver.m_status, ver.m_impl);
}
abort();
}
void
GlobalDictCache::release(NdbTableImpl * tab)
GlobalDictCache::release(NdbTableImpl * tab, int invalidate)
{
DBUG_ENTER("GlobalDictCache::release");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
DBUG_PRINT("enter", ("tab: %p internal_name: %s",
tab, tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
......@@ -354,6 +316,17 @@ GlobalDictCache::release(NdbTableImpl * tab)
}
ver.m_refCount--;
if (ver.m_impl->m_status == NdbDictionary::Object::Invalid || invalidate)
{
ver.m_impl->m_status = NdbDictionary::Object::Invalid;
ver.m_status = DROPPED;
}
if (ver.m_refCount == 0 && ver.m_status == DROPPED)
{
DBUG_PRINT("info", ("refCount is zero, deleting m_impl"));
delete ver.m_impl;
vers->erase(i);
}
DBUG_VOID_RETURN;
}
}
......@@ -374,6 +347,7 @@ GlobalDictCache::alter_table_rep(const char * name,
Uint32 tableVersion,
bool altered)
{
DBUG_ENTER("GlobalDictCache::alter_table_rep");
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers =
......@@ -381,13 +355,13 @@ GlobalDictCache::alter_table_rep(const char * name,
if(vers == 0)
{
return;
DBUG_VOID_RETURN;
}
const Uint32 sz = vers->size();
if(sz == 0)
{
return;
DBUG_VOID_RETURN;
}
for(Uint32 i = 0; i < sz; i++)
......@@ -399,15 +373,16 @@ GlobalDictCache::alter_table_rep(const char * name,
ver.m_status = DROPPED;
ver.m_impl->m_status = altered ?
NdbDictionary::Object::Altered : NdbDictionary::Object::Invalid;
return;
DBUG_VOID_RETURN;
}
if(i == sz - 1 && ver.m_status == RETREIVING)
{
ver.m_impl = altered ? &f_altered_table : &f_invalid_table;
return;
DBUG_VOID_RETURN;
}
}
DBUG_VOID_RETURN;
}
template class Vector<GlobalDictCache::TableVersion>;
......@@ -63,11 +63,11 @@ public:
GlobalDictCache();
~GlobalDictCache();
NdbTableImpl * get(NdbTableImpl *tab);
NdbTableImpl * get(const char * name);
NdbTableImpl* put(const char * name, NdbTableImpl *);
void drop(NdbTableImpl *);
void release(NdbTableImpl *);
void release(NdbTableImpl *, int invalidate = 0);
void alter_table_rep(const char * name,
Uint32 tableId, Uint32 tableVersion, bool altered);
......
......@@ -901,6 +901,27 @@ Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
}
int Ndb::initAutoIncrement()
{
if (m_sys_tab_0)
return 0;
BaseString currentDb(getDatabaseName());
BaseString currentSchema(getDatabaseSchemaName());
setDatabaseName("sys");
setDatabaseSchemaName("def");
m_sys_tab_0 = getDictionary()->getTableGlobal("SYSTAB_0");
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
return (m_sys_tab_0 == NULL);
}
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
{
......@@ -916,19 +937,14 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
CHECK_STATUS_MACRO_ZERO;
BaseString currentDb(getDatabaseName());
BaseString currentSchema(getDatabaseSchemaName());
if (initAutoIncrement())
goto error_return;
setDatabaseName("sys");
setDatabaseSchemaName("def");
tConnection = this->startTransaction();
if (tConnection == NULL)
goto error_return;
if (usingFullyQualifiedNames())
tOperation = tConnection->getNdbOperation("SYSTAB_0");
else
tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
tOperation = tConnection->getNdbOperation(m_sys_tab_0);
if (tOperation == NULL)
goto error_handler;
......@@ -997,20 +1013,12 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
this->closeTransaction(tConnection);
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
DBUG_RETURN(ret);
error_handler:
theError.code = tConnection->theError.code;
this->closeTransaction(tConnection);
error_return:
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
theError.code,
tConnection ? tConnection->theError.code : -1,
......
......@@ -559,6 +559,11 @@ NdbDictionary::Table::getObjectStatus() const {
return m_impl.m_status;
}
void
NdbDictionary::Table::setStatusInvalid() const {
m_impl.m_status = NdbDictionary::Object::Invalid;
}
int
NdbDictionary::Table::getObjectVersion() const {
return m_impl.m_version;
......@@ -1330,6 +1335,11 @@ NdbDictionary::Dictionary::dropTable(Table & t){
return m_impl.dropTable(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::dropTableGlobal(const Table & t){
return m_impl.dropTableGlobal(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::dropTable(const char * name){
return m_impl.dropTable(name);
......@@ -1340,6 +1350,14 @@ NdbDictionary::Dictionary::alterTable(const Table & t){
return m_impl.alterTable(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::alterTableGlobal(const Table & f,
const Table & t)
{
return m_impl.alterTableGlobal(NdbTableImpl::getImpl(f),
NdbTableImpl::getImpl(t));
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTable(const char * name, void **data) const
{
......@@ -1349,6 +1367,40 @@ NdbDictionary::Dictionary::getTable(const char * name, void **data) const
return 0;
}
const NdbDictionary::Index *
NdbDictionary::Dictionary::getIndexGlobal(const char * indexName,
const Table &ndbtab) const
{
NdbIndexImpl * i = m_impl.getIndexGlobal(indexName,
NdbTableImpl::getImpl(ndbtab));
if(i)
return i->m_facade;
return 0;
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTableGlobal(const char * name) const
{
NdbTableImpl * t = m_impl.getTableGlobal(name);
if(t)
return t->m_facade;
return 0;
}
int
NdbDictionary::Dictionary::removeIndexGlobal(const Index &ndbidx,
int invalidate) const
{
return m_impl.releaseIndexGlobal(NdbIndexImpl::getImpl(ndbidx), invalidate);
}
int
NdbDictionary::Dictionary::removeTableGlobal(const Table &ndbtab,
int invalidate) const
{
return m_impl.releaseTableGlobal(NdbTableImpl::getImpl(ndbtab), invalidate);
}
void NdbDictionary::Dictionary::putTable(const NdbDictionary::Table * table)
{
NdbDictionary::Table *copy_table = new NdbDictionary::Table;
......@@ -1420,6 +1472,13 @@ NdbDictionary::Dictionary::createIndex(const Index & ind)
return m_impl.createIndex(NdbIndexImpl::getImpl(ind));
}
int
NdbDictionary::Dictionary::createIndex(const Index & ind, const Table & tab)
{
return m_impl.createIndex(NdbIndexImpl::getImpl(ind),
NdbTableImpl::getImpl(tab));
}
int
NdbDictionary::Dictionary::dropIndex(const char * indexName,
const char * tableName)
......@@ -1427,6 +1486,12 @@ NdbDictionary::Dictionary::dropIndex(const char * indexName,
return m_impl.dropIndex(indexName, tableName);
}
int
NdbDictionary::Dictionary::dropIndexGlobal(const Index &ind)
{
return m_impl.dropIndexGlobal(NdbIndexImpl::getImpl(ind));
}
const NdbDictionary::Index *
NdbDictionary::Dictionary::getIndex(const char * indexName,
const char * tableName) const
......
......@@ -35,6 +35,9 @@ is_ndb_blob_table(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
bool
is_ndb_blob_table(const class NdbTableImpl* t);
extern int ndb_dictionary_is_mysqld;
#define ASSERT_NOT_MYSQLD assert(ndb_dictionary_is_mysqld == 0)
class NdbDictObjectImpl {
public:
int m_id;
......@@ -253,6 +256,8 @@ public:
BaseString m_internalName;
BaseString m_externalName;
BaseString m_tableName;
Uint32 m_table_id;
Uint32 m_table_version;
Vector<NdbColumnImpl *> m_columns;
Vector<int> m_key_ids;
......@@ -539,6 +544,21 @@ private:
UtilBuffer m_buffer;
};
class NdbDictionaryImpl;
class GlobalCacheInitObject
{
public:
NdbDictionaryImpl *m_dict;
const BaseString &m_name;
GlobalCacheInitObject(NdbDictionaryImpl *dict,
const BaseString &name) :
m_dict(dict),
m_name(name)
{}
virtual ~GlobalCacheInitObject() {}
virtual int init(NdbTableImpl &tab) const = 0;
};
class NdbDictionaryImpl : public NdbDictionary::Dictionary {
public:
NdbDictionaryImpl(Ndb &ndb);
......@@ -558,6 +578,7 @@ public:
int removeCachedObject(NdbTableImpl &);
int createIndex(NdbIndexImpl &ix);
int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
int dropIndex(const char * indexName,
const char * tableName);
int dropIndex(NdbIndexImpl &, const char * tableName);
......@@ -578,6 +599,15 @@ public:
int listObjects(List& list, NdbDictionary::Object::Type type);
int listIndexes(List& list, Uint32 indexId);
NdbTableImpl * getTableGlobal(const char * tableName);
NdbIndexImpl * getIndexGlobal(const char * indexName,
NdbTableImpl &ndbtab);
int alterTableGlobal(NdbTableImpl &orig_impl, NdbTableImpl &impl);
int dropTableGlobal(NdbTableImpl &);
int dropIndexGlobal(NdbIndexImpl & impl);
int releaseTableGlobal(NdbTableImpl & impl, int invalidate);
int releaseIndexGlobal(NdbIndexImpl & impl, int invalidate);
NdbTableImpl * getTable(const char * tableName, void **data= 0);
NdbTableImpl * getBlobTable(const NdbTableImpl&, uint col_no);
NdbTableImpl * getBlobTable(uint tab_id, uint col_no);
......@@ -616,10 +646,14 @@ public:
NdbDictInterface m_receiver;
Ndb & m_ndb;
private:
NdbIndexImpl* getIndexImpl(const char * externalName,
const BaseString& internalName,
NdbTableImpl &tab,
NdbTableImpl &prim);
NdbIndexImpl * getIndexImpl(const char * name,
const BaseString& internalName);
Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
private:
NdbTableImpl * fetchGlobalTableImplRef(const GlobalCacheInitObject &obj);
};
inline
......@@ -852,6 +886,27 @@ NdbDictionaryImpl::getImpl(const NdbDictionary::Dictionary & t){
* Inline:d getters
*/
class InitTable : public GlobalCacheInitObject
{
public:
InitTable(NdbDictionaryImpl *dict,
const BaseString &name) :
GlobalCacheInitObject(dict, name)
{}
int init(NdbTableImpl &tab) const
{
return m_dict->getBlobTables(tab);
}
};
inline
NdbTableImpl *
NdbDictionaryImpl::getTableGlobal(const char * table_name)
{
const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
return fetchGlobalTableImplRef(InitTable(this, internal_tabname));
}
inline
NdbTableImpl *
NdbDictionaryImpl::getTable(const char * table_name, void **data)
......@@ -885,21 +940,134 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
DBUG_PRINT("enter", ("table: %s", internalTableName.c_str()));
Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
if (info == 0) {
info= fetchGlobalTableImpl(internalTableName);
if (info == 0) {
DBUG_RETURN(0);
if (info == 0)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitTable(this, internalTableName));
if (tab)
{
info= Ndb_local_table_info::create(tab, m_local_table_data_size);
if (info)
{
m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[tab->getTableId()] = ~0;
m_ndb.theLastTupleId[tab->getTableId()] = ~0;
}
}
}
DBUG_RETURN(info); // autoincrement already initialized
}
class InitIndexGlobal : public GlobalCacheInitObject
{
public:
const char *m_index_name;
NdbTableImpl &m_prim;
InitIndexGlobal(NdbDictionaryImpl *dict,
const BaseString &internal_indexname,
const char *index_name,
NdbTableImpl &prim) :
GlobalCacheInitObject(dict, internal_indexname),
m_index_name(index_name),
m_prim(prim)
{}
int init(NdbTableImpl &tab) const
{
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name, tab, m_prim);
if (tab.m_index == 0)
return 1;
tab.m_index->m_table= &tab;
return 0;
}
};
class InitIndex : public GlobalCacheInitObject
{
public:
const char *m_index_name;
InitIndex(NdbDictionaryImpl *dict,
const BaseString &internal_indexname,
const char *index_name) :
GlobalCacheInitObject(dict, internal_indexname),
m_index_name(index_name)
{}
int init(NdbTableImpl &tab) const
{
DBUG_ASSERT(tab.m_index == 0);
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name);
if (tab.m_index)
{
tab.m_index->m_table= &tab;
return 0;
}
return 1;
}
};
inline
NdbIndexImpl *
NdbDictionaryImpl::getIndexGlobal(const char * index_name,
NdbTableImpl &ndbtab)
{
DBUG_ENTER("NdbDictionaryImpl::getIndexGlobal");
const BaseString
internal_indexname(m_ndb.internalize_index_name(&ndbtab, index_name));
int retry= 2;
while (retry)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitIndexGlobal(this, internal_indexname,
index_name, ndbtab));
if (tab)
{
// tab->m_index sould be set. otherwise tab == 0
NdbIndexImpl *idx= tab->m_index;
if (idx->m_table_id != ndbtab.getObjectId() ||
idx->m_table_version != ndbtab.getObjectVersion())
{
releaseIndexGlobal(*idx, 1);
retry--;
continue;
}
DBUG_RETURN(idx);
}
break;
}
m_error.code= 4243;
DBUG_RETURN(0);
}
inline int
NdbDictionaryImpl::releaseTableGlobal(NdbTableImpl & impl, int invalidate)
{
DBUG_ENTER("NdbDictionaryImpl::releaseTableGlobal");
DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
m_globalHash->lock();
m_globalHash->release(&impl, invalidate);
m_globalHash->unlock();
DBUG_RETURN(0);
}
inline int
NdbDictionaryImpl::releaseIndexGlobal(NdbIndexImpl & impl, int invalidate)
{
DBUG_ENTER("NdbDictionaryImpl::releaseIndexGlobal");
DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
m_globalHash->lock();
m_globalHash->release(impl.m_table, invalidate);
m_globalHash->unlock();
DBUG_RETURN(0);
}
inline
NdbIndexImpl *
NdbDictionaryImpl::getIndex(const char * index_name,
const char * table_name)
{
if (table_name || m_ndb.usingFullyQualifiedNames())
while (table_name || m_ndb.usingFullyQualifiedNames())
{
const BaseString internal_indexname(
(table_name)
......@@ -910,18 +1078,28 @@ NdbDictionaryImpl::getIndex(const char * index_name,
if (internal_indexname.length())
{
Ndb_local_table_info * info=
get_local_table_info(internal_indexname);
if (info)
Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
NdbTableImpl *tab;
if (info == 0)
{
NdbTableImpl * tab= info->m_table_impl;
if (tab->m_index == 0)
tab->m_index= getIndexImpl(index_name, internal_indexname);
if (tab->m_index != 0)
tab->m_index->m_table= tab;
return tab->m_index;
tab= fetchGlobalTableImplRef(InitIndex(this, internal_indexname,
index_name));
if (tab)
{
info= Ndb_local_table_info::create(tab, 0);
if (info)
m_localHash.put(internal_indexname.c_str(), info);
else
break;
}
else
break;
}
else
tab= info->m_table_impl;
return tab->m_index;
}
break;
}
m_error.code= 4243;
......
......@@ -377,7 +377,7 @@ NdbIndexStat::stat_select(const Uint32* key1, Uint32 keylen1, const Uint32* key2
}
int
NdbIndexStat::records_in_range(NdbDictionary::Index* index, NdbIndexScanOperation* op, Uint64 table_rows, Uint64* count, int flags)
NdbIndexStat::records_in_range(const NdbDictionary::Index* index, NdbIndexScanOperation* op, Uint64 table_rows, Uint64* count, int flags)
{
DBUG_ENTER("NdbIndexStat::records_in_range");
Uint64 rows;
......
......@@ -99,6 +99,7 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
for (i = 0; i < MAX_NDB_NODES ; i++) {
theConnectionArray[i] = NULL;
}//forg
m_sys_tab_0 = NULL;
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
......@@ -137,6 +138,9 @@ Ndb::~Ndb()
DBUG_ENTER("Ndb::~Ndb()");
DBUG_PRINT("enter",("this=0x%x",this));
if (m_sys_tab_0)
getDictionary()->removeTableGlobal(*m_sys_tab_0, 0);
assert(theImpl->m_ev_op == 0); // user should return NdbEventOperation's
for (NdbEventOperationImpl *op= theImpl->m_ev_op; op; op=op->m_next)
{
......
......@@ -109,7 +109,7 @@ int runCreateTheTable(NDBT_Context* ctx, NDBT_Step* step){
const NdbDictionary::Table* pTab = ctx->getTab();
// Try to create table in db
if (pTab->createTableInDb(pNdb) != 0){
if (NDBT_Tables::createTable(pNdb, pTab->getName()) != 0){
return NDBT_FAILED;
}
......@@ -151,7 +151,7 @@ int runCreateTableWhenDbIsFull(NDBT_Context* ctx, NDBT_Step* step){
}
// Try to create table in db
if (pTab->createTableInDb(pNdb) == 0){
if (NDBT_Tables::createTable(pNdb, pTab->getName()) == 0){
result = NDBT_FAILED;
}
......@@ -203,7 +203,7 @@ int runCreateAndDrop(NDBT_Context* ctx, NDBT_Step* step){
ndbout << i << ": ";
// Try to create table in db
if (pTab->createTableInDb(pNdb) != 0){
if (NDBT_Tables::createTable(pNdb, pTab->getName()) != 0){
return NDBT_FAILED;
}
......@@ -254,7 +254,8 @@ int runCreateAndDropWithData(NDBT_Context* ctx, NDBT_Step* step){
while (i < loops){
ndbout << i << ": ";
// Try to create table in db
if (pTab->createTableInDb(pNdb) != 0){
if (NDBT_Tables::createTable(pNdb, pTab->getName()) != 0){
return NDBT_FAILED;
}
......@@ -336,7 +337,7 @@ int runCreateAndDropDuring(NDBT_Context* ctx, NDBT_Step* step){
Ndb* pNdb = GETNDB(step);
g_debug << "Creating table" << endl;
if (pTab->createTableInDb(pNdb) != 0){
if (NDBT_Tables::createTable(pNdb, pTab->getName()) != 0){
g_err << "createTableInDb failed" << endl;
result = NDBT_FAILED;
continue;
......@@ -357,7 +358,6 @@ int runCreateAndDropDuring(NDBT_Context* ctx, NDBT_Step* step){
g_debug << "Dropping table" << endl;
if (pNdb->getDictionary()->dropTable(pTab2->getName()) != 0){
g_err << "Failed to drop "<<pTab2->getName()<<" in db" << endl;
result = NDBT_FAILED;
......
......@@ -188,6 +188,8 @@ list(const char * tabname,
ndbout_c("%-5d %-20s %-8s %-7s %s", elt.id, type, state, store, elt.name);
}
}
if (_parsable)
exit(0);
}
NDB_STD_OPTS_VARS;
......
......@@ -636,6 +636,7 @@ fi
%attr(755, root, root) %{_bindir}/ndb_desc
%attr(755, root, root) %{_bindir}/ndb_show_tables
%attr(755, root, root) %{_bindir}/ndb_test_platform
%attr(755, root, root) %{_bindir}/ndb_config
%files ndb-extra
%defattr(-,root,root,0755)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment