Commit 815ff693 authored by Vincent Pelletier's avatar Vincent Pelletier

Add tiny_profiler measure points.

Given measure points were used to search for hot spots in client around
"store" method.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1911 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 110cf175
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
from thread import get_ident from thread import get_ident
from cPickle import dumps from cPickle import dumps
from zlib import compress, decompress from zlib import compress as real_compress, decompress
from neo.locking import Queue, Empty from neo.locking import Queue, Empty
from random import shuffle from random import shuffle
from time import sleep from time import sleep
...@@ -32,7 +32,7 @@ from neo import logging ...@@ -32,7 +32,7 @@ from neo import logging
from neo import protocol from neo import protocol
from neo.protocol import NodeTypes, Packets from neo.protocol import NodeTypes, Packets
from neo.event import EventManager from neo.event import EventManager
from neo.util import makeChecksum, dump from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock from neo.locking import Lock
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo.node import NodeManager from neo.node import NodeManager
...@@ -47,7 +47,22 @@ from neo.client.iterator import Iterator ...@@ -47,7 +47,22 @@ from neo.client.iterator import Iterator
from neo.client.mq import MQ from neo.client.mq import MQ
from neo.client.pool import ConnectionPool from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList from neo.util import u64, parseMasterList
from neo.profiling import profiler_decorator, PROFILING_ENABLED
if PROFILING_ENABLED:
# Those functions require a "real" python function wrapper before they can
# be decorated.
@profiler_decorator
def compress(data):
return real_compress(data)
@profiler_decorator
def makeChecksum(data):
return real_makeChecksum(data)
else:
# If profiling is disabled, directly use original functions.
compress = real_compress
makeChecksum = real_makeChecksum
class ThreadContext(object): class ThreadContext(object):
...@@ -161,6 +176,7 @@ class Application(object): ...@@ -161,6 +176,7 @@ class Application(object):
self._nm_acquire = lock.acquire self._nm_acquire = lock.acquire
self._nm_release = lock.release self._nm_release = lock.release
@profiler_decorator
def _handlePacket(self, conn, packet, handler=None): def _handlePacket(self, conn, packet, handler=None):
""" """
conn conn
...@@ -186,6 +202,7 @@ class Application(object): ...@@ -186,6 +202,7 @@ class Application(object):
raise ValueError, 'Unknown node type: %r' % (node.__class__, ) raise ValueError, 'Unknown node type: %r' % (node.__class__, )
handler.dispatch(conn, packet) handler.dispatch(conn, packet)
@profiler_decorator
def _waitAnyMessage(self, block=True): def _waitAnyMessage(self, block=True):
""" """
Handle all pending packets. Handle all pending packets.
...@@ -209,6 +226,7 @@ class Application(object): ...@@ -209,6 +226,7 @@ class Application(object):
except ConnectionClosed: except ConnectionClosed:
pass pass
@profiler_decorator
def _waitMessage(self, target_conn, msg_id, handler=None): def _waitMessage(self, target_conn, msg_id, handler=None):
"""Wait for a message returned by the dispatcher in queues.""" """Wait for a message returned by the dispatcher in queues."""
get = self.local_var.queue.get get = self.local_var.queue.get
...@@ -225,6 +243,7 @@ class Application(object): ...@@ -225,6 +243,7 @@ class Application(object):
elif packet is not None: elif packet is not None:
self._handlePacket(conn, packet) self._handlePacket(conn, packet)
@profiler_decorator
def _askStorage(self, conn, packet): def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """ """ Send a request to a storage node and process it's answer """
try: try:
...@@ -234,6 +253,7 @@ class Application(object): ...@@ -234,6 +253,7 @@ class Application(object):
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, self.storage_handler) self._waitMessage(conn, msg_id, self.storage_handler)
@profiler_decorator
def _askPrimary(self, packet): def _askPrimary(self, packet):
""" Send a request to the primary master and process it's answer """ """ Send a request to the primary master and process it's answer """
conn = self._getMasterConnection() conn = self._getMasterConnection()
...@@ -244,6 +264,7 @@ class Application(object): ...@@ -244,6 +264,7 @@ class Application(object):
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler) self._waitMessage(conn, msg_id, self.primary_handler)
@profiler_decorator
def _getMasterConnection(self): def _getMasterConnection(self):
""" Connect to the primary master node on demand """ """ Connect to the primary master node on demand """
# acquire the lock to allow only one thread to connect to the primary # acquire the lock to allow only one thread to connect to the primary
...@@ -265,6 +286,7 @@ class Application(object): ...@@ -265,6 +286,7 @@ class Application(object):
self._getMasterConnection() self._getMasterConnection()
return self.pt return self.pt
@profiler_decorator
def _getCellListForOID(self, oid, readable=False, writable=False): def _getCellListForOID(self, oid, readable=False, writable=False):
""" Return the cells available for the specified OID """ """ Return the cells available for the specified OID """
pt = self._getPartitionTable() pt = self._getPartitionTable()
...@@ -275,6 +297,7 @@ class Application(object): ...@@ -275,6 +297,7 @@ class Application(object):
pt = self._getPartitionTable() pt = self._getPartitionTable()
return pt.getCellListForTID(tid, readable, writable) return pt.getCellListForTID(tid, readable, writable)
@profiler_decorator
def _connectToPrimaryNode(self): def _connectToPrimaryNode(self):
logging.debug('connecting to primary master...') logging.debug('connecting to primary master...')
ready = False ready = False
...@@ -377,6 +400,7 @@ class Application(object): ...@@ -377,6 +400,7 @@ class Application(object):
def getDB(self): def getDB(self):
return self._db return self._db
@profiler_decorator
def new_oid(self): def new_oid(self):
"""Get a new OID.""" """Get a new OID."""
self._oid_lock_acquire() self._oid_lock_acquire()
...@@ -398,6 +422,7 @@ class Application(object): ...@@ -398,6 +422,7 @@ class Application(object):
# return the last OID used, this is innacurate # return the last OID used, this is innacurate
return int(u64(self.last_oid)) return int(u64(self.last_oid))
@profiler_decorator
def getSerial(self, oid): def getSerial(self, oid):
# Try in cache first # Try in cache first
self._cache_lock_acquire() self._cache_lock_acquire()
...@@ -415,6 +440,7 @@ class Application(object): ...@@ -415,6 +440,7 @@ class Application(object):
return hist[1][0][0] return hist[1][0][0]
@profiler_decorator
def _load(self, oid, serial=None, tid=None, cache=0): def _load(self, oid, serial=None, tid=None, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore.""" """Internal method which manage load ,loadSerial and loadBefore."""
cell_list = self._getCellListForOID(oid, readable=True) cell_list = self._getCellListForOID(oid, readable=True)
...@@ -489,6 +515,7 @@ class Application(object): ...@@ -489,6 +515,7 @@ class Application(object):
return data, start_serial, end_serial return data, start_serial, end_serial
@profiler_decorator
def load(self, oid, version=None): def load(self, oid, version=None):
"""Load an object for a given oid.""" """Load an object for a given oid."""
# First try from cache # First try from cache
...@@ -508,6 +535,7 @@ class Application(object): ...@@ -508,6 +535,7 @@ class Application(object):
self._load_lock_release() self._load_lock_release()
@profiler_decorator
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
"""Load an object for a given oid and serial.""" """Load an object for a given oid and serial."""
# Do not try in cache as it manages only up-to-date object # Do not try in cache as it manages only up-to-date object
...@@ -515,6 +543,7 @@ class Application(object): ...@@ -515,6 +543,7 @@ class Application(object):
return self._load(oid, serial=serial)[0] return self._load(oid, serial=serial)[0]
@profiler_decorator
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
"""Load an object for a given oid before tid committed.""" """Load an object for a given oid before tid committed."""
# Do not try in cache as it manages only up-to-date object # Do not try in cache as it manages only up-to-date object
...@@ -527,6 +556,7 @@ class Application(object): ...@@ -527,6 +556,7 @@ class Application(object):
return data, start, end return data, start, end
@profiler_decorator
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
# First get a transaction, only one is allowed at a time # First get a transaction, only one is allowed at a time
...@@ -543,6 +573,7 @@ class Application(object): ...@@ -543,6 +573,7 @@ class Application(object):
self.local_var.txn = transaction self.local_var.txn = transaction
@profiler_decorator
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
"""Store object.""" """Store object."""
if transaction is not self.local_var.txn: if transaction is not self.local_var.txn:
...@@ -586,6 +617,7 @@ class Application(object): ...@@ -586,6 +617,7 @@ class Application(object):
self._waitAnyMessage(False) self._waitAnyMessage(False)
return None return None
@profiler_decorator
def _handleConflicts(self, tryToResolveConflict): def _handleConflicts(self, tryToResolveConflict):
result = [] result = []
append = result.append append = result.append
...@@ -616,6 +648,7 @@ class Application(object): ...@@ -616,6 +648,7 @@ class Application(object):
serials=(tid, serial), data=data) serials=(tid, serial), data=data)
return result return result
@profiler_decorator
def waitStoreResponses(self, tryToResolveConflict): def waitStoreResponses(self, tryToResolveConflict):
result = [] result = []
append = result.append append = result.append
...@@ -654,6 +687,7 @@ class Application(object): ...@@ -654,6 +687,7 @@ class Application(object):
append((oid, tid)) append((oid, tid))
return result return result
@profiler_decorator
def tpc_vote(self, transaction, tryToResolveConflict): def tpc_vote(self, transaction, tryToResolveConflict):
"""Store current transaction.""" """Store current transaction."""
local_var = self.local_var local_var = self.local_var
...@@ -696,6 +730,7 @@ class Application(object): ...@@ -696,6 +730,7 @@ class Application(object):
return result return result
@profiler_decorator
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Abort current transaction.""" """Abort current transaction."""
if transaction is not self.local_var.txn: if transaction is not self.local_var.txn:
...@@ -732,6 +767,7 @@ class Application(object): ...@@ -732,6 +767,7 @@ class Application(object):
conn.unlock() conn.unlock()
self.local_var.clear() self.local_var.clear()
@profiler_decorator
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
"""Finish current transaction.""" """Finish current transaction."""
if self.local_var.txn is not transaction: if self.local_var.txn is not transaction:
......
...@@ -21,6 +21,7 @@ from neo.locking import RLock ...@@ -21,6 +21,7 @@ from neo.locking import RLock
from neo.protocol import NodeTypes, Packets from neo.protocol import NodeTypes, Packets
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo.client.exception import ConnectionClosed from neo.client.exception import ConnectionClosed
from neo.profiling import profiler_decorator
class ConnectionPool(object): class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes.""" """This class manages a pool of connections to storage nodes."""
...@@ -36,6 +37,7 @@ class ConnectionPool(object): ...@@ -36,6 +37,7 @@ class ConnectionPool(object):
self.connection_lock_acquire = l.acquire self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release self.connection_lock_release = l.release
@profiler_decorator
def _initNodeConnection(self, node): def _initNodeConnection(self, node):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
addr = node.getAddress() addr = node.getAddress()
...@@ -78,6 +80,7 @@ class ConnectionPool(object): ...@@ -78,6 +80,7 @@ class ConnectionPool(object):
logging.info('Storage node %s not ready', node) logging.info('Storage node %s not ready', node)
return None return None
@profiler_decorator
def _dropConnections(self): def _dropConnections(self):
"""Drop connections.""" """Drop connections."""
for node_uuid, conn in self.connection_dict.items(): for node_uuid, conn in self.connection_dict.items():
...@@ -95,6 +98,7 @@ class ConnectionPool(object): ...@@ -95,6 +98,7 @@ class ConnectionPool(object):
finally: finally:
conn.unlock() conn.unlock()
@profiler_decorator
def _createNodeConnection(self, node): def _createNodeConnection(self, node):
"""Create a connection to a given storage node.""" """Create a connection to a given storage node."""
if len(self.connection_dict) > self.max_pool_size: if len(self.connection_dict) > self.max_pool_size:
...@@ -114,9 +118,11 @@ class ConnectionPool(object): ...@@ -114,9 +118,11 @@ class ConnectionPool(object):
conn.lock() conn.lock()
return conn return conn
@profiler_decorator
def getConnForCell(self, cell): def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode()) return self.getConnForNode(cell.getNode())
@profiler_decorator
def getConnForNode(self, node): def getConnForNode(self, node):
"""Return a locked connection object to a given node """Return a locked connection object to a given node
If no connection exists, create a new one""" If no connection exists, create a new one"""
...@@ -136,6 +142,7 @@ class ConnectionPool(object): ...@@ -136,6 +142,7 @@ class ConnectionPool(object):
finally: finally:
self.connection_lock_release() self.connection_lock_release()
@profiler_decorator
def removeConnection(self, node): def removeConnection(self, node):
"""Explicitly remove connection when a node is broken.""" """Explicitly remove connection when a node is broken."""
self.connection_lock_acquire() self.connection_lock_acquire()
......
...@@ -28,6 +28,7 @@ from neo.util import dump ...@@ -28,6 +28,7 @@ from neo.util import dump
from neo.logger import PACKET_LOGGER from neo.logger import PACKET_LOGGER
from neo import attributeTracker from neo import attributeTracker
from neo.profiling import profiler_decorator
PING_DELAY = 5 PING_DELAY = 5
PING_TIMEOUT = 5 PING_TIMEOUT = 5
...@@ -312,6 +313,7 @@ class Connection(BaseConnection): ...@@ -312,6 +313,7 @@ class Connection(BaseConnection):
def getPeerId(self): def getPeerId(self):
return self.peer_id return self.peer_id
@profiler_decorator
def _getNextId(self): def _getNextId(self):
next_id = self.cur_id next_id = self.cur_id
self.cur_id = (next_id + 1) & 0xffffffff self.cur_id = (next_id + 1) & 0xffffffff
...@@ -405,6 +407,7 @@ class Connection(BaseConnection): ...@@ -405,6 +407,7 @@ class Connection(BaseConnection):
else: else:
handler.connectionClosed(self) handler.connectionClosed(self)
@profiler_decorator
def _recv(self): def _recv(self):
"""Receive data from a connector.""" """Receive data from a connector."""
try: try:
...@@ -430,6 +433,7 @@ class Connection(BaseConnection): ...@@ -430,6 +433,7 @@ class Connection(BaseConnection):
# unhandled connector exception # unhandled connector exception
raise raise
@profiler_decorator
def _send(self): def _send(self):
"""Send data to a connector.""" """Send data to a connector."""
if not self.write_buf: if not self.write_buf:
...@@ -457,6 +461,7 @@ class Connection(BaseConnection): ...@@ -457,6 +461,7 @@ class Connection(BaseConnection):
self._closure() self._closure()
raise raise
@profiler_decorator
def _addPacket(self, packet): def _addPacket(self, packet):
"""Add a packet into the write buffer.""" """Add a packet into the write buffer."""
if self.connector is None: if self.connector is None:
...@@ -479,6 +484,7 @@ class Connection(BaseConnection): ...@@ -479,6 +484,7 @@ class Connection(BaseConnection):
self._addPacket(packet) self._addPacket(packet)
return msg_id return msg_id
@profiler_decorator
@not_closed @not_closed
def ask(self, packet, timeout=CRITICAL_TIMEOUT): def ask(self, packet, timeout=CRITICAL_TIMEOUT):
""" """
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.locking import Lock from neo.locking import Lock
from neo.profiling import profiler_decorator
EMPTY = {} EMPTY = {}
def giant_lock(func): def giant_lock(func):
...@@ -38,6 +39,7 @@ class Dispatcher: ...@@ -38,6 +39,7 @@ class Dispatcher:
self.lock_release = lock.release self.lock_release = lock.release
@giant_lock @giant_lock
@profiler_decorator
def dispatch(self, conn, msg_id, data): def dispatch(self, conn, msg_id, data):
"""Retrieve register-time provided queue, and put data in it.""" """Retrieve register-time provided queue, and put data in it."""
queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None) queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
...@@ -48,6 +50,7 @@ class Dispatcher: ...@@ -48,6 +50,7 @@ class Dispatcher:
return True return True
@giant_lock @giant_lock
@profiler_decorator
def register(self, conn, msg_id, queue): def register(self, conn, msg_id, queue):
"""Register an expectation for a reply.""" """Register an expectation for a reply."""
self.message_table.setdefault(id(conn), {})[msg_id] = queue self.message_table.setdefault(id(conn), {})[msg_id] = queue
...@@ -58,6 +61,7 @@ class Dispatcher: ...@@ -58,6 +61,7 @@ class Dispatcher:
except KeyError: except KeyError:
queue_dict[key] = 1 queue_dict[key] = 1
@profiler_decorator
def unregister(self, conn): def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock """ Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """ threads excepting responses from that connection """
...@@ -75,11 +79,13 @@ class Dispatcher: ...@@ -75,11 +79,13 @@ class Dispatcher:
notified_set.add(queue_id) notified_set.add(queue_id)
queue_dict[queue_id] -= 1 queue_dict[queue_id] -= 1
@profiler_decorator
def registered(self, conn): def registered(self, conn):
"""Check if a connection is registered into message table.""" """Check if a connection is registered into message table."""
return len(self.message_table.get(id(conn), EMPTY)) != 0 return len(self.message_table.get(id(conn), EMPTY)) != 0
@giant_lock @giant_lock
@profiler_decorator
def pending(self, queue): def pending(self, queue):
return not queue.empty() or self.queue_dict[id(queue)] > 0 return not queue.empty() or self.queue_dict[id(queue)] > 0
...@@ -19,6 +19,7 @@ from neo import logging ...@@ -19,6 +19,7 @@ from neo import logging
from neo.protocol import PacketMalformedError from neo.protocol import PacketMalformedError
from neo.util import dump from neo.util import dump
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.profiling import profiler_decorator
class PacketLogger(EventHandler): class PacketLogger(EventHandler):
""" Logger at packet level (for debugging purpose) """ """ Logger at packet level (for debugging purpose) """
...@@ -26,6 +27,7 @@ class PacketLogger(EventHandler): ...@@ -26,6 +27,7 @@ class PacketLogger(EventHandler):
def __init__(self): def __init__(self):
EventHandler.__init__(self, None) EventHandler.__init__(self, None)
@profiler_decorator
def dispatch(self, conn, packet, direction): def dispatch(self, conn, packet, direction):
"""This is a helper method to handle various packet types.""" """This is a helper method to handle various packet types."""
# default log message # default log message
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
from struct import pack, unpack, error, calcsize from struct import pack, unpack, error, calcsize
from socket import inet_ntoa, inet_aton from socket import inet_ntoa, inet_aton
from neo.profiling import profiler_decorator
from neo.util import Enum from neo.util import Enum
...@@ -263,6 +264,7 @@ class Packet(object): ...@@ -263,6 +264,7 @@ class Packet(object):
def getType(self): def getType(self):
return self.__class__ return self.__class__
@profiler_decorator
def encode(self): def encode(self):
""" Encode a packet as a string to send it over the network """ """ Encode a packet as a string to send it over the network """
content = self._body content = self._body
......
...@@ -10,9 +10,12 @@ from time import time ...@@ -10,9 +10,12 @@ from time import time
from neo.tests.functional import NEOCluster from neo.tests.functional import NEOCluster
from neo.client.Storage import Storage from neo.client.Storage import Storage
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from neo.profiling import PROFILING_ENABLED, profiler_decorator, \
profiler_report
def runImport(neo, datafs): def runImport(neo, datafs):
@profiler_decorator
def _copyTransactionsFrom(self, other): def _copyTransactionsFrom(self, other):
""" taken from ZODB.BaseStorage that build stat during import """ """ taken from ZODB.BaseStorage that build stat during import """
def inc(d): def inc(d):
...@@ -182,6 +185,9 @@ if __name__ == "__main__": ...@@ -182,6 +185,9 @@ if __name__ == "__main__":
summary, report = buildReport(config, *runImport(neo, datafs)) summary, report = buildReport(config, *runImport(neo, datafs))
neo.stop() neo.stop()
if PROFILING_ENABLED:
print profiler_report()
# display and/or send the report # display and/or send the report
print summary print summary
print report print report
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment