Bug #16851 Cluster: Auto Database Push is not working properly

parent f10a7edf
...@@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name, ...@@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name,
if ((my_errno= write_ndb_file(name))) if ((my_errno= write_ndb_file(name)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
if (ndb_binlog_thread_running > 0) ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2),
ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0); m_dbname, m_tabname, FALSE);
#endif /* HAVE_NDB_BINLOG */ #endif /* HAVE_NDB_BINLOG */
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
...@@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name, ...@@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name,
" Event: %s", name2); " Event: %s", name2);
/* a warning has been issued to the client */ /* a warning has been issued to the client */
} }
if (share && ndb_binlog_thread_running <= 0)
share->flags|= NSF_NO_BINLOG;
ndbcluster_log_schema_op(current_thd, share, ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length, current_thd->query, current_thd->query_length,
share->db, share->table_name, share->db, share->table_name,
...@@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd)
/* no such database defined, skip table */ /* no such database defined, skip table */
continue; continue;
} }
strxnmov(end, FN_LEN-1-(key-end), "/", elmt.name, NullS); end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS);
const void *data= 0, *pack_data= 0; const void *data= 0, *pack_data= 0;
uint length, pack_length; uint length, pack_length;
int discover= 0; int discover= 0;
...@@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd)
my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_lock(&LOCK_open);
if (discover) if (discover)
{ {
/* ToDo 4.1 database needs to be created if missing */ /* ToDo 4.1 database needs to be created if missing */
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, elmt.database, elmt.name)) if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
{ {
/* ToDo 4.1 handle error */ /* ToDo 4.1 handle error */
} }
pthread_mutex_unlock(&LOCK_open);
} }
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
else if (ndb_binlog_thread_running > 0) else
{ {
/* set up replication for this table */ /* set up replication for this table */
NDB_SHARE *share; ndbcluster_create_binlog_setup(ndb, key, end-key,
pthread_mutex_lock(&ndbcluster_mutex); elmt.database, elmt.name,
if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, TRUE);
(byte*) key, strlen(key)))
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
|| share == 0)
{
/*
there is no binlog creation setup for this table
attempt to do it
*/
pthread_mutex_unlock(&ndbcluster_mutex);
pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name,
share);
pthread_mutex_unlock(&LOCK_open);
}
else
pthread_mutex_unlock(&ndbcluster_mutex);
} }
#endif #endif
pthread_mutex_unlock(&LOCK_open);
} }
} }
while (unhandled && retries--); while (unhandled && retries--);
...@@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* setup logging to binlog for all discovered tables */ /* setup logging to binlog for all discovered tables */
if (ndb_binlog_thread_running > 0)
{ {
char *end; char *end, *end1=
char *end1=
strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS); strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
for (i= 0; i < ok_tables.records; i++) for (i= 0; i < ok_tables.records; i++)
{ {
file_name= (char*)hash_element(&ok_tables, i); file_name= (char*)hash_element(&ok_tables, i);
end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS); end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
(byte*)name, end - name))
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
{
/*
there is no binlog creation setup for this table
attempt to do it
*/
pthread_mutex_unlock(&ndbcluster_mutex);
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, name, db, file_name, share); ndbcluster_create_binlog_setup(ndb, name, end-name,
db, file_name, TRUE);
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
pthread_mutex_lock(&ndbcluster_mutex);
}
/* Table existed in the mysqld so there should be a share */
DBUG_ASSERT(share != NULL);
} }
pthread_mutex_unlock(&ndbcluster_mutex);
} }
#endif #endif
......
...@@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
share->table= 0; share->table= 0;
if (ndb_binlog_thread_running <= 0) if (ndb_binlog_thread_running <= 0)
{ {
DBUG_ASSERT(_table != 0); if (_table)
{
if (_table->s->primary_key == MAX_KEY) if (_table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK; share->flags|= NSF_HIDDEN_PK;
if (_table->s->blob_fields != 0) if (_table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG; share->flags|= NSF_BLOB_FLAG;
}
else
{
share->flags|= NSF_NO_BINLOG;
}
return; return;
} }
while (1) while (1)
...@@ -1626,25 +1632,37 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl) ...@@ -1626,25 +1632,37 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
create/discover. create/discover.
*/ */
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
uint key_len,
const char *db, const char *db,
const char *table_name, const char *table_name,
NDB_SHARE *share) my_bool share_may_exist)
{ {
DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_ENTER("ndbcluster_create_binlog_setup");
DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
key, key_len, db, table_name, share_may_exist));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
DBUG_ASSERT(strlen(key) == key_len);
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
/* Handle any trailing share */ /* Handle any trailing share */
if (share == 0) NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, key_len);
if (share && share_may_exist)
{
if (share->flags & NSF_NO_BINLOG ||
share->op != 0 ||
share->op_old != 0)
{ {
share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, pthread_mutex_unlock(&ndbcluster_mutex);
(byte*) key, strlen(key)); DBUG_RETURN(0); // replication already setup, or should not
}
}
if (share) if (share)
{
handle_trailing_share(share); handle_trailing_share(share);
} }
else
handle_trailing_share(share);
/* Create share which is needed to hold replication information */ /* Create share which is needed to hold replication information */
if (!(share= get_share(key, 0, true, true))) if (!(share= get_share(key, 0, true, true)))
...@@ -1652,6 +1670,13 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ...@@ -1652,6 +1670,13 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
sql_print_error("NDB Binlog: " sql_print_error("NDB Binlog: "
"allocating table share for %s failed", key); "allocating table share for %s failed", key);
} }
if (ndb_binlog_thread_running <= 0)
{
share->flags|= NSF_NO_BINLOG;
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
while (share && !IS_TMP_PREFIX(table_name)) while (share && !IS_TMP_PREFIX(table_name))
...@@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
sql_print_error("NDB Binlog: logging of table %s " sql_print_error("NDB Binlog: logging of table %s "
"with no PK and blob attributes is not supported", "with no PK and blob attributes is not supported",
share->key); share->key);
share->flags|= NSF_NO_BINLOG;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
/* No primary key, subscribe for all attributes */ /* No primary key, subscribe for all attributes */
......
...@@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton(); ...@@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton();
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table); void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
uint key_len,
const char *db, const char *db,
const char *table_name, const char *table_name,
NDB_SHARE *share); my_bool share_may_exist);
int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table, int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
const char *event_name, NDB_SHARE *share); const char *event_name, NDB_SHARE *share);
int ndbcluster_create_event_ops(NDB_SHARE *share, int ndbcluster_create_event_ops(NDB_SHARE *share,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment