Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
bffe1340
Commit
bffe1340
authored
Aug 13, 2016
by
olivier R-D
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
initial copy_node implementation and cleanup of instanciate
parent
f88dce0d
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
124 additions
and
67 deletions
+124
-67
opcua/__init__.py
opcua/__init__.py
+1
-0
opcua/common/copy.py
opcua/common/copy.py
+78
-0
opcua/common/instantiate.py
opcua/common/instantiate.py
+40
-64
opcua/common/manage_nodes.py
opcua/common/manage_nodes.py
+1
-1
tests/tests_common.py
tests/tests_common.py
+4
-2
No files found.
opcua/__init__.py
View file @
bffe1340
...
...
@@ -10,6 +10,7 @@ from opcua.client.client import Client
from
opcua.server.server
import
Server
from
opcua.server.event_generator
import
EventGenerator
from
opcua.common.instantiate
import
instantiate
from
opcua.common.copy
import
copy_node
opcua/common/copy.py
0 → 100644
View file @
bffe1340
from
opcua
import
ua
from
opcua
import
Node
def
copy_node
(
parent
,
node
,
nodeid
=
None
,
idx
=
0
,
recursive
=
True
):
"""
Copy a node or node tree as child of parent node
"""
print
(
"Copying"
,
node
,
"into "
,
parent
,
"in idx"
,
idx
)
rdesc
=
_rdesc_from_node
(
parent
,
node
)
if
nodeid
is
None
:
nodeid
=
ua
.
NodeId
(
namespaceidx
=
idx
)
# will trigger automatic node generation in namespace idx
added_nodeids
=
_copy_node
(
parent
.
server
,
parent
.
nodeid
,
rdesc
,
nodeid
,
recursive
)
return
[
Node
(
parent
.
server
,
nid
)
for
nid
in
added_nodeids
]
def
_copy_node
(
server
,
parent_nodeid
,
rdesc
,
nodeid
,
recursive
):
addnode
=
ua
.
AddNodesItem
()
addnode
.
RequestedNewNodeId
=
nodeid
addnode
.
BrowseName
=
rdesc
.
BrowseName
addnode
.
ParentNodeId
=
parent_nodeid
addnode
.
ReferenceTypeId
=
rdesc
.
ReferenceTypeId
addnode
.
TypeDefinition
=
rdesc
.
TypeDefinition
addnode
.
NodeClass
=
rdesc
.
NodeClass
node_type
=
Node
(
server
,
rdesc
.
NodeId
)
attrObj
=
getattr
(
ua
,
rdesc
.
NodeClass
.
name
+
"Attributes"
)
print
(
"attrObj"
is
attrObj
)
_read_and_copy_attrs
(
node_type
,
attrObj
(),
addnode
)
res
=
server
.
add_nodes
([
addnode
])[
0
]
print
(
"Added result"
,
res
)
added_nodes
=
[
res
.
AddedNodeId
]
if
recursive
:
descs
=
node_type
.
get_children_descriptions
()
for
desc
in
descs
:
nodes
=
_copy_node
(
server
,
res
.
AddedNodeId
,
desc
,
nodeid
=
ua
.
NodeId
(
namespaceidx
=
res
.
AddedNodeId
.
NamespaceIndex
),
recursive
=
recursive
)
added_nodes
.
extend
(
nodes
)
return
added_nodes
def
_rdesc_from_node
(
parent
,
node
):
results
=
node
.
get_attributes
([
ua
.
AttributeIds
.
NodeClass
,
ua
.
AttributeIds
.
BrowseName
,
ua
.
AttributeIds
.
DisplayName
])
nclass
,
qname
,
dname
=
[
res
.
Value
.
Value
for
res
in
results
]
rdesc
=
ua
.
ReferenceDescription
()
rdesc
.
NodeId
=
node
.
nodeid
rdesc
.
BrowseName
=
qname
rdesc
.
DisplayName
=
dname
rdesc
.
NodeClass
=
nclass
if
parent
.
get_type_definition
()
==
ua
.
NodeId
(
ua
.
ObjectIds
.
FolderType
):
rdesc
.
ReferenceTypeId
=
ua
.
NodeId
(
ua
.
ObjectIds
.
Organizes
)
else
:
rdesc
.
ReferenceTypeId
=
ua
.
NodeId
(
ua
.
ObjectIds
.
HasComponent
)
rdesc
.
TypeDefinition
=
node
.
nodeid
return
rdesc
def
_read_and_copy_attrs
(
node_type
,
struct
,
addnode
):
names
=
[
name
for
name
in
struct
.
__dict__
.
keys
()
if
not
name
.
startswith
(
"_"
)
and
name
not
in
(
"BodyLength"
,
"TypeId"
,
"SpecifiedAttributes"
,
"Encoding"
,
"IsAbstract"
,
"EventNotifier"
)]
attrs
=
[
getattr
(
ua
.
AttributeIds
,
name
)
for
name
in
names
]
for
name
in
names
:
results
=
node_type
.
get_attributes
(
attrs
)
for
idx
,
name
in
enumerate
(
names
):
if
results
[
idx
].
StatusCode
.
is_good
():
if
name
==
"Value"
:
setattr
(
struct
,
name
,
results
[
idx
].
Value
)
else
:
setattr
(
struct
,
name
,
results
[
idx
].
Value
.
Value
)
else
:
print
(
"Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s"
%
(
node_type
,
name
,
results
[
idx
].
StatusCode
))
addnode
.
NodeAttributes
=
struct
opcua/common/instantiate.py
View file @
bffe1340
...
...
@@ -5,7 +5,8 @@ Instantiate a new node and its child nodes from a node type.
from
opcua
import
Node
from
opcua
import
ua
from
opcua.common
import
ua_utils
from
opcua.common
import
ua_utils
from
opcua.common.copy
import
_rdesc_from_node
,
_read_and_copy_attrs
def
instantiate
(
parent
,
node_type
,
nodeid
=
None
,
bname
=
None
,
idx
=
0
):
...
...
@@ -15,20 +16,8 @@ def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
If they exists children of the node type, such as components, variables and
properties are also instantiated
"""
rdesc
=
_rdesc_from_node
(
parent
,
node_type
)
results
=
node_type
.
get_attributes
([
ua
.
AttributeIds
.
NodeClass
,
ua
.
AttributeIds
.
BrowseName
,
ua
.
AttributeIds
.
DisplayName
])
nclass
,
qname
,
dname
=
[
res
.
Value
.
Value
for
res
in
results
]
rdesc
=
ua
.
ReferenceDescription
()
rdesc
.
NodeId
=
node_type
.
nodeid
rdesc
.
BrowseName
=
qname
rdesc
.
DisplayName
=
dname
rdesc
.
NodeClass
=
nclass
if
parent
.
get_type_definition
()
==
ua
.
NodeId
(
ua
.
ObjectIds
.
FolderType
):
rdesc
.
ReferenceTypeId
=
ua
.
NodeId
(
ua
.
ObjectIds
.
Organizes
)
else
:
rdesc
.
ReferenceTypeId
=
ua
.
NodeId
(
ua
.
ObjectIds
.
HasComponent
)
rdesc
.
TypeDefinition
=
node_type
.
nodeid
if
nodeid
is
None
:
nodeid
=
ua
.
NodeId
(
namespaceidx
=
idx
)
# will trigger automatic node generation in namespace idx
if
bname
is
None
:
...
...
@@ -36,14 +25,21 @@ def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
elif
isinstance
(
bname
,
str
):
bname
=
ua
.
QualifiedName
.
from_string
(
bname
)
return
_instantiate_node
(
parent
.
server
,
parent
.
nodeid
,
rdesc
,
nodeid
,
bname
)
nodeids
=
_instantiate_node
(
parent
.
server
,
parent
.
nodeid
,
rdesc
,
nodeid
,
bname
)
return
[
Node
(
parent
.
server
,
nid
)
for
nid
in
nodeids
]
def
_instantiate_node
(
server
,
parentid
,
rdesc
,
nodeid
,
bname
,
recursive
=
True
):
"""
instantiate a node type under parent
"""
node_type
=
Node
(
server
,
rdesc
.
NodeId
)
refs
=
node_type
.
get_referenced_nodes
(
refs
=
ua
.
ObjectIds
.
HasModellingRule
)
# skip optional elements
if
len
(
refs
)
==
1
and
refs
[
0
].
nodeid
==
ua
.
NodeId
(
ua
.
ObjectIds
.
ModellingRule_Optional
):
return
[]
addnode
=
ua
.
AddNodesItem
()
addnode
.
RequestedNewNodeId
=
nodeid
addnode
.
BrowseName
=
bname
...
...
@@ -51,55 +47,35 @@ def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
addnode
.
ReferenceTypeId
=
rdesc
.
ReferenceTypeId
addnode
.
TypeDefinition
=
rdesc
.
TypeDefinition
node_type
=
Node
(
server
,
rdesc
.
NodeId
)
refs
=
node_type
.
get_referenced_nodes
(
refs
=
ua
.
ObjectIds
.
HasModellingRule
)
# skip optional elements
if
not
(
len
(
refs
)
==
1
and
refs
[
0
].
nodeid
==
ua
.
NodeId
(
ua
.
ObjectIds
.
ModellingRule_Optional
)
):
if
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Object
,
ua
.
NodeClass
.
ObjectType
):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Object
_read_and_copy_attrs
(
node_type
,
ua
.
ObjectAttributes
(),
addnode
)
if
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Object
,
ua
.
NodeClass
.
ObjectType
):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Object
_read_and_copy_attrs
(
node_type
,
ua
.
ObjectAttributes
(),
addnode
)
elif
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Variable
,
ua
.
NodeClass
.
VariableType
):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Variable
_read_and_copy_attrs
(
node_type
,
ua
.
VariableAttributes
(),
addnode
)
elif
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Method
,):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Method
_read_and_copy_attrs
(
node_type
,
ua
.
MethodAttributes
(),
addnode
)
else
:
print
(
"Instantiate: Node class not supported: "
,
rdesc
.
NodeClass
)
return
res
=
server
.
add_nodes
([
addnode
])[
0
]
if
recursive
:
parents
=
ua_utils
.
get_node_supertypes
(
node_type
,
includeitself
=
True
)
node
=
Node
(
server
,
res
.
AddedNodeId
)
for
parent
in
parents
:
descs
=
parent
.
get_children_descriptions
(
includesubtypes
=
False
)
for
c_rdesc
in
descs
:
# skip items that already exists, prefer the 'lowest' one in object hierarchy
if
not
ua_utils
.
is_child_present
(
node
,
c_rdesc
.
BrowseName
):
_instantiate_node
(
server
,
res
.
AddedNodeId
,
c_rdesc
,
nodeid
=
ua
.
NodeId
(
namespaceidx
=
res
.
AddedNodeId
.
NamespaceIndex
),
bname
=
c_rdesc
.
BrowseName
)
return
Node
(
server
,
res
.
AddedNodeId
)
elif
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Variable
,
ua
.
NodeClass
.
VariableType
):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Variable
_read_and_copy_attrs
(
node_type
,
ua
.
VariableAttributes
(),
addnode
)
elif
rdesc
.
NodeClass
in
(
ua
.
NodeClass
.
Method
,):
addnode
.
NodeClass
=
ua
.
NodeClass
.
Method
_read_and_copy_attrs
(
node_type
,
ua
.
MethodAttributes
(),
addnode
)
else
:
return
None
print
(
"Instantiate: Node class not supported: "
,
rdesc
.
NodeClass
)
return
res
=
server
.
add_nodes
([
addnode
])[
0
]
added_nodes
=
[
res
.
AddedNodeId
]
if
recursive
:
parents
=
ua_utils
.
get_node_supertypes
(
node_type
,
includeitself
=
True
)
node
=
Node
(
server
,
res
.
AddedNodeId
)
for
parent
in
parents
:
descs
=
parent
.
get_children_descriptions
(
includesubtypes
=
False
)
for
c_rdesc
in
descs
:
# skip items that already exists, prefer the 'lowest' one in object hierarchy
if
not
ua_utils
.
is_child_present
(
node
,
c_rdesc
.
BrowseName
):
nodeids
=
_instantiate_node
(
server
,
res
.
AddedNodeId
,
c_rdesc
,
nodeid
=
ua
.
NodeId
(
namespaceidx
=
res
.
AddedNodeId
.
NamespaceIndex
),
bname
=
c_rdesc
.
BrowseName
)
added_nodes
.
extend
(
nodeids
)
print
(
"RETURNING"
,
added_nodes
)
return
added_nodes
def
_read_and_copy_attrs
(
node_type
,
struct
,
addnode
):
names
=
[
name
for
name
in
struct
.
__dict__
.
keys
()
if
not
name
.
startswith
(
"_"
)
and
name
not
in
(
"BodyLength"
,
"TypeId"
,
"SpecifiedAttributes"
,
"Encoding"
,
"IsAbstract"
,
"EventNotifier"
)]
attrs
=
[
getattr
(
ua
.
AttributeIds
,
name
)
for
name
in
names
]
for
name
in
names
:
results
=
node_type
.
get_attributes
(
attrs
)
for
idx
,
name
in
enumerate
(
names
):
if
results
[
idx
].
StatusCode
.
is_good
():
if
name
==
"Value"
:
setattr
(
struct
,
name
,
results
[
idx
].
Value
)
else
:
setattr
(
struct
,
name
,
results
[
idx
].
Value
.
Value
)
else
:
print
(
"Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s"
%
(
node_type
,
name
,
results
[
idx
].
StatusCode
))
addnode
.
NodeAttributes
=
struct
opcua/common/manage_nodes.py
View file @
bffe1340
...
...
@@ -51,7 +51,7 @@ def create_object(parent, nodeid, bname, objecttype=None):
nodeid
,
qname
=
_parse_nodeid_qname
(
nodeid
,
bname
)
if
objecttype
is
not
None
:
objecttype
=
node
.
Node
(
parent
.
server
,
objecttype
)
return
instantiate
(
parent
,
objecttype
,
nodeid
,
bname
)
return
instantiate
(
parent
,
objecttype
,
nodeid
,
bname
)
[
0
]
else
:
return
node
.
Node
(
parent
.
server
,
_create_object
(
parent
.
server
,
parent
.
nodeid
,
nodeid
,
qname
,
ua
.
ObjectIds
.
BaseObjectType
))
...
...
tests/tests_common.py
View file @
bffe1340
...
...
@@ -551,7 +551,8 @@ class CommonTests(object):
ctrl_t
=
dev_t
.
add_object
(
0
,
"controller"
)
prop_t
=
ctrl_t
.
add_property
(
0
,
"state"
,
"Running"
)
mydevice
=
instantiate
(
self
.
opc
.
nodes
.
objects
,
dev_t
,
bname
=
"2:Device0001"
)
nodes
=
instantiate
(
self
.
opc
.
nodes
.
objects
,
dev_t
,
bname
=
"2:Device0001"
)
mydevice
=
nodes
[
0
]
self
.
assertEqual
(
mydevice
.
get_type_definition
(),
dev_t
.
nodeid
)
obj
=
mydevice
.
get_child
([
"0:controller"
])
...
...
@@ -565,7 +566,8 @@ class CommonTests(object):
v_t
=
devd_t
.
add_variable
(
0
,
"childparam"
,
1.0
)
p_t
=
devd_t
.
add_property
(
0
,
"sensorx_id"
,
"0340"
)
mydevicederived
=
instantiate
(
self
.
opc
.
nodes
.
objects
,
devd_t
,
bname
=
"2:Device0002"
)
nodes
=
instantiate
(
self
.
opc
.
nodes
.
objects
,
devd_t
,
bname
=
"2:Device0002"
)
mydevicederived
=
nodes
[
0
]
prop1
=
mydevicederived
.
get_child
([
"0:sensorx_id"
])
var1
=
mydevicederived
.
get_child
([
"0:childparam"
])
var_parent
=
mydevicederived
.
get_child
([
"0:sensor"
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment