diff --git a/demo/.gitignore b/demo/.gitignore new file mode 100644 index 000000000..af8b9686e --- /dev/null +++ b/demo/.gitignore @@ -0,0 +1 @@ +ve/ diff --git a/demo/demo_server.docker b/demo/demo_server.docker new file mode 100644 index 000000000..38df666d5 --- /dev/null +++ b/demo/demo_server.docker @@ -0,0 +1,15 @@ +FROM ubuntu:16.04 + +RUN apt-get update +RUN apt-get install -y python3.5 python3-pip +RUN apt-get install -y python3-setuptools +# RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +RUN pip3 install flask requests + +COPY simulation /app/simulation +COPY demo_server_run.py /app/demo_server_run.py + +WORKDIR /app + +CMD ["python3", "demo_server_run.py"] diff --git a/demo/demo_server_run.py b/demo/demo_server_run.py new file mode 100644 index 000000000..3014d7760 --- /dev/null +++ b/demo/demo_server_run.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import logging + +from simulation.web_server import SimulationWebServer + + +def main(): + ''' + The frontend run script. Environment could be configured + via the MEMGRAPH_DEMO environtment variable. Available environments + are: debug, prod. + ''' + if 'MEMGRAPH_DEMO' in os.environ: + environment = os.environ['MEMGRAPH_DEMO'] + else: + environment = "debug" + + frontend_server = SimulationWebServer() + + if environment == 'prod': + logging.basicConfig(level=logging.WARNING) + frontend_server.run("0.0.0.0", 8080, False) + elif environment == 'debug': + logging.basicConfig(level=logging.INFO) + frontend_server.run("0.0.0.0", 8080, True) + +if __name__ == '__main__': + main() diff --git a/demo/memgraph_demo.docker b/demo/memgraph_demo.docker index 3b9be33e8..98439c05b 100644 --- a/demo/memgraph_demo.docker +++ b/demo/memgraph_demo.docker @@ -1,7 +1,8 @@ FROM ubuntu:16.04 -RUN apt-get update && apt-get install -y python3.5 python3-pip \ - && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* +RUN apt-get update +RUN apt-get install -y python3.5 python3-pip +RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN pip3 install flask requests diff --git a/demo/neo4j_demo.run b/demo/neo4j_demo.run index 18ddb3f5e..c55b522fe 100755 --- a/demo/neo4j_demo.run +++ b/demo/neo4j_demo.run @@ -2,4 +2,4 @@ docker stop neo4j_demo docker rm neo4j_demo -docker run -d --name neo4j_demo -p 7474:7474 neo4j +docker run -d --name neo4j_demo --net=host -p 7474:7474 neo4j diff --git a/demo/simulation/__init__.py b/demo/simulation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/demo/simulation/epoch_result.py b/demo/simulation/epoch_result.py new file mode 100644 index 000000000..ade054e45 --- /dev/null +++ b/demo/simulation/epoch_result.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + + +class SimulationEpochResult(object): + ''' + ''' + + def __init__(self, per_query, for_all): + ''' + ''' + self.per_query = per_query + self.for_all = for_all + + def json_data(self): + ''' + ''' + return { + "per_query": [item.json_data() for item in self.per_query], + "for_all": self.for_all + } diff --git a/demo/simulation/executor.py b/demo/simulation/executor.py new file mode 100644 index 000000000..02c9ce076 --- /dev/null +++ b/demo/simulation/executor.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- + +import time +import logging +import itertools +import requests +from concurrent.futures import ProcessPoolExecutor + +from .epoch_result import SimulationEpochResult +from .group_result import SimulationGrupeResult +from .iteration_result import SimulationIterationResult + +log = logging.getLogger(__name__) + + +def calculate_qps(results): + ''' + ''' + max_delta_t = max([result.delta_t for result in results]) + qps = sum([result.count * (max_delta_t / result.delta_t) + for result in results]) / max_delta_t + return qps + + +def send(query): + ''' + ''' + requests.post('http://localhost:7474/db/data/transaction/commit', + json={ + "statements": [ + {"statement": query} + ] + }, + headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='}) + + +class SimulationExecutor(object): + ''' + ''' + + def __init__(self): + ''' + ''' + pass + + def setup(self, params): + ''' + ''' + self.params = params + return self + + def iteration(self, task): + ''' + ''' + count = 0 + start_time = time.time() + + for i in range(self.params.queries_per_period): + send(task.query) + + end_time = time.time() + count = count + 1 + delta_t = end_time - start_time + + if delta_t > self.params.period_time: + break + + return SimulationIterationResult(task.id, count, delta_t) + + def epoch(self): + ''' + ''' + log.info('epoch') + + pool = ProcessPoolExecutor + with pool(max_workers=self.params.workers_number) as executor: + + # execute all tasks + futures = [executor.submit(self.iteration, task) + for task in self.params.tasks + for i in range(self.params.workers_per_query)] + results = [future.result() for future in futures] + + # per query calculation + grouped = itertools.groupby(results, lambda x: x.id) + per_query = [SimulationGrupeResult(id, calculate_qps(list(tasks))) + for id, tasks in grouped] + + # for all calculation + for_all = calculate_qps(results) + + return SimulationEpochResult(per_query, for_all) + + def execute(self): + ''' + ''' + return self.epoch() diff --git a/demo/simulation/group_result.py b/demo/simulation/group_result.py new file mode 100644 index 000000000..a798eebd3 --- /dev/null +++ b/demo/simulation/group_result.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + + +class SimulationGrupeResult(object): + ''' + ''' + + def __init__(self, id, queries_per_period): + ''' + ''' + self.id = id + self.queries_per_second = queries_per_period + + def json_data(self): + ''' + ''' + return { + 'id': self.id, + 'queries_per_second': self.queries_per_second + } diff --git a/demo/simulation/iteration_result.py b/demo/simulation/iteration_result.py new file mode 100644 index 000000000..4e2103d76 --- /dev/null +++ b/demo/simulation/iteration_result.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + + +class SimulationIterationResult(object): + ''' + ''' + + def __init__(self, id, count, delta_t): + ''' + ''' + self.id = id + self.count = count + self.delta_t = delta_t + self.queries_per_second = self.count / self.delta_t + + def json_data(self): + ''' + ''' + return { + "id": self.id, + "queries_per_second": self.queries_per_second + } diff --git a/demo/simulation/params.py b/demo/simulation/params.py new file mode 100644 index 000000000..bad402fc3 --- /dev/null +++ b/demo/simulation/params.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- + + +class SimulationParams(object): + ''' + ''' + + def __init__(self): + ''' + ''' + self.protocol = 'http' + self.host = 'localhost' + self.port = 7474 + + self.workers_number = 16 + self.period_time = 0.5 + self.workers_per_query = 1 + self.queries_per_second = 15000 + self.recalculate_qps() + + self.tasks = [] + + def json_data(self): + ''' + ''' + return { + "protocol": self.protocol, + "host": self.host, + "port": self.port, + "workers_number": self.workers_number, + "period_time": self.period_time, + "workers_per_query": self.workers_per_query, + "queries_per_second": self.queries_per_second, + "queries_per_period": self.queries_per_period + } + + # protocol + @property + def protocol(self): + return self._protocol + + @protocol.setter + def protocol(self, value): + self._protocol = value + + # host + @property + def host(self): + return self._host + + @host.setter + def host(self, value): + self._host = value + + # port + @property + def port(self): + return self._port + + @port.setter + def port(self, value): + self._port = value + + # workers number + @property + def workers_number(self): + return self._workers_number + + @workers_number.setter + def workers_number(self, value): + self._workers_number = value + + # workers per query + @property + def workers_per_query(self): + return self._workers_per_query + + @workers_per_query.setter + def workers_per_query(self, value): + self._workers_per_query = value + + # queries per second + @property + def queries_per_second(self): + return self._queries_per_second + + @queries_per_second.setter + def queries_per_second(self, value): + self._queries_per_second = value + + def recalculate_qps(self): + try: + self.queries_per_period = \ + int(self.queries_per_second * self.period_time) + except: + pass + + # queries per period + @property + def queries_per_period(self): + return self._queries_per_period + + @queries_per_period.setter + def queries_per_period(self, value): + self._queries_per_period = value + self.recalculate_qps() + + # period time + @property + def period_time(self): + return self._period_time + + @period_time.setter + def period_time(self, value): + self._period_time = value + self.recalculate_qps() diff --git a/demo/simulation/task.py b/demo/simulation/task.py new file mode 100644 index 000000000..687e23fee --- /dev/null +++ b/demo/simulation/task.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- + + +class SimulationTask(object): + ''' + ''' + + def __init__(self, id, query): + ''' + ''' + self.id = id + self.query = query diff --git a/demo/simulation/web_server.py b/demo/simulation/web_server.py new file mode 100644 index 000000000..bcc69172d --- /dev/null +++ b/demo/simulation/web_server.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- + +import logging +import threading +from flask import Flask, request, jsonify + +from .executor import SimulationExecutor +from .params import SimulationParams +from .task import SimulationTask + +log = logging.getLogger(__name__) + + +class SimulationWebServer(): + ''' + Memgraph demo fontend server. For now it wraps the flask server. + ''' + + def __init__(self): + ''' + Instantiates the flask web server. + ''' + self.server = Flask(__name__) + self.is_simulation_running = False + self.simulation_stats = None + self.simulation_params = SimulationParams() + self.simulation_executor = \ + SimulationExecutor().setup(self.simulation_params) + + def setup_routes(self): + ''' + Setup all available rutes: + /ping + ''' + self.server.add_url_rule('/ping', 'ping', self.ping) + self.server.add_url_rule('/tasks', 'tasks', self.tasks, + methods=['POST']) + self.server.add_url_rule('/start', 'start', self.start, + methods=['POST']) + self.server.add_url_rule('/stop', 'stop', self.stop, + methods=['POST']) + self.server.add_url_rule('/stats', 'stats', self.stats, + methods=['GET']) + self.server.add_url_rule('/params', 'params_get', self.params_get, + methods=['GET']) + self.server.add_url_rule('/params', 'params_set', self.params_set, + methods=['POST']) + + def run(self, host="127.0.0.1", port=8080, debug=False): + ''' + Runs the server. Before run, routes are initialized. + ''' + self.setup_routes() + self.server.run(host=host, port=port, debug=debug) + + def ping(self): + ''' + Ping endpoint. Returns 204 HTTP status code. + ''' + return ('', 204) + + def tasks(self): + ''' + ''' + data = request.get_json()['data'] + self.simulation_params.tasks = \ + [SimulationTask(item['id'], item['query']) + for item in data] + return ('', 200) + + def run_simulation(self): + ''' + ''' + log.info('new simulation run') + + while self.is_simulation_running: + self.simulation_stats = self.simulation_executor.execute() + + def start(self): + ''' + ''' + self.is_simulation_running = True + t = threading.Thread(target=self.run_simulation, daemon=True) + t.start() + return ('', 204) + + def stop(self): + ''' + ''' + self.is_simulation_running = False + return ('', 204) + + def stats(self): + ''' + ''' + if not self.simulation_stats: + return ('', 204) + + return jsonify(self.simulation_stats.json_data()) + + def params_get(self): + ''' + ''' + return jsonify(self.simulation_params.json_data()) + + def params_set(self): + ''' + ''' + data = request.get_json() + + param_names = ['protocol', 'port', 'workers_number', 'period_time', + 'queries_per_second', 'workers_per_query'] + + for param in param_names: + if param in data: + setattr(self.simulation_params, param, data[param]) + + return self.params_get()