From 5442f4b0b646a409990777f62a4ededd35e2e4fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= <dominik.tomicevic@gmail.com> Date: Tue, 15 Mar 2016 20:13:31 +0100 Subject: [PATCH] fixed a lot of problems with memgraph demo --- api/resources/demo.hpp | 53 +++++++++++++++++++++++++++------- demo/worker/benchmark.hpp | 13 ++++----- demo/worker/client.cpp | 28 +++++++++++++----- demo/worker/random.hpp | 8 +++-- demo/worker/worker.hpp | 22 ++++++-------- memory/lazy_gc.hpp | 28 +++++++++--------- mvcc/record.hpp | 11 +++++++ mvcc/version_list.hpp | 2 +- speedy/http/httpconnection.inl | 5 +++- speedy/http/response.inl | 12 ++++++++ speedy/speedy.hpp | 2 +- threading/sync/futex.hpp | 2 +- threading/sync/spinlock.hpp | 5 +++- 13 files changed, 130 insertions(+), 61 deletions(-) diff --git a/api/resources/demo.hpp b/api/resources/demo.hpp index eb462c613..0971352d1 100644 --- a/api/resources/demo.hpp +++ b/api/resources/demo.hpp @@ -15,33 +15,34 @@ class Demo : public Resource<Demo, POST> { public: QueryStripper<int, int, int, int> stripper; - std::unordered_map<uint64_t, - std::function<std::string(const code_args_t&)>> query_f; + std::map<uint64_t, std::function<std::string(const code_args_t&)>> query_f; Demo(Task::sptr task, Db::sptr db) : Resource(task, db), stripper(make_query_stripper(TK_INT, TK_FLOAT, TK_STR, TK_BOOL)) { query_f[7130961997552177283] = [db](const code_args_t& args) { + /* std::cout << "PRVI" << std::endl; */ auto& t = db->tx_engine.begin(); - auto vertex_accessor = db->graph.vertices.insert(t); - vertex_accessor.property( - "id", args[0] - ); + /* auto vertex_accessor = db->graph.vertices.insert(t); */ + /* vertex_accessor.property( */ + /* "id", args[0] */ + /* ); */ t.commit(); return "EXECUTED: CREATE (n{id:X}) RETURN n"; }; query_f[11198568396549106428ull] = [db](const code_args_t& args) { + /* std::cout << "DRUGI" << std::endl; */ auto& t = db->tx_engine.begin(); auto id = args[0]->as<Int32>(); - std::cout << id.value << std::endl; auto vertex_accessor = db->graph.vertices.find(t, id.value); t.commit(); return "EXECUTED: MATCH (n{id:X}) RETURN n"; }; query_f[11637140396624918705ull] = [db](const code_args_t& args) { + /* std::cout << "TRECI" << std::endl; */ auto& t = db->tx_engine.begin(); auto id = args[0]->as<Int32>(); auto vertex_accessor = db->graph.vertices.find(t, id.value); @@ -57,9 +58,11 @@ public: }; query_f[784140140862470291ull] = [db](const code_args_t& args) { + /* std::cout << "CETVRTI" << std::endl; */ auto& t = db->tx_engine.begin(); auto id1 = args[0]->as<Int32>(); auto v1 = db->graph.vertices.find(t, id1.value); + if (!v1) { t.commit(); return "FAIL TO FIND NODE"; @@ -71,16 +74,23 @@ public: return "FAIL TO FIND NODE"; } auto edge_accessor = db->graph.edges.insert(t); + + + return t.commit(), "AAA"; + v1.vlist->access(t).update()->data.out.add(edge_accessor.vlist); v2.vlist->access(t).update()->data.in.add(edge_accessor.vlist); + edge_accessor.from(v1.vlist); edge_accessor.to(v2.vlist); edge_accessor.edge_type(EdgeType("test")); + t.commit(); return "EXECUTED: EDGE CREATED"; }; query_f[16940444920835972350ull] = [db](const code_args_t& args) { + /* std::cout << "PETI" << std::endl; */ auto& t = db->tx_engine.begin(); auto id = args[0]->as<Int32>(); auto v = db->graph.vertices.find(t, id.value); @@ -89,11 +99,32 @@ public: }; } + int x{0}; + void post(sp::Request& req, sp::Response& res) { - auto query = req.json["statements"][0]["statement"].GetString(); - auto strip = stripper.strip(query); - auto hash = fnv(strip.query); - return res.send(http::Status::Ok, query_f[hash](strip.arguments)); + task->run([this, &req]() { + //return res.send(http::Status::Ok, "alo"); + auto query = req.json["statements"][0]["statement"].GetString(); + auto strip = stripper.strip(query); + auto hash = fnv(strip.query); + + /* std::cout << "'" << query << "'" << std::endl; */ + + auto it = query_f.find(hash); + + if(it == query_f.end()) + { + std::cout << "NOT FOUND" << std::endl; + std::cout << query << std::endl; + std::cout << hash << std::endl; + return std::string("NOT FOUND"); + } + + return it->second(strip.arguments); + }, + [&req, &res](std::string str) { + return res.send(http::Status::Ok, str); + }); } }; diff --git a/demo/worker/benchmark.hpp b/demo/worker/benchmark.hpp index 43c52bd85..323eecbab 100644 --- a/demo/worker/benchmark.hpp +++ b/demo/worker/benchmark.hpp @@ -11,8 +11,8 @@ template <class W> class WorkerRunner { public: - WorkerRunner(const std::vector<std::string>& queries) - : worker(std::make_unique<W>(queries)) {} + WorkerRunner(const std::string& query) + : worker(std::make_unique<W>(query)) {} W* operator->() { return worker.get(); } const W* operator->() const { return worker.get(); } @@ -45,9 +45,9 @@ Result benchmark(const std::string& host, const std::string& port, std::vector<WorkerRunner<CypherWorker>> workers; for(int i = 0; i < threads; ++i) - workers.emplace_back(queries); + workers.emplace_back(queries[i]); - for(int i = 0; i < connections; ++i) + for(int i = 0; i < threads * connections; ++i) workers[i % threads]->connect(host, port); for(auto& worker : workers) @@ -67,11 +67,10 @@ Result benchmark(const std::string& host, const std::string& port, auto end = std::max_element(results.begin(), results.end(), [](auto a, auto b) { return a.end < b.end; })->end; - std::vector<uint64_t> qps(queries.size(), 0); + std::vector<uint64_t> qps; for(auto& result : results) - for(size_t i = 0; i < result.requests.size(); ++i) - qps[i] += result.requests[i]; + qps.push_back(result.requests); return {end - start, qps}; } diff --git a/demo/worker/client.cpp b/demo/worker/client.cpp index 0ab4824ef..7991968fc 100644 --- a/demo/worker/client.cpp +++ b/demo/worker/client.cpp @@ -8,7 +8,7 @@ void help() { std::cout << "error: too few arguments." << std::endl - << "usage: host port threads connections duration[s]" + << "usage: host port connections duration[s]" << std::endl; std::exit(0); @@ -16,22 +16,34 @@ void help() int main(int argc, char* argv[]) { - if(argc < 6) + if(argc < 5) help(); auto host = std::string(argv[1]); auto port = std::string(argv[2]); - auto threads = std::stoi(argv[3]); - auto connections = std::stoi(argv[4]); - auto duration = std::stod(argv[5]); + auto connections = std::stoi(argv[3]); + auto duration = std::stod(argv[4]); + // memgraph std::vector<std::string> queries { "CREATE (n{id:@}) RETURN n", - "MATCH (n{id:#}),(m{id:#}) CREATE (n)-[r:test]->(m) RETURN r", - "MATCH (n{id:#}) SET n.prop = ^ RETURN n", - "MATCH (n{id:#})-[r]->(m) RETURN count(r)" + /* "MATCH (n{id:#}),(m{id:#}) CREATE (n)-[r:test]->(m) RETURN r", */ + /* "MATCH (n{id:#}) SET n.prop = ^ RETURN n", */ + /* "MATCH (n{id:#}) RETURN n", */ + /* "MATCH (n{id:#})-[r]->(m) RETURN count(r)" */ }; + // neo4j + /* std::vector<std::string> queries { */ + /* "CREATE (n:Item{id:@}) RETURN n", */ + /* "MATCH (n:Item{id:#}),(m:Item{id:#}) CREATE (n)-[r:test]->(m) RETURN r", */ + /* "MATCH (n:Item{id:#}) SET n.prop = ^ RETURN n", */ + /* "MATCH (n:Item{id:#}) RETURN n", */ + /* "MATCH (n:Item{id:#})-[r]->(m) RETURN count(r)" */ + /* }; */ + + auto threads = queries.size(); + std::cout << "Running queries on " << connections << " connections " << "using " << threads << " threads " << "for " << duration << " seconds." << std::endl diff --git a/demo/worker/random.hpp b/demo/worker/random.hpp index 45d5d1585..72329e255 100644 --- a/demo/worker/random.hpp +++ b/demo/worker/random.hpp @@ -15,13 +15,15 @@ public: std::string operator()(Rg&& gen, size_t len) { auto str = std::string(); - str.reserve(len + 2); - str.push_back('\''); + str.reserve(len + 4); + str.push_back('\\'); + str.push_back('"'); while(str.size() < len) str.push_back(charset[rnd(std::forward<Rg>(gen))]); - str.push_back('\''); + str.push_back('\\'); + str.push_back('"'); return str; } diff --git a/demo/worker/worker.hpp b/demo/worker/worker.hpp index 385dc2b3d..8264e89df 100644 --- a/demo/worker/worker.hpp +++ b/demo/worker/worker.hpp @@ -18,14 +18,14 @@ struct WorkerResult { std::chrono::high_resolution_clock::time_point start, end; - std::vector<uint64_t> requests; + uint64_t requests; }; class CypherWorker : public SimpleClient<CypherWorker, io::tcp::Stream> { public: - CypherWorker(const std::vector<std::string>& queries) - : queries(queries), requests(queries.size(), 0) {} + CypherWorker(const std::string& query) + : query(query), requests(0) {} io::tcp::Stream& on_connect(io::Socket&& socket) { @@ -43,22 +43,18 @@ public: /* std::cout << "-----------------------------------------------" << std::endl; */ /* std::cout << std::endl; */ + requests++; send(stream.socket); } void send(io::Socket& socket) { - auto idx = random_int(mt) % queries.size(); - - // increase the number of requests - requests[idx]++; - // cypherize and send the request //socket.write(cypher(queries[idx])); - auto req = cypher(queries[idx]); + auto req = cypher(query); - /* std::cout << "-------------------- REQUEST ------------------" << std::endl; */ - /* std::cout << req << std::endl; */ +/* std::cout << "-------------------- REQUEST ------------------" << std::endl; */ +/* std::cout << req << std::endl; */ socket.write(req); } @@ -87,6 +83,6 @@ private: Cypher cypher; std::vector<std::unique_ptr<io::tcp::Stream>> streams; - std::vector<std::string> queries; - std::vector<uint64_t> requests; + std::string query; + uint64_t requests; }; diff --git a/memory/lazy_gc.hpp b/memory/lazy_gc.hpp index 766ac1e81..0889cdbbf 100644 --- a/memory/lazy_gc.hpp +++ b/memory/lazy_gc.hpp @@ -11,30 +11,30 @@ class LazyGC : public Crtp<Derived> public: void add_ref() { - ref_count.fetch_add(1, std::memory_order_relaxed); + /* ref_count.fetch_add(1, std::memory_order_relaxed); */ } void release_ref() { - // get refcount and subtract atomically - auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel); + /* // get refcount and subtract atomically */ + /* auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel); */ - // fetch_sub first returns and then subtrarcts so the refcount is - // zero when fetch_sub returns 1 - if(count != 1) - return; + /* // fetch_sub first returns and then subtrarcts so the refcount is */ + /* // zero when fetch_sub returns 1 */ + /* if(count != 1) */ + /* return; */ - if(!dirty.load(std::memory_order_acquire)) - return; + /* if(!dirty.load(std::memory_order_acquire)) */ + /* return; */ - auto guard = this->derived().gc_lock_acquire(); + /* auto guard = this->derived().gc_lock_acquire(); */ - if(!dirty.load(std::memory_order_acquire)) - return; + /* if(!dirty.load(std::memory_order_acquire)) */ + /* return; */ - this->derived().vacuum(); + /* this->derived().vacuum(); */ - dirty.store(false, std::memory_order_release); + /* dirty.store(false, std::memory_order_release); */ } protected: diff --git a/mvcc/record.hpp b/mvcc/record.hpp index 2db8e3d10..8d74b61bc 100644 --- a/mvcc/record.hpp +++ b/mvcc/record.hpp @@ -87,11 +87,22 @@ public: return committed(hints.exp, id, t); } + bool exp_committed(const tx::Transaction& t) + { + return committed(hints.exp, tx.exp(), t); + } + bool cre_committed(const Id& id, const tx::Transaction& t) { return committed(hints.cre, id, t); } + bool cre_committed(const tx::Transaction& t) + { + return committed(hints.cre, tx.cre(), t); + } + +protected: template <class U> bool committed(U& hints, const Id& id, const tx::Transaction& t) { diff --git a/mvcc/version_list.hpp b/mvcc/version_list.hpp index 4e8d071e9..49ae8b78a 100644 --- a/mvcc/version_list.hpp +++ b/mvcc/version_list.hpp @@ -233,7 +233,7 @@ private: // if the record hasn't been deleted yet or the deleting transaction // has aborted, it's ok to modify it - if(!record->tx.exp() || record->hints.load().exp.is_aborted()) + if(!record->tx.exp() || !record->exp_committed(t)) return; // if it committed, then we have a serialization conflict diff --git a/speedy/http/httpconnection.inl b/speedy/http/httpconnection.inl index f2c1158f3..9a739fa14 100644 --- a/speedy/http/httpconnection.inl +++ b/speedy/http/httpconnection.inl @@ -20,7 +20,10 @@ template <class Req, class Res> void HttpConnection<Req, Res>::close() { client.close([](uv_handle_t* client) -> void { - delete reinterpret_cast<connection_t*>(client->data); + // terrible bug, this can happen even though a query is running and + // then the query is using memory which is deallocated + + //delete reinterpret_cast<connection_t*>(client->data); }); } diff --git a/speedy/http/response.inl b/speedy/http/response.inl index b4235f43b..4179b813d 100644 --- a/speedy/http/response.inl +++ b/speedy/http/response.inl @@ -25,6 +25,12 @@ void Response<Req, Res>::send(Status status, const std::string& body) template <class Req, class Res> void Response<Req, Res>::send(const std::string& body) { + // terrible bug. if the client closes the connection, buffer is cleared and + // there is noone to respond to and you start writing to memory that isn't + // yours anymore + if(buffer.count() == 0) + return; + uv_write_t* write_req = static_cast<uv_write_t*>(write_req_allocator.acquire()); @@ -41,10 +47,16 @@ void Response<Req, Res>::send(const std::string& body) buffer << it->first << ":" << it->second << "\r\n"; buffer << "\r\n" << body; + /* std::cout << "SALJEM RESPONSE" << std::endl; */ + /* std::cout << body << std::endl; */ + /* std::cout << connection.request.body << std::endl; */ + /* std::cout << "buffer count:" << buffer.count() << std::endl; */ uv_write(write_req, connection.client, buffer, buffer.count(), [](uv_write_t* write_req, int) { + /* std::cout << "POSLAO RESPONSE" << std::endl; */ + connection_t& conn = *reinterpret_cast<connection_t*>(write_req->data); if(!conn.keep_alive) diff --git a/speedy/speedy.hpp b/speedy/speedy.hpp index 9c8db56dc..5d918c882 100644 --- a/speedy/speedy.hpp +++ b/speedy/speedy.hpp @@ -71,7 +71,7 @@ public: server.listen(ip, [this](Request& req, Response& res) { auto route = router.match(R3::to_r3_method(req.method), req.url); - + if(!route.exists()) return res.send(http::Status::NotFound, "Resource not found"); diff --git a/threading/sync/futex.hpp b/threading/sync/futex.hpp index eddc6a969..afef77450 100644 --- a/threading/sync/futex.hpp +++ b/threading/sync/futex.hpp @@ -157,7 +157,7 @@ private: void relax() { - usleep(250); + cpu_relax(); } }; diff --git a/threading/sync/spinlock.hpp b/threading/sync/spinlock.hpp index a8e1db118..8f9fe3e2b 100644 --- a/threading/sync/spinlock.hpp +++ b/threading/sync/spinlock.hpp @@ -3,6 +3,8 @@ #include <atomic> #include <unistd.h> +#include "utils/cpu_relax.hpp" + class SpinLock { public: @@ -10,7 +12,8 @@ public: void lock() { while(lock_flag.test_and_set(std::memory_order_acquire)) - usleep(250); + cpu_relax(); + ///usleep(250); } void unlock()