memgraph workload simulation server - first implementation

This commit is contained in:
Marko Budiselic 2016-02-28 15:59:10 +01:00
parent 3eb84e8f1c
commit e93687b1db
13 changed files with 456 additions and 3 deletions

1
demo/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
ve/

15
demo/demo_server.docker Normal file
View File

@ -0,0 +1,15 @@
FROM ubuntu:16.04
RUN apt-get update
RUN apt-get install -y python3.5 python3-pip
RUN apt-get install -y python3-setuptools
# RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN pip3 install flask requests
COPY simulation /app/simulation
COPY demo_server_run.py /app/demo_server_run.py
WORKDIR /app
CMD ["python3", "demo_server_run.py"]

31
demo/demo_server_run.py Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
from simulation.web_server import SimulationWebServer
def main():
'''
The frontend run script. Environment could be configured
via the MEMGRAPH_DEMO environtment variable. Available environments
are: debug, prod.
'''
if 'MEMGRAPH_DEMO' in os.environ:
environment = os.environ['MEMGRAPH_DEMO']
else:
environment = "debug"
frontend_server = SimulationWebServer()
if environment == 'prod':
logging.basicConfig(level=logging.WARNING)
frontend_server.run("0.0.0.0", 8080, False)
elif environment == 'debug':
logging.basicConfig(level=logging.INFO)
frontend_server.run("0.0.0.0", 8080, True)
if __name__ == '__main__':
main()

View File

@ -1,7 +1,8 @@
FROM ubuntu:16.04
RUN apt-get update && apt-get install -y python3.5 python3-pip \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN apt-get update
RUN apt-get install -y python3.5 python3-pip
RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN pip3 install flask requests

View File

@ -2,4 +2,4 @@
docker stop neo4j_demo
docker rm neo4j_demo
docker run -d --name neo4j_demo -p 7474:7474 neo4j
docker run -d --name neo4j_demo --net=host -p 7474:7474 neo4j

View File

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
class SimulationEpochResult(object):
'''
'''
def __init__(self, per_query, for_all):
'''
'''
self.per_query = per_query
self.for_all = for_all
def json_data(self):
'''
'''
return {
"per_query": [item.json_data() for item in self.per_query],
"for_all": self.for_all
}

View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
import time
import logging
import itertools
import requests
from concurrent.futures import ProcessPoolExecutor
from .epoch_result import SimulationEpochResult
from .group_result import SimulationGrupeResult
from .iteration_result import SimulationIterationResult
log = logging.getLogger(__name__)
def calculate_qps(results):
'''
'''
max_delta_t = max([result.delta_t for result in results])
qps = sum([result.count * (max_delta_t / result.delta_t)
for result in results]) / max_delta_t
return qps
def send(query):
'''
'''
requests.post('http://localhost:7474/db/data/transaction/commit',
json={
"statements": [
{"statement": query}
]
},
headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='})
class SimulationExecutor(object):
'''
'''
def __init__(self):
'''
'''
pass
def setup(self, params):
'''
'''
self.params = params
return self
def iteration(self, task):
'''
'''
count = 0
start_time = time.time()
for i in range(self.params.queries_per_period):
send(task.query)
end_time = time.time()
count = count + 1
delta_t = end_time - start_time
if delta_t > self.params.period_time:
break
return SimulationIterationResult(task.id, count, delta_t)
def epoch(self):
'''
'''
log.info('epoch')
pool = ProcessPoolExecutor
with pool(max_workers=self.params.workers_number) as executor:
# execute all tasks
futures = [executor.submit(self.iteration, task)
for task in self.params.tasks
for i in range(self.params.workers_per_query)]
results = [future.result() for future in futures]
# per query calculation
grouped = itertools.groupby(results, lambda x: x.id)
per_query = [SimulationGrupeResult(id, calculate_qps(list(tasks)))
for id, tasks in grouped]
# for all calculation
for_all = calculate_qps(results)
return SimulationEpochResult(per_query, for_all)
def execute(self):
'''
'''
return self.epoch()

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
class SimulationGrupeResult(object):
'''
'''
def __init__(self, id, queries_per_period):
'''
'''
self.id = id
self.queries_per_second = queries_per_period
def json_data(self):
'''
'''
return {
'id': self.id,
'queries_per_second': self.queries_per_second
}

View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
class SimulationIterationResult(object):
'''
'''
def __init__(self, id, count, delta_t):
'''
'''
self.id = id
self.count = count
self.delta_t = delta_t
self.queries_per_second = self.count / self.delta_t
def json_data(self):
'''
'''
return {
"id": self.id,
"queries_per_second": self.queries_per_second
}

116
demo/simulation/params.py Normal file
View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
class SimulationParams(object):
'''
'''
def __init__(self):
'''
'''
self.protocol = 'http'
self.host = 'localhost'
self.port = 7474
self.workers_number = 16
self.period_time = 0.5
self.workers_per_query = 1
self.queries_per_second = 15000
self.recalculate_qps()
self.tasks = []
def json_data(self):
'''
'''
return {
"protocol": self.protocol,
"host": self.host,
"port": self.port,
"workers_number": self.workers_number,
"period_time": self.period_time,
"workers_per_query": self.workers_per_query,
"queries_per_second": self.queries_per_second,
"queries_per_period": self.queries_per_period
}
# protocol
@property
def protocol(self):
return self._protocol
@protocol.setter
def protocol(self, value):
self._protocol = value
# host
@property
def host(self):
return self._host
@host.setter
def host(self, value):
self._host = value
# port
@property
def port(self):
return self._port
@port.setter
def port(self, value):
self._port = value
# workers number
@property
def workers_number(self):
return self._workers_number
@workers_number.setter
def workers_number(self, value):
self._workers_number = value
# workers per query
@property
def workers_per_query(self):
return self._workers_per_query
@workers_per_query.setter
def workers_per_query(self, value):
self._workers_per_query = value
# queries per second
@property
def queries_per_second(self):
return self._queries_per_second
@queries_per_second.setter
def queries_per_second(self, value):
self._queries_per_second = value
def recalculate_qps(self):
try:
self.queries_per_period = \
int(self.queries_per_second * self.period_time)
except:
pass
# queries per period
@property
def queries_per_period(self):
return self._queries_per_period
@queries_per_period.setter
def queries_per_period(self, value):
self._queries_per_period = value
self.recalculate_qps()
# period time
@property
def period_time(self):
return self._period_time
@period_time.setter
def period_time(self, value):
self._period_time = value
self.recalculate_qps()

12
demo/simulation/task.py Normal file
View File

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
class SimulationTask(object):
'''
'''
def __init__(self, id, query):
'''
'''
self.id = id
self.query = query

View File

@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import logging
import threading
from flask import Flask, request, jsonify
from .executor import SimulationExecutor
from .params import SimulationParams
from .task import SimulationTask
log = logging.getLogger(__name__)
class SimulationWebServer():
'''
Memgraph demo fontend server. For now it wraps the flask server.
'''
def __init__(self):
'''
Instantiates the flask web server.
'''
self.server = Flask(__name__)
self.is_simulation_running = False
self.simulation_stats = None
self.simulation_params = SimulationParams()
self.simulation_executor = \
SimulationExecutor().setup(self.simulation_params)
def setup_routes(self):
'''
Setup all available rutes:
/ping
'''
self.server.add_url_rule('/ping', 'ping', self.ping)
self.server.add_url_rule('/tasks', 'tasks', self.tasks,
methods=['POST'])
self.server.add_url_rule('/start', 'start', self.start,
methods=['POST'])
self.server.add_url_rule('/stop', 'stop', self.stop,
methods=['POST'])
self.server.add_url_rule('/stats', 'stats', self.stats,
methods=['GET'])
self.server.add_url_rule('/params', 'params_get', self.params_get,
methods=['GET'])
self.server.add_url_rule('/params', 'params_set', self.params_set,
methods=['POST'])
def run(self, host="127.0.0.1", port=8080, debug=False):
'''
Runs the server. Before run, routes are initialized.
'''
self.setup_routes()
self.server.run(host=host, port=port, debug=debug)
def ping(self):
'''
Ping endpoint. Returns 204 HTTP status code.
'''
return ('', 204)
def tasks(self):
'''
'''
data = request.get_json()['data']
self.simulation_params.tasks = \
[SimulationTask(item['id'], item['query'])
for item in data]
return ('', 200)
def run_simulation(self):
'''
'''
log.info('new simulation run')
while self.is_simulation_running:
self.simulation_stats = self.simulation_executor.execute()
def start(self):
'''
'''
self.is_simulation_running = True
t = threading.Thread(target=self.run_simulation, daemon=True)
t.start()
return ('', 204)
def stop(self):
'''
'''
self.is_simulation_running = False
return ('', 204)
def stats(self):
'''
'''
if not self.simulation_stats:
return ('', 204)
return jsonify(self.simulation_stats.json_data())
def params_get(self):
'''
'''
return jsonify(self.simulation_params.json_data())
def params_set(self):
'''
'''
data = request.get_json()
param_names = ['protocol', 'port', 'workers_number', 'period_time',
'queries_per_second', 'workers_per_query']
for param in param_names:
if param in data:
setattr(self.simulation_params, param, data[param])
return self.params_get()