Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
e86ea27b
Commit
e86ea27b
authored
Mar 09, 2006
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into mysql.com:/space/pekka/ndb/version/my51
parents
cc736560
cabebacd
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
365 additions
and
118 deletions
+365
-118
mysql-test/include/show_binlog_events.inc
mysql-test/include/show_binlog_events.inc
+4
-0
mysql-test/r/ndb_binlog_ddl_multi.result
mysql-test/r/ndb_binlog_ddl_multi.result
+67
-2
mysql-test/t/ndb_binlog_ddl_multi.test
mysql-test/t/ndb_binlog_ddl_multi.test
+54
-20
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.cc
+202
-94
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+38
-2
No files found.
mysql-test/include/show_binlog_events.inc
0 → 100644
View file @
e86ea27b
--
let
$binlog_start
=
102
--
replace_result
$binlog_start
<
binlog_start
>
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
mysql-test/r/ndb_binlog_ddl_multi.result
View file @
e86ea27b
drop database if exists mysqltest;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2
,t3
;
drop database if exists mysqltest;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2
,t3
;
reset master;
reset master;
reset master;
reset master;
create database mysqltest;
create database mysqltest;
...
@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
...
@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; drop table `t1`
master-bin1.000001 # Query # # use `test`; drop table `t1`
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
create table t2 (a int key) engine=ndb;
create table t3 (a int key) engine=ndb;
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001 # Query # # use `test`; create table t2 (a int key) engine=ndb
master-bin1.000001 # Query # # use `test`; create table t3 (a int key) engine=ndb
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t3` to `test.t4`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t2` to `test.t3`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t4` to `test.t1`
drop table t1;
drop table t2;
drop table t3;
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
insert into t1 values(1);
rename table t1 to t2;
insert into t2 values(2);
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Table_map # # test.t1
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Table_map # # test.t2
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
drop table t2;
mysql-test/t/ndb_binlog_ddl_multi.test
View file @
e86ea27b
...
@@ -5,18 +5,16 @@
...
@@ -5,18 +5,16 @@
--
disable_warnings
--
disable_warnings
connection
server2
;
connection
server2
;
drop
database
if
exists
mysqltest
;
drop
database
if
exists
mysqltest
;
drop
table
if
exists
t1
,
t2
;
drop
table
if
exists
t1
,
t2
,
t3
;
connection
server1
;
connection
server1
;
drop
database
if
exists
mysqltest
;
drop
database
if
exists
mysqltest
;
drop
table
if
exists
t1
,
t2
;
drop
table
if
exists
t1
,
t2
,
t3
;
--
connection
server1
--
connection
server1
reset
master
;
reset
master
;
--
connection
server2
--
connection
server2
reset
master
;
reset
master
;
--
enable_warnings
--
enable_warnings
--
let
$binlog_start
=
102
#
#
# basic test to see if ddl distribution works across
# basic test to see if ddl distribution works across
# multiple binlogs
# multiple binlogs
...
@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;
...
@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;
--
connection
server2
--
connection
server2
create
table
t2
(
a
int
primary
key
)
engine
=
ndb
;
create
table
t2
(
a
int
primary
key
)
engine
=
ndb
;
--
replace_result
$binlog_start
<
binlog_start
>
--
source
include
/
show_binlog_events
.
inc
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
--
connection
server1
--
connection
server1
--
replace_result
$binlog_start
<
binlog_start
>
--
source
include
/
show_binlog_events
.
inc
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
# alter table
# alter table
--
connection
server1
--
connection
server1
...
@@ -53,9 +46,7 @@ reset master;
...
@@ -53,9 +46,7 @@ reset master;
alter
table
t2
add
column
(
b
int
);
alter
table
t2
add
column
(
b
int
);
--
connections
server1
--
connections
server1
--
replace_result
$binlog_start
<
binlog_start
>
--
source
include
/
show_binlog_events
.
inc
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
# alter database
# alter database
...
@@ -91,9 +82,7 @@ drop database mysqltest;
...
@@ -91,9 +82,7 @@ drop database mysqltest;
create
table
t1
(
a
int
primary
key
)
engine
=
ndb
;
create
table
t1
(
a
int
primary
key
)
engine
=
ndb
;
--
connection
server2
--
connection
server2
--
replace_result
$binlog_start
<
binlog_start
>
--
source
include
/
show_binlog_events
.
inc
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
--
connection
server2
--
connection
server2
drop
table
t2
;
drop
table
t2
;
...
@@ -144,6 +133,51 @@ ENGINE =NDB;
...
@@ -144,6 +133,51 @@ ENGINE =NDB;
drop
table
t1
;
drop
table
t1
;
--
connection
server2
--
connection
server2
--
replace_result
$binlog_start
<
binlog_start
>
--
source
include
/
show_binlog_events
.
inc
--
replace_column
2
# 4 # 5 #
--
eval
show
binlog
events
from
$binlog_start
#
# Bug #17827 cluster: rename of several tables in one statement,
# gets multiply logged
#
--
connection
server1
reset
master
;
show
tables
;
--
connection
server2
reset
master
;
show
tables
;
--
connection
server1
create
table
t1
(
a
int
key
)
engine
=
ndb
;
create
table
t2
(
a
int
key
)
engine
=
ndb
;
create
table
t3
(
a
int
key
)
engine
=
ndb
;
rename
table
t3
to
t4
,
t2
to
t3
,
t1
to
t2
,
t4
to
t1
;
--
connection
server2
--
source
include
/
show_binlog_events
.
inc
drop
table
t1
;
drop
table
t2
;
drop
table
t3
;
#
# Bug #17838 binlog not setup on seconday master after rename
#
#
--
connection
server1
reset
master
;
show
tables
;
--
connection
server2
reset
master
;
show
tables
;
--
connection
server1
create
table
t1
(
a
int
key
)
engine
=
ndb
;
insert
into
t1
values
(
1
);
rename
table
t1
to
t2
;
insert
into
t2
values
(
2
);
# now we should see data in table t1 _and_ t2
# prior to bug fix, data was missing for t2
--
connection
server2
--
source
include
/
show_binlog_events
.
inc
drop
table
t2
;
sql/ha_ndbcluster_binlog.cc
View file @
e86ea27b
...
@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
...
@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
}
}
}
}
int
static
void
ndbcluster_binlog_close_table
(
THD
*
thd
,
NDB_SHARE
*
share
)
{
DBUG_ENTER
(
"ndbcluster_binlog_close_table"
);
if
(
share
->
table_share
)
{
free_table_share
(
share
->
table_share
);
share
->
table_share
=
0
;
share
->
table
=
0
;
}
DBUG_ASSERT
(
share
->
table
==
0
);
DBUG_VOID_RETURN
;
}
static
int
ndbcluster_binlog_open_table
(
THD
*
thd
,
NDB_SHARE
*
share
,
ndbcluster_binlog_open_table
(
THD
*
thd
,
NDB_SHARE
*
share
,
TABLE_SHARE
*
table_share
,
TABLE
*
table
)
TABLE_SHARE
*
table_share
,
TABLE
*
table
)
{
{
int
error
;
int
error
;
MEM_ROOT
*
mem_root
=
&
share
->
mem_root
;
DBUG_ENTER
(
"ndbcluster_binlog_open_table"
);
DBUG_ENTER
(
"ndbcluster_binlog_open_table"
);
init_tmp_table_share
(
table_share
,
share
->
db
,
0
,
share
->
table_name
,
init_tmp_table_share
(
table_share
,
share
->
db
,
0
,
share
->
table_name
,
...
@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
...
@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
table
->
s
->
table_name
.
str
=
share
->
table_name
;
table
->
s
->
table_name
.
str
=
share
->
table_name
;
table
->
s
->
table_name
.
length
=
strlen
(
share
->
table_name
);
table
->
s
->
table_name
.
length
=
strlen
(
share
->
table_name
);
DBUG_ASSERT
(
share
->
table_share
==
0
);
share
->
table_share
=
table_share
;
share
->
table_share
=
table_share
;
DBUG_ASSERT
(
share
->
table
==
0
);
share
->
table
=
table
;
share
->
table
=
table
;
#ifndef DBUG_OFF
#ifndef DBUG_OFF
dbug_print_table
(
"table"
,
table
);
dbug_print_table
(
"table"
,
table
);
#endif
#endif
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
share
->
ndb_value
[
0
]
=
(
NdbValue
*
)
alloc_root
(
mem_root
,
sizeof
(
NdbValue
)
*
(
table
->
s
->
fields
+
2
/*extra for hidden key and part key*/
));
share
->
ndb_value
[
1
]
=
(
NdbValue
*
)
alloc_root
(
mem_root
,
sizeof
(
NdbValue
)
*
(
table
->
s
->
fields
+
2
/*extra for hidden key and part key*/
));
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
...
@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
...
@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
TABLE
*
table
=
(
TABLE
*
)
my_malloc
(
sizeof
(
*
table
),
MYF
(
MY_WME
));
TABLE
*
table
=
(
TABLE
*
)
my_malloc
(
sizeof
(
*
table
),
MYF
(
MY_WME
));
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
)))
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
)))
break
;
break
;
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
MEM_ROOT
*
mem_root
=
&
share
->
mem_root
;
share
->
ndb_value
[
0
]
=
(
NdbValue
*
)
alloc_root
(
mem_root
,
sizeof
(
NdbValue
)
*
(
table
->
s
->
fields
+
2
/*extra for hidden key and part key*/
));
share
->
ndb_value
[
1
]
=
(
NdbValue
*
)
alloc_root
(
mem_root
,
sizeof
(
NdbValue
)
*
(
table
->
s
->
fields
+
2
/*extra for hidden key and part key*/
));
if
(
table
->
s
->
primary_key
==
MAX_KEY
)
if
(
table
->
s
->
primary_key
==
MAX_KEY
)
share
->
flags
|=
NSF_HIDDEN_PK
;
share
->
flags
|=
NSF_HIDDEN_PK
;
if
(
table
->
s
->
blob_fields
!=
0
)
if
(
table
->
s
->
blob_fields
!=
0
)
...
@@ -1156,8 +1172,11 @@ end:
...
@@ -1156,8 +1172,11 @@ end:
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
}
}
if
(
get_a_share
)
if
(
get_a_share
&&
share
)
{
free_share
(
&
share
);
free_share
(
&
share
);
share
=
0
;
}
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
...
@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE
*
share
)
NDB_SHARE
*
share
)
{
{
DBUG_ENTER
(
"ndb_handle_schema_change"
);
DBUG_ENTER
(
"ndb_handle_schema_change"
);
int
remote_drop_table
=
0
,
do_close_cached_tables
=
0
;
bool
do_close_cached_tables
=
FALSE
;
const
char
*
dbname
=
share
->
table
->
s
->
db
.
str
;
bool
is_online_alter_table
=
FALSE
;
const
char
*
tabname
=
share
->
table
->
s
->
table_name
.
str
;
bool
is_rename_table
=
FALSE
;
bool
online_alter_table
=
(
pOp
->
getEventType
()
==
NDBEVENT
::
TE_ALTER
&&
bool
is_remote_change
=
pOp
->
tableFrmChanged
());
(
uint
)
pOp
->
getReqNodeId
()
!=
g_ndb_cluster_connection
->
node_id
();
if
(
pOp
->
getEventType
()
==
NDBEVENT
::
TE_ALTER
)
{
if
(
pOp
->
tableFrmChanged
())
{
is_online_alter_table
=
TRUE
;
}
else
{
DBUG_ASSERT
(
pOp
->
tableNameChanged
());
is_rename_table
=
TRUE
;
}
}
if
(
pOp
->
getEventType
()
!=
NDBEVENT
::
TE_CLUSTER_FAILURE
&&
if
(
is_remote_change
)
/* includes CLUSTER_FAILURE */
(
uint
)
pOp
->
getReqNodeId
()
!=
g_ndb_cluster_connection
->
node_id
())
{
{
TABLE_SHARE
*
table_share
=
share
->
table
->
s
;
TABLE
*
table
=
share
->
table
;
TABLE
*
table
=
share
->
table
;
TABLE_SHARE
*
table_share
=
table
->
s
;
const
char
*
dbname
=
table_share
->
db
.
str
;
/*
/*
Invalidate table and all it's indexes
Invalidate table and all it's indexes
*/
*/
ndb
->
setDatabaseName
(
share
->
table
->
s
->
db
.
str
);
ndb
->
setDatabaseName
(
dbname
);
Thd_ndb
*
thd_ndb
=
get_thd_ndb
(
thd
);
Thd_ndb
*
thd_ndb
=
get_thd_ndb
(
thd
);
DBUG_ASSERT
(
thd_ndb
!=
NULL
);
DBUG_ASSERT
(
thd_ndb
!=
NULL
);
Ndb
*
old_ndb
=
thd_ndb
->
ndb
;
Ndb
*
old_ndb
=
thd_ndb
->
ndb
;
...
@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
table_handler
.
invalidate_dictionary_cache
(
TRUE
);
table_handler
.
invalidate_dictionary_cache
(
TRUE
);
thd_ndb
->
ndb
=
old_ndb
;
thd_ndb
->
ndb
=
old_ndb
;
if
(
online_alter_table
)
if
(
is_online_alter_table
)
{
{
const
char
*
tabname
=
table_share
->
table_name
.
str
;
char
key
[
FN_REFLEN
];
char
key
[
FN_REFLEN
];
const
void
*
data
=
0
,
*
pack_data
=
0
;
const
void
*
data
=
0
,
*
pack_data
=
0
;
uint
length
,
pack_length
;
uint
length
,
pack_length
;
...
@@ -1364,17 +1397,18 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1364,17 +1397,18 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
DBUG_DUMP
(
"frm"
,
(
char
*
)
altered_table
->
getFrmData
(),
DBUG_DUMP
(
"frm"
,
(
char
*
)
altered_table
->
getFrmData
(),
altered_table
->
getFrmLength
());
altered_table
->
getFrmLength
());
pthread_mutex_lock
(
&
LOCK_open
);
pthread_mutex_lock
(
&
LOCK_open
);
const
NDBTAB
*
old
=
dict
->
getTable
(
tabname
);
const
NDBTAB
*
old
=
dict
->
getTable
(
tabname
);
if
(
!
old
&&
if
(
!
old
&&
old
->
getObjectVersion
()
!=
altered_table
->
getObjectVersion
())
old
->
getObjectVersion
()
!=
altered_table
->
getObjectVersion
())
dict
->
putTable
(
altered_table
);
dict
->
putTable
(
altered_table
);
if
((
error
=
unpackfrm
(
&
data
,
&
length
,
altered_table
->
getFrmData
()))
||
if
((
error
=
unpackfrm
(
&
data
,
&
length
,
altered_table
->
getFrmData
()))
||
(
error
=
writefrm
(
key
,
data
,
length
)))
(
error
=
writefrm
(
key
,
data
,
length
)))
{
{
sql_print_information
(
"NDB: Failed write frm for %s.%s, error %d"
,
sql_print_information
(
"NDB: Failed write frm for %s.%s, error %d"
,
dbname
,
tabname
,
error
);
dbname
,
tabname
,
error
);
}
}
ndbcluster_binlog_close_table
(
thd
,
share
);
close_cached_tables
((
THD
*
)
0
,
0
,
(
TABLE_LIST
*
)
0
,
TRUE
);
close_cached_tables
((
THD
*
)
0
,
0
,
(
TABLE_LIST
*
)
0
,
TRUE
);
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
)))
table_share
,
table
)))
...
@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pthread_mutex_unlock
(
&
LOCK_open
);
pthread_mutex_unlock
(
&
LOCK_open
);
}
}
}
}
remote_drop_table
=
1
;
}
}
// If only frm was changed continue replicating
// If only frm was changed continue replicating
if
(
online_alter_table
)
if
(
is_
online_alter_table
)
{
{
/* Signal ha_ndbcluster::alter_table that drop is done */
/* Signal ha_ndbcluster::alter_table that drop is done */
(
void
)
pthread_cond_signal
(
&
injector_cond
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
...
@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
}
}
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
if
(
is_rename_table
&&
!
is_remote_change
)
{
DBUG_PRINT
(
"info"
,
(
"Detected name change of table %s.%s"
,
share
->
db
,
share
->
table_name
));
/* ToDo: remove printout */
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB Binlog: rename table %s%s/%s -> %s."
,
share_prefix
,
share
->
table
->
s
->
db
.
str
,
share
->
table
->
s
->
table_name
.
str
,
share
->
key
);
/* do the rename of the table in the share */
share
->
table
->
s
->
db
.
str
=
share
->
db
;
share
->
table
->
s
->
db
.
length
=
strlen
(
share
->
db
);
share
->
table
->
s
->
table_name
.
str
=
share
->
table_name
;
share
->
table
->
s
->
table_name
.
length
=
strlen
(
share
->
table_name
);
}
DBUG_ASSERT
(
share
->
op
==
pOp
||
share
->
op_old
==
pOp
);
DBUG_ASSERT
(
share
->
op
==
pOp
||
share
->
op_old
==
pOp
);
if
(
share
->
op_old
==
pOp
)
if
(
share
->
op_old
==
pOp
)
share
->
op_old
=
0
;
share
->
op_old
=
0
;
...
@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
...
@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pthread_mutex_lock
(
&
ndbcluster_mutex
);
pthread_mutex_lock
(
&
ndbcluster_mutex
);
free_share
(
&
share
,
TRUE
);
free_share
(
&
share
,
TRUE
);
if
(
remote_drop_tabl
e
&&
share
&&
share
->
state
!=
NSS_DROPPED
)
if
(
is_remote_chang
e
&&
share
&&
share
->
state
!=
NSS_DROPPED
)
{
{
DBUG_PRINT
(
"info"
,
(
"remote
drop tabl
e"
));
DBUG_PRINT
(
"info"
,
(
"remote
chang
e"
));
if
(
share
->
use_count
!=
1
)
if
(
share
->
use_count
!=
1
)
do_close_cached_tables
=
1
;
do_close_cached_tables
=
TRUE
;
share
->
state
=
NSS_DROPPED
;
share
->
state
=
NSS_DROPPED
;
free_share
(
&
share
,
TRUE
);
free_share
(
&
share
,
TRUE
);
}
}
...
@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
int
log_query
=
0
;
int
log_query
=
0
;
DBUG_PRINT
(
"info"
,
(
"log query_length: %d query: '%s'"
,
DBUG_PRINT
(
"info"
,
(
"log query_length: %d query: '%s'"
,
schema
->
query_length
,
schema
->
query
));
schema
->
query_length
,
schema
->
query
));
char
key
[
FN_REFLEN
];
build_table_filename
(
key
,
sizeof
(
key
),
schema
->
db
,
schema
->
name
,
""
);
NDB_SHARE
*
share
=
get_share
(
key
,
0
,
false
,
false
);
switch
((
enum
SCHEMA_OP_TYPE
)
schema
->
type
)
switch
((
enum
SCHEMA_OP_TYPE
)
schema
->
type
)
{
{
case
SOT_DROP_TABLE
:
case
SOT_DROP_TABLE
:
/* binlog dropping table after any table operations */
/* binlog dropping table after any table operations */
if
(
ndb_binlog_running
)
if
(
share
&&
share
->
op
)
post_epoch_log_list
->
push_back
(
schema
,
mem_root
);
post_epoch_log_list
->
push_back
(
schema
,
mem_root
);
log_query
=
0
;
log_query
=
0
;
break
;
break
;
case
SOT_RENAME_TABLE
:
case
SOT_RENAME_TABLE
:
/* fall through */
if
(
share
&&
share
->
op
)
{
log_query
=
0
;
post_epoch_log_list
->
push_back
(
schema
,
mem_root
);
break
;
/* discovery will be handled by binlog */
}
goto
sot_create_table
;
case
SOT_ALTER_TABLE
:
case
SOT_ALTER_TABLE
:
if
(
ndb_binlog_running
)
if
(
share
&&
share
->
op
)
{
{
log_query
=
1
;
log_query
=
0
;
post_epoch_log_list
->
push_back
(
schema
,
mem_root
);
break
;
/* discovery will be handled by binlog */
break
;
/* discovery will be handled by binlog */
}
}
/* fall through */
goto
sot_create_table
;
case
SOT_CREATE_TABLE
:
case
SOT_CREATE_TABLE
:
sot_create_table:
/*
we need to free any share here as command below
may need to call handle_trailing_share
*/
if
(
share
)
{
free_share
(
&
share
);
share
=
0
;
}
pthread_mutex_lock
(
&
LOCK_open
);
pthread_mutex_lock
(
&
LOCK_open
);
if
(
ndb_create_table_from_engine
(
thd
,
schema
->
db
,
schema
->
name
))
if
(
ndb_create_table_from_engine
(
thd
,
schema
->
db
,
schema
->
name
))
{
{
...
@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
break
;
break
;
case
SOT_CLEAR_SLOCK
:
case
SOT_CLEAR_SLOCK
:
{
{
char
key
[
FN_REFLEN
];
build_table_filename
(
key
,
sizeof
(
key
),
schema
->
db
,
schema
->
name
,
""
);
NDB_SHARE
*
share
=
get_share
(
key
,
0
,
false
,
false
);
if
(
share
)
if
(
share
)
{
{
pthread_mutex_lock
(
&
share
->
mutex
);
pthread_mutex_lock
(
&
share
->
mutex
);
...
@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
pthread_mutex_unlock
(
&
share
->
mutex
);
pthread_mutex_unlock
(
&
share
->
mutex
);
pthread_cond_signal
(
&
injector_cond
);
pthread_cond_signal
(
&
injector_cond
);
free_share
(
&
share
);
free_share
(
&
share
);
share
=
0
;
}
}
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
...
@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
log_query
=
1
;
log_query
=
1
;
break
;
break
;
}
}
if
(
share
)
{
free_share
(
&
share
);
share
=
0
;
}
/* signal that schema operation has been handled */
/* signal that schema operation has been handled */
if
((
enum
SCHEMA_OP_TYPE
)
schema
->
type
!=
SOT_CLEAR_SLOCK
)
if
((
enum
SCHEMA_OP_TYPE
)
schema
->
type
!=
SOT_CLEAR_SLOCK
)
{
{
...
@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
case
NDBEVENT
:
:
TE_DELETE
:
case
NDBEVENT
:
:
TE_DELETE
:
// skip
// skip
break
;
break
;
case
NDBEVENT
:
:
TE_ALTER
:
if
(
pOp
->
tableNameChanged
())
{
DBUG_PRINT
(
"info"
,
(
"Detected name change of table %s.%s"
,
share
->
db
,
share
->
table_name
));
/* do the rename of the table in the share */
share
->
table
->
s
->
db
.
str
=
share
->
db
;
share
->
table
->
s
->
db
.
length
=
strlen
(
share
->
db
);
share
->
table
->
s
->
table_name
.
str
=
share
->
table_name
;
share
->
table
->
s
->
table_name
.
length
=
strlen
(
share
->
table_name
);
}
ndb_handle_schema_change
(
thd
,
ndb
,
pOp
,
share
);
break
;
case
NDBEVENT
:
:
TE_CLUSTER_FAILURE
:
case
NDBEVENT
:
:
TE_CLUSTER_FAILURE
:
case
NDBEVENT
:
:
TE_DROP
:
case
NDBEVENT
:
:
TE_DROP
:
free_share
(
&
schema_share
);
free_share
(
&
schema_share
);
schema_share
=
0
;
schema_share
=
0
;
// fall through
case
NDBEVENT
:
:
TE_ALTER
:
ndb_handle_schema_change
(
thd
,
ndb
,
pOp
,
share
);
ndb_handle_schema_change
(
thd
,
ndb
,
pOp
,
share
);
break
;
break
;
case
NDBEVENT
:
:
TE_NODE_FAILURE
:
case
NDBEVENT
:
:
TE_NODE_FAILURE
:
...
@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
/*
process any operations that should be done after
the epoch is complete
*/
static
void
ndb_binlog_thread_handle_schema_event_post_epoch
(
THD
*
thd
,
List
<
Cluster_replication_schema
>
*
post_epoch_log_list
,
List
<
Cluster_replication_schema
>
*
post_epoch_unlock_list
)
{
DBUG_ENTER
(
"ndb_binlog_thread_handle_schema_event_post_epoch"
);
Cluster_replication_schema
*
schema
;
while
((
schema
=
post_epoch_log_list
->
pop
()))
{
DBUG_PRINT
(
"info"
,
(
"log query_length: %d query: '%s'"
,
schema
->
query_length
,
schema
->
query
));
{
char
key
[
FN_REFLEN
];
build_table_filename
(
key
,
sizeof
(
key
),
schema
->
db
,
schema
->
name
,
""
);
NDB_SHARE
*
share
=
get_share
(
key
,
0
,
false
,
false
);
switch
((
enum
SCHEMA_OP_TYPE
)
schema
->
type
)
{
case
SOT_DROP_DB
:
case
SOT_DROP_TABLE
:
break
;
case
SOT_RENAME_TABLE
:
case
SOT_ALTER_TABLE
:
if
(
share
&&
share
->
op
)
{
break
;
/* discovery handled by binlog */
}
pthread_mutex_lock
(
&
LOCK_open
);
if
(
ndb_create_table_from_engine
(
thd
,
schema
->
db
,
schema
->
name
))
{
sql_print_error
(
"Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d"
,
schema
->
db
,
schema
->
name
,
schema
->
query
,
schema
->
node_id
);
}
pthread_mutex_unlock
(
&
LOCK_open
);
default:
DBUG_ASSERT
(
false
);
}
if
(
share
)
{
free_share
(
&
share
);
share
=
0
;
}
}
{
char
*
thd_db_save
=
thd
->
db
;
thd
->
db
=
schema
->
db
;
thd
->
binlog_query
(
THD
::
STMT_QUERY_TYPE
,
schema
->
query
,
schema
->
query_length
,
FALSE
,
schema
->
name
[
0
]
==
0
);
thd
->
db
=
thd_db_save
;
}
}
while
((
schema
=
post_epoch_unlock_list
->
pop
()))
{
ndbcluster_update_slock
(
thd
,
schema
->
db
,
schema
->
name
);
}
DBUG_VOID_RETURN
;
}
/*
/*
Timer class for doing performance measurements
Timer class for doing performance measurements
*/
*/
...
@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
...
@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
if
(
share
->
flags
&
NSF_BLOB_FLAG
)
if
(
share
->
flags
&
NSF_BLOB_FLAG
)
op
->
mergeEvents
(
true
);
// currently not inherited from event
op
->
mergeEvents
(
true
);
// currently not inherited from event
DBUG_PRINT
(
"info"
,
(
"share->ndb_value[0]: 0x%x"
,
share
->
ndb_value
[
0
]));
DBUG_PRINT
(
"info"
,
(
"share->ndb_value[1]: 0x%x"
,
share
->
ndb_value
[
1
]));
int
n_columns
=
ndbtab
->
getNoOfColumns
();
int
n_columns
=
ndbtab
->
getNoOfColumns
();
int
n_fields
=
table
?
table
->
s
->
fields
:
0
;
// XXX ???
int
n_fields
=
table
?
table
->
s
->
fields
:
0
;
// XXX ???
for
(
int
j
=
0
;
j
<
n_columns
;
j
++
)
for
(
int
j
=
0
;
j
<
n_columns
;
j
++
)
...
@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
...
@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
}
}
share
->
ndb_value
[
0
][
j
].
ptr
=
attr0
.
ptr
;
share
->
ndb_value
[
0
][
j
].
ptr
=
attr0
.
ptr
;
share
->
ndb_value
[
1
][
j
].
ptr
=
attr1
.
ptr
;
share
->
ndb_value
[
1
][
j
].
ptr
=
attr1
.
ptr
;
DBUG_PRINT
(
"info"
,
(
"&share->ndb_value[0][%d]: 0x%x "
"share->ndb_value[0][%d]: 0x%x"
,
j
,
&
share
->
ndb_value
[
0
][
j
],
j
,
attr0
.
ptr
));
DBUG_PRINT
(
"info"
,
(
"&share->ndb_value[1][%d]: 0x%x "
"share->ndb_value[1][%d]: 0x%x"
,
j
,
&
share
->
ndb_value
[
0
][
j
],
j
,
attr1
.
ptr
));
}
}
op
->
setCustomData
((
void
*
)
share
);
// set before execute
op
->
setCustomData
((
void
*
)
share
);
// set before execute
share
->
op
=
op
;
// assign op in NDB_SHARE
share
->
op
=
op
;
// assign op in NDB_SHARE
...
@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
...
@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
"op_old: %lx"
,
"op_old: %lx"
,
share
->
key
,
share
,
pOp
,
share
->
op
,
share
->
op_old
));
share
->
key
,
share
,
pOp
,
share
->
op
,
share
->
op_old
));
break
;
break
;
case
NDBEVENT
:
:
TE_ALTER
:
if
(
pOp
->
tableNameChanged
())
{
DBUG_PRINT
(
"info"
,
(
"Detected name change of table %s.%s"
,
share
->
db
,
share
->
table_name
));
/* ToDo: remove printout */
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB Binlog: rename table %s%s/%s -> %s."
,
share_prefix
,
share
->
table
->
s
->
db
.
str
,
share
->
table
->
s
->
table_name
.
str
,
share
->
key
);
/* do the rename of the table in the share */
share
->
table
->
s
->
db
.
str
=
share
->
db
;
share
->
table
->
s
->
db
.
length
=
strlen
(
share
->
db
);
share
->
table
->
s
->
table_name
.
str
=
share
->
table_name
;
share
->
table
->
s
->
table_name
.
length
=
strlen
(
share
->
table_name
);
}
goto
drop_alter_common
;
case
NDBEVENT
:
:
TE_DROP
:
case
NDBEVENT
:
:
TE_DROP
:
if
(
apply_status_share
==
share
)
if
(
apply_status_share
==
share
)
{
{
...
@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
...
@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
/* ToDo: remove printout */
/* ToDo: remove printout */
if
(
ndb_extra_logging
)
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB Binlog: drop table %s."
,
share
->
key
);
sql_print_information
(
"NDB Binlog: drop table %s."
,
share
->
key
);
drop_alter_common:
// fall through
case
NDBEVENT
:
:
TE_ALTER
:
row
.
n_schemaops
++
;
row
.
n_schemaops
++
;
DBUG_PRINT
(
"info"
,
(
"TABLE %s EVENT: %s received share: 0x%lx op: %lx "
DBUG_PRINT
(
"info"
,
(
"TABLE %s EVENT: %s received share: 0x%lx op: %lx "
"share op: %lx op_old: %lx"
,
"share op: %lx op_old: %lx"
,
...
@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
...
@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
}
}
}
}
/*
ndb_binlog_thread_handle_schema_event_post_epoch
(
thd
,
process any operations that should be done after
&
post_epoch_log_list
,
the epoch is complete
&
post_epoch_unlock_list
);
*/
{
Cluster_replication_schema
*
schema
;
while
((
schema
=
post_epoch_unlock_list
.
pop
()))
{
ndbcluster_update_slock
(
thd
,
schema
->
db
,
schema
->
name
);
}
while
((
schema
=
post_epoch_log_list
.
pop
()))
{
char
*
thd_db_save
=
thd
->
db
;
thd
->
db
=
schema
->
db
;
thd
->
binlog_query
(
THD
::
STMT_QUERY_TYPE
,
schema
->
query
,
schema
->
query_length
,
FALSE
,
schema
->
name
[
0
]
==
0
);
thd
->
db
=
thd_db_save
;
}
}
free_root
(
&
mem_root
,
MYF
(
0
));
free_root
(
&
mem_root
,
MYF
(
0
));
*
root_ptr
=
old_root
;
*
root_ptr
=
old_root
;
ndb_latest_handled_binlog_epoch
=
ndb_latest_received_binlog_epoch
;
ndb_latest_handled_binlog_epoch
=
ndb_latest_received_binlog_epoch
;
...
@@ -3110,9 +3212,15 @@ err:
...
@@ -3110,9 +3212,15 @@ err:
sql_print_information
(
"Stopping Cluster Binlog"
);
sql_print_information
(
"Stopping Cluster Binlog"
);
if
(
apply_status_share
)
if
(
apply_status_share
)
{
free_share
(
&
apply_status_share
);
free_share
(
&
apply_status_share
);
apply_status_share
=
0
;
}
if
(
schema_share
)
if
(
schema_share
)
{
free_share
(
&
schema_share
);
free_share
(
&
schema_share
);
schema_share
=
0
;
}
/* remove all event operations */
/* remove all event operations */
if
(
ndb
)
if
(
ndb
)
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
e86ea27b
...
@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
...
@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
error
.
code
));
error
.
code
));
DBUG_RETURN_EVENT
(
1
);
DBUG_RETURN_EVENT
(
1
);
}
}
if
(
m_eventImpl
->
m_tableImpl
)
delete
m_eventImpl
->
m_tableImpl
;
NdbTableImpl
*
tmp_table_impl
=
m_eventImpl
->
m_tableImpl
;
m_eventImpl
->
m_tableImpl
=
at
;
m_eventImpl
->
m_tableImpl
=
at
;
DBUG_PRINT
(
"info"
,
(
"switching table impl 0x%x -> 0x%x"
,
tmp_table_impl
,
at
));
// change the rec attrs to refer to the new table object
int
i
;
for
(
i
=
0
;
i
<
2
;
i
++
)
{
NdbRecAttr
*
p
=
theFirstPkAttrs
[
i
];
while
(
p
)
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
}
}
for
(
i
=
0
;
i
<
2
;
i
++
)
{
NdbRecAttr
*
p
=
theFirstDataAttrs
[
i
];
while
(
p
)
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
}
}
if
(
tmp_table_impl
)
delete
tmp_table_impl
;
}
}
if
(
unlikely
(
operation
>=
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
))
if
(
unlikely
(
operation
>=
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment