Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
d922e834
Commit
d922e834
authored
May 19, 2006
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
parents
55e9c051
ccbdf093
Changes
17
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
483 additions
and
319 deletions
+483
-319
mysql-test/r/ndb_autodiscover3.result
mysql-test/r/ndb_autodiscover3.result
+4
-0
mysql-test/t/disabled.def
mysql-test/t/disabled.def
+1
-1
mysql-test/t/ndb_autodiscover3.test
mysql-test/t/ndb_autodiscover3.test
+4
-0
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+10
-6
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+1
-0
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.cc
+83
-55
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/Ndb.hpp
+1
-0
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
+2
-1
storage/ndb/include/util/SocketServer.hpp
storage/ndb/include/util/SocketServer.hpp
+5
-3
storage/ndb/src/common/util/SocketServer.cpp
storage/ndb/src/common/util/SocketServer.cpp
+35
-9
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
+16
-1
storage/ndb/src/mgmsrv/Services.cpp
storage/ndb/src/mgmsrv/Services.cpp
+2
-0
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndb.cpp
+6
-0
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+3
-57
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+72
-73
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+230
-109
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+8
-4
No files found.
mysql-test/r/ndb_autodiscover3.result
View file @
d922e834
...
...
@@ -18,6 +18,7 @@ select * from t2;
ERROR 42S02: Table 'test.t2' doesn't exist
show tables like 't2';
Tables_in_test (t2)
reset master;
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
...
...
@@ -30,10 +31,12 @@ a
1
2
3
reset master;
select * from t2;
ERROR 42S02: Table 'test.t2' doesn't exist
show tables like 't2';
Tables_in_test (t2)
reset master;
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
...
...
@@ -46,4 +49,5 @@ a
1
2
3
reset master;
drop table t2;
mysql-test/t/disabled.def
View file @
d922e834
...
...
@@ -16,7 +16,7 @@ events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails
events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems
ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
#
ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
#ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
#ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
...
...
mysql-test/t/ndb_autodiscover3.test
View file @
d922e834
...
...
@@ -43,6 +43,7 @@ select * from t2 order by a limit 3;
--
error
ER_NO_SUCH_TABLE
select
*
from
t2
;
show
tables
like
't2'
;
reset
master
;
create
table
t2
(
a
int
key
)
engine
=
ndbcluster
;
insert
into
t2
values
(
1
),(
2
),(
3
),(
4
),(
5
),(
6
),(
7
),(
8
),(
9
),(
10
);
select
*
from
t2
order
by
a
limit
3
;
...
...
@@ -50,6 +51,7 @@ select * from t2 order by a limit 3;
# server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
--
connection
server1
select
*
from
t2
order
by
a
limit
3
;
reset
master
;
--
exec
$NDB_MGM
--
no
-
defaults
-
e
"all restart -i"
>>
$NDB_TOOLS_OUTPUT
--
exec
$NDB_TOOLS_DIR
/
ndb_waiter
--
no
-
defaults
>>
$NDB_TOOLS_OUTPUT
...
...
@@ -60,6 +62,7 @@ select * from t2 order by a limit 3;
--
error
ER_NO_SUCH_TABLE
select
*
from
t2
;
show
tables
like
't2'
;
reset
master
;
create
table
t2
(
a
int
key
)
engine
=
ndbcluster
;
insert
into
t2
values
(
1
),(
2
),(
3
),(
4
),(
5
),(
6
),(
7
),(
8
),(
9
),(
10
);
select
*
from
t2
order
by
a
limit
3
;
...
...
@@ -67,6 +70,7 @@ select * from t2 order by a limit 3;
# server 2 should have a stale cache, but with right frm, transaction need not be retried
--
connection
server2
select
*
from
t2
order
by
a
limit
3
;
reset
master
;
drop
table
t2
;
# End of 4.1 tests
sql/ha_ndbcluster.cc
View file @
d922e834
...
...
@@ -182,6 +182,8 @@ static const char * ndb_connected_host= 0;
static
long
ndb_connected_port
=
0
;
static
long
ndb_number_of_replicas
=
0
;
long
ndb_number_of_storage_nodes
=
0
;
long
ndb_number_of_ready_storage_nodes
=
0
;
long
ndb_connect_count
=
0
;
static
int
update_status_variables
(
Ndb_cluster_connection
*
c
)
{
...
...
@@ -190,6 +192,8 @@ static int update_status_variables(Ndb_cluster_connection *c)
ndb_connected_host
=
c
->
get_connected_host
();
ndb_number_of_replicas
=
0
;
ndb_number_of_storage_nodes
=
c
->
no_db_nodes
();
ndb_number_of_ready_storage_nodes
=
c
->
get_no_ready
();
ndb_connect_count
=
c
->
get_connect_count
();
return
0
;
}
...
...
@@ -7128,10 +7132,6 @@ void ndbcluster_real_free_share(NDB_SHARE **share)
#ifndef DBUG_OFF
bzero
((
gptr
)(
*
share
)
->
table_share
,
sizeof
(
*
(
*
share
)
->
table_share
));
bzero
((
gptr
)(
*
share
)
->
table
,
sizeof
(
*
(
*
share
)
->
table
));
#endif
my_free
((
gptr
)
(
*
share
)
->
table_share
,
MYF
(
0
));
my_free
((
gptr
)
(
*
share
)
->
table
,
MYF
(
0
));
#ifndef DBUG_OFF
(
*
share
)
->
table_share
=
0
;
(
*
share
)
->
table
=
0
;
#endif
...
...
@@ -9361,11 +9361,15 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
"cluster_node_id=%u, "
"connected_host=%s, "
"connected_port=%u, "
"number_of_storage_nodes=%u"
,
"number_of_storage_nodes=%u, "
"number_of_ready_storage_nodes=%u, "
"connect_count=%u"
,
ndb_cluster_node_id
,
ndb_connected_host
,
ndb_connected_port
,
ndb_number_of_storage_nodes
);
ndb_number_of_storage_nodes
,
ndb_number_of_ready_storage_nodes
,
ndb_connect_count
);
if
(
stat_print
(
thd
,
ndbcluster_hton
.
name
,
strlen
(
ndbcluster_hton
.
name
),
"connection"
,
strlen
(
"connection"
),
buf
,
buflen
))
...
...
sql/ha_ndbcluster.h
View file @
d922e834
...
...
@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share {
char
*
old_names
;
// for rename table
TABLE_SHARE
*
table_share
;
TABLE
*
table
;
byte
*
record
[
2
];
// pointer to allocated records for receiving data
NdbValue
*
ndb_value
[
2
];
MY_BITMAP
*
subscriber_bitmap
;
#endif
...
...
sql/ha_ndbcluster_binlog.cc
View file @
d922e834
...
...
@@ -25,6 +25,7 @@
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
#include "NdbDictionary.hpp"
#include <util/NdbAutoPtr.hpp>
#ifdef ndb_dynamite
#undef assert
...
...
@@ -265,7 +266,8 @@ ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
static
int
ndbcluster_binlog_open_table
(
THD
*
thd
,
NDB_SHARE
*
share
,
TABLE_SHARE
*
table_share
,
TABLE
*
table
)
TABLE_SHARE
*
table_share
,
TABLE
*
table
,
int
reopen
)
{
int
error
;
DBUG_ENTER
(
"ndbcluster_binlog_open_table"
);
...
...
@@ -278,27 +280,34 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
share
->
key
,
error
);
DBUG_PRINT
(
"error"
,
(
"open_table_def failed %d"
,
error
));
free_table_share
(
table_share
);
my_free
((
gptr
)
table_share
,
MYF
(
0
));
my_free
((
gptr
)
table
,
MYF
(
0
));
DBUG_RETURN
(
error
);
}
if
((
error
=
open_table_from_share
(
thd
,
table_share
,
""
,
0
,
if
((
error
=
open_table_from_share
(
thd
,
table_share
,
""
,
0
/* fon't allocate buffers */
,
(
uint
)
READ_ALL
,
0
,
table
,
FALSE
)))
{
sql_print_error
(
"Unable to open table for %s, error=%d(%d)"
,
share
->
key
,
error
,
my_errno
);
DBUG_PRINT
(
"error"
,
(
"open_table_from_share failed %d"
,
error
));
free_table_share
(
table_share
);
my_free
((
gptr
)
table_share
,
MYF
(
0
));
my_free
((
gptr
)
table
,
MYF
(
0
));
DBUG_RETURN
(
error
);
}
assign_new_table_id
(
table_share
);
if
(
!
table
->
record
[
1
]
||
table
->
record
[
1
]
==
table
->
record
[
0
])
if
(
!
reopen
)
{
// allocate memory on ndb share so it can be reused after online alter table
share
->
record
[
0
]
=
(
byte
*
)
alloc_root
(
&
share
->
mem_root
,
table
->
s
->
rec_buff_length
);
share
->
record
[
1
]
=
(
byte
*
)
alloc_root
(
&
share
->
mem_root
,
table
->
s
->
rec_buff_length
);
}
{
table
->
record
[
1
]
=
alloc_root
(
&
table
->
mem_root
,
table
->
s
->
rec_buff_length
);
my_ptrdiff_t
row_offset
=
share
->
record
[
0
]
-
table
->
record
[
0
];
Field
**
p_field
;
for
(
p_field
=
table
->
field
;
*
p_field
;
p_field
++
)
(
*
p_field
)
->
move_field_offset
(
row_offset
);
table
->
record
[
0
]
=
share
->
record
[
0
];
table
->
record
[
1
]
=
share
->
record
[
1
];
}
table
->
in_use
=
injector_thd
;
table
->
s
->
db
.
str
=
share
->
db
;
...
...
@@ -366,10 +375,9 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
while
(
1
)
{
int
error
;
TABLE_SHARE
*
table_share
=
(
TABLE_SHARE
*
)
my_malloc
(
sizeof
(
*
table_share
),
MYF
(
MY_WME
));
TABLE
*
table
=
(
TABLE
*
)
my_malloc
(
sizeof
(
*
table
),
MYF
(
MY_WME
));
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
)))
TABLE_SHARE
*
table_share
=
(
TABLE_SHARE
*
)
alloc_root
(
mem_root
,
sizeof
(
*
table_share
));
TABLE
*
table
=
(
TABLE
*
)
alloc_root
(
mem_root
,
sizeof
(
*
table
));
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
,
0
)))
break
;
/*
! do not touch the contents of the table
...
...
@@ -1535,6 +1543,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
sql_print_information
(
"NDB: Failed write frm for %s.%s, error %d"
,
dbname
,
tabname
,
error
);
}
// copy names as memory will be freed
NdbAutoPtr
<
char
>
a1
((
char
*
)(
dbname
=
strdup
(
dbname
)));
NdbAutoPtr
<
char
>
a2
((
char
*
)(
tabname
=
strdup
(
tabname
)));
ndbcluster_binlog_close_table
(
thd
,
share
);
TABLE_LIST
table_list
;
...
...
@@ -1543,10 +1555,16 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
table_list
.
alias
=
table_list
.
table_name
=
(
char
*
)
tabname
;
close_cached_tables
(
thd
,
0
,
&
table_list
,
TRUE
);
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
)))
if
((
error
=
ndbcluster_binlog_open_table
(
thd
,
share
,
table_share
,
table
,
1
)))
sql_print_information
(
"NDB: Failed to re-open table %s.%s"
,
dbname
,
tabname
);
table
=
share
->
table
;
table_share
=
share
->
table_share
;
dbname
=
table_share
->
db
.
str
;
tabname
=
table_share
->
table_name
.
str
;
pthread_mutex_unlock
(
&
LOCK_open
);
}
my_free
((
char
*
)
data
,
MYF
(
MY_ALLOW_ZERO_PTR
));
...
...
@@ -1776,7 +1794,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
break
;
case
NDBEVENT
:
:
TE_CLUSTER_FAILURE
:
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB Binlog: cluster failure for %s."
,
schema_share
->
key
);
sql_print_information
(
"NDB Binlog: cluster failure for %s at epoch %u."
,
schema_share
->
key
,
(
unsigned
)
pOp
->
getGCI
());
// fall through
case
NDBEVENT
:
:
TE_DROP
:
if
(
ndb_extra_logging
&&
...
...
@@ -1785,7 +1804,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
"read only on reconnect."
);
free_share
(
&
schema_share
);
schema_share
=
0
;
ndb_binlog_tables_inited
=
FALSE
;
close_cached_tables
((
THD
*
)
0
,
0
,
(
TABLE_LIST
*
)
0
,
FALSE
);
// fall through
case
NDBEVENT
:
:
TE_ALTER
:
...
...
@@ -2829,7 +2847,8 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
{
case
NDBEVENT
:
:
TE_CLUSTER_FAILURE
:
if
(
ndb_extra_logging
)
sql_print_information
(
"NDB Binlog: cluster failure for %s."
,
share
->
key
);
sql_print_information
(
"NDB Binlog: cluster failure for %s at epoch %u."
,
share
->
key
,
(
unsigned
)
pOp
->
getGCI
());
if
(
apply_status_share
==
share
)
{
if
(
ndb_extra_logging
&&
...
...
@@ -2838,7 +2857,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
"read only on reconnect."
);
free_share
(
&
apply_status_share
);
apply_status_share
=
0
;
ndb_binlog_tables_inited
=
FALSE
;
}
DBUG_PRINT
(
"info"
,
(
"CLUSTER FAILURE EVENT: "
"%s received share: 0x%lx op: %lx share op: %lx "
...
...
@@ -2854,7 +2872,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
"read only on reconnect."
);
free_share
(
&
apply_status_share
);
apply_status_share
=
0
;
ndb_binlog_tables_inited
=
FALSE
;
}
/* ToDo: remove printout */
if
(
ndb_extra_logging
)
...
...
@@ -3267,46 +3284,43 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_unlock
(
&
injector_mutex
);
pthread_cond_signal
(
&
injector_cond
);
thd
->
proc_info
=
"Waiting for ndbcluster to start"
;
pthread_mutex_lock
(
&
injector_mutex
);
while
(
!
schema_share
||
(
ndb_binlog_running
&&
!
apply_status_share
))
{
/* ndb not connected yet */
struct
timespec
abstime
;
set_timespec
(
abstime
,
1
);
pthread_cond_timedwait
(
&
injector_cond
,
&
injector_mutex
,
&
abstime
);
if
(
abort_loop
)
{
pthread_mutex_unlock
(
&
injector_mutex
);
goto
err
;
}
}
pthread_mutex_unlock
(
&
injector_mutex
);
restart:
/*
Main NDB Injector loop
*/
DBUG_ASSERT
(
ndbcluster_hton
.
slot
!=
~
(
uint
)
0
);
if
(
!
(
thd_ndb
=
ha_ndbcluster
::
seize_thd_ndb
()))
{
sql_print_error
(
"Could not allocate Thd_ndb object"
);
goto
err
;
}
set_thd_ndb
(
thd
,
thd_ndb
);
thd_ndb
->
options
|=
TNO_NO_LOG_SCHEMA_OP
;
thd
->
query_id
=
0
;
// to keep valgrind quiet
{
static
char
db
[]
=
""
;
thd
->
db
=
db
;
if
(
ndb_binlog_running
)
open_binlog_index
(
thd
,
&
binlog_tables
,
&
binlog_index
);
thd
->
db
=
db
;
thd
->
proc_info
=
"Waiting for ndbcluster to start"
;
pthread_mutex_lock
(
&
injector_mutex
);
while
(
!
schema_share
||
(
ndb_binlog_running
&&
!
apply_status_share
))
{
/* ndb not connected yet */
struct
timespec
abstime
;
set_timespec
(
abstime
,
1
);
pthread_cond_timedwait
(
&
injector_cond
,
&
injector_mutex
,
&
abstime
);
if
(
abort_loop
)
{
pthread_mutex_unlock
(
&
injector_mutex
);
goto
err
;
}
}
pthread_mutex_unlock
(
&
injector_mutex
);
if
(
thd_ndb
==
NULL
)
{
DBUG_ASSERT
(
ndbcluster_hton
.
slot
!=
~
(
uint
)
0
);
if
(
!
(
thd_ndb
=
ha_ndbcluster
::
seize_thd_ndb
()))
{
sql_print_error
(
"Could not allocate Thd_ndb object"
);
goto
err
;
}
set_thd_ndb
(
thd
,
thd_ndb
);
thd_ndb
->
options
|=
TNO_NO_LOG_SCHEMA_OP
;
thd
->
query_id
=
0
;
// to keep valgrind quiet
}
}
restart:
{
// wait for the first event
thd
->
proc_info
=
"Waiting for first event from ndbcluster"
;
...
...
@@ -3321,6 +3335,9 @@ restart:
DBUG_PRINT
(
"info"
,
(
"schema_res: %d schema_gci: %d"
,
schema_res
,
schema_gci
));
if
(
schema_res
>
0
)
{
i_ndb
->
pollEvents
(
0
);
i_ndb
->
flushIncompleteEvents
(
schema_gci
);
s_ndb
->
flushIncompleteEvents
(
schema_gci
);
if
(
schema_gci
<
ndb_latest_handled_binlog_epoch
)
{
sql_print_error
(
"NDB Binlog: cluster has been restarted --initial or with older filesystem. "
...
...
@@ -3334,7 +3351,13 @@ restart:
}
}
}
{
static
char
db
[]
=
""
;
thd
->
db
=
db
;
if
(
ndb_binlog_running
)
open_binlog_index
(
thd
,
&
binlog_tables
,
&
binlog_index
);
thd
->
db
=
db
;
}
do_ndbcluster_binlog_close_connection
=
BCCC_running
;
for
(
;
!
((
abort_loop
||
do_ndbcluster_binlog_close_connection
)
&&
ndb_latest_handled_binlog_epoch
>=
g_latest_trans_gci
)
&&
...
...
@@ -3683,7 +3706,12 @@ restart:
ndb_latest_handled_binlog_epoch
=
ndb_latest_received_binlog_epoch
;
}
if
(
do_ndbcluster_binlog_close_connection
==
BCCC_restart
)
{
ndb_binlog_tables_inited
=
FALSE
;
close_thread_tables
(
thd
);
binlog_index
=
0
;
goto
restart
;
}
err:
DBUG_PRINT
(
"info"
,(
"Shutting down cluster binlog thread"
));
thd
->
proc_info
=
"Shutting down"
;
...
...
storage/ndb/include/ndbapi/Ndb.hpp
View file @
d922e834
...
...
@@ -1262,6 +1262,7 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int
flushIncompleteEvents
(
Uint64
gci
);
NdbEventOperation
*
getEventOperation
(
NdbEventOperation
*
eventOp
=
0
);
Uint64
getLatestGCI
();
void
forceGCP
();
...
...
storage/ndb/include/ndbapi/NdbDictionary.hpp
View file @
d922e834
...
...
@@ -1132,7 +1132,8 @@ public:
_TE_NODE_FAILURE
=
10
,
_TE_SUBSCRIBE
=
11
,
_TE_UNSUBSCRIBE
=
12
,
_TE_NUL
=
13
// internal (e.g. INS o DEL within same GCI)
_TE_NUL
=
13
,
// internal (e.g. INS o DEL within same GCI)
_TE_ACTIVE
=
14
// internal (node becomes active)
};
#endif
/**
...
...
storage/ndb/include/util/SocketServer.hpp
View file @
d922e834
...
...
@@ -106,7 +106,8 @@ public:
void
stopSessions
(
bool
wait
=
false
);
void
foreachSession
(
void
(
*
f
)(
Session
*
,
void
*
),
void
*
data
);
void
checkSessions
();
private:
struct
SessionInstance
{
Service
*
m_service
;
...
...
@@ -117,12 +118,13 @@ private:
Service
*
m_service
;
NDB_SOCKET_TYPE
m_socket
;
};
MutexVector
<
SessionInstance
>
m_sessions
;
NdbLockable
m_session_mutex
;
Vector
<
SessionInstance
>
m_sessions
;
MutexVector
<
ServiceInstance
>
m_services
;
unsigned
m_maxSessions
;
void
doAccept
();
void
checkSessions
();
void
checkSessions
Impl
();
void
startSession
(
SessionInstance
&
);
/**
...
...
storage/ndb/src/common/util/SocketServer.cpp
View file @
d922e834
...
...
@@ -184,9 +184,12 @@ SocketServer::doAccept(){
SessionInstance
s
;
s
.
m_service
=
si
.
m_service
;
s
.
m_session
=
si
.
m_service
->
newSession
(
childSock
);
if
(
s
.
m_session
!=
0
){
if
(
s
.
m_session
!=
0
)
{
m_session_mutex
.
lock
();
m_sessions
.
push_back
(
s
);
startSession
(
m_sessions
.
back
());
m_session_mutex
.
unlock
();
}
continue
;
...
...
@@ -240,10 +243,13 @@ void
SocketServer
::
doRun
(){
while
(
!
m_stopThread
){
checkSessions
();
m_session_mutex
.
lock
();
checkSessionsImpl
();
if
(
m_sessions
.
size
()
<
m_maxSessions
){
m_session_mutex
.
unlock
();
doAccept
();
}
else
{
m_session_mutex
.
unlock
();
NdbSleep_MilliSleep
(
200
);
}
}
...
...
@@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){
void
SocketServer
::
foreachSession
(
void
(
*
func
)(
SocketServer
::
Session
*
,
void
*
),
void
*
data
)
{
m_session_mutex
.
lock
();
for
(
int
i
=
m_sessions
.
size
()
-
1
;
i
>=
0
;
i
--
){
(
*
func
)(
m_sessions
[
i
].
m_session
,
data
);
}
checkSessions
();
m_session_mutex
.
unlock
();
}
void
SocketServer
::
checkSessions
(){
for
(
int
i
=
m_sessions
.
size
()
-
1
;
i
>=
0
;
i
--
){
if
(
m_sessions
[
i
].
m_session
->
m_stopped
){
if
(
m_sessions
[
i
].
m_thread
!=
0
){
SocketServer
::
checkSessions
()
{
m_session_mutex
.
lock
();
checkSessionsImpl
();
m_session_mutex
.
unlock
();
}
void
SocketServer
::
checkSessionsImpl
()
{
for
(
int
i
=
m_sessions
.
size
()
-
1
;
i
>=
0
;
i
--
)
{
if
(
m_sessions
[
i
].
m_session
->
m_stopped
)
{
if
(
m_sessions
[
i
].
m_thread
!=
0
)
{
void
*
ret
;
NdbThread_WaitFor
(
m_sessions
[
i
].
m_thread
,
&
ret
);
NdbThread_Destroy
(
&
m_sessions
[
i
].
m_thread
);
...
...
@@ -301,19 +320,26 @@ SocketServer::checkSessions(){
void
SocketServer
::
stopSessions
(
bool
wait
){
int
i
;
m_session_mutex
.
lock
();
for
(
i
=
m_sessions
.
size
()
-
1
;
i
>=
0
;
i
--
)
{
m_sessions
[
i
].
m_session
->
stopSession
();
m_sessions
[
i
].
m_session
->
m_stop
=
true
;
// to make sure
}
m_session_mutex
.
unlock
();
for
(
i
=
m_services
.
size
()
-
1
;
i
>=
0
;
i
--
)
m_services
[
i
].
m_service
->
stopSessions
();
if
(
wait
){
m_session_mutex
.
lock
();
while
(
m_sessions
.
size
()
>
0
){
checkSessions
();
checkSessionsImpl
();
m_session_mutex
.
unlock
();
NdbSleep_MilliSleep
(
100
);
m_session_mutex
.
lock
();
}
m_session_mutex
.
unlock
();
}
}
...
...
@@ -348,4 +374,4 @@ sessionThread_C(void* _sc){
}
template
class
MutexVector
<
SocketServer
::
ServiceInstance
>;
template
class
Mutex
Vector
<
SocketServer
::
SessionInstance
>;
template
class
Vector
<
SocketServer
::
SessionInstance
>;
storage/ndb/src/kernel/blocks/suma/Suma.cpp
View file @
d922e834
...
...
@@ -2649,6 +2649,22 @@ Suma::reportAllSubscribers(Signal *signal,
SubscriptionPtr
subPtr
,
SubscriberPtr
subbPtr
)
{
SubTableData
*
data
=
(
SubTableData
*
)
signal
->
getDataPtrSend
();
if
(
table_event
==
NdbDictionary
::
Event
::
_TE_SUBSCRIBE
)
{
data
->
gci
=
m_last_complete_gci
+
1
;
data
->
tableId
=
subPtr
.
p
->
m_tableId
;
data
->
operation
=
NdbDictionary
::
Event
::
_TE_ACTIVE
;
data
->
ndbd_nodeid
=
refToNode
(
reference
());
data
->
changeMask
=
0
;
data
->
totalLen
=
0
;
data
->
req_nodeid
=
refToNode
(
subbPtr
.
p
->
m_senderRef
);
data
->
senderData
=
subbPtr
.
p
->
m_senderData
;
sendSignal
(
subbPtr
.
p
->
m_senderRef
,
GSN_SUB_TABLE_DATA
,
signal
,
SubTableData
::
SignalLength
,
JBB
);
}
if
(
!
(
subPtr
.
p
->
m_options
&
Subscription
::
REPORT_SUBSCRIBE
))
{
return
;
...
...
@@ -2663,7 +2679,6 @@ Suma::reportAllSubscribers(Signal *signal,
ndbout_c
(
"reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d"
,
subPtr
.
i
,
subPtr
.
p
->
n_subscribers
);
//#endif
SubTableData
*
data
=
(
SubTableData
*
)
signal
->
getDataPtrSend
();
data
->
gci
=
m_last_complete_gci
+
1
;
data
->
tableId
=
subPtr
.
p
->
m_tableId
;
data
->
operation
=
table_event
;
...
...
storage/ndb/src/mgmsrv/Services.cpp
View file @
d922e834
...
...
@@ -502,6 +502,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
ps
.
tick
=
tick
;
m_mgmsrv
.
get_socket_server
()
->
foreachSession
(
stop_session_if_timed_out
,
&
ps
);
m_mgmsrv
.
get_socket_server
()
->
checkSessions
();
error_string
=
""
;
continue
;
}
...
...
@@ -1559,6 +1560,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
ps
.
free_nodes
.
bitXORC
(
NodeBitmask
());
// invert connected_nodes to get free nodes
m_mgmsrv
.
get_socket_server
()
->
foreachSession
(
stop_session_if_not_connected
,
&
ps
);
m_mgmsrv
.
get_socket_server
()
->
checkSessions
();
m_output
->
println
(
"purge stale sessions reply"
);
if
(
str
.
length
()
>
0
)
...
...
storage/ndb/src/ndbapi/Ndb.cpp
View file @
d922e834
...
...
@@ -1324,6 +1324,12 @@ Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return
theEventBuffer
->
pollEvents
(
aMillisecondNumber
,
latestGCI
);
}
int
Ndb
::
flushIncompleteEvents
(
Uint64
gci
)
{
return
theEventBuffer
->
flushIncompleteEvents
(
gci
);
}
NdbEventOperation
*
Ndb
::
nextEvent
()
{
return
theEventBuffer
->
nextEvent
();
...
...
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
View file @
d922e834
...
...
@@ -3002,63 +3002,6 @@ NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
DBUG_RETURN
(
0
);
}
/*****************************************************************
* Get index info
*/
NdbIndexImpl
*
NdbDictionaryImpl
::
getIndexImpl
(
const
char
*
externalName
,
const
BaseString
&
internalName
)
{
ASSERT_NOT_MYSQLD
;
Ndb_local_table_info
*
info
=
get_local_table_info
(
internalName
);
if
(
info
==
0
){
m_error
.
code
=
4243
;
return
0
;
}
NdbTableImpl
*
tab
=
info
->
m_table_impl
;
if
(
tab
->
m_indexType
==
NdbDictionary
::
Object
::
TypeUndefined
)
{
// Not an index
m_error
.
code
=
4243
;
return
0
;
}
NdbTableImpl
*
prim
=
getTable
(
tab
->
m_primaryTable
.
c_str
());
if
(
prim
==
0
){
m_error
.
code
=
4243
;
return
0
;
}
return
getIndexImpl
(
externalName
,
internalName
,
*
tab
,
*
prim
);
}
NdbIndexImpl
*
NdbDictionaryImpl
::
getIndexImpl
(
const
char
*
externalName
,
const
BaseString
&
internalName
,
NdbTableImpl
&
tab
,
NdbTableImpl
&
prim
)
{
DBUG_ENTER
(
"NdbDictionaryImpl::getIndexImpl"
);
DBUG_ASSERT
(
tab
.
m_indexType
!=
NdbDictionary
::
Object
::
TypeUndefined
);
/**
* Create index impl
*/
NdbIndexImpl
*
idx
;
if
(
NdbDictInterface
::
create_index_obj_from_table
(
&
idx
,
&
tab
,
&
prim
)
==
0
){
idx
->
m_table
=
&
tab
;
idx
->
m_externalName
.
assign
(
externalName
);
idx
->
m_internalName
.
assign
(
internalName
);
idx
->
m_table_id
=
prim
.
getObjectId
();
idx
->
m_table_version
=
prim
.
getObjectVersion
();
// TODO Assign idx to tab->m_index
// Don't do it right now since assign can't asign a table with index
// tab->m_index = idx;
DBUG_RETURN
(
idx
);
}
DBUG_RETURN
(
0
);
}
int
NdbDictInterface
::
create_index_obj_from_table
(
NdbIndexImpl
**
dst
,
NdbTableImpl
*
tab
,
...
...
@@ -3116,6 +3059,9 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
tab
->
m_columns
[
i
]
->
m_distributionKey
=
0
;
}
idx
->
m_table_id
=
prim
->
getObjectId
();
idx
->
m_table_version
=
prim
->
getObjectVersion
();
*
dst
=
idx
;
DBUG_PRINT
(
"exit"
,
(
"m_id: %d m_version: %d"
,
idx
->
m_id
,
idx
->
m_version
));
DBUG_RETURN
(
0
);
...
...
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
View file @
d922e834
...
...
@@ -617,6 +617,7 @@ public:
get_local_table_info
(
const
BaseString
&
internalTableName
);
NdbIndexImpl
*
getIndex
(
const
char
*
indexName
,
const
char
*
tableName
);
NdbIndexImpl
*
getIndex
(
const
char
*
indexName
,
const
NdbTableImpl
&
prim
);
NdbEventImpl
*
getEvent
(
const
char
*
eventName
,
NdbTableImpl
*
=
NULL
);
NdbEventImpl
*
getBlobEvent
(
const
NdbEventImpl
&
ev
,
uint
col_no
);
NdbEventImpl
*
getEventImpl
(
const
char
*
internalName
);
...
...
@@ -958,51 +959,36 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
DBUG_RETURN
(
info
);
// autoincrement already initialized
}
class
InitIndex
Global
:
public
GlobalCacheInitObject
class
InitIndex
:
public
GlobalCacheInitObject
{
public:
const
char
*
m_index_name
;
NdbTableImpl
&
m_prim
;
const
NdbTableImpl
&
m_prim
;
InitIndexGlobal
(
NdbDictionaryImpl
*
dict
,
const
BaseString
&
internal_indexname
,
const
char
*
index_name
,
NdbTableImpl
&
prim
)
:
GlobalCacheInitObject
(
dict
,
internal_indexname
),
InitIndex
(
const
BaseString
&
internal_indexname
,
const
char
*
index_name
,
const
NdbTableImpl
&
prim
)
:
GlobalCacheInitObject
(
0
,
internal_indexname
),
m_index_name
(
index_name
),
m_prim
(
prim
)
{}
int
init
(
NdbTableImpl
&
tab
)
const
{
tab
.
m_index
=
m_dict
->
getIndexImpl
(
m_index_name
,
m_name
,
tab
,
m_prim
);
if
(
tab
.
m_index
==
0
)
return
1
;
tab
.
m_index
->
m_table
=
&
tab
;
return
0
;
}
};
class
InitIndex
:
public
GlobalCacheInitObject
{
public:
const
char
*
m_index_name
;
InitIndex
(
NdbDictionaryImpl
*
dict
,
const
BaseString
&
internal_indexname
,
const
char
*
index_name
)
:
GlobalCacheInitObject
(
dict
,
internal_indexname
),
m_index_name
(
index_name
)
{}
int
init
(
NdbTableImpl
&
tab
)
const
{
DBUG_ASSERT
(
tab
.
m_index
==
0
);
tab
.
m_index
=
m_dict
->
getIndexImpl
(
m_index_name
,
m_name
);
if
(
tab
.
m_index
)
{}
int
init
(
NdbTableImpl
&
tab
)
const
{
DBUG_ENTER
(
"InitIndex::init"
);
DBUG_ASSERT
(
tab
.
m_indexType
!=
NdbDictionary
::
Object
::
TypeUndefined
);
/**
* Create index impl
*/
NdbIndexImpl
*
idx
;
if
(
NdbDictInterface
::
create_index_obj_from_table
(
&
idx
,
&
tab
,
&
m_prim
)
==
0
)
{
tab
.
m_index
->
m_table
=
&
tab
;
return
0
;
idx
->
m_table
=
&
tab
;
idx
->
m_externalName
.
assign
(
m_index_name
);
idx
->
m_internalName
.
assign
(
m_name
);
tab
.
m_index
=
idx
;
DBUG_RETURN
(
0
);
}
return
1
;
DBUG_RETURN
(
1
)
;
}
};
...
...
@@ -1019,14 +1005,14 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name,
while
(
retry
)
{
NdbTableImpl
*
tab
=
fetchGlobalTableImplRef
(
InitIndex
Global
(
this
,
internal_indexname
,
index_name
,
ndbtab
));
fetchGlobalTableImplRef
(
InitIndex
(
internal_indexname
,
index_name
,
ndbtab
));
if
(
tab
)
{
// tab->m_index sould be set. otherwise tab == 0
NdbIndexImpl
*
idx
=
tab
->
m_index
;
if
(
idx
->
m_table_id
!=
ndbtab
.
getObjectId
()
||
idx
->
m_table_version
!=
ndbtab
.
getObjectVersion
())
if
(
idx
->
m_table_id
!=
(
unsigned
)
ndbtab
.
getObjectId
()
||
idx
->
m_table_version
!=
(
unsigned
)
ndbtab
.
getObjectVersion
())
{
releaseIndexGlobal
(
*
idx
,
1
);
retry
--
;
...
...
@@ -1067,41 +1053,54 @@ NdbIndexImpl *
NdbDictionaryImpl
::
getIndex
(
const
char
*
index_name
,
const
char
*
table_name
)
{
while
(
table_name
||
m_ndb
.
usingFullyQualifiedNames
()
)
if
(
table_name
==
0
)
{
const
BaseString
internal_indexname
(
(
table_name
)
?
m_ndb
.
internalize_index_name
(
getTable
(
table_name
),
index_name
)
:
m_ndb
.
internalize_table_name
(
index_name
));
// Index is also a table
if
(
internal_indexname
.
length
())
{
Ndb_local_table_info
*
info
=
m_localHash
.
get
(
internal_indexname
.
c_str
());
NdbTableImpl
*
tab
;
if
(
info
==
0
)
{
tab
=
fetchGlobalTableImplRef
(
InitIndex
(
this
,
internal_indexname
,
index_name
));
if
(
tab
)
{
info
=
Ndb_local_table_info
::
create
(
tab
,
0
);
if
(
info
)
m_localHash
.
put
(
internal_indexname
.
c_str
(),
info
);
else
break
;
}
else
break
;
}
else
tab
=
info
->
m_table_impl
;
return
tab
->
m_index
;
}
break
;
assert
(
0
);
m_error
.
code
=
4243
;
return
0
;
}
NdbTableImpl
*
prim
=
getTable
(
table_name
);
if
(
prim
==
0
)
{
m_error
.
code
=
4243
;
return
0
;
}
return
getIndex
(
index_name
,
*
prim
);
}
inline
NdbIndexImpl
*
NdbDictionaryImpl
::
getIndex
(
const
char
*
index_name
,
const
NdbTableImpl
&
prim
)
{
const
BaseString
internal_indexname
(
m_ndb
.
internalize_index_name
(
&
prim
,
index_name
));
Ndb_local_table_info
*
info
=
m_localHash
.
get
(
internal_indexname
.
c_str
());
NdbTableImpl
*
tab
;
if
(
info
==
0
)
{
tab
=
fetchGlobalTableImplRef
(
InitIndex
(
internal_indexname
,
index_name
,
prim
));
if
(
!
tab
)
goto
err
;
info
=
Ndb_local_table_info
::
create
(
tab
,
0
);
if
(
!
info
)
goto
err
;
m_localHash
.
put
(
internal_indexname
.
c_str
(),
info
);
}
else
tab
=
info
->
m_table_impl
;
return
tab
->
m_index
;
err:
m_error
.
code
=
4243
;
return
0
;
}
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
d922e834
...
...
@@ -153,11 +153,14 @@ NdbEventOperationImpl::init(NdbEventImpl& evnt)
m_state
=
EO_CREATED
;
m_node_bit_mask
.
clear
();
#ifdef ndb_event_stores_merge_events_flag
m_mergeEvents
=
m_eventImpl
->
m_mergeEvents
;
#else
m_mergeEvents
=
false
;
m_mergeEvents
=
false
;
#endif
m_ref_count
=
0
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count = 0 for op: %p"
,
this
));
m_has_error
=
0
;
...
...
@@ -530,7 +533,11 @@ NdbEventOperationImpl::execute_nolock()
}
}
if
(
r
==
0
)
{
m_ref_count
++
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
m_ref_count
,
this
));
DBUG_RETURN
(
0
);
}
}
//Error
m_state
=
EO_ERROR
;
...
...
@@ -657,80 +664,79 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
int
NdbEventOperationImpl
::
receive_event
()
{
DBUG_ENTER_EVENT
(
"NdbEventOperationImpl::receive_event"
);
Uint32
operation
=
(
Uint32
)
m_data_item
->
sdata
->
operation
;
DBUG_PRINT_EVENT
(
"info"
,(
"sdata->operation %u"
,
operation
));
if
(
operation
==
NdbDictionary
::
Event
::
_TE_ALTER
)
{
// Parse the new table definition and
// create a table object
NdbDictionary
::
Dictionary
*
myDict
=
m_ndb
->
getDictionary
();
NdbDictionaryImpl
*
dict
=
&
NdbDictionaryImpl
::
getImpl
(
*
myDict
);
NdbError
error
;
NdbDictInterface
dif
(
error
);
NdbTableImpl
*
at
;
m_change_mask
=
m_data_item
->
sdata
->
changeMask
;
error
.
code
=
dif
.
parseTableInfo
(
&
at
,
(
Uint32
*
)
m_buffer
.
get_data
(),
m_buffer
.
length
()
/
4
,
true
);
m_buffer
.
clear
();
if
(
at
)
at
->
buildColumnHash
();
else
{
DBUG_PRINT_EVENT
(
"info"
,
(
"Failed to parse DictTabInfo error %u"
,
error
.
code
));
DBUG_RETURN_EVENT
(
1
);
}
NdbTableImpl
*
tmp_table_impl
=
m_eventImpl
->
m_tableImpl
;
m_eventImpl
->
m_tableImpl
=
at
;
DBUG_PRINT
(
"info"
,
(
"switching table impl 0x%x -> 0x%x"
,
tmp_table_impl
,
at
));
// change the rec attrs to refer to the new table object
int
i
;
for
(
i
=
0
;
i
<
2
;
i
++
)
if
(
unlikely
(
operation
>=
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
))
{
DBUG_ENTER
(
"NdbEventOperationImpl::receive_event"
);
DBUG_PRINT
(
"info"
,(
"sdata->operation %u this: %p"
,
operation
,
this
));
if
(
operation
==
NdbDictionary
::
Event
::
_TE_ALTER
)
{
NdbRecAttr
*
p
=
theFirstPkAttrs
[
i
];
while
(
p
)
// Parse the new table definition and
// create a table object
NdbDictionary
::
Dictionary
*
myDict
=
m_ndb
->
getDictionary
();
NdbDictionaryImpl
*
dict
=
&
NdbDictionaryImpl
::
getImpl
(
*
myDict
);
NdbError
error
;
NdbDictInterface
dif
(
error
);
NdbTableImpl
*
at
;
m_change_mask
=
m_data_item
->
sdata
->
changeMask
;
error
.
code
=
dif
.
parseTableInfo
(
&
at
,
(
Uint32
*
)
m_buffer
.
get_data
(),
m_buffer
.
length
()
/
4
,
true
);
m_buffer
.
clear
();
if
(
unlikely
(
!
at
))
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
DBUG_PRINT
(
"info"
,
(
"Failed to parse DictTabInfo error %u"
,
error
.
code
));
ndbout_c
(
"Failed to parse DictTabInfo error %u"
,
error
.
code
);
DBUG_RETURN
(
1
);
}
}
for
(
i
=
0
;
i
<
2
;
i
++
)
{
NdbRecAttr
*
p
=
theFirstDataAttrs
[
i
];
while
(
p
)
at
->
buildColumnHash
();
NdbTableImpl
*
tmp_table_impl
=
m_eventImpl
->
m_tableImpl
;
m_eventImpl
->
m_tableImpl
=
at
;
DBUG_PRINT
(
"info"
,
(
"switching table impl 0x%x -> 0x%x"
,
tmp_table_impl
,
at
));
// change the rec attrs to refer to the new table object
int
i
;
for
(
i
=
0
;
i
<
2
;
i
++
)
{
NdbRecAttr
*
p
=
theFirstPkAttrs
[
i
];
while
(
p
)
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
}
}
for
(
i
=
0
;
i
<
2
;
i
++
)
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
NdbRecAttr
*
p
=
theFirstDataAttrs
[
i
];
while
(
p
)
{
int
no
=
p
->
getColumn
()
->
getColumnNo
();
NdbColumnImpl
*
tAttrInfo
=
at
->
getColumn
(
no
);
DBUG_PRINT
(
"info"
,
(
"rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x"
,
p
,
p
->
m_column
,
tAttrInfo
));
p
->
m_column
=
tAttrInfo
;
p
=
p
->
next
();
}
}
if
(
tmp_table_impl
)
delete
tmp_table_impl
;
}
if
(
tmp_table_impl
)
delete
tmp_table_impl
;
}
if
(
unlikely
(
operation
>=
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
))
{
DBUG_RETURN_EVENT
(
1
);
DBUG_RETURN
(
1
);
}
DBUG_ENTER_EVENT
(
"NdbEventOperationImpl::receive_event"
);
DBUG_PRINT_EVENT
(
"info"
,(
"sdata->operation %u this: %p"
,
operation
,
this
));
// now move the data into the RecAttrs
int
is_update
=
operation
==
NdbDictionary
::
Event
::
_TE_UPDATE
;
...
...
@@ -1089,6 +1095,33 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return
ret
;
}
int
NdbEventBuffer
::
flushIncompleteEvents
(
Uint64
gci
)
{
/**
* Find min complete gci
*/
Uint32
i
;
Uint32
sz
=
m_active_gci
.
size
();
Gci_container
*
array
=
(
Gci_container
*
)
m_active_gci
.
getBase
();
for
(
i
=
0
;
i
<
sz
;
i
++
)
{
Gci_container
*
tmp
=
array
+
i
;
if
(
tmp
->
m_gci
&&
tmp
->
m_gci
<
gci
)
{
// we have found an old not-completed gci, remove it
ndbout_c
(
"ndb: flushing incomplete epoch %lld (<%lld)"
,
tmp
->
m_gci
,
gci
);
if
(
!
tmp
->
m_data
.
is_empty
())
{
free_list
(
tmp
->
m_data
);
}
tmp
->~
Gci_container
();
bzero
(
tmp
,
sizeof
(
Gci_container
));
}
}
return
0
;
}
NdbEventOperation
*
NdbEventBuffer
::
nextEvent
()
{
...
...
@@ -1157,7 +1190,10 @@ NdbEventBuffer::nextEvent()
}
EventBufData_list
::
Gci_ops
*
gci_ops
=
m_available_data
.
first_gci_ops
();
while
(
gci_ops
&&
op
->
getGCI
()
>
gci_ops
->
m_gci
)
{
deleteUsedEventOperations
();
gci_ops
=
m_available_data
.
next_gci_ops
();
}
assert
(
gci_ops
&&
(
op
->
getGCI
()
==
gci_ops
->
m_gci
));
DBUG_RETURN_EVENT
(
op
->
m_facade
);
}
...
...
@@ -1177,7 +1213,10 @@ NdbEventBuffer::nextEvent()
// free all "per gci unique" collected operations
EventBufData_list
::
Gci_ops
*
gci_ops
=
m_available_data
.
first_gci_ops
();
while
(
gci_ops
)
{
deleteUsedEventOperations
();
gci_ops
=
m_available_data
.
next_gci_ops
();
}
DBUG_RETURN_EVENT
(
0
);
}
...
...
@@ -1191,31 +1230,37 @@ NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
EventBufData_list
::
Gci_op
g
=
gci_ops
->
m_gci_op_list
[(
*
iter
)
++
];
if
(
event_types
!=
NULL
)
*
event_types
=
g
.
event_types
;
DBUG_PRINT
(
"info"
,
(
"gci: %d"
,
(
unsigned
)
gci_ops
->
m_gci
));
DBUG_PRINT
(
"info"
,
(
"gci: %d g.op: %x g.event_types: %x"
,
(
unsigned
)
gci_ops
->
m_gci
,
g
.
op
,
g
.
event_types
));
DBUG_RETURN
(
g
.
op
);
}
DBUG_RETURN
(
NULL
);
}
void
NdbEventBuffer
::
lock
()
{
NdbMutex_Lock
(
m_mutex
);
}
void
NdbEventBuffer
::
unlock
()
{
NdbMutex_Unlock
(
m_mutex
);
}
void
NdbEventBuffer
::
add_drop_lock
()
NdbEventBuffer
::
deleteUsedEventOperations
()
{
NdbMutex_Lock
(
p_add_drop_mutex
);
}
void
NdbEventBuffer
::
add_drop_unlock
()
{
NdbMutex_Unlock
(
p_add_drop_mutex
);
Uint32
iter
=
0
;
const
NdbEventOperation
*
op_f
;
while
((
op_f
=
getGCIEventOperations
(
&
iter
,
NULL
))
!=
NULL
)
{
NdbEventOperationImpl
*
op
=
&
op_f
->
m_impl
;
DBUG_ASSERT
(
op
->
m_ref_count
>
0
);
op
->
m_ref_count
--
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
op
->
m_ref_count
,
op
));
if
(
op
->
m_ref_count
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"deleting op: %p"
,
op
));
DBUG_ASSERT
(
op
->
m_node_bit_mask
.
isclear
());
if
(
op
->
m_next
)
op
->
m_next
->
m_prev
=
op
->
m_prev
;
if
(
op
->
m_prev
)
op
->
m_prev
->
m_next
=
op
->
m_next
;
else
m_dropped_ev_op
=
op
->
m_next
;
delete
op
->
m_facade
;
}
}
}
static
...
...
@@ -1469,6 +1514,10 @@ NdbEventBuffer::complete_outof_order_gcis()
void
NdbEventBuffer
::
report_node_failure
(
Uint32
node_id
)
{
NdbEventOperation
*
op
=
m_ndb
->
getEventOperation
(
0
);
if
(
op
==
0
)
return
;
DBUG_ENTER
(
"NdbEventBuffer::report_node_failure"
);
SubTableData
data
;
LinearSectionPtr
ptr
[
3
];
...
...
@@ -1484,12 +1533,20 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
/**
* Insert this event for each operation
*/
NdbEventOperation
*
op
=
0
;
while
((
op
=
m_ndb
->
getEventOperation
(
op
)))
{
NdbEventOperationImpl
*
impl
=
&
op
->
m_impl
;
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl
*
impl
=
&
op
->
m_impl
;
do
if
(
!
impl
->
m_node_bit_mask
.
isclear
())
{
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
}
while
((
impl
=
impl
->
m_next
));
for
(
impl
=
m_dropped_ev_op
;
impl
;
impl
=
impl
->
m_next
)
if
(
!
impl
->
m_node_bit_mask
.
isclear
())
{
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
}
}
DBUG_VOID_RETURN
;
}
...
...
@@ -1515,12 +1572,21 @@ NdbEventBuffer::completeClusterFailed()
/**
* Insert this event for each operation
*/
do
{
NdbEventOperationImpl
*
impl
=
&
op
->
m_impl
;
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
}
while
((
op
=
m_ndb
->
getEventOperation
(
op
)));
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl
*
impl
=
&
op
->
m_impl
;
do
if
(
!
impl
->
m_node_bit_mask
.
isclear
())
{
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
}
while
((
impl
=
impl
->
m_next
));
for
(
impl
=
m_dropped_ev_op
;
impl
;
impl
=
impl
->
m_next
)
if
(
!
impl
->
m_node_bit_mask
.
isclear
())
{
data
.
senderData
=
impl
->
m_oid
;
insertDataL
(
impl
,
&
data
,
ptr
);
}
}
/**
* Release all GCI's with m_gci > gci
...
...
@@ -1565,7 +1631,11 @@ NdbEventBuffer::completeClusterFailed()
}
}
assert
(
bucket
!=
0
);
if
(
bucket
==
0
)
{
// no bucket to complete
DBUG_VOID_RETURN
;
}
const
Uint32
cnt
=
bucket
->
m_gcp_complete_rep_count
=
1
;
bucket
->
m_gci
=
gci
;
...
...
@@ -1595,6 +1665,40 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{
DBUG_ENTER_EVENT
(
"NdbEventBuffer::insertDataL"
);
Uint64
gci
=
sdata
->
gci
;
const
bool
is_data_event
=
sdata
->
operation
<
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
;
if
(
!
is_data_event
)
{
switch
(
sdata
->
operation
)
{
case
NdbDictionary
:
:
Event
::
_TE_NODE_FAILURE
:
op
->
m_node_bit_mask
.
clear
(
sdata
->
ndbd_nodeid
);
break
;
case
NdbDictionary
:
:
Event
::
_TE_ACTIVE
:
op
->
m_node_bit_mask
.
set
(
sdata
->
ndbd_nodeid
);
// internal event, do not relay to user
DBUG_RETURN_EVENT
(
0
);
break
;
case
NdbDictionary
:
:
Event
::
_TE_CLUSTER_FAILURE
:
op
->
m_node_bit_mask
.
clear
();
DBUG_ASSERT
(
op
->
m_ref_count
>
0
);
op
->
m_ref_count
--
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
op
->
m_ref_count
,
op
));
break
;
case
NdbDictionary
:
:
Event
::
_TE_STOP
:
op
->
m_node_bit_mask
.
clear
(
sdata
->
ndbd_nodeid
);
if
(
op
->
m_node_bit_mask
.
isclear
())
{
DBUG_ASSERT
(
op
->
m_ref_count
>
0
);
op
->
m_ref_count
--
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
op
->
m_ref_count
,
op
));
}
break
;
default:
break
;
}
}
if
(
likely
((
Uint32
)
op
->
mi_type
&
(
1
<<
(
Uint32
)
sdata
->
operation
))
)
{
...
...
@@ -1615,8 +1719,6 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
}
const
bool
is_blob_event
=
(
op
->
theMainOp
!=
NULL
);
const
bool
is_data_event
=
sdata
->
operation
<
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
;
const
bool
use_hash
=
op
->
m_mergeEvents
&&
is_data_event
;
if
(
!
is_data_event
&&
is_blob_event
)
...
...
@@ -2244,6 +2346,8 @@ void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
void
EventBufData_list
::
add_gci_op
(
Gci_op
g
,
bool
del
)
{
DBUG_ENTER_EVENT
(
"EventBufData_list::add_gci_op"
);
DBUG_PRINT_EVENT
(
"info"
,
(
"p.op: %p g.event_types: %x"
,
g
.
op
,
g
.
event_types
));
assert
(
g
.
op
!=
NULL
);
Uint32
i
;
for
(
i
=
0
;
i
<
m_gci_op_count
;
i
++
)
{
...
...
@@ -2273,8 +2377,15 @@ EventBufData_list::add_gci_op(Gci_op g, bool del)
}
assert
(
m_gci_op_count
<
m_gci_op_alloc
);
assert
(
!
del
);
#ifndef DBUG_OFF
i
=
m_gci_op_count
;
#endif
g
.
op
->
m_ref_count
++
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
g
.
op
->
m_ref_count
,
g
.
op
));
m_gci_op_list
[
m_gci_op_count
++
]
=
g
;
}
DBUG_PRINT_EVENT
(
"exit"
,
(
"m_gci_op_list[%u].event_types: %x"
,
i
,
m_gci_op_list
[
i
].
event_types
));
DBUG_VOID_RETURN_EVENT
;
}
void
...
...
@@ -2337,6 +2448,9 @@ NdbEventBuffer::createEventOperation(const char* eventName,
delete
tOp
;
DBUG_RETURN
(
NULL
);
}
getEventOperationImpl
(
tOp
)
->
m_ref_count
=
1
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
getEventOperationImpl
(
tOp
)
->
m_ref_count
,
getEventOperationImpl
(
tOp
)));
DBUG_RETURN
(
tOp
);
}
...
...
@@ -2362,16 +2476,10 @@ NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
void
NdbEventBuffer
::
dropEventOperation
(
NdbEventOperation
*
tOp
)
{
DBUG_ENTER
(
"NdbEventBuffer::dropEventOperation"
);
NdbEventOperationImpl
*
op
=
getEventOperationImpl
(
tOp
);
op
->
stop
();
op
->
m_next
=
m_dropped_ev_op
;
op
->
m_prev
=
0
;
if
(
m_dropped_ev_op
)
m_dropped_ev_op
->
m_prev
=
op
;
m_dropped_ev_op
=
op
;
// stop blob event ops
if
(
op
->
theMainOp
==
NULL
)
{
...
...
@@ -2391,11 +2499,24 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
}
}
// ToDo, take care of these to be deleted at the
// appropriate time, after we are sure that there
// are _no_ more events coming
// delete tOp;
DBUG_ASSERT
(
op
->
m_ref_count
>
0
);
op
->
m_ref_count
--
;
DBUG_PRINT
(
"info"
,
(
"m_ref_count: %u for op: %p"
,
op
->
m_ref_count
,
op
));
if
(
op
->
m_ref_count
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"deleting op: %p"
,
op
));
DBUG_ASSERT
(
op
->
m_node_bit_mask
.
isclear
());
delete
op
->
m_facade
;
}
else
{
op
->
m_next
=
m_dropped_ev_op
;
op
->
m_prev
=
0
;
if
(
m_dropped_ev_op
)
m_dropped_ev_op
->
m_prev
=
op
;
m_dropped_ev_op
=
op
;
}
DBUG_VOID_RETURN
;
}
void
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
d922e834
...
...
@@ -367,6 +367,8 @@ public:
Uint32
m_eventId
;
Uint32
m_oid
;
Bitmask
<
(
unsigned
int
)
_NDB_NODE_BITMASK_SIZE
>
m_node_bit_mask
;
int
m_ref_count
;
bool
m_mergeEvents
;
EventBufData
*
m_data_item
;
...
...
@@ -406,10 +408,10 @@ public:
void
dropEventOperation
(
NdbEventOperation
*
);
static
NdbEventOperationImpl
*
getEventOperationImpl
(
NdbEventOperation
*
tOp
);
void
add_drop_lock
()
;
void
add_drop_unlock
()
;
void
lock
()
;
void
unlock
()
;
void
add_drop_lock
()
{
NdbMutex_Lock
(
p_add_drop_mutex
);
}
void
add_drop_unlock
()
{
NdbMutex_Unlock
(
p_add_drop_mutex
);
}
void
lock
()
{
NdbMutex_Lock
(
m_mutex
);
}
void
unlock
()
{
NdbMutex_Unlock
(
m_mutex
);
}
void
add_op
();
void
remove_op
();
...
...
@@ -430,9 +432,11 @@ public:
Uint32
getEventId
(
int
bufferId
);
int
pollEvents
(
int
aMillisecondNumber
,
Uint64
*
latestGCI
=
0
);
int
flushIncompleteEvents
(
Uint64
gci
);
NdbEventOperation
*
nextEvent
();
NdbEventOperationImpl
*
getGCIEventOperations
(
Uint32
*
iter
,
Uint32
*
event_types
);
void
deleteUsedEventOperations
();
NdbEventOperationImpl
*
move_data
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment