2016-02-28 22:59:10 +08:00
# -*- coding: utf-8 -*-
2016-03-07 06:03:12 +08:00
import os
import json
import time
import logging
import itertools
import http
import http.client

from concurrent.futures import ProcessPoolExecutor

from .substitutor import substitute
from .epoch_result import SimulationEpochResult
from .group_result import SimulationGroupResult
from .iteration_result import SimulationIterationResult
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
2016-03-07 06:03:12 +08:00
def calculate_qps(results, delta_t=None):
    """
    Calculate queries per second for a list of iteration results.

    The total number of executed queries is divided by the wall-clock time
    span covered by the results, i.e. from the earliest ``start_time`` to
    the latest ``end_time``. For example, if two workers start together and
    the first executes 100 queries in 1s while the second executes 10
    queries in 10s, the span is 10s and the result is
    (100 + 10) / 10 = 11 qps.

    :param results: list of SimulationIterationResult objects (anything
        exposing ``start_time``, ``end_time`` and ``count`` attributes)
    :param delta_t: optional explicit time span in seconds; when omitted
        (``None``) it is derived from the results
    :returns: queries per second as a float; 0.0 when ``results`` is empty
        or the time span is not positive
    """
    if not results:
        # Nothing was executed; min()/max() below would raise on empty input.
        return 0.0

    if delta_t is None:
        min_start_time = min(result.start_time for result in results)
        max_end_time = max(result.end_time for result in results)
        delta_t = max_end_time - min_start_time

    if delta_t <= 0:
        # No measurable time elapsed (e.g. zero queries were run);
        # avoid a ZeroDivisionError.
        return 0.0

    total_count = sum(result.count for result in results)
    return total_count / delta_t
class SimulationExecutor(object):
    """
    The main executor object. Every period the instance of this class
    executes all queries, collects the results and calculates the speed of
    query execution.
    """

    def setup(self, params):
        """
        Store the simulation params and the worker-pool factory.

        Only the executor class is remembered here; an actual pool is
        created for each epoch in ``epoch``.

        :param params: SimulationParams object
        :returns: self, so the call can be chained
        """
        self.params = params
        self.pool = ProcessPoolExecutor
        return self

    def send(self, connection, query):
        """
        Send the query to the graph database.

        The query is passed through ``substitute`` and POSTed as a single
        statement to ``/db/data/transaction/commit``. The response body is
        always read in full, which is required before the same connection
        can be used for another request.

        :param connection: http.client.HTTPConnection
        :param query: str, query string
        """
        body = json.dumps({'statements': [{'statement': substitute(query)}]})
        headers = {
            'Authorization': self.params.authorization,
            'Content-Type': 'application/json',
        }
        connection.request('POST', '/db/data/transaction/commit',
                           body=body, headers=headers)
        response = connection.getresponse()
        # Consume the body so the keep-alive connection can be reused.
        payload = response.read()
        log.debug('New response: %s', payload)

    def iteration(self, task):
        """
        Execute one task: send its query repeatedly, as fast as possible.

        At most ``params.queries_per_period`` queries are sent. The loop stops
        early once the elapsed time exceeds ``params.period_time`` seconds.
        The HTTP connection opened here is always closed, even if a request
        fails.

        :param task: instance of SimulationTask class.
        :returns: SimulationIterationResult
        """
        log.debug("New worker with PID: %s", os.getpid())
        count = 0
        connection = http.client.HTTPConnection(
            self.params.host, self.params.port)
        start_time = time.time()
        # Initialised so the result is well-defined even when the loop
        # runs zero times (queries_per_period == 0).
        end_time = start_time
        try:
            for _ in range(self.params.queries_per_period):
                # send the query on execution
                self.send(connection, task.query)
                end_time = time.time()
                count += 1
                # Stop once this worker has used up its time slot.
                if end_time - start_time > self.params.period_time:
                    break
        finally:
            connection.close()
        return SimulationIterationResult(task.id, count, start_time, end_time)

    def epoch(self):
        """
        Run a single simulation epoch.

        For every task, ``params.workers_per_query`` worker processes each
        run ``iteration`` once. All workers execute their queries within a
        period of ``period_time`` seconds.

        :returns: SimulationEpochResult built from the per-task results and
            the overall queries-per-second value
        """
        max_workers = self.params.workers_per_query * len(self.params.tasks)

        with self.pool(max_workers=max_workers) as executor:
            log.debug('pool iter')

            # execute all tasks
            start_time = time.time()
            futures = [executor.submit(self.iteration, task)
                       for task in self.params.tasks
                       for _ in range(self.params.workers_per_query)]
            results = [future.result() for future in futures]
            end_time = time.time()
            epoch_time = end_time - start_time
            log.info("Total epoch time: %s", epoch_time)

        # per query calculation
        # NOTE: groupby only merges *adjacent* items. Results are adjacent per
        # task because the futures were submitted task by task above
        # (assumes task ids are distinct across tasks).
        grouped = itertools.groupby(results, lambda result: result.id)
        per_query = [
            SimulationGroupResult(task_id, calculate_qps(list(group)))
            for task_id, group in grouped
        ]
        # for all calculation
        for_all = calculate_qps(results)
        log.info('Queries per period: %s',
                 sum(result.count for result in results))
        return SimulationEpochResult(per_query, for_all)