memgraph/tests/integration/ldap/runner.py

349 lines
13 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3 -u
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import argparse
import atexit
import os
import stat
import subprocess
import sys
import tempfile
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
SIGNAL_SIGTERM = 15
CONFIG_TEMPLATE = """
server:
host: "127.0.0.1"
port: 1389
encryption: "{encryption}"
cert_file: ""
key_file: ""
ca_file: ""
validate_cert: false
users:
prefix: "{prefix}"
suffix: "{suffix}"
roles:
root_dn: "{root_dn}"
root_objectclass: "{root_objectclass}"
user_attribute: "{user_attribute}"
role_attribute: "{role_attribute}"
"""
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_tester(binary, queries, username="", password="", auth_should_fail=False, query_should_fail=False):
if password == "":
password = username
args = [binary, "--username", username, "--password", password]
if auth_should_fail:
args.append("--auth-should-fail")
if query_should_fail:
args.append("--query-should-fail")
args.extend(queries)
subprocess.run(args).check_returncode()
class Memgraph:
def __init__(self, binary):
self._binary = binary
self._storage_directory = None
self._auth_module = None
self._auth_config = None
self._process = None
def start(self, **kwargs):
self.stop()
self._storage_directory = tempfile.TemporaryDirectory()
self._auth_module = os.path.join(self._storage_directory.name, "ldap.py")
self._auth_config = os.path.join(self._storage_directory.name, "ldap.yaml")
script_file = os.path.join(PROJECT_DIR, "src", "auth", "reference_modules", "ldap.py")
virtualenv_bin = os.path.join(SCRIPT_DIR, "ve3", "bin", "python3")
with open(script_file) as fin:
data = fin.read()
data = data.replace("/usr/bin/python3", virtualenv_bin)
data = data.replace("/etc/memgraph/auth/ldap.yaml", self._auth_config)
with open(self._auth_module, "w") as fout:
fout.write(data)
os.chmod(self._auth_module, stat.S_IRWXU | stat.S_IRWXG)
self.restart(**kwargs)
def restart(self, **kwargs):
self.stop()
config = {
"encryption": kwargs.pop("encryption", "disabled"),
"prefix": kwargs.pop("prefix", "cn="),
"suffix": kwargs.pop("suffix", ",ou=people,dc=memgraph,dc=com"),
"root_dn": kwargs.pop("root_dn", "ou=roles,dc=memgraph,dc=com"),
"root_objectclass": kwargs.pop("root_objectclass", "groupOfNames"),
"user_attribute": kwargs.pop("user_attribute", "member"),
"role_attribute": kwargs.pop("role_attribute", "cn"),
}
with open(self._auth_config, "w") as f:
f.write(CONFIG_TEMPLATE.format(**config))
args = [
self._binary,
"--data-directory",
self._storage_directory.name,
"--auth-module-executable",
kwargs.pop("module_executable", self._auth_module),
]
for key, value in kwargs.items():
ldap_key = "--auth-module-" + key.replace("_", "-")
if isinstance(value, bool):
args.append(ldap_key + "=" + str(value).lower())
else:
args.append(ldap_key)
args.append(value)
self._process = subprocess.Popen(args)
time.sleep(0.1)
assert self._process.poll() is None, "Memgraph process died " "prematurely!"
wait_for_server(7687)
def stop(self, check=True):
if self._process is None:
return 0
pid = self._process.pid
try:
os.kill(pid, SIGNAL_SIGTERM)
except os.OSError:
if check:
assert False
return -1
time.sleep(1)
return 0
def initialize_test(memgraph, tester_binary, **kwargs):
memgraph.start(module_executable="")
execute_tester(tester_binary, ["CREATE ROLE root_role", "GRANT ALL PRIVILEGES TO root_role"])
check_login = kwargs.pop("check_login", True)
memgraph.restart(**kwargs)
if check_login:
execute_tester(tester_binary, [], "root")
# Tests
def test_module_ux(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, ["CREATE USER user1"], "root", query_should_fail=True)
execute_tester(tester_binary, ["CREATE ROLE role1"], "root", query_should_fail=False)
execute_tester(tester_binary, ["DROP USER user1"], "root", query_should_fail=True)
execute_tester(tester_binary, ["DROP ROLE role1"], "root", query_should_fail=False)
execute_tester(tester_binary, ["SET ROLE FOR user1 TO role1"], "root", query_should_fail=True)
execute_tester(tester_binary, ["CLEAR ROLE FOR user1"], "root", query_should_fail=True)
memgraph.stop()
def test_user_auth(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, [], "bob", auth_should_fail=True)
execute_tester(tester_binary, [], "carol", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, ["CREATE ROLE admin"], "root")
execute_tester(tester_binary, [], "alice", auth_should_fail=False)
execute_tester(tester_binary, [], "bob", auth_should_fail=True)
execute_tester(tester_binary, [], "carol", auth_should_fail=False)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "carol", query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=False)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "carol", query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO admin"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=False)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "carol", query_should_fail=False)
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "alice", query_should_fail=True)
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol", query_should_fail=True)
execute_tester(tester_binary, ["GRANT CREATE TO admin"], "root")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "alice", query_should_fail=True)
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol", query_should_fail=False)
memgraph.stop()
def test_instance_restart(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.restart()
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_removal(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
execute_tester(tester_binary, ["DROP ROLE moderator"], "root")
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
memgraph.stop()
def test_wrong_prefix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, prefix="eve", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_wrong_suffix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, suffix="", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_suffix_with_spaces(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, suffix=", ou= people, dc = memgraph, dc = com")
execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
# def test_role_mapping_wrong_root_dn(memgraph, tester_binary):
# initialize_test(memgraph, tester_binary, root_dn="ou=invalid,dc=memgraph,dc=com")
# execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root")
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True)
# memgraph.restart()
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
# memgraph.stop()
# def test_role_mapping_wrong_root_objectclass(memgraph, tester_binary):
# initialize_test(memgraph, tester_binary, root_objectclass="person")
# execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root")
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True)
# memgraph.restart()
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
# memgraph.stop()
# def test_role_mapping_wrong_user_attribute(memgraph, tester_binary):
# initialize_test(memgraph, tester_binary, user_attribute="cn")
# execute_tester(tester_binary, ["CREATE ROLE moderator", "GRANT MATCH TO moderator"], "root")
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice", query_should_fail=True)
# memgraph.restart()
# execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
# memgraph.stop()
def test_wrong_password(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "root", password="sudo", auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.stop()
def test_user_multiple_roles(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "eve", auth_should_fail=True)
memgraph.stop()
def test_starttls_failure(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, encryption="starttls", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_ssl_failure(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, encryption="ssl", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
# Startup logic
if __name__ == "__main__":
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
tester_binary = os.path.join(PROJECT_DIR, "build", "tests", "integration", "ldap", "tester")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
parser.add_argument("--openldap-dir", default=os.path.join(SCRIPT_DIR, "openldap-2.4.47"))
args = parser.parse_args()
# Setup Memgraph handler
memgraph = Memgraph(args.memgraph)
# Start the slapd binary
slapd_args = [os.path.join(args.openldap_dir, "exe", "libexec", "slapd"), "-h", "ldap://127.0.0.1:1389/", "-d", "0"]
slapd = subprocess.Popen(slapd_args)
time.sleep(0.1)
assert slapd.poll() is None, "slapd process died prematurely!"
wait_for_server(1389)
# Register cleanup function
@atexit.register
def cleanup():
mg_stat = memgraph.stop(check=False)
if mg_stat != 0:
print("Memgraph process didn't exit cleanly!")
if slapd.poll() is None:
slapd.terminate()
slapd_stat = slapd.wait()
if slapd_stat != 0:
print("slapd process didn't exit cleanly!")
assert mg_stat == 0 and slapd_stat == 0, "Some of the processes " "(memgraph, slapd) crashed!"
# Execute tests
names = sorted(globals().keys())
for name in names:
if not name.startswith("test_"):
continue
test = " ".join(name[5:].split("_"))
func = globals()[name]
print("\033[1;36m~~ Running", test, "test ~~\033[0m")
func(memgraph, args.tester)
print("\033[1;36m~~ Finished", test, "test ~~\033[0m\n")
# Shutdown the slapd binary
slapd.terminate()
assert slapd.wait() == 0, "slapd process didn't exit cleanly!"
sys.exit(0)