Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
3620aa0d
Commit
3620aa0d
authored
Jun 04, 2016
by
ORD
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #200 from FreeOpcUa/ev2
fix, renaming and restructure of event stuff
parents
c5c8fe8e
9490ac05
Changes
14
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
137 additions
and
127 deletions
+137
-127
examples/server-events.py
examples/server-events.py
+2
-0
opcua/common/event_objects.py
opcua/common/event_objects.py
+46
-0
opcua/common/events.py
opcua/common/events.py
+54
-27
opcua/common/node.py
opcua/common/node.py
+1
-1
opcua/common/subscription.py
opcua/common/subscription.py
+1
-1
opcua/server/event_generator.py
opcua/server/event_generator.py
+3
-2
opcua/server/history_sql.py
opcua/server/history_sql.py
+1
-1
opcua/server/internal_subscription.py
opcua/server/internal_subscription.py
+7
-23
opcua/server/server.py
opcua/server/server.py
+2
-1
opcua/ua/__init__.py
opcua/ua/__init__.py
+0
-1
opcua/ua/uaevents_auto.py
opcua/ua/uaevents_auto.py
+0
-53
opcua/ua/uatypes.py
opcua/ua/uatypes.py
+5
-0
tests/tests_server.py
tests/tests_server.py
+13
-16
tests/tests_unit.py
tests/tests_unit.py
+2
-1
No files found.
examples/server-events.py
View file @
3620aa0d
...
...
@@ -59,6 +59,8 @@ if __name__ == "__main__":
time
.
sleep
(
5
)
myevgen
.
event
.
Message
=
"MyFirstEvent "
+
str
(
count
)
myevgen
.
event
.
Severity
=
count
myevgen
.
event
.
MyNumericProperty
=
count
myevgen
.
event
.
MyStringProperty
=
"Property "
+
str
(
count
)
myevgen
.
trigger
()
mysecondevgen
.
trigger
(
message
=
"MySecondEvent "
+
str
(
count
))
count
+=
1
...
...
opcua/common/event_objects.py
0 → 100644
View file @
3620aa0d
"""
This file contains event types as Python objects.
TODO: This should be auto-generated but is not!!!!!
"""
from
opcua
import
ua
from
opcua.common.events
import
Event
class
BaseEvent
(
Event
):
"""
BaseEvent implements BaseEventType from which inherit all other events and it is used per default.
"""
def
__init__
(
self
,
sourcenode
=
None
,
message
=
None
,
severity
=
1
):
Event
.
__init__
(
self
)
self
.
add_property
(
"EventId"
,
bytes
(),
ua
.
VariantType
.
ByteString
)
self
.
add_property
(
"EventType"
,
ua
.
NodeId
(
ua
.
ObjectIds
.
BaseEventType
),
ua
.
VariantType
.
NodeId
)
self
.
add_property
(
"SourceNode"
,
sourcenode
,
ua
.
VariantType
.
NodeId
)
self
.
add_property
(
"SourceName"
,
None
,
ua
.
VariantType
.
String
)
self
.
add_property
(
"Time"
,
None
,
ua
.
VariantType
.
DateTime
)
self
.
add_property
(
"ReceiveTime"
,
None
,
ua
.
VariantType
.
DateTime
)
self
.
add_property
(
"LocalTime"
,
None
,
ua
.
VariantType
.
DateTime
)
self
.
add_property
(
"Message"
,
ua
.
LocalizedText
(
message
),
ua
.
VariantType
.
LocalizedText
)
self
.
add_property
(
"Severity"
,
severity
,
ua
.
VariantType
.
UInt16
)
class
AuditEvent
(
BaseEvent
):
"""
Audit implements AuditEventType from which inherit all other Audit events.
"""
def
__init__
(
self
,
sourcenode
=
None
,
message
=
None
,
severity
=
1
):
super
(
AuditEvent
,
self
).
__init__
(
sourcenode
,
message
,
severity
)
self
.
EventType
=
ua
.
NodeId
(
ua
.
ObjectIds
.
AuditEventType
)
self
.
add_property
(
"ActionTimeStamp"
,
None
,
ua
.
VariantType
.
DateTime
)
self
.
add_property
(
"Status"
,
False
,
ua
.
VariantType
.
Boolean
)
self
.
add_property
(
"ServerId"
,
None
,
ua
.
VariantType
.
String
)
self
.
add_property
(
"ClientAuditEntryId"
,
None
,
ua
.
VariantType
.
String
)
self
.
add_property
(
"ClientUserId"
,
None
,
ua
.
VariantType
.
String
)
IMPLEMENTED_EVENTS
=
{
ua
.
ObjectIds
.
BaseEventType
:
BaseEvent
,
ua
.
ObjectIds
.
AuditEventType
:
AuditEvent
,
}
opcua/common/events.py
View file @
3620aa0d
import
copy
from
opcua
import
ua
import
opcua
from
opcua.common.uaerrors
import
UaError
class
Event
Result
(
object
):
class
Event
(
object
):
"""
To be sent to clients for every events from server
OPC UA Event object.
This is class in inherited by the common event objects such as BaseEvent,
other auto standard events and custom events
Events are used to trigger events on server side and are
sent to clients for every events from server
Developper Warning:
On server side the data type of attributes should be known, thus
add properties using the add_property method!!!
"""
def
__init__
(
self
):
...
...
@@ -15,12 +27,21 @@ class EventResult(object):
self
.
internal_properties
=
list
(
self
.
__dict__
.
keys
())[:]
+
[
"internal_properties"
]
def
__str__
(
self
):
return
"EventResult({})"
.
format
([
str
(
k
)
+
":"
+
str
(
v
)
for
k
,
v
in
self
.
__dict__
.
items
()
if
k
not
in
self
.
internal_properties
])
return
"{}({})"
.
format
(
self
.
__class__
.
__name__
,
[
str
(
k
)
+
":"
+
str
(
v
)
for
k
,
v
in
self
.
__dict__
.
items
()
if
k
not
in
self
.
internal_properties
])
__repr__
=
__str__
def
add_property
(
self
,
name
,
val
,
datatype
):
"""
Add a property to event and tore its data type
"""
setattr
(
self
,
name
,
val
)
self
.
data_types
[
name
]
=
datatype
def
get_event_props_as_fields_dict
(
self
):
"""
convert all properties of the Event
Result
class to a dict of variants
convert all properties of the Event class to a dict of variants
"""
field_vars
=
{}
for
key
,
value
in
vars
(
self
).
items
():
...
...
@@ -33,11 +54,10 @@ class EventResult(object):
"""
Create an Event object from a dict of name and variants
"""
result
=
EventResul
t
()
ev
=
Even
t
()
for
k
,
v
in
fields
.
items
():
setattr
(
result
,
k
,
v
.
Value
)
result
.
data_types
[
k
]
=
v
.
VariantType
return
result
ev
.
add_property
(
k
,
v
.
Value
,
v
.
VariantType
)
return
ev
def
to_event_fields_using_subscription_fields
(
self
,
select_clauses
):
"""
...
...
@@ -58,12 +78,17 @@ class EventResult(object):
"""
fields
=
[]
for
sattr
in
select_clauses
:
if
len
(
sattr
.
BrowsePath
)
==
0
:
name
=
sattr
.
AttributeId
.
name
if
not
sattr
.
BrowsePath
:
name
=
ua
.
AttributeIds
(
sattr
.
AttributeId
)
.
name
else
:
name
=
sattr
.
BrowsePath
[
0
].
Name
field
=
getattr
(
self
,
name
)
fields
.
append
(
ua
.
Variant
(
field
,
self
.
data_types
[
name
]))
try
:
val
=
getattr
(
self
,
name
)
except
AttributeError
:
field
=
ua
.
Variant
(
None
)
else
:
field
=
ua
.
Variant
(
copy
.
deepcopy
(
val
),
self
.
data_types
[
name
])
fields
.
append
(
field
)
return
fields
@
staticmethod
...
...
@@ -71,17 +96,16 @@ class EventResult(object):
"""
Instanciate an Event object from a select_clauses and fields
"""
result
=
EventResul
t
()
result
.
select_clauses
=
select_clauses
result
.
event_fields
=
fields
ev
=
Even
t
()
ev
.
select_clauses
=
select_clauses
ev
.
event_fields
=
fields
for
idx
,
sattr
in
enumerate
(
select_clauses
):
if
len
(
sattr
.
BrowsePath
)
==
0
:
name
=
sattr
.
AttributeId
.
name
else
:
name
=
sattr
.
BrowsePath
[
0
].
Name
setattr
(
result
,
name
,
fields
[
idx
].
Value
)
result
.
data_types
[
name
]
=
fields
[
idx
].
VariantType
return
result
ev
.
add_property
(
name
,
fields
[
idx
].
Value
,
fields
[
idx
].
VariantType
)
return
ev
def
get_filter_from_event_type
(
eventtype
):
...
...
@@ -152,8 +176,8 @@ def get_event_obj_from_type_node(node):
"""
return an Event object from an event type node
"""
if
node
.
nodeid
.
Identifier
in
ua
.
uaevents_auto
.
IMPLEMENTED_EVENTS
.
keys
():
return
ua
.
uaevents_auto
.
IMPLEMENTED_EVENTS
[
node
.
nodeid
.
Identifier
]()
if
node
.
nodeid
.
Identifier
in
opcua
.
common
.
event_objects
.
IMPLEMENTED_EVENTS
.
keys
():
return
opcua
.
common
.
event_objects
.
IMPLEMENTED_EVENTS
[
node
.
nodeid
.
Identifier
]()
else
:
parent_identifier
,
parent_eventtype
=
_find_parent_eventtype
(
node
)
if
not
parent_eventtype
:
...
...
@@ -162,16 +186,19 @@ def get_event_obj_from_type_node(node):
class
CustomEvent
(
parent_eventtype
):
def
__init__
(
self
):
super
(
CustomEvent
,
self
).
__init__
(
extended
=
True
)
parent_eventtype
.
__init__
(
self
)
self
.
EventType
=
node
.
nodeid
curr_node
=
node
while
curr_node
.
nodeid
.
Identifier
!=
parent_identifier
:
for
prop
in
curr_node
.
get_properties
():
setattr
(
self
,
prop
.
get_browse_name
().
Name
,
prop
.
get_value
())
name
=
prop
.
get_browse_name
().
Name
val
=
prop
.
get_data_value
()
self
.
add_property
(
name
,
val
.
Value
.
Value
,
val
.
Value
.
VariantType
)
parents
=
curr_node
.
get_referenced_nodes
(
refs
=
ua
.
ObjectIds
.
HasSubtype
,
direction
=
ua
.
BrowseDirection
.
Inverse
,
includesubtypes
=
False
)
if
len
(
parents
)
!=
1
:
# Something went wrong
r
eturn
None
r
aise
UaError
(
"Parent of event type could notbe found"
)
curr_node
=
parents
[
0
]
self
.
_freeze
=
True
...
...
@@ -186,8 +213,8 @@ def _find_parent_eventtype(node):
if
len
(
parents
)
!=
1
:
# Something went wrong
return
None
,
None
if
parents
[
0
].
nodeid
.
Identifier
in
ua
.
uaevents_auto
.
IMPLEMENTED_EVENTS
.
keys
():
return
parents
[
0
].
nodeid
.
Identifier
,
ua
.
uaevents_auto
.
IMPLEMENTED_EVENTS
[
parents
[
0
].
nodeid
.
Identifier
]
if
parents
[
0
].
nodeid
.
Identifier
in
opcua
.
common
.
event_objects
.
IMPLEMENTED_EVENTS
.
keys
():
return
parents
[
0
].
nodeid
.
Identifier
,
opcua
.
common
.
event_objects
.
IMPLEMENTED_EVENTS
[
parents
[
0
].
nodeid
.
Identifier
]
else
:
return
_find_parent_eventtype
(
parents
[
0
])
opcua/common/node.py
View file @
3620aa0d
...
...
@@ -411,7 +411,7 @@ class Node(object):
result
=
self
.
history_read_events
(
details
)
event_res
=
[]
for
res
in
result
.
HistoryData
.
Events
:
event_res
.
append
(
events
.
Event
Result
.
from_event_fields
(
evfilter
.
SelectClauses
,
res
.
EventFields
))
event_res
.
append
(
events
.
Event
.
from_event_fields
(
evfilter
.
SelectClauses
,
res
.
EventFields
))
return
event_res
def
history_read_events
(
self
,
details
):
...
...
opcua/common/subscription.py
View file @
3620aa0d
...
...
@@ -142,7 +142,7 @@ class Subscription(object):
for
event
in
eventlist
.
Events
:
with
self
.
_lock
:
data
=
self
.
_monitoreditems_map
[
event
.
ClientHandle
]
result
=
events
.
Event
Result
.
from_event_fields
(
data
.
mfilter
.
SelectClauses
,
event
.
EventFields
)
result
=
events
.
Event
.
from_event_fields
(
data
.
mfilter
.
SelectClauses
,
event
.
EventFields
)
result
.
server_handle
=
data
.
server_handle
if
hasattr
(
self
.
_handler
,
"event_notification"
):
try
:
...
...
opcua/server/event_generator.py
View file @
3620aa0d
...
...
@@ -5,6 +5,7 @@ import uuid
from
opcua
import
ua
from
opcua
import
Node
from
opcua.common
import
events
from
opcua.common
import
event_objects
class
EventGenerator
(
object
):
...
...
@@ -25,14 +26,14 @@ class EventGenerator(object):
def
__init__
(
self
,
isession
,
etype
=
None
,
source
=
ua
.
ObjectIds
.
Server
):
if
not
etype
:
etype
=
ua
.
BaseEvent
()
etype
=
event_objects
.
BaseEvent
()
self
.
logger
=
logging
.
getLogger
(
__name__
)
self
.
isession
=
isession
self
.
event
=
None
node
=
None
if
isinstance
(
etype
,
ua
.
BaseEvent
):
if
isinstance
(
etype
,
event_objects
.
BaseEvent
):
self
.
event
=
etype
elif
isinstance
(
etype
,
Node
):
node
=
etype
...
...
opcua/server/history_sql.py
View file @
3620aa0d
...
...
@@ -257,7 +257,7 @@ class HistorySQLite(HistoryStorageInterface):
else
:
fdict
[
clauses
[
i
]]
=
ua
.
Variant
(
None
)
results
.
append
(
events
.
Event
Result
.
from_field_dict
(
fdict
))
results
.
append
(
events
.
Event
.
from_field_dict
(
fdict
))
except
sqlite3
.
Error
as
e
:
self
.
logger
.
error
(
'Historizing SQL Read Error events for node %s: %s'
,
source_id
,
e
)
...
...
opcua/server/internal_subscription.py
View file @
3620aa0d
...
...
@@ -5,6 +5,7 @@ server side implementation of a subscription object
from
threading
import
RLock
import
logging
import
copy
import
traceback
from
opcua
import
ua
...
...
@@ -228,26 +229,9 @@ class MonitoredItemService(object):
return
fieldlist
=
ua
.
EventFieldList
()
fieldlist
.
ClientHandle
=
mdata
.
client_handle
fieldlist
.
EventFields
=
self
.
_get_event_fields
(
mdata
.
filter
,
event
)
fieldlist
.
EventFields
=
event
.
to_event_fields
(
mdata
.
filter
.
SelectClauses
)
self
.
isub
.
enqueue_event
(
mid
,
fieldlist
,
mdata
.
queue_size
)
def
_get_event_fields
(
self
,
evfilter
,
event
):
fields
=
[]
for
sattr
in
evfilter
.
SelectClauses
:
try
:
if
not
sattr
.
BrowsePath
:
val
=
getattr
(
event
,
sattr
.
Attribute
.
name
)
val
=
copy
.
deepcopy
(
val
)
fields
.
append
(
ua
.
Variant
(
val
))
else
:
name
=
sattr
.
BrowsePath
[
0
].
Name
val
=
getattr
(
event
,
name
)
val
=
copy
.
deepcopy
(
val
)
fields
.
append
(
ua
.
Variant
(
val
))
except
AttributeError
:
fields
.
append
(
ua
.
Variant
())
return
fields
def
trigger_statuschange
(
self
,
code
):
self
.
isub
.
enqueue_statuschange
(
code
)
...
...
@@ -313,7 +297,8 @@ class InternalSubscription(object):
self
.
_stopev
=
True
result
=
None
with
self
.
_lock
:
if
self
.
has_published_results
():
# FIXME: should we pop a publish request here? or we do not care?
if
self
.
has_published_results
():
# FIXME: should we pop a publish request here? or we do not care?
self
.
_publish_cycles_count
+=
1
result
=
self
.
_pop_publish_result
()
if
result
is
not
None
:
...
...
@@ -408,7 +393,7 @@ class WhereClauseEvaluator(object):
try
:
res
=
self
.
_eval_el
(
0
,
event
)
except
Exception
as
ex
:
self
.
logger
.
warning
(
"Exception while evaluating WhereClause %s for event %s: %s"
,
self
.
elements
,
event
,
ex
)
self
.
logger
.
exception
(
"Exception while evaluating WhereClause %s for event %s: %s"
,
self
.
elements
,
event
,
ex
)
return
False
return
res
...
...
@@ -445,11 +430,10 @@ class WhereClauseEvaluator(object):
self
.
logger
.
warn
(
"Cast operand not implemented, assuming True"
)
return
True
elif
el
.
FilterOperator
==
ua
.
FilterOperator
.
OfType
:
self
.
logger
.
warn
(
"OfType operand not implemented, assuming True"
)
return
True
return
event
.
EventType
==
self
.
_eval_op
(
ops
[
0
],
event
)
else
:
# TODO: implement missing operators
print
(
"WhereClause not implemented for element: %s"
,
el
)
self
.
logger
.
warning
(
"WhereClause not implemented for element: %s"
,
el
)
raise
NotImplementedError
def
_like_operator
(
self
,
string
,
pattern
):
...
...
opcua/server/server.py
View file @
3620aa0d
...
...
@@ -20,6 +20,7 @@ from opcua.common import xmlimporter
from
opcua.common.manage_nodes
import
delete_nodes
from
opcua.client.client
import
Client
from
opcua.crypto
import
security_policies
from
opcua.common.event_objects
import
BaseEvent
use_crypto
=
True
try
:
from
opcua.crypto
import
uacrypto
...
...
@@ -343,7 +344,7 @@ class Server(object):
Use this object to fire events
"""
if
not
etype
:
etype
=
ua
.
BaseEvent
()
etype
=
BaseEvent
()
return
EventGenerator
(
self
.
iserver
.
isession
,
etype
,
source
)
def
create_custom_event_type
(
self
,
idx
,
name
,
baseetype
=
ua
.
ObjectIds
.
BaseEventType
,
properties
=
[]):
...
...
opcua/ua/__init__.py
View file @
3620aa0d
...
...
@@ -5,5 +5,4 @@ from opcua.ua.status_codes import StatusCodes
from
opcua.ua.uaprotocol_auto
import
*
from
opcua.ua.uaprotocol_hand
import
*
from
opcua.ua.uatypes
import
*
#TODO: This should be renamed to uatypes_hand
from
opcua.ua.uaevents_auto
import
*
opcua/ua/uaevents_auto.py
deleted
100644 → 0
View file @
c5c8fe8e
"""
Example auto_generated file with UA Types
For now only events!
"""
from
opcua.ua
import
*
# TODO: This should be auto generated form XML description of EventTypes
class
BaseEvent
(
FrozenClass
):
"""
BaseEvent implements BaseEventType from which inherit all other events and it is used per default.
"""
def
__init__
(
self
,
sourcenode
=
None
,
message
=
None
,
severity
=
1
,
extended
=
False
):
self
.
EventId
=
bytes
()
self
.
EventType
=
NodeId
(
ObjectIds
.
BaseEventType
)
self
.
SourceNode
=
sourcenode
self
.
SourceName
=
None
self
.
Time
=
None
self
.
ReceiveTime
=
None
self
.
LocalTime
=
None
self
.
Message
=
LocalizedText
(
message
)
self
.
Severity
=
Variant
(
severity
,
VariantType
.
UInt16
)
if
not
extended
:
self
.
_freeze
=
True
def
__str__
(
self
):
return
"{}({})"
.
format
(
type
(
self
).
__name__
,
[
str
(
k
)
+
":"
+
str
(
v
)
for
k
,
v
in
self
.
__dict__
.
items
()])
__repr__
=
__str__
class
AuditEvent
(
BaseEvent
):
"""
Audit implements AuditEventType from which inherit all other Audit events.
"""
def
__init__
(
self
,
sourcenode
=
None
,
message
=
None
,
severity
=
1
,
extended
=
False
):
super
(
AuditEvent
,
self
).
__init__
(
sourcenode
,
message
,
severity
,
True
)
self
.
EventType
=
NodeId
(
ObjectIds
.
AuditEventType
)
self
.
ActionTimeStamp
=
None
self
.
Status
=
False
self
.
ServerId
=
None
self
.
ClientAuditEntryId
=
None
self
.
ClientUserId
=
None
if
not
extended
:
self
.
_freeze
=
True
IMPLEMENTED_EVENTS
=
{
ObjectIds
.
BaseEventType
:
BaseEvent
,
ObjectIds
.
AuditEventType
:
AuditEvent
,
}
opcua/ua/uatypes.py
View file @
3620aa0d
...
...
@@ -115,6 +115,11 @@ def pack_uatype(uatype, value):
return
pack_bytes
(
value
)
elif
uatype
==
"DateTime"
:
return
pack_datetime
(
value
)
elif
uatype
==
"LocalizedText"
:
if
isinstance
(
value
,
LocalizedText
):
return
value
.
to_binary
()
else
:
return
LocalizedText
(
value
).
to_binary
()
elif
uatype
==
"ExtensionObject"
:
# dependency loop: classes in uaprotocol_auto use Variant defined in this file,
# but Variant can contain any object from uaprotocol_auto as ExtensionObject.
...
...
tests/tests_server.py
View file @
3620aa0d
...
...
@@ -13,6 +13,7 @@ from opcua import Server
from
opcua
import
Client
from
opcua
import
ua
from
opcua
import
uamethod
from
opcua.common.event_objects
import
BaseEvent
,
AuditEvent
port_num
=
485140
...
...
@@ -179,16 +180,15 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
def
test_get_event_from_type_node_Inhereted_AuditEvent
(
self
):
ev
=
opcua
.
common
.
events
.
get_event_obj_from_type_node
(
opcua
.
Node
(
self
.
opc
.
iserver
.
isession
,
ua
.
NodeId
(
ua
.
ObjectIds
.
AuditEventType
)))
self
.
assertIsNot
(
ev
,
None
)
# we did not receive event
self
.
assertIsInstance
(
ev
,
ua
.
BaseEvent
)
self
.
assertIsInstance
(
ev
,
ua
.
AuditEvent
)
self
.
assertIsInstance
(
ev
,
BaseEvent
)
self
.
assertIsInstance
(
ev
,
AuditEvent
)
self
.
assertEqual
(
ev
.
EventType
,
ua
.
NodeId
(
ua
.
ObjectIds
.
AuditEventType
))
self
.
assertEqual
(
ev
.
Severity
,
ua
.
Variant
(
1
,
ua
.
VariantType
.
UInt16
)
)
self
.
assertEqual
(
ev
.
Severity
,
1
)
self
.
assertEqual
(
ev
.
ActionTimeStamp
,
None
)
self
.
assertEqual
(
ev
.
Status
,
False
)
self
.
assertEqual
(
ev
.
ServerId
,
None
)
self
.
assertEqual
(
ev
.
ClientAuditEntryId
,
None
)
self
.
assertEqual
(
ev
.
ClientUserId
,
None
)
self
.
assertEqual
(
ev
.
_freeze
,
True
)
def
test_eventgenerator_default
(
self
):
evgen
=
self
.
opc
.
get_event_generator
()
...
...
@@ -196,7 +196,7 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
check_eventgenerator_SourceServer
(
self
,
evgen
)
def
test_eventgenerator_BaseEvent_object
(
self
):
evgen
=
self
.
opc
.
get_event_generator
(
ua
.
BaseEvent
())
evgen
=
self
.
opc
.
get_event_generator
(
BaseEvent
())
check_eventgenerator_BaseEvent
(
self
,
evgen
)
check_eventgenerator_SourceServer
(
self
,
evgen
)
...
...
@@ -245,7 +245,7 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
def
test_eventgenerator_source_collision
(
self
):
objects
=
self
.
opc
.
get_objects_node
()
o
=
objects
.
add_object
(
3
,
'MyObject'
)
event
=
ua
.
BaseEvent
(
sourcenode
=
o
.
nodeid
)
event
=
BaseEvent
(
sourcenode
=
o
.
nodeid
)
evgen
=
self
.
opc
.
get_event_generator
(
event
,
ua
.
ObjectIds
.
Server
)
check_eventgenerator_BaseEvent
(
self
,
evgen
)
check_event_generator_object
(
self
,
evgen
,
o
)
...
...
@@ -256,16 +256,15 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
ev
=
evgen
.
event
self
.
assertIsNot
(
ev
,
None
)
# we did not receive event
self
.
assertIsInstance
(
ev
,
ua
.
BaseEvent
)
self
.
assertIsInstance
(
ev
,
ua
.
AuditEvent
)
self
.
assertIsInstance
(
ev
,
BaseEvent
)
self
.
assertIsInstance
(
ev
,
AuditEvent
)
self
.
assertEqual
(
ev
.
EventType
,
ua
.
NodeId
(
ua
.
ObjectIds
.
AuditEventType
))
self
.
assertEqual
(
ev
.
Severity
,
ua
.
Variant
(
1
,
ua
.
VariantType
.
UInt16
)
)
self
.
assertEqual
(
ev
.
Severity
,
1
)
self
.
assertEqual
(
ev
.
ActionTimeStamp
,
None
)
self
.
assertEqual
(
ev
.
Status
,
False
)
self
.
assertEqual
(
ev
.
ServerId
,
None
)
self
.
assertEqual
(
ev
.
ClientAuditEntryId
,
None
)
self
.
assertEqual
(
ev
.
ClientUserId
,
None
)
self
.
assertEqual
(
ev
.
_freeze
,
True
)
def
test_create_custom_event_type_ObjectId
(
self
):
etype
=
self
.
opc
.
create_custom_event_type
(
2
,
'MyEvent'
,
ua
.
ObjectIds
.
BaseEventType
,
[(
'PropertyNum'
,
ua
.
VariantType
.
Float
),
(
'PropertyString'
,
ua
.
VariantType
.
String
)])
...
...
@@ -353,10 +352,9 @@ def check_eventgenerator_BaseEvent(test, evgen):
def
check_base_event
(
test
,
ev
):
test
.
assertIsNot
(
ev
,
None
)
# we did not receive event
test
.
assertIsInstance
(
ev
,
ua
.
BaseEvent
)
test
.
assertIsInstance
(
ev
,
BaseEvent
)
test
.
assertEqual
(
ev
.
EventType
,
ua
.
NodeId
(
ua
.
ObjectIds
.
BaseEventType
))
test
.
assertEqual
(
ev
.
Severity
,
ua
.
Variant
(
1
,
ua
.
VariantType
.
UInt16
))
test
.
assertEqual
(
ev
.
_freeze
,
True
)
test
.
assertEqual
(
ev
.
Severity
,
1
)
def
check_eventgenerator_CustomEvent
(
test
,
evgen
,
etype
):
...
...
@@ -367,10 +365,9 @@ def check_eventgenerator_CustomEvent(test, evgen, etype):
def
check_custom_event
(
test
,
ev
,
etype
):
test
.
assertIsNot
(
ev
,
None
)
# we did not receive event
test
.
assertIsInstance
(
ev
,
ua
.
BaseEvent
)
test
.
assertIsInstance
(
ev
,
BaseEvent
)
test
.
assertEqual
(
ev
.
EventType
,
etype
.
nodeid
)
test
.
assertEqual
(
ev
.
Severity
,
ua
.
Variant
(
1
,
ua
.
VariantType
.
UInt16
))
test
.
assertEqual
(
ev
.
_freeze
,
True
)
test
.
assertEqual
(
ev
.
Severity
,
1
)
def
check_custom_event_type
(
test
,
ev
):
...
...
tests/tests_unit.py
View file @
3620aa0d
...
...
@@ -10,6 +10,7 @@ from opcua.ua import extensionobject_from_binary
from
opcua.ua
import
extensionobject_to_binary
from
opcua.ua.uatypes
import
flatten
,
get_shape
,
reshape
from
opcua.server.internal_subscription
import
WhereClauseEvaluator
from
opcua.common.event_objects
import
BaseEvent
...
...
@@ -379,7 +380,7 @@ class TestUnit(unittest.TestCase):
wce
=
WhereClauseEvaluator
(
logging
.
getLogger
(
__name__
),
None
,
cf
)
ev
=
ua
.
BaseEvent
()
ev
=
BaseEvent
()
ev
.
_freeze
=
False
ev
.
property
=
3
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment