Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
8b55fd35
Commit
8b55fd35
authored
Apr 30, 2019
by
cbergmiller
Committed by
GitHub
Apr 30, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #43 from FreeOpcUa/subscrition_#39
Subscription #39
parents
41c5f6ec
057f7ffd
Changes
2
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
146 additions
and
45 deletions
+146
-45
asyncua/common/subscription.py
asyncua/common/subscription.py
+81
-35
tests/test_subscriptions.py
tests/test_subscriptions.py
+65
-10
No files found.
asyncua/common/subscription.py
View file @
8b55fd35
This diff is collapsed.
Click to expand it.
tests/test_subscriptions.py
View file @
8b55fd35
...
...
@@ -24,7 +24,7 @@ class SubHandler:
class
MySubHandler
:
"""
More advanced subscription client using Future, so we can
wait for events in tests
More advanced subscription client using Future, so we can
await events in tests.
"""
def
__init__
(
self
):
...
...
@@ -46,14 +46,22 @@ class MySubHandler:
class
MySubHandler2
:
def
__init__
(
self
):
def
__init__
(
self
,
limit
=
None
):
self
.
results
=
[]
self
.
limit
=
limit
self
.
done
=
asyncio
.
Event
()
def
check_done
(
self
):
if
self
.
limit
and
len
(
self
.
results
)
==
self
.
limit
and
not
self
.
done
.
is_set
():
self
.
done
.
set
()
def
datachange_notification
(
self
,
node
,
val
,
data
):
self
.
results
.
append
((
node
,
val
))
self
.
check_done
()
def
event_notification
(
self
,
event
):
self
.
results
.
append
(
event
)
self
.
check_done
()
class
MySubHandlerCounter
:
...
...
@@ -282,10 +290,14 @@ async def test_subscription_data_change_many(opc):
async
def
test_subscribe_server_time
(
opc
):
"""
Test the subscription of the `Server_ServerStatus_CurrentTime` objects `Value` attribute.
"""
myhandler
=
MySubHandler
()
server_time_node
=
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
))
sub
=
await
opc
.
opc
.
create_subscription
(
200
,
myhandler
)
handle
=
await
sub
.
subscribe_data_change
(
server_time_node
)
assert
type
(
handle
)
is
int
node
,
val
,
data
=
await
myhandler
.
result
()
assert
server_time_node
==
node
delta
=
datetime
.
utcnow
()
-
val
...
...
@@ -295,6 +307,9 @@ async def test_subscribe_server_time(opc):
async
def
test_modify_monitored_item
(
opc
):
"""
Test that the subscription of the `Server_ServerStatus_CurrentTime` object can be modified.
"""
myhandler
=
MySubHandler
()
server_time_node
=
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
))
sub
=
await
opc
.
opc
.
create_subscription
(
1000
,
myhandler
)
...
...
@@ -320,6 +335,46 @@ async def test_create_delete_subscription(opc):
await
sub
.
delete
()
async
def
test_unsubscribe_two_objects_simultaneously
(
opc
):
"""
Test the subscription/unsub. of the `CurrentTime` and `Server_ServerStatus_State` objects.
Unsubscribe from both Nodes simultaneously.
"""
handler
=
MySubHandler2
(
limit
=
1
)
nodes
=
[
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
)),
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_State
)),
]
sub
=
await
opc
.
opc
.
create_subscription
(
200
,
handler
)
handles
=
await
sub
.
subscribe_data_change
(
nodes
)
await
handler
.
done
.
wait
()
assert
handler
.
results
[
0
][
0
]
==
nodes
[
0
]
assert
(
datetime
.
utcnow
()
-
handler
.
results
[
0
][
1
])
<
timedelta
(
seconds
=
2
)
assert
handler
.
results
[
1
][
0
]
==
nodes
[
1
]
assert
handler
.
results
[
1
][
1
]
==
0
await
sub
.
unsubscribe
(
handles
)
await
sub
.
delete
()
async
def
test_unsubscribe_two_objects_consecutively
(
opc
):
"""
Test the subscription/unsub. of the `CurrentTime` and `Server_ServerStatus_State` objects.
Unsubscribe from both Nodes consecutively.
"""
handler
=
MySubHandler2
(
limit
=
2
)
nodes
=
[
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
)),
opc
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_State
)),
]
sub
=
await
opc
.
opc
.
create_subscription
(
200
,
handler
)
handles
=
await
sub
.
subscribe_data_change
(
nodes
)
assert
type
(
handles
)
is
list
await
handler
.
done
.
wait
()
for
handle
in
handles
:
await
sub
.
unsubscribe
(
handle
)
await
sub
.
delete
()
async
def
test_subscribe_events
(
opc
):
sub
=
await
opc
.
opc
.
create_subscription
(
100
,
MySubHandler
())
handle
=
await
sub
.
subscribe_events
()
...
...
@@ -422,8 +477,8 @@ async def test_events_wrong_source(opc):
async
def
test_events_CustomEvent
(
opc
):
etype
=
await
opc
.
server
.
create_custom_event_type
(
2
,
'MyEvent'
,
ua
.
ObjectIds
.
BaseEventType
,
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
evgen
=
await
opc
.
server
.
get_event_generator
(
etype
)
myhandler
=
MySubHandler
()
sub
=
await
opc
.
opc
.
create_subscription
(
100
,
myhandler
)
...
...
@@ -455,8 +510,8 @@ async def test_events_CustomEvent_MyObject(opc):
objects
=
opc
.
server
.
get_objects_node
()
o
=
await
objects
.
add_object
(
3
,
'MyObject'
)
etype
=
await
opc
.
server
.
create_custom_event_type
(
2
,
'MyEvent'
,
ua
.
ObjectIds
.
BaseEventType
,
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
evgen
=
await
opc
.
server
.
get_event_generator
(
etype
,
emitting_node
=
o
)
myhandler
=
MySubHandler
()
sub
=
await
opc
.
opc
.
create_subscription
(
100
,
myhandler
)
...
...
@@ -486,12 +541,12 @@ async def test_several_different_events(opc):
objects
=
opc
.
server
.
get_objects_node
()
o
=
await
objects
.
add_object
(
3
,
'MyObject'
)
etype1
=
await
opc
.
server
.
create_custom_event_type
(
2
,
'MyEvent1'
,
ua
.
ObjectIds
.
BaseEventType
,
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
evgen1
=
await
opc
.
server
.
get_event_generator
(
etype1
,
o
)
etype2
=
await
opc
.
server
.
create_custom_event_type
(
2
,
'MyEvent2'
,
ua
.
ObjectIds
.
BaseEventType
,
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
evgen2
=
await
opc
.
server
.
get_event_generator
(
etype2
,
o
)
myhandler
=
MySubHandler2
()
sub
=
await
opc
.
opc
.
create_subscription
(
100
,
myhandler
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment