Commit 8f874b83 authored by Aurel's avatar Aurel

use a thread mixing class for storage and implement a dispatcher to manage packets


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@68 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 112d91e3
from Queue import Queue
from threading import Lock
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.utils import p64, u64, cp, z64
from thread import get_ident
from neo.client.dispatcher import Dispatcher
from neo.event import EventManager
import logging
class NEOStorageError(POSException.StorageError):
pass
......@@ -15,19 +21,38 @@ class NEOStorage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient."""
def __init__(self, master_addr, master_port, read_only=False, **kw):
def __init__(self, master_addr, master_port, name, read_only=False, **kw):
self._is_read_only = read_only
from neo.client.app import Application # here to prevent recursive import
self.app = Application(master_addr, master_port)
# Transaction must be under protection of lock
l = Lock()
self._txn_lock_acquire = l.acquire
self._txn_lock_release = l.release
# Create two queue for message between thread and dispatcher
# - message queue is for message that has to be send to other node
# through the dispatcher
# - request queue is for message receive from other node which have to
# be processed
message_queue = Queue()
request_queue = Queue()
# Create the event manager
em = EventManager()
# Create dispatcher thread
dispatcher = Dispatcher(em, message_queue, request_queue)
dispatcher.setDaemon(True)
dispatcher.start()
# Import here to prevent recursive import
from neo.client.app import Application
self.app = Application(master_addr, master_port, name, em, dispatcher,
message_queue, request_queue)
def load(self, oid, version=None):
try:
self.app.load(oid)
return self.app.process_method('load', oid=oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid)
def close(self):
self.app.close()
return self.app.process_method('close')
def cleanup(self):
raise NotImplementedError
......@@ -43,31 +68,39 @@ class NEOStorage(BaseStorage.BaseStorage,
def new_oid(self):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.new_oid()
return self.app.process_method('new_oid')
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.tpc_begin(transaction, tid, status)
self._txn_lock_acquire()
return self.app.process_method('tpc_begin', transaction=transaction, tid=tid, status=status)
def tpc_vote(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.tpc_vote(transaction)
return self.app.process_method('tpc_vote', transaction=transaction)
def tpc_abort(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.tpc_abort(transaction)
try:
return self.app.process_method('tpc_abort', transaction=transaction)
except:
self._txn_lock_release()
def tpc_finish(self, transaction, f=None):
self.app.tpc_finish(transaction, f)
try:
return self.app.process_method('tpc_finish', transaction=transaction, f=f)
except:
self._txn_lock_release()
def store(self, oid, serial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
self.app.store(oid, serial, data, version, transaction)
return self.app.process_method('store', oid=oid, serial=serial, data=data,
version=version, transaction=transaction)
except NEOStorageConflictError:
new_data = self.tryToResolveConflict(oid, self.app.tid,
serial, data)
......@@ -84,20 +117,20 @@ class NEOStorage(BaseStorage.BaseStorage,
def getSerial(self, oid):
try:
self.app.getSerial(oid)
return self.app.process_method('getSerial', oid=oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid)
# mutliple revisions
def loadSerial(self, oid, serial):
try:
self.app.loadSerial(oid,serial)
return self.app.process_method('loadSerial', oid=oid, serial=serial)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, serial)
def loadBefore(self, oid, tid):
try:
self.app.loadBefore(self, oid, tid)
return self.app.process_method('loadBefore', oid=oid, tid=tid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, tid)
......@@ -108,23 +141,16 @@ class NEOStorage(BaseStorage.BaseStorage,
def undo(self, transaction_id, txn):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.undo(transaction_id, txn)
def undoInfo(self, first=0, last=-20, specification=None):
if self._is_read_only:
raise POSException.ReadOnlyError()
self.app.undoInfo(first, last, specification)
self._txn_lock_acquire()
try:
return self.app.process_method('undo', transaction_id=transaction_id, txn=txn)
except:
self._txn_lock_release()
def undoLog(self, first, last, filter):
if self._is_read_only:
raise POSException.ReadOnlyError()
# This should not be used by ZODB
# instead it should use undoInfo
# Look at ZODB/interface.py for more info
if filter is not None:
return []
else:
return self.undoInfo(first, last)
return self.undoLog(first, last, filter)
def supportsUndo(self):
return 0
......
import logging
import os
from time import time
from threading import Lock, Condition
from threading import Lock, Condition, Thread, local
from cPickle import dumps, loads
from zlib import compress, adler32, decompress
from Queue import Queue, Empty
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.protocol import Packet, INVALID_UUID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, CLIENT_NODE_TYPE, \
UP_TO_DATE_STATE, FEEDING_STATE
from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn
class ConnectionManager(object):
"""This class manage a pool of connection to storage node."""
......@@ -36,12 +38,13 @@ class ConnectionManager(object):
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid, addr[0],
addr[1], self.storage.name)
conn.expectMessage(msg_id)
while 1:
self.em.poll(1)
if self.storage_node is not None:
break
logging.debug('connected to a storage node %s' %(addr,))
self.storage.local_var.tmp_q = Queue(1)
self.storage.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.storage.local_var.storage_node = None
self.storage._waitMessage()
if self.storage.storage_node is None:
raise NEOStorageError('Connection to storage node failed')
logging.debug('connected to storage node %s' %(addr,))
return conn
def _dropConnection(self,):
......@@ -80,61 +83,51 @@ class ConnectionManager(object):
return self._createNodeConnection(node)
class Application(object):
class Application(ThreadingMixIn, object):
"""The client node application."""
def __init__(self, master_addr, master_port, name, **kw):
def __init__(self, master_addr, master_port, name, em, dispatcher, message_queue,
request_queue, **kw):
logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address is %s, port is %d' %(master_addr, master_port))
# Internal Attributes
logging.debug('master node address is %s, port is %d' %(master_addr,
master_port))
# Internal Attributes common to all thread
self.name = name
self.em = EventManager()
self.em = em
self.dispatcher = dispatcher
self.nm = NodeManager()
self.cm = ConnectionManager(self)
self.pt = None
self.queue = message_queue
self.request_queue = request_queue
self.primary_master_node = None
self.master_conn = None
self.uuid = None
self.mq_cache = MQ()
self.new_oid_list = [] # List of new oid for ZODB
self.txn_data_dict = {} # Data for each object used in current transaction
self.txn = None # The current transaction
self.tid = None # The current transaction id
self.txn_finished = 0 # Flag to know when transaction finished on master
self.txn_stored = 0 # Flag to knwo when transaction has well been stored
self.loaded_object = None # Current data of the object we are loading
self.history = None # History data for a given object
# object_stored is used to know if storage node
# accepted the object or raised a conflict
# 0 : no answer yet
# -1 : conflict
# oid, serial : ok
self.object_stored = 0
self.new_oid_list = []
# Transaction specific variable
self.tid = None
self.txn = None
self.txn_data_dict = {}
self.txn_obj_stored = 0
self.txn_voted = 0
self.txn_finished = 0
# Internal attribute distinct between thread
self.local_var = local()
# Lock definition :
# _return_lock is used to return data from thread to ZODB
# _oid_lock is used in order to not call multiple oid
# generation at the same time
# _txn_lock lock the entire transaction process, it is acquire
# at tpc begin and release at tpc_finish or tpc_abort
# _cache_lock is used for the client cache
# _load_lock is acquire to protect self.loaded_object used in event
# handler when retrieving object from storage node
# _info_lock is used when retrieving information for object or transaction
lock = Lock()
self._return_lock_acquire = lock.acquire
self._return_lock_release = lock.release
lock = Lock()
self._oid_lock_acquire = lock.acquire
self._oid_lock_release = lock.release
lock = Lock()
self._txn_lock_acquire = lock.acquire
self._txn_lock_release = lock.release
lock = Lock()
self._cache_lock_acquire = lock.acquire
self._cache_lock_release = lock.release
lock = Lock()
self._load_lock_acquire = lock.acquire
self._load_lock_release = lock.release
lock = Lock()
self._info_lock_acquire = lock.acquire
self._info_lock_release = lock.release
# XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID.
if self.uuid is None:
......@@ -143,7 +136,6 @@ class Application(object):
if uuid != INVALID_UUID:
break
self.uuid = uuid
# Connect to primary master node
defined_master_addr = (master_addr, master_port)
while 1:
......@@ -160,9 +152,27 @@ class Application(object):
pass
logging.info("connected to primary master node")
def _waitMessage(self):
"""Wait for a message returned by dispatcher in queues."""
# First get message we are waiting for
message = None
message = self.local_var.tmp_q.get(True, None)
if message is not None:
message[0].handler.dispatch(message[0], message[1])
# Now check if there is global messages and execute them
global_message = None
while 1:
try:
global_message = self.request_queue.get_nowait()
except Empty:
break
if global_message is not None:
global_message[0].handler.dispatch(message[0], message[1])
def connectToPrimaryMasterNode(self, defined_master_addr):
"""Connect to the primary master node."""
handler = ClientEventHandler(self)
handler = ClientEventHandler(self, self.dispatcher)
n = MasterNode(server = defined_master_addr)
self.nm.add(n)
......@@ -174,10 +184,14 @@ class Application(object):
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
defined_master_addr[0],
defined_master_addr[1], self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
# send message to dispatcher
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.primary_master_node = None
self.node_not_ready = 0
while 1:
self.em.poll(1)
self._waitMessage()
if self.primary_master_node is not None:
break
if self.node_not_ready:
......@@ -197,15 +211,17 @@ class Application(object):
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
self.primary_master_node.server[0],
self.primary_master_node.server[1] , self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
# send message to dispatcher
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.master_conn = conn
# Wait for primary master node information
while 1:
self.em.poll(1)
self._waitMessage()
if self.pt.filled() or self.node_not_ready:
break
def new_oid(self):
"""Get a new OID."""
self._oid_lock_acquire()
......@@ -219,17 +235,16 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askNewOIDs(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
while 1:
self.em.poll(1)
if len(self.new_oid_list) > 0:
break
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self._waitMessage()
if len(self.new_oid_list) <= 0:
raise NEOStorageError('new_oid failed')
return self.new_oid_list.pop()
finally:
self._oid_lock_release()
def getSerial(self, oid):
# Try in cache first
self._cache_lock_acquire()
......@@ -242,61 +257,64 @@ class Application(object):
hist = self.history(oid, length=1, object_only=1)
if len(hist) == 0:
raise NEOStorageNotFoundError()
return hist[0][0]
if hist[0] != oid:
raise NEOStorageError('getSerial failed')
return hist[1][0][0]
def _load(self, oid, serial="", cache=0):
def _load(self, oid, serial=INVALID_TID, tid=INVALID_TID, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore."""
partition_id = oid % self.num_paritions
# Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE]
self._load_lock_acquire()
data = None
# Store data on each node
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askObjectByOID(msg_id, oid, serial)
conn.addPacket(p)
conn.expectMessage(msg_id)
p.askObject(msg_id, oid, serial, tid)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self.loaded_object = None
try:
while 1:
self.em.poll(1)
if self.loaded_object is not None:
break
if self.loaded_object == -1:
self.local_var.asked_object = 0
# asked object retured value are :
# -1 : oid not found
# other : data
self._waitMessage()
if self.local_var.asked_object == -1:
# OID not found
# XXX either try with another node, either raise error here
# for now try with another node
continue
# Copy object data here to release lock as soon as possible
noid, start_serial, end_serial, compression, checksum, data = self.loaded_object
finally:
self._load_lock_release()
# Check data here
# Check data
noid, start_serial, end_serial, compression, checksum, data = self.local_var.loaded_object
if noid != oid:
# Oops, try with next node
logging.error('got wrong oid %s instead of %s from node %s' \
%(noid, oid, storage_node.getServer()))
# Reacquire lock and try again
self._load_lock_acquire()
continue
elif compression and checksum != adler32(data):
# Check checksum if we use compression
logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid))
# Reacquire lock and try again
self._load_lock_acquire()
continue
else:
# Everything looks allright
break
if self.loaded_object == -1:
# We didn't got any object from storage node
if self.local_var.loaded_object == -1:
# We didn't got any object from all storage node
raise NEOStorageNotFoundError()
# Uncompress data
if compression:
data = decompressed(data)
# Put in cache only when using load
if cache:
self.cache_lock_acquire()
......@@ -306,6 +324,7 @@ class Application(object):
self.cache_lock_release()
return loads(data), start_serial, end_serial
def load(self, oid, version=None):
"""Load an object for a given oid."""
# First try from cache
......@@ -318,26 +337,26 @@ class Application(object):
# Otherwise get it from storage node
return self._load(oid, cache=1)[:2]
def loadSerial(self, oid, serial):
"""Load an object for a given oid and serial."""
# Do not try in cache as it managed only up-to-date object
return self._load(oid, serial)[:2], None
def loadBefore(oid, tid):
"""Load an object for a given oid before tid committed."""
# Do not try in cache as it managed only up-to-date object
return self._load(oid, tid)
def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction."""
# First get a transaction, only one is allowed at a time
self._txn_lock_acquire()
if self.txn == transaction:
# Wa have already began the same transaction
# We already begin the same transaction
return
self.txn = transaction
# Init list of oid used in this transaction
self.txn_oid_list = {}
# Get a new transaction id if necessary
if tid is None:
self.tid = None
......@@ -345,16 +364,16 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askNewTID(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
while 1:
self.em.poll(1)
if self.tid is not None:
break
self._waitMessage()
if self.tid is None:
raise NEOStorageError('tpc_begin failed')
else:
self.tid = tid
def store(self, oid, serial, data, version, transaction):
"""Store object."""
if transaction is not self.txn:
......@@ -362,40 +381,35 @@ class Application(object):
# Find which storage node to use
partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
# Store data on each node
ddata = dumps(data)
compressed_data = compress(ddata)
crc = adler32(compressed_data)
checksum = adler32(compressed_data)
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
# Compres data with zlib
p.askStoreObject(msg_id, oid, serial, 1, crc, compressed_data)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.object_stored = 0
while 1:
self.em.poll(1)
if self.object_stored != 0:
break
p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Check we don't get any conflict
self.txn_object_stored = 0
self._waitMessage()
if self.object_stored == -1:
if txn_data_dict.has_key(oid):
if self.txn_data_dict.has_key(oid):
# One storage already accept the object, is it normal ??
# remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be resent
# again if conflict is resolved or txn will be aborted
txn_dict.pop(oid)
self.txn_data_dict.pop(oid)
raise NEOStorageConflictError()
noid, nserial = self.object_stored
if noid != oid:
# Huh!!
raise NEOStorageError()
else:
# Store object in tmp cache
noid, nserial = self.object_stored
self.txn_data_dict[oid] = ddata
break
def tpc_vote(self, transaction):
"""Store current transaction."""
......@@ -404,34 +418,37 @@ class Application(object):
user = transaction.user
desc = transaction.description
ext = dumps(transaction._extension)
# Store data on each node
partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
# Store data on each node
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askStoreTransaction(msg_id, self.tid, user, desc, ext, oid_list)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.txn_stored == 0
while 1:
self.em.poll(1)
if self.txn_stored:
break
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.txn_voted == 0
self._waitMessage()
if self.txn_voted != 1:
raise NEOStorageError('tpc_vote failed')
def _clear_txn(self):
"""Clear some transaction parameter and release lock."""
"""Clear some transaction parameters."""
self.tid = None
self.txn = None
self._txn_lock_release()
self.txn_data_dict = {}
self.txn_voted = 0
self.txn_finished = 0
def tpc_abort(self, transaction):
"""Abort current transaction."""
if transaction is not self.txn:
return
try:
# Abort transaction on each node used for it
# In node where objects were stored
# Abort txn in node where objects were stored
aborted_node = {}
for oid in self.txn_oid_list:
partition_id = oid % self.num_paritions
......@@ -442,9 +459,10 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.abortTransaction(msg_id, self.tid)
conn.addPacket(p)
self.queue.put((None, msg_id, conn, p), True)
aborted_node[storage_node] = 1
# In nodes where transaction was stored
# Abort in nodes where transaction was stored
partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
......@@ -453,10 +471,11 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.abortTransaction(msg_id, self.tid)
conn.addPacket(p)
self.queue.put((None, msg_id, conn, p), True)
finally:
self._clear_txn()
def tpc_finish(self, transaction, f=None):
"""Finish current transaction."""
if self.txn is not transaction:
......@@ -470,14 +489,13 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.finishTransaction(msg_id, self.oid_list, self.tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self.txn_finished = 0
while 1:
self.em.poll(1)
if self.txn_finished:
break
self._waitMessage()
if self.txn_finished != 1:
raise NEOStorateError('tpc_finish failed')
# Update cache
self.cache_lock_acquire()
try:
......@@ -487,17 +505,17 @@ class Application(object):
self.cache[oid] = self.tid, ddata
finally:
self.cache_lock_release()
# Release transaction
return self.tid
finally:
self._clear_txn()
def undo(self, transaction_id, txn):
# XXX conflict and non-undoable txn management is missing
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
# First get transaction information from master node
self._info_lock_acquire()
try:
partition_id = transaction_id % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
......@@ -505,17 +523,23 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self.txn_info = None
while 1:
self.em.poll(1)
if self.txn_info is not None:
self.local_var.txn_info = 0
self._waitMessage()
if self.local_var.txn_info == -1:
# Tid not found, try with next node
continue
elif isinstance(self.local_var.txn_info, {}):
break
oid_list = self.txn_info['oids']
finally:
self._info_lock_releas()
else:
raise NEOStorageError('undo failed')
if self.local_var.txn_info == -1:
raise NEOStorageError('undo failed')
oid_list = self.local_var.txn_info['oids']
# Second get object data from storage node using loadBefore
data_dict = {}
for oid in oid_list:
......@@ -529,26 +553,40 @@ class Application(object):
self.tpc_vote(txn)
self.tpc_finish(txn)
def undoInfo(self, first, last, specification=None):
# First get list of transaction from master node
self._info_lock_acquire()
try:
conn = self.master_conn
def undoLog(self, first, last, filter=None):
if last < 0:
# See FileStorage.py for explanation
last = first - last
# First get list of transaction from all storage node
storage_node_list = [x for x in self.pt.getNodeList() if x.getState() \
in (UP_TO_DATE_STATE, FEEDING_STATE)]
self.local_var.node_tids = {}
self.local_var.tmp_q = Queue(len(storage_node_list))
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askTIDs(msg_id, first, last, specification)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.undo_tid_list = None
while 1:
self.em.poll(1)
# must take care of order here
if self.undo_tid_list is not None:
p.askTIDs(msg_id, first, last)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer from all storages
while True:
self._waitMessage()
if len(self.local_var.node_tids) == len(storage_node_list):
break
# Reorder tids
ordered_tids = []
for tids in self.local_var.node_tids.values():
ordered_tids.append(tids)
# XXX do we need a special cmp function here ?
ordered_tids.sort(reverse=True)
# For each transaction, get info
undo_txn_list = []
for tid in undo_tid_list:
undo_info = []
for tid in ordered_tids:
partition_id = tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
......@@ -556,24 +594,33 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self.txn_info = None
while 1:
self.em.poll(1)
if self.txn_info is not None:
self.local_var.txn_info = 0
self._waitMessage()
if self.local_var.txn_info == -1:
# TID not found, go on with next node
continue
elif isinstance(self.local_var.txn_info, {}):
break
# Filter result if needed
if filter is not None:
# Filter method return True if match
if not filter(self.local_var.txn_info['description']):
continue
# Append to returned list
self.local_var.txn_info.pop("oids")
undo_info.append(self.local_var.txn_info)
if len(undo_info) >= last-first:
break
self.txn_info.pop("oids")
undo_txn_list.append(self.txn_info)
return undo_txn_dict
finally:
self._info_lock_release()
return undo_info
def history(self, oid, version, length=1, filter=None, object_only=0):
self._info_lock_acquire()
history_list = []
try:
# Get history informations for object first
partition_id = oid % self.num_paritions
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
......@@ -583,24 +630,25 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, length)
conn.addPacket(p)
conn.expectMessage(msg_id)
# must be a list of dict
self.history = None
while 1:
self.em.poll(1)
if self.history is not None:
break
if self.history == -1:
# not found, go on with next node
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.local_var.history = None
self._waitMessage()
if self.local_var.history == -1:
# Not found, go on with next node
continue
if self.history[0] != oid:
# got history for wrong oid
if self.local_var.history[0] != oid:
# Got history for wrong oid
continue
if not isinstance(self.local_var.history, {}):
raise NEOStorageError('history failed')
if object_only:
return self.history[1]
# Use by getSerial
return self.local_var.history
# Now that we have object informations, get txn informations
for serial, size in self.hisory[1]:
history_list = []
for serial, size in self.local_var.hisory[1]:
partition_id = tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
......@@ -608,14 +656,17 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, serial)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self.txn_info = None
while 1:
self.em.poll(1)
if self.txn_info is not None:
self.local_var.txn_info = None
self._waitMessage()
if self.local_var.txn_info == -1:
# TID not found
continue
if isinstance(self.local_var.txn_info, {}):
break
# create history dict
self.txn_info.remove('id')
self.txn_info.remove('oids')
......@@ -623,9 +674,8 @@ class Application(object):
self.txn_info['version'] = None
self.txn_info['size'] = size
history_list.append(self.txn_info)
return history_list
finally:
self._info_lock_release()
def __del__(self):
"""Clear all connection."""
......
from threading import Thread
from Queue import Empty
from neo.protocol import PING, Packet
class Dispatcher(Thread):
"""Dispatcher class use to redirect request to thread."""
def __init__(self, em, message_queue, request_queue, **kw):
Thread.__init__(self, **kw)
self._message_queue = message_queue
self._request_queue = request_queue
self.em = em
# This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread
self.message_table = {}
def run(self):
while 1:
# First check if we receive any new message from other node
self.message = None
m = None
self.em.poll(1)
if self.message is not None:
conn, packet = self.message
# now send message to waiting thread
key = "%s-%s" %(conn.getUUID(),packet.getId())
if self.message_table.has_key(key):
tmp_q = self.message_table.pop(key)
tmp_q.put(self.message, True)
else:
conn, packet = self.message
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.addPacket(Packet().pong(packet.getId()))
else:
# put message in request queue
self._request_queue.put(self.message, True)
# Then check if a client ask me to send a message
try:
m = self._message_queue.get_nowait()
if m is not None:
tmp_q, msg_id, conn, p = m
conn.addPacket(p)
conn.expectMessage(msg_id)
if tmp_q is not None:
key = "%s-%s" %(conn.getUUID(),msg_id)
self.message_table[key] = tmp_q
except Empty:
continue
......@@ -9,14 +9,20 @@ from neo.pt import PartitionTable
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
from thread import get_ident
class ClientEventHandler(EventHandler):
"""This class deals with events for a master."""
def __init__(self, app):
def __init__(self, app, dispatcher):
self.app = app
self.dispatcher = dispatcher
EventHandler.__init__(self)
def packetReceived(self, conn, packet):
logging.debug("received packet id %s" %(packet.getId(),))
self.dispatcher.message = conn, packet
def handleNotReady(self, conn, packet, message):
if isinstance(conn, ClientConnection):
app = self.app
......@@ -54,8 +60,12 @@ class ClientEventHandler(EventHandler):
# Ask a primary master.
msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
p = Packet()
p.askPrimaryMaster(msg_id)
# send message to dispatcher
app.queue.put((app.local_var.tmp_q, msg_id, conn, p), True)
elif node_type == STORAGE_NODE_TYPE:
app.storage_node = node
else:
self.handleUnexpectedPacket(conn, packet)
......@@ -206,8 +216,7 @@ class ClientEventHandler(EventHandler):
if isinstance(conn, ClientConnection):
app = self.app
if tid != app.tid:
# What's this ?
raise NEOStorageError
app.txn_finished = -1
else:
app.txn_finished = 1
else:
......@@ -225,16 +234,16 @@ class ClientEventHandler(EventHandler):
self.handleUnexpectedPacket(conn, packet)
# Storage node handler
def handleAnwserObjectByOID(self, oid, start_serial, end_serial, compression,
def handleAnwserObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data):
if isinstance(conn, ClientConnection):
app = self.app
app.loaded_object = (oid, start_serial, end_serial, compression,
app.local_var.loaded_object = (oid, start_serial, end_serial, compression,
checksum, data)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreObject(self, conflicting, oid, serial):
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
if isinstance(conn, ClientConnection):
app = self.app
if conflicting == '1':
......@@ -244,14 +253,14 @@ class ClientEventHandler(EventHandler):
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreTransaction(self, tid):
def handleAnswerStoreTransaction(self, conn, packet, tid):
if isinstance(conn, ClientConnection):
app = self.app
app.txn_stored = 1
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTransactionInformation(self, tid, user, desc, oid_list):
def handleAnswerTransactionInformation(self, conn, packet, tid, user, desc, oid_list):
if isinstance(conn, ClientConnection):
app = self.app
# transaction information are returned as a dict
......@@ -261,11 +270,11 @@ class ClientEventHandler(EventHandler):
info['description'] = desc
info['id'] = p64(long(tid))
info['oids'] = oid_list
app.txn_info = info
app.local_var.txn_info = info
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectHistory(self, oid, history_list):
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
if isinstance(conn, ClientConnection):
app = self.app
# history_list is a list of tuple (serial, size)
......@@ -273,3 +282,22 @@ class ClientEventHandler(EventHandler):
else:
self.handleUnexpectedPacket(conn, packet)
def handleOidNotFound(self, conn, packet, message):
if isinstance(conn, ClientConnection):
app = self.app
# This can happen either when :
# - loading an object
# - asking for history
self.local_var.asked_object = -1
self.local_var.history = -1
else:
self.handleUnexpectedPacket(conn, packet)
def handleTidNotFound(self, conn, packet, message):
if isinstance(conn, ClientConnection):
app = self.app
# This can happen when requiring txn informations
self.local_var.txn_info = -1
else:
self.handleUnexpectedPacket(conn, packet)
from threading import Thread
class ThreadingMixIn:
"""Mix-in class to handle each method in a new thread."""
def process_method_thread(self, method, kw):
m = getattr(self, method)
try:
r = m(**kw)
finally:
self._return_lock_acquire()
self.returned_data = r
def process_method(self, method, **kw):
"""Start a new thread to process the method."""
t = Thread(target = self.process_method_thread,
args = (method, kw))
t.start()
# wait for thread to be completed, returned value must be
# under protection of a lock
try:
t.join()
return self.returned_data
finally:
self._return_lock_release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment