#!/usr/bin/python3 -u import argparse import atexit import os import subprocess import sys import tempfile import time SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) def wait_for_server(port, delay=0.1): cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] while subprocess.call(cmd) != 0: time.sleep(0.01) time.sleep(delay) def execute_tester(binary, queries, username="", password="", auth_should_fail=False, query_should_fail=False): if password == "": password = username args = [binary, "--username", username, "--password", password] if auth_should_fail: args.append("--auth-should-fail") if query_should_fail: args.append("--query-should-fail") args.extend(queries) subprocess.run(args).check_returncode() class Memgraph: def __init__(self, binary): self._binary = binary self._storage_directory = None self._process = None def start(self, args=[]): self.stop() self._storage_directory = tempfile.TemporaryDirectory() self.restart(args) def restart(self, args=[]): self.stop() args = [self._binary, "--durability-directory", self._storage_directory.name] + list(map(str, args)) self._process = subprocess.Popen(args) time.sleep(0.1) assert self._process.poll() is None, "Memgraph process died " \ "prematurely!" wait_for_server(7687) def stop(self, check=True): if self._process is None: return 0 self._process.terminate() exitcode = self._process.wait() self._process = None if check: assert exitcode == 0, "Memgraph process didn't exit cleanly!" return exitcode def restart_memgraph(memgraph, tester_binary, **kwargs): args = ["--auth-ldap-enabled", "--auth-ldap-host", "127.0.0.1", "--auth-ldap-port", "1389"] if "prefix" not in kwargs: kwargs["prefix"] = "cn=" if "suffix" not in kwargs: kwargs["suffix"] = ",ou=people,dc=memgraph,dc=com" for key, value in kwargs.items(): ldap_key = "--auth-ldap-" + key.replace("_", "-") if type(value) == bool: args.append(ldap_key + "=" + str(value).lower()) else: args.append(ldap_key) args.append(value) memgraph.restart(args) def initialize_test(memgraph, tester_binary, **kwargs): memgraph.start() execute_tester(tester_binary, ["CREATE USER root", "GRANT ALL PRIVILEGES TO root"]) check_login = kwargs.pop("check_login", True) restart_memgraph(memgraph, tester_binary, **kwargs) if check_login: execute_tester(tester_binary, [], "root") # Tests def test_basic(memgraph, tester_binary): initialize_test(memgraph, tester_binary) execute_tester(tester_binary, [], "alice") execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_only_existing_users(memgraph, tester_binary): initialize_test(memgraph, tester_binary, create_user=False) execute_tester(tester_binary, [], "alice", auth_should_fail=True) execute_tester(tester_binary, ["CREATE USER alice"], "root") execute_tester(tester_binary, [], "alice") execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_role_mapping(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, [], "alice") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True) execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") execute_tester(tester_binary, [], "bob") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "bob", query_should_fail=True) execute_tester(tester_binary, [], "carol") execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol", query_should_fail=True) execute_tester(tester_binary, ["GRANT CREATE TO admin"], "root") execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol") execute_tester(tester_binary, ["CREATE (n) RETURN n"], "dave") memgraph.stop() def test_role_removal(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, [], "alice") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True) execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") restart_memgraph(memgraph, tester_binary) execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True) memgraph.stop() def test_only_existing_roles(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com", create_role=False) execute_tester(tester_binary, [], "bob") execute_tester(tester_binary, [], "alice", auth_should_fail=True) execute_tester(tester_binary, ["CREATE ROLE moderator"], "root") execute_tester(tester_binary, [], "alice") memgraph.stop() def test_role_is_user(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, [], "admin") execute_tester(tester_binary, [], "carol", auth_should_fail=True) memgraph.stop() def test_user_is_role(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, [], "carol") execute_tester(tester_binary, [], "admin", auth_should_fail=True) memgraph.stop() def test_user_permissions_persistancy(memgraph, tester_binary): initialize_test(memgraph, tester_binary) execute_tester(tester_binary, ["CREATE USER alice", "GRANT MATCH TO alice"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_role_permissions_persistancy(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_only_authentication(memgraph, tester_binary): initialize_test(memgraph, tester_binary) execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True) memgraph.stop() def test_wrong_prefix(memgraph, tester_binary): initialize_test(memgraph, tester_binary, prefix="eve", check_login=False) execute_tester(tester_binary, [], "root", auth_should_fail=True) memgraph.stop() def test_wrong_suffix(memgraph, tester_binary): initialize_test(memgraph, tester_binary, suffix="", check_login=False) execute_tester(tester_binary, [], "root", auth_should_fail=True) memgraph.stop() def test_suffix_with_spaces(memgraph, tester_binary): initialize_test(memgraph, tester_binary, suffix=", ou= people, dc = memgraph, dc = com") execute_tester(tester_binary, ["CREATE USER alice", "GRANT MATCH TO alice"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_role_mapping_wrong_root_dn(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=invalid,dc=memgraph,dc=com") execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True) restart_memgraph(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice") memgraph.stop() def test_wrong_password(memgraph, tester_binary): initialize_test(memgraph, tester_binary, role_mapping_root_dn="ou=roles,dc=memgraph,dc=com") execute_tester(tester_binary, [], "root", password="sudo", auth_should_fail=True) execute_tester(tester_binary, ["SHOW USERS"], "root", password="root") memgraph.stop() def test_password_persistancy(memgraph, tester_binary): initialize_test(memgraph, tester_binary, check_login=False) memgraph.restart() execute_tester(tester_binary, ["SHOW USERS"], "root", password="sudo") execute_tester(tester_binary, ["SHOW USERS"], "root", password="root") restart_memgraph(memgraph, tester_binary) execute_tester(tester_binary, [], "root", password="sudo", auth_should_fail=True) execute_tester(tester_binary, ["SHOW USERS"], "root", password="root") memgraph.restart() execute_tester(tester_binary, [], "root", password="sudo", auth_should_fail=True) execute_tester(tester_binary, ["SHOW USERS"], "root", password="root") memgraph.stop() def test_starttls_failure(memgraph, tester_binary): initialize_test(memgraph, tester_binary, issue_starttls=True, check_login=False) execute_tester(tester_binary, [], "root", auth_should_fail=True) memgraph.stop() # Startup logic if __name__ == "__main__": memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph") if not os.path.exists(memgraph_binary): memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph") tester_binary = os.path.join(PROJECT_DIR, "build", "tests", "integration", "ldap", "tester") if not os.path.exists(tester_binary): tester_binary = os.path.join(PROJECT_DIR, "build_debug", "tests", "integration", "ldap", "tester") parser = argparse.ArgumentParser() parser.add_argument("--memgraph", default=memgraph_binary) parser.add_argument("--tester", default=tester_binary) parser.add_argument("--openldap-dir", default=os.path.join(SCRIPT_DIR, "openldap-2.4.47")) args = parser.parse_args() # Setup Memgraph handler memgraph = Memgraph(args.memgraph) # Start the slapd binary slapd_args = [os.path.join(args.openldap_dir, "exe", "libexec", "slapd"), "-h", "ldap://127.0.0.1:1389/", "-d", "0"] slapd = subprocess.Popen(slapd_args) time.sleep(0.1) assert slapd.poll() is None, "slapd process died prematurely!" wait_for_server(1389) # Register cleanup function @atexit.register def cleanup(): mg_stat = memgraph.stop(check=False) if mg_stat != 0: print("Memgraph process didn't exit cleanly!") if slapd.poll() is None: slapd.terminate() slapd_stat = slapd.wait() if slapd_stat != 0: print("slapd process didn't exit cleanly!") assert mg_stat == 0 and slapd_stat == 0, "Some of the processes " \ "(memgraph, slapd) crashed!" # Execute tests names = sorted(globals().keys()) for name in names: if not name.startswith("test_"): continue test = " ".join(name[5:].split("_")) func = globals()[name] print("\033[1;36m~~ Running", test, "test ~~\033[0m") func(memgraph, args.tester) print("\033[1;36m~~ Finished", test, "test ~~\033[0m\n") # Shutdown the slapd binary slapd.terminate() assert slapd.wait() == 0, "slapd process didn't exit cleanly!" sys.exit(0)