Many files:

  Semi-synchronous replication for InnoDB type tables; before telling the client that a commit has been processed, wait that the replication thread has returned from my_net_send() where it sends the binlog to the slave; note that TCP/IP, even with the TCP_NODELAY option does not guarantee that the slave has RECEIVED the data - this is just heuristic at the moment; this is useful in failover: in almost all cases, every transaction that has returned from the commit has been sent and processed in the slave, which makes failover to the slave simpler if the master crashes; the code does not work yet as is, because MySQL should call innobase_report_binlog_offset_and_commit() in a commit; we will most probably return that call to 5.0.x, to make InnoDB Hot Backup and group commit to work again; XA code broke them temporarily in 5.0.3
parent ba51f96e
This diff is collapsed.
...@@ -321,3 +321,5 @@ int innobase_rollback_by_xid( ...@@ -321,3 +321,5 @@ int innobase_rollback_by_xid(
int innobase_xa_end(THD *thd); int innobase_xa_end(THD *thd);
int innobase_repl_report_sent_binlog(THD *thd, char *log_file_name,
my_off_t end_offset);
...@@ -2411,3 +2411,60 @@ TYPELIB *ha_known_exts(void) ...@@ -2411,3 +2411,60 @@ TYPELIB *ha_known_exts(void)
} }
return &known_extensions; return &known_extensions;
} }
/*
Reports to table handlers up to which position we have sent the binlog
to a slave in replication
SYNOPSIS
ha_repl_report_sent_binlog()
NOTES
Only works for InnoDB at the moment
RETURN VALUE
Always 0 (= success)
PARAMETERS
THD *thd in: thread doing the binlog communication to
the slave
char *log_file_name in: binlog file name
my_off_t end_offset in: the offset in the binlog file up to
which we sent the contents to the slave
*/
int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
my_off_t end_offset)
{
#ifdef HAVE_INNOBASE_DB
return innobase_repl_report_sent_binlog(thd,log_file_name,end_offset);
#else
/* remove warnings about unused parameters */
thd=thd; log_file_name=log_file_name; end_offset=end_offset;
return 0;
#endif
}
/*
Reports to table handlers that we stop replication to a specific slave
SYNOPSIS
ha_repl_report_replication_stop()
NOTES
Does nothing at the moment
RETURN VALUE
Always 0 (= success)
PARAMETERS
THD *thd in: thread doing the binlog communication to
the slave
*/
int ha_repl_report_replication_stop(THD *thd)
{
thd = thd;
return 0;
}
...@@ -843,7 +843,7 @@ int ha_change_key_cache_param(KEY_CACHE *key_cache); ...@@ -843,7 +843,7 @@ int ha_change_key_cache_param(KEY_CACHE *key_cache);
int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache); int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache);
int ha_end_key_cache(KEY_CACHE *key_cache); int ha_end_key_cache(KEY_CACHE *key_cache);
/* weird stuff */ /* report to InnoDB that control passes to the client */
int ha_release_temporary_latches(THD *thd); int ha_release_temporary_latches(THD *thd);
/* transactions: interface to handlerton functions */ /* transactions: interface to handlerton functions */
...@@ -875,3 +875,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht); ...@@ -875,3 +875,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht);
#define trans_need_2pc(thd, all) ((total_ha_2pc > 1) && \ #define trans_need_2pc(thd, all) ((total_ha_2pc > 1) && \
!((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc)) !((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc))
/* semi-synchronous replication */
int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
my_off_t end_offset);
int ha_repl_report_replication_stop(THD *thd);
...@@ -5495,7 +5495,6 @@ The minimum value for this variable is 4096.", ...@@ -5495,7 +5495,6 @@ The minimum value for this variable is 4096.",
{"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.", {"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.",
(gptr*) &opt_sync_frm, (gptr*) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0, (gptr*) &opt_sync_frm, (gptr*) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0,
0, 0, 0, 0}, 0, 0, 0, 0},
#ifdef DOES_NOTHING_YET
{"sync-replication", OPT_SYNC_REPLICATION, {"sync-replication", OPT_SYNC_REPLICATION,
"Enable synchronous replication.", "Enable synchronous replication.",
(gptr*) &global_system_variables.sync_replication, (gptr*) &global_system_variables.sync_replication,
...@@ -5511,7 +5510,6 @@ The minimum value for this variable is 4096.", ...@@ -5511,7 +5510,6 @@ The minimum value for this variable is 4096.",
(gptr*) &global_system_variables.sync_replication_timeout, (gptr*) &global_system_variables.sync_replication_timeout,
(gptr*) &global_system_variables.sync_replication_timeout, (gptr*) &global_system_variables.sync_replication_timeout,
0, GET_ULONG, REQUIRED_ARG, 10, 0, ~0L, 0, 1, 0}, 0, GET_ULONG, REQUIRED_ARG, 10, 0, ~0L, 0, 1, 0},
#endif
{"table_cache", OPT_TABLE_CACHE, {"table_cache", OPT_TABLE_CACHE,
"The number of open tables for all threads.", (gptr*) &table_cache_size, "The number of open tables for all threads.", (gptr*) &table_cache_size,
(gptr*) &table_cache_size, 0, GET_ULONG, REQUIRED_ARG, 64, 1, 512*1024L, (gptr*) &table_cache_size, 0, GET_ULONG, REQUIRED_ARG, 64, 1, 512*1024L,
......
...@@ -957,11 +957,9 @@ struct show_var_st init_vars[]= { ...@@ -957,11 +957,9 @@ struct show_var_st init_vars[]= {
{"sql_warnings", (char*) &sys_sql_warnings, SHOW_BOOL}, {"sql_warnings", (char*) &sys_sql_warnings, SHOW_BOOL},
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
{sys_sync_binlog_period.name,(char*) &sys_sync_binlog_period, SHOW_SYS}, {sys_sync_binlog_period.name,(char*) &sys_sync_binlog_period, SHOW_SYS},
#ifdef DOES_NOTHING_YET
{sys_sync_replication.name, (char*) &sys_sync_replication, SHOW_SYS}, {sys_sync_replication.name, (char*) &sys_sync_replication, SHOW_SYS},
{sys_sync_replication_slave_id.name, (char*) &sys_sync_replication_slave_id,SHOW_SYS}, {sys_sync_replication_slave_id.name, (char*) &sys_sync_replication_slave_id,SHOW_SYS},
{sys_sync_replication_timeout.name, (char*) &sys_sync_replication_timeout,SHOW_SYS}, {sys_sync_replication_timeout.name, (char*) &sys_sync_replication_timeout,SHOW_SYS},
#endif
#endif #endif
{sys_sync_frm.name, (char*) &sys_sync_frm, SHOW_SYS}, {sys_sync_frm.name, (char*) &sys_sync_frm, SHOW_SYS},
#ifdef HAVE_TZNAME #ifdef HAVE_TZNAME
......
...@@ -385,6 +385,11 @@ impossible position"; ...@@ -385,6 +385,11 @@ impossible position";
goto err; goto err;
} }
printf("Binlog file name %s\n", log_file_name);
if (thd->variables.sync_replication)
ha_repl_report_sent_binlog(thd, log_file_name, pos);
/* /*
We need to start a packet with something other than 255 We need to start a packet with something other than 255
to distinguish it from error to distinguish it from error
...@@ -470,6 +475,10 @@ impossible position"; ...@@ -470,6 +475,10 @@ impossible position";
my_errno= ER_UNKNOWN_ERROR; my_errno= ER_UNKNOWN_ERROR;
goto err; goto err;
} }
if (thd->variables.sync_replication)
ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
/* /*
No need to save this event. We are only doing simple reads No need to save this event. We are only doing simple reads
(no real parsing of the events) so we don't need it. And so (no real parsing of the events) so we don't need it. And so
...@@ -527,6 +536,13 @@ impossible position"; ...@@ -527,6 +536,13 @@ impossible position";
my_errno= ER_UNKNOWN_ERROR; my_errno= ER_UNKNOWN_ERROR;
goto err; goto err;
} }
printf("Dump loop: %s: Current log position %lu\n", log_file_name,
(ulong)my_b_tell(&log));
if (thd->variables.sync_replication)
ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
DBUG_PRINT("info", ("log event code %d", DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] )); (*packet)[LOG_EVENT_OFFSET+1] ));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
...@@ -640,6 +656,12 @@ impossible position"; ...@@ -640,6 +656,12 @@ impossible position";
goto err; goto err;
} }
printf("Second loop: %s: Current log position %lu\n", log_file_name,
(ulong)my_b_tell(&log));
if (thd->variables.sync_replication)
ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{ {
if (send_file(thd)) if (send_file(thd))
...@@ -704,12 +726,22 @@ impossible position"; ...@@ -704,12 +726,22 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; goto err;
} }
if (thd->variables.sync_replication)
ha_repl_report_sent_binlog(thd, log_file_name, 0);
printf("Binlog file name of a new binlog %s\n", log_file_name);
packet->length(0); packet->length(0);
packet->append('\0'); packet->append('\0');
} }
} }
end: end:
printf("Ending replication\n");
if (thd->variables.sync_replication)
ha_repl_report_replication_stop(thd);
end_io_cache(&log); end_io_cache(&log);
(void)my_close(file, MYF(MY_WME)); (void)my_close(file, MYF(MY_WME));
...@@ -721,6 +753,11 @@ impossible position"; ...@@ -721,6 +753,11 @@ impossible position";
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
err: err:
if (thd->variables.sync_replication)
ha_repl_report_replication_stop(thd);
printf("Ending replication in error %s\n", errmsg);
thd->proc_info = "Waiting to finalize termination"; thd->proc_info = "Waiting to finalize termination";
end_io_cache(&log); end_io_cache(&log);
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment