Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
2c7d48c8
Commit
2c7d48c8
authored
Apr 04, 2006
by
tomas@poseidon.ndb.mysql.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new
parents
7accd684
d0ca8374
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
162 additions
and
67 deletions
+162
-67
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+0
-2
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.cc
+162
-65
No files found.
sql/ha_ndbcluster.h
View file @
2c7d48c8
...
@@ -115,8 +115,6 @@ typedef struct st_ndbcluster_share {
...
@@ -115,8 +115,6 @@ typedef struct st_ndbcluster_share {
TABLE
*
table
;
TABLE
*
table
;
NdbValue
*
ndb_value
[
2
];
NdbValue
*
ndb_value
[
2
];
MY_BITMAP
*
subscriber_bitmap
;
MY_BITMAP
*
subscriber_bitmap
;
MY_BITMAP
slock_bitmap
;
uint32
slock
[
256
/
32
];
// 256 bits for lock status of table
#endif
#endif
}
NDB_SHARE
;
}
NDB_SHARE
;
...
...
sql/ha_ndbcluster_binlog.cc
View file @
2c7d48c8
...
@@ -86,6 +86,22 @@ static ulonglong ndb_latest_received_binlog_epoch= 0;
...
@@ -86,6 +86,22 @@ static ulonglong ndb_latest_received_binlog_epoch= 0;
NDB_SHARE
*
apply_status_share
=
0
;
NDB_SHARE
*
apply_status_share
=
0
;
NDB_SHARE
*
schema_share
=
0
;
NDB_SHARE
*
schema_share
=
0
;
/* Schema object distribution handling */
HASH
ndb_schema_objects
;
typedef
struct
st_ndb_schema_object
{
pthread_mutex_t
mutex
;
char
*
key
;
uint
key_length
;
uint
use_count
;
MY_BITMAP
slock_bitmap
;
uint32
slock
[
256
/
32
];
// 256 bits for lock status of table
}
NDB_SCHEMA_OBJECT
;
static
NDB_SCHEMA_OBJECT
*
ndb_get_schema_object
(
const
char
*
key
,
my_bool
create_if_not_exists
,
my_bool
have_lock
);
static
void
ndb_free_schema_object
(
NDB_SCHEMA_OBJECT
**
ndb_schema_object
,
bool
have_lock
);
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
extern
Uint64
g_latest_trans_gci
;
extern
Uint64
g_latest_trans_gci
;
...
@@ -328,9 +344,6 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
...
@@ -328,9 +344,6 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
max_ndb_nodes
,
false
);
max_ndb_nodes
,
false
);
bitmap_clear_all
(
&
share
->
subscriber_bitmap
[
i
]);
bitmap_clear_all
(
&
share
->
subscriber_bitmap
[
i
]);
}
}
bitmap_init
(
&
share
->
slock_bitmap
,
share
->
slock
,
sizeof
(
share
->
slock
)
*
8
,
false
);
bitmap_clear_all
(
&
share
->
slock_bitmap
);
}
}
if
(
!
do_event_op
)
if
(
!
do_event_op
)
...
@@ -952,7 +965,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
...
@@ -952,7 +965,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
}
}
char
tmp_buf2
[
FN_REFLEN
];
char
tmp_buf2
[
FN_REFLEN
];
int
get_a_share
=
0
;
switch
(
type
)
switch
(
type
)
{
{
case
SOT_DROP_TABLE
:
case
SOT_DROP_TABLE
:
...
@@ -963,8 +975,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
...
@@ -963,8 +975,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query
=
tmp_buf2
;
query
=
tmp_buf2
;
query_length
=
(
uint
)
(
strxmov
(
tmp_buf2
,
"drop table `"
,
query_length
=
(
uint
)
(
strxmov
(
tmp_buf2
,
"drop table `"
,
table_name
,
"`"
,
NullS
)
-
tmp_buf2
);
table_name
,
"`"
,
NullS
)
-
tmp_buf2
);
if
(
!
share
)
get_a_share
=
1
;
break
;
break
;
case
SOT_RENAME_TABLE
:
case
SOT_RENAME_TABLE
:
/* redo the rename table query as is may contain several tables */
/* redo the rename table query as is may contain several tables */
...
@@ -972,14 +982,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
...
@@ -972,14 +982,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query_length
=
(
uint
)
(
strxmov
(
tmp_buf2
,
"rename table `"
,
query_length
=
(
uint
)
(
strxmov
(
tmp_buf2
,
"rename table `"
,
old_db
,
"."
,
old_table_name
,
"` to `"
,
old_db
,
"."
,
old_table_name
,
"` to `"
,
db
,
"."
,
table_name
,
"`"
,
NullS
)
-
tmp_buf2
);
db
,
"."
,
table_name
,
"`"
,
NullS
)
-
tmp_buf2
);
if
(
!
share
)
get_a_share
=
1
;
break
;
break
;
case
SOT_CREATE_TABLE
:
case
SOT_CREATE_TABLE
:
// fall through
// fall through
case
SOT_ALTER_TABLE
:
case
SOT_ALTER_TABLE
:
if
(
!
share
)
get_a_share
=
1
;
break
;
break
;
case
SOT_DROP_DB
:
case
SOT_DROP_DB
:
break
;
break
;
...
@@ -995,18 +1001,18 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
...
@@ -995,18 +1001,18 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
abort
();
/* should not happen, programming error */
abort
();
/* should not happen, programming error */
}
}
if
(
get_a_share
)
NDB_SCHEMA_OBJECT
*
ndb_schema_object
;
{
{
char
key
[
FN_REFLEN
];
char
key
[
FN_REFLEN
];
build_table_filename
(
key
,
sizeof
(
key
),
db
,
table_name
,
""
);
build_table_filename
(
key
,
sizeof
(
key
),
db
,
table_name
,
""
);
share
=
get_share
(
key
,
0
,
false
,
false
);
ndb_schema_object
=
ndb_get_schema_object
(
key
,
TRUE
,
FALSE
);
}
}
const
NdbError
*
ndb_error
=
0
;
const
NdbError
*
ndb_error
=
0
;
uint32
node_id
=
g_ndb_cluster_connection
->
node_id
();
uint32
node_id
=
g_ndb_cluster_connection
->
node_id
();
Uint64
epoch
=
0
;
Uint64
epoch
=
0
;
MY_BITMAP
schema_subscribers
;
MY_BITMAP
schema_subscribers
;
uint32
bitbuf
[
sizeof
(
schema_share
->
slock
)
/
4
];
uint32
bitbuf
[
sizeof
(
ndb_schema_object
->
slock
)
/
4
];
{
{
int
i
;
int
i
;
bitmap_init
(
&
schema_subscribers
,
bitbuf
,
sizeof
(
bitbuf
)
*
8
,
false
);
bitmap_init
(
&
schema_subscribers
,
bitbuf
,
sizeof
(
bitbuf
)
*
8
,
false
);
...
@@ -1022,11 +1028,12 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
...
@@ -1022,11 +1028,12 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
(
void
)
pthread_mutex_unlock
(
&
schema_share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
schema_share
->
mutex
);
bitmap_clear_bit
(
&
schema_subscribers
,
node_id
);
bitmap_clear_bit
(
&
schema_subscribers
,
node_id
);
if
(
share
)
if
(
ndb_schema_object
)
{
{
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
ndb_schema_object
->
mutex
);
memcpy
(
share
->
slock
,
schema_subscribers
.
bitmap
,
sizeof
(
share
->
slock
));
memcpy
(
ndb_schema_object
->
slock
,
schema_subscribers
.
bitmap
,
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
sizeof
(
ndb_schema_object
->
slock
));
(
void
)
pthread_mutex_unlock
(
&
ndb_schema_object
->
mutex
);
}
}
DBUG_DUMP
(
"schema_subscribers"
,
(
char
*
)
schema_subscribers
.
bitmap
,
DBUG_DUMP
(
"schema_subscribers"
,
(
char
*
)
schema_subscribers
.
bitmap
,
...
@@ -1158,20 +1165,17 @@ end:
...
@@ -1158,20 +1165,17 @@ end:
Wait for other mysqld's to acknowledge the table operation
Wait for other mysqld's to acknowledge the table operation
*/
*/
if
(
ndb_error
==
0
&&
if
(
ndb_error
==
0
&&
(
type
==
SOT_CREATE_TABLE
||
type
==
SOT_RENAME_TABLE
||
type
==
SOT_ALTER_TABLE
)
&&
!
bitmap_is_clear_all
(
&
schema_subscribers
))
!
bitmap_is_clear_all
(
&
schema_subscribers
))
{
{
int
max_timeout
=
10
;
int
max_timeout
=
10
;
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
ndb_schema_object
->
mutex
);
while
(
1
)
while
(
1
)
{
{
struct
timespec
abstime
;
struct
timespec
abstime
;
int
i
;
int
i
;
set_timespec
(
abstime
,
1
);
set_timespec
(
abstime
,
1
);
(
void
)
pthread_cond_timedwait
(
&
injector_cond
,
(
void
)
pthread_cond_timedwait
(
&
injector_cond
,
&
share
->
mutex
,
&
ndb_schema_object
->
mutex
,
&
abstime
);
&
abstime
);
(
void
)
pthread_mutex_lock
(
&
schema_share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
schema_share
->
mutex
);
...
@@ -1184,14 +1188,14 @@ end:
...
@@ -1184,14 +1188,14 @@ end:
}
}
(
void
)
pthread_mutex_unlock
(
&
schema_share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
schema_share
->
mutex
);
/* remove any unsubscribed from
share
->slock */
/* remove any unsubscribed from
ndb_schema_object
->slock */
bitmap_intersect
(
&
share
->
slock_bitmap
,
&
schema_subscribers
);
bitmap_intersect
(
&
ndb_schema_object
->
slock_bitmap
,
&
schema_subscribers
);
DBUG_DUMP
(
"
share
->slock_bitmap.bitmap"
,
DBUG_DUMP
(
"
ndb_schema_object
->slock_bitmap.bitmap"
,
(
char
*
)
share
->
slock_bitmap
.
bitmap
,
(
char
*
)
ndb_schema_object
->
slock_bitmap
.
bitmap
,
no_bytes_in_map
(
&
share
->
slock_bitmap
));
no_bytes_in_map
(
&
ndb_schema_object
->
slock_bitmap
));
if
(
bitmap_is_clear_all
(
&
share
->
slock_bitmap
))
if
(
bitmap_is_clear_all
(
&
ndb_schema_object
->
slock_bitmap
))
break
;
break
;
max_timeout
--
;
max_timeout
--
;
...
@@ -1203,16 +1207,13 @@ end:
...
@@ -1203,16 +1207,13 @@ end:
if
(
ndb_extra_logging
)
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB create table: "
sql_print_information
(
"NDB create table: "
"waiting max %u sec for create table %s."
,
"waiting max %u sec for create table %s."
,
max_timeout
,
share
->
key
);
max_timeout
,
ndb_schema_object
->
key
);
}
}
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
ndb_schema_object
->
mutex
);
}
}
if
(
get_a_share
&&
share
)
if
(
ndb_schema_object
)
{
ndb_free_schema_object
(
&
ndb_schema_object
,
FALSE
);
free_share
(
&
share
);
share
=
0
;
}
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
...
@@ -1529,12 +1530,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1529,12 +1530,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
MEM_ROOT
*
mem_root
)
MEM_ROOT
*
mem_root
)
{
{
DBUG_ENTER
(
"ndb_binlog_thread_handle_schema_event"
);
DBUG_ENTER
(
"ndb_binlog_thread_handle_schema_event"
);
NDB_SHARE
*
share
=
(
NDB_SHARE
*
)
pOp
->
getCustomData
();
NDB_SHARE
*
tmp_
share
=
(
NDB_SHARE
*
)
pOp
->
getCustomData
();
if
(
share
&&
schema_share
==
share
)
if
(
tmp_share
&&
schema_share
==
tmp_
share
)
{
{
NDBEVENT
::
TableEvent
ev_type
=
pOp
->
getEventType
();
NDBEVENT
::
TableEvent
ev_type
=
pOp
->
getEventType
();
DBUG_PRINT
(
"enter"
,
(
"%s.%s ev_type: %d"
,
DBUG_PRINT
(
"enter"
,
(
"%s.%s ev_type: %d"
,
share
->
db
,
share
->
table_name
,
ev_type
));
tmp_share
->
db
,
tmp_
share
->
table_name
,
ev_type
));
if
(
ev_type
==
NDBEVENT
::
TE_UPDATE
||
if
(
ev_type
==
NDBEVENT
::
TE_UPDATE
||
ev_type
==
NDBEVENT
::
TE_INSERT
)
ev_type
==
NDBEVENT
::
TE_INSERT
)
{
{
...
@@ -1543,7 +1544,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1543,7 +1544,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
MY_BITMAP
slock
;
MY_BITMAP
slock
;
bitmap_init
(
&
slock
,
schema
->
slock
,
8
*
SCHEMA_SLOCK_SIZE
,
false
);
bitmap_init
(
&
slock
,
schema
->
slock
,
8
*
SCHEMA_SLOCK_SIZE
,
false
);
uint
node_id
=
g_ndb_cluster_connection
->
node_id
();
uint
node_id
=
g_ndb_cluster_connection
->
node_id
();
ndbcluster_get_schema
(
share
,
schema
);
ndbcluster_get_schema
(
tmp_
share
,
schema
);
if
(
schema
->
node_id
!=
node_id
)
if
(
schema
->
node_id
!=
node_id
)
{
{
int
log_query
=
0
,
post_epoch_unlock
=
0
;
int
log_query
=
0
,
post_epoch_unlock
=
0
;
...
@@ -1628,18 +1629,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1628,18 +1629,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
break
;
break
;
case
SOT_CLEAR_SLOCK
:
case
SOT_CLEAR_SLOCK
:
{
{
if
(
share
)
pthread_mutex_lock
(
&
ndbcluster_mutex
);
NDB_SCHEMA_OBJECT
*
ndb_schema_object
=
(
NDB_SCHEMA_OBJECT
*
)
hash_search
(
&
ndb_schema_objects
,
(
byte
*
)
key
,
strlen
(
key
));
if
(
ndb_schema_object
)
{
{
pthread_mutex_lock
(
&
share
->
mutex
);
pthread_mutex_lock
(
&
ndb_schema_object
->
mutex
);
memcpy
(
share
->
slock
,
schema
->
slock
,
sizeof
(
share
->
slock
));
memcpy
(
ndb_schema_object
->
slock
,
schema
->
slock
,
DBUG_DUMP
(
"share->slock_bitmap.bitmap"
,
sizeof
(
ndb_schema_object
->
slock
));
(
char
*
)
share
->
slock_bitmap
.
bitmap
,
DBUG_DUMP
(
"ndb_schema_object->slock_bitmap.bitmap"
,
no_bytes_in_map
(
&
share
->
slock_bitmap
));
(
char
*
)
ndb_schema_object
->
slock_bitmap
.
bitmap
,
pthread_mutex_unlock
(
&
share
->
mutex
);
no_bytes_in_map
(
&
ndb_schema_object
->
slock_bitmap
));
pthread_mutex_unlock
(
&
ndb_schema_object
->
mutex
);
pthread_cond_signal
(
&
injector_cond
);
pthread_cond_signal
(
&
injector_cond
);
free_share
(
&
share
);
share
=
0
;
}
}
if
(
share
)
free_share
(
&
share
,
TRUE
);
pthread_mutex_unlock
(
&
ndbcluster_mutex
);
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
case
SOT_TABLESPACE
:
case
SOT_TABLESPACE
:
...
@@ -1687,24 +1694,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1687,24 +1694,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema_share
=
0
;
schema_share
=
0
;
// fall through
// fall through
case
NDBEVENT
:
:
TE_ALTER
:
case
NDBEVENT
:
:
TE_ALTER
:
ndb_handle_schema_change
(
thd
,
ndb
,
pOp
,
share
);
ndb_handle_schema_change
(
thd
,
ndb
,
pOp
,
tmp_
share
);
break
;
break
;
case
NDBEVENT
:
:
TE_NODE_FAILURE
:
case
NDBEVENT
:
:
TE_NODE_FAILURE
:
{
{
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
DBUG_ASSERT
(
node_id
!=
0xFF
);
DBUG_ASSERT
(
node_id
!=
0xFF
);
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
tmp_
share
->
mutex
);
bitmap_clear_all
(
&
share
->
subscriber_bitmap
[
node_id
]);
bitmap_clear_all
(
&
tmp_
share
->
subscriber_bitmap
[
node_id
]);
DBUG_PRINT
(
"info"
,(
"NODE_FAILURE UNSUBSCRIBE[%d]"
,
node_id
));
DBUG_PRINT
(
"info"
,(
"NODE_FAILURE UNSUBSCRIBE[%d]"
,
node_id
));
if
(
ndb_extra_logging
)
if
(
ndb_extra_logging
)
{
{
sql_print_information
(
"NDB Binlog: Node: %d, down,"
sql_print_information
(
"NDB Binlog: Node: %d, down,"
" Subscriber bitmask %x%x"
,
" Subscriber bitmask %x%x"
,
pOp
->
getNdbdNodeId
(),
pOp
->
getNdbdNodeId
(),
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
}
}
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
tmp_
share
->
mutex
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
break
;
break
;
}
}
...
@@ -1713,8 +1720,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1713,8 +1720,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
uint8
req_id
=
pOp
->
getReqNodeId
();
uint8
req_id
=
pOp
->
getReqNodeId
();
DBUG_ASSERT
(
req_id
!=
0
&&
node_id
!=
0xFF
);
DBUG_ASSERT
(
req_id
!=
0
&&
node_id
!=
0xFF
);
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
tmp_
share
->
mutex
);
bitmap_set_bit
(
&
share
->
subscriber_bitmap
[
node_id
],
req_id
);
bitmap_set_bit
(
&
tmp_
share
->
subscriber_bitmap
[
node_id
],
req_id
);
DBUG_PRINT
(
"info"
,(
"SUBSCRIBE[%d] %d"
,
node_id
,
req_id
));
DBUG_PRINT
(
"info"
,(
"SUBSCRIBE[%d] %d"
,
node_id
,
req_id
));
if
(
ndb_extra_logging
)
if
(
ndb_extra_logging
)
{
{
...
@@ -1722,10 +1729,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1722,10 +1729,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
" Subscriber bitmask %x%x"
,
" Subscriber bitmask %x%x"
,
pOp
->
getNdbdNodeId
(),
pOp
->
getNdbdNodeId
(),
req_id
,
req_id
,
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
}
}
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
tmp_
share
->
mutex
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
break
;
break
;
}
}
...
@@ -1734,8 +1741,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1734,8 +1741,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
uint8
node_id
=
g_node_id_map
[
pOp
->
getNdbdNodeId
()];
uint8
req_id
=
pOp
->
getReqNodeId
();
uint8
req_id
=
pOp
->
getReqNodeId
();
DBUG_ASSERT
(
req_id
!=
0
&&
node_id
!=
0xFF
);
DBUG_ASSERT
(
req_id
!=
0
&&
node_id
!=
0xFF
);
(
void
)
pthread_mutex_lock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_lock
(
&
tmp_
share
->
mutex
);
bitmap_clear_bit
(
&
share
->
subscriber_bitmap
[
node_id
],
req_id
);
bitmap_clear_bit
(
&
tmp_
share
->
subscriber_bitmap
[
node_id
],
req_id
);
DBUG_PRINT
(
"info"
,(
"UNSUBSCRIBE[%d] %d"
,
node_id
,
req_id
));
DBUG_PRINT
(
"info"
,(
"UNSUBSCRIBE[%d] %d"
,
node_id
,
req_id
));
if
(
ndb_extra_logging
)
if
(
ndb_extra_logging
)
{
{
...
@@ -1743,16 +1750,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
...
@@ -1743,16 +1750,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
" Subscriber bitmask %x%x"
,
" Subscriber bitmask %x%x"
,
pOp
->
getNdbdNodeId
(),
pOp
->
getNdbdNodeId
(),
req_id
,
req_id
,
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
1
],
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
tmp_
share
->
subscriber_bitmap
[
node_id
].
bitmap
[
0
]);
}
}
(
void
)
pthread_mutex_unlock
(
&
share
->
mutex
);
(
void
)
pthread_mutex_unlock
(
&
tmp_
share
->
mutex
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
(
void
)
pthread_cond_signal
(
&
injector_cond
);
break
;
break
;
}
}
default:
default:
sql_print_error
(
"NDB Binlog: unknown non data event %d for %s. "
sql_print_error
(
"NDB Binlog: unknown non data event %d for %s. "
"Ignoring..."
,
(
unsigned
)
ev_type
,
share
->
key
);
"Ignoring..."
,
(
unsigned
)
ev_type
,
tmp_
share
->
key
);
}
}
}
}
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
...
@@ -2888,6 +2895,90 @@ private:
...
@@ -2888,6 +2895,90 @@ private:
Injector thread main loop
Injector thread main loop
****************************************************************/
****************************************************************/
static
byte
*
ndb_schema_objects_get_key
(
NDB_SCHEMA_OBJECT
*
schema_object
,
uint
*
length
,
my_bool
not_used
__attribute__
((
unused
)))
{
*
length
=
schema_object
->
key_length
;
return
(
byte
*
)
schema_object
->
key
;
}
static
NDB_SCHEMA_OBJECT
*
ndb_get_schema_object
(
const
char
*
key
,
my_bool
create_if_not_exists
,
my_bool
have_lock
)
{
NDB_SCHEMA_OBJECT
*
ndb_schema_object
;
uint
length
=
(
uint
)
strlen
(
key
);
DBUG_ENTER
(
"ndb_get_schema_object"
);
DBUG_PRINT
(
"enter"
,
(
"key: '%s'"
,
key
));
if
(
!
have_lock
)
pthread_mutex_lock
(
&
ndbcluster_mutex
);
while
(
!
(
ndb_schema_object
=
(
NDB_SCHEMA_OBJECT
*
)
hash_search
(
&
ndb_schema_objects
,
(
byte
*
)
key
,
length
)))
{
if
(
!
create_if_not_exists
)
{
DBUG_PRINT
(
"info"
,
(
"does not exist"
));
break
;
}
if
(
!
(
ndb_schema_object
=
(
NDB_SCHEMA_OBJECT
*
)
my_malloc
(
sizeof
(
*
ndb_schema_object
)
+
length
+
1
,
MYF
(
MY_WME
|
MY_ZEROFILL
))))
{
DBUG_PRINT
(
"info"
,
(
"malloc error"
));
break
;
}
ndb_schema_object
->
key
=
(
char
*
)(
ndb_schema_object
+
1
);
memcpy
(
ndb_schema_object
->
key
,
key
,
length
+
1
);
ndb_schema_object
->
key_length
=
length
;
if
(
my_hash_insert
(
&
ndb_schema_objects
,
(
byte
*
)
ndb_schema_object
))
{
my_free
((
gptr
)
ndb_schema_object
,
0
);
break
;
}
pthread_mutex_init
(
&
ndb_schema_object
->
mutex
,
MY_MUTEX_INIT_FAST
);
bitmap_init
(
&
ndb_schema_object
->
slock_bitmap
,
ndb_schema_object
->
slock
,
sizeof
(
ndb_schema_object
->
slock
)
*
8
,
false
);
bitmap_clear_all
(
&
ndb_schema_object
->
slock_bitmap
);
break
;
}
if
(
ndb_schema_object
)
{
ndb_schema_object
->
use_count
++
;
DBUG_PRINT
(
"info"
,
(
"use_count: %d"
,
ndb_schema_object
->
use_count
));
}
if
(
!
have_lock
)
pthread_mutex_unlock
(
&
ndbcluster_mutex
);
DBUG_RETURN
(
ndb_schema_object
);
}
static
void
ndb_free_schema_object
(
NDB_SCHEMA_OBJECT
**
ndb_schema_object
,
bool
have_lock
)
{
DBUG_ENTER
(
"ndb_free_schema_object"
);
DBUG_PRINT
(
"enter"
,
(
"key: '%s'"
,
(
*
ndb_schema_object
)
->
key
));
if
(
!
have_lock
)
pthread_mutex_lock
(
&
ndbcluster_mutex
);
if
(
!--
(
*
ndb_schema_object
)
->
use_count
)
{
DBUG_PRINT
(
"info"
,
(
"use_count: %d"
,
(
*
ndb_schema_object
)
->
use_count
));
hash_delete
(
&
ndb_schema_objects
,
(
byte
*
)
*
ndb_schema_object
);
pthread_mutex_destroy
(
&
(
*
ndb_schema_object
)
->
mutex
);
my_free
((
gptr
)
*
ndb_schema_object
,
MYF
(
0
));
*
ndb_schema_object
=
0
;
}
else
{
DBUG_PRINT
(
"info"
,
(
"use_count: %d"
,
(
*
ndb_schema_object
)
->
use_count
));
}
if
(
!
have_lock
)
pthread_mutex_unlock
(
&
ndbcluster_mutex
);
DBUG_VOID_RETURN
;
}
pthread_handler_t
ndb_binlog_thread_func
(
void
*
arg
)
pthread_handler_t
ndb_binlog_thread_func
(
void
*
arg
)
{
{
THD
*
thd
;
/* needs to be first for thread_stack */
THD
*
thd
;
/* needs to be first for thread_stack */
...
@@ -2961,6 +3052,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
...
@@ -2961,6 +3052,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
goto
err
;
goto
err
;
}
}
/* init hash for schema object distribution */
(
void
)
hash_init
(
&
ndb_schema_objects
,
system_charset_info
,
32
,
0
,
0
,
(
hash_get_key
)
ndb_schema_objects_get_key
,
0
,
0
);
/*
/*
Expose global reference to our ndb object.
Expose global reference to our ndb object.
...
@@ -3360,6 +3455,8 @@ err:
...
@@ -3360,6 +3455,8 @@ err:
ndb
=
0
;
ndb
=
0
;
}
}
hash_free
(
&
ndb_schema_objects
);
// Placed here to avoid a memory leak; TODO: check if needed
// Placed here to avoid a memory leak; TODO: check if needed
net_end
(
&
thd
->
net
);
net_end
(
&
thd
->
net
);
delete
thd
;
delete
thd
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment