Bug #17038, distribution of schema operation to multiple binlogs...

Bug #17038, distribution of schema operation to multiple binlogs missing/multiple entries, partial fix
- log alter table directly in server instead of in handler
- acknowledge alter table _after_ all binlog events have been processed
parent 36e9bd03
...@@ -4770,13 +4770,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -4770,13 +4770,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
"Creating event for logging table failed. " "Creating event for logging table failed. "
"See error log for details."); "See error log for details.");
} }
if (is_old_table_tmpfile) if (!is_old_table_tmpfile)
ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length,
m_dbname, new_tabname,
0, 0,
SOT_ALTER_TABLE);
else
ndbcluster_log_schema_op(current_thd, share, ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length, current_thd->query, current_thd->query_length,
m_dbname, new_tabname, m_dbname, new_tabname,
......
...@@ -441,6 +441,7 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, ...@@ -441,6 +441,7 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
break; break;
case LOGCOM_ALTER_TABLE: case LOGCOM_ALTER_TABLE:
type= SOT_ALTER_TABLE; type= SOT_ALTER_TABLE;
log= 1;
break; break;
case LOGCOM_RENAME_TABLE: case LOGCOM_RENAME_TABLE:
type= SOT_RENAME_TABLE; type= SOT_RENAME_TABLE;
...@@ -461,8 +462,10 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, ...@@ -461,8 +462,10 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
break; break;
} }
if (log) if (log)
{
ndbcluster_log_schema_op(thd, 0, query, query_length, ndbcluster_log_schema_op(thd, 0, query, query_length,
db, table_name, 0, 0, type); db, table_name, 0, 0, type);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -891,6 +894,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -891,6 +894,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
} }
char tmp_buf2[FN_REFLEN]; char tmp_buf2[FN_REFLEN];
int get_a_share= 0;
switch (type) switch (type)
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
...@@ -901,12 +905,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -901,12 +905,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query= tmp_buf2; query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `", query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2); table_name, "`", NullS) - tmp_buf2);
break; // fall through
case SOT_CREATE_TABLE: case SOT_CREATE_TABLE:
break; // fall through
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
break; // fall through
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
if (!share)
get_a_share= 1;
break; break;
case SOT_DROP_DB: case SOT_DROP_DB:
break; break;
...@@ -922,6 +928,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -922,6 +928,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
abort(); /* should not happen, programming error */ abort(); /* should not happen, programming error */
} }
if (get_a_share)
{
char key[FN_REFLEN];
(void)strxnmov(key, FN_REFLEN, share_prefix, db,
"/", table_name, NullS);
share= get_share(key, 0, false, false);
}
const NdbError *ndb_error= 0; const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id(); uint32 node_id= g_ndb_cluster_connection->node_id();
Uint64 epoch= 0; Uint64 epoch= 0;
...@@ -956,7 +970,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -956,7 +970,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
} }
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
char old_db[128]; char old_db[FN_REFLEN];
strcpy(old_db, ndb->getDatabaseName()); strcpy(old_db, ndb->getDatabaseName());
char tmp_buf[SCHEMA_QUERY_SIZE]; char tmp_buf[SCHEMA_QUERY_SIZE];
...@@ -974,9 +988,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -974,9 +988,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
strcmp(NDB_SCHEMA_TABLE, table_name)) strcmp(NDB_SCHEMA_TABLE, table_name))
{ {
ndb_error= &dict->getNdbError(); ndb_error= &dict->getNdbError();
goto end;
} }
DBUG_RETURN(0); goto end;
} }
{ {
...@@ -1119,6 +1132,10 @@ end: ...@@ -1119,6 +1132,10 @@ end:
} }
(void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&share->mutex);
} }
if (get_a_share)
free_share(&share);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1328,7 +1345,10 @@ static int ...@@ -1328,7 +1345,10 @@ static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NdbEventOperation *pOp, NdbEventOperation *pOp,
List<Cluster_replication_schema> List<Cluster_replication_schema>
*schema_list, MEM_ROOT *mem_root) *post_epoch_log_list,
List<Cluster_replication_schema>
*post_epoch_unlock_list,
MEM_ROOT *mem_root)
{ {
DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
...@@ -1357,7 +1377,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1357,7 +1377,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */ /* binlog dropping table after any table operations */
schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
log_query= 0; log_query= 0;
break; break;
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
...@@ -1389,7 +1409,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1389,7 +1409,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE, /* print error */ TRUE, /* print error */
TRUE); /* don't binlog the query */ TRUE); /* don't binlog the query */
/* binlog dropping database after any table operations */ /* binlog dropping database after any table operations */
schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
log_query= 0; log_query= 0;
break; break;
case SOT_CREATE_DB: case SOT_CREATE_DB:
...@@ -1431,8 +1451,19 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1431,8 +1451,19 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
{ {
DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
if (bitmap_is_set(&slock, node_id)) if (bitmap_is_set(&slock, node_id))
{
/*
If it is an SOT_ALTER_TABLE we need to acknowledge the
schema operation _after_ all the events have been
processed so that all schema events coming through
the event operation has been processed
*/
if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE)
post_epoch_unlock_list->push_back(schema, mem_root);
else
ndbcluster_update_slock(thd, schema->db, schema->name); ndbcluster_update_slock(thd, schema->db, schema->name);
} }
}
if (log_query) if (log_query)
{ {
...@@ -2724,7 +2755,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2724,7 +2755,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
MEM_ROOT *old_root= *root_ptr; MEM_ROOT *old_root= *root_ptr;
MEM_ROOT mem_root; MEM_ROOT mem_root;
init_sql_alloc(&mem_root, 4096, 0); init_sql_alloc(&mem_root, 4096, 0);
List<Cluster_replication_schema> schema_list; List<Cluster_replication_schema> post_epoch_log_list;
List<Cluster_replication_schema> post_epoch_unlock_list;
*root_ptr= &mem_root; *root_ptr= &mem_root;
if (unlikely(schema_res > 0)) if (unlikely(schema_res > 0))
...@@ -2737,7 +2769,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2737,7 +2769,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{ {
if (!pOp->hasError()) if (!pOp->hasError())
ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp, ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp,
&schema_list, &mem_root); &post_epoch_log_list,
&post_epoch_unlock_list,
&mem_root);
else else
sql_print_error("NDB: error %lu (%s) on handling " sql_print_error("NDB: error %lu (%s) on handling "
"binlog schema event", "binlog schema event",
...@@ -2864,9 +2898,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2864,9 +2898,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
} }
} }
/*
process any operations that should be done after
the epoch is complete
*/
{ {
Cluster_replication_schema *schema; Cluster_replication_schema *schema;
while ((schema= schema_list.pop())) while ((schema= post_epoch_unlock_list.pop()))
{
ndbcluster_update_slock(thd, schema->db, schema->name);
}
while ((schema= post_epoch_log_list.pop()))
{ {
char *thd_db_save= thd->db; char *thd_db_save= thd->db;
thd->db= schema->db; thd->db= schema->db;
......
...@@ -29,18 +29,24 @@ extern ulong ndb_extra_logging; ...@@ -29,18 +29,24 @@ extern ulong ndb_extra_logging;
#define INJECTOR_EVENT_LEN 200 #define INJECTOR_EVENT_LEN 200
/*
The numbers below must not change as they
are passed between mysql servers, and if changed
would break compatablility. Add new numbers to
the end.
*/
enum SCHEMA_OP_TYPE enum SCHEMA_OP_TYPE
{ {
SOT_DROP_TABLE, SOT_DROP_TABLE= 0,
SOT_CREATE_TABLE, SOT_CREATE_TABLE= 1,
SOT_RENAME_TABLE, SOT_RENAME_TABLE= 2,
SOT_ALTER_TABLE, SOT_ALTER_TABLE= 3,
SOT_DROP_DB, SOT_DROP_DB= 4,
SOT_CREATE_DB, SOT_CREATE_DB= 5,
SOT_ALTER_DB, SOT_ALTER_DB= 6,
SOT_CLEAR_SLOCK, SOT_CLEAR_SLOCK= 7,
SOT_TABLESPACE, SOT_TABLESPACE= 8,
SOT_LOGFILE_GROUP SOT_LOGFILE_GROUP= 9
}; };
const uint max_ndb_nodes= 64; /* multiple of 32 */ const uint max_ndb_nodes= 64; /* multiple of 32 */
......
...@@ -2529,12 +2529,11 @@ struct binlog_log_query_st ...@@ -2529,12 +2529,11 @@ struct binlog_log_query_st
const char *table_name; const char *table_name;
}; };
static my_bool binlog_log_query_handlerton(THD *thd, static my_bool binlog_log_query_handlerton2(THD *thd,
st_plugin_int *plugin, const handlerton *hton,
void *args) void *args)
{ {
struct binlog_log_query_st *b= (struct binlog_log_query_st*)args; struct binlog_log_query_st *b= (struct binlog_log_query_st*)args;
handlerton *hton= (handlerton *) plugin->plugin->info;
if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query) if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query)
hton->binlog_log_query(thd, hton->binlog_log_query(thd,
b->binlog_command, b->binlog_command,
...@@ -2545,7 +2544,15 @@ static my_bool binlog_log_query_handlerton(THD *thd, ...@@ -2545,7 +2544,15 @@ static my_bool binlog_log_query_handlerton(THD *thd,
return FALSE; return FALSE;
} }
void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, static my_bool binlog_log_query_handlerton(THD *thd,
st_plugin_int *plugin,
void *args)
{
return binlog_log_query_handlerton2(thd, (const handlerton *) plugin->plugin->info, args);
}
void ha_binlog_log_query(THD *thd, const handlerton *hton,
enum_binlog_command binlog_command,
const char *query, uint query_length, const char *query, uint query_length,
const char *db, const char *table_name) const char *db, const char *table_name)
{ {
...@@ -2555,8 +2562,11 @@ void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, ...@@ -2555,8 +2562,11 @@ void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
b.query_length= query_length; b.query_length= query_length;
b.db= db; b.db= db;
b.table_name= table_name; b.table_name= table_name;
if (hton == 0)
plugin_foreach(thd, binlog_log_query_handlerton, plugin_foreach(thd, binlog_log_query_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &b); MYSQL_STORAGE_ENGINE_PLUGIN, &b);
else
binlog_log_query_handlerton2(thd, hton, &b);
} }
#endif #endif
......
...@@ -2020,7 +2020,8 @@ int ha_repl_report_replication_stop(THD *thd); ...@@ -2020,7 +2020,8 @@ int ha_repl_report_replication_stop(THD *thd);
int ha_reset_logs(THD *thd); int ha_reset_logs(THD *thd);
int ha_binlog_index_purge_file(THD *thd, const char *file); int ha_binlog_index_purge_file(THD *thd, const char *file);
void ha_reset_slave(THD *thd); void ha_reset_slave(THD *thd);
void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, void ha_binlog_log_query(THD *thd, const handlerton *db_type,
enum_binlog_command binlog_command,
const char *query, uint query_length, const char *query, uint query_length,
const char *db, const char *table_name); const char *db, const char *table_name);
void ha_binlog_wait(THD *thd); void ha_binlog_wait(THD *thd);
......
...@@ -501,7 +501,7 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info, ...@@ -501,7 +501,7 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
query_length= thd->query_length; query_length= thd->query_length;
} }
ha_binlog_log_query(thd, LOGCOM_CREATE_DB, ha_binlog_log_query(thd, 0, LOGCOM_CREATE_DB,
query, query_length, query, query_length,
db, ""); db, "");
...@@ -579,7 +579,7 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info) ...@@ -579,7 +579,7 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
thd->variables.collation_database= thd->db_charset; thd->variables.collation_database= thd->db_charset;
} }
ha_binlog_log_query(thd, LOGCOM_ALTER_DB, ha_binlog_log_query(thd, 0, LOGCOM_ALTER_DB,
thd->query, thd->query_length, thd->query, thd->query_length,
db, ""); db, "");
......
...@@ -4974,6 +4974,10 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, ...@@ -4974,6 +4974,10 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
} }
thd->proc_info="end"; thd->proc_info="end";
ha_binlog_log_query(thd, create_info->db_type, LOGCOM_ALTER_TABLE,
thd->query, thd->query_length,
db, table_name);
DBUG_ASSERT(!(mysql_bin_log.is_open() && binlog_row_based && DBUG_ASSERT(!(mysql_bin_log.is_open() && binlog_row_based &&
(create_info->options & HA_LEX_CREATE_TMP_TABLE))); (create_info->options & HA_LEX_CREATE_TMP_TABLE)));
write_bin_log(thd, TRUE, thd->query, thd->query_length); write_bin_log(thd, TRUE, thd->query, thd->query_length);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment