Commit f7d843fa authored by Łukasz Nowak's avatar Łukasz Nowak

Move TIDStorage product into new home.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@45804 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent fa061a33
No related merge requests found
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
class ClientDisconnected(Exception):
  """Raised when the peer closed the connection (recv returned '')."""
  pass
class ExchangeProtocol:
  """
  Handle data exchange between client and server.

  Kinds of data which can be exchanged:
    str         : send_field / recv_field
    int         : send_int / recv_int
    list of str : send_list / recv_list
    list of int : send_int_list / recv_int_list
    dict (key: str, value: int): send_dict / recv_dict

  Forbidden chars:
    Send (raise if present):
      \\n (field separator)
    Receive (stripped silently):
      \\n (field separator)
      \\r (for compatibility)
  """
  def __init__(self, socket):
    # socket: a connected socket-like object providing sendall() and recv().
    self._socket = socket

  def send_field(self, to_send):
    """Send one str field, terminated by \\n."""
    if type(to_send) is not str:
      # raise X(msg) instead of py2-only "raise X, msg": same behavior,
      # valid syntax on both python 2 and 3.
      raise ValueError('Value is not of str type: %r' % (type(to_send), ))
    if '\n' in to_send:
      raise ValueError('\\n is a forbidden value.')
    # sendall loops until the whole payload is written; plain send() may
    # transmit only a prefix on a congested socket.
    self._socket.sendall(to_send + '\n')

  def recv_field(self):
    """
    Receive one \\n-terminated field, reading one byte at a time.
    \\r bytes are silently dropped.
    Raises ClientDisconnected when the peer closes the connection.
    """
    result = []
    append = result.append
    while True:
      received = self._socket.recv(1)
      if len(received) == 0:
        # recv() returning an empty string means the peer disconnected.
        raise ClientDisconnected
      if received == '\n':
        break
      if received != '\r':
        append(received)
    return ''.join(result)

  def send_int(self, to_send):
    """Send an int as its decimal string representation."""
    self.send_field(str(to_send))

  def recv_int(self):
    """Receive one field and convert it to int."""
    return int(self.recv_field())

  def send_list(self, to_send, send_length=True):
    """Send a list of str fields, prefixed (by default) with its length."""
    assert isinstance(to_send, (tuple, list))
    if send_length:
      self.send_int(len(to_send))
    for field in to_send:
      self.send_field(field)

  def send_int_list(self, to_send, *args, **kw):
    """Send a list of ints (same framing as send_list)."""
    self.send_list([str(x) for x in to_send], *args, **kw)

  def recv_list(self, length=None):
    """
    Receive a list of str fields. The length is read from the stream
    unless provided by the caller.
    """
    if length is None:
      length = self.recv_int()
    return [self.recv_field() for _ in range(length)]

  def recv_int_list(self, *args, **kw):
    """Receive a list of ints (same framing as recv_list)."""
    return [int(x) for x in self.recv_list(*args, **kw)]

  def send_dict(self, to_send):
    """
    Send a dict (key: str, value: int) as a length-prefixed key list
    followed by the values in matching order.
    """
    assert isinstance(to_send, dict)
    if len(to_send) == 0:
      key_list = value_list = []
    else:
      key_list, value_list = zip(*to_send.items())
    self.send_list(list(key_list))
    self.send_int_list(value_list, send_length=False)

  def recv_dict(self):
    """
    Receive a dict (key: str, value: int); counterpart of send_dict.
    """
    key_list = self.recv_list()
    value_list = self.recv_int_list(len(key_list))
    return dict(zip(key_list, value_list))
TIDStorage
SYNOPSIS
========
This product provides a way to have consistent backups when running a
multi-storage instance (only ZEO is supported at the moment).
Doing backups of individual Data.fs who are part of the same instance
(one mounted in another) is a problem when there are transactions involving
multiple storages: if there is a crash during transaction commit, there is no
way to tell which storage was committed and which was not (there is no TID
consistency between databases).
There is an even more tricky case. Consider the following:
2 transactions running in parallel:
T1: modifies storage A and B
T2: modifies storage A
Commit order scenario:
T1 starts committing (takes commit lock on A and B)
T2 starts committing (waits for commit lock on A)
T1 commits A (commit lock released on A)
T2 commits A (takes & releases commit lock on A)
[crash]
T1 commits B (commit lock released on B) <- never happens because of crash
Here, T2 was able to commit entirely, but it must not be saved. This is
because transactions are stored in ZODB in the order they are committed.
So if T2 is in the backup, a part of T1 will also be, and the backup will be
inconsistent (T1 commit on B never happened).
TIDStorage log and server log
-----------------------------
TIDStorage uses two logfiles - one which is used to inform administrator
about server state (logfile_name in configuration) and TIDStorage log to which
TIDs are appended (status_file in configuration).
USAGE
=====
Put product in Zope Products to activate Zope-side patches.
Create configuration, example is provided in repozo/sample_configuration.py
Run bin/tidstorage.py with created configuration. As Zope commits transactions
it will connect to TIDStorage server, which will be shown in Zope and TIDStorage
server logs.
PYTHONPATH issues
-----------------
To run server and scripts there is a need to set correct PYTHONPATH - at least
to product directory and for some tools to Zope lib/python.
Example:
PYTHONPATH=/usr/lib/erp5/lib/python:/usr/lib/erp5/lib/python/Products/TIDStorage
Typical scenario with failure, restoring from backup
----------------------------------------------------
* Zopes and Zeos running
* TIDStorage running
* backups done using repozo/repozo_tidstorage.py (they might contain
incoherency), for every backup tidstorage.tid is saved
* system failure
* restore using repozo/repozo_tidstorage.py with -t tidstorage.tid from last
backup
In this scenario only on restoration destination file is cut at point of last
known TID position. This step is optional, as in some cases administrator
might want to not cut this file.
Typical scenario with failure, no restoring needed
--------------------------------------------------
* Zopes and Zeos running
* TIDStorage running
* system failure
* no need to restore from backup, but there might be some laying transactions
in different ZODB files, system is incoherent
* administrator uses repozo/restore_tidstorage.py to cut incorrectly committed
transactions, making the system coherent again
TECHNICAL DETAILS
=================
TIDStorage fixes those issues by keeping track of transaction-to-tid relations
for all (ZODB, via ZEO) storages involved in any transaction, and by tracking
inter-transaction dependencies.
TIDStorage is composed of 3 parts:
- A Zope product, which monkey-patches "ZEO" and "transaction" products.
transaction patch:
TIDStorage works at transaction boundaries, so we hook around
_commitResource method to know when it happens.
It must be configured to fit your network setup (TID_STORAGE_ADDRESS)
ZEO patch:
With regular ZEO, there is no way to know last committed TID at
transaction-code level. This patch stores last committed TID on ZEO
connection object, to be read by transaction patch.
- A daemon
This is TIDStorage itself, receiving TIDs from Zopes and delivering
coherency points to backup scripts.
- Backup scripts and other utilities
Those scripts are (mostly) wrappers for repozo backup script, fetching
coherency points from TIDStorage daemon and invoking repozo.
No changes to repozo.py are needed, as it is used only as subsystem
to do reliable backups and restore.
Using the provided utilities in the utils/ directory, it is possible to query
for last known TID from server and operate on TIDStorage log.
Constraints under which TIDStorage was designed:
- Zope performance
Protocol (see below) was designed as one-way only (Zope pushes data to
TIDStorage, and does not expect an answer), so that TIDStorage speed do not
limit Zope performance.
- No added single-point-of-failure
Even if Zope cannot connect to TIDStorage, it will still work. It will only
emit one log line when connection is lost or at first attempt if it did not
succeed. When connection is established, another log line is emitted.
- Bootstrap
As TIDStorage can be started and stopped while things still happen on
ZODBs, it must be able to bootstrap its content before any backup can
happen. This is done by creating artificial Zope transactions whose only
purpose is to cause a commit to happen on each ZODB, filling TIDStorage and
making sure there is no pending commit on any storage (since all locks
could be taken by those transactions, it means that all transaction started
before that TIDStorage can receive their notification have ended).
- Restoration from Data.fs
In addition to the ability to restore from repozo-style backups, and in
order to provide greater backup frequency than repozo can offer on big
databases, TIDStorage offers the possibility to restore coherent Data.fs
from crashed ones - as long as they are not corrupted.
Limits:
- Restore "lag"
As TIDStorage can only offer a coherency point when interdependent
transactions are all finished (committed or aborted), TIDStorage log file
backup from time T might actually contain data from moments before.
So while doing restore with -t option data will be cut to state as
time T - undefined, small lag.
There are even pathologic cases where no coherency point can be found,
so TIDStorage log file won't have any information.
Daemon signal support:
- HUP reopens all log files
- USR1 dumps tid configuration into log file
- TERM kills daemon
PROTOCOL SPECIFICATION
======================
All characters allowed in data, except \n and \r (0x0A & 0x0D).
Each field ends with \n, \r is ignored.
No escaping.
When transferring a list, it is prepended by the number of included fields.
Example:
3\n
foo\n
bar\n
baz\n
When transferring a dict, it is prepended by the number of items, followed by
keys and then values. Values must be integers represented as strings.
Example:
2\n
key1\n
key2\n
1\n
2\n
Commands are case-insensitive.
1) Start of commit command:
BEGIN\n
<commit id>\n
<list of involved storages>
<commit id>: must be identical to the one given when commit finishes (be it ABORT or COMMIT)
<list of involved storages>: list of storage ids involved in the transaction
NB: final \n is part of list representation, so it's not displayed above.
Response: (nothing)
2) Transaction abort command:
ABORT\n
<commit id>\n
<commit id>: (cf. BEGIN)
Response: (nothing)
3) Transaction finalisation command:
COMMIT\n
<commit id>\n
<dict of involved storages and committed TIDs>
<commit id>: (cf. BEGIN)
involved storages: (cf. BEGIN)
committed TIDs: TIDs for each storage, as int.
NB: final \n is part of list representation, so it's not displayed above.
Response: (nothing)
4) Data read command:
DUMP\n
Response:
<dict of storages and TIDs>
5) Connection termination command:
QUIT\n
Response: (nothing, server closes connection)
6) Bootstrap status command:
BOOTSTRAPED\n
Response: 1 if bootstrap was completely done, 0 otherwise.
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2009 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import socket
import time
from ExchangeProtocol import ExchangeProtocol
class TIDClient:
  """
  Test/utility client for a TIDStorage server.

  Storage ids are namespaced as '<test_id>_<storage_id>' on the wire, so
  several test runs can share one server without colliding.
  """
  def __init__(self, address):
    """
    address
      (host, port) tuple, passed verbatim to socket.connect.
    """
    self._to_server = socket.socket()
    self._to_server.connect(address)
    self._exchange_protocol = ExchangeProtocol(self._to_server)

  def _dump(self, test_id=None):
    """
    Issue a DUMP command. When test_id is given, only entries belonging
    to that test are kept, with the '<test_id>_' prefix stripped.
    Values are normalised to int.
    """
    self._exchange_protocol.send_field('dump')
    received_dict = self._exchange_protocol.recv_dict()
    if test_id is None:
      result = received_dict
    else:
      id_len = len(test_id) + 1 # Add 1 to strip underscore.
      # items() instead of py2-only iteritems() keeps this runnable on
      # both python 2 and 3.
      result = dict((key[id_len:], value)
        for key, value in received_dict.items()
        if key.startswith(test_id))
    return dict((key, int(value)) for key, value in result.items())

  def dump(self, test_id):
    """Dump entries belonging to one test id (prefix stripped)."""
    return self._dump(test_id=test_id)

  def dump_all(self):
    """Dump every entry known to the server, prefixes included."""
    return self._dump()

  def begin(self, test_id, transaction_id, storage_id_list):
    """Send a BEGIN for transaction_id involving storage_id_list."""
    self._exchange_protocol.send_field('begin')
    self._exchange_protocol.send_field(transaction_id)
    internal_storage_id_list = ['%s_%s' % (test_id, x)
      for x in storage_id_list]
    self._exchange_protocol.send_list(internal_storage_id_list)

  def abort(self, test_id, transaction_id):
    """Send an ABORT for transaction_id."""
    self._exchange_protocol.send_field('abort')
    self._exchange_protocol.send_field(transaction_id)

  def commit(self, test_id, transaction_id, storage_tid_dict):
    """Send a COMMIT with the storage -> committed TID mapping."""
    self._exchange_protocol.send_field('commit')
    self._exchange_protocol.send_field(transaction_id)
    internal_storage_tid_dict = {}
    for key, value in storage_tid_dict.items():
      internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
    self._exchange_protocol.send_dict(internal_storage_tid_dict)

  def bootstraped(self):
    """Ask the server whether bootstrap completed (1) or not (0)."""
    self._exchange_protocol.send_field('bootstraped')
    return self._exchange_protocol.recv_int()

  def waitForBootstrap(self):
    """Poll the server until bootstrap is reported complete."""
    while not self.bootstraped():
      time.sleep(0.1)
TIDStorage 5.4.7
############################################################################
#
# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ZEO.ClientStorage import ClientStorage
from zLOG import LOG, WARNING, INFO
# Attribute name under which the last committed TID is stored on each
# ClientStorage instance. (Spelling "commited" is kept: existing readers
# look the attribute up by this exact name.)
LAST_COMMITED_TID_PROPERTY_ID = '_last_commited_tid'
# Hook tpc_finish's hook method.
# New hook must be a local method because it must access tpc_finish's "self"
# and original hook.
LOG('TIDStorage',INFO,'Monkey patching ClientStorage.tpc_finish and ClientStorage.getLastCommitedTID')
# Keep a reference to the unpatched method so the replacement can delegate
# to it. Must be captured before the patch below is installed.
original_tpc_finish = ClientStorage.tpc_finish
def tpc_finish(self, txn, f=None):
  """
  Replacement for ClientStorage.tpc_finish: wrap the caller-provided
  callback so the committed TID is also recorded on this storage instance,
  where getLastCommitedTID can read it.
  """
  def saveTIDOnInstance(tid):
    # Honour the original callback contract first, then remember the TID.
    if f is not None:
      f(tid)
    setattr(self, LAST_COMMITED_TID_PROPERTY_ID, tid)
  return original_tpc_finish(self, txn, f=saveTIDOnInstance)
# Install the patch.
ClientStorage.tpc_finish = tpc_finish
def getLastCommitedTID(self):
  """
  Return the last committed TID for this storage, or None if no
  transaction was committed yet.
  """
  # The attribute is set by the patched tpc_finish above; it is absent
  # until the first commit on this storage.
  return getattr(self, LAST_COMMITED_TID_PROPERTY_ID, None)
# Expose the accessor on ClientStorage.
ClientStorage.getLastCommitedTID = getLastCommitedTID
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Load monkey patches
import transaction_transaction
import ZEOClientStorage
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# About errors in TIDStorage logs:
# - CRITICAL: Decreasing update ignored
# This error means that any backup started prior to this error can contain
# incomplete transaction data.
# This error can happen when TIDStorage did not handle received data in the
# right order.
# Example:
# 3 storages (S1, S2, S3):
# They all start at TID=1 value.
# 2 transaction (T1, T2):
# T1 commits TID 3 on S2, TID 2 on S3
# T2 commits TID 2 on S1, TID 2 on S2
# Due to those TIDs, TIDStorage *should* handle data in this order:
# T2begin, T2commit, T1begin, T1commit
# Or:
# T2begin, T1begin, T2commit, T1commit
# Or even, though it denotes a late handling of T2commit:
# T2begin, T1begin, T1commit, T2commit
# But, if TIDStorage handles data in the following order:
# T1begin, T1commit, T2begin, T2commit
# *AND* a backup dumps TIDStorage content at a point between T1commit and
# T2commit, then the backup will contain T2's commit on S2, which has a
# lower TID than T1's commit on that same storage.
#
# - Abort received, but never began
# and
# - Commit received, but never began
# These errors mean that packets were lost/never received.
# This should not happen, since network connection is TCP, and TCP
# retransmits data.
# But it happens frequently if TIDStorage is started when Zope is under
# load. This is because Zope attempted to contact TIDStorage at "begin"
# step, but could not reach it. Then, at commit (or abort) it could reach
# TIDStorage, causing the error message.
# This error is benign, because:
# - Until bootstrap is complete, no TID is available for backup
# - When bootstrap is complete, it means that every ZODB got unlocked
# at some point (since TIDStorage commit happens after ZODB tpc_finish
# lock release).
# - When a transaction sends data to multiple ZODBs, there is a point in
# time when ALL impacted ZODBs are locked.
# The conclusion of all this is that any transaction started before
# TIDStorage was available has necessarily finished (commit or abort) at
# the time bootstrap finished.
# So no backup can be affected by such message (but backup feasability can
# get delayed as locks would delay bootstrap end, and hence TID data
# availability).
import os
import imp
import sys
import pwd
import grp
import sets
import time
import urllib
import socket
import signal
import getopt
import SocketServer
import threading
import traceback
from ExchangeProtocol import ClientDisconnected, ExchangeProtocol
class TransactionException(Exception):
  """Raised on illegal transaction transitions (double begin, finish without begin)."""
  pass
class AlwaysIncreasingDict(dict):
  """
  Dict whose values may only increase: an insert/update is applied only
  when the new value compares strictly greater than the existing one
  (missing keys compare as integer 0).

  NOTE(review): values are compared as-is, without conversion to int -
  callers are expected to store ints (the original docstring claimed a
  conversion that the code never performed).

  TODO:
    - Do not descend from dict to prevent users from avoiding checks.
  """
  def __init__(self, strict=False, *args, **kw):
    """
    strict (bool)
      When true, a refused (non-increasing) update is logged as CRITICAL,
      since it means a backup taken earlier might be incoherent.
    """
    dict.__init__(self, *args, **kw)
    self._strict = strict

  def __setitem__(self, key, value):
    # Look the current value up once instead of twice.
    current = self.get(key, 0)
    if current < value:
      dict.__setitem__(self, key, value)
    elif self._strict:
      log('CRITICAL: Decreasing update ignored: key=%r %r <= %r' % \
          (key, value, current))

  def update(self, other):
    """
    Route every item through __setitem__ so the increase check runs.
    items() instead of py2-only iteritems(): works on python 2 and 3.
    """
    for key, value in other.items():
      self[key] = value
class TransactionTracker:
  """
  Implement transaction tracking.
  This class is not thread-safe.

  A transaction starts with a call to "begin" and ends with a call to
  "finish" with the same identifier. "finish" returns the payload given
  at begin time (or True if no payload was given).

  Illegal cases raise TransactionException (the original docstring
  claimed False was returned, but the code has always raised):
    - "begin" called twice without intermediate "finish" call
    - "finish" called without a corresponding "begin" call (this includes
      calling "finish" twice)
  """
  def __init__(self):
    # identifier -> payload, for transactions begun but not yet finished.
    self._container = {}

  def begin(self, identifier, payload=True):
    """
    Start tracking identifier; payload is returned by the matching finish.
    Raises TransactionException if identifier is already tracked.
    """
    if identifier in self._container:
      # raise X(msg) instead of py2-only "raise X, msg": same behavior,
      # also valid on python 3.
      raise TransactionException('Begin called twice in a row.')
    self._container[identifier] = payload

  def finish(self, identifier):
    """
    Stop tracking identifier and return its payload.
    Raises TransactionException if identifier is not tracked.
    """
    if identifier not in self._container:
      raise TransactionException('Finish called without former "begin" call.')
    return self._container.pop(identifier)
class TIDServer(SocketServer.BaseRequestHandler):
  """
  Exchange data with connected peer.

  One handler instance per client connection. Reads line-based commands
  (begin/abort/commit/dump/bootstraped/quit, case-insensitive) and
  dispatches them to the global TIDStorage singleton.

  TODO:
    - Implement socket buffering.
  """
  def log(self, message):
    # Prefix every log line with the peer address of this connection.
    log('%r: %s' % (self.client_address, message))

  def dump(self):
    # DUMP: reply with the dict of storage id -> last coherent TID.
    tid_dict = self._tid_storage.dump()
    self._field_exchange.send_dict(tid_dict)

  def begin(self):
    # BEGIN: read <commit id>, then the list of involved storage ids.
    identifier = self._field_exchange.recv_field()
    storage_id_list = self._field_exchange.recv_list()
    self._tid_storage.begin(identifier, storage_id_list)

  def abort(self):
    # ABORT: read <commit id> only; no reply.
    identifier = self._field_exchange.recv_field()
    self._tid_storage.abort(identifier)

  def commit(self):
    # COMMIT: read <commit id>, then dict of storage id -> committed TID.
    identifier = self._field_exchange.recv_field()
    tid_dict = self._field_exchange.recv_dict()
    self._tid_storage.commit(identifier, tid_dict)

  def bootstraped(self):
    # BOOTSTRAPED: reply 1 once bootstrap completed, 0 otherwise.
    self._field_exchange.send_int(has_bootstraped and 1 or 0)

  def handle(self):
    # Per-connection loop: read a command, dispatch it, repeat until
    # "quit" or disconnection. Unknown commands are silently ignored.
    global tid_storage
    self._tid_storage = tid_storage
    self._field_exchange = ExchangeProtocol(socket=self.request)
    command_mapping = {
      'begin': self.begin,
      'abort': self.abort,
      'commit': self.commit,
      'dump': self.dump,
      'bootstraped': self.bootstraped,
    }
    self.log('Connected')
    try:
      # Intercept ClientDisconnected exception to stop thread nicely instead
      # of crashing.
      # Log all others exceptions.
      while True:
        received = self._field_exchange.recv_field()
        command_str = received.lower()
        if command_str == 'quit':
          break
        method = command_mapping.get(command_str)
        if method is not None:
          # Intercept all errors to log it instead of causing disconnection.
          # Except, of course, the ClientDisconnected exception itself.
          try:
            method()
          except ClientDisconnected:
            raise
          except:
            self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
    except ClientDisconnected:
      pass
    except:
      self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
    self.log('Client disconnected')
    self.request.shutdown(socket.SHUT_RDWR)
    self.request.close()
    return
class TIDStorage:
  """
  Store ZODB TIDs for multiple ZODBs.
  Designed to be a singleton.
  Thread-safe.
  Consequently, transactions are not bound to a specific connection: If a
  connection is cut after a "begin", reconnecting and issuing "abort" or
  "commit" is valid.
  TODO:
    - Use smaller locking areas
    - Improve decision taking algorithm in _unregisterTransactionID (implies
      modifying _registerTransactionIDAndStorageID).
  """
  # Reentrant lock guarding all internal structures below.
  _storage_id_lock = threading.RLock()
  # Timestamps scheduling the next full / incremental dump (None: disabled).
  _next_full_dump = None
  _next_dump = None
  # Dump periods in seconds (None disables the corresponding dump kind).
  _burst_period = None
  _full_dump_period = None

  def __init__(self, tid_file_path=None, burst_period=None, full_dump_period=None):
    self._transaction_tracker = TransactionTracker()
    # Coherent storage id -> TID mapping (strict: decreases are logged).
    self._storage = AlwaysIncreasingDict(strict=True)
    # TIDs committed but not yet proven coherent (linked transactions open).
    self._transcient = AlwaysIncreasingDict()
    self._storage_id_to_transaction_id_list_dict = {}
    self._transaction_id_to_storage_id_list_dict = {}
    # storage id -> shared Set of storages linked by in-flight transactions.
    self._storage_id_to_storage_id_set_dict = {}
    if tid_file_path is not None:
      self._burst_period = burst_period
      self._full_dump_period = full_dump_period
      now = time.time()
      if full_dump_period is not None:
        self._next_full_dump = now
      if burst_period is not None:
        self._next_dump = now
        # Storage ids updated since the last incremental ('d') dump.
        self._since_last_burst = sets.Set()

  def __repr__(self):
    # Debug helper: render every internal structure under the lock.
    result = []
    append = result.append
    self._storage_id_lock.acquire()
    try:
      append('_storage_id_to_transaction_id_list_dict=' + \
        repr(self._storage_id_to_transaction_id_list_dict))
      append('_transaction_id_to_storage_id_list_dict=' + \
        repr(self._transaction_id_to_storage_id_list_dict))
      append('_storage_id_to_storage_id_set_dict=' + \
        repr(self._storage_id_to_storage_id_set_dict))
      append('_transcient=' + repr(self._transcient))
      append('_storage=' + repr(self._storage))
    finally:
      self._storage_id_lock.release()
    return '\n'.join(result)

  def _registerTransactionIDAndStorageID(self, transaction_id, storage_id_list):
    """
    Record that transaction_id involves storage_id_list and merge the
    "linked storages" sets of all involved storages into one shared set.
    """
    assert len(storage_id_list) != 0
    # Non-blocking acquire on an RLock succeeds for the owning thread,
    # so this asserts the caller already holds _storage_id_lock.
    assert self._storage_id_lock.acquire(False)
    try:
      # Update transaction_id -> storage_id_list
      assert transaction_id not in self._transaction_id_to_storage_id_list_dict
      self._transaction_id_to_storage_id_list_dict[transaction_id] = storage_id_list
      storage_id_set = sets.Set(storage_id_list)
      # id()s of pre-existing sets that get merged into storage_id_set.
      storage_id_set_id_set = sets.Set()
      for storage_id in storage_id_list:
        # Update storage_id -> transaction_id_list
        identifier_set = self._storage_id_to_transaction_id_list_dict.get(storage_id)
        if identifier_set is None:
          identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id] = sets.Set()
        assert transaction_id not in identifier_set
        identifier_set.add(transaction_id)
        # Prepare the update storage_id -> storage_id_set
        existing_storage_id_set = self._storage_id_to_storage_id_set_dict.get(storage_id, None)
        if existing_storage_id_set is not None:
          storage_id_set.union_update(existing_storage_id_set)
          storage_id_set_id_set.add(id(existing_storage_id_set))
      # Update storage_id -> storage_id_set
      # Cannot use iteritems because dict is modified in the loop.
      # Repoint every entry that referenced a merged set to the new set.
      for key, value in self._storage_id_to_storage_id_set_dict.items():
        if id(value) in storage_id_set_id_set:
          self._storage_id_to_storage_id_set_dict[key] = storage_id_set
      for storage_id in storage_id_set:
        self._storage_id_to_storage_id_set_dict[storage_id] = storage_id_set
    finally:
      self._storage_id_lock.release()

  def _unregisterTransactionID(self, transaction_id):
    """
    Also transfers from self._transcient to self._storage.
    A storage's TID becomes coherent (eligible for backup) once its shared
    "linked storages" set is empty, i.e. no in-flight transaction can
    still affect it; coherent TIDs are then dumped to the TID file
    according to the burst/full-dump schedule.
    """
    # See _registerTransactionIDAndStorageID: asserts lock is already held.
    assert self._storage_id_lock.acquire(False)
    try:
      # Update transaction_id -> storage_id_list and retrieve storage_id_list
      # Raises if not found
      storage_id_list = self._transaction_id_to_storage_id_list_dict.pop(transaction_id)
      # Update storage_id -> transaction_id_list
      for storage_id in storage_id_list:
        identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id]
        # Raises if not found
        identifier_set.remove(transaction_id)
        if len(identifier_set) == 0:
          del self._storage_id_to_transaction_id_list_dict[storage_id]
          # Update storage_id -> storage_id_set
          # Raises if not found
          storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
          # Raises if not found
          storage_id_set.remove(storage_id)
      if has_bootstraped:
        if tid_file is not None:
          now = time.time()
          # A full dump takes precedence over an incremental one.
          can_full_dump = (self._next_full_dump is not None) and (self._next_full_dump < now)
          can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
          record_for_dump = can_dump or (self._next_dump is not None)
          append_to_file = (can_dump or can_full_dump)
        else:
          append_to_file = record_for_dump = can_dump = can_full_dump = False
        # Promote TIDs whose linked-storages set drained to the coherent dict.
        for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
          if len(value) == 0 and key in self._transcient:
            self._storage[key] = self._transcient.pop(key)
            if record_for_dump:
              self._since_last_burst.add(key)
        if append_to_file:
          if can_full_dump:
            # 'f' record: complete coherent state.
            to_dump_dict = self._storage
            dump_code = 'f'
          else:
            # 'd' record: only entries changed since the last burst.
            to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
            dump_code = 'd'
          if len(to_dump_dict):
            self._dumpToTIDFile(now, dump_code, to_dump_dict)
          if can_full_dump:
            self._next_full_dump = now + self._full_dump_period
          if self._next_dump is not None:
            self._next_dump = now + self._burst_period
            self._since_last_burst.clear()
      else:
        # Not bootstraped yet: trigger bootstrap instead of dumping.
        doBootstrap()
    finally:
      self._storage_id_lock.release()

  def rotateTIDFile(self):
    """
    Re-open the TID log file (HUP handling) and start the new file with a
    full dump of the coherent state.
    """
    global tid_file
    self._storage_id_lock.acquire()
    try:
      tid_file = openTIDLog()
      if tid_file is not None:
        self._dumpToTIDFile(time.time(), 'f', self._storage)
    finally:
      self._storage_id_lock.release()

  def _dumpToTIDFile(self, now, dump_code, to_dump_dict):
    # One record per line: "<timestamp> <f|d> <dict repr>".
    tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))

  def dump(self):
    """Return a snapshot (copy) of the coherent storage -> TID mapping."""
    self._storage_id_lock.acquire()
    try:
      return self._storage.copy()
    finally:
      self._storage_id_lock.release()

  def dump_transcient(self):
    """Return a snapshot (copy) of the not-yet-coherent TID mapping."""
    self._storage_id_lock.acquire()
    try:
      return self._transcient.copy()
    finally:
      self._storage_id_lock.release()

  def begin(self, transaction_id, storage_id_list):
    """Register a transaction start (BEGIN command)."""
    self._storage_id_lock.acquire()
    try:
      self._transaction_tracker.begin(transaction_id, storage_id_list)
      self._registerTransactionIDAndStorageID(transaction_id, storage_id_list)
    finally:
      self._storage_id_lock.release()

  def abort(self, transaction_id):
    """Unregister a transaction without recording TIDs (ABORT command)."""
    self._storage_id_lock.acquire()
    try:
      try:
        self._transaction_tracker.finish(transaction_id)
      except TransactionException:
        # Overwrite exception message
        raise TransactionException, 'Abort received, but never began'
      self._unregisterTransactionID(transaction_id)
    finally:
      self._storage_id_lock.release()

  def commit(self, transaction_id, tid_dict):
    """
    Record committed TIDs for a transaction (COMMIT command).
    tid_dict keys must exactly match the storages declared at begin time.
    """
    self._storage_id_lock.acquire()
    try:
      try:
        storage_id_list = self._transaction_tracker.finish(transaction_id)
      except TransactionException:
        # Overwrite exception message
        raise TransactionException, 'Commit received, but never began'
      # Check tid_dict covers exactly the storages declared at begin.
      check_dict = tid_dict.copy()
      for storage_id in storage_id_list:
        del check_dict[storage_id]
      assert len(check_dict) == 0
      self._transcient.update(tid_dict)
      self._unregisterTransactionID(transaction_id)
    finally:
      self._storage_id_lock.release()
class BootstrapContent(threading.Thread):
  """
  Thread used to bootstrap TIDStorage content.
  This must be started at first client request, and must be run only once.
  Global boolean "has_bootstraped" is set to true once it succeeded.
  """
  def __init__(self, *args, **kw):
    threading.Thread.__init__(self, *args, **kw)
    # Daemon thread: must not block process exit while looping below.
    self.setDaemon(True)
  def run(self):
    """
    Contact all zopes to serialize all their storages.
    """
    global has_bootstraped
    base_url = options.base_url
    if base_url is not None:
      log('Bootstrap started')
      # storage id -> Zope object path used to build the serialize URL.
      storage_id_to_object_path_dict = {}
      for key, value in options.known_tid_storage_identifier_dict.iteritems():
        mountpoint = value[2]
        if mountpoint is None:
          log('Skipping bootstrap of storage %s because its mountpoint is unknown.' % (key, ))
        else:
          storage_id_to_object_path_dict[key] = mountpoint
      target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
      known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
      # Storages we have not seen a TID for yet.
      to_check_storage_id_set = target_storage_id_set - known_storage_id_set
      # can_bootstrap is cleared by server() at shutdown to stop this loop.
      while len(to_check_storage_id_set) and can_bootstrap:
        serialize_url = None
        for storage_id in to_check_storage_id_set:
          if can_bootstrap and storage_id not in tid_storage.dump_transcient().keys():
            serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
            try:
              # Query a Zope, which will contact this process in return to store
              # the new TID number, making the given storage known.
              page = urllib.urlopen(serialize_url)
            except Exception, message:
              log('Exception during bootstrap (%r):\n%s' % (serialize_url, ''.join(traceback.format_exception(*sys.exc_info()))))
            else:
              log('Opened %r: %r' % (serialize_url, page.read()))
            # Let some time for zope to contact TIDStorage back and fill the gaps.
            time.sleep(5)
        known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
        to_check_storage_id_set = target_storage_id_set - known_storage_id_set
        if len(to_check_storage_id_set):
          log('Bootstrap in progress... Missing storages: %r' % (to_check_storage_id_set, ))
          # Retry a bit later
          time.sleep(60)
      if len(to_check_storage_id_set) == 0:
        log('Bootstrap done (%i storages).' % (len(target_storage_id_set), ))
        has_bootstraped = True
    else:
      log('Bootstrap did not happen because base_url was not given.')
      has_bootstraped = True
# Module state shared between doBootstrap() and BootstrapContent.run().
bootstrap_content = BootstrapContent()
# Set to True by BootstrapContent.run() once bootstrap succeeded.
has_bootstraped = False
# Cleared by server() at shutdown to stop a running bootstrap loop.
can_bootstrap = True
bootstrap_lock = threading.RLock()
def doBootstrap():
  """Start the bootstrap thread at most once; no-op when already running."""
  # Non-blocking acquire: concurrent callers simply return while the
  # winner checks/starts the thread (a thread cannot be started twice).
  if not bootstrap_lock.acquire(False):
    return
  try:
    if not bootstrap_content.isAlive():
      bootstrap_content.start()
  finally:
    bootstrap_lock.release()
def log(message):
  """Write a timestamped line to current stdout (may be the logfile)."""
  sys.stdout.write('%s: %s\n' % (time.asctime(), message))
class PoliteThreadingTCPServer(SocketServer.ThreadingTCPServer):
  # Do not wait for in-flight client threads when the process exits.
  daemon_threads = True
  # Allow quick restart on the same port (SO_REUSEADDR).
  allow_reuse_address = True
def server(address, port):
  """Serve TIDServer requests forever; shut down on KeyboardInterrupt."""
  global can_bootstrap
  tcp_server = PoliteThreadingTCPServer((address, port), TIDServer)
  try:
    try:
      log('Server listening on %s:%s.'% tcp_server.server_address)
      tcp_server.serve_forever()
    except KeyboardInterrupt:
      log('Shuting down (received KeyboardInterrupt).')
  finally:
    # Stop any running bootstrap loop before closing the socket.
    can_bootstrap = False
    tcp_server.server_close()
def openLog():
  """Return the stream to log to: the logfile when forking, else stdout."""
  if options.fork:
    # Unbuffered append, so lines are not lost on crash.
    return open(options.logfile_name, 'a', 0)
  return sys.stdout
def openTIDLog():
  """Return an unbuffered append handle on the status file, or None."""
  if options.status_file is None:
    return None
  return open(options.status_file, 'a', 0)
def HUPHandler(signal_number, stack):
  # SIGHUP: logrotate support. Order matters: the first message goes to
  # the old logfile, then stdout/stderr are re-opened, then the TID
  # status file is re-opened (with a fresh full dump).
  log('Rotating logfile...')
  sys.stdout = sys.stderr = openLog()
  tid_storage.rotateTIDFile()
  log('Logfile rotated')
def USR1Handler(signal_number, stack):
  # SIGUSR1: dump internal state to the log for debugging.
  log(repr(tid_storage))
def TERMHandler(signal_number, stack):
  # SIGTERM: raise KeyboardInterrupt so server() performs its normal
  # clean shutdown path.
  log('Received SIGTERM, exiting.')
  raise KeyboardInterrupt, 'Killed by SIGTERM'
def usage():
print """
Usage: %(arg0)s [-h] [-n|--nofork|--fg] [-l|--log] [-p|--port] [-a|--address]
[--pidfile] [--user] [--group] [-s|--status-file] [-b|--burst-period]
[-F|--full-dump-period] [-c|--config]
-h
Display this help.
-n
--nofork
--fg
Do not fork in background.
-l filename
--log filename
Log to given filename, instead of default %(logfile_name)s.
-p number
--port number
Listen to given port number, intead of default %(port)i.
-a address
--address address
Listen to interface runing given address, instead of default %(address)s.
--pidfile file_path
If forking, this file will contain the pid of forked process.
If this argument is not provided, pid is written to %(pidfile_name)s.
--user user_name
Run as specified user.
Also, retrieve user's group and run as this group.
--group group_name
Run as specified group.
If both --user and --group are specified, --group must come last.
-s file_name
--status-file file_name
Append stored TIDs to file.
See also "burst-period" and "full-dump-period".
If not provided, no dump ever happens.
-b seconds
--burst-period seconds
Defines the age of last write after which an incremental write can happen.
Such write only contain what changed since last write.
If not provided, no incremental write is done.
-F seconds
--full-dump-period seconds
How many seconds must separate complete dumps to status file.
Those writes contain the complete current state.
If both a full dump and an incremental write can happen, full dump takes
precedence.
If not provided, no full dump is done.
-c file_name
--config file_name
Use given file as options file.
It must be a python file. See sample_options.py for possible values.
If provided and if configuration file defines base_url and
known_tid_storage_identifier_dict variables, this program will cause
generation of all tids before first write to status file.
""" % {'arg0': sys.argv[0],
'logfile_name': Options.logfile_name,
'pidfile_name': Options.pidfile_name,
'port': Options.port,
'address': Options.address}
class Options:
  # Defaults for the tidstorage server; a config module (-c) and
  # command-line switches may override any public attribute.
  port = 9001
  address = '0.0.0.0'
  logfile_name = 'tidstorage.log'
  pidfile_name = 'tidstorage.pid'
  # Daemonize (double fork) unless -n/--fg/--nofork is given.
  fork = True
  # Numeric uid/gid to drop privileges to, or None to keep current ones.
  setuid = None
  setgid = None
  # Path of the TID status file; None disables dumps entirely.
  status_file = None
  # Seconds between incremental dumps; None disables them.
  burst_period = None
  # Seconds between full dumps; None disables them.
  full_dump_period = None
  # storage id -> (Data.fs path, backup dir, Zope mountpoint) mapping.
  known_tid_storage_identifier_dict = {}
  # Zope URL pattern used by bootstrap; None disables bootstrap.
  base_url = None
  config_file_name = None
options = Options()
try:
  opts, args = getopt.getopt(sys.argv[1:],
                             'hnfl:p:a:s:b:F:c:',
                             ['help', 'nofork', 'fg', 'log=', 'port=',
                              'address=', 'pidfile=', 'user=', 'group=',
                              'status-file=', 'burst-period=',
                              'full-dump-period=', 'config='])
except:
  # Show usage on a malformed command line, then re-raise so the
  # original getopt error and traceback are still visible.
  usage()
  raise
# Apply command-line options onto the module-wide "options" object.
# Fix: config_file_name was unbound (NameError below) when -c/--config
# was not given; initialise it explicitly.
config_file_name = None
# NOTE(review): short option -f is accepted by getopt but ignored here.
for opt, arg in opts:
  if opt in ('-h', '--help'):
    usage()
    sys.exit()
  elif opt in ('-n', '--fg', '--nofork'):
    options.fork = False
  elif opt in ('-l', '--log'):
    options.logfile_name = arg
  elif opt in ('-p', '--port'):
    options.port = int(arg)
  elif opt in ('-a', '--address'):
    options.address = arg
  elif opt == '--pidfile':
    options.pidfile_name = arg
  elif opt == '--user':
    # Also adopt the user's primary group; a later --group overrides it.
    pw = pwd.getpwnam(arg)
    options.setuid = pw.pw_uid
    options.setgid = pw.pw_gid
  elif opt == '--group':
    options.setgid = grp.getgrnam(arg).gr_gid
  elif opt in ('-s', '--status-file'):
    options.status_file = arg
  elif opt in ('-b', '--burst-period'):
    options.burst_period = int(arg)
  elif opt in ('-F', '--full-dump-period'):
    options.full_dump_period = int(arg)
  elif opt in ('-c', '--config'):
    config_file_name = arg
if config_file_name is not None:
  # Import the configuration as a python module found next to the given
  # path (or on sys.path when no directory part was given), then copy
  # over any Options attribute not already set on the command line.
  config_file = os.path.splitext(os.path.basename(config_file_name))[0]
  config_path = os.path.dirname(config_file_name)
  if len(config_path):
    config_path = [config_path]
  else:
    config_path = sys.path
  # Renamed from "file"/"path" to avoid shadowing the builtin file type.
  module_file, module_path, description = imp.find_module(config_file, config_path)
  module = imp.load_module(config_file, module_file, module_path, description)
  module_file.close()
  for option_id in [x for x in dir(Options) if x[:1] != '_']:
    if option_id not in options.__dict__ and hasattr(module, option_id):
      setattr(options, option_id, getattr(module, option_id))
# Resolve paths now: the process chdirs to / when daemonizing below.
if options.pidfile_name is not None:
  options.pidfile_name = os.path.abspath(options.pidfile_name)
if options.logfile_name is not None:
  options.logfile_name = os.path.abspath(options.logfile_name)
if options.status_file is not None:
  options.status_file = os.path.abspath(options.status_file)
# Drop privileges: group first, while still privileged enough to do so.
if options.setgid is not None:
  os.setgid(options.setgid)
if options.setuid is not None:
  os.setuid(options.setuid)
tid_storage = TIDStorage(tid_file_path=options.status_file,
                         burst_period=options.burst_period,
                         full_dump_period=options.full_dump_period)
signal.signal(signal.SIGHUP, HUPHandler)
signal.signal(signal.SIGUSR1, USR1Handler)
signal.signal(signal.SIGTERM, TERMHandler)
tid_file = openTIDLog()
sys.stdout = sys.stderr = openLog()
if options.fork:
  # Classic double-fork daemonization. The pidfile is opened before
  # forking so that open() errors are reported to the invoking shell.
  os.chdir('/')
  os.umask(027)
  pidfile = open(options.pidfile_name, 'w')
  pid = os.fork()
  if pid == 0:
    # First child: become session leader, then fork again so the
    # grandchild cannot re-acquire a controlling terminal.
    os.setsid()
    pid = os.fork()
    if pid == 0:
      # Grandchild: the actual server process, detached from the tty.
      pidfile.close()
      os.close(0)
      os.close(1)
      os.close(2)
      server(options.address, options.port)
      log('Exiting.')
    else:
      # First child: record the grandchild's pid, then exit.
      pidfile.write(str(pid))
      pidfile.close()
      os._exit(0)
  else:
    # Original (foreground) process exits immediately.
    os._exit(0)
else:
  server(options.address, options.port)
#!/usr/bin/python2.4
##############################################################################
#
# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
# Essentially "usage" and "parseargs" methods.
# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
""" repozo wrapper to backup for multiple Data.fs files and restore them in a consistent way.
Usage: %(program)s [-h|--help] [-c|--config configuration_file]
[--repozo repozo_command] [-R|--recover|--recover_check]
[--tid_log tid_log_file]
[...]
-h
--help
Display this help and exit.
-c configuration_file
--config configuration_file
Use given file as configuration file.
It must be a python file. See sample_configuration.py for required values.
  Required if neither -h nor --help is given.
--repozo repozo_command
Use given executable as repozo command.
Default: repozo.py
-R
--recover
Instead of saving existing Data.fs, perform an automated recovery from
backups + timestamp file with optionally cutting files at the last coherent
transaction.
--recover_check
Similar to above, except that it restores file to temp folder and compares
with existing file.
  Files restored this way are automatically deleted after check.
--recover_inplace
Like 'recover' above, but doesn't actually restore from repozo files,
instead it just cuts existing FileStorage files at the last coherent
transaction.
-t tid_log_file
--tid_log tid_log_file
TID log file, which will be used to find TID, which then will be used to
cut restored file at found TID
"""
import imp
import getopt
import glob
import sys
import os
import md5
import time
import tempfile
from shutil import copy
from restore_tidstorage import parse, get_tid_position
program = sys.argv[0]
def log(message):
  """Print one message line to stdout (minimal logging helper)."""
  sys.stdout.write('%s\n' % (message,))
def cleanup(known_tid_storage_identifier_dict, keep_full_backup_count,
            status_file_backup_dir, status_file):
  """
  Return the list of backup file paths old enough to be deleted.

  For each storage, the newest keep_full_backup_count full backups (.fsz)
  are kept; .fsz/.deltafsz/.dat files older than the oldest kept full
  backup are listed for removal. Status file backups older than the
  oldest kept full backup across all storages are listed too.
  Nothing is deleted here; the caller removes the returned paths.
  """
  if keep_full_backup_count <= 0:
    raise ValueError("A number of full backups to keep must be specified.")
  cleanup_list = []
  date_len = 19  # len('YYYY-MM-DD-hh-mm-ss')
  oldest_fsz = None # Oldest full backup to keep among all storages.
  for file_path, storage_path, object_path \
      in known_tid_storage_identifier_dict.values():
    # Find oldest full backup to keep for this storage -> fsz
    fsz_list = sorted(glob.glob(os.path.join(storage_path, '*.fsz')))
    fsz = fsz_list[max(0, len(fsz_list) - keep_full_backup_count)]
    fsz, ext = os.path.splitext(os.path.basename(fsz))
    assert len(fsz) == date_len
    if oldest_fsz is None or fsz < oldest_fsz:
      oldest_fsz = fsz
    # Clean up all repozo files older than fsz.
    for path in glob.glob(os.path.join(storage_path, '*')):
      date, ext = os.path.splitext(os.path.basename(path))
      assert len(date) == date_len
      if ext in ('.fsz', '.deltafsz', '.dat') and date < fsz:
        cleanup_list.append(path)
  # Clean up all status files older than oldest_fsz.
  if oldest_fsz and status_file_backup_dir and status_file:
    prefix = os.path.join(status_file_backup_dir,
                          os.path.basename(status_file) + '-')
    path_len = len(prefix) + date_len
    for path in glob.glob(prefix + '*'):
      # Fix: the original asserted path.startswith(path), a tautology;
      # the intent is to validate the glob result against the prefix.
      assert path.startswith(prefix) and len(path) == path_len
      if path[-date_len:] < oldest_fsz:
        cleanup_list.append(path)
  return cleanup_list
def backup(known_tid_storage_identifier_dict, repozo_formated_command):
  """Run repozo once per configured ZODB; return the number of failures."""
  total_count = len(known_tid_storage_identifier_dict)
  backup_count = 0
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    # Create the per-storage backup directory on first use.
    if not os.access(storage_path, os.R_OK):
      os.makedirs(storage_path)
    repozo_command = repozo_formated_command % (storage_path, file_path)
    log('Runing %r...' % (repozo_command, ))
    exit_status = os.WEXITSTATUS(os.system(repozo_command))
    if exit_status:
      log('Error occurred while saving %s: exit status=%i' % (file_path, exit_status))
    else:
      backup_count += 1
  log('Saved %i FileStorages out of %i.' % (backup_count, total_count))
  return total_count - backup_count
def get_md5_diggest(file_instance, length):
  """Return the hex md5 digest of the first `length` bytes of the file."""
  BLOCK_SIZE=512
  file_instance.seek(0)
  md5sum = md5.new()
  remaining = length
  while remaining > 0:
    to_read = min(BLOCK_SIZE, remaining)
    chunk = file_instance.read(to_read)
    if len(chunk) != to_read:
      # Short read: warn and stop, still digesting the partial chunk.
      log('Warning: read %i instead of requested %i, stopping read' % (len(chunk), to_read))
      remaining = 0
    else:
      remaining -= to_read
    md5sum.update(chunk)
  return md5sum.hexdigest()
def recover(known_tid_storage_identifier_dict, repozo_formated_command, check=False, last_tid_dict=None):
  """Recovers all ZODB files, when last_tid_dict is passed cut them at proper
  byte. If repozo_formated_command is None, assume we only need to trim the
  files to the last registered coherent transaction."""
  if repozo_formated_command is None and last_tid_dict is None:
    log('Error: we need the Status File for trimming the ZODB files')
    return 1
  recovered_count = 0
  total_count = len(known_tid_storage_identifier_dict)
  result = 0
  # First pass: refuse to run when any .index file exists, because cutting
  # an indexed FileStorage after restore is unsafe (see message below).
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    # check that indexes do not exists if cut shall be done
    file_index = file_path + ".index"
    if os.access(file_index, os.F_OK) and last_tid_dict is not None:
      log('Error: Index file %s exists. Bug in ZODB makes it impossible to cut'
          ' indexed file after restore. Cannot continue.' %
          file_index)
      result = 1
  if result:
    return result
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    if repozo_formated_command is not None and not os.access(storage_path, os.R_OK):
      log('Warning: unable to recover %s because %s is missing/unreadable.' % (file_path, storage_path))
      continue
    elif repozo_formated_command is None and not (os.path.isfile(file_path) and
                                                 os.access(file_path, os.W_OK)):
      log('Warning: unable to trim %s because it is missing/unwritable' % (file_path,))
      continue
    if check:
      # Dry-run: restore into a temporary copy and compare with original.
      original_file_path = file_path
      file_path = os.path.join(tempfile.gettempdir(), os.path.basename(file_path))
    if repozo_formated_command is not None:
      repozo_command = repozo_formated_command % (storage_path, file_path)
      status = os.system(repozo_command)
      status = os.WEXITSTATUS(status)
    else:
      # we're being asked to cut only, so assume successful restore
      status = 0
    if status == 0:
      recovered_count += 1
      if last_tid_dict is not None:
        # Truncate just after the last transaction recorded as coherent.
        pos = get_tid_position(file_path, last_tid_dict[key])
        print 'Cutting restored file %s at %s byte' % (file_path, pos),
        f = open(file_path,'a')
        if not check:
          f.truncate(pos)
          print
        else:
          print 'only check, file untouched'
        f.close()
    else:
      log('Error occurred while recovering %s: exit status=%i' % (file_path, status))
    if check:
      # Compare sizes, then the md5 of the common length, logging findings.
      log('Info: Comparing restored %s with original %s' % (file_path, original_file_path))
      recovered_file = open(file_path, 'r')
      original_file = open(original_file_path, 'r')
      try:
        recovered_file.seek(0, 2)
        original_file.seek(0, 2)
        recovered_file_length = recovered_file.tell()
        original_file_length = original_file.tell()
        checked_length = recovered_file_length
        if recovered_file_length < original_file_length:
          log('Info: Shorter than original: -%i bytes (-%.02f%%)' % \
              (original_file_length - recovered_file_length,
               1 - (float(recovered_file_length) / original_file_length)))
        elif recovered_file_length > original_file_length:
          log('ERROR: Longer than original: +%i bytes (+%.02f%%). Was original packed since backup ?' % \
              (recovered_file_length - original_file_length,
               float(recovered_file_length) / original_file_length))
          checked_length = None
        if checked_length is not None:
          recovered_file_diggest = get_md5_diggest(recovered_file, checked_length)
          original_file_diggest = get_md5_diggest(original_file, checked_length)
          if recovered_file_diggest != original_file_diggest:
            log('ERROR: Recovered md5 does not match original: %s != %s.' % \
                (recovered_file_diggest, original_file_diggest))
      finally:
        recovered_file.close()
        original_file.close()
        # Always delete the temporary restored copy.
        os.unlink(file_path)
  log('Restored %i FileStorages out of %i.' % (recovered_count, total_count))
  return total_count - recovered_count
def usage(code, msg=''):
  """Print the module docstring (help) plus optional message, then exit."""
  if code == 0:
    outfp = sys.stdout
  else:
    outfp = sys.stderr
  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg
  sys.exit(code)
def parseargs():
  """
  Parse the command line plus the mandatory configuration module into an
  Options instance. Exits via usage() on bad input or missing values.
  """
  try:
    opts, args = getopt.getopt(sys.argv[1:], 'vQrt:FhzRc:',
                               ['help', 'verbose', 'quick', 'full',
                                'gzip', 'repository', 'repozo=',
                                'config=','recover', 'recover_check',
                                'recover_inplace', 'tid_log=', 'cleanup'])
  except getopt.error, msg:
    usage(1, msg)
  class Options:
    timestamp_file_path = None
    repozo_file_name = 'repozo.py'
    configuration_file_name = None
    # Options forwarded verbatim to repozo; slot 0 is the -B/-R mode.
    repozo_opts = ['-B']
    known_tid_storage_identifier_dict = {}
    action = None
    dry_run = False
    inplace = False
    status_file = None
    status_file_backup_dir = None
    recover_status_file = None
    keep_full_backup_count = None
  options = Options()
  if args:
    options.repozo_opts.extend(args)
  for opt, arg in opts:
    if opt in ('-h', '--help'):
      usage(0)
    elif opt in ('-c', '--config'):
      options.configuration_file_name = arg
    elif opt == '--repozo':
      options.repozo_file_name = arg
    elif opt in ('-R', '--recover', '--recover_check', '--recover_inplace'):
      # Switch repozo from backup (-B) to recover (-R) mode.
      options.repozo_opts[0] = '-R'
      if options.action:
        usage(1, 'Only 1 command allowed.')
      options.action = 'recover'
      if opt == '--recover_check':
        options.dry_run = True
      if opt == '--recover_inplace':
        options.inplace = True
    elif opt in ('--cleanup'):
      if options.action:
        usage(1, 'Only 1 command allowed.')
      options.action = 'cleanup'
    elif opt in ('-r', '--repository'):
      options.repozo_opts.append('%s %s' % (opt, arg))
    elif opt in ('-t', '--tid_log'):
      options.recover_status_file = arg
    else:
      # Anything else is passed through to repozo untouched.
      options.repozo_opts.append(opt)
  if options.configuration_file_name is None:
    usage(1, 'Either -c or --config is required.')
  # Load the configuration as a python module found next to the given
  # path (or on sys.path when no directory part was given).
  configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
  configuration_path = os.path.dirname(options.configuration_file_name)
  if len(configuration_path):
    configuration_path = [configuration_path]
  else:
    configuration_path = sys.path
  file, path, description = imp.find_module(configuration_filename, configuration_path)
  module = imp.load_module(configuration_filename, file, path, description)
  file.close()
  try:
    options.known_tid_storage_identifier_dict = module.known_tid_storage_identifier_dict
    options.timestamp_file_path = module.timestamp_file_path
  except AttributeError, msg:
    usage(1, msg)
  # Optional values: fall back to the configuration module when not given
  # on the command line.
  for option_id in ('status_file', 'status_file_backup_dir',
                    'keep_full_backup_count'):
    if getattr(options, option_id) is None:
      setattr(options, option_id, getattr(module, option_id, None))
  # XXX: we do not check any option this way, it's too dangerous.
  #options.repozo_opts.extend(getattr(module, 'repozo_opts', []))
  return options
def backupStatusFile(status_file,destination_directory):
  """Copy the status file to the backup dir with a UTC timestamp suffix."""
  suffix = '%04d-%02d-%02d-%02d-%02d-%02d' % time.gmtime()[:6]
  destination_path = os.path.join(destination_directory,
                                  os.path.basename(status_file) + '-' + suffix)
  copy(status_file, destination_path)
  log("Written status file backup as %s" % destination_path)
def main():
  # Entry point: dispatch to cleanup, recover or backup depending on the
  # parsed command line; returns a shell-style exit status (number of
  # storages which failed).
  options = parseargs()
  last_tid_dict = None
  if options.recover_status_file:
    if options.action != 'recover':
      raise ValueError("Status file path only for recovering")
    # storage id -> last coherent TID, replayed from the TIDStorage log.
    last_tid_dict = parse(options.recover_status_file)
  if options.action == 'cleanup':
    for path in cleanup(options.known_tid_storage_identifier_dict,
                        options.keep_full_backup_count,
                        options.status_file_backup_dir, options.status_file):
      os.remove(path)
    return 0
  repozo_formated_command = '%s %s -r "%%s"' % (options.repozo_file_name, ' '.join(options.repozo_opts))
  if options.action == 'recover':
    # Use the last line of the timestamp file as the repozo -D date.
    timestamp_file = open(options.timestamp_file_path, 'r')
    timestamp = ''
    read_line = ' '
    while len(read_line):
      timestamp = read_line
      read_line = timestamp_file.readline()
    timestamp = timestamp.strip('\r\n \t')
    if timestamp is not None:
      repozo_formated_command += ' -o "%%s" -D %s' % (timestamp, )
    if options.inplace:
      # use the Data.fs files themselves, instead of restoring them with repozo
      repozo_formated_command = None
    result = recover(
      known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
      repozo_formated_command=repozo_formated_command,
      check=options.dry_run,
      last_tid_dict=last_tid_dict)
  else:
    repozo_formated_command += ' -f "%s"'
    if options.status_file is not None and options.status_file_backup_dir is not None:
      backupStatusFile(options.status_file, options.status_file_backup_dir)
    result = backup(
      known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
      repozo_formated_command=repozo_formated_command)
    if result == 0:
      # Paranoid mode:
      # Issue a system-wide "sync" command to make sure all files which were saved
      # are really present on disk.
      os.system('sync')
      # Record the backup timestamp for later recovery (repozo -D above).
      timestamp_file = open(options.timestamp_file_path, 'a', 0)
      try:
        # Borrowed from repozo.
        timestamp_file.write('\n%04d-%02d-%02d-%02d-%02d-%02d' % time.gmtime()[:6])
      finally:
        timestamp_file.close()
  return result
if __name__ == '__main__':
  # Exit status is the number of failed storages (0 on full success).
  sys.exit(main())
#!/usr/bin/python2.4
##############################################################################
#
# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
# Essentially "usage", "parseargs" and parts of "restore" methods.
# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
"""
Usage: %(program)s [-h|--help] [-c|--config configuration_file]
-h
--help
Display this help and exit.
-c configuration_file
--config configuration_file
Use given file as configuration file.
It must be a python file.
  Required if neither -h nor --help is given.
"""
import imp
import getopt
import sys
import os
# urllib2 does not support (?) urls containing credentials
# (http://login:password@...) but it's fine with urllib.
from struct import pack
import shutil
from ZODB.FileStorage import FileStorage
program = sys.argv[0]
def log(message):
  """Print one message line to stdout (minimal logging helper)."""
  sys.stdout.write('%s\n' % (message,))
def parse(status_file):
  """
  Replay a TIDStorage status file and return the final
  {storage_id: last_tid} mapping.

  Each line is "<timestamp> <f|d> <dict repr>": 'f' lines are full dumps
  (replace the whole state), 'd' lines are incremental (merged, values
  must strictly grow). Timestamps must be strictly increasing.
  """
  tid_log = open(status_file)
  content = {}
  last_timestamp = None
  line = tid_log.readline()
  while line != '':
    split_line = line.split(' ', 2)
    assert len(split_line) == 3, repr(split_line)
    line_timestamp, line_type, line_dict = split_line
    line_timestamp = float(line_timestamp)
    assert line_type in ('f', 'd'), repr(line_type)
    if last_timestamp is not None:
      assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
    # Fix: track the previous line's timestamp so the check above enforces
    # strict monotonicity (the original only compared against the first line).
    last_timestamp = line_timestamp
    # XXX: eval of file content - only feed status files from trusted sources.
    line_dict = eval(line_dict, None)
    assert isinstance(line_dict, dict), type(line_dict)
    assert len(line_dict), repr(line_dict)
    if line_type == 'd':
      # Incremental: merge, each known TID must strictly increase.
      for key, value in line_dict.items():
        if key in content:
          assert content[key] < value, '%r < %r' % (content[key], value)
        content[key] = value
    elif line_type == 'f':
      # Full dump: must be consistent with (a superset of) current state.
      for key, value in content.items():
        assert key in line_dict, repr(key)
        assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
      content = line_dict
    line = tid_log.readline()
  # Fix: close the log file instead of leaking the handle.
  tid_log.close()
  return content
READCHUNK = 10 * 1024 * 1024
def get_tid_position(filepath,last_tid):
  # Return the byte offset just after transaction `last_tid` in the
  # FileStorage at `filepath`. stop is exclusive, hence last_tid + 1 so
  # last_tid itself is still included.
  tid = pack('>Q', last_tid + 1)
  # Find the file position of the last completed transaction.
  fs = FileStorage(filepath, read_only=True, stop=tid)
  # Note that the FileStorage ctor calls read_index() which scans the file
  # and returns "the position just after the last valid transaction record".
  # getSize() then returns this position, which is exactly what we want,
  # because we only want to copy stuff from the beginning of the file to the
  # last valid transaction record.
  pos = fs.getSize()
  fs.close()
  return pos
def recover(data_fs_backup_path_dict, status_file):
last_tid_dict = parse(status_file)
for storage_id, (file_path, backup_path) in data_fs_backup_path_dict.iteritems():
# Derived from repozo (function=do_full_backup)
# TODO: optimise to read backup only once.
can_restore = False
if os.path.exists(backup_path):
if os.path.exists(file_path):
print 'Both original and backup files exist for %r. If previous restoration was successful, you should delete the backup for this restoration to take place. Original: %r Backup: %r' % (storage_id, file_path, backup_path)
else:
print 'Only backup file is available for %r: %r. Assuming it\'s ok and restoring to %r' % (storage_id, backup_path, file_path)
can_restore = True
else:
if os.path.exists(file_path):
sys.stdout.write('Copying %r to %r... ' % (file_path, backup_path))
shutil.copy(file_path, backup_path)
initial_size = stat(file_path).st_size
final_size = stat(backup_path).st_size
if initial_size == final_size:
can_restore = True
print 'Done.'
else:
print 'Backup size %i differs from original size %i. Is the original file (%r) still in use ? Is there enough free disk space at destination (%r) ?' % (final_size, initial_size, file_path, backup_path)
else:
print 'Cannot find any file for %r: %r and %r do not exist.' % (storage_id, file_path, backup_path)
if can_restore:
pos = get_tid_position(backup_path,last_tid_dict[storage_id])
print 'Restoring backup: %s bytes (transaction %r) from %s to %s' % (pos, tid, backup_path, file_path)
source_file = open(backup_path, 'rb')
destination_file = open(file_path, 'wb')
while pos:
todo = min(READCHUNK, pos)
data = source_file.read(todo)
if not data:
print 'Unexpected end of data stream (should contain %i more bytes)' % (pos, )
break
destination_file.write(data)
pos -= len(data)
destination_file.close()
source_file.close()
else:
print 'Skipping restoration of %r (%r).' % (file_path, storage_id)
def usage(code, msg=''):
  """Print the module docstring (help) plus optional message, then exit."""
  if code == 0:
    outfp = sys.stdout
  else:
    outfp = sys.stderr
  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg
  sys.exit(code)
def parseargs():
try:
opts, args = getopt.getopt(sys.argv[1:], 'hc:',
['help', 'config='])
except getopt.error, msg:
usage(1, msg)
class Options:
configuration_file_name = None
status_file = None
options = Options()
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-c', '--config'):
options.configuration_file_name = arg
if options.configuration_file_name is None:
usage(1, 'Either -c or --config is required.')
configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
configuration_path = os.path.dirname(options.configuration_file_name)
if len(configuration_path):
configuration_path = [configuration_path]
else:
configuration_path = sys.path
file, path, description = imp.find_module(configuration_filename, configuration_path)
module = imp.load_module(configuration_filename, file, path, description)
file.close()
try:
options.data_fs_backup_path_dict = module.data_fs_backup_path_dict
options.status_file = module.status_file
except AttributeError, msg:
usage(1, msg)
return options
def main():
  # Restore every configured Data.fs from its backup, cut at the last
  # coherent transaction recorded in the status file.
  options = parseargs()
  recover(
    data_fs_backup_path_dict=options.data_fs_backup_path_dict,
    status_file=options.status_file)
if __name__ == '__main__':
  # main() returns None, so the exit status is always 0 here.
  sys.exit(main())
# COMMON
# This part is used both by tidstorage.py and repozo_tidstorage.py
# Maps a ZEO storage identifier (literal key strings below) to a 3-tuple:
#   (path to the Data.fs, repozo backup directory, Zope mountpoint id).
# The mountpoint id is interpolated into base_url during bootstrap.
known_tid_storage_identifier_dict = {
  "((('localhost', 8200),), '2')":
    ('/home/vincent/zeo2/var2/Data.fs',
     '/home/vincent/tmp/repozo/z22',
     'foo_test'),
  "((('localhost', 8200),), '1')":
    ('/home/vincent/zeo2/var/Data.fs',
     '/home/vincent/tmp/repozo/z21',
     'bar_test'),
  "((('localhost', 8100),), '1')":
    ('/home/vincent/zeo1/var/Data.fs',
     '/home/vincent/tmp/repozo/z11',
     'baz_test'),
}
# Zope URL pattern used to trigger serialization during bootstrap
# (%s is replaced by the mountpoint id above).
base_url = 'http://login:password@localhost:5080/erp5/%s/modifyContext'
# Address/port the TIDStorage server listens on.
port = 9001
host = '127.0.0.1'
# SERVER
# This part is only used by server_v2.py
#logfile_name = 'tidstorage.log'
#pidfile_name = 'tidstorage.pid'
#fork = False
#setuid = None
#setgid = None
# TID status file appended to by the server; parsed again at recovery.
status_file = 'tidstorage.tid'
# Seconds between incremental ('d') status lines.
burst_period = 30
# Seconds between full ('f') status dumps.
full_dump_period = 300
# REPOZO_TIDSTORAGE
# This part is only used by repozo_tidstorage.py
timestamp_file_path = 'repozo_tidstorage_timestamp.log'
# place to put backuped TIDStorage status_file logs
status_file_backup_dir = '/home/vincent/tmp/repozo'
# When cleaning up old backups, keep this number of full backups.
keep_full_backup_count = 3
#!/usr/bin/python
import traceback
import sys
from TIDClient import TIDClient
class TestTIDServerV2:
  def __init__(self, address, port):
    # TIDClient speaks the TIDStorage wire protocol to the given server.
    self._client = TIDClient((address, port))
  def assertEqual(self, value, target):
    # Minimal stand-in for unittest's assertEqual.
    assert value == target, 'value %r does not match target %r' % (value, target)
  def test_01_InitialValue(self, test_id):
    """
    Check that the storage is empty
    """
    # A fresh server must expose no TID at all.
    self.assertEqual(self._client.dump_all(), {})
  def test_02_Bootstrap(self, test_id):
    """
    Trigger bootstrap and check that no value is visible until bootstrap is
    done.
    """
    t1_storage_tid_dict = {'s0': 1}
    t2_storage_tid_dict = {'s1': 1}
    self.assertEqual(self._client.dump(test_id), {})
    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
    self.assertEqual(self._client.dump(test_id), {})
    self._client.commit(test_id, 't1', t1_storage_tid_dict)
    # Bootstrap is running on the server, nothing is visible yet.
    self.assertEqual(self._client.dump(test_id), {})
    self._client.waitForBootstrap()
    # Nothing is available yet, we need one more transaction to happen.
    self.assertEqual(self._client.dump(test_id), {})
    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
    self.assertEqual(self._client.dump(test_id), {})
    self._client.commit(test_id, 't2', t2_storage_tid_dict)
    # Now everything must be available.
    self.assertEqual(self._client.dump(test_id), {'s0': 1, 's1': 1})
  def test_03_Scenario1(self, test_id):
    """
    Simple begin - commit case.
    """
    storage_tid_dict = {'s1': 1}
    self.assertEqual(self._client.dump(test_id), {})
    self._client.begin(test_id, 't1', storage_tid_dict.keys())
    # Nothing is exposed while the transaction is still open.
    self.assertEqual(self._client.dump(test_id), {})
    self._client.commit(test_id, 't1', storage_tid_dict)
    self.assertEqual(self._client.dump(test_id), storage_tid_dict)
  def test_04_Scenario2(self, test_id):
    """
    Simple begin - abort case.
    """
    storage_tid_dict = {'s1': 1}
    self.assertEqual(self._client.dump(test_id), {})
    self._client.begin(test_id, 't1', storage_tid_dict.keys())
    self.assertEqual(self._client.dump(test_id), {})
    self._client.abort(test_id, 't1')
    # An aborted transaction must leave no trace.
    self.assertEqual(self._client.dump(test_id), {})
def test_05_Scenario3(self, test_id):
"""
2 concurent transactions impacting a common storage.
Second transaction begins after first does, and commits before
first does.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 1}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
def test_06_Scenario4(self, test_id):
"""
3 concurent transactions.
Transactions 1 and 2 impact same storage s1.
Transaction 3 impacts storage s3 after transaction 2 commited.
Still, as storage 3 was part of a non-commitable-yet transaction,
it must not be commited untill all blockable (here, t1) transaction have
ended.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 2}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
t3_storage_tid_dict = {'s3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
def test_07_Scenario4bis(self, test_id):
"""
3 concurent transactions.
Transactions 1 and 2 impact same storage s1.
Transaction 3 impacts storage s3 after transaction 2 commited.
Still, as storage 3 was part of a non-commitable-yet transaction,
it must not be commited untill all blockable (here, t1) transaction have
ended.
In this version, t1 aborts: for example, tpc_vote failed. As the data
was already sent to storage, it might be already present on disk (and
anyway, tid is not to be used anymore), so it's valid for t2 to commit
with tid 2 even if t1 aborted tid 1.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 2}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
t3_storage_tid_dict = {'s3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.abort(test_id, 't1')
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
def test_08_Scenario5(self, test_id):
"""
2 concurent transactions impacting a common storage.
Second transaction begins after first does, and commits after
first does.
"""
t1_storage_tid_dict = {'s1': 2}
t2_storage_tid_dict = {'s1': 1, 's2': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
def test_09_Scenario6(self, test_id):
"""
2 concurent transactions impacting separate sets of storages.
Check that the first commit impacts dump data immediately.
"""
t1_storage_tid_dict = {'s1': 1}
t2_storage_tid_dict = {'s2': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 1})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
def test_10_Scenario7(self, test_id):
"""
3 concurent transactions.
t1 and t2 impact a set of different storages.
t3 impacts a set of storage containing the ones from t1 and the ones
from t2.
Check that nothing impacts dump data until everything is commited.
"""
t1_storage_tid_dict = {'s1': 1}
t2_storage_tid_dict = {'s2': 2}
t3_storage_tid_dict = {'s1': 2, 's2': 2}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
def test_11_Scenario8(self, test_id):
"""
Simple increase case.
"""
self.assertEqual(self._client.dump(test_id), {})
t1_storage_tid_dict = {}
for s1_value in (1, 2):
previous_t1_storage_tid_dict = t1_storage_tid_dict
t1_storage_tid_dict = {'s1': s1_value}
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), previous_t1_storage_tid_dict)
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), t1_storage_tid_dict)
def run(self):
for test_method_id in [x for x in dir(self) if x.startswith('test')]:
self.log("Runing %s..." % (test_method_id, ))
try:
try:
getattr(self, test_method_id)(test_id=test_method_id)
except AssertionError:
self.log('F\n')
self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
finally:
self.log('\n')
def log(self, message):
sys.stdout.write(message)
if __name__ == '__main__':
  # Command-line entry point: connect to the TIDStorage server given as
  # arguments and execute every test scenario.
  assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
  server_address, server_port = sys.argv[1], int(sys.argv[2])
  TestTIDServerV2(server_address, server_port).run()
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ExchangeProtocol import ExchangeProtocol
from transaction._transaction import Transaction
from zLOG import LOG, WARNING, INFO
import socket
import thread
import struct
import sys
# Name of the optional method a ZODB storage may expose to report the last
# committed TID; storages lacking it are ignored by this module.
GET_LAST_COMMITED_TID_METHOD_ID = 'getLastCommitedTID'
# (host, port) of the TIDStorage server to notify on each commit.
TID_STORAGE_ADDRESS = ('127.0.0.1', 9001)
# Lazily-created process-wide TIDClient singleton (see _commitResources).
tid_storage = None
# Cached 'ip:port' identifier of this Zope node (see getZopeId).
zope_identifier = None
LOG('TIDStorage',INFO,'Monkey patching transaction._transaction.Transaction._commitResources')
# Borrowed from CMFActivity.ActivityTool.getCurrentNode
# Borrowed from CMFActivity.ActivityTool.getCurrentNode
def getZopeId():
  """
  Return the identifier of the current Zope node as an 'ip:port' string.

  The value is computed once and memoized in the module-level
  zope_identifier variable.  The HTTP port is discovered by scanning
  asyncore's socket map for the zhttp_server instance (see
  Zope/lib/python/App/ApplicationManager.py: def getServers(self)).
  """
  global zope_identifier
  if zope_identifier is not None:
    return zope_identifier
  from asyncore import socket_map
  http_port = ''
  for dispatcher in socket_map.values():
    if hasattr(dispatcher, 'port'):
      server_class_name = str(getattr(dispatcher, '__class__', 'unknown'))
      if server_class_name == 'ZServer.HTTPServer.zhttp_server':
        http_port = dispatcher.port
        break
  assert http_port != '', 'zhttp_server not started yet'
  own_ip = socket.gethostbyname(socket.gethostname())
  if TID_STORAGE_ADDRESS[0] != '127.0.0.1':
    # A loopback self-address would be meaningless to a remote TIDStorage.
    assert own_ip != '127.0.0.1', 'self address must not be 127.0.0.1 if TIDStorage is remote'
  zope_identifier = '%s:%s' %(own_ip, http_port)
  return zope_identifier
def getFilestorageList(resource_list):
  """Return the identifiers of all TID-aware storages found in resource_list."""
  tid_mapping = getFilestorageToTIDMapping(resource_list)
  return tid_mapping.keys()
def getFilestorageToTIDMapping(resource_list):
  """
  Map each TID-aware storage found in resource_list to its last committed
  TID as an integer, or to None when the storage reports no TID yet.

  Only resources whose _storage attribute exposes a getLastCommitedTID
  method (per GET_LAST_COMMITED_TID_METHOD_ID) are considered.  Each key
  is the repr of the storage's (_addr, _storage) pair, which must be
  unique within a single commit.
  """
  tid_mapping = {}
  for resource in resource_list:
    storage = getattr(resource, '_storage', None)
    if storage is None:
      continue
    getLastCommitedTID = getattr(storage, GET_LAST_COMMITED_TID_METHOD_ID,
                                 None)
    if getLastCommitedTID is None:
      continue
    tid = getLastCommitedTID()
    address = tuple([tuple(x) for x in getattr(storage, '_addr', [])])
    storage_name = getattr(storage, '_storage', '')
    storage_key = repr((address, storage_name))
    assert storage_key not in tid_mapping
    if tid is None:
      tid_mapping[storage_key] = None
    else:
      # unpack stolen from ZODB/utils.py:u64
      tid_mapping[storage_key] = struct.unpack(">Q", tid)[0]
  return tid_mapping
class BufferedSocket:
"""
Write-only thread-safe buffered socket.
Attemps to reconnect at most once per flush.
"""
_socket_lock = thread.allocate_lock()
_connected = False
def __init__(self, address):
self._socket = socket.socket()
self._address = address
self._send_buffer_dict = {}
def _connect(self):
try:
self._socket.connect(self._address)
self._notifyConnected()
except socket.error, message:
# We don't want to have an error line per failed connection attemp, to
# avoid flooding the logfile.
pass
def _getSendBuffer(self, ident):
send_buffer = self._send_buffer_dict.get(ident)
if send_buffer is None:
send_buffer = self._send_buffer_dict[ident] = []
return send_buffer
def _notifyDisconnected(self, message):
if self._connected:
self._connected = False
LOG('TIDStorage', WARNING, 'Disconnected: %s' % (message, ))
def _notifyConnected(self):
if not self._connected:
self._connected = True
# Display a log message at WARNING level, so that reconnection message
# are visible when disconnection messages are visible, even if it is
# not a warning, properly speaking.
LOG('TIDStorage', WARNING, 'Connected')
def send(self, to_send):
send_buffer = self._getSendBuffer(thread.get_ident())
send_buffer.append(to_send)
def flush(self):
"""
Flush send buffer and actually send data, with extra checks to behave
nicely if connection is broken.
Do not retry to send if something goes wrong (data is then lost !).
Here, most important thing is speed, not data.
Serialize usage.
"""
ident = thread.get_ident()
self._socket_lock.acquire()
try:
if not self._connected:
self._connect()
if self._connected:
try:
self._socket.sendall(''.join(self._getSendBuffer(ident)))
except socket.error, message:
self._notifyDisconnected(message)
try:
self._socket.shutdown(socket.SHUT_RDWR)
except socket.error:
self._socket.close()
self._socket = socket.socket()
finally:
self._socket_lock.release()
self._send_buffer_dict[ident] = []
class TIDClient:
  """Simple by design write only TIDClient using BufferedSocket

  Every method queues its protocol fields through ExchangeProtocol on a
  BufferedSocket and flushes it; nothing is ever read back from the
  server, so all calls are fire-and-forget.
  """
  def __init__(self, address):
    # address: (host, port) tuple, forwarded to BufferedSocket.
    self._buffered_socket = BufferedSocket(address)
    self._field_exchange = ExchangeProtocol(socket=self._buffered_socket)
  def commit(self, tid_update_dict):
    """
    Send given dict to TIDStorage server.

    tid_update_dict maps storage identifiers to the TIDs (ints) reached by
    the transaction being committed.
    """
    self._send_command('commit')
    self._field_exchange.send_dict(tid_update_dict)
    self._buffered_socket.flush()
  def begin(self, storage_id_list):
    """
    Inform TIDStorage connection tracking that commit was initiated.

    storage_id_list lists the identifiers of the storages impacted by the
    transaction about to commit.
    """
    self._send_command('begin')
    self._field_exchange.send_list(storage_id_list)
    self._buffered_socket.flush()
  def abort(self):
    """
    Inform TIDStorage connection tracking that commit was aborted.
    """
    self._send_command('abort')
    self._buffered_socket.flush()
  def _send_command(self, command):
    """
    Every command must be followed by an identifier.
    This identifier is used to track transactions, so the same identifier
    must not be used twice at the same time, but can be reused later.
    """
    self._field_exchange.send_field(command)
    # "<zope ip:port>_<thread id hex>" uniquely names the committing thread
    # across the whole cluster.
    self._field_exchange.send_field('%s_%x' % (getZopeId(), thread.get_ident()))
# Keep a reference to the unpatched method; the hook delegates to it.
original__commitResources = Transaction._commitResources
def _commitResources(self, *args, **kw):
  """
  Hook Transaction's _commitResources.
  Before:
    - Initialise TIDClient if needed
    - Check if there is any storage we are interested in in current commit
    - If so, issue a begin
  After (2 cases):
    - original__commitResources raised:
      - Issue an abort
    - otherwise:
      - Issue a commit
  Note to editors: Prevent your code from raising anything ! This method
  MUST NOT raise any exception, except that it MUST NOT hide any exception
  raised by original__commitResources.
  """
  has_storages = False
  try:
    global tid_storage
    if tid_storage is None:
      # Lazy singleton: created on first commit after process start.
      tid_storage = TIDClient(TID_STORAGE_ADDRESS)
    filestorage_list = getFilestorageList(self._resources)
    if len(filestorage_list):
      has_storages = True
      tid_storage.begin(filestorage_list)
  except:
    # Deliberately broad: a TIDStorage problem must never break a commit.
    LOG('TIDStorage _commitResources', WARNING, 'Exception in begin phase', error=sys.exc_info())
  try:
    result = original__commitResources(self, *args, **kw)
  except:
    if has_storages:
      # Save exc_info before calling abort(), which may itself raise and
      # overwrite the thread's current exception.
      exception = sys.exc_info()
      try:
        tid_storage.abort()
      except:
        LOG('TIDStorage _commitResources', WARNING, 'Exception in abort phase', error=sys.exc_info())
      # Re-raise original exception, in case tid_storage.abort() tainted
      # last exception value.
      raise exception[0], exception[1], exception[2]
    else:
      raise
  else:
    if has_storages:
      # Now that everything has been commited, all exceptions relative to added
      # code must be swalowed (but still reported) to avoid confusing transaction
      # system.
      try:
        tid_storage.commit(getFilestorageToTIDMapping(self._resources))
      except:
        LOG('TIDStorage _commitResources', WARNING, 'Exception in commit phase', error=sys.exc_info())
  return result
# Install the hook.
Transaction._commitResources = _commitResources
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Checks the sanity of a tidstorage TID log provided via stdin.
# Exit status:
# 0 Success
# 1 Failure
# Error is displayed on stderr.
# On success, final tid values for each storage is displayed on stdout.
import sys
content = {}
last_timestamp = None
line = sys.stdin.readline()
while line != '':
split_line = line.split(' ', 2)
assert len(split_line) == 3, repr(split_line)
line_timestamp, line_type, line_dict = split_line
line_timestamp = float(line_timestamp)
assert line_type in ('f', 'd'), repr(line_type)
if last_timestamp is None:
last_timestamp = line_timestamp
else:
assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
line_dict = eval(line_dict, None)
assert isinstance(line_dict, dict), type(line_dict)
assert len(line_dict), repr(line_dict)
if line_type == 'd':
for key, value in line_dict.iteritems():
if key in content:
assert content[key] < value, '%r < %r' % (content[key], value)
content[key] = value
elif line_type == 'f':
for key, value in content.iteritems():
assert key in line_dict, repr(key)
assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
content = line_dict
line = sys.stdin.readline()
key_list = content.keys()
key_list.sort()
for key in key_list:
print '%r %r' % (key, content[key])
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2009 Nexedi SARL and Contributors. All Rights Reserved.
# Łukasz Nowak <luke@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Dumps TIDStorage dict configuration
from TIDClient import TIDClient
import sys
from struct import pack
from base64 import encodestring
from pprint import pprint
def main():
  """
  Connect to the TIDStorage server given on the command line and pretty
  print its complete TID dictionary, rendering each TID as the base64
  encoding of its packed 64-bit big-endian (ZODB on-the-wire) form.

  Raises ValueError when the server is not bootstraped yet or when it
  returns an empty dictionary.
  """
  server_address = sys.argv[1]
  server_port = int(sys.argv[2])
  client = TIDClient((server_address, server_port))
  if not client.bootstraped():
    raise ValueError('Server not bootstraped')
  dump_dict = client.dump_all()
  if not dump_dict:
    raise ValueError('Received empty dict from server')
  encoded_dict = {}
  for storage_id, tid in dump_dict.iteritems():
    encoded_dict[storage_id] = encodestring(pack('>Q', tid)).rstrip()
  pprint(encoded_dict)
if __name__ == '__main__':
  # Validate the command line, then exit with main()'s return value.
  argument_count = len(sys.argv)
  assert argument_count == 3, 'Requires exactly 2 arguments: <address> <port>'
  sys.exit(main())
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Transforms a TIDStorage TID log provided on stdin, which might contain
# full statuses and/or incremental changes, and provides a version on stdout
# containing only full statuses.
# Also, does sanity checks on the given file.
# Exit status:
# 0 Success
# 1 Failure
import sys
content = {}
last_timestamp = None
line = sys.stdin.readline()
while line != '':
split_line = line.split(' ', 2)
assert len(split_line) == 3, repr(split_line)
line_timestamp, line_type, line_dict = split_line
line_timestamp = float(line_timestamp)
assert line_type in ('f', 'd'), repr(line_type)
if last_timestamp is None:
last_timestamp = line_timestamp
else:
assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
line_dict = eval(line_dict, None)
assert isinstance(line_dict, dict), type(line_dict)
assert len(line_dict), repr(line_dict)
if line_type == 'd':
for key, value in line_dict.iteritems():
if key in content:
assert content[key] < value, '%r < %r' % (content[key], value)
content[key] = value
print '%r f %r' % (line_timestamp, content)
elif line_type == 'f':
for key, value in content.iteritems():
assert key in line_dict, repr(key)
assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
content = line_dict
print line.strip()
line = sys.stdin.readline()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment