Commit 75982493 authored by Brenden Blanco

Add per-cpu parameters to buffered perf output

Signed-off-by: Brenden Blanco <bblanco@plumgrid.com>
parent d0daf6a4
...@@ -9,29 +9,33 @@ ...@@ -9,29 +9,33 @@
import atexit import atexit
from bcc import BPF from bcc import BPF
import ctypes import ctypes
import multiprocessing
counter = 0 counter = 0
def cb(foo, data, size): def cb(cookie, data, size):
global counter global counter
counter += 1 counter += 1
prog = """ prog = """
BPF_PERF_ARRAY(events, 2); BPF_PERF_ARRAY(events, NUMCPU);
BPF_TABLE("array", int, u64, counters, 10); BPF_TABLE("array", int, u64, counters, 10);
int kprobe__sys_write(void *ctx) { int kprobe__sys_write(void *ctx) {
struct { struct {
u64 ts; u64 ts;
} data = {bpf_ktime_get_ns()}; } data = {bpf_ktime_get_ns()};
if (events.perf_output(ctx, 0, &data, sizeof(data)) < 0) int rc;
bpf_trace_printk("perf_output failed\\n"); if ((rc = events.perf_output(ctx, bpf_get_smp_processor_id(), &data, sizeof(data))) < 0)
bpf_trace_printk("perf_output failed: %d\\n", rc);
int zero = 0; int zero = 0;
u64 *val = counters.lookup(&zero); u64 *val = counters.lookup(&zero);
if (val) lock_xadd(val, 1); if (val) lock_xadd(val, 1);
return 0; return 0;
} }
""" """
numcpu = multiprocessing.cpu_count()
prog = prog.replace("NUMCPU", str(numcpu))
b = BPF(text=prog) b = BPF(text=prog)
b["events"].open_perf_buffer(0, cb, None) b["events"].open_perf_buffers(cb, None)
@atexit.register @atexit.register
def print_counter(): def print_counter():
...@@ -39,5 +43,7 @@ def print_counter(): ...@@ -39,5 +43,7 @@ def print_counter():
global b global b
print("counter = %d vs %d" % (counter, b["counters"][ctypes.c_int(0)].value)) print("counter = %d vs %d" % (counter, b["counters"][ctypes.c_int(0)].value))
print("Tracing sys_write, try `dd if=/dev/zero of=/dev/null`")
print("Tracing... Hit Ctrl-C to end.")
while 1: while 1:
b.kprobe_poll() b.kprobe_poll()
...@@ -178,7 +178,7 @@ int bpf_attach_socket(int sock, int prog) { ...@@ -178,7 +178,7 @@ int bpf_attach_socket(int sock, int prog) {
static int bpf_attach_tracing_event(int progfd, const char *event_path, static int bpf_attach_tracing_event(int progfd, const char *event_path,
struct perf_reader *reader, int pid, int cpu, int group_fd) { struct perf_reader *reader, int pid, int cpu, int group_fd) {
int efd = -1, rc = -1, pfd; int efd = -1, pfd;
ssize_t bytes; ssize_t bytes;
char buf[256]; char buf[256];
struct perf_event_attr attr = {}; struct perf_event_attr attr = {};
...@@ -187,13 +187,13 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path, ...@@ -187,13 +187,13 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path,
efd = open(buf, O_RDONLY, 0); efd = open(buf, O_RDONLY, 0);
if (efd < 0) { if (efd < 0) {
fprintf(stderr, "open(%s): %s\n", buf, strerror(errno)); fprintf(stderr, "open(%s): %s\n", buf, strerror(errno));
goto cleanup; goto error;
} }
bytes = read(efd, buf, sizeof(buf)); bytes = read(efd, buf, sizeof(buf));
if (bytes <= 0 || bytes >= sizeof(buf)) { if (bytes <= 0 || bytes >= sizeof(buf)) {
fprintf(stderr, "read(%s): %s\n", buf, strerror(errno)); fprintf(stderr, "read(%s): %s\n", buf, strerror(errno));
goto cleanup; goto error;
} }
buf[bytes] = '\0'; buf[bytes] = '\0';
attr.config = strtol(buf, NULL, 0); attr.config = strtol(buf, NULL, 0);
...@@ -204,126 +204,128 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path, ...@@ -204,126 +204,128 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path,
pfd = syscall(__NR_perf_event_open, &attr, pid, cpu, group_fd, PERF_FLAG_FD_CLOEXEC); pfd = syscall(__NR_perf_event_open, &attr, pid, cpu, group_fd, PERF_FLAG_FD_CLOEXEC);
if (pfd < 0) { if (pfd < 0) {
perror("perf_event_open"); perror("perf_event_open");
goto cleanup; goto error;
} }
perf_reader_set_fd(reader, pfd); perf_reader_set_fd(reader, pfd);
if (perf_reader_mmap(reader, attr.type, attr.sample_type) < 0) if (perf_reader_mmap(reader, attr.type, attr.sample_type) < 0)
goto cleanup; goto error;
if (ioctl(pfd, PERF_EVENT_IOC_SET_BPF, progfd) < 0) { if (ioctl(pfd, PERF_EVENT_IOC_SET_BPF, progfd) < 0) {
perror("ioctl(PERF_EVENT_IOC_SET_BPF)"); perror("ioctl(PERF_EVENT_IOC_SET_BPF)");
goto cleanup; goto error;
} }
if (ioctl(pfd, PERF_EVENT_IOC_ENABLE, 0) < 0) { if (ioctl(pfd, PERF_EVENT_IOC_ENABLE, 0) < 0) {
perror("ioctl(PERF_EVENT_IOC_ENABLE)"); perror("ioctl(PERF_EVENT_IOC_ENABLE)");
goto cleanup; goto error;
} }
rc = 0; return 0;
cleanup: error:
if (efd >= 0) if (efd >= 0)
close(efd); close(efd);
return rc; return -1;
} }
/*
 * Create a kprobe from event_desc and attach the BPF program progfd to it.
 *
 * event_desc is written verbatim to the tracing kprobe_events control file;
 * the resulting event is then looked up under events/kprobes/<event> and
 * handed to bpf_attach_tracing_event() together with pid, cpu and group_fd.
 *
 * Returns the newly allocated perf_reader (as void *) on success, or NULL on
 * failure after printing a diagnostic to stderr. On failure the reader is
 * freed here.
 */
void * bpf_attach_kprobe(int progfd, const char *event,
                         const char *event_desc, pid_t pid,
                         int cpu, int group_fd, perf_reader_cb cb,
                         void *cb_cookie) {
  int kfd = -1;
  int err;
  char buf[256];
  struct perf_reader *reader = NULL;

  reader = perf_reader_new(cb, NULL, cb_cookie);
  if (!reader)
    goto error;

  kfd = open("/sys/kernel/debug/tracing/kprobe_events", O_WRONLY | O_APPEND, 0);
  if (kfd < 0) {
    perror("open(kprobe_events)");
    goto error;
  }

  if (write(kfd, event_desc, strlen(event_desc)) < 0) {
    /* Save errno first: fprintf() is allowed to modify it. */
    err = errno;
    fprintf(stderr, "write of \"%s\" into kprobe_events failed: %s\n", event_desc, strerror(err));
    if (err == EINVAL)
      fprintf(stderr, "check dmesg output for possible cause\n");
    goto error;
  }

  /*
   * The control file is only needed for the write above. Close it now so the
   * success path does not leak one descriptor per attached probe.
   */
  close(kfd);
  kfd = -1;

  snprintf(buf, sizeof(buf), "/sys/kernel/debug/tracing/events/kprobes/%s", event);
  if (bpf_attach_tracing_event(progfd, buf, reader, pid, cpu, group_fd) < 0)
    goto error;

  return reader;

error:
  if (kfd >= 0)
    close(kfd);
  if (reader)
    perf_reader_free(reader);

  return NULL;
}
/*
 * Write event_desc to the tracing kprobe_events control file.
 *
 * The caller supplies the full command string (presumably a "-:..." removal
 * entry -- confirm against the callers).
 *
 * Returns 0 on success, -1 on failure after printing a diagnostic to stderr.
 */
int bpf_detach_kprobe(const char *event_desc) {
  int kfd;
  int rc = -1;

  kfd = open("/sys/kernel/debug/tracing/kprobe_events", O_WRONLY | O_APPEND, 0);
  if (kfd < 0) {
    perror("open(kprobe_events)");
    return -1;
  }

  if (write(kfd, event_desc, strlen(event_desc)) < 0)
    perror("write(kprobe_events)");
  else
    rc = 0;

  /* Close on every path; returning early on success leaked the descriptor. */
  close(kfd);

  return rc;
}
/*
 * Open a software perf event of type PERF_COUNT_SW_BPF_OUTPUT for the given
 * pid/cpu pair and wrap it in a perf_reader that delivers raw samples to
 * raw_cb with cb_cookie.
 *
 * Returns the reader (as void *) on success, or NULL on failure; on failure
 * the reader is freed here.
 */
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu) {
  struct perf_event_attr attr = {};
  struct perf_reader *reader;
  int perf_fd;

  reader = perf_reader_new(NULL, raw_cb, cb_cookie);
  if (!reader)
    return NULL;

  attr.type = PERF_TYPE_SOFTWARE;
  attr.config = PERF_COUNT_SW_BPF_OUTPUT;
  attr.sample_type = PERF_SAMPLE_RAW;
  /* See perf_event_open(2) for the meaning of these two fields. */
  attr.sample_period = 1;
  attr.wakeup_events = 1;

  perf_fd = syscall(__NR_perf_event_open, &attr, pid, cpu, -1, PERF_FLAG_FD_CLOEXEC);
  if (perf_fd < 0) {
    perror("perf_event_open");
    goto fail;
  }
  perf_reader_set_fd(reader, perf_fd);

  if (perf_reader_mmap(reader, attr.type, attr.sample_type) < 0)
    goto fail;

  if (ioctl(perf_fd, PERF_EVENT_IOC_ENABLE, 0) < 0) {
    perror("ioctl(PERF_EVENT_IOC_ENABLE)");
    goto fail;
  }

  return reader;

fail:
  /* reader is known to be non-NULL on every path that reaches this label. */
  perf_reader_free(reader);
  return NULL;
}
...@@ -48,6 +48,7 @@ void * bpf_attach_kprobe(int progfd, const char *event, const char *event_desc, ...@@ -48,6 +48,7 @@ void * bpf_attach_kprobe(int progfd, const char *event, const char *event_desc,
int pid, int cpu, int group_fd, perf_reader_cb cb, int pid, int cpu, int group_fd, perf_reader_cb cb,
void *cb_cookie); void *cb_cookie);
int bpf_detach_kprobe(const char *event_desc); int bpf_detach_kprobe(const char *event_desc);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu);
#define LOG_BUF_SIZE 65536 #define LOG_BUF_SIZE 65536
extern char bpf_log_buf[LOG_BUF_SIZE]; extern char bpf_log_buf[LOG_BUF_SIZE];
......
...@@ -18,6 +18,7 @@ from collections import MutableMapping ...@@ -18,6 +18,7 @@ from collections import MutableMapping
import ctypes as ct import ctypes as ct
import fcntl import fcntl
import json import json
import multiprocessing
import os import os
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import sys import sys
...@@ -95,7 +96,7 @@ lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int, ...@@ -95,7 +96,7 @@ lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
lib.bpf_detach_kprobe.restype = ct.c_int lib.bpf_detach_kprobe.restype = ct.c_int
lib.bpf_detach_kprobe.argtypes = [ct.c_char_p] lib.bpf_detach_kprobe.argtypes = [ct.c_char_p]
lib.bpf_open_perf_buffer.restype = ct.c_void_p lib.bpf_open_perf_buffer.restype = ct.c_void_p
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object] lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object, ct.c_int, ct.c_int]
lib.perf_reader_poll.restype = ct.c_int lib.perf_reader_poll.restype = ct.c_int
lib.perf_reader_poll.argtypes = [ct.c_int, ct.POINTER(ct.c_void_p), ct.c_int] lib.perf_reader_poll.argtypes = [ct.c_int, ct.POINTER(ct.c_void_p), ct.c_int]
lib.perf_reader_free.restype = None lib.perf_reader_free.restype = None
...@@ -148,6 +149,7 @@ class BPF(object): ...@@ -148,6 +149,7 @@ class BPF(object):
self.Key = keytype self.Key = keytype
self.Leaf = leaftype self.Leaf = leaftype
self.ttype = lib.bpf_table_type_id(self.bpf.module, self.map_id) self.ttype = lib.bpf_table_type_id(self.bpf.module, self.map_id)
self._cbs = {}
def key_sprintf(self, key): def key_sprintf(self, key):
key_p = ct.pointer(key) key_p = ct.pointer(key)
...@@ -185,20 +187,42 @@ class BPF(object): ...@@ -185,20 +187,42 @@ class BPF(object):
raise Exception("Could not scanf leaf") raise Exception("Could not scanf leaf")
return leaf return leaf
def open_perf_buffers(self, cb, cookie):
    """open_perf_buffers(cb, cookie)

    Open one ring buffer per cpu to receive custom perf event data from
    the bpf program. Each buffer is stored under the cpu number as its
    table key, so the program is expected to pass the cpu-id as the key
    of its perf_output call.
    """
    ncpus = multiprocessing.cpu_count()
    for cpu_id in range(ncpus):
        # The table key and the cpu the event is bound to are the same.
        self.open_perf_buffer(cpu_id, cb, cookie, cpu=cpu_id)
def open_perf_buffer(self, key, cb, cookie, pid=-1, cpu=0):
    """open_perf_buffer(key, cb, cookie, pid=-1, cpu=0)

    Open a ring buffer to receive custom perf event data from the bpf
    program and store its file descriptor in this table under key.

    The callback cb is invoked for each event submitted, which can be up
    to millions of events per second. The signature of cb should be
    cb(cookie, data, data_size).

    Raises Exception if the underlying perf buffer cannot be opened.
    """
    def _trampoline(_unused, data, size):
        # The C side is given no cookie; the Python cookie is bound here.
        return cb(cookie, data, size)

    raw_cb = _RAW_CB_TYPE(_trampoline)
    reader = lib.bpf_open_perf_buffer(raw_cb, None, pid, cpu)
    if not reader:
        raise Exception("Could not open perf buffer")

    self[self.Key(key)] = self.Leaf(lib.perf_reader_fd(reader))
    open_kprobes[(id(self), key)] = reader
    # Hold references so the ctypes callback object and the cookie are not
    # garbage collected while the reader can still call back.
    self._cbs[key] = (raw_cb, cookie)
def close_perf_buffer(self, key):
    """close_perf_buffer(key)

    Free the perf reader registered under key, if any, and drop the
    callback references held for it. Closing a key that was never opened
    (or was already closed) is a no-op.
    """
    reader_key = (id(self), key)
    reader = open_kprobes.get(reader_key)
    if reader:
        # Free the reader before releasing the callback it may still call.
        lib.perf_reader_free(reader)
        del open_kprobes[reader_key]
    # pop() rather than del: an unknown key must not raise KeyError.
    self._cbs.pop(key, None)
def __getitem__(self, key): def __getitem__(self, key):
key_p = ct.pointer(key) key_p = ct.pointer(key)
...@@ -810,10 +834,10 @@ class BPF(object): ...@@ -810,10 +834,10 @@ class BPF(object):
Poll from the ring buffers for all of the open kprobes, calling the Poll from the ring buffers for all of the open kprobes, calling the
cb() that was given in the BPF constructor for each entry. cb() that was given in the BPF constructor for each entry.
""" """
readers = (ct.c_void_p * len(open_kprobes))()
for i, v in enumerate(open_kprobes.values()):
readers[i] = v
try: try:
readers = (ct.c_void_p * len(open_kprobes))()
for i, v in enumerate(open_kprobes.values()):
readers[i] = v
lib.perf_reader_poll(len(open_kprobes), readers, timeout) lib.perf_reader_poll(len(open_kprobes), readers, timeout)
except KeyboardInterrupt: except KeyboardInterrupt:
exit() exit()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment