memgraph/tests/integration/ldap/runner.py
Matej Ferencevic d9bc4ec476 Implement LDAP authentication
Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1888
2019-02-28 09:39:30 +01:00

337 lines
12 KiB
Python
Executable File

#!/usr/bin/python3 -u
import argparse
import atexit
import os
import subprocess
import sys
import tempfile
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_tester(binary, queries, username="", password="",
auth_should_fail=False, query_should_fail=False):
if password == "":
password = username
args = [binary, "--username", username, "--password", password]
if auth_should_fail:
args.append("--auth-should-fail")
if query_should_fail:
args.append("--query-should-fail")
args.extend(queries)
subprocess.run(args).check_returncode()
class Memgraph:
def __init__(self, binary):
self._binary = binary
self._storage_directory = None
self._process = None
def start(self, args=[]):
self.stop()
self._storage_directory = tempfile.TemporaryDirectory()
self.restart(args)
def restart(self, args=[]):
self.stop()
args = [self._binary, "--durability-directory",
self._storage_directory.name] + list(map(str, args))
self._process = subprocess.Popen(args)
time.sleep(0.1)
assert self._process.poll() is None, "Memgraph process died " \
"prematurely!"
wait_for_server(7687)
def stop(self, check=True):
if self._process is None:
return 0
self._process.terminate()
exitcode = self._process.wait()
self._process = None
if check:
assert exitcode == 0, "Memgraph process didn't exit cleanly!"
return exitcode
def restart_memgraph(memgraph, tester_binary, **kwargs):
args = ["--auth-ldap-enabled", "--auth-ldap-host", "127.0.0.1",
"--auth-ldap-port", "1389"]
if "prefix" not in kwargs:
kwargs["prefix"] = "cn="
if "suffix" not in kwargs:
kwargs["suffix"] = ",ou=people,dc=memgraph,dc=com"
for key, value in kwargs.items():
ldap_key = "--auth-ldap-" + key.replace("_", "-")
if type(value) == bool:
args.append(ldap_key + "=" + str(value).lower())
else:
args.append(ldap_key)
args.append(value)
memgraph.restart(args)
def initialize_test(memgraph, tester_binary, **kwargs):
memgraph.start()
execute_tester(tester_binary,
["CREATE USER root", "GRANT ALL PRIVILEGES TO root"])
check_login = kwargs.pop("check_login", True)
restart_memgraph(memgraph, tester_binary, **kwargs)
if check_login:
execute_tester(tester_binary, [], "root")
# Tests
def test_basic(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_only_existing_users(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, create_user=False)
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE USER alice"], "root")
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
execute_tester(tester_binary, [], "bob")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "bob",
query_should_fail=True)
execute_tester(tester_binary, [], "carol")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT CREATE TO admin"], "root")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "carol")
execute_tester(tester_binary, ["CREATE (n) RETURN n"], "dave")
memgraph.stop()
def test_role_removal(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, [], "alice")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
execute_tester(tester_binary, ["GRANT MATCH TO moderator"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
restart_memgraph(memgraph, tester_binary)
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.stop()
def test_only_existing_roles(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com",
create_role=False)
execute_tester(tester_binary, [], "bob")
execute_tester(tester_binary, [], "alice", auth_should_fail=True)
execute_tester(tester_binary, ["CREATE ROLE moderator"], "root")
execute_tester(tester_binary, [], "alice")
memgraph.stop()
def test_role_is_user(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, [], "admin")
execute_tester(tester_binary, [], "carol", auth_should_fail=True)
memgraph.stop()
def test_user_is_role(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, [], "carol")
execute_tester(tester_binary, [], "admin", auth_should_fail=True)
memgraph.stop()
def test_user_permissions_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary,
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_permissions_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_only_authentication(memgraph, tester_binary):
initialize_test(memgraph, tester_binary)
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
memgraph.stop()
def test_wrong_prefix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, prefix="eve", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_wrong_suffix(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, suffix="", check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
def test_suffix_with_spaces(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
suffix=", ou= people, dc = memgraph, dc = com")
execute_tester(tester_binary,
["CREATE USER alice", "GRANT MATCH TO alice"], "root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_role_mapping_wrong_root_dn(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=invalid,dc=memgraph,dc=com")
execute_tester(tester_binary,
["CREATE ROLE moderator", "GRANT MATCH TO moderator"],
"root")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice",
query_should_fail=True)
restart_memgraph(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, ["MATCH (n) RETURN n"], "alice")
memgraph.stop()
def test_wrong_password(memgraph, tester_binary):
initialize_test(memgraph, tester_binary,
role_mapping_root_dn="ou=roles,dc=memgraph,dc=com")
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.stop()
def test_password_persistancy(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, check_login=False)
memgraph.restart()
execute_tester(tester_binary, ["SHOW USERS"], "root", password="sudo")
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
restart_memgraph(memgraph, tester_binary)
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.restart()
execute_tester(tester_binary, [], "root", password="sudo",
auth_should_fail=True)
execute_tester(tester_binary, ["SHOW USERS"], "root", password="root")
memgraph.stop()
def test_starttls_failure(memgraph, tester_binary):
initialize_test(memgraph, tester_binary, issue_starttls=True,
check_login=False)
execute_tester(tester_binary, [], "root", auth_should_fail=True)
memgraph.stop()
# Startup logic
if __name__ == "__main__":
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
if not os.path.exists(memgraph_binary):
memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph")
tester_binary = os.path.join(PROJECT_DIR, "build", "tests",
"integration", "ldap", "tester")
if not os.path.exists(tester_binary):
tester_binary = os.path.join(PROJECT_DIR, "build_debug", "tests",
"integration", "ldap", "tester")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
parser.add_argument("--openldap-dir",
default=os.path.join(SCRIPT_DIR, "openldap-2.4.47"))
args = parser.parse_args()
# Setup Memgraph handler
memgraph = Memgraph(args.memgraph)
# Start the slapd binary
slapd_args = [os.path.join(args.openldap_dir, "exe", "libexec", "slapd"),
"-h", "ldap://127.0.0.1:1389/", "-d", "0"]
slapd = subprocess.Popen(slapd_args)
time.sleep(0.1)
assert slapd.poll() is None, "slapd process died prematurely!"
wait_for_server(1389)
# Register cleanup function
@atexit.register
def cleanup():
mg_stat = memgraph.stop(check=False)
if mg_stat != 0:
print("Memgraph process didn't exit cleanly!")
if slapd.poll() is None:
slapd.terminate()
slapd_stat = slapd.wait()
if slapd_stat != 0:
print("slapd process didn't exit cleanly!")
assert mg_stat == 0 and slapd_stat == 0, "Some of the processes " \
"(memgraph, slapd) crashed!"
# Execute tests
names = sorted(globals().keys())
for name in names:
if not name.startswith("test_"):
continue
test = " ".join(name[5:].split("_"))
func = globals()[name]
print("\033[1;36m~~ Running", test, "test ~~\033[0m")
func(memgraph, args.tester)
print("\033[1;36m~~ Finished", test, "test ~~\033[0m\n")
# Shutdown the slapd binary
slapd.terminate()
assert slapd.wait() == 0, "slapd process didn't exit cleanly!"
sys.exit(0)