Commit 3ab132bd authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
parents 2b9b8446 586f00b2
DROP TABLE IF EXISTS t1,t2;
drop database if exists mysqltest;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10),
INDEX i1(attr1)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (0,0,0,"zero"),(1,1,1,"one"),(2,2,2,"two");
SELECT * FROM t1 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
alter table t1 rename t2;
SELECT * FROM t2 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
create database ndbtest;
alter table t2 rename ndbtest.t2;
SELECT * FROM ndbtest.t2 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
drop table ndbtest.t2;
drop database ndbtest;
......@@ -47,6 +47,10 @@ master-bin.000001 # Table_map 1 # table_id: # (test.t1)
flush logs;
create table t3 (a int)ENGINE=NDB;
start slave;
let $result_pattern= '%127.0.0.1%root%master-bin.000002%slave-relay-bin.000005%Yes%Yes%0%0%None%' ;
--source include/wait_slave_status.inc
flush logs;
stop slave;
create table t2 (n int)ENGINE=NDB;
......
......@@ -28,7 +28,7 @@ rpl_ndb_commit_afterflush : BUG#19328 2006-05-04 tomas Slave timeout with COM_RE
rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD
rpl_ndb_ddl : BUG#18946 result file needs update + test needs to checked
rpl_ndb_innodb2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement
rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
#rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
rpl_ndb_myisam2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement
rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian
rpl_row_blob_innodb : BUG#18980 2006-04-10 kent Test fails randomly
......
-- source include/have_ndb.inc
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1,t2;
drop database if exists mysqltest;
--enable_warnings
#
# Table rename tests
#
#
# Create a normal table with primary key
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10),
INDEX i1(attr1)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (0,0,0,"zero"),(1,1,1,"one"),(2,2,2,"two");
SELECT * FROM t1 WHERE attr1 = 1;
alter table t1 rename t2;
SELECT * FROM t2 WHERE attr1 = 1;
create database ndbtest;
alter table t2 rename ndbtest.t2;
SELECT * FROM ndbtest.t2 WHERE attr1 = 1;
drop table ndbtest.t2;
drop database ndbtest;
# End of 4.1 tests
......@@ -4648,7 +4648,7 @@ int ha_ndbcluster::create(const char *name,
share->db, share->table_name,
m_table->getObjectId(),
m_table->getObjectVersion(),
SOT_CREATE_TABLE);
SOT_CREATE_TABLE, 0, 0, 1);
break;
}
}
......@@ -4921,13 +4921,17 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
{
NDBDICT *dict;
char old_dbname[FN_HEADLEN];
char new_dbname[FN_HEADLEN];
char new_tabname[FN_HEADLEN];
const NDBTAB *orig_tab;
int result;
bool recreate_indexes= FALSE;
NDBDICT::List index_list;
DBUG_ENTER("ha_ndbcluster::rename_table");
DBUG_PRINT("info", ("Renaming %s to %s", from, to));
set_dbname(from, old_dbname);
set_dbname(to, new_dbname);
set_tabname(from);
set_tabname(to, new_tabname);
......@@ -4952,6 +4956,11 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
DBUG_ASSERT(r == 0);
}
#endif
if (my_strcasecmp(system_charset_info, new_dbname, old_dbname))
{
dict->listIndexes(index_list, *orig_tab);
recreate_indexes= TRUE;
}
// Change current database to that of target table
set_dbname(to);
ndb->setDatabaseName(m_dbname);
......@@ -5030,7 +5039,33 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
old_dbname, m_tabname,
ndb_table_id, ndb_table_version,
SOT_RENAME_TABLE,
m_dbname, new_tabname);
m_dbname, new_tabname, 1);
}
// If we are moving tables between databases, we need to recreate
// indexes
if (recreate_indexes)
{
for (unsigned i = 0; i < index_list.count; i++)
{
NDBDICT::List::Element& index_el = index_list.elements[i];
// Recreate any indexes not stored in the system database
if (my_strcasecmp(system_charset_info,
index_el.database, NDB_SYSTEM_DATABASE))
{
set_dbname(from);
ndb->setDatabaseName(m_dbname);
const NDBINDEX * index= dict->getIndexGlobal(index_el.name, new_tab);
DBUG_PRINT("info", ("Creating index %s/%s",
index_el.database, index->getName()));
dict->createIndex(*index, new_tab);
DBUG_PRINT("info", ("Dropping index %s/%s",
index_el.database, index->getName()));
set_dbname(from);
ndb->setDatabaseName(m_dbname);
dict->dropIndexGlobal(*index);
}
}
}
if (share)
free_share(&share);
......@@ -5053,6 +5088,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *db,
const char *table_name)
{
THD *thd= current_thd;
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG
......@@ -5084,7 +5120,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_table_version= h->m_table->getObjectVersion();
}
#endif
h->release_metadata(current_thd, ndb);
h->release_metadata(thd, ndb);
}
else
{
......@@ -5150,11 +5186,11 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
if (!IS_TMP_PREFIX(table_name) && share)
{
ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length,
ndbcluster_log_schema_op(thd, share,
thd->query, thd->query_length,
share->db, share->table_name,
ndb_table_id, ndb_table_version,
SOT_DROP_TABLE);
SOT_DROP_TABLE, 0, 0, 1);
}
else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op
will do a force GCP */
......@@ -5733,6 +5769,7 @@ int ndbcluster_drop_database_impl(const char *path)
static void ndbcluster_drop_database(char *path)
{
THD *thd= current_thd;
DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG
/*
......@@ -5750,9 +5787,9 @@ static void ndbcluster_drop_database(char *path)
#ifdef HAVE_NDB_BINLOG
char db[FN_REFLEN];
ha_ndbcluster::set_dbname(path, db);
ndbcluster_log_schema_op(current_thd, 0,
current_thd->query, current_thd->query_length,
db, "", 0, 0, SOT_DROP_DB);
ndbcluster_log_schema_op(thd, 0,
thd->query, thd->query_length,
db, "", 0, 0, SOT_DROP_DB, 0, 0, 0);
#endif
DBUG_VOID_RETURN;
}
......@@ -6831,6 +6868,7 @@ static void dbug_print_open_tables()
*/
int handle_trailing_share(NDB_SHARE *share)
{
THD *thd= current_thd;
static ulong trailing_share_id= 0;
DBUG_ENTER("handle_trailing_share");
......@@ -6841,7 +6879,7 @@ int handle_trailing_share(NDB_SHARE *share)
bzero((char*) &table_list,sizeof(table_list));
table_list.db= share->db;
table_list.alias= table_list.table_name= share->table_name;
close_cached_tables(current_thd, 0, &table_list, TRUE);
close_cached_tables(thd, 0, &table_list, TRUE);
pthread_mutex_lock(&ndbcluster_mutex);
if (!--share->use_count)
......@@ -9944,13 +9982,13 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
thd->query, thd->query_length,
"", info->tablespace_name,
0, 0,
SOT_TABLESPACE);
SOT_TABLESPACE, 0, 0, 0);
else
ndbcluster_log_schema_op(thd, 0,
thd->query, thd->query_length,
"", info->logfile_group_name,
0, 0,
SOT_LOGFILE_GROUP);
SOT_LOGFILE_GROUP, 0, 0, 0);
#endif
DBUG_RETURN(FALSE);
......
......@@ -39,6 +39,12 @@
#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
/*
Timeout for syncing schema events between
mysql servers, and between mysql server and the binlog
*/
const int opt_ndb_sync_timeout= 120;
/*
Flag showing if the ndb injector thread is running, if so == 1
-1 if it was started but later stopped for some reason
......@@ -498,6 +504,7 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
{
case LOGCOM_CREATE_TABLE:
type= SOT_CREATE_TABLE;
DBUG_ASSERT(FALSE);
break;
case LOGCOM_ALTER_TABLE:
type= SOT_ALTER_TABLE;
......@@ -505,9 +512,11 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
break;
case LOGCOM_RENAME_TABLE:
type= SOT_RENAME_TABLE;
DBUG_ASSERT(FALSE);
break;
case LOGCOM_DROP_TABLE:
type= SOT_DROP_TABLE;
DBUG_ASSERT(FALSE);
break;
case LOGCOM_CREATE_DB:
type= SOT_CREATE_DB;
......@@ -519,12 +528,14 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
break;
case LOGCOM_DROP_DB:
type= SOT_DROP_DB;
DBUG_ASSERT(FALSE);
break;
}
if (log)
{
ndbcluster_log_schema_op(thd, 0, query, query_length,
db, table_name, 0, 0, type);
db, table_name, 0, 0, type,
0, 0, 0);
}
DBUG_VOID_RETURN;
}
......@@ -961,6 +972,154 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
return buf;
}
/*
acknowledge handling of schema operation
*/
static int
ndbcluster_update_slock(THD *thd,
const char *db,
const char *table_name)
{
DBUG_ENTER("ndbcluster_update_slock");
if (!schema_share)
{
DBUG_RETURN(0);
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Ndb *ndb= check_ndb_in_thd(thd);
char save_db[FN_HEADLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
MY_BITMAP slock;
uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
if (ndbtab == 0)
{
abort();
DBUG_RETURN(0);
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
if ((trans= ndb->startTransaction()) == 0)
goto err;
{
NdbOperation *op= 0;
int r= 0;
/* read the bitmap exlusive */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->readTupleExclusive();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::NoCommit))
goto err;
bitmap_clear_bit(&slock, node_id);
{
NdbOperation *op= 0;
int r= 0;
/* now update the tuple */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->updateTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
DBUG_ASSERT(r == 0);
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
node_id, db, table_name));
break;
}
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
if (this_error->status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= this_error;
break;
}
end:
if (ndb_error)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not release lock on '%s.%s'",
db, table_name);
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(save_db);
DBUG_RETURN(0);
}
/*
log query in schema table
*/
......@@ -995,7 +1154,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *new_db, const char *new_table_name)
const char *new_db, const char *new_table_name,
int have_lock_open)
{
DBUG_ENTER("ndbcluster_log_schema_op");
Thd_ndb *thd_ndb= get_thd_ndb(thd);
......@@ -1076,8 +1236,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
Uint64 epoch= 0;
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
uint32 bitbuf_e[sizeof(bitbuf)];
bzero((char *)bitbuf_e, sizeof(bitbuf_e));
char bitbuf_e[sizeof(bitbuf)];
bzero(bitbuf_e, sizeof(bitbuf_e));
{
int i, updated= 0;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
......@@ -1096,7 +1256,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
}
(void) pthread_mutex_unlock(&schema_share->mutex);
if (updated)
{
bitmap_clear_bit(&schema_subscribers, node_id);
/*
if setting own acknowledge bit it is important that
no other mysqld's are registred, as subsequent code
will cause the original event to be hidden (by blob
merge event code)
*/
if (bitmap_is_clear_all(&schema_subscribers))
bitmap_set_bit(&schema_subscribers, node_id);
}
else
bitmap_clear_all(&schema_subscribers);
......@@ -1209,7 +1379,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
{
log_db= new_db;
log_tab= new_table_name;
log_subscribers= (const char *)bitbuf_e; // no ack expected on this
log_subscribers= bitbuf_e; // no ack expected on this
log_type= (uint32)SOT_RENAME_TABLE_NEW;
continue;
}
......@@ -1217,7 +1387,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("logged: %s", query));
break;
}
......@@ -1238,7 +1407,7 @@ err:
}
end:
if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
......@@ -1254,8 +1423,22 @@ end:
if (ndb_error == 0 &&
!bitmap_is_clear_all(&schema_subscribers))
{
int max_timeout= 10;
/*
if own nodeid is set we are a single mysqld registred
as an optimization we update the slock directly
*/
if (bitmap_is_set(&schema_subscribers, node_id))
ndbcluster_update_slock(thd, db, table_name);
else
dict->forceGCPWait();
int max_timeout= opt_ndb_sync_timeout;
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
if (have_lock_open)
{
safe_mutex_assert_owner(&LOCK_open);
(void) pthread_mutex_unlock(&LOCK_open);
}
while (1)
{
struct timespec abstime;
......@@ -1265,7 +1448,8 @@ end:
int ret= pthread_cond_timedwait(&injector_cond,
&ndb_schema_object->mutex,
&abstime);
if (thd->killed)
break;
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
......@@ -1300,6 +1484,10 @@ end:
"distributing", ndb_schema_object->key);
}
}
if (have_lock_open)
{
(void) pthread_mutex_lock(&LOCK_open);
}
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
......@@ -1309,154 +1497,6 @@ end:
DBUG_RETURN(0);
}
/*
acknowledge handling of schema operation
*/
static int
ndbcluster_update_slock(THD *thd,
const char *db,
const char *table_name)
{
DBUG_ENTER("ndbcluster_update_slock");
if (!schema_share)
{
DBUG_RETURN(0);
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Ndb *ndb= check_ndb_in_thd(thd);
char save_db[FN_HEADLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
MY_BITMAP slock;
uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
if (ndbtab == 0)
{
abort();
DBUG_RETURN(0);
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
if ((trans= ndb->startTransaction()) == 0)
goto err;
{
NdbOperation *op= 0;
int r= 0;
/* read the bitmap exlusive */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->readTupleExclusive();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::NoCommit))
goto err;
bitmap_clear_bit(&slock, node_id);
{
NdbOperation *op= 0;
int r= 0;
/* now update the tuple */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->updateTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
DBUG_ASSERT(r == 0);
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
node_id, db, table_name));
break;
}
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
if (this_error->status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= this_error;
break;
}
end:
if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not release lock on '%s.%s'",
db, table_name);
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(save_db);
DBUG_RETURN(0);
}
/*
Handle _non_ data events from the storage nodes
*/
......@@ -1680,17 +1720,26 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(tmp_share, schema);
enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema_type));
if (schema_type == SOT_CLEAR_SLOCK)
{
/*
handle slock after epoch is completed to ensure that
schema events get inserted in the binlog after any data
events
*/
post_epoch_log_list->push_back(schema, mem_root);
DBUG_RETURN(0);
}
if (schema->node_id != node_id)
{
int log_query= 0, post_epoch_unlock= 0;
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
switch ((enum SCHEMA_OP_TYPE)schema->type)
switch (schema_type)
{
case SOT_DROP_TABLE:
// fall through
......@@ -1738,30 +1787,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE, /* print error */
FALSE); /* binlog the query */
break;
case SOT_CLEAR_SLOCK:
{
pthread_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
(byte*) key, strlen(key));
if (ndb_schema_object)
{
pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema->slock,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
pthread_mutex_unlock(&ndb_schema_object->mutex);
pthread_cond_signal(&injector_cond);
}
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
case SOT_TABLESPACE:
case SOT_LOGFILE_GROUP:
log_query= 1;
break;
case SOT_CLEAR_SLOCK:
abort();
}
if (log_query && ndb_binlog_running)
{
......@@ -1902,10 +1933,30 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
schema->type));
int log_query= 0;
{
enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
if (schema_type == SOT_CLEAR_SLOCK)
{
pthread_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
(byte*) key, strlen(key));
if (ndb_schema_object)
{
pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema->slock,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
pthread_mutex_unlock(&ndb_schema_object->mutex);
pthread_cond_signal(&injector_cond);
}
pthread_mutex_unlock(&ndbcluster_mutex);
continue;
}
NDB_SHARE *share= get_share(key, 0, false, false);
enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
switch (schema_type)
{
case SOT_DROP_DB:
......@@ -2328,6 +2379,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
const char *event_name, NDB_SHARE *share,
int push_warning)
{
THD *thd= current_thd;
DBUG_ENTER("ndbcluster_create_event");
DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
ndbtab->getName(), ndbtab->getObjectVersion(),
......@@ -2357,7 +2409,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
"with BLOB attribute and no PK is not supported",
share->key);
if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_ILLEGAL_HA_CREATE_OPTION,
ER(ER_ILLEGAL_HA_CREATE_OPTION),
ndbcluster_hton_name,
......@@ -2401,7 +2453,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
failed, print a warning
*/
if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
......@@ -2429,7 +2481,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
dict->dropEvent(my_event.getName()))
{
if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
......@@ -2448,7 +2500,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
if (dict->createEvent(my_event))
{
if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
......@@ -2461,7 +2513,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
DBUG_RETURN(-1);
}
#ifdef NDB_BINLOG_EXTRA_WARNINGS
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
0, "NDB Binlog: Removed trailing event",
"NDB");
......@@ -2490,6 +2542,7 @@ int
ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
const char *event_name)
{
THD *thd= current_thd;
/*
we are in either create table or rename table so table should be
locked, hence we can work with the share without locks
......@@ -2563,7 +2616,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
{
sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
" %s",event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb->getNdbError().code,
ndb->getNdbError().message,
......@@ -2613,7 +2666,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
sql_print_error("NDB Binlog: Creating NdbEventOperation"
" blob field %u handles failed (code=%d) for %s",
j, op->getNdbError().code, event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
op->getNdbError().code,
op->getNdbError().message,
......@@ -2650,7 +2703,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
retries= 0;
if (retries == 0)
{
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
op->getNdbError().code, op->getNdbError().message,
"NDB");
......@@ -2698,6 +2751,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share, const char *type_str)
{
DBUG_ENTER("ndbcluster_handle_drop_table");
THD *thd= current_thd;
NDBDICT *dict= ndb->getDictionary();
if (event_name && dict->dropEvent(event_name))
......@@ -2705,7 +2759,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
if (dict->getNdbError().code != 4710)
{
/* drop event failed for some reason, issue a warning */
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
......@@ -2743,10 +2797,14 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
these out of order, thus we are keeping the SYNC_DROP_ defined
for now.
*/
const char *save_proc_info= thd->proc_info;
#define SYNC_DROP_
#ifdef SYNC_DROP_
thd->proc_info= "Syncing ndb table schema operation and binlog";
(void) pthread_mutex_lock(&share->mutex);
int max_timeout= 10;
safe_mutex_assert_owner(&LOCK_open);
(void) pthread_mutex_unlock(&LOCK_open);
int max_timeout= opt_ndb_sync_timeout;
while (share->op)
{
struct timespec abstime;
......@@ -2754,7 +2812,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
int ret= pthread_cond_timedwait(&injector_cond,
&share->mutex,
&abstime);
if (share->op == 0)
if (thd->killed ||
share->op == 0)
break;
if (ret)
{
......@@ -2770,6 +2829,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
type_str, share->key);
}
}
(void) pthread_mutex_lock(&LOCK_open);
(void) pthread_mutex_unlock(&share->mutex);
#else
(void) pthread_mutex_lock(&share->mutex);
......@@ -2777,6 +2837,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
share->op= 0;
(void) pthread_mutex_unlock(&share->mutex);
#endif
thd->proc_info= save_proc_info;
DBUG_RETURN(0);
}
......
......@@ -138,8 +138,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *new_db= 0,
const char *new_table_name= 0);
const char *new_db,
const char *new_table_name,
int have_lock_open);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share,
const char *type_str);
......
......@@ -1002,6 +1002,9 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
#define WAITFOR_RESPONSE_TIMEOUT 120000 // Milliseconds
#endif
#define NDB_SYSTEM_DATABASE "sys"
#define NDB_SYSTEM_SCHEMA "def"
/**
* @class Ndb
* @brief Represents the NDB kernel and is the main class of the NDB API.
......@@ -1672,6 +1675,8 @@ private:
const char * externalizeIndexName(const char * internalIndexName,
bool fullyQualifiedNames);
const char * externalizeIndexName(const char * internalIndexName);
const BaseString old_internalize_index_name(const NdbTableImpl * table,
const char * external_name) const;
const BaseString internalize_index_name(const NdbTableImpl * table,
const char * external_name) const;
......
......@@ -1635,6 +1635,16 @@ public:
int listIndexes(List & list, const char * tableName);
int listIndexes(List & list, const char * tableName) const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* Fetch list of indexes of given table.
* @param list Reference to list where to store the listed indexes
* @param table Reference to table that index belongs to.
* @return 0 if successful, otherwise -1
*/
int listIndexes(List & list, const Table &table) const;
#endif
/** @} *******************************************************************/
/**
* @name Events
......
......@@ -1311,6 +1311,35 @@ Ndb::internalize_table_name(const char *external_name) const
DBUG_RETURN(ret);
}
const BaseString
Ndb::old_internalize_index_name(const NdbTableImpl * table,
const char * external_name) const
{
BaseString ret;
DBUG_ENTER("old_internalize_index_name");
DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
external_name, table ? table->m_id : ~0));
if (!table)
{
DBUG_PRINT("error", ("!table"));
DBUG_RETURN(ret);
}
if (fullyQualifiedNames)
{
/* Internal index name format <db>/<schema>/<tabid>/<table> */
ret.assfmt("%s%d%c%s",
theImpl->m_prefix.c_str(),
table->m_id,
table_name_separator,
external_name);
}
else
ret.assign(external_name);
DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
DBUG_RETURN(ret);
}
const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table,
......@@ -1328,9 +1357,9 @@ Ndb::internalize_index_name(const NdbTableImpl * table,
if (fullyQualifiedNames)
{
/* Internal index name format <db>/<schema>/<tabid>/<table> */
/* Internal index name format sys/def/<tabid>/<table> */
ret.assfmt("%s%d%c%s",
theImpl->m_prefix.c_str(),
theImpl->m_systemPrefix.c_str(),
table->m_id,
table_name_separator,
external_name);
......
......@@ -1618,6 +1618,14 @@ NdbDictionary::Dictionary::listIndexes(List& list,
return m_impl.listIndexes(list, tab->getTableId());
}
int
NdbDictionary::Dictionary::listIndexes(List& list,
const NdbDictionary::Table &table) const
{
return m_impl.listIndexes(list, table.getTableId());
}
const struct NdbError &
NdbDictionary::Dictionary::getNdbError() const {
return m_impl.getNdbError();
......
......@@ -56,7 +56,6 @@
DBUG_RETURN(b);\
}
extern Uint64 g_latest_trans_gci;
int ndb_dictionary_is_mysqld = 0;
bool
......@@ -1509,9 +1508,21 @@ NdbTableImpl *
NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
NdbTableImpl * table)
{
const char *current_db= m_ndb.getDatabaseName();
NdbTableImpl *index_table;
const BaseString internalName(
m_ndb.internalize_index_name(table, index->getName()));
return getTable(m_ndb.externalizeTableName(internalName.c_str()));
// Get index table in system database
m_ndb.setDatabaseName(NDB_SYSTEM_DATABASE);
index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));
m_ndb.setDatabaseName(current_db);
if (!index_table)
{
// Index table not found
// Try geting index table in current database (old format)
index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));
}
return index_table;
}
#if 0
......@@ -4223,7 +4234,6 @@ NdbDictInterface::execWAIT_GCP_CONF(NdbApiSignal* signal,
{
const WaitGCPConf * const conf=
CAST_CONSTPTR(WaitGCPConf, signal->getDataPtr());
g_latest_trans_gci= conf->gcp;
m_waiter.signal(NO_WAIT);
}
......
......@@ -1020,6 +1020,33 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name,
}
break;
}
{
// Index not found, try old format
const BaseString
old_internal_indexname(m_ndb.old_internalize_index_name(&ndbtab,
index_name));
retry= 2;
while (retry)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
index_name, ndbtab));
if (tab)
{
// tab->m_index sould be set. otherwise tab == 0
NdbIndexImpl *idx= tab->m_index;
if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
{
releaseIndexGlobal(*idx, 1);
retry--;
continue;
}
DBUG_RETURN(idx);
}
break;
}
}
m_error.code= 4243;
DBUG_RETURN(0);
}
......@@ -1086,17 +1113,41 @@ NdbDictionaryImpl::getIndex(const char* index_name,
index_name,
prim));
if (!tab)
goto err;
goto retry;
info= Ndb_local_table_info::create(tab, 0);
if (!info)
goto err;
goto retry;
m_localHash.put(internal_indexname.c_str(), info);
}
else
tab= info->m_table_impl;
return tab->m_index;
retry:
// Index not found, try fetching it from current database
const BaseString
old_internal_indexname(m_ndb.old_internalize_index_name(&prim, index_name));
info= m_localHash.get(old_internal_indexname.c_str());
if (info == 0)
{
tab= fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
index_name,
prim));
if (!tab)
goto err;
info= Ndb_local_table_info::create(tab, 0);
if (!info)
goto err;
m_localHash.put(old_internal_indexname.c_str(), info);
}
else
tab= info->m_table_impl;
return tab->m_index;
err:
m_error.code= 4243;
......
......@@ -93,6 +93,8 @@ public:
m_schemaname.c_str(), table_name_separator);
}
BaseString m_systemPrefix; // Buffer for preformatted for <sys>/<def>/
/**
* NOTE free lists must be _after_ theNdbObjectIdMap take
* assure that destructors are run in correct order
......
......@@ -32,6 +32,8 @@
#include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/TcHbRep.hpp>
Uint64 g_latest_trans_gci = 0;
/*****************************************************************************
NdbTransaction( Ndb* aNdb );
......@@ -1568,6 +1570,9 @@ NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf)
theCommitStatus = Committed;
theCompletionStatus = CompletedSuccess;
theGlobalCheckpointId = commitConf->gci;
// theGlobalCheckpointId == 0 if NoOp transaction
if (theGlobalCheckpointId)
g_latest_trans_gci = theGlobalCheckpointId;
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
......@@ -1746,6 +1751,8 @@ from other transactions.
if (tCommitFlag == 1) {
theCommitStatus = Committed;
theGlobalCheckpointId = tGCI;
assert(tGCI);
g_latest_trans_gci = tGCI;
} else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){
......@@ -1922,6 +1929,8 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf,
if (tCommitFlag == 1) {
theCommitStatus = Committed;
theGlobalCheckpointId = tGCI;
assert(tGCI);
g_latest_trans_gci = tGCI;
} else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){
/**********************************************************************/
......
......@@ -46,7 +46,6 @@
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
Uint64 g_latest_trans_gci= 0;
/******************************************************************************
* int init( int aNrOfCon, int aNrOfOp );
......@@ -367,7 +366,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
(tCon->theSendStatus == NdbTransaction::sendTC_OP)) {
g_latest_trans_gci= keyConf->gci;
tReturnCode = tCon->receiveTCKEYCONF(keyConf, tLen);
if (tReturnCode != -1) {
completedTransaction(tCon);
......@@ -520,7 +518,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
(tCon->theSendStatus == NdbTransaction::sendTC_COMMIT)) {
g_latest_trans_gci= commitConf->gci;
tReturnCode = tCon->receiveTC_COMMITCONF(commitConf);
if (tReturnCode != -1) {
completedTransaction(tCon);
......@@ -855,7 +852,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
(tCon->theSendStatus == NdbTransaction::sendTC_OP)) {
g_latest_trans_gci= indxConf->gci;
tReturnCode = tCon->receiveTCINDXCONF(indxConf, tLen);
if (tReturnCode != -1) {
completedTransaction(tCon);
......
......@@ -219,6 +219,9 @@ NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
}
m_optimized_node_selection=
m_ndb_cluster_connection.m_optimized_node_selection;
m_systemPrefix.assfmt("%s%c%s%c", NDB_SYSTEM_DATABASE, table_name_separator,
NDB_SYSTEM_SCHEMA, table_name_separator);
}
NdbImpl::~NdbImpl()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment