2017-06-06 17:00:59 +08:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
'''
|
|
|
|
Common methods for writing graph database
|
|
|
|
integration tests in python.
|
|
|
|
|
|
|
|
Only Bolt communication protocol is supported.
|
|
|
|
'''
|
|
|
|
|
|
|
|
import contextlib
|
2018-04-21 21:36:28 +08:00
|
|
|
import os
|
2017-06-14 19:42:06 +08:00
|
|
|
from threading import Thread
|
|
|
|
from time import sleep
|
2017-06-06 17:00:59 +08:00
|
|
|
|
|
|
|
from argparse import ArgumentParser
|
2017-07-29 23:03:34 +08:00
|
|
|
from neo4j.v1 import GraphDatabase
|
2017-06-06 17:00:59 +08:00
|
|
|
|
|
|
|
|
|
|
|
class OutputData:
    '''
    Collects the outcome of a test run: named timing measurements and
    named status data points, which can later be dumped as text.
    '''
    def __init__(self):
        # (name, status) pairs, status being any printable value
        self._statuses = []
        # (name, time, unit) triples
        self._measurements = []

    def add_measurement(self, name, time, unit="s"):
        '''
        Records one timing measurement.

        :param name: str, name of measurement
        :param time: float, time value
        :param unit: str, time unit (seconds by default)
        '''
        entry = (name, time, unit)
        self._measurements.append(entry)

    def add_status(self, name, status):
        '''
        Records one status data point.

        :param name: str, name of data point
        :param status: printable value
        '''
        entry = (name, status)
        self._statuses.append(entry)

    def dump(self, print_f=print):
        '''
        Writes everything collected so far through the given output
        function, one call per line: a header first, then all the
        statuses, then all the measurements (each in insertion order).

        Args:
            print_f - the function that consumes output. Defaults to
                the 'print' function.
        '''
        print_f("Output data:")
        for status_name, status_value in self._statuses:
            status_line = " %s: %s" % (status_name, status_value)
            print_f(status_line)
        for measurement_name, value, unit in self._measurements:
            measurement_line = " %s: %s%s" % (measurement_name, value, unit)
            print_f(measurement_line)
|
2017-06-06 17:00:59 +08:00
|
|
|
|
|
|
|
|
2017-06-14 19:42:06 +08:00
|
|
|
def execute_till_success(session, query, max_retries=1000):
    '''
    Executes a query within Bolt session until the query is
    successfully executed against the database.

    :param session: active Bolt session to execute the query with
    :param query: str, the query to execute
    :param max_retries: int, maximum allowed number of attempts

    :return: tuple (results_data_list, number_of_failures, metadata),
        where metadata is the result summary metadata dictionary with
        every value converted to str

    :raises Exception: when the query has failed max_retries times; the
        error raised by the last attempt is attached as the cause
    '''
    no_failures = 0
    while True:
        try:
            result = session.run(query)
            # neo4j.Address object can't be pickled so we need to convert
            # it to str in metadata dictionary. This is important so that
            # we can use this function in multiprocessing.Pool.map.
            metadata = {k: str(v) for k, v in
                        result.summary().metadata.items()}
            return result.data(), no_failures, metadata
        except Exception as error:
            no_failures += 1
            if no_failures >= max_retries:
                # Chain the last error so the real reason for giving up
                # is not lost.
                raise Exception("Query '%s' failed %d times, aborting" %
                                (query, max_retries)) from error
|
|
|
|
|
|
|
|
|
|
|
|
def batch(input, batch_size):
    """ Batches the given input (must be iterable).
    Supports input generators. Returns a generator.
    All is lazy. The last batch can contain fewer elements
    than `batch_size`, but always contains at least one.

    Args:
        input - iterable of elements
        batch_size - number of elements in the batch, must be
            greater than zero
    Return:
        a generator that yields batches (lists) of elements.
    Raises:
        AssertionError - if `batch_size` is not greater than zero.
            Because this is a generator, the check runs when the
            first batch is requested, not when `batch` is called.
    """
    # Raised explicitly (rather than via an `assert` statement) so the
    # check is not stripped when Python runs with -O. A batch size of one
    # is valid.
    if not batch_size > 0:
        raise AssertionError("Batch size must be greater then zero")

    current = []
    for element in input:
        current.append(element)
        if len(current) >= batch_size:
            yield current
            # Start a fresh list; the yielded one belongs to the consumer.
            current = []
    if current:
        yield current
|
|
|
|
|
|
|
|
|
|
|
|
def render(template, iterable_arguments):
    """
    Lazily yields template.format(argument) for every argument
    in the given iterable, preserving the order of the iterable.
    """
    yield from (template.format(argument)
                for argument in iterable_arguments)
|
2017-06-06 17:00:59 +08:00
|
|
|
|
|
|
|
|
|
|
|
def assert_equal(expected, actual, message):
    '''
    Compares expected and actual values. If values are not the same terminate
    the execution by raising AssertionError.

    :param expected: expected value
    :param actual: actual value
    :param message: str, message in case that the values are not equal, must
                    contain two placeholders (%s) to print the values.

    :raises AssertionError: if the values are not equal
    '''
    # Raised explicitly instead of using an `assert` statement, which would
    # be stripped (making this check a no-op) when Python runs with -O.
    if not expected == actual:
        raise AssertionError(message % (expected, actual))
|
|
|
|
|
|
|
|
|
2017-06-14 19:42:06 +08:00
|
|
|
def connection_argument_parser():
    '''
    Builds a parser for the arguments related to establishing database
    connection like endpoint, username, password and SSL usage.

    :return: An instance of ArgumentParser
    '''
    parser = ArgumentParser(description=__doc__)

    parser.add_argument('--endpoint', type=str, default='127.0.0.1:7687',
                        help='DBMS instance endpoint. '
                             'Bolt protocol is the only option.')
    parser.add_argument('--username', type=str, default='neo4j',
                        help='DBMS instance username.')
    # The password is kept as a string: an int type would reject
    # non-numeric passwords and silently drop leading zeros.
    parser.add_argument('--password', type=str, default='1234',
                        help='DBMS instance password.')
    parser.add_argument('--use-ssl', action='store_true',
                        help="Is SSL enabled?")
    return parser
|
|
|
|
|
|
|
|
|
|
|
|
@contextlib.contextmanager
def bolt_session(url, auth, ssl=False):
    '''
    with wrapper around Bolt session. Creates a driver and a session,
    yields the session and closes both when the with block is left.

    :param url: str, e.g. "bolt://127.0.0.1:7687"
    :param auth: auth method, goes directly to the Bolt driver constructor
    :param ssl: bool, is ssl enabled
    '''
    driver = GraphDatabase.driver(url, auth=auth, encrypted=ssl)
    # Nested try/finally: the driver is closed even when creating the
    # session fails or when closing the session raises.
    try:
        session = driver.session()
        try:
            yield session
        finally:
            session.close()
    finally:
        driver.close()
|
|
|
|
|
|
|
|
|
2017-12-05 21:13:14 +08:00
|
|
|
# If you are using session with multiprocessing take a look at SessionCache
|
|
|
|
# in bipartite for an idea how to reuse sessions.
|
2017-06-06 17:00:59 +08:00
|
|
|
def argument_session(args):
    '''
    Builds the Bolt url and credentials from the parsed program arguments
    (endpoint, username, password, use_ssl).

    :return: Bolt session context manager based on program arguments
    '''
    url = 'bolt://' + args.endpoint
    credentials = (args.username, str(args.password))
    return bolt_session(url, credentials, args.use_ssl)
|
2017-06-14 19:42:06 +08:00
|
|
|
|
|
|
|
|
2018-06-20 23:44:47 +08:00
|
|
|
def argument_driver(args):
    '''
    Builds the Bolt url and credentials from the parsed program arguments
    (endpoint, username, password, use_ssl).

    :return: Bolt driver based on program arguments
    '''
    url = 'bolt://' + args.endpoint
    credentials = (args.username, str(args.password))
    return GraphDatabase.driver(url, auth=credentials,
                                encrypted=args.use_ssl)
|
2017-06-14 19:42:06 +08:00
|
|
|
|
2018-04-21 21:36:28 +08:00
|
|
|
# This class is used to create and cache sessions. Session is cached by args
|
|
|
|
# used to create it and process' pid in which it was created. This makes it easy
|
|
|
|
# to reuse session with python multiprocessing primitives like pmap.
|
|
|
|
class SessionCache:
    '''
    Creates and caches Bolt sessions. A session is cached under the
    program arguments used to create it plus the pid of the process that
    created it.
    '''
    # key -> (driver, session); the driver is kept so that it can be
    # closed together with its session in cleanup()
    cache = {}

    @staticmethod
    def argument_session(args):
        '''
        Returns the session cached for the given arguments in the current
        process, creating (and caching) a new driver and session if there
        is none.

        :param args: parsed program arguments (argparse namespace); all the
            argument values must be hashable
        :return: Bolt session
        '''
        key = tuple(vars(args).items()) + (os.getpid(),)
        cached = SessionCache.cache.get(key)
        if cached is not None:
            return cached[1]
        # The session is created from the driver, so both are stored.
        driver = argument_driver(args)
        session = driver.session()
        SessionCache.cache[key] = (driver, session)
        return session

    @staticmethod
    def cleanup():
        '''
        Closes every cached session and its driver and empties the cache,
        so that a closed session is never handed out again.
        '''
        while SessionCache.cache:
            # Remove the entry before closing it: whatever happens during
            # the close, the stale pair must not stay in the cache.
            _, (driver, session) = SessionCache.cache.popitem()
            try:
                session.close()
            finally:
                driver.close()
|
|
|
|
|
2017-06-14 19:42:06 +08:00
|
|
|
|
|
|
|
def periodically_execute(callable, args, interval, daemon=True):
    """
    Periodically calls the given callable in a background thread.
    The first call happens after one interval has passed and the
    calls never stop for as long as the thread is alive.

    Args:
        callable - the callable to call
        args - iterable of positional arguments to pass to callable
            (None or empty for no arguments)
        interval - time (in seconds) between two calls
        daemon - if the execution thread should be a daemon
    """
    call_args = () if args is None else tuple(args)

    def periodic_call():
        while True:
            sleep(interval)
            # The arguments belong to the callable, not to this wrapper.
            callable(*call_args)

    Thread(target=periodic_call, daemon=daemon).start()
|