Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
a7a4acb1
Commit
a7a4acb1
authored
Jan 16, 2016
by
olivier R-D
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
handle arrays as lists of lists. add tests
parent
d09971cf
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
126 additions
and
14 deletions
+126
-14
opcua/ua/uatypes.py
opcua/ua/uatypes.py
+62
-8
tests/tests.py
tests/tests.py
+64
-6
No files found.
opcua/ua/uatypes.py
View file @
a7a4acb1
...
...
@@ -729,17 +729,16 @@ class Variant(FrozenClass):
self
.
Value
=
value
self
.
VariantType
=
varianttype
self
.
Dimensions
=
dimensions
self
.
_freeze
=
True
if
isinstance
(
value
,
Variant
):
self
.
Value
=
value
.
Value
self
.
VariantType
=
value
.
VariantType
if
self
.
VariantType
is
None
:
if
type
(
self
.
Value
)
in
(
list
,
tuple
):
if
len
(
self
.
Value
)
==
0
:
raise
UAError
(
"could not guess UA variable type"
)
self
.
VariantType
=
self
.
_guess_type
(
self
.
Value
[
0
])
else
:
self
.
VariantType
=
self
.
_guess_type
(
self
.
Value
)
self
.
_freeze
=
True
self
.
VariantType
=
self
.
_guess_type
(
self
.
Value
)
if
self
.
Dimensions
is
None
and
type
(
self
.
Value
)
in
(
list
,
tuple
):
dims
=
get_dimensions
(
self
.
Value
)
if
len
(
dims
)
>
1
:
self
.
Dimensions
=
dims
def
__eq__
(
self
,
other
):
if
isinstance
(
other
,
Variant
)
and
self
.
VariantType
==
other
.
VariantType
and
self
.
Value
==
other
.
Value
:
...
...
@@ -747,6 +746,10 @@ class Variant(FrozenClass):
return
False
def
_guess_type
(
self
,
val
):
while
isinstance
(
val
,
(
list
,
tuple
)):
if
len
(
val
)
==
0
:
raise
UAError
(
"could not guess UA variable type"
)
val
=
val
[
0
]
if
val
is
None
:
return
VariantType
.
Null
elif
isinstance
(
val
,
bool
):
...
...
@@ -783,7 +786,7 @@ class Variant(FrozenClass):
self
.
Encoding
=
set_bit
(
self
.
Encoding
,
6
)
self
.
Encoding
=
set_bit
(
self
.
Encoding
,
7
)
b
.
append
(
uatype_UInt8
.
pack
(
self
.
Encoding
))
b
.
append
(
pack_uatype_array
(
self
.
VariantType
.
name
,
self
.
Value
))
b
.
append
(
pack_uatype_array
(
self
.
VariantType
.
name
,
flatten
(
self
.
Value
)
))
if
self
.
Dimensions
is
not
None
:
b
.
append
(
pack_uatype_array
(
"Int32"
,
self
.
Dimensions
))
else
:
...
...
@@ -805,10 +808,61 @@ class Variant(FrozenClass):
value
=
unpack_uatype
(
vtype
.
name
,
data
)
if
test_bit
(
encoding
,
6
):
dimensions
=
unpack_uatype_array
(
"Int32"
,
data
)
value
=
reshape
(
value
,
dimensions
)
return
Variant
(
value
,
vtype
,
encoding
,
dimensions
)
def
reshape
(
flat
,
dims
):
subdims
=
dims
[
1
:]
subsize
=
1
for
i
in
subdims
:
if
i
==
0
:
i
=
1
subsize
*=
i
while
dims
[
0
]
*
subsize
>
len
(
flat
):
flat
.
append
([])
if
not
subdims
or
subdims
==
[
0
]:
return
flat
return
[
reshape
(
flat
[
i
:
i
+
subsize
],
subdims
)
for
i
in
range
(
0
,
len
(
flat
),
subsize
)]
def
_split_list
(
l
,
n
):
n
=
max
(
1
,
n
)
return
[
l
[
i
:
i
+
n
]
for
i
in
range
(
0
,
len
(
l
),
n
)]
def
flatten_and_get_dimensions
(
mylist
):
dims
=
[]
dims
.
append
(
len
(
mylist
))
while
isinstance
(
mylist
[
0
],
(
list
,
tuple
)):
dims
.
append
(
len
(
mylist
[
0
]))
mylist
=
[
item
for
sublist
in
mylist
for
item
in
sublist
]
if
len
(
mylist
)
==
0
:
break
return
mylist
,
dims
def
flatten
(
mylist
):
if
len
(
mylist
)
==
0
:
return
mylist
while
isinstance
(
mylist
[
0
],
(
list
,
tuple
)):
mylist
=
[
item
for
sublist
in
mylist
for
item
in
sublist
]
if
len
(
mylist
)
==
0
:
break
return
mylist
def
get_dimensions
(
mylist
):
dims
=
[]
while
isinstance
(
mylist
,
(
list
,
tuple
)):
dims
.
append
(
len
(
mylist
))
if
len
(
mylist
)
==
0
:
break
mylist
=
mylist
[
0
]
return
dims
class
DataValue
(
FrozenClass
):
'''
...
...
tests/tests.py
View file @
a7a4acb1
...
...
@@ -22,6 +22,7 @@ from opcua.ua import ObjectIds
from
opcua.ua
import
AttributeIds
from
opcua.ua
import
extensionobject_from_binary
from
opcua.ua
import
extensionobject_to_binary
from
opcua.ua.uatypes
import
flatten
,
get_dimensions
,
reshape
port_num1
=
48510
port_num2
=
48530
...
...
@@ -95,6 +96,44 @@ class Unit(unittest.TestCase):
Simple unit test that do not need to setup a server or a client
'''
def
test_variant_dimensions
(
self
):
l
=
[[[
1.0
,
1.0
,
1.0
,
1.0
],
[
2.0
,
2.0
,
2.0
,
2.0
],
[
3.0
,
3.0
,
3.0
,
3.0
]],[[
5.0
,
5.0
,
5.0
,
5.0
],
[
7.0
,
8.0
,
9.0
,
01.0
],
[
1.0
,
1.0
,
1.0
,
1.0
]]]
v
=
ua
.
Variant
(
l
)
self
.
assertEqual
(
v
.
Dimensions
,
[
2
,
3
,
4
])
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
self
.
assertEqual
(
v
,
v2
)
self
.
assertEqual
(
v
.
Dimensions
,
v2
.
Dimensions
)
# very special case
l
=
[[[],
[],
[]],
[[],
[],
[]]]
v
=
ua
.
Variant
(
l
,
ua
.
VariantType
.
UInt32
)
self
.
assertEqual
(
v
.
Dimensions
,
[
2
,
3
,
0
])
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
self
.
assertEqual
(
v
.
Dimensions
,
v2
.
Dimensions
)
self
.
assertEqual
(
v
,
v2
)
def
test_flatten
(
self
):
l
=
[[[
1.0
,
1.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
1.0
,
1.0
]],[[
1.0
,
1.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
1.0
,
1.0
]]]
l2
=
flatten
(
l
)
dims
=
get_dimensions
(
l
)
self
.
assertEqual
(
dims
,
[
2
,
3
,
4
])
self
.
assertNotEqual
(
l
,
l2
)
l3
=
reshape
(
l2
,
(
2
,
3
,
4
))
self
.
assertEqual
(
l
,
l3
)
l
=
[[[],
[],
[]],
[[],
[],
[]]]
l2
=
flatten
(
l
)
dims
=
get_dimensions
(
l
)
self
.
assertEqual
(
dims
,
[
2
,
3
,
0
])
l
=
[
1
,
2
,
3
,
4
]
l2
=
flatten
(
l
)
dims
=
get_dimensions
(
l
)
self
.
assertEqual
(
dims
,
[
4
])
self
.
assertEqual
(
l
,
l2
)
def
test_guid
(
self
):
g
=
ua
.
Guid
()
sc
=
ua
.
StatusCode
()
...
...
@@ -277,15 +316,13 @@ class Unit(unittest.TestCase):
self
.
assertEqual
(
v
.
VariantType
,
v2
.
VariantType
)
def
test_variant_array_dim
(
self
):
v
=
ua
.
Variant
([
1
,
2
,
3
,
4
,
5
,
6
],
dimensions
=
[
2
,
5
])
v
=
ua
.
Variant
([
1
,
2
,
3
,
4
,
5
,
6
],
dimensions
=
[
2
,
3
])
self
.
assertEqual
(
v
.
Value
[
1
],
2
)
# self.assertEqual(v.VarianType, ua.VariantType.Int64) # we do not care, we should aonly test for sutff that matter
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
self
.
assertEqual
(
v
.
Value
,
v2
.
Value
)
self
.
assertEqual
(
reshape
(
v
.
Value
,
(
2
,
3
))
,
v2
.
Value
)
self
.
assertEqual
(
v
.
VariantType
,
v2
.
VariantType
)
self
.
assertEqual
(
v
.
Dimensions
,
v2
.
Dimensions
)
self
.
assertEqual
(
v2
.
Dimensions
,
[
2
,
5
])
self
.
assertEqual
(
v2
.
Dimensions
,
[
2
,
3
])
def
test_text
(
self
):
t1
=
ua
.
LocalizedText
(
'Root'
)
...
...
@@ -489,6 +526,27 @@ class CommonTests(object):
tid
=
v1
.
get_value
()
self
.
assertEqual
(
now
,
tid
)
def
test_variant_array_dim
(
self
):
objects
=
self
.
opc
.
get_objects_node
()
l
=
[[[
1.0
,
1.0
,
1.0
,
1.0
],
[
2.0
,
2.0
,
2.0
,
2.0
],
[
3.0
,
3.0
,
3.0
,
3.0
]],[[
5.0
,
5.0
,
5.0
,
5.0
],
[
7.0
,
8.0
,
9.0
,
01.0
],
[
1.0
,
1.0
,
1.0
,
1.0
]]]
v
=
objects
.
add_variable
(
3
,
'variableWithDims'
,
l
)
v2
=
v
.
get_value
()
self
.
assertEqual
(
v2
,
l
)
dv
=
v
.
get_data_value
()
self
.
assertEqual
(
dv
.
Value
.
Dimensions
,
[
2
,
3
,
4
])
l
=
[[[],
[],
[]],
[[],
[],
[]]]
variant
=
ua
.
Variant
(
l
,
ua
.
VariantType
.
UInt32
)
v
=
objects
.
add_variable
(
3
,
'variableWithDimsEmpty'
,
variant
)
v2
=
v
.
get_value
()
self
.
assertEqual
(
v2
,
l
)
dv
=
v
.
get_data_value
()
self
.
assertEqual
(
dv
.
Value
.
Dimensions
,
[
2
,
3
,
0
])
def
test_add_numeric_variable
(
self
):
objects
=
self
.
opc
.
get_objects_node
()
v
=
objects
.
add_variable
(
'ns=3;i=888;'
,
'3:numericnodefromstring'
,
99
)
...
...
@@ -1137,7 +1195,7 @@ class TestServer(unittest.TestCase, CommonTests):
new_servers
=
client
.
find_servers
([
"urn:freeopcua:python"
])
self
.
assertEqual
(
len
(
new_servers
)
-
len
(
servers
)
,
2
)
self
.
assertTrue
(
new_app_uri1
in
[
s
.
ApplicationUri
for
s
in
new_servers
])
self
.
assertTr
(
new_app_uri2
in
[
s
.
ApplicationUri
for
s
in
new_servers
])
self
.
assertTr
ue
(
new_app_uri2
in
[
s
.
ApplicationUri
for
s
in
new_servers
])
finally
:
client
.
disconnect
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment