Commit 7ed60ee7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ec6b7bd5
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""pyruntraced ... - run python ... with NEO tracepoints attached
This program runs python code with tracepoints equivalent to NEO/go activated
on NEO/py side.
Whenever a tracepoint is reached, attached probe sends corresponding trace event
to driving process and awaits further commands from it. Commands could be:
- to execute arbitrary python code, thus allowing program state inspection, or
- to continue original code execution.
Overall this allows Go driver process to test several concurrent child
subprocesses via tracetest, the same way to how it is done in pure NEO/go.
Implementation notes
--------------------
The channel for tracing communication in between driver and traced process is
passed as opened file descriptor by driver, and its number is specified via
$PYRUNTRACED_FD environment variable.
The protocol in between driver and traced process is as follows (from the point
of view of traced process):
> tid T eventname event # event happenned on a thread
< tid C # driver tells tracee to continue execution
< tid !... # driver asks tracee to evaluate ... in context where probe is attached
> tid E problem # tracee reports result as exception
> tid R result # tracee reports result
eventname is just regulat string
event, problem, result are JSON-encoded objects.
Fork is not allowed in order not to confuse the driver.
"""
from functools import wraps
import json
import thread, threading
# Tracer implements synchronous tracing in compatible way to tracetest on
# NEO/go side.
#
# See top-level module description for details.
class Tracer(object):
def __init__(self, ftrace):
self.ftrace = ftrace # file object to communicate with tracer
self.txlock = threading.Lock() # serializes writes to .ftrace
self.rxtab = {} # {} tid -> RxQueue
self.rxlock = threading.Lock() # serializes access to .rxtab
self.trecv = threading.Thread(target=self._serve_recv)
self.trecv.daemon = True # XXX better to gracefully stop it?
self.trecv.start()
# _send sends 1 line to tracer from current thread.
def _send(self, line):
assert '\n' not in line
tid = thread.get_ident()
self.txlock.acquire()
try:
self.ftrace.write(('%d ' % tid) + line + '\n')
self.ftrace.flush()
finally:
self.txlock.release()
# _serve_recv receives lines from .ftrace and multiplexes them to
# destination threads RX queues.
#
# runs in a dedicated thread.
def _serve_recv(self):
while 1:
line = self.ftrace.readline()
# tid SP rest \n
tid, line = line.split(None, 1)
line = line.rstrip('\n')
tid = int(tid)
self.rxlock.acquire()
rxq = self.rxtab.get(tid)
if rxq is None:
self.rxtab[tid] = rxq = RxQueue()
rxq.lineq.append(line)
rxq.event.set()
self.rxlock.release()
# _recv receives 1 line from tracer for current thread.
def _recv(self):
tid = thread.get_ident()
while 1:
self.rxlock.acquire()
rxq = self.rxtab.get(tid)
if rxq is None:
self.rxtab[tid] = rxq = RxQueue()
if len(rxq.lineq) > 0:
# data was already queued for us
line = rxq.lineq.pop(0)
self.rxlock.release()
return line
# there is no data - we have to wait for it
rxq.event.clear()
self.rxlock.release()
rxq.event.wait()
# woke up -> retry to dequeue data
# trace1 handles 1 trace event.
#
# it sends the event to tracer and awaits commands from it to either
# inspect current state or continue execution.
def trace1(self, eventname, event):
# send trace event
evstr = json.dumps(event)
assert '\n' not in evstr
self._send('T %s %s' % (eventname, evstr))
# wait for commands
# ! ... - eval python code
# C - continue
while 1:
line = self._recv()
if len(line) == 0 or line[0] not in "!C":
raise RuntimeError("trace1: got invalid line from driver: %r" % line)
if line[0] == 'C':
return # probe finishes - continue execution of original code
# eval python in context of probed function
try:
r = eval(line[1:], g, l)
except Exception as e:
self._send('E %s' % json.dumps(str(e)))
else:
self._send('R %s' % json.dumps(r))
# RxQueue represents receive queue for 1 thread
class RxQueue(object):
def __init__(self):
self.lineq = [] # [] of lines received
self.event = threading.Event() # sender signals consumer there is new data
# gtracer is the global tracer object
gtracer = None # Tracer
# trace_entry marks entry to func with tracepoint and attaches probe to run XXX
def trace_entry(func, eventname):
klass = func.im_class
fname = func.im_func.func_name
def deco(f):
@wraps(func)
def probe(self, *args, **kw):
event = f(self, *args, **kw)
gtracer.trace1(eventname, event)
return func(self, *args, **kw)
setattr(klass, fname, probe)
return deco
# trace events
from neo.lib.connection import Connection
import socket
# sk_addr converts socket address tuple for family to string
def sk_addr(addr, family):
if family == socket.AF_INET:
host, port = addr
return '%s:%s' % (host, port)
else:
raise RuntimError('sk_addr: TODO: %s' % family)
def sk_localaddr(sk):
return sk_addr( sk.getsockname(), sk.family )
def sk_remoteaddr(sk):
return sk_addr( sk.getpeername(), sk.family )
@trace_entry(Connection._addPacket, 'MsgSendPre')
def _(self, packet):
sk = self.connector.socket
pkth, data = packet.encode()
return {'src': sk_localaddr(sk),
'dst': sk_remoteaddr(sk),
# XXX pkt goes as quoted to avoid UTF-8 decode problem on json.dump
'pktq': `pkth+data`}
#'pktq': (pkth+data).encode('hex')}
# ----------------------------------------
import os, sys, code, runpy
# main mimics `python ...`,
# but with tracepoints already attached.
def main():
# setup global tracer
global gtracer
fdtrace = os.getenv("PYRUNTRACED_FD")
if fdtrace is None:
print >>sys.stderr, "$PYRUNTRACED_FD must be set"
sys.exit(1)
fdtrace = int(fdtrace)
ftrace = os.fdopen(fdtrace, 'r+')
gtracer = Tracer(ftrace)
# forbid fork
def nofork(): raise RuntimeError('pyruntraced: fork forbidden')
os.fork = nofork
os.forkpty = nofork
# now mimic `python ...`
argv = sys.argv[1:]
# interactive console
if not argv:
code.interact()
return
# -c command
if argv[0] == '-c':
sys.argv = argv[0:1] + argv[2:] # python leaves '-c' as argv[0]
# exec with the same globals `python -c ...` does
g = {'__name__': '__main__',
'__doc__': None,
'__package__': None}
exec argv[1] in g
# -m module
elif argv[0] == '-m':
# search sys.path for module and run corresponding .py file as script
sys.argv = argv[1:]
runpy.run_module(sys.argv[0], init_globals={'__doc__': None}, run_name='__main__')
elif argv[0].startswith('-'):
print >>sys.stderr, "invalid option '%s'" % argv[0]
sys.exit(2)
# file
else:
sys.argv = argv
filepath = argv[0]
# exec with same globals `python file.py` does
# XXX use runpy.run_path() instead?
g = {'__name__': '__main__',
'__file__': filepath,
'__doc__': None,
'__package__': None}
execfile(filepath, g)
return
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment