Commit a0c1806d authored by unknown's avatar unknown
Browse files

removing the usage if the ndb local dict cache to adress the 241 errors we get now and then

parent 1a703be5
......@@ -62,4 +62,6 @@ t4
drop table t1, t2, t3, t4;
drop table if exists t1, t3, t4;
Warnings:
Error 155 Table 'test.t1' doesn't exist
Error 155 Table 'test.t3' doesn't exist
Error 155 Table 'test.t4' doesn't exist
This diff is collapsed.
......@@ -70,8 +70,8 @@ typedef enum ndb_index_status {
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
NDB_INDEX_STATUS status;
void *index;
void *unique_index;
const NdbDictionary::Index *index;
const NdbDictionary::Index *unique_index;
unsigned char *unique_index_attrid_map;
// In this version stats are not shared between threads
NdbIndexStat* index_stat;
......@@ -560,6 +560,7 @@ class ha_ndbcluster: public handler
ha_ndbcluster(TABLE_SHARE *table);
~ha_ndbcluster();
int ha_initialise();
int open(const char *name, int mode, uint test_if_locked);
int close(void);
......@@ -708,19 +709,15 @@ private:
Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share);
int alter_table_name(const char *to);
static int delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *path,
const char *db,
const char *table_name);
int drop_ndb_table();
int create_ndb_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
int create_index(const char *name, KEY *key_info,
NDB_INDEX_TYPE idx_type, uint idx_no);
int drop_ndb_index(const char *name);
int table_changed(const void *pack_frm_data, uint pack_frm_len);
// Index list management
int create_indexes(Ndb *ndb, TABLE *tab);
void clear_index(int i);
......@@ -732,7 +729,7 @@ private:
KEY *key_info, const char *index_name, uint index_no);
int initialize_autoincrement(const void *table);
int get_metadata(const char* path);
void release_metadata();
void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
......@@ -795,8 +792,6 @@ private:
void print_results();
ulonglong get_auto_increment();
int invalidate_dictionary_cache(bool global,
const NdbDictionary::Table *ndbtab);
int ndb_err(NdbTransaction*);
bool uses_blob_value();
......@@ -834,7 +829,6 @@ private:
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
const NdbDictionary::Table *m_table;
int m_table_version;
struct Ndb_local_table_statistics *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
......
......@@ -986,7 +986,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *old_db, const char *old_table_name)
const char *new_db, const char *new_table_name)
{
DBUG_ENTER("ndbcluster_log_schema_op");
Thd_ndb *thd_ndb= get_thd_ndb(thd);
......@@ -1026,8 +1026,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
/* redo the rename table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "rename table `",
old_db, ".", old_table_name, "` to `",
db, ".", table_name, "`", NullS) - tmp_buf2);
db, ".", table_name, "` to `",
new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
type_str= "rename table";
break;
case SOT_CREATE_TABLE:
......@@ -1067,6 +1067,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
Uint64 epoch= 0;
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
uint32 bitbuf_e[sizeof(bitbuf)];
bzero((char *)bitbuf_e, sizeof(bitbuf_e));
{
int i, updated= 0;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
......@@ -1110,7 +1112,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
......@@ -1141,8 +1144,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
while (1)
{
const char *log_db= db;
const char *log_tab= table_name;
const char *log_subscribers= (char*)schema_subscribers.bitmap;
uint32 log_type= (uint32)type;
if ((trans= ndb->startTransaction()) == 0)
goto err;
while (1)
{
NdbOperation *op= 0;
int r= 0;
......@@ -1152,17 +1160,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
strlen(log_tab));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
DBUG_ASSERT(r == 0);
/* query */
{
......@@ -1186,8 +1194,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)type);
r|= op->setValue(SCHEMA_TYPE_I, log_type);
DBUG_ASSERT(r == 0);
if (log_db != new_db && new_db && new_table_name)
{
log_db= new_db;
log_tab= new_table_name;
log_subscribers= (const char *)bitbuf_e; // no ack expected on this
log_type= (uint32)SOT_RENAME_TABLE_NEW;
continue;
}
break;
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
......@@ -1306,7 +1323,8 @@ ndbcluster_update_slock(THD *thd,
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
......@@ -1452,31 +1470,28 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
{
if (pOp->tableFrmChanged())
{
DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
is_online_alter_table= TRUE;
}
else
{
DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
DBUG_ASSERT(pOp->tableNameChanged());
is_rename_table= TRUE;
}
}
/*
Refresh local dictionary cache by
invalidating table and all it's indexes
*/
ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb;
thd_ndb->ndb= ndb;
ha_ndbcluster table_handler(table_share);
(void)strxmov(table_handler.m_dbname, dbname, NullS);
(void)strxmov(table_handler.m_tabname, tabname, NullS);
table_handler.open_indexes(ndb, table, TRUE);
table_handler.invalidate_dictionary_cache(TRUE, 0);
thd_ndb->ndb= old_ndb;
{
ndb->setDatabaseName(dbname);
Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
const NDBTAB *ev_tab= pOp->getTable();
const NDBTAB *cache_tab= ndbtab_g.get_table();
if (cache_tab &&
cache_tab->getObjectId() == ev_tab->getObjectId() &&
cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
ndbtab_g.invalidate();
}
/*
Refresh local frm file and dictionary cache if
remote on-line alter table
......@@ -1505,7 +1520,8 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
altered_table->getFrmLength());
pthread_mutex_lock(&LOCK_open);
const NDBTAB *old= dict->getTable(tabname);
Ndb_table_guard ndbtab_g(dict, tabname);
const NDBTAB *old= ndbtab_g.get_table();
if (!old &&
old->getObjectVersion() != altered_table->getObjectVersion())
dict->putTable(altered_table);
......@@ -1517,7 +1533,13 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
dbname, tabname, error);
}
ndbcluster_binlog_close_table(thd, share);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
TABLE_LIST table_list;
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
close_cached_tables(thd, 0, &table_list, TRUE);
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table)))
sql_print_information("NDB: Failed to re-open table %s.%s",
......@@ -1545,26 +1567,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
share_prefix, share->table->s->db.str,
share->table->s->table_name.str,
share->key);
{
ndb->setDatabaseName(share->table->s->db.str);
Ndb_table_guard ndbtab_g(ndb->getDictionary(),
share->table->s->table_name.str);
const NDBTAB *ev_tab= pOp->getTable();
const NDBTAB *cache_tab= ndbtab_g.get_table();
if (cache_tab &&
cache_tab->getObjectId() == ev_tab->getObjectId() &&
cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
ndbtab_g.invalidate();
}
/* do the rename of the table in the share */
share->table->s->db.str= share->db;
share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name);
/*
Refresh local dictionary cache by invalidating any
old table with same name and all it's indexes
*/
ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb;
thd_ndb->ndb= ndb;
ha_ndbcluster table_handler(table_share);
table_handler.set_dbname(share->key);
table_handler.set_tabname(share->key);
table_handler.open_indexes(ndb, table, TRUE);
table_handler.invalidate_dictionary_cache(TRUE, 0);
thd_ndb->ndb= old_ndb;
}
DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
if (share->op_old == pOp)
......@@ -1582,14 +1600,19 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
if (is_remote_change && share && share->state != NSS_DROPPED)
{
DBUG_PRINT("info", ("remote change"));
share->state= NSS_DROPPED;
if (share->use_count != 1)
do_close_cached_tables= TRUE;
share->state= NSS_DROPPED;
free_share(&share, TRUE);
else
{
free_share(&share, TRUE);
share= 0;
}
}
else
share= 0;
pthread_mutex_unlock(&ndbcluster_mutex);
share= 0;
pOp->setCustomData(0);
pthread_mutex_lock(&injector_mutex);
......@@ -1598,7 +1621,14 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pthread_mutex_unlock(&injector_mutex);
if (do_close_cached_tables)
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
{
TABLE_LIST table_list;
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
close_cached_tables(thd, 0, &table_list);
free_share(&share);
}
DBUG_RETURN(0);
}
......@@ -1630,53 +1660,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
if (schema->node_id != node_id)
{
int log_query= 0, post_epoch_unlock= 0;
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query));
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
switch ((enum SCHEMA_OP_TYPE)schema->type)
{
case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */
if (share && share->op)
{
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
/* table is either ignored or logging is postponed to later */
log_query= 0;
break;
// fall through
case SOT_RENAME_TABLE:
if (share && share->op)
{
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */
}
goto sot_create_table;
// fall through
case SOT_RENAME_TABLE_NEW:
// fall through
case SOT_ALTER_TABLE:
if (share && share->op)
{
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */
}
goto sot_create_table;
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break;
case SOT_CREATE_TABLE:
sot_create_table:
/*
we need to free any share here as command below
may need to call handle_trailing_share
*/
if (share)
{
free_share(&share);
share= 0;
}
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{
......@@ -1694,12 +1698,9 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE, /* print error */
TRUE); /* don't binlog the query */
/* binlog dropping database after any table operations */
if (ndb_binlog_running)
{
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break;
case SOT_CREATE_DB:
/* fall through */
......@@ -1726,8 +1727,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
pthread_mutex_unlock(&ndb_schema_object->mutex);
pthread_cond_signal(&injector_cond);
}
if (share)
free_share(&share, TRUE);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
......@@ -1736,11 +1735,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
log_query= 1;
break;
}
if (share)
{
free_share(&share);
share= 0;
}
if (log_query && ndb_binlog_running)
{
char *thd_db_save= thd->db;
......@@ -1864,36 +1858,81 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
List<Cluster_schema>
*post_epoch_unlock_list)
{
if (post_epoch_log_list->elements == 0)
return;
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
Cluster_schema *schema;
while ((schema= post_epoch_log_list->pop()))
{
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query));
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
int log_query= 0;
{
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
switch ((enum SCHEMA_OP_TYPE)schema->type)
enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
switch (schema_type)
{
case SOT_DROP_DB:
case SOT_DROP_TABLE:
log_query= 1;
break;
case SOT_DROP_TABLE:
// invalidation already handled by binlog thread
if (share && share->op)
{
log_query= 1;
break;
}
// fall through
case SOT_RENAME_TABLE:
// fall through
case SOT_ALTER_TABLE:
if (share && share->op)
// invalidation already handled by binlog thread
if (!share || !share->op)
{
break; /* discovery handled by binlog */
{
injector_ndb->setDatabaseName(schema->db);
Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
schema->name);
ndbtab_g.invalidate();
}
TABLE_LIST table_list;
bzero((char*) &table_list,sizeof(table_list));
table_list.db= schema->db;
table_list.alias= table_list.table_name= schema->name;
close_cached_tables(thd, 0, &table_list, FALSE);
}
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
if (schema_type != SOT_ALTER_TABLE)
break;
// fall through
case SOT_RENAME_TABLE_NEW:
log_query= 1;
if (ndb_binlog_running)
{
sql_print_error("Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d",
schema->db, schema->name, schema->query,
schema->node_id);
/*
we need to free any share here as command below
may need to call handle_trailing_share
*/
if (share)
{
free_share(&share);
share= 0;
}
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{
sql_print_error("Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d",
schema->db, schema->name, schema->query,
schema->node_id);
}
pthread_mutex_unlock(&LOCK_open);
}
pthread_mutex_unlock(&LOCK_open);
break;
default:
DBUG_ASSERT(false);
}
......@@ -1903,6 +1942,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
share= 0;
}
}
if (ndb_binlog_running && log_query)
{
char *thd_db_save= thd->db;
thd->db= schema->db;
......@@ -2186,7 +2226,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
ndb->setDatabaseName(db);
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab= dict->getTable(table_name);
Ndb_table_guard ndbtab_g(dict, table_name);
const NDBTAB *ndbtab= ndbtab_g.get_table();
if (ndbtab == 0)
{
if (ndb_extra_logging)
......@@ -2201,7 +2242,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
event should have been created by someone else,
but let's make sure, and create if it doesn't exist
*/
if (!dict->getEvent(event_name.c_ptr()))
const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
if (!ev)
{
if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
{
......@@ -2216,9 +2258,12 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
event_name.c_ptr());
}
else
{
delete ev;
if (ndb_extra_logging)
sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
event_name.c_ptr());
}
/*
create the event operations for receiving logging events
......@@ -2328,8 +2373,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
try retrieving the event, if table version/id matches, we will get
a valid event. Otherwise we have a trailing event from before
*/
if (dict->getEvent(event_name))
const NDBEVENT *ev;
if ((ev= dict->getEvent(event_name)))
{
delete ev;
DBUG_RETURN(0);
}
......
......@@ -41,14 +41,15 @@ enum SCHEMA_OP_TYPE
{
SOT_DROP_TABLE= 0,
SOT_CREATE_TABLE= 1,
SOT_RENAME_TABLE= 2,
SOT_RENAME_TABLE_NEW= 2,
SOT_ALTER_TABLE= 3,
SOT_DROP_DB= 4,
SOT_CREATE_DB= 5,
SOT_ALTER_DB= 6,
SOT_CLEAR_SLOCK= 7,
SOT_TABLESPACE= 8,
SOT_LOGFILE_GROUP= 9
SOT_LOGFILE_GROUP= 9,
SOT_RENAME_TABLE= 10
};
const uint max_ndb_nodes= 64; /* multiple of 32 */
......@@ -56,6 +57,45 @@ const uint max_ndb_nodes= 64; /* multiple of 32 */
static const char *ha_ndb_ext=".ndb";
static const char share_prefix[]= "./";
class Ndb_table_guard
{
public:
Ndb_table_guard(NDBDICT *dict, const char *tabname)
: m_dict(dict)
{
DBUG_ENTER("Ndb_table_guard");
m_ndbtab= m_dict->getTableGlobal(tabname);
m_invalidate= 0;
DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab));
DBUG_VOID_RETURN;
}
~Ndb_table_guard()
{
DBUG_ENTER("~Ndb_table_guard");
if (m_ndbtab)
{
DBUG_PRINT("info", ("m_ndbtab: %p m_invalidate: %d",
m_ndbtab, m_invalidate));
m_dict->removeTableGlobal(*m_ndbtab, m_invalidate);
}
DBUG_VOID_RETURN;
}
const NDBTAB *get_table() { return m_ndbtab; }
void invalidate() { m_invalidate= 1; }
const NDBTAB *release()
{
DBUG_ENTER("Ndb_table_guard::release");
const NDBTAB *tmp= m_ndbtab;
DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab));
m_ndbtab = 0;
DBUG_RETURN(tmp);
}
private:
const NDBTAB *m_ndbtab;
NDBDICT *m_dict;
int m_invalidate;
};
#ifdef HAVE_NDB_BINLOG
extern pthread_t ndb_binlog_thread;
extern pthread_mutex_t injector_mutex;
......@@ -98,8 +138,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *old_db= 0,
const char *old_table_name= 0);
const char *new_db= 0,
const char *new_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share,
const char *type_str);
......
......@@ -1470,6 +1470,8 @@ public:
*
* @return tuple id or 0 on error
*/
int initAutoIncrement();
Uint64 getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1);
Uint64 getAutoIncrementValue(const NdbDictionary::Table * aTable,
......@@ -1694,6 +1696,7 @@ private:
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
const NdbDictionary::Table *m_sys_tab_0;
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
......
......@@ -798,6 +798,7 @@ public:
* Get object status
*/
virtual Object::Status getObjectStatus() const;
void setStatusInvalid() const;
/**
* Get object version
......@@ -1734,6 +1735,7 @@ public:
* @return 0 if successful otherwise -1.
*/
int createIndex(const Index &index);
int createIndex(const Index &index, const Table &table);
/**
* Drop index with given name
......@@ -1805,6 +1807,15 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
const Table * getTable(const char * name, void **data) const;
void set_local_table_data_size(unsigned sz);
const Index * getIndexGlobal(const char * indexName,
const Table &ndbtab) const;
const Table * getTableGlobal(const char * tableName) const;
int alterTableGlobal(const Table &f, const Table &t);
int dropTableGlobal(const Table &ndbtab);
int dropIndexGlobal(const Index &index);
int removeIndexGlobal(const Index &ndbidx, int invalidate) const;
int removeTableGlobal(const Table &ndbtab, int invalidate) const;
#endif
};
};
......
......@@ -56,7 +56,7 @@ public:
* multiplied by a percentage obtained from the cache (result zero is
* returned as 1).
*/
int records_in_range(NdbDictionary::Index* index,
int records_in_range(const NdbDictionary::Index* index,
NdbIndexScanOperation* op,
Uint64 table_rows,
Uint64* count,
......
......@@ -63,6 +63,7 @@ LocalDictCache::~LocalDictCache(){
Ndb_local_table_info *
LocalDictCache::get(const char * name){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
return m_tableHash.getData(name, len);
......@@ -70,6 +71,7 @@ LocalDictCache::get(const char * name){
void
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
const Uint32 id = tab_info->m_table_impl->m_id;
m_tableHash.insertKey(name, strlen(name), id, tab_info);
......@@ -77,6 +79,7 @@ LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
void
LocalDictCache::drop(const char * name){
ASSERT_NOT_MYSQLD;
assert(! is_ndb_blob_table(name));
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
DBUG_ASSERT(info != 0);
......@@ -100,8 +103,15 @@ GlobalDictCache::~GlobalDictCache(){
Vector<TableVersion> * vers = curr->theData;
const unsigned sz = vers->size();
for(unsigned i = 0; i<sz ; i++){
if((* vers)[i].m_impl != 0)
TableVersion tv= (*vers)[i];
DBUG_PRINT(" ", ("vers[%d]: ver: %d, refCount: %d, status: %d",
i, tv.m_version, tv.m_refCount, tv.m_status));
if(tv.m_impl != 0)
{
DBUG_PRINT(" ", ("m_impl: internalname: %s",
tv.m_impl->m_internalName.c_str()));
delete (* vers)[i].m_impl;
}
}
delete curr->theData;
curr->theData= NULL;
......@@ -164,11 +174,18 @@ GlobalDictCache::get(const char * name)
TableVersion * ver = & versions->back();
switch(ver->m_status){
case OK:
if (ver->m_impl->m_status == NdbDictionary::Object::Invalid)
{
ver->m_status = DROPPED;
retreive = true; // Break loop
break;
}
ver->m_refCount++;
DBUG_PRINT("info", ("Table OK version=%x.%x refCount=%u",
ver->m_impl->m_version & 0xFFFFFF,
ver->m_impl->m_version >> 24,
ver->m_refCount));
DBUG_PRINT("info", ("Table OK tab: %p version=%x.%x refCount=%u",
ver->m_impl,
ver->m_impl->m_version & 0xFFFFFF,
ver->m_impl->m_version >> 24,
ver->m_refCount));
DBUG_RETURN(ver->m_impl);
case DROPPED:
retreive = true; // Break loop
......@@ -197,8 +214,8 @@ NdbTableImpl *
GlobalDictCache::put(const char * name, NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::put");
DBUG_PRINT("enter", ("name: %s, internal_name: %s version: %x.%x",
name,
DBUG_PRINT("enter", ("tab: %p name: %s, internal_name: %s version: %x.%x",
tab, name,
tab ? tab->m_internalName.c_str() : "tab NULL",
tab ? tab->m_version & 0xFFFFFF : 0,
tab ? tab->m_version >> 24 : 0));
......@@ -264,66 +281,11 @@ GlobalDictCache::put(const char * name, NdbTableImpl * tab)
}
void
GlobalDictCache::drop(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::drop");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
Vector<TableVersion> * vers =
m_tableHash.getData(tab->m_internalName.c_str(), len);
if(vers == 0){
// Should always tried to retreive it first
// and thus there should be a record
abort();
}
const Uint32 sz = vers->size();
if(sz == 0){
// Should always tried to retreive it first
// and thus there should be a record
abort();
}
for(i = 0; i < sz; i++){
TableVersion & ver = (* vers)[i];
if(ver.m_impl == tab){
if(ver.m_refCount == 0 || ver.m_status == RETREIVING ||
ver.m_version != tab->m_version){
DBUG_PRINT("info", ("Dropping with refCount=%d status=%d impl=%p",
ver.m_refCount, ver.m_status, ver.m_impl));
break;
}
DBUG_PRINT("info", ("Found table to drop, i: %d, name: %s",
i, ver.m_impl->m_internalName.c_str()));
ver.m_refCount--;
ver.m_status = DROPPED;
if(ver.m_refCount == 0){
DBUG_PRINT("info", ("refCount is zero, deleting m_impl"));
delete ver.m_impl;
vers->erase(i);
}
DBUG_VOID_RETURN;
}
}
for(i = 0; i<sz; i++){
TableVersion & ver = (* vers)[i];
ndbout_c("%d: version: %d refCount: %d status: %d impl: %p",
i, ver.m_version, ver.m_refCount,
ver.m_status, ver.m_impl);
}
abort();
}
void
GlobalDictCache::release(NdbTableImpl * tab)
GlobalDictCache::release(NdbTableImpl * tab, int invalidate)
{
DBUG_ENTER("GlobalDictCache::release");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
DBUG_PRINT("enter", ("tab: %p internal_name: %s",
tab, tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
......@@ -354,6 +316,17 @@ GlobalDictCache::release(NdbTableImpl * tab)
}
ver.m_refCount--;
if (ver.m_impl->m_status == NdbDictionary::Object::Invalid || invalidate)
{
ver.m_impl->m_status = NdbDictionary::Object::Invalid;
ver.m_status = DROPPED;
}
if (ver.m_refCount == 0 && ver.m_status == DROPPED)
{
DBUG_PRINT("info", ("refCount is zero, deleting m_impl"));
delete ver.m_impl;
vers->erase(i);
}
DBUG_VOID_RETURN;
}
}
......@@ -374,6 +347,7 @@ GlobalDictCache::alter_table_rep(const char * name,
Uint32 tableVersion,
bool altered)
{
DBUG_ENTER("GlobalDictCache::alter_table_rep");
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers =
......@@ -381,13 +355,13 @@ GlobalDictCache::alter_table_rep(const char * name,
if(vers == 0)
{
return;
DBUG_VOID_RETURN;
}
const Uint32 sz = vers->size();
if(sz == 0)
{
return;
DBUG_VOID_RETURN;
}
for(Uint32 i = 0; i < sz; i++)
......@@ -399,15 +373,16 @@ GlobalDictCache::alter_table_rep(const char * name,
ver.m_status = DROPPED;
ver.m_impl->m_status = altered ?
NdbDictionary::Object::Altered : NdbDictionary::Object::Invalid;
return;
DBUG_VOID_RETURN;
}
if(i == sz - 1 && ver.m_status == RETREIVING)
{
ver.m_impl = altered ? &f_altered_table : &f_invalid_table;
return;
DBUG_VOID_RETURN;
}
}
DBUG_VOID_RETURN;
}
template class Vector<GlobalDictCache::TableVersion>;
......@@ -63,11 +63,11 @@ public:
GlobalDictCache();
~GlobalDictCache();
NdbTableImpl * get(NdbTableImpl *tab);
NdbTableImpl * get(const char * name);
NdbTableImpl* put(const char * name, NdbTableImpl *);
void drop(NdbTableImpl *);
void release(NdbTableImpl *);
void release(NdbTableImpl *, int invalidate = 0);
void alter_table_rep(const char * name,
Uint32 tableId, Uint32 tableVersion, bool altered);
......
......@@ -901,6 +901,27 @@ Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
}
int Ndb::initAutoIncrement()
{
if (m_sys_tab_0)
return 0;
BaseString currentDb(getDatabaseName());
BaseString currentSchema(getDatabaseSchemaName());
setDatabaseName("sys");
setDatabaseSchemaName("def");
m_sys_tab_0 = getDictionary()->getTableGlobal("SYSTAB_0");
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
return (m_sys_tab_0 == NULL);
}
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
{
......@@ -916,19 +937,14 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
CHECK_STATUS_MACRO_ZERO;
BaseString currentDb(getDatabaseName());
BaseString currentSchema(getDatabaseSchemaName());
if (initAutoIncrement())
goto error_return;
setDatabaseName("sys");
setDatabaseSchemaName("def");
tConnection = this->startTransaction();
if (tConnection == NULL)
goto error_return;
if (usingFullyQualifiedNames())
tOperation = tConnection->getNdbOperation("SYSTAB_0");
else
tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
tOperation = tConnection->getNdbOperation(m_sys_tab_0);
if (tOperation == NULL)
goto error_handler;
......@@ -997,20 +1013,12 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
this->closeTransaction(tConnection);
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
DBUG_RETURN(ret);
error_handler:
theError.code = tConnection->theError.code;
this->closeTransaction(tConnection);
error_return:
// Restore current name space
setDatabaseName(currentDb.c_str());
setDatabaseSchemaName(currentSchema.c_str());
DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
theError.code,
tConnection ? tConnection->theError.code : -1,
......
......@@ -559,6 +559,11 @@ NdbDictionary::Table::getObjectStatus() const {
return m_impl.m_status;
}
void
NdbDictionary::Table::setStatusInvalid() const {
m_impl.m_status = NdbDictionary::Object::Invalid;
}
int
NdbDictionary::Table::getObjectVersion() const {
return m_impl.m_version;
......@@ -1330,6 +1335,11 @@ NdbDictionary::Dictionary::dropTable(Table & t){
return m_impl.dropTable(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::dropTableGlobal(const Table & t){
return m_impl.dropTableGlobal(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::dropTable(const char * name){
return m_impl.dropTable(name);
......@@ -1340,6 +1350,14 @@ NdbDictionary::Dictionary::alterTable(const Table & t){
return m_impl.alterTable(NdbTableImpl::getImpl(t));
}
int
NdbDictionary::Dictionary::alterTableGlobal(const Table & f,
const Table & t)
{
return m_impl.alterTableGlobal(NdbTableImpl::getImpl(f),
NdbTableImpl::getImpl(t));
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTable(const char * name, void **data) const
{
......@@ -1349,6 +1367,40 @@ NdbDictionary::Dictionary::getTable(const char * name, void **data) const
return 0;
}
const NdbDictionary::Index *
NdbDictionary::Dictionary::getIndexGlobal(const char * indexName,
const Table &ndbtab) const
{
NdbIndexImpl * i = m_impl.getIndexGlobal(indexName,
NdbTableImpl::getImpl(ndbtab));
if(i)
return i->m_facade;
return 0;
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTableGlobal(const char * name) const
{
NdbTableImpl * t = m_impl.getTableGlobal(name);
if(t)
return t->m_facade;
return 0;
}
int
NdbDictionary::Dictionary::removeIndexGlobal(const Index &ndbidx,
int invalidate) const
{
return m_impl.releaseIndexGlobal(NdbIndexImpl::getImpl(ndbidx), invalidate);
}
int
NdbDictionary::Dictionary::removeTableGlobal(const Table &ndbtab,
int invalidate) const
{
return m_impl.releaseTableGlobal(NdbTableImpl::getImpl(ndbtab), invalidate);
}
void NdbDictionary::Dictionary::putTable(const NdbDictionary::Table * table)
{
NdbDictionary::Table *copy_table = new NdbDictionary::Table;
......@@ -1420,6 +1472,13 @@ NdbDictionary::Dictionary::createIndex(const Index & ind)
return m_impl.createIndex(NdbIndexImpl::getImpl(ind));
}
int
NdbDictionary::Dictionary::createIndex(const Index & ind, const Table & tab)
{
return m_impl.createIndex(NdbIndexImpl::getImpl(ind),
NdbTableImpl::getImpl(tab));
}
int
NdbDictionary::Dictionary::dropIndex(const char * indexName,
const char * tableName)
......@@ -1427,6 +1486,12 @@ NdbDictionary::Dictionary::dropIndex(const char * indexName,
return m_impl.dropIndex(indexName, tableName);
}
int
NdbDictionary::Dictionary::dropIndexGlobal(const Index &ind)
{
return m_impl.dropIndexGlobal(NdbIndexImpl::getImpl(ind));
}
const NdbDictionary::Index *
NdbDictionary::Dictionary::getIndex(const char * indexName,
const char * tableName) const
......
......@@ -50,7 +50,14 @@
#define DICT_WAITFOR_TIMEOUT (7*24*60*60*1000)
#define ERR_RETURN(a,b) \
{\
DBUG_PRINT("exit", ("error %d", (a).code));\
DBUG_RETURN(b);\
}
extern Uint64 g_latest_trans_gci;
int ndb_dictionary_is_mysqld = 0;
bool
is_ndb_blob_table(const char* name, Uint32* ptab_id, Uint32* pcol_no)
......@@ -1015,7 +1022,7 @@ NdbTableImpl::get_nodes(Uint32 hashValue, const Uint16 ** nodes) const
*/
NdbIndexImpl::NdbIndexImpl() :
NdbDictionary::Index(* this),
NdbDictionary::Index(* this),
NdbDictObjectImpl(NdbDictionary::Object::OrderedIndex), m_facade(this)
{
init();
......@@ -1288,44 +1295,30 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
}
}
Ndb_local_table_info *
NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
NdbTableImpl *
NdbDictionaryImpl::fetchGlobalTableImplRef(const GlobalCacheInitObject &obj)
{
DBUG_ENTER("fetchGlobalTableImplRef");
NdbTableImpl *impl;
m_globalHash->lock();
impl = m_globalHash->get(internalTableName.c_str());
impl = m_globalHash->get(obj.m_name.c_str());
m_globalHash->unlock();
if (impl == 0){
impl = m_receiver.getTable(internalTableName,
impl = m_receiver.getTable(obj.m_name.c_str(),
m_ndb.usingFullyQualifiedNames());
if (impl != 0) {
int ret = getBlobTables(*impl);
if (ret != 0) {
delete impl;
impl = 0;
}
if (impl != 0 && obj.init(*impl))
{
delete impl;
impl = 0;
}
m_globalHash->lock();
m_globalHash->put(internalTableName.c_str(), impl);
m_globalHash->put(obj.m_name.c_str(), impl);
m_globalHash->unlock();
if(impl == 0){
return 0;
}
}
Ndb_local_table_info *info=
Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
return info;
DBUG_RETURN(impl);
}
void
......@@ -2276,18 +2269,30 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
}
// Alter the table
int ret = m_receiver.alterTable(m_ndb, impl);
if(ret == 0){
// Remove cached information and let it be refreshed at next access
int ret = alterTableGlobal(*local->m_table_impl, impl);
if(ret == 0)
{
m_globalHash->lock();
local->m_table_impl->m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(local->m_table_impl);
m_globalHash->release(local->m_table_impl, 1);
m_globalHash->unlock();
m_localHash.drop(originalInternalName);
}
DBUG_RETURN(ret);
}
int NdbDictionaryImpl::alterTableGlobal(NdbTableImpl &old_impl,
NdbTableImpl &impl)
{
DBUG_ENTER("NdbDictionaryImpl::alterTableGlobal");
// Alter the table
int ret = m_receiver.alterTable(m_ndb, impl);
old_impl.m_status = NdbDictionary::Object::Invalid;
if(ret == 0){
DBUG_RETURN(ret);
}
ERR_RETURN(getNdbError(), ret);
}
int
NdbDictInterface::alterTable(Ndb & ndb,
NdbTableImpl & impl)
......@@ -2731,6 +2736,7 @@ NdbDictionaryImpl::dropTable(const char * name)
{
DBUG_ENTER("NdbDictionaryImpl::dropTable");
DBUG_PRINT("enter",("name: %s", name));
ASSERT_NOT_MYSQLD;
NdbTableImpl * tab = getTable(name);
if(tab == 0){
DBUG_RETURN(-1);
......@@ -2743,8 +2749,7 @@ NdbDictionaryImpl::dropTable(const char * name)
DBUG_PRINT("info",("INCOMPATIBLE_VERSION internal_name: %s", internalTableName.c_str()));
m_localHash.drop(internalTableName.c_str());
m_globalHash->lock();
tab->m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(tab);
m_globalHash->release(tab, 1);
m_globalHash->unlock();
DBUG_RETURN(dropTable(name));
}
......@@ -2792,8 +2797,7 @@ NdbDictionaryImpl::dropTable(NdbTableImpl & impl)
m_localHash.drop(internalTableName);
m_globalHash->lock();
impl.m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(&impl);
m_globalHash->release(&impl, 1);
m_globalHash->unlock();
return 0;
......@@ -2802,6 +2806,50 @@ NdbDictionaryImpl::dropTable(NdbTableImpl & impl)
return ret;
}
int
NdbDictionaryImpl::dropTableGlobal(NdbTableImpl & impl)
{
int res;
const char * name = impl.getName();
DBUG_ENTER("NdbDictionaryImpl::dropTableGlobal");
DBUG_ASSERT(impl.m_status != NdbDictionary::Object::New);
DBUG_ASSERT(impl.m_indexType == NdbDictionary::Object::TypeUndefined);
List list;
if ((res = listIndexes(list, impl.m_id)) == -1){
ERR_RETURN(getNdbError(), -1);
}
for (unsigned i = 0; i < list.count; i++) {
const List::Element& element = list.elements[i];
NdbIndexImpl *idx= getIndexGlobal(element.name, impl);
if (idx == NULL)
{
ERR_RETURN(getNdbError(), -1);
}
if ((res = dropIndexGlobal(*idx)) == -1)
{
releaseIndexGlobal(*idx, 1);
ERR_RETURN(getNdbError(), -1);
}
releaseIndexGlobal(*idx, 1);
}
if (impl.m_noOfBlobs != 0) {
if (dropBlobTables(impl) != 0){
ERR_RETURN(getNdbError(), -1);
}
}
int ret = m_receiver.dropTable(impl);
impl.m_status = NdbDictionary::Object::Invalid;
if(ret == 0 || m_error.code == 709 || m_error.code == 723)
{
DBUG_RETURN(0);
}
ERR_RETURN(getNdbError(), ret);
}
int
NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
{
......@@ -2822,7 +2870,7 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
DBUG_PRINT("info", ("col %s: blob table %s: error %d",
c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
if (! (ret == 709 || ret == 723)) // "force" mode on
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
// leave c.m_blobTable defined
}
......@@ -2891,8 +2939,7 @@ NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl)
m_localHash.drop(internalTableName);
m_globalHash->lock();
impl.m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(&impl);
m_globalHash->release(&impl, 1);
m_globalHash->unlock();
DBUG_RETURN(0);
}
......@@ -2918,6 +2965,7 @@ NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
const BaseString& internalName)
{
ASSERT_NOT_MYSQLD;
Ndb_local_table_info * info = get_local_table_info(internalName);
if(info == 0){
m_error.code = 4243;
......@@ -2938,26 +2986,41 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
return 0;
}
return getIndexImpl(externalName, internalName, *tab, *prim);
}
NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
const BaseString& internalName,
NdbTableImpl &tab,
NdbTableImpl &prim)
{
DBUG_ENTER("NdbDictionaryImpl::getIndexImpl");
DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
/**
* Create index impl
*/
NdbIndexImpl* idx;
if(NdbDictInterface::create_index_obj_from_table(&idx, tab, prim) == 0){
idx->m_table = tab;
if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &prim) == 0){
idx->m_table = &tab;
idx->m_externalName.assign(externalName);
idx->m_internalName.assign(internalName);
idx->m_table_id = prim.getObjectId();
idx->m_table_version = prim.getObjectVersion();
// TODO Assign idx to tab->m_index
// Don't do it right now since assign can't asign a table with index
// tab->m_index = idx;
return idx;
DBUG_RETURN(idx);
}
return 0;
DBUG_RETURN(0);
}
int
NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
NdbTableImpl* tab,
const NdbTableImpl* prim){
const NdbTableImpl* prim)
{
DBUG_ENTER("NdbDictInterface::create_index_obj_from_table");
NdbIndexImpl *idx = new NdbIndexImpl();
idx->m_version = tab->m_version;
idx->m_status = tab->m_status;
......@@ -3010,8 +3073,8 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
}
* dst = idx;
return 0;
DBUG_PRINT("exit", ("m_id: %d m_version: %d", idx->m_id, idx->m_version));
DBUG_RETURN(0);
}
/*****************************************************************
......@@ -3020,6 +3083,7 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
int
NdbDictionaryImpl::createIndex(NdbIndexImpl &ix)
{
ASSERT_NOT_MYSQLD;
NdbTableImpl* tab = getTable(ix.getTable());
if(tab == 0){
m_error.code = 4249;
......@@ -3029,6 +3093,12 @@ NdbDictionaryImpl::createIndex(NdbIndexImpl &ix)
return m_receiver.createIndex(m_ndb, ix, * tab);
}
int
NdbDictionaryImpl::createIndex(NdbIndexImpl &ix, NdbTableImpl &tab)
{
return m_receiver.createIndex(m_ndb, ix, tab);
}
int
NdbDictInterface::createIndex(Ndb & ndb,
const NdbIndexImpl & impl,
......@@ -3135,6 +3205,7 @@ int
NdbDictionaryImpl::dropIndex(const char * indexName,
const char * tableName)
{
ASSERT_NOT_MYSQLD;
NdbIndexImpl * idx = getIndex(indexName, tableName);
if (idx == 0) {
m_error.code = 4243;
......@@ -3152,8 +3223,7 @@ NdbDictionaryImpl::dropIndex(const char * indexName,
m_localHash.drop(internalIndexName.c_str());
m_globalHash->lock();
idx->m_table->m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(idx->m_table);
m_globalHash->release(idx->m_table, 1);
m_globalHash->unlock();
return dropIndex(indexName, tableName);
}
......@@ -3183,13 +3253,13 @@ NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl, const char * tableName)
return dropIndex(indexName, tableName);
}
int ret = m_receiver.dropIndex(impl, *timpl);
if(ret == 0){
m_localHash.drop(internalIndexName.c_str());
int ret= dropIndexGlobal(impl);
if (ret == 0)
{
m_globalHash->lock();
impl.m_table->m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(impl.m_table);
m_globalHash->release(impl.m_table, 1);
m_globalHash->unlock();
m_localHash.drop(internalIndexName.c_str());
}
return ret;
}
......@@ -3198,10 +3268,26 @@ NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl, const char * tableName)
return -1;
}
int
NdbDictionaryImpl::dropIndexGlobal(NdbIndexImpl & impl)
{
DBUG_ENTER("NdbDictionaryImpl::dropIndexGlobal");
int ret = m_receiver.dropIndex(impl, *impl.m_table);
impl.m_status = NdbDictionary::Object::Invalid;
if(ret == 0)
{
DBUG_RETURN(0);
}
ERR_RETURN(getNdbError(), ret);
}
int
NdbDictInterface::dropIndex(const NdbIndexImpl & impl,
const NdbTableImpl & timpl)
{
DBUG_ENTER("NdbDictInterface::dropIndex");
DBUG_PRINT("enter", ("indexId: %d indexVersion: %d",
timpl.m_id, timpl.m_version));
NdbApiSignal tSignal(m_reference);
tSignal.theReceiversBlockNumber = DBDICT;
tSignal.theVerId_signalNumber = GSN_DROP_INDX_REQ;
......@@ -3223,9 +3309,9 @@ NdbDictInterface::dropIndex(const NdbIndexImpl & impl,
errCodes);
if(m_error.code == DropIndxRef::InvalidIndexVersion) {
// Clear caches and try again
return INCOMPATIBLE_VERSION;
ERR_RETURN(m_error, INCOMPATIBLE_VERSION);
}
return r;
ERR_RETURN(m_error, r);
}
void
......@@ -3262,7 +3348,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
if(tab == 0){
DBUG_PRINT("info",("NdbDictionaryImpl::createEvent: table not found: %s",
evnt.getTableName()));
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
evnt.setTable(tab);
}
......@@ -3281,7 +3367,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
ndbout_c("Attr id %u in table %s not found", evnt.m_attrIds[i],
evnt.getTableName());
m_error.code= 4713;
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
}
......@@ -3302,7 +3388,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
table.getColumn(evnt.m_columns[i]->m_name.c_str());
if(col == 0){
m_error.code= 4247;
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
// Copy column definition
*evnt.m_columns[i] = *col;
......@@ -3328,7 +3414,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
for(i = 1; i<attributeList_sz; i++) {
if (evnt.m_columns[i-1]->m_attrId == evnt.m_columns[i]->m_attrId) {
m_error.code= 4258;
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
}
......@@ -3340,14 +3426,14 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
// NdbDictInterface m_receiver;
if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
// Create blob events
if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
int save_code = m_error.code;
(void)dropEvent(evnt.m_name.c_str());
m_error.code = save_code;
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
DBUG_RETURN(0);
}
......@@ -3367,7 +3453,7 @@ NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
NdbEventImpl blob_evnt;
NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
if (createEvent(blob_evnt) != 0)
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
DBUG_RETURN(0);
}
......@@ -3418,7 +3504,7 @@ NdbDictInterface::createEvent(class Ndb & ndb,
const size_t len = strlen(evnt.m_name.c_str()) + 1;
if(len > MAX_TAB_NAME_SIZE) {
m_error.code= 4241;
DBUG_RETURN(-1);
ERR_RETURN(getNdbError(), -1);
}
w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
......@@ -3442,7 +3528,7 @@ NdbDictInterface::createEvent(class Ndb & ndb,
0, -1);
if (ret) {
DBUG_RETURN(ret);
ERR_RETURN(getNdbError(), ret);
}
char *dataPtr = (char *)m_buffer.get_data();
......@@ -3468,7 +3554,7 @@ NdbDictInterface::createEvent(class Ndb & ndb,
//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
evnt.mi_type != evntConf->getEventType()) {
ndbout_c("ERROR*************");
DBUG_RETURN(1);
ERR_RETURN(getNdbError(), 1);
}
}
......@@ -3555,7 +3641,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
DBUG_ENTER("NdbDictionaryImpl::getEvent");
DBUG_PRINT("enter",("eventName= %s", eventName));
NdbEventImpl *ev = new NdbEventImpl();
NdbEventImpl *ev = new NdbEventImpl();
if (ev == NULL) {
DBUG_RETURN(NULL);
}
......@@ -3569,35 +3655,36 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
DBUG_RETURN(NULL);
}
if (tab == NULL) {
// We only have the table name with internal name
DBUG_PRINT("info",("table %s", ev->getTableName()));
Ndb_local_table_info *info;
info= get_local_table_info(ev->getTableName());
if (info == 0)
// We only have the table name with internal name
DBUG_PRINT("info",("table %s", ev->getTableName()));
if (tab == NULL)
{
tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
if (tab == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
delete ev;
DBUG_RETURN(NULL);
}
if ((info->m_table_impl->m_status != NdbDictionary::Object::Retrieved) ||
(info->m_table_impl->m_id != ev->m_table_id) ||
(table_version_major(info->m_table_impl->m_version) !=
if ((tab->m_status != NdbDictionary::Object::Retrieved) ||
(tab->m_id != ev->m_table_id) ||
(table_version_major(tab->m_version) !=
table_version_major(ev->m_table_version)))
{
removeCachedObject(*info->m_table_impl);
info= get_local_table_info(ev->getTableName());
if (info == 0)
DBUG_PRINT("info", ("mismatch on verison in cache"));
releaseTableGlobal(*tab, 1);
tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
if (tab == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
delete ev;
DBUG_RETURN(NULL);
}
}
tab = info->m_table_impl;
}
ev->setTable(tab);
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
// get the columns from the attrListBitmask
NdbTableImpl &table = *ev->m_tableImpl;
......
......@@ -35,6 +35,9 @@ is_ndb_blob_table(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
bool
is_ndb_blob_table(const class NdbTableImpl* t);
extern int ndb_dictionary_is_mysqld;
#define ASSERT_NOT_MYSQLD assert(ndb_dictionary_is_mysqld == 0)
class NdbDictObjectImpl {
public:
int m_id;
......@@ -253,6 +256,8 @@ public:
BaseString m_internalName;
BaseString m_externalName;
BaseString m_tableName;
Uint32 m_table_id;
Uint32 m_table_version;
Vector<NdbColumnImpl *> m_columns;
Vector<int> m_key_ids;
......@@ -539,6 +544,21 @@ private:
UtilBuffer m_buffer;
};
class NdbDictionaryImpl;
class GlobalCacheInitObject
{
public:
NdbDictionaryImpl *m_dict;
const BaseString &m_name;
GlobalCacheInitObject(NdbDictionaryImpl *dict,
const BaseString &name) :
m_dict(dict),
m_name(name)
{}
virtual ~GlobalCacheInitObject() {}
virtual int init(NdbTableImpl &tab) const = 0;
};
class NdbDictionaryImpl : public NdbDictionary::Dictionary {
public:
NdbDictionaryImpl(Ndb &ndb);
......@@ -558,6 +578,7 @@ public:
int removeCachedObject(NdbTableImpl &);
int createIndex(NdbIndexImpl &ix);
int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
int dropIndex(const char * indexName,
const char * tableName);
int dropIndex(NdbIndexImpl &, const char * tableName);
......@@ -578,6 +599,15 @@ public:
int listObjects(List& list, NdbDictionary::Object::Type type);
int listIndexes(List& list, Uint32 indexId);
NdbTableImpl * getTableGlobal(const char * tableName);
NdbIndexImpl * getIndexGlobal(const char * indexName,
NdbTableImpl &ndbtab);
int alterTableGlobal(NdbTableImpl &orig_impl, NdbTableImpl &impl);
int dropTableGlobal(NdbTableImpl &);
int dropIndexGlobal(NdbIndexImpl & impl);
int releaseTableGlobal(NdbTableImpl & impl, int invalidate);
int releaseIndexGlobal(NdbIndexImpl & impl, int invalidate);
NdbTableImpl * getTable(const char * tableName, void **data= 0);
NdbTableImpl * getBlobTable(const NdbTableImpl&, uint col_no);
NdbTableImpl * getBlobTable(uint tab_id, uint col_no);
......@@ -616,10 +646,14 @@ public:
NdbDictInterface m_receiver;
Ndb & m_ndb;
private:
NdbIndexImpl* getIndexImpl(const char * externalName,
const BaseString& internalName,
NdbTableImpl &tab,
NdbTableImpl &prim);
NdbIndexImpl * getIndexImpl(const char * name,
const BaseString& internalName);
Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
private:
NdbTableImpl * fetchGlobalTableImplRef(const GlobalCacheInitObject &obj);
};
inline
......@@ -852,6 +886,27 @@ NdbDictionaryImpl::getImpl(const NdbDictionary::Dictionary & t){
* Inline:d getters
*/
class InitTable : public GlobalCacheInitObject
{
public:
InitTable(NdbDictionaryImpl *dict,
const BaseString &name) :
GlobalCacheInitObject(dict, name)
{}
int init(NdbTableImpl &tab) const
{
return m_dict->getBlobTables(tab);
}
};
inline
NdbTableImpl *
NdbDictionaryImpl::getTableGlobal(const char * table_name)
{
const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
return fetchGlobalTableImplRef(InitTable(this, internal_tabname));
}
inline
NdbTableImpl *
NdbDictionaryImpl::getTable(const char * table_name, void **data)
......@@ -885,21 +940,134 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
DBUG_PRINT("enter", ("table: %s", internalTableName.c_str()));
Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
if (info == 0) {
info= fetchGlobalTableImpl(internalTableName);
if (info == 0) {
DBUG_RETURN(0);
if (info == 0)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitTable(this, internalTableName));
if (tab)
{
info= Ndb_local_table_info::create(tab, m_local_table_data_size);
if (info)
{
m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[tab->getTableId()] = ~0;
m_ndb.theLastTupleId[tab->getTableId()] = ~0;
}
}
}
DBUG_RETURN(info); // autoincrement already initialized
}
class InitIndexGlobal : public GlobalCacheInitObject
{
public:
const char *m_index_name;
NdbTableImpl &m_prim;
InitIndexGlobal(NdbDictionaryImpl *dict,
const BaseString &internal_indexname,
const char *index_name,
NdbTableImpl &prim) :
GlobalCacheInitObject(dict, internal_indexname),
m_index_name(index_name),
m_prim(prim)
{}
int init(NdbTableImpl &tab) const
{
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name, tab, m_prim);
if (tab.m_index == 0)
return 1;
tab.m_index->m_table= &tab;
return 0;
}
};
class InitIndex : public GlobalCacheInitObject
{
public:
const char *m_index_name;
InitIndex(NdbDictionaryImpl *dict,
const BaseString &internal_indexname,
const char *index_name) :
GlobalCacheInitObject(dict, internal_indexname),
m_index_name(index_name)
{}
int init(NdbTableImpl &tab) const
{
DBUG_ASSERT(tab.m_index == 0);
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name);
if (tab.m_index)
{
tab.m_index->m_table= &tab;
return 0;
}
return 1;
}
};
inline
NdbIndexImpl *
NdbDictionaryImpl::getIndexGlobal(const char * index_name,
NdbTableImpl &ndbtab)
{
DBUG_ENTER("NdbDictionaryImpl::getIndexGlobal");
const BaseString
internal_indexname(m_ndb.internalize_index_name(&ndbtab, index_name));
int retry= 2;
while (retry)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitIndexGlobal(this, internal_indexname,
index_name, ndbtab));
if (tab)
{
// tab->m_index sould be set. otherwise tab == 0
NdbIndexImpl *idx= tab->m_index;
if (idx->m_table_id != ndbtab.getObjectId() ||
idx->m_table_version != ndbtab.getObjectVersion())
{
releaseIndexGlobal(*idx, 1);
retry--;
continue;
}
DBUG_RETURN(idx);
}
break;
}
m_error.code= 4243;
DBUG_RETURN(0);
}
inline int
NdbDictionaryImpl::releaseTableGlobal(NdbTableImpl & impl, int invalidate)
{
DBUG_ENTER("NdbDictionaryImpl::releaseTableGlobal");
DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
m_globalHash->lock();
m_globalHash->release(&impl, invalidate);
m_globalHash->unlock();
DBUG_RETURN(0);
}
inline int
NdbDictionaryImpl::releaseIndexGlobal(NdbIndexImpl & impl, int invalidate)
{
DBUG_ENTER("NdbDictionaryImpl::releaseIndexGlobal");
DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
m_globalHash->lock();
m_globalHash->release(impl.m_table, invalidate);
m_globalHash->unlock();
DBUG_RETURN(0);
}
inline
NdbIndexImpl *
NdbDictionaryImpl::getIndex(const char * index_name,
const char * table_name)
{
if (table_name || m_ndb.usingFullyQualifiedNames())
while (table_name || m_ndb.usingFullyQualifiedNames())
{
const BaseString internal_indexname(
(table_name)
......@@ -910,18 +1078,28 @@ NdbDictionaryImpl::getIndex(const char * index_name,
if (internal_indexname.length())
{
Ndb_local_table_info * info=
get_local_table_info(internal_indexname);
if (info)
Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
NdbTableImpl *tab;
if (info == 0)
{
NdbTableImpl * tab= info->m_table_impl;
if (tab->m_index == 0)
tab->m_index= getIndexImpl(index_name, internal_indexname);
if (tab->m_index != 0)
tab->m_index->m_table= tab;
return tab->m_index;
tab= fetchGlobalTableImplRef(InitIndex(this, internal_indexname,
index_name));
if (tab)
{
info= Ndb_local_table_info::create(tab, 0);
if (info)
m_localHash.put(internal_indexname.c_str(), info);
else
break;
}
else
break;
}
else
tab= info->m_table_impl;
return tab->m_index;
}
break;
}
m_error.code= 4243;
......
......@@ -377,7 +377,7 @@ NdbIndexStat::stat_select(const Uint32* key1, Uint32 keylen1, const Uint32* key2
}
int
NdbIndexStat::records_in_range(NdbDictionary::Index* index, NdbIndexScanOperation* op, Uint64 table_rows, Uint64* count, int flags)
NdbIndexStat::records_in_range(const NdbDictionary::Index* index, NdbIndexScanOperation* op, Uint64 table_rows, Uint64* count, int flags)
{
DBUG_ENTER("NdbIndexStat::records_in_range");
Uint64 rows;
......
......@@ -99,6 +99,7 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
for (i = 0; i < MAX_NDB_NODES ; i++) {
theConnectionArray[i] = NULL;
}//forg
m_sys_tab_0 = NULL;
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
......@@ -137,6 +138,9 @@ Ndb::~Ndb()
DBUG_ENTER("Ndb::~Ndb()");
DBUG_PRINT("enter",("this=0x%x",this));
if (m_sys_tab_0)
getDictionary()->removeTableGlobal(*m_sys_tab_0, 0);
assert(theImpl->m_ev_op == 0); // user should return NdbEventOperation's
for (NdbEventOperationImpl *op= theImpl->m_ev_op; op; op=op->m_next)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment