Commit 8a6018f9 authored by Christoph Ziebuhr's avatar Christoph Ziebuhr Committed by oroulet

Allow regular users to do write requests

parent ec227ba7
from asyncua import ua from asyncua import ua
from asyncua.server.users import UserRole from asyncua.server.users import UserRole
WRITE_TYPES = [ ADMIN_TYPES = [
ua.ObjectIds.WriteRequest_Encoding_DefaultBinary,
ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary, ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary,
ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary, ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary,
ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary, ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary,
...@@ -11,11 +10,12 @@ WRITE_TYPES = [ ...@@ -11,11 +10,12 @@ WRITE_TYPES = [
ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary, ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary,
] ]
READ_TYPES = [ USER_TYPES = [
ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary, ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary, ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary, ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.ReadRequest_Encoding_DefaultBinary, ua.ObjectIds.ReadRequest_Encoding_DefaultBinary,
ua.ObjectIds.WriteRequest_Encoding_DefaultBinary,
ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary, ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary,
ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary, ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary,
ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary, ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary,
...@@ -49,15 +49,15 @@ class PermissionRuleset: ...@@ -49,15 +49,15 @@ class PermissionRuleset:
class SimpleRoleRuleset(PermissionRuleset): class SimpleRoleRuleset(PermissionRuleset):
""" """
Standard simple role-based ruleset. Standard simple role-based ruleset.
Admins alone can write, admins and users can read, and anonymous users can't do anything. Admins alone can change address space, admins and users can read/write, and anonymous users can't do anything.
""" """
def __init__(self): def __init__(self):
write_ids = list(map(ua.NodeId, WRITE_TYPES)) admin_ids = list(map(ua.NodeId, ADMIN_TYPES))
read_ids = list(map(ua.NodeId, READ_TYPES)) user_ids = list(map(ua.NodeId, USER_TYPES))
self._permission_dict = { self._permission_dict = {
UserRole.Admin: set().union(write_ids, read_ids), UserRole.Admin: set().union(admin_ids, user_ids),
UserRole.User: set().union(read_ids), UserRole.User: set().union(user_ids),
UserRole.Anonymous: set() UserRole.Anonymous: set()
} }
......
...@@ -88,9 +88,9 @@ async def test_permissions_admin(srv_crypto_one_cert): ...@@ -88,9 +88,9 @@ async def test_permissions_admin(srv_crypto_one_cert):
assert await clt.get_objects_node().get_children() assert await clt.get_objects_node().get_children()
objects = clt.nodes.objects objects = clt.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable']) child = await objects.get_child(['0:MyObject', '0:MyVariable'])
await child.read_value()
await child.set_value(42.0) await child.set_value(42.0)
assert await child.read_value() == 42.0
await child.add_property(0, "MyProperty1", 3)
async def test_permissions_user(srv_crypto_one_cert): async def test_permissions_user(srv_crypto_one_cert):
clt = Client(uri_crypto_cert) clt = Client(uri_crypto_cert)
...@@ -106,9 +106,10 @@ async def test_permissions_user(srv_crypto_one_cert): ...@@ -106,9 +106,10 @@ async def test_permissions_user(srv_crypto_one_cert):
assert await clt.get_objects_node().get_children() assert await clt.get_objects_node().get_children()
objects = clt.nodes.objects objects = clt.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable']) child = await objects.get_child(['0:MyObject', '0:MyVariable'])
await child.read_value() await child.set_value(44.0)
assert await child.read_value() == 44.0
with pytest.raises(ua.uaerrors.BadUserAccessDenied): with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await child.set_value(42) await child.add_property(0, "MyProperty2", 3)
async def test_permissions_anonymous(srv_crypto_one_cert): async def test_permissions_anonymous(srv_crypto_one_cert):
...@@ -121,6 +122,7 @@ async def test_permissions_anonymous(srv_crypto_one_cert): ...@@ -121,6 +122,7 @@ async def test_permissions_anonymous(srv_crypto_one_cert):
server_certificate=srv_crypto_params[0][1], server_certificate=srv_crypto_params[0][1],
mode=ua.MessageSecurityMode.SignAndEncrypt mode=ua.MessageSecurityMode.SignAndEncrypt
) )
await clt.connect() async with clt:
await clt.get_endpoints() await clt.get_endpoints()
await clt.disconnect() with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await clt.nodes.objects.get_children()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment