Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
76629f52
Commit
76629f52
authored
Aug 31, 2004
by
joreland@mysql.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge mysql.com:/home/jonas/src/mysql-4.1-ndb
into mysql.com:/home/jonas/src/wl2025
parents
cc9d1260
2856ddb6
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
530 additions
and
381 deletions
+530
-381
ndb/include/util/NdbSqlUtil.hpp
ndb/include/util/NdbSqlUtil.hpp
+6
-331
ndb/src/common/util/NdbSqlUtil.cpp
ndb/src/common/util/NdbSqlUtil.cpp
+247
-22
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+14
-18
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
+1
-1
ndb/src/kernel/blocks/dbtux/Times.txt
ndb/src/kernel/blocks/dbtux/Times.txt
+11
-1
ndb/src/ndbapi/NdbOperationInt.cpp
ndb/src/ndbapi/NdbOperationInt.cpp
+3
-0
ndb/src/ndbapi/NdbScanOperation.cpp
ndb/src/ndbapi/NdbScanOperation.cpp
+2
-2
ndb/test/ndbapi/Makefile.am
ndb/test/ndbapi/Makefile.am
+2
-1
ndb/test/ndbapi/slow_select.cpp
ndb/test/ndbapi/slow_select.cpp
+218
-0
ndb/test/ndbapi/testOIBasic.cpp
ndb/test/ndbapi/testOIBasic.cpp
+26
-5
No files found.
ndb/include/util/NdbSqlUtil.hpp
View file @
76629f52
...
...
@@ -40,8 +40,9 @@ public:
* Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words.
* There are 2 special return values to check first. All values fit
* into a signed char.
* If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value.
*/
typedef
int
Cmp
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
);
...
...
@@ -49,8 +50,7 @@ public:
CmpLess
=
-
1
,
CmpEqual
=
0
,
CmpGreater
=
1
,
CmpUnknown
=
126
,
// insufficient partial data
CmpError
=
127
// bad data format or unimplemented comparison
CmpUnknown
=
2
// insufficient partial data
};
/**
...
...
@@ -82,19 +82,13 @@ public:
Text
// Text blob
};
Enum
m_typeId
;
Cmp
*
m_cmp
;
//
set to NULL if cmp not implemente
d
Cmp
*
m_cmp
;
//
comparison metho
d
};
/**
* Get type by id. Can return the Undefined type.
*/
static
const
Type
&
type
(
Uint32
typeId
);
/**
* Inline comparison method. Most or all real methods Type::m_cmp are
* implemented via this (trusting dead code elimination).
*/
static
int
cmp
(
Uint32
typeId
,
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
);
static
const
Type
&
getType
(
Uint32
typeId
);
private:
/**
...
...
@@ -127,323 +121,4 @@ private:
static
Cmp
cmpText
;
};
inline
int
NdbSqlUtil
::
cmp
(
Uint32
typeId
,
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
// XXX require size >= 1
if
(
size
>
full
)
return
CmpError
;
switch
((
Type
::
Enum
)
typeId
)
{
case
Type
:
:
Undefined
:
break
;
case
Type
:
:
Tinyint
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Int8
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Tinyunsigned
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Uint8
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Smallint
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Int16
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Smallunsigned
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Uint16
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Mediumint
:
{
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
Int32
v1
=
sint3korr
(
u1
.
v
);
Int32
v2
=
sint3korr
(
u2
.
v
);
if
(
v1
<
v2
)
return
-
1
;
if
(
v1
>
v2
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Mediumunsigned
:
{
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
Uint32
v1
=
uint3korr
(
u1
.
v
);
Uint32
v2
=
uint3korr
(
u2
.
v
);
if
(
v1
<
v2
)
return
-
1
;
if
(
v1
>
v2
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Int
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Int32
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Unsigned
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
Uint32
v
;
}
u1
,
u2
;
u1
.
v
=
p1
[
0
];
u2
.
v
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Bigint
:
{
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
Int64
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Bigunsigned
:
{
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
Uint64
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Float
:
{
if
(
size
>=
1
)
{
union
{
Uint32
p
[
1
];
float
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Double
:
{
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
double
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
case
Type
:
:
Decimal
:
// XXX not used by MySQL or NDB
break
;
case
Type
:
:
Char
:
{
/*
* Char is blank-padded to length and null-padded to word size.
* There is no terminator so we must compare the full values.
*/
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
size
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
case
Type
:
:
Varchar
:
{
/*
* Varchar is not allowed to contain a null byte and the stored
* value is null-padded. Therefore comparison does not need to
* use the length.
*/
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// length in first 2 bytes
int
k
=
strncmp
(
u1
.
v
+
2
,
u2
.
v
+
2
,
(
size
<<
2
)
-
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
case
Type
:
:
Binary
:
{
// compare byte wise
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
size
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
case
Type
:
:
Varbinary
:
{
// assume correctly padded and compare byte wise
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// length in first 2 bytes
int
k
=
memcmp
(
u1
.
v
+
2
,
u2
.
v
+
2
,
(
size
<<
2
)
-
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
case
Type
:
:
Datetime
:
{
/*
* Datetime is CC YY MM DD hh mm ss \0
*/
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// skip format check
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
if
(
size
>=
2
)
{
k
=
memcmp
(
u1
.
v
+
4
,
u2
.
v
+
4
,
4
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
0
;
}
}
return
CmpUnknown
;
}
case
Type
:
:
Timespec
:
{
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
*/
if
(
size
>=
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// skip format check
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
if
(
size
>=
2
)
{
k
=
memcmp
(
u1
.
v
+
4
,
u2
.
v
+
4
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
Uint32
n1
=
*
(
const
Uint32
*
)(
u1
.
v
+
8
);
Uint32
n2
=
*
(
const
Uint32
*
)(
u2
.
v
+
8
);
if
(
n1
<
n2
)
return
-
1
;
if
(
n2
>
n1
)
return
+
1
;
return
0
;
}
}
return
CmpUnknown
;
}
case
Type
:
:
Blob
:
{
// skip blob head, the rest is binary
const
unsigned
skip
=
NDB_BLOB_HEAD_SIZE
;
if
(
size
>=
skip
+
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
+
skip
;
u2
.
p
=
p2
+
skip
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
(
size
-
1
)
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
case
Type
:
:
Text
:
{
// skip blob head, the rest is char
const
unsigned
skip
=
NDB_BLOB_HEAD_SIZE
;
if
(
size
>=
skip
+
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
+
skip
;
u2
.
p
=
p2
+
skip
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
(
size
-
1
)
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
}
return
CmpError
;
}
#endif
ndb/src/common/util/NdbSqlUtil.cpp
View file @
76629f52
...
...
@@ -167,7 +167,7 @@ NdbSqlUtil::m_typeList[] = {
};
const
NdbSqlUtil
::
Type
&
NdbSqlUtil
::
t
ype
(
Uint32
typeId
)
NdbSqlUtil
::
getT
ype
(
Uint32
typeId
)
{
if
(
typeId
<
sizeof
(
m_typeList
)
/
sizeof
(
m_typeList
[
0
])
&&
m_typeList
[
typeId
].
m_typeId
!=
Type
::
Undefined
)
{
...
...
@@ -181,127 +181,352 @@ NdbSqlUtil::type(Uint32 typeId)
int
NdbSqlUtil
::
cmpTinyint
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Tinyint
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Int8
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpTinyunsigned
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Tinyunsigned
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Uint8
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpSmallint
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Smallint
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Int16
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpSmallunsigned
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Smallunsigned
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Uint16
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpMediumint
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Mediumint
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
Int32
v1
=
sint3korr
(
u1
.
v
);
Int32
v2
=
sint3korr
(
u2
.
v
);
if
(
v1
<
v2
)
return
-
1
;
if
(
v1
>
v2
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpMediumunsigned
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Mediumunsigned
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
Uint32
v1
=
uint3korr
(
u1
.
v
);
Uint32
v2
=
uint3korr
(
u2
.
v
);
if
(
v1
<
v2
)
return
-
1
;
if
(
v1
>
v2
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpInt
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Int
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Int32
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpUnsigned
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Unsigned
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
Uint32
v
;
}
u1
,
u2
;
u1
.
v
=
p1
[
0
];
u2
.
v
=
p2
[
0
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpBigint
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Bigint
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
Int64
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpBigunsigned
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Bigunsigned
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
Uint64
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpFloat
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Float
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
union
{
Uint32
p
[
1
];
float
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u2
.
p
[
0
]
=
p2
[
0
];
// no format check
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
int
NdbSqlUtil
::
cmpDouble
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Double
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
if
(
size
>=
2
)
{
union
{
Uint32
p
[
2
];
double
v
;
}
u1
,
u2
;
u1
.
p
[
0
]
=
p1
[
0
];
u1
.
p
[
1
]
=
p1
[
1
];
u2
.
p
[
0
]
=
p2
[
0
];
u2
.
p
[
1
]
=
p2
[
1
];
// no format check
if
(
u1
.
v
<
u2
.
v
)
return
-
1
;
if
(
u1
.
v
>
u2
.
v
)
return
+
1
;
return
0
;
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpDecimal
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Decimal
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
// not used by MySQL or NDB
assert
(
false
);
return
0
;
}
int
NdbSqlUtil
::
cmpChar
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Char
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Char is blank-padded to length and null-padded to word size. There
* is no terminator so we compare the full values.
*/
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
size
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpVarchar
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Varchar
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Varchar is not allowed to contain a null byte and the value is
* null-padded. Therefore comparison does not need to use the length.
*/
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// skip length in first 2 bytes
int
k
=
strncmp
(
u1
.
v
+
2
,
u2
.
v
+
2
,
(
size
<<
2
)
-
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpBinary
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Binary
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Binary data of full length. Compare bytewise.
*/
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
size
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpVarbinary
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Varbinary
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Binary data of variable length padded with nulls. The comparison
* does not need to use the length.
*/
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// skip length in first 2 bytes
int
k
=
memcmp
(
u1
.
v
+
2
,
u2
.
v
+
2
,
(
size
<<
2
)
-
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpDatetime
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Datetime
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Datetime is CC YY MM DD hh mm ss \0
*/
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// no format check
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
if
(
size
>=
2
)
{
k
=
memcmp
(
u1
.
v
+
4
,
u2
.
v
+
4
,
4
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
0
;
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpTimespec
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Timespec
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
*/
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
;
u2
.
p
=
p2
;
// no format check
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
if
(
size
>=
2
)
{
k
=
memcmp
(
u1
.
v
+
4
,
u2
.
v
+
4
,
4
);
if
(
k
!=
0
)
return
k
<
0
?
-
1
:
+
1
;
if
(
size
>=
3
)
{
Uint32
n1
=
*
(
const
Uint32
*
)(
u1
.
v
+
8
);
Uint32
n2
=
*
(
const
Uint32
*
)(
u2
.
v
+
8
);
if
(
n1
<
n2
)
return
-
1
;
if
(
n2
>
n1
)
return
+
1
;
return
0
;
}
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpBlob
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Blob
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Blob comparison is on the inline bytes. Except for larger header
* the format is like Varbinary.
*/
const
unsigned
head
=
NDB_BLOB_HEAD_SIZE
;
// skip blob head
if
(
size
>=
head
+
1
)
{
union
{
const
Uint32
*
p
;
const
unsigned
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
+
head
;
u2
.
p
=
p2
+
head
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
(
size
-
head
)
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
int
NdbSqlUtil
::
cmpText
(
const
Uint32
*
p1
,
const
Uint32
*
p2
,
Uint32
full
,
Uint32
size
)
{
return
cmp
(
Type
::
Text
,
p1
,
p2
,
full
,
size
);
assert
(
full
>=
size
&&
size
>
0
);
/*
* Text comparison is on the inline bytes. Except for larger header
* the format is like Varchar.
*/
const
unsigned
head
=
NDB_BLOB_HEAD_SIZE
;
// skip blob head
if
(
size
>=
head
+
1
)
{
union
{
const
Uint32
*
p
;
const
char
*
v
;
}
u1
,
u2
;
u1
.
p
=
p1
+
head
;
u2
.
p
=
p2
+
head
;
int
k
=
memcmp
(
u1
.
v
,
u2
.
v
,
(
size
-
head
)
<<
2
);
return
k
<
0
?
-
1
:
k
>
0
?
+
1
:
full
==
size
?
0
:
CmpUnknown
;
}
return
CmpUnknown
;
}
#ifdef NDB_SQL_UTIL_TEST
...
...
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
View file @
76629f52
...
...
@@ -35,7 +35,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
searchKey
+=
start
;
int
ret
=
0
;
while
(
start
<
numAttrs
)
{
if
(
len2
<
AttributeHeaderSize
)
{
if
(
len2
<
=
AttributeHeaderSize
)
{
jam
();
ret
=
NdbSqlUtil
::
CmpUnknown
;
break
;
...
...
@@ -46,7 +46,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
jam
();
// current attribute
const
DescAttr
&
descAttr
=
descEnt
.
m_descAttr
[
start
];
const
unsigned
typeId
=
descAttr
.
m_typeId
;
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
getType
(
descAttr
.
m_typeId
);
ndbassert
(
type
.
m_typeId
!=
NdbSqlUtil
::
Type
::
Undefined
);
// full data size
const
unsigned
size1
=
AttributeDescriptor
::
getSizeInWords
(
descAttr
.
m_attrDesc
);
ndbrequire
(
size1
!=
0
&&
size1
==
entryData
.
ah
().
getDataSize
());
...
...
@@ -55,7 +56,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
// compare
const
Uint32
*
const
p1
=
*
searchKey
;
const
Uint32
*
const
p2
=
&
entryData
[
AttributeHeaderSize
];
ret
=
NdbSqlUtil
::
cmp
(
typeId
,
p1
,
p2
,
size1
,
size2
);
ret
=
(
*
type
.
m_cmp
)(
p1
,
p2
,
size1
,
size2
);
if
(
ret
!=
0
)
{
jam
();
break
;
...
...
@@ -78,8 +79,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
entryData
+=
AttributeHeaderSize
+
entryData
.
ah
().
getDataSize
();
start
++
;
}
// XXX until data format errors are handled
ndbrequire
(
ret
!=
NdbSqlUtil
::
CmpError
);
return
ret
;
}
...
...
@@ -103,13 +102,14 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl
jam
();
// current attribute
const
DescAttr
&
descAttr
=
descEnt
.
m_descAttr
[
start
];
const
unsigned
typeId
=
descAttr
.
m_typeId
;
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
getType
(
descAttr
.
m_typeId
);
ndbassert
(
type
.
m_typeId
!=
NdbSqlUtil
::
Type
::
Undefined
);
// full data size
const
unsigned
size1
=
AttributeDescriptor
::
getSizeInWords
(
descAttr
.
m_attrDesc
);
// compare
const
Uint32
*
const
p1
=
*
searchKey
;
const
Uint32
*
const
p2
=
*
entryKey
;
ret
=
NdbSqlUtil
::
cmp
(
typeId
,
p1
,
p2
,
size1
,
size1
);
ret
=
(
*
type
.
m_cmp
)(
p1
,
p2
,
size1
,
size1
);
if
(
ret
!=
0
)
{
jam
();
break
;
...
...
@@ -132,8 +132,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl
entryKey
+=
1
;
start
++
;
}
// XXX until data format errors are handled
ndbrequire
(
ret
!=
NdbSqlUtil
::
CmpError
);
return
ret
;
}
...
...
@@ -172,7 +170,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
*/
unsigned
type
=
4
;
while
(
boundCount
!=
0
)
{
if
(
len2
<
AttributeHeaderSize
)
{
if
(
len2
<
=
AttributeHeaderSize
)
{
jam
();
return
NdbSqlUtil
::
CmpUnknown
;
}
...
...
@@ -186,7 +184,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// current attribute
const
unsigned
index
=
boundInfo
.
ah
().
getAttributeId
();
const
DescAttr
&
descAttr
=
descEnt
.
m_descAttr
[
index
];
const
unsigned
typeId
=
descAttr
.
m_typeId
;
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
getType
(
descAttr
.
m_typeId
);
ndbassert
(
type
.
m_typeId
!=
NdbSqlUtil
::
Type
::
Undefined
);
ndbrequire
(
entryData
.
ah
().
getAttributeId
()
==
descAttr
.
m_primaryAttrId
);
// full data size
const
unsigned
size1
=
boundInfo
.
ah
().
getDataSize
();
...
...
@@ -196,9 +195,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// compare
const
Uint32
*
const
p1
=
&
boundInfo
[
AttributeHeaderSize
];
const
Uint32
*
const
p2
=
&
entryData
[
AttributeHeaderSize
];
int
ret
=
NdbSqlUtil
::
cmp
(
typeId
,
p1
,
p2
,
size1
,
size2
);
// XXX until data format errors are handled
ndbrequire
(
ret
!=
NdbSqlUtil
::
CmpError
);
int
ret
=
(
*
type
.
m_cmp
)(
p1
,
p2
,
size1
,
size2
);
if
(
ret
!=
0
)
{
jam
();
return
ret
;
...
...
@@ -269,15 +266,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// current attribute
const
unsigned
index
=
boundInfo
.
ah
().
getAttributeId
();
const
DescAttr
&
descAttr
=
descEnt
.
m_descAttr
[
index
];
const
unsigned
typeId
=
descAttr
.
m_typeId
;
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
getType
(
descAttr
.
m_typeId
);
ndbassert
(
type
.
m_typeId
!=
NdbSqlUtil
::
Type
::
Undefined
);
// full data size
const
unsigned
size1
=
AttributeDescriptor
::
getSizeInWords
(
descAttr
.
m_attrDesc
);
// compare
const
Uint32
*
const
p1
=
&
boundInfo
[
AttributeHeaderSize
];
const
Uint32
*
const
p2
=
*
entryKey
;
int
ret
=
NdbSqlUtil
::
cmp
(
typeId
,
p1
,
p2
,
size1
,
size1
);
// XXX until data format errors are handled
ndbrequire
(
ret
!=
NdbSqlUtil
::
CmpError
);
int
ret
=
(
*
type
.
m_cmp
)(
p1
,
p2
,
size1
,
size1
);
if
(
ret
!=
0
)
{
jam
();
return
ret
;
...
...
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
View file @
76629f52
...
...
@@ -184,7 +184,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
}
#endif
// check if type is valid and has a comparison method
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
t
ype
(
descAttr
.
m_typeId
);
const
NdbSqlUtil
::
Type
&
type
=
NdbSqlUtil
::
getT
ype
(
descAttr
.
m_typeId
);
if
(
type
.
m_typeId
==
NdbSqlUtil
::
Type
::
Undefined
||
type
.
m_cmp
==
0
)
{
jam
();
...
...
ndb/src/kernel/blocks/dbtux/Times.txt
View file @
76629f52
...
...
@@ -28,6 +28,8 @@ d
shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown)
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
...
...
@@ -75,11 +77,19 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
[ what happened to PK read performance? ]
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
[ case d: what happened to PK read performance? ]
optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/b 46 ms 81 ms 73 pct
mc02/c 9 ms 13 ms 37 pct
mc02/d 242 ms 285 ms 17 pct
[ case b: do long keys suffer from many subroutine calls? ]
vim: set et:
ndb/src/ndbapi/NdbOperationInt.cpp
View file @
76629f52
...
...
@@ -732,6 +732,9 @@ int
NdbOperation
::
read_attr
(
const
NdbColumnImpl
*
anAttrObject
,
Uint32
RegDest
)
{
INT_DEBUG
((
"read_attr %d %u"
,
anAttrObject
->
m_attrId
,
RegDest
));
if
(
initial_interpreterCheck
()
==
-
1
)
return
-
1
;
int
tAttrId
=
read_attrCheck
(
anAttrObject
);
if
(
tAttrId
==
-
1
)
goto
read_attr_error1
;
...
...
ndb/src/ndbapi/NdbScanOperation.cpp
View file @
76629f52
...
...
@@ -1220,10 +1220,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
Uint32
type
=
NdbColumnImpl
::
getImpl
(
*
r1
->
m_column
).
m_extType
;
Uint32
size
=
(
r1
->
theAttrSize
*
r1
->
theArraySize
+
3
)
/
4
;
if
(
!
r1_null
){
char
r
=
NdbSqlUtil
::
cmp
(
type
,
d1
,
d2
,
size
,
size
);
const
NdbSqlUtil
::
Type
&
t
=
NdbSqlUtil
::
getType
(
type
);
int
r
=
(
*
t
.
m_cmp
)(
d1
,
d2
,
size
,
size
);
if
(
r
){
assert
(
r
!=
NdbSqlUtil
::
CmpUnknown
);
assert
(
r
!=
NdbSqlUtil
::
CmpError
);
return
r
;
}
}
...
...
ndb/test/ndbapi/Makefile.am
View file @
76629f52
...
...
@@ -30,7 +30,7 @@ testSystemRestart \
testTimeout
\
testTransactions
\
testDeadlock
\
test_event
test_event
ndbapi_slow_select
#flexTimedAsynch
#testBlobs
...
...
@@ -66,6 +66,7 @@ testTimeout_SOURCES = testTimeout.cpp
testTransactions_SOURCES
=
testTransactions.cpp
testDeadlock_SOURCES
=
testDeadlock.cpp
test_event_SOURCES
=
test_event.cpp
ndbapi_slow_select_SOURCES
=
slow_select.cpp
INCLUDES_LOC
=
-I
$(top_srcdir)
/ndb/include/kernel
...
...
ndb/test/ndbapi/slow_select.cpp
0 → 100644
View file @
76629f52
#include <NdbApi.hpp>
#include <NdbOut.hpp>
#include <NdbTick.h>
struct
S_Scan
{
const
char
*
m_table
;
const
char
*
m_index
;
NdbIndexScanOperation
*
m_scan
;
NdbResultSet
*
m_result
;
Uint32
metaid
;
Uint32
match_count
;
Uint32
row_count
;
};
static
S_Scan
g_scans
[]
=
{
{
"affiliatestometa"
,
"ind_affiliatestometa"
,
0
,
0
,
0
,
0
,
0
},
{
"media"
,
"metaid"
,
0
,
0
,
0
,
0
,
0
},
{
"meta"
,
"PRIMARY"
,
0
,
0
,
0
,
0
,
0
},
{
"artiststometamap"
,
"PRIMARY"
,
0
,
0
,
0
,
0
,
0
},
{
"subgenrestometamap"
,
"metaid"
,
0
,
0
,
0
,
0
,
0
}
};
#define require(x) if(!(x)) { ndbout << "LINE: " << __LINE__ << endl;abort(); }
#define require2(o, x) if(!(x)) { ndbout << o->getNdbError() << endl; abort(); }
Uint32
g_affiliateid
=
2
;
Uint32
g_formatids
[]
=
{
8
,
31
,
76
};
Uint64
start
;
Uint32
g_artistid
=
0
;
Uint32
g_subgenreid
=
0
;
NdbConnection
*
g_trans
=
0
;
static
void
lookup
();
int
main
(
void
){
Ndb
g_ndb
(
"test"
);
g_ndb
.
init
(
1024
);
require
(
g_ndb
.
waitUntilReady
()
==
0
);
g_trans
=
g_ndb
.
startTransaction
();
require
(
g_trans
);
size_t
i
,
j
;
const
size_t
cnt
=
sizeof
(
g_scans
)
/
sizeof
(
g_scans
[
0
]);
start
=
NdbTick_CurrentMillisecond
();
for
(
i
=
0
;
i
<
cnt
;
i
++
){
ndbout_c
(
"starting scan on: %s %s"
,
g_scans
[
i
].
m_table
,
g_scans
[
i
].
m_index
);
g_scans
[
i
].
m_scan
=
g_trans
->
getNdbIndexScanOperation
(
g_scans
[
i
].
m_index
,
g_scans
[
i
].
m_table
);
NdbIndexScanOperation
*
scan
=
g_scans
[
i
].
m_scan
;
require
(
scan
);
g_scans
[
i
].
m_result
=
scan
->
readTuples
(
NdbScanOperation
::
LM_CommittedRead
,
0
,
0
,
true
);
require
(
g_scans
[
i
].
m_result
);
}
require
(
!
g_scans
[
0
].
m_scan
->
setBound
((
Uint32
)
0
,
NdbIndexScanOperation
::
BoundEQ
,
&
g_affiliateid
,
sizeof
(
g_affiliateid
)));
#if 0
require(!g_scans[1].m_scan->setBound((Uint32)0,
NdbIndexScanOperation::BoundLE,
&g_formatids[0],
sizeof(g_formatids[0])));
#endif
NdbScanFilter
sf
(
g_scans
[
1
].
m_scan
);
sf
.
begin
(
NdbScanFilter
::
OR
);
sf
.
eq
(
2
,
g_formatids
[
0
]);
sf
.
eq
(
2
,
g_formatids
[
1
]);
sf
.
eq
(
2
,
g_formatids
[
2
]);
sf
.
end
();
// affiliatestometa
require
(
g_scans
[
0
].
m_scan
->
getValue
(
"uniquekey"
));
require
(
g_scans
[
0
].
m_scan
->
getValue
(
"xml"
));
// media
require
(
g_scans
[
1
].
m_scan
->
getValue
(
"path"
));
require
(
g_scans
[
1
].
m_scan
->
getValue
(
"mediaid"
));
require
(
g_scans
[
1
].
m_scan
->
getValue
(
"formatid"
));
// meta
require
(
g_scans
[
2
].
m_scan
->
getValue
(
"name"
));
require
(
g_scans
[
2
].
m_scan
->
getValue
(
"xml"
));
// artiststometamap
require
(
g_scans
[
3
].
m_scan
->
getValue
(
"artistid"
,
(
char
*
)
&
g_artistid
));
// subgenrestometamap
require
(
g_scans
[
4
].
m_scan
->
getValue
(
"subgenreid"
,
(
char
*
)
&
g_subgenreid
));
for
(
i
=
0
;
i
<
cnt
;
i
++
){
g_scans
[
i
].
m_scan
->
getValue
(
"metaid"
,
(
char
*
)
&
g_scans
[
i
].
metaid
);
}
g_trans
->
execute
(
NoCommit
,
AbortOnError
,
1
);
Uint32
max_val
=
0
;
Uint32
match_val
=
0
;
S_Scan
*
F
[
5
],
*
Q
[
5
],
*
nextF
[
5
];
Uint32
F_sz
=
0
,
Q_sz
=
0
;
for
(
i
=
0
;
i
<
cnt
;
i
++
){
F_sz
++
;
F
[
i
]
=
&
g_scans
[
i
];
}
Uint32
match_count
=
0
;
while
(
F_sz
>
0
){
Uint32
prev_F_sz
=
F_sz
;
F_sz
=
0
;
bool
found
=
false
;
//for(i = 0; i<cnt; i++)
//ndbout_c("%s - %d", g_scans[i].m_table, g_scans[i].metaid);
for
(
i
=
0
;
i
<
prev_F_sz
;
i
++
){
int
res
=
F
[
i
]
->
m_result
->
nextResult
();
if
(
res
==
-
1
)
abort
();
if
(
res
==
1
){
continue
;
}
Uint32
metaid
=
F
[
i
]
->
metaid
;
F
[
i
]
->
row_count
++
;
if
(
metaid
==
match_val
){
//ndbout_c("flera");
nextF
[
F_sz
++
]
=
F
[
i
];
require
(
F_sz
>=
0
&&
F_sz
<=
cnt
);
F
[
i
]
->
match_count
++
;
Uint32
comb
=
1
;
for
(
j
=
0
;
j
<
cnt
;
j
++
){
comb
*=
(
&
g_scans
[
j
]
==
F
[
i
]
?
1
:
g_scans
[
j
].
match_count
);
}
match_count
+=
comb
;
found
=
true
;
continue
;
}
if
(
metaid
<
max_val
){
nextF
[
F_sz
++
]
=
F
[
i
];
require
(
F_sz
>=
0
&&
F_sz
<=
cnt
);
continue
;
}
if
(
metaid
>
max_val
){
for
(
j
=
0
;
j
<
Q_sz
;
j
++
)
nextF
[
F_sz
++
]
=
Q
[
j
];
require
(
F_sz
>=
0
&&
F_sz
<=
cnt
);
Q_sz
=
0
;
max_val
=
metaid
;
}
Q
[
Q_sz
++
]
=
F
[
i
];
require
(
Q_sz
>=
0
&&
Q_sz
<=
cnt
);
}
if
(
F_sz
==
0
&&
Q_sz
>
0
){
match_val
=
max_val
;
for
(
j
=
0
;
j
<
Q_sz
;
j
++
){
nextF
[
F_sz
++
]
=
Q
[
j
];
Q
[
j
]
->
match_count
=
1
;
}
require
(
F_sz
>=
0
&&
F_sz
<=
cnt
);
require
(
Q_sz
>=
0
&&
Q_sz
<=
cnt
);
Q_sz
=
0
;
match_count
++
;
lookup
();
}
else
if
(
!
found
&&
F_sz
+
Q_sz
<
cnt
){
F_sz
=
0
;
}
require
(
F_sz
>=
0
&&
F_sz
<=
cnt
);
for
(
i
=
0
;
i
<
F_sz
;
i
++
)
F
[
i
]
=
nextF
[
i
];
}
start
=
NdbTick_CurrentMillisecond
()
-
start
;
ndbout_c
(
"Elapsed: %lldms"
,
start
);
ndbout_c
(
"rows: %d"
,
match_count
);
for
(
i
=
0
;
i
<
cnt
;
i
++
){
ndbout_c
(
"%s : %d"
,
g_scans
[
i
].
m_table
,
g_scans
[
i
].
row_count
);
}
}
static
void
lookup
(){
{
NdbOperation
*
op
=
g_trans
->
getNdbOperation
(
"artists"
);
require2
(
g_trans
,
op
);
require2
(
op
,
op
->
readTuple
()
==
0
);
require2
(
op
,
op
->
equal
(
"artistid"
,
g_artistid
)
==
0
);
require2
(
op
,
op
->
getValue
(
"name"
));
}
{
NdbOperation
*
op
=
g_trans
->
getNdbOperation
(
"subgenres"
);
require2
(
g_trans
,
op
);
require2
(
op
,
op
->
readTuple
()
==
0
);
require2
(
op
,
op
->
equal
(
"subgenreid"
,
g_subgenreid
)
==
0
);
require2
(
op
,
op
->
getValue
(
"name"
));
}
static
int
loop
=
0
;
if
(
loop
++
>=
16
){
loop
=
0
;
require
(
g_trans
->
execute
(
NoCommit
)
==
0
);
}
//require(g_trans->restart() == 0);
}
ndb/test/ndbapi/testOIBasic.cpp
View file @
76629f52
...
...
@@ -34,6 +34,7 @@
struct
Opt
{
// common options
unsigned
m_batch
;
const
char
*
m_bound
;
const
char
*
m_case
;
bool
m_core
;
bool
m_dups
;
...
...
@@ -43,6 +44,7 @@ struct Opt {
unsigned
m_loop
;
bool
m_nologging
;
bool
m_msglock
;
unsigned
m_pctnull
;
unsigned
m_rows
;
unsigned
m_samples
;
unsigned
m_scanrd
;
...
...
@@ -54,6 +56,7 @@ struct Opt {
unsigned
m_v
;
Opt
()
:
m_batch
(
32
),
m_bound
(
"01234"
),
m_case
(
0
),
m_core
(
false
),
m_dups
(
false
),
...
...
@@ -63,6 +66,7 @@ struct Opt {
m_loop
(
1
),
m_nologging
(
false
),
m_msglock
(
true
),
m_pctnull
(
10
),
m_rows
(
1000
),
m_samples
(
0
),
m_scanrd
(
240
),
...
...
@@ -87,6 +91,7 @@ printhelp()
ndbout
<<
"usage: testOIbasic [options]"
<<
endl
<<
" -batch N pk operations in batch ["
<<
d
.
m_batch
<<
"]"
<<
endl
<<
" -bound xyz use only these bound types 0-4 ["
<<
d
.
m_bound
<<
"]"
<<
endl
<<
" -case abc only given test cases (letters a-z)"
<<
endl
<<
" -core core dump on error ["
<<
d
.
m_core
<<
"]"
<<
endl
<<
" -dups allow duplicate tuples from index scan ["
<<
d
.
m_dups
<<
"]"
<<
endl
...
...
@@ -94,6 +99,7 @@ printhelp()
<<
" -index xyz only given index numbers (digits 1-9)"
<<
endl
<<
" -loop N loop count full suite 0=forever ["
<<
d
.
m_loop
<<
"]"
<<
endl
<<
" -nologging create tables in no-logging mode"
<<
endl
<<
" -pctnull N pct NULL values in nullable column ["
<<
d
.
m_pctnull
<<
"]"
<<
endl
<<
" -rows N rows per thread ["
<<
d
.
m_rows
<<
"]"
<<
endl
<<
" -samples N samples for some timings (0=all) ["
<<
d
.
m_samples
<<
"]"
<<
endl
<<
" -scanrd N scan read parallelism ["
<<
d
.
m_scanrd
<<
"]"
<<
endl
...
...
@@ -198,7 +204,6 @@ struct Par : public Opt {
Tmr
&
tmr
()
const
{
assert
(
m_tmr
!=
0
);
return
*
m_tmr
;
}
unsigned
m_totrows
;
// value calculation
unsigned
m_pctnull
;
unsigned
m_range
;
unsigned
m_pctrange
;
// do verify after read
...
...
@@ -214,7 +219,6 @@ struct Par : public Opt {
m_set
(
0
),
m_tmr
(
0
),
m_totrows
(
m_threads
*
m_rows
),
m_pctnull
(
10
),
m_range
(
m_rows
),
m_pctrange
(
0
),
m_verify
(
false
),
...
...
@@ -1622,7 +1626,6 @@ Set::calc(Par par, unsigned i)
m_row
[
i
]
=
new
Row
(
tab
);
Row
&
row
=
*
m_row
[
i
];
// value generation parameters
par
.
m_pctnull
=
10
;
par
.
m_pctrange
=
40
;
row
.
calc
(
par
,
i
);
}
...
...
@@ -1898,8 +1901,11 @@ BSet::calc(Par par)
BVal
&
bval
=
*
new
BVal
(
icol
);
m_bval
[
m_bvals
++
]
=
&
bval
;
bval
.
m_null
=
false
;
// equality bound only on i==0
unsigned
sel
=
urandom
(
5
-
i
);
unsigned
sel
;
do
{
// equality bound only on i==0
sel
=
urandom
(
5
-
i
);
}
while
(
strchr
(
par
.
m_bound
,
'0'
+
sel
)
==
0
);
if
(
sel
<
2
)
bval
.
m_type
=
0
|
(
1
<<
i
);
else
if
(
sel
<
4
)
...
...
@@ -3207,6 +3213,15 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue
;
}
}
if
(
strcmp
(
arg
,
"-bound"
)
==
0
)
{
if
(
++
argv
,
--
argc
>
0
)
{
const
char
*
p
=
argv
[
0
];
if
(
strlen
(
p
)
!=
0
&&
strlen
(
p
)
==
strspn
(
p
,
"01234"
))
{
g_opt
.
m_bound
=
strdup
(
p
);
continue
;
}
}
}
if
(
strcmp
(
arg
,
"-case"
)
==
0
)
{
if
(
++
argv
,
--
argc
>
0
)
{
g_opt
.
m_case
=
strdup
(
argv
[
0
]);
...
...
@@ -3257,6 +3272,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
g_opt
.
m_nologging
=
true
;
continue
;
}
if
(
strcmp
(
arg
,
"-pctnull"
)
==
0
)
{
if
(
++
argv
,
--
argc
>
0
)
{
g_opt
.
m_pctnull
=
atoi
(
argv
[
0
]);
continue
;
}
}
if
(
strcmp
(
arg
,
"-rows"
)
==
0
)
{
if
(
++
argv
,
--
argc
>
0
)
{
g_opt
.
m_rows
=
atoi
(
argv
[
0
]);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment