Commit 3e798b03 authored by unknown's avatar unknown

MDEV-26: global transaction id. Intermediate commit.

Now master saves and restores the binlog state, across server restart and crash.
parent 03f28863
......@@ -3235,6 +3235,47 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!is_relay_log)
{
char buf[FN_REFLEN];
/*
Output a Gtid_list_log_event at the start of the binlog file.
This is used to quickly determine which GTIDs are found in binlog
files earlier than this one, and which are found in this (or later)
binlogs.
The list gives a mapping from (domain_id, server_id) -> seq_no (so
this means that there is at most one entry for every unique pair
(domain_id, server_id) in the list). It indicates that this seq_no is
the last one found in an earlier binlog file for this (domain_id,
server_id) combination - so any higher seq_no should be search for
from this binlog file, or a later one.
This allows to locate the binlog file containing a given GTID by
scanning backwards, reading just the Gtid_list_log_event at the
start of each file, and scanning only the relevant binlog file when
found, not all binlog files.
The existence of a given entry (domain_id, server_id, seq_no)
guarantees only that this seq_no will not be found in this or any
later binlog file. It does not guarantee that it can be found it an
earlier binlog file, for example the file may have been purged.
If there is no entry for a given (domain_id, server_id) pair, then
it means that no such GTID exists in any earlier binlog. It is
permissible to remove such pair from future Gtid_list_log_events
if all previous binlog files containing such GTIDs have been purged
(though such optimization is not performed at the time of this
writing). So if there is no entry for given GTID it means that such
GTID should be search for in this or later binlog file, same as if
there had been an entry (domain_id, server_id, 0).
*/
Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
if (gl_ev.write(&log_file))
goto err;
/* Output a binlog checkpoint event at the start of the binlog file. */
/*
Construct an entry in the binlog_xid_count_list for the new binlog
file (we will not link it into the list until we know the new file
......@@ -5342,6 +5383,108 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
int
MYSQL_BIN_LOG::write_state_to_file()
{
File file_no;
IO_CACHE cache;
char buf[FN_REFLEN];
int err;
bool opened= false;
bool inited= false;
fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
MY_UNPACK_FILENAME);
if ((file_no= mysql_file_open(key_file_binlog_state, buf,
O_RDWR|O_CREAT|O_TRUNC|O_BINARY,
MYF(MY_WME))) < 0)
{
err= 1;
goto err;
}
opened= true;
if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err;
inited= true;
if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
goto err;
inited= false;
if ((err= end_io_cache(&cache)))
goto err;
if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE))))
goto err;
goto end;
err:
sql_print_error("Error writing binlog state to file '%s'.\n", buf);
if (inited)
end_io_cache(&cache);
end:
if (opened)
mysql_file_close(file_no, MYF(0));
return err;
}
int
MYSQL_BIN_LOG::read_state_from_file()
{
File file_no;
IO_CACHE cache;
char buf[FN_REFLEN];
int err;
bool opened= false;
bool inited= false;
fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
MY_UNPACK_FILENAME);
if ((file_no= mysql_file_open(key_file_binlog_state, buf,
O_RDONLY|O_BINARY, MYF(0))) < 0)
{
if (my_errno != ENOENT)
{
err= 1;
goto err;
}
else
{
rpl_gtid gtid;
/*
If the state file does not exist, this is the first server startup
with GTID enabled. So initialize to empty state.
*/
gtid.domain_id= global_system_variables.gtid_domain_id;
gtid.server_id= global_system_variables.server_id;
gtid.seq_no= 0;
rpl_global_gtid_binlog_state.update(&gtid);
err= 0;
goto end;
}
}
opened= true;
if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err;
inited= true;
if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
goto err;
goto end;
err:
sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
end:
if (inited)
end_io_cache(&cache);
if (opened)
mysql_file_close(file_no, MYF(0));
return err;
}
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
......@@ -6801,6 +6944,8 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
void MYSQL_BIN_LOG::close(uint exiting)
{ // One can't set log_type here!
bool failed_to_save_state= false;
DBUG_ENTER("MYSQL_BIN_LOG::close");
DBUG_PRINT("enter",("exiting: %d", (int) exiting));
if (log_state == LOG_OPENED)
......@@ -6818,6 +6963,27 @@ void MYSQL_BIN_LOG::close(uint exiting)
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
/*
When we shut down server, write out the binlog state to a separate
file so we do not have to scan an entire binlog file to recover it
at next server start.
Note that this must be written and synced to disk before marking the
last binlog file as "not crashed".
*/
if (!is_relay_log && write_state_to_file())
{
sql_print_error("Failed to save binlog GTID state during shutdown. "
"Binlog will be marked as crashed, so that crash "
"recovery can recover the state at next server "
"startup.");
/*
Leave binlog file marked as crashed, so we can recover state by
scanning it now that we failed to write out the state properly.
*/
failed_to_save_state= true;
}
}
#endif /* HAVE_REPLICATION */
......@@ -6826,6 +6992,7 @@ void MYSQL_BIN_LOG::close(uint exiting)
&& !(exiting & LOG_CLOSE_DELAYED_CLOSE))
{
my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
if (!failed_to_save_state)
clear_inuse_flag_when_closing(log_file.file);
/*
Restore position so that anything we have in the IO_cache is written
......@@ -8004,7 +8171,10 @@ int TC_LOG_BINLOG::open(const char *opt_name)
(Format_description_log_event *)ev);
}
else
error=0;
error= read_state_from_file();
/* Pick the next unused seq_no from the loaded/recovered binlog state. */
global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_for_server_id
(global_system_variables.server_id);
delete ev;
end_io_cache(&log);
......@@ -8415,6 +8585,37 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
}
break;
}
case GTID_LIST_EVENT:
if (first_round)
{
uint32 i;
Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
/* Initialise the binlog state from the Gtid_list event. */
rpl_global_gtid_binlog_state.reset();
for (i= 0; i < glev->count; ++i)
{
if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
goto err2;
}
}
break;
case GTID_EVENT:
if (first_round)
{
Gtid_log_event *gev= (Gtid_log_event *)ev;
rpl_gtid gtid;
/* Update the binlog state with any GTID logged after Gtid_list. */
gtid.domain_id= gev->domain_id;
gtid.server_id= gev->server_id;
gtid.seq_no= gev->seq_no;
if (rpl_global_gtid_binlog_state.update(&gtid))
goto err2;
}
break;
default:
/* Nothing. */
break;
......
......@@ -771,6 +771,8 @@ public:
inline uint32 get_open_count() { return open_count; }
void set_status_variables(THD *thd);
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
int read_state_from_file();
int write_state_to_file();
};
class Log_event_handler
......
......@@ -6299,8 +6299,8 @@ rpl_binlog_state::~rpl_binlog_state()
/*
Update replication state with a new GTID.
If the replication domain id already exists, then the new GTID replaces the
old one for that domain id. Else a new entry is inserted.
If the (domain_id, server_id) pair already exists, then the new GTID replaces
the old one for that domain id. Else a new entry is inserted.
Returns 0 for ok, 1 for error.
*/
......@@ -6309,16 +6309,16 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
{
uchar *rec;
rec= my_hash_search(&hash, (const uchar *)gtid, 0);
rec= my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
if (rec)
{
const rpl_gtid *old_gtid= (const rpl_gtid *)rec;
if (old_gtid->server_id == gtid->server_id &&
old_gtid->seq_no > gtid->seq_no)
sql_print_warning("Out-of-order GTIDs detected for server_id=%u. "
if (old_gtid->seq_no > gtid->seq_no)
sql_print_warning("Out-of-order GTIDs detected for "
"domain_id=%u, server_id=%u. "
"Please ensure that independent replication streams "
"use different replication domain_id to avoid "
"inconsistencies.", gtid->server_id);
"inconsistencies.", gtid->domain_id, gtid->server_id);
else
memcpy(rec, gtid, sizeof(*gtid));
return 0;
......@@ -6329,6 +6329,87 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
memcpy(rec, gtid, sizeof(*gtid));
return my_hash_insert(&hash, rec);
}
void
rpl_binlog_state::reset()
{
my_hash_reset(&hash);
}
uint32
rpl_binlog_state::seq_no_for_server_id(uint32 server_id)
{
ulong i;
uint64 seq_no= 0;
for (i= 0; i < hash.records; ++i)
{
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
if (gtid->server_id == server_id && gtid->seq_no > seq_no)
seq_no= gtid->seq_no;
}
return seq_no;
}
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
ulong i;
char buf[21];
for (i= 0; i < count(); ++i)
{
size_t res;
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
longlong10_to_str(gtid->seq_no, buf, 10);
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
if (res == (size_t) -1)
return 1;
}
return 0;
}
int
rpl_binlog_state::read_from_iocache(IO_CACHE *src)
{
/* 10-digit - 10-digit - 20-digit \n \0 */
char buf[10+1+10+1+20+1+1];
char *p, *q, *end;
int err;
rpl_gtid gtid;
uint64 v;
reset();
for (;;)
{
size_t res= my_b_gets(src, buf, sizeof(buf));
if (!res)
break;
end= buf + res;
p= end;
v= (uint64)my_strtoll10(buf, &p, &err);
if (err != 0 || v > (uint32)0xffffffff || *p++ != '-')
return 1;
gtid.domain_id= (uint32)v;
q= end;
v= (uint64)my_strtoll10(p, &q, &err);
if (err != 0 || v > (uint32)0xffffffff || *q++ != '-')
return 1;
gtid.server_id= (uint32)v;
gtid.seq_no= (uint64)my_strtoll10(q, &end, &err);
if (err != 0)
return 1;
if (update(&gtid))
return 1;
}
return 0;
}
#endif /* MYSQL_SERVER */
......
......@@ -3038,6 +3038,10 @@ struct rpl_binlog_state
ulong count() const { return hash.records; }
int update(const struct rpl_gtid *gtid);
void reset();
uint32 seq_no_for_server_id(uint32 server_id);
int write_to_iocache(IO_CACHE *dest);
int read_from_iocache(IO_CACHE *src);
};
/**
......
......@@ -965,6 +965,7 @@ PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_trg, key_file_trn, key_file_init;
PSI_file_key key_file_query_log, key_file_slow_log;
PSI_file_key key_file_relaylog, key_file_relaylog_index;
PSI_file_key key_file_binlog_state;
static PSI_file_info all_server_files[]=
{
......@@ -995,7 +996,8 @@ static PSI_file_info all_server_files[]=
{ &key_file_tclog, "tclog", 0},
{ &key_file_trg, "trigger_name", 0},
{ &key_file_trn, "trigger", 0},
{ &key_file_init, "init", 0}
{ &key_file_init, "init", 0},
{ &key_file_binlog_state, "binlog_state", 0}
};
/**
......
......@@ -294,6 +294,7 @@ extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_trg, key_file_trn, key_file_init;
extern PSI_file_key key_file_query_log, key_file_slow_log;
extern PSI_file_key key_file_relaylog, key_file_relaylog_index;
extern PSI_file_key key_file_binlog_state;
void init_server_psi_keys();
#endif /* HAVE_PSI_INTERFACE */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment