Commit e7dd358f authored by unknown's avatar unknown

MDEV-26: Global transaction ID.

When starting slave, check binlog state in addition to mysql.rpl_slave.state.

This allows to switch a previous master to be a slave directly
with MASTER_GTID_POS=AUTO.
parent 5fb52d7f
...@@ -120,7 +120,7 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * ...@@ -120,7 +120,7 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread(); static bool start_binlog_background_thread();
rpl_binlog_state rpl_global_gtid_binlog_state; static rpl_binlog_state rpl_global_gtid_binlog_state;
/** /**
purge logs, master and slave sides both, related error code purge logs, master and slave sides both, related error code
...@@ -5488,6 +5488,13 @@ MYSQL_BIN_LOG::read_state_from_file() ...@@ -5488,6 +5488,13 @@ MYSQL_BIN_LOG::read_state_from_file()
} }
int
MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
}
/** /**
Write an event to the binary log. If with_annotate != NULL and Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event *with_annotate = TRUE write also Annotate_rows before the event
...@@ -8176,8 +8183,7 @@ int TC_LOG_BINLOG::open(const char *opt_name) ...@@ -8176,8 +8183,7 @@ int TC_LOG_BINLOG::open(const char *opt_name)
else else
error= read_state_from_file(); error= read_state_from_file();
/* Pick the next unused seq_no from the loaded/recovered binlog state. */ /* Pick the next unused seq_no from the loaded/recovered binlog state. */
global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_for_server_id global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_from_state();
(global_system_variables.server_id);
delete ev; delete ev;
end_io_cache(&log); end_io_cache(&log);
......
...@@ -396,6 +396,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG ...@@ -396,6 +396,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG
( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID ) ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
class binlog_cache_mngr; class binlog_cache_mngr;
class rpl_gtid;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{ {
private: private:
...@@ -773,6 +774,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -773,6 +774,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional); bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
int read_state_from_file(); int read_state_from_file();
int write_state_to_file(); int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
}; };
class Log_event_handler class Log_event_handler
......
...@@ -6279,6 +6279,24 @@ rpl_slave_state::next_subid(uint32 domain_id) ...@@ -6279,6 +6279,24 @@ rpl_slave_state::next_subid(uint32 domain_id)
#endif #endif
static
bool
rpl_slave_state_tostring_helper(String *dest, rpl_gtid *gtid, bool *first)
{
if (*first)
*first= false;
else
if (dest->append(",",1))
return true;
return
dest->append_ulonglong(gtid->domain_id) ||
dest->append("-",1) ||
dest->append_ulonglong(gtid->server_id) ||
dest->append("-",1) ||
dest->append_ulonglong(gtid->seq_no);
}
/* /*
Prepare the current slave state as a string, suitable for sending to the Prepare the current slave state as a string, suitable for sending to the
master to request to receive binlog events starting from that GTID state. master to request to receive binlog events starting from that GTID state.
...@@ -6288,10 +6306,20 @@ rpl_slave_state::next_subid(uint32 domain_id) ...@@ -6288,10 +6306,20 @@ rpl_slave_state::next_subid(uint32 domain_id)
*/ */
int int
rpl_slave_state::tostring(String *dest) rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
{ {
bool first= true; bool first= true;
uint32 i; uint32 i;
HASH gtid_hash;
uchar *rec;
rpl_gtid *gtid;
int res= 1;
my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, NULL, HASH_UNIQUE);
for (i= 0; i < num_extra; ++i)
if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
lock(); lock();
...@@ -6319,19 +6347,43 @@ rpl_slave_state::tostring(String *dest) ...@@ -6319,19 +6347,43 @@ rpl_slave_state::tostring(String *dest)
} }
} }
if (first) /* Check if we have something newer in the extra list. */
first= false; rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
else if (rec)
dest->append("-",1); {
dest->append_ulonglong(best_gtid.domain_id); gtid= (rpl_gtid *)rec;
dest->append("-",1); if (gtid->seq_no > best_gtid.seq_no)
dest->append_ulonglong(best_gtid.server_id); memcpy(&best_gtid, gtid, sizeof(best_gtid));
dest->append("-",1); if (my_hash_delete(&gtid_hash, rec))
dest->append_ulonglong(best_gtid.seq_no); {
} unlock();
goto err;
}
}
if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
{
unlock();
goto err;
}
}
unlock(); unlock();
return 0;
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
{
gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
if (rpl_slave_state_tostring_helper(dest, gtid, &first))
goto err;
}
res= 0;
err:
my_hash_free(&gtid_hash);
return res;
} }
...@@ -6359,18 +6411,28 @@ rpl_slave_state::is_empty() ...@@ -6359,18 +6411,28 @@ rpl_slave_state::is_empty()
rpl_binlog_state::rpl_binlog_state() rpl_binlog_state::rpl_binlog_state()
{ {
my_hash_init(&hash, &my_charset_bin, 32, my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free, sizeof(uint32), NULL, my_free, HASH_UNIQUE);
HASH_UNIQUE);
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
} }
void
rpl_binlog_state::reset()
{
uint32 i;
for (i= 0; i < hash.records; ++i)
my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
my_hash_reset(&hash);
}
rpl_binlog_state::~rpl_binlog_state() rpl_binlog_state::~rpl_binlog_state()
{ {
mysql_mutex_destroy(&LOCK_binlog_state); reset();
my_hash_free(&hash); my_hash_free(&hash);
mysql_mutex_destroy(&LOCK_binlog_state);
} }
...@@ -6385,67 +6447,129 @@ rpl_binlog_state::~rpl_binlog_state() ...@@ -6385,67 +6447,129 @@ rpl_binlog_state::~rpl_binlog_state()
int int
rpl_binlog_state::update(const struct rpl_gtid *gtid) rpl_binlog_state::update(const struct rpl_gtid *gtid)
{ {
uchar *rec; rpl_gtid *lookup_gtid;
element *elem;
rec= my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0); elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
if (rec) if (elem)
{ {
const rpl_gtid *old_gtid= (const rpl_gtid *)rec; /*
if (old_gtid->seq_no > gtid->seq_no) By far the most common case is that successive events within same
sql_print_warning("Out-of-order GTIDs detected for " replication domain have the same server id (it changes only when
"domain_id=%u, server_id=%u. " switching to a new master). So save a hash lookup in this case.
"Please ensure that independent replication streams " */
"use different replication domain_id to avoid " if (likely(elem->last_gtid->server_id == gtid->server_id))
"inconsistencies.", gtid->domain_id, gtid->server_id); {
else elem->last_gtid->seq_no= gtid->seq_no;
memcpy(rec, gtid, sizeof(*gtid)); return 0;
}
lookup_gtid= (rpl_gtid *)
my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
if (lookup_gtid)
{
lookup_gtid->seq_no= gtid->seq_no;
elem->last_gtid= lookup_gtid;
return 0;
}
/* Allocate a new GTID and insert it. */
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
if (!lookup_gtid)
return 1;
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
my_free(lookup_gtid);
return 1;
}
elem->last_gtid= lookup_gtid;
return 0; return 0;
} }
if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) /* First time we see this domain_id; allocate a new element. */
return 1; elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
memcpy(rec, gtid, sizeof(*gtid)); lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
return my_hash_insert(&hash, rec); if (elem && lookup_gtid)
} {
elem->domain_id= gtid->domain_id;
my_hash_init(&elem->hash, &my_charset_bin, 32,
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
elem->last_gtid= lookup_gtid;
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
lookup_gtid= NULL; /* Do not free. */
if (0 == my_hash_insert(&hash, (const uchar *)elem))
return 0;
}
my_hash_free(&elem->hash);
}
void /* An error. */
rpl_binlog_state::reset() if (elem)
{ my_free(elem);
my_hash_reset(&hash); if (lookup_gtid)
my_free(lookup_gtid);
return 1;
} }
uint32 uint32
rpl_binlog_state::seq_no_for_server_id(uint32 server_id) rpl_binlog_state::seq_no_from_state()
{ {
ulong i; ulong i, j;
uint64 seq_no= 0; uint64 seq_no= 0;
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
{ {
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); element *e= (element *)my_hash_element(&hash, i);
if (gtid->server_id == server_id && gtid->seq_no > seq_no) for (j= 0; j < e->hash.records; ++j)
seq_no= gtid->seq_no; {
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid->seq_no > seq_no)
seq_no= gtid->seq_no;
}
} }
return seq_no; return seq_no;
} }
/*
Write binlog state to text file, so we can read it in again without having
to scan last binlog file (normal shutdown/startup, not crash recovery).
The most recent GTID within each domain_id is written after any other GTID
within this domain.
*/
int int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest) rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{ {
ulong i; ulong i, j;
char buf[21]; char buf[21];
for (i= 0; i < count(); ++i) for (i= 0; i < hash.records; ++i)
{ {
size_t res; size_t res;
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); element *e= (element *)my_hash_element(&hash, i);
longlong10_to_str(gtid->seq_no, buf, 10); for (j= 0; j <= e->hash.records; ++j)
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); {
if (res == (size_t) -1) const rpl_gtid *gtid;
return 1; if (j < e->hash.records)
{
gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
longlong10_to_str(gtid->seq_no, buf, 10);
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
if (res == (size_t) -1)
return 1;
}
} }
return 0; return 0;
...@@ -6861,6 +6985,78 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, ...@@ -6861,6 +6985,78 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
} }
uint32
rpl_binlog_state::count()
{
uint32 c= 0;
uint32 i;
for (i= 0; i < hash.records; ++i)
c+= ((element *)my_hash_element(&hash, i))->hash.records;
return c;
}
int
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
uint32 i, j, pos;
pos= 0;
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
if (j < e->hash.records)
{
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
if (pos >= list_size)
return 1;
memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
}
}
return 0;
}
/*
Get a list of the most recently binlogged GTID, for each domain_id.
This can be used when switching from being a master to being a slave,
to know where to start replicating from the new master.
The returned list must be de-allocated with my_free().
Returns 0 for ok, non-zero for out-of-memory.
*/
int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
uint32 i;
*size= hash.records;
if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
return 1;
for (i= 0; i < *size; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
}
return 0;
}
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
...@@ -6871,12 +7067,7 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) ...@@ -6871,12 +7067,7 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
/* Failure to allocate memory will be caught by is_valid() returning false. */ /* Failure to allocate memory will be caught by is_valid() returning false. */
if (count != 0 && count < (1<<28) && if (count != 0 && count < (1<<28) &&
(list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME)))) (list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME))))
{ gtid_set->get_gtid_list(list, count);
uint32 i;
for (i= 0; i < count; ++i)
list[i]= *(rpl_gtid *)my_hash_element(&gtid_set->hash, i);
}
} }
bool bool
......
...@@ -2981,7 +2981,7 @@ struct rpl_slave_state ...@@ -2981,7 +2981,7 @@ struct rpl_slave_state
uint32 domain_id; uint32 domain_id;
list_element *grab_list() { list_element *l= list; list= NULL; return l; } list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add (list_element *l) void add(list_element *l)
{ {
l->next= list; l->next= list;
list= l; list= l;
...@@ -3008,7 +3008,7 @@ struct rpl_slave_state ...@@ -3008,7 +3008,7 @@ struct rpl_slave_state
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction); bool in_transaction);
uint64 next_subid(uint32 domain_id); uint64 next_subid(uint32 domain_id);
int tostring(String *dest); int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
bool is_empty(); bool is_empty();
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
...@@ -3027,10 +3027,21 @@ struct rpl_slave_state ...@@ -3027,10 +3027,21 @@ struct rpl_slave_state
containing a gigen GTID, by simply scanning backwards from the newest containing a gigen GTID, by simply scanning backwards from the newest
one until a lower seq_no is found in the Gtid_list_log_event at the one until a lower seq_no is found in the Gtid_list_log_event at the
start of a binlog for the given domain_id and server_id. start of a binlog for the given domain_id and server_id.
We also remember the last logged GTID for every domain_id. This is used
to know where to start when a master is changed to a slave. As a side
effect, it also allows to skip a hash lookup in the very common case of
logging a new GTID with same server id as last GTID.
*/ */
struct rpl_binlog_state struct rpl_binlog_state
{ {
/* Mapping from (domain_id,server_id) to its GTID. */ struct element {
uint32 domain_id;
HASH hash; /* Containing all server_id for one domain_id */
/* The most recent entry in the hash. */
rpl_gtid *last_gtid;
};
/* Mapping from domain_id to collection of elements. */
HASH hash; HASH hash;
/* Mutex protecting access to the state. */ /* Mutex protecting access to the state. */
mysql_mutex_t LOCK_binlog_state; mysql_mutex_t LOCK_binlog_state;
...@@ -3038,12 +3049,14 @@ struct rpl_binlog_state ...@@ -3038,12 +3049,14 @@ struct rpl_binlog_state
rpl_binlog_state(); rpl_binlog_state();
~rpl_binlog_state(); ~rpl_binlog_state();
ulong count() const { return hash.records; }
int update(const struct rpl_gtid *gtid);
void reset(); void reset();
uint32 seq_no_for_server_id(uint32 server_id); int update(const struct rpl_gtid *gtid);
uint32 seq_no_from_state();
int write_to_iocache(IO_CACHE *dest); int write_to_iocache(IO_CACHE *dest);
int read_from_iocache(IO_CACHE *src); int read_from_iocache(IO_CACHE *src);
uint32 count();
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
}; };
......
...@@ -1792,10 +1792,29 @@ when it try to get the value of TIME_ZONE global variable from master."; ...@@ -1792,10 +1792,29 @@ when it try to get the value of TIME_ZONE global variable from master.";
char str_buf[256]; char str_buf[256];
String connect_state(str_buf, sizeof(str_buf), system_charset_info); String connect_state(str_buf, sizeof(str_buf), system_charset_info);
connect_state.length(0); connect_state.length(0);
rpl_gtid *binlog_gtid_list= NULL;
uint32 num_binlog_gtids= 0;
if (opt_bin_log)
{
int err= mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
&num_binlog_gtids);
if (err)
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory "
"error is encountered when it tries to compute @slave_connect_state.";
sprintf(err_buff, "%s Error: Out of memory", errmsg);
goto err;
}
}
connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
system_charset_info); system_charset_info);
rpl_global_gtid_slave_state.tostring(&connect_state); rpl_global_gtid_slave_state.tostring(&connect_state, binlog_gtid_list,
num_binlog_gtids);
if (binlog_gtid_list)
my_free(binlog_gtid_list);
connect_state.append(STRING_WITH_LEN("'"), system_charset_info); connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
if (rc) if (rc)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment