Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
95913557
Commit
95913557
authored
Feb 18, 2019
by
oroulet
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update some examples, make them work and add several missing async methods
parent
7b2538d0
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
155 additions
and
171 deletions
+155
-171
examples/client-example.py
examples/client-example.py
+8
-10
examples/client-minimal.py
examples/client-minimal.py
+9
-37
examples/client-subscription.py
examples/client-subscription.py
+9
-16
examples/server-example.py
examples/server-example.py
+24
-27
examples/server-minimal.py
examples/server-minimal.py
+4
-1
opcua/client/client.py
opcua/client/client.py
+4
-10
opcua/client/ua_client.py
opcua/client/ua_client.py
+36
-8
opcua/common/node.py
opcua/common/node.py
+20
-21
opcua/common/shortcuts.py
opcua/common/shortcuts.py
+1
-0
opcua/common/subscription.py
opcua/common/subscription.py
+5
-5
opcua/server/address_space.py
opcua/server/address_space.py
+2
-2
opcua/server/internal_server.py
opcua/server/internal_server.py
+5
-4
opcua/server/internal_subscription.py
opcua/server/internal_subscription.py
+19
-20
opcua/server/subscription_service.py
opcua/server/subscription_service.py
+6
-7
tests/test_subscriptions.py
tests/test_subscriptions.py
+3
-3
No files found.
examples/client-example.py
View file @
95913557
...
...
@@ -24,9 +24,8 @@ class SubHandler(object):
print
(
"New event"
,
event
)
async
def
task
(
loop
):
url
=
"opc.tcp://commsvr.com:51234/UA/CAS_UA_Server"
# url = "opc.tcp://localhost:4840/freeopcua/server/"
async
def
run
():
url
=
"opc.tcp://localhost:4840/freeopcua/server/"
try
:
async
with
Client
(
url
=
url
)
as
client
:
root
=
client
.
get_root_node
()
...
...
@@ -37,6 +36,9 @@ async def task(loop):
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger
.
info
(
"Children of root are: %r"
,
await
root
.
get_children
())
uri
=
"http://examples.freeopcua.github.io"
idx
=
await
client
.
get_namespace_index
(
uri
)
_logger
.
info
(
"index of our namespace is %s"
,
idx
)
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
...
...
@@ -63,18 +65,14 @@ async def task(loop):
# await sub.delete()
# calling a method on server
res
=
obj
.
call_method
(
"2:multiply"
,
3
,
"klk"
)
res
=
await
obj
.
call_method
(
"2:multiply"
,
3
,
"klk"
)
_logger
.
info
(
"method result is: %r"
,
res
)
except
Exception
:
_logger
.
exception
(
'error'
)
def
main
()
:
if
__name__
==
"__main__"
:
loop
=
asyncio
.
get_event_loop
()
loop
.
set_debug
(
True
)
loop
.
run_until_complete
(
task
(
loop
))
loop
.
close
()
loop
.
run_until_complete
(
run
())
if
__name__
==
"__main__"
:
main
()
examples/client-minimal.py
View file @
95913557
import
asyncio
import
sys
sys
.
path
.
insert
(
0
,
".."
)
import
logging
from
opcua
import
Client
,
Node
,
ua
...
...
@@ -6,35 +8,7 @@ logging.basicConfig(level=logging.INFO)
_logger
=
logging
.
getLogger
(
'opcua'
)
async
def
browse_nodes
(
node
:
Node
):
"""
Build a nested node tree dict by recursion (filtered by OPC UA objects and variables).
"""
node_class
=
await
node
.
get_node_class
()
children
=
[]
for
child
in
await
node
.
get_children
():
if
await
child
.
get_node_class
()
in
[
ua
.
NodeClass
.
Object
,
ua
.
NodeClass
.
Variable
]:
children
.
append
(
await
browse_nodes
(
child
)
)
if
node_class
!=
ua
.
NodeClass
.
Variable
:
var_type
=
None
else
:
try
:
var_type
=
(
await
node
.
get_data_type_as_variant_type
()).
value
except
ua
.
UaError
:
_logger
.
warning
(
'Node Variable Type could not be determined for %r'
,
node
)
var_type
=
None
return
{
'id'
:
node
.
nodeid
.
to_string
(),
'name'
:
(
await
node
.
get_display_name
()).
Text
,
'cls'
:
node_class
.
value
,
'children'
:
children
,
'type'
:
var_type
,
}
async
def
task
(
loop
):
async
def
main
():
# url = 'opc.tcp://192.168.2.64:4840'
url
=
'opc.tcp://localhost:4840/freeopcua/server/'
# url = 'opc.tcp://commsvr.com:51234/UA/CAS_UA_Server'
...
...
@@ -47,9 +21,13 @@ async def task(loop):
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger
.
info
(
'Children of root are: %r'
,
await
root
.
get_children
())
uri
=
'http://examples.freeopcua.github.io'
idx
=
await
client
.
get_namespace_index
(
uri
)
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
var
=
await
root
.
get_child
([
"0:Objects"
,
f"
{
idx
}
:MyObject"
,
f"
{
idx
}
:MyVariable"
])
print
(
"My variable"
,
var
,
await
var
.
get_value
())
# print(var)
# var.get_data_value() # get value of node as a DataValue object
# var.get_value() # get value of node as a python builtin
...
...
@@ -57,18 +35,12 @@ async def task(loop):
# var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
tree
=
await
browse_nodes
(
client
.
get_objects_node
())
_logger
.
info
(
'Node tree: %r'
,
tree
)
except
Exception
:
_logger
.
exception
(
'error'
)
def
main
()
:
if
__name__
==
'__main__'
:
loop
=
asyncio
.
get_event_loop
()
loop
.
set_debug
(
True
)
loop
.
run_until_complete
(
task
(
loop
))
loop
.
run_until_complete
(
main
(
))
loop
.
close
()
if
__name__
==
'__main__'
:
main
()
examples/client-subscription.py
View file @
95913557
import
sys
sys
.
path
.
insert
(
0
,
".."
)
import
os
# os.environ['PYOPCUA_NO_TYPO_CHECK'] = 'True'
...
...
@@ -16,35 +18,26 @@ class SubscriptionHandler:
_logger
.
info
(
'datachange_notification %r %s'
,
node
,
val
)
async
def
task
(
loop
):
async
def
main
(
):
url
=
'opc.tcp://localhost:4840/freeopcua/server/'
client
=
Client
(
url
=
url
)
client
.
set_user
(
'test'
)
client
.
set_password
(
'test'
)
# client.set_security_string()
async
with
client
:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root
=
client
.
get_root_node
(
)
_logger
.
info
(
'Objects node is: %r'
,
root
)
uri
=
'http://examples.freeopcua.github.io'
idx
=
await
client
.
get_namespace_index
(
uri
)
var
=
await
client
.
nodes
.
objects
.
get_child
([
f"
{
idx
}
:MyObject"
,
f"
{
idx
}
:MyVariable"
]
)
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger
.
info
(
'Children of root are: %r'
,
await
root
.
get_children
())
handler
=
SubscriptionHandler
()
subscription
=
await
client
.
create_subscription
(
500
,
handler
)
nodes
=
[
client
.
get_node
(
'ns=1;i=6'
)
,
var
,
client
.
get_node
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
),
]
await
subscription
.
subscribe_data_change
(
nodes
)
await
asyncio
.
sleep
(
10
)
def
main
()
:
if
__name__
==
"__main__"
:
loop
=
asyncio
.
get_event_loop
()
loop
.
set_debug
(
True
)
loop
.
run_until_complete
(
task
(
loop
))
loop
.
run_until_complete
(
main
())
loop
.
close
()
if
__name__
==
"__main__"
:
main
()
examples/server-example.py
View file @
95913557
from
threading
import
Thread
import
asyncio
import
copy
import
logging
from
datetime
import
datetime
...
...
@@ -7,17 +7,6 @@ from math import sin
import
sys
sys
.
path
.
insert
(
0
,
".."
)
try
:
from
IPython
import
embed
except
ImportError
:
import
code
def
embed
():
myvars
=
globals
()
myvars
.
update
(
locals
())
shell
=
code
.
InteractiveConsole
(
myvars
)
shell
.
interact
()
from
opcua
import
ua
,
uamethod
,
Server
...
...
@@ -53,9 +42,9 @@ def multiply(parent, x, y):
return
x
*
y
if
__name__
==
"__main__"
:
async
def
main
()
:
# optional: setup logging
logging
.
basicConfig
(
level
=
logging
.
WARN
)
logging
.
basicConfig
(
level
=
logging
.
INFO
)
#logger = logging.getLogger("opcua.address_space")
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.internal_server")
...
...
@@ -67,6 +56,7 @@ if __name__ == "__main__":
# now setup our server
server
=
Server
()
await
server
.
init
()
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server
.
set_endpoint
(
"opc.tcp://0.0.0.0:4840/freeopcua/server/"
)
...
...
@@ -79,28 +69,29 @@ if __name__ == "__main__":
# setup our own namespace
uri
=
"http://examples.freeopcua.github.io"
idx
=
server
.
register_namespace
(
uri
)
idx
=
await
server
.
register_namespace
(
uri
)
return
# create a new node type we can instantiate in our address space
dev
=
server
.
nodes
.
base_object_type
.
add_object_type
(
0
,
"MyDevice"
)
dev
.
add_variable
(
0
,
"sensor1"
,
1.0
).
set_modelling_rule
(
True
)
dev
.
add_property
(
0
,
"device_id"
,
"0340"
).
set_modelling_rule
(
True
)
ctrl
=
dev
.
add_object
(
0
,
"controller"
)
ctrl
.
set_modelling_rule
(
True
)
ctrl
.
add_property
(
0
,
"state"
,
"Idle"
).
set_modelling_rule
(
True
)
dev
=
await
server
.
nodes
.
base_object_type
.
add_object_type
(
idx
,
"MyDevice"
)
await
dev
.
add_variable
(
idx
,
"sensor1"
,
1.0
).
set_modelling_rule
(
True
)
await
dev
.
add_property
(
idx
,
"device_id"
,
"0340"
).
set_modelling_rule
(
True
)
ctrl
=
await
dev
.
add_object
(
idx
,
"controller"
)
await
ctrl
.
set_modelling_rule
(
True
)
await
ctrl
.
add_property
(
idx
,
"state"
,
"Idle"
).
set_modelling_rule
(
True
)
# populating our address space
# First a folder to organise our nodes
myfolder
=
server
.
nodes
.
objects
.
add_folder
(
idx
,
"myEmptyFolder"
)
myfolder
=
await
server
.
nodes
.
objects
.
add_folder
(
idx
,
"myEmptyFolder"
)
# instanciate one instance of our device
mydevice
=
server
.
nodes
.
objects
.
add_object
(
idx
,
"Device0001"
,
dev
)
mydevice_var
=
mydevice
.
get_child
([
"0:controller"
,
"0
:state"
])
# get proxy to our device state variable
mydevice
=
await
server
.
nodes
.
objects
.
add_object
(
idx
,
"Device0001"
,
dev
)
mydevice_var
=
await
mydevice
.
get_child
([
f"
{
idx
}
:controller"
,
f"
{
idx
}
:state"
])
# get proxy to our device state variable
# create directly some objects and variables
myobj
=
server
.
nodes
.
objects
.
add_object
(
idx
,
"MyObject"
)
myvar
=
myobj
.
add_variable
(
idx
,
"MyVariable"
,
6.7
)
myobj
=
await
server
.
nodes
.
objects
.
add_object
(
idx
,
"MyObject"
)
myvar
=
await
myobj
.
add_variable
(
idx
,
"MyVariable"
,
6.7
)
myvar
.
set_writable
()
# Set MyVariable to be writable by clients
mystringvar
=
myobj
.
add_variable
(
idx
,
"MyStringVariable"
,
"Really nice string"
)
mystringvar
=
await
myobj
.
add_variable
(
idx
,
"MyStringVariable"
,
"Really nice string"
)
mystringvar
.
set_writable
()
# Set MyVariable to be writable by clients
mydtvar
=
myobj
.
add_variable
(
idx
,
"MyDateTimeVar"
,
datetime
.
utcnow
())
mydtvar
.
set_writable
()
# Set MyVariable to be writable by clients
...
...
@@ -139,3 +130,9 @@ if __name__ == "__main__":
embed
()
finally
:
server
.
stop
()
if
__name__
==
"__main__"
:
loop
=
asyncio
.
get_event_loop
()
loop
.
set_debug
(
True
)
loop
.
run_until_complete
(
main
())
examples/server-minimal.py
View file @
95913557
import
sys
sys
.
path
.
insert
(
0
,
".."
)
import
asyncio
from
opcua
import
ua
,
Server
...
...
@@ -13,7 +15,7 @@ async def task(loop):
# setup our server
server
=
Server
()
await
server
.
init
()
server
.
set_endpoint
(
'opc.tcp://
127.0.0.1:808
0/freeopcua/server/'
)
#4840
server
.
set_endpoint
(
'opc.tcp://
localhost:484
0/freeopcua/server/'
)
#4840
# setup our own namespace, not really necessary but should as spec
uri
=
'http://examples.freeopcua.github.io'
idx
=
await
server
.
register_namespace
(
uri
)
...
...
@@ -34,6 +36,7 @@ async def task(loop):
async
with
server
:
count
=
0
while
True
:
print
(
"UPDATE"
)
await
asyncio
.
sleep
(
1
)
count
+=
0.1
await
myvar
.
set_value
(
count
)
...
...
opcua/client/client.py
View file @
95913557
...
...
@@ -339,13 +339,6 @@ class Client(object):
# Actual maximum number of milliseconds that a Session shall remain open without activity
self
.
session_timeout
=
response
.
RevisedSessionTimeout
self
.
_schedule_renew_session
()
# ToDo: subscribe to ServerStatus
"""
The preferred mechanism for a Client to monitor the connection status is through the keep-alive of the
Subscription. A Client should subscribe for the State Variable in the ServerStatus to detect shutdown or other
failure states. If no Subscription is created or the Server does not support Subscriptions,
the connection can be monitored by periodically reading the State Variable
"""
return
response
def
_schedule_renew_session
(
self
,
renew_session
:
bool
=
False
):
...
...
@@ -361,12 +354,13 @@ class Client(object):
async
def
_renew_session
(
self
):
"""
Renew the SecureChannel before the SessionTimeout will happen.
ToDo: shouldn't this only be done if there was no session activity?
In theory we could do that only if no session activity
but it does not cost much..
"""
s
erver_state
=
self
.
get_node
(
ua
.
FourByteNodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_State
))
s
tate_node
=
await
self
.
nodes
.
server_state
self
.
logger
.
debug
(
"renewing channel"
)
await
self
.
open_secure_channel
(
renew
=
True
)
val
=
await
s
erver_stat
e
.
get_value
()
val
=
await
s
tate_nod
e
.
get_value
()
self
.
logger
.
debug
(
"server state is: %s "
,
val
)
def
server_policy_id
(
self
,
token_type
,
default
):
...
...
opcua/client/ua_client.py
View file @
95913557
...
...
@@ -137,7 +137,7 @@ class UASocketProtocol(asyncio.Protocol):
def
_call_callback
(
self
,
request_id
,
body
):
future
=
self
.
_callbackmap
.
pop
(
request_id
,
None
)
if
future
is
None
:
raise
ua
.
UaError
(
"No
future object found for request
: {0}, callbacks in list are {1}"
.
format
(
raise
ua
.
UaError
(
"No
request found for requestid
: {0}, callbacks in list are {1}"
.
format
(
request_id
,
self
.
_callbackmap
.
keys
()))
future
.
set_result
(
body
)
...
...
@@ -183,6 +183,7 @@ class UASocketProtocol(asyncio.Protocol):
self
.
_connection
.
set_channel
(
response
.
Parameters
)
return
response
.
Parameters
async
def
close_secure_channel
(
self
):
"""
Close secure channel.
...
...
@@ -217,6 +218,8 @@ class UaClient:
self
.
_timeout
=
timeout
self
.
security_policy
=
ua
.
SecurityPolicy
()
self
.
protocol
:
UASocketProtocol
=
None
self
.
_sub_cond
=
asyncio
.
Condition
()
self
.
_sub_data_queue
=
[]
def
set_security
(
self
,
policy
:
ua
.
SecurityPolicy
):
self
.
security_policy
=
policy
...
...
@@ -407,9 +410,10 @@ class UaClient:
self
.
logger
.
info
(
"create_subscription"
)
request
=
ua
.
CreateSubscriptionRequest
()
request
.
Parameters
=
params
data
=
await
self
.
protocol
.
send_request
(
request
)
response
=
struct_from_binary
(
ua
.
CreateSubscriptionResponse
,
await
self
.
protocol
.
send_request
(
request
)
data
)
self
.
logger
.
info
(
"create subscription callback"
)
self
.
logger
.
debug
(
response
)
...
...
@@ -421,9 +425,10 @@ class UaClient:
self
.
logger
.
info
(
"delete_subscription"
)
request
=
ua
.
DeleteSubscriptionsRequest
()
request
.
Parameters
.
SubscriptionIds
=
subscription_ids
data
=
await
self
.
protocol
.
send_request
(
request
)
response
=
struct_from_binary
(
ua
.
DeleteSubscriptionsResponse
,
await
self
.
protocol
.
send_request
(
request
)
data
)
self
.
logger
.
info
(
"delete subscriptions callback"
)
self
.
logger
.
debug
(
response
)
...
...
@@ -438,13 +443,36 @@ class UaClient:
acks
=
[]
request
=
ua
.
PublishRequest
()
request
.
Parameters
.
SubscriptionAcknowledgements
=
acks
data
=
await
self
.
protocol
.
send_request
(
request
,
timeout
=
0
)
# check if answer looks ok
await
self
.
protocol
.
send_request
(
request
,
self
.
_sub_data_received
,
timeout
=
0
)
def
_sub_data_received
(
self
,
future
):
data
=
future
.
result
()
self
.
loop
.
create_task
(
self
.
_call_publish_callback
(
data
))
"""
def _sub_data_received(self, future):
data = future.result()
self.loop.create_task(self._enqueue_sub_data(data))
async def _enqueue_sub_data(self, data):
self._sub_data_queue.append(data)
with self._sub_cond:
self._sub_cond.notify()
async def _subscribtion_loop(self):
while True:
async with self._sub_cond:
await self._sub_cond.wait()
data = self._sub_data_queue.pop(0)
await self._call_publish_callback(data)
"""
async
def
_call_publish_callback
(
self
,
data
):
self
.
logger
.
info
(
"call_publish_callback"
)
try
:
self
.
protocol
.
check_answer
(
data
,
"while waiting for publish response"
)
except
BadTimeout
:
# Spec Part 4, 7.28
self
.
loop
.
create_task
(
self
.
publish
()
)
await
self
.
publish
(
)
return
except
BadNoSubscription
:
# Spec Part 5, 13.8.1
# BadNoSubscription is expected after deleting the last subscription.
...
...
@@ -472,7 +500,7 @@ class UaClient:
# does so it stays in, doesn't seem to hurt.
self
.
logger
.
exception
(
"Error parsing notification from server"
)
# send publish request ot server so he does stop sending notifications
self
.
loop
.
create_task
(
self
.
publish
([])
)
await
self
.
publish
([]
)
return
# look for callback
try
:
...
...
@@ -482,7 +510,7 @@ class UaClient:
return
# do callback
try
:
callback
(
response
.
Parameters
)
await
callback
(
response
.
Parameters
)
except
Exception
:
# we call client code, catch everything!
self
.
logger
.
exception
(
"Exception while calling user callback: %s"
)
...
...
opcua/common/node.py
View file @
95913557
...
...
@@ -654,33 +654,32 @@ class Node:
rule
=
ua
.
ObjectIds
.
ModellingRule_Mandatory
if
mandatory
else
ua
.
ObjectIds
.
ModellingRule_Optional
await
self
.
add_reference
(
rule
,
ua
.
ObjectIds
.
HasModellingRule
,
True
,
False
)
def
add_folder
(
self
,
nodeid
,
bname
):
return
create_folder
(
self
,
nodeid
,
bname
)
async
def
add_folder
(
self
,
nodeid
,
bname
):
return
await
create_folder
(
self
,
nodeid
,
bname
)
def
add_object
(
self
,
nodeid
,
bname
,
objecttype
=
None
):
return
create_object
(
self
,
nodeid
,
bname
,
objecttype
)
async
def
add_object
(
self
,
nodeid
,
bname
,
objecttype
=
None
):
return
await
create_object
(
self
,
nodeid
,
bname
,
objecttype
)
def
add_variable
(
self
,
nodeid
,
bname
,
val
,
varianttype
=
None
,
datatype
=
None
):
return
create_variable
(
self
,
nodeid
,
bname
,
val
,
varianttype
,
datatype
)
async
def
add_variable
(
self
,
nodeid
,
bname
,
val
,
varianttype
=
None
,
datatype
=
None
):
return
await
create_variable
(
self
,
nodeid
,
bname
,
val
,
varianttype
,
datatype
)
def
add_object_type
(
self
,
nodeid
,
bname
):
return
create_object_type
(
self
,
nodeid
,
bname
)
async
def
add_object_type
(
self
,
nodeid
,
bname
):
return
await
create_object_type
(
self
,
nodeid
,
bname
)
def
add_variable_type
(
self
,
nodeid
,
bname
,
datatype
):
return
create_variable_type
(
self
,
nodeid
,
bname
,
datatype
)
async
def
add_variable_type
(
self
,
nodeid
,
bname
,
datatype
):
return
await
create_variable_type
(
self
,
nodeid
,
bname
,
datatype
)
def
add_data_type
(
self
,
nodeid
,
bname
,
description
=
None
):
return
create_data_type
(
self
,
nodeid
,
bname
,
description
=
None
)
async
def
add_data_type
(
self
,
nodeid
,
bname
,
description
=
None
):
return
await
create_data_type
(
self
,
nodeid
,
bname
,
description
=
None
)
def
add_property
(
self
,
nodeid
,
bname
,
val
,
varianttype
=
None
,
datatype
=
None
):
return
create_property
(
self
,
nodeid
,
bname
,
val
,
varianttype
,
datatype
)
async
def
add_property
(
self
,
nodeid
,
bname
,
val
,
varianttype
=
None
,
datatype
=
None
):
return
await
create_property
(
self
,
nodeid
,
bname
,
val
,
varianttype
,
datatype
)
def
add_method
(
self
,
*
args
):
return
create_method
(
self
,
*
args
)
async
def
add_method
(
self
,
*
args
):
return
await
create_method
(
self
,
*
args
)
def
add_reference_type
(
self
,
nodeid
,
bname
,
symmetric
=
True
,
inversename
=
None
):
"""COROUTINE"""
return
create_reference_type
(
self
,
nodeid
,
bname
,
symmetric
,
inversename
)
async
def
add_reference_type
(
self
,
nodeid
,
bname
,
symmetric
=
True
,
inversename
=
None
):
return
await
create_reference_type
(
self
,
nodeid
,
bname
,
symmetric
,
inversename
)
def
call_method
(
self
,
methodid
,
*
args
):
return
call_method
(
self
,
methodid
,
*
args
)
async
def
call_method
(
self
,
methodid
,
*
args
):
return
await
call_method
(
self
,
methodid
,
*
args
)
opcua/common/shortcuts.py
View file @
95913557
...
...
@@ -28,3 +28,4 @@ class Shortcuts(object):
self
.
namespace_array
=
Node
(
server
,
ObjectIds
.
Server_NamespaceArray
)
self
.
opc_binary
=
Node
(
server
,
ObjectIds
.
OPCBinarySchema_TypeSystem
)
self
.
base_structure_type
=
Node
(
server
,
ObjectIds
.
Structure
)
self
.
server_state
=
Node
(
server
,
ObjectIds
.
Server_ServerStatus_State
)
opcua/common/subscription.py
View file @
95913557
...
...
@@ -4,7 +4,7 @@ high level interface to subscriptions
import
asyncio
import
logging
import
collections
from
typing
import
Union
import
time
from
opcua
import
ua
from
.events
import
Event
,
get_filter_from_event_type
...
...
@@ -95,7 +95,7 @@ class Subscription:
self
.
logger
.
info
(
'Subscription created %s'
,
self
.
subscription_id
)
# Send a publish request so the server has one in its queue
# Servers should always be able to handle at least on extra publish request per subscriptions
self
.
loop
.
create_task
(
self
.
server
.
publish
()
)
await
self
.
server
.
publish
(
)
async
def
delete
(
self
):
"""
...
...
@@ -104,10 +104,10 @@ class Subscription:
results
=
await
self
.
server
.
delete_subscriptions
([
self
.
subscription_id
])
results
[
0
].
check
()
def
publish_callback
(
self
,
publishresult
:
ua
.
PublishResult
):
async
def
publish_callback
(
self
,
publishresult
:
ua
.
PublishResult
):
self
.
logger
.
info
(
"Publish callback called with result: %s"
,
publishresult
)
while
self
.
subscription_id
is
None
:
time
.
sleep
(
0.01
)
await
asyncio
.
sleep
(
0.01
)
if
publishresult
.
NotificationMessage
.
NotificationData
is
not
None
:
for
notif
in
publishresult
.
NotificationMessage
.
NotificationData
:
...
...
@@ -125,7 +125,7 @@ class Subscription:
ack
=
ua
.
SubscriptionAcknowledgement
()
ack
.
SubscriptionId
=
self
.
subscription_id
ack
.
SequenceNumber
=
publishresult
.
NotificationMessage
.
SequenceNumber
self
.
loop
.
create_task
(
self
.
server
.
publish
([
ack
])
)
await
self
.
server
.
publish
([
ack
]
)
def
_call_datachange
(
self
,
datachange
:
ua
.
DataChangeNotification
):
for
item
in
datachange
.
MonitoredItems
:
...
...
opcua/server/address_space.py
View file @
95913557
...
...
@@ -220,8 +220,8 @@ class NodeManagementService(object):
return
result
if
item
.
ParentNodeId
.
is_null
():
self
.
logger
.
info
(
"add_node: while adding node %s, requested parent node is null %s %s"
,
item
.
RequestedNewNodeId
,
item
.
ParentNodeId
,
item
.
ParentNodeId
.
is_null
())
#
self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
#
item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
if
check
:
result
.
StatusCode
=
ua
.
StatusCode
(
ua
.
StatusCodes
.
BadParentNodeIdInvalid
)
return
result
...
...
opcua/server/internal_server.py
View file @
95913557
...
...
@@ -400,13 +400,13 @@ class InternalSession:
return
self
.
iserver
.
method_service
.
call
(
params
)
async
def
create_subscription
(
self
,
params
,
callback
):
result
=
self
.
subscription_service
.
create_subscription
(
params
,
callback
)
result
=
await
self
.
subscription_service
.
create_subscription
(
params
,
callback
)
self
.
subscriptions
.
append
(
result
.
SubscriptionId
)
return
result
async
def
create_monitored_items
(
self
,
params
):
"""Returns Future"""
subscription_result
=
self
.
subscription_service
.
create_monitored_items
(
params
)
subscription_result
=
await
self
.
subscription_service
.
create_monitored_items
(
params
)
self
.
iserver
.
server_callback_dispatcher
.
dispatch
(
CallbackType
.
ItemSubscriptionCreated
,
ServerItemCallback
(
params
,
subscription_result
))
return
subscription_result
...
...
@@ -424,15 +424,16 @@ class InternalSession:
for
i
in
ids
:
if
i
in
self
.
subscriptions
:
self
.
subscriptions
.
remove
(
i
)
return
self
.
subscription_service
.
delete_subscriptions
(
ids
)
return
await
self
.
subscription_service
.
delete_subscriptions
(
ids
)
async
def
delete_monitored_items
(
self
,
params
):
subscription_result
=
self
.
subscription_service
.
delete_monitored_items
(
params
)
subscription_result
=
await
self
.
subscription_service
.
delete_monitored_items
(
params
)
self
.
iserver
.
server_callback_dispatcher
.
dispatch
(
CallbackType
.
ItemSubscriptionDeleted
,
ServerItemCallback
(
params
,
subscription_result
))
return
subscription_result
async
def
publish
(
self
,
acks
=
None
):
# This is an async method, dues to symetry with client code
if
acks
is
None
:
acks
=
[]
return
self
.
subscription_service
.
publish
(
acks
)
...
...
opcua/server/internal_subscription.py
View file @
95913557
...
...
@@ -3,6 +3,7 @@ server side implementation of a subscription object
"""
import
logging
import
asyncio
from
opcua
import
ua
...
...
@@ -56,7 +57,7 @@ class MonitoredItemService:
def
delete_all_monitored_items
(
self
):
self
.
delete_monitored_items
([
mdata
.
monitored_item_id
for
mdata
in
self
.
_monitored_items
.
values
()])
def
create_monitored_items
(
self
,
params
):
async
def
create_monitored_items
(
self
,
params
):
results
=
[]
for
item
in
params
.
ItemsToCreate
:
# with self._lock:
...
...
@@ -259,34 +260,33 @@ class InternalSubscription:
self
.
_startup
=
True
self
.
_keep_alive_count
=
0
self
.
_publish_cycles_count
=
0
self
.
_
stopev
=
Fals
e
self
.
_
task
=
Non
e
def
__str__
(
self
):
return
"Subscription(id:{0})"
.
format
(
self
.
data
.
SubscriptionId
)
def
start
(
self
):
async
def
start
(
self
):
self
.
logger
.
debug
(
"starting subscription %s"
,
self
.
data
.
SubscriptionId
)
if
self
.
data
.
RevisedPublishingInterval
>
0.0
:
self
.
_
subscription_loop
(
)
self
.
_
task
=
self
.
subservice
.
loop
.
create_task
(
self
.
_subscription_loop
()
)
def
stop
(
self
):
async
def
stop
(
self
):
self
.
logger
.
debug
(
"stopping subscription %s"
,
self
.
data
.
SubscriptionId
)
self
.
_stopev
=
True
self
.
_task
.
cancel
()
await
self
.
_task
self
.
monitored_item_srv
.
delete_all_monitored_items
()
def
_trigger_publish
(
self
):
if
not
self
.
_stopev
and
self
.
data
.
RevisedPublishingInterval
<=
0.0
:
self
.
subservice
.
loop
.
call_soon
(
self
.
publish_results
)
if
self
.
_task
and
self
.
data
.
RevisedPublishingInterval
<=
0.0
:
self
.
publish_results
(
)
def
_subscription_loop
(
self
):
if
not
self
.
_stopev
:
self
.
subservice
.
loop
.
call_later
(
self
.
data
.
RevisedPublishingInterval
/
1000.0
,
self
.
_sub_loop
)
def
_sub_loop
(
self
):
if
self
.
_stopev
:
return
self
.
publish_results
()
self
.
_subscription_loop
()
async
def
_subscription_loop
(
self
):
try
:
while
True
:
await
asyncio
.
sleep
(
self
.
data
.
RevisedPublishingInterval
/
1000.0
)
self
.
publish_results
()
except
asyncio
.
CancelledError
:
pass
def
has_published_results
(
self
):
if
self
.
_startup
or
self
.
_triggered_datachanges
or
self
.
_triggered_events
:
...
...
@@ -304,14 +304,14 @@ class InternalSubscription:
self
,
self
.
_publish_cycles_count
,
self
.
data
.
RevisedLifetimeCount
)
# FIXME this will never be send since we do not have publish request anyway
self
.
monitored_item_srv
.
trigger_statuschange
(
ua
.
StatusCode
(
ua
.
StatusCodes
.
BadTimeout
))
self
.
_stopev
=
True
result
=
None
if
self
.
has_published_results
():
# FIXME: should we pop a publish request here? or we do not care?
self
.
_publish_cycles_count
+=
1
result
=
self
.
_pop_publish_result
()
if
result
is
not
None
:
self
.
callback
(
result
)
self
.
subservice
.
loop
.
create_task
(
self
.
callback
(
result
))
#await self.callback(result)
def
_pop_publish_result
(
self
):
result
=
ua
.
PublishResult
()
...
...
@@ -354,7 +354,6 @@ class InternalSubscription:
def
publish
(
self
,
acks
):
self
.
logger
.
info
(
"publish request with acks %s"
,
acks
)
self
.
_publish_cycles_count
=
0
for
nb
in
acks
:
self
.
_not_acknowledged_results
.
pop
(
nb
,
None
)
...
...
opcua/server/subscription_service.py
View file @
95913557
...
...
@@ -22,7 +22,7 @@ class SubscriptionService:
self
.
subscriptions
=
{}
self
.
_sub_id_counter
=
77
def
create_subscription
(
self
,
params
,
callback
):
async
def
create_subscription
(
self
,
params
,
callback
):
self
.
logger
.
info
(
"create subscription with callback: %s"
,
callback
)
result
=
ua
.
CreateSubscriptionResult
()
result
.
RevisedPublishingInterval
=
params
.
RequestedPublishingInterval
...
...
@@ -32,21 +32,20 @@ class SubscriptionService:
result
.
SubscriptionId
=
self
.
_sub_id_counter
sub
=
InternalSubscription
(
self
,
result
,
self
.
aspace
,
callback
)
sub
.
start
()
await
sub
.
start
()
self
.
subscriptions
[
result
.
SubscriptionId
]
=
sub
return
result
def
delete_subscriptions
(
self
,
ids
):
async
def
delete_subscriptions
(
self
,
ids
):
self
.
logger
.
info
(
"delete subscriptions: %s"
,
ids
)
res
=
[]
for
i
in
ids
:
#with self._lock:
if
i
not
in
self
.
subscriptions
:
res
.
append
(
ua
.
StatusCode
(
ua
.
StatusCodes
.
BadSubscriptionIdInvalid
))
else
:
sub
=
self
.
subscriptions
.
pop
(
i
)
sub
.
stop
()
await
sub
.
stop
()
res
.
append
(
ua
.
StatusCode
())
return
res
...
...
@@ -56,7 +55,7 @@ class SubscriptionService:
for
subid
,
sub
in
self
.
subscriptions
.
items
():
sub
.
publish
([
ack
.
SequenceNumber
for
ack
in
acks
if
ack
.
SubscriptionId
==
subid
])
def
create_monitored_items
(
self
,
params
):
async
def
create_monitored_items
(
self
,
params
):
self
.
logger
.
info
(
"create monitored items"
)
#with self._lock:
if
params
.
SubscriptionId
not
in
self
.
subscriptions
:
...
...
@@ -66,7 +65,7 @@ class SubscriptionService:
response
.
StatusCode
=
ua
.
StatusCode
(
ua
.
StatusCodes
.
BadSubscriptionIdInvalid
)
res
.
append
(
response
)
return
res
return
self
.
subscriptions
[
params
.
SubscriptionId
].
monitored_item_srv
.
create_monitored_items
(
params
)
return
await
self
.
subscriptions
[
params
.
SubscriptionId
].
monitored_item_srv
.
create_monitored_items
(
params
)
def
modify_monitored_items
(
self
,
params
):
self
.
logger
.
info
(
"modify monitored items"
)
...
...
tests/test_subscriptions.py
View file @
95913557
...
...
@@ -98,9 +98,9 @@ async def test_subscription_overload(opc):
for
i
in
range
(
nb
):
for
j
in
range
(
nb
):
await
variables
[
i
].
set_value
(
j
)
await
sub
.
delete
()
for
s
in
subs
:
await
s
.
delete
()
#
await sub.delete()
#
for s in subs:
#
await s.delete()
async
def
test_subscription_count
(
opc
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment