Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
9b946e22
Commit
9b946e22
authored
Feb 23, 2021
by
oroulet
Committed by
oroulet
Mar 10, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
client and server seems to work
parent
7d735413
Changes
11
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
1184 additions
and
93 deletions
+1184
-93
asyncua/client/ua_client.py
asyncua/client/ua_client.py
+2
-1
asyncua/common/instantiate_util.py
asyncua/common/instantiate_util.py
+3
-3
asyncua/common/manage_nodes.py
asyncua/common/manage_nodes.py
+2
-2
asyncua/common/xmlimporter.py
asyncua/common/xmlimporter.py
+30
-50
asyncua/ua/ua_binary.py
asyncua/ua/ua_binary.py
+6
-7
asyncua/ua/uaprotocol_auto.py
asyncua/ua/uaprotocol_auto.py
+1062
-8
asyncua/ua/uaprotocol_hand.py
asyncua/ua/uaprotocol_hand.py
+11
-10
asyncua/ua/uatypes.py
asyncua/ua/uatypes.py
+11
-4
examples/server-minimal.py
examples/server-minimal.py
+5
-4
schemas/generate_protocol_python.py
schemas/generate_protocol_python.py
+22
-4
tests/test_unit.py
tests/test_unit.py
+30
-0
No files found.
asyncua/client/ua_client.py
View file @
9b946e22
...
...
@@ -168,7 +168,8 @@ class UASocketProtocol(asyncio.Protocol):
try
:
self
.
_callbackmap
[
request_id
].
set_result
(
body
)
except
KeyError
:
raise
ua
.
UaError
(
f"No request found for request id:
{
request_id
}
, pending are
{
self
.
_callbackmap
.
keys
()
}
"
)
#raise ua.UaError(f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}, body was {body}")
pass
except
asyncio
.
InvalidStateError
:
if
not
self
.
closed
:
self
.
logger
.
warning
(
"Future for request id %s is already done"
,
request_id
)
...
...
asyncua/common/instantiate_util.py
View file @
9b946e22
...
...
@@ -23,7 +23,7 @@ async def instantiate(parent, node_type, nodeid=None, bname=None, dname=None, id
rdesc
.
TypeDefinition
=
node_type
.
nodeid
if
nodeid
is
None
:
nodeid
=
ua
.
NodeId
(
namespaceid
x
=
idx
)
# will trigger automatic node generation in namespace idx
nodeid
=
ua
.
NodeId
(
NamespaceInde
x
=
idx
)
# will trigger automatic node generation in namespace idx
if
bname
is
None
:
bname
=
rdesc
.
BrowseName
elif
isinstance
(
bname
,
str
):
...
...
@@ -112,7 +112,7 @@ async def _instantiate_node(server,
c_node_type
,
res
.
AddedNodeId
,
c_rdesc
,
nodeid
=
ua
.
NodeId
(
identifier
=
inst_nodeid
,
namespaceid
x
=
res
.
AddedNodeId
.
NamespaceIndex
),
nodeid
=
ua
.
NodeId
(
Identifier
=
inst_nodeid
,
NamespaceInde
x
=
res
.
AddedNodeId
.
NamespaceIndex
),
bname
=
c_rdesc
.
BrowseName
,
instantiate_optional
=
instantiate_optional
)
...
...
@@ -122,7 +122,7 @@ async def _instantiate_node(server,
c_node_type
,
res
.
AddedNodeId
,
c_rdesc
,
nodeid
=
ua
.
NodeId
(
namespaceid
x
=
res
.
AddedNodeId
.
NamespaceIndex
),
nodeid
=
ua
.
NodeId
(
NamespaceInde
x
=
res
.
AddedNodeId
.
NamespaceIndex
),
bname
=
c_rdesc
.
BrowseName
,
instantiate_optional
=
instantiate_optional
)
...
...
asyncua/common/manage_nodes.py
View file @
9b946e22
...
...
@@ -398,7 +398,7 @@ async def _create_method(parent, nodeid, qname, callback, inputs, outputs):
if
inputs
:
await
create_property
(
method
,
ua
.
NodeId
(
namespaceid
x
=
method
.
nodeid
.
NamespaceIndex
),
ua
.
NodeId
(
NamespaceInde
x
=
method
.
nodeid
.
NamespaceIndex
),
ua
.
QualifiedName
(
"InputArguments"
,
0
),
[
_vtype_to_argument
(
vtype
)
for
vtype
in
inputs
],
varianttype
=
ua
.
VariantType
.
ExtensionObject
,
...
...
@@ -407,7 +407,7 @@ async def _create_method(parent, nodeid, qname, callback, inputs, outputs):
if
outputs
:
await
create_property
(
method
,
ua
.
NodeId
(
namespaceid
x
=
method
.
nodeid
.
NamespaceIndex
),
ua
.
NodeId
(
NamespaceInde
x
=
method
.
nodeid
.
NamespaceIndex
),
ua
.
QualifiedName
(
"OutputArguments"
,
0
),
[
_vtype_to_argument
(
vtype
)
for
vtype
in
outputs
],
varianttype
=
ua
.
VariantType
.
ExtensionObject
,
...
...
asyncua/common/xmlimporter.py
View file @
9b946e22
...
...
@@ -5,13 +5,11 @@ format is the one from opc-ua specification
import
logging
import
uuid
from
typing
import
Union
,
Dict
from
copy
import
copy
from
asyncua
import
ua
from
.xmlparser
import
XMLParser
,
ua_type_to_python
from
..ua.uaerrors
import
UaError
_logger
=
logging
.
getLogger
(
__name__
)
...
...
@@ -53,15 +51,11 @@ class XmlImporter:
server_model_list
=
[]
server_namespaces_node
=
await
self
.
server
.
nodes
.
namespaces
.
get_children
()
for
model_node
in
server_namespaces_node
:
server_model_list
.
append
(
{
server_model_list
.
append
({
"ModelUri"
:
await
(
await
model_node
.
get_child
(
"NamespaceUri"
)).
read_value
(),
"Version"
:
await
(
await
model_node
.
get_child
(
"NamespaceVersion"
)).
read_value
(),
"PublicationDate"
:
(
await
(
await
model_node
.
get_child
(
"NamespacePublicationDate"
)).
read_value
()
).
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
),
}
)
"PublicationDate"
:
(
await
(
await
model_node
.
get_child
(
"NamespacePublicationDate"
)).
read_value
()).
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
),
})
return
server_model_list
async
def
_check_required_models
(
self
,
xmlpath
=
None
,
xmlstring
=
None
):
...
...
@@ -71,10 +65,7 @@ class XmlImporter:
server_model_list
=
await
self
.
_get_existing_model_in_namespace
()
for
model
in
server_model_list
:
for
req_model
in
req_models
:
if
(
model
[
"ModelUri"
]
==
req_model
[
"ModelUri"
]
and
model
[
"PublicationDate"
]
>=
req_model
[
"PublicationDate"
]
):
if
(
model
[
"ModelUri"
]
==
req_model
[
"ModelUri"
]
and
model
[
"PublicationDate"
]
>=
req_model
[
"PublicationDate"
]):
if
"Version"
in
model
and
"Version"
in
req_model
:
if
model
[
"Version"
]
>=
req_model
[
"Version"
]:
req_models
.
remove
(
req_model
)
...
...
@@ -123,15 +114,14 @@ class XmlImporter:
if
missing_nodes
:
_logger
.
warning
(
f"The following references exist, but the Nodes are missing:
{
missing_nodes
}
"
)
if
len
(
self
.
refs
):
_logger
.
warning
(
"The following references could not be imported and are probably broken: %s"
,
self
.
refs
,)
_logger
.
warning
(
"The following references could not be imported and are probably broken: %s"
,
self
.
refs
,
)
return
nodes
async
def
_add_missing_reverse_references
(
self
,
new_nodes
):
__unidirectional_types
=
{
ua
.
ObjectIds
.
GuardVariableType
,
ua
.
ObjectIds
.
HasGuard
,
ua
.
ObjectIds
.
TransitionVariableType
,
ua
.
ObjectIds
.
StateMachineType
,
ua
.
ObjectIds
.
StateVariableType
,
ua
.
ObjectIds
.
TwoStateVariableType
,
ua
.
ObjectIds
.
StateType
,
ua
.
ObjectIds
.
TransitionType
,
ua
.
ObjectIds
.
FiniteTransitionVariableType
,
ua
.
ObjectIds
.
HasInterface
}
__unidirectional_types
=
{
ua
.
ObjectIds
.
GuardVariableType
,
ua
.
ObjectIds
.
HasGuard
,
ua
.
ObjectIds
.
TransitionVariableType
,
ua
.
ObjectIds
.
StateMachineType
,
ua
.
ObjectIds
.
StateVariableType
,
ua
.
ObjectIds
.
TwoStateVariableType
,
ua
.
ObjectIds
.
StateType
,
ua
.
ObjectIds
.
TransitionType
,
ua
.
ObjectIds
.
FiniteTransitionVariableType
,
ua
.
ObjectIds
.
HasInterface
}
dangling_refs_to_missing_nodes
=
set
()
for
new_node_id
in
new_nodes
:
new_n
=
self
.
server
.
get_node
(
new_node_id
)
...
...
@@ -237,7 +227,7 @@ class XmlImporter:
field
.
datatype
=
self
.
_to_migrated_nodeid
(
field
.
datatype
)
return
new_nodes
def
_migrate_ns
(
self
,
nodeid
:
ua
.
NodeId
)
->
ua
.
NodeId
:
def
_migrate_ns
(
self
,
obj
:
Union
[
ua
.
NodeId
,
ua
.
QualifiedName
])
->
Union
[
ua
.
NodeId
,
ua
.
QualifiedName
]
:
"""
Check if the index of nodeid or browsename given in the xml model file
must be converted to a already existing namespace id based on the files
...
...
@@ -245,10 +235,13 @@ class XmlImporter:
:returns: NodeId (str)
"""
if
nodeid
.
NamespaceIndex
in
self
.
namespaces
:
nodeid
=
copy
(
nodeid
)
nodeid
.
NamespaceIndex
=
self
.
namespaces
[
nodeid
.
NamespaceIndex
]
return
nodeid
if
isinstance
(
obj
,
ua
.
NodeId
):
if
obj
.
NamespaceIndex
in
self
.
namespaces
:
obj
=
ua
.
NodeId
(
Identifier
=
obj
.
Identifier
,
NamespaceIndex
=
self
.
namespaces
[
obj
.
NamespaceIndex
],
NodeIdType
=
obj
.
NodeIdType
)
if
isinstance
(
obj
,
ua
.
QualifiedName
):
if
obj
.
NamespaceIndex
in
self
.
namespaces
:
obj
=
ua
.
QualifiedName
(
Name
=
obj
.
Name
,
NamespaceIndex
=
self
.
namespaces
[
obj
.
NamespaceIndex
])
return
obj
def
_get_add_node_item
(
self
,
obj
):
node
=
ua
.
AddNodesItem
()
...
...
@@ -322,9 +315,7 @@ class XmlImporter:
attrs
.
DisplayName
=
ua
.
LocalizedText
(
obj
.
displayname
)
attrs
.
DataType
=
obj
.
datatype
if
obj
.
value
is
not
None
:
attrs
.
Value
=
self
.
_add_variable_value
(
obj
,
)
attrs
.
Value
=
self
.
_add_variable_value
(
obj
,
)
if
obj
.
rank
:
attrs
.
ValueRank
=
obj
.
rank
if
obj
.
accesslevel
:
...
...
@@ -417,20 +408,13 @@ class XmlImporter:
values
.
append
(
extobj
)
return
ua
.
Variant
(
values
,
ua
.
VariantType
.
ExtensionObject
)
elif
obj
.
valuetype
==
"ListOfGuid"
:
return
ua
.
Variant
(
[
uuid
.
UUID
(
guid
)
for
guid
in
obj
.
value
],
getattr
(
ua
.
VariantType
,
obj
.
valuetype
[
6
:])
)
return
ua
.
Variant
([
uuid
.
UUID
(
guid
)
for
guid
in
obj
.
value
],
getattr
(
ua
.
VariantType
,
obj
.
valuetype
[
6
:]))
elif
obj
.
valuetype
.
startswith
(
"ListOf"
):
vtype
=
obj
.
valuetype
[
6
:]
if
hasattr
(
ua
.
ua_binary
.
Primitives
,
vtype
):
return
ua
.
Variant
(
obj
.
value
,
getattr
(
ua
.
VariantType
,
vtype
))
elif
vtype
==
"LocalizedText"
:
return
ua
.
Variant
(
[
getattr
(
ua
,
vtype
)(
text
=
item
[
"Text"
],
locale
=
item
[
"Locale"
])
for
item
in
obj
.
value
]
)
return
ua
.
Variant
([
getattr
(
ua
,
vtype
)(
Text
=
item
[
"Text"
],
Locale
=
item
[
"Locale"
])
for
item
in
obj
.
value
])
else
:
return
ua
.
Variant
([
getattr
(
ua
,
vtype
)(
v
)
for
v
in
obj
.
value
])
elif
obj
.
valuetype
==
"ExtensionObject"
:
...
...
@@ -570,11 +554,11 @@ class XmlImporter:
f
=
ua
.
EnumField
()
f
.
Name
=
field
.
name
if
field
.
dname
:
f
.
DisplayName
=
ua
.
LocalizedText
(
t
ext
=
field
.
dname
)
f
.
DisplayName
=
ua
.
LocalizedText
(
T
ext
=
field
.
dname
)
else
:
f
.
DisplayName
=
ua
.
LocalizedText
(
t
ext
=
field
.
name
)
f
.
DisplayName
=
ua
.
LocalizedText
(
T
ext
=
field
.
name
)
f
.
Value
=
field
.
value
f
.
Description
=
ua
.
LocalizedText
(
t
ext
=
field
.
desc
)
f
.
Description
=
ua
.
LocalizedText
(
T
ext
=
field
.
desc
)
edef
.
Fields
.
append
(
f
)
return
edef
...
...
@@ -599,7 +583,7 @@ class XmlImporter:
if
f
.
IsOptional
:
optional
=
True
f
.
ArrayDimensions
=
field
.
arraydim
f
.
Description
=
ua
.
LocalizedText
(
t
ext
=
field
.
desc
)
f
.
Description
=
ua
.
LocalizedText
(
T
ext
=
field
.
desc
)
sdef
.
Fields
.
append
(
f
)
if
optional
:
sdef
.
StructureType
=
ua
.
StructureType
.
StructureWithOptionalFields
...
...
@@ -621,11 +605,7 @@ class XmlImporter:
all_node_ids
=
[
data
.
nodeid
for
data
in
ndatas
]
while
ndatas
:
for
ndata
in
ndatas
[:]:
if
(
ndata
.
nodeid
.
NamespaceIndex
not
in
self
.
namespaces
.
values
()
or
ndata
.
parent
is
None
or
ndata
.
parent
not
in
all_node_ids
):
if
(
ndata
.
nodeid
.
NamespaceIndex
not
in
self
.
namespaces
.
values
()
or
ndata
.
parent
is
None
or
ndata
.
parent
not
in
all_node_ids
):
sorted_ndatas
.
append
(
ndata
)
sorted_nodes_ids
.
append
(
ndata
.
nodeid
)
ndatas
.
remove
(
ndata
)
...
...
asyncua/ua/ua_binary.py
View file @
9b946e22
...
...
@@ -350,12 +350,13 @@ def nodeid_to_binary(nodeid):
def
nodeid_from_binary
(
data
):
encoding
=
ord
(
data
.
read
(
1
))
nidtype
=
ua
.
NodeIdType
(
encoding
&
0b00111111
)
expanded
=
False
uri
=
None
server_idx
=
None
if
nidtype
==
ua
.
NodeIdType
.
TwoByte
:
identifier
=
ord
(
data
.
read
(
1
))
return
ua
.
TwoByteNodeId
(
identifier
)
if
nidtype
==
ua
.
NodeIdType
.
FourByte
:
nidx
=
0
el
if
nidtype
==
ua
.
NodeIdType
.
FourByte
:
nidx
,
identifier
=
struct
.
unpack
(
"<BH"
,
data
.
read
(
3
))
elif
nidtype
==
ua
.
NodeIdType
.
Numeric
:
nidx
,
identifier
=
struct
.
unpack
(
"<HI"
,
data
.
read
(
6
))
...
...
@@ -373,12 +374,10 @@ def nodeid_from_binary(data):
if
test_bit
(
encoding
,
7
):
uri
=
Primitives
.
String
.
unpack
(
data
)
expanded
=
True
if
test_bit
(
encoding
,
6
):
server_idx
=
Primitives
.
UInt32
.
unpack
(
data
)
expanded
=
True
if
expanded
:
if
uri
is
not
None
or
server_idx
is
not
None
:
return
ua
.
ExpandedNodeId
(
identifier
,
nidx
,
nidtype
,
uri
,
server_idx
)
return
ua
.
NodeId
(
identifier
,
nidx
,
nidtype
)
...
...
@@ -449,7 +448,7 @@ def extensionobject_from_binary(data):
body
=
data
.
copy
(
length
)
data
.
skip
(
length
)
if
typeid
.
Identifier
==
0
:
return
None
return
ua
.
ExtensionObject
()
elif
typeid
in
ua
.
extension_objects_by_typeid
:
cls
=
ua
.
extension_objects_by_typeid
[
typeid
]
if
body
is
None
:
...
...
asyncua/ua/uaprotocol_auto.py
View file @
9b946e22
This diff is collapsed.
Click to expand it.
asyncua/ua/uaprotocol_hand.py
View file @
9b946e22
...
...
@@ -49,6 +49,7 @@ class Header(uatypes.FrozenClass):
ChannelId
:
int
=
0
body_size
=
0
packet_size
=
0
header_size
=
8
def
add_size
(
self
,
size
):
self
.
body_size
+=
size
...
...
asyncua/ua/uatypes.py
View file @
9b946e22
...
...
@@ -263,7 +263,7 @@ class StatusCode:
:vartype doc: string
"""
value
:
UInt32
=
0
value
:
UInt32
=
status_codes
.
StatusCodes
.
Good
def
__post_init__
(
self
):
if
isinstance
(
self
.
value
,
str
):
...
...
@@ -385,7 +385,6 @@ class NodeId:
def
__lt__
(
self
,
other
):
if
not
isinstance
(
other
,
NodeId
):
raise
AttributeError
(
"Can only compare to NodeId"
)
print
(
"COMPARE"
,
self
,
other
)
print
(
self
.
NodeIdType
,
self
.
NamespaceIndex
,
self
.
Identifier
,
other
.
NodeIdType
,
other
.
NamespaceIndex
,
other
.
Identifier
)
return
(
self
.
NodeIdType
,
self
.
NamespaceIndex
,
self
.
Identifier
)
<
(
other
.
NodeIdType
,
other
.
NamespaceIndex
,
other
.
Identifier
)
...
...
@@ -643,7 +642,7 @@ class ExtensionObject:
TypeId: NodeId = NodeId()
Encoding: Byte = field(default=0, repr=False, init=False)
Body: Optional[ByteString] =
b""
Body: Optional[ByteString] =
None
def __bool__(self):
return self.Body is not None
...
...
@@ -910,7 +909,7 @@ class DataValue:
Encoding: Byte = field(default=0, repr=False, init=False)
Value: Optional[Variant] = None
StatusCode
: Optional[StatusCode] = None #
field(default_factory=StatusCode)
StatusCode
_: Optional[StatusCode] =
field(default_factory=StatusCode)
SourceTimestamp: Optional[DateTime] = None
SourcePicoseconds: Optional[UInt16] = None
ServerTimestamp: Optional[DateTime] = None
...
...
@@ -922,6 +921,14 @@ class DataValue:
if not isinstance(self.Value, Variant):
self.Value = Variant(self.Value)
@property
def StatusCode(self):
return self.StatusCode_
@StatusCode.setter
def StatusCode(self, val):
self.StatusCode_ = val
def datatype_to_varianttype(int_type):
"""
...
...
examples/server-minimal.py
View file @
9b946e22
...
...
@@ -7,9 +7,6 @@ from asyncua import ua, Server
from
asyncua.common.methods
import
uamethod
logging
.
basicConfig
(
level
=
logging
.
INFO
)
_logger
=
logging
.
getLogger
(
'asyncua'
)
@
uamethod
def
func
(
parent
,
value
):
...
...
@@ -17,6 +14,7 @@ def func(parent, value):
async
def
main
():
_logger
=
logging
.
getLogger
(
'asyncua'
)
# setup our server
server
=
Server
()
await
server
.
init
()
...
...
@@ -43,4 +41,7 @@ async def main():
if
__name__
==
'__main__'
:
asyncio
.
run
(
main
())
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
asyncio
.
run
(
main
(),
debug
=
True
)
schemas/generate_protocol_python.py
View file @
9b946e22
...
...
@@ -80,7 +80,7 @@ class CodeGenerator:
self
.
write
(
''
)
self
.
write
(
'from datetime import datetime'
)
self
.
write
(
'from enum import IntEnum'
)
self
.
write
(
'from typing import Union, List'
)
self
.
write
(
'from typing import Union, List
, Optional
'
)
self
.
write
(
'from dataclasses import dataclass, field'
)
self
.
write
(
''
)
# self.write('from asyncua.ua.uaerrors import UaError')
...
...
@@ -137,14 +137,21 @@ class CodeGenerator:
if
"BodyLength"
in
[
f
.
name
for
f
in
obj
.
fields
]:
extobj_hack
=
True
hack_names
=
[]
for
field
in
obj
.
fields
:
# FIXME; flag optional those that are optional
if
field
.
length
:
typestring
=
f"List[
{
field
.
uatype
}
]"
elif
field
.
switchfield
is
not
None
:
typestring
=
f"Optional[
{
field
.
uatype
}
]"
else
:
typestring
=
field
.
uatype
if
field
.
name
==
field
.
uatype
:
# help!!! selv referencing class
if
field
.
name
==
field
.
uatype
:
# variable name and type name are the same. Dataclass do not like it
print
(
"SELF REFENCING"
,
obj
,
field
)
hack_names
.
append
(
field
.
name
)
fieldname
=
field
.
name
+
"_"
else
:
fieldname
=
field
.
name
...
...
@@ -153,8 +160,8 @@ class CodeGenerator:
val
=
0
if
not
extobj_hack
else
1
self
.
write
(
f"
{
field
.
name
}
: Byte = field(default=
{
val
}
, repr=False, init=False)"
)
elif
field
.
uatype
==
obj
.
name
:
# help!!! selv referencing class
pass
#FIXME: handle
#FIXME: handle better
self
.
write
(
f"
{
fieldname
}
: Optional[ExtensionObject] = None"
)
elif
obj
.
name
not
in
(
"ExtensionObject"
,)
and
\
field
.
name
==
"TypeId"
:
# and ( obj.name.endswith("Request") or obj.name.endswith("Response")):
self
.
write
(
f"TypeId: NodeId = FourByteNodeId(ObjectIds.
{
obj
.
name
}
_Encoding_DefaultBinary)"
)
...
...
@@ -175,6 +182,17 @@ class CodeGenerator:
if
switch_written
:
self
.
write
(
"}"
)
if
hack_names
:
self
.
write
(
""
)
for
name
in
hack_names
:
self
.
write
(
"@property"
)
self
.
write
(
f"def
{
name
}
(self):"
)
self
.
write
(
f" return self.
{
name
}
_"
)
self
.
write
(
""
)
self
.
write
(
f"@
{
name
}
.setter"
)
self
.
write
(
f"def
{
name
}
(self, val):"
)
self
.
write
(
f" self.
{
name
}
_ = val"
)
self
.
iidx
=
0
def
write_unpack_enum
(
self
,
name
,
enum
):
...
...
tests/test_unit.py
View file @
9b946e22
...
...
@@ -441,6 +441,13 @@ def test_null_string():
assert
v
.
Value
==
v2
.
Value
def
test_empty_extension_object
():
obj
=
ua
.
ExtensionObject
()
obj2
=
extensionobject_from_binary
(
ua
.
utils
.
Buffer
(
extensionobject_to_binary
(
obj
)))
assert
type
(
obj
)
==
type
(
obj2
)
assert
obj
==
obj2
def
test_extension_object
():
obj
=
ua
.
UserNameIdentityToken
()
obj
.
UserName
=
"admin"
...
...
@@ -732,6 +739,8 @@ def test_bin_data_type_def():
dta
=
ua
.
DataTypeAttributes
()
dta
.
DisplayName
=
ua
.
LocalizedText
(
"titi"
)
ad
.
NodeAttributes
=
dta
from
IPython
import
embed
embed
()
data
=
struct_to_binary
(
ad
)
ad2
=
struct_from_binary
(
ua
.
AddNodesItem
,
ua
.
utils
.
Buffer
(
data
))
...
...
@@ -746,3 +755,24 @@ def test_bin_datattributes():
data
=
struct_to_binary
(
dta
)
dta2
=
struct_from_binary
(
ua
.
DataTypeAttributes
,
ua
.
utils
.
Buffer
(
data
))
assert
dta
.
DisplayName
==
dta2
.
DisplayName
def
test_browse
():
data
=
b'
\
x01
\
x00
\
x12
\
x02
\
xe0
S2
\
xb3
\
x8f
\
n
\
xd7
\
x01
\
x04
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
xff
\
xff
\
xff
\
xff
\
x00
\
x00
\
x00
\
x01
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
xff
\
xff
\
xff
\
xff
\
x03
\
x00
\
x00
\
x00
\
x00
#
\
x01
@U
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x07
\
x00
\
x00
\
x00
Objects
\
x02
\
x07
\
x00
\
x00
\
x00
Objects
\
x01
\
x00
\
x00
\
x00
@=
\
x00
\
x00
\
x00
\
x00
\
x00
#
\
x01
@V
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x05
\
x00
\
x00
\
x00
Types
\
x02
\
x05
\
x00
\
x00
\
x00
Types
\
x01
\
x00
\
x00
\
x00
@=
\
x00
\
x00
\
x00
\
x00
\
x00
#
\
x01
@W
\
x00
\
x00
\
x00
\
x00
\
x00
\
x00
\
x05
\
x00
\
x00
\
x00
Views
\
x02
\
x05
\
x00
\
x00
\
x00
Views
\
x01
\
x00
\
x00
\
x00
@=
\
x00
\
x00
\
x00
\
x00
\
xff
\
xff
\
xff
\
xff
'
#data = b'\x01\x00\x12\x020)E\x11"\n\xd7\x01\x04\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\x03\x00\x00\x00\x00#\x01@U\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00Objects\x02\x07\x00\x00\x00Objects\x01\x00\x00\x00@=\x00\x00\x00\x00\x00#\x01@V\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00Types\x02\x05\x00\x00\x00Types\x01\x00\x00\x00@=\x00\x00\x00\x00\x00#\x01@W\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00Views\x02\x05\x00\x00\x00Views\x01\x00\x00\x00@=\x00\x00\x00\x00\xff\xff\xff\xff'
res
=
struct_from_binary
(
ua
.
BrowseResponse
,
ua
.
utils
.
Buffer
(
data
))
def
test_bname
():
qn
=
ua
.
QualifiedName
(
"TOTO"
,
2
)
d
=
struct_to_binary
(
qn
)
qn2
=
struct_from_binary
(
ua
.
QualifiedName
,
ua
.
utils
.
Buffer
(
d
))
assert
qn
==
qn2
def
test_expandedNodeId
():
d
=
b"
\
x40
\
x55
\
x00
\
x00
\
x00
\
x00
"
nid
=
nodeid_from_binary
(
ua
.
utils
.
Buffer
(
d
))
assert
isinstance
(
nid
,
ua
.
ExpandedNodeId
)
assert
nid
.
ServerIndex
==
0
assert
nid
.
Identifier
==
85
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment