Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
81c10085
Commit
81c10085
authored
Mar 22, 2015
by
Olivier R-D
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
use python datetime instead of DateTime, fix tests
parent
8d501b27
Changes
8
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
195 additions
and
170 deletions
+195
-170
example-server.py
example-server.py
+1
-1
opcua/internal_server.py
opcua/internal_server.py
+23
-2
opcua/server.py
opcua/server.py
+2
-0
opcua/uaprotocol_auto.py
opcua/uaprotocol_auto.py
+95
-102
opcua/uatypes.py
opcua/uatypes.py
+38
-32
schemas/generate_model.py
schemas/generate_model.py
+1
-1
schemas/generate_protocol_python.py
schemas/generate_protocol_python.py
+4
-0
tests.py
tests.py
+31
-32
No files found.
example-server.py
View file @
81c10085
...
@@ -10,7 +10,7 @@ if __name__ == "__main__":
...
@@ -10,7 +10,7 @@ if __name__ == "__main__":
logging
.
basicConfig
(
level
=
logging
.
WARN
)
logging
.
basicConfig
(
level
=
logging
.
WARN
)
logger
=
logging
.
getLogger
(
"opcua.address_space"
)
logger
=
logging
.
getLogger
(
"opcua.address_space"
)
#logger = logging.getLogger("opcua.internal_server")
#logger = logging.getLogger("opcua.internal_server")
logger
.
setLevel
(
logging
.
DEBUG
)
#
logger.setLevel(logging.DEBUG)
server
=
Server
()
server
=
Server
()
server
.
set_endpoint
(
"opc.tcp://localhost:4841/freeopcua/server/"
)
server
.
set_endpoint
(
"opc.tcp://localhost:4841/freeopcua/server/"
)
server
.
set_server_name
(
"FreeOpcUa Example Server"
)
server
.
set_server_name
(
"FreeOpcUa Example Server"
)
...
...
opcua/internal_server.py
View file @
81c10085
"""
"""
Internal server to be used on server side
Internal server to be used on server side
"""
"""
from
datetime
import
datetime
import
uuid
import
uuid
import
logging
import
logging
from
threading
import
RLock
from
threading
import
RLock
,
Timer
from
opcua
import
ua
from
opcua
import
ua
from
opcua
import
utils
from
opcua
import
utils
from
opcua
import
Node
from
opcua.address_space
import
AddressSpace
from
opcua.address_space
import
AddressSpace
from
opcua.standard_address_space_part3
import
create_standard_address_space_Part3
from
opcua.standard_address_space_part3
import
create_standard_address_space_Part3
from
opcua.standard_address_space_part4
import
create_standard_address_space_Part4
from
opcua.standard_address_space_part4
import
create_standard_address_space_Part4
...
@@ -48,6 +50,25 @@ class InternalServer(object):
...
@@ -48,6 +50,25 @@ class InternalServer(object):
create_standard_address_space_Part13
(
self
.
aspace
)
create_standard_address_space_Part13
(
self
.
aspace
)
self
.
channels
=
{}
self
.
channels
=
{}
self
.
_lock
=
RLock
()
self
.
_lock
=
RLock
()
#set some node values expected by some clients
self
.
current_time_node
=
Node
(
self
,
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
))
self
.
_stopev
=
False
def
start
(
self
):
Node
(
self
,
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_State
)).
set_value
(
0
)
Node
(
self
,
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_StartTime
)).
set_value
(
datetime
.
now
())
# set time every seconds, maybe we should disable it for performance reason??
self
.
_set_current_time
()
def
stop
(
self
):
self
.
_stopev
=
True
def
_set_current_time
(
self
):
if
self
.
_stopev
:
return
self
.
current_time_node
.
set_value
(
datetime
.
now
())
self
.
_timer
=
Timer
(
1
,
self
.
_set_current_time
)
self
.
_timer
.
start
()
def
open_secure_channel
(
self
,
params
,
currentchannel
=
None
):
def
open_secure_channel
(
self
,
params
,
currentchannel
=
None
):
self
.
logger
.
info
(
"open secure channel"
)
self
.
logger
.
info
(
"open secure channel"
)
...
@@ -61,7 +82,7 @@ class InternalServer(object):
...
@@ -61,7 +82,7 @@ class InternalServer(object):
else
:
else
:
channel
=
self
.
channels
[
currentchannel
.
SecurityToken
.
ChannelId
]
channel
=
self
.
channels
[
currentchannel
.
SecurityToken
.
ChannelId
]
channel
.
SecurityToken
.
TokenId
+=
1
channel
.
SecurityToken
.
TokenId
+=
1
channel
.
SecurityToken
.
CreatedAt
=
ua
.
DateTime
()
channel
.
SecurityToken
.
CreatedAt
=
datetime
.
now
()
channel
.
SecurityToken
.
RevisedLifetime
=
params
.
RequestedLifetime
channel
.
SecurityToken
.
RevisedLifetime
=
params
.
RequestedLifetime
channel
.
ServerNonce
=
uuid
.
uuid4
().
bytes
+
uuid
.
uuid4
().
bytes
channel
.
ServerNonce
=
uuid
.
uuid4
().
bytes
+
uuid
.
uuid4
().
bytes
self
.
channels
[
channel
.
SecurityToken
.
ChannelId
]
=
channel
self
.
channels
[
channel
.
SecurityToken
.
ChannelId
]
=
channel
...
...
opcua/server.py
View file @
81c10085
...
@@ -57,11 +57,13 @@ class Server(object):
...
@@ -57,11 +57,13 @@ class Server(object):
self
.
name
=
name
self
.
name
=
name
def
start
(
self
):
def
start
(
self
):
self
.
iserver
.
start
()
self
.
_set_endpoints
()
self
.
_set_endpoints
()
self
.
bserver
=
BinaryServer
(
self
.
iserver
,
self
.
endpoint
.
hostname
,
self
.
endpoint
.
port
)
self
.
bserver
=
BinaryServer
(
self
.
iserver
,
self
.
endpoint
.
hostname
,
self
.
endpoint
.
port
)
self
.
bserver
.
start
()
self
.
bserver
.
start
()
def
stop
(
self
):
def
stop
(
self
):
self
.
iserver
.
stop
()
self
.
bserver
.
stop
()
self
.
bserver
.
stop
()
...
...
opcua/uaprotocol_auto.py
View file @
81c10085
This diff is collapsed.
Click to expand it.
opcua/uatypes.py
View file @
81c10085
...
@@ -8,8 +8,15 @@ import struct
...
@@ -8,8 +8,15 @@ import struct
import
opcua.status_code
as
status_code
import
opcua.status_code
as
status_code
#UaTypes = ("Boolean", "SByte", "Byte", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double", "String", "DateTime", "Guid", "ByteString")
#types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
UaTypes
=
(
"Boolean"
,
"SByte"
,
"Byte"
,
"Int8"
,
"UInt8"
,
"Int16"
,
"UInt16"
,
"Int32"
,
"UInt32"
,
"Int64"
,
"UInt64"
,
"Float"
,
"Double"
)
#, "String", "DateTime", "Guid", "ByteString")
UaTypes
=
(
"Boolean"
,
"SByte"
,
"Byte"
,
"Int8"
,
"UInt8"
,
"Int16"
,
"UInt16"
,
"Int32"
,
"UInt32"
,
"Int64"
,
"UInt64"
,
"Float"
,
"Double"
)
def
datetime_to_win_epoch
(
dt
):
epch
=
(
dt
-
datetime
(
1601
,
1
,
1
,
0
,
0
)).
total_seconds
()
*
10
**
7
return
int
(
epch
)
def
win_epoch_to_datetime
(
epch
):
return
datetime
(
1601
,
1
,
1
)
+
timedelta
(
microseconds
=
epch
/
10.0
)
def
uatype_to_fmt
(
uatype
):
def
uatype_to_fmt
(
uatype
):
#if uatype == "String":
#if uatype == "String":
...
@@ -65,6 +72,9 @@ def pack_uatype(uatype, value):
...
@@ -65,6 +72,9 @@ def pack_uatype(uatype, value):
return
pack_string
(
value
)
return
pack_string
(
value
)
elif
uatype
in
(
"CharArray"
,
"ByteString"
):
elif
uatype
in
(
"CharArray"
,
"ByteString"
):
return
pack_bytes
(
value
)
return
pack_bytes
(
value
)
elif
uatype
==
"DateTime"
:
epch
=
datetime_to_win_epoch
(
value
)
return
struct
.
pack
(
'<q'
,
epch
)
elif
uatype
in
UaTypes
:
elif
uatype
in
UaTypes
:
fmt
=
'<'
+
uatype_to_fmt
(
uatype
)
fmt
=
'<'
+
uatype_to_fmt
(
uatype
)
return
struct
.
pack
(
fmt
,
value
)
return
struct
.
pack
(
fmt
,
value
)
...
@@ -76,6 +86,9 @@ def unpack_uatype(uatype, data):
...
@@ -76,6 +86,9 @@ def unpack_uatype(uatype, data):
return
unpack_string
(
data
)
return
unpack_string
(
data
)
elif
uatype
in
(
"CharArray"
,
"ByteString"
):
elif
uatype
in
(
"CharArray"
,
"ByteString"
):
return
unpack_bytes
(
data
)
return
unpack_bytes
(
data
)
elif
uatype
==
"DateTime"
:
epch
=
struct
.
unpack
(
'<q'
,
data
.
read
(
8
))[
0
]
return
win_epoch_to_datetime
(
epch
)
elif
uatype
in
UaTypes
:
elif
uatype
in
UaTypes
:
fmt
=
'<'
+
uatype_to_fmt
(
uatype
)
fmt
=
'<'
+
uatype_to_fmt
(
uatype
)
size
=
struct
.
calcsize
(
fmt
)
size
=
struct
.
calcsize
(
fmt
)
...
@@ -398,21 +411,15 @@ class QualifiedName(object):
...
@@ -398,21 +411,15 @@ class QualifiedName(object):
return
'QualifiedName({}:{})'
.
format
(
self
.
NamespaceIndex
,
self
.
Name
)
return
'QualifiedName({}:{})'
.
format
(
self
.
NamespaceIndex
,
self
.
Name
)
__repr__
=
__str__
__repr__
=
__str__
'''
class DateTime(object):
class DateTime(object):
def __init__(self, data=None):
def __init__(self, data=None):
if data is None:
if data is None:
self
.
data
=
self
.
_to1601
(
datetime
.
now
())
self.data =
datetime_to_win_epoch
(datetime.now())
else:
else:
self.data = data
self.data = data
def
_to1601
(
self
,
dt
):
return
(
dt
-
datetime
(
1601
,
1
,
1
,
0
,
0
)).
total_seconds
()
*
10
**
7
def
to_datetime
(
self
):
us
=
self
.
data
/
10.0
return
datetime
(
1601
,
1
,
1
)
+
timedelta
(
microseconds
=
us
)
@staticmethod
@staticmethod
def now():
def now():
return DateTime.from_datetime(datetime.now())
return DateTime.from_datetime(datetime.now())
...
@@ -420,7 +427,7 @@ class DateTime(object):
...
@@ -420,7 +427,7 @@ class DateTime(object):
@staticmethod
@staticmethod
def from_datetime(pydt):
def from_datetime(pydt):
dt = DateTime()
dt = DateTime()
dt
.
data
=
d
t
.
_to1601
(
pydt
)
dt.data = d
atetime_to_win_epoch
(pydt)
return dt
return dt
def to_binary(self):
def to_binary(self):
...
@@ -439,12 +446,14 @@ class DateTime(object):
...
@@ -439,12 +446,14 @@ class DateTime(object):
def to_time_t(self):
def to_time_t(self):
epoch = datetime.utcfromtimestamp(0)
epoch = datetime.utcfromtimestamp(0)
delta
=
self
.
to_datetime
()
-
epoch
delta = self.
win_epoch_to_datetime(self.data)
() - epoch
return delta.total_seconds()
return delta.total_seconds()
def __str__(self):
def __str__(self):
return
"Datetime({})"
.
format
(
self
.
to_datetime
(
).
isoformat
())
return "Datetime({})".format(
win_epoch_to_datetime(self.data
).isoformat())
__repr__ = __str__
__repr__ = __str__
'''
class
VariantType
(
Enum
):
class
VariantType
(
Enum
):
'''
'''
...
@@ -490,12 +499,11 @@ class Variant(object):
...
@@ -490,12 +499,11 @@ class Variant(object):
self
.
VariantType
=
self
.
_guess_type
(
self
.
Value
)
self
.
VariantType
=
self
.
_guess_type
(
self
.
Value
)
else
:
else
:
self
.
VariantType
=
varianttype
self
.
VariantType
=
varianttype
#special case for python datetime
if
type
(
self
.
Value
)
is
datetime
:
def
__eq__
(
self
,
other
):
self
.
Value
=
DateTime
.
from_datetime
(
self
.
Value
)
if
isinstance
(
other
,
Variant
)
and
self
.
VariantType
==
other
.
VariantType
and
self
.
Value
==
other
.
Value
:
#FIXME: finish
return
True
#if type(self.Value) in (list, tuple) :
return
False
#self.Value = [DateTime.from_datetime(i) for i in self.Value]
def
_guess_type
(
self
,
val
):
def
_guess_type
(
self
,
val
):
if
val
is
None
:
if
val
is
None
:
...
@@ -511,8 +519,7 @@ class Variant(object):
...
@@ -511,8 +519,7 @@ class Variant(object):
elif
type
(
val
)
==
datetime
:
elif
type
(
val
)
==
datetime
:
return
VariantType
.
DateTime
return
VariantType
.
DateTime
else
:
else
:
raise
Exception
(
"Could not guess variant type, specify type"
)
raise
Exception
(
"Could not guess UA type of {} with type {}, specify UA type"
.
format
(
val
,
type
(
val
)))
def
__str__
(
self
):
def
__str__
(
self
):
return
"Variant(val:{},type:{})"
.
format
(
self
.
Value
,
self
.
VariantType
)
return
"Variant(val:{},type:{})"
.
format
(
self
.
Value
,
self
.
VariantType
)
...
@@ -552,14 +559,13 @@ class DataValue(object):
...
@@ -552,14 +559,13 @@ class DataValue(object):
'''
'''
def
__init__
(
self
,
variant
=
None
):
def
__init__
(
self
,
variant
=
None
):
self
.
Encoding
=
0
self
.
Encoding
=
0
if
variant
is
None
:
if
not
type
(
variant
)
is
Variant
:
self
.
Value
=
Variant
()
variant
=
Variant
(
variant
)
else
:
self
.
Value
=
variant
self
.
Value
=
variant
self
.
StatusCode
=
StatusCode
()
self
.
StatusCode
=
StatusCode
()
self
.
SourceTimestamp
=
DateTime
()
self
.
SourceTimestamp
=
datetime
.
now
()
#
DateTime()
self
.
SourcePicoseconds
=
0
self
.
SourcePicoseconds
=
0
self
.
ServerTimestamp
=
DateTime
()
self
.
ServerTimestamp
=
datetime
.
now
()
#
DateTime()
self
.
ServerPicoseconds
=
0
self
.
ServerPicoseconds
=
0
def
to_binary
(
self
):
def
to_binary
(
self
):
...
@@ -576,9 +582,9 @@ class DataValue(object):
...
@@ -576,9 +582,9 @@ class DataValue(object):
if
self
.
StatusCode
:
if
self
.
StatusCode
:
packet
.
append
(
self
.
StatusCode
.
to_binary
())
packet
.
append
(
self
.
StatusCode
.
to_binary
())
if
self
.
SourceTimestamp
:
if
self
.
SourceTimestamp
:
packet
.
append
(
self
.
SourceTimestamp
.
to_binary
())
packet
.
append
(
pack_uatype
(
'DateTime'
,
self
.
SourceTimestamp
))
#
self.SourceTimestamp.to_binary())
if
self
.
ServerTimestamp
:
if
self
.
ServerTimestamp
:
packet
.
append
(
self
.
ServerTimestamp
.
to_binary
())
packet
.
append
(
pack_uatype
(
'DateTime'
,
self
.
ServerTimestamp
))
#
self.ServerTimestamp.to_binary())
if
self
.
SourcePicoseconds
:
if
self
.
SourcePicoseconds
:
packet
.
append
(
pack_uatype
(
'UInt16'
,
self
.
SourcePicoseconds
))
packet
.
append
(
pack_uatype
(
'UInt16'
,
self
.
SourcePicoseconds
))
if
self
.
ServerPicoseconds
:
if
self
.
ServerPicoseconds
:
...
@@ -594,9 +600,9 @@ class DataValue(object):
...
@@ -594,9 +600,9 @@ class DataValue(object):
if
obj
.
Encoding
&
(
1
<<
1
):
if
obj
.
Encoding
&
(
1
<<
1
):
obj
.
StatusCode
=
StatusCode
.
from_binary
(
data
)
obj
.
StatusCode
=
StatusCode
.
from_binary
(
data
)
if
obj
.
Encoding
&
(
1
<<
2
):
if
obj
.
Encoding
&
(
1
<<
2
):
obj
.
SourceTimestamp
=
DateTime
.
from_binary
(
data
)
obj
.
SourceTimestamp
=
unpack_uatype
(
'DateTime'
,
data
)
#
DateTime.from_binary(data)
if
obj
.
Encoding
&
(
1
<<
3
):
if
obj
.
Encoding
&
(
1
<<
3
):
obj
.
ServerTimestamp
=
DateTime
.
from_binary
(
data
)
obj
.
ServerTimestamp
=
unpack_uatype
(
'DateTime'
,
data
)
#
DateTime.from_binary(data)
if
obj
.
Encoding
&
(
1
<<
4
):
if
obj
.
Encoding
&
(
1
<<
4
):
obj
.
SourcePicoseconds
=
unpack_uatype
(
'UInt16'
,
data
)
obj
.
SourcePicoseconds
=
unpack_uatype
(
'UInt16'
,
data
)
if
obj
.
Encoding
&
(
1
<<
5
):
if
obj
.
Encoding
&
(
1
<<
5
):
...
...
schemas/generate_model.py
View file @
81c10085
...
@@ -77,7 +77,7 @@ class Field(object):
...
@@ -77,7 +77,7 @@ class Field(object):
__repr__
=
__str__
__repr__
=
__str__
def
is_native_type
(
self
):
def
is_native_type
(
self
):
if
self
.
uatype
in
(
"Char"
,
"SByte"
,
"Int8"
,
"Int16"
,
"Int32"
,
"Int64"
,
"UInt8"
,
"UInt16"
,
"UInt32"
,
"UInt64"
,
"Boolean"
,
"Double"
,
"Float"
,
"Byte"
,
"String"
,
"CharArray"
,
"ByteString"
):
if
self
.
uatype
in
(
"Char"
,
"SByte"
,
"Int8"
,
"Int16"
,
"Int32"
,
"Int64"
,
"UInt8"
,
"UInt16"
,
"UInt32"
,
"UInt64"
,
"Boolean"
,
"Double"
,
"Float"
,
"Byte"
,
"String"
,
"CharArray"
,
"ByteString"
,
"DateTime"
):
return
True
return
True
return
False
return
False
...
...
schemas/generate_protocol_python.py
View file @
81c10085
...
@@ -40,6 +40,8 @@ class CodeGenerator(object):
...
@@ -40,6 +40,8 @@ class CodeGenerator(object):
self
.
write
(
"Autogenerate code from xml spec"
)
self
.
write
(
"Autogenerate code from xml spec"
)
self
.
write
(
"'''"
)
self
.
write
(
"'''"
)
self
.
write
(
""
)
self
.
write
(
""
)
self
.
write
(
"from datetime import datetime"
)
self
.
write
(
""
)
self
.
write
(
"from opcua.uatypes import *"
)
self
.
write
(
"from opcua.uatypes import *"
)
self
.
write
(
"from opcua.object_ids import ObjectIds"
)
self
.
write
(
"from opcua.object_ids import ObjectIds"
)
self
.
write
(
""
)
self
.
write
(
""
)
...
@@ -215,6 +217,8 @@ class CodeGenerator(object):
...
@@ -215,6 +217,8 @@ class CodeGenerator(object):
return
"b''"
return
"b''"
elif
field
.
uatype
in
(
"Boolean"
):
elif
field
.
uatype
in
(
"Boolean"
):
return
"True"
return
"True"
elif
field
.
uatype
in
(
"DateTime"
):
return
"datetime.now()"
elif
field
.
uatype
in
(
"Int8"
,
"Int16"
,
"Int32"
,
"Int64"
,
"UInt8"
,
"UInt16"
,
"UInt32"
,
"UInt64"
,
"Double"
,
"Float"
,
"Byte"
):
elif
field
.
uatype
in
(
"Int8"
,
"Int16"
,
"Int32"
,
"Int64"
,
"UInt8"
,
"UInt16"
,
"UInt32"
,
"UInt64"
,
"Double"
,
"Float"
,
"Byte"
):
return
0
return
0
else
:
else
:
...
...
tests.py
View file @
81c10085
#! /usr/bin/env python
#! /usr/bin/env python
import
io
import
io
import
sys
import
sys
import
datetime
from
datetime
import
datetime
import
unittest
import
unittest
from
threading
import
Thread
,
Event
from
threading
import
Thread
,
Event
try
:
try
:
...
@@ -84,14 +84,6 @@ class Unit(unittest.TestCase):
...
@@ -84,14 +84,6 @@ class Unit(unittest.TestCase):
d
=
obj
.
to_binary
()
d
=
obj
.
to_binary
()
print
(
d
)
print
(
d
)
def
test_datetime
(
self
):
dt
=
ua
.
DateTime
()
print
(
dt
)
n
=
ua
.
DateTime
.
now
()
print
(
n
)
d
=
n
.
to_binary
()
self
.
assertEqual
(
len
(
d
),
8
)
def
test_qualified_name
(
self
):
def
test_qualified_name
(
self
):
qn
=
ua
.
QualifiedName
(
"Root"
,
0
)
qn
=
ua
.
QualifiedName
(
"Root"
,
0
)
print
(
qn
)
print
(
qn
)
...
@@ -147,35 +139,42 @@ class Unit(unittest.TestCase):
...
@@ -147,35 +139,42 @@ class Unit(unittest.TestCase):
def
test_datavalue
(
self
):
def
test_datavalue
(
self
):
dv
=
ua
.
DataValue
(
123
)
dv
=
ua
.
DataValue
(
123
)
self
.
assertEqual
(
dv
.
Value
,
123
)
self
.
assertEqual
(
dv
.
Value
,
ua
.
Variant
(
123
))
self
.
assertEqual
(
type
(
dv
.
Value
),
ua
.
Variant
)
dv
=
ua
.
DataValue
(
'abc'
)
dv
=
ua
.
DataValue
(
'abc'
)
self
.
assertEqual
(
dv
.
Value
,
'abc'
)
self
.
assertEqual
(
dv
.
Value
,
ua
.
Variant
(
'abc'
)
)
#tnow = int(time.time())
now
=
datetime
.
now
()
#dv.source_timestamp = ua.DateTime.from_time_t(tnow)
dv
.
source_timestamp
=
now
#self.assertEqual(int(dv.source_timestamp.to_time_t()), tnow)
#self.assertEqual(int(dv.source_timestamp.to_time_t()), tnow)
def
test_variant
(
self
):
def
test_variant
(
self
):
dv
=
ua
.
Variant
(
True
,
ua
.
VariantType
.
Boolean
)
dv
=
ua
.
Variant
(
True
,
ua
.
VariantType
.
Boolean
)
self
.
assertEqual
(
dv
.
Value
,
True
)
self
.
assertEqual
(
dv
.
Value
,
True
)
self
.
assertEqual
(
type
(
dv
.
Value
),
bool
)
self
.
assertEqual
(
type
(
dv
.
Value
),
bool
)
now
=
datetime
.
now
()
v
=
ua
.
Variant
(
now
)
self
.
assertEqual
(
v
.
Value
,
now
)
self
.
assertEqual
(
v
.
VariantType
,
ua
.
VariantType
.
DateTime
)
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
#self.assertEqual(v.Value, v2.Value)
self
.
assertEqual
(
v
.
VariantType
,
v2
.
VariantType
)
def
test_variant_array
(
self
):
v
=
ua
.
Variant
([
1
,
2
,
3
,
4
,
5
])
self
.
assertEqual
(
v
.
Value
[
1
],
2
)
#self.assertEqual(v.VarianType, ua.VariantType.Int64) # we do not care, we should aonly test for sutff that matter
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
self
.
assertEqual
(
v
.
Value
,
v2
.
Value
)
self
.
assertEqual
(
v
.
VariantType
,
v2
.
VariantType
)
now
=
datetime
.
now
()
v
=
ua
.
Variant
([
now
])
self
.
assertEqual
(
v
.
Value
[
0
],
now
)
self
.
assertEqual
(
v
.
VariantType
,
ua
.
VariantType
.
DateTime
)
v2
=
ua
.
Variant
.
from_binary
(
ua
.
utils
.
Buffer
(
v
.
to_binary
()))
#self.assertEqual(v.Value, v2.Value)
self
.
assertEqual
(
v
.
VariantType
,
v2
.
VariantType
)
"""
def test_datetime(self):
tnow1 = int(time.time())
tnow = int((datetime.datetime.utcnow() - datetime.datetime(1970,1,1)).total_seconds())
self.assertEqual(tnow, tnow1) #if this one fails this is a system error, not freopcua
dt = ua.DateTime.from_time_t(tnow)
self.assertEqual(tnow, dt.to_time_t())
pydt = dt.to_datetime()
self.assertEqual(tnow, int((pydt - datetime.datetime(1970,1,1)).total_seconds()))
dt2 = ua.DateTime(pydt)
#self.assertEqual(dt2, dt) #FIXME: not implemented
pydt2 = dt.to_datetime()
self.assertEqual(pydt, pydt2)
"""
class
CommonTests
(
object
):
class
CommonTests
(
object
):
'''
'''
...
@@ -276,7 +275,7 @@ class CommonTests(object):
...
@@ -276,7 +275,7 @@ class CommonTests(object):
def
test_datetime_write
(
self
):
def
test_datetime_write
(
self
):
time_node
=
self
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
))
time_node
=
self
.
opc
.
get_node
(
ua
.
NodeId
(
ua
.
ObjectIds
.
Server_ServerStatus_CurrentTime
))
now
=
datetime
.
datetime
.
now
()
now
=
datetime
.
now
()
objects
=
self
.
opc
.
get_objects_node
()
objects
=
self
.
opc
.
get_objects_node
()
v1
=
objects
.
add_variable
(
4
,
"test_datetime"
,
now
)
v1
=
objects
.
add_variable
(
4
,
"test_datetime"
,
now
)
tid
=
v1
.
get_value
()
tid
=
v1
.
get_value
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment