Commit 4c669601 authored by unknown's avatar unknown

Bug #26783 replication status unknown after cluster or mysqld failure

- update the ndb_apply_status table with binlog info


BitKeeper/etc/ignore:
  Added client/rpl_constants.h to the ignore list
mysql-test/r/rpl_ndb_stm_innodb.result:
  New BitKeeper file ``mysql-test/r/rpl_ndb_stm_innodb.result''
mysql-test/t/rpl_ndb_stm_innodb-master.opt:
  New BitKeeper file ``mysql-test/t/rpl_ndb_stm_innodb-master.opt''
mysql-test/t/rpl_ndb_stm_innodb.test:
  New BitKeeper file ``mysql-test/t/rpl_ndb_stm_innodb.test''
parent 5c602713
...@@ -2956,3 +2956,4 @@ win/vs71cache.txt ...@@ -2956,3 +2956,4 @@ win/vs71cache.txt
win/vs8cache.txt win/vs8cache.txt
zlib/*.ds? zlib/*.ds?
zlib/*.vcproj zlib/*.vcproj
client/rpl_constants.h
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
create table t1 (a int key, b int) engine innodb;
create table t2 (a int key, b int) engine innodb;
alter table t1 engine ndb;
alter table t2 engine ndb;
insert into t1 values (1,2);
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
@start_pos:=start_pos @end_pos:=end_pos
<start_pos> <end_pos>
show binlog events from <start_pos> limit 1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 <start_pos> Query 1 # use `test`; insert into t1 values (1,2)
show binlog events from <start_pos> limit 1,1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Xid 1 445 COMMIT /* XID */
begin;
insert into t1 values (2,3);
insert into t2 values (3,4);
commit;
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
@start_pos:=start_pos @end_pos:=end_pos
<start_pos> <end_pos>
show binlog events from <start_pos> limit 1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 <start_pos> Query 1 # use `test`; BEGIN
show binlog events from <start_pos> limit 1,2;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # use `test`; insert into t1 values (2,3)
master-bin.000001 # Query # # use `test`; insert into t2 values (3,4)
show binlog events from <start_pos> limit 3,1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Xid 1 <end_pos> COMMIT /* XID */
--source include/have_ndb.inc
--source include/have_innodb.inc
--source include/have_binlog_format_mixed_or_statement.inc
--source include/master-slave.inc
--connection master
create table t1 (a int key, b int) engine innodb;
create table t2 (a int key, b int) engine innodb;
--sync_slave_with_master
--connection slave
alter table t1 engine ndb;
alter table t2 engine ndb;
# check binlog position without begin
--connection master
insert into t1 values (1,2);
--sync_slave_with_master
--connection slave
--replace_column 1 <start_pos> 2 <end_pos>
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
--let $start_pos = `select @start_pos`
--let $end_pos = `select @end_pos`
--connection master
# here is actually a bug, since there is no begin statement, the
# query is autocommitted, and end_pos shows end of the insert and not
# end of the commit
--replace_result $start_pos <start_pos>
--replace_column 5 #
--eval show binlog events from $start_pos limit 1
--replace_result $start_pos <start_pos> $end_pos <end_pos>
--replace_column 2 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
--eval show binlog events from $start_pos limit 1,1
# check binlog position with begin
--connection master
begin;
insert into t1 values (2,3);
insert into t2 values (3,4);
commit;
--sync_slave_with_master
--connection slave
--replace_column 1 <start_pos> 2 <end_pos>
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
--let $start_pos = `select @start_pos`
--let $end_pos = `select @end_pos`
--connection master
--replace_result $start_pos <start_pos>
--replace_column 5 #
--eval show binlog events from $start_pos limit 1
--replace_result $start_pos <start_pos>
--replace_column 2 # 4 # 5 #
--eval show binlog events from $start_pos limit 1,2
--replace_result $start_pos <start_pos> $end_pos <end_pos>
--replace_column 2 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
--eval show binlog events from $start_pos limit 3,1
...@@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, ...@@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
- refresh list of the indexes for the table if needed (if altered) - refresh list of the indexes for the table if needed (if altered)
*/ */
#ifdef HAVE_NDB_BINLOG
extern MASTER_INFO *active_mi;
static int ndbcluster_update_apply_status(THD *thd, int do_update)
{
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab;
NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
if (!(ndbtab= ndbtab_g.get_table()))
{
return -1;
}
NdbOperation *op= 0;
int r= 0;
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
if (do_update)
r|= op->updateTuple();
else
r|= op->writeTuple();
DBUG_ASSERT(r == 0);
// server_id
r|= op->equal(0u, (Uint64)thd->server_id);
DBUG_ASSERT(r == 0);
if (!do_update)
{
// epoch
r|= op->setValue(1u, (Uint64)0);
DBUG_ASSERT(r == 0);
}
// log_name
char tmp_buf[FN_REFLEN];
ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
active_mi->rli.group_master_log_name,
strlen(active_mi->rli.group_master_log_name));
r|= op->setValue(2u, tmp_buf);
DBUG_ASSERT(r == 0);
// start_pos
r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
DBUG_ASSERT(r == 0);
// end_pos
r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos +
((Uint64)active_mi->rli.future_event_relay_log_pos -
(Uint64)active_mi->rli.group_relay_log_pos));
DBUG_ASSERT(r == 0);
return 0;
}
#endif /* HAVE_NDB_BINLOG */
int ha_ndbcluster::external_lock(THD *thd, int lock_type) int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{ {
int error=0; int error=0;
...@@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables(); thd_ndb->init_open_tables();
thd_ndb->stmt= trans; thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL; thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
trans_register_ha(thd, FALSE, ndbcluster_hton); trans_register_ha(thd, FALSE, ndbcluster_hton);
} }
else else
...@@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables(); thd_ndb->init_open_tables();
thd_ndb->all= trans; thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL; thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
trans_register_ha(thd, TRUE, ndbcluster_hton); trans_register_ha(thd, TRUE, ndbcluster_hton);
/* /*
...@@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction // Start of transaction
m_rows_changed= 0; m_rows_changed= 0;
m_ops_pending= 0; m_ops_pending= 0;
#ifdef HAVE_NDB_BINLOG
if (m_share == ndb_apply_status_share && thd->slave_thread)
thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
#endif
// TODO remove double pointers... // TODO remove double pointers...
m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
m_table_info= &m_thd_ndb_share->stat; m_table_info= &m_thd_ndb_share->stat;
...@@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) ...@@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
"stmt" : "all")); "stmt" : "all"));
DBUG_ASSERT(ndb && trans); DBUG_ASSERT(ndb && trans);
#ifdef HAVE_NDB_BINLOG
if (thd->slave_thread)
ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
#endif /* HAVE_NDB_BINLOG */
if (execute_commit(thd,trans) != 0) if (execute_commit(thd,trans) != 0)
{ {
const NdbError err= trans->getNdbError(); const NdbError err= trans->getNdbError();
......
...@@ -593,6 +593,11 @@ enum THD_NDB_OPTIONS ...@@ -593,6 +593,11 @@ enum THD_NDB_OPTIONS
TNO_NO_LOG_SCHEMA_OP= 1 << 0 TNO_NO_LOG_SCHEMA_OP= 1 << 0
}; };
enum THD_NDB_TRANS_OPTIONS
{
TNTO_INJECTED_APPLY_STATUS= 1 << 0
};
struct Ndb_local_table_statistics { struct Ndb_local_table_statistics {
int no_uncommitted_rows_count; int no_uncommitted_rows_count;
ulong last_count; ulong last_count;
...@@ -620,6 +625,7 @@ class Thd_ndb ...@@ -620,6 +625,7 @@ class Thd_ndb
NdbTransaction *stmt; NdbTransaction *stmt;
int error; int error;
uint32 options; uint32 options;
uint32 trans_options;
List<NDB_SHARE> changed_tables; List<NDB_SHARE> changed_tables;
uint query_state; uint query_state;
HASH open_tables; HASH open_tables;
......
...@@ -953,7 +953,7 @@ static void ndbcluster_get_schema(NDB_SHARE *share, ...@@ -953,7 +953,7 @@ static void ndbcluster_get_schema(NDB_SHARE *share,
/* /*
helper function to pack a ndb varchar helper function to pack a ndb varchar
*/ */
static char *ndb_pack_varchar(const NDBCOL *col, char *buf, char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz) const char *str, int sz)
{ {
switch (col->getArrayType()) switch (col->getArrayType())
......
...@@ -181,6 +181,8 @@ int ndbcluster_find_all_files(THD *thd); ...@@ -181,6 +181,8 @@ int ndbcluster_find_all_files(THD *thd);
void ndb_unpack_record(TABLE *table, NdbValue *value, void ndb_unpack_record(TABLE *table, NdbValue *value,
MY_BITMAP *defined, byte *buf); MY_BITMAP *defined, byte *buf);
char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz);
NDB_SHARE *ndbcluster_get_share(const char *key, NDB_SHARE *ndbcluster_get_share(const char *key,
TABLE *table, TABLE *table,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment