Commit 8c5104c8 authored by Alexander Korolkov's avatar Alexander Korolkov Committed by olivier R-D

Support server-size secure connections

Moved most of the common logic to uaprotocol.SecureConnection class.
Example of server with security enabled:
tools/uaserver --populate --certificate cert.pem --private_key pk.pem
parent 39e3a62e
......@@ -23,14 +23,11 @@ class UASocketClient(object):
self.timeout = timeout
self._socket = None
self._do_stop = False
self._security_token = ua.ChannelSecurityToken()
self.authentication_token = ua.NodeId()
self._sequence_number = 0
self._request_id = 0
self._request_handle = 0
self._callbackmap = {}
self._security_policy = security_policy
self._max_chunk_size = 65536
self._connection = ua.SecureConnection(security_policy)
def start(self):
"""
......@@ -61,14 +58,9 @@ class UASocketClient(object):
if callback:
future.add_done_callback(callback)
self._callbackmap[self._request_id] = future
for chunk in ua.MessageChunk.message_to_chunks(self._security_policy, binreq, self._max_chunk_size,
message_type=message_type,
channel_id=self._security_token.ChannelId,
request_id=self._request_id,
token_id=self._security_token.TokenId):
self._sequence_number += 1
chunk.SequenceHeader.SequenceNumber = self._sequence_number
self._socket.write(chunk.to_binary())
msg = self._connection.message_to_binary(binreq,
message_type, self._request_id)
self._socket.write(msg)
return future
def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
......@@ -104,14 +96,11 @@ class UASocketClient(object):
self.logger.info("Thread ended")
def _receive(self):
msg = ua.tcp_message_from_socket(self._security_policy, self._socket)
if isinstance(msg, ua.MessageChunk):
chunks = [msg]
# TODO: check everything
while chunks[-1].MessageHeader.ChunkType == ua.ChunkType.Intermediate:
chunks.append(ua.tcp_message_from_socket(self._security_policy, self._socket))
body = b"".join([c.Body for c in chunks])
self._call_callback(msg.SequenceHeader.RequestId, utils.Buffer(body))
msg = self._connection.receive_from_socket(self._socket)
if msg is None:
return
elif isinstance(msg, ua.Message):
self._call_callback(msg.request_id(), msg.body())
elif isinstance(msg, ua.Acknowledge):
self._call_callback(0, msg)
elif isinstance(msg, ua.ErrorMessage):
......@@ -126,19 +115,6 @@ class UASocketClient(object):
raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
future.set_result(body)
def _write_socket(self, hdr, *args):
alle = []
for arg in args:
data = arg.to_binary()
hdr.add_size(len(data))
self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
self.logger.debug("struct: %s", arg)
self.logger.debug("data: %s", data)
alle.append(data)
alle.insert(0, hdr.to_binary())
alle = b"".join(alle)
self._socket.write(alle)
def _create_request_header(self, timeout=1000):
hdr = ua.RequestHeader()
hdr.AuthenticationToken = self.authentication_token
......@@ -165,13 +141,12 @@ class UASocketClient(object):
def send_hello(self, url):
hello = ua.Hello()
hello.EndpointUrl = url
header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
future = Future()
with self._lock:
self._callbackmap[0] = future
self._write_socket(header, hello)
binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
self._socket.write(binmsg)
ack = future.result(self.timeout)
self._max_chunk_size = ack.SendBufferSize # client shouldn't send chunks larger than this
return ack
def open_secure_channel(self, params):
......@@ -182,7 +157,7 @@ class UASocketClient(object):
response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
response.ResponseHeader.ServiceResult.check()
self._security_token = response.Parameters.SecurityToken
self._connection.set_security_token(response.Parameters.SecurityToken)
return response.Parameters
def close_secure_channel(self):
......
......@@ -328,6 +328,8 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
def __init__(self, server_cert, client_cert, client_pk, mode):
require_cryptography(self)
if isinstance(server_cert, bytes):
server_cert = uacrypto.x509_from_der(server_cert)
# even in Sign mode we need to asymmetrically encrypt secrets
# transmitted in OpenSecureChannel. So SignAndEncrypt here
self.asymmetric_cryptography = Cryptography(
......@@ -387,6 +389,8 @@ class SecurityPolicyBasic256(SecurityPolicy):
def __init__(self, server_cert, client_cert, client_pk, mode):
require_cryptography(self)
if isinstance(server_cert, bytes):
server_cert = uacrypto.x509_from_der(server_cert)
# even in Sign mode we need to asymmetrically encrypt secrets
# transmitted in OpenSecureChannel. So SignAndEncrypt here
self.asymmetric_cryptography = Cryptography(
......
......@@ -24,6 +24,10 @@ class BinaryServer(object):
self.iserver = internal_server
self.loop = internal_server.loop
self._server = None
self._policies = []
def set_policies(self, policies):
self._policies = policies
def start(self):
......@@ -39,12 +43,14 @@ class BinaryServer(object):
iserver = self.iserver
loop = self.loop
logger = self.logger
policies = self._policies
def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
self.logger.info('New connection from %s', self.peername)
self.transport = transport
self.processor = UAProcessor(self.iserver, self.transport)
self.processor.set_policies(self.policies)
self.data = b""
def connection_lost(self, ex):
......
......@@ -165,7 +165,6 @@ class InternalSession(object):
InternalSession._counter += 1
self.authentication_token = ua.NodeId(self._auth_counter)
InternalSession._auth_counter += 1
self.nonce = utils.create_nonce()
self.subscriptions = []
#self.logger.debug("Created internal session %s for user %s", self.name, self.user)
print("Created internal session {} for user {}".format(self.name, self.user))
......@@ -185,6 +184,7 @@ class InternalSession(object):
result.AuthenticationToken = self.authentication_token
result.RevisedSessionTimeout = params.RequestedSessionTimeout
result.MaxRequestMessageSize = 65536
self.nonce = utils.create_nonce(32)
result.ServerNonce = self.nonce
result.ServerEndpoints = self.get_endpoints(sockname=sockname)
......@@ -200,6 +200,7 @@ class InternalSession(object):
result = ua.ActivateSessionResult()
if self.state != SessionState.Created:
raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
self.nonce = utils.create_nonce(32)
result.ServerNonce = self.nonce
for _ in params.ClientSoftwareCertificates:
result.Results.append(ua.StatusCode())
......
......@@ -18,6 +18,13 @@ from opcua.common.event import Event
from opcua.common.subscription import Subscription
from opcua.common import xmlimporter
from opcua.client.client import Client
from opcua.crypto import security_policies
use_crypto = True
try:
from opcua.crypto import uacrypto
except:
print("cryptography is not installed, use of crypto disabled")
use_crypto = False
class Server(object):
......@@ -60,12 +67,24 @@ class Server(object):
self.bserver = None
self._discovery_clients = {}
self._discovery_period = 60
self.certificate = None
self.private_key = None
self._policies = []
# setup some expected values
self.register_namespace(self.application_uri)
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self.application_uri])
def load_certificate(self, path):
"""
load server certificate from file, either pem or der
"""
self.certificate = uacrypto.load_certificate(path)
def load_private_key(self, path):
self.private_key = uacrypto.load_private_key(path)
def disable_clock(self, val=True):
"""
for debugging you may want to disable clock that write every second
......@@ -136,8 +155,34 @@ class Server(object):
def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
self._set_endpoints()
def _set_endpoints(self):
self._policies = [ua.SecurityPolicyFactory()]
if self.certificate and self.private_key:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate, self.private_key))
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign,
self.certificate, self.private_key))
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate, self.private_key))
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign,
self.certificate, self.private_key))
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
......@@ -164,8 +209,10 @@ class Server(object):
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
edp.Server = appdesc
edp.SecurityMode = ua.MessageSecurityMode.None_
edp.SecurityPolicyUri = 'http://opcfoundation.org/UA/SecurityPolicy#None'
if self.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3]
edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
edp.SecurityLevel = 0
......@@ -181,6 +228,7 @@ class Server(object):
self._setup_server_nodes()
self.iserver.start()
self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
self.bserver.set_policies(self._policies)
self.bserver.start()
def stop(self):
......
......@@ -28,39 +28,22 @@ class UAProcessor(object):
self._socketlock = Lock()
self._datalock = Lock()
self._publishdata_queue = []
self._seq_number = 0
self._security_policy = ua.SecurityPolicy()
self._max_chunk_size = 65536
self._connection = ua.SecureConnection(ua.SecurityPolicy())
def set_policies(self, policies):
self._connection.set_policy_factories(policies)
def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
with self._socketlock:
response.ResponseHeader.RequestHandle = requesthandle
for chunk in ua.MessageChunk.message_to_chunks(self._security_policy, response.to_binary(), self._max_chunk_size, msgtype,
channel_id=self.channel.SecurityToken.ChannelId,
token_id=self.channel.SecurityToken.TokenId,
request_id=seqhdr.RequestId):
self._seq_number += 1
chunk.SequenceHeader.SequenceNumber = self._seq_number
self.socket.write(chunk.to_binary())
def _write_socket(self, hdr, *args):
alle = []
for arg in args:
data = arg.to_binary()
hdr.add_size(len(data))
alle.append(data)
alle.insert(0, hdr.to_binary())
alle = b"".join(alle)
self.logger.info("writting %s bytes to socket, with header %s ", len(alle), hdr)
#self.logger.info("writting data %s", hdr, [i for i in args])
#self.logger.debug("data: %s", alle)
self.socket.write(alle)
def open_secure_channel(self, body):
algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
data = self._connection.message_to_binary(response.to_binary(),
msgtype, seqhdr.RequestId)
self.socket.write(data)
def open_secure_channel(self, algohdr, seqhdr, body):
request = ua.OpenSecureChannelRequest.from_binary(body)
self._connection.select_policy(algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
channel = self._open_secure_channel(request.Parameters)
# send response
response = ua.OpenSecureChannelResponse()
......@@ -80,33 +63,32 @@ class UAProcessor(object):
self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
def process(self, header, body):
if header.MessageType == ua.MessageType.Hello:
hello = ua.Hello.from_binary(body)
hdr = ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single)
ack = ua.Acknowledge()
self._max_chunk_size = hello.ReceiveBufferSize
ack.ReceiveBufferSize = hello.ReceiveBufferSize
ack.SendBufferSize = hello.SendBufferSize
self._write_socket(hdr, ack)
elif header.MessageType == ua.MessageType.Error:
self.logger.warning("Received an error message type")
msg = self._connection.receive_from_header_and_body(header, body)
if isinstance(msg, ua.Message):
if header.MessageType == ua.MessageType.SecureOpen:
self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
elif header.MessageType == ua.MessageType.SecureClose:
if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
return False
elif header.MessageType == ua.MessageType.SecureOpen:
self.open_secure_channel(body)
elif header.MessageType == ua.MessageType.SecureMessage:
return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
elif header.MessageType == ua.MessageType.SecureClose:
if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
return False
elif isinstance(msg, ua.Hello):
ack = ua.Acknowledge()
ack.ReceiveBufferSize = msg.ReceiveBufferSize
ack.SendBufferSize = msg.SendBufferSize
data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
self.socket.write(data)
elif header.MessageType == ua.MessageType.SecureMessage:
algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
return self.process_message(algohdr, seqhdr, body)
elif isinstance(msg, ua.ErrorMessage):
self.logger.warning("Received an error message type")
else:
self.logger.warning("Unsupported message type: %s", header.MessageType)
raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
return True
def process_message(self, algohdr, seqhdr, body):
......@@ -132,6 +114,9 @@ class UAProcessor(object):
response = ua.CreateSessionResponse()
response.Parameters = sessiondata
response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
response.Parameters.ServerSignature.Signature = self._connection._security_policy.asymmetric_cryptography.signature(self._connection._security_policy.server_certificate + params.ClientNonce)
response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
self.logger.info("sending create sesssion response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
......@@ -154,6 +139,8 @@ class UAProcessor(object):
self.logger.info("request to activate non-existing session")
raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
self._connection._security_policy.asymmetric_cryptography.verify(self._connection._security_policy.client_certificate + self.session.nonce, params.ClientSignature.Signature)
result = self.session.activate_session(params)
response = ua.ActivateSessionResponse()
......@@ -390,7 +377,9 @@ class UAProcessor(object):
self.channel.SecurityToken.TokenId += 1
self.channel.SecurityToken.CreatedAt = datetime.now()
self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
self.channel.ServerNonce = utils.create_nonce()
self.channel.ServerNonce = utils.create_nonce(self._connection._security_policy.symmetric_key_size)
self._connection.set_security_token(self.channel.SecurityToken)
self._connection._security_policy.make_symmetric_key(self.channel.ServerNonce, params.ClientNonce)
return self.channel
def close(self):
......
......@@ -484,11 +484,19 @@ def uaserver():
"--shell",
action="store_true",
help="Start python shell instead of randomly changing node values")
parser.add_argument("--certificate",
help="set server certificate")
parser.add_argument("--private_key",
help="set server private key")
args = parser.parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
server = Server()
server.set_endpoint(args.url)
if args.certificate:
server.load_certificate(args.certificate)
if args.private_key:
server.load_private_key(args.private_key)
server.disable_clock(args.disable_clock)
server.set_server_name("FreeOpcUa Example Server")
if args.xml:
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment