Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
0d659b0e
Commit
0d659b0e
authored
Feb 21, 2021
by
oroulet
Committed by
oroulet
Mar 10, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
start work on generated code
parent
8917a67c
Changes
7
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
2572 additions
and
6808 deletions
+2572
-6808
asyncua/ua/ua_binary.py
asyncua/ua/ua_binary.py
+2
-1
asyncua/ua/uaprotocol_auto.py
asyncua/ua/uaprotocol_auto.py
+2398
-6591
asyncua/ua/uaprotocol_hand.py
asyncua/ua/uaprotocol_hand.py
+70
-135
asyncua/ua/uatypes.py
asyncua/ua/uatypes.py
+25
-13
schemas/generate_model.py
schemas/generate_model.py
+26
-16
schemas/generate_protocol_python.py
schemas/generate_protocol_python.py
+47
-50
tests/test_unit.py
tests/test_unit.py
+4
-2
No files found.
asyncua/ua/ua_binary.py
View file @
0d659b0e
...
...
@@ -267,7 +267,7 @@ def struct_to_binary(obj):
print
(
"SETATTR"
,
name
,
idx
,
bin
(
container_val
))
setattr
(
obj
,
container_name
,
container_val
)
for
name
,
uatype
in
get_type_hints
(
obj
).
items
():
print
(
"STRUCT"
,
obj
,
name
,
uatype
)
print
(
"STRUCT
LEMENT
"
,
obj
,
name
,
uatype
)
if
name
==
"Encoding"
:
packet
.
append
(
Primitives
.
Byte
.
pack
(
obj
.
Encoding
))
continue
...
...
@@ -278,6 +278,7 @@ def struct_to_binary(obj):
packet
.
append
(
list_to_binary
(
_from_list
(
uatype
),
val
))
else
:
if
has_switch
and
val
is
None
and
name
in
obj
.
ua_switches
:
print
(
"SWTICH SAYS TO NOT WRITE"
)
pass
else
:
packet
.
append
(
to_binary
(
uatype
,
val
))
...
...
asyncua/ua/uaprotocol_auto.py
View file @
0d659b0e
This diff is collapsed.
Click to expand it.
asyncua/ua/uaprotocol_hand.py
View file @
0d659b0e
This diff is collapsed.
Click to expand it.
asyncua/ua/uatypes.py
View file @
0d659b0e
...
...
@@ -254,8 +254,21 @@ class EventNotifier(_MaskEnum):
@
dataclass
(
frozen
=
FROZEN
)
class
StatusCode
:
"""
:ivar value:
:vartype value: int
:ivar name:
:vartype name: string
:ivar doc:
:vartype doc: string
"""
value
:
UInt32
=
0
def
__post_init__
(
self
):
if
isinstance
(
self
.
value
,
str
):
self
.
value
=
getattr
(
status_codes
.
StatusCodes
,
self
.
value
)
def
check
(
self
):
"""
Raises an exception if the status code is anything else than 0 (good).
...
...
@@ -297,7 +310,7 @@ class NodeIdType(IntEnum):
_NodeIdType
=
NodeIdType
# ugly hack
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
NodeId
:
"""
NodeId Object
...
...
@@ -372,10 +385,9 @@ class NodeId:
def
__lt__
(
self
,
other
):
if
not
isinstance
(
other
,
NodeId
):
raise
AttributeError
(
"Can only compare to NodeId"
)
print
(
"COMPARE"
,
self
,
other
)
print
(
self
.
NodeIdType
,
self
.
NamespaceIndex
,
self
.
Identifier
,
other
.
NodeIdType
,
other
.
NamespaceIndex
,
other
.
Identifier
)
return
(
self
.
NodeIdType
,
self
.
NamespaceIndex
,
self
.
Identifier
)
<
(
other
.
NodeIdType
,
other
.
NamespaceIndex
,
other
.
Identifier
)
raise
UaError
(
f"NodeId of type
{
self
.
NodeIdType
.
name
}
has an incompatible identifier
{
self
.
Identifier
}
of type
{
type
(
self
.
Identifier
)
}
"
)
def
is_null
(
self
):
if
self
.
NamespaceIndex
!=
0
:
...
...
@@ -460,7 +472,7 @@ class NodeId:
return
asyncua
.
ua
.
ua_binary
.
nodeid_to_binary
(
self
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
TwoByteNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
TwoByte
)
...
...
@@ -472,7 +484,7 @@ class TwoByteNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
cannot have NamespaceIndex != 0"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
FourByteNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
FourByte
)
...
...
@@ -484,7 +496,7 @@ class FourByteNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
cannot have NamespaceIndex != 0"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
NumericNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
Numeric
)
...
...
@@ -492,7 +504,7 @@ class NumericNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
Identifier must be int"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
ByteStringNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
ByteString
)
...
...
@@ -500,7 +512,7 @@ class ByteStringNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
Identifier must be bytes"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
GuidNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
Guid
)
...
...
@@ -508,7 +520,7 @@ class GuidNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
Identifier must be uuid"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
StringNodeId
(
NodeId
):
def
__post_init__
(
self
):
object
.
__setattr__
(
self
,
"NodeIdType"
,
NodeIdType
.
String
)
...
...
@@ -516,7 +528,7 @@ class StringNodeId(NodeId):
raise
ValueError
(
f"
{
self
.
__class__
.
__name__
}
Identifier must be string"
)
@
dataclass
(
frozen
=
True
,
eq
=
False
)
@
dataclass
(
frozen
=
True
,
eq
=
False
,
order
=
False
)
class
ExpandedNodeId
(
NodeId
):
NamespaceUri
:
Optional
[
String
]
=
field
(
default
=
None
,
compare
=
True
)
ServerIndex
:
Int32
=
field
(
default
=
0
,
compare
=
True
)
...
...
@@ -540,7 +552,7 @@ class QualifiedName:
NamespaceIndex
:
UInt16
=
0
Name
:
String
=
""
def
__init__
(
self
,
Name
,
NamespaceIndex
=
0
):
def
__init__
(
self
,
Name
=
"MISSING_NAME"
,
NamespaceIndex
=
0
):
self
.
Name
=
Name
self
.
NamespaceIndex
=
NamespaceIndex
if
isinstance
(
self
.
NamespaceIndex
,
str
)
and
isinstance
(
self
.
Name
,
int
):
...
...
@@ -758,8 +770,8 @@ class Variant:
self.is_array = False
self._freeze = True
if isinstance(self.Value, Variant):
self.Value = self.Value.Value
self.VariantType = self.Value.VariantType
self.Value = self.Value.Value
if self.VariantType is None:
self.VariantType = self._guess_type(self.Value)
if (
...
...
schemas/generate_model.py
View file @
0d659b0e
...
...
@@ -125,35 +125,43 @@ class Model(object):
raise
Exception
(
"No enum named: "
+
str
(
name
))
def
_add_struct
(
struct
,
newstructs
,
waiting_structs
,
known_structs
):
print
(
"appending"
,
struct
)
newstructs
.
append
(
struct
)
known_structs
.
append
(
struct
.
name
)
# now seeing if some struct where waiting for this one
waitings
=
waiting_structs
.
pop
(
struct
.
name
,
None
)
if
waitings
:
for
s
in
waitings
:
s
.
waitingfor
.
remove
(
struct
.
name
)
print
(
"TRY POP"
,
s
,
s
.
waitingfor
)
if
not
s
.
waitingfor
:
_add_struct
(
s
,
newstructs
,
waiting_structs
,
known_structs
)
def
reorder_structs
(
model
):
types
=
IgnoredStructs
+
IgnoredEnums
+
[
'Bit'
,
'Char'
,
'CharArray'
,
'Guid'
,
'SByte'
,
'Int16'
,
'Int32'
,
'Int64'
,
'UInt16'
,
'UInt32'
,
'UInt64'
,
'DateTime'
,
'Boolean'
,
'Double'
,
'Float'
,
'ByteString'
,
'Byte'
,
'StatusCode'
,
'DiagnosticInfo'
,
'String'
,
'AttributeID'
'AttributeID'
,
"NodeId"
,
"Variant"
]
+
[
enum
.
name
for
enum
in
model
.
enums
]
+
[
'VariableAccessLevel'
]
waiting
=
{}
waiting
_structs
=
{}
newstructs
=
[]
for
s
in
model
.
structs
:
types
.
append
(
s
.
name
)
print
(
"Trying to add "
,
s
)
s
.
waitingfor
=
[]
ok
=
True
for
f
in
s
.
fields
:
if
f
.
uatype
not
in
types
:
if
f
.
uatype
in
waiting
.
keys
():
waiting
[
f
.
uatype
].
append
(
s
)
s
.
waitingfor
.
append
(
f
.
uatype
)
if
f
.
uatype
in
waiting_structs
:
waiting_structs
[
f
.
uatype
].
append
(
s
)
else
:
waiting
[
f
.
uatype
]
=
[
s
]
s
.
waitingfor
.
append
(
f
.
uatype
)
waiting_structs
[
f
.
uatype
]
=
[
s
]
s
.
waitingfor
.
append
(
f
.
uatype
)
print
(
s
,
" waiting for "
,
f
.
uatype
)
ok
=
False
if
ok
:
newstructs
.
append
(
s
)
waitings
=
waiting
.
pop
(
s
.
name
,
None
)
if
waitings
:
for
s2
in
waitings
:
s2
.
waitingfor
.
remove
(
s
.
name
)
if
not
s2
.
waitingfor
:
newstructs
.
append
(
s2
)
_add_struct
(
s
,
newstructs
,
waiting_structs
,
types
)
if
len
(
model
.
structs
)
!=
len
(
newstructs
):
_logger
.
warning
(
f'Error while reordering structs, some structs could not be reinserted,'
f' had
{
len
(
model
.
structs
)
}
structs, we now have
{
len
(
newstructs
)
}
structs'
)
...
...
@@ -161,7 +169,9 @@ def reorder_structs(model):
s2
=
set
(
newstructs
)
_logger
.
debug
(
'Variant'
in
types
)
for
s
in
s1
-
s2
:
_logger
.
debug
(
f'
{
s
}
is waiting for:
{
s
.
waitingfor
}
'
)
_logger
.
warning
(
f'
{
s
}
is waiting_structs for:
{
s
.
waitingfor
}
'
)
#from IPython import embed
#embed()
model
.
structs
=
newstructs
...
...
schemas/generate_protocol_python.py
View file @
0d659b0e
...
...
@@ -80,9 +80,16 @@ class CodeGenerator:
self
.
write
(
''
)
self
.
write
(
'from datetime import datetime'
)
self
.
write
(
'from enum import IntEnum'
)
self
.
write
(
'from typing import Union, List'
)
self
.
write
(
'from dataclasses import dataclass, field'
)
self
.
write
(
''
)
# self.write('from asyncua.ua.uaerrors import UaError')
self
.
write
(
'from asyncua.ua.uatypes import *'
)
self
.
write
(
'from asyncua.ua.uatypes import FROZEN'
)
self
.
write
(
'from asyncua.ua.uatypes import SByte, Byte, Bytes, ByteString, Int16, Int32, Int64, UInt16, UInt32, UInt64, Boolean, Float, Double, Null, String, CharArray, DateTime, Guid'
)
self
.
write
(
'from asyncua.ua.uatypes import AccessLevel, EventNotifier '
)
self
.
write
(
'from asyncua.ua.uatypes import LocalizedText, Variant, QualifiedName, StatusCode, DataValue'
)
self
.
write
(
'from asyncua.ua.uatypes import NodeId, FourByteNodeId, ExpandedNodeId, ExtensionObject'
)
self
.
write
(
'from asyncua.ua.uatypes import extension_object_typeids, extension_objects_by_typeid'
)
self
.
write
(
'from asyncua.ua.object_ids import ObjectIds'
)
def
generate_enum_code
(
self
,
enum
):
...
...
@@ -106,7 +113,8 @@ class CodeGenerator:
self
.
write
(
''
)
self
.
write
(
''
)
self
.
iidx
=
0
self
.
write
(
f'class
{
obj
.
name
}
(FrozenClass):'
)
self
.
write
(
'@dataclass(frozen=FROZEN)'
)
self
.
write
(
f'class
{
obj
.
name
}
:'
)
self
.
iidx
+=
1
self
.
write
(
'"""'
)
if
obj
.
doc
:
...
...
@@ -117,15 +125,47 @@ class CodeGenerator:
self
.
write
(
f':vartype
{
field
.
name
}
:
{
field
.
uatype
}
'
)
self
.
write
(
'"""'
)
self
.
write
(
''
)
# FIXME: next line is a weak way to find out if object is a datatype or not...
if
"Parameter"
not
in
obj
.
name
and
"Result"
not
in
obj
.
name
:
self
.
write
(
''
)
self
.
write
(
f'data_type = NodeId(ObjectIds.
{
obj
.
name
}
)'
)
if
obj
.
fields
:
self
.
write
(
''
)
# hack extension object stuff
extobj_hack
=
False
if
"BodyLength"
in
[
f
.
name
for
f
in
obj
.
fields
]:
extobj_hack
=
True
for
field
in
obj
.
fields
:
if
field
.
length
:
typestring
=
f"List[
{
field
.
uatype
}
]"
else
:
typestring
=
field
.
uatype
if
field
.
name
==
field
.
uatype
:
# help!!! selv referencing class
print
(
"SELF REFENCING"
,
obj
,
field
)
fieldname
=
field
.
name
+
"_"
else
:
fieldname
=
field
.
name
if
field
.
name
==
"Encoding"
:
val
=
0
if
not
extobj_hack
else
1
self
.
write
(
f"
{
field
.
name
}
: Byte = field(default=
{
val
}
, repr=False, init=False)"
)
elif
field
.
uatype
==
obj
.
name
:
# help!!! selv referencing class
pass
#FIXME: handle
elif
obj
.
name
not
in
(
"ExtensionObject"
,)
and
\
field
.
name
==
"TypeId"
:
# and ( obj.name.endswith("Request") or obj.name.endswith("Response")):
self
.
write
(
f"TypeId: NodeId = FourByteNodeId(ObjectIds.
{
obj
.
name
}
_Encoding_DefaultBinary)"
)
else
:
self
.
write
(
f"
{
fieldname
}
:
{
typestring
}
=
{
'field(default_factory=list)'
if
field
.
length
else
self
.
get_default_value
(
field
)
}
"
)
switch_written
=
False
for
field
in
obj
.
fields
:
if
field
.
switchfield
is
not
None
:
if
not
switch_written
:
self
.
write
(
""
)
self
.
write
(
'ua_switches = {'
)
switch_written
=
True
...
...
@@ -133,51 +173,7 @@ class CodeGenerator:
self
.
write
(
f" '
{
field
.
name
}
': ('
{
bit
.
container
}
',
{
bit
.
idx
}
),"
)
# if field.switchvalue is not None: Not sure we need to handle that one
if
switch_written
:
self
.
write
(
" }"
)
self
.
write
(
"ua_types = ["
)
for
field
in
obj
.
fields
:
prefix
=
"ListOf"
if
field
.
length
else
""
uatype
=
prefix
+
field
.
uatype
if
uatype
==
"ListOfChar"
:
uatype
=
"String"
self
.
write
(
f" ('
{
field
.
name
}
', '
{
uatype
}
'),"
)
self
.
write
(
" ]"
)
self
.
write
(
""
)
self
.
write
(
"def __init__(self):"
)
self
.
iidx
+=
1
# hack extension object stuff
extobj_hack
=
False
if
"BodyLength"
in
[
f
.
name
for
f
in
obj
.
fields
]:
extobj_hack
=
True
for
field
in
obj
.
fields
:
if
extobj_hack
and
field
.
name
==
"Encoding"
:
self
.
write
(
"self.Encoding = 1"
)
elif
field
.
uatype
==
obj
.
name
:
# help!!! selv referencing class
self
.
write
(
"self.{} = None"
.
format
(
field
.
name
))
elif
obj
.
name
not
in
(
"ExtensionObject"
,)
and
\
field
.
name
==
"TypeId"
:
# and ( obj.name.endswith("Request") or obj.name.endswith("Response")):
self
.
write
(
f"self.TypeId = FourByteNodeId(ObjectIds.
{
obj
.
name
}
_Encoding_DefaultBinary)"
)
else
:
self
.
write
(
f"self.
{
field
.
name
}
=
{
'[]'
if
field
.
length
else
self
.
get_default_value
(
field
)
}
"
)
self
.
write
(
"self._freeze = True"
)
self
.
iidx
=
1
# __str__
self
.
write
(
""
)
self
.
write
(
"def __str__(self):"
)
self
.
iidx
+=
1
tmp
=
[
f"
{
f
.
name
}
:{{self.
{
f
.
name
}
}}"
for
f
in
obj
.
fields
]
tmp
=
", "
.
join
(
tmp
)
if
tmp
:
self
.
write
(
f"return f'
{
obj
.
name
}
(
{
tmp
}
)'"
)
else
:
self
.
write
(
f"return '
{
obj
.
name
}
()'"
)
self
.
iidx
-=
1
self
.
write
(
""
)
self
.
write
(
"__repr__ = __str__"
)
self
.
write
(
"}"
)
self
.
iidx
=
0
...
...
@@ -217,7 +213,7 @@ class CodeGenerator:
return
None
if
field
.
uatype
in
self
.
model
.
enum_list
:
enum
=
self
.
model
.
get_enum
(
field
.
uatype
)
return
f'
{
enum
.
name
}
(0)
'
return
f'
{
enum
.
name
}
.
{
enum
.
values
[
0
].
name
}
'
if
field
.
uatype
==
'String'
:
return
None
elif
field
.
uatype
in
(
'ByteString'
,
'CharArray'
,
'Char'
):
...
...
@@ -231,7 +227,7 @@ class CodeGenerator:
elif
field
.
uatype
in
'ExtensionObject'
:
return
'ExtensionObject()'
else
:
return
f'
{
field
.
uatype
}
(
)'
return
f'
field(default_factory=
{
field
.
uatype
}
)'
if
__name__
==
'__main__'
:
...
...
@@ -248,5 +244,6 @@ if __name__ == '__main__':
gm
.
split_requests
(
model
)
gm
.
fix_names
(
model
)
gm
.
remove_duplicate_types
(
model
)
gm
.
reorder_structs
(
model
)
c
=
CodeGenerator
(
model
,
protocol_path
)
c
.
run
()
tests/test_unit.py
View file @
0d659b0e
...
...
@@ -160,6 +160,7 @@ def test_nodeid_ordering():
mylist
=
[
a
,
b
,
c
,
d
,
e
,
f
,
g
,
h
,
i
,
j
]
mylist
.
sort
()
expected
=
[
h
,
c
,
a
,
b
,
e
,
d
,
f
,
g
,
i
,
j
]
expected
=
[
c
,
h
,
a
,
b
,
e
,
d
,
f
,
g
,
i
,
j
]
# FIXME: make sure this does not break some client/server
assert
mylist
==
expected
...
...
@@ -233,9 +234,10 @@ def test_string_to_variant_localized_text():
def
test_string_to_variant_localized_text_with_locale
():
locale
=
"cs-CZ"
string
=
"Moje jméno"
string_repr
=
f
"LocalizedText(Encoding:3, Locale:
{
locale
}
, Text:
{
string
}
)"
string_repr
=
f
'LocalizedText(Encoding:3, Locale:
{
locale
}
, Text:
{
string
}
)'
obj
=
ua
.
LocalizedText
(
string
,
locale
)
assert
obj
==
string_to_val
(
string_repr
,
ua
.
VariantType
.
LocalizedText
)
breakpoint
()
assert
string_repr
==
val_to_string
(
obj
)
...
...
@@ -384,7 +386,7 @@ def test_nodeid_string():
assert
nid
!=
ua
.
NodeId
.
from_string
(
"i=45; ns=11"
)
assert
nid
!=
ua
.
NodeId
.
from_string
(
"i=5; ns=10"
)
# not sure the next one is correct...
assert
nid
==
ua
.
NodeId
.
from_string
(
"i=45; ns=10; srv=
serverid
"
)
assert
nid
==
ua
.
NodeId
.
from_string
(
"i=45; ns=10; srv=
3
"
)
nid1
=
ua
.
NodeId
(
"myid.mynodeid"
,
7
)
assert
nid1
==
ua
.
NodeId
.
from_string
(
"ns=7; s=myid.mynodeid"
)
# with pytest.raises(ua.UaError):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment