Commit eaba4f75 authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/51-jonas

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb

parents 5b814706 db916d90
...@@ -183,3 +183,9 @@ connection master; ...@@ -183,3 +183,9 @@ connection master;
DROP TABLE IF EXISTS test.t1; DROP TABLE IF EXISTS test.t1;
DROP TABLE IF EXISTS test.t2; DROP TABLE IF EXISTS test.t2;
# ensure cleanup on slave as well:
# ndb blob tables consist of several tables
# if cluster is shutdown while not all tables are
# properly dropped, the table becomes inconsistent
# and wrecks later test cases
--sync_slave_with_master
...@@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name, ...@@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name,
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
#ifdef HAVE_NDB_BINLOG
/*
Don't allow table creation unless
schema distribution table is setup
( unless it is a creation of the schema dist table itself )
*/
if (!schema_share &&
!(strcmp(m_dbname, NDB_REP_DB) == 0 &&
strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif /* HAVE_NDB_BINLOG */
DBUG_PRINT("table", ("name: %s", m_tabname)); DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname); tab.setName(m_tabname);
tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
...@@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
is_old_table_tmpfile= 0; is_old_table_tmpfile= 0;
String event_name(INJECTOR_EVENT_LEN); String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share,
"rename table");
} }
if (!result && !IS_TMP_PREFIX(new_tabname)) if (!result && !IS_TMP_PREFIX(new_tabname))
...@@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/*
Don't allow drop table unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
NDB_SHARE *share= get_share(path, 0, false); NDB_SHARE *share= get_share(path, 0, false);
#endif #endif
...@@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb, ndbcluster_handle_drop_table(ndb,
table_dropped ? event_name.c_ptr() : 0, table_dropped ? event_name.c_ptr() : 0,
share); share, "delete table");
} }
if (share) if (share)
...@@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name) ...@@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name)
set_dbname(name); set_dbname(name);
set_tabname(name); set_tabname(name);
#ifdef HAVE_NDB_BINLOG
/*
Don't allow drop table unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif
if (check_ndb_connection()) if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
...@@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) ...@@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
if (!res) if (!res)
info(HA_STATUS_VARIABLE | HA_STATUS_CONST); info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
#ifdef HAVE_NDB_BINLOG
if (!ndb_binlog_tables_inited && ndb_binlog_running)
table->db_stat|= HA_READ_ONLY;
#endif
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path) ...@@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path)
static void ndbcluster_drop_database(char *path) static void ndbcluster_drop_database(char *path)
{ {
DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG
/*
Don't allow drop database unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_VOID_RETURN;
//DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif
ndbcluster_drop_database_impl(path); ndbcluster_drop_database_impl(path);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
char db[FN_REFLEN]; char db[FN_REFLEN];
...@@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path) ...@@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path)
current_thd->query, current_thd->query_length, current_thd->query, current_thd->query_length,
db, "", 0, 0, SOT_DROP_DB); db, "", 0, 0, SOT_DROP_DB);
#endif #endif
DBUG_VOID_RETURN;
} }
/* /*
find all tables in ndb and discover those needed find all tables in ndb and discover those needed
...@@ -5733,36 +5789,37 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5733,36 +5789,37 @@ int ndbcluster_find_all_files(THD *thd)
DBUG_ENTER("ndbcluster_find_all_files"); DBUG_ENTER("ndbcluster_find_all_files");
Ndb* ndb; Ndb* ndb;
char key[FN_REFLEN]; char key[FN_REFLEN];
NdbDictionary::Dictionary::List list;
if (!(ndb= check_ndb_in_thd(thd))) if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
int unhandled, retries= 5; int unhandled, retries= 5, skipped;
do do
{ {
NdbDictionary::Dictionary::List list;
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError()); ERR_RETURN(dict->getNdbError());
unhandled= 0; unhandled= 0;
skipped= 0;
retries--;
for (uint i= 0 ; i < list.count ; i++) for (uint i= 0 ; i < list.count ; i++)
{ {
NDBDICT::List::Element& elmt= list.elements[i]; NDBDICT::List::Element& elmt= list.elements[i];
int do_handle_table= 0;
if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name)) if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
{ {
DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
continue; continue;
} }
DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
if (elmt.state == NDBOBJ::StateOnline || if (elmt.state != NDBOBJ::StateOnline &&
elmt.state == NDBOBJ::StateBackup) elmt.state != NDBOBJ::StateBackup &&
do_handle_table= 1; elmt.state != NDBOBJ::StateBuilding)
else if (!(elmt.state == NDBOBJ::StateBuilding))
{ {
sql_print_information("NDB: skipping setup table %s.%s, in state %d", sql_print_information("NDB: skipping setup table %s.%s, in state %d",
elmt.database, elmt.name, elmt.state); elmt.database, elmt.name, elmt.state);
skipped++;
continue; continue;
} }
...@@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd)
if (!(ndbtab= dict->getTable(elmt.name))) if (!(ndbtab= dict->getTable(elmt.name)))
{ {
if (do_handle_table) if (retries == 0)
sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
elmt.database, elmt.name, elmt.database, elmt.name,
dict->getNdbError().code, dict->getNdbError().code,
...@@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd)
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
} }
} }
while (unhandled && retries--); while (unhandled && retries);
DBUG_RETURN(0); DBUG_RETURN(-(skipped + unhandled));
} }
int ndbcluster_find_files(THD *thd,const char *db,const char *path, int ndbcluster_find_files(THD *thd,const char *db,const char *path,
...@@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_cond_wait(&COND_server_started, &LOCK_server_started);
pthread_mutex_unlock(&LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started);
ndbcluster_util_inited= 1;
/* /*
Wait for cluster to start Wait for cluster to start
*/ */
...@@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
} }
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
if (ndb_extra_logging && ndb_binlog_running)
sql_print_information("NDB Binlog: Ndb tables initially read only.");
/* create tables needed by the replication */ /* create tables needed by the replication */
ndbcluster_setup_binlog_table_shares(thd); ndbcluster_setup_binlog_table_shares(thd);
#else #else
...@@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
ndbcluster_find_all_files(thd); ndbcluster_find_all_files(thd);
#endif #endif
ndbcluster_util_inited= 1;
#ifdef HAVE_NDB_BINLOG
/* Signal injector thread that all is setup */
pthread_cond_signal(&injector_cond);
#endif
set_timespec(abstime, 0); set_timespec(abstime, 0);
for (;!abort_loop;) for (;!abort_loop;)
{ {
pthread_mutex_lock(&LOCK_ndb_util_thread); pthread_mutex_lock(&LOCK_ndb_util_thread);
pthread_cond_timedwait(&COND_ndb_util_thread, pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread, &LOCK_ndb_util_thread,
...@@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
Check that the apply_status_share and schema_share has been created. Check that the apply_status_share and schema_share has been created.
If not try to create it If not try to create it
*/ */
if (!apply_status_share || !schema_share) if (!ndb_binlog_tables_inited)
ndbcluster_setup_binlog_table_shares(thd); ndbcluster_setup_binlog_table_shares(thd);
#endif #endif
...@@ -10029,14 +10082,15 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond) ...@@ -10029,14 +10082,15 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond)
} }
} }
dict->listObjects(dflist, NdbDictionary::Object::Undofile); NdbDictionary::Dictionary::List uflist;
dict->listObjects(uflist, NdbDictionary::Object::Undofile);
ndberr= dict->getNdbError(); ndberr= dict->getNdbError();
if (ndberr.classification != NdbError::NoError) if (ndberr.classification != NdbError::NoError)
ERR_RETURN(ndberr); ERR_RETURN(ndberr);
for (i= 0; i < dflist.count; i++) for (i= 0; i < uflist.count; i++)
{ {
NdbDictionary::Dictionary::List::Element& elt= dflist.elements[i]; NdbDictionary::Dictionary::List::Element& elt= uflist.elements[i];
Ndb_cluster_connection_node_iter iter; Ndb_cluster_connection_node_iter iter;
unsigned id; unsigned id;
......
...@@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0; ...@@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0;
FALSE if not FALSE if not
*/ */
my_bool ndb_binlog_running= FALSE; my_bool ndb_binlog_running= FALSE;
my_bool ndb_binlog_tables_inited= FALSE;
/* /*
Global reference to the ndb injector thread THD oject Global reference to the ndb injector thread THD oject
...@@ -775,32 +776,50 @@ static int ndbcluster_create_schema_table(THD *thd) ...@@ -775,32 +776,50 @@ static int ndbcluster_create_schema_table(THD *thd)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void ndbcluster_setup_binlog_table_shares(THD *thd) int ndbcluster_setup_binlog_table_shares(THD *thd)
{ {
int done_find_all_files= 0;
if (!schema_share && if (!schema_share &&
ndbcluster_check_schema_share() == 0) ndbcluster_check_schema_share() == 0)
{ {
if (!done_find_all_files) pthread_mutex_lock(&LOCK_open);
ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
pthread_mutex_unlock(&LOCK_open);
if (!schema_share)
{ {
ndbcluster_find_all_files(thd); ndbcluster_create_schema_table(thd);
done_find_all_files= 1; // always make sure we create the 'schema' first
if (!schema_share)
return 1;
} }
ndbcluster_create_schema_table(thd);
// always make sure we create the 'schema' first
if (!schema_share)
return;
} }
if (!apply_status_share && if (!apply_status_share &&
ndbcluster_check_apply_status_share() == 0) ndbcluster_check_apply_status_share() == 0)
{ {
if (!done_find_all_files) pthread_mutex_lock(&LOCK_open);
ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
pthread_mutex_unlock(&LOCK_open);
if (!apply_status_share)
{ {
ndbcluster_find_all_files(thd); ndbcluster_create_apply_status_table(thd);
done_find_all_files= 1; if (!apply_status_share)
return 1;
}
}
if (!ndbcluster_find_all_files(thd))
{
pthread_mutex_lock(&LOCK_open);
ndb_binlog_tables_inited= TRUE;
if (ndb_binlog_running)
{
if (ndb_extra_logging)
sql_print_information("NDB Binlog: ndb tables writable");
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
} }
ndbcluster_create_apply_status_table(thd); pthread_mutex_unlock(&LOCK_open);
/* Signal injector thread that all is setup */
pthread_cond_signal(&injector_cond);
} }
return 0;
} }
/* /*
...@@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf, ...@@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
/* /*
log query in schema table log query in schema table
*/ */
static void ndb_report_waiting(const char *key,
int the_time,
const char *op,
const char *obj)
{
ulonglong ndb_latest_epoch= 0;
const char *proc_info= "<no info>";
pthread_mutex_lock(&injector_mutex);
if (injector_ndb)
ndb_latest_epoch= injector_ndb->getLatestGCI();
if (injector_thd)
proc_info= injector_thd->proc_info;
pthread_mutex_unlock(&injector_mutex);
sql_print_information("NDB %s:"
" waiting max %u sec for %s %s."
" epochs: (%u,%u,%u)"
" injector proc_info: %s"
,key, the_time, op, obj
,(uint)ndb_latest_handled_binlog_epoch
,(uint)ndb_latest_received_binlog_epoch
,(uint)ndb_latest_epoch
,proc_info
);
}
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length, const char *query, int query_length,
const char *db, const char *table_name, const char *db, const char *table_name,
...@@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
} }
char tmp_buf2[FN_REFLEN]; char tmp_buf2[FN_REFLEN];
const char *type_str;
switch (type) switch (type)
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
...@@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query= tmp_buf2; query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `", query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2); table_name, "`", NullS) - tmp_buf2);
type_str= "drop table";
break; break;
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
/* redo the rename table query as is may contain several tables */ /* redo the rename table query as is may contain several tables */
...@@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query_length= (uint) (strxmov(tmp_buf2, "rename table `", query_length= (uint) (strxmov(tmp_buf2, "rename table `",
old_db, ".", old_table_name, "` to `", old_db, ".", old_table_name, "` to `",
db, ".", table_name, "`", NullS) - tmp_buf2); db, ".", table_name, "`", NullS) - tmp_buf2);
type_str= "rename table";
break; break;
case SOT_CREATE_TABLE: case SOT_CREATE_TABLE:
// fall through type_str= "create table";
break;
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
type_str= "create table";
break; break;
case SOT_DROP_DB: case SOT_DROP_DB:
type_str= "drop db";
break; break;
case SOT_CREATE_DB: case SOT_CREATE_DB:
type_str= "create db";
break; break;
case SOT_ALTER_DB: case SOT_ALTER_DB:
type_str= "alter db";
break; break;
case SOT_TABLESPACE: case SOT_TABLESPACE:
type_str= "tablespace";
break; break;
case SOT_LOGFILE_GROUP: case SOT_LOGFILE_GROUP:
type_str= "logfile group";
break; break;
default: default:
abort(); /* should not happen, programming error */ abort(); /* should not happen, programming error */
...@@ -1201,13 +1255,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -1201,13 +1255,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
max_timeout--; max_timeout--;
if (max_timeout == 0) if (max_timeout == 0)
{ {
sql_print_error("NDB create table: timed out. Ignoring..."); sql_print_error("NDB %s: distibuting %s timed out. Ignoring...",
type_str, ndb_schema_object->key);
break; break;
} }
if (ndb_extra_logging) if (ndb_extra_logging)
sql_print_information("NDB create table: " ndb_report_waiting(type_str, max_timeout,
"waiting max %u sec for create table %s.", "distributing", ndb_schema_object->key);
max_timeout, ndb_schema_object->key);
} }
(void) pthread_mutex_unlock(&ndb_schema_object->mutex); (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
} }
...@@ -1689,9 +1743,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1689,9 +1743,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
// skip // skip
break; break;
case NDBEVENT::TE_CLUSTER_FAILURE: case NDBEVENT::TE_CLUSTER_FAILURE:
// fall through
case NDBEVENT::TE_DROP: case NDBEVENT::TE_DROP:
if (ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
free_share(&schema_share); free_share(&schema_share);
schema_share= 0; schema_share= 0;
ndb_binlog_tables_inited= FALSE;
// fall through // fall through
case NDBEVENT::TE_ALTER: case NDBEVENT::TE_ALTER:
ndb_handle_schema_change(thd, ndb, pOp, tmp_share); ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
...@@ -2494,9 +2554,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2494,9 +2554,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
get_share(share); get_share(share);
if (do_apply_status_share) if (do_apply_status_share)
{
apply_status_share= get_share(share); apply_status_share= get_share(share);
(void) pthread_cond_signal(&injector_cond);
}
else if (do_schema_share) else if (do_schema_share)
{
schema_share= get_share(share); schema_share= get_share(share);
(void) pthread_cond_signal(&injector_cond);
}
DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u", DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
share->key, share->op, share->use_count)); share->key, share->op, share->use_count));
...@@ -2513,7 +2579,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2513,7 +2579,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/ */
int int
ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share) NDB_SHARE *share, const char *type_str)
{ {
DBUG_ENTER("ndbcluster_handle_drop_table"); DBUG_ENTER("ndbcluster_handle_drop_table");
...@@ -2581,9 +2647,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, ...@@ -2581,9 +2647,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
break; break;
} }
if (ndb_extra_logging) if (ndb_extra_logging)
sql_print_information("NDB delete table: " ndb_report_waiting(type_str, max_timeout,
"waiting max %u sec for drop table %s.", type_str, share->key);
max_timeout, share->key);
} }
(void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&share->mutex);
#else #else
...@@ -2660,13 +2725,18 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2660,13 +2725,18 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
switch (type) switch (type)
{ {
case NDBEVENT::TE_CLUSTER_FAILURE: case NDBEVENT::TE_CLUSTER_FAILURE:
if (ndb_extra_logging)
sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
if (apply_status_share == share) if (apply_status_share == share)
{ {
if (ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
free_share(&apply_status_share); free_share(&apply_status_share);
apply_status_share= 0; apply_status_share= 0;
ndb_binlog_tables_inited= FALSE;
} }
if (ndb_extra_logging)
sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: " DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
"%s received share: 0x%lx op: %lx share op: %lx " "%s received share: 0x%lx op: %lx share op: %lx "
"op_old: %lx", "op_old: %lx",
...@@ -2675,8 +2745,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2675,8 +2745,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
case NDBEVENT::TE_DROP: case NDBEVENT::TE_DROP:
if (apply_status_share == share) if (apply_status_share == share)
{ {
if (ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
free_share(&apply_status_share); free_share(&apply_status_share);
apply_status_share= 0; apply_status_share= 0;
ndb_binlog_tables_inited= FALSE;
} }
/* ToDo: remove printout */ /* ToDo: remove printout */
if (ndb_extra_logging) if (ndb_extra_logging)
...@@ -3087,7 +3162,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3087,7 +3162,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->proc_info= "Waiting for ndbcluster to start"; thd->proc_info= "Waiting for ndbcluster to start";
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
while (!ndbcluster_util_inited) while (!schema_share || !apply_status_share)
{ {
/* ndb not connected yet */ /* ndb not connected yet */
struct timespec abstime; struct timespec abstime;
...@@ -3119,10 +3194,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3119,10 +3194,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->db= db; thd->db= db;
if (ndb_binlog_running) if (ndb_binlog_running)
open_binlog_index(thd, &binlog_tables, &binlog_index); open_binlog_index(thd, &binlog_tables, &binlog_index);
if (!apply_status_share)
{
sql_print_error("NDB: Could not get apply status share");
}
thd->db= db; thd->db= db;
} }
...@@ -3184,6 +3255,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3184,6 +3255,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
if (unlikely(schema_res > 0)) if (unlikely(schema_res > 0))
{ {
thd->proc_info= "Processing events from schema table";
schema_ndb-> schema_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
schema_ndb-> schema_ndb->
...@@ -3379,6 +3451,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3379,6 +3451,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
if (trans.good()) if (trans.good())
{ {
//DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes); //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
thd->proc_info= "Committing events to binlog";
injector::transaction::binlog_pos start= trans.start_pos(); injector::transaction::binlog_pos start= trans.start_pos();
if (int r= trans.commit()) if (int r= trans.commit())
{ {
...@@ -3418,6 +3491,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3418,6 +3491,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
} }
err: err:
DBUG_PRINT("info",("Shutting down cluster binlog thread")); DBUG_PRINT("info",("Shutting down cluster binlog thread"));
thd->proc_info= "Shutting down";
close_thread_tables(thd); close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */ /* don't mess with the injector_ndb anymore from other threads */
......
...@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *old_db= 0, const char *old_db= 0,
const char *old_table_name= 0); const char *old_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share); NDB_SHARE *share,
const char *type_str);
void ndb_rep_event_name(String *event_name, void ndb_rep_event_name(String *event_name,
const char *db, const char *tbl); const char *db, const char *tbl);
int ndb_create_table_from_engine(THD *thd, const char *db, int ndb_create_table_from_engine(THD *thd, const char *db,
...@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg); ...@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg);
/* /*
table cluster_replication.apply_status table cluster_replication.apply_status
*/ */
void ndbcluster_setup_binlog_table_shares(THD *thd); int ndbcluster_setup_binlog_table_shares(THD *thd);
extern NDB_SHARE *apply_status_share; extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share; extern NDB_SHARE *schema_share;
extern THD *injector_thd; extern THD *injector_thd;
extern my_bool ndb_binlog_running; extern my_bool ndb_binlog_running;
extern my_bool ndb_binlog_tables_inited;
bool bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
......
...@@ -586,7 +586,8 @@ public: ...@@ -586,7 +586,8 @@ public:
enum Unimplemented enum Unimplemented
{ {
ScanOptimised = 15, //Default updateOptimised ScanOptimised = 15, //Default updateOptimised
AttributeGroup = 1012 //Default 0 AttributeGroup = 1012, //Default 0
FileNo = 102
}; };
}; };
...@@ -618,13 +619,13 @@ struct DictFilegroupInfo { ...@@ -618,13 +619,13 @@ struct DictFilegroupInfo {
*/ */
FileName = 100, FileName = 100,
FileType = 101, FileType = 101,
FileId = 102, FileId = 103,
FileNo = 103, // Per Filegroup
FileFGroupId = 104, FileFGroupId = 104,
FileFGroupVersion = 105, FileFGroupVersion = 105,
FileSizeHi = 106, FileSizeHi = 106,
FileSizeLo = 107, FileSizeLo = 107,
FileFreeExtents = 108, FileFreeExtents = 108,
FileVersion = 109,
FileEnd = 199, // FileEnd = 199, //
/** /**
...@@ -696,8 +697,8 @@ struct DictFilegroupInfo { ...@@ -696,8 +697,8 @@ struct DictFilegroupInfo {
struct File { struct File {
char FileName[PATH_MAX]; char FileName[PATH_MAX];
Uint32 FileType; Uint32 FileType;
Uint32 FileNo;
Uint32 FileId; Uint32 FileId;
Uint32 FileVersion;
Uint32 FilegroupId; Uint32 FilegroupId;
Uint32 FilegroupVersion; Uint32 FilegroupVersion;
Uint32 FileSizeHi; Uint32 FileSizeHi;
......
...@@ -66,9 +66,10 @@ struct DropFilegroupRef { ...@@ -66,9 +66,10 @@ struct DropFilegroupRef {
Busy = 701, Busy = 701,
NotMaster = 702, NotMaster = 702,
NoSuchFilegroup = 767, NoSuchFilegroup = 767,
FilegroupInUse = 768 FilegroupInUse = 768,
InvalidSchemaObjectVersion = 774
}; };
Uint32 senderData; Uint32 senderData;
Uint32 senderRef; Uint32 senderRef;
Uint32 masterNodeId; Uint32 masterNodeId;
...@@ -150,7 +151,8 @@ struct DropFileRef { ...@@ -150,7 +151,8 @@ struct DropFileRef {
NoError = 0, NoError = 0,
Busy = 701, Busy = 701,
NoSuchFile = 766, NoSuchFile = 766,
DropUndoFileNotSupported = 769 DropUndoFileNotSupported = 769,
InvalidSchemaObjectVersion = 774
}; };
Uint32 senderData; Uint32 senderData;
......
...@@ -216,8 +216,8 @@ SimpleProperties::SP2StructMapping ...@@ -216,8 +216,8 @@ SimpleProperties::SP2StructMapping
DictFilegroupInfo::FileMapping[] = { DictFilegroupInfo::FileMapping[] = {
DFGIMAPS(File, FileName, FileName, 0, PATH_MAX), DFGIMAPS(File, FileName, FileName, 0, PATH_MAX),
DFGIMAP2(File, FileType, FileType, 0, 1), DFGIMAP2(File, FileType, FileType, 0, 1),
DFGIMAP(File, FileNo, FileNo),
DFGIMAP(File, FileId, FileId), DFGIMAP(File, FileId, FileId),
DFGIMAP(File, FileVersion, FileVersion),
DFGIMAP(File, FileFGroupId, FilegroupId), DFGIMAP(File, FileFGroupId, FilegroupId),
DFGIMAP(File, FileFGroupVersion, FilegroupVersion), DFGIMAP(File, FileFGroupVersion, FilegroupVersion),
DFGIMAP(File, FileSizeHi, FileSizeHi), DFGIMAP(File, FileSizeHi, FileSizeHi),
...@@ -254,8 +254,8 @@ void ...@@ -254,8 +254,8 @@ void
DictFilegroupInfo::File::init(){ DictFilegroupInfo::File::init(){
memset(FileName, sizeof(FileName), 0); memset(FileName, sizeof(FileName), 0);
FileType = ~0; FileType = ~0;
FileNo = ~0;
FileId = ~0; FileId = ~0;
FileVersion = ~0;
FilegroupId = ~0; FilegroupId = ~0;
FilegroupVersion = ~0; FilegroupVersion = ~0;
FileSizeHi = 0; FileSizeHi = 0;
......
...@@ -631,7 +631,8 @@ Dbdict::packFileIntoPages(SimpleProperties::Writer & w, ...@@ -631,7 +631,8 @@ Dbdict::packFileIntoPages(SimpleProperties::Writer & w,
f.FileSizeHi = (f_ptr.p->m_file_size >> 32); f.FileSizeHi = (f_ptr.p->m_file_size >> 32);
f.FileSizeLo = (f_ptr.p->m_file_size & 0xFFFFFFFF); f.FileSizeLo = (f_ptr.p->m_file_size & 0xFFFFFFFF);
f.FileFreeExtents= free_extents; f.FileFreeExtents= free_extents;
f.FileNo = f_ptr.p->key; f.FileId = f_ptr.p->key;
f.FileVersion = f_ptr.p->m_version;
FilegroupPtr lfg_ptr; FilegroupPtr lfg_ptr;
ndbrequire(c_filegroup_hash.find(lfg_ptr, f.FilegroupId)); ndbrequire(c_filegroup_hash.find(lfg_ptr, f.FilegroupId));
...@@ -13588,6 +13589,13 @@ Dbdict::execDROP_FILE_REQ(Signal* signal) ...@@ -13588,6 +13589,13 @@ Dbdict::execDROP_FILE_REQ(Signal* signal)
break; break;
} }
if (file_ptr.p->m_version != version)
{
ref->errorCode = DropFileRef::InvalidSchemaObjectVersion;
ref->errorLine = __LINE__;
break;
}
Ptr<SchemaTransaction> trans_ptr; Ptr<SchemaTransaction> trans_ptr;
if (! c_Trans.seize(trans_ptr)) if (! c_Trans.seize(trans_ptr))
{ {
...@@ -13662,6 +13670,13 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal) ...@@ -13662,6 +13670,13 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal)
ref->errorLine = __LINE__; ref->errorLine = __LINE__;
break; break;
} }
if (filegroup_ptr.p->m_version != version)
{
ref->errorCode = DropFilegroupRef::InvalidSchemaObjectVersion;
ref->errorLine = __LINE__;
break;
}
Ptr<SchemaTransaction> trans_ptr; Ptr<SchemaTransaction> trans_ptr;
if (! c_Trans.seize(trans_ptr)) if (! c_Trans.seize(trans_ptr))
...@@ -15095,6 +15110,7 @@ Dbdict::create_file_prepare_start(Signal* signal, SchemaOp* op){ ...@@ -15095,6 +15110,7 @@ Dbdict::create_file_prepare_start(Signal* signal, SchemaOp* op){
filePtr.p->m_obj_ptr_i = obj_ptr.i; filePtr.p->m_obj_ptr_i = obj_ptr.i;
filePtr.p->m_filegroup_id = f.FilegroupId; filePtr.p->m_filegroup_id = f.FilegroupId;
filePtr.p->m_type = f.FileType; filePtr.p->m_type = f.FileType;
filePtr.p->m_version = op->m_obj_version;
obj_ptr.p->m_id = op->m_obj_id; obj_ptr.p->m_id = op->m_obj_id;
obj_ptr.p->m_type = f.FileType; obj_ptr.p->m_type = f.FileType;
......
...@@ -535,6 +535,7 @@ public: ...@@ -535,6 +535,7 @@ public:
Uint32 key; Uint32 key;
Uint32 m_magic; Uint32 m_magic;
Uint32 m_version;
Uint32 m_obj_ptr_i; Uint32 m_obj_ptr_i;
Uint32 m_filegroup_id; Uint32 m_filegroup_id;
Uint32 m_type; Uint32 m_type;
......
...@@ -4624,13 +4624,27 @@ NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst, ...@@ -4624,13 +4624,27 @@ NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
LinearSectionPtr ptr[1]; LinearSectionPtr ptr[1];
ptr[0].p = (Uint32*)name; ptr[0].p = (Uint32*)name;
ptr[0].sz = (strLen + 3)/4; ptr[0].sz = (strLen + 3)/4;
#ifndef IGNORE_VALGRIND_WARNINGS
if (strLen & 3)
{
Uint32 pad = 0;
m_buffer.clear();
m_buffer.append(name, strLen);
m_buffer.append(&pad, 4);
ptr[0].p = (Uint32*)m_buffer.get_data();
}
#endif
int r = dictSignal(&tSignal, ptr, 1, int r = dictSignal(&tSignal, ptr, 1,
-1, // any node -1, // any node
WAIT_GET_TAB_INFO_REQ, WAIT_GET_TAB_INFO_REQ,
DICT_WAITFOR_TIMEOUT, 100); DICT_WAITFOR_TIMEOUT, 100);
if (r) if (r)
{ {
dst.m_id = -1;
dst.m_version = ~0;
DBUG_PRINT("info", ("get_filegroup failed dictSignal")); DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -4766,7 +4780,18 @@ NdbDictInterface::get_file(NdbFileImpl & dst, ...@@ -4766,7 +4780,18 @@ NdbDictInterface::get_file(NdbFileImpl & dst,
LinearSectionPtr ptr[1]; LinearSectionPtr ptr[1];
ptr[0].p = (Uint32*)name; ptr[0].p = (Uint32*)name;
ptr[0].sz = (strLen + 3)/4; ptr[0].sz = (strLen + 3)/4;
#ifndef IGNORE_VALGRIND_WARNINGS
if (strLen & 3)
{
Uint32 pad = 0;
m_buffer.clear();
m_buffer.append(name, strLen);
m_buffer.append(&pad, 4);
ptr[0].p = (Uint32*)m_buffer.get_data();
}
#endif
int r = dictSignal(&tSignal, ptr, 1, int r = dictSignal(&tSignal, ptr, 1,
node, node,
WAIT_GET_TAB_INFO_REQ, WAIT_GET_TAB_INFO_REQ,
...@@ -4834,7 +4859,8 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst, ...@@ -4834,7 +4859,8 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
} }
dst.m_type= (NdbDictionary::Object::Type)f.FileType; dst.m_type= (NdbDictionary::Object::Type)f.FileType;
dst.m_id= f.FileNo; dst.m_id= f.FileId;
dst.m_version = f.FileVersion;
dst.m_size= ((Uint64)f.FileSizeHi << 32) | (f.FileSizeLo); dst.m_size= ((Uint64)f.FileSizeHi << 32) | (f.FileSizeLo);
dst.m_path.assign(f.FileName); dst.m_path.assign(f.FileName);
......
...@@ -386,6 +386,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -386,6 +386,7 @@ ErrorBundle ErrorCodes[] = {
{ 768, DMEC, SE, "Cant drop filegroup, filegroup is used" }, { 768, DMEC, SE, "Cant drop filegroup, filegroup is used" },
{ 769, DMEC, SE, "Drop undofile not supported, drop logfile group instead" }, { 769, DMEC, SE, "Drop undofile not supported, drop logfile group instead" },
{ 770, DMEC, SE, "Cant drop file, file is used" }, { 770, DMEC, SE, "Cant drop file, file is used" },
{ 774, DMEC, SE, "Invalid schema object for drop" },
{ 241, HA_ERR_TABLE_DEF_CHANGED, SE, "Invalid schema object version" }, { 241, HA_ERR_TABLE_DEF_CHANGED, SE, "Invalid schema object version" },
{ 283, HA_ERR_NO_SUCH_TABLE, SE, "Table is being dropped" }, { 283, HA_ERR_NO_SUCH_TABLE, SE, "Table is being dropped" },
{ 284, HA_ERR_TABLE_DEF_CHANGED, SE, "Table not defined in transaction coordinator" }, { 284, HA_ERR_TABLE_DEF_CHANGED, SE, "Table not defined in transaction coordinator" },
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment