418 lines
15 KiB
Python
Executable File
418 lines
15 KiB
Python
Executable File
#!/usr/bin/python3 -u
|
|
import argparse
|
|
import atexit
|
|
import os
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
|
|
|
|
CONFIG_TEMPLATE = """
|
|
server:
|
|
host: "127.0.0.1"
|
|
port: 1389
|
|
encryption: "{encryption}"
|
|
cert_file: ""
|
|
key_file: ""
|
|
ca_file: ""
|
|
validate_cert: false
|
|
|
|
users:
|
|
prefix: "{prefix}"
|
|
suffix: "{suffix}"
|
|
|
|
roles:
|
|
root_dn: "{root_dn}"
|
|
root_objectclass: "{root_objectclass}"
|
|
user_attribute: "{user_attribute}"
|
|
role_attribute: "{role_attribute}"
|
|
"""
|
|
|
|
|
|
def wait_for_server(port, delay=0.1):
|
|
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
|
|
while subprocess.call(cmd) != 0:
|
|
time.sleep(0.01)
|
|
time.sleep(delay)
|
|
|
|
|
|
def execute_tester(binary, queries, username="", password="",
|
|
auth_should_fail=False, query_should_fail=False):
|
|
if password == "":
|
|
password = username
|
|
args = [binary, "--username", username, "--password", password]
|
|
if auth_should_fail:
|
|
args.append("--auth-should-fail")
|
|
if query_should_fail:
|
|
args.append("--query-should-fail")
|
|
args.extend(queries)
|
|
subprocess.run(args).check_returncode()
|
|
|
|
|
|
class Memgraph:
|
|
def __init__(self, binary):
|
|
self._binary = binary
|
|
self._storage_directory = None
|
|
self._auth_module = None
|
|
self._auth_config = None
|
|
self._process = None
|
|
|
|
def start(self, **kwargs):
|
|
self.stop()
|
|
self._storage_directory = tempfile.TemporaryDirectory()
|
|
self._auth_module = os.path.join(self._storage_directory.name,
|
|
"ldap.py")
|
|
self._auth_config = os.path.join(self._storage_directory.name,
|
|
"ldap.yaml")
|
|
script_file = os.path.join(PROJECT_DIR, "src", "auth",
|
|
"reference_modules", "ldap.py")
|
|
virtualenv_bin = os.path.join(SCRIPT_DIR, "ve3", "bin", "python3")
|
|
with open(script_file) as fin:
|
|
data = fin.read()
|
|
data = data.replace("/usr/bin/env python3", virtualenv_bin)
|
|
data = data.replace("/etc/memgraph/auth/ldap.yaml",
|
|
self._auth_config)
|
|
with open(self._auth_module, "w") as fout:
|
|
fout.write(data)
|
|
os.chmod(self._auth_module, stat.S_IRWXU | stat.S_IRWXG)
|
|
self.restart(**kwargs)
|
|
|
|
def restart(self, **kwargs):
|
|
self.stop()
|
|
config = {
|
|
"encryption": kwargs.pop("encryption", "disabled"),
|
|
"prefix": kwargs.pop("prefix", "cn="),
|
|
"suffix": kwargs.pop("suffix", ",ou=people,dc=memgraph,dc=com"),
|
|
"root_dn": kwargs.pop("root_dn", "ou=roles,dc=memgraph,dc=com"),
|
|
"root_objectclass": kwargs.pop("root_objectclass", "groupOfNames"),
|
|
"user_attribute": kwargs.pop("user_attribute", "member"),
|
|
"role_attribute": kwargs.pop("role_attribute", "cn"),
|
|
}
|
|
with open(self._auth_config, "w") as f:
|
|
f.write(CONFIG_TEMPLATE.format(**config))
|
|
args = [self._binary,
|
|
"--data-directory", self._storage_directory.name,
|
|
"--auth-module-executable",
|
|
kwargs.pop("module_executable", self._auth_module)]
|
|
for key, value in kwargs.items():
|
|
ldap_key = "--auth-module-" + key.replace("_", "-")
|
|
if type(value) == bool:
|
|
args.append(ldap_key + "=" + str(value).lower())
|
|
else:
|
|
args.append(ldap_key)
|
|
args.append(value)
|
|
self._process = subprocess.Popen(args)
|
|
time.sleep(0.1)
|
|
assert self._process.poll() is None, "Memgraph process died " \
|
|
"prematurely!"
|
|
wait_for_server(7687)
|
|
|
|
def stop(self, check=True):
|
|
if self._process is None:
|
|
return 0
|
|
self._process.terminate()
|
|
exitcode = self._process.wait()
|
|
self._process = None
|
|
if check:
|
|
assert exitcode == 0, "Memgraph process didn't exit cleanly!"
|
|
return exitcode
|
|
|
|
|
|
def initialize_test(memgraph, tester_binary, **kwargs):
|
|
memgraph.start(module_executable="")
|
|
execute_tester(tester_binary,
|
|
["CREATE USER root", "GRANT ALL PRIVILEGES TO root"])
|
|
check_login = kwargs.pop("check_login", True)
|
|
memgraph.restart(**kwargs)
|
|
if check_login:
|
|
execute_tester(tester_binary, [], "root")
|
|
|
|
|
|
# Tests
|
|
|
|
|
|
def test_basic(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary, [], "alice")
|
|
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_only_existing_users(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, create_missing_user=False)
|
|
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
|
|
execute_tester(tester_binary, ["CREATE USER alice"], "root")
|
|
execute_tester(tester_binary, [], "alice")
|
|
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_mapping(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
|
|
execute_tester(tester_binary, [], "alice")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
|
|
execute_tester(tester_binary, [], "bob")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "bob",
|
|
query_should_fail=True)
|
|
|
|
execute_tester(tester_binary, [], "carol")
|
|
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT CREATE TO admin"], "root")
|
|
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol")
|
|
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "dave")
|
|
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_removal(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary, [], "alice")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.restart(manage_roles=False)
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
execute_tester(tester_binary, ["CLEAR ROLE FOR alice"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_only_existing_roles(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, create_missing_role=False)
|
|
execute_tester(tester_binary, [], "bob")
|
|
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
|
|
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
|
|
execute_tester(tester_binary, [], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_is_user(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary, [], "admin")
|
|
execute_tester(tester_binary, [], "carol", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_user_is_role(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary, [], "carol")
|
|
execute_tester(tester_binary, [], "admin", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_user_permissions_persistancy(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary,
|
|
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_permissions_persistancy(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary,
|
|
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
|
|
"root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_only_authentication(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, manage_roles=False)
|
|
execute_tester(tester_binary,
|
|
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
|
|
"root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_wrong_prefix(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, prefix="eve", check_login=False)
|
|
execute_tester(tester_binary, [], "root", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_wrong_suffix(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, suffix="", check_login=False)
|
|
execute_tester(tester_binary, [], "root", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_suffix_with_spaces(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary,
|
|
suffix=", ou= people, dc = memgraph, dc = com")
|
|
execute_tester(tester_binary,
|
|
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_mapping_wrong_root_dn(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary,
|
|
root_dn="ou=invalid,dc=memgraph,dc=com")
|
|
execute_tester(tester_binary,
|
|
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
|
|
"root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
memgraph.restart()
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_mapping_wrong_root_objectclass(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, root_objectclass="person")
|
|
execute_tester(tester_binary,
|
|
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
|
|
"root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
memgraph.restart()
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_role_mapping_wrong_user_attribute(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, user_attribute="cn")
|
|
execute_tester(tester_binary,
|
|
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
|
|
"root")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
|
|
query_should_fail=True)
|
|
memgraph.restart()
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_wrong_password(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary)
|
|
execute_tester(tester_binary, [], "root", password="sudo",
|
|
auth_should_fail=True)
|
|
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_password_persistancy(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, check_login=False)
|
|
memgraph.restart(module_executable="")
|
|
execute_tester(tester_binary, ["SHOW USERS"], "root", password="sudo")
|
|
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
|
|
memgraph.restart()
|
|
execute_tester(tester_binary, [], "root", password="sudo",
|
|
auth_should_fail=True)
|
|
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
|
|
memgraph.restart(module_executable="")
|
|
execute_tester(tester_binary, [], "root", password="sudo",
|
|
auth_should_fail=True)
|
|
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
|
|
memgraph.stop()
|
|
|
|
|
|
def test_user_multiple_roles(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, check_login=False)
|
|
memgraph.restart()
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
|
|
query_should_fail=True)
|
|
memgraph.restart(manage_roles=False)
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
|
|
query_should_fail=True)
|
|
memgraph.restart(manage_roles=False, root_dn="")
|
|
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
|
|
query_should_fail=True)
|
|
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
|
|
query_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_starttls_failure(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, encryption="starttls",
|
|
check_login=False)
|
|
execute_tester(tester_binary, [], "root", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
def test_ssl_failure(memgraph, tester_binary):
|
|
initialize_test(memgraph, tester_binary, encryption="ssl",
|
|
check_login=False)
|
|
execute_tester(tester_binary, [], "root", auth_should_fail=True)
|
|
memgraph.stop()
|
|
|
|
|
|
# Startup logic
|
|
|
|
|
|
if __name__ == "__main__":
|
|
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
|
|
tester_binary = os.path.join(PROJECT_DIR, "build", "tests",
|
|
"integration", "ldap", "tester")
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--memgraph", default=memgraph_binary)
|
|
parser.add_argument("--tester", default=tester_binary)
|
|
parser.add_argument("--openldap-dir",
|
|
default=os.path.join(SCRIPT_DIR, "openldap-2.4.47"))
|
|
args = parser.parse_args()
|
|
|
|
# Setup Memgraph handler
|
|
memgraph = Memgraph(args.memgraph)
|
|
|
|
# Start the slapd binary
|
|
slapd_args = [os.path.join(args.openldap_dir, "exe", "libexec", "slapd"),
|
|
"-h", "ldap://127.0.0.1:1389/", "-d", "0"]
|
|
slapd = subprocess.Popen(slapd_args)
|
|
time.sleep(0.1)
|
|
assert slapd.poll() is None, "slapd process died prematurely!"
|
|
wait_for_server(1389)
|
|
|
|
# Register cleanup function
|
|
@atexit.register
|
|
def cleanup():
|
|
mg_stat = memgraph.stop(check=False)
|
|
if mg_stat != 0:
|
|
print("Memgraph process didn't exit cleanly!")
|
|
|
|
if slapd.poll() is None:
|
|
slapd.terminate()
|
|
slapd_stat = slapd.wait()
|
|
if slapd_stat != 0:
|
|
print("slapd process didn't exit cleanly!")
|
|
|
|
assert mg_stat == 0 and slapd_stat == 0, "Some of the processes " \
|
|
"(memgraph, slapd) crashed!"
|
|
|
|
# Execute tests
|
|
names = sorted(globals().keys())
|
|
for name in names:
|
|
if not name.startswith("test_"):
|
|
continue
|
|
test = " ".join(name[5:].split("_"))
|
|
func = globals()[name]
|
|
print("\033[1;36m~~ Running", test, "test ~~\033[0m")
|
|
func(memgraph, args.tester)
|
|
print("\033[1;36m~~ Finished", test, "test ~~\033[0m\n")
|
|
|
|
# Shutdown the slapd binary
|
|
slapd.terminate()
|
|
assert slapd.wait() == 0, "slapd process didn't exit cleanly!"
|
|
|
|
sys.exit(0)
|