memgraph/tests/integration/ldap/runner.py

419 lines
15 KiB
Python
Executable File

#!/usr/bin/python3 -u
import argparse
import atexit
import os
import stat
import subprocess
import sys
import tempfile
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
CONFIG_TEMPLATE = """
server:
host: "127.0.0.1"
port: 1389
encryption: "{encryption}"
cert_file: ""
key_file: ""
ca_file: ""
validate_cert: false
users:
prefix: "{prefix}"
suffix: "{suffix}"
roles:
root_dn: "{root_dn}"
root_objectclass: "{root_objectclass}"
user_attribute: "{user_attribute}"
role_attribute: "{role_attribute}"
"""
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_tester(binary, queries, username="", password="",
auth_should_fail=False, query_should_fail=False):
if password == "":
password = username
args = [binary, "--username", username, "--password", password]
if auth_should_fail:
args.append("--auth-should-fail")
if query_should_fail:
args.append("--query-should-fail")
args.extend(queries)
subprocess.run(args).check_returncode()
class Memgraph:
def __init__(self, binary):
self._binary = binary
self._storage_directory = None
self._auth_module = None
self._auth_config = None
self._process = None
def start(self, **kwargs):
self.stop()
self._storage_directory = tempfile.TemporaryDirectory()
self._auth_module = os.path.join(self._storage_directory.name,
"ldap.py")
self._auth_config = os.path.join(self._storage_directory.name,
"ldap.yaml")
script_file = os.path.join(PROJECT_DIR, "src", "auth",
"reference_modules", "ldap.py")
virtualenv_bin = os.path.join(SCRIPT_DIR, "ve3", "bin", "python3")
with open(script_file) as fin:
data = fin.read()
data = data.replace("/usr/bin/python3", virtualenv_bin)
data = data.replace("/etc/memgraph/auth/ldap.yaml",
self._auth_config)
with open(self._auth_module, "w") as fout:
fout.write(data)
os.chmod(self._auth_module, stat.S_IRWXU | stat.S_IRWXG)
self.restart(**kwargs)
def restart(self, **kwargs):
self.stop()
config = {
"encryption": kwargs.pop("encryption", "disabled"),
"prefix": kwargs.pop("prefix", "cn="),
"suffix": kwargs.pop("suffix", ",ou=people,dc=memgraph,dc=com"),
"root_dn": kwargs.pop("root_dn", "ou=roles,dc=memgraph,dc=com"),
"root_objectclass": kwargs.pop("root_objectclass", "groupOfNames"),
"user_attribute": kwargs.pop("user_attribute", "member"),
"role_attribute": kwargs.pop("role_attribute", "cn"),
}
with open(self._auth_config, "w") as f:
f.write(CONFIG_TEMPLATE.format(**config))
args = [self._binary,
"--data-directory", self._storage_directory.name,
"--auth-module-executable",
kwargs.pop("module_executable", self._auth_module)]
for key, value in kwargs.items():
ldap_key = "--auth-module-" + key.replace("_", "-")
if isinstance(value, bool):
args.append(ldap_key + "=" + str(value).lower())
else:
args.append(ldap_key)
args.append(value)
self._process = subprocess.Popen(args)
time.sleep(0.1)
assert self._process.poll() is None, "Memgraph process died " \
"prematurely!"
wait_for_server(7687)
def stop(self, check=True):
if self._process is None:
return 0
self._process.terminate()
exitcode = self._process.wait()
self._process = None
if check:
assert exitcode == 0, "Memgraph process didn't exit cleanly!"
return exitcode
def initialize_test(memgraph, tester_binary, **kwargs):
memgraph.start(module_executable="")
execute_tester(tester_binary,
["CREATE USER root", "GRANT ALL PRIVILEGES TO root"])
check_login = kwargs.pop("check_login", True)
memgraph.restart(**kwargs)
if check_login:
execute_tester(tester_binary, [], "root")
# Tests
def test_basic(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_only_existing_users(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, create_missing_user=False)
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE USER alice"], "root")
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
execute_tester(tester_binary, [], "bob")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "bob",
query_should_fail=True)
execute_tester(tester_binary, [], "carol")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT CREATE TO admin"], "root")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "dave")
memgraph.stop()
def test_role_removal(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.restart(manage_roles=False)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
execute_tester(tester_binary, ["CLEAR ROLE FOR alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.stop()
def test_only_existing_roles(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, create_missing_role=False)
execute_tester(tester_binary, [], "bob")
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, [], "alice")
memgraph.stop()
def test_role_is_user(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "admin")
execute_tester(tester_binary, [], "carol", auth_should_fail=True)
memgraph.stop()
def test_user_is_role(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "carol")
execute_tester(tester_binary, [], "admin", auth_should_fail=True)
memgraph.stop()
def test_user_permissions_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary,
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_permissions_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_only_authentication(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, manage_roles=False)
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.stop()
def test_wrong_prefix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, prefix="eve", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_wrong_suffix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, suffix="", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_suffix_with_spaces(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
suffix=", ou= people, dc = memgraph, dc = com")
execute_tester(tester_binary,
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping_wrong_root_dn(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
root_dn="ou=invalid,dc=memgraph,dc=com")
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.restart()
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping_wrong_root_objectclass(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, root_objectclass="person")
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.restart()
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping_wrong_user_attribute(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, user_attribute="cn")
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.restart()
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_wrong_password(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.stop()
def test_password_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, check_login=False)
memgraph.restart(module_executable="")
execute_tester(tester_binary, ["SHOW USERS"], "root", password="sudo")
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.restart()
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.restart(module_executable="")
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.stop()
def test_user_multiple_roles(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, check_login=False)
memgraph.restart()
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
query_should_fail=True)
memgraph.restart(manage_roles=False)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
query_should_fail=True)
memgraph.restart(manage_roles=False, root_dn="")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root",
query_should_fail=True)
memgraph.stop()
def test_starttls_failure(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, encryption="starttls",
check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_ssl_failure(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, encryption="ssl",
check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
# Startup logic
if __name__ == "__main__":
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
tester_binary = os.path.join(PROJECT_DIR, "build", "tests",
"integration", "ldap", "tester")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
parser.add_argument("--openldap-dir",
default=os.path.join(SCRIPT_DIR, "openldap-2.4.47"))
args = parser.parse_args()
# Setup Memgraph handler
memgraph = Memgraph(args.memgraph)
# Start the slapd binary
slapd_args = [os.path.join(args.openldap_dir, "exe", "libexec", "slapd"),
"-h", "ldap://127.0.0.1:1389/", "-d", "0"]
slapd = subprocess.Popen(slapd_args)
time.sleep(0.1)
assert slapd.poll() is None, "slapd process died prematurely!"
wait_for_server(1389)
# Register cleanup function
@atexit.register
def cleanup():
mg_stat = memgraph.stop(check=False)
if mg_stat != 0:
print("Memgraph process didn't exit cleanly!")
if slapd.poll() is None:
slapd.terminate()
slapd_stat = slapd.wait()
if slapd_stat != 0:
print("slapd process didn't exit cleanly!")
assert mg_stat == 0 and slapd_stat == 0, "Some of the processes " \
"(memgraph, slapd) crashed!"
# Execute tests
names = sorted(globals().keys())
for name in names:
if not name.startswith("test_"):
continue
test = " ".join(name[5:].split("_"))
func = globals()[name]
print("\033[1;36m~~ Running", test, "test ~~\033[0m")
func(memgraph, args.tester)
print("\033[1;36m~~ Finished", test, "test ~~\033[0m\n")
# Shutdown the slapd binary
slapd.terminate()
assert slapd.wait() == 0, "slapd process didn't exit cleanly!"
sys.exit(0)