Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
2bd00bc2
Commit
2bd00bc2
authored
Dec 28, 2005
by
pekka@mysql.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge mysql.com:/space/pekka/ndb/version/my51
into mysql.com:/space/pekka/ndb/version/my51-rbr
parents
986cd20a
b450f940
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
585 additions
and
136 deletions
+585
-136
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
+2
-1
storage/ndb/include/ndbapi/NdbEventOperation.hpp
storage/ndb/include/ndbapi/NdbEventOperation.hpp
+6
-0
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
+2
-2
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
+53
-33
storage/ndb/src/ndbapi/NdbEventOperation.cpp
storage/ndb/src/ndbapi/NdbEventOperation.cpp
+5
-0
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+467
-94
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+46
-6
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/ndbapi/test_event.cpp
+4
-0
No files found.
storage/ndb/include/ndbapi/NdbDictionary.hpp
View file @
2bd00bc2
...
...
@@ -1027,7 +1027,8 @@ public:
_TE_CREATE
=
6
,
_TE_GCP_COMPLETE
=
7
,
_TE_CLUSTER_FAILURE
=
8
,
_TE_STOP
=
9
_TE_STOP
=
9
,
_TE_NUL
=
10
// internal (INS o DEL within same GCI)
};
#endif
/**
...
...
storage/ndb/include/ndbapi/NdbEventOperation.hpp
View file @
2bd00bc2
...
...
@@ -93,6 +93,12 @@ public:
* Retrieve current state of the NdbEventOperation object
*/
State
getState
();
/**
* By default events on same NdbEventOperation within same GCI
* are merged into a single event. This can be changed with
* separateEvents(true).
*/
void
separateEvents
(
bool
flag
);
/**
* Activates the NdbEventOperation to start receiving events. The
...
...
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
View file @
2bd00bc2
TARGET
=
ndbapi_event
SRCS
=
ndbapi_event.cpp
OBJS
=
ndbapi_event.o
CXX
=
g++
CXX
=
g++
-g
CFLAGS
=
-c
-Wall
-fno-rtti
-fno-exceptions
CXXFLAGS
=
DEBUG
=
...
...
@@ -17,7 +17,7 @@ $(TARGET): $(OBJS)
$(CXX)
$(CXXFLAGS)
$(LFLAGS)
$(LIB_DIR)
$(OBJS)
-lndbclient
-lmysqlclient_r
-lmysys
-lmystrings
-lz
$(SYS_LIB)
-o
$(TARGET)
$(TARGET).o
:
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
-I
$(TOP_SRCDIR)
/include
$(SRCS)
clean
:
rm
-f
*
.o
$(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
View file @
2bd00bc2
...
...
@@ -58,24 +58,29 @@
/**
*
* Assume that there is a table
TAB
0 which is being updated by
* Assume that there is a table
t
0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns
COL0, COL2, COL11
* We want to monitor what happens with columns
c0,c1,c2,c3.
*
* or together with the mysql client;
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* primary key(c0, c2)) engine ndb charset latin1;
*
* In another window start ndbapi_event, wait until properly started
*
insert into TAB0 values (1,2,3);
insert into TAB0 values (2,2,3);
insert into TAB0 values (3,2,9);
update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
insert into t0 values (1, 2, 'a', 'b');
insert into t0 values (3, 4, 'c', 'd');
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same'
update t0 set c2 = 'g' where c0 = 1; -- update pk part
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
*
* you should see the data popping up in the example window
*
...
...
@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb,
const
char
**
eventColumnName
,
const
int
noEventColumnName
);
int
main
()
int
main
(
int
argc
,
char
**
argv
)
{
ndb_init
();
bool
sep
=
argc
>
1
&&
strcmp
(
argv
[
1
],
"-s"
)
==
0
;
Ndb_cluster_connection
*
cluster_connection
=
new
Ndb_cluster_connection
();
// Object representing the cluster
...
...
@@ -126,13 +132,15 @@ int main()
if
(
myNdb
->
init
()
==
-
1
)
APIERROR
(
myNdb
->
getNdbError
());
const
char
*
eventName
=
"CHNG_IN_
TAB
0"
;
const
char
*
eventTableName
=
"
TAB
0"
;
const
int
noEventColumnName
=
3
;
const
char
*
eventName
=
"CHNG_IN_
t
0"
;
const
char
*
eventTableName
=
"
t
0"
;
const
int
noEventColumnName
=
4
;
const
char
*
eventColumnName
[
noEventColumnName
]
=
{
"COL0"
,
"COL1"
,
"COL11"
};
{
"c0"
,
"c1"
,
"c2"
,
"c3"
};
// Create events
myCreateEvent
(
myNdb
,
...
...
@@ -142,13 +150,14 @@ int main()
noEventColumnName
);
int
j
=
0
;
while
(
j
<
5
)
{
while
(
j
<
99
)
{
// Start "transaction" for handling events
NdbEventOperation
*
op
;
printf
(
"create EventOperation
\n
"
);
if
((
op
=
myNdb
->
createEventOperation
(
eventName
))
==
NULL
)
APIERROR
(
myNdb
->
getNdbError
());
op
->
separateEvents
(
sep
);
printf
(
"get values
\n
"
);
NdbRecAttr
*
recAttr
[
noEventColumnName
];
...
...
@@ -175,34 +184,45 @@ int main()
i
++
;
switch
(
op
->
getEventType
())
{
case
NdbDictionary
:
:
Event
::
TE_INSERT
:
printf
(
"%u INSERT
:
"
,
i
);
printf
(
"%u INSERT"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_DELETE
:
printf
(
"%u DELETE
:
"
,
i
);
printf
(
"%u DELETE"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_UPDATE
:
printf
(
"%u UPDATE
:
"
,
i
);
printf
(
"%u UPDATE"
,
i
);
break
;
default:
abort
();
// should not happen
}
for
(
int
i
=
1
;
i
<
noEventColumnName
;
i
++
)
{
printf
(
" gci=%d
\n
"
,
op
->
getGCI
());
printf
(
"post: "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttr
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" post[%u]="
,
i
);
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttr
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
}
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttr
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttr
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
pre : "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttrPre
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" pre[%u]="
,
i
);
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
}
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttrPre
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
"
);
printf
(
"
\n
"
);
}
}
else
;
//printf("timed out\n");
...
...
storage/ndb/src/ndbapi/NdbEventOperation.cpp
View file @
2bd00bc2
...
...
@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState()
return
m_impl
.
getState
();
}
void
NdbEventOperation
::
separateEvents
(
bool
flag
)
{
m_impl
.
m_separateEvents
=
flag
;
}
NdbRecAttr
*
NdbEventOperation
::
getValue
(
const
char
*
colName
,
char
*
aValue
)
{
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
2bd00bc2
...
...
@@ -104,6 +104,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state
=
EO_CREATED
;
m_separateEvents
=
false
;
m_has_error
=
0
;
DBUG_PRINT
(
"exit"
,(
"this: 0x%x oid: %u"
,
this
,
m_oid
));
...
...
@@ -693,6 +695,21 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return
ret
;
}
#ifdef VM_TRACE
static
void
print_std
(
const
char
*
tag
,
const
SubTableData
*
sdata
,
LinearSectionPtr
ptr
[
3
])
{
printf
(
"%s
\n
"
,
tag
);
printf
(
"addr=%p gci=%d op=%d
\n
"
,
(
void
*
)
sdata
,
sdata
->
gci
,
sdata
->
operation
);
for
(
int
i
=
0
;
i
<=
2
;
i
++
)
{
printf
(
"sec=%d addr=%p sz=%d
\n
"
,
i
,
(
void
*
)
ptr
[
i
].
p
,
ptr
[
i
].
sz
);
for
(
int
j
=
0
;
j
<
ptr
[
i
].
sz
;
j
++
)
printf
(
"%08x "
,
ptr
[
i
].
p
[
j
]);
printf
(
"
\n
"
);
}
}
#endif
NdbEventOperation
*
NdbEventBuffer
::
nextEvent
()
{
...
...
@@ -734,6 +751,10 @@ NdbEventBuffer::nextEvent()
op
->
m_data_done_count
++
;
#endif
// NUL event is not returned
if
(
data
->
sdata
->
operation
==
NdbDictionary
::
Event
::
_TE_NUL
)
continue
;
int
r
=
op
->
receive_event
();
if
(
r
>
0
)
{
...
...
@@ -1099,13 +1120,15 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_ENTER
(
"NdbEventBuffer::insertDataL"
);
Uint64
gci
=
sdata
->
gci
;
EventBufData
*
data
=
m_free_data
;
if
(
likely
((
Uint32
)
op
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
)
{
Gci_container
*
bucket
=
find_bucket
(
&
m_active_gci
,
gci
);
DBUG_PRINT
(
"info"
,
(
"data insertion in eventId %d"
,
op
->
m_eventId
));
DBUG_PRINT
(
"info"
,
(
"gci=%d tab=%d op=%d node=%d"
,
sdata
->
gci
,
sdata
->
tableId
,
sdata
->
operation
,
sdata
->
req_nodeid
));
if
(
unlikely
(
bucket
==
0
))
{
...
...
@@ -1116,61 +1139,65 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN
(
0
);
}
if
(
unlikely
(
data
==
0
))
bool
use_hash
=
!
op
->
m_separateEvents
&&
sdata
->
operation
<
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
;
// find position in bucket hash table
EventBufData
*
data
=
0
;
EventBufData_hash
::
Pos
hpos
;
if
(
use_hash
)
{
#ifdef VM_TRACE
assert
(
m_free_data_count
==
0
);
assert
(
m_free_data_sz
==
0
);
#endif
expand
(
4000
);
reportStatus
();
bucket
->
m_data_hash
.
search
(
hpos
,
op
,
ptr
);
data
=
hpos
.
data
;
}
data
=
m_free_data
;
if
(
data
==
0
)
{
// allocate new result buffer
data
=
alloc_data
();
if
(
unlikely
(
data
==
0
))
{
#ifdef VM_TRACE
printf
(
"m_latest_command: %s
\n
"
,
m_latest_command
);
printf
(
"no free data, m_latestGCI %lld
\n
"
,
m_latestGCI
);
printf
(
"m_free_data_count %d
\n
"
,
m_free_data_count
);
printf
(
"m_available_data_count %d first gci %d last gci %d
\n
"
,
m_available_data
.
m_count
,
m_available_data
.
m_head
?
m_available_data
.
m_head
->
sdata
->
gci
:
0
,
m_available_data
.
m_tail
?
m_available_data
.
m_tail
->
sdata
->
gci
:
0
);
printf
(
"m_used_data_count %d
\n
"
,
m_used_data
.
m_count
);
#endif
op
->
m_has_error
=
2
;
DBUG_RETURN
(
-
1
);
// TODO handle this, overrun, or, skip?
op
->
m_has_error
=
2
;
DBUG_RETURN
(
-
1
);
}
}
// remove data from free list
m_free_data
=
data
->
m_next
;
if
(
unlikely
(
copy_data
(
sdata
,
ptr
,
data
)))
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
}
// add it to list and hash table
bucket
->
m_data
.
append
(
data
);
if
(
use_hash
)
{
bucket
->
m_data_hash
.
append
(
hpos
,
data
);
}
#ifdef VM_TRACE
m_free_data_count
--
;
assert
(
m_free_data_sz
>=
data
->
sz
);
op
->
m_data_count
++
;
#endif
m_free_data_sz
-=
data
->
sz
;
if
(
unlikely
(
copy_data_alloc
(
sdata
,
ptr
,
data
)))
}
else
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
// event with same op, PK found, merge into old buffer
if
(
unlikely
(
merge_data
(
sdata
,
ptr
,
data
)))
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
}
}
data
->
m_event_op
=
op
;
if
(
use_hash
)
{
data
->
m_pkhash
=
hpos
.
pkhash
;
}
// add it to received data
bucket
->
m_data
.
append
(
data
);
data
->
m_event_op
=
op
;
#ifdef VM_TRACE
op
->
m_data_count
++
;
#endif
DBUG_RETURN
(
0
);
}
#ifdef VM_TRACE
if
((
Uint32
)
op
->
m_eventImpl
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
{
// XXX never reached
DBUG_PRINT
(
"info"
,(
"Data arrived before ready eventId"
,
op
->
m_eventId
));
DBUG_RETURN
(
0
);
}
...
...
@@ -1183,80 +1210,324 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
#endif
}
int
NdbEventBuffer
::
copy_data_alloc
(
const
SubTableData
*
const
f_sdata
,
LinearSectionPtr
f_ptr
[
3
],
EventBufData
*
ev_buf
)
// allocate EventBufData
EventBufData
*
NdbEventBuffer
::
alloc_data
()
{
DBUG_ENTER
(
"NdbEventBuffer::copy_data_alloc"
);
const
unsigned
min_alloc_size
=
128
;
const
unsigned
sz4
=
(
sizeof
(
SubTableData
)
+
3
)
>>
2
;
Uint32
f_ptr_sz_0
=
f_ptr
[
0
].
sz
;
Uint32
f_ptr_sz_1
=
f_ptr
[
1
].
sz
;
Uint32
f_ptr_sz_2
=
f_ptr
[
2
].
sz
;
LinearSectionPtr
*
t_ptr
=
ev_buf
->
ptr
;
SubTableData
*
sdata
=
ev_buf
->
sdata
;
const
unsigned
alloc_size
=
(
sz4
+
f_ptr_sz_0
+
f_ptr_sz_1
+
f_ptr_sz_2
)
*
sizeof
(
Uint32
);
Uint32
*
ptr
;
if
(
alloc_size
>
min_alloc_size
)
DBUG_ENTER
(
"alloc_data"
);
EventBufData
*
data
=
m_free_data
;
if
(
unlikely
(
data
==
0
))
{
if
(
sdata
)
#ifdef VM_TRACE
assert
(
m_free_data_count
==
0
);
assert
(
m_free_data_sz
==
0
);
#endif
expand
(
4000
);
reportStatus
();
data
=
m_free_data
;
if
(
unlikely
(
data
==
0
))
{
NdbMem_Free
((
char
*
)
sdata
);
#ifdef VM_TRACE
assert
(
m_total_alloc
>=
ev_buf
->
sz
);
printf
(
"m_latest_command: %s
\n
"
,
m_latest_command
);
printf
(
"no free data, m_latestGCI %lld
\n
"
,
m_latestGCI
);
printf
(
"m_free_data_count %d
\n
"
,
m_free_data_count
);
printf
(
"m_available_data_count %d first gci %d last gci %d
\n
"
,
m_available_data
.
m_count
,
m_available_data
.
m_head
?
m_available_data
.
m_head
->
sdata
->
gci
:
0
,
m_available_data
.
m_tail
?
m_available_data
.
m_tail
->
sdata
->
gci
:
0
);
printf
(
"m_used_data_count %d
\n
"
,
m_used_data
.
m_count
);
#endif
m_total_alloc
-=
ev_buf
->
sz
;
DBUG_RETURN
(
0
);
// TODO handle this, overrun, or, skip?
}
ptr
=
(
Uint32
*
)
NdbMem_Allocate
(
alloc_size
);
ev_buf
->
sdata
=
(
SubTableData
*
)
ptr
;
ev_buf
->
sz
=
alloc_size
;
m_total_alloc
+=
alloc_size
;
}
else
/* alloc_size <= min_alloc_size */
// remove data from free list
m_free_data
=
data
->
m_next
;
data
->
m_next
=
0
;
#ifdef VM_TRACE
m_free_data_count
--
;
assert
(
m_free_data_sz
>=
data
->
sz
);
#endif
m_free_data_sz
-=
data
->
sz
;
DBUG_RETURN
(
data
);
}
// allocate initial or bigger memory area in EventBufData
// takes sizes from given ptr and sets up data->ptr
int
NdbEventBuffer
::
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
])
{
const
Uint32
min_alloc_size
=
128
;
Uint32
sz4
=
(
sizeof
(
SubTableData
)
+
3
)
>>
2
;
Uint32
alloc_size
=
(
sz4
+
ptr
[
0
].
sz
+
ptr
[
1
].
sz
+
ptr
[
2
].
sz
)
<<
2
;
if
(
alloc_size
<
min_alloc_size
)
alloc_size
=
min_alloc_size
;
if
(
data
->
sz
<
alloc_size
)
{
NdbMem_Free
((
char
*
)
data
->
memory
);
assert
(
m_total_alloc
>=
data
->
sz
);
m_total_alloc
-=
data
->
sz
;
data
->
memory
=
0
;
data
->
sz
=
0
;
data
->
memory
=
(
Uint32
*
)
NdbMem_Allocate
(
alloc_size
);
if
(
data
->
memory
==
0
)
return
-
1
;
data
->
sz
=
alloc_size
;
m_total_alloc
+=
data
->
sz
;
}
Uint32
*
memptr
=
data
->
memory
;
memptr
+=
sz4
;
int
i
;
for
(
i
=
0
;
i
<=
2
;
i
++
)
{
if
(
sdata
)
ptr
=
(
Uint32
*
)
sdata
;
else
{
ptr
=
(
Uint32
*
)
NdbMem_Allocate
(
min_alloc_size
);
ev_buf
->
sdata
=
(
SubTableData
*
)
ptr
;
ev_buf
->
sz
=
min_alloc_size
;
m_total_alloc
+=
min_alloc_size
;
}
data
->
ptr
[
i
].
p
=
memptr
;
data
->
ptr
[
i
].
sz
=
ptr
[
i
].
sz
;
memptr
+=
ptr
[
i
].
sz
;
}
memcpy
(
ptr
,
f_sdata
,
sizeof
(
SubTableData
));
ptr
+=
sz4
;
return
0
;
}
int
NdbEventBuffer
::
copy_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
)
{
DBUG_ENTER
(
"NdbEventBuffer::copy_data"
);
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_0
;
if
(
alloc_mem
(
data
,
ptr
)
!=
0
)
DBUG_RETURN
(
-
1
);
memcpy
(
data
->
sdata
,
sdata
,
sizeof
(
SubTableData
));
int
i
;
for
(
i
=
0
;
i
<=
2
;
i
++
)
memcpy
(
data
->
ptr
[
i
].
p
,
ptr
[
i
].
p
,
ptr
[
i
].
sz
<<
2
);
DBUG_RETURN
(
0
);
}
memcpy
(
ptr
,
f_ptr
[
0
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_0
);
ptr
+=
f_ptr_sz_0
;
t_ptr
++
;
static
struct
Ev_t
{
enum
{
INS
=
NdbDictionary
::
Event
::
_TE_INSERT
,
DEL
=
NdbDictionary
::
Event
::
_TE_DELETE
,
UPD
=
NdbDictionary
::
Event
::
_TE_UPDATE
,
NUL
=
NdbDictionary
::
Event
::
_TE_NUL
,
ERR
=
255
};
int
t1
,
t2
,
t3
;
}
ev_t
[]
=
{
{
Ev_t
::
INS
,
Ev_t
::
INS
,
Ev_t
::
ERR
},
{
Ev_t
::
INS
,
Ev_t
::
DEL
,
Ev_t
::
NUL
},
//ok
{
Ev_t
::
INS
,
Ev_t
::
UPD
,
Ev_t
::
INS
},
//ok
{
Ev_t
::
DEL
,
Ev_t
::
INS
,
Ev_t
::
UPD
},
//ok
{
Ev_t
::
DEL
,
Ev_t
::
DEL
,
Ev_t
::
ERR
},
{
Ev_t
::
DEL
,
Ev_t
::
UPD
,
Ev_t
::
ERR
},
{
Ev_t
::
UPD
,
Ev_t
::
INS
,
Ev_t
::
ERR
},
{
Ev_t
::
UPD
,
Ev_t
::
DEL
,
Ev_t
::
DEL
},
//ok
{
Ev_t
::
UPD
,
Ev_t
::
UPD
,
Ev_t
::
UPD
}
//ok
};
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_1
;
/*
* | INS | DEL | UPD
* 0 | pk ah + all ah | pk ah | pk ah + new ah
* 1 | pk ad + all ad | old pk ad | new pk ad + new ad
* 2 | empty | old non-pk ah+ad | old ah+ad
*/
memcpy
(
ptr
,
f_ptr
[
1
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_1
);
ptr
+=
f_ptr_sz_1
;
t_ptr
++
;
static
AttributeHeader
copy_head
(
Uint32
&
i1
,
Uint32
*
p1
,
Uint32
&
i2
,
const
Uint32
*
p2
,
Uint32
flags
)
{
AttributeHeader
ah
(
p2
[
i2
]);
bool
do_copy
=
(
flags
&
1
);
if
(
do_copy
)
p1
[
i1
]
=
p2
[
i2
];
i1
++
;
i2
++
;
return
ah
;
}
if
(
f_ptr_sz_2
)
static
void
copy_attr
(
AttributeHeader
ah
,
Uint32
&
j1
,
Uint32
*
p1
,
Uint32
&
j2
,
const
Uint32
*
p2
,
Uint32
flags
)
{
bool
do_copy
=
(
flags
&
1
);
bool
with_head
=
(
flags
&
2
);
Uint32
n
=
with_head
+
ah
.
getDataSize
();
if
(
do_copy
)
{
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_2
;
memcpy
(
ptr
,
f_ptr
[
2
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_2
)
;
Uint32
k
;
for
(
k
=
0
;
k
<
n
;
k
++
)
p1
[
j1
++
]
=
p2
[
j2
++
]
;
}
else
{
t_ptr
->
p
=
0
;
t_ptr
->
sz
=
0
;
j1
+=
n
;
j2
+=
n
;
}
}
int
NdbEventBuffer
::
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr2
[
3
],
EventBufData
*
data
)
{
DBUG_ENTER
(
"NdbEventBuffer::merge_data"
);
Uint32
nkey
=
data
->
m_event_op
->
m_eventImpl
->
m_tableImpl
->
m_noOfKeys
;
int
t1
=
data
->
sdata
->
operation
;
int
t2
=
sdata
->
operation
;
if
(
t1
==
Ev_t
::
NUL
)
DBUG_RETURN
(
copy_data
(
sdata
,
ptr2
,
data
));
Ev_t
*
tp
=
0
;
int
i
;
for
(
i
=
0
;
i
<
sizeof
(
ev_t
)
/
sizeof
(
ev_t
[
0
]);
i
++
)
{
if
(
ev_t
[
i
].
t1
==
t1
&&
ev_t
[
i
].
t2
==
t2
)
{
tp
=
&
ev_t
[
i
];
break
;
}
}
assert
(
tp
!=
0
&&
tp
->
t3
!=
Ev_t
::
ERR
);
// save old data
EventBufData
olddata
=
*
data
;
data
->
memory
=
0
;
data
->
sz
=
0
;
// compose ptr1 o ptr2 = ptr
LinearSectionPtr
(
&
ptr1
)
[
3
]
=
olddata
.
ptr
;
LinearSectionPtr
(
&
ptr
)
[
3
]
=
data
->
ptr
;
// loop twice where first loop only sets sizes
int
loop
;
for
(
loop
=
0
;
loop
<=
1
;
loop
++
)
{
if
(
loop
==
1
)
{
if
(
alloc_mem
(
data
,
ptr
)
!=
0
)
DBUG_RETURN
(
-
1
);
*
data
->
sdata
=
*
sdata
;
data
->
sdata
->
operation
=
tp
->
t3
;
}
ptr
[
0
].
sz
=
ptr
[
1
].
sz
=
ptr
[
3
].
sz
=
0
;
// copy pk from new version
{
AttributeHeader
ah
;
Uint32
i
=
0
;
Uint32
j
=
0
;
Uint32
i2
=
0
;
Uint32
j2
=
0
;
while
(
i
<
nkey
)
{
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i2
,
ptr2
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j2
,
ptr2
[
1
].
p
,
loop
);
}
ptr
[
0
].
sz
=
i
;
ptr
[
1
].
sz
=
j
;
}
// merge after values, new version overrides
if
(
tp
->
t3
!=
Ev_t
::
DEL
)
{
AttributeHeader
ah
;
Uint32
i
=
ptr
[
0
].
sz
;
Uint32
j
=
ptr
[
1
].
sz
;
Uint32
i1
=
0
;
Uint32
j1
=
0
;
Uint32
i2
=
nkey
;
Uint32
j2
=
ptr
[
1
].
sz
;
while
(
i1
<
nkey
)
{
j1
+=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
++
]).
getDataSize
();
}
while
(
1
)
{
bool
b1
=
(
i1
<
ptr1
[
0
].
sz
);
bool
b2
=
(
i2
<
ptr2
[
0
].
sz
);
if
(
b1
&&
b2
)
{
Uint32
id1
=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
]).
getAttributeId
();
Uint32
id2
=
AttributeHeader
(
ptr2
[
0
].
p
[
i2
]).
getAttributeId
();
if
(
id1
<
id2
)
b2
=
false
;
else
if
(
id1
>
id2
)
b1
=
false
;
else
{
j1
+=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
++
]).
getDataSize
();
b1
=
false
;
}
}
if
(
b1
)
{
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i1
,
ptr1
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j1
,
ptr1
[
1
].
p
,
loop
);
}
else
if
(
b2
)
{
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i2
,
ptr2
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j2
,
ptr2
[
1
].
p
,
loop
);
}
else
break
;
}
ptr
[
0
].
sz
=
i
;
ptr
[
1
].
sz
=
j
;
}
// merge before values, old version overrides
if
(
tp
->
t3
!=
Ev_t
::
INS
)
{
AttributeHeader
ah
;
Uint32
k
=
0
;
Uint32
k1
=
0
;
Uint32
k2
=
0
;
while
(
1
)
{
bool
b1
=
(
k1
<
ptr1
[
2
].
sz
);
bool
b2
=
(
k2
<
ptr2
[
2
].
sz
);
if
(
b1
&&
b2
)
{
Uint32
id1
=
AttributeHeader
(
ptr1
[
2
].
p
[
k1
]).
getAttributeId
();
Uint32
id2
=
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]).
getAttributeId
();
if
(
id1
<
id2
)
b2
=
false
;
else
if
(
id1
>
id2
)
b1
=
false
;
else
{
k2
+=
1
+
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]).
getDataSize
();
b2
=
false
;
}
}
if
(
b1
)
{
ah
=
AttributeHeader
(
ptr1
[
2
].
p
[
k1
]);
copy_attr
(
ah
,
k
,
ptr
[
2
].
p
,
k1
,
ptr1
[
2
].
p
,
loop
|
2
);
}
else
if
(
b2
)
{
ah
=
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]);
copy_attr
(
ah
,
k
,
ptr
[
2
].
p
,
k2
,
ptr2
[
2
].
p
,
loop
|
2
);
}
else
break
;
}
}
}
// free old data
NdbMem_Free
((
char
*
)
olddata
.
memory
);
DBUG_RETURN
(
0
);
}
...
...
@@ -1399,5 +1670,107 @@ NdbEventBuffer::reportStatus()
#endif
}
// hash table routines
// could optimize the all-fixed case
Uint32
EventBufData_hash
::
getpkhash
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
])
{
const
NdbTableImpl
*
tab
=
op
->
m_eventImpl
->
m_tableImpl
;
// in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
// for pk update (to equivalent pk) post/pre values give same hash
Uint32
nkey
=
tab
->
m_noOfKeys
;
assert
(
nkey
!=
0
&&
nkey
<=
ptr
[
0
].
sz
);
const
Uint32
*
hptr
=
ptr
[
0
].
p
;
const
uchar
*
dptr
=
(
uchar
*
)
ptr
[
1
].
p
;
// hash registers
ulong
nr1
=
0
;
ulong
nr2
=
0
;
while
(
nkey
--
!=
0
)
{
AttributeHeader
ah
(
*
hptr
++
);
Uint32
bytesize
=
ah
.
getByteSize
();
assert
(
dptr
+
bytesize
<=
(
uchar
*
)(
ptr
[
1
].
p
+
ptr
[
1
].
sz
));
Uint32
i
=
ah
.
getAttributeId
();
const
NdbColumnImpl
*
col
=
tab
->
getColumn
(
i
);
assert
(
col
!=
0
);
Uint32
lb
,
len
;
bool
ok
=
NdbSqlUtil
::
get_var_length
(
col
->
m_type
,
dptr
,
bytesize
,
lb
,
len
);
assert
(
ok
);
CHARSET_INFO
*
cs
=
col
->
m_cs
?
col
->
m_cs
:
&
my_charset_bin
;
(
*
cs
->
coll
->
hash_sort
)(
cs
,
dptr
+
lb
,
len
,
&
nr1
,
&
nr2
);
dptr
+=
bytesize
;
}
return
nr1
;
}
// this is seldom invoked
bool
EventBufData_hash
::
getpkequal
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr1
[
3
],
LinearSectionPtr
ptr2
[
3
])
{
const
NdbTableImpl
*
tab
=
op
->
m_eventImpl
->
m_tableImpl
;
Uint32
nkey
=
tab
->
m_noOfKeys
;
assert
(
nkey
!=
0
&&
nkey
<=
ptr1
[
0
].
sz
&&
nkey
<=
ptr2
[
0
].
sz
);
const
Uint32
*
hptr1
=
ptr1
[
0
].
p
;
const
Uint32
*
hptr2
=
ptr2
[
0
].
p
;
const
uchar
*
dptr1
=
(
uchar
*
)
ptr1
[
1
].
p
;
const
uchar
*
dptr2
=
(
uchar
*
)
ptr2
[
1
].
p
;
while
(
nkey
--
!=
0
)
{
AttributeHeader
ah1
(
*
hptr1
++
);
AttributeHeader
ah2
(
*
hptr2
++
);
// sizes can differ on update of varchar endspace
Uint32
bytesize1
=
ah1
.
getByteSize
();
Uint32
bytesize2
=
ah1
.
getByteSize
();
assert
(
dptr1
+
bytesize1
<=
(
uchar
*
)(
ptr1
[
1
].
p
+
ptr1
[
1
].
sz
));
assert
(
dptr2
+
bytesize2
<=
(
uchar
*
)(
ptr2
[
1
].
p
+
ptr2
[
1
].
sz
));
assert
(
ah1
.
getAttributeId
()
==
ah2
.
getAttributeId
());
Uint32
i
=
ah1
.
getAttributeId
();
const
NdbColumnImpl
*
col
=
tab
->
getColumn
(
i
);
assert
(
col
!=
0
);
Uint32
lb1
,
len1
;
bool
ok1
=
NdbSqlUtil
::
get_var_length
(
col
->
m_type
,
dptr1
,
bytesize1
,
lb1
,
len1
);
Uint32
lb2
,
len2
;
bool
ok2
=
NdbSqlUtil
::
get_var_length
(
col
->
m_type
,
dptr2
,
bytesize2
,
lb2
,
len2
);
assert
(
ok1
&&
ok2
&&
lb1
==
lb2
);
CHARSET_INFO
*
cs
=
col
->
m_cs
?
col
->
m_cs
:
&
my_charset_bin
;
int
res
=
(
cs
->
coll
->
strnncollsp
)(
cs
,
dptr1
+
lb1
,
len1
,
dptr2
+
lb2
,
len2
,
false
);
if
(
res
!=
0
)
return
false
;
dptr1
+=
bytesize1
;
dptr2
+=
bytesize2
;
}
return
true
;
}
void
EventBufData_hash
::
search
(
Pos
&
hpos
,
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
])
{
Uint32
pkhash
=
getpkhash
(
op
,
ptr
);
Uint32
index
=
(
op
->
m_oid
^
pkhash
)
%
GCI_EVENT_HASH_SIZE
;
EventBufData
*
data
=
m_hash
[
index
];
while
(
data
!=
0
)
{
if
(
data
->
m_event_op
==
op
&&
data
->
m_pkhash
==
pkhash
&&
getpkequal
(
op
,
data
->
ptr
,
ptr
))
break
;
data
=
data
->
m_next_hash
;
}
hpos
.
index
=
index
;
hpos
.
data
=
data
;
hpos
.
pkhash
=
pkhash
;
}
template
class
Vector
<
Gci_container
>;
template
class
Vector
<
NdbEventBuffer
::
EventBufData_chunk
*
>;
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
2bd00bc2
...
...
@@ -25,16 +25,19 @@
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
class
NdbEventOperationImpl
;
struct
EventBufData
{
union
{
SubTableData
*
sdata
;
char
*
memory
;
Uint32
*
memory
;
};
LinearSectionPtr
ptr
[
3
];
unsigned
sz
;
NdbEventOperationImpl
*
m_event_op
;
EventBufData
*
m_next
;
// Next wrt to global order
EventBufData
*
m_next_hash
;
// Next in per-GCI hash
Uint32
m_pkhash
;
// PK hash (without op) for fast compare
};
class
EventBufData_list
...
...
@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list)
m_sz
+=
list
.
m_sz
;
}
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class
EventBufData_hash
{
public:
struct
Pos
{
// search result
Uint32
index
;
// index into hash array
EventBufData
*
data
;
// non-zero if found
Uint32
pkhash
;
// PK hash
};
static
Uint32
getpkhash
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
]);
static
bool
getpkequal
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr1
[
3
],
LinearSectionPtr
ptr2
[
3
]);
void
search
(
Pos
&
hpos
,
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
]);
void
append
(
Pos
&
hpos
,
EventBufData
*
data
);
enum
{
GCI_EVENT_HASH_SIZE
=
101
};
EventBufData
*
m_hash
[
GCI_EVENT_HASH_SIZE
];
};
inline
void
EventBufData_hash
::
append
(
Pos
&
hpos
,
EventBufData
*
data
)
{
data
->
m_next_hash
=
m_hash
[
hpos
.
index
];
m_hash
[
hpos
.
index
]
=
data
;
}
struct
Gci_container
{
enum
State
...
...
@@ -127,6 +158,7 @@ struct Gci_container
Uint32
m_gcp_complete_rep_count
;
// Remaining SUB_GCP_COMPLETE_REP until done
Uint64
m_gci
;
// GCI
EventBufData_list
m_data
;
EventBufData_hash
m_data_hash
;
};
class
NdbEventOperationImpl
:
public
NdbEventOperation
{
...
...
@@ -173,6 +205,8 @@ public:
*/
Uint32
m_eventId
;
Uint32
m_oid
;
bool
m_separateEvents
;
EventBufData
*
m_data_item
;
...
...
@@ -212,7 +246,6 @@ public:
void
add_op
();
void
remove_op
();
void
init_gci_containers
();
Uint32
m_active_op_count
;
// accessed from the "receive thread"
int
insertDataL
(
NdbEventOperationImpl
*
op
,
...
...
@@ -233,10 +266,15 @@ public:
NdbEventOperationImpl
*
move_data
();
// used by both user thread and receive thread
int
copy_data_alloc
(
const
SubTableData
*
const
f_sdata
,
LinearSectionPtr
f_ptr
[
3
],
EventBufData
*
ev_buf
);
// routines to copy/merge events
EventBufData
*
alloc_data
();
int
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
]);
int
copy_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
int
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
void
free_list
(
EventBufData_list
&
list
);
...
...
@@ -290,6 +328,8 @@ private:
// dropped event operations that have not yet
// been deleted
NdbEventOperationImpl
*
m_dropped_ev_op
;
Uint32
m_active_op_count
;
};
inline
...
...
storage/ndb/test/ndbapi/test_event.cpp
View file @
2bd00bc2
...
...
@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec
g_err
<<
function
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
function
<<
"get values
\n
"
;
NdbRecAttr
*
recAttr
[
1024
];
...
...
@@ -370,6 +371,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
"dropping event operation"
<<
endl
;
int
res
=
pNdb
->
dropEventOperation
(
pOp
);
...
...
@@ -540,6 +542,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed on %s"
<<
buf
<<
endl
;
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
i
;
int
n_columns
=
table
->
getNoOfColumns
();
...
...
@@ -1185,6 +1188,7 @@ static int createEventOperations(Ndb * ndb)
{
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
n_columns
=
pTabs
[
i
]
->
getNoOfColumns
();
for
(
int
j
=
0
;
j
<
n_columns
;
j
++
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment