demo server refactor, still not enough good solution, client must also be implemented in c/c++

This commit is contained in:
Marko Budiselic 2016-03-06 23:03:12 +01:00
parent 35482711a8
commit f9765d61af
5 changed files with 70 additions and 36 deletions

View File

@ -1,17 +1,20 @@
FROM ubuntu:16.04
FROM ubuntu:14.04
RUN apt-get update
RUN apt-get install -y python3.5 python3.5-dev python3-pip python3-setuptools
RUN apt-get install -y python3.4 python3.4-dev python3-pip python3-setuptools
# RUN rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN pip3 install flask
COPY simulation /app/simulation
COPY demo_server_run.py /app/demo_server_init.py
COPY demo_server_init.py /app/demo_server_init.py
WORKDIR /app
ENV MEMGRAPH_DEMO prod
# uwsgi --http-socket 0.0.0.0:8080 --module demo_server_init:app --master --enable-threads
# uwsgi --http-socket 0.0.0.0:8080 --module demo_server_init:app \
# --master --enable-threads
EXPOSE 8080
CMD ["python3", "demo_server_init.py"]

View File

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*-
import os
import json
import time
import logging
import itertools
import urllib.request
import http
from concurrent.futures import ProcessPoolExecutor
from .substitutor import substitute
from .epoch_result import SimulationEpochResult
from .group_result import SimulationGroupResult
from .iteration_result import SimulationIterationResult
@ -13,7 +16,7 @@ from .iteration_result import SimulationIterationResult
log = logging.getLogger(__name__)
def calculate_qps(results):
def calculate_qps(results, delta_t=None):
'''
Calculates queris per second for the results list. The main idea
is to scale up results to the result with the biggest execution time.
@ -30,9 +33,10 @@ def calculate_qps(results):
:param results: list of SimulationIterationResult objects
:returns: queries per second result calculated on the input list
'''
max_delta_t = max([result.delta_t for result in results])
qps = sum([result.count * (max_delta_t / result.delta_t)
for result in results]) / max_delta_t
min_start_time = min([result.start_time for result in results])
max_end_time = max([result.end_time for result in results])
delta_t = max_end_time - min_start_time
qps = sum([result.count for result in results]) / delta_t
return qps
@ -53,24 +57,22 @@ class SimulationExecutor(object):
self.pool = ProcessPoolExecutor
return self
def send(self, query):
def send(self, connection, query):
'''
Sends the query to the graph database.
TODO: replace random arguments
:param connection: http.client.HTTPConnection
:param query: str, query string
'''
# requests.post('http://localhost:7474/db/data/transaction/commit',
# json={
# "statements": [
# {"statement": query}
# ]
# },
# headers={'Authorization': 'Basic bmVvNGo6cGFzcw=='})
# requests.get('http://localhost:7474/db/data/ping')
response = urllib.request.urlopen('http://localhost:7474/db/data/ping')
response.read()
body = json.dumps({'statements': [{'statement': substitute(query)}]})
headers = {
'Authorization': self.params.authorization,
'Content-Type': 'application/json'
}
connection.request('POST', '/db/data/transaction/commit',
body=body, headers=headers)
response = connection.getresponse()
log.debug('New response: %s' % response.read())
def iteration(self, task):
'''
@ -85,39 +87,54 @@ class SimulationExecutor(object):
count = 0
delta_t = 0
log.debug("New worker with PID: %s" % os.getpid())
connection = http.client.HTTPConnection(
self.params.host, self.params.port)
connection.connect()
start_time = time.time()
for i in range(self.params.queries_per_period):
# send the query on execution
start_time = time.time()
self.send(task.query)
end_time = time.time()
self.send(connection, task.query)
# calculate delta time
delta_t = delta_t + (end_time - start_time)
end_time = time.time()
delta_t = end_time - start_time
count = count + 1
if delta_t > self.params.period_time:
break
return SimulationIterationResult(task.id, count, delta_t)
connection.close()
return SimulationIterationResult(task.id, count, start_time, end_time)
def epoch(self):
'''
Single simulation epoc. All workers are going to execute
their queries in the period that is period_time seconds length.
'''
log.info('epoch')
log.debug('epoch')
with self.pool() as executor:
max_workers = self.params.workers_per_query * len(self.params.tasks)
log.info('pool iter')
with self.pool(max_workers=max_workers) as executor:
log.debug('pool iter')
# execute all tasks
start_time = time.time()
futures = [executor.submit(self.iteration, task)
for task in self.params.tasks
for i in range(self.params.workers_per_query)]
results = [future.result() for future in futures]
end_time = time.time()
epoch_time = end_time - start_time
log.info("Total epoch time: %s" % epoch_time)
# per query calculation
grouped = itertools.groupby(results, lambda x: x.id)
@ -127,4 +144,7 @@ class SimulationExecutor(object):
# for all calculation
for_all = calculate_qps(results)
log.info('Queries per period: %s' % sum([r.count
for r in results]))
return SimulationEpochResult(per_query, for_all)

View File

@ -6,7 +6,7 @@ class SimulationIterationResult(object):
Encapsulates single worker result.
'''
def __init__(self, id, count, delta_t):
def __init__(self, id, count, start_time, end_time):
'''
:param id: str, query id
:param count: int, number of the query exection
@ -14,7 +14,9 @@ class SimulationIterationResult(object):
'''
self.id = id
self.count = count
self.delta_t = delta_t
self.start_time = start_time
self.end_time = end_time
self.delta_t = end_time - start_time
self.queries_per_second = self.count / self.delta_t
def json_data(self):

View File

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
import base64
import logging
log = logging.getLogger(__name__)
class SimulationParams(object):
@ -76,6 +79,7 @@ class SimulationParams(object):
@username.setter
def username(self, value):
self._username = value
log.info("Username is now: %s" % self._username)
self.http_basic()
# password
@ -86,6 +90,7 @@ class SimulationParams(object):
@password.setter
def password(self, value):
self._password = value
log.info("Password is now: %s" % self._password)
self.http_basic()
def http_basic(self):
@ -93,10 +98,14 @@ class SimulationParams(object):
Recalculates http authorization header.
'''
try:
encoded = base64.b64encode(self.username + ":" + self.password)
self.authorization = {"Authorization": "Basic " + encoded}
except:
pass
encoded = base64.b64encode(
str.encode(self.username + ":" + self.password))
self.authorization = "Basic " + encoded.decode()
log.info("Authorization is now: %s" % self.authorization)
except AttributeError:
log.debug("Username or password isn't defined.")
except Exception as e:
log.exception(e)
# authorization
@property