Commit 1d06b719 authored by Ivan Tyagov's avatar Ivan Tyagov

Initial import of Levin's work.

parent 2e0960ca
#!//usr/bin/python
"""
Basic OPC UA <-> HTTP gateway server.
"""
import sys
import asyncio
import asyncua
from dataclasses import dataclass, field
import json
import requests
import urllib
import argparse
import logging
# #################################
# Configure
parser = argparse.ArgumentParser(description='Run OPCUA Server.')
a = parser.add_argument
a('--ip', help='The IP address on which the OPCUA Server runs', default="127.0.0.1")
a('--port', help='The port on which the OPCUA Server runs', default="4840")
a('--xml', help='Path of XML to configure Server. See asyncua doc for more details.', default=None)
a('--erp5-ip', help='IP of ERP5 instance to which data shall be send.', default=None)
a('--erp5-port', help='Port of ERP5 instance to which data shall be send.', default=None)
args = parser.parse_args()
ip, port, xml, erp5_ip, erp5_port = args.ip, args.port, args.xml, args.erp5_ip, args.erp5_port
# #################################
# Define ERP5Handler
@dataclass(frozen=True)
class ERP5Handler(asyncua.common.subscription.SubHandler):
ip: str
port: str
session: requests.Session = field(default_factory=requests.Session)
@property
def uri(self):
return f"http://{self.ip}:{self.port}/erp5/opcua_test"
def call(self, **data):
params = urllib.parse.quote_plus(json.dumps({k: str(v) for k, v in data.items()}))
self.session.post(f"{self.uri}?data={params}")
def datachange_notification(self, node, val, data):
self.call(node=node, val=val, data=data)
def event_notification(self, event):
self.call(event=event)
erp5_handler = None
if erp5_ip is not None and erp5_port is not None:
erp5_handler = ERP5Handler(erp5_ip, erp5_port)
class InternalSession(asyncua.server.internal_session.InternalSession):
async def read(self, params):
erp5_handler.call(params=params)
return await super().read(params)
# #################################
# Start OPCUA Server
async def main():
_logger = logging.getLogger(__name__)
# setup our server
server = asyncua.Server()
await server.init()
server.set_endpoint(f"opc.tcp://{ip}:{port}/freeopcua/server/")
if xml is not None:
await server.import_xml(xml)
if erp5_handler:
subscription = await server.create_subscription(1000, erp5_handler)
await subscription.subscribe_events()
nodes = await asyncua.common.ua_utils.get_nodes_of_namespace(server)
await subscription.subscribe_data_change(nodes)
def create_session(
name, user=asyncua.server.users.User(role=asyncua.server.users.UserRole.Anonymous), external=False
):
self = server.iserver
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
server.iserver.create_session = create_session
_logger.info("Added subscription for erp5 handler.")
_logger.info("Starting server!")
async with server:
while True:
await asyncio.sleep(1)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
import __main__
if __name__ == '__main__':
sys.exit(__main__.main())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment