Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
d679b903
Commit
d679b903
authored
Jan 12, 2006
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge mysql.com:/home/mydev/mysql-5.1
into mysql.com:/home/mydev/mysql-5.1-wl1563
parents
d9e3f538
5872e5ae
Changes
18
Show whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
665 additions
and
190 deletions
+665
-190
mysql-test/t/ndb_dd_ddl.test
mysql-test/t/ndb_dd_ddl.test
+10
-10
sql/event_executor.cc
sql/event_executor.cc
+9
-2
sql/event_timed.cc
sql/event_timed.cc
+2
-2
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
+2
-1
storage/ndb/include/ndbapi/NdbEventOperation.hpp
storage/ndb/include/ndbapi/NdbEventOperation.hpp
+6
-0
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
+2
-2
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
+53
-33
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
+1
-1
storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp
+2
-2
storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
+4
-4
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
+1
-1
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
+18
-18
storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
+2
-0
storage/ndb/src/ndbapi/NdbEventOperation.cpp
storage/ndb/src/ndbapi/NdbEventOperation.cpp
+5
-0
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+468
-94
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+46
-6
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/ndbapi/test_event.cpp
+4
-0
storage/ndb/test/ndbapi/test_event_merge.cpp
storage/ndb/test/ndbapi/test_event_merge.cpp
+30
-14
No files found.
mysql-test/t/ndb_dd_ddl.test
View file @
d679b903
...
...
@@ -21,7 +21,7 @@ INITIAL_SIZE 16M
UNDO_BUFFER_SIZE
=
1
M
ENGINE
=
NDB
;
--
error
1502
--
error
ER_CREATE_TABLESPACE_FAILED
CREATE
LOGFILE
GROUP
lg1
ADD
UNDOFILE
'undofile.dat'
INITIAL_SIZE
16
M
...
...
@@ -32,7 +32,7 @@ ALTER LOGFILE GROUP lg1
ADD
UNDOFILE
'undofile02.dat'
INITIAL_SIZE
4
M
ENGINE
NDB
;
--
error
1507
--
error
ER_ALTER_TABLESPACE_FAILED
ALTER
LOGFILE
GROUP
lg1
ADD
UNDOFILE
'undofile02.dat'
INITIAL_SIZE
4
M
ENGINE
=
NDB
;
...
...
@@ -43,20 +43,20 @@ USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB
;
--
error
1502
# Bug 16158
--
error
ER_CREATE_TABLESPACE_FAILED
CREATE
TABLESPACE
ts1
ADD
DATAFILE
'datafile.dat'
USE
LOGFILE
GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB
;
# Currently a bug, bug#16158
ALTER
TABLESPACE
ts1
ADD
DATAFILE
'datafile2.dat'
INITIAL_SIZE
12
M
ENGINE
=
NDB
;
--
error
1507
# Currently a bug, bug#16158
--
error
ER_ALTER_TABLESPACE_FAILED
ALTER
TABLESPACE
ts1
ADD
DATAFILE
'datafile2.dat'
INITIAL_SIZE
12
M
...
...
@@ -67,7 +67,7 @@ CREATE TABLE t1
tablespace
ts1
storage
disk
engine
ndb
;
--
error
1050
--
error
ER_TABLE_EXISTS_ERROR
CREATE
TABLE
t1
(
pk1
int
not
null
primary
key
,
b
int
not
null
,
c
int
not
null
)
tablespace
ts1
storage
disk
...
...
@@ -79,7 +79,7 @@ ALTER TABLESPACE ts1
DROP
DATAFILE
'datafile2.dat'
ENGINE
=
NDB
;
--
error
1507
--
error
ER_ALTER_TABLESPACE_FAILED
ALTER
TABLESPACE
ts1
DROP
DATAFILE
'datafile2.dat'
ENGINE
=
NDB
;
...
...
@@ -88,7 +88,7 @@ ALTER TABLESPACE ts1
DROP
DATAFILE
'datafile.dat'
ENGINE
=
NDB
;
--
error
1507
--
error
ER_ALTER_TABLESPACE_FAILED
ALTER
TABLESPACE
ts1
DROP
DATAFILE
'datafile.dat'
ENGINE
=
NDB
;
...
...
@@ -96,14 +96,14 @@ ENGINE=NDB;
DROP
TABLESPACE
ts1
ENGINE
=
NDB
;
--
error
1503
--
error
ER_DROP_TABLESPACE_FAILED
DROP
TABLESPACE
ts1
ENGINE
=
NDB
;
DROP
LOGFILE
GROUP
lg1
ENGINE
=
NDB
;
--
error
1503
--
error
ER_DROP_TABLESPACE_FAILED
DROP
LOGFILE
GROUP
lg1
ENGINE
=
NDB
;
--
echo
****
End
Duplicate
Statement
Testing
****
...
...
sql/event_executor.cc
View file @
d679b903
...
...
@@ -184,6 +184,13 @@ event_executor_main(void *arg)
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init
();
if
(
sizeof
(
my_time_t
)
!=
sizeof
(
time_t
))
{
sql_print_error
(
"sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping."
);
goto
err_no_thd
;
}
//TODO Andrey: Check for NULL
if
(
!
(
thd
=
new
THD
))
// note that contructor of THD uses DBUG_ !
{
...
...
@@ -275,7 +282,7 @@ event_executor_main(void *arg)
}
DBUG_PRINT
(
"evex main thread"
,(
"computing time to sleep till next exec"
));
time
(
&
now
);
time
(
(
time_t
*
)
&
now
);
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
now
);
t2sleep
=
evex_time_diff
(
&
et
->
execute_at
,
&
time_now
);
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
...
...
sql/event_timed.cc
View file @
d679b903
...
...
@@ -537,7 +537,7 @@ event_timed::compute_next_execution_time()
}
goto
ret
;
}
time
(
&
now
);
time
(
(
time_t
*
)
&
now
);
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
now
);
/*
sql_print_information("[%s.%s]", dbname.str, name.str);
...
...
@@ -703,7 +703,7 @@ event_timed::mark_last_executed()
TIME
time_now
;
my_time_t
now
;
time
(
&
now
);
time
(
(
time_t
*
)
&
now
);
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
now
);
last_executed
=
time_now
;
// was execute_at
...
...
storage/ndb/include/ndbapi/NdbDictionary.hpp
View file @
d679b903
...
...
@@ -1051,7 +1051,8 @@ public:
_TE_CREATE
=
6
,
_TE_GCP_COMPLETE
=
7
,
_TE_CLUSTER_FAILURE
=
8
,
_TE_STOP
=
9
_TE_STOP
=
9
,
_TE_NUL
=
10
// internal (INS o DEL within same GCI)
};
#endif
/**
...
...
storage/ndb/include/ndbapi/NdbEventOperation.hpp
View file @
d679b903
...
...
@@ -93,6 +93,12 @@ public:
* Retrieve current state of the NdbEventOperation object
*/
State
getState
();
/**
* By default events on same NdbEventOperation within same GCI
* are merged into a single event. This can be changed with
* separateEvents(true).
*/
void
separateEvents
(
bool
flag
);
/**
* Activates the NdbEventOperation to start receiving events. The
...
...
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
View file @
d679b903
TARGET
=
ndbapi_event
SRCS
=
ndbapi_event.cpp
OBJS
=
ndbapi_event.o
CXX
=
g++
CXX
=
g++
-g
CFLAGS
=
-c
-Wall
-fno-rtti
-fno-exceptions
CXXFLAGS
=
DEBUG
=
...
...
@@ -17,7 +17,7 @@ $(TARGET): $(OBJS)
$(CXX)
$(CXXFLAGS)
$(LFLAGS)
$(LIB_DIR)
$(OBJS)
-lndbclient
-lmysqlclient_r
-lmysys
-lmystrings
-lz
$(SYS_LIB)
-o
$(TARGET)
$(TARGET).o
:
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
-I
$(TOP_SRCDIR)
/include
$(SRCS)
clean
:
rm
-f
*
.o
$(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
View file @
d679b903
...
...
@@ -58,24 +58,29 @@
/**
*
* Assume that there is a table
TAB
0 which is being updated by
* Assume that there is a table
t
0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns
COL0, COL2, COL11
* We want to monitor what happens with columns
c0,c1,c2,c3.
*
* or together with the mysql client;
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* primary key(c0, c2)) engine ndb charset latin1;
*
* In another window start ndbapi_event, wait until properly started
*
insert into TAB0 values (1,2,3);
insert into TAB0 values (2,2,3);
insert into TAB0 values (3,2,9);
update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
insert into t0 values (1, 2, 'a', 'b');
insert into t0 values (3, 4, 'c', 'd');
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same'
update t0 set c2 = 'g' where c0 = 1; -- update pk part
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
*
* you should see the data popping up in the example window
*
...
...
@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb,
const
char
**
eventColumnName
,
const
int
noEventColumnName
);
int
main
()
int
main
(
int
argc
,
char
**
argv
)
{
ndb_init
();
bool
sep
=
argc
>
1
&&
strcmp
(
argv
[
1
],
"-s"
)
==
0
;
Ndb_cluster_connection
*
cluster_connection
=
new
Ndb_cluster_connection
();
// Object representing the cluster
...
...
@@ -126,13 +132,15 @@ int main()
if
(
myNdb
->
init
()
==
-
1
)
APIERROR
(
myNdb
->
getNdbError
());
const
char
*
eventName
=
"CHNG_IN_
TAB
0"
;
const
char
*
eventTableName
=
"
TAB
0"
;
const
int
noEventColumnName
=
3
;
const
char
*
eventName
=
"CHNG_IN_
t
0"
;
const
char
*
eventTableName
=
"
t
0"
;
const
int
noEventColumnName
=
4
;
const
char
*
eventColumnName
[
noEventColumnName
]
=
{
"COL0"
,
"COL1"
,
"COL11"
};
{
"c0"
,
"c1"
,
"c2"
,
"c3"
};
// Create events
myCreateEvent
(
myNdb
,
...
...
@@ -142,13 +150,14 @@ int main()
noEventColumnName
);
int
j
=
0
;
while
(
j
<
5
)
{
while
(
j
<
99
)
{
// Start "transaction" for handling events
NdbEventOperation
*
op
;
printf
(
"create EventOperation
\n
"
);
if
((
op
=
myNdb
->
createEventOperation
(
eventName
))
==
NULL
)
APIERROR
(
myNdb
->
getNdbError
());
op
->
separateEvents
(
sep
);
printf
(
"get values
\n
"
);
NdbRecAttr
*
recAttr
[
noEventColumnName
];
...
...
@@ -175,32 +184,43 @@ int main()
i
++
;
switch
(
op
->
getEventType
())
{
case
NdbDictionary
:
:
Event
::
TE_INSERT
:
printf
(
"%u INSERT
:
"
,
i
);
printf
(
"%u INSERT"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_DELETE
:
printf
(
"%u DELETE
:
"
,
i
);
printf
(
"%u DELETE"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_UPDATE
:
printf
(
"%u UPDATE
:
"
,
i
);
printf
(
"%u UPDATE"
,
i
);
break
;
default:
abort
();
// should not happen
}
for
(
int
i
=
1
;
i
<
noEventColumnName
;
i
++
)
{
printf
(
" gci=%d
\n
"
,
op
->
getGCI
());
printf
(
"post: "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttr
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" post[%u]="
,
i
);
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttr
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttr
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttr
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
pre : "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttrPre
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" pre[%u]="
,
i
);
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
}
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttrPre
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
"
);
}
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
View file @
d679b903
...
...
@@ -132,7 +132,7 @@ void Dbtup::updatePackedList(Signal* signal, Uint16 hostId)
void
Dbtup
::
sendReadAttrinfo
(
Signal
*
signal
,
KeyReqStruct
*
req_struct
,
Uint32
ToutBufIndex
,
const
Operationrec
*
const
regOperPtr
)
const
Operationrec
*
regOperPtr
)
{
if
(
ToutBufIndex
==
0
)
return
;
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp
View file @
d679b903
...
...
@@ -242,8 +242,8 @@ Dbtup::alloc_page(Tablerec* tabPtrP, Fragrecord* fragPtrP,
}
Uint32
*
Dbtup
::
alloc_fix_rowid
(
Fragrecord
*
const
regFragPtr
,
Tablerec
*
const
regTabPtr
,
Dbtup
::
alloc_fix_rowid
(
Fragrecord
*
regFragPtr
,
Tablerec
*
regTabPtr
,
Local_key
*
key
,
Uint32
*
out_frag_page_id
)
{
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
View file @
d679b903
...
...
@@ -89,7 +89,7 @@
//
// The full page range struct
Uint32
Dbtup
::
getEmptyPage
(
Fragrecord
*
const
regFragPtr
)
Uint32
Dbtup
::
getEmptyPage
(
Fragrecord
*
regFragPtr
)
{
Uint32
pageId
=
regFragPtr
->
emptyPrimPage
.
firstItem
;
if
(
pageId
==
RNIL
)
{
...
...
@@ -108,7 +108,7 @@ Uint32 Dbtup::getEmptyPage(Fragrecord* const regFragPtr)
return
pageId
;
}
//Dbtup::getEmptyPage()
Uint32
Dbtup
::
getRealpid
(
Fragrecord
*
const
regFragPtr
,
Uint32
logicalPageId
)
Uint32
Dbtup
::
getRealpid
(
Fragrecord
*
regFragPtr
,
Uint32
logicalPageId
)
{
PageRangePtr
grpPageRangePtr
;
Uint32
loopLimit
;
...
...
@@ -241,7 +241,7 @@ bool Dbtup::insertPageRangeTab(Fragrecord* const regFragPtr,
}
//Dbtup::insertPageRangeTab()
void
Dbtup
::
releaseFragPages
(
Fragrecord
*
const
regFragPtr
)
void
Dbtup
::
releaseFragPages
(
Fragrecord
*
regFragPtr
)
{
if
(
regFragPtr
->
rootPageRange
==
RNIL
)
{
ljam
();
...
...
@@ -349,7 +349,7 @@ void Dbtup::initFragRange(Fragrecord* const regFragPtr)
regFragPtr
->
nextStartRange
=
0
;
}
//initFragRange()
Uint32
Dbtup
::
allocFragPages
(
Fragrecord
*
const
regFragPtr
,
Uint32
tafpNoAllocRequested
)
Uint32
Dbtup
::
allocFragPages
(
Fragrecord
*
regFragPtr
,
Uint32
tafpNoAllocRequested
)
{
Uint32
tafpPagesAllocated
=
0
;
while
(
true
)
{
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
View file @
d679b903
...
...
@@ -28,7 +28,7 @@
#define ljamEntry() { jamEntryLine(3000 + __LINE__); }
void
Dbtup
::
setUpQueryRoutines
(
Tablerec
*
const
regTabPtr
)
Dbtup
::
setUpQueryRoutines
(
Tablerec
*
regTabPtr
)
{
Uint32
startDescriptor
=
regTabPtr
->
tabDescriptor
;
ndbrequire
((
startDescriptor
+
(
regTabPtr
->
m_no_of_attributes
<<
ZAD_LOG_SIZE
))
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
View file @
d679b903
...
...
@@ -356,8 +356,8 @@ Dbtup::dropTrigger(Tablerec* table, const DropTrigReq* req)
/* ---------------------------------------------------------------- */
void
Dbtup
::
checkImmediateTriggersAfterInsert
(
KeyReqStruct
*
req_struct
,
Operationrec
*
const
regOperPtr
,
Tablerec
*
const
regTablePtr
)
Operationrec
*
regOperPtr
,
Tablerec
*
regTablePtr
)
{
if
(
refToBlock
(
req_struct
->
TC_ref
)
!=
DBTC
)
{
return
;
...
...
@@ -374,8 +374,8 @@ Dbtup::checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
void
Dbtup
::
checkImmediateTriggersAfterUpdate
(
KeyReqStruct
*
req_struct
,
Operationrec
*
const
regOperPtr
,
Tablerec
*
const
regTablePtr
)
Operationrec
*
regOperPtr
,
Tablerec
*
regTablePtr
)
{
if
(
refToBlock
(
req_struct
->
TC_ref
)
!=
DBTC
)
{
return
;
...
...
@@ -399,8 +399,8 @@ Dbtup::checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
void
Dbtup
::
checkImmediateTriggersAfterDelete
(
KeyReqStruct
*
req_struct
,
Operationrec
*
const
regOperPtr
,
Tablerec
*
const
regTablePtr
)
Operationrec
*
regOperPtr
,
Tablerec
*
regTablePtr
)
{
if
(
refToBlock
(
req_struct
->
TC_ref
)
!=
DBTC
)
{
return
;
...
...
@@ -444,8 +444,8 @@ void Dbtup::checkDeferredTriggers(Signal* signal,
/* */
/* ---------------------------------------------------------------- */
void
Dbtup
::
checkDetachedTriggers
(
KeyReqStruct
*
req_struct
,
Operationrec
*
const
regOperPtr
,
Tablerec
*
const
regTablePtr
)
Operationrec
*
regOperPtr
,
Tablerec
*
regTablePtr
)
{
Uint32
save_type
=
regOperPtr
->
op_struct
.
op_type
;
Tuple_header
*
save_ptr
=
req_struct
->
m_tuple_ptr
;
...
...
@@ -1049,9 +1049,9 @@ void Dbtup::sendFireTrigOrd(Signal* signal,
int
Dbtup
::
executeTuxInsertTriggers
(
Signal
*
signal
,
Operationrec
*
const
regOperPtr
,
Fragrecord
*
const
regFragPtr
,
Tablerec
*
const
regTabPtr
)
Operationrec
*
regOperPtr
,
Fragrecord
*
regFragPtr
,
Tablerec
*
regTabPtr
)
{
TuxMaintReq
*
const
req
=
(
TuxMaintReq
*
)
signal
->
getDataPtrSend
();
// fill in constant part
...
...
@@ -1066,9 +1066,9 @@ Dbtup::executeTuxInsertTriggers(Signal* signal,
int
Dbtup
::
executeTuxUpdateTriggers
(
Signal
*
signal
,
Operationrec
*
const
regOperPtr
,
Fragrecord
*
const
regFragPtr
,
Tablerec
*
const
regTabPtr
)
Operationrec
*
regOperPtr
,
Fragrecord
*
regFragPtr
,
Tablerec
*
regTabPtr
)
{
TuxMaintReq
*
const
req
=
(
TuxMaintReq
*
)
signal
->
getDataPtrSend
();
// fill in constant part
...
...
@@ -1139,8 +1139,8 @@ Dbtup::executeTuxDeleteTriggers(Signal* signal,
void
Dbtup
::
executeTuxCommitTriggers
(
Signal
*
signal
,
Operationrec
*
regOperPtr
,
Fragrecord
*
const
regFragPtr
,
Tablerec
*
const
regTabPtr
)
Fragrecord
*
regFragPtr
,
Tablerec
*
regTabPtr
)
{
TuxMaintReq
*
const
req
=
(
TuxMaintReq
*
)
signal
->
getDataPtrSend
();
Uint32
tupVersion
;
...
...
@@ -1174,8 +1174,8 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
void
Dbtup
::
executeTuxAbortTriggers
(
Signal
*
signal
,
Operationrec
*
regOperPtr
,
Fragrecord
*
const
regFragPtr
,
Tablerec
*
const
regTabPtr
)
Fragrecord
*
regFragPtr
,
Tablerec
*
regTabPtr
)
{
TuxMaintReq
*
const
req
=
(
TuxMaintReq
*
)
signal
->
getDataPtrSend
();
// get version
...
...
storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
View file @
d679b903
...
...
@@ -83,6 +83,8 @@ Ndbd_mem_manager::init(Uint32 pages)
release
(
start
+
1
,
end
-
1
-
start
);
}
return
0
;
}
void
...
...
storage/ndb/src/ndbapi/NdbEventOperation.cpp
View file @
d679b903
...
...
@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState()
return
m_impl
.
getState
();
}
void
NdbEventOperation
::
separateEvents
(
bool
flag
)
{
m_impl
.
m_separateEvents
=
flag
;
}
NdbRecAttr
*
NdbEventOperation
::
getValue
(
const
char
*
colName
,
char
*
aValue
)
{
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
d679b903
...
...
@@ -104,6 +104,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state
=
EO_CREATED
;
m_separateEvents
=
false
;
m_has_error
=
0
;
DBUG_PRINT
(
"exit"
,(
"this: 0x%x oid: %u"
,
this
,
m_oid
));
...
...
@@ -693,6 +695,21 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return
ret
;
}
#ifdef VM_TRACE
static
void
print_std
(
const
char
*
tag
,
const
SubTableData
*
sdata
,
LinearSectionPtr
ptr
[
3
])
{
printf
(
"%s
\n
"
,
tag
);
printf
(
"addr=%p gci=%d op=%d
\n
"
,
(
void
*
)
sdata
,
sdata
->
gci
,
sdata
->
operation
);
for
(
int
i
=
0
;
i
<=
2
;
i
++
)
{
printf
(
"sec=%d addr=%p sz=%d
\n
"
,
i
,
(
void
*
)
ptr
[
i
].
p
,
ptr
[
i
].
sz
);
for
(
int
j
=
0
;
j
<
ptr
[
i
].
sz
;
j
++
)
printf
(
"%08x "
,
ptr
[
i
].
p
[
j
]);
printf
(
"
\n
"
);
}
}
#endif
NdbEventOperation
*
NdbEventBuffer
::
nextEvent
()
{
...
...
@@ -734,6 +751,10 @@ NdbEventBuffer::nextEvent()
op
->
m_data_done_count
++
;
#endif
// NUL event is not returned
if
(
data
->
sdata
->
operation
==
NdbDictionary
::
Event
::
_TE_NUL
)
continue
;
int
r
=
op
->
receive_event
();
if
(
r
>
0
)
{
...
...
@@ -1099,13 +1120,15 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_ENTER
(
"NdbEventBuffer::insertDataL"
);
Uint64
gci
=
sdata
->
gci
;
EventBufData
*
data
=
m_free_data
;
if
(
likely
((
Uint32
)
op
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
)
{
Gci_container
*
bucket
=
find_bucket
(
&
m_active_gci
,
gci
);
DBUG_PRINT
(
"info"
,
(
"data insertion in eventId %d"
,
op
->
m_eventId
));
DBUG_PRINT
(
"info"
,
(
"gci=%d tab=%d op=%d node=%d"
,
sdata
->
gci
,
sdata
->
tableId
,
sdata
->
operation
,
sdata
->
req_nodeid
));
if
(
unlikely
(
bucket
==
0
))
{
...
...
@@ -1116,6 +1139,84 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN
(
0
);
}
bool
use_hash
=
!
op
->
m_separateEvents
&&
sdata
->
operation
<
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
;
// find position in bucket hash table
EventBufData
*
data
=
0
;
EventBufData_hash
::
Pos
hpos
;
if
(
use_hash
)
{
bucket
->
m_data_hash
.
search
(
hpos
,
op
,
ptr
);
data
=
hpos
.
data
;
}
if
(
data
==
0
)
{
// allocate new result buffer
data
=
alloc_data
();
if
(
unlikely
(
data
==
0
))
{
op
->
m_has_error
=
2
;
DBUG_RETURN
(
-
1
);
}
if
(
unlikely
(
copy_data
(
sdata
,
ptr
,
data
)))
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
}
// add it to list and hash table
bucket
->
m_data
.
append
(
data
);
if
(
use_hash
)
{
bucket
->
m_data_hash
.
append
(
hpos
,
data
);
}
#ifdef VM_TRACE
op
->
m_data_count
++
;
#endif
}
else
{
// event with same op, PK found, merge into old buffer
if
(
unlikely
(
merge_data
(
sdata
,
ptr
,
data
)))
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
}
}
data
->
m_event_op
=
op
;
if
(
use_hash
)
{
data
->
m_pkhash
=
hpos
.
pkhash
;
}
DBUG_RETURN
(
0
);
}
#ifdef VM_TRACE
if
((
Uint32
)
op
->
m_eventImpl
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
{
// XXX never reached
DBUG_PRINT
(
"info"
,(
"Data arrived before ready eventId"
,
op
->
m_eventId
));
DBUG_RETURN
(
0
);
}
else
{
DBUG_PRINT
(
"info"
,(
"skipped"
));
DBUG_RETURN
(
0
);
}
#else
return
0
;
#endif
}
// allocate EventBufData
EventBufData
*
NdbEventBuffer
::
alloc_data
()
{
DBUG_ENTER
(
"alloc_data"
);
EventBufData
*
data
=
m_free_data
;
if
(
unlikely
(
data
==
0
))
{
#ifdef VM_TRACE
...
...
@@ -1125,7 +1226,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
expand
(
4000
);
reportStatus
();
data
=
m_free_data
;
data
=
m_free_data
;
if
(
unlikely
(
data
==
0
))
{
#ifdef VM_TRACE
...
...
@@ -1139,125 +1240,296 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
m_available_data
.
m_tail
?
m_available_data
.
m_tail
->
sdata
->
gci
:
0
);
printf
(
"m_used_data_count %d
\n
"
,
m_used_data
.
m_count
);
#endif
op
->
m_has_error
=
2
;
DBUG_RETURN
(
-
1
);
// TODO handle this, overrun, or, skip?
DBUG_RETURN
(
0
);
// TODO handle this, overrun, or, skip?
}
}
// remove data from free list
m_free_data
=
data
->
m_next
;
m_free_data
=
data
->
m_next
;
data
->
m_next
=
0
;
#ifdef VM_TRACE
m_free_data_count
--
;
assert
(
m_free_data_sz
>=
data
->
sz
);
#endif
m_free_data_sz
-=
data
->
sz
;
m_free_data_sz
-=
data
->
sz
;
DBUG_RETURN
(
data
);
}
if
(
unlikely
(
copy_data_alloc
(
sdata
,
ptr
,
data
)))
{
op
->
m_has_error
=
3
;
DBUG_RETURN
(
-
1
);
}
// allocate initial or bigger memory area in EventBufData
// takes sizes from given ptr and sets up data->ptr
int
NdbEventBuffer
::
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
])
{
const
Uint32
min_alloc_size
=
128
;
// add it to received data
bucket
->
m_data
.
append
(
data
);
Uint32
sz4
=
(
sizeof
(
SubTableData
)
+
3
)
>>
2
;
Uint32
alloc_size
=
(
sz4
+
ptr
[
0
].
sz
+
ptr
[
1
].
sz
+
ptr
[
2
].
sz
)
<<
2
;
if
(
alloc_size
<
min_alloc_size
)
alloc_size
=
min_alloc_size
;
data
->
m_event_op
=
op
;
#ifdef VM_TRACE
op
->
m_data_count
++
;
#endif
DBUG_RETURN
(
0
);
if
(
data
->
sz
<
alloc_size
)
{
NdbMem_Free
((
char
*
)
data
->
memory
);
assert
(
m_total_alloc
>=
data
->
sz
);
m_total_alloc
-=
data
->
sz
;
data
->
memory
=
0
;
data
->
sz
=
0
;
data
->
memory
=
(
Uint32
*
)
NdbMem_Allocate
(
alloc_size
);
if
(
data
->
memory
==
0
)
return
-
1
;
data
->
sz
=
alloc_size
;
m_total_alloc
+=
data
->
sz
;
}
#ifdef VM_TRACE
if
((
Uint32
)
op
->
m_eventImpl
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
Uint32
*
memptr
=
data
->
memory
;
memptr
+=
sz4
;
int
i
;
for
(
i
=
0
;
i
<=
2
;
i
++
)
{
DBUG_PRINT
(
"info"
,(
"Data arrived before ready eventId"
,
op
->
m_eventId
));
DBUG_RETURN
(
0
);
data
->
ptr
[
i
].
p
=
memptr
;
data
->
ptr
[
i
].
sz
=
ptr
[
i
].
sz
;
memptr
+=
ptr
[
i
].
sz
;
}
else
{
DBUG_PRINT
(
"info"
,(
"skipped"
));
DBUG_RETURN
(
0
);
}
#else
return
0
;
#endif
}
int
NdbEventBuffer
::
copy_data
_alloc
(
const
SubTableData
*
const
f_
sdata
,
LinearSectionPtr
f_
ptr
[
3
],
EventBufData
*
ev_buf
)
NdbEventBuffer
::
copy_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
)
{
DBUG_ENTER
(
"NdbEventBuffer::copy_data_alloc"
);
const
unsigned
min_alloc_size
=
128
;
const
unsigned
sz4
=
(
sizeof
(
SubTableData
)
+
3
)
>>
2
;
Uint32
f_ptr_sz_0
=
f_ptr
[
0
].
sz
;
Uint32
f_ptr_sz_1
=
f_ptr
[
1
].
sz
;
Uint32
f_ptr_sz_2
=
f_ptr
[
2
].
sz
;
LinearSectionPtr
*
t_ptr
=
ev_buf
->
ptr
;
SubTableData
*
sdata
=
ev_buf
->
sdata
;
const
unsigned
alloc_size
=
(
sz4
+
f_ptr_sz_0
+
f_ptr_sz_1
+
f_ptr_sz_2
)
*
sizeof
(
Uint32
);
Uint32
*
ptr
;
if
(
alloc_size
>
min_alloc_size
)
{
if
(
sdata
)
{
NdbMem_Free
((
char
*
)
sdata
);
#ifdef VM_TRACE
assert
(
m_total_alloc
>=
ev_buf
->
sz
);
#endif
m_total_alloc
-=
ev_buf
->
sz
;
}
ptr
=
(
Uint32
*
)
NdbMem_Allocate
(
alloc_size
);
ev_buf
->
sdata
=
(
SubTableData
*
)
ptr
;
ev_buf
->
sz
=
alloc_size
;
m_total_alloc
+=
alloc_size
;
}
else
/* alloc_size <= min_alloc_size */
DBUG_ENTER
(
"NdbEventBuffer::copy_data"
);
if
(
alloc_mem
(
data
,
ptr
)
!=
0
)
DBUG_RETURN
(
-
1
);
memcpy
(
data
->
sdata
,
sdata
,
sizeof
(
SubTableData
));
int
i
;
for
(
i
=
0
;
i
<=
2
;
i
++
)
memcpy
(
data
->
ptr
[
i
].
p
,
ptr
[
i
].
p
,
ptr
[
i
].
sz
<<
2
);
DBUG_RETURN
(
0
);
}
static
struct
Ev_t
{
enum
{
INS
=
NdbDictionary
::
Event
::
_TE_INSERT
,
DEL
=
NdbDictionary
::
Event
::
_TE_DELETE
,
UPD
=
NdbDictionary
::
Event
::
_TE_UPDATE
,
NUL
=
NdbDictionary
::
Event
::
_TE_NUL
,
ERR
=
255
};
int
t1
,
t2
,
t3
;
}
ev_t
[]
=
{
{
Ev_t
::
INS
,
Ev_t
::
INS
,
Ev_t
::
ERR
},
{
Ev_t
::
INS
,
Ev_t
::
DEL
,
Ev_t
::
NUL
},
//ok
{
Ev_t
::
INS
,
Ev_t
::
UPD
,
Ev_t
::
INS
},
//ok
{
Ev_t
::
DEL
,
Ev_t
::
INS
,
Ev_t
::
UPD
},
//ok
{
Ev_t
::
DEL
,
Ev_t
::
DEL
,
Ev_t
::
ERR
},
{
Ev_t
::
DEL
,
Ev_t
::
UPD
,
Ev_t
::
ERR
},
{
Ev_t
::
UPD
,
Ev_t
::
INS
,
Ev_t
::
ERR
},
{
Ev_t
::
UPD
,
Ev_t
::
DEL
,
Ev_t
::
DEL
},
//ok
{
Ev_t
::
UPD
,
Ev_t
::
UPD
,
Ev_t
::
UPD
}
//ok
};
/*
* | INS | DEL | UPD
* 0 | pk ah + all ah | pk ah | pk ah + new ah
* 1 | pk ad + all ad | old pk ad | new pk ad + new ad
* 2 | empty | old non-pk ah+ad | old ah+ad
*/
static
AttributeHeader
copy_head
(
Uint32
&
i1
,
Uint32
*
p1
,
Uint32
&
i2
,
const
Uint32
*
p2
,
Uint32
flags
)
{
AttributeHeader
ah
(
p2
[
i2
]);
bool
do_copy
=
(
flags
&
1
);
if
(
do_copy
)
p1
[
i1
]
=
p2
[
i2
];
i1
++
;
i2
++
;
return
ah
;
}
static
void
copy_attr
(
AttributeHeader
ah
,
Uint32
&
j1
,
Uint32
*
p1
,
Uint32
&
j2
,
const
Uint32
*
p2
,
Uint32
flags
)
{
bool
do_copy
=
(
flags
&
1
);
bool
with_head
=
(
flags
&
2
);
Uint32
n
=
with_head
+
ah
.
getDataSize
();
if
(
do_copy
)
{
if
(
sdata
)
ptr
=
(
Uint32
*
)
sdata
;
Uint32
k
;
for
(
k
=
0
;
k
<
n
;
k
++
)
p1
[
j1
++
]
=
p2
[
j2
++
];
}
else
{
ptr
=
(
Uint32
*
)
NdbMem_Allocate
(
min_alloc_size
);
ev_buf
->
sdata
=
(
SubTableData
*
)
ptr
;
ev_buf
->
sz
=
min_alloc_size
;
m_total_alloc
+=
min_alloc_size
;
j1
+=
n
;
j2
+=
n
;
}
}
int
NdbEventBuffer
::
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr2
[
3
],
EventBufData
*
data
)
{
DBUG_ENTER
(
"NdbEventBuffer::merge_data"
);
Uint32
nkey
=
data
->
m_event_op
->
m_eventImpl
->
m_tableImpl
->
m_noOfKeys
;
int
t1
=
data
->
sdata
->
operation
;
int
t2
=
sdata
->
operation
;
if
(
t1
==
Ev_t
::
NUL
)
DBUG_RETURN
(
copy_data
(
sdata
,
ptr2
,
data
));
Ev_t
*
tp
=
0
;
int
i
;
for
(
i
=
0
;
i
<
sizeof
(
ev_t
)
/
sizeof
(
ev_t
[
0
]);
i
++
)
{
if
(
ev_t
[
i
].
t1
==
t1
&&
ev_t
[
i
].
t2
==
t2
)
{
tp
=
&
ev_t
[
i
];
break
;
}
}
assert
(
tp
!=
0
&&
tp
->
t3
!=
Ev_t
::
ERR
);
memcpy
(
ptr
,
f_sdata
,
sizeof
(
SubTableData
));
ptr
+=
sz4
;
// save old data
EventBufData
olddata
=
*
data
;
data
->
memory
=
0
;
data
->
sz
=
0
;
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_0
;
// compose ptr1 o ptr2 = ptr
LinearSectionPtr
(
&
ptr1
)
[
3
]
=
olddata
.
ptr
;
LinearSectionPtr
(
&
ptr
)
[
3
]
=
data
->
ptr
;
memcpy
(
ptr
,
f_ptr
[
0
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_0
);
ptr
+=
f_ptr_sz_0
;
t_ptr
++
;
// loop twice where first loop only sets sizes
int
loop
;
for
(
loop
=
0
;
loop
<=
1
;
loop
++
)
{
if
(
loop
==
1
)
{
if
(
alloc_mem
(
data
,
ptr
)
!=
0
)
DBUG_RETURN
(
-
1
);
*
data
->
sdata
=
*
sdata
;
data
->
sdata
->
operation
=
tp
->
t3
;
}
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_1
;
ptr
[
0
].
sz
=
ptr
[
1
].
sz
=
ptr
[
3
].
sz
=
0
;
memcpy
(
ptr
,
f_ptr
[
1
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_1
);
ptr
+=
f_ptr_sz_1
;
t_ptr
++
;
// copy pk from new version
{
AttributeHeader
ah
;
Uint32
i
=
0
;
Uint32
j
=
0
;
Uint32
i2
=
0
;
Uint32
j2
=
0
;
while
(
i
<
nkey
)
{
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i2
,
ptr2
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j2
,
ptr2
[
1
].
p
,
loop
);
}
ptr
[
0
].
sz
=
i
;
ptr
[
1
].
sz
=
j
;
}
if
(
f_ptr_sz_2
)
// merge after values, new version overrides
if
(
tp
->
t3
!=
Ev_t
::
DEL
)
{
AttributeHeader
ah
;
Uint32
i
=
ptr
[
0
].
sz
;
Uint32
j
=
ptr
[
1
].
sz
;
Uint32
i1
=
0
;
Uint32
j1
=
0
;
Uint32
i2
=
nkey
;
Uint32
j2
=
ptr
[
1
].
sz
;
while
(
i1
<
nkey
)
{
j1
+=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
++
]).
getDataSize
();
}
while
(
1
)
{
bool
b1
=
(
i1
<
ptr1
[
0
].
sz
);
bool
b2
=
(
i2
<
ptr2
[
0
].
sz
);
if
(
b1
&&
b2
)
{
Uint32
id1
=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
]).
getAttributeId
();
Uint32
id2
=
AttributeHeader
(
ptr2
[
0
].
p
[
i2
]).
getAttributeId
();
if
(
id1
<
id2
)
b2
=
false
;
else
if
(
id1
>
id2
)
b1
=
false
;
else
{
j1
+=
AttributeHeader
(
ptr1
[
0
].
p
[
i1
++
]).
getDataSize
();
b1
=
false
;
}
}
if
(
b1
)
{
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i1
,
ptr1
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j1
,
ptr1
[
1
].
p
,
loop
);
}
else
if
(
b2
)
{
t_ptr
->
p
=
ptr
;
t_ptr
->
sz
=
f_ptr_sz_2
;
memcpy
(
ptr
,
f_ptr
[
2
].
p
,
sizeof
(
Uint32
)
*
f_ptr_sz_2
);
ah
=
copy_head
(
i
,
ptr
[
0
].
p
,
i2
,
ptr2
[
0
].
p
,
loop
);
copy_attr
(
ah
,
j
,
ptr
[
1
].
p
,
j2
,
ptr2
[
1
].
p
,
loop
);
}
else
break
;
}
ptr
[
0
].
sz
=
i
;
ptr
[
1
].
sz
=
j
;
}
// merge before values, old version overrides
if
(
tp
->
t3
!=
Ev_t
::
INS
)
{
AttributeHeader
ah
;
Uint32
k
=
0
;
Uint32
k1
=
0
;
Uint32
k2
=
0
;
while
(
1
)
{
bool
b1
=
(
k1
<
ptr1
[
2
].
sz
);
bool
b2
=
(
k2
<
ptr2
[
2
].
sz
);
if
(
b1
&&
b2
)
{
Uint32
id1
=
AttributeHeader
(
ptr1
[
2
].
p
[
k1
]).
getAttributeId
();
Uint32
id2
=
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]).
getAttributeId
();
if
(
id1
<
id2
)
b2
=
false
;
else
if
(
id1
>
id2
)
b1
=
false
;
else
{
t_ptr
->
p
=
0
;
t_ptr
->
sz
=
0
;
k2
+=
1
+
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]).
getDataSize
();
b2
=
false
;
}
}
if
(
b1
)
{
ah
=
AttributeHeader
(
ptr1
[
2
].
p
[
k1
]);
copy_attr
(
ah
,
k
,
ptr
[
2
].
p
,
k1
,
ptr1
[
2
].
p
,
loop
|
2
);
}
else
if
(
b2
)
{
ah
=
AttributeHeader
(
ptr2
[
2
].
p
[
k2
]);
copy_attr
(
ah
,
k
,
ptr
[
2
].
p
,
k2
,
ptr2
[
2
].
p
,
loop
|
2
);
}
else
break
;
}
ptr
[
2
].
sz
=
k
;
}
}
// free old data
NdbMem_Free
((
char
*
)
olddata
.
memory
);
DBUG_RETURN
(
0
);
}
...
...
@@ -1399,5 +1671,107 @@ send_report:
#endif
}
// hash table routines
// could optimize the all-fixed case
Uint32
EventBufData_hash
::
getpkhash
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
])
{
const
NdbTableImpl
*
tab
=
op
->
m_eventImpl
->
m_tableImpl
;
// in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
// for pk update (to equivalent pk) post/pre values give same hash
Uint32
nkey
=
tab
->
m_noOfKeys
;
assert
(
nkey
!=
0
&&
nkey
<=
ptr
[
0
].
sz
);
const
Uint32
*
hptr
=
ptr
[
0
].
p
;
const
uchar
*
dptr
=
(
uchar
*
)
ptr
[
1
].
p
;
// hash registers
ulong
nr1
=
0
;
ulong
nr2
=
0
;
while
(
nkey
--
!=
0
)
{
AttributeHeader
ah
(
*
hptr
++
);
Uint32
bytesize
=
ah
.
getByteSize
();
assert
(
dptr
+
bytesize
<=
(
uchar
*
)(
ptr
[
1
].
p
+
ptr
[
1
].
sz
));
Uint32
i
=
ah
.
getAttributeId
();
const
NdbColumnImpl
*
col
=
tab
->
getColumn
(
i
);
assert
(
col
!=
0
);
Uint32
lb
,
len
;
bool
ok
=
NdbSqlUtil
::
get_var_length
(
col
->
m_type
,
dptr
,
bytesize
,
lb
,
len
);
assert
(
ok
);
CHARSET_INFO
*
cs
=
col
->
m_cs
?
col
->
m_cs
:
&
my_charset_bin
;
(
*
cs
->
coll
->
hash_sort
)(
cs
,
dptr
+
lb
,
len
,
&
nr1
,
&
nr2
);
dptr
+=
bytesize
;
}
return
nr1
;
}
// this is seldom invoked
// Compare the primary keys of two events for collation-aware equality.
// This is seldom invoked: only on a pkhash collision within one bucket.
// Returns true iff every key attribute compares equal under its column's
// collation (strnncollsp, i.e. endspace-insensitive for padded types).
bool
EventBufData_hash::getpkequal(NdbEventOperationImpl* op,
                              LinearSectionPtr ptr1[3],
                              LinearSectionPtr ptr2[3])
{
  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;

  Uint32 nkey = tab->m_noOfKeys;
  assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
  const Uint32* hptr1 = ptr1[0].p;
  const Uint32* hptr2 = ptr2[0].p;
  const uchar* dptr1 = (uchar*)ptr1[1].p;
  const uchar* dptr2 = (uchar*)ptr2[1].p;

  while (nkey-- != 0)
  {
    AttributeHeader ah1(*hptr1++);
    AttributeHeader ah2(*hptr2++);
    // sizes can differ on update of varchar endspace
    Uint32 bytesize1 = ah1.getByteSize();
    // BUG FIX: was ah1.getByteSize() — the second stream's size was taken
    // from the first header, so dptr2 was parsed and advanced with the
    // wrong byte count whenever the two sizes differed.
    Uint32 bytesize2 = ah2.getByteSize();
    assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
    assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
    assert(ah1.getAttributeId() == ah2.getAttributeId());
    Uint32 i = ah1.getAttributeId();
    const NdbColumnImpl* col = tab->getColumn(i);
    assert(col != 0);
    // strip length bytes of var-size types before comparing the value part
    Uint32 lb1, len1;
    bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
    Uint32 lb2, len2;
    bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
    assert(ok1 && ok2 && lb1 == lb2);
    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
    int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
    if (res != 0)
      return false;
    dptr1 += bytesize1;
    dptr2 += bytesize2;
  }
  return true;
}
// Look up the event's primary key in the per-GCI hash.
// On return hpos holds the bucket index, the matching entry (0 if none),
// and the computed PK hash for a subsequent append().
void
EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op,
                          LinearSectionPtr ptr[3])
{
  const Uint32 pkhash = getpkhash(op, ptr);
  // bucket choice mixes in the operation id so equal PKs of different
  // event operations land apart
  const Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
  EventBufData* data;
  for (data = m_hash[index]; data != 0; data = data->m_next_hash)
  {
    // cheap checks (op, cached hash) first, full key compare last
    if (data->m_event_op == op &&
        data->m_pkhash == pkhash &&
        getpkequal(op, data->ptr, ptr))
      break;
  }
  hpos.index = index;
  hpos.data = data;
  hpos.pkhash = pkhash;
}
// Explicit instantiations of the Vector containers used by the event buffer.
template class Vector<Gci_container>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>;
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
d679b903
...
...
@@ -25,16 +25,19 @@
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
class
NdbEventOperationImpl
;
struct
EventBufData
{
union
{
SubTableData
*
sdata
;
char
*
memory
;
Uint32
*
memory
;
};
LinearSectionPtr
ptr
[
3
];
unsigned
sz
;
NdbEventOperationImpl
*
m_event_op
;
EventBufData
*
m_next
;
// Next wrt to global order
EventBufData
*
m_next_hash
;
// Next in per-GCI hash
Uint32
m_pkhash
;
// PK hash (without op) for fast compare
};
class
EventBufData_list
...
...
@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list)
m_sz
+=
list
.
m_sz
;
}
// GCI bucket has also a hash table over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
  // Result of a search(), consumed by append().
  struct Pos { // search result
    Uint32 index;       // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash;      // PK hash
  };

  // Hash of the event's primary key (without the op id; search() mixes
  // the op id in when picking the bucket).
  static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  // Collation-aware equality of two events' primary keys.
  static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3],
                         LinearSectionPtr ptr2[3]);
  // Find the bucket (and an existing matching entry, if any) for the event.
  void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  // Insert data into the bucket located by a preceding search().
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };
  // Bucket heads, chained through EventBufData::m_next_hash.
  // NOTE(review): no constructor zeroes m_hash here — presumably the
  // enclosing Gci_container is cleared when (re)initialized; verify at
  // the container's init code.
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};
// Push data onto the front of the bucket located by a preceding search().
// NOTE(review): data->m_pkhash is not stored here — presumably the caller
// assigns hpos.pkhash to it; verify at the call sites.
inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
  data->m_next_hash = m_hash[hpos.index];
  m_hash[hpos.index] = data;
}
struct
Gci_container
{
enum
State
...
...
@@ -127,6 +158,7 @@ struct Gci_container
Uint32
m_gcp_complete_rep_count
;
// Remaining SUB_GCP_COMPLETE_REP until done
Uint64
m_gci
;
// GCI
EventBufData_list
m_data
;
EventBufData_hash
m_data_hash
;
};
class
NdbEventOperationImpl
:
public
NdbEventOperation
{
...
...
@@ -174,6 +206,8 @@ public:
Uint32
m_eventId
;
Uint32
m_oid
;
bool
m_separateEvents
;
EventBufData
*
m_data_item
;
void
*
m_custom_data
;
...
...
@@ -212,7 +246,6 @@ public:
void
add_op
();
void
remove_op
();
void
init_gci_containers
();
Uint32
m_active_op_count
;
// accessed from the "receive thread"
int
insertDataL
(
NdbEventOperationImpl
*
op
,
...
...
@@ -233,10 +266,15 @@ public:
NdbEventOperationImpl
*
move_data
();
// used by both user thread and receive thread
int
copy_data_alloc
(
const
SubTableData
*
const
f_sdata
,
LinearSectionPtr
f_ptr
[
3
],
EventBufData
*
ev_buf
);
// routines to copy/merge events
EventBufData
*
alloc_data
();
int
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
]);
int
copy_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
int
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
void
free_list
(
EventBufData_list
&
list
);
...
...
@@ -290,6 +328,8 @@ private:
// dropped event operations that have not yet
// been deleted
NdbEventOperationImpl
*
m_dropped_ev_op
;
Uint32
m_active_op_count
;
};
inline
...
...
storage/ndb/test/ndbapi/test_event.cpp
View file @
d679b903
...
...
@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec
g_err
<<
function
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
function
<<
"get values
\n
"
;
NdbRecAttr
*
recAttr
[
1024
];
...
...
@@ -380,6 +381,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
"dropping event operation"
<<
endl
;
int
res
=
pNdb
->
dropEventOperation
(
pOp
);
...
...
@@ -550,6 +552,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed on %s"
<<
buf
<<
endl
;
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
i
;
int
n_columns
=
table
->
getNoOfColumns
();
...
...
@@ -1195,6 +1198,7 @@ static int createEventOperations(Ndb * ndb)
{
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
n_columns
=
pTabs
[
i
]
->
getNoOfColumns
();
for
(
int
j
=
0
;
j
<
n_columns
;
j
++
)
...
...
storage/ndb/test/ndbapi/test_event_merge.cpp
View file @
d679b903
...
...
@@ -473,9 +473,9 @@ struct Op { // single or composite
Kind
kind
;
Type
type
;
Op
*
next_op
;
// within one commit
Op
*
next_com
;
// next commit chain
or next event
Op
*
next_com
;
// next commit chain
Op
*
next_gci
;
// groups commit chains (unless --separate-events)
Op
*
next_ev
;
Op
*
next_ev
;
// next event
Op
*
next_free
;
// free list
bool
free
;
// on free list
uint
num_op
;
...
...
@@ -564,6 +564,8 @@ static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-pre
static
NdbBlob
*
g_ev_bh
[
2
][
g_maxcol
];
// 0-post 1-pre
static
Op
*
g_rec_ev
;
static
uint
g_ev_pos
[
g_maxpk
];
static
uint
g_num_gci
=
0
;
static
uint
g_num_ev
=
0
;
static
Op
*
getop
(
Op
::
Kind
a_kind
)
...
...
@@ -651,6 +653,7 @@ resetmem()
}
}
assert
(
g_usedops
==
0
);
g_num_gci
=
g_num_ev
=
0
;
}
struct
Comp
{
...
...
@@ -877,9 +880,8 @@ createeventop()
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
(),
bsz
))
!=
0
);
#else
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
()))
!=
0
);
#ifdef version51rbr
// available in gci merge changeset
g_evt_op
->
separateEvents
(
g_opts
.
separate_events
);
// not yet inherited
#endif
#endif
uint
i
;
for
(
i
=
0
;
i
<
ncol
();
i
++
)
{
...
...
@@ -1203,8 +1205,9 @@ makeops()
// copy to gci level
copyop
(
com_op
,
gci_op
);
tot_op
->
num_com
+=
1
;
g_num_gci
+=
1
;
}
ll1
(
"makeops: used ops = "
<<
g_usedops
);
ll1
(
"makeops: used ops = "
<<
g_usedops
<<
" com ops = "
<<
g_num_gci
);
}
static
int
...
...
@@ -1341,12 +1344,13 @@ mergeops()
gci_op2
=
gci_op2
->
next_gci
;
freeop
(
tmp_op
);
mergecnt
++
;
assert
(
g_num_gci
!=
0
);
g_num_gci
--
;
}
gci_op
=
gci_op
->
next_gci
=
gci_op2
;
}
}
ll1
(
"mergeops: used ops = "
<<
g_usedops
);
ll1
(
"mergeops: merged "
<<
mergecnt
<<
" gci entries"
);
ll1
(
"mergeops: used ops = "
<<
g_usedops
<<
" gci ops = "
<<
g_num_gci
);
return
0
;
}
...
...
@@ -1504,27 +1508,37 @@ matchevents()
static
int
matchops
()
{
ll1
(
"matchops"
);
uint
nomatch
=
0
;
Uint32
pk1
;
for
(
pk1
=
0
;
pk1
<
g_opts
.
maxpk
;
pk1
++
)
{
Op
*
tot_op
=
g_pk_op
[
pk1
];
if
(
tot_op
==
0
)
continue
;
Op
*
com_op
=
tot_op
->
next_com
;
while
(
com_op
!=
0
)
{
if
(
com_op
->
type
!=
Op
::
NUL
&&
!
com_op
->
match
)
{
Op
*
gci_op
=
tot_op
->
next_gci
;
while
(
gci_op
!=
0
)
{
if
(
gci_op
->
type
==
Op
::
NUL
)
{
ll2
(
"GCI: "
<<
*
gci_op
<<
" [skip NUL]"
);
}
else
if
(
gci_op
->
match
)
{
ll2
(
"GCI: "
<<
*
gci_op
<<
" [match OK]"
);
}
else
{
ll0
(
"GCI: "
<<
*
gci_op
);
Op
*
com_op
=
gci_op
->
next_com
;
assert
(
com_op
!=
0
);
ll0
(
"COM: "
<<
*
com_op
);
Op
*
op
=
com_op
->
next_op
;
assert
(
op
!=
0
);
while
(
op
!=
0
)
{
ll0
(
"
---
: "
<<
*
op
);
ll0
(
"
OP
: "
<<
*
op
);
op
=
op
->
next_op
;
}
ll0
(
"no matching event"
);
return
-
1
;
nomatch
++
;
}
com_op
=
com_op
->
next_com
;
gci_op
=
gci_op
->
next_gci
;
}
}
chkrc
(
nomatch
==
0
);
return
0
;
}
...
...
@@ -1619,9 +1633,10 @@ runevents()
Op
*
ev
=
getop
(
Op
::
EV
);
copyop
(
g_rec_ev
,
ev
);
last_ev
->
next_ev
=
ev
;
g_num_ev
++
;
}
}
ll1
(
"runevents: used ops = "
<<
g_usedops
);
ll1
(
"runevents: used ops = "
<<
g_usedops
<<
" events = "
<<
g_num_ev
);
return
0
;
}
...
...
@@ -1666,6 +1681,7 @@ runtest()
chkrc
(
mergeops
()
==
0
);
cmppostpre
();
chkrc
(
runevents
()
==
0
);
ll0
(
"counts: gci = "
<<
g_num_gci
<<
" ev = "
<<
g_num_ev
);
chkrc
(
matchevents
()
==
0
);
chkrc
(
matchops
()
==
0
);
chkrc
(
dropeventop
()
==
0
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment