Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
1449ba53
Commit
1449ba53
authored
Nov 11, 2021
by
oroulet
Committed by
oroulet
Nov 11, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
revert and fix Variant array
parent
100a711b
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
44 additions
and
21 deletions
+44
-21
asyncua/server/address_space.py
asyncua/server/address_space.py
+1
-1
asyncua/ua/ua_binary.py
asyncua/ua/ua_binary.py
+3
-2
asyncua/ua/uatypes.py
asyncua/ua/uatypes.py
+18
-10
tests/test_unit.py
tests/test_unit.py
+22
-8
No files found.
asyncua/server/address_space.py
View file @
1449ba53
...
@@ -458,7 +458,7 @@ class NodeManagementService:
...
@@ -458,7 +458,7 @@ class NodeManagementService:
def
_add_node_attr
(
self
,
item
,
nodedata
,
name
,
vtype
=
None
,
add_timestamps
=
False
,
is_array
=
False
):
def
_add_node_attr
(
self
,
item
,
nodedata
,
name
,
vtype
=
None
,
add_timestamps
=
False
,
is_array
=
False
):
if
item
.
SpecifiedAttributes
&
getattr
(
ua
.
NodeAttributesMask
,
name
):
if
item
.
SpecifiedAttributes
&
getattr
(
ua
.
NodeAttributesMask
,
name
):
dv
=
ua
.
DataValue
(
dv
=
ua
.
DataValue
(
ua
.
Variant
(
getattr
(
item
,
name
),
vtype
,
Dimensions
=
[
0
]
if
is_array
else
None
),
ua
.
Variant
(
getattr
(
item
,
name
),
vtype
,
is_array
=
is_array
),
SourceTimestamp
=
datetime
.
utcnow
()
if
add_timestamps
else
None
,
SourceTimestamp
=
datetime
.
utcnow
()
if
add_timestamps
else
None
,
)
)
nodedata
.
attributes
[
getattr
(
ua
.
AttributeIds
,
name
)]
=
AttributeValue
(
dv
)
nodedata
.
attributes
[
getattr
(
ua
.
AttributeIds
,
name
)]
=
AttributeValue
(
dv
)
...
...
asyncua/ua/ua_binary.py
View file @
1449ba53
...
@@ -434,19 +434,20 @@ def variant_to_binary(var):
...
@@ -434,19 +434,20 @@ def variant_to_binary(var):
def
variant_from_binary
(
data
):
def
variant_from_binary
(
data
):
dimensions
=
None
dimensions
=
None
array
=
False
encoding
=
ord
(
data
.
read
(
1
))
encoding
=
ord
(
data
.
read
(
1
))
int_type
=
encoding
&
0b00111111
int_type
=
encoding
&
0b00111111
vtype
=
ua
.
datatype_to_varianttype
(
int_type
)
vtype
=
ua
.
datatype_to_varianttype
(
int_type
)
if
test_bit
(
encoding
,
7
):
if
test_bit
(
encoding
,
7
):
value
=
unpack_uatype_array
(
vtype
,
data
)
value
=
unpack_uatype_array
(
vtype
,
data
)
dimensions
=
[
0
]
array
=
True
else
:
else
:
value
=
unpack_uatype
(
vtype
,
data
)
value
=
unpack_uatype
(
vtype
,
data
)
if
test_bit
(
encoding
,
6
):
if
test_bit
(
encoding
,
6
):
dimensions
=
unpack_uatype_array
(
ua
.
VariantType
.
Int32
,
data
)
dimensions
=
unpack_uatype_array
(
ua
.
VariantType
.
Int32
,
data
)
if
value
is
not
None
:
if
value
is
not
None
:
value
=
_reshape
(
value
,
dimensions
)
value
=
_reshape
(
value
,
dimensions
)
return
ua
.
Variant
(
value
,
vtype
,
dimensions
)
return
ua
.
Variant
(
value
,
vtype
,
dimensions
,
is_array
=
array
)
def
_reshape
(
flat
,
dims
):
def
_reshape
(
flat
,
dims
):
...
...
asyncua/ua/uatypes.py
View file @
1449ba53
...
@@ -3,7 +3,7 @@ implement ua datatypes
...
@@ -3,7 +3,7 @@ implement ua datatypes
"""
"""
import
sys
import
sys
from
typing
import
Optional
,
Any
,
Union
,
Generic
from
typing
import
Optional
,
Any
,
Union
,
Generic
,
List
import
collections
import
collections
import
logging
import
logging
from
enum
import
Enum
,
IntEnum
from
enum
import
Enum
,
IntEnum
...
@@ -780,22 +780,29 @@ class Variant:
...
@@ -780,22 +780,29 @@ class Variant:
:ivar VariantType:
:ivar VariantType:
:vartype VariantType: VariantType
:vartype VariantType: VariantType
:ivar Dimension:
:ivar Dimension:
:vartype Dimensions: The length of each dimensions. Usually guessed from value. [0] mean 1D array without length limit
:vartype Dimensions: The length of each dimensions. Make the variant a Matrix
:ivar is_array:
:vartype is_array: If the variant is an array. Always True if Dimension is specificied
"""
"""
# FIXME: typing is wrong here
Value: Any = None
Value: Any = None
VariantType: VariantType = None
VariantType: VariantType = None
Dimensions: Optional[Int32] = None
Dimensions: Optional[List[Int32]] = None
is_array: Optional[bool] = None
def __post_init__(self):
def __post_init__(self):
if self.is_array is None:
if isinstance(self.Value, (list, tuple)) or self.Dimensions :
object.__setattr__(self, "
is_array
", True)
else:
object.__setattr__(self, "
is_array
", False)
if isinstance(self.Value, Variant):
if isinstance(self.Value, Variant):
object.__setattr__(self, "
VariantType
", self.Value.VariantType)
object.__setattr__(self, "
VariantType
", self.Value.VariantType)
object.__setattr__(self, "
Value
", self.Value.Value)
object.__setattr__(self, "
Value
", self.Value.Value)
if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
dims = get_shape(self.Value)
object.__setattr__(self, "
Dimensions
", dims)
if not isinstance(self.VariantType, (VariantType, VariantTypeCustom)):
if not isinstance(self.VariantType, (VariantType, VariantTypeCustom)):
if self.VariantType is None:
if self.VariantType is None:
object.__setattr__(self, "
VariantType
", self._guess_type(self.Value))
object.__setattr__(self, "
VariantType
", self._guess_type(self.Value))
...
@@ -820,9 +827,10 @@ class Variant:
...
@@ -820,9 +827,10 @@ class Variant:
f"
Non
array
Variant
of
type
{
self
.
VariantType
}
cannot
have
value
None
"
f"
Non
array
Variant
of
type
{
self
.
VariantType
}
cannot
have
value
None
"
)
)
@property
if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
def is_array(self):
dims = get_shape(self.Value)
return self.Dimensions is not None
if len(dims) > 1:
object.__setattr__(self, "
Dimensions
", dims)
def __eq__(self, other):
def __eq__(self, other):
if (
if (
...
...
tests/test_unit.py
View file @
1449ba53
...
@@ -30,35 +30,44 @@ EXAMPLE_BSD_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "exam
...
@@ -30,35 +30,44 @@ EXAMPLE_BSD_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "exam
def
test_variant_array_none
():
def
test_variant_array_none
():
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Int32
,
Dimensions
=
[
0
]
)
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Int32
,
is_array
=
True
)
data
=
variant_to_binary
(
v
)
data
=
variant_to_binary
(
v
)
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
assert
v
==
v2
assert
v
==
v2
assert
v2
.
is_array
assert
v2
.
is_array
assert
v2
.
Dimensions
==
[
0
]
assert
v2
.
Dimensions
is
None
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Null
,
Dimensions
=
[
0
]
)
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Null
,
is_array
=
True
)
data
=
variant_to_binary
(
v
)
data
=
variant_to_binary
(
v
)
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
assert
v
==
v2
assert
v
==
v2
assert
v2
.
is_array
assert
v2
.
is_array
assert
v2
.
Dimensions
==
[
0
]
assert
v2
.
Dimensions
is
None
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Null
,
Dimensions
=
None
)
v
=
ua
.
Variant
(
None
,
VariantType
=
ua
.
VariantType
.
Null
,
Dimensions
=
[
0
,
0
]
)
data
=
variant_to_binary
(
v
)
data
=
variant_to_binary
(
v
)
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
assert
v
==
v2
assert
v
==
v2
assert
not
v2
.
is_array
assert
v2
.
is_array
assert
v2
.
Dimensions
==
None
assert
v2
.
Dimensions
==
[
0
,
0
]
def
test_variant_empty_list
():
def
test_variant_empty_list
():
v
=
ua
.
Variant
([],
VariantType
=
ua
.
VariantType
.
Int32
,
Dimensions
=
[
0
])
v
=
ua
.
Variant
([],
VariantType
=
ua
.
VariantType
.
Int32
,
is_array
=
True
)
data
=
variant_to_binary
(
v
)
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
assert
v
==
v2
assert
v2
.
is_array
assert
v2
.
Dimensions
is
None
v
=
ua
.
Variant
([],
VariantType
=
ua
.
VariantType
.
Int32
,
is_array
=
True
,
Dimensions
=
[
0
])
data
=
variant_to_binary
(
v
)
data
=
variant_to_binary
(
v
)
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
data
))
assert
v
==
v2
assert
v
==
v2
assert
v2
.
is_array
assert
v2
.
is_array
assert
v2
.
Dimensions
==
[
0
]
def
test_custom_structs
(
tmpdir
):
def
test_custom_structs
(
tmpdir
):
...
@@ -602,6 +611,7 @@ def test_variant_array():
...
@@ -602,6 +611,7 @@ def test_variant_array():
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
assert
v
.
Value
==
v2
.
Value
assert
v
.
Value
==
v2
.
Value
assert
v
.
VariantType
==
v2
.
VariantType
assert
v
.
VariantType
==
v2
.
VariantType
assert
v2
.
Dimensions
is
None
now
=
datetime
.
utcnow
()
now
=
datetime
.
utcnow
()
v
=
ua
.
Variant
([
now
])
v
=
ua
.
Variant
([
now
])
...
@@ -610,12 +620,16 @@ def test_variant_array():
...
@@ -610,12 +620,16 @@ def test_variant_array():
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
assert
v
.
Value
==
v2
.
Value
assert
v
.
Value
==
v2
.
Value
assert
v
.
VariantType
==
v2
.
VariantType
assert
v
.
VariantType
==
v2
.
VariantType
assert
v2
.
Dimensions
is
None
def
test_variant_array_dim
():
def
test_variant_array_dim
():
v
=
ua
.
Variant
([
1
,
2
,
3
,
4
,
5
,
6
],
Dimensions
=
[
2
,
3
])
v
=
ua
.
Variant
([
1
,
2
,
3
,
4
,
5
,
6
],
Dimensions
=
[
2
,
3
])
assert
v
.
Value
[
1
]
==
2
assert
v
.
Value
[
1
]
==
2
assert
v
.
Dimensions
==
[
2
,
3
]
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
v2
=
variant_from_binary
(
ua
.
utils
.
Buffer
(
variant_to_binary
(
v
)))
assert
_reshape
(
v
.
Value
,
(
2
,
3
))
==
v2
.
Value
assert
_reshape
(
v
.
Value
,
(
2
,
3
))
==
v2
.
Value
assert
v
.
VariantType
==
v2
.
VariantType
assert
v
.
VariantType
==
v2
.
VariantType
assert
v
.
Dimensions
==
v2
.
Dimensions
assert
v
.
Dimensions
==
v2
.
Dimensions
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment