Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
be502ffb
Commit
be502ffb
authored
Feb 20, 2016
by
ORD
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #136 from FreeOpcUa/dev2
add tests for connection with crypto
parents
f182d979
2c56efe6
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
11 additions
and
81 deletions
+11
-81
opcua/server/binary_server.py
opcua/server/binary_server.py
+0
-79
tests/tests.py
tests/tests.py
+9
-1
tests/tests_client.py
tests/tests_client.py
+2
-1
No files found.
opcua/server/binary_server.py
deleted
100644 → 0
View file @
f182d979
"""
Socket server forwarding request to internal server
"""
import
logging
try
:
import
socketserver
except
ImportError
:
import
SocketServer
as
socketserver
from
threading
import
Thread
from
threading
import
Condition
from
opcua
import
ua
from
opcua.uaprocessor
import
UaProcessor
logger
=
logging
.
getLogger
(
__name__
)
class
BinaryServer
(
Thread
):
"""
Socket server forwarding request to internal server
"""
def
__init__
(
self
,
internal_server
,
hostname
,
port
):
Thread
.
__init__
(
self
)
self
.
socket_server
=
None
self
.
hostname
=
hostname
self
.
port
=
port
self
.
iserver
=
internal_server
self
.
_cond
=
Condition
()
def
start
(
self
):
with
self
.
_cond
:
Thread
.
start
(
self
)
self
.
_cond
.
wait
()
def
run
(
self
):
logger
.
warning
(
"Listening on %s:%s"
,
self
.
hostname
,
self
.
port
)
socketserver
.
TCPServer
.
allow_reuse_address
=
True
# get rid of address already in used warning
self
.
socket_server
=
ThreadingTCPServer
((
self
.
hostname
,
self
.
port
),
UAHandler
)
# self.socket_server.daemon_threads = True # this will force a shutdown of all threads, maybe too hard
self
.
socket_server
.
internal_server
=
self
.
iserver
# allow handler to acces server properties
with
self
.
_cond
:
self
.
_cond
.
notify_all
()
self
.
socket_server
.
serve_forever
()
def
stop
(
self
):
logger
.
warning
(
"server shutdown request"
)
self
.
socket_server
.
shutdown
()
class
UAHandler
(
socketserver
.
BaseRequestHandler
):
"""
The RequestHandler class for our server.
It is instantiated once per connection to the server, and must
override the handle() method to implement communication to the
client.
"""
def
handle
(
self
):
sock
=
ua
.
utils
.
SocketWrapper
(
self
.
request
)
processor
=
UaProcessor
(
self
.
server
.
internal_server
,
sock
,
self
.
client_address
)
try
:
while
True
:
hdr
=
ua
.
Header
.
from_string
(
sock
)
body
=
sock
.
read
(
hdr
.
body_size
)
ret
=
processor
.
process
(
hdr
,
ua
.
utils
.
Buffer
(
body
))
if
not
ret
:
break
except
ua
.
utils
.
SocketClosedException
:
logger
.
warning
(
"Server has closed connection"
)
except
Exception
:
logger
.
exception
(
"Exception raised while parsing message from client, closing"
)
class
ThreadingTCPServer
(
socketserver
.
ThreadingMixIn
,
socketserver
.
TCPServer
):
pass
tests/tests.py
View file @
be502ffb
...
...
@@ -3,6 +3,13 @@ import logging
import
sys
sys
.
path
.
insert
(
0
,
".."
)
sys
.
path
.
insert
(
0
,
"."
)
try
:
from
opcua.crypto
import
uacrypto
CRYPTOGRAPHY_AVAILABLE
=
True
except
ImportError
:
print
(
"WARNING: CRYPTO NOT AVAILABLE, CRYPTO TESTS DISABLED!!"
)
CRYPTOGRAPHY_AVAILABLE
=
False
...
...
@@ -10,7 +17,8 @@ from tests_cmd_lines import TestCmdLines
from
tests_server
import
TestServer
from
tests_client
import
TestClient
from
tests_unit
import
Unit
from
tests_crypto_connect
import
TestCryptoConnect
if
CRYPTOGRAPHY_AVAILABLE
:
from
tests_crypto_connect
import
TestCryptoConnect
if
__name__
==
'__main__'
:
...
...
tests/tests_client.py
View file @
be502ffb
...
...
@@ -25,7 +25,8 @@ class TestClient(unittest.TestCase, CommonTests):
self
.
srv
.
start
()
# start admin client
self
.
clt
=
Client
(
'opc.tcp://admin@localhost:%d'
%
port_num1
)
# long timeout since travis (automated testing) can be really slow
self
.
clt
=
Client
(
'opc.tcp://admin@localhost:%d'
%
port_num1
,
timeout
=
10
)
self
.
clt
.
connect
()
self
.
opc
=
self
.
clt
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment