Commit 1d357776 authored by unknown's avatar unknown

MDEV-26: Global transaction ID.

When starting slave, check binlog state in addition to mysql.rpl_slave.state.

This allows a previous master to be switched directly to being a slave
with MASTER_GTID_POS=AUTO.
parent 0b36233a
......@@ -120,7 +120,7 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
rpl_binlog_state rpl_global_gtid_binlog_state;
static rpl_binlog_state rpl_global_gtid_binlog_state;
/**
purge logs, master and slave sides both, related error code
......@@ -5488,6 +5488,13 @@ MYSQL_BIN_LOG::read_state_from_file()
}
/*
  Obtain the list of most recent GTIDs from the global binlog state.

  Pure forwarder: the list and size out-parameters are handed to
  rpl_global_gtid_binlog_state unchanged, and its return value is
  returned as-is.
*/
int
MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
  const int status=
    rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
  return status;
}
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
......@@ -8176,8 +8183,7 @@ int TC_LOG_BINLOG::open(const char *opt_name)
else
error= read_state_from_file();
/* Pick the next unused seq_no from the loaded/recovered binlog state. */
global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_for_server_id
(global_system_variables.server_id);
global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_from_state();
delete ev;
end_io_cache(&log);
......
......@@ -396,6 +396,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG
( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
class binlog_cache_mngr;
class rpl_gtid;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
......@@ -773,6 +774,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
};
class Log_event_handler
......
......@@ -6279,6 +6279,24 @@ rpl_slave_state::next_subid(uint32 domain_id)
#endif
/*
  Append one GTID to dest in the textual form "domain_id-server_id-seq_no".

  *first tells whether this is the first GTID written to dest; when it is
  not, a "," separator is appended before the GTID. *first is always false
  on return.

  Returns true as soon as any append reports failure, false otherwise.
*/
static
bool
rpl_slave_state_tostring_helper(String *dest, rpl_gtid *gtid, bool *first)
{
  const bool need_separator= !*first;
  *first= false;
  if (need_separator && dest->append(",",1))
    return true;

  if (dest->append_ulonglong(gtid->domain_id))
    return true;
  if (dest->append("-",1))
    return true;
  if (dest->append_ulonglong(gtid->server_id))
    return true;
  if (dest->append("-",1))
    return true;
  return dest->append_ulonglong(gtid->seq_no);
}
/*
Prepare the current slave state as a string, suitable for sending to the
master to request to receive binlog events starting from that GTID state.
......@@ -6288,10 +6306,20 @@ rpl_slave_state::next_subid(uint32 domain_id)
*/
int
rpl_slave_state::tostring(String *dest)
rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
{
bool first= true;
uint32 i;
HASH gtid_hash;
uchar *rec;
rpl_gtid *gtid;
int res= 1;
my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, NULL, HASH_UNIQUE);
for (i= 0; i < num_extra; ++i)
if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
lock();
......@@ -6319,19 +6347,43 @@ rpl_slave_state::tostring(String *dest)
}
}
if (first)
first= false;
else
dest->append("-",1);
dest->append_ulonglong(best_gtid.domain_id);
dest->append("-",1);
dest->append_ulonglong(best_gtid.server_id);
dest->append("-",1);
dest->append_ulonglong(best_gtid.seq_no);
/* Check if we have something newer in the extra list. */
rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
if (rec)
{
gtid= (rpl_gtid *)rec;
if (gtid->seq_no > best_gtid.seq_no)
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
unlock();
goto err;
}
}
if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
{
unlock();
return 0;
goto err;
}
}
unlock();
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
{
gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
if (rpl_slave_state_tostring_helper(dest, gtid, &first))
goto err;
}
res= 0;
err:
my_hash_free(&gtid_hash);
return res;
}
......@@ -6359,18 +6411,28 @@ rpl_slave_state::is_empty()
rpl_binlog_state::rpl_binlog_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW);
}
void
rpl_binlog_state::reset()
{
uint32 i;
for (i= 0; i < hash.records; ++i)
my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
my_hash_reset(&hash);
}
rpl_binlog_state::~rpl_binlog_state()
{
mysql_mutex_destroy(&LOCK_binlog_state);
reset();
my_hash_free(&hash);
mysql_mutex_destroy(&LOCK_binlog_state);
}
......@@ -6385,68 +6447,130 @@ rpl_binlog_state::~rpl_binlog_state()
int
rpl_binlog_state::update(const struct rpl_gtid *gtid)
{
uchar *rec;
rpl_gtid *lookup_gtid;
element *elem;
rec= my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
if (rec)
elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
if (elem)
{
const rpl_gtid *old_gtid= (const rpl_gtid *)rec;
if (old_gtid->seq_no > gtid->seq_no)
sql_print_warning("Out-of-order GTIDs detected for "
"domain_id=%u, server_id=%u. "
"Please ensure that independent replication streams "
"use different replication domain_id to avoid "
"inconsistencies.", gtid->domain_id, gtid->server_id);
else
memcpy(rec, gtid, sizeof(*gtid));
/*
By far the most common case is that successive events within same
replication domain have the same server id (it changes only when
switching to a new master). So save a hash lookup in this case.
*/
if (likely(elem->last_gtid->server_id == gtid->server_id))
{
elem->last_gtid->seq_no= gtid->seq_no;
return 0;
}
if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
lookup_gtid= (rpl_gtid *)
my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
if (lookup_gtid)
{
lookup_gtid->seq_no= gtid->seq_no;
elem->last_gtid= lookup_gtid;
return 0;
}
/* Allocate a new GTID and insert it. */
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
if (!lookup_gtid)
return 1;
memcpy(rec, gtid, sizeof(*gtid));
return my_hash_insert(&hash, rec);
}
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
my_free(lookup_gtid);
return 1;
}
elem->last_gtid= lookup_gtid;
return 0;
}
/* First time we see this domain_id; allocate a new element. */
elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
if (elem && lookup_gtid)
{
elem->domain_id= gtid->domain_id;
my_hash_init(&elem->hash, &my_charset_bin, 32,
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
elem->last_gtid= lookup_gtid;
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
lookup_gtid= NULL; /* Do not free. */
if (0 == my_hash_insert(&hash, (const uchar *)elem))
return 0;
}
my_hash_free(&elem->hash);
}
void
rpl_binlog_state::reset()
{
my_hash_reset(&hash);
/* An error. */
if (elem)
my_free(elem);
if (lookup_gtid)
my_free(lookup_gtid);
return 1;
}
uint32
rpl_binlog_state::seq_no_for_server_id(uint32 server_id)
rpl_binlog_state::seq_no_from_state()
{
ulong i;
ulong i, j;
uint64 seq_no= 0;
for (i= 0; i < hash.records; ++i)
{
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
if (gtid->server_id == server_id && gtid->seq_no > seq_no)
element *e= (element *)my_hash_element(&hash, i);
for (j= 0; j < e->hash.records; ++j)
{
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid->seq_no > seq_no)
seq_no= gtid->seq_no;
}
}
return seq_no;
}
/*
Write binlog state to text file, so we can read it in again without having
to scan last binlog file (normal shutdown/startup, not crash recovery).
The most recent GTID within each domain_id is written after any other GTID
within this domain.
*/
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
ulong i;
ulong i, j;
char buf[21];
for (i= 0; i < count(); ++i)
for (i= 0; i < hash.records; ++i)
{
size_t res;
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
element *e= (element *)my_hash_element(&hash, i);
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
if (j < e->hash.records)
{
gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
longlong10_to_str(gtid->seq_no, buf, 10);
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
if (res == (size_t) -1)
return 1;
}
}
return 0;
}
......@@ -6861,6 +6985,78 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
}
/*
  Return the total number of GTIDs held in the state, i.e. the sum of the
  record counts of the nested hashes of all top-level entries.
*/
uint32
rpl_binlog_state::count()
{
  uint32 total= 0;
  for (uint32 idx= 0; idx < hash.records; ++idx)
  {
    const element *domain_elem= (const element *)my_hash_element(&hash, idx);
    total+= domain_elem->hash.records;
  }
  return total;
}
/*
  Copy every GTID in the state into the caller-supplied array gtid_list,
  which has room for list_size entries.

  Within each top-level entry, the GTID pointed to by last_gtid is written
  after all the other GTIDs of that entry.

  Returns 0 on success, 1 if the array is too small (entries copied before
  the overflow was detected remain in gtid_list).
*/
int
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 out_idx= 0;

  for (uint32 dom= 0; dom < hash.records; ++dom)
  {
    element *domain_elem= (element *)my_hash_element(&hash, dom);

    /* First every GTID of this entry except the one marked as last. */
    for (uint32 srv= 0; srv < domain_elem->hash.records; ++srv)
    {
      const rpl_gtid *cur=
        (rpl_gtid *)my_hash_element(&domain_elem->hash, srv);
      if (cur == domain_elem->last_gtid)
        continue;
      if (out_idx >= list_size)
        return 1;
      memcpy(&gtid_list[out_idx++], cur, sizeof(*cur));
    }

    /* Then the last GTID, so that it ends up after its siblings. */
    const rpl_gtid *last= domain_elem->last_gtid;
    if (out_idx >= list_size)
      return 1;
    memcpy(&gtid_list[out_idx++], last, sizeof(*last));
  }
  return 0;
}
/*
  Build an array with the most recently binlogged GTID of each domain_id.

  Useful when a server stops being a master and becomes a slave: the array
  tells where to start replicating from the new master.

  *size is set to the number of domains and *list to a newly allocated
  array of that many entries. The caller must release the array with
  my_free().

  Returns 0 for ok, non-zero for out-of-memory (*list is then NULL).
*/
int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
  *size= hash.records;

  rpl_gtid *result=
    (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME));
  *list= result;
  if (!result)
    return 1;

  for (uint32 idx= 0; idx < *size; ++idx)
  {
    const element *domain_elem= (const element *)my_hash_element(&hash, idx);
    memcpy(&result[idx], domain_elem->last_gtid, sizeof(rpl_gtid));
  }
  return 0;
}
#ifdef MYSQL_SERVER
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
......@@ -6871,12 +7067,7 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
/* Failure to allocate memory will be caught by is_valid() returning false. */
if (count != 0 && count < (1<<28) &&
(list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME))))
{
uint32 i;
for (i= 0; i < count; ++i)
list[i]= *(rpl_gtid *)my_hash_element(&gtid_set->hash, i);
}
gtid_set->get_gtid_list(list, count);
}
bool
......
......@@ -2981,7 +2981,7 @@ struct rpl_slave_state
uint32 domain_id;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add (list_element *l)
void add(list_element *l)
{
l->next= list;
list= l;
......@@ -3008,7 +3008,7 @@ struct rpl_slave_state
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction);
uint64 next_subid(uint32 domain_id);
int tostring(String *dest);
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
bool is_empty();
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
......@@ -3027,10 +3027,21 @@ struct rpl_slave_state
containing a given GTID, by simply scanning backwards from the newest
one until a lower seq_no is found in the Gtid_list_log_event at the
start of a binlog for the given domain_id and server_id.
We also remember the last logged GTID for every domain_id. This is used
to know where to start when a master is changed to a slave. As a side
effect, it also allows skipping a hash lookup in the very common case of
logging a new GTID with the same server id as the last GTID.
*/
struct rpl_binlog_state
{
/* Mapping from (domain_id,server_id) to its GTID. */
struct element {
uint32 domain_id;
HASH hash; /* Containing all server_id for one domain_id */
/* The most recent entry in the hash. */
rpl_gtid *last_gtid;
};
/* Mapping from domain_id to collection of elements. */
HASH hash;
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_binlog_state;
......@@ -3038,12 +3049,14 @@ struct rpl_binlog_state
rpl_binlog_state();
~rpl_binlog_state();
ulong count() const { return hash.records; }
int update(const struct rpl_gtid *gtid);
void reset();
uint32 seq_no_for_server_id(uint32 server_id);
int update(const struct rpl_gtid *gtid);
uint32 seq_no_from_state();
int write_to_iocache(IO_CACHE *dest);
int read_from_iocache(IO_CACHE *src);
uint32 count();
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
};
......
......@@ -1792,10 +1792,29 @@ when it try to get the value of TIME_ZONE global variable from master.";
char str_buf[256];
String connect_state(str_buf, sizeof(str_buf), system_charset_info);
connect_state.length(0);
rpl_gtid *binlog_gtid_list= NULL;
uint32 num_binlog_gtids= 0;
if (opt_bin_log)
{
int err= mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
&num_binlog_gtids);
if (err)
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory "
"error is encountered when it tries to compute @slave_connect_state.";
sprintf(err_buff, "%s Error: Out of memory", errmsg);
goto err;
}
}
connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
system_charset_info);
rpl_global_gtid_slave_state.tostring(&connect_state);
rpl_global_gtid_slave_state.tostring(&connect_state, binlog_gtid_list,
num_binlog_gtids);
if (binlog_gtid_list)
my_free(binlog_gtid_list);
connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
if (rc)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment