Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
e4b6c25c
Commit
e4b6c25c
authored
Jan 12, 2005
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge bk-internal:/home/bk/mysql-5.0-ndb
into build.mysql.com:/users/tulin/mysql-5.0-ndb
parents
f34fee6d
d6cb2dbe
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
337 additions
and
192 deletions
+337
-192
ndb/include/kernel/signaldata/SignalData.hpp
ndb/include/kernel/signaldata/SignalData.hpp
+7
-0
ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
+7
-1
ndb/src/common/debugger/signaldata/SignalNames.cpp
ndb/src/common/debugger/signaldata/SignalNames.cpp
+3
-0
ndb/src/common/debugger/signaldata/SumaImpl.cpp
ndb/src/common/debugger/signaldata/SumaImpl.cpp
+75
-1
ndb/src/kernel/blocks/suma/Suma.cpp
ndb/src/kernel/blocks/suma/Suma.cpp
+3
-1
ndb/src/ndbapi/NdbDictionaryImpl.cpp
ndb/src/ndbapi/NdbDictionaryImpl.cpp
+107
-115
ndb/src/ndbapi/NdbEventOperationImpl.cpp
ndb/src/ndbapi/NdbEventOperationImpl.cpp
+115
-68
ndb/test/src/HugoTransactions.cpp
ndb/test/src/HugoTransactions.cpp
+20
-6
No files found.
ndb/include/kernel/signaldata/SignalData.hpp
View file @
e4b6c25c
...
...
@@ -177,9 +177,16 @@ GSN_PRINT_SIGNATURE(printFAIL_REP);
GSN_PRINT_SIGNATURE
(
printDISCONNECT_REP
);
GSN_PRINT_SIGNATURE
(
printSUB_CREATE_REQ
);
GSN_PRINT_SIGNATURE
(
printSUB_CREATE_CONF
);
GSN_PRINT_SIGNATURE
(
printSUB_CREATE_REF
);
GSN_PRINT_SIGNATURE
(
printSUB_REMOVE_REQ
);
GSN_PRINT_SIGNATURE
(
printSUB_REMOVE_CONF
);
GSN_PRINT_SIGNATURE
(
printSUB_REMOVE_REF
);
GSN_PRINT_SIGNATURE
(
printSUB_START_REQ
);
GSN_PRINT_SIGNATURE
(
printSUB_START_REF
);
GSN_PRINT_SIGNATURE
(
printSUB_START_CONF
);
GSN_PRINT_SIGNATURE
(
printSUB_STOP_REQ
);
GSN_PRINT_SIGNATURE
(
printSUB_STOP_REF
);
GSN_PRINT_SIGNATURE
(
printSUB_STOP_CONF
);
GSN_PRINT_SIGNATURE
(
printSUB_SYNC_REQ
);
GSN_PRINT_SIGNATURE
(
printSUB_SYNC_REF
);
GSN_PRINT_SIGNATURE
(
printSUB_SYNC_CONF
);
...
...
ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
View file @
e4b6c25c
...
...
@@ -151,11 +151,17 @@ SignalDataPrintFunctions[] = {
{
GSN_DISCONNECT_REP
,
printDISCONNECT_REP
},
{
GSN_SUB_CREATE_REQ
,
printSUB_CREATE_REQ
},
//
{ GSN_SUB_CREATE_REF, printSUB_CREATE_REF },
{
GSN_SUB_CREATE_REF
,
printSUB_CREATE_REF
},
{
GSN_SUB_CREATE_CONF
,
printSUB_CREATE_CONF
},
{
GSN_SUB_REMOVE_REQ
,
printSUB_REMOVE_REQ
},
{
GSN_SUB_REMOVE_REF
,
printSUB_REMOVE_REF
},
{
GSN_SUB_REMOVE_CONF
,
printSUB_REMOVE_CONF
},
{
GSN_SUB_START_REQ
,
printSUB_START_REQ
},
{
GSN_SUB_START_REF
,
printSUB_START_REF
},
{
GSN_SUB_START_CONF
,
printSUB_START_CONF
},
{
GSN_SUB_STOP_REQ
,
printSUB_STOP_REQ
},
{
GSN_SUB_STOP_REF
,
printSUB_STOP_REF
},
{
GSN_SUB_STOP_CONF
,
printSUB_STOP_CONF
},
{
GSN_SUB_SYNC_REQ
,
printSUB_SYNC_REQ
},
{
GSN_SUB_SYNC_REF
,
printSUB_SYNC_REF
},
{
GSN_SUB_SYNC_CONF
,
printSUB_SYNC_CONF
},
...
...
ndb/src/common/debugger/signaldata/SignalNames.cpp
View file @
e4b6c25c
...
...
@@ -578,6 +578,9 @@ const GsnName SignalNames [] = {
,{
GSN_SUB_CREATE_REQ
,
"SUB_CREATE_REQ"
}
,{
GSN_SUB_CREATE_REF
,
"SUB_CREATE_REF"
}
,{
GSN_SUB_CREATE_CONF
,
"SUB_CREATE_CONF"
}
,{
GSN_SUB_REMOVE_REQ
,
"SUB_REMOVE_REQ"
}
,{
GSN_SUB_REMOVE_REF
,
"SUB_REMOVE_REF"
}
,{
GSN_SUB_REMOVE_CONF
,
"SUB_REMOVE_CONF"
}
,{
GSN_SUB_START_REQ
,
"SUB_START_REQ"
}
,{
GSN_SUB_START_REF
,
"SUB_START_REF"
}
,{
GSN_SUB_START_CONF
,
"SUB_START_CONF"
}
...
...
ndb/src/common/debugger/signaldata/SumaImpl.cpp
View file @
e4b6c25c
...
...
@@ -39,13 +39,56 @@ printSUB_CREATE_CONF(FILE * output, const Uint32 * theData,
return
false
;
}
bool
printSUB_CREATE_REF
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubCreateRef
*
const
sig
=
(
SubCreateRef
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
return
false
;
}
bool
printSUB_REMOVE_REQ
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubRemoveReq
*
const
sig
=
(
SubRemoveReq
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
return
false
;
}
bool
printSUB_REMOVE_CONF
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubRemoveConf
*
const
sig
=
(
SubRemoveConf
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
return
false
;
}
bool
printSUB_REMOVE_REF
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubRemoveRef
*
const
sig
=
(
SubRemoveRef
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
fprintf
(
output
,
" err: %x
\n
"
,
sig
->
err
);
return
false
;
}
bool
printSUB_START_REQ
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubStartReq
*
const
sig
=
(
SubStartReq
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" s
tartPart: %x
\n
"
,
sig
->
part
);
fprintf
(
output
,
" s
ubscriberData: %x
\n
"
,
sig
->
subscriberData
);
return
false
;
}
...
...
@@ -72,6 +115,37 @@ printSUB_START_CONF(FILE * output, const Uint32 * theData,
return
false
;
}
bool
printSUB_STOP_REQ
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubStopReq
*
const
sig
=
(
SubStopReq
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
return
false
;
}
bool
printSUB_STOP_REF
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubStopRef
*
const
sig
=
(
SubStopRef
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
fprintf
(
output
,
" err: %x
\n
"
,
sig
->
err
);
return
false
;
}
bool
printSUB_STOP_CONF
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
const
SubStopConf
*
const
sig
=
(
SubStopConf
*
)
theData
;
fprintf
(
output
,
" subscriptionId: %x
\n
"
,
sig
->
subscriptionId
);
fprintf
(
output
,
" subscriptionKey: %x
\n
"
,
sig
->
subscriptionKey
);
fprintf
(
output
,
" subscriberData: %x
\n
"
,
sig
->
subscriberData
);
return
false
;
}
bool
printSUB_SYNC_REQ
(
FILE
*
output
,
const
Uint32
*
theData
,
Uint32
len
,
Uint16
receiverBlockNo
)
{
...
...
ndb/src/kernel/blocks/suma/Suma.cpp
View file @
e4b6c25c
...
...
@@ -3280,7 +3280,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
for
(;
!
subbPtr
.
isNull
();
c_dataSubscribers
.
next
(
subbPtr
)){
jam
();
if
(
subbPtr
.
p
->
m_subPtrI
==
subPtr
.
i
&&
subbPtr
.
p
->
m_subscriberRef
==
subscriberRef
&&
refToNode
(
subbPtr
.
p
->
m_subscriberRef
)
==
refToNode
(
subscriberRef
)
&&
subbPtr
.
p
->
m_subscriberData
==
subscriberData
){
// ndbout_c("STOP_REQ: before c_dataSubscribers.release");
jam
();
...
...
@@ -3508,6 +3508,8 @@ SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
jam
();
SubRemoveRef
*
ref
=
(
SubRemoveRef
*
)
signal
->
getDataPtrSend
();
ref
->
senderRef
=
reference
();
ref
->
subscriptionId
=
req
.
subscriptionId
;
ref
->
subscriptionKey
=
req
.
subscriptionKey
;
ref
->
senderData
=
req
.
senderData
;
ref
->
err
=
errCode
;
if
(
temporary
)
...
...
ndb/src/ndbapi/NdbDictionaryImpl.cpp
View file @
e4b6c25c
...
...
@@ -793,7 +793,7 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf)
execNodeStatus);
if ( m_blockNumber == -1 ) {
m_error.code
= 4105;
m_error.code= 4105;
return false; // no more free blocknumbers
}//if
Uint32 theNode = tf->ownId();
...
...
@@ -947,7 +947,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
aNodeId
=
m_transporter
->
get_an_alive_node
();
}
if
(
aNodeId
==
0
){
m_error
.
code
=
4009
;
m_error
.
code
=
4009
;
m_transporter
->
unlock_mutex
();
DBUG_RETURN
(
-
1
);
}
...
...
@@ -974,7 +974,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
}
}
m_error
.
code
=
0
;
m_error
.
code
=
0
;
m_waiter
.
m_node
=
aNodeId
;
m_waiter
.
m_state
=
wst
;
...
...
@@ -1042,7 +1042,7 @@ NdbDictInterface::getTable(const char * name, bool fullyQualifiedNames)
const
Uint32
strLen
=
strlen
(
name
)
+
1
;
// NULL Terminated
if
(
strLen
>
MAX_TAB_NAME_SIZE
)
{
//sizeof(req->tableName)){
m_error
.
code
=
4307
;
m_error
.
code
=
4307
;
return
0
;
}
...
...
@@ -1079,9 +1079,9 @@ NdbDictInterface::getTable(class NdbApiSignal * signal,
if
(
r
)
return
0
;
NdbTableImpl
*
rt
=
0
;
m_error
.
code
=
parseTableInfo
(
&
rt
,
(
Uint32
*
)
m_buffer
.
get_data
(),
m_buffer
.
length
()
/
4
,
fullyQualifiedNames
);
m_error
.
code
=
parseTableInfo
(
&
rt
,
(
Uint32
*
)
m_buffer
.
get_data
(),
m_buffer
.
length
()
/
4
,
fullyQualifiedNames
);
rt
->
buildColumnHash
();
return
rt
;
}
...
...
@@ -1116,7 +1116,7 @@ NdbDictInterface::execGET_TABINFO_REF(NdbApiSignal * signal,
{
const
GetTabInfoRef
*
ref
=
CAST_CONSTPTR
(
GetTabInfoRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
errorCode
;
m_error
.
code
=
ref
->
errorCode
;
m_waiter
.
signal
(
NO_WAIT
);
}
...
...
@@ -1348,7 +1348,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
if
(
tableDesc
.
FragmentDataLen
>
0
)
{
int
i
;
unsigned
i
;
Uint32
replicaCount
=
tableDesc
.
FragmentData
[
0
];
Uint32
fragCount
=
tableDesc
.
FragmentData
[
1
];
...
...
@@ -1396,13 +1396,13 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t)
Ndb_local_table_info
*
info
=
get_local_table_info
(
t
.
m_internalName
.
c_str
(),
false
);
if
(
info
==
NULL
)
{
m_error
.
code
=
709
;
m_error
.
code
=
709
;
return
-
1
;
}
if
(
createBlobTables
(
*
(
info
->
m_table_impl
))
!=
0
)
{
int
save_code
=
m_error
.
code
;
(
void
)
dropTable
(
t
);
m_error
.
code
=
save_code
;
m_error
.
code
=
save_code
;
return
-
1
;
}
return
0
;
...
...
@@ -1472,7 +1472,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
DBUG_ENTER
(
"NdbDictionaryImpl::alterTable"
);
if
(
!
get_local_table_info
(
originalInternalName
,
false
)){
m_error
.
code
=
709
;
m_error
.
code
=
709
;
DBUG_RETURN
(
-
1
);
}
// Alter the table
...
...
@@ -1508,12 +1508,12 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
DBUG_ENTER
(
"NdbDictInterface::createOrAlterTable"
);
unsigned
i
;
if
((
unsigned
)
impl
.
getNoOfPrimaryKeys
()
>
NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY
){
m_error
.
code
=
4317
;
m_error
.
code
=
4317
;
DBUG_RETURN
(
-
1
);
}
unsigned
sz
=
impl
.
m_columns
.
size
();
if
(
sz
>
NDB_MAX_ATTRIBUTES_IN_TABLE
){
m_error
.
code
=
4318
;
m_error
.
code
=
4318
;
DBUG_RETURN
(
-
1
);
}
...
...
@@ -1538,7 +1538,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
continue
;
if
(
col
->
m_autoIncrement
)
{
if
(
haveAutoIncrement
)
{
m_error
.
code
=
4335
;
m_error
.
code
=
4335
;
DBUG_RETURN
(
-
1
);
}
haveAutoIncrement
=
true
;
...
...
@@ -1548,7 +1548,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
// Check max length of frm data
if
(
impl
.
m_frm
.
length
()
>
MAX_FRM_DATA_SIZE
){
m_error
.
code
=
1229
;
m_error
.
code
=
1229
;
DBUG_RETURN
(
-
1
);
}
tmpTab
.
FrmLen
=
impl
.
m_frm
.
length
();
...
...
@@ -1596,22 +1596,22 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
// check type and compute attribute size and array size
if
(
!
tmpAttr
.
translateExtType
())
{
m_error
.
code
=
703
;
m_error
.
code
=
703
;
DBUG_RETURN
(
-
1
);
}
// charset is defined exactly for char types
if
(
col
->
getCharType
()
!=
(
col
->
m_cs
!=
NULL
))
{
m_error
.
code
=
703
;
m_error
.
code
=
703
;
DBUG_RETURN
(
-
1
);
}
// primary key type check
if
(
col
->
m_pk
&&
!
NdbSqlUtil
::
usable_in_pk
(
col
->
m_type
,
col
->
m_cs
))
{
m_error
.
code
=
743
;
m_error
.
code
=
743
;
DBUG_RETURN
(
-
1
);
}
// distribution key not supported for Char attribute
if
(
col
->
m_distributionKey
&&
col
->
m_cs
!=
NULL
)
{
m_error
.
code
=
745
;
m_error
.
code
=
745
;
DBUG_RETURN
(
-
1
);
}
// charset in upper half of precision
...
...
@@ -1669,7 +1669,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
if
(
!
ndb
.
setAutoIncrementValue
(
impl
.
m_externalName
.
c_str
(),
autoIncrementValue
))
{
if
(
ndb
.
theError
.
code
==
0
)
{
m_error
.
code
=
4336
;
m_error
.
code
=
4336
;
ndb
.
theError
=
m_error
;
}
else
m_error
=
ndb
.
theError
;
...
...
@@ -1719,7 +1719,7 @@ NdbDictInterface::execCREATE_TABLE_REF(NdbApiSignal * signal,
{
const
CreateTableRef
*
const
ref
=
CAST_CONSTPTR
(
CreateTableRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
errorCode
;
m_error
.
code
=
ref
->
errorCode
;
m_masterNodeId
=
ref
->
masterNodeId
;
m_waiter
.
signal
(
NO_WAIT
);
}
...
...
@@ -1763,7 +1763,7 @@ NdbDictInterface::execALTER_TABLE_REF(NdbApiSignal * signal,
{
const
AlterTableRef
*
const
ref
=
CAST_CONSTPTR
(
AlterTableRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
errorCode
;
m_error
.
code
=
ref
->
errorCode
;
m_masterNodeId
=
ref
->
masterNodeId
;
m_waiter
.
signal
(
NO_WAIT
);
}
...
...
@@ -1808,7 +1808,7 @@ NdbDictionaryImpl::dropTable(NdbTableImpl & impl)
}
if
(
impl
.
m_indexType
!=
NdbDictionary
::
Index
::
Undefined
)
{
m_receiver
.
m_error
.
code
=
1228
;
m_receiver
.
m_error
.
code
=
1228
;
return
-
1
;
}
...
...
@@ -1918,7 +1918,7 @@ NdbDictInterface::execDROP_TABLE_REF(NdbApiSignal * signal,
LinearSectionPtr
ptr
[
3
])
{
const
DropTableRef
*
const
ref
=
CAST_CONSTPTR
(
DropTableRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
errorCode
;
m_error
.
code
=
ref
->
errorCode
;
m_masterNodeId
=
ref
->
masterNodeId
;
m_waiter
.
signal
(
NO_WAIT
);
}
...
...
@@ -2370,7 +2370,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
const
NdbColumnImpl
*
col
=
table
.
getColumn
(
evnt
.
m_columns
[
i
]
->
m_name
.
c_str
());
if
(
col
==
0
){
m_error
.
code
=
4247
;
m_error
.
code
=
4247
;
return
-
1
;
}
// Copy column definition
...
...
@@ -2396,7 +2396,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
// Check for illegal duplicate attributes
for
(
i
=
1
;
i
<
attributeList_sz
;
i
++
)
{
if
(
evnt
.
m_columns
[
i
-
1
]
->
m_attrId
==
evnt
.
m_columns
[
i
]
->
m_attrId
)
{
m_error
.
code
=
4258
;
m_error
.
code
=
4258
;
return
-
1
;
}
}
...
...
@@ -2444,7 +2444,7 @@ NdbDictInterface::createEvent(class Ndb & ndb,
const
size_t
len
=
strlen
(
evnt
.
m_externalName
.
c_str
())
+
1
;
if
(
len
>
MAX_TAB_NAME_SIZE
)
{
m_error
.
code
=
4241
;
m_error
.
code
=
4241
;
return
-
1
;
}
...
...
@@ -2557,9 +2557,7 @@ int
NdbDictInterface
::
stopSubscribeEvent
(
class
Ndb
&
ndb
,
NdbEventImpl
&
evnt
)
{
#ifdef EVENT_DEBUG
ndbout_c
(
"SUB_STOP_REQ"
);
#endif
DBUG_ENTER
(
"NdbDictInterface::stopSubscribeEvent"
);
NdbApiSignal
tSignal
(
m_reference
);
// tSignal.theReceiversBlockNumber = SUMA;
...
...
@@ -2575,7 +2573,7 @@ NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
sumaStop
->
part
=
(
Uint32
)
SubscriptionData
::
TableData
;
sumaStop
->
subscriberRef
=
m_reference
;
return
stopSubscribeEvent
(
&
tSignal
,
NULL
);
DBUG_RETURN
(
stopSubscribeEvent
(
&
tSignal
,
NULL
)
);
}
int
...
...
@@ -2635,7 +2633,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
#ifdef EVENT_DEBUG
ndbout_c
(
"NdbDictionaryImpl::getEvent could not find column id %d"
,
id
);
#endif
m_error
.
code
=
4247
;
m_error
.
code
=
4247
;
delete
ev
;
return
NULL
;
}
...
...
@@ -2653,9 +2651,8 @@ void
NdbDictInterface
::
execCREATE_EVNT_CONF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"NdbDictionaryImpl.cpp: execCREATE_EVNT_CONF"
<<
endl
;
#endif
DBUG_ENTER
(
"NdbDictInterface::execCREATE_EVNT_CONF"
);
m_buffer
.
clear
();
unsigned
int
len
=
signal
->
getLength
()
<<
2
;
m_buffer
.
append
((
char
*
)
&
len
,
sizeof
(
len
));
...
...
@@ -2665,45 +2662,49 @@ NdbDictInterface::execCREATE_EVNT_CONF(NdbApiSignal * signal,
m_buffer
.
append
((
char
*
)
ptr
[
0
].
p
,
strlen
((
char
*
)
ptr
[
0
].
p
)
+
1
);
}
const
CreateEvntConf
*
const
createEvntConf
=
CAST_CONSTPTR
(
CreateEvntConf
,
signal
->
getDataPtr
());
Uint32
subscriptionId
=
createEvntConf
->
getEventId
();
Uint32
subscriptionKey
=
createEvntConf
->
getEventKey
();
DBUG_PRINT
(
"info"
,(
"subscriptionId=%d,subscriptionKey=%d"
,
subscriptionId
,
subscriptionKey
));
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
NdbDictInterface
::
execCREATE_EVNT_REF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"NdbDictionaryImpl.cpp: execCREATE_EVNT_REF"
<<
endl
;
ndbout
<<
"Exiting"
<<
endl
;
exit
(
-
1
);
#endif
DBUG_ENTER
(
"NdbDictInterface::execCREATE_EVNT_REF"
);
const
CreateEvntRef
*
const
ref
=
CAST_CONSTPTR
(
CreateEvntRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
getErrorCode
();
#ifdef EVENT_DEBUG
ndbout_c
(
"execCREATE_EVNT_REF"
);
ndbout_c
(
"ErrorCode %u"
,
ref
->
getErrorCode
());
ndbout_c
(
"Errorline %u"
,
ref
->
getErrorLine
());
ndbout_c
(
"ErrorNode %u"
,
ref
->
getErrorNode
());
#endif
m_waiter
.
signal
(
NO_WAIT
);
const
CreateEvntRef
*
const
ref
=
CAST_CONSTPTR
(
CreateEvntRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
getErrorCode
();
DBUG_PRINT
(
"error"
,(
"error=%d,line=%d,node=%d"
,
ref
->
getErrorCode
(),
ref
->
getErrorLine
(),
ref
->
getErrorNode
()));
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
NdbDictInterface
::
execSUB_STOP_CONF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER
(
"NdbDictInterface::execSUB_STOP_REF"
);
#ifdef EVENT_DEBUG
ndbout
<<
"Got GSN_SUB_STOP_CONF"
<<
endl
;
#endif
// SubRemoveConf * const sumaRemoveConf = CAST_CONSTPTR(SubRemoveConf, signal->getDataPtr());
DBUG_ENTER
(
"NdbDictInterface::execSUB_STOP_CONF"
);
const
SubStopConf
*
const
subStopConf
=
CAST_CONSTPTR
(
SubStopConf
,
signal
->
getDataPtr
());
// Uint32 subscriptionId = sumaRemove
Conf->subscriptionId;
// Uint32 subscriptionKey = sumaRemove
Conf->subscriptionKey;
// Uint32 senderData = sumaRemoveConf->send
erData;
Uint32
subscriptionId
=
subStop
Conf
->
subscriptionId
;
Uint32
subscriptionKey
=
subStop
Conf
->
subscriptionKey
;
Uint32
subscriberData
=
subStopConf
->
subscrib
erData
;
DBUG_PRINT
(
"info"
,(
"subscriptionId=%d,subscriptionKey=%d,subscriberData=%d"
,
subscriptionId
,
subscriptionKey
,
subscriberData
));
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
...
...
@@ -2711,19 +2712,17 @@ NdbDictInterface::execSUB_STOP_REF(NdbApiSignal * signal,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER
(
"NdbDictInterface::execSUB_STOP_REF"
);
#ifdef EVENT_DEBUG
ndbout
<<
"Got GSN_SUB_STOP_REF"
<<
endl
;
#endif
const
SubRemoveRef
*
const
sumaRemoveRef
=
CAST_CONSTPTR
(
SubRemoveRef
,
signal
->
getDataPtr
());
const
SubStopRef
*
const
subStopRef
=
CAST_CONSTPTR
(
SubStopRef
,
signal
->
getDataPtr
());
// Uint32 subscriptionId = sumaRemoveRef->subscriptionId;
// Uint32 subscriptionKey = sumaRemoveRef->subscriptionKey;
// Uint32 senderData = sumaRemoveRef->senderData;
Uint32
subscriptionId
=
subStopRef
->
subscriptionId
;
Uint32
subscriptionKey
=
subStopRef
->
subscriptionKey
;
Uint32
subscriberData
=
subStopRef
->
subscriberData
;
m_error
.
code
=
subStopRef
->
errorCode
;
m_error
.
code
=
sumaRemoveRef
->
errorCode
;
DBUG_PRINT
(
"error"
,(
"subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d"
,
subscriptionId
,
subscriptionKey
,
subscriberData
,
m_error
.
code
));
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
...
...
@@ -2731,57 +2730,55 @@ void
NdbDictInterface
::
execSUB_START_CONF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"Got GSN_SUB_START_CONF"
<<
endl
;
#endif
const
SubStartConf
*
const
sumaStartConf
=
CAST_CONSTPTR
(
SubStartConf
,
signal
->
getDataPtr
());
DBUG_ENTER
(
"NdbDictInterface::execSUB_START_CONF"
);
const
SubStartConf
*
const
subStartConf
=
CAST_CONSTPTR
(
SubStartConf
,
signal
->
getDataPtr
());
// Uint32 subscriptionId = suma
StartConf->subscriptionId;
// Uint32 subscriptionKey = suma
StartConf->subscriptionKey;
Uint32
subscriptionId
=
sub
StartConf
->
subscriptionId
;
Uint32
subscriptionKey
=
sub
StartConf
->
subscriptionKey
;
SubscriptionData
::
Part
part
=
(
SubscriptionData
::
Part
)
suma
StartConf
->
part
;
// Uint32 subscriberData = suma
StartConf->subscriberData;
(
SubscriptionData
::
Part
)
sub
StartConf
->
part
;
Uint32
subscriberData
=
sub
StartConf
->
subscriberData
;
switch
(
part
)
{
case
SubscriptionData
:
:
MetaData
:
{
#ifdef EVENT_DEBUG
ndbout
<<
"SubscriptionData::MetaData"
<<
endl
;
#endif
m_error
.
code
=
1
;
DBUG_PRINT
(
"error"
,(
"SubscriptionData::MetaData"
));
m_error
.
code
=
1
;
break
;
}
case
SubscriptionData
:
:
TableData
:
{
#ifdef EVENT_DEBUG
ndbout
<<
"SubscriptionData::TableData"
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,(
"SubscriptionData::TableData"
));
break
;
}
default:
{
#ifdef EVENT_DEBUG
ndbout_c
(
"NdbDictInterface::execSUB_START_CONF wrong data"
);
#endif
m_error
.
code
=
1
;
DBUG_PRINT
(
"error"
,(
"wrong data"
));
m_error
.
code
=
2
;
break
;
}
}
DBUG_PRINT
(
"info"
,(
"subscriptionId=%d,subscriptionKey=%d,subscriberData=%d"
,
subscriptionId
,
subscriptionKey
,
subscriberData
));
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
NdbDictInterface
::
execSUB_START_REF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"Got GSN_SUB_START_REF"
<<
endl
;
#endif
m_error
.
code
=
1
;
m_waiter
.
signal
(
NO_WAIT
);
DBUG_ENTER
(
"NdbDictInterface::execSUB_START_REF"
);
const
SubStartRef
*
const
subStartRef
=
CAST_CONSTPTR
(
SubStartRef
,
signal
->
getDataPtr
());
m_error
.
code
=
subStartRef
->
errorCode
;
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
NdbDictInterface
::
execSUB_GCP_COMPLETE_REP
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
const
SubGcpCompleteRep
*
const
rep
=
CAST_CONSTPTR
(
SubGcpCompleteRep
,
signal
->
getDataPtr
());
const
SubGcpCompleteRep
*
const
rep
=
CAST_CONSTPTR
(
SubGcpCompleteRep
,
signal
->
getDataPtr
());
const
Uint32
gci
=
rep
->
gci
;
// const Uint32 senderRef = rep->senderRef;
...
...
@@ -2792,7 +2789,8 @@ NdbDictInterface::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal,
const
Uint32
ref
=
signal
->
theSendersBlockRef
;
NdbApiSignal
tSignal
(
m_reference
);
SubGcpCompleteAcc
*
acc
=
CAST_PTR
(
SubGcpCompleteAcc
,
tSignal
.
getDataPtrSend
());
SubGcpCompleteAcc
*
acc
=
CAST_PTR
(
SubGcpCompleteAcc
,
tSignal
.
getDataPtrSend
());
acc
->
rep
=
*
rep
;
...
...
@@ -2850,9 +2848,9 @@ NdbDictInterface::execSUB_TABLE_DATA(NdbApiSignal * signal,
int
NdbDictionaryImpl
::
dropEvent
(
const
char
*
eventName
)
{
NdbEventImpl
*
ev
=
new
NdbEventImpl
();
NdbEventImpl
*
ev
=
new
NdbEventImpl
();
ev
->
setName
(
eventName
);
int
ret
=
m_receiver
.
dropEvent
(
*
ev
);
int
ret
=
m_receiver
.
dropEvent
(
*
ev
);
delete
ev
;
// printf("__________________RET %u\n", ret);
...
...
@@ -2901,31 +2899,25 @@ void
NdbDictInterface
::
execDROP_EVNT_CONF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"NdbDictionaryImpl.cpp: execDROP_EVNT_CONF"
<<
endl
;
#endif
DBUG_ENTER
(
"NdbDictInterface::execDROP_EVNT_CONF"
);
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
void
NdbDictInterface
::
execDROP_EVNT_REF
(
NdbApiSignal
*
signal
,
LinearSectionPtr
ptr
[
3
])
{
#ifdef EVENT_DEBUG
ndbout
<<
"NdbDictionaryImpl.cpp: execDROP_EVNT_REF"
<<
endl
;
#endif
const
DropEvntRef
*
const
ref
=
CAST_CONSTPTR
(
DropEvntRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
getErrorCode
();
DBUG_ENTER
(
"NdbDictInterface::execDROP_EVNT_REF"
);
const
DropEvntRef
*
const
ref
=
CAST_CONSTPTR
(
DropEvntRef
,
signal
->
getDataPtr
());
m_error
.
code
=
ref
->
getErrorCode
();
#if 0
ndbout_c("execDROP_EVNT_REF");
ndbout_c("ErrorCode %u", ref->getErrorCode());
ndbout_c("Errorline %u", ref->getErrorLine());
ndbout_c("ErrorNode %u", ref->getErrorNode());
#endif
DBUG_PRINT
(
"info"
,(
"ErrorCode=%u Errorline=%u ErrorNode=%u"
,
ref
->
getErrorCode
(),
ref
->
getErrorLine
(),
ref
->
getErrorNode
()));
m_waiter
.
signal
(
NO_WAIT
);
m_waiter
.
signal
(
NO_WAIT
);
DBUG_VOID_RETURN
;
}
/*****************************************************************
...
...
@@ -2990,7 +2982,7 @@ NdbDictInterface::listObjects(NdbDictionary::Dictionary::List& list,
}
if
(
!
ok
)
{
// bad signal data
m_error
.
code
=
4213
;
m_error
.
code
=
4213
;
return
-
1
;
}
list
.
count
=
count
;
...
...
@@ -3056,7 +3048,7 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
m_transporter
->
lock_mutex
();
Uint16
aNodeId
=
m_transporter
->
get_an_alive_node
();
if
(
aNodeId
==
0
)
{
m_error
.
code
=
4009
;
m_error
.
code
=
4009
;
m_transporter
->
unlock_mutex
();
return
-
1
;
}
...
...
@@ -3064,7 +3056,7 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
m_transporter
->
unlock_mutex
();
continue
;
}
m_error
.
code
=
0
;
m_error
.
code
=
0
;
m_waiter
.
m_node
=
aNodeId
;
m_waiter
.
m_state
=
WAIT_LIST_TABLES_CONF
;
m_waiter
.
wait
(
WAITFOR_RESPONSE_TIMEOUT
);
...
...
ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
e4b6c25c
...
...
@@ -120,24 +120,26 @@ NdbEventOperationImpl::getState()
NdbRecAttr
*
NdbEventOperationImpl
::
getValue
(
const
char
*
colName
,
char
*
aValue
,
int
n
)
{
DBUG_ENTER
(
"NdbEventOperationImpl::getValue"
);
if
(
m_state
!=
EO_CREATED
)
{
ndbout_c
(
"NdbEventOperationImpl::getValue may only be called between instantiation and execute()"
);
return
NULL
;
DBUG_RETURN
(
NULL
)
;
}
NdbColumnImpl
*
tAttrInfo
=
m_eventImpl
->
m_tableImpl
->
getColumn
(
colName
);
if
(
tAttrInfo
==
NULL
)
{
ndbout_c
(
"NdbEventOperationImpl::getValue attribute %s not found"
,
colName
);
return
NULL
;
DBUG_RETURN
(
NULL
)
;
}
return
NdbEventOperationImpl
::
getValue
(
tAttrInfo
,
aValue
,
n
);
DBUG_RETURN
(
NdbEventOperationImpl
::
getValue
(
tAttrInfo
,
aValue
,
n
)
);
}
NdbRecAttr
*
NdbEventOperationImpl
::
getValue
(
const
NdbColumnImpl
*
tAttrInfo
,
char
*
aValue
,
int
n
)
{
DBUG_ENTER
(
"NdbEventOperationImpl::getValue"
);
// Insert Attribute Id into ATTRINFO part.
NdbRecAttr
*&
theFirstRecAttr
=
theFirstRecAttrs
[
n
];
NdbRecAttr
*&
theCurrentRecAttr
=
theCurrentRecAttrs
[
n
];
...
...
@@ -149,7 +151,7 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
if
(
tRecAttr
==
NULL
)
{
exit
(
-
1
);
//setErrorCodeAbort(4000);
return
NULL
;
DBUG_RETURN
(
NULL
)
;
}
/**********************************************************************
...
...
@@ -161,7 +163,7 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
//setErrorCodeAbort(4000);
m_ndb
->
releaseRecAttr
(
tRecAttr
);
exit
(
-
1
);
return
NULL
;
DBUG_RETURN
(
NULL
)
;
}
//theErrorLine++;
...
...
@@ -193,7 +195,7 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
tRecAttr
->
release
();
// do I need to do this?
m_ndb
->
releaseRecAttr
(
tRecAttr
);
exit
(
-
1
);
return
NULL
;
DBUG_RETURN
(
NULL
)
;
}
// this is it, between p and p_next
p
->
next
(
tRecAttr
);
...
...
@@ -201,16 +203,17 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
}
}
return
tRecAttr
;
DBUG_RETURN
(
tRecAttr
)
;
}
int
NdbEventOperationImpl
::
execute
()
{
DBUG_ENTER
(
"NdbEventOperationImpl::execute"
);
NdbDictionary
::
Dictionary
*
myDict
=
m_ndb
->
getDictionary
();
if
(
!
myDict
)
{
m_error
.
code
=
m_ndb
->
getNdbError
().
code
;
return
-
1
;
DBUG_RETURN
(
-
1
)
;
}
if
(
theFirstRecAttrs
[
0
]
==
NULL
)
{
// defaults to get all
...
...
@@ -227,7 +230,9 @@ NdbEventOperationImpl::execute()
m_error
.
code
=
4709
;
if
(
r
<
0
)
return
-
1
;
{
DBUG_RETURN
(
-
1
);
}
m_eventImpl
->
m_bufferId
=
m_bufferId
=
(
Uint32
)
r
;
...
...
@@ -252,7 +257,7 @@ NdbEventOperationImpl::execute()
//Error
m_state
=
EO_ERROR
;
}
return
r
;
DBUG_RETURN
(
r
)
;
}
int
...
...
@@ -260,7 +265,9 @@ NdbEventOperationImpl::stop()
{
DBUG_ENTER
(
"NdbEventOperationImpl::stop"
);
if
(
m_state
!=
EO_EXECUTING
)
{
DBUG_RETURN
(
-
1
);
}
// ndbout_c("NdbEventOperation::stopping()");
...
...
@@ -330,6 +337,7 @@ NdbEventOperationImpl::getLatestGCI()
int
NdbEventOperationImpl
::
next
(
int
*
pOverrun
)
{
DBUG_ENTER
(
"NdbEventOperationImpl::next"
);
int
nr
=
10000
;
// a high value
int
tmpOverrun
=
0
;
int
*
ptmpOverrun
;
...
...
@@ -346,7 +354,10 @@ NdbEventOperationImpl::next(int *pOverrun)
*
pOverrun
=
tmpOverrun
;
}
if
(
r
<=
0
)
return
r
;
// no data
if
(
r
<=
0
)
{
DBUG_RETURN
(
r
);
// no data
}
if
(
r
<
nr
)
r
=
nr
;
else
nr
--
;
// we don't want to be stuck here forever
...
...
@@ -356,7 +367,10 @@ NdbEventOperationImpl::next(int *pOverrun)
// now move the data into the RecAttrs
if
((
theFirstRecAttrs
[
0
]
==
NULL
)
&&
(
theFirstRecAttrs
[
1
]
==
NULL
))
return
r
;
(
theFirstRecAttrs
[
1
]
==
NULL
))
{
DBUG_RETURN
(
r
);
}
// no copying since no RecAttr's
...
...
@@ -464,9 +478,11 @@ NdbEventOperationImpl::next(int *pOverrun)
}
if
(
hasSomeData
)
return
r
;
{
DBUG_RETURN
(
r
);
}
}
return
0
;
DBUG_RETURN
(
0
)
;
}
NdbDictionary
::
Event
::
TableEvent
...
...
@@ -641,23 +657,28 @@ NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
void
NdbGlobalEventBufferHandle
::
addBufferId
(
int
bufferId
)
{
DBUG_ENTER
(
"NdbGlobalEventBufferHandle::addBufferId"
);
DBUG_PRINT
(
"enter"
,(
"bufferId=%d"
,
bufferId
));
if
(
m_nids
>=
NDB_MAX_ACTIVE_EVENTS
)
{
ndbout_c
(
"NdbGlobalEventBufferHandle::addBufferId error in paramerer setting"
);
exit
(
-
1
);
}
m_bufferIds
[
m_nids
]
=
bufferId
;
m_nids
++
;
DBUG_VOID_RETURN
;
}
void
NdbGlobalEventBufferHandle
::
dropBufferId
(
int
bufferId
)
{
DBUG_ENTER
(
"NdbGlobalEventBufferHandle::dropBufferId"
);
DBUG_PRINT
(
"enter"
,(
"bufferId=%d"
,
bufferId
));
for
(
int
i
=
0
;
i
<
m_nids
;
i
++
)
if
(
m_bufferIds
[
i
]
==
bufferId
)
{
m_nids
--
;
for
(;
i
<
m_nids
;
i
++
)
m_bufferIds
[
i
]
=
m_bufferIds
[
i
+
1
];
return
;
DBUG_VOID_RETURN
;
}
ndbout_c
(
"NdbGlobalEventBufferHandle::dropBufferId %d does not exist"
,
bufferId
);
...
...
@@ -841,7 +862,7 @@ NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
// (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem));
for
(
int
i
=
0
;
i
<
m_max
;
i
++
)
{
m_buf
[
i
].
gId
=
0
;
m_buf
[
i
].
gId
=
0
;
}
}
// TODO make sure we don't hit roof
...
...
@@ -876,14 +897,14 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_prepareAddSubscribeEvent"
);
int
i
;
int
bufferId
=
-
1
;
int
bufferId
=
-
1
;
// add_drop_lock(); // only one thread can do add or drop at a time
// Find place where eventId already set
for
(
i
=
0
;
i
<
m_no
;
i
++
)
{
if
(
m_buf
[
i
].
gId
==
eventId
)
{
bufferId
=
i
;
bufferId
=
i
;
break
;
}
}
...
...
@@ -891,56 +912,54 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
// find space for new bufferId
for
(
i
=
0
;
i
<
m_no
;
i
++
)
{
if
(
m_buf
[
i
].
gId
==
0
)
{
bufferId
=
i
;
// we found an empty spot
break
;
bufferId
=
i
;
// we found an empty spot
goto
found_bufferId
;
}
}
if
(
bufferId
<
0
&&
m_no
<
m_max
)
{
// room for more so get that
bufferId
=
m_no
;
m_buf
[
m_no
].
gId
=
0
;
bufferId
=
m_no
;
m_buf
[
m_no
].
gId
=
0
;
m_no
++
;
}
else
{
ndbout_c
(
"prepareAddSubscribeEvent: Can't accept more subscribers"
);
// add_drop_unlock();
// add_drop_unlock();
DBUG_PRINT
(
"error"
,(
"Can't accept more subscribers:"
" bufferId=%d, m_no=%d, m_max=%d"
,
bufferId
,
m_no
,
m_max
));
DBUG_RETURN
(
-
1
);
}
}
found_bufferId:
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
if
(
b
.
gId
==
0
)
{
// first subscriber needs some initialization
bufferId
=
NO_ID
(
0
,
bufferId
);
bufferId
=
NO_ID
(
0
,
bufferId
);
b
.
gId
=
eventId
;
b
.
gId
=
eventId
;
if
((
b
.
p_buf_mutex
=
NdbMutex_Create
())
==
NULL
)
{
if
((
b
.
p_buf_mutex
=
NdbMutex_Create
())
==
NULL
)
{
ndbout_c
(
"NdbGlobalEventBuffer: NdbMutex_Create() failed"
);
exit
(
-
1
);
abort
(
);
}
b
.
subs
=
0
;
b
.
f
=
0
;
b
.
sz
=
0
;
b
.
max_sz
=
aHandle
->
m_bufferL
;
b
.
data
=
b
.
subs
=
0
;
b
.
f
=
0
;
b
.
sz
=
0
;
b
.
max_sz
=
aHandle
->
m_bufferL
;
b
.
data
=
(
BufItem
::
Data
*
)
NdbMem_Allocate
(
b
.
max_sz
*
sizeof
(
BufItem
::
Data
));
for
(
int
i
=
0
;
i
<
b
.
max_sz
;
i
++
)
{
b
.
data
[
i
].
sdata
=
NULL
;
b
.
data
[
i
].
ptr
[
0
].
p
=
NULL
;
b
.
data
[
i
].
ptr
[
1
].
p
=
NULL
;
b
.
data
[
i
].
ptr
[
2
].
p
=
NULL
;
b
.
data
[
i
].
sdata
=
NULL
;
b
.
data
[
i
].
ptr
[
0
].
p
=
NULL
;
b
.
data
[
i
].
ptr
[
1
].
p
=
NULL
;
b
.
data
[
i
].
ptr
[
2
].
p
=
NULL
;
}
}
else
{
#ifdef EVENT_DEBUG
ndbout_c
(
"NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u"
,
b
.
subs
);
#endif
DBUG_PRINT
(
"info"
,
(
"TRYING handle one subscriber per event b.subs=%u"
,
b
.
subs
));
int
ni
=
-
1
;
for
(
int
i
=
0
;
i
<
b
.
subs
;
i
++
)
{
if
(
b
.
ps
[
i
].
theHandle
==
NULL
)
{
...
...
@@ -952,7 +971,8 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
if
(
b
.
subs
<
MAX_SUBSCRIBERS_PER_EVENT
)
{
ni
=
b
.
subs
;
}
else
{
ndbout_c
(
"prepareAddSubscribeEvent: Can't accept more subscribers"
);
DBUG_PRINT
(
"error"
,
(
"Can't accept more subscribers: b.subs=%d"
,
b
.
subs
));
// add_drop_unlock();
DBUG_RETURN
(
-
1
);
}
...
...
@@ -975,10 +995,8 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
else
hasSubscriber
=
0
;
#ifdef EVENT_DEBUG
ndbout_c
(
"prepareAddSubscribeEvent: handed out bufferId %d for eventId %d"
,
bufferId
,
eventId
);
#endif
DBUG_PRINT
(
"info"
,(
"handed out bufferId=%d for eventId=%d hasSubscriber=%d"
,
bufferId
,
eventId
,
hasSubscriber
));
/* we now have a lock on the prepare so that no one can mess with this
* unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
...
...
@@ -989,9 +1007,13 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
void
NdbGlobalEventBuffer
::
real_unprepareAddSubscribeEvent
(
int
bufferId
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent"
);
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
int
n
=
NO
(
bufferId
);
DBUG_PRINT
(
"enter"
,
(
"bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d"
,
bufferId
,
ID
(
bufferId
),
NO
(
bufferId
)));
b
.
ps
[
n
].
theHandle
=
NULL
;
// remove subscribers from the end,
...
...
@@ -1004,10 +1026,8 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
break
;
if
(
b
.
subs
==
0
)
{
#ifdef EVENT_DEBUG
ndbout_c
(
"unprepareAddSubscribeEvent: no more subscribers left on eventId %d"
,
b
.
gId
);
#endif
b
.
gId
=
0
;
// We don't have any subscribers, reuse BufItem
DBUG_PRINT
(
"info"
,(
"no more subscribers left on eventId %d"
,
b
.
gId
));
b
.
gId
=
0
;
// We don't have any subscribers, reuse BufItem
if
(
b
.
data
)
{
NdbMem_Free
((
void
*
)
b
.
data
);
b
.
data
=
NULL
;
...
...
@@ -1018,12 +1038,14 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
}
}
// add_drop_unlock();
DBUG_VOID_RETURN
;
}
void
NdbGlobalEventBuffer
::
real_addSubscribeEvent
(
int
bufferId
,
void
*
ndbEventOperation
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_addSubscribeEvent"
);
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
int
n
=
NO
(
bufferId
);
...
...
@@ -1031,9 +1053,8 @@ NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId,
b
.
ps
[
n
].
theHandle
->
addBufferId
(
bufferId
);
// add_drop_unlock();
#ifdef EVENT_DEBUG
ndbout_c
(
"addSubscribeEvent:: added bufferId %d"
,
bufferId
);
#endif
DBUG_PRINT
(
"info"
,(
"added bufferId %d"
,
bufferId
));
DBUG_VOID_RETURN
;
}
void
...
...
@@ -1062,7 +1083,9 @@ NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
else
if
(
n
==
1
)
hasSubscriber
=
0
;
else
{
DBUG_RETURN
(
-
1
);
}
DBUG_RETURN
(
0
);
}
...
...
@@ -1070,6 +1093,7 @@ NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
void
NdbGlobalEventBuffer
::
real_dropSubscribeEvent
(
int
bufferId
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_dropSubscribeEvent"
);
// add_drop_lock(); // only one thread can do add-drop at a time
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
...
...
@@ -1085,6 +1109,7 @@ NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
#ifdef EVENT_DEBUG
ndbout_c
(
"dropSubscribeEvent:: dropped bufferId %d"
,
bufferId
);
#endif
DBUG_VOID_RETURN
;
}
void
...
...
@@ -1107,6 +1132,7 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
])
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_insertDataL"
);
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
#ifdef EVENT_DEBUG
int
n
=
NO
(
bufferId
);
...
...
@@ -1119,7 +1145,9 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
// move front forward
if
(
copy_data_alloc
(
sdata
,
ptr
,
b
.
data
[
b
.
f
].
sdata
,
b
.
data
[
b
.
f
].
ptr
))
return
-
1
;
{
DBUG_RETURN
(
-
1
);
}
for
(
int
i
=
0
;
i
<
b
.
subs
;
i
++
)
{
NdbGlobalEventBuffer
::
BufItem
::
Ps
&
e
=
b
.
ps
[
i
];
if
(
e
.
theHandle
)
{
// active subscriber
...
...
@@ -1127,7 +1155,7 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
if
(
e
.
bufferempty
==
0
)
{
e
.
overrun
++
;
// another item has been overwritten
e
.
b
++
;
// move next-to-read next since old item was overwritten
if
(
e
.
b
==
b
.
max_sz
)
e
.
b
=
0
;
// start from beginning
if
(
e
.
b
==
b
.
max_sz
)
e
.
b
=
0
;
// start from beginning
}
}
e
.
bufferempty
=
0
;
...
...
@@ -1147,21 +1175,28 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
#endif
}
}
return
0
;
DBUG_RETURN
(
0
)
;
}
int
NdbGlobalEventBuffer
::
hasData
(
int
bufferId
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::hasData"
);
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
int
n
=
NO
(
bufferId
);
NdbGlobalEventBuffer
::
BufItem
::
Ps
&
e
=
b
.
ps
[
n
];
if
(
e
.
bufferempty
)
return
0
;
{
DBUG_RETURN
(
0
);
}
if
(
b
.
f
<=
e
.
b
)
return
b
.
max_sz
-
e
.
b
+
b
.
f
;
{
DBUG_RETURN
(
b
.
max_sz
-
e
.
b
+
b
.
f
);
}
else
return
b
.
f
-
e
.
b
;
{
DBUG_RETURN
(
b
.
f
-
e
.
b
);
}
}
int
NdbGlobalEventBuffer
::
real_getDataL
(
const
int
bufferId
,
...
...
@@ -1169,6 +1204,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
LinearSectionPtr
ptr
[
3
],
int
*
pOverrun
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_getDataL"
);
BufItem
&
b
=
m_buf
[
ID
(
bufferId
)];
int
n
=
NO
(
bufferId
);
NdbGlobalEventBuffer
::
BufItem
::
Ps
&
e
=
b
.
ps
[
n
];
...
...
@@ -1179,13 +1215,17 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
}
if
(
e
.
bufferempty
)
return
0
;
// nothing to get
{
DBUG_RETURN
(
0
);
// nothing to get
}
if
(
copy_data_alloc
(
b
.
data
[
e
.
b
].
sdata
,
b
.
data
[
e
.
b
].
ptr
,
sdata
,
ptr
))
return
-
1
;
{
DBUG_RETURN
(
-
1
);
}
e
.
b
++
;
if
(
e
.
b
==
b
.
max_sz
)
e
.
b
=
0
;
// move next-to-read forward
e
.
b
++
;
if
(
e
.
b
==
b
.
max_sz
)
e
.
b
=
0
;
// move next-to-read forward
if
(
b
.
f
==
e
.
b
)
// back has cought up with front
e
.
bufferempty
=
1
;
...
...
@@ -1194,7 +1234,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
ndbout_c
(
"getting data from buffer %d with eventId %d"
,
bufferId
,
b
.
gId
);
#endif
return
hasData
(
bufferId
)
+
1
;
DBUG_RETURN
(
hasData
(
bufferId
)
+
1
)
;
}
int
NdbGlobalEventBuffer
::
copy_data_alloc
(
const
SubTableData
*
const
f_sdata
,
...
...
@@ -1202,6 +1242,7 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
SubTableData
*
&
t_sdata
,
LinearSectionPtr
t_ptr
[
3
])
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::copy_data_alloc"
);
if
(
t_sdata
==
NULL
)
{
t_sdata
=
(
SubTableData
*
)
NdbMem_Allocate
(
sizeof
(
SubTableData
));
}
...
...
@@ -1223,28 +1264,34 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
}
t_p
.
sz
=
f_p
.
sz
;
}
return
0
;
DBUG_RETURN
(
0
)
;
}
int
NdbGlobalEventBuffer
::
real_wait
(
NdbGlobalEventBufferHandle
*
h
,
int
aMillisecondNumber
)
{
DBUG_ENTER
(
"NdbGlobalEventBuffer::real_wait"
);
// check if there are anything in any of the buffers
int
i
;
int
n
=
0
;
for
(
i
=
0
;
i
<
h
->
m_nids
;
i
++
)
n
+=
hasData
(
h
->
m_bufferIds
[
i
]);
if
(
n
)
return
n
;
if
(
n
)
{
DBUG_RETURN
(
n
);
}
int
r
=
NdbCondition_WaitTimeout
(
h
->
p_cond
,
ndb_global_event_buffer_mutex
,
aMillisecondNumber
);
if
(
r
>
0
)
return
-
1
;
{
DBUG_RETURN
(
-
1
);
}
n
=
0
;
for
(
i
=
0
;
i
<
h
->
m_nids
;
i
++
)
n
+=
hasData
(
h
->
m_bufferIds
[
i
]);
return
n
;
DBUG_RETURN
(
n
)
;
}
template
class
Vector
<
NdbGlobalEventBufferHandle
*
>;
ndb/test/src/HugoTransactions.cpp
View file @
e4b6c25c
...
...
@@ -775,7 +775,9 @@ HugoTransactions::createEvent(Ndb* pNdb){
NdbDictionary
::
Dictionary
*
myDict
=
pNdb
->
getDictionary
();
if
(
!
myDict
)
{
printf
(
"Event Creation failedDictionary not found"
);
g_err
<<
"Dictionary not found "
<<
pNdb
->
getNdbError
().
code
<<
" "
<<
pNdb
->
getNdbError
().
message
<<
endl
;
return
NDBT_FAILED
;
}
...
...
@@ -796,21 +798,33 @@ HugoTransactions::createEvent(Ndb* pNdb){
if
(
res
==
0
)
myEvent
.
print
();
else
{
g_info
<<
"Event creation failed
\n
"
;
g_info
<<
"trying drop Event, maybe event exists
\n
"
;
else
if
(
myDict
->
getNdbError
().
classification
==
NdbError
::
SchemaObjectExists
)
{
g_info
<<
"Event creation failed event exists
\n
"
;
res
=
myDict
->
dropEvent
(
eventName
);
if
(
res
)
{
g_err
<<
"failed to drop event
\n
"
;
g_err
<<
"Failed to drop event: "
<<
myDict
->
getNdbError
().
code
<<
" : "
<<
myDict
->
getNdbError
().
message
<<
endl
;
return
NDBT_FAILED
;
}
// try again
res
=
myDict
->
createEvent
(
myEvent
);
// Add event to database
if
(
res
)
{
g_err
<<
"failed to create event
\n
"
;
g_err
<<
"Failed to create event (1): "
<<
myDict
->
getNdbError
().
code
<<
" : "
<<
myDict
->
getNdbError
().
message
<<
endl
;
return
NDBT_FAILED
;
}
}
else
{
g_err
<<
"Failed to create event (2): "
<<
myDict
->
getNdbError
().
code
<<
" : "
<<
myDict
->
getNdbError
().
message
<<
endl
;
return
NDBT_FAILED
;
}
return
NDBT_OK
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment