submitted all work done on storage and mvcc

This commit is contained in:
Dominik Tomičević 2015-10-08 00:58:29 +02:00
parent ed34e47481
commit 204fdaeb5f
38 changed files with 824 additions and 254 deletions

68
api/link_resources.py Normal file
View File

@ -0,0 +1,68 @@
# this script generates the include.hpp file for restful resources
import re
import os
resource_path = 'resources'
template_path = 'resources/include.hpp.template'
include_path = 'resources/include.hpp'
# remove the old version of the file if exists
if os.path.isfile(include_path):
os.remove(include_path)
class Resource(object):
""" represents a restful resource class for speedy """
def __init__(self, filename):
self.filename = filename
with open(os.path.join(resource_path, filename)) as f:
class_name = re.compile('\s*class\s*(\w+)\s*\:')
for line in f:
result = re.search(class_name, line)
if result is None:
continue
self.class_name = result.group(1)
break
def load_resources():
return [Resource(f) for f in os.listdir(resource_path)
if f.endswith('.hpp')]
def write_includes(file, resources):
for filename in [resource.filename for resource in resources]:
print 'writing include for', filename
file.write('#include "{}"\n'.format(filename))
def write_inits(file, resources):
for class_name in [resource.class_name for resource in resources]:
print 'writing init for', class_name
file.write(' insert<{}>(app);\n'.format(class_name))
def make_include_file():
resources = load_resources()
with open(template_path, 'r') as ftemplate:
with open(include_path, 'w') as finclude:
for line in ftemplate:
if '<INCLUDE>' in line:
write_includes(finclude, resources)
continue
if '<INIT>' in line:
write_inits(finclude, resources)
continue
finclude.write(line)
if __name__ == '__main__':
make_include_file()

23
api/resources/animal.hpp Normal file
View File

@ -0,0 +1,23 @@
#ifndef MEMGRAPH_ANIMAL_HPP
#define MEMGRAPH_ANIMAL_HPP
#include "speedy/speedy.hpp"
#include "api/restful/resource.hpp"
class Animal : public api::Resource<Animal, api::GET, api::POST>
{
public:
Animal(speedy::Speedy& app) : Resource(app, "/animal") {}
void get(http::Request& req, http::Response& res)
{
return res.send("Ok, here is a Dog");
}
void post(http::Request& req, http::Response& res)
{
return res.send("Oh, you gave me an animal?");
}
};
#endif

33
api/resources/include.hpp Normal file
View File

@ -0,0 +1,33 @@
/** @file include.hpp
* @brief Links all restful resources to the application
*
* This file is autogenerated by the python script link_resources.py
*
* YOU SHOULD NOT EDIT THIS FILE MANUALLY!
*/
#ifndef MEMGRAPH_API_RESOURCES_INCLUDE_HPP
#define MEMGRAPH_API_RESOURCES_INCLUDE_HPP
#include <list>
#include <memory>
#include "api/restful/resource.hpp"
#include "speedy/speedy.hpp"
#include "animal.hpp"
static std::list<std::unique_ptr<api::RestfulResource>> resources;
template <class T>
void insert(speedy::Speedy& app)
{
resources.push_back(std::unique_ptr<api::RestfulResource>(new T(app)));
}
void init(speedy::Speedy& app)
{
insert<Animal>(app);
}
#endif

View File

@ -0,0 +1,33 @@
/** @file include.hpp
* @brief Links all restful resources to the application
*
* This file is autogenerated by the python script link_resources.py
*
* YOU SHOULD NOT EDIT THIS FILE MANUALLY!
*/
#ifndef MEMGRAPH_API_RESOURCES_INCLUDE_HPP
#define MEMGRAPH_API_RESOURCES_INCLUDE_HPP
#include <list>
#include <memory>
#include "api/restful/resource.hpp"
#include "speedy/speedy.hpp"
<INCLUDE>
static std::list<std::unique_ptr<api::RestfulResource>> resources;
template <class T>
void insert(speedy::Speedy& app)
{
resources.push_back(std::unique_ptr<api::RestfulResource>(new T(app)));
}
void init(speedy::Speedy& app)
{
<INIT>
}
#endif

View File

@ -114,6 +114,8 @@ struct Methods<T, M> : public Method<T, M>
}
struct RestfulResource {};
/** @brief Represents a restful resource
*
* Automatically registers get, put, post, del... methods inside the derived
@ -128,7 +130,7 @@ struct Methods<T, M> : public Method<T, M>
* @tparam Ms... HTTP methods to register for this resource (GET, POST...)
*/
template <class T, class... Ms>
class Resource : public detail::Methods<T, Ms...>
class Resource : public detail::Methods<T, Ms...>, public RestfulResource
{
public:
Resource(speedy::Speedy& app, const std::string& path)

View File

@ -1,30 +1,14 @@
#include <iostream>
#include "speedy/speedy.hpp"
#include "resource.hpp"
class Animal : public api::Resource<Animal, api::GET, api::POST>
{
public:
Animal(speedy::Speedy& app) : Resource(app, "/animal") {}
void get(http::Request& req, http::Response& res)
{
return res.send("Ok, here is a Dog");
}
void post(http::Request& req, http::Response& res)
{
return res.send("Oh, you gave me an animal?");
}
};
#include "resources/include.hpp"
int main(void)
{
uv::UvLoop loop;
speedy::Speedy app(loop);
auto animal = Animal(app);
init(app);
http::Ipv4 ip("0.0.0.0", 3400);
app.listen(ip);

View File

@ -2,11 +2,7 @@
#define MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP
#include <cassert>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
@ -140,7 +136,7 @@ private:
// the next chunk does not exist and we need it. take an exclusive
// lock to prevent others that also want to create a new chunk
// from creating it
auto guard = acquire();
auto guard = acquire_unique();
// double-check locking. if the chunk exists now, some other thread
// has just created it, continue searching for my chunk

View File

@ -136,7 +136,7 @@ public:
// we only care about push_front and iterator performance so we can
// tradeoff some remove speed for better reads and inserts. remove is
// used exclusively by the GC thread(s) so it can be slower
auto guard = acquire();
auto guard = acquire_unique();
// even though concurrent removes are synchronized, we need to worry
// about concurrent reads (solved by using atomics) and concurrent

View File

@ -6,42 +6,39 @@
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
namespace spinlock
{
template <class T>
class Queue : Lockable<SpinLock>
class SlQueue : Lockable<SpinLock>
{
public:
template <class... Args>
void emplace(Args&&... args)
{
auto guard = acquire();
auto guard = acquire_unique();
queue.emplace(args...);
}
void push(const T& item)
{
auto guard = acquire();
auto guard = acquire_unique();
queue.push(item);
}
T front()
{
auto guard = acquire();
auto guard = acquire_unique();
return queue.front();
}
void pop()
{
auto guard = acquire();
auto guard = acquire_unique();
queue.pop();
}
bool pop(T& item)
{
auto guard = acquire();
auto guard = acquire_unique();
if(queue.empty())
return false;
@ -52,13 +49,13 @@ public:
bool empty()
{
auto guard = acquire();
auto guard = acquire_unique();
return queue.empty();
}
size_t size()
{
auto guard = acquire();
auto guard = acquire_unique();
return queue.size();
}
@ -66,6 +63,4 @@ private:
std::queue<T> queue;
};
}
#endif

View File

@ -0,0 +1,61 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_SKIPLIST_LOCKFREE_SKIPLIST_HPP
#define MEMGRAPH_DATA_STRUCTURES_SKIPLIST_LOCKFREE_SKIPLIST_HPP
#include <cstdlib>
#include <atomic>
#include "utils/mark_ref.hpp"
namespace lockfree
{
template <class K, class T>
class SkipList
{
public:
struct Node
{
using ref_t = MarkRef<Node>;
K* key;
T* item;
const uint8_t height;
static Node* create(uint8_t height, K* key, T* item)
{
auto size = sizeof(Node) + height * sizeof(std::atomic<ref_t>);
auto node = static_cast<Node*>(std::malloc(size));
return new (node) Node(height, key, item);
}
static void destroy(Node* node)
{
node->~SkipNode();
free(node);
}
private:
Node(uint8_t height, K* key, T* item)
: key(key), item(item), height(height)
{
for(uint8_t i = 0; i < height; ++i)
new (&tower[i]) std::atomic<ref_t>(nullptr);
}
// this creates an array of the size zero. we can't put any sensible
// value here since we don't know what size it will be untill the
// node is allocated. we could make it a SkipNode** but then we would
// have two memory allocations, one for node and one for the forward
// list. this way we avoid expensive malloc/free calls and also cache
// thrashing when following a pointer on the heap
std::atomic<ref_t> tower[0];
};
//void list_search(const K& key,
};
}
#endif

View File

@ -28,7 +28,7 @@ public:
~SkipList()
{
for(Node* current = header.load(std::memory_order_relaxed); current;)
for(Node* current = header.load(); current;)
{
Node* next = current->forward(0);
Node::destroy(current);
@ -38,7 +38,7 @@ public:
size_t size() const
{
return size_.load(std::memory_order_relaxed);
return size_.load();
}
uint8_t height() const
@ -60,7 +60,7 @@ public:
size_t increment_size(size_t delta)
{
return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
return size_.fetch_add(delta) + delta;
}
int find_path(Node* from,
@ -92,7 +92,7 @@ public:
Node* find(const K* const key)
{
Node* pred = header.load(std::memory_order_consume);
Node* pred = header.load();
Node* node = nullptr;
uint8_t level = pred->height;
@ -150,7 +150,7 @@ public:
while(true)
{
auto head = header.load(std::memory_order_consume);
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(lfound != -1)
@ -215,7 +215,7 @@ public:
while(true)
{
auto head = header.load(std::memory_order_consume);
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound)))

View File

@ -3,15 +3,14 @@
#include <map>
#include "utils/sync/spinlock.hpp"
#include "threading/sync/spinlock.hpp"
template <class K, class T>
class SlRbTree
class SlRbTree : Lockable<SpinLock>
{
public:
private:
SpinLock lock;
std::map<K, T> tree;
};

View File

@ -29,7 +29,6 @@ public:
}
private:
SpinLock lock;
std::stack<T> stack;
};

20
database/db.hpp Normal file
View File

@ -0,0 +1,20 @@
#ifndef MEMGRAPH_DATABASE_DB_HPP
#define MEMGRAPH_DATABASE_DB_HPP
#include "transactions/transaction_engine.hpp"
#include "transactions/commit_log.hpp"
class Db
{
public:
static Db& get()
{
static Db db;
return db;
}
tx::CommitLog commit_log;
tx::TransactionEngine transaction_engine;
};
#endif

View File

@ -1,77 +0,0 @@
#include <iostream>
#include "utils/ioc/container.hpp"
struct A
{
A()
{
std::cout << "Constructor of A" << std::endl;
}
int a = 3;
};
struct C;
struct B
{
B(std::shared_ptr<A>&& a, std::shared_ptr<C>&& c)
: a(a), c(c)
{
std::cout << "Constructor of B" << std::endl;
}
int b = 4;
std::shared_ptr<A> a;
std::shared_ptr<C> c;
};
struct C
{
C(int c) : c(c)
{
std::cout << "Constructor of C" << std::endl;
}
int c;
};
int main(void)
{
ioc::Container container;
// register a singleton class A
auto a = container.singleton<A>();
std::cout << a->a << std::endl; // should print 3
// register a factory function that makes C
container.factory<C>([]() {
return std::make_shared<C>(5);
});
// try to resolve A
auto aa = container.resolve<A>();
std::cout << aa->a << std::endl; // should print 3
// register a singleton class B with dependencies A and C
// (A will be resolved as an existing singleton and C will
// be created by the factory function defined above)
auto b = container.singleton<B, A, C>();
std::cout << b->b << std::endl; // should print 4
std::cout << b->a->a << std::endl; // should print 3
std::cout << b->c->c << std::endl; // should print 5
// try to resolve B
auto bb = container.resolve<B>();
std::cout << bb->b << std::endl; // should print 4
// try to resolve C
// (will be created as a new instance by the factory
// function defined above)
auto c = container.resolve<C>();
std::cout << c->c << std::endl; // should print 5
return 0;
};

29
memory/literals.hpp Normal file
View File

@ -0,0 +1,29 @@
#ifndef MEMGRAPH_MEMORY_LITERALS_HPP
#define MEMGRAPH_MEMORY_LITERALS_HPP
#include <cstdint>
namespace memory
{
namespace literals
{
constexpr unsigned long long operator"" _GB(unsigned long long gb)
{
return 1024 * 1024 * 1024 * gb;
}
constexpr unsigned long long operator"" _MB(unsigned long long mb)
{
return 1024 * 1024 * mb;
}
constexpr unsigned long long operator"" _kB(unsigned long long kb)
{
return 1024 * kb;
}
}
}
#endif

View File

@ -3,7 +3,7 @@
#include "threading/sync/lockable.hpp"
#include "transaction.hpp"
#include "transactions/transaction.hpp"
#include "version.hpp"
namespace mvcc
@ -11,11 +11,10 @@ namespace mvcc
template <class T>
class Atom : public Version<T>,
public Lockable<>
public Lockable<SpinLock>
{
public:
Atom(uint64_t id, T* first)
: Version<T>(first), id(id)
Atom(uint64_t id, T* first) : Version<T>(first), id(id)
{
// it's illegal that the first version is nullptr. there should be at
// least one version of a record
@ -29,27 +28,17 @@ public:
// inspects the record change history and returns the record version visible
// to the current transaction if it exists, otherwise it returns nullptr
T* latest_visible(const Transaction& t)
T* latest_visible(const tx::Transaction& t)
{
return first()->latest_visible(t);
}
// inspects the record change history and returns the newest available
// version from all transactions
T& newest_available()
{
T* record = this->newer(), newer = this->newer();
while(newer != nullptr)
record = newer, newer = record->newer();
return record;
}
// every record has a unique id. 2^64 = 1.8 x 10^19. that should be enough
// for a looong time :) but keep in mind that some vacuuming would be nice
// to reuse indices for deleted nodes.
uint64_t id;
std::atomic<Atom<T>*> next;
};
}

115
mvcc/hints.hpp Normal file
View File

@ -0,0 +1,115 @@
#ifndef MEMGRAPH_MVCC_HINTS_HPP
#define MEMGRAPH_MVCC_HINTS_HPP
#include <atomic>
#include <unistd.h>
#include "transactions/commit_log.hpp"
namespace mvcc
{
// known committed and known aborted for both xmax and xmin
// this hints are used to quickly check the commit/abort status of the
// transaction that created this record. if these are not set, one should
// consult the commit log to find the status and update the status here
// more info https://wiki.postgresql.org/wiki/Hint_Bits
class Hints
{
public:
union HintBits;
private:
enum Flags {
MIN_CMT = 1, // XX01
MIN_ABT = 1 << 1, // XX10
MAX_CMT = 1 << 2, // 01XX
MAX_ABT = 1 << 3 // 10XX
};
template <Flags COMMITTED, Flags ABORTED>
class TxHints
{
using type = TxHints<COMMITTED, ABORTED>;
public:
TxHints(std::atomic<uint8_t>& bits)
: bits(bits) {}
struct Value
{
bool is_committed() const
{
return bits & COMMITTED;
}
bool is_aborted() const
{
return bits & ABORTED;
}
bool is_unknown() const
{
return !(is_committed() || is_aborted());
}
uint8_t bits;
};
Value load(std::memory_order order = std::memory_order_seq_cst)
{
return Value { bits.load(order) };
}
void set_committed()
{
bits.fetch_or(COMMITTED);
}
void set_aborted()
{
bits.fetch_or(ABORTED);
}
private:
std::atomic<uint8_t>& bits;
};
struct Min : public TxHints<MIN_CMT, MIN_ABT>
{
using TxHints::TxHints;
};
struct Max : public TxHints<MAX_CMT, MAX_ABT>
{
using TxHints::TxHints;
};
public:
Hints() : min(bits), max(bits)
{
assert(bits.is_lock_free());
}
union HintBits
{
uint8_t bits;
Min::Value min;
Max::Value max;
};
HintBits load(std::memory_order order = std::memory_order_seq_cst)
{
return HintBits { bits.load(order) };
}
Min min;
Max max;
std::atomic<uint8_t> bits { 0 };
};
}
#endif

View File

@ -3,9 +3,12 @@
#include <atomic>
#include "transaction.hpp"
#include "transactions/transaction.hpp"
#include "transactions/commit_log.hpp"
#include "minmax.hpp"
#include "version.hpp"
#include "hints.hpp"
// the mvcc implementation used here is very much like postgresql's
// more info: https://momjian.us/main/writings/pgsql/mvcc.pdf
@ -17,7 +20,7 @@ template <class T>
class Mvcc : public Version<T>
{
public:
Mvcc() {}
Mvcc() = default;
// tx.min is the id of the transaction that created the record
// and tx.max is the id of the transaction that deleted the record
@ -32,7 +35,7 @@ public:
MinMax<uint8_t> cmd;
// check if this record is visible to the transaction t
bool visible(const Transaction& t)
bool visible(const tx::Transaction& t)
{
// TODO check if the record was created by a transaction that has been
// aborted. one might implement this by checking the hints in mvcc
@ -42,30 +45,30 @@ public:
// if you think they're not, you're wrong, and you should think about it
// again. i know, it happened to me.
return ((tx.min() == t.id && // inserted by the current transaction
cmd.min() < t.cid && // before this command, and
(tx.max() == 0 || // the row has not been deleted, or
(tx.max() == t.id && // it was deleted by the current
// transaction
cmd.max() >= t.cid))) // but not before this command,
|| // or
(t.committed(tx.min()) && // the record was inserted by a
// committed transaction, and
(tx.max() == 0 || // the record has not been deleted, or
(tx.max() == t.id && // the row is being deleted by this
// transaction
cmd.max() >= t.cid) || // but it's not deleted "yet", or
(tx.max() != t.id && // the row was deleted by another
// transaction
!t.committed(tx.max()) // that has not been committed
return ((tx.min() == t.id && // inserted by the current transaction
cmd.min() < t.cid && // before this command, and
(tx.max() == 0 || // the row has not been deleted, or
(tx.max() == t.id && // it was deleted by the current
// transaction
cmd.max() >= t.cid))) // but not before this command,
|| // or
(min_committed(tx.min(), t) && // the record was inserted by a
// committed transaction, and
(tx.max() == 0 || // the record has not been deleted, or
(tx.max() == t.id && // the row is being deleted by this
// transaction
cmd.max() >= t.cid) || // but it's not deleted "yet", or
(tx.max() != t.id && // the row was deleted by another
// transaction
!max_committed(tx.max(), t) // that has not been committed
))));
}
// inspects the record change history and returns the record version visible
// to the current transaction if it exists, otherwise it returns nullptr
T* latest_visible(const Transaction& t)
T* latest_visible(const tx::Transaction& t)
{
T* record = this, newer = this->newer();
T* record = static_cast<T*>(this), *newer = this->newer();
// move down through the versions of the nodes until you find the first
// one visible to this transaction. if no visible records are found,
@ -76,25 +79,59 @@ public:
return record;
}
void mark_created(const Transaction& t)
void mark_created(const tx::Transaction& t)
{
tx.min(t.id);
cmd.min(t.cid);
}
void mark_deleted(const Transaction& t)
void mark_deleted(const tx::Transaction& t)
{
tx.max(t.id);
cmd.max(t.cid);
}
protected:
// known committed and known aborted for both xmax and xmin
// this hints are used to quickly check the commit/abort status of the
// transaction that created this record. if these are not set, one should
// consult the commit log to find the status and update the status here
// more info https://wiki.postgresql.org/wiki/Hint_Bits
std::atomic<uint8_t> hints;
Hints hints;
bool max_committed(uint64_t id, const tx::Transaction& t)
{
return committed(hints.max, id, t);
}
bool min_committed(uint64_t id, const tx::Transaction& t)
{
return committed(hints.min, id, t);
}
template <class U>
bool committed(U& hints, uint64_t id, const tx::Transaction& t)
{
auto hint_bits = hints.load();
// if hints are set, return if xid is committed
if(!hint_bits.is_unknown())
return hint_bits.is_committed();
// if hints are not set:
// - the creating transaction is still in progress (examine snapshot)
if(t.snapshot.is_active(id))
return false;
// - you are the first one to check since it ended, consult commit log
auto& clog = tx::CommitLog::get();
auto info = clog.fetch_info(id);
if(info.is_committed())
{
hints.set_committed();
return true;
}
assert(info.is_aborted());
hints.set_aborted();
return false;
}
};
}

17
mvcc/mvcc_error.hpp Normal file
View File

@ -0,0 +1,17 @@
#ifndef MEMGRAPH_MVCC_MVCC_ERROR_HPP
#define MEMGRAPH_MVCC_MVCC_ERROR_HPP
#include <stdexcept>
namespace mvcc
{
class MvccError : public std::runtime_error
{
public:
using runtime_error::runtime_error;
};
}
#endif

View File

@ -1,10 +1,9 @@
#ifndef MEMGRAPH_STORAGE_MVCC_STORE_HPP
#define MEMGRAPH_STORAGE_MVCC_STORE_HPP
#include <stdexcept>
#include "mvcc/transaction.hpp"
#include "transactions/transaction.hpp"
#include "mvcc/atom.hpp"
#include "mvcc/mvcc_error.hpp"
#include "data_structures/list/lockfree_list.hpp"
#include "utils/counters/atomic_counter.hpp"
@ -15,40 +14,57 @@
namespace mvcc
{
class MvccError : public std::runtime_error
{
public:
using runtime_error::runtime_error;
};
template <class T>
class MvccStore
{
using list_t = lockfree::List<Atom<T>>;
public:
using iterator = typename list_t::iterator;
using read_iterator = typename list_t::read_iterator;
using read_write_iterator = typename list_t::read_write_iterator;
MvccStore() : counter(0) {}
iterator insert(const Transaction& t)
read_iterator begin()
{
auto record = new T();
record->mark_created(t);
return data.push_front(Atom<T>(counter.next(), record));
return data.begin();
}
T* update(Atom<T>& atom, T& record, const Transaction& t)
read_write_iterator rw_begin()
{
return data.rw_begin();
}
Atom<T>* insert(const tx::Transaction& t)
{
// create a first version of the record
auto record = new T();
// mark the record as created by the transaction t
record->mark_created(t);
// create an Atom to put in the list
auto atom = new Atom<T>(counter.next(), record);
// put the atom with the record to the linked list
data.push_front(atom);
return atom;
}
T* update(Atom<T>& atom, T& record, const tx::Transaction& t)
{
auto guard = atom.acquire();
// if xmax is not zero, that means there is a newer version of this
// record or it has been deleted. we cannot do anything here
// record or it has been deleted. we cannot do anything here until
// we implement some more intelligent locking mechanisms
if(record.tx.max())
throw MvccError("can't serialize due to concurrent operation(s)");
assert(atom.latest_visible(t) == &record);
assert(atom.latest_visible(t) == record.latest_visible(t));
// make a new version
auto updated = new T();
*updated = *record;
@ -63,7 +79,7 @@ public:
return updated;
}
void remove(Atom<T>& atom, T& record, const Transaction& t)
void remove(Atom<T>& atom, T& record, const tx::Transaction& t)
{
auto guard = atom.acquire();

View File

@ -12,18 +12,23 @@ class Version
public:
Version() : versions(nullptr) {}
~Version()
{
delete versions.load();
}
Version(T* value) : versions(value) {}
// return a pointer to a newer version stored in this record
T* newer()
{
return versions.load(std::memory_order_relaxed);
return versions.load();
}
// set a newer version of this record
void newer(T* value)
{
versions.store(value, std::memory_order_relaxed);
versions.store(value);
}
private:

View File

@ -99,6 +99,7 @@ public:
R3(R3&& other)
{
this->routes = std::move(other.routes);
this->root = other.root;
other.root = nullptr;
}

View File

@ -20,7 +20,10 @@ namespace speedy
class Speedy
{
public:
Speedy(uv::UvLoop& loop) : server(loop), router(100) {}
using sptr = std::shared_ptr<Speedy>;
Speedy(uv::UvLoop& loop, size_t capacity = 100)
: server(loop), router(capacity) {}
Speedy(Speedy&) = delete;
Speedy(Speedy&&) = delete;

94
speedy/speedy.inl Normal file
View File

@ -0,0 +1,94 @@
#ifndef MEMGRAPH_SPEEDY_INL
#define MEMGRAPH_SPEEDY_INL
#include "speedy.hpp"
namespace speedy
{
int r3_request_method(http::Method method)
{
switch (method) {
case http::Method::GET: return METHOD_GET;
case http::Method::POST: return METHOD_POST;
case http::Method::PUT: return METHOD_PUT;
case http::Method::DELETE: return METHOD_DELETE;
case http::Method::HEAD: return METHOD_HEAD;
}
}
// TODO: better implementation
Speedy::Speedy(uv::UvLoop& loop, const http::Ipv4& ip) : server(loop), ip(ip)
{
n = r3_tree_create(100);
}
void Speedy::store_callback(int method,
const std::string &path,
http::request_cb_t callback)
{
callbacks.push_back(callback);
void *ptr = malloc(sizeof(uint));
*((uint *)ptr) = callbacks.size() - 1;
r3_tree_insert_routel(n, method, path.c_str(), path.size(), ptr);
}
void Speedy::get(const std::string &path, http::request_cb_t callback)
{
store_callback(METHOD_GET, path, callback);
// TODO: something like this
// this solution doesn't work, currenlty I don't know why
// callbacks.push_back(callback)
// r3_tree_insert_pathl(n, path.c_str(), path.size(), &callbacks.back());
}
void Speedy::post(const std::string &path, http::request_cb_t callback)
{
store_callback(METHOD_POST, path, callback);
}
void Speedy::put(const std::string &path, http::request_cb_t callback)
{
store_callback(METHOD_PUT, path, callback);
}
void Speedy::del(const std::string &path, http::request_cb_t callback)
{
store_callback(METHOD_DELETE, path, callback);
}
void Speedy::listen()
{
char *errstr = NULL;
int err = r3_tree_compile(n, &errstr);
if (err) {
std::cout << "R3 compile error" << std::endl;
}
server.listen(ip, [this](http::Request& req, http::Response& res) {
auto url = req.url;
auto c_url = url.c_str();
match_entry *entry = match_entry_create(c_url);
entry->request_method = r3_request_method(req.method);
route *r = r3_tree_match_route(this->n, entry);
match_entry_free(entry);
if (r) {
int index = *((int *)r->data);
auto callback = this->callbacks[index];
callback(req, res);
// TODO: and something like this
// auto callback = *reinterpret_cast<http::request_cb_t*>(n->data);
// callback(req, res);
} else {
res.send("Not found");
}
});
std::cout << "Server is UP" << std::endl;
}
}
#endif

View File

@ -5,6 +5,7 @@
#include "mvcc/atom.hpp"
#include "mvcc/store.hpp"
#include "mvcc/mvcc_error.hpp"
#include "vertex.hpp"
#include "edge.hpp"
@ -17,13 +18,24 @@ class Graph
public:
Graph() {}
EdgeStore::iterator connect(Vertex a, Vertex b, const Transaction& t)
mvcc::Atom<Edge>* connect(mvcc::Atom<Vertex>& atom_a, Vertex& a,
mvcc::Atom<Vertex>& atom_b, Vertex& b,
const tx::Transaction& t)
{
auto it = edges.insert(t);
// try to lock A
auto guard_a = atom_a.acquire_unique();
it->
if(a.tx.max())
throw mvcc::MvccError("can't serialize due to\
concurrent operation(s)");
return it;
auto guard_b = atom_b.acquire_unique();
if(b.tx.max())
throw mvcc::MvccError("can't serialize due to\
concurrent operation(s)");
return edges.insert(t);
}
VertexStore vertices;

View File

@ -4,6 +4,7 @@
#include <map>
#include "property.hpp"
#include "string.hpp"
namespace model
{
@ -44,8 +45,8 @@ public:
kvp.second->dump(buffer); buffer += ',';
}
buffer.pop_back(); // erase last comma
buffer += '}';
// replace last redundant comma with }
buffer.back() = '}';
}
private:

View File

@ -1,6 +1,7 @@
#ifndef MEMGRAPH_STORAGE_RECORD_HPP
#define MEMGRAPH_STORAGE_RECORD_HPP
#include <ostream>
#include <mutex>
#include <set>

View File

@ -12,4 +12,14 @@ struct Vertex : public Record<Vertex>
std::vector<Edge*> out;
};
inline std::ostream& operator<<(std::ostream& stream, Vertex& record)
{
std::string props;
record.properties.dump(props);
return stream << "Vertex"
<< "(xmin = " << record.tx.min()
<< ", xmax = " << record.tx.max()
<< "): " << props;
}
#endif

View File

@ -37,7 +37,7 @@ public:
void run(F&& f, Args&&... args)
{
{
auto lock = acquire();
auto lock = acquire_unique();
tasks.emplace([&f, &args...]() {
f(std::forward<Args>(args)...);
@ -62,7 +62,7 @@ private:
task_t task;
{
auto lock = acquire();
auto lock = acquire_unique();
cond.wait(lock, [this] {
return !this->alive || !this->tasks.empty();

View File

@ -7,7 +7,9 @@
template <class lock_t = SpinLock>
class Lockable
{
protected:
public:
using lock_type = lock_t;
std::lock_guard<lock_t> acquire_guard()
{
return std::lock_guard<lock_t>(lock);

View File

@ -0,0 +1,92 @@
#ifndef MEMGRAPH_TRANSACTIONS_COMMIT_LOG_HPP
#define MEMGRAPH_TRANSACTIONS_COMMIT_LOG_HPP
#include "data_structures/bitset/dynamic_bitset.hpp"
namespace tx
{
class CommitLog
{
public:
struct Info
{
enum Status
{
ACTIVE = 0, // 00
COMMITTED = 1, // 01
ABORTED = 2, // 10
};
bool is_active() const
{
return flags & ACTIVE;
}
bool is_committed() const
{
return flags & COMMITTED;
}
bool is_aborted() const
{
return flags & ABORTED;
}
operator uint8_t() const
{
return flags;
}
uint8_t flags;
};
CommitLog() = default;
CommitLog(CommitLog&) = delete;
CommitLog(CommitLog&&) = delete;
CommitLog operator=(CommitLog) = delete;
static CommitLog& get()
{
static CommitLog log;
return log;
}
Info fetch_info(uint64_t id)
{
return Info { log.at(2 * id, 2) };
}
bool is_active(uint64_t id)
{
return fetch_info(id).is_active();
}
bool is_committed(uint64_t id)
{
return fetch_info(id).is_committed();
}
void set_committed(uint64_t id)
{
log.set(2 * id);
}
bool is_aborted(uint64_t id)
{
return fetch_info(id).is_aborted();
}
void set_aborted(uint64_t id)
{
log.set(2 * id + 1);
}
private:
DynamicBitset<uint8_t, 32768> log;
};
}
#endif

37
transactions/snapshot.hpp Normal file
View File

@ -0,0 +1,37 @@
#ifndef MEMGRAPH_TRANSACTIONS_SNAPSHOT_HPP
#define MEMGRAPH_TRANSACTIONS_SNAPSHOT_HPP
#include <vector>
#include <algorithm>
namespace tx
{
template <class id_t>
class Snapshot
{
public:
Snapshot(std::vector<id_t> active) : active(std::move(active)) {}
Snapshot(const Snapshot& other)
{
active = other.active;
}
Snapshot(Snapshot&& other)
{
active = std::move(other.active);
}
bool is_active(id_t xid) const
{
return std::binary_search(active.begin(), active.end(), xid);
}
private:
std::vector<id_t> active;
};
}
#endif

View File

@ -5,13 +5,15 @@
#include <cstdint>
#include <vector>
#include "snapshot.hpp"
namespace tx
{
struct Transaction
{
Transaction(uint64_t id, std::vector<uint64_t> active)
: id(id), cid(1), active(std::move(active)) {}
Transaction(uint64_t id, Snapshot<uint64_t> snapshot)
: id(id), cid(1), snapshot(std::move(snapshot)) {}
// index of this transaction
uint64_t id;
@ -19,30 +21,8 @@ struct Transaction
// index of the current command in the current transaction;
uint8_t cid;
// the ids of the currently active transactions used by the mvcc
// implementation for snapshot transaction isolation.
// std::vector is much faster than std::set for fewer number of items
// we don't expect the number of active transactions getting too large.
std::vector<uint64_t> active;
// check weather the transaction with the xid looks committed from the
// database snapshot given to this transaction
bool looks_committed(uint64_t xid) const
{
// transaction xid is newer than id and therefore not visible at all
if (xid > id)
return false;
// transaction xid is not visible if it's currently active. the
// active transactions are sorted ascending and therefore we can stop
// looking as soon as we hit the active transaction with id greater
// than xid
for(size_t i = 0; i < active.size(); ++i)
if(xid <= active[i])
return false;
return true;
}
// a snapshot of currently active transactions
Snapshot<uint64_t> snapshot;
};
}

View File

@ -13,16 +13,15 @@ template <class id_t>
class TransactionCache
{
public:
Transaction* get(id_t id) const
{
auto it = cache.find(id);
return it != cache.end() ? it->second : nullptr;
return it != cache.end() ? it->second.get() : nullptr;
}
void put(id_t id, Transaction* transaction)
{
cache.emplace(std::make_pair(id, transaction));
cache.emplace(std::make_pair(id, std::unique_ptr<Transaction>(transaction)));
}
void del(id_t id)

View File

@ -1,16 +1,12 @@
#ifndef MEMGRAPH_MVCC_TRANSACTIONENGINE_HPP
#define MEMGRAPH_MVCC_TRANSACTIONENGINE_HPP
#include <cstdlib>
#include <atomic>
#include <mutex>
#include <vector>
#include <set>
#include <list>
#include <algorithm>
#include "transaction.hpp"
#include "transaction_cache.hpp"
#include "commit_log.hpp"
#include "utils/counters/simple_counter.hpp"
@ -29,11 +25,11 @@ public:
class TransactionEngine : Lockable<SpinLock>
{
public:
TransactionEngine(uint64_t n) : counter(n) {}
TransactionEngine() : counter(0) {}
const Transaction& begin()
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
auto id = counter.next();
auto t = new Transaction(id, active);
@ -46,7 +42,7 @@ public:
const Transaction& advance(uint64_t id)
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
auto* t = cache.get(id);
@ -61,35 +57,37 @@ public:
void commit(const Transaction& t)
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
CommitLog::get().set_committed(t.id);
finalize(t);
}
void rollback(const Transaction& t)
void abort(const Transaction& t)
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
CommitLog::get().set_aborted(t.id);
finalize(t);
}
uint64_t last_known_active()
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
return active.front();
}
// total number of transactions started from the beginning of time
uint64_t count()
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
return counter.count();
}
// the number of currently active transactions
size_t size()
{
auto guard = this->acquire();
auto guard = this->acquire_unique();
return active.size();
}

View File

@ -85,7 +85,7 @@ public:
template <class T>
void factory(typename Creator<T>::func&& f)
{
{
items[key<T>()] = std::move(Holdable::uptr(
new Creator<T>(std::forward<typename Creator<T>::func>(f))
));

View File

@ -33,13 +33,9 @@ struct MarkRef
T& operator*() { return *get(); }
T* operator->() { return get(); }
operator T*() { return get(); }
uintptr_t ptr;
};
//template <class T, class... Args>
//MarkRef<T> make_markref(Args&&... args)
//{
//
//}
#endif