Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Vincent Pelletier
neoppod
Commits
64e08bd5
Commit
64e08bd5
authored
May 09, 2017
by
Julien Muchembled
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
dropPartitions
parent
8535b9cc
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
133 additions
and
78 deletions
+133
-78
neo/storage/database/importer.py
neo/storage/database/importer.py
+1
-1
neo/storage/database/manager.py
neo/storage/database/manager.py
+104
-20
neo/storage/database/mysql.py
neo/storage/database/mysql.py
+12
-11
neo/storage/database/sqlite.py
neo/storage/database/sqlite.py
+7
-10
neo/storage/handlers/__init__.py
neo/storage/handlers/__init__.py
+1
-1
neo/storage/handlers/initialization.py
neo/storage/handlers/initialization.py
+4
-18
neo/tests/storage/testMasterHandler.py
neo/tests/storage/testMasterHandler.py
+1
-1
neo/tests/storage/testStorageDBTests.py
neo/tests/storage/testStorageDBTests.py
+1
-1
neo/tests/threaded/testReplication.py
neo/tests/threaded/testReplication.py
+2
-15
No files found.
neo/storage/database/importer.py
View file @
64e08bd5
...
...
@@ -375,7 +375,7 @@ class ImporterDatabaseManager(DatabaseManager):
def
__init__
(
self
,
*
args
,
**
kw
):
super
(
ImporterDatabaseManager
,
self
).
__init__
(
*
args
,
**
kw
)
implements
(
self
,
"""_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction
dropPartitions
_getLastTID
deleteObject deleteTransaction
_dropPartition
_getLastTID
getReplicationObjectList _getTIDList nonempty"""
.
split
())
_getPartition
=
property
(
lambda
self
:
self
.
db
.
_getPartition
)
...
...
neo/storage/database/manager.py
View file @
64e08bd5
...
...
@@ -14,11 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
,
errno
,
socket
,
sys
,
threading
,
time
import
os
,
errno
,
socket
,
sys
,
threading
from
collections
import
defaultdict
from
contextlib
import
contextmanager
from
copy
import
copy
from
functools
import
wraps
from
time
import
time
from
neo.lib
import
logging
,
util
from
neo.lib.exception
import
NonReadableCell
from
neo.lib.interfaces
import
abstract
,
requires
...
...
@@ -54,6 +55,8 @@ class DatabaseManager(object):
LOCKED
=
"error: database is locked"
_deferred
=
0
_drop_stats
=
0
,
0
_dropping
=
None
_repairing
=
None
def
__init__
(
self
,
database
,
engine
=
None
,
wait
=
None
):
...
...
@@ -213,7 +216,8 @@ class DatabaseManager(object):
self
.
setConfiguration
(
"version"
,
version
)
def
doOperation
(
self
,
app
):
pass
if
self
.
_dropping
:
self
.
_dropPartitions
(
app
)
def
_close
(
self
):
"""Backend-specific code to close the database"""
...
...
@@ -560,7 +564,8 @@ class DatabaseManager(object):
if
-
x
[
1
]
in
READABLE
)
@
requires
(
_changePartitionTable
,
_getLastIDs
,
_getLastTID
)
def
changePartitionTable
(
self
,
ptid
,
num_replicas
,
cell_list
,
reset
=
False
):
def
changePartitionTable
(
self
,
app
,
ptid
,
num_replicas
,
cell_list
,
reset
=
False
):
my_nid
=
self
.
getUUID
()
pt
=
dict
(
self
.
iterAssignedCells
())
# In backup mode, the last transactions of a readable cell may be
...
...
@@ -568,23 +573,41 @@ class DatabaseManager(object):
backup_tid
=
self
.
getBackupTID
()
if
backup_tid
:
backup_tid
=
util
.
u64
(
backup_tid
)
def
outofdate_tid
(
offset
):
tid
=
pt
.
get
(
offset
,
0
)
if
tid
>=
0
:
return
tid
return
-
tid
in
READABLE
and
(
backup_tid
or
max
(
self
.
_getLastIDs
(
offset
)[
0
],
self
.
_getLastTID
(
offset
)))
or
0
cell_list
=
[(
offset
,
nid
,
(
None
if
state
==
CellStates
.
DISCARDED
else
-
state
if
nid
!=
my_nid
or
state
!=
CellStates
.
OUT_OF_DATE
else
outofdate_tid
(
offset
)))
for
offset
,
nid
,
state
in
cell_list
]
self
.
_changePartitionTable
(
cell_list
,
reset
)
max_offset
=
-
1
dropping
=
self
.
_dropping
or
set
()
assigned
=
[]
cells
=
[]
for
offset
,
nid
,
state
in
cell_list
:
if
max_offset
<
offset
:
max_offset
=
offset
if
state
==
CellStates
.
DISCARDED
:
if
nid
==
my_nid
:
dropping
.
add
(
offset
)
tid
=
None
else
:
if
nid
==
my_nid
:
assigned
.
append
(
offset
)
if
nid
!=
my_nid
or
state
!=
CellStates
.
OUT_OF_DATE
:
tid
=
-
state
else
:
tid
=
pt
.
get
(
offset
,
0
)
if
tid
<
0
:
tid
=
-
tid
in
READABLE
and
(
backup_tid
or
max
(
self
.
_getLastIDs
(
offset
)[
0
],
self
.
_getLastTID
(
offset
)))
or
0
cells
.
append
((
offset
,
nid
,
tid
))
if
reset
:
dropping
.
update
(
xrange
(
max_offset
+
1
))
dropping
.
difference_update
(
assigned
)
self
.
_changePartitionTable
(
cells
,
reset
)
self
.
_updateReadable
(
reset
)
assert
isinstance
(
ptid
,
(
int
,
long
)),
ptid
self
.
_setConfiguration
(
'ptid'
,
str
(
ptid
))
self
.
_setConfiguration
(
'replicas'
,
str
(
num_replicas
))
if
dropping
and
not
self
.
_dropping
:
self
.
_dropping
=
dropping
if
app
.
operational
:
self
.
_dropPartitions
(
app
)
@
requires
(
_changePartitionTable
)
def
updateCellTID
(
self
,
partition
,
tid
):
...
...
@@ -629,9 +652,70 @@ class DatabaseManager(object):
else
:
yield
offset
,
None
def
_dropPartitions
(
self
,
app
):
if
app
.
disable_drop_partitions
:
logging
.
info
(
"don't drop data for partitions %r"
,
self
.
_dropping
)
return
def
dropPartitions
():
dropping
=
self
.
_dropping
before
=
drop_count
,
drop_time
=
self
.
_drop_stats
commit
=
dropped
=
0
while
dropping
:
offset
=
next
(
iter
(
dropping
))
log
=
dropped
while
True
:
yield
1
if
offset
not
in
dropping
:
break
start
=
time
()
if
0
<
commit
<
start
:
self
.
commit
()
logging
.
debug
(
'drop: committed'
)
commit
=
0
continue
data_id_list
=
self
.
_dropPartition
(
offset
,
# The efficiency drops when the number of lines to
# delete is too small so do not delete too few.
max
(
100
,
int
(.
1
*
drop_count
/
drop_time
))
if
drop_time
else
1000
)
if
data_id_list
:
if
not
commit
:
commit
=
time
()
+
1
if
log
==
dropped
:
dropped
+=
1
logging
.
info
(
"dropping partition %s..."
,
offset
)
if
type
(
data_id_list
)
is
list
:
try
:
data_id_list
.
remove
(
None
)
pass
# XXX: not covered
except
ValueError
:
pass
logging
.
debug
(
'drop: pruneData(%s)'
,
len
(
data_id_list
))
drop_count
+=
self
.
_pruneData
(
data_id_list
)
drop_time
+=
time
()
-
start
self
.
_drop_stats
=
drop_count
,
drop_time
continue
dropping
.
remove
(
offset
)
break
if
dropped
:
if
commit
:
self
.
commit
()
logging
.
info
(
"%s partition(s) dropped"
" (stats: count: %s/%s, time: %.4s/%.4s)"
,
dropped
,
drop_count
-
before
[
0
],
drop_count
,
round
(
drop_time
-
before
[
1
],
3
),
round
(
drop_time
,
3
))
app
.
newTask
(
dropPartitions
())
@
abstract
def
dropPartitions
(
self
,
offset_list
):
"""Delete all data for specified partitions"""
def
_dropPartition
(
self
,
offset
,
count
):
"""Delete rows for given partition
Delete at most 'count' rows of from obj:
- if there's no line to delete, purge trans and return
a boolean indicating if any row was deleted (from trans)
- else return data ids of deleted rows
"""
def
_getUnfinishedDataIdList
(
self
):
"""Drop any unfinished data from a database."""
...
...
@@ -852,10 +936,10 @@ class DatabaseManager(object):
my_nid
=
self
.
getUUID
()
commit
=
0
for
partition
,
state
in
self
.
iterAssignedCells
():
if
commit
<
time
.
time
():
if
commit
<
time
():
if
commit
:
self
.
commit
()
commit
=
time
.
time
()
+
10
commit
=
time
()
+
10
if
state
>
tid
:
cell_list
.
append
((
partition
,
my_nid
,
tid
))
self
.
_deleteRange
(
partition
,
tid
)
...
...
neo/storage/database/mysql.py
View file @
64e08bd5
...
...
@@ -532,19 +532,20 @@ class MySQLDatabaseManager(DatabaseManager):
" ON DUPLICATE KEY UPDATE tid = %d"
%
(
offset
,
nid
,
tid
,
tid
))
def
dropPartitions
(
self
,
offset_lis
t
):
def
_dropPartition
(
self
,
offset
,
coun
t
):
q
=
self
.
query
# XXX: these queries are inefficient (execution time increase with
# row count, although we use indexes) when there are rows to
# delete. It should be done as an idle task, by chunks.
for
partition
in
offset_list
:
where
=
" WHERE `partition`=%d"
%
partition
data_id_list
=
[
x
for
x
,
in
q
(
"SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL"
%
where
)]
where
=
" WHERE `partition`=%s ORDER BY tid, oid LIMIT %s"
%
(
offset
,
count
)
logging
.
debug
(
"drop: select(%s)"
,
count
)
x
=
q
(
"SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
+
where
)
if
x
:
logging
.
debug
(
"drop: obj"
)
q
(
"DELETE FROM obj"
+
where
)
q
(
"DELETE FROM trans"
+
where
)
self
.
_pruneData
(
data_id_list
)
return
[
x
for
x
,
in
x
]
logging
.
debug
(
"drop: trans"
)
q
(
"DELETE FROM trans WHERE `partition`=%s"
%
offset
)
(
x
,),
=
q
(
'SELECT ROW_COUNT()'
)
return
x
def
_getUnfinishedDataIdList
(
self
):
return
[
x
for
x
,
in
self
.
query
(
...
...
neo/storage/database/sqlite.py
View file @
64e08bd5
...
...
@@ -368,17 +368,14 @@ class SQLiteDatabaseManager(DatabaseManager):
q
(
"INSERT OR FAIL INTO pt VALUES (?,?,?)"
,
(
offset
,
nid
,
int
(
state
)))
def
dropPartitions
(
self
,
offset_list
):
where
=
" WHERE partition=?"
def
_dropPartition
(
self
,
*
args
):
q
=
self
.
query
for
partition
in
offset_list
:
args
=
partition
,
data_id_list
=
[
x
for
x
,
in
q
(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
%
where
,
args
)]
q
(
"DELETE FROM obj"
+
where
,
args
)
q
(
"DELETE FROM trans"
+
where
,
args
)
self
.
_pruneData
(
data_id_list
)
where
=
" FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
x
=
q
(
"SELECT DISTINCT data_id"
+
where
,
args
).
fetchall
()
if
x
:
q
(
"DELETE"
+
where
,
args
)
return
[
x
for
x
,
in
x
]
return
q
(
"DELETE FROM trans WHERE partition=?"
,
args
[:
1
]).
rowcount
def
_getUnfinishedDataIdList
(
self
):
return
[
x
for
x
,
in
self
.
query
(
...
...
neo/storage/handlers/__init__.py
View file @
64e08bd5
...
...
@@ -71,7 +71,7 @@ class BaseMasterHandler(BaseHandler):
if
ptid
!=
1
+
app
.
pt
.
getID
():
raise
ProtocolError
(
'wrong partition table id'
)
app
.
pt
.
update
(
ptid
,
num_replicas
,
cell_list
,
app
.
nm
)
app
.
dm
.
changePartitionTable
(
ptid
,
num_replicas
,
cell_list
)
app
.
dm
.
changePartitionTable
(
app
,
ptid
,
num_replicas
,
cell_list
)
if
app
.
operational
:
app
.
replicator
.
notifyPartitionChanges
(
cell_list
)
app
.
dm
.
commit
()
...
...
neo/storage/handlers/initialization.py
View file @
64e08bd5
...
...
@@ -27,25 +27,11 @@ class InitializationHandler(BaseMasterHandler):
pt
.
load
(
ptid
,
num_replicas
,
row_list
,
app
.
nm
)
if
not
pt
.
filled
():
raise
ProtocolError
(
'Partial partition table received'
)
# Install the partition table into the database for persistence.
cell_list
=
[]
unassigned
=
range
(
pt
.
getPartitions
())
for
offset
in
reversed
(
unassigned
):
for
cell
in
pt
.
getCellList
(
offset
):
cell_list
.
append
((
offset
,
cell
.
getUUID
(),
cell
.
getState
()))
if
cell
.
getUUID
()
==
app
.
uuid
:
unassigned
.
remove
(
offset
)
# delete objects database
cell_list
=
[(
offset
,
cell
.
getUUID
(),
cell
.
getState
())
for
offset
in
xrange
(
pt
.
getPartitions
())
for
cell
in
pt
.
getCellList
(
offset
)]
dm
=
app
.
dm
if
unassigned
:
if
app
.
disable_drop_partitions
:
logging
.
info
(
'partitions %r are discarded but actual deletion'
' of data is disabled'
,
unassigned
)
else
:
logging
.
debug
(
'drop data for partitions %r'
,
unassigned
)
dm
.
dropPartitions
(
unassigned
)
dm
.
changePartitionTable
(
ptid
,
num_replicas
,
cell_list
,
reset
=
True
)
dm
.
changePartitionTable
(
app
,
ptid
,
num_replicas
,
cell_list
,
reset
=
True
)
dm
.
commit
()
def
truncate
(
self
,
conn
,
tid
):
...
...
neo/tests/storage/testMasterHandler.py
View file @
64e08bd5
...
...
@@ -92,7 +92,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call
calls
=
self
.
app
.
dm
.
mockGetNamedCalls
(
'changePartitionTable'
)
self
.
assertEqual
(
len
(
calls
),
1
)
calls
[
0
].
checkArgs
(
ptid
,
1
,
cells
)
calls
[
0
].
checkArgs
(
app
,
ptid
,
1
,
cells
)
if
__name__
==
"__main__"
:
unittest
.
main
()
neo/tests/storage/testStorageDBTests.py
View file @
64e08bd5
...
...
@@ -49,7 +49,7 @@ class StorageDBTests(NeoUnitTestBase):
uuid
=
self
.
getStorageUUID
()
db
.
setUUID
(
uuid
)
self
.
assertEqual
(
uuid
,
db
.
getUUID
())
db
.
changePartitionTable
(
1
,
0
,
db
.
changePartitionTable
(
None
,
1
,
0
,
[(
i
,
uuid
,
CellStates
.
UP_TO_DATE
)
for
i
in
xrange
(
num_partitions
)],
reset
=
True
)
self
.
assertEqual
(
num_partitions
,
1
+
db
.
_getMaxPartition
())
...
...
neo/tests/threaded/testReplication.py
View file @
64e08bd5
...
...
@@ -455,13 +455,10 @@ class ReplicationTests(NEOThreadedTest):
return
isinstance
(
packet
,
delayed
)
and
\
packet
.
_args
[
0
]
==
offset
and
\
conn
in
s1
.
getConnectionList
(
s0
)
def
changePartitionTable
(
orig
,
ptid
,
num_replicas
,
cell_list
):
def
changePartitionTable
(
orig
,
app
,
ptid
,
num_replicas
,
cell_list
):
if
(
offset
,
s0
.
uuid
,
CellStates
.
DISCARDED
)
in
cell_list
:
connection_filter
.
remove
(
delayAskFetch
)
# XXX: this is currently not done by
# default for performance reason
orig
.
im_self
.
dropPartitions
((
offset
,))
return
orig
(
ptid
,
num_replicas
,
cell_list
)
return
orig
(
app
,
ptid
,
num_replicas
,
cell_list
)
np
=
cluster
.
num_partitions
s0
,
s1
,
s2
=
cluster
.
storage_list
for
delayed
in
Packets
.
AskFetchTransactions
,
Packets
.
AskFetchObjects
:
...
...
@@ -686,17 +683,7 @@ class ReplicationTests(NEOThreadedTest):
cluster
.
neoctl
.
tweakPartitionTable
()
self
.
tic
()
self
.
assertEqual
(
1
,
s1
.
sqlCount
(
'obj'
))
# Deletion should start as soon as the cell is discarded, as a
# background task, instead of doing it during initialization.
count
=
s0
.
sqlCount
(
'obj'
)
s0
.
stop
()
cluster
.
join
((
s0
,))
s0
.
resetNode
()
s0
.
start
()
self
.
tic
()
self
.
assertEqual
(
2
,
s0
.
sqlCount
(
'obj'
))
with
self
.
expectedFailure
():
\
self
.
assertEqual
(
2
,
count
)
@
with_cluster
(
replicas
=
1
)
def
testResumingReplication
(
self
,
cluster
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment