Commit e4059423 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 095f166c
......@@ -70,7 +70,7 @@
Currently, storage presence is broadcasted to client nodes too early, as
the storage node would refuse them until it has only up-to-date data (not
only up-to-date cells, but also a partition table and node states).
- In backup mode, 2 simultaneous replication should be possible so that:
- In backup mode, 2 simultaneous replication should be possible so that: NOTE
- outdated cells does not block backup for too long time
- constantly modified partitions does not prevent outdated cells to
replicate
......@@ -86,7 +86,7 @@
not degrade performance for client nodes. But when there's only 1 storage
left for a partition, it may be wanted to guarantee a minimum speed to
avoid complete data loss if another failure happens too early.
- Find a way not to always start replication from the beginning. Currently,
- Find a way not to always start replication from the beginning. Currently, NOTE
a temporarily down nodes can't replicate from where it was interrupted,
which is an issue on big databases. (SPEED)
- Pack segmentation & throttling (HIGH AVAILABILITY)
......@@ -106,7 +106,7 @@
- Check replicas: (HIGH AVAILABILITY)
- Automatically tell corrupted cells to fix their data when a good source
is known.
- Add an option to also check all rows of trans/obj/data, instead of only
- Add an option to also check all rows of trans/obj/data, instead of only NOTE
keys (trans.tid & obj.{tid,oid}).
Master
......@@ -131,7 +131,7 @@
table hasn't changed by pinging the master and retry if necessary.
- Implement IStorageRestoreable (ZODB API) in order to preserve data
serials (i.e. undo information).
- Fix and reenable deadlock avoidance (SPEED). This is required for
- Fix and reenable deadlock avoidance (SPEED). This is required for NOTE
neo.threaded.test.Test.testDeadlockAvoidance
Admin
......
......@@ -181,7 +181,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
app._cache_lock_release()
def notifyPartitionChanges(self, conn, ptid, cell_list):
if self.app.pt.filled():
if self.app.pt.filled(): # XXX wrong
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list):
......
......@@ -111,7 +111,7 @@ class HandlerSwitcher(object):
finally:
self._is_handling = False
def _handle(self, connection, packet):
def _handle(self, connection, packet): # NOTE incoming packet -> handle -> dispatch ...
assert len(self._pending) == 1 or self._pending[0][0]
logging.packet(connection, packet, False)
if connection.isClosed() and packet.ignoreOnClosedConnection():
......
......@@ -164,7 +164,7 @@ class SocketConnector(object):
# Do nothing special if n == 0:
# - it never happens for simple sockets;
# - for SSL sockets, this is always the case unless everything
# could be sent.
# could be sent. # NOTE queue may grow up indefinitely in this case!
if n != len(msg):
self.queued[:] = msg[n:],
return False
......
......@@ -109,7 +109,7 @@ class EpollEventManager(object):
self.wakeup()
else:
self.epoll.register(fd)
self.addReader(conn)
self.addReader(conn) # FIXME implicitly adding addReader
def unregister(self, conn, close=False):
new_pending_processing = [x for x in self._pending_processing
......@@ -136,6 +136,7 @@ class EpollEventManager(object):
if self._closeAcquire(0):
self._closeRelease()
# NOTE
def isIdle(self):
return not (self._pending_processing or self.writer_set)
......@@ -150,7 +151,7 @@ class EpollEventManager(object):
self._poll(blocking)
if not self._pending_processing:
return
to_process = self._pending_processing.pop(0)
to_process = self._pending_processing.pop(0) # FIXME only 1 element - ._pending_processing may grow
try:
to_process.process()
finally:
......
......@@ -199,6 +199,7 @@ UUID_NAMESPACES = {
NodeTypes.CLIENT: -0x20,
NodeTypes.ADMIN: -0x30,
}
# ex: 'S1', 'M1', ...
uuid_str = (lambda ns: lambda uuid:
ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
)({v: str(k)[0] for k, v in UUID_NAMESPACES.iteritems()})
......@@ -751,7 +752,7 @@ class Recovery(Packet):
"""
_answer = PStruct('answer_recovery',
PPTID('ptid'),
PTID('backup_tid'),
PTID('backup_tid'), # NOTE
PTID('truncate_tid'),
)
......@@ -787,7 +788,7 @@ class NotifyPartitionTable(Packet):
class PartitionChanges(Packet):
"""
Notify a subset of a partition table. This is used to notify changes.
PM -> S, C.
PM -> S, C. XXX also -> A (see RecoveryManager)
"""
_fmt = PStruct('notify_partition_changes',
PPTID('ptid'),
......@@ -917,6 +918,7 @@ class LockInformation(Packet):
PTID('ttid'),
)
# NOTE
class InvalidateObjects(Packet):
"""
Invalidate objects. PM -> C.
......@@ -1398,6 +1400,7 @@ class NotifyReady(Packet):
"""
pass
# NOTE
# replication
class FetchTransactions(Packet):
......@@ -1469,6 +1472,7 @@ class AddObject(Packet):
PTID('data_serial'),
)
# NOTE
class Replicate(Packet):
"""
Notify a storage node to replicate partitions up to given 'tid'
......
......@@ -346,7 +346,7 @@ class Application(BaseApplication):
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
truncate = Packets.Truncate(
self.backup_app.provideService())
self.backup_app.provideService()) # NOTE enter to backup main loop
except StoppedOperation, e:
logging.critical('No longer operational')
truncate = Packets.Truncate(*e.args) if e.args else None
......@@ -445,7 +445,7 @@ class Application(BaseApplication):
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
continue # keep handler
continue # keep handler # FIXME handler can be not setup-yet at all
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn, new=False)
......@@ -522,12 +522,14 @@ class Application(BaseApplication):
tid = txn.getTID()
transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
# NOTE send invalidation to clients
for client_node in self.nm.getClientList(only_identified=True):
c = client_node.getConnection()
if client_node is transaction_node:
c.answer(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId())
else:
# NOTE notifies clients sequentially & irregardless of whether client was subscribed
c.notify(invalidate_objects)
# Unlock Information to relevant storage nodes.
......
......@@ -93,7 +93,7 @@ class BackupApplication(object):
while True:
app.changeClusterState(ClusterStates.STARTING_BACKUP)
bootstrap = BootstrapManager(self, self.name, NodeTypes.CLIENT)
# {offset -> node}
# {offset -> node} (primary storage for off which will be talking to upstream cluster)
self.primary_partition_dict = {}
# [[tid]]
self.tid_list = tuple([] for _ in xrange(pt.getPartitions()))
......@@ -193,6 +193,8 @@ class BackupApplication(object):
for node in trigger_set:
self.triggerBackup(node)
# NOTE called by backup_app.invalidateObjects() when it has info that
# partitions in partition_set were updated in upstream cluster (up to `tid`)
def invalidatePartitions(self, tid, partition_set):
app = self.app
prev_tid = app.getLastTransaction()
......@@ -211,7 +213,7 @@ class BackupApplication(object):
for cell in pt.getCellList(offset, readable=True):
node = cell.getNode()
assert node.isConnected(), node
if cell.backup_tid == prev_tid:
if cell.backup_tid == prev_tid: # XXX ?
# Let's given 4 TID t0,t1,t2,t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates
......@@ -238,8 +240,8 @@ class BackupApplication(object):
self.primary_partition_dict[offset] = \
random.choice(node_list)
else:
# Partition not touched, so increase 'backup_tid' of all
# "up-to-date" replicas, without having to replicate.
# Partition not touched, so increase 'backup_tid' of all NOTE
# "up-to-date" replicas, without having to replicate. (probably relates to backup_tid=tid initial bug)
for cell in pt.getCellList(offset, readable=True):
if last_max_tid <= cell.backup_tid:
cell.backup_tid = tid
......@@ -252,7 +254,7 @@ class BackupApplication(object):
cell.replicating = tid
for node, untouched_dict in untouched_dict.iteritems():
if app.isStorageReady(node.getUUID()):
node.notify(Packets.Replicate(tid, '', untouched_dict))
node.notify(Packets.Replicate(tid, '', untouched_dict)) # NOTE Mb -> Sb (notify tid brings no new data)
for node in trigger_set:
self.triggerBackup(node)
count = sum(map(len, self.tid_list))
......@@ -288,9 +290,10 @@ class BackupApplication(object):
source_dict[offset] = addr
logging.debug("ask %s to replicate partition %u up to %s from %r",
uuid_str(node.getUUID()), offset, dump(tid), addr)
node.getConnection().notify(Packets.Replicate(
node.getConnection().notify(Packets.Replicate( # NOTE Mb -> Sb (notify to trigger replicate up to tid)
tid, self.name, source_dict))
# NOTE feedback from Sb -> Mb a partition (requested by invalidatePartitions->triggerBackup) has been replicated
def notifyReplicationDone(self, node, offset, tid):
app = self.app
cell = app.pt.getCell(offset, node.getUUID())
......
......@@ -68,7 +68,7 @@ class AdministrationHandler(MasterHandler):
'entering cluster' % (node, ))
app._startup_allowed = True
state = app.cluster_state
elif state == ClusterStates.STARTING_BACKUP:
elif state == ClusterStates.STARTING_BACKUP: # NOTE
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
" transactions or connected clients" % state)
......@@ -136,7 +136,7 @@ class AdministrationHandler(MasterHandler):
for node in app.nm.getStorageList()
if node.isPending() and node.getUUID() in uuid_list))
if node_list:
p = Packets.StartOperation(bool(app.backup_tid))
p = Packets.StartOperation(bool(app.backup_tid)) # NOTE ...
for node in node_list:
node.setRunning()
node.notify(p)
......
......@@ -37,6 +37,7 @@ class BackupHandler(EventHandler):
def notifyNodeInformation(self, conn, node_list):
self.app.nm.update(node_list)
# NOTE invalidation from M -> Mb (all partitions)
def answerLastTransaction(self, conn, tid):
app = self.app
if tid != ZERO_TID:
......@@ -44,6 +45,7 @@ class BackupHandler(EventHandler):
else: # upstream DB is empty
assert app.app.getLastTransaction() == tid
# NOTE invalidation from M -> Mb
def invalidateObjects(self, conn, tid, oid_list):
app = self.app
getPartition = app.app.pt.getPartition
......
......@@ -321,6 +321,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
except AttributeError:
pass
# NOTE
def setBackupTidDict(self, backup_tid_dict):
for row in self.partition_list:
for cell in row:
......@@ -328,9 +329,9 @@ class PartitionTable(neo.lib.pt.PartitionTable):
cell.backup_tid = backup_tid_dict.get(cell.getUUID(),
ZERO_TID)
def getBackupTid(self, mean=max):
def getBackupTid(self, mean=max): # XXX sometimes called with mean=min
try:
return min(mean(x.backup_tid for x in row if x.isReadable())
return min(mean(x.backup_tid for x in row if x.isReadable()) # XXX min(mean(...)) - correct?
for row in self.partition_list)
except ValueError:
return ZERO_TID
......
......@@ -111,6 +111,7 @@ class RecoveryManager(MasterHandler):
if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list))
# NOTE
if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
......@@ -147,6 +148,7 @@ class RecoveryManager(MasterHandler):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskRecovery())
# NOTE
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID()
if self.target_ptid <= ptid:
......
......@@ -48,6 +48,7 @@ def main(args=None):
config = ConfigurationManager(defaults, options, 'storage')
# setup custom logging
logging.backlog(max_size=None) # log without delay
logging.setup(config.getLogfile())
# and then, load and run the application
......
......@@ -20,7 +20,8 @@ from logging import getLogger, INFO
from optparse import OptionParser
from neo.lib import logging
from neo.tests import functional
logging.backlog()
#logging.backlog()
logging.backlog(max_size=None)
del logging.default_root_handler.handle
def main():
......
......@@ -251,7 +251,7 @@ class Application(BaseApplication):
while not self.operational:
_poll()
self.ready = True
self.replicator.populate()
self.replicator.populate() # TODO study what's inside
self.master_conn.notify(Packets.NotifyReady())
def doOperation(self):
......@@ -275,6 +275,7 @@ class Application(BaseApplication):
while True:
while task_queue:
try:
# NOTE backup/importer processed under isIdle
while isIdle():
if task_queue[-1].next():
_poll(0)
......
......@@ -126,6 +126,7 @@ class ClientOperationHandler(EventHandler):
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, time.time())
# NOTE
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
min_tid, max_tid, length, partition)))
......
......@@ -78,6 +78,7 @@ class InitializationHandler(BaseMasterHandler):
dm.unlockTransaction(tid, ttid)
dm.commit()
# NOTE M -> S "start operational"
def startOperation(self, conn, backup):
self.app.operational = True
# XXX: see comment in protocol
......
......@@ -179,6 +179,7 @@ class StorageOperationHandler(EventHandler):
def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
tid_list):
app = self.app
# NOTE XXX
if app.tm.isLockedTid(max_tid):
# Wow, backup cluster is fast. Requested transactions are still in
# ttrans/ttobj so wait a little.
......
......@@ -72,6 +72,7 @@ class Partition(object):
if hasattr(self, x)),
id(self))
# NOTE
class Replicator(object):
current_node = None
......@@ -366,6 +367,7 @@ class Replicator(object):
node = self.current_node
if node is not None is node.getUUID():
self.cancel()
# NOTE
# Cancel all replication orders from upstream cluster.
for offset in self.replicate_dict.keys():
addr, name = self.source_dict.get(offset, (None, None))
......
......@@ -611,6 +611,7 @@ class NEOCluster(object):
kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
getPartitions=partitions, getReset=clear_databases,
getSSL=self.SSL)
# NOTE
if upstream is not None:
self.upstream = weakref.proxy(upstream)
kw.update(getUpstreamCluster=upstream.name,
......
......@@ -33,6 +33,7 @@ from .. import Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
# NOTE
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped):
def wrapper(self):
......@@ -53,6 +54,7 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
return decorator
# NOTE
class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition, max_tid=MAX_TID):
......
Application (master)
BaseApplication
.em EventManager
.nm NodeManager
.tm TransactionManager
.pt PartitionTable
(master) Application < BaseApplication
.tm (master) TransactionManager
.pt (master) PartitionTable
.listening_conn ListeningConnection
.primary ?
.listening_conn ListeningConnection
.primary bool # "am I primary"
.primary_master_node ?
.cluster_state
.current_manager None | RecoveryManager | VerificationManager
.backup_app BackupApplication
.backup_tid
.truncate_tid
.admin_handler AdministrationHandler
.secondary_master_handler SecondaryMasterHandler
.client_service_handler ClientServiceHandler
.storage_service_handler StorageServiceHandler
(storage) Application < BaseApplication
.tm (storage) TransactionManager
.dm DatabaseManager (Importer | MySQL | SQLite)
.pt ? PartitionTable
.checker Checker
.replicator Replicator
.listening_conn ListeningConnection
.master_conn
.master_node
.event_queue deque (key, callable, msg_id, conn, args)
.event_queue_dict {} key -> count(.event_queue, key)
.task_queue deque
.operational bool
.ready bool # when .operational=T and "got all informations"
Handlers
--------
......
......@@ -12,8 +12,13 @@ etc1 = 'etc1'
def main():
logging.backlog(max_size=None, max_packet=None) # log everything & without bufferring
kw = { 'master_nodes': '[2001:67c:1254:e:20::3977]:2051', # M on webr-wneo-*1*
'name': 'woelfel-munich-clone',
kw = {
#'master_nodes': '[2001:67c:1254:e:20::3977]:2051', # M on webr-wneo-*1*
#'name': 'woelfel-munich-clone',
'master_nodes': '[2001:67c:1254:e:21::ffa]:2051', # M on webr-wneo-2
'name': 'woelfel-munich-clone-backup-comp-2591',
'read_only': True,
'logfile': 'x.log',
......@@ -22,11 +27,17 @@ def main():
'key': etc1 + '/neo.key',
}
print 'aaa'
stor = Storage(**kw)
print 'bbb'
db = DB(stor)
print 'ccc'
conn = db.open()
print 'ddd'
root = conn.root()
print 'eee'
print root
print 'fff'
while 1:
sleep(1)
......
......@@ -69,6 +69,11 @@ def dump(stor, tidmin, tidmax):
def mkneostor():
from logging import getLogger, INFO, DEBUG
from neo.lib import logging
#getLogger().setLevel(DEBUG)
logging.backlog(max_size=None, max_packet=None) # log everything & without bufferring
from neo.client.Storage import Storage as NEOStorage
etc1 = 'etc1'
"""
......@@ -97,6 +102,8 @@ def mkneostor():
}
stor = NEOStorage(**kw)
from time import sleep
sleep(2)
return stor
......
......@@ -33,6 +33,7 @@ class DummyObject(Persistent):
def __init__(self, data):
self._data = None
# NOTE
class ReplicationBenchmark(BenchmarkRunner):
""" Test replication process """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment