Commit dc8eeb8c authored by Julien Muchembled's avatar Julien Muchembled

wip

parent 4bf4ad49
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import print_function from __future__ import print_function
import argparse, errno, logging, os, socket, sys, threading import argparse, errno, logging, os, socket, sys, threading
from array import array
from bisect import insort from bisect import insort
from collections import defaultdict from collections import defaultdict
from contextlib import closing, contextmanager from contextlib import closing, contextmanager
...@@ -29,6 +30,11 @@ VERSION = 1 ...@@ -29,6 +30,11 @@ VERSION = 1
logger = logging.getLogger('reflink') logger = logging.getLogger('reflink')
for array32u in 'IL':
array32u = partial(array, array32u)
if array32u().itemsize == 4:
break
try: try:
from ZODB.Connection import TransactionMetaData from ZODB.Connection import TransactionMetaData
except ImportError: # BBB: ZODB < 5 except ImportError: # BBB: ZODB < 5
...@@ -107,6 +113,61 @@ class InvalidationListener(object): ...@@ -107,6 +113,61 @@ class InvalidationListener(object):
transform_record_data = untransform_record_data = None transform_record_data = untransform_record_data = None
class OidArray(object):
__slots__ = 'arrays',
def __init__(self, arrays):
self.arrays = arrays
def __iand__(self, other):
arrays = self.arrays
other = other.arrays
for i, a in enumerate(arrays):
a = set(a)
try:
a.intersection_update(other[i])
except IndexError:
del arrays[i:]
break
arrays[i] = array32u(sorted(a))
return self
def __nonzero__(self):
return any(self.arrays)
def __len__(self):
return sum(map(len, self.arrays))
def __iter__(self):
for i, x in enumerate(self.arrays):
i <<= 32
for x in x:
yield p64(i | x)
def __delitem__(self, key):
assert None is key.start is key.step, key
n = key.stop
for x in self.arrays:
k = len(x)
del x[:n]
n -= k
if n <= 0:
break
def append(self, oid):
oid = u64(oid)
i = oid >> 32
oid &= 0xFFFFFFFF
arrays = self.arrays
while True:
try:
arrays[i].append(oid)
break
except IndexError:
arrays.append(array32u())
class Object(object): class Object(object):
__slots__ = 'referrers', 'referents', 'prev_orphan', 'next_orphan' __slots__ = 'referrers', 'referents', 'prev_orphan', 'next_orphan'
...@@ -338,7 +399,13 @@ class Changeset(object): ...@@ -338,7 +399,13 @@ class Changeset(object):
try: try:
oid = stack.pop() oid = stack.pop()
except IndexError: except IndexError:
return {p64(*x) for x in q('SELECT oid FROM t')} x, = q('SELECT MAX(oid) >> 32 FROM t').fetchone()
return OidArray([
array32u(x for x, in q(
'SELECT oid & 0xFFFFFFFF FROM t'
' WHERE oid>=? AND oid<? ORDER BY oid',
(x << 32, x+1 << 32)))
for x in xrange(x + 1)])
k = oid, k = oid,
try: try:
(v,), = q('SELECT referents FROM t WHERE oid=?', k) (v,), = q('SELECT referents FROM t WHERE oid=?', k)
...@@ -605,15 +672,15 @@ def main(args=None): ...@@ -605,15 +672,15 @@ def main(args=None):
return "Main storage shall implement " + iface.__name__ return "Main storage shall implement " + iface.__name__
if command == "path": if command == "path":
t = args.tid x = args.tid
if t is None: if x is None:
tid = inc64(tid) tid = inc64(tid)
else: else:
tid = p64(t + 1) tid = p64(x + 1)
t = p64(t) x = p64(x)
def find_global(*args): def find_global(*args):
obj.extend(args) obj.extend(args)
with changeset.historical(t): with changeset.historical(x):
for oid in reversed(changeset.path(p64(args.oid))): for oid in reversed(changeset.path(p64(args.oid))):
obj = [hex(u64(oid))] obj = [hex(u64(oid))]
if args.main: if args.main:
...@@ -638,9 +705,9 @@ def main(args=None): ...@@ -638,9 +705,9 @@ def main(args=None):
exit_before_gc = args.exit_before_gc exit_before_gc = args.exit_before_gc
exit_after_gc = args.exit_after_gc exit_after_gc = args.exit_after_gc
no_gc = args.no_gc no_gc = args.no_gc
t = args.pack_neo x = args.pack_neo
if t: if x:
changeset.pack(tidFromTime(t)) changeset.pack(tidFromTime(x))
job_count = args.jobs job_count = args.jobs
if job_count <= 0: if job_count <= 0:
parser.error("--jobs must be strictly positive.") parser.error("--jobs must be strictly positive.")
...@@ -739,12 +806,12 @@ def main(args=None): ...@@ -739,12 +806,12 @@ def main(args=None):
src.referents = referents src.referents = referents
for referent in referents: for referent in referents:
insort(changeset.get(referent).referrers, oid) insort(changeset.get(referent).referrers, oid)
t = time() x = time()
if next_commit <= t: if next_commit <= x:
tid = inc64(tid) tid = inc64(tid)
changeset.bootstrap = bootstrap[0], u64(oid) + 1 changeset.bootstrap = bootstrap[0], u64(oid) + 1
changeset.commit(tid, "oid=%x" % u64(oid)) changeset.commit(tid, "oid=%x" % u64(oid))
next_commit = t + commit_interval next_commit = x + commit_interval
changeset.bootstrap = bootstrap[0], None changeset.bootstrap = bootstrap[0], None
tid = p64(bootstrap[0]) tid = p64(bootstrap[0])
changeset.commit(tid) changeset.commit(tid)
...@@ -811,20 +878,29 @@ def main(args=None): ...@@ -811,20 +878,29 @@ def main(args=None):
invalidation_listener = InvalidationListener(main_storage, tid) invalidation_listener = InvalidationListener(main_storage, tid)
def iterTrans(x): if commit_interval:
put = queue.put deleted_dict = {}
try: def iterTrans(x):
for x in x: put = queue.put
put(x) try:
for x in x: for x in x:
put(x) tid = x.tid
put(None) put(tid)
except: try:
exc_info[:] = sys.exc_info() x = deleted_dict.pop(tid)
put(None) except KeyError:
for x in x:
put((x.oid, x.data))
else:
for x in x:
put((x, None))
put(None)
except:
exc_info[:] = sys.exc_info()
put(None)
if not no_gc: if not no_gc:
gc_lock_name = "\0reflink-%s" % os.getuid() gc_lock_name = "\0reflink-%s" % os.getpid()
next_gc = period and TimeStamp(changeset.last_gc).timeTime() + period next_gc = period and TimeStamp(changeset.last_gc).timeTime() + period
next_commit = time() + commit_interval next_commit = time() + commit_interval
while True: while True:
...@@ -834,22 +910,21 @@ def main(args=None): ...@@ -834,22 +910,21 @@ def main(args=None):
thread.daemon = True thread.daemon = True
thread.start() thread.start()
while True: while True:
t = queue.get() x = queue.get()
if t is None: if x is None:
checkExc() checkExc()
break break
tid = t.tid tid = x
# logger.debug("tid=%x", u64(tid)) # logger.debug("tid=%x", u64(tid))
check_orphan = {} check_orphan = {}
while True: while True:
record = queue.get() x = queue.get()
if record is None: if x is None:
checkExc() checkExc()
break break
oid = record.oid oid, data = x
# logger.debug(" oid=%x", u64(oid)) # logger.debug(" oid=%x", u64(oid))
src = changeset.get(oid) src = changeset.get(oid)
data = record.data
prev_set = src.referents prev_set = src.referents
if data is None: if data is None:
# logger.debug(" deleted") # logger.debug(" deleted")
...@@ -892,13 +967,13 @@ def main(args=None): ...@@ -892,13 +967,13 @@ def main(args=None):
if changeset.check_orphan.get(referent): if changeset.check_orphan.get(referent):
changeset.check_orphan[referent] = False changeset.check_orphan[referent] = False
check_orphan[referent] = dst check_orphan[referent] = dst
t = inc64(tid) x = inc64(tid)
for oid, obj in check_orphan.iteritems(): for oid, obj in check_orphan.iteritems():
if obj.maybeOrphan(): if obj.maybeOrphan():
assert not (obj.prev_orphan or obj.next_orphan), oid assert not (obj.prev_orphan or obj.next_orphan), oid
if not obj.referents: if not obj.referents:
try: try:
if main_storage.loadBefore(oid, t) is None: if main_storage.loadBefore(oid, x) is None:
continue continue
except POSKeyError: except POSKeyError:
continue continue
...@@ -911,19 +986,20 @@ def main(args=None): ...@@ -911,19 +986,20 @@ def main(args=None):
prev_orphan.next_orphan = oid prev_orphan.next_orphan = oid
changeset.check_orphan.clear() changeset.check_orphan.clear()
t = time() x = time()
if next_commit <= t: if next_commit <= x:
changeset.commit(tid) changeset.commit(tid)
next_commit = t + commit_interval next_commit = x + commit_interval
thread.join() thread.join()
assert not deleted_dict, list(deleted_dict)
t = time() x = time()
timeout = next_gc - t timeout = next_gc - x
if timeout <= 0 or not commit_interval: if timeout <= 0 or not commit_interval:
timeout = None timeout = None
if (full or changeset.orphan or bootstrap) and not no_gc: if (full or changeset.orphan or bootstrap) and not no_gc:
if changeset.buckets: if changeset.buckets:
next_commit = t + commit_interval next_commit = x + commit_interval
changeset.commit(tid) changeset.commit(tid)
assert tid == changeset.storage.lastTransaction() assert tid == changeset.storage.lastTransaction()
if exit_before_gc: if exit_before_gc:
...@@ -954,7 +1030,8 @@ def main(args=None): ...@@ -954,7 +1030,8 @@ def main(args=None):
else: else:
gc_tid = tid gc_tid = tid
orphans = gc(None) orphans = gc(None)
orphans = sorted(orphans) if isinstance(orphans, set):
orphans = sorted(orphans)
logger.info(' found %s OID(s) to delete', len(orphans)) logger.info(' found %s OID(s) to delete', len(orphans))
log_remaining = False log_remaining = False
count = 0 count = 0
...@@ -966,20 +1043,22 @@ def main(args=None): ...@@ -966,20 +1043,22 @@ def main(args=None):
with closing(socket.socket(socket.AF_UNIX, with closing(socket.socket(socket.AF_UNIX,
socket.SOCK_STREAM)) as s: socket.SOCK_STREAM)) as s:
try: try:
with closing(s.connect(gc_lock_name)) as s: s.connect(gc_lock_name)
s.recv(1) s.recv(1)
except socket.error: except socket.error:
pass pass
deleted = OidArray([])
txn = TransactionMetaData( txn = TransactionMetaData(
description=TXN_GC_DESC % u64(gc_tid)) description=TXN_GC_DESC % u64(gc_tid))
main_storage.tpc_begin(txn) main_storage.tpc_begin(txn)
try: try:
for i, oid, data, serial in iter_orphans( for i, oid, data, serial in iter_orphans(
enumerate(orphans), main_storage.load): enumerate(orphans, 1), main_storage.load):
if gc_tid < serial: if gc_tid < serial:
count = None count = None
break break
main_storage.deleteObject(oid, serial, txn) main_storage.deleteObject(oid, serial, txn)
deleted.append(oid)
if dry_run: if dry_run:
oid = u64(oid) oid = u64(oid)
try: try:
...@@ -1024,13 +1103,15 @@ def main(args=None): ...@@ -1024,13 +1103,15 @@ def main(args=None):
logger.info(' tpc_finish...') logger.info(' tpc_finish...')
main_storage.tpc_finish(txn, main_storage.tpc_finish(txn,
invalidation_listener.tpc_finish) invalidation_listener.tpc_finish)
changeset.last_gc = invalidation_listener.last_gc x = changeset.last_gc = \
invalidation_listener.last_gc
if commit_interval:
deleted_dict[x] = deleted
next_commit = 0 next_commit = 0
if period: if period:
# We don't want future `gc(gc_tid)` to waste # We don't want future `gc(gc_tid)` to waste
# time with OIDs that are already deleted. # time with OIDs that are already deleted.
next_gc = TimeStamp(changeset.last_gc next_gc = TimeStamp(x).timeTime() + period
).timeTime() + period
continue continue
except ConflictError: except ConflictError:
count = None count = None
...@@ -1039,6 +1120,7 @@ def main(args=None): ...@@ -1039,6 +1120,7 @@ def main(args=None):
count = 0 count = 0
else: else:
main_storage.tpc_abort(txn) main_storage.tpc_abort(txn)
del deleted
break break
del orphans del orphans
if count == 0: if count == 0:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment