Commit 049f6412 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 777befa0
import errno
import json
import logging
import os
from dateutil import parser
from .util import JSONPromise
from .util import tail_file
import re
from .util import JSONPromise, get_json_log_data_interval
from zope.interface import implementer
from slapos.grid.promise import interface
......@@ -16,26 +10,86 @@ class RunPromise(JSONPromise):
super(RunPromise, self).__init__(config)
self.setPeriodicity(minute=1)
self.amarisoft_rf_info_log = self.getConfig('amarisoft-rf-info-log')
# self.stats_period = int(self.getConfig('stats-period'))
self.stats_period = int(self.getConfig('stats-period'))
self.testing = self.getConfig('testing') == "True"
self.sdr_devchan = "/dev/sdr%s@%s" % (self.getConfig('sdr_dev'), self.getConfig('sfp_port'))
def sense(self):
if self.testing:
self.logger.info("skipping promise")
return
last_line = tail_file(self.amarisoft_rf_info_log)
if "CPRI" not in last_line:
self.logger.info("No CPRI feature")
else:
if "HW" in last_line and "SW" in last_line:
self.logger.info("CPRI locked")
else:
if "HW" not in last_line:
self.logger.error("HW Lock is missing")
if "SW" not in last_line:
self.logger.error("SW Lock is missing")
def error(msg): self.logger.error("%s: %s", self.sdr_devchan, msg)
def info(msg): self.logger.info ("%s: %s", self.sdr_devchan, msg)
data_list = get_json_log_data_interval(self.amarisoft_rf_info_log, self.stats_period * 2)
if len(data_list) < 1:
error("rf_info: stale data")
return
rf_info_text = data_list[0]['rf_info']
rf_info = self._parse_rf_info(rf_info_text)
if self.sdr_devchan not in rf_info:
error("rf_info: no device entry")
return
rf_info = rf_info[self.sdr_devchan]
icpri = rf_info.get('CPRI_option')
if icpri is None:
error("no CPRI feature")
return
hw = ("HW" in icpri)
sw = ("SW" in icpri)
if not hw:
error("HW Lock is missing")
if not sw:
error("SW Lock is missing")
if hw and sw:
info("CPRI locked")
@staticmethod
def _parse_rf_info(rf_info_text): # -> {} /dev/sdrX@Y -> {key: value}
"""_parse_rf_info parses rf_info output into per-SDR-device key->value dictionaries.
For example:
TRX SDR driver 2023-09-07, API v15/18
PCIe CPRI /dev/sdr1@2:
FPGA vccint: 0.98 V
FPGA vccaux: 1.77 V
PCIe CPRI /dev/sdr3@4:
ABC: 123
DEF: 4567
is parsed as {'/dev/sdr1@2': {'FPGA vccint': '0.98 V', 'FPGA vccaux': '1.77 V'},
'/dev/sdr3@4': {'ABC': '123', 'DEF': '4567'}}
"""
rf_info = {}
cur = None
for l in rf_info_text.splitlines():
if not l.startswith(' '): # possibly start of new /dev entry
cur = None
m = re.search(r' (/dev/sdr[^\s]+):$', l)
if m is None: # not so - ignore the line
continue
cur = {}
sdr_devchan = m.group(1)
rf_info[sdr_devchan] = cur
continue
# indented line - it populates current if it still holds its context
if cur is None:
continue
k, v = l.split(':', 1)
k = k.strip()
v = v.strip()
cur[k] = v
return rf_info
def test(self):
"""
Called after sense() if the instance is still converging.
......
......@@ -28,6 +28,7 @@
import mock
import os
import time
import json
from datetime import datetime
from datetime import timedelta
from slapos.grid.promise import PromiseError
......@@ -41,6 +42,7 @@ class TestCheckCpriLock(TestPromisePluginMixin):
def setUp(self):
super(TestCheckCpriLock, self).setUp()
self.amarisoft_rf_info_log = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'amarisoft_rf_info.json.log')
rf_info = \
"""
......@@ -78,42 +80,51 @@ PCIe CPRI /dev/sdr3@1:
DMA0: RX fifo: 66.67us Usage=16/32768 (0%)
DMA0 Underflows: 0
DMA0 Overflows: 0
PCIe SDR /dev/sdr4@0:
AAA: bbb
"""
data = {'message': 'rf', 'rf_info': rf_info}
self.rf_info_data = {'message': 'rf', 'rf_info': rf_info}
self.amarisoft_rf_info_log = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'amarisoft_rf_info.json.log')
def writeLog(self, data):
with open(self.amarisoft_rf_info_log, 'w') as f:
f.write(
"""
{"time": "%s", "log_level": "INFO", "message": "RF info", "data": %s}
""" % ((datetime.now() - timedelta(seconds=5)).strftime("%Y-%m-%d %H:%M:%S")[:-3], data))
"""{"time": "%s", "log_level": "INFO", "message": "RF info", "data": %s}""" %
((datetime.now() - timedelta(seconds=5)).strftime("%Y-%m-%d %H:%M:%S")[:-3], json.dumps(data)))
def writePromise(self, **kw):
kw.update({'amarisoft-rf-info-log': self.amarisoft_rf_info_log})
kw.update({'amarisoft-rf-info-log': self.amarisoft_rf_info_log,
'stats-period': 100})
super(TestCheckCpriLock, self).writePromise(self.promise_name,
"from %s import %s\nextra_config_dict = %r\n"
% (RunPromise.__module__, RunPromise.__name__, kw))
def test_locked_ok(self):
self.writeLog(self.rf_info_data)
self.writePromise(sdr_dev='3', sfp_port='1')
self.configureLauncher()
self.launcher.run()
def test_no_lock(self):
self.writeLog(self.rf_info_data)
self.writePromise(sdr_dev='2', sfp_port='1')
self.configureLauncher()
with self.assertRaises(PromiseError):
with self.assertRaisesRegex(PromiseError, r'(?m)HW Lock is missing\n.*SW Lock is missing'):
self.launcher.run()
# XXX check output
def test_no_cpri(self):
def test_no_device(self):
self.writeLog(self.rf_info_data)
self.writePromise(sdr_dev='1', sfp_port='0')
self.configureLauncher()
with self.assertRaises(PromiseError):
with self.assertRaisesRegex(PromiseError, 'no device entry'):
self.launcher.run()
# XXX check output
def test_no_cpri_entry(self):
self.writeLog(self.rf_info_data)
self.writePromise(sdr_dev='4', sfp_port='0')
self.configureLauncher()
with self.assertRaisesRegex(PromiseError, 'no CPRI feature'):
self.launcher.run()
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment