diff --git a/api/link_resources.py b/api/link_resources.py new file mode 100644 index 000000000..798f4dbf9 --- /dev/null +++ b/api/link_resources.py @@ -0,0 +1,68 @@ +# this script generates the include.hpp file for restful resources +import re +import os + +resource_path = 'resources' +template_path = 'resources/include.hpp.template' +include_path = 'resources/include.hpp' + +# remove the old version of the file if exists +if os.path.isfile(include_path): + os.remove(include_path) + + +class Resource(object): + """ represents a restful resource class for speedy """ + + def __init__(self, filename): + self.filename = filename + + with open(os.path.join(resource_path, filename)) as f: + class_name = re.compile('\s*class\s*(\w+)\s*\:') + + for line in f: + result = re.search(class_name, line) + + if result is None: + continue + + self.class_name = result.group(1) + + break + + +def load_resources(): + return [Resource(f) for f in os.listdir(resource_path) + if f.endswith('.hpp')] + + +def write_includes(file, resources): + for filename in [resource.filename for resource in resources]: + print 'writing include for', filename + file.write('#include "{}"\n'.format(filename)) + + +def write_inits(file, resources): + for class_name in [resource.class_name for resource in resources]: + print 'writing init for', class_name + file.write(' insert<{}>(app);\n'.format(class_name)) + + +def make_include_file(): + resources = load_resources() + + with open(template_path, 'r') as ftemplate: + with open(include_path, 'w') as finclude: + for line in ftemplate: + if '' in line: + write_includes(finclude, resources) + continue + + if '' in line: + write_inits(finclude, resources) + continue + + finclude.write(line) + +if __name__ == '__main__': + make_include_file() diff --git a/api/resources/animal.hpp b/api/resources/animal.hpp new file mode 100644 index 000000000..ceee4db12 --- /dev/null +++ b/api/resources/animal.hpp @@ -0,0 +1,23 @@ +#ifndef MEMGRAPH_ANIMAL_HPP +#define MEMGRAPH_ANIMAL_HPP + +#include "speedy/speedy.hpp" +#include "api/restful/resource.hpp" + +class Animal : public api::Resource +{ +public: + Animal(speedy::Speedy& app) : Resource(app, "/animal") {} + + void get(http::Request& req, http::Response& res) + { + return res.send("Ok, here is a Dog"); + } + + void post(http::Request& req, http::Response& res) + { + return res.send("Oh, you gave me an animal?"); + } +}; + +#endif diff --git a/api/resources/include.hpp b/api/resources/include.hpp new file mode 100644 index 000000000..a7d03b710 --- /dev/null +++ b/api/resources/include.hpp @@ -0,0 +1,33 @@ +/** @file include.hpp + * @brief Links all restful resources to the application + * + * This file is autogenerated by the python script link_resources.py + * + * YOU SHOULD NOT EDIT THIS FILE MANUALLY! + */ + +#ifndef MEMGRAPH_API_RESOURCES_INCLUDE_HPP +#define MEMGRAPH_API_RESOURCES_INCLUDE_HPP + +#include +#include + +#include "api/restful/resource.hpp" +#include "speedy/speedy.hpp" + +#include "animal.hpp" + +static std::list> resources; + +template +void insert(speedy::Speedy& app) +{ + resources.push_back(std::unique_ptr(new T(app))); +} + +void init(speedy::Speedy& app) +{ + insert(app); +} + +#endif diff --git a/api/resources/include.hpp.template b/api/resources/include.hpp.template new file mode 100644 index 000000000..55e344882 --- /dev/null +++ b/api/resources/include.hpp.template @@ -0,0 +1,33 @@ +/** @file include.hpp + * @brief Links all restful resources to the application + * + * This file is autogenerated by the python script link_resources.py + * + * YOU SHOULD NOT EDIT THIS FILE MANUALLY! + */ + +#ifndef MEMGRAPH_API_RESOURCES_INCLUDE_HPP +#define MEMGRAPH_API_RESOURCES_INCLUDE_HPP + +#include +#include + +#include "api/restful/resource.hpp" +#include "speedy/speedy.hpp" + + + +static std::list> resources; + +template +void insert(speedy::Speedy& app) +{ + resources.push_back(std::unique_ptr(new T(app))); +} + +void init(speedy::Speedy& app) +{ + +} + +#endif diff --git a/api/resource.hpp b/api/restful/resource.hpp similarity index 97% rename from api/resource.hpp rename to api/restful/resource.hpp index 08293d52e..62c12db71 100644 --- a/api/resource.hpp +++ b/api/restful/resource.hpp @@ -114,6 +114,8 @@ struct Methods : public Method } +struct RestfulResource {}; + /** @brief Represents a restful resource * * Automatically registers get, put, post, del... methods inside the derived @@ -128,7 +130,7 @@ struct Methods : public Method * @tparam Ms... HTTP methods to register for this resource (GET, POST...) */ template -class Resource : public detail::Methods +class Resource : public detail::Methods, public RestfulResource { public: Resource(speedy::Speedy& app, const std::string& path) diff --git a/api/test.cpp b/api/test.cpp index 533d21db5..26d6aeece 100644 --- a/api/test.cpp +++ b/api/test.cpp @@ -1,30 +1,14 @@ #include #include "speedy/speedy.hpp" -#include "resource.hpp" - -class Animal : public api::Resource -{ -public: - Animal(speedy::Speedy& app) : Resource(app, "/animal") {} - - void get(http::Request& req, http::Response& res) - { - return res.send("Ok, here is a Dog"); - } - - void post(http::Request& req, http::Response& res) - { - return res.send("Oh, you gave me an animal?"); - } -}; +#include "resources/include.hpp" int main(void) { uv::UvLoop loop; speedy::Speedy app(loop); - auto animal = Animal(app); + init(app); http::Ipv4 ip("0.0.0.0", 3400); app.listen(ip); diff --git a/data_structures/bitset/dynamic_bitset.hpp b/data_structures/bitset/dynamic_bitset.hpp index 4963d327f..b42b3f4c2 100644 --- a/data_structures/bitset/dynamic_bitset.hpp +++ b/data_structures/bitset/dynamic_bitset.hpp @@ -2,11 +2,7 @@ #define MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP #include -#include #include -#include -#include -#include #include "threading/sync/lockable.hpp" #include "threading/sync/spinlock.hpp" @@ -140,7 +136,7 @@ private: // the next chunk does not exist and we need it. take an exclusive // lock to prevent others that also want to create a new chunk // from creating it - auto guard = acquire(); + auto guard = acquire_unique(); // double-check locking. if the chunk exists now, some other thread // has just created it, continue searching for my chunk diff --git a/data_structures/list/lockfree_list.hpp b/data_structures/list/lockfree_list.hpp index c7c0b4895..6259fc31a 100644 --- a/data_structures/list/lockfree_list.hpp +++ b/data_structures/list/lockfree_list.hpp @@ -136,7 +136,7 @@ public: // we only care about push_front and iterator performance so we can // tradeoff some remove speed for better reads and inserts. remove is // used exclusively by the GC thread(s) so it can be slower - auto guard = acquire(); + auto guard = acquire_unique(); // even though concurrent removes are synchronized, we need to worry // about concurrent reads (solved by using atomics) and concurrent diff --git a/data_structures/queue/slqueue.hpp b/data_structures/queue/slqueue.hpp index d1cf145b6..745182d49 100644 --- a/data_structures/queue/slqueue.hpp +++ b/data_structures/queue/slqueue.hpp @@ -6,42 +6,39 @@ #include "threading/sync/lockable.hpp" #include "threading/sync/spinlock.hpp" -namespace spinlock -{ - template -class Queue : Lockable +class SlQueue : Lockable { public: template void emplace(Args&&... args) { - auto guard = acquire(); + auto guard = acquire_unique(); queue.emplace(args...); } void push(const T& item) { - auto guard = acquire(); + auto guard = acquire_unique(); queue.push(item); } T front() { - auto guard = acquire(); + auto guard = acquire_unique(); return queue.front(); } void pop() { - auto guard = acquire(); + auto guard = acquire_unique(); queue.pop(); } bool pop(T& item) { - auto guard = acquire(); + auto guard = acquire_unique(); if(queue.empty()) return false; @@ -52,13 +49,13 @@ public: bool empty() { - auto guard = acquire(); + auto guard = acquire_unique(); return queue.empty(); } size_t size() { - auto guard = acquire(); + auto guard = acquire_unique(); return queue.size(); } @@ -66,6 +63,4 @@ private: std::queue queue; }; -} - #endif diff --git a/data_structures/skiplist/lockfree_skiplist.hpp b/data_structures/skiplist/lockfree_skiplist.hpp new file mode 100644 index 000000000..eebbbcd38 --- /dev/null +++ b/data_structures/skiplist/lockfree_skiplist.hpp @@ -0,0 +1,61 @@ +#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_LOCKFREE_SKIPLIST_HPP +#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_LOCKFREE_SKIPLIST_HPP + +#include +#include + +#include "utils/mark_ref.hpp" + +namespace lockfree +{ + +template +class SkipList +{ +public: + struct Node + { + using ref_t = MarkRef; + + K* key; + T* item; + + const uint8_t height; + + static Node* create(uint8_t height, K* key, T* item) + { + auto size = sizeof(Node) + height * sizeof(std::atomic); + auto node = static_cast(std::malloc(size)); + return new (node) Node(height, key, item); + } + + static void destroy(Node* node) + { + node->~SkipNode(); + free(node); + } + + private: + Node(uint8_t height, K* key, T* item) + : key(key), item(item), height(height) + { + for(uint8_t i = 0; i < height; ++i) + new (&tower[i]) std::atomic(nullptr); + } + + // this creates an array of the size zero. we can't put any sensible + // value here since we don't know what size it will be untill the + // node is allocated. we could make it a SkipNode** but then we would + // have two memory allocations, one for node and one for the forward + // list. this way we avoid expensive malloc/free calls and also cache + // thrashing when following a pointer on the heap + std::atomic tower[0]; + }; + + //void list_search(const K& key, + +}; + +} + +#endif diff --git a/data_structures/skiplist/skiplist.hpp b/data_structures/skiplist/skiplist.hpp index d0d380cef..39602c3bf 100644 --- a/data_structures/skiplist/skiplist.hpp +++ b/data_structures/skiplist/skiplist.hpp @@ -28,7 +28,7 @@ public: ~SkipList() { - for(Node* current = header.load(std::memory_order_relaxed); current;) + for(Node* current = header.load(); current;) { Node* next = current->forward(0); Node::destroy(current); @@ -38,7 +38,7 @@ public: size_t size() const { - return size_.load(std::memory_order_relaxed); + return size_.load(); } uint8_t height() const @@ -60,7 +60,7 @@ public: size_t increment_size(size_t delta) { - return size_.fetch_add(delta, std::memory_order_relaxed) + delta; + return size_.fetch_add(delta) + delta; } int find_path(Node* from, @@ -92,7 +92,7 @@ public: Node* find(const K* const key) { - Node* pred = header.load(std::memory_order_consume); + Node* pred = header.load(); Node* node = nullptr; uint8_t level = pred->height; @@ -150,7 +150,7 @@ public: while(true) { - auto head = header.load(std::memory_order_consume); + auto head = header.load(); auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); if(lfound != -1) @@ -215,7 +215,7 @@ public: while(true) { - auto head = header.load(std::memory_order_consume); + auto head = header.load(); auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs); if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound))) diff --git a/data_structures/slrbtree.hpp b/data_structures/slrbtree.hpp index 86558824a..d74a08279 100644 --- a/data_structures/slrbtree.hpp +++ b/data_structures/slrbtree.hpp @@ -3,15 +3,14 @@ #include -#include "utils/sync/spinlock.hpp" +#include "threading/sync/spinlock.hpp" template -class SlRbTree +class SlRbTree : Lockable { public: private: - SpinLock lock; std::map tree; }; diff --git a/data_structures/slstack.hpp b/data_structures/slstack.hpp index dbde180a4..54315f417 100644 --- a/data_structures/slstack.hpp +++ b/data_structures/slstack.hpp @@ -29,7 +29,6 @@ public: } private: - SpinLock lock; std::stack stack; }; diff --git a/database/db.hpp b/database/db.hpp new file mode 100644 index 000000000..4796b9eb5 --- /dev/null +++ b/database/db.hpp @@ -0,0 +1,20 @@ +#ifndef MEMGRAPH_DATABASE_DB_HPP +#define MEMGRAPH_DATABASE_DB_HPP + +#include "transactions/transaction_engine.hpp" +#include "transactions/commit_log.hpp" + +class Db +{ +public: + static Db& get() + { + static Db db; + return db; + } + + tx::CommitLog commit_log; + tx::TransactionEngine transaction_engine; +}; + +#endif diff --git a/ioctest.cpp b/ioctest.cpp deleted file mode 100644 index ab92a2952..000000000 --- a/ioctest.cpp +++ /dev/null @@ -1,77 +0,0 @@ -#include - -#include "utils/ioc/container.hpp" - -struct A -{ - A() - { - std::cout << "Constructor of A" << std::endl; - } - - int a = 3; -}; - -struct C; - -struct B -{ - B(std::shared_ptr&& a, std::shared_ptr&& c) - : a(a), c(c) - { - std::cout << "Constructor of B" << std::endl; - } - - int b = 4; - - std::shared_ptr a; - std::shared_ptr c; -}; - -struct C -{ - C(int c) : c(c) - { - std::cout << "Constructor of C" << std::endl; - } - - int c; -}; - -int main(void) -{ - ioc::Container container; - - // register a singleton class A - auto a = container.singleton(); - std::cout << a->a << std::endl; // should print 3 - - // register a factory function that makes C - container.factory([]() { - return std::make_shared(5); - }); - - // try to resolve A - auto aa = container.resolve(); - std::cout << aa->a << std::endl; // should print 3 - - // register a singleton class B with dependencies A and C - // (A will be resolved as an existing singleton and C will - // be created by the factory function defined above) - auto b = container.singleton(); - std::cout << b->b << std::endl; // should print 4 - std::cout << b->a->a << std::endl; // should print 3 - std::cout << b->c->c << std::endl; // should print 5 - - // try to resolve B - auto bb = container.resolve(); - std::cout << bb->b << std::endl; // should print 4 - - // try to resolve C - // (will be created as a new instance by the factory - // function defined above) - auto c = container.resolve(); - std::cout << c->c << std::endl; // should print 5 - - return 0; -}; diff --git a/memory/literals.hpp b/memory/literals.hpp new file mode 100644 index 000000000..965ee0ef1 --- /dev/null +++ b/memory/literals.hpp @@ -0,0 +1,29 @@ +#ifndef MEMGRAPH_MEMORY_LITERALS_HPP +#define MEMGRAPH_MEMORY_LITERALS_HPP + +#include + +namespace memory +{ +namespace literals +{ + +constexpr unsigned long long operator"" _GB(unsigned long long gb) +{ + return 1024 * 1024 * 1024 * gb; +} + +constexpr unsigned long long operator"" _MB(unsigned long long mb) +{ + return 1024 * 1024 * mb; +} + +constexpr unsigned long long operator"" _kB(unsigned long long kb) +{ + return 1024 * kb; +} + +} +} + +#endif diff --git a/mvcc/atom.hpp b/mvcc/atom.hpp index ab3e8570f..b19b38b8b 100644 --- a/mvcc/atom.hpp +++ b/mvcc/atom.hpp @@ -3,7 +3,7 @@ #include "threading/sync/lockable.hpp" -#include "transaction.hpp" +#include "transactions/transaction.hpp" #include "version.hpp" namespace mvcc @@ -11,11 +11,10 @@ namespace mvcc template class Atom : public Version, - public Lockable<> + public Lockable { public: - Atom(uint64_t id, T* first) - : Version(first), id(id) + Atom(uint64_t id, T* first) : Version(first), id(id) { // it's illegal that the first version is nullptr. there should be at // least one version of a record @@ -29,27 +28,17 @@ public: // inspects the record change history and returns the record version visible // to the current transaction if it exists, otherwise it returns nullptr - T* latest_visible(const Transaction& t) + T* latest_visible(const tx::Transaction& t) { return first()->latest_visible(t); } - // inspects the record change history and returns the newest available - // version from all transactions - T& newest_available() - { - T* record = this->newer(), newer = this->newer(); - - while(newer != nullptr) - record = newer, newer = record->newer(); - - return record; - } - // every record has a unique id. 2^64 = 1.8 x 10^19. that should be enough // for a looong time :) but keep in mind that some vacuuming would be nice // to reuse indices for deleted nodes. uint64_t id; + + std::atomic*> next; }; } diff --git a/mvcc/hints.hpp b/mvcc/hints.hpp new file mode 100644 index 000000000..f0ed3df3b --- /dev/null +++ b/mvcc/hints.hpp @@ -0,0 +1,115 @@ +#ifndef MEMGRAPH_MVCC_HINTS_HPP +#define MEMGRAPH_MVCC_HINTS_HPP + +#include +#include + +#include "transactions/commit_log.hpp" + +namespace mvcc +{ + +// known committed and known aborted for both xmax and xmin +// this hints are used to quickly check the commit/abort status of the +// transaction that created this record. if these are not set, one should +// consult the commit log to find the status and update the status here +// more info https://wiki.postgresql.org/wiki/Hint_Bits +class Hints +{ +public: + union HintBits; + +private: + enum Flags { + MIN_CMT = 1, // XX01 + MIN_ABT = 1 << 1, // XX10 + MAX_CMT = 1 << 2, // 01XX + MAX_ABT = 1 << 3 // 10XX + }; + + template + class TxHints + { + using type = TxHints; + + public: + TxHints(std::atomic& bits) + : bits(bits) {} + + struct Value + { + bool is_committed() const + { + return bits & COMMITTED; + } + + bool is_aborted() const + { + return bits & ABORTED; + } + + bool is_unknown() const + { + return !(is_committed() || is_aborted()); + } + + uint8_t bits; + }; + + Value load(std::memory_order order = std::memory_order_seq_cst) + { + return Value { bits.load(order) }; + } + + void set_committed() + { + bits.fetch_or(COMMITTED); + } + + void set_aborted() + { + bits.fetch_or(ABORTED); + } + + private: + std::atomic& bits; + }; + + struct Min : public TxHints + { + using TxHints::TxHints; + }; + + struct Max : public TxHints + { + using TxHints::TxHints; + }; + +public: + Hints() : min(bits), max(bits) + { + assert(bits.is_lock_free()); + } + + union HintBits + { + uint8_t bits; + + Min::Value min; + Max::Value max; + }; + + HintBits load(std::memory_order order = std::memory_order_seq_cst) + { + return HintBits { bits.load(order) }; + } + + Min min; + Max max; + + std::atomic bits { 0 }; +}; + +} + +#endif diff --git a/mvcc/mvcc.hpp b/mvcc/mvcc.hpp index 9d80cb86b..5c5ad007f 100644 --- a/mvcc/mvcc.hpp +++ b/mvcc/mvcc.hpp @@ -3,9 +3,12 @@ #include -#include "transaction.hpp" +#include "transactions/transaction.hpp" +#include "transactions/commit_log.hpp" + #include "minmax.hpp" #include "version.hpp" +#include "hints.hpp" // the mvcc implementation used here is very much like postgresql's // more info: https://momjian.us/main/writings/pgsql/mvcc.pdf @@ -17,7 +20,7 @@ template class Mvcc : public Version { public: - Mvcc() {} + Mvcc() = default; // tx.min is the id of the transaction that created the record // and tx.max is the id of the transaction that deleted the record @@ -32,7 +35,7 @@ public: MinMax cmd; // check if this record is visible to the transaction t - bool visible(const Transaction& t) + bool visible(const tx::Transaction& t) { // TODO check if the record was created by a transaction that has been // aborted. one might implement this by checking the hints in mvcc @@ -42,30 +45,30 @@ public: // if you think they're not, you're wrong, and you should think about it // again. i know, it happened to me. - return ((tx.min() == t.id && // inserted by the current transaction - cmd.min() < t.cid && // before this command, and - (tx.max() == 0 || // the row has not been deleted, or - (tx.max() == t.id && // it was deleted by the current - // transaction - cmd.max() >= t.cid))) // but not before this command, - || // or - (t.committed(tx.min()) && // the record was inserted by a - // committed transaction, and - (tx.max() == 0 || // the record has not been deleted, or - (tx.max() == t.id && // the row is being deleted by this - // transaction - cmd.max() >= t.cid) || // but it's not deleted "yet", or - (tx.max() != t.id && // the row was deleted by another - // transaction - !t.committed(tx.max()) // that has not been committed + return ((tx.min() == t.id && // inserted by the current transaction + cmd.min() < t.cid && // before this command, and + (tx.max() == 0 || // the row has not been deleted, or + (tx.max() == t.id && // it was deleted by the current + // transaction + cmd.max() >= t.cid))) // but not before this command, + || // or + (min_committed(tx.min(), t) && // the record was inserted by a + // committed transaction, and + (tx.max() == 0 || // the record has not been deleted, or + (tx.max() == t.id && // the row is being deleted by this + // transaction + cmd.max() >= t.cid) || // but it's not deleted "yet", or + (tx.max() != t.id && // the row was deleted by another + // transaction + !max_committed(tx.max(), t) // that has not been committed )))); } // inspects the record change history and returns the record version visible // to the current transaction if it exists, otherwise it returns nullptr - T* latest_visible(const Transaction& t) + T* latest_visible(const tx::Transaction& t) { - T* record = this, newer = this->newer(); + T* record = static_cast(this), *newer = this->newer(); // move down through the versions of the nodes until you find the first // one visible to this transaction. if no visible records are found, @@ -76,25 +79,59 @@ public: return record; } - void mark_created(const Transaction& t) + void mark_created(const tx::Transaction& t) { tx.min(t.id); cmd.min(t.cid); } - void mark_deleted(const Transaction& t) + void mark_deleted(const tx::Transaction& t) { tx.max(t.id); cmd.max(t.cid); } protected: - // known committed and known aborted for both xmax and xmin - // this hints are used to quickly check the commit/abort status of the - // transaction that created this record. if these are not set, one should - // consult the commit log to find the status and update the status here - // more info https://wiki.postgresql.org/wiki/Hint_Bits - std::atomic hints; + Hints hints; + + bool max_committed(uint64_t id, const tx::Transaction& t) + { + return committed(hints.max, id, t); + } + + bool min_committed(uint64_t id, const tx::Transaction& t) + { + return committed(hints.min, id, t); + } + + template + bool committed(U& hints, uint64_t id, const tx::Transaction& t) + { + auto hint_bits = hints.load(); + + // if hints are set, return if xid is committed + if(!hint_bits.is_unknown()) + return hint_bits.is_committed(); + + // if hints are not set: + // - the creating transaction is still in progress (examine snapshot) + if(t.snapshot.is_active(id)) + return false; + + // - you are the first one to check since it ended, consult commit log + auto& clog = tx::CommitLog::get(); + auto info = clog.fetch_info(id); + + if(info.is_committed()) + { + hints.set_committed(); + return true; + } + + assert(info.is_aborted()); + hints.set_aborted(); + return false; + } }; } diff --git a/mvcc/mvcc_error.hpp b/mvcc/mvcc_error.hpp new file mode 100644 index 000000000..cb3105ca7 --- /dev/null +++ b/mvcc/mvcc_error.hpp @@ -0,0 +1,17 @@ +#ifndef MEMGRAPH_MVCC_MVCC_ERROR_HPP +#define MEMGRAPH_MVCC_MVCC_ERROR_HPP + +#include + +namespace mvcc +{ + +class MvccError : public std::runtime_error +{ +public: + using runtime_error::runtime_error; +}; + +} + +#endif diff --git a/mvcc/store.hpp b/mvcc/store.hpp index 8b72d957c..0fb6903c3 100644 --- a/mvcc/store.hpp +++ b/mvcc/store.hpp @@ -1,10 +1,9 @@ #ifndef MEMGRAPH_STORAGE_MVCC_STORE_HPP #define MEMGRAPH_STORAGE_MVCC_STORE_HPP -#include - -#include "mvcc/transaction.hpp" +#include "transactions/transaction.hpp" #include "mvcc/atom.hpp" +#include "mvcc/mvcc_error.hpp" #include "data_structures/list/lockfree_list.hpp" #include "utils/counters/atomic_counter.hpp" @@ -15,40 +14,57 @@ namespace mvcc { -class MvccError : public std::runtime_error -{ -public: - using runtime_error::runtime_error; -}; - template class MvccStore { using list_t = lockfree::List>; public: - using iterator = typename list_t::iterator; + using read_iterator = typename list_t::read_iterator; + using read_write_iterator = typename list_t::read_write_iterator; MvccStore() : counter(0) {} - iterator insert(const Transaction& t) + read_iterator begin() { - auto record = new T(); - - record->mark_created(t); - - return data.push_front(Atom(counter.next(), record)); + return data.begin(); } - T* update(Atom& atom, T& record, const Transaction& t) + read_write_iterator rw_begin() + { + return data.rw_begin(); + } + + Atom* insert(const tx::Transaction& t) + { + // create a first version of the record + auto record = new T(); + + // mark the record as created by the transaction t + record->mark_created(t); + + // create an Atom to put in the list + auto atom = new Atom(counter.next(), record); + + // put the atom with the record to the linked list + data.push_front(atom); + + return atom; + } + + T* update(Atom& atom, T& record, const tx::Transaction& t) { auto guard = atom.acquire(); // if xmax is not zero, that means there is a newer version of this - // record or it has been deleted. we cannot do anything here + // record or it has been deleted. we cannot do anything here until + // we implement some more intelligent locking mechanisms if(record.tx.max()) throw MvccError("can't serialize due to concurrent operation(s)"); + assert(atom.latest_visible(t) == &record); + assert(atom.latest_visible(t) == record.latest_visible(t)); + // make a new version auto updated = new T(); *updated = *record; @@ -63,7 +79,7 @@ public: return updated; } - void remove(Atom& atom, T& record, const Transaction& t) + void remove(Atom& atom, T& record, const tx::Transaction& t) { auto guard = atom.acquire(); diff --git a/mvcc/version.hpp b/mvcc/version.hpp index bbc99a1be..924e84eff 100644 --- a/mvcc/version.hpp +++ b/mvcc/version.hpp @@ -12,18 +12,23 @@ class Version public: Version() : versions(nullptr) {} + ~Version() + { + delete versions.load(); + } + Version(T* value) : versions(value) {} // return a pointer to a newer version stored in this record T* newer() { - return versions.load(std::memory_order_relaxed); + return versions.load(); } // set a newer version of this record void newer(T* value) { - versions.store(value, std::memory_order_relaxed); + versions.store(value); } private: diff --git a/speedy/r3.hpp b/speedy/r3.hpp index 12566d57e..cb094b8cb 100644 --- a/speedy/r3.hpp +++ b/speedy/r3.hpp @@ -99,6 +99,7 @@ public: R3(R3&& other) { + this->routes = std::move(other.routes); this->root = other.root; other.root = nullptr; } diff --git a/speedy/speedy.hpp b/speedy/speedy.hpp index d4025e15d..e5a6f1398 100644 --- a/speedy/speedy.hpp +++ b/speedy/speedy.hpp @@ -20,7 +20,10 @@ namespace speedy class Speedy { public: - Speedy(uv::UvLoop& loop) : server(loop), router(100) {} + using sptr = std::shared_ptr; + + Speedy(uv::UvLoop& loop, size_t capacity = 100) + : server(loop), router(capacity) {} Speedy(Speedy&) = delete; Speedy(Speedy&&) = delete; diff --git a/speedy/speedy.inl b/speedy/speedy.inl new file mode 100644 index 000000000..e5e33e09f --- /dev/null +++ b/speedy/speedy.inl @@ -0,0 +1,94 @@ +#ifndef MEMGRAPH_SPEEDY_INL +#define MEMGRAPH_SPEEDY_INL + +#include "speedy.hpp" + +namespace speedy +{ + +int r3_request_method(http::Method method) +{ + switch (method) { + case http::Method::GET: return METHOD_GET; + case http::Method::POST: return METHOD_POST; + case http::Method::PUT: return METHOD_PUT; + case http::Method::DELETE: return METHOD_DELETE; + case http::Method::HEAD: return METHOD_HEAD; + } +} + +// TODO: better implementation + +Speedy::Speedy(uv::UvLoop& loop, const http::Ipv4& ip) : server(loop), ip(ip) +{ + n = r3_tree_create(100); +} + +void Speedy::store_callback(int method, + const std::string &path, + http::request_cb_t callback) +{ + callbacks.push_back(callback); + void *ptr = malloc(sizeof(uint)); + *((uint *)ptr) = callbacks.size() - 1; + r3_tree_insert_routel(n, method, path.c_str(), path.size(), ptr); +} + +void Speedy::get(const std::string &path, http::request_cb_t callback) +{ + store_callback(METHOD_GET, path, callback); + + // TODO: something like this + // this solution doesn't work, currenlty I don't know why + // callbacks.push_back(callback) + // r3_tree_insert_pathl(n, path.c_str(), path.size(), &callbacks.back()); +} + +void Speedy::post(const std::string &path, http::request_cb_t callback) +{ + store_callback(METHOD_POST, path, callback); +} + +void Speedy::put(const std::string &path, http::request_cb_t callback) +{ + store_callback(METHOD_PUT, path, callback); +} + +void Speedy::del(const std::string &path, http::request_cb_t callback) +{ + store_callback(METHOD_DELETE, path, callback); +} + +void Speedy::listen() +{ + char *errstr = NULL; + int err = r3_tree_compile(n, &errstr); + if (err) { + std::cout << "R3 compile error" << std::endl; + } + + server.listen(ip, [this](http::Request& req, http::Response& res) { + auto url = req.url; + auto c_url = url.c_str(); + match_entry *entry = match_entry_create(c_url); + entry->request_method = r3_request_method(req.method); + route *r = r3_tree_match_route(this->n, entry); + match_entry_free(entry); + if (r) { + int index = *((int *)r->data); + auto callback = this->callbacks[index]; + callback(req, res); + // TODO: and something like this + // auto callback = *reinterpret_cast(n->data); + // callback(req, res); + } else { + res.send("Not found"); + } + }); + + std::cout << "Server is UP" << std::endl; +} + +} + +#endif diff --git a/storage/graph.hpp b/storage/graph.hpp index c0d15acf6..c75cf4e2f 100644 --- a/storage/graph.hpp +++ b/storage/graph.hpp @@ -5,6 +5,7 @@ #include "mvcc/atom.hpp" #include "mvcc/store.hpp" +#include "mvcc/mvcc_error.hpp" #include "vertex.hpp" #include "edge.hpp" @@ -17,13 +18,24 @@ class Graph public: Graph() {} - EdgeStore::iterator connect(Vertex a, Vertex b, const Transaction& t) + mvcc::Atom* connect(mvcc::Atom& atom_a, Vertex& a, + mvcc::Atom& atom_b, Vertex& b, + const tx::Transaction& t) { - auto it = edges.insert(t); + // try to lock A + auto guard_a = atom_a.acquire_unique(); - it-> - - return it; + if(a.tx.max()) + throw mvcc::MvccError("can't serialize due to\ + concurrent operation(s)"); + + auto guard_b = atom_b.acquire_unique(); + + if(b.tx.max()) + throw mvcc::MvccError("can't serialize due to\ + concurrent operation(s)"); + + return edges.insert(t); } VertexStore vertices; diff --git a/storage/model/properties/properties.hpp b/storage/model/properties/properties.hpp index 1f47178bf..36347a5b6 100644 --- a/storage/model/properties/properties.hpp +++ b/storage/model/properties/properties.hpp @@ -4,6 +4,7 @@ #include #include "property.hpp" +#include "string.hpp" namespace model { @@ -43,9 +44,9 @@ public: buffer += '"'; buffer += kvp.first; buffer += "\":"; kvp.second->dump(buffer); buffer += ','; } - - buffer.pop_back(); // erase last comma - buffer += '}'; + + // replace last redundant comma with } + buffer.back() = '}'; } private: diff --git a/storage/model/record.hpp b/storage/model/record.hpp index 0d4f42d27..e074c7d1d 100644 --- a/storage/model/record.hpp +++ b/storage/model/record.hpp @@ -1,6 +1,7 @@ #ifndef MEMGRAPH_STORAGE_RECORD_HPP #define MEMGRAPH_STORAGE_RECORD_HPP +#include #include #include diff --git a/storage/vertex.hpp b/storage/vertex.hpp index 13e0da874..07bd0eb1e 100644 --- a/storage/vertex.hpp +++ b/storage/vertex.hpp @@ -12,4 +12,14 @@ struct Vertex : public Record std::vector out; }; +inline std::ostream& operator<<(std::ostream& stream, Vertex& record) +{ + std::string props; + record.properties.dump(props); + + return stream << "Vertex" + << "(xmin = " << record.tx.min() + << ", xmax = " << record.tx.max() + << "): " << props; +} #endif diff --git a/threading/pool.hpp b/threading/pool.hpp index 96fe8220a..846a3f81d 100644 --- a/threading/pool.hpp +++ b/threading/pool.hpp @@ -37,7 +37,7 @@ public: void run(F&& f, Args&&... args) { { - auto lock = acquire(); + auto lock = acquire_unique(); tasks.emplace([&f, &args...]() { f(std::forward(args)...); @@ -62,7 +62,7 @@ private: task_t task; { - auto lock = acquire(); + auto lock = acquire_unique(); cond.wait(lock, [this] { return !this->alive || !this->tasks.empty(); diff --git a/threading/sync/lockable.hpp b/threading/sync/lockable.hpp index d0f78ee7a..01f627d94 100644 --- a/threading/sync/lockable.hpp +++ b/threading/sync/lockable.hpp @@ -7,7 +7,9 @@ template class Lockable { -protected: +public: + using lock_type = lock_t; + std::lock_guard acquire_guard() { return std::lock_guard(lock); diff --git a/transactions/commit_log.hpp b/transactions/commit_log.hpp new file mode 100644 index 000000000..b5f6b5373 --- /dev/null +++ b/transactions/commit_log.hpp @@ -0,0 +1,92 @@ +#ifndef MEMGRAPH_TRANSACTIONS_COMMIT_LOG_HPP +#define MEMGRAPH_TRANSACTIONS_COMMIT_LOG_HPP + +#include "data_structures/bitset/dynamic_bitset.hpp" + +namespace tx +{ + +class CommitLog +{ +public: + struct Info + { + enum Status + { + ACTIVE = 0, // 00 + COMMITTED = 1, // 01 + ABORTED = 2, // 10 + }; + + bool is_active() const + { + return flags & ACTIVE; + } + + bool is_committed() const + { + return flags & COMMITTED; + } + + bool is_aborted() const + { + return flags & ABORTED; + } + + operator uint8_t() const + { + return flags; + } + + uint8_t flags; + }; + + CommitLog() = default; + CommitLog(CommitLog&) = delete; + CommitLog(CommitLog&&) = delete; + + CommitLog operator=(CommitLog) = delete; + + static CommitLog& get() + { + static CommitLog log; + return log; + } + + Info fetch_info(uint64_t id) + { + return Info { log.at(2 * id, 2) }; + } + + bool is_active(uint64_t id) + { + return fetch_info(id).is_active(); + } + + bool is_committed(uint64_t id) + { + return fetch_info(id).is_committed(); + } + + void set_committed(uint64_t id) + { + log.set(2 * id); + } + + bool is_aborted(uint64_t id) + { + return fetch_info(id).is_aborted(); + } + + void set_aborted(uint64_t id) + { + log.set(2 * id + 1); + } + +private: + DynamicBitset log; +}; + +} + +#endif diff --git a/transactions/snapshot.hpp b/transactions/snapshot.hpp new file mode 100644 index 000000000..7c72e8882 --- /dev/null +++ b/transactions/snapshot.hpp @@ -0,0 +1,37 @@ +#ifndef MEMGRAPH_TRANSACTIONS_SNAPSHOT_HPP +#define MEMGRAPH_TRANSACTIONS_SNAPSHOT_HPP + +#include +#include + +namespace tx +{ + +template +class Snapshot +{ +public: + Snapshot(std::vector active) : active(std::move(active)) {} + + Snapshot(const Snapshot& other) + { + active = other.active; + } + + Snapshot(Snapshot&& other) + { + active = std::move(other.active); + } + + bool is_active(id_t xid) const + { + return std::binary_search(active.begin(), active.end(), xid); + } + +private: + std::vector active; +}; + +} + +#endif diff --git a/transactions/transaction.hpp b/transactions/transaction.hpp index 6b01cc5e2..5133d0341 100644 --- a/transactions/transaction.hpp +++ b/transactions/transaction.hpp @@ -5,13 +5,15 @@ #include #include +#include "snapshot.hpp" + namespace tx { struct Transaction { - Transaction(uint64_t id, std::vector active) - : id(id), cid(1), active(std::move(active)) {} + Transaction(uint64_t id, Snapshot snapshot) + : id(id), cid(1), snapshot(std::move(snapshot)) {} // index of this transaction uint64_t id; @@ -19,30 +21,8 @@ struct Transaction // index of the current command in the current transaction; uint8_t cid; - // the ids of the currently active transactions used by the mvcc - // implementation for snapshot transaction isolation. - // std::vector is much faster than std::set for fewer number of items - // we don't expect the number of active transactions getting too large. - std::vector active; - - // check weather the transaction with the xid looks committed from the - // database snapshot given to this transaction - bool looks_committed(uint64_t xid) const - { - // transaction xid is newer than id and therefore not visible at all - if (xid > id) - return false; - - // transaction xid is not visible if it's currently active. the - // active transactions are sorted ascending and therefore we can stop - // looking as soon as we hit the active transaction with id greater - // than xid - for(size_t i = 0; i < active.size(); ++i) - if(xid <= active[i]) - return false; - - return true; - } + // a snapshot of currently active transactions + Snapshot snapshot; }; } diff --git a/transactions/transaction_cache.hpp b/transactions/transaction_cache.hpp index 0fb933a26..ec2397072 100644 --- a/transactions/transaction_cache.hpp +++ b/transactions/transaction_cache.hpp @@ -13,16 +13,15 @@ template class TransactionCache { public: - Transaction* get(id_t id) const { auto it = cache.find(id); - return it != cache.end() ? it->second : nullptr; + return it != cache.end() ? it->second.get() : nullptr; } void put(id_t id, Transaction* transaction) { - cache.emplace(std::make_pair(id, transaction)); + cache.emplace(std::make_pair(id, std::unique_ptr(transaction))); } void del(id_t id) diff --git a/transactions/transaction_engine.hpp b/transactions/transaction_engine.hpp index e63949013..27a4d7504 100644 --- a/transactions/transaction_engine.hpp +++ b/transactions/transaction_engine.hpp @@ -1,16 +1,12 @@ #ifndef MEMGRAPH_MVCC_TRANSACTIONENGINE_HPP #define MEMGRAPH_MVCC_TRANSACTIONENGINE_HPP -#include #include -#include #include -#include -#include -#include #include "transaction.hpp" #include "transaction_cache.hpp" +#include "commit_log.hpp" #include "utils/counters/simple_counter.hpp" @@ -29,11 +25,11 @@ public: class TransactionEngine : Lockable { public: - TransactionEngine(uint64_t n) : counter(n) {} + TransactionEngine() : counter(0) {} const Transaction& begin() { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); auto id = counter.next(); auto t = new Transaction(id, active); @@ -46,7 +42,7 @@ public: const Transaction& advance(uint64_t id) { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); auto* t = cache.get(id); @@ -61,35 +57,37 @@ public: void commit(const Transaction& t) { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); + CommitLog::get().set_committed(t.id); finalize(t); } - void rollback(const Transaction& t) + void abort(const Transaction& t) { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); + CommitLog::get().set_aborted(t.id); finalize(t); } uint64_t last_known_active() { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); return active.front(); } // total number of transactions started from the beginning of time uint64_t count() { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); return counter.count(); } // the number of currently active transactions size_t size() { - auto guard = this->acquire(); + auto guard = this->acquire_unique(); return active.size(); } diff --git a/utils/ioc/container.hpp b/utils/ioc/container.hpp index a3f4622e4..9d00df900 100644 --- a/utils/ioc/container.hpp +++ b/utils/ioc/container.hpp @@ -85,7 +85,7 @@ public: template void factory(typename Creator::func&& f) - { + { items[key()] = std::move(Holdable::uptr( new Creator(std::forward::func>(f)) )); diff --git a/utils/mark_ref.hpp b/utils/mark_ref.hpp index b98d5c731..9737d61e3 100644 --- a/utils/mark_ref.hpp +++ b/utils/mark_ref.hpp @@ -33,13 +33,9 @@ struct MarkRef T& operator*() { return *get(); } T* operator->() { return get(); } + operator T*() { return get(); } + uintptr_t ptr; }; -//template -//MarkRef make_markref(Args&&... args) -//{ -// -//} - #endif