Bug #18932: Cluster binlog mysqld accepts updating although binlog not setup

parent d43c5992
...@@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name, ...@@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name,
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
#ifdef HAVE_NDB_BINLOG
/*
Don't allow table creation unless
schema distribution table is setup
( unless it is a creation of the schema dist table itself )
*/
if (!schema_share &&
!(strcmp(m_dbname, NDB_REP_DB) == 0 &&
strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif /* HAVE_NDB_BINLOG */
DBUG_PRINT("table", ("name: %s", m_tabname)); DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname); tab.setName(m_tabname);
tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
...@@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
is_old_table_tmpfile= 0; is_old_table_tmpfile= 0;
String event_name(INJECTOR_EVENT_LEN); String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share,
"rename table");
} }
if (!result && !IS_TMP_PREFIX(new_tabname)) if (!result && !IS_TMP_PREFIX(new_tabname))
...@@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/*
Don't allow drop table unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
NDB_SHARE *share= get_share(path, 0, false); NDB_SHARE *share= get_share(path, 0, false);
#endif #endif
...@@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb, ndbcluster_handle_drop_table(ndb,
table_dropped ? event_name.c_ptr() : 0, table_dropped ? event_name.c_ptr() : 0,
share); share, "delete table");
} }
if (share) if (share)
...@@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name) ...@@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name)
set_dbname(name); set_dbname(name);
set_tabname(name); set_tabname(name);
#ifdef HAVE_NDB_BINLOG
/*
Don't allow drop table unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif
if (check_ndb_connection()) if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
...@@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) ...@@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
if (!res) if (!res)
info(HA_STATUS_VARIABLE | HA_STATUS_CONST); info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
#ifdef HAVE_NDB_BINLOG
if (!ndb_binlog_tables_inited && ndb_binlog_running)
table->db_stat|= HA_READ_ONLY;
#endif
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path) ...@@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path)
static void ndbcluster_drop_database(char *path) static void ndbcluster_drop_database(char *path)
{ {
DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG
/*
Don't allow drop database unless
schema distribution table is setup
*/
if (!schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
DBUG_VOID_RETURN;
//DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
#endif
ndbcluster_drop_database_impl(path); ndbcluster_drop_database_impl(path);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
char db[FN_REFLEN]; char db[FN_REFLEN];
...@@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path) ...@@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path)
current_thd->query, current_thd->query_length, current_thd->query, current_thd->query_length,
db, "", 0, 0, SOT_DROP_DB); db, "", 0, 0, SOT_DROP_DB);
#endif #endif
DBUG_VOID_RETURN;
} }
/* /*
find all tables in ndb and discover those needed find all tables in ndb and discover those needed
...@@ -5740,29 +5796,30 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5740,29 +5796,30 @@ int ndbcluster_find_all_files(THD *thd)
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
int unhandled, retries= 5; int unhandled, retries= 5, skipped;
do do
{ {
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError()); ERR_RETURN(dict->getNdbError());
unhandled= 0; unhandled= 0;
skipped= 0;
retries--;
for (uint i= 0 ; i < list.count ; i++) for (uint i= 0 ; i < list.count ; i++)
{ {
NDBDICT::List::Element& elmt= list.elements[i]; NDBDICT::List::Element& elmt= list.elements[i];
int do_handle_table= 0;
if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name)) if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
{ {
DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
continue; continue;
} }
DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
if (elmt.state == NDBOBJ::StateOnline || if (elmt.state != NDBOBJ::StateOnline &&
elmt.state == NDBOBJ::StateBackup) elmt.state != NDBOBJ::StateBackup &&
do_handle_table= 1; elmt.state != NDBOBJ::StateBuilding)
else if (!(elmt.state == NDBOBJ::StateBuilding))
{ {
sql_print_information("NDB: skipping setup table %s.%s, in state %d", sql_print_information("NDB: skipping setup table %s.%s, in state %d",
elmt.database, elmt.name, elmt.state); elmt.database, elmt.name, elmt.state);
skipped++;
continue; continue;
} }
...@@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd)
if (!(ndbtab= dict->getTable(elmt.name))) if (!(ndbtab= dict->getTable(elmt.name)))
{ {
if (do_handle_table) if (retries == 0)
sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
elmt.database, elmt.name, elmt.database, elmt.name,
dict->getNdbError().code, dict->getNdbError().code,
...@@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd)
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
} }
} }
while (unhandled && retries--); while (unhandled && retries);
DBUG_RETURN(0); DBUG_RETURN(-(skipped + unhandled));
} }
int ndbcluster_find_files(THD *thd,const char *db,const char *path, int ndbcluster_find_files(THD *thd,const char *db,const char *path,
...@@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_cond_wait(&COND_server_started, &LOCK_server_started);
pthread_mutex_unlock(&LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started);
ndbcluster_util_inited= 1;
/* /*
Wait for cluster to start Wait for cluster to start
*/ */
...@@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
} }
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
if (ndb_extra_logging && ndb_binlog_running)
sql_print_information("NDB Binlog: Ndb tables initially read only.");
/* create tables needed by the replication */ /* create tables needed by the replication */
ndbcluster_setup_binlog_table_shares(thd); ndbcluster_setup_binlog_table_shares(thd);
#else #else
...@@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
ndbcluster_find_all_files(thd); ndbcluster_find_all_files(thd);
#endif #endif
ndbcluster_util_inited= 1;
#ifdef HAVE_NDB_BINLOG
/* Signal injector thread that all is setup */
pthread_cond_signal(&injector_cond);
#endif
set_timespec(abstime, 0); set_timespec(abstime, 0);
for (;!abort_loop;) for (;!abort_loop;)
{ {
pthread_mutex_lock(&LOCK_ndb_util_thread); pthread_mutex_lock(&LOCK_ndb_util_thread);
pthread_cond_timedwait(&COND_ndb_util_thread, pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread, &LOCK_ndb_util_thread,
...@@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
Check that the apply_status_share and schema_share has been created. Check that the apply_status_share and schema_share has been created.
If not try to create it If not try to create it
*/ */
if (!apply_status_share || !schema_share) if (!ndb_binlog_tables_inited)
ndbcluster_setup_binlog_table_shares(thd); ndbcluster_setup_binlog_table_shares(thd);
#endif #endif
......
This diff is collapsed.
...@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *old_db= 0, const char *old_db= 0,
const char *old_table_name= 0); const char *old_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share); NDB_SHARE *share,
const char *type_str);
void ndb_rep_event_name(String *event_name, void ndb_rep_event_name(String *event_name,
const char *db, const char *tbl); const char *db, const char *tbl);
int ndb_create_table_from_engine(THD *thd, const char *db, int ndb_create_table_from_engine(THD *thd, const char *db,
...@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg); ...@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg);
/* /*
table cluster_replication.apply_status table cluster_replication.apply_status
*/ */
void ndbcluster_setup_binlog_table_shares(THD *thd); int ndbcluster_setup_binlog_table_shares(THD *thd);
extern NDB_SHARE *apply_status_share; extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share; extern NDB_SHARE *schema_share;
extern THD *injector_thd; extern THD *injector_thd;
extern my_bool ndb_binlog_running; extern my_bool ndb_binlog_running;
extern my_bool ndb_binlog_tables_inited;
bool bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment