Fixed a lot of problems with the memgraph demo

Dominik Tomičević 2016-03-15 20:13:31 +01:00
parent a90d67e324
commit 5442f4b0b6
13 changed files with 130 additions and 61 deletions

View File

@ -15,33 +15,34 @@ class Demo : public Resource<Demo, POST>
{
public:
QueryStripper<int, int, int, int> stripper;
std::unordered_map<uint64_t,
std::function<std::string(const code_args_t&)>> query_f;
std::map<uint64_t, std::function<std::string(const code_args_t&)>> query_f;
Demo(Task::sptr task, Db::sptr db) :
Resource(task, db),
stripper(make_query_stripper(TK_INT, TK_FLOAT, TK_STR, TK_BOOL))
{
query_f[7130961997552177283] = [db](const code_args_t& args) {
/* std::cout << "PRVI" << std::endl; */
auto& t = db->tx_engine.begin();
auto vertex_accessor = db->graph.vertices.insert(t);
vertex_accessor.property(
"id", args[0]
);
/* auto vertex_accessor = db->graph.vertices.insert(t); */
/* vertex_accessor.property( */
/* "id", args[0] */
/* ); */
t.commit();
return "EXECUTED: CREATE (n{id:X}) RETURN n";
};
query_f[11198568396549106428ull] = [db](const code_args_t& args) {
/* std::cout << "DRUGI" << std::endl; */
auto& t = db->tx_engine.begin();
auto id = args[0]->as<Int32>();
std::cout << id.value << std::endl;
auto vertex_accessor = db->graph.vertices.find(t, id.value);
t.commit();
return "EXECUTED: MATCH (n{id:X}) RETURN n";
};
query_f[11637140396624918705ull] = [db](const code_args_t& args) {
/* std::cout << "TRECI" << std::endl; */
auto& t = db->tx_engine.begin();
auto id = args[0]->as<Int32>();
auto vertex_accessor = db->graph.vertices.find(t, id.value);
@ -57,9 +58,11 @@ public:
};
query_f[784140140862470291ull] = [db](const code_args_t& args) {
/* std::cout << "CETVRTI" << std::endl; */
auto& t = db->tx_engine.begin();
auto id1 = args[0]->as<Int32>();
auto v1 = db->graph.vertices.find(t, id1.value);
if (!v1) {
t.commit();
return "FAIL TO FIND NODE";
@ -71,16 +74,23 @@ public:
return "FAIL TO FIND NODE";
}
auto edge_accessor = db->graph.edges.insert(t);
return t.commit(), "AAA";
v1.vlist->access(t).update()->data.out.add(edge_accessor.vlist);
v2.vlist->access(t).update()->data.in.add(edge_accessor.vlist);
edge_accessor.from(v1.vlist);
edge_accessor.to(v2.vlist);
edge_accessor.edge_type(EdgeType("test"));
t.commit();
return "EXECUTED: EDGE CREATED";
};
query_f[16940444920835972350ull] = [db](const code_args_t& args) {
/* std::cout << "PETI" << std::endl; */
auto& t = db->tx_engine.begin();
auto id = args[0]->as<Int32>();
auto v = db->graph.vertices.find(t, id.value);
@ -89,11 +99,32 @@ public:
};
}
int x{0};
void post(sp::Request& req, sp::Response& res)
{
auto query = req.json["statements"][0]["statement"].GetString();
auto strip = stripper.strip(query);
auto hash = fnv(strip.query);
return res.send(http::Status::Ok, query_f[hash](strip.arguments));
task->run([this, &req]() {
//return res.send(http::Status::Ok, "hello");
auto query = req.json["statements"][0]["statement"].GetString();
auto strip = stripper.strip(query);
auto hash = fnv(strip.query);
/* std::cout << "'" << query << "'" << std::endl; */
auto it = query_f.find(hash);
if(it == query_f.end())
{
std::cout << "NOT FOUND" << std::endl;
std::cout << query << std::endl;
std::cout << hash << std::endl;
return std::string("NOT FOUND");
}
return it->second(strip.arguments);
},
[&req, &res](std::string str) {
return res.send(http::Status::Ok, str);
});
}
};
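For reference, a minimal standalone sketch of the dispatch scheme used in post(): strip the literals out of the query, hash the stripped text, and look the handler up in a map keyed by that hash, falling back to a NOT FOUND result. The fnv1a here is an assumption standing in for the project's fnv(), and the stripping is faked with a fixed string rather than the real QueryStripper.

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

// 64-bit FNV-1a over the stripped query text (assumed hash; stand-in for fnv())
uint64_t fnv1a(const std::string& s)
{
    uint64_t h = 14695981039346656037ull;
    for (unsigned char c : s) {
        h ^= c;
        h *= 1099511628211ull;
    }
    return h;
}

int main()
{
    // handlers keyed by the hash of the stripped query, like query_f above
    std::map<uint64_t, std::function<std::string()>> handlers;

    const std::string stripped = "CREATE (n{id:X}) RETURN n";
    handlers[fnv1a(stripped)] = [] {
        return std::string("EXECUTED: CREATE (n{id:X}) RETURN n");
    };

    // dispatch: hash the incoming (already stripped) query and look it up,
    // reporting NOT FOUND when no handler is registered, as the new post() does
    auto it = handlers.find(fnv1a(stripped));
    std::cout << (it != handlers.end() ? it->second() : std::string("NOT FOUND"))
              << std::endl;
}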

View File

@ -11,8 +11,8 @@ template <class W>
class WorkerRunner
{
public:
WorkerRunner(const std::vector<std::string>& queries)
: worker(std::make_unique<W>(queries)) {}
WorkerRunner(const std::string& query)
: worker(std::make_unique<W>(query)) {}
W* operator->() { return worker.get(); }
const W* operator->() const { return worker.get(); }
@ -45,9 +45,9 @@ Result benchmark(const std::string& host, const std::string& port,
std::vector<WorkerRunner<CypherWorker>> workers;
for(int i = 0; i < threads; ++i)
workers.emplace_back(queries);
workers.emplace_back(queries[i]);
for(int i = 0; i < connections; ++i)
for(int i = 0; i < threads * connections; ++i)
workers[i % threads]->connect(host, port);
for(auto& worker : workers)
@ -67,11 +67,10 @@ Result benchmark(const std::string& host, const std::string& port,
auto end = std::max_element(results.begin(), results.end(),
[](auto a, auto b) { return a.end < b.end; })->end;
std::vector<uint64_t> qps(queries.size(), 0);
std::vector<uint64_t> qps;
for(auto& result : results)
for(size_t i = 0; i < result.requests.size(); ++i)
qps[i] += result.requests[i];
qps.push_back(result.requests);
return {end - start, qps};
}
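For orientation, a self-contained sketch of the reworked benchmark shape visible in this hunk: one fixed query per worker thread, sockets handed out round-robin across workers, and a plain per-worker request counter instead of a per-query vector. The names are illustrative, not the real WorkerRunner/CypherWorker API.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// one fixed query per worker and a scalar request counter, as in the diff
struct WorkerSketch
{
    std::string query;
    uint64_t requests;
};

int main()
{
    std::vector<std::string> queries = {
        "CREATE (n{id:@}) RETURN n",
        "MATCH (n{id:#})-[r]->(m) RETURN count(r)"
    };
    int connections = 4;  // connections per worker thread

    // one worker (thread) per query
    std::vector<WorkerSketch> workers;
    for (const auto& q : queries)
        workers.push_back({q, 0});

    // threads * connections sockets, distributed round-robin over the workers
    int total = static_cast<int>(workers.size() * connections);
    for (int i = 0; i < total; ++i)
        std::cout << "connection " << i << " -> worker "
                  << i % static_cast<int>(workers.size()) << "\n";

    // qps is now simply one entry per worker
    std::vector<uint64_t> qps;
    for (const auto& w : workers)
        qps.push_back(w.requests);

    std::cout << "workers: " << qps.size() << std::endl;
}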

View File

@ -8,7 +8,7 @@
void help()
{
std::cout << "error: too few arguments." << std::endl
<< "usage: host port threads connections duration[s]"
<< "usage: host port connections duration[s]"
<< std::endl;
std::exit(0);
@ -16,22 +16,34 @@ void help()
int main(int argc, char* argv[])
{
if(argc < 6)
if(argc < 5)
help();
auto host = std::string(argv[1]);
auto port = std::string(argv[2]);
auto threads = std::stoi(argv[3]);
auto connections = std::stoi(argv[4]);
auto duration = std::stod(argv[5]);
auto connections = std::stoi(argv[3]);
auto duration = std::stod(argv[4]);
// memgraph
std::vector<std::string> queries {
"CREATE (n{id:@}) RETURN n",
"MATCH (n{id:#}),(m{id:#}) CREATE (n)-[r:test]->(m) RETURN r",
"MATCH (n{id:#}) SET n.prop = ^ RETURN n",
"MATCH (n{id:#})-[r]->(m) RETURN count(r)"
/* "MATCH (n{id:#}),(m{id:#}) CREATE (n)-[r:test]->(m) RETURN r", */
/* "MATCH (n{id:#}) SET n.prop = ^ RETURN n", */
/* "MATCH (n{id:#}) RETURN n", */
/* "MATCH (n{id:#})-[r]->(m) RETURN count(r)" */
};
// neo4j
/* std::vector<std::string> queries { */
/* "CREATE (n:Item{id:@}) RETURN n", */
/* "MATCH (n:Item{id:#}),(m:Item{id:#}) CREATE (n)-[r:test]->(m) RETURN r", */
/* "MATCH (n:Item{id:#}) SET n.prop = ^ RETURN n", */
/* "MATCH (n:Item{id:#}) RETURN n", */
/* "MATCH (n:Item{id:#})-[r]->(m) RETURN count(r)" */
/* }; */
auto threads = queries.size();
std::cout << "Running queries on " << connections << " connections "
<< "using " << threads << " threads "
<< "for " << duration << " seconds." << std::endl

View File

@ -15,13 +15,15 @@ public:
std::string operator()(Rg&& gen, size_t len)
{
auto str = std::string();
str.reserve(len + 2);
str.push_back('\'');
str.reserve(len + 4);
str.push_back('\\');
str.push_back('"');
while(str.size() < len)
str.push_back(charset[rnd(std::forward<Rg>(gen))]);
str.push_back('\'');
str.push_back('\\');
str.push_back('"');
return str;
}

View File

@ -18,14 +18,14 @@
struct WorkerResult
{
std::chrono::high_resolution_clock::time_point start, end;
std::vector<uint64_t> requests;
uint64_t requests;
};
class CypherWorker : public SimpleClient<CypherWorker, io::tcp::Stream>
{
public:
CypherWorker(const std::vector<std::string>& queries)
: queries(queries), requests(queries.size(), 0) {}
CypherWorker(const std::string& query)
: query(query), requests(0) {}
io::tcp::Stream& on_connect(io::Socket&& socket)
{
@ -43,22 +43,18 @@ public:
/* std::cout << "-----------------------------------------------" << std::endl; */
/* std::cout << std::endl; */
requests++;
send(stream.socket);
}
void send(io::Socket& socket)
{
auto idx = random_int(mt) % queries.size();
// increase the number of requests
requests[idx]++;
// cypherize and send the request
//socket.write(cypher(queries[idx]));
auto req = cypher(queries[idx]);
auto req = cypher(query);
/* std::cout << "-------------------- REQUEST ------------------" << std::endl; */
/* std::cout << req << std::endl; */
/* std::cout << "-------------------- REQUEST ------------------" << std::endl; */
/* std::cout << req << std::endl; */
socket.write(req);
}
@ -87,6 +83,6 @@ private:
Cypher cypher;
std::vector<std::unique_ptr<io::tcp::Stream>> streams;
std::vector<std::string> queries;
std::vector<uint64_t> requests;
std::string query;
uint64_t requests;
};

View File

@ -11,30 +11,30 @@ class LazyGC : public Crtp<Derived>
public:
void add_ref()
{
ref_count.fetch_add(1, std::memory_order_relaxed);
/* ref_count.fetch_add(1, std::memory_order_relaxed); */
}
void release_ref()
{
// get refcount and subtract atomically
auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel);
/* // get refcount and subtract atomically */
/* auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel); */
// fetch_sub first returns and then subtracts so the refcount is
// zero when fetch_sub returns 1
if(count != 1)
return;
/* // fetch_sub first returns and then subtracts so the refcount is */
/* // zero when fetch_sub returns 1 */
/* if(count != 1) */
/* return; */
if(!dirty.load(std::memory_order_acquire))
return;
/* if(!dirty.load(std::memory_order_acquire)) */
/* return; */
auto guard = this->derived().gc_lock_acquire();
/* auto guard = this->derived().gc_lock_acquire(); */
if(!dirty.load(std::memory_order_acquire))
return;
/* if(!dirty.load(std::memory_order_acquire)) */
/* return; */
this->derived().vacuum();
/* this->derived().vacuum(); */
dirty.store(false, std::memory_order_release);
/* dirty.store(false, std::memory_order_release); */
}
protected:
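Since the body above is now fully commented out, here is a compact restatement of the pattern it implemented: atomic reference counting plus a double-checked dirty flag guarding vacuum(). The std::mutex stands in for whatever gc_lock_acquire() returns in the real class.

#include <atomic>
#include <mutex>

struct LazyGcSketch
{
    std::atomic<unsigned> ref_count{0};
    std::atomic<bool> dirty{false};
    std::mutex gc_mutex;

    void add_ref() { ref_count.fetch_add(1, std::memory_order_relaxed); }

    void release_ref()
    {
        // fetch_sub returns the value *before* subtracting, so the count
        // reaches zero exactly when it returns 1
        if (ref_count.fetch_sub(1, std::memory_order_acq_rel) != 1)
            return;
        // fast path: nothing to clean up
        if (!dirty.load(std::memory_order_acquire))
            return;
        std::lock_guard<std::mutex> guard(gc_mutex);
        // re-check under the lock in case another releaser already vacuumed
        if (!dirty.load(std::memory_order_acquire))
            return;
        vacuum();
        dirty.store(false, std::memory_order_release);
    }

    void vacuum() { /* reclaim deleted records */ }
};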

View File

@ -87,11 +87,22 @@ public:
return committed(hints.exp, id, t);
}
bool exp_committed(const tx::Transaction& t)
{
return committed(hints.exp, tx.exp(), t);
}
bool cre_committed(const Id& id, const tx::Transaction& t)
{
return committed(hints.cre, id, t);
}
bool cre_committed(const tx::Transaction& t)
{
return committed(hints.cre, tx.cre(), t);
}
protected:
template <class U>
bool committed(U& hints, const Id& id, const tx::Transaction& t)
{

View File

@ -233,7 +233,7 @@ private:
// if the record hasn't been deleted yet or the deleting transaction
// has aborted, it's ok to modify it
if(!record->tx.exp() || record->hints.load().exp.is_aborted())
if(!record->tx.exp() || !record->exp_committed(t))
return;
// if it committed, then we have a serialization conflict
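Stated as a tiny standalone predicate, this is a hedged restatement of the rule in the comments above, not the real Record/hints/Transaction API:

// safe to modify when nobody has expired the record yet, or the expiring
// transaction never committed; a committed expirer is a serialization conflict
bool can_modify(bool has_expirer, bool expirer_committed)
{
    return !has_expirer || !expirer_committed;
}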

View File

@ -20,7 +20,10 @@ template <class Req, class Res>
void HttpConnection<Req, Res>::close()
{
client.close([](uv_handle_t* client) -> void {
delete reinterpret_cast<connection_t*>(client->data);
// terrible bug: this can run even while a query is still executing, and the
// query then uses memory which has been deallocated
//delete reinterpret_cast<connection_t*>(client->data);
});
}
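The raw delete is only disabled here, not replaced. One illustrative alternative (an assumption, not what this commit does) is to hold the per-connection state in a std::shared_ptr so a still-running query keeps it alive, and let close() merely drop the connection's own reference:

#include <memory>

struct connection_t { /* request/response state */ };

void close_connection(std::shared_ptr<connection_t>& state)
{
    // the state is freed only when the last owner (possibly an in-flight
    // query holding its own copy) releases it
    state.reset();
}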

View File

@ -25,6 +25,12 @@ void Response<Req, Res>::send(Status status, const std::string& body)
template <class Req, class Res>
void Response<Req, Res>::send(const std::string& body)
{
// terrible bug: if the client closes the connection, the buffer is cleared,
// there is no one left to respond to, and you start writing to memory that
// isn't yours anymore
if(buffer.count() == 0)
return;
uv_write_t* write_req =
static_cast<uv_write_t*>(write_req_allocator.acquire());
@ -41,10 +47,16 @@ void Response<Req, Res>::send(const std::string& body)
buffer << it->first << ":" << it->second << "\r\n";
buffer << "\r\n" << body;
/* std::cout << "SALJEM RESPONSE" << std::endl; */
/* std::cout << body << std::endl; */
/* std::cout << connection.request.body << std::endl; */
/* std::cout << "buffer count:" << buffer.count() << std::endl; */
uv_write(write_req, connection.client, buffer, buffer.count(),
[](uv_write_t* write_req, int) {
/* std::cout << "POSLAO RESPONSE" << std::endl; */
connection_t& conn = *reinterpret_cast<connection_t*>(write_req->data);
if(!conn.keep_alive)

View File

@ -71,7 +71,7 @@ public:
server.listen(ip, [this](Request& req, Response& res) {
auto route = router.match(R3::to_r3_method(req.method), req.url);
if(!route.exists())
return res.send(http::Status::NotFound, "Resource not found");

View File

@ -157,7 +157,7 @@ private:
void relax()
{
usleep(250);
cpu_relax();
}
};

View File

@ -3,6 +3,8 @@
#include <atomic>
#include <unistd.h>
#include "utils/cpu_relax.hpp"
class SpinLock
{
public:
@ -10,7 +12,8 @@ public:
void lock()
{
while(lock_flag.test_and_set(std::memory_order_acquire))
usleep(250);
cpu_relax();
///usleep(250);
}
void unlock()
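For context, a self-contained version of the spinlock shape above, with a stand-in for cpu_relax() (the real utils/cpu_relax.hpp most likely emits a PAUSE-style hint; the non-x86 fallback here simply yields):

#include <atomic>
#include <thread>
#if defined(__x86_64__) || defined(__i386__)
#include <immintrin.h>
#endif

static inline void cpu_relax_sketch()
{
#if defined(__x86_64__) || defined(__i386__)
    _mm_pause();               // pause hint keeps the core cool while spinning
#else
    std::this_thread::yield(); // portable fallback
#endif
}

class SpinLockSketch
{
public:
    void lock()
    {
        // spin with a cheap hint instead of sleeping 250us per retry
        while (flag.test_and_set(std::memory_order_acquire))
            cpu_relax_sketch();
    }

    void unlock()
    {
        flag.clear(std::memory_order_release);
    }

private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};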