Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
c68df5c1
Commit
c68df5c1
authored
Jan 19, 2006
by
pekka@mysql.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge mysql.com:/space/pekka/ndb/version/my51
into mysql.com:/space/pekka/ndb/version/my51-rbr
parents
d9fb7c20
17cec85f
Changes
13
Show whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
1057 additions
and
224 deletions
+1057
-224
storage/ndb/include/ndbapi/NdbBlob.hpp
storage/ndb/include/ndbapi/NdbBlob.hpp
+37
-1
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
+19
-1
storage/ndb/include/ndbapi/NdbEventOperation.hpp
storage/ndb/include/ndbapi/NdbEventOperation.hpp
+8
-0
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
+3
-3
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
+117
-52
storage/ndb/src/ndbapi/NdbBlob.cpp
storage/ndb/src/ndbapi/NdbBlob.cpp
+227
-40
storage/ndb/src/ndbapi/NdbDictionary.cpp
storage/ndb/src/ndbapi/NdbDictionary.cpp
+5
-0
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+82
-8
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+4
-1
storage/ndb/src/ndbapi/NdbEventOperation.cpp
storage/ndb/src/ndbapi/NdbEventOperation.cpp
+12
-0
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+429
-49
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+37
-2
storage/ndb/test/ndbapi/test_event_merge.cpp
storage/ndb/test/ndbapi/test_event_merge.cpp
+77
-67
No files found.
storage/ndb/include/ndbapi/NdbBlob.hpp
View file @
c68df5c1
...
...
@@ -28,6 +28,7 @@ class NdbOperation;
class
NdbRecAttr
;
class
NdbTableImpl
;
class
NdbColumnImpl
;
class
NdbEventOperationImpl
;
/**
* @class NdbBlob
...
...
@@ -71,6 +72,10 @@ class NdbColumnImpl;
* writes. It avoids execute penalty if nothing is pending. It is not
* needed after execute (obviously) or after next scan result.
*
* NdbBlob also supports reading post or pre blob data from events. The
* handle can be read after next event on main table has been retrieved.
* The data is available immediately. See NdbEventOperation.
*
* NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary.
*
...
...
@@ -145,6 +150,12 @@ public:
* then the callback is invoked.
*/
int
setActiveHook
(
ActiveHook
*
activeHook
,
void
*
arg
);
/**
* Check if blob value is defined (NULL or not). Used as first call
* on event based blob. The argument is set to -1 for not defined.
* Unlike getNull() this does not cause error on the handle.
*/
int
getDefined
(
int
&
isNull
);
/**
* Check if blob is null.
*/
...
...
@@ -191,6 +202,11 @@ public:
* Get blob parts table name. Useful only to test programs.
*/
static
int
getBlobTableName
(
char
*
btname
,
Ndb
*
anNdb
,
const
char
*
tableName
,
const
char
*
columnName
);
/**
* Get blob event name. The blob event is created if the main event
* monitors the blob column. The name includes main event name.
*/
static
int
getBlobEventName
(
char
*
bename
,
Ndb
*
anNdb
,
const
char
*
eventName
,
const
char
*
columnName
);
/**
* Return error object. The error may be blob specific (below) or may
* be copied from a failed implicit operation.
...
...
@@ -217,17 +233,29 @@ private:
friend
class
NdbScanOperation
;
friend
class
NdbDictionaryImpl
;
friend
class
NdbResultSet
;
// atNextResult
friend
class
NdbEventBuffer
;
friend
class
NdbEventOperationImpl
;
#endif
// state
State
theState
;
void
setState
(
State
newState
);
// quick and dirty support for events (consider subclassing)
int
theEventBlobVersion
;
// -1=normal blob 0=post event 1=pre event
// define blob table
static
void
getBlobTableName
(
char
*
btname
,
const
NdbTableImpl
*
t
,
const
NdbColumnImpl
*
c
);
static
void
getBlobTable
(
NdbTableImpl
&
bt
,
const
NdbTableImpl
*
t
,
const
NdbColumnImpl
*
c
);
static
void
getBlobEventName
(
char
*
bename
,
const
NdbEventImpl
*
e
,
const
NdbColumnImpl
*
c
);
static
void
getBlobEvent
(
NdbEventImpl
&
be
,
const
NdbEventImpl
*
e
,
const
NdbColumnImpl
*
c
);
// ndb api stuff
Ndb
*
theNdb
;
NdbTransaction
*
theNdbCon
;
NdbOperation
*
theNdbOp
;
NdbEventOperationImpl
*
theEventOp
;
NdbEventOperationImpl
*
theBlobEventOp
;
NdbRecAttr
*
theBlobEventPkRecAttr
;
NdbRecAttr
*
theBlobEventDistRecAttr
;
NdbRecAttr
*
theBlobEventPartRecAttr
;
NdbRecAttr
*
theBlobEventDataRecAttr
;
const
NdbTableImpl
*
theTable
;
const
NdbTableImpl
*
theAccessTable
;
const
NdbTableImpl
*
theBlobTable
;
...
...
@@ -263,6 +291,8 @@ private:
Buf
theHeadInlineBuf
;
Buf
theHeadInlineCopyBuf
;
// for writeTuple
Buf
thePartBuf
;
Buf
theBlobEventDataBuf
;
Uint32
thePartNumber
;
// for event
Head
*
theHead
;
char
*
theInlineData
;
NdbRecAttr
*
theHeadInlineRecAttr
;
...
...
@@ -306,6 +336,8 @@ private:
int
readDataPrivate
(
char
*
buf
,
Uint32
&
bytes
);
int
writeDataPrivate
(
const
char
*
buf
,
Uint32
bytes
);
int
readParts
(
char
*
buf
,
Uint32
part
,
Uint32
count
);
int
readTableParts
(
char
*
buf
,
Uint32
part
,
Uint32
count
);
int
readEventParts
(
char
*
buf
,
Uint32
part
,
Uint32
count
);
int
insertParts
(
const
char
*
buf
,
Uint32
part
,
Uint32
count
);
int
updateParts
(
const
char
*
buf
,
Uint32
part
,
Uint32
count
);
int
deleteParts
(
Uint32
part
,
Uint32
count
);
...
...
@@ -317,19 +349,23 @@ private:
int
invokeActiveHook
();
// blob handle maintenance
int
atPrepare
(
NdbTransaction
*
aCon
,
NdbOperation
*
anOp
,
const
NdbColumnImpl
*
aColumn
);
int
atPrepare
(
NdbEventOperationImpl
*
anOp
,
NdbEventOperationImpl
*
aBlobOp
,
const
NdbColumnImpl
*
aColumn
,
int
version
);
int
prepareColumn
();
int
preExecute
(
NdbTransaction
::
ExecType
anExecType
,
bool
&
batch
);
int
postExecute
(
NdbTransaction
::
ExecType
anExecType
);
int
preCommit
();
int
atNextResult
();
int
atNextEvent
();
// errors
void
setErrorCode
(
int
anErrorCode
,
bool
invalidFlag
=
true
);
void
setErrorCode
(
NdbOperation
*
anOp
,
bool
invalidFlag
=
true
);
void
setErrorCode
(
NdbTransaction
*
aCon
,
bool
invalidFlag
=
true
);
void
setErrorCode
(
NdbEventOperationImpl
*
anOp
,
bool
invalidFlag
=
true
);
#ifdef VM_TRACE
int
getOperationType
()
const
;
friend
class
NdbOut
&
operator
<<
(
NdbOut
&
,
const
NdbBlob
&
);
#endif
// list stuff
void
next
(
NdbBlob
*
obj
)
{
theNext
=
obj
;}
NdbBlob
*
next
()
{
return
theNext
;}
friend
struct
Ndb_free_list_t
<
NdbBlob
>
;
...
...
storage/ndb/include/ndbapi/NdbDictionary.hpp
View file @
c68df5c1
...
...
@@ -1124,7 +1124,7 @@ public:
_TE_NODE_FAILURE
=
10
,
_TE_SUBSCRIBE
=
11
,
_TE_UNSUBSCRIBE
=
12
,
_TE_NUL
=
13
// internal (INS o DEL within same GCI)
_TE_NUL
=
13
// internal (
e.g.
INS o DEL within same GCI)
};
#endif
/**
...
...
@@ -1261,6 +1261,24 @@ public:
*/
int
getNoOfEventColumns
()
const
;
/**
* The merge events flag is false by default. Setting it true
* implies that events are merged in following ways:
*
* - for given NdbEventOperation associated with this event,
* events on same PK within same GCI are merged into single event
*
* - a blob table event is created for each blob attribute
* and blob events are handled as part of main table events
*
* - blob post/pre data from the blob part events can be read
* via NdbBlob methods as a single value
*
* NOTE: Currently this flag is not inherited by NdbEventOperation
* and must be set on NdbEventOperation explicitly.
*/
void
mergeEvents
(
bool
flag
);
/**
* Get object status
*/
...
...
storage/ndb/include/ndbapi/NdbEventOperation.hpp
View file @
c68df5c1
...
...
@@ -150,6 +150,14 @@ public:
*/
NdbRecAttr
*
getPreValue
(
const
char
*
anAttrName
,
char
*
aValue
=
0
);
/**
* These methods replace getValue/getPreValue for blobs. Each
* method creates a blob handle NdbBlob. The handle supports only
* read operations. See NdbBlob.
*/
NdbBlob
*
getBlobHandle
(
const
char
*
anAttrName
);
NdbBlob
*
getPreBlobHandle
(
const
char
*
anAttrName
);
int
isOverrun
()
const
;
/**
...
...
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
View file @
c68df5c1
...
...
@@ -4,7 +4,7 @@ OBJS = ndbapi_event.o
CXX
=
g++
-g
CFLAGS
=
-c
-Wall
-fno-rtti
-fno-exceptions
CXXFLAGS
=
DEBUG
=
DEBUG
=
# -DVM_TRACE
LFLAGS
=
-Wall
TOP_SRCDIR
=
../../../..
INCLUDE_DIR
=
$(TOP_SRCDIR)
/storage/ndb/include
...
...
@@ -16,8 +16,8 @@ SYS_LIB =
$(TARGET)
:
$(OBJS)
$(CXX)
$(CXXFLAGS)
$(LFLAGS)
$(LIB_DIR)
$(OBJS)
-lndbclient
-lmysqlclient_r
-lmysys
-lmystrings
-lz
$(SYS_LIB)
-o
$(TARGET)
$(TARGET).o
:
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
-I
$(TOP_SRCDIR)
/include
$(SRCS)
$(TARGET).o
:
$(SRCS)
Makefile
$(CXX)
$(CFLAGS)
$(DEBUG)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
-I
$(TOP_SRCDIR)
/include
$(SRCS)
clean
:
rm
-f
*
.o
$(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
View file @
c68df5c1
...
...
@@ -54,26 +54,32 @@
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#ifdef VM_TRACE
#include <my_global.h>
#endif
#ifndef assert
#include <assert.h>
#endif
/**
*
* Assume that there is a table t0 which is being updated by
* Assume that there is a table which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with column
s c0,c1,c2,c3
.
* We want to monitor what happens with column
values
.
*
*
or together with the mysql client;
*
Or using the mysql client:
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* mysql> create table t0
* (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
* primary key(c0, c2)) engine ndb charset latin1;
*
* In another window start ndbapi_event, wait until properly started
*
insert into t0 values (1, 2, 'a', 'b');
insert into t0 values (3, 4, 'c', 'd');
insert into t0 values (1, 2, 'a', 'b'
, null
);
insert into t0 values (3, 4, 'c', 'd'
, null
);
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same'
...
...
@@ -81,7 +87,18 @@
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
*
insert ...; update ...; -- see events w/ same pk merged (if -m option)
delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
update ...; update ...;
-- text requires -m flag
set @a = repeat('a',256); -- inline size
set @b = repeat('b',2000); -- part size
set @c = repeat('c',2000*30); -- 30 parts
-- update the text field using combinations of @a, @b, @c ...
* you should see the data popping up in the example window
*
*/
...
...
@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb,
const
char
*
eventName
,
const
char
*
eventTableName
,
const
char
**
eventColumnName
,
const
int
noEventColumnName
);
const
int
noEventColumnName
,
bool
merge_events
);
int
main
(
int
argc
,
char
**
argv
)
{
ndb_init
();
bool
merge_events
=
argc
>
1
&&
strcmp
(
argv
[
1
],
"-m"
)
==
0
;
bool
merge_events
=
argc
>
1
&&
strchr
(
argv
[
1
],
'm'
)
!=
0
;
#ifdef VM_TRACE
bool
dbug
=
argc
>
1
&&
strchr
(
argv
[
1
],
'd'
)
!=
0
;
if
(
dbug
)
DBUG_PUSH
(
"d:t:"
);
if
(
dbug
)
putenv
(
"API_SIGNAL_LOG=-"
);
#endif
Ndb_cluster_connection
*
cluster_connection
=
new
Ndb_cluster_connection
();
// Object representing the cluster
...
...
@@ -134,12 +157,13 @@ int main(int argc, char** argv)
const
char
*
eventName
=
"CHNG_IN_t0"
;
const
char
*
eventTableName
=
"t0"
;
const
int
noEventColumnName
=
4
;
const
int
noEventColumnName
=
5
;
const
char
*
eventColumnName
[
noEventColumnName
]
=
{
"c0"
,
"c1"
,
"c2"
,
"c3"
"c3"
,
"c4"
};
// Create events
...
...
@@ -147,9 +171,14 @@ int main(int argc, char** argv)
eventName
,
eventTableName
,
eventColumnName
,
noEventColumnName
);
noEventColumnName
,
merge_events
);
int
j
=
0
;
// Normal values and blobs are unfortunately handled differently..
typedef
union
{
NdbRecAttr
*
ra
;
NdbBlob
*
bh
;
}
RA_BH
;
int
i
,
j
,
k
,
l
;
j
=
0
;
while
(
j
<
99
)
{
// Start "transaction" for handling events
...
...
@@ -160,12 +189,17 @@ int main(int argc, char** argv)
op
->
mergeEvents
(
merge_events
);
printf
(
"get values
\n
"
);
NdbRecAttr
*
recAttr
[
noEventColumnName
];
NdbRecAttr
*
recAttrPre
[
noEventColumnName
];
RA_BH
recAttr
[
noEventColumnName
];
RA_BH
recAttrPre
[
noEventColumnName
];
// primary keys should always be a part of the result
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
recAttr
[
i
]
=
op
->
getValue
(
eventColumnName
[
i
]);
recAttrPre
[
i
]
=
op
->
getPreValue
(
eventColumnName
[
i
]);
for
(
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
i
<
4
)
{
recAttr
[
i
].
ra
=
op
->
getValue
(
eventColumnName
[
i
]);
recAttrPre
[
i
].
ra
=
op
->
getPreValue
(
eventColumnName
[
i
]);
}
else
if
(
merge_events
)
{
recAttr
[
i
].
bh
=
op
->
getBlobHandle
(
eventColumnName
[
i
]);
recAttrPre
[
i
].
bh
=
op
->
getPreBlobHandle
(
eventColumnName
[
i
]);
}
}
// set up the callbacks
...
...
@@ -174,13 +208,16 @@ int main(int argc, char** argv)
if
(
op
->
execute
())
APIERROR
(
op
->
getNdbError
());
int
i
=
0
;
while
(
i
<
40
)
{
NdbEventOperation
*
the_op
=
op
;
i
=
0
;
while
(
i
<
40
)
{
// printf("now waiting for event...\n");
int
r
=
myNdb
->
pollEvents
(
1000
);
// wait for event or 1000 ms
int
r
=
myNdb
->
pollEvents
(
1000
);
// wait for event or 1000 ms
if
(
r
>
0
)
{
// printf("got data! %d\n", r);
while
((
op
=
myNdb
->
nextEvent
()))
{
assert
(
the_op
==
op
);
i
++
;
switch
(
op
->
getEventType
())
{
case
NdbDictionary
:
:
Event
::
TE_INSERT
:
...
...
@@ -195,40 +232,66 @@ int main(int argc, char** argv)
default:
abort
();
// should not happen
}
printf
(
" gci=%d
\n
"
,
op
->
getGCI
());
printf
(
"post: "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttr
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttr
[
i
]
->
u_32_value
());
printf
(
" gci=%d
\n
"
,
(
int
)
op
->
getGCI
());
for
(
k
=
0
;
k
<=
1
;
k
++
)
{
printf
(
k
==
0
?
"post: "
:
"pre : "
);
for
(
l
=
0
;
l
<
noEventColumnName
;
l
++
)
{
if
(
l
<
4
)
{
NdbRecAttr
*
ra
=
k
==
0
?
recAttr
[
l
].
ra
:
recAttrPre
[
l
].
ra
;
if
(
ra
->
isNULL
()
>=
0
)
{
// we have a value
if
(
ra
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
l
<
2
)
printf
(
"%-5u"
,
ra
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttr
[
i
]
->
aRef
());
}
else
// we have a null valu
e
printf
(
"%-5.4s"
,
ra
->
aRef
());
}
els
e
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
printf
(
"%-5s"
,
"-"
);
// no value
}
else
if
(
merge_events
)
{
int
isNull
;
NdbBlob
*
bh
=
k
==
0
?
recAttr
[
l
].
bh
:
recAttrPre
[
l
].
bh
;
bh
->
getDefined
(
isNull
);
if
(
isNull
>=
0
)
{
// we have a value
if
(
!
isNull
)
{
// we have a non-null value
Uint64
length
=
0
;
bh
->
getLength
(
length
);
// read into buffer
unsigned
char
*
buf
=
new
unsigned
char
[
length
];
memset
(
buf
,
'X'
,
length
);
Uint32
n
=
length
;
bh
->
readData
(
buf
,
n
);
// n is in/out
assert
(
n
==
length
);
// pretty-print
bool
first
=
true
;
Uint32
i
=
0
;
while
(
i
<
n
)
{
unsigned
char
c
=
buf
[
i
++
];
Uint32
m
=
1
;
while
(
i
<
n
&&
buf
[
i
]
==
c
)
i
++
,
m
++
;
if
(
!
first
)
printf
(
"+"
);
printf
(
"%u%c"
,
m
,
c
);
first
=
false
;
}
printf
(
"
\n
pre : "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttrPre
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttrPre
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"[%u]"
,
n
);
delete
[]
buf
;
}
else
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
printf
(
"%-5s"
,
"-"
);
// no value
}
}
printf
(
"
\n
"
);
}
}
}
else
;
//printf("timed out\n");
}
// don't want to listen to events anymore
if
(
myNdb
->
dropEventOperation
(
op
))
APIERROR
(
myNdb
->
getNdbError
());
if
(
myNdb
->
dropEventOperation
(
the_op
))
APIERROR
(
myNdb
->
getNdbError
());
the_op
=
0
;
j
++
;
}
...
...
@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb,
const
char
*
eventName
,
const
char
*
eventTableName
,
const
char
**
eventColumnNames
,
const
int
noEventColumnNames
)
const
int
noEventColumnNames
,
bool
merge_events
)
{
NdbDictionary
::
Dictionary
*
myDict
=
myNdb
->
getDictionary
();
if
(
!
myDict
)
APIERROR
(
myNdb
->
getNdbError
());
...
...
@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb,
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
myEvent
.
addEventColumns
(
noEventColumnNames
,
eventColumnNames
);
myEvent
.
mergeEvents
(
merge_events
);
// Add event to database
if
(
myDict
->
createEvent
(
myEvent
)
==
0
)
...
...
storage/ndb/src/ndbapi/NdbBlob.cpp
View file @
c68df5c1
...
...
@@ -23,6 +23,7 @@
#include <NdbBlob.hpp>
#include "NdbBlobImpl.hpp"
#include <NdbScanOperation.hpp>
#include <NdbEventOperationImpl.hpp>
/*
* Reading index table directly (as a table) is faster but there are
...
...
@@ -147,6 +148,61 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
DBUG_VOID_RETURN
;
}
int
NdbBlob
::
getBlobEventName
(
char
*
bename
,
Ndb
*
anNdb
,
const
char
*
eventName
,
const
char
*
columnName
)
{
NdbEventImpl
*
e
=
anNdb
->
theDictionary
->
m_impl
.
getEvent
(
eventName
);
if
(
e
==
NULL
)
return
-
1
;
NdbColumnImpl
*
c
=
e
->
m_tableImpl
->
getColumn
(
columnName
);
if
(
c
==
NULL
)
return
-
1
;
getBlobEventName
(
bename
,
e
,
c
);
return
0
;
}
void
NdbBlob
::
getBlobEventName
(
char
*
bename
,
const
NdbEventImpl
*
e
,
const
NdbColumnImpl
*
c
)
{
// XXX events should have object id
snprintf
(
bename
,
MAX_TAB_NAME_SIZE
,
"NDB$BLOBEVENT_%s_%d"
,
e
->
m_name
.
c_str
(),
(
int
)
c
->
m_column_no
);
}
void
NdbBlob
::
getBlobEvent
(
NdbEventImpl
&
be
,
const
NdbEventImpl
*
e
,
const
NdbColumnImpl
*
c
)
{
DBUG_ENTER
(
"NdbBlob::getBlobEvent"
);
// blob table
assert
(
c
->
m_blobTable
!=
NULL
);
const
NdbTableImpl
&
bt
=
*
c
->
m_blobTable
;
// blob event name
char
bename
[
NdbBlobImpl
::
BlobTableNameSize
];
getBlobEventName
(
bename
,
e
,
c
);
be
.
setName
(
bename
);
be
.
setTable
(
bt
);
// simple assigments
be
.
mi_type
=
e
->
mi_type
;
be
.
m_dur
=
e
->
m_dur
;
be
.
m_mergeEvents
=
e
->
m_mergeEvents
;
// report unchanged data
// not really needed now since UPD is DEL o INS and we subscribe to all
be
.
setReport
(
NdbDictionary
::
Event
::
ER_ALL
);
// columns PK - DIST - PART - DATA
{
const
NdbColumnImpl
*
bc
=
bt
.
getColumn
((
Uint32
)
0
);
be
.
addColumn
(
*
bc
);
}
{
const
NdbColumnImpl
*
bc
=
bt
.
getColumn
((
Uint32
)
1
);
be
.
addColumn
(
*
bc
);
}
{
const
NdbColumnImpl
*
bc
=
bt
.
getColumn
((
Uint32
)
2
);
be
.
addColumn
(
*
bc
);
}
{
const
NdbColumnImpl
*
bc
=
bt
.
getColumn
((
Uint32
)
3
);
be
.
addColumn
(
*
bc
);
}
DBUG_VOID_RETURN
;
}
// initialization
NdbBlob
::
NdbBlob
(
Ndb
*
)
...
...
@@ -158,9 +214,16 @@ void
NdbBlob
::
init
()
{
theState
=
Idle
;
theEventBlobVersion
=
-
1
;
theNdb
=
NULL
;
theNdbCon
=
NULL
;
theNdbOp
=
NULL
;
theEventOp
=
NULL
;
theBlobEventOp
=
NULL
;
theBlobEventPkRecAttr
=
NULL
;
theBlobEventDistRecAttr
=
NULL
;
theBlobEventPartRecAttr
=
NULL
;
theBlobEventDataRecAttr
=
NULL
;
theTable
=
NULL
;
theAccessTable
=
NULL
;
theBlobTable
=
NULL
;
...
...
@@ -439,7 +502,7 @@ NdbBlob::getHeadFromRecAttr()
DBUG_ENTER
(
"NdbBlob::getHeadFromRecAttr"
);
assert
(
theHeadInlineRecAttr
!=
NULL
);
theNullFlag
=
theHeadInlineRecAttr
->
isNULL
();
assert
(
theNullFlag
!=
-
1
);
assert
(
the
EventBlobVersion
>=
0
||
the
NullFlag
!=
-
1
);
theLength
=
!
theNullFlag
?
theHead
->
length
:
0
;
DBUG_VOID_RETURN
;
}
...
...
@@ -543,6 +606,18 @@ NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
// misc operations
int
NdbBlob
::
getDefined
(
int
&
isNull
)
{
DBUG_ENTER
(
"NdbBlob::getDefined"
);
if
(
theState
==
Prepared
&&
theSetFlag
)
{
isNull
=
(
theSetBuf
==
NULL
);
DBUG_RETURN
(
0
);
}
isNull
=
theNullFlag
;
DBUG_RETURN
(
0
);
}
int
NdbBlob
::
getNull
(
bool
&
isNull
)
{
...
...
@@ -887,6 +962,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
{
DBUG_ENTER
(
"NdbBlob::readParts"
);
DBUG_PRINT
(
"info"
,
(
"part=%u count=%u"
,
part
,
count
));
int
ret
;
if
(
theEventBlobVersion
==
-
1
)
ret
=
readTableParts
(
buf
,
part
,
count
);
else
ret
=
readEventParts
(
buf
,
part
,
count
);
DBUG_RETURN
(
ret
);
}
int
NdbBlob
::
readTableParts
(
char
*
buf
,
Uint32
part
,
Uint32
count
)
{
DBUG_ENTER
(
"NdbBlob::readTableParts"
);
Uint32
n
=
0
;
while
(
n
<
count
)
{
NdbOperation
*
tOp
=
theNdbCon
->
getNdbOperation
(
theBlobTable
);
...
...
@@ -906,6 +993,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
DBUG_RETURN
(
0
);
}
int
NdbBlob
::
readEventParts
(
char
*
buf
,
Uint32
part
,
Uint32
count
)
{
DBUG_ENTER
(
"NdbBlob::readEventParts"
);
int
ret
=
theEventOp
->
readBlobParts
(
buf
,
this
,
part
,
count
);
if
(
ret
!=
0
)
{
setErrorCode
(
theEventOp
);
DBUG_RETURN
(
-
1
);
}
DBUG_RETURN
(
0
);
}
int
NdbBlob
::
insertParts
(
const
char
*
buf
,
Uint32
part
,
Uint32
count
)
{
...
...
@@ -1094,48 +1193,12 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
theTable
=
anOp
->
m_currentTable
;
theAccessTable
=
anOp
->
m_accessTable
;
theColumn
=
aColumn
;
NdbDictionary
::
Column
::
Type
partType
=
NdbDictionary
::
Column
::
Undefined
;
switch
(
theColumn
->
getType
())
{
case
NdbDictionary
:
:
Column
::
Blob
:
partType
=
NdbDictionary
::
Column
::
Binary
;
theFillChar
=
0x0
;
break
;
case
NdbDictionary
:
:
Column
::
Text
:
partType
=
NdbDictionary
::
Column
::
Char
;
theFillChar
=
0x20
;
break
;
default:
setErrorCode
(
NdbBlobImpl
::
ErrUsage
);
DBUG_RETURN
(
-
1
);
}
// sizes
theInlineSize
=
theColumn
->
getInlineSize
();
thePartSize
=
theColumn
->
getPartSize
();
theStripeSize
=
theColumn
->
getStripeSize
();
// sanity check
assert
((
NDB_BLOB_HEAD_SIZE
<<
2
)
==
sizeof
(
Head
));
assert
(
theColumn
->
m_attrSize
*
theColumn
->
m_arraySize
==
sizeof
(
Head
)
+
theInlineSize
);
if
(
thePartSize
>
0
)
{
const
NdbDictionary
::
Table
*
bt
=
NULL
;
const
NdbDictionary
::
Column
*
bc
=
NULL
;
if
(
theStripeSize
==
0
||
(
bt
=
theColumn
->
getBlobTable
())
==
NULL
||
(
bc
=
bt
->
getColumn
(
"DATA"
))
==
NULL
||
bc
->
getType
()
!=
partType
||
bc
->
getLength
()
!=
(
int
)
thePartSize
)
{
setErrorCode
(
NdbBlobImpl
::
ErrTable
);
// prepare blob column and table
if
(
prepareColumn
()
==
-
1
)
DBUG_RETURN
(
-
1
);
}
theBlobTable
=
&
NdbTableImpl
::
getImpl
(
*
bt
);
}
// buffers
theKeyBuf
.
alloc
(
theTable
->
m_keyLenInWords
<<
2
);
// extra buffers
theAccessKeyBuf
.
alloc
(
theAccessTable
->
m_keyLenInWords
<<
2
);
theHeadInlineBuf
.
alloc
(
sizeof
(
Head
)
+
theInlineSize
);
theHeadInlineCopyBuf
.
alloc
(
sizeof
(
Head
)
+
theInlineSize
);
thePartBuf
.
alloc
(
thePartSize
);
theHead
=
(
Head
*
)
theHeadInlineBuf
.
data
;
theInlineData
=
theHeadInlineBuf
.
data
+
sizeof
(
Head
);
// handle different operation types
bool
supportedOp
=
false
;
if
(
isKeyOp
())
{
...
...
@@ -1189,6 +1252,99 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
DBUG_RETURN
(
0
);
}
int
NdbBlob
::
atPrepare
(
NdbEventOperationImpl
*
anOp
,
NdbEventOperationImpl
*
aBlobOp
,
const
NdbColumnImpl
*
aColumn
,
int
version
)
{
DBUG_ENTER
(
"NdbBlob::atPrepare [event]"
);
DBUG_PRINT
(
"info"
,
(
"this=%p op=%p"
,
this
,
anOp
));
assert
(
theState
==
Idle
);
assert
(
version
==
0
||
version
==
1
);
theEventBlobVersion
=
version
;
// ndb api stuff
theNdb
=
anOp
->
m_ndb
;
theEventOp
=
anOp
;
theBlobEventOp
=
aBlobOp
;
theTable
=
anOp
->
m_eventImpl
->
m_tableImpl
;
theColumn
=
aColumn
;
// prepare blob column and table
if
(
prepareColumn
()
==
-
1
)
DBUG_RETURN
(
-
1
);
// extra buffers
theBlobEventDataBuf
.
alloc
(
thePartSize
);
// prepare receive of head+inline
theHeadInlineRecAttr
=
theEventOp
->
getValue
(
aColumn
,
theHeadInlineBuf
.
data
,
version
);
if
(
theHeadInlineRecAttr
==
NULL
)
{
setErrorCode
(
theEventOp
);
DBUG_RETURN
(
-
1
);
}
// prepare receive of blob part
if
((
theBlobEventPkRecAttr
=
theBlobEventOp
->
getValue
(
theBlobTable
->
getColumn
((
Uint32
)
0
),
theKeyBuf
.
data
,
version
))
==
NULL
||
(
theBlobEventDistRecAttr
=
theBlobEventOp
->
getValue
(
theBlobTable
->
getColumn
((
Uint32
)
1
),
(
char
*
)
0
,
version
))
==
NULL
||
(
theBlobEventPartRecAttr
=
theBlobEventOp
->
getValue
(
theBlobTable
->
getColumn
((
Uint32
)
2
),
(
char
*
)
&
thePartNumber
,
version
))
==
NULL
||
(
theBlobEventDataRecAttr
=
theBlobEventOp
->
getValue
(
theBlobTable
->
getColumn
((
Uint32
)
3
),
theBlobEventDataBuf
.
data
,
version
))
==
NULL
)
{
setErrorCode
(
theBlobEventOp
);
DBUG_RETURN
(
-
1
);
}
setState
(
Prepared
);
DBUG_RETURN
(
0
);
}
int
NdbBlob
::
prepareColumn
()
{
DBUG_ENTER
(
"prepareColumn"
);
NdbDictionary
::
Column
::
Type
partType
=
NdbDictionary
::
Column
::
Undefined
;
switch
(
theColumn
->
getType
())
{
case
NdbDictionary
:
:
Column
::
Blob
:
partType
=
NdbDictionary
::
Column
::
Binary
;
theFillChar
=
0x0
;
break
;
case
NdbDictionary
:
:
Column
::
Text
:
partType
=
NdbDictionary
::
Column
::
Char
;
theFillChar
=
0x20
;
break
;
default:
setErrorCode
(
NdbBlobImpl
::
ErrUsage
);
DBUG_RETURN
(
-
1
);
}
// sizes
theInlineSize
=
theColumn
->
getInlineSize
();
thePartSize
=
theColumn
->
getPartSize
();
theStripeSize
=
theColumn
->
getStripeSize
();
// sanity check
assert
((
NDB_BLOB_HEAD_SIZE
<<
2
)
==
sizeof
(
Head
));
assert
(
theColumn
->
m_attrSize
*
theColumn
->
m_arraySize
==
sizeof
(
Head
)
+
theInlineSize
);
if
(
thePartSize
>
0
)
{
const
NdbDictionary
::
Table
*
bt
=
NULL
;
const
NdbDictionary
::
Column
*
bc
=
NULL
;
if
(
theStripeSize
==
0
||
(
bt
=
theColumn
->
getBlobTable
())
==
NULL
||
(
bc
=
bt
->
getColumn
(
"DATA"
))
==
NULL
||
bc
->
getType
()
!=
partType
||
bc
->
getLength
()
!=
(
int
)
thePartSize
)
{
setErrorCode
(
NdbBlobImpl
::
ErrTable
);
DBUG_RETURN
(
-
1
);
}
// blob table
theBlobTable
=
&
NdbTableImpl
::
getImpl
(
*
bt
);
}
// these buffers are always used
theKeyBuf
.
alloc
(
theTable
->
m_keyLenInWords
<<
2
);
theHeadInlineBuf
.
alloc
(
sizeof
(
Head
)
+
theInlineSize
);
theHead
=
(
Head
*
)
theHeadInlineBuf
.
data
;
theInlineData
=
theHeadInlineBuf
.
data
+
sizeof
(
Head
);
thePartBuf
.
alloc
(
thePartSize
);
DBUG_RETURN
(
0
);
}
/*
* Before execute of prepared operation. May add new operations before
* this one. May ask that this operation and all before it (a "batch")
...
...
@@ -1537,6 +1693,26 @@ NdbBlob::atNextResult()
DBUG_RETURN
(
0
);
}
/*
* After next event on main table.
*/
int
NdbBlob
::
atNextEvent
()
{
DBUG_ENTER
(
"NdbBlob::atNextEvent"
);
DBUG_PRINT
(
"info"
,
(
"this=%p op=%p blob op=%p version=%d"
,
this
,
theEventOp
,
theBlobEventOp
,
theEventBlobVersion
));
if
(
theState
==
Invalid
)
DBUG_RETURN
(
-
1
);
assert
(
theEventBlobVersion
>=
0
);
getHeadFromRecAttr
();
if
(
theNullFlag
==
-
1
)
// value not defined
DBUG_RETURN
(
0
);
if
(
setPos
(
0
)
==
-
1
)
DBUG_RETURN
(
-
1
);
setState
(
Active
);
DBUG_RETURN
(
0
);
}
// misc
const
NdbDictionary
::
Column
*
...
...
@@ -1589,6 +1765,17 @@ NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag)
setErrorCode
(
code
,
invalidFlag
);
}
void
NdbBlob
::
setErrorCode
(
NdbEventOperationImpl
*
anOp
,
bool
invalidFlag
)
{
int
code
=
0
;
if
((
code
=
anOp
->
m_error
.
code
)
!=
0
)
;
else
code
=
NdbBlobImpl
::
ErrUnknown
;
setErrorCode
(
code
,
invalidFlag
);
}
// info about all blobs in this operation
NdbBlob
*
...
...
storage/ndb/src/ndbapi/NdbDictionary.cpp
View file @
c68df5c1
...
...
@@ -901,6 +901,11 @@ int NdbDictionary::Event::getNoOfEventColumns() const
return
m_impl
.
getNoOfEventColumns
();
}
void
NdbDictionary
::
Event
::
mergeEvents
(
bool
flag
)
{
m_impl
.
m_mergeEvents
=
flag
;
}
NdbDictionary
::
Object
::
Status
NdbDictionary
::
Event
::
getObjectStatus
()
const
{
...
...
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
View file @
c68df5c1
...
...
@@ -1072,6 +1072,7 @@ void NdbEventImpl::init()
m_tableId
=
RNIL
;
mi_type
=
0
;
m_dur
=
NdbDictionary
::
Event
::
ED_UNDEFINED
;
m_mergeEvents
=
false
;
m_tableImpl
=
NULL
;
m_rep
=
NdbDictionary
::
Event
::
ER_UPDATED
;
}
...
...
@@ -2036,7 +2037,7 @@ int
NdbDictionaryImpl
::
addBlobTables
(
NdbTableImpl
&
t
)
{
unsigned
n
=
t
.
m_noOfBlobs
;
DBUG_ENTER
(
"NdbDictio
an
ryImpl::addBlobTables"
);
DBUG_ENTER
(
"NdbDictio
na
ryImpl::addBlobTables"
);
// optimized for blob column being the last one
// and not looking for more than one if not neccessary
for
(
unsigned
i
=
t
.
m_columns
.
size
();
i
>
0
&&
n
>
0
;)
{
...
...
@@ -3151,7 +3152,37 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
#endif
// NdbDictInterface m_receiver;
DBUG_RETURN
(
m_receiver
.
createEvent
(
m_ndb
,
evnt
,
0
/* getFlag unset */
));
if
(
m_receiver
.
createEvent
(
m_ndb
,
evnt
,
0
/* getFlag unset */
)
!=
0
)
DBUG_RETURN
(
-
1
);
// Create blob events
if
(
evnt
.
m_mergeEvents
&&
createBlobEvents
(
evnt
)
!=
0
)
{
int
save_code
=
m_error
.
code
;
(
void
)
dropEvent
(
evnt
.
m_name
.
c_str
());
m_error
.
code
=
save_code
;
DBUG_RETURN
(
-
1
);
}
DBUG_RETURN
(
0
);
}
int
NdbDictionaryImpl
::
createBlobEvents
(
NdbEventImpl
&
evnt
)
{
DBUG_ENTER
(
"NdbDictionaryImpl::createBlobEvents"
);
NdbTableImpl
&
t
=
*
evnt
.
m_tableImpl
;
Uint32
n
=
t
.
m_noOfBlobs
;
Uint32
i
;
for
(
i
=
0
;
i
<
evnt
.
m_columns
.
size
()
&&
n
>
0
;
i
++
)
{
NdbColumnImpl
&
c
=
*
evnt
.
m_columns
[
i
];
if
(
!
c
.
getBlobType
()
||
c
.
getPartSize
()
==
0
)
continue
;
n
--
;
NdbEventImpl
blob_evnt
;
NdbBlob
::
getBlobEvent
(
blob_evnt
,
&
evnt
,
&
c
);
if
(
createEvent
(
blob_evnt
)
!=
0
)
DBUG_RETURN
(
-
1
);
}
DBUG_RETURN
(
0
);
}
int
...
...
@@ -3400,6 +3431,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
if
(
attributeList_sz
>
table
.
getNoOfColumns
()
)
{
m_error
.
code
=
241
;
DBUG_PRINT
(
"error"
,(
"Invalid version, too many columns"
));
delete
ev
;
DBUG_RETURN
(
NULL
);
...
...
@@ -3409,6 +3441,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
for
(
unsigned
id
=
0
;
ev
->
m_columns
.
size
()
<
attributeList_sz
;
id
++
)
{
if
(
id
>=
table
.
getNoOfColumns
())
{
m_error
.
code
=
241
;
DBUG_PRINT
(
"error"
,(
"Invalid version, column %d out of range"
,
id
));
delete
ev
;
DBUG_RETURN
(
NULL
);
...
...
@@ -3566,13 +3599,54 @@ NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal,
int
NdbDictionaryImpl
::
dropEvent
(
const
char
*
eventName
)
{
NdbEventImpl
*
ev
=
new
NdbEventImpl
();
ev
->
setName
(
eventName
);
int
ret
=
m_receiver
.
dropEvent
(
*
ev
);
delete
ev
;
DBUG_ENTER
(
"NdbDictionaryImpl::dropEvent"
);
DBUG_PRINT
(
"info"
,
(
"name=%s"
,
eventName
));
// printf("__________________RET %u\n", ret);
return
ret
;
NdbEventImpl
*
evnt
=
getEvent
(
eventName
);
// allocated
if
(
evnt
==
NULL
)
{
if
(
m_error
.
code
!=
723
&&
// no such table
m_error
.
code
!=
241
)
// invalid table
DBUG_RETURN
(
-
1
);
DBUG_PRINT
(
"info"
,
(
"no table, drop by name alone"
));
evnt
=
new
NdbEventImpl
();
evnt
->
setName
(
eventName
);
}
int
ret
=
dropEvent
(
*
evnt
);
delete
evnt
;
DBUG_RETURN
(
ret
);
}
int
NdbDictionaryImpl
::
dropEvent
(
const
NdbEventImpl
&
evnt
)
{
if
(
dropBlobEvents
(
evnt
)
!=
0
)
return
-
1
;
if
(
m_receiver
.
dropEvent
(
evnt
)
!=
0
)
return
-
1
;
return
0
;
}
int
NdbDictionaryImpl
::
dropBlobEvents
(
const
NdbEventImpl
&
evnt
)
{
DBUG_ENTER
(
"NdbDictionaryImpl::dropBlobEvents"
);
if
(
evnt
.
m_tableImpl
!=
0
)
{
const
NdbTableImpl
&
t
=
*
evnt
.
m_tableImpl
;
Uint32
n
=
t
.
m_noOfBlobs
;
Uint32
i
;
for
(
i
=
0
;
i
<
evnt
.
m_columns
.
size
()
&&
n
>
0
;
i
++
)
{
const
NdbColumnImpl
&
c
=
*
evnt
.
m_columns
[
i
];
if
(
!
c
.
getBlobType
()
||
c
.
getPartSize
()
==
0
)
continue
;
n
--
;
char
bename
[
MAX_TAB_NAME_SIZE
];
NdbBlob
::
getBlobEventName
(
bename
,
&
evnt
,
&
c
);
(
void
)
dropEvent
(
bename
);
}
}
else
{
// could loop over MAX_ATTRIBUTES_IN_TABLE ...
}
DBUG_RETURN
(
0
);
}
int
...
...
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
View file @
c68df5c1
...
...
@@ -277,7 +277,6 @@ public:
NdbDictionary
::
Event
::
EventDurability
getDurability
()
const
;
void
setReport
(
NdbDictionary
::
Event
::
EventReport
r
);
NdbDictionary
::
Event
::
EventReport
getReport
()
const
;
void
addEventColumn
(
const
NdbColumnImpl
&
c
);
int
getNoOfEventColumns
()
const
;
void
print
()
{
...
...
@@ -295,6 +294,7 @@ public:
Uint32
mi_type
;
NdbDictionary
::
Event
::
EventDurability
m_dur
;
NdbDictionary
::
Event
::
EventReport
m_rep
;
bool
m_mergeEvents
;
NdbTableImpl
*
m_tableImpl
;
BaseString
m_tableName
;
...
...
@@ -547,7 +547,10 @@ public:
NdbTableImpl
*
table
);
int
createEvent
(
NdbEventImpl
&
);
int
createBlobEvents
(
NdbEventImpl
&
);
int
dropEvent
(
const
char
*
eventName
);
int
dropEvent
(
const
NdbEventImpl
&
);
int
dropBlobEvents
(
const
NdbEventImpl
&
);
int
executeSubscribeEvent
(
NdbEventOperationImpl
&
);
int
stopSubscribeEvent
(
NdbEventOperationImpl
&
);
...
...
storage/ndb/src/ndbapi/NdbEventOperation.cpp
View file @
c68df5c1
...
...
@@ -55,6 +55,18 @@ NdbEventOperation::getPreValue(const char *colName, char *aValue)
return
m_impl
.
getValue
(
colName
,
aValue
,
1
);
}
NdbBlob
*
NdbEventOperation
::
getBlobHandle
(
const
char
*
colName
)
{
return
m_impl
.
getBlobHandle
(
colName
,
0
);
}
NdbBlob
*
NdbEventOperation
::
getPreBlobHandle
(
const
char
*
colName
)
{
return
m_impl
.
getBlobHandle
(
colName
,
1
);
}
int
NdbEventOperation
::
execute
()
{
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
c68df5c1
...
...
@@ -38,6 +38,7 @@
#include "DictCache.hpp"
#include <portlib/NdbMem.h>
#include <NdbRecAttr.hpp>
#include <NdbBlob.hpp>
#include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp"
...
...
@@ -48,6 +49,20 @@ static Gci_container g_empty_gci_container;
static
const
Uint32
ACTIVE_GCI_DIRECTORY_SIZE
=
4
;
static
const
Uint32
ACTIVE_GCI_MASK
=
ACTIVE_GCI_DIRECTORY_SIZE
-
1
;
#ifdef VM_TRACE
static
void
print_std
(
const
SubTableData
*
sdata
,
LinearSectionPtr
ptr
[
3
])
{
printf
(
"addr=%p gci=%d op=%d
\n
"
,
(
void
*
)
sdata
,
sdata
->
gci
,
sdata
->
operation
);
for
(
int
i
=
0
;
i
<=
2
;
i
++
)
{
printf
(
"sec=%d addr=%p sz=%d
\n
"
,
i
,
(
void
*
)
ptr
[
i
].
p
,
ptr
[
i
].
sz
);
for
(
int
j
=
0
;
j
<
ptr
[
i
].
sz
;
j
++
)
printf
(
"%08x "
,
ptr
[
i
].
p
[
j
]);
printf
(
"
\n
"
);
}
}
#endif
/*
* Class NdbEventOperationImpl
*
...
...
@@ -60,7 +75,7 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_
S
UMP(A,B,C)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_
D
UMP(A,B,C)
#else
#define DBUG_ENTER_EVENT(A)
#define DBUG_RETURN_EVENT(A) return(A)
...
...
@@ -92,6 +107,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
theCurrentDataAttrs
[
0
]
=
NULL
;
theFirstDataAttrs
[
1
]
=
NULL
;
theCurrentDataAttrs
[
1
]
=
NULL
;
theBlobList
=
NULL
;
theBlobOpList
=
NULL
;
theMainOp
=
NULL
;
m_data_item
=
NULL
;
m_eventImpl
=
NULL
;
...
...
@@ -117,7 +137,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state
=
EO_CREATED
;
#ifdef ndb_event_stores_merge_events_flag
m_mergeEvents
=
m_eventImpl
->
m_mergeEvents
;
#else
m_mergeEvents
=
false
;
#endif
m_has_error
=
0
;
...
...
@@ -254,10 +278,183 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
DBUG_RETURN
(
tAttr
);
}
NdbBlob
*
NdbEventOperationImpl
::
getBlobHandle
(
const
char
*
colName
,
int
n
)
{
DBUG_ENTER
(
"NdbEventOperationImpl::getBlobHandle (colName)"
);
assert
(
m_mergeEvents
);
if
(
m_state
!=
EO_CREATED
)
{
ndbout_c
(
"NdbEventOperationImpl::getBlobHandle may only be called between "
"instantiation and execute()"
);
DBUG_RETURN
(
NULL
);
}
NdbColumnImpl
*
tAttrInfo
=
m_eventImpl
->
m_tableImpl
->
getColumn
(
colName
);
if
(
tAttrInfo
==
NULL
)
{
ndbout_c
(
"NdbEventOperationImpl::getBlobHandle attribute %s not found"
,
colName
);
DBUG_RETURN
(
NULL
);
}
NdbBlob
*
bh
=
getBlobHandle
(
tAttrInfo
,
n
);
DBUG_RETURN
(
bh
);
}
NdbBlob
*
NdbEventOperationImpl
::
getBlobHandle
(
const
NdbColumnImpl
*
tAttrInfo
,
int
n
)
{
DBUG_ENTER
(
"NdbEventOperationImpl::getBlobHandle"
);
DBUG_PRINT
(
"info"
,
(
"attr=%s post/pre=%d"
,
tAttrInfo
->
m_name
.
c_str
(),
n
));
// as in NdbOperation, create only one instance
NdbBlob
*
tBlob
=
theBlobList
;
NdbBlob
*
tLastBlob
=
NULL
;
while
(
tBlob
!=
NULL
)
{
if
(
tBlob
->
theColumn
==
tAttrInfo
&&
tBlob
->
theEventBlobVersion
==
n
)
DBUG_RETURN
(
tBlob
);
tLastBlob
=
tBlob
;
tBlob
=
tBlob
->
theNext
;
}
// blob event name
char
bename
[
MAX_TAB_NAME_SIZE
];
NdbBlob
::
getBlobEventName
(
bename
,
m_eventImpl
,
tAttrInfo
);
// find blob event op if any (it serves both post and pre handles)
assert
(
tAttrInfo
->
m_blobTable
!=
NULL
);
NdbEventOperationImpl
*
tBlobOp
=
theBlobOpList
;
NdbEventOperationImpl
*
tLastBlopOp
=
NULL
;
while
(
tBlobOp
!=
NULL
)
{
if
(
strcmp
(
tBlobOp
->
m_eventImpl
->
m_name
.
c_str
(),
bename
)
==
0
)
{
assert
(
tBlobOp
->
m_eventImpl
->
m_tableImpl
==
tAttrInfo
->
m_blobTable
);
break
;
}
tLastBlopOp
=
tBlobOp
;
tBlobOp
=
tBlobOp
->
theNextBlobOp
;
}
DBUG_PRINT
(
"info"
,
(
"%s op %s"
,
tBlobOp
?
" reuse"
:
" create"
,
bename
));
// create blob event op if not found
if
(
tBlobOp
==
NULL
)
{
NdbEventOperation
*
tmp
=
m_ndb
->
createEventOperation
(
bename
);
if
(
tmp
==
NULL
)
DBUG_RETURN
(
NULL
);
tBlobOp
=
&
tmp
->
m_impl
;
// pointer to main table op
tBlobOp
->
theMainOp
=
this
;
tBlobOp
->
m_mergeEvents
=
m_mergeEvents
;
// add to list end
if
(
tLastBlopOp
==
NULL
)
theBlobOpList
=
tBlobOp
;
else
tLastBlopOp
->
theNextBlobOp
=
tBlobOp
;
tBlobOp
->
theNextBlobOp
=
NULL
;
}
tBlob
=
m_ndb
->
getNdbBlob
();
if
(
tBlob
==
NULL
)
DBUG_RETURN
(
NULL
);
// calls getValue on inline and blob part
if
(
tBlob
->
atPrepare
(
this
,
tBlobOp
,
tAttrInfo
,
n
)
==
-
1
)
{
m_ndb
->
releaseNdbBlob
(
tBlob
);
DBUG_RETURN
(
NULL
);
}
// add to list end
if
(
tLastBlob
==
NULL
)
theBlobList
=
tBlob
;
else
tLastBlob
->
theNext
=
tBlob
;
tBlob
->
theNext
=
NULL
;
DBUG_RETURN
(
tBlob
);
}
int
NdbEventOperationImpl
::
readBlobParts
(
char
*
buf
,
NdbBlob
*
blob
,
Uint32
part
,
Uint32
count
)
{
DBUG_ENTER_EVENT
(
"NdbEventOperationImpl::readBlobParts"
);
DBUG_PRINT_EVENT
(
"info"
,
(
"part=%u count=%u post/pre=%d"
,
part
,
count
,
blob
->
theEventBlobVersion
));
NdbEventOperationImpl
*
blob_op
=
blob
->
theBlobEventOp
;
EventBufData
*
main_data
=
m_data_item
;
DBUG_PRINT_EVENT
(
"info"
,
(
"main_data=%p"
,
main_data
));
assert
(
main_data
!=
NULL
);
// search for blob parts list head
EventBufData
*
head
;
assert
(
m_data_item
!=
NULL
);
head
=
m_data_item
->
m_next_blob
;
while
(
head
!=
NULL
)
{
if
(
head
->
m_event_op
==
blob_op
)
{
DBUG_PRINT_EVENT
(
"info"
,
(
"found blob parts head %p"
,
head
));
break
;
}
head
=
head
->
m_next_blob
;
}
Uint32
nparts
=
0
;
EventBufData
*
data
=
head
;
// XXX optimize using part no ordering
while
(
data
!=
NULL
)
{
/*
* Hack part no directly out of buffer since it is not returned
* in pre data (PK buglet). For part data use receive_event().
* This means extra copy.
*/
blob_op
->
m_data_item
=
data
;
int
r
=
blob_op
->
receive_event
();
assert
(
r
>
0
);
Uint32
no
=
data
->
get_blob_part_no
();
Uint32
sz
=
blob
->
thePartSize
;
const
char
*
src
=
blob
->
theBlobEventDataBuf
.
data
;
DBUG_PRINT_EVENT
(
"info"
,
(
"part_data=%p part no=%u part sz=%u"
,
data
,
no
,
sz
));
if
(
part
<=
no
&&
no
<
part
+
count
)
{
DBUG_PRINT_EVENT
(
"info"
,
(
"part within read range"
));
memcpy
(
buf
+
(
no
-
part
)
*
sz
,
src
,
sz
);
nparts
++
;
}
else
{
DBUG_PRINT_EVENT
(
"info"
,
(
"part outside read range"
));
}
data
=
data
->
m_next
;
}
assert
(
nparts
==
count
);
DBUG_RETURN_EVENT
(
0
);
}
int
NdbEventOperationImpl
::
execute
()
{
DBUG_ENTER
(
"NdbEventOperationImpl::execute"
);
m_ndb
->
theEventBuffer
->
add_drop_lock
();
int
r
=
execute_nolock
();
m_ndb
->
theEventBuffer
->
add_drop_unlock
();
DBUG_RETURN
(
r
);
}
int
NdbEventOperationImpl
::
execute_nolock
()
{
DBUG_ENTER
(
"NdbEventOperationImpl::execute_nolock"
);
DBUG_PRINT
(
"info"
,
(
"this=%p type=%s"
,
this
,
!
theMainOp
?
"main"
:
"blob"
));
NdbDictionary
::
Dictionary
*
myDict
=
m_ndb
->
getDictionary
();
if
(
!
myDict
)
{
m_error
.
code
=
m_ndb
->
getNdbError
().
code
;
...
...
@@ -266,17 +463,25 @@ NdbEventOperationImpl::execute()
if
(
theFirstPkAttrs
[
0
]
==
NULL
&&
theFirstDataAttrs
[
0
]
==
NULL
)
{
// defaults to get all
}
m_ndb
->
theEventBuffer
->
add_drop_lock
();
m_magic_number
=
NDB_EVENT_OP_MAGIC_NUMBER
;
m_state
=
EO_EXECUTING
;
mi_type
=
m_eventImpl
->
mi_type
;
m_ndb
->
theEventBuffer
->
add_op
();
int
r
=
NdbDictionaryImpl
::
getImpl
(
*
myDict
).
executeSubscribeEvent
(
*
this
);
if
(
r
==
0
)
{
m_ndb
->
theEventBuffer
->
add_drop_unlock
();
if
(
theMainOp
==
NULL
)
{
DBUG_PRINT
(
"info"
,
(
"execute blob ops"
));
NdbEventOperationImpl
*
blob_op
=
theBlobOpList
;
while
(
blob_op
!=
NULL
)
{
r
=
blob_op
->
execute_nolock
();
if
(
r
!=
0
)
break
;
blob_op
=
blob_op
->
theNextBlobOp
;
}
}
if
(
r
==
0
)
DBUG_RETURN
(
0
);
}
//Error
...
...
@@ -285,7 +490,6 @@ NdbEventOperationImpl::execute()
m_magic_number
=
0
;
m_error
.
code
=
myDict
->
getNdbError
().
code
;
m_ndb
->
theEventBuffer
->
remove_op
();
m_ndb
->
theEventBuffer
->
add_drop_unlock
();
DBUG_RETURN
(
r
);
}
...
...
@@ -709,21 +913,6 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return
ret
;
}
#ifdef VM_TRACE
static
void
print_std
(
const
char
*
tag
,
const
SubTableData
*
sdata
,
LinearSectionPtr
ptr
[
3
])
{
printf
(
"%s
\n
"
,
tag
);
printf
(
"addr=%p gci=%d op=%d
\n
"
,
(
void
*
)
sdata
,
sdata
->
gci
,
sdata
->
operation
);
for
(
int
i
=
0
;
i
<=
2
;
i
++
)
{
printf
(
"sec=%d addr=%p sz=%d
\n
"
,
i
,
(
void
*
)
ptr
[
i
].
p
,
ptr
[
i
].
sz
);
for
(
int
j
=
0
;
j
<
ptr
[
i
].
sz
;
j
++
)
printf
(
"%08x "
,
ptr
[
i
].
p
[
j
]);
printf
(
"
\n
"
);
}
}
#endif
NdbEventOperation
*
NdbEventBuffer
::
nextEvent
()
{
...
...
@@ -751,6 +940,7 @@ NdbEventBuffer::nextEvent()
while
((
data
=
m_available_data
.
m_head
))
{
NdbEventOperationImpl
*
op
=
data
->
m_event_op
;
DBUG_PRINT_EVENT
(
"info"
,
(
"available data=%p op=%p"
,
data
,
op
));
// set NdbEventOperation data
op
->
m_data_item
=
data
;
...
...
@@ -767,7 +957,10 @@ NdbEventBuffer::nextEvent()
// NUL event is not returned
if
(
data
->
sdata
->
operation
==
NdbDictionary
::
Event
::
_TE_NUL
)
{
DBUG_PRINT_EVENT
(
"info"
,
(
"skip _TE_NUL"
));
continue
;
}
int
r
=
op
->
receive_event
();
if
(
r
>
0
)
...
...
@@ -777,6 +970,12 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE
m_latest_command
=
m_latest_command_save
;
#endif
NdbBlob
*
tBlob
=
op
->
theBlobList
;
while
(
tBlob
!=
NULL
)
{
(
void
)
tBlob
->
atNextEvent
();
tBlob
=
tBlob
->
theNext
;
}
DBUG_RETURN_EVENT
(
op
->
m_facade
);
}
// the next event belonged to an event op that is no
...
...
@@ -1161,7 +1360,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_ENTER_EVENT
(
"NdbEventBuffer::insertDataL"
);
Uint64
gci
=
sdata
->
gci
;
if
(
likely
((
Uint32
)
op
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
)
if
(
likely
((
Uint32
)
op
->
mi_type
&
(
1
<<
(
Uint32
)
sdata
->
operation
)
)
)
{
Gci_container
*
bucket
=
find_bucket
(
&
m_active_gci
,
gci
);
...
...
@@ -1179,9 +1378,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN_EVENT
(
0
);
}
bool
use_hash
=
op
->
m_mergeEvents
&&
const
bool
is_data_event
=
sdata
->
operation
<
NdbDictionary
::
Event
::
_TE_FIRST_NON_DATA_EVENT
;
const
bool
use_hash
=
op
->
m_mergeEvents
&&
is_data_event
;
// find position in bucket hash table
EventBufData
*
data
=
0
;
...
...
@@ -1207,10 +1406,38 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
op
->
m_has_error
=
3
;
DBUG_RETURN_EVENT
(
-
1
);
}
// add it to list and hash table
data
->
m_event_op
=
op
;
if
(
op
->
theMainOp
==
NULL
||
!
is_data_event
)
{
bucket
->
m_data
.
append
(
data
);
}
else
{
// find or create main event for this blob event
EventBufData_hash
::
Pos
main_hpos
;
int
ret
=
get_main_data
(
bucket
,
main_hpos
,
data
);
if
(
ret
==
-
1
)
{
op
->
m_has_error
=
4
;
DBUG_RETURN_EVENT
(
-
1
);
}
EventBufData
*
main_data
=
main_hpos
.
data
;
if
(
ret
!=
0
)
// main event was created
{
main_data
->
m_event_op
=
op
->
theMainOp
;
bucket
->
m_data
.
append
(
main_data
);
if
(
use_hash
)
{
main_data
->
m_pkhash
=
main_hpos
.
pkhash
;
bucket
->
m_data_hash
.
append
(
main_hpos
,
main_data
);
}
}
// link blob event under main event
add_blob_data
(
main_data
,
data
);
}
if
(
use_hash
)
{
data
->
m_pkhash
=
hpos
.
pkhash
;
bucket
->
m_data_hash
.
append
(
hpos
,
data
);
}
#ifdef VM_TRACE
...
...
@@ -1226,18 +1453,12 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN_EVENT
(
-
1
);
}
}
data
->
m_event_op
=
op
;
if
(
use_hash
)
{
data
->
m_pkhash
=
hpos
.
pkhash
;
}
DBUG_RETURN_EVENT
(
0
);
}
#ifdef VM_TRACE
if
((
Uint32
)
op
->
m_eventImpl
->
mi_type
&
1
<<
(
Uint32
)
sdata
->
operation
)
if
((
Uint32
)
op
->
m_eventImpl
->
mi_type
&
(
1
<<
(
Uint32
)
sdata
->
operation
)
)
{
// XXX never reached
DBUG_PRINT_EVENT
(
"info"
,(
"Data arrived before ready eventId"
,
op
->
m_eventId
));
DBUG_RETURN_EVENT
(
0
);
}
...
...
@@ -1300,6 +1521,8 @@ NdbEventBuffer::alloc_data()
int
NdbEventBuffer
::
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER
(
"NdbEventBuffer::alloc_mem"
);
DBUG_PRINT
(
"info"
,
(
"ptr sz %u + %u + %u"
,
ptr
[
0
].
sz
,
ptr
[
1
].
sz
,
ptr
[
2
].
sz
));
const
Uint32
min_alloc_size
=
128
;
Uint32
sz4
=
(
sizeof
(
SubTableData
)
+
3
)
>>
2
;
...
...
@@ -1317,7 +1540,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
data
->
memory
=
(
Uint32
*
)
NdbMem_Allocate
(
alloc_size
);
if
(
data
->
memory
==
0
)
return
-
1
;
DBUG_RETURN
(
-
1
)
;
data
->
sz
=
alloc_size
;
m_total_alloc
+=
data
->
sz
;
}
...
...
@@ -1332,7 +1555,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
memptr
+=
ptr
[
i
].
sz
;
}
return
0
;
DBUG_RETURN
(
0
)
;
}
int
...
...
@@ -1404,13 +1627,10 @@ copy_attr(AttributeHeader ah,
{
Uint32
k
;
for
(
k
=
0
;
k
<
n
;
k
++
)
p1
[
j1
++
]
=
p2
[
j2
++
];
p1
[
j1
+
k
]
=
p2
[
j2
+
k
];
}
else
{
j1
+=
n
;
j2
+=
n
;
}
}
int
...
...
@@ -1443,8 +1663,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
data
->
sz
=
0
;
// compose ptr1 o ptr2 = ptr
LinearSectionPtr
(
&
ptr1
)
[
3
]
=
olddata
.
ptr
;
LinearSectionPtr
(
&
ptr
)
[
3
]
=
data
->
ptr
;
LinearSectionPtr
(
&
ptr1
)[
3
]
=
olddata
.
ptr
;
LinearSectionPtr
(
&
ptr
)[
3
]
=
data
->
ptr
;
// loop twice where first loop only sets sizes
int
loop
;
...
...
@@ -1458,7 +1678,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
data
->
sdata
->
operation
=
tp
->
t3
;
}
ptr
[
0
].
sz
=
ptr
[
1
].
sz
=
ptr
[
3
].
sz
=
0
;
ptr
[
0
].
sz
=
ptr
[
1
].
sz
=
ptr
[
2
].
sz
=
0
;
// copy pk from new version
{
...
...
@@ -1573,6 +1793,113 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
DBUG_RETURN_EVENT
(
0
);
}
/*
* Given blob part event, find main table event on inline part. It
* should exist (force in TUP) but may arrive later. If so, create
* NUL event on main table. The real event replaces it later.
*/
// write attribute headers for concatened PK
static
void
split_concatenated_pk
(
const
NdbTableImpl
*
t
,
Uint32
*
ah_buffer
,
const
Uint32
*
pk_buffer
,
Uint32
pk_sz
)
{
Uint32
sz
=
0
;
// words parsed so far
Uint32
n
;
// pk attr count
Uint32
i
;
for
(
i
=
n
=
0
;
i
<
t
->
m_columns
.
size
()
&&
n
<
t
->
m_noOfKeys
;
i
++
)
{
const
NdbColumnImpl
*
c
=
t
->
getColumn
(
i
);
assert
(
c
!=
NULL
);
if
(
!
c
->
m_pk
)
continue
;
assert
(
sz
<
pk_sz
);
Uint32
bytesize
=
c
->
m_attrSize
*
c
->
m_arraySize
;
Uint32
lb
,
len
;
bool
ok
=
NdbSqlUtil
::
get_var_length
(
c
->
m_type
,
&
pk_buffer
[
sz
],
bytesize
,
lb
,
len
);
assert
(
ok
);
AttributeHeader
ah
(
i
,
lb
+
len
);
ah_buffer
[
n
++
]
=
ah
.
m_value
;
sz
+=
ah
.
getDataSize
();
}
assert
(
n
==
t
->
m_noOfKeys
&&
sz
==
pk_sz
);
}
int
NdbEventBuffer
::
get_main_data
(
Gci_container
*
bucket
,
EventBufData_hash
::
Pos
&
hpos
,
EventBufData
*
blob_data
)
{
DBUG_ENTER_EVENT
(
"NdbEventBuffer::get_main_data"
);
NdbEventOperationImpl
*
main_op
=
blob_data
->
m_event_op
->
theMainOp
;
assert
(
main_op
!=
NULL
);
const
NdbTableImpl
*
mainTable
=
main_op
->
m_eventImpl
->
m_tableImpl
;
// create LinearSectionPtr for main table key
LinearSectionPtr
ptr
[
3
];
Uint32
ah_buffer
[
NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY
];
ptr
[
0
].
sz
=
mainTable
->
m_noOfKeys
;
ptr
[
0
].
p
=
ah_buffer
;
ptr
[
1
].
sz
=
AttributeHeader
(
blob_data
->
ptr
[
0
].
p
[
0
]).
getDataSize
();
ptr
[
1
].
p
=
blob_data
->
ptr
[
1
].
p
;
ptr
[
2
].
sz
=
0
;
ptr
[
2
].
p
=
0
;
split_concatenated_pk
(
mainTable
,
ptr
[
0
].
p
,
ptr
[
1
].
p
,
ptr
[
1
].
sz
);
DBUG_DUMP_EVENT
(
"ah"
,
(
char
*
)
ptr
[
0
].
p
,
ptr
[
0
].
sz
<<
2
);
DBUG_DUMP_EVENT
(
"pk"
,
(
char
*
)
ptr
[
1
].
p
,
ptr
[
1
].
sz
<<
2
);
// search for main event buffer
bucket
->
m_data_hash
.
search
(
hpos
,
main_op
,
ptr
);
if
(
hpos
.
data
!=
NULL
)
DBUG_RETURN_EVENT
(
0
);
// not found, create a place-holder
EventBufData
*
main_data
=
alloc_data
();
if
(
main_data
==
NULL
)
DBUG_RETURN_EVENT
(
-
1
);
SubTableData
sdata
=
*
blob_data
->
sdata
;
sdata
.
tableId
=
main_op
->
m_eventImpl
->
m_tableImpl
->
m_id
;
sdata
.
operation
=
NdbDictionary
::
Event
::
_TE_NUL
;
if
(
copy_data
(
&
sdata
,
ptr
,
main_data
)
!=
0
)
DBUG_RETURN_EVENT
(
-
1
);
hpos
.
data
=
main_data
;
DBUG_RETURN_EVENT
(
1
);
}
void
NdbEventBuffer
::
add_blob_data
(
EventBufData
*
main_data
,
EventBufData
*
blob_data
)
{
DBUG_ENTER_EVENT
(
"NdbEventBuffer::add_blob_data"
);
DBUG_PRINT_EVENT
(
"info"
,
(
"main_data=%p blob_data=%p"
,
main_data
,
blob_data
));
EventBufData
*
head
;
head
=
main_data
->
m_next_blob
;
while
(
head
!=
NULL
)
{
if
(
head
->
m_event_op
==
blob_data
->
m_event_op
)
break
;
head
=
head
->
m_next_blob
;
}
if
(
head
==
NULL
)
{
head
=
blob_data
;
head
->
m_next_blob
=
main_data
->
m_next_blob
;
main_data
->
m_next_blob
=
head
;
}
else
{
blob_data
->
m_next
=
head
->
m_next
;
head
->
m_next
=
blob_data
;
}
DBUG_VOID_RETURN_EVENT
;
}
NdbEventOperationImpl
*
NdbEventBuffer
::
move_data
()
{
...
...
@@ -1613,6 +1940,31 @@ NdbEventBuffer::free_list(EventBufData_list &list)
#endif
m_free_data_sz
+=
list
.
m_sz
;
// free blobs XXX unacceptable performance, fix later
{
EventBufData
*
data
=
list
.
m_head
;
while
(
1
)
{
while
(
data
->
m_next_blob
!=
NULL
)
{
EventBufData
*
blob_head
=
data
->
m_next_blob
;
data
->
m_next_blob
=
blob_head
->
m_next_blob
;
blob_head
->
m_next_blob
=
NULL
;
while
(
blob_head
!=
NULL
)
{
EventBufData
*
blob_part
=
blob_head
;
blob_head
=
blob_head
->
m_next
;
blob_part
->
m_next
=
m_free_data
;
m_free_data
=
blob_part
;
#ifdef VM_TRACE
m_free_data_count
++
;
#endif
m_free_data_sz
+=
blob_part
->
sz
;
}
}
if
(
data
==
list
.
m_tail
)
break
;
data
=
data
->
m_next
;
}
}
// list returned to m_free_data
new
(
&
list
)
EventBufData_list
;
}
...
...
@@ -1649,6 +2001,14 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
m_dropped_ev_op
->
m_prev
=
op
;
m_dropped_ev_op
=
op
;
// drop blob ops
while
(
op
->
theBlobOpList
!=
NULL
)
{
NdbEventOperationImpl
*
tBlobOp
=
op
->
theBlobOpList
;
op
->
theBlobOpList
=
op
->
theBlobOpList
->
theNextBlobOp
;
(
void
)
m_ndb
->
dropEventOperation
(
tBlobOp
);
}
// ToDo, take care of these to be deleted at the
// appropriate time, after we are sure that there
// are _no_ more events coming
...
...
@@ -1717,6 +2077,10 @@ send_report:
Uint32
EventBufData_hash
::
getpkhash
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER_EVENT
(
"EventBufData_hash::getpkhash"
);
DBUG_DUMP_EVENT
(
"ah"
,
(
char
*
)
ptr
[
0
].
p
,
ptr
[
0
].
sz
<<
2
);
DBUG_DUMP_EVENT
(
"pk"
,
(
char
*
)
ptr
[
1
].
p
,
ptr
[
1
].
sz
<<
2
);
const
NdbTableImpl
*
tab
=
op
->
m_eventImpl
->
m_tableImpl
;
// in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
...
...
@@ -1747,13 +2111,19 @@ EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
(
*
cs
->
coll
->
hash_sort
)(
cs
,
dptr
+
lb
,
len
,
&
nr1
,
&
nr2
);
dptr
+=
((
bytesize
+
3
)
/
4
)
*
4
;
}
return
nr1
;
DBUG_PRINT_EVENT
(
"info"
,
(
"hash result=%08x"
,
nr1
));
DBUG_RETURN_EVENT
(
nr1
);
}
// this is seldom invoked
bool
EventBufData_hash
::
getpkequal
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr1
[
3
],
LinearSectionPtr
ptr2
[
3
])
{
DBUG_ENTER_EVENT
(
"EventBufData_hash::getpkequal"
);
DBUG_DUMP_EVENT
(
"ah1"
,
(
char
*
)
ptr1
[
0
].
p
,
ptr1
[
0
].
sz
<<
2
);
DBUG_DUMP_EVENT
(
"pk1"
,
(
char
*
)
ptr1
[
1
].
p
,
ptr1
[
1
].
sz
<<
2
);
DBUG_DUMP_EVENT
(
"ah2"
,
(
char
*
)
ptr2
[
0
].
p
,
ptr2
[
0
].
sz
<<
2
);
DBUG_DUMP_EVENT
(
"pk2"
,
(
char
*
)
ptr2
[
1
].
p
,
ptr2
[
1
].
sz
<<
2
);
const
NdbTableImpl
*
tab
=
op
->
m_eventImpl
->
m_tableImpl
;
Uint32
nkey
=
tab
->
m_noOfKeys
;
...
...
@@ -1763,6 +2133,8 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
const
uchar
*
dptr1
=
(
uchar
*
)
ptr1
[
1
].
p
;
const
uchar
*
dptr2
=
(
uchar
*
)
ptr2
[
1
].
p
;
bool
equal
=
true
;
while
(
nkey
--
!=
0
)
{
AttributeHeader
ah1
(
*
hptr1
++
);
...
...
@@ -1787,16 +2159,22 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
CHARSET_INFO
*
cs
=
col
->
m_cs
?
col
->
m_cs
:
&
my_charset_bin
;
int
res
=
(
cs
->
coll
->
strnncollsp
)(
cs
,
dptr1
+
lb1
,
len1
,
dptr2
+
lb2
,
len2
,
false
);
if
(
res
!=
0
)
return
false
;
{
equal
=
false
;
break
;
}
dptr1
+=
((
bytesize1
+
3
)
/
4
)
*
4
;
dptr2
+=
((
bytesize2
+
3
)
/
4
)
*
4
;
}
return
true
;
DBUG_PRINT_EVENT
(
"info"
,
(
"equal=%s"
,
equal
?
"true"
:
"false"
));
DBUG_RETURN_EVENT
(
equal
);
}
void
EventBufData_hash
::
search
(
Pos
&
hpos
,
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER_EVENT
(
"EventBufData_hash::search"
);
Uint32
pkhash
=
getpkhash
(
op
,
ptr
);
Uint32
index
=
(
op
->
m_oid
^
pkhash
)
%
GCI_EVENT_HASH_SIZE
;
EventBufData
*
data
=
m_hash
[
index
];
...
...
@@ -1811,6 +2189,8 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr
hpos
.
index
=
index
;
hpos
.
data
=
data
;
hpos
.
pkhash
=
pkhash
;
DBUG_PRINT_EVENT
(
"info"
,
(
"search result=%p"
,
data
));
DBUG_VOID_RETURN_EVENT
;
}
template
class
Vector
<
Gci_container
>;
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
c68df5c1
...
...
@@ -21,6 +21,7 @@
#include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
...
...
@@ -35,9 +36,28 @@ struct EventBufData
LinearSectionPtr
ptr
[
3
];
unsigned
sz
;
NdbEventOperationImpl
*
m_event_op
;
EventBufData
*
m_next
;
// Next wrt to global order
/*
* Blobs are stored in blob list (m_next_blob) where each entry
* is list of parts (m_next) in part number order.
*
* TODO order by part no and link for fast read and free_list
*/
EventBufData
*
m_next
;
// Next wrt to global order or Next blob part
EventBufData
*
m_next_blob
;
// First part in next blob
EventBufData
*
m_next_hash
;
// Next in per-GCI hash
Uint32
m_pkhash
;
// PK hash (without op) for fast compare
// Get blob part number from blob data
Uint32
get_blob_part_no
()
{
assert
(
ptr
[
0
].
sz
>
2
);
Uint32
pos
=
AttributeHeader
(
ptr
[
0
].
p
[
0
]).
getDataSize
()
+
AttributeHeader
(
ptr
[
0
].
p
[
1
]).
getDataSize
();
Uint32
no
=
ptr
[
1
].
p
[
pos
];
return
no
;
}
};
class
EventBufData_list
...
...
@@ -70,7 +90,6 @@ EventBufData_list::~EventBufData_list()
{
}
inline
int
EventBufData_list
::
is_empty
()
{
...
...
@@ -173,9 +192,13 @@ public:
NdbEventOperation
::
State
getState
();
int
execute
();
int
execute_nolock
();
int
stop
();
NdbRecAttr
*
getValue
(
const
char
*
colName
,
char
*
aValue
,
int
n
);
NdbRecAttr
*
getValue
(
const
NdbColumnImpl
*
,
char
*
aValue
,
int
n
);
NdbBlob
*
getBlobHandle
(
const
char
*
colName
,
int
n
);
NdbBlob
*
getBlobHandle
(
const
NdbColumnImpl
*
,
int
n
);
int
readBlobParts
(
char
*
buf
,
NdbBlob
*
blob
,
Uint32
part
,
Uint32
count
);
int
receive_event
();
Uint64
getGCI
();
Uint64
getLatestGCI
();
...
...
@@ -199,6 +222,13 @@ public:
NdbRecAttr
*
theFirstDataAttrs
[
2
];
NdbRecAttr
*
theCurrentDataAttrs
[
2
];
NdbBlob
*
theBlobList
;
union
{
NdbEventOperationImpl
*
theBlobOpList
;
NdbEventOperationImpl
*
theNextBlobOp
;
};
NdbEventOperationImpl
*
theMainOp
;
// blob op pointer to main op
NdbEventOperation
::
State
m_state
;
/* note connection to mi_type */
Uint32
mi_type
;
/* should be == 0 if m_state != EO_EXECUTING
* else same as in EventImpl
...
...
@@ -275,6 +305,11 @@ public:
int
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
int
get_main_data
(
Gci_container
*
bucket
,
EventBufData_hash
::
Pos
&
hpos
,
EventBufData
*
blob_data
);
void
add_blob_data
(
EventBufData
*
main_data
,
EventBufData
*
blob_data
);
void
free_list
(
EventBufData_list
&
list
);
...
...
storage/ndb/test/ndbapi/test_event_merge.cpp
View file @
c68df5c1
...
...
@@ -21,14 +21,7 @@
#include <my_sys.h>
#include <ndb_version.h>
#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0)
#define version50
#else
#undef version50
#endif
// until rbr in 5.1
#undef version51rbr
// version >= 5.1 required
#if !defined(min) || !defined(max)
#define min(x, y) ((x) < (y) ? (x) : (y))
...
...
@@ -57,11 +50,11 @@
* There are other -no-* options, each added to isolate a specific bug.
*
* There are 5 ways (ignoring NUL operand) to compose 2 ops:
*
5.0 bugs 5.1 bugs
*
* INS o DEL = NUL
* INS o UPD = INS
type=INS
* DEL o INS = UPD
type=INS type=INS
* UPD o DEL = DEL
no event
* INS o UPD = INS
* DEL o INS = UPD
* UPD o DEL = DEL
* UPD o UPD = UPD
*/
...
...
@@ -73,17 +66,19 @@ struct Opts {
uint
maxpk
;
my_bool
no_blobs
;
my_bool
no_implicit_nulls
;
my_bool
no_missing_update
;
my_bool
no_multiops
;
my_bool
no_nulls
;
my_bool
one_blob
;
const
char
*
opstring
;
uint
seed
;
my_bool
separate_events
;
uint
tweak
;
// whatever's useful
my_bool
use_table
;
};
static
Opts
g_opts
;
static
const
uint
g_maxpk
=
100
;
static
const
uint
g_maxpk
=
100
0
;
static
const
uint
g_maxopstringpart
=
100
;
static
const
char
*
g_opstringpart
[
g_maxopstringpart
];
static
uint
g_opstringparts
=
0
;
...
...
@@ -712,6 +707,20 @@ checkop(const Op* op, Uint32& pk1)
if
(
!
c
.
nullable
)
{
chkrc
(
ind0
<=
0
&&
ind1
<=
0
);
}
if
(
c
.
isblob
())
{
// blob values must be from allowed chars
int
j
;
for
(
j
=
0
;
j
<
2
;
j
++
)
{
const
Data
&
d
=
op
->
data
[
j
];
if
(
d
.
ind
[
i
]
==
0
)
{
const
Data
::
Txt
&
t
=
*
d
.
ptr
[
i
].
txt
;
int
k
;
for
(
k
=
0
;
k
<
t
.
len
;
k
++
)
{
chkrc
(
strchr
(
g_charval
,
t
.
val
[
k
])
!=
0
);
}
}
}
}
}
return
0
;
}
...
...
@@ -849,9 +858,8 @@ createevent()
const
Col
&
c
=
g_col
[
i
];
evt
.
addEventColumn
(
c
.
name
);
}
#ifdef version51rbr
evt
.
setReport
(
NdbDictionary
::
Event
::
ER_UPDATED
);
evt
.
mergeEvents
(
!
g_opts
.
separate_events
);
#endif
if
(
g_dic
->
getEvent
(
evt
.
getName
())
!=
0
)
chkdb
(
g_dic
->
dropEvent
(
evt
.
getName
())
==
0
);
chkdb
(
g_dic
->
createEvent
(
evt
)
==
0
);
...
...
@@ -875,14 +883,8 @@ static int
createeventop
()
{
ll1
(
"createeventop"
);
#ifdef version50
uint
bsz
=
10
*
g_opts
.
maxops
;
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
(),
bsz
))
!=
0
);
#else
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
()))
!=
0
);
// available in gci merge changeset
g_evt_op
->
mergeEvents
(
!
g_opts
.
separate_events
);
// not yet inherited
#endif
uint
i
;
for
(
i
=
0
;
i
<
ncol
();
i
++
)
{
const
Col
&
c
=
g_col
[
i
];
...
...
@@ -891,10 +893,8 @@ createeventop()
chkdb
((
g_ev_ra
[
0
][
i
]
=
g_evt_op
->
getValue
(
c
.
name
,
(
char
*
)
d
[
0
].
ptr
[
i
].
v
))
!=
0
);
chkdb
((
g_ev_ra
[
1
][
i
]
=
g_evt_op
->
getPreValue
(
c
.
name
,
(
char
*
)
d
[
1
].
ptr
[
i
].
v
))
!=
0
);
}
else
{
#ifdef version51rbr
chkdb
((
g_ev_bh
[
0
][
i
]
=
g_evt_op
->
getBlobHandle
(
c
.
name
))
!=
0
);
chkdb
((
g_ev_bh
[
1
][
i
]
=
g_evt_op
->
getPreBlobHandle
(
c
.
name
))
!=
0
);
#endif
}
}
return
0
;
...
...
@@ -909,10 +909,10 @@ dropeventop()
return
0
;
}
// wait for event to be installed and for GCIs to pass
static
int
waitgci
(
)
// wait for event to be installed and for at least 1 GCI to pass
waitgci
(
uint
ngci
)
{
const
uint
ngci
=
3
;
ll1
(
"waitgci "
<<
ngci
);
Uint32
gci
[
2
];
uint
i
=
0
;
...
...
@@ -976,7 +976,6 @@ scantab()
if
(
!
c
.
isblob
())
{
ind
=
ra
[
i
]
->
isNULL
();
}
else
{
#ifdef version51rbr
int
ret
;
ret
=
bh
[
i
]
->
getDefined
(
ind
);
assert
(
ret
==
0
);
...
...
@@ -992,8 +991,10 @@ scantab()
Uint32
len
=
t
.
len
;
ret
=
bh
[
i
]
->
readData
(
t
.
val
,
len
);
assert
(
ret
==
0
&&
len
==
t
.
len
);
// to see the data, have to execute...
chkdb
(
g_con
->
execute
(
NoCommit
)
==
0
);
assert
(
memchr
(
t
.
val
,
'X'
,
t
.
len
)
==
0
);
}
#endif
}
assert
(
ind
>=
0
);
d0
.
ind
[
i
]
=
ind
;
...
...
@@ -1042,7 +1043,7 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
}
else
if
(
t
==
Op
::
INS
&&
!
g_opts
.
no_implicit_nulls
&&
c
.
nullable
&&
urandom
(
10
,
100
))
{
d
.
noop
|=
(
1
<<
i
);
d
.
ind
[
i
]
=
1
;
// implicit NULL value is known
}
else
if
(
t
==
Op
::
UPD
&&
urandom
(
10
,
100
))
{
}
else
if
(
t
==
Op
::
UPD
&&
!
g_opts
.
no_missing_update
&&
urandom
(
10
,
100
))
{
d
.
noop
|=
(
1
<<
i
);
d
.
ind
[
i
]
=
-
1
;
// fixed up in caller
}
else
if
(
!
g_opts
.
no_nulls
&&
c
.
nullable
&&
urandom
(
10
,
100
))
{
...
...
@@ -1060,6 +1061,8 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
{
char
*
p
=
d
.
ptr
[
i
].
ch
;
uint
u
=
urandom
(
g_charlen
);
if
(
u
==
0
)
u
=
urandom
(
g_charlen
);
// 2x bias for non-empty
uint
j
;
for
(
j
=
0
;
j
<
g_charlen
;
j
++
)
{
uint
v
=
urandom
(
strlen
(
g_charval
));
...
...
@@ -1070,10 +1073,19 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
case
NdbDictionary
:
:
Column
::
Text
:
{
Data
::
Txt
&
t
=
*
d
.
ptr
[
i
].
txt
;
delete
[]
t
.
val
;
t
.
val
=
0
;
if
(
g_opts
.
tweak
&
1
)
{
uint
u
=
256
+
2000
;
uint
v
=
(
g_opts
.
tweak
&
2
)
?
0
:
urandom
(
strlen
(
g_charval
));
t
.
val
=
new
char
[
u
];
t
.
len
=
u
;
memset
(
t
.
val
,
g_charval
[
v
],
u
);
break
;
}
uint
u
=
urandom
(
g_maxblobsize
);
u
=
urandom
(
u
);
// 4x bias for smaller blobs
u
=
urandom
(
u
);
delete
[]
t
.
val
;
t
.
val
=
new
char
[
u
];
t
.
len
=
u
;
uint
j
=
0
;
...
...
@@ -1134,9 +1146,15 @@ makeops()
{
ll1
(
"makeops"
);
Uint32
pk1
=
0
;
while
(
g_usedops
<
g_opts
.
maxops
&&
pk1
<
g_opts
.
maxpk
)
{
if
(
g_opts
.
opstring
==
0
)
while
(
1
)
{
if
(
g_opts
.
opstring
==
0
)
{
if
(
g_usedops
>=
g_opts
.
maxops
)
// use up ops
break
;
pk1
=
urandom
(
g_opts
.
maxpk
);
}
else
{
if
(
pk1
>=
g_opts
.
maxpk
)
// use up pks
break
;
}
ll2
(
"makeops: pk1="
<<
pk1
);
// total op on the pk so far
// optype either NUL=initial/deleted or INS=created
...
...
@@ -1465,7 +1483,7 @@ matchevent(Op* ev)
}
if
(
tmpok
)
{
ok
=
gci_op
->
match
=
true
;
ll2
(
"
===:
match"
);
ll2
(
"match"
);
}
}
pos
++
;
...
...
@@ -1555,7 +1573,6 @@ geteventdata()
NdbRecAttr
*
ra
=
g_ev_ra
[
j
][
i
];
ind
=
ra
->
isNULL
();
}
else
{
#ifdef version51rbr
NdbBlob
*
bh
=
g_ev_bh
[
j
][
i
];
ret
=
bh
->
getDefined
(
ind
);
assert
(
ret
==
0
);
...
...
@@ -1572,7 +1589,6 @@ geteventdata()
ret
=
bh
->
readData
(
t
.
val
,
len
);
assert
(
ret
==
0
&&
len
==
t
.
len
);
}
#endif
}
d
[
j
].
ind
[
i
]
=
ind
;
}
...
...
@@ -1585,38 +1601,22 @@ runevents()
ll1
(
"runevents"
);
uint
mspoll
=
1000
;
uint
npoll
=
6
;
// strangely long delay
ll1
(
"poll "
<<
npoll
);
while
(
npoll
!=
0
)
{
npoll
--
;
int
ret
;
ll1
(
"poll"
);
ret
=
g_ndb
->
pollEvents
(
mspoll
);
if
(
ret
<=
0
)
continue
;
while
(
1
)
{
g_rec_ev
->
init
(
Op
::
EV
);
#ifdef version50
int
overrun
=
g_opts
.
maxops
;
chkdb
((
ret
=
g_evt_op
->
next
(
&
overrun
))
>=
0
);
chkrc
(
overrun
==
0
);
if
(
ret
==
0
)
break
;
#else
NdbEventOperation
*
tmp_op
=
g_ndb
->
nextEvent
();
if
(
tmp_op
==
0
)
break
;
reqrc
(
g_evt_op
==
tmp_op
);
#endif
chkrc
(
seteventtype
(
g_rec_ev
,
g_evt_op
->
getEventType
())
==
0
);
geteventdata
();
g_rec_ev
->
gci
=
g_evt_op
->
getGCI
();
#ifdef version50
// fix to match 5.1
if
(
g_rec_ev
->
type
==
Op
::
UPD
)
{
Uint32
pk1
=
g_rec_ev
->
data
[
0
].
pk1
;
makedata
(
getcol
(
"pk1"
),
g_rec_ev
->
data
[
1
],
pk1
,
Op
::
UPD
);
makedata
(
getcol
(
"pk2"
),
g_rec_ev
->
data
[
1
],
pk1
,
Op
::
UPD
);
}
#endif
// get indicators and blob value
ll2
(
"runevents: EVT: "
<<
*
g_rec_ev
);
// check basic sanity
...
...
@@ -1667,7 +1667,7 @@ runtest()
chkrc
(
createtable
()
==
0
);
chkrc
(
createevent
()
==
0
);
for
(
g_loop
=
0
;
g_opts
.
loop
==
0
||
g_loop
<
g_opts
.
loop
;
g_loop
++
)
{
ll0
(
"
loop "
<<
g_loop
);
ll0
(
"
=== loop "
<<
g_loop
<<
" ==="
);
setseed
(
g_loop
);
resetmem
();
chkrc
(
scantab
()
==
0
);
// alternative: save tot_op for loop > 0
...
...
@@ -1675,7 +1675,7 @@ runtest()
g_rec_ev
=
getop
(
Op
::
EV
);
chkrc
(
createeventop
()
==
0
);
chkdb
(
g_evt_op
->
execute
()
==
0
);
chkrc
(
waitgci
()
==
0
);
chkrc
(
waitgci
(
3
)
==
0
);
chkrc
(
runops
()
==
0
);
if
(
!
g_opts
.
separate_events
)
chkrc
(
mergeops
()
==
0
);
...
...
@@ -1685,6 +1685,8 @@ runtest()
chkrc
(
matchevents
()
==
0
);
chkrc
(
matchops
()
==
0
);
chkrc
(
dropeventop
()
==
0
);
// time erases everything..
chkrc
(
waitgci
(
1
)
==
0
);
}
chkrc
(
dropevent
()
==
0
);
chkrc
(
droptable
()
==
0
);
...
...
@@ -1703,41 +1705,48 @@ my_long_options[] =
{
"loglevel"
,
1002
,
"Logging level in this program (default 0)"
,
(
gptr
*
)
&
g_opts
.
loglevel
,
(
gptr
*
)
&
g_opts
.
loglevel
,
0
,
GET_INT
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"loop"
,
1003
,
"Number of test loops (default
2
, 0=forever)"
,
{
"loop"
,
1003
,
"Number of test loops (default
3
, 0=forever)"
,
(
gptr
*
)
&
g_opts
.
loop
,
(
gptr
*
)
&
g_opts
.
loop
,
0
,
GET_INT
,
REQUIRED_ARG
,
2
,
0
,
0
,
0
,
0
,
0
},
GET_INT
,
REQUIRED_ARG
,
3
,
0
,
0
,
0
,
0
,
0
},
{
"maxops"
,
1004
,
"Approx number of PK operations (default 1000)"
,
(
gptr
*
)
&
g_opts
.
maxops
,
(
gptr
*
)
&
g_opts
.
maxops
,
0
,
GET_UINT
,
REQUIRED_ARG
,
1000
,
0
,
0
,
0
,
0
,
0
},
{
"maxpk"
,
1005
,
"Number of different PK values (default 10)"
,
(
gptr
*
)
&
g_opts
.
maxpk
,
(
gptr
*
)
&
g_opts
.
maxpk
,
0
,
GET_UINT
,
REQUIRED_ARG
,
10
,
1
,
g_maxpk
,
0
,
0
,
0
},
GET_UINT
,
REQUIRED_ARG
,
10
,
0
,
0
,
0
,
0
,
0
},
{
"no-blobs"
,
1006
,
"Omit blob attributes (5.0: true)"
,
(
gptr
*
)
&
g_opts
.
no_blobs
,
(
gptr
*
)
&
g_opts
.
no_blobs
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"no-implicit-nulls"
,
1007
,
"Insert must include NULL values explicitly"
,
{
"no-implicit-nulls"
,
1007
,
"Insert must include all attrs"
" i.e. no implicit NULLs"
,
(
gptr
*
)
&
g_opts
.
no_implicit_nulls
,
(
gptr
*
)
&
g_opts
.
no_implicit_nulls
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"no-multiops"
,
1008
,
"Allow only 1 operation per commit"
,
{
"no-missing-update"
,
1008
,
"Update must include all non-PK attrs"
,
(
gptr
*
)
&
g_opts
.
no_missing_update
,
(
gptr
*
)
&
g_opts
.
no_missing_update
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"no-multiops"
,
1009
,
"Allow only 1 operation per commit"
,
(
gptr
*
)
&
g_opts
.
no_multiops
,
(
gptr
*
)
&
g_opts
.
no_multiops
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"no-nulls"
,
10
09
,
"Create no NULL values"
,
{
"no-nulls"
,
10
10
,
"Create no NULL values"
,
(
gptr
*
)
&
g_opts
.
no_nulls
,
(
gptr
*
)
&
g_opts
.
no_nulls
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"one-blob"
,
101
0
,
"Only one blob attribute (defaut
t 2)"
,
{
"one-blob"
,
101
1
,
"Only one blob attribute (defaul
t 2)"
,
(
gptr
*
)
&
g_opts
.
one_blob
,
(
gptr
*
)
&
g_opts
.
one_blob
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"opstring"
,
101
1
,
"Operations to run e.g. idiucdc (c is commit) or"
{
"opstring"
,
101
2
,
"Operations to run e.g. idiucdc (c is commit) or"
" iuuc:uudc (the : separates loops)"
,
(
gptr
*
)
&
g_opts
.
opstring
,
(
gptr
*
)
&
g_opts
.
opstring
,
0
,
GET_STR_ALLOC
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"seed"
,
101
2
,
"Random seed (0=loop number, default -1=random)"
,
{
"seed"
,
101
3
,
"Random seed (0=loop number, default -1=random)"
,
(
gptr
*
)
&
g_opts
.
seed
,
(
gptr
*
)
&
g_opts
.
seed
,
0
,
GET_INT
,
REQUIRED_ARG
,
-
1
,
0
,
0
,
0
,
0
,
0
},
{
"separate-events"
,
101
3
,
"Do not combine events per GCI (5.0: true)"
,
{
"separate-events"
,
101
4
,
"Do not combine events per GCI (5.0: true)"
,
(
gptr
*
)
&
g_opts
.
separate_events
,
(
gptr
*
)
&
g_opts
.
separate_events
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"use-table"
,
1014
,
"Use existing table 'tem1'"
,
{
"tweak"
,
1015
,
"Whatever the source says"
,
(
gptr
*
)
&
g_opts
.
tweak
,
(
gptr
*
)
&
g_opts
.
tweak
,
0
,
GET_UINT
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"use-table"
,
1016
,
"Use existing table 'tem1'"
,
(
gptr
*
)
&
g_opts
.
use_table
,
(
gptr
*
)
&
g_opts
.
use_table
,
0
,
GET_BOOL
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
0
,
0
,
0
,
...
...
@@ -1754,9 +1763,10 @@ usage()
static
int
checkopts
()
{
#ifdef version50
g_opts
.
separate_events
=
true
;
#endif
if
(
g_opts
.
maxpk
>
g_maxpk
)
{
ll0
(
"setting maxpk to "
<<
g_maxpk
);
g_opts
.
maxpk
=
g_maxpk
;
}
if
(
g_opts
.
separate_events
)
{
g_opts
.
no_blobs
=
true
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment