Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
9163dbf8
Commit
9163dbf8
authored
Jul 02, 2024
by
Olivier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix typing of SubscriptionHandler
parent
55726fbf
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
26 additions
and
21 deletions
+26
-21
asyncua/common/subscription.py
asyncua/common/subscription.py
+24
-18
asyncua/server/history.py
asyncua/server/history.py
+1
-2
asyncua/sync.py
asyncua/sync.py
+1
-1
No files found.
asyncua/common/subscription.py
View file @
9163dbf8
...
...
@@ -81,35 +81,41 @@ class StatusChangeNotificationHandler(Protocol):
...
SubscriptionHandler
=
Union
[
DataChangeNotificationHandler
,
EventNotificationHandler
,
StatusChangeNotificationHandler
]
"""
Protocol class representing subscription handlers to receive events from server.
"""
class
SubHandler
:
"""
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
"""
def
datachange_notification
(
self
,
node
:
Node
,
val
:
Any
,
data
:
DataChangeNotif
)
->
None
:
class
DataChangeNotificationHandlerAsync
(
Protocol
):
async
def
datachange_notification
(
self
,
node
:
Node
,
val
:
Any
,
data
:
DataChangeNotif
)
->
None
:
"""
called for every datachange notification from server
"""
pass
...
def
event_notification
(
self
,
event
:
ua
.
EventNotificationList
)
->
None
:
class
EventNotificationHandlerAsync
(
Protocol
):
async
def
event_notification
(
self
,
event
:
ua
.
EventNotificationList
)
->
None
:
"""
called for every event notification from server
"""
pass
...
def
status_change_notification
(
self
,
status
:
ua
.
StatusChangeNotification
)
->
None
:
class
StatusChangeNotificationHandlerAsync
(
Protocol
):
async
def
status_change_notification
(
self
,
status
:
ua
.
StatusChangeNotification
)
->
None
:
"""
called for every status change notification from server
"""
pass
...
SubscriptionHandler
=
Union
[
DataChangeNotificationHandler
,
EventNotificationHandler
,
StatusChangeNotificationHandler
,
DataChangeNotificationHandlerAsync
,
EventNotificationHandlerAsync
,
StatusChangeNotificationHandlerAsync
,
]
"""
Protocol class representing subscription handlers to receive events from server.
"""
class
Subscription
:
...
...
asyncua/server/history.py
View file @
9163dbf8
...
...
@@ -7,7 +7,6 @@ from datetime import timedelta
from
datetime
import
timezone
from
asyncua
import
ua
from
asyncua.common
import
subscription
from
asyncua.common.subscription
import
Subscription
,
SubscriptionHandler
from
..common.utils
import
Buffer
...
...
@@ -217,7 +216,7 @@ class HistoryDict(HistoryStorageInterface):
pass
class
SubHandler
(
subscription
.
SubHandler
)
:
class
SubHandler
:
def
__init__
(
self
,
storage
:
HistoryStorageInterface
):
self
.
storage
=
storage
...
...
asyncua/sync.py
View file @
9163dbf8
...
...
@@ -322,7 +322,7 @@ class Client:
def
load_enums
(
self
)
->
Dict
[
str
,
Type
]:
# type: ignore[empty-body]
pass
def
create_subscription
(
self
,
period
:
Union
[
ua
.
CreateSubscriptionParameters
,
float
],
handler
:
subscription
.
SubHandler
,
publishing
:
bool
=
True
)
->
"Subscription"
:
def
create_subscription
(
self
,
period
:
Union
[
ua
.
CreateSubscriptionParameters
,
float
],
handler
:
subscription
.
Sub
scription
Handler
,
publishing
:
bool
=
True
)
->
"Subscription"
:
coro
=
self
.
aio_obj
.
create_subscription
(
period
,
_SubHandler
(
self
.
tloop
,
handler
),
publishing
)
aio_sub
=
self
.
tloop
.
post
(
coro
)
return
Subscription
(
self
.
tloop
,
aio_sub
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment