demo server username + password params, workers_number param was removed

This commit is contained in:
Marko Budiselic 2016-02-28 19:59:07 +01:00
parent e93687b1db
commit ab24792ef1
9 changed files with 157 additions and 54 deletions

View File

@ -9,7 +9,7 @@ from simulation.web_server import SimulationWebServer
def main():
'''
The frontend run script. Environment could be configured
The demo server run script. Environment could be configured
via the MEMGRAPH_DEMO environtment variable. Available environments
are: debug, prod.
'''

View File

@ -1,9 +1,8 @@
FROM ubuntu:16.04
RUN apt-get update
RUN apt-get install -y python3.5 python3-pip
RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN apt-get install -y git
RUN pip3 install flask requests
RUN git clone https://pullbot:JnSdamFGKOanF1@phabricator.tomicevic.com/diffusion/MG/memgraph.git
CMD ["/bin/bash"]

View File

@ -3,16 +3,22 @@
class SimulationEpochResult(object):
'''
Encapsulates single epoch result.
'''
def __init__(self, per_query, for_all):
'''
Sets per_query and for_all results.
:param per_query: list of SimulationGroupResult objects
:param for_all: float, queries per second
'''
self.per_query = per_query
self.for_all = for_all
def json_data(self):
'''
:returns: dict, epoch results
'''
return {
"per_query": [item.json_data() for item in self.per_query],

View File

@ -7,7 +7,7 @@ import requests
from concurrent.futures import ProcessPoolExecutor
from .epoch_result import SimulationEpochResult
from .group_result import SimulationGrupeResult
from .group_result import SimulationGroupResult
from .iteration_result import SimulationIterationResult
log = logging.getLogger(__name__)
@ -15,6 +15,20 @@ log = logging.getLogger(__name__)
def calculate_qps(results):
'''
Calculates queris per second for the results list. The main idea
is to scale up results to the result with the biggest execution time.
Example:
Let say that 2 workers execute the same query. First worker
executes 100 queries in 1s, second worker executes 10 queries in 10s.
In that case first query result has to be scaled up.
Here we are goint to make aproximation, if first worker
execution time was 10s, it would execute 1000 queries.
So, the total speed is (1000q + 10q) / 10s = 101 qps. In that case
101 would be returned from this function.
:param results: list of SimulationIterationResult objects
:returns: queries per second result calculated on the input list
'''
max_delta_t = max([result.delta_t for result in results])
qps = sum([result.count * (max_delta_t / result.delta_t)
@ -22,45 +36,63 @@ def calculate_qps(results):
return qps
def send(query):
'''
'''
requests.post('http://localhost:7474/db/data/transaction/commit',
json={
"statements": [
{"statement": query}
]
},
headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='})
class SimulationExecutor(object):
'''
The main executor object. Every period the instance of this class
will execute all queries, collect the results and calculate speed of
queries execution.
'''
def __init__(self):
'''
'''
pass
def setup(self, params):
'''
Setup params and initialize the workers pool.
:param params: SimulationParams object
'''
self.params = params
self.pool = ProcessPoolExecutor
return self
def send(self, query):
'''
Sends the query to the graph database.
TODO: replace random arguments
:param query: str, query string
'''
requests.post('http://localhost:7474/db/data/transaction/commit',
json={
"statements": [
{"statement": query}
]
},
headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='})
def iteration(self, task):
'''
Executes the task. Task encapsulates the informations about query.
The task is smallest piece of work and this method will try to execute
queries (one query, more times) from the task as fastest as possible.
Execution time of this method is constrained with the period_time time.
:param task: instance of SimulationTask class.
:returns: SimulationIterationResult
'''
count = 0
start_time = time.time()
delta_t = 0
for i in range(self.params.queries_per_period):
send(task.query)
# send the query on execution
start_time = time.time()
self.send(task.query)
end_time = time.time()
# calculate delta time
delta_t = delta_t + (end_time - start_time)
count = count + 1
delta_t = end_time - start_time
if delta_t > self.params.period_time:
break
@ -69,11 +101,12 @@ class SimulationExecutor(object):
def epoch(self):
'''
Single simulation epoc. All workers are going to execute
their queries in the period that is period_time seconds length.
'''
log.info('epoch')
pool = ProcessPoolExecutor
with pool(max_workers=self.params.workers_number) as executor:
with self.pool() as executor:
# execute all tasks
futures = [executor.submit(self.iteration, task)
@ -83,15 +116,10 @@ class SimulationExecutor(object):
# per query calculation
grouped = itertools.groupby(results, lambda x: x.id)
per_query = [SimulationGrupeResult(id, calculate_qps(list(tasks)))
per_query = [SimulationGroupResult(id, calculate_qps(list(tasks)))
for id, tasks in grouped]
# for all calculation
for_all = calculate_qps(results)
return SimulationEpochResult(per_query, for_all)
def execute(self):
'''
'''
return self.epoch()

View File

@ -1,18 +1,23 @@
# -*- coding: utf-8 -*-
class SimulationGrupeResult(object):
class SimulationGroupResult(object):
'''
Encapsulates query per seconds information for qroup of workers
(all workers that execute the same query).
'''
def __init__(self, id, queries_per_period):
def __init__(self, id, queries_per_second):
'''
:param id: str, query id
:param queries_per_second: float, queries per second
'''
self.id = id
self.queries_per_second = queries_per_period
self.queries_per_second = queries_per_second
def json_data(self):
'''
:returns: dict, {query_id(str):queries_per_second(float)}
'''
return {
'id': self.id,

View File

@ -3,10 +3,14 @@
class SimulationIterationResult(object):
'''
Encapsulates single worker result.
'''
def __init__(self, id, count, delta_t):
'''
:param id: str, query id
:param count: int, number of the query exection
:param delta_t: time of execution
'''
self.id = id
self.count = count
@ -15,6 +19,7 @@ class SimulationIterationResult(object):
def json_data(self):
'''
:returns: dict {query_id(str):queries_per_second(float)}
'''
return {
"id": self.id,

View File

@ -1,37 +1,44 @@
# -*- coding: utf-8 -*-
import base64
class SimulationParams(object):
'''
Encapsulates the simulation params.
'''
def __init__(self):
'''
Setup default params values.
'''
self.protocol = 'http'
self.host = 'localhost'
self.port = 7474
self.username = ''
self.password = ''
self.workers_number = 16
self.period_time = 0.5
self.workers_per_query = 1
self.queries_per_second = 15000
self.recalculate_qps()
self.recalculate_qpp()
self.tasks = []
def json_data(self):
'''
:returns: dict with all param values
'''
return {
"protocol": self.protocol,
"host": self.host,
"port": self.port,
"workers_number": self.workers_number,
"username": self.username,
"password": self.password,
"period_time": self.period_time,
"workers_per_query": self.workers_per_query,
"queries_per_second": self.queries_per_second,
"queries_per_period": self.queries_per_period
"queries_per_period": self.queries_per_period,
"queries_per_second": self.queries_per_second
}
# protocol
@ -61,14 +68,44 @@ class SimulationParams(object):
def port(self, value):
self._port = value
# workers number
# username
@property
def workers_number(self):
return self._workers_number
def username(self):
return self._username
@workers_number.setter
def workers_number(self, value):
self._workers_number = value
@username.setter
def username(self, value):
self._username = value
self.http_basic()
# password
@property
def password(self):
return self._password
@password.setter
def password(self, value):
self._password = value
self.http_basic()
def http_basic(self):
'''
Recalculates http authorization header.
'''
try:
encoded = base64.b64encode(self.username + ":" + self.password)
self.authorization = {"Authorization": "Basic " + encoded}
except:
pass
# authorization
@property
def authorization(self):
return self._authorization
@authorization.setter
def authorization(self, value):
self._authorization = value
# workers per query
@property
@ -87,8 +124,9 @@ class SimulationParams(object):
@queries_per_second.setter
def queries_per_second(self, value):
self._queries_per_second = value
self.recalculate_qpp()
def recalculate_qps(self):
def recalculate_qpp(self):
try:
self.queries_per_period = \
int(self.queries_per_second * self.period_time)
@ -103,7 +141,6 @@ class SimulationParams(object):
@queries_per_period.setter
def queries_per_period(self, value):
self._queries_per_period = value
self.recalculate_qps()
# period time
@property
@ -113,4 +150,4 @@ class SimulationParams(object):
@period_time.setter
def period_time(self, value):
self._period_time = value
self.recalculate_qps()
self.recalculate_qpp()

View File

@ -3,10 +3,13 @@
class SimulationTask(object):
'''
Encapsulates query data.
'''
def __init__(self, id, query):
'''
:param id: query id
:param query: str, query string
'''
self.id = id
self.query = query

View File

@ -30,7 +30,13 @@ class SimulationWebServer():
def setup_routes(self):
'''
Setup all available rutes:
/ping
GET /ping
POST /tasks
POST /start
POST /stop
GET /stats
GET /params
POST /params
'''
self.server.add_url_rule('/ping', 'ping', self.ping)
self.server.add_url_rule('/tasks', 'tasks', self.tasks,
@ -61,23 +67,30 @@ class SimulationWebServer():
def tasks(self):
'''
Register tasks. Task is object that encapsulates single query data.
'''
data = request.get_json()['data']
self.simulation_params.tasks = \
[SimulationTask(item['id'], item['query'])
for item in data]
return ('', 200)
def run_simulation(self):
'''
If flag is_simulation_running flag is up (True) the executor
epoch will be executed. Epochs will be executed until somebody
set is_simulation_running flag to Flase.
'''
log.info('new simulation run')
while self.is_simulation_running:
self.simulation_stats = self.simulation_executor.execute()
self.simulation_stats = self.simulation_executor.epoch()
def start(self):
'''
Starts new executor epoch in separate thread.
'''
self.is_simulation_running = True
t = threading.Thread(target=self.run_simulation, daemon=True)
@ -86,12 +99,16 @@ class SimulationWebServer():
def stop(self):
'''
On POST /stop, stops the executor. The is not immediately, first
the is_simulation_running flag is set to False value, and next
epoc of executor won't be executed.
'''
self.is_simulation_running = False
return ('', 204)
def stats(self):
'''
Returns the simulation stats. Queries per second.
'''
if not self.simulation_stats:
return ('', 204)
@ -100,16 +117,19 @@ class SimulationWebServer():
def params_get(self):
'''
Returns simulation parameters.
'''
return jsonify(self.simulation_params.json_data())
def params_set(self):
'''
Sets simulation parameters.
'''
data = request.get_json()
param_names = ['protocol', 'port', 'workers_number', 'period_time',
'queries_per_second', 'workers_per_query']
param_names = ['protocol', 'host', 'port', 'username', 'password',
'period_time', 'queries_per_second',
'workers_per_query']
for param in param_names:
if param in data: