From ab24792ef1bb72a0b3a72f2148d70700a6c7f309 Mon Sep 17 00:00:00 2001 From: Marko Budiselic <mbudiselicbuda@gmail.com> Date: Sun, 28 Feb 2016 19:59:07 +0100 Subject: [PATCH] demo server username + password params, workers_number param was removed --- demo/demo_server_run.py | 2 +- demo/memgraph_demo.docker | 5 +- demo/simulation/epoch_result.py | 6 ++ demo/simulation/executor.py | 86 +++++++++++++++++++---------- demo/simulation/group_result.py | 11 +++- demo/simulation/iteration_result.py | 5 ++ demo/simulation/params.py | 65 +++++++++++++++++----- demo/simulation/task.py | 3 + demo/simulation/web_server.py | 28 ++++++++-- 9 files changed, 157 insertions(+), 54 deletions(-) diff --git a/demo/demo_server_run.py b/demo/demo_server_run.py index 3014d7760..ff628aced 100644 --- a/demo/demo_server_run.py +++ b/demo/demo_server_run.py @@ -9,7 +9,7 @@ from simulation.web_server import SimulationWebServer def main(): ''' - The frontend run script. Environment could be configured + The demo server run script. Environment could be configured via the MEMGRAPH_DEMO environtment variable. Available environments are: debug, prod. ''' diff --git a/demo/memgraph_demo.docker b/demo/memgraph_demo.docker index 98439c05b..dfd379cd7 100644 --- a/demo/memgraph_demo.docker +++ b/demo/memgraph_demo.docker @@ -1,9 +1,8 @@ FROM ubuntu:16.04 RUN apt-get update -RUN apt-get install -y python3.5 python3-pip -RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* +RUN apt-get install -y git -RUN pip3 install flask requests +RUN git clone https://pullbot:JnSdamFGKOanF1@phabricator.tomicevic.com/diffusion/MG/memgraph.git CMD ["/bin/bash"] diff --git a/demo/simulation/epoch_result.py b/demo/simulation/epoch_result.py index ade054e45..437b81846 100644 --- a/demo/simulation/epoch_result.py +++ b/demo/simulation/epoch_result.py @@ -3,16 +3,22 @@ class SimulationEpochResult(object): ''' + Encapsulates single epoch result. ''' def __init__(self, per_query, for_all): ''' + Sets per_query and for_all results. + + :param per_query: list of SimulationGroupResult objects + :param for_all: float, queries per second ''' self.per_query = per_query self.for_all = for_all def json_data(self): ''' + :returns: dict, epoch results ''' return { "per_query": [item.json_data() for item in self.per_query], diff --git a/demo/simulation/executor.py b/demo/simulation/executor.py index 02c9ce076..8d415fc70 100644 --- a/demo/simulation/executor.py +++ b/demo/simulation/executor.py @@ -7,7 +7,7 @@ import requests from concurrent.futures import ProcessPoolExecutor from .epoch_result import SimulationEpochResult -from .group_result import SimulationGrupeResult +from .group_result import SimulationGroupResult from .iteration_result import SimulationIterationResult log = logging.getLogger(__name__) @@ -15,6 +15,20 @@ log = logging.getLogger(__name__) def calculate_qps(results): ''' + Calculates queris per second for the results list. The main idea + is to scale up results to the result with the biggest execution time. + + Example: + Let say that 2 workers execute the same query. First worker + executes 100 queries in 1s, second worker executes 10 queries in 10s. + In that case first query result has to be scaled up. + Here we are goint to make aproximation, if first worker + execution time was 10s, it would execute 1000 queries. + So, the total speed is (1000q + 10q) / 10s = 101 qps. In that case + 101 would be returned from this function. + + :param results: list of SimulationIterationResult objects + :returns: queries per second result calculated on the input list ''' max_delta_t = max([result.delta_t for result in results]) qps = sum([result.count * (max_delta_t / result.delta_t) @@ -22,45 +36,63 @@ def calculate_qps(results): return qps -def send(query): - ''' - ''' - requests.post('http://localhost:7474/db/data/transaction/commit', - json={ - "statements": [ - {"statement": query} - ] - }, - headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='}) - - class SimulationExecutor(object): ''' + The main executor object. Every period the instance of this class + will execute all queries, collect the results and calculate speed of + queries execution. ''' - def __init__(self): - ''' - ''' - pass - def setup(self, params): ''' + Setup params and initialize the workers pool. + + :param params: SimulationParams object ''' self.params = params + self.pool = ProcessPoolExecutor return self + def send(self, query): + ''' + Sends the query to the graph database. + + TODO: replace random arguments + + :param query: str, query string + ''' + requests.post('http://localhost:7474/db/data/transaction/commit', + json={ + "statements": [ + {"statement": query} + ] + }, + headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='}) + def iteration(self, task): ''' + Executes the task. Task encapsulates the informations about query. + The task is smallest piece of work and this method will try to execute + queries (one query, more times) from the task as fastest as possible. + Execution time of this method is constrained with the period_time time. + + :param task: instance of SimulationTask class. + :returns: SimulationIterationResult ''' count = 0 - start_time = time.time() + delta_t = 0 for i in range(self.params.queries_per_period): - send(task.query) + # send the query on execution + start_time = time.time() + self.send(task.query) end_time = time.time() + + # calculate delta time + delta_t = delta_t + (end_time - start_time) + count = count + 1 - delta_t = end_time - start_time if delta_t > self.params.period_time: break @@ -69,11 +101,12 @@ class SimulationExecutor(object): def epoch(self): ''' + Single simulation epoc. All workers are going to execute + their queries in the period that is period_time seconds length. ''' log.info('epoch') - pool = ProcessPoolExecutor - with pool(max_workers=self.params.workers_number) as executor: + with self.pool() as executor: # execute all tasks futures = [executor.submit(self.iteration, task) @@ -83,15 +116,10 @@ class SimulationExecutor(object): # per query calculation grouped = itertools.groupby(results, lambda x: x.id) - per_query = [SimulationGrupeResult(id, calculate_qps(list(tasks))) + per_query = [SimulationGroupResult(id, calculate_qps(list(tasks))) for id, tasks in grouped] # for all calculation for_all = calculate_qps(results) return SimulationEpochResult(per_query, for_all) - - def execute(self): - ''' - ''' - return self.epoch() diff --git a/demo/simulation/group_result.py b/demo/simulation/group_result.py index a798eebd3..8c34eab0d 100644 --- a/demo/simulation/group_result.py +++ b/demo/simulation/group_result.py @@ -1,18 +1,23 @@ # -*- coding: utf-8 -*- -class SimulationGrupeResult(object): +class SimulationGroupResult(object): ''' + Encapsulates query per seconds information for qroup of workers + (all workers that execute the same query). ''' - def __init__(self, id, queries_per_period): + def __init__(self, id, queries_per_second): ''' + :param id: str, query id + :param queries_per_second: float, queries per second ''' self.id = id - self.queries_per_second = queries_per_period + self.queries_per_second = queries_per_second def json_data(self): ''' + :returns: dict, {query_id(str):queries_per_second(float)} ''' return { 'id': self.id, diff --git a/demo/simulation/iteration_result.py b/demo/simulation/iteration_result.py index 4e2103d76..6f9d5c2aa 100644 --- a/demo/simulation/iteration_result.py +++ b/demo/simulation/iteration_result.py @@ -3,10 +3,14 @@ class SimulationIterationResult(object): ''' + Encapsulates single worker result. ''' def __init__(self, id, count, delta_t): ''' + :param id: str, query id + :param count: int, number of the query exection + :param delta_t: time of execution ''' self.id = id self.count = count @@ -15,6 +19,7 @@ class SimulationIterationResult(object): def json_data(self): ''' + :returns: dict {query_id(str):queries_per_second(float)} ''' return { "id": self.id, diff --git a/demo/simulation/params.py b/demo/simulation/params.py index bad402fc3..6611b26dd 100644 --- a/demo/simulation/params.py +++ b/demo/simulation/params.py @@ -1,37 +1,44 @@ # -*- coding: utf-8 -*- +import base64 + class SimulationParams(object): ''' + Encapsulates the simulation params. ''' def __init__(self): ''' + Setup default params values. ''' self.protocol = 'http' self.host = 'localhost' self.port = 7474 + self.username = '' + self.password = '' - self.workers_number = 16 self.period_time = 0.5 self.workers_per_query = 1 self.queries_per_second = 15000 - self.recalculate_qps() + self.recalculate_qpp() self.tasks = [] def json_data(self): ''' + :returns: dict with all param values ''' return { "protocol": self.protocol, "host": self.host, "port": self.port, - "workers_number": self.workers_number, + "username": self.username, + "password": self.password, "period_time": self.period_time, "workers_per_query": self.workers_per_query, - "queries_per_second": self.queries_per_second, - "queries_per_period": self.queries_per_period + "queries_per_period": self.queries_per_period, + "queries_per_second": self.queries_per_second } # protocol @@ -61,14 +68,44 @@ class SimulationParams(object): def port(self, value): self._port = value - # workers number + # username @property - def workers_number(self): - return self._workers_number + def username(self): + return self._username - @workers_number.setter - def workers_number(self, value): - self._workers_number = value + @username.setter + def username(self, value): + self._username = value + self.http_basic() + + # password + @property + def password(self): + return self._password + + @password.setter + def password(self, value): + self._password = value + self.http_basic() + + def http_basic(self): + ''' + Recalculates http authorization header. + ''' + try: + encoded = base64.b64encode(self.username + ":" + self.password) + self.authorization = {"Authorization": "Basic " + encoded} + except: + pass + + # authorization + @property + def authorization(self): + return self._authorization + + @authorization.setter + def authorization(self, value): + self._authorization = value # workers per query @property @@ -87,8 +124,9 @@ class SimulationParams(object): @queries_per_second.setter def queries_per_second(self, value): self._queries_per_second = value + self.recalculate_qpp() - def recalculate_qps(self): + def recalculate_qpp(self): try: self.queries_per_period = \ int(self.queries_per_second * self.period_time) @@ -103,7 +141,6 @@ class SimulationParams(object): @queries_per_period.setter def queries_per_period(self, value): self._queries_per_period = value - self.recalculate_qps() # period time @property @@ -113,4 +150,4 @@ class SimulationParams(object): @period_time.setter def period_time(self, value): self._period_time = value - self.recalculate_qps() + self.recalculate_qpp() diff --git a/demo/simulation/task.py b/demo/simulation/task.py index 687e23fee..bdb9bc3e8 100644 --- a/demo/simulation/task.py +++ b/demo/simulation/task.py @@ -3,10 +3,13 @@ class SimulationTask(object): ''' + Encapsulates query data. ''' def __init__(self, id, query): ''' + :param id: query id + :param query: str, query string ''' self.id = id self.query = query diff --git a/demo/simulation/web_server.py b/demo/simulation/web_server.py index bcc69172d..083833b58 100644 --- a/demo/simulation/web_server.py +++ b/demo/simulation/web_server.py @@ -30,7 +30,13 @@ class SimulationWebServer(): def setup_routes(self): ''' Setup all available rutes: - /ping + GET /ping + POST /tasks + POST /start + POST /stop + GET /stats + GET /params + POST /params ''' self.server.add_url_rule('/ping', 'ping', self.ping) self.server.add_url_rule('/tasks', 'tasks', self.tasks, @@ -61,23 +67,30 @@ class SimulationWebServer(): def tasks(self): ''' + Register tasks. Task is object that encapsulates single query data. ''' data = request.get_json()['data'] + self.simulation_params.tasks = \ [SimulationTask(item['id'], item['query']) for item in data] + return ('', 200) def run_simulation(self): ''' + If flag is_simulation_running flag is up (True) the executor + epoch will be executed. Epochs will be executed until somebody + set is_simulation_running flag to Flase. ''' log.info('new simulation run') while self.is_simulation_running: - self.simulation_stats = self.simulation_executor.execute() + self.simulation_stats = self.simulation_executor.epoch() def start(self): ''' + Starts new executor epoch in separate thread. ''' self.is_simulation_running = True t = threading.Thread(target=self.run_simulation, daemon=True) @@ -86,12 +99,16 @@ class SimulationWebServer(): def stop(self): ''' + On POST /stop, stops the executor. The is not immediately, first + the is_simulation_running flag is set to False value, and next + epoc of executor won't be executed. ''' self.is_simulation_running = False return ('', 204) def stats(self): ''' + Returns the simulation stats. Queries per second. ''' if not self.simulation_stats: return ('', 204) @@ -100,16 +117,19 @@ class SimulationWebServer(): def params_get(self): ''' + Returns simulation parameters. ''' return jsonify(self.simulation_params.json_data()) def params_set(self): ''' + Sets simulation parameters. ''' data = request.get_json() - param_names = ['protocol', 'port', 'workers_number', 'period_time', - 'queries_per_second', 'workers_per_query'] + param_names = ['protocol', 'host', 'port', 'username', 'password', + 'period_time', 'queries_per_second', + 'workers_per_query'] for param in param_names: if param in data: