d008a2ad8d
* [T1007-MG < T0997-MG] Authorization on paths (#501) * Added read authorization in paths operators * [T1007-MG < T1016-MG] Added authorization in create and delete operators (#513) * Added authorization in RemoveNodeCursor, RemoveExpandCursor, CreateNodeCursor, CreateExpandCursor,MergeCursor * [T1007-MG < T1014-MG] Add authorization to read operators (#520) Added label based access control to read operators (ScanAll). * [T1007-MG < T1015-MG] Add authorization to update operators (SetProperty, SetProperties, RemoveProperty) (#521) Added label based authorization to update operators Co-authored-by: niko4299 <51059248+niko4299@users.noreply.github.com> Co-authored-by: Josip Mrden <josip.mrden@memgraph.io>
159 lines
7.0 KiB
Python
159 lines
7.0 KiB
Python
# Copyright 2022 Memgraph Ltd.
|
|
#
|
|
# Use of this software is governed by the Business Source License
|
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
|
# License, and you may not use this file except in compliance with the Business Source License.
|
|
#
|
|
# As of the Change Date specified in that file, in accordance with
|
|
# the Business Source License, use of this software will be governed
|
|
# by the Apache License, Version 2.0, included in the file
|
|
# licenses/APL.txt.
|
|
|
|
import sys
|
|
import pytest
|
|
from common import connect, execute_and_fetch_all, reset_update_permissions
|
|
|
|
# Cypher statements shared by the tests in this module. Each one matches the
# nodes labelled :update_label, applies one kind of property mutation and
# returns n.prop so the caller can see whether the mutation took effect.

# SET on a single property.
update_property_query = "MATCH (n:update_label) SET n.prop = 2 RETURN n.prop;"

# SET with a whole property map.
update_properties_query = "MATCH (n:update_label) SET n = {prop: 2, prop2: 3} RETURN n.prop;"

# REMOVE of a single property.
remove_property_query = "MATCH (n:update_label) REMOVE n.prop RETURN n.prop;"
|
|
|
|
|
|
def test_can_read_node_when_given_update_grant():
    """Check that a label-level UPDATE grant still lets ``user`` read the node.

    The admin resets permissions through ``reset_update_permissions`` and
    grants UPDATE on ``:update_label``; a plain MATCH issued as ``user`` must
    then return exactly one row.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT UPDATE ON LABELS :update_label TO user;")

    user = connect(username="user", password="test").cursor()
    rows = execute_and_fetch_all(user, "MATCH (n:update_label) RETURN n;")

    assert len(rows) == 1
|
|
|
|
|
|
def test_can_update_node_when_given_update_grant():
    """Check that UPDATE granted on ``:update_label`` lets ``user`` mutate properties.

    After the grant, the single-property SET and the map SET must both report
    ``prop == 2`` in the first returned row, and the REMOVE must report
    ``None``.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT UPDATE ON LABELS :update_label TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    after_set, after_set_map, after_remove = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    assert after_set[0][0] == 2
    assert after_set_map[0][0] == 2
    assert after_remove[0][0] is None
|
|
|
|
|
|
def test_can_not_update_node_when_given_deny():
    """Check that DENY UPDATE on ``:update_label`` blocks property mutations.

    Each of the three mutating queries, run as ``user``, must still return 1
    for ``n.prop`` in its first row (presumably the value seeded by
    ``reset_update_permissions`` -- confirm against ``common``).
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "DENY UPDATE ON LABELS :update_label TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    outcomes = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    for rows in outcomes:
        assert rows[0][0] == 1
|
|
|
|
|
|
def test_can_not_update_node_when_given_nothing():
    """Check that without any label grant ``user`` gets no rows back at all.

    Only ``reset_update_permissions`` is applied by the admin; every one of
    the three mutating queries run as ``user`` must return an empty result.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    outcomes = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    for rows in outcomes:
        assert len(rows) == 0
|
|
|
|
|
|
def test_can_not_update_node_when_given_read():
    """Check that READ on ``:update_label`` is not enough to mutate properties.

    With only a READ grant, each mutating query run as ``user`` must still
    return 1 for ``n.prop`` in its first row (presumably the seeded value --
    confirm against ``reset_update_permissions``).
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT READ ON LABELS :update_label TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    outcomes = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    for rows in outcomes:
        assert rows[0][0] == 1
|
|
|
|
|
|
def test_can_not_update_node_when_given_read_globally():
    """Check that READ granted on all labels (``*``) does not allow mutations.

    With only the global READ grant, each mutating query run as ``user`` must
    still return 1 for ``n.prop`` in its first row (presumably the seeded
    value -- confirm against ``reset_update_permissions``).
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT READ ON LABELS * TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    outcomes = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    for rows in outcomes:
        assert rows[0][0] == 1
|
|
|
|
|
|
def test_can_update_node_when_given_update_globally():
    """Check that UPDATE granted on all labels (``*``) lets ``user`` mutate properties.

    After the grant, the single-property SET and the map SET must both report
    ``prop == 2`` in the first returned row, and the REMOVE must report
    ``None``.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT UPDATE ON LABELS * TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    after_set, after_set_map, after_remove = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    assert after_set[0][0] == 2
    assert after_set_map[0][0] == 2
    assert after_remove[0][0] is None
|
|
|
|
|
|
def test_can_update_node_when_given_create_delete_globally():
    """Check that CREATE_DELETE granted on all labels (``*``) also permits updates.

    After the grant, the single-property SET and the map SET must both report
    ``prop == 2`` in the first returned row, and the REMOVE must report
    ``None``.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT CREATE_DELETE ON LABELS * TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    after_set, after_set_map, after_remove = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    assert after_set[0][0] == 2
    assert after_set_map[0][0] == 2
    assert after_remove[0][0] is None
|
|
|
|
|
|
def test_can_update_node_when_given_create_delete():
    """Check that CREATE_DELETE granted on ``:update_label`` also permits updates.

    After the grant, the single-property SET and the map SET must both report
    ``prop == 2`` in the first returned row, and the REMOVE must report
    ``None``.
    """
    admin = connect(username="admin", password="test").cursor()
    reset_update_permissions(admin)
    execute_and_fetch_all(admin, "GRANT CREATE_DELETE ON LABELS :update_label TO user;")

    user = connect(username="user", password="test").cursor()

    # All three statements run before anything is asserted.
    after_set, after_set_map, after_remove = [
        execute_and_fetch_all(user, query)
        for query in (update_property_query, update_properties_query, remove_property_query)
    ]

    assert after_set[0][0] == 2
    assert after_set_map[0][0] == 2
    assert after_remove[0][0] is None
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this module directly: hand the file to pytest ("-rA" asks
    # for the summary of all test outcomes) and exit with pytest's status.
    exit_status = pytest.main([__file__, "-rA"])
    sys.exit(exit_status)
|