Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
39e750d9
Commit
39e750d9
authored
Apr 12, 2005
by
joreland@mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
bug#9749 - ndb lock upgrade
handle more cases...
parent
b38a5171
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
204 additions
and
13 deletions
+204
-13
ndb/src/kernel/blocks/dbacc/Dbacc.hpp
ndb/src/kernel/blocks/dbacc/Dbacc.hpp
+2
-0
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+142
-1
ndb/test/ndbapi/testOperations.cpp
ndb/test/ndbapi/testOperations.cpp
+60
-12
No files found.
ndb/src/kernel/blocks/dbacc/Dbacc.hpp
View file @
39e750d9
...
...
@@ -1100,6 +1100,8 @@ private:
Uint32
executeNextOperation
(
Signal
*
signal
);
void
releaselock
(
Signal
*
signal
);
void
takeOutFragWaitQue
(
Signal
*
signal
);
void
check_lock_upgrade
(
Signal
*
signal
,
OperationrecPtr
lock_owner
,
OperationrecPtr
release_op
);
void
allocOverflowPage
(
Signal
*
signal
);
bool
getrootfragmentrec
(
Signal
*
signal
,
RootfragmentrecPtr
&
,
Uint32
fragId
);
void
insertLockOwnersList
(
Signal
*
signal
,
const
OperationrecPtr
&
);
...
...
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
View file @
39e750d9
...
...
@@ -5802,9 +5802,148 @@ void Dbacc::commitOperation(Signal* signal)
ptrCheckGuard
(
tolqTmpPtr
,
coprecsize
,
operationrec
);
tolqTmpPtr
.
p
->
prevParallelQue
=
operationRecPtr
.
p
->
prevParallelQue
;
}
//if
}
//if
/**
* Check possible lock upgrade
* 1) Find lock owner
* 2) Count transactions in parallel que
* 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
* upgrade next serial
*/
if
(
operationRecPtr
.
p
->
lockMode
)
{
jam
();
/**
* Committing a non shared operation can't lead to lock upgrade
*/
return
;
}
OperationrecPtr
lock_owner
;
lock_owner
.
i
=
operationRecPtr
.
p
->
prevParallelQue
;
ptrCheckGuard
(
lock_owner
,
coprecsize
,
operationrec
);
Uint32
transid
[
2
]
=
{
lock_owner
.
p
->
transId1
,
lock_owner
.
p
->
transId2
};
while
(
lock_owner
.
p
->
prevParallelQue
!=
RNIL
)
{
lock_owner
.
i
=
lock_owner
.
p
->
prevParallelQue
;
ptrCheckGuard
(
lock_owner
,
coprecsize
,
operationrec
);
if
(
lock_owner
.
p
->
transId1
!=
transid
[
0
]
||
lock_owner
.
p
->
transId2
!=
transid
[
1
])
{
jam
();
/**
* If more than 1 trans in lock queue -> no lock upgrade
*/
return
;
}
}
check_lock_upgrade
(
signal
,
lock_owner
,
operationRecPtr
);
}
}
//Dbacc::commitOperation()
void
Dbacc
::
check_lock_upgrade
(
Signal
*
signal
,
OperationrecPtr
lock_owner
,
OperationrecPtr
release_op
)
{
if
((
lock_owner
.
p
->
transId1
==
release_op
.
p
->
transId1
&&
lock_owner
.
p
->
transId2
==
release_op
.
p
->
transId2
)
||
release_op
.
p
->
lockMode
||
lock_owner
.
p
->
nextSerialQue
==
RNIL
)
{
jam
();
/**
* No lock upgrade if same trans or lock owner has no serial queue
* or releasing non shared op
*/
return
;
}
OperationrecPtr
next
;
next
.
i
=
lock_owner
.
p
->
nextSerialQue
;
ptrCheckGuard
(
next
,
coprecsize
,
operationrec
);
if
(
lock_owner
.
p
->
transId1
!=
next
.
p
->
transId1
||
lock_owner
.
p
->
transId2
!=
next
.
p
->
transId2
)
{
jam
();
/**
* No lock upgrad if !same trans in serial queue
*/
return
;
}
tgnptMainOpPtr
=
lock_owner
;
getNoParallelTransaction
(
signal
);
if
(
tgnptNrTransaction
>
1
)
{
jam
();
/**
* No lock upgrade if more than 1 transaction in parallell queue
*/
return
;
}
OperationrecPtr
tmp
;
tmp
.
i
=
lock_owner
.
p
->
nextSerialQue
=
next
.
p
->
nextSerialQue
;
if
(
tmp
.
i
!=
RNIL
)
{
ptrCheckGuard
(
tmp
,
coprecsize
,
operationrec
);
ndbassert
(
tmp
.
p
->
prevSerialQue
==
next
.
i
);
tmp
.
p
->
prevSerialQue
=
lock_owner
.
i
;
}
next
.
p
->
nextSerialQue
=
next
.
p
->
prevSerialQue
=
RNIL
;
// Find end of parallell que
tmp
=
lock_owner
;
tmp
.
p
->
lockMode
=
1
;
while
(
tmp
.
p
->
nextParallelQue
!=
RNIL
)
{
jam
();
tmp
.
i
=
tmp
.
p
->
nextParallelQue
;
ptrCheckGuard
(
tmp
,
coprecsize
,
operationrec
);
tmp
.
p
->
lockMode
=
1
;
}
next
.
p
->
prevParallelQue
=
tmp
.
i
;
tmp
.
p
->
nextParallelQue
=
next
.
i
;
OperationrecPtr
save
=
operationRecPtr
;
Uint32
TelementIsDisappeared
=
0
;
// lock upgrade = all reads
Uint32
ThashValue
=
lock_owner
.
p
->
hashValue
;
Uint32
localdata
[
2
];
localdata
[
0
]
=
lock_owner
.
p
->
localdata
[
0
];
localdata
[
1
]
=
lock_owner
.
p
->
localdata
[
1
];
do
{
next
.
p
->
elementIsDisappeared
=
TelementIsDisappeared
;
next
.
p
->
hashValue
=
ThashValue
;
next
.
p
->
localdata
[
0
]
=
localdata
[
0
];
next
.
p
->
localdata
[
1
]
=
localdata
[
1
];
operationRecPtr
=
next
;
ndbassert
(
next
.
p
->
lockMode
);
TelementIsDisappeared
=
executeNextOperation
(
signal
);
if
(
next
.
p
->
nextParallelQue
!=
RNIL
)
{
jam
();
next
.
i
=
next
.
p
->
nextParallelQue
;
ptrCheckGuard
(
next
,
coprecsize
,
operationrec
);
}
else
{
jam
();
break
;
}
//if
}
while
(
1
);
operationRecPtr
=
save
;
}
/* ------------------------------------------------------------------------- */
/* RELEASELOCK */
/* RESETS LOCK OF AN ELEMENT. */
...
...
@@ -5841,6 +5980,8 @@ void Dbacc::releaselock(Signal* signal)
ptrCheckGuard
(
trlTmpOperPtr
,
coprecsize
,
operationrec
);
trlTmpOperPtr
.
p
->
prevSerialQue
=
trlOperPtr
.
i
;
}
//if
check_lock_upgrade
(
signal
,
copyInOperPtr
,
operationRecPtr
);
/* --------------------------------------------------------------------------------- */
/* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
/* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */
...
...
ndb/test/ndbapi/testOperations.cpp
View file @
39e750d9
...
...
@@ -547,21 +547,64 @@ runLockUpgrade1(NDBT_Context* ctx, NDBT_Step* step){
do
{
CHECK
(
hugoOps
.
startTransaction
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
pkReadRecord
(
pNdb
,
0
,
1
,
NdbOperation
::
LM_Read
)
==
0
);
CHECK
(
hugoOps
.
execute_NoCommit
(
pNdb
)
==
0
);
if
(
ctx
->
getProperty
(
"LOCK_UPGRADE"
,
1
)
==
1
)
{
CHECK
(
hugoOps
.
pkReadRecord
(
pNdb
,
0
,
1
,
NdbOperation
::
LM_Read
)
==
0
);
CHECK
(
hugoOps
.
execute_NoCommit
(
pNdb
)
==
0
);
ctx
->
setProperty
(
"READ_DONE"
,
1
);
ctx
->
broadcast
();
ndbout_c
(
"wait 2"
);
ctx
->
getPropertyWait
(
"READ_DONE"
,
2
);
ndbout_c
(
"wait 2 - done"
);
ctx
->
setProperty
(
"READ_DONE"
,
1
);
ctx
->
broadcast
();
ndbout_c
(
"wait 2"
);
ctx
->
getPropertyWait
(
"READ_DONE"
,
2
);
ndbout_c
(
"wait 2 - done"
);
}
else
{
ctx
->
setProperty
(
"READ_DONE"
,
1
);
ctx
->
broadcast
();
ctx
->
getPropertyWait
(
"READ_DONE"
,
2
);
ndbout_c
(
"wait 2 - done"
);
CHECK
(
hugoOps
.
pkReadRecord
(
pNdb
,
0
,
1
,
NdbOperation
::
LM_Read
)
==
0
);
CHECK
(
hugoOps
.
execute_NoCommit
(
pNdb
)
==
0
);
}
if
(
ctx
->
getProperty
(
"LU_OP"
,
o_INS
)
==
o_INS
)
{
CHECK
(
hugoOps
.
pkDeleteRecord
(
pNdb
,
0
,
1
)
==
0
);
CHECK
(
hugoOps
.
pkInsertRecord
(
pNdb
,
0
,
1
,
2
)
==
0
);
}
else
if
(
ctx
->
getProperty
(
"LU_OP"
,
o_UPD
)
==
o_UPD
)
{
CHECK
(
hugoOps
.
pkUpdateRecord
(
pNdb
,
0
,
1
,
2
)
==
0
);
}
else
{
CHECK
(
hugoOps
.
pkDeleteRecord
(
pNdb
,
0
,
1
)
==
0
);
}
ctx
->
setProperty
(
"READ_DONE"
,
3
);
ctx
->
broadcast
();
ndbout_c
(
"before update"
);
CHECK
(
hugoOps
.
pkUpdateRecord
(
pNdb
,
0
,
1
,
2
)
==
0
);
ndbout_c
(
"wait update"
);
CHECK
(
hugoOps
.
execute_NoCommit
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
closeTransaction
(
pNdb
));
CHECK
(
hugoOps
.
execute_Commit
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
closeTransaction
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
startTransaction
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
pkReadRecord
(
pNdb
,
0
,
1
)
==
0
);
int
res
=
hugoOps
.
execute_Commit
(
pNdb
);
if
(
ctx
->
getProperty
(
"LU_OP"
,
o_INS
)
==
o_INS
)
{
CHECK
(
res
==
0
);
CHECK
(
hugoOps
.
verifyUpdatesValue
(
2
)
==
0
);
}
else
if
(
ctx
->
getProperty
(
"LU_OP"
,
o_UPD
)
==
o_UPD
)
{
CHECK
(
res
==
0
);
CHECK
(
hugoOps
.
verifyUpdatesValue
(
2
)
==
0
);
}
else
{
CHECK
(
res
==
626
);
}
}
while
(
0
);
return
result
;
...
...
@@ -592,10 +635,10 @@ runLockUpgrade2(NDBT_Context* ctx, NDBT_Step* step){
ndbout_c
(
"wait 3 - done"
);
NdbSleep_MilliSleep
(
200
);
CHECK
(
hugoOps
.
execute_Commit
(
pNdb
)
==
0
);
CHECK
(
hugoOps
.
execute_Commit
(
pNdb
)
==
0
);
}
while
(
0
);
return
NDBT_FAILED
;
return
result
;
}
int
...
...
@@ -607,11 +650,16 @@ main(int argc, const char** argv){
NDBT_TestSuite
ts
(
"testOperations"
);
for
(
Uint32
i
=
0
;
i
<
6
;
i
++
)
{
BaseString
name
(
"bug_9749"
);
name
.
appfmt
(
"_%d"
,
i
);
NDBT_TestCaseImpl1
*
pt
=
new
NDBT_TestCaseImpl1
(
&
ts
,
name
.
c_str
(),
""
);
pt
->
setProperty
(
"LOCK_UPGRADE"
,
1
+
(
i
&
1
));
pt
->
setProperty
(
"LU_OP"
,
1
+
(
i
>>
1
));
pt
->
addInitializer
(
new
NDBT_Initializer
(
pt
,
"runClearTable"
,
runClearTable
));
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment