From f9765d61af1d7574186e8fa006ff38a30e5b341b Mon Sep 17 00:00:00 2001 From: Marko Budiselic <mbudiselicbuda@gmail.com> Date: Sun, 6 Mar 2016 23:03:12 +0100 Subject: [PATCH] demo server refactor, still not enough good solution, client must also be implemented in c/c++ --- demo/demo_server.docker | 11 +++-- demo/simulation/executor.py | 72 ++++++++++++++++++---------- demo/simulation/iteration_result.py | 6 ++- demo/simulation/params.py | 17 +++++-- demo/{ => simulation}/substitutor.py | 0 5 files changed, 70 insertions(+), 36 deletions(-) rename demo/{ => simulation}/substitutor.py (100%) diff --git a/demo/demo_server.docker b/demo/demo_server.docker index b0bca606f..85646ad90 100644 --- a/demo/demo_server.docker +++ b/demo/demo_server.docker @@ -1,17 +1,20 @@ -FROM ubuntu:16.04 +FROM ubuntu:14.04 RUN apt-get update -RUN apt-get install -y python3.5 python3.5-dev python3-pip python3-setuptools +RUN apt-get install -y python3.4 python3.4-dev python3-pip python3-setuptools # RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN pip3 install flask COPY simulation /app/simulation -COPY demo_server_run.py /app/demo_server_init.py +COPY demo_server_init.py /app/demo_server_init.py WORKDIR /app ENV MEMGRAPH_DEMO prod -# uwsgi --http-socket 0.0.0.0:8080 --module demo_server_init:app --master --enable-threads +# uwsgi --http-socket 0.0.0.0:8080 --module demo_server_init:app \ +# --master --enable-threads +EXPOSE 8080 + CMD ["python3", "demo_server_init.py"] diff --git a/demo/simulation/executor.py b/demo/simulation/executor.py index ab7717abe..46a97a98c 100644 --- a/demo/simulation/executor.py +++ b/demo/simulation/executor.py @@ -1,11 +1,14 @@ # -*- coding: utf-8 -*- +import os +import json import time import logging import itertools -import urllib.request +import http from concurrent.futures import ProcessPoolExecutor +from .substitutor import substitute from .epoch_result import SimulationEpochResult from .group_result import SimulationGroupResult from .iteration_result import SimulationIterationResult @@ -13,7 +16,7 @@ from .iteration_result import SimulationIterationResult log = logging.getLogger(__name__) -def calculate_qps(results): +def calculate_qps(results, delta_t=None): ''' Calculates queris per second for the results list. The main idea is to scale up results to the result with the biggest execution time. @@ -30,9 +33,10 @@ def calculate_qps(results): :param results: list of SimulationIterationResult objects :returns: queries per second result calculated on the input list ''' - max_delta_t = max([result.delta_t for result in results]) - qps = sum([result.count * (max_delta_t / result.delta_t) - for result in results]) / max_delta_t + min_start_time = min([result.start_time for result in results]) + max_end_time = max([result.end_time for result in results]) + delta_t = max_end_time - min_start_time + qps = sum([result.count for result in results]) / delta_t return qps @@ -53,24 +57,22 @@ class SimulationExecutor(object): self.pool = ProcessPoolExecutor return self - def send(self, query): + def send(self, connection, query): ''' Sends the query to the graph database. - TODO: replace random arguments - + :param connection: http.client.HTTPConnection :param query: str, query string ''' - # requests.post('http://localhost:7474/db/data/transaction/commit', - # json={ - # "statements": [ - # {"statement": query} - # ] - # }, - # headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='}) - # requests.get('http://localhost:7474/db/data/ping') - response = urllib.request.urlopen('http://localhost:7474/db/data/ping') - response.read() + body = json.dumps({'statements': [{'statement': substitute(query)}]}) + headers = { + 'Authorization': self.params.authorization, + 'Content-Type': 'application/json' + } + connection.request('POST', '/db/data/transaction/commit', + body=body, headers=headers) + response = connection.getresponse() + log.debug('New response: %s' % response.read()) def iteration(self, task): ''' @@ -85,39 +87,54 @@ class SimulationExecutor(object): count = 0 delta_t = 0 + log.debug("New worker with PID: %s" % os.getpid()) + + connection = http.client.HTTPConnection( + self.params.host, self.params.port) + connection.connect() + + start_time = time.time() + for i in range(self.params.queries_per_period): # send the query on execution - start_time = time.time() - self.send(task.query) - end_time = time.time() + self.send(connection, task.query) # calculate delta time - delta_t = delta_t + (end_time - start_time) + end_time = time.time() + delta_t = end_time - start_time count = count + 1 if delta_t > self.params.period_time: break - return SimulationIterationResult(task.id, count, delta_t) + connection.close() + + return SimulationIterationResult(task.id, count, start_time, end_time) def epoch(self): ''' Single simulation epoc. All workers are going to execute their queries in the period that is period_time seconds length. ''' - log.info('epoch') + log.debug('epoch') - with self.pool() as executor: + max_workers = self.params.workers_per_query * len(self.params.tasks) - log.info('pool iter') + with self.pool(max_workers=max_workers) as executor: + + log.debug('pool iter') # execute all tasks + start_time = time.time() futures = [executor.submit(self.iteration, task) for task in self.params.tasks for i in range(self.params.workers_per_query)] results = [future.result() for future in futures] + end_time = time.time() + epoch_time = end_time - start_time + log.info("Total epoch time: %s" % epoch_time) # per query calculation grouped = itertools.groupby(results, lambda x: x.id) @@ -127,4 +144,7 @@ class SimulationExecutor(object): # for all calculation for_all = calculate_qps(results) + log.info('Queries per period: %s' % sum([r.count + for r in results])) + return SimulationEpochResult(per_query, for_all) diff --git a/demo/simulation/iteration_result.py b/demo/simulation/iteration_result.py index 6f9d5c2aa..b7c024e72 100644 --- a/demo/simulation/iteration_result.py +++ b/demo/simulation/iteration_result.py @@ -6,7 +6,7 @@ class SimulationIterationResult(object): Encapsulates single worker result. ''' - def __init__(self, id, count, delta_t): + def __init__(self, id, count, start_time, end_time): ''' :param id: str, query id :param count: int, number of the query exection @@ -14,7 +14,9 @@ class SimulationIterationResult(object): ''' self.id = id self.count = count - self.delta_t = delta_t + self.start_time = start_time + self.end_time = end_time + self.delta_t = end_time - start_time self.queries_per_second = self.count / self.delta_t def json_data(self): diff --git a/demo/simulation/params.py b/demo/simulation/params.py index 6611b26dd..7745400e1 100644 --- a/demo/simulation/params.py +++ b/demo/simulation/params.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- import base64 +import logging + +log = logging.getLogger(__name__) class SimulationParams(object): @@ -76,6 +79,7 @@ class SimulationParams(object): @username.setter def username(self, value): self._username = value + log.info("Username is now: %s" % self._username) self.http_basic() # password @@ -86,6 +90,7 @@ class SimulationParams(object): @password.setter def password(self, value): self._password = value + log.info("Password is now: %s" % self._password) self.http_basic() def http_basic(self): @@ -93,10 +98,14 @@ class SimulationParams(object): Recalculates http authorization header. ''' try: - encoded = base64.b64encode(self.username + ":" + self.password) - self.authorization = {"Authorization": "Basic " + encoded} - except: - pass + encoded = base64.b64encode( + str.encode(self.username + ":" + self.password)) + self.authorization = "Basic " + encoded.decode() + log.info("Authorization is now: %s" % self.authorization) + except AttributeError: + log.debug("Username or password isn't defined.") + except Exception as e: + log.exception(e) # authorization @property diff --git a/demo/substitutor.py b/demo/simulation/substitutor.py similarity index 100% rename from demo/substitutor.py rename to demo/simulation/substitutor.py