Added cleaner.

Added multithreading to dbms.
Skiplist now supports emplace insert.
This commit is contained in:
Kruno Tomola Fabro 2016-08-30 00:45:07 +01:00
parent 2218b0e472
commit b2ce3d58a4
38 changed files with 509 additions and 104 deletions

View File

@ -402,6 +402,8 @@ EXECUTE_PROCESS(
# TODO: create separate static library from bolt code # TODO: create separate static library from bolt code
set(memgraph_src_files set(memgraph_src_files
${src_dir}/dbms/dbms.cpp
${src_dir}/dbms/cleaner.cpp
${src_dir}/utils/string/transform.cpp ${src_dir}/utils/string/transform.cpp
${src_dir}/utils/string/join.cpp ${src_dir}/utils/string/join.cpp
${src_dir}/utils/string/file.cpp ${src_dir}/utils/string/file.cpp
@ -416,6 +418,7 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/transport/bolt_decoder.cpp ${src_dir}/communication/bolt/v1/transport/bolt_decoder.cpp
${src_dir}/communication/bolt/v1/transport/buffer.cpp ${src_dir}/communication/bolt/v1/transport/buffer.cpp
${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp ${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp
${src_dir}/threading/thread.cpp
${src_dir}/mvcc/id.cpp ${src_dir}/mvcc/id.cpp
${src_dir}/storage/vertices.cpp ${src_dir}/storage/vertices.cpp
${src_dir}/storage/edges.cpp ${src_dir}/storage/edges.cpp

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include "communication/bolt/v1/states.hpp" #include "communication/bolt/v1/states.hpp"
#include "io/network/socket.hpp"
#include "dbms/dbms.hpp" #include "dbms/dbms.hpp"
#include "io/network/socket.hpp"
namespace bolt namespace bolt
{ {
@ -16,11 +16,10 @@ class Bolt
public: public:
Bolt(); Bolt();
Session* create_session(io::Socket&& socket); Session *create_session(io::Socket &&socket);
void close(Session* session); void close(Session *session);
States states; States states;
Dbms dbms; Dbms dbms;
}; };
} }

View File

@ -33,7 +33,7 @@ public:
std::pair<list_it, bool> insert(const K &key, T &&data) std::pair<list_it, bool> insert(const K &key, T &&data)
{ {
return accessor.insert(item_t(key, std::forward<T>(data))); return accessor.insert(item_t(key, std::move(data)));
} }
std::pair<list_it, bool> insert(K &&key, T &&data) std::pair<list_it, bool> insert(K &&key, T &&data)
@ -42,6 +42,17 @@ public:
item_t(std::forward<K>(key), std::forward<T>(data))); item_t(std::forward<K>(key), std::forward<T>(data)));
} }
template <class... Args1, class... Args2>
std::pair<list_it, bool> emplace(const K &key,
std::tuple<Args1...> first_args,
std::tuple<Args2...> second_args)
{
return accessor.emplace(
key, std::piecewise_construct,
std::forward<std::tuple<Args1...>>(first_args),
std::forward<std::tuple<Args2...>>(second_args));
}
list_it_con find(const K &key) const { return accessor.find(key); } list_it_con find(const K &key) const { return accessor.find(key); }
list_it find(const K &key) { return accessor.find(key); } list_it find(const K &key) { return accessor.find(key); }

View File

@ -148,7 +148,7 @@ public:
static Node *create(const T &item, uint8_t height) static Node *create(const T &item, uint8_t height)
{ {
return create(item, height); return create(height, item);
} }
static Node *create(T &&item, uint8_t height) static Node *create(T &&item, uint8_t height)
@ -160,6 +160,16 @@ public:
return new (node) Node(std::move(item), height); return new (node) Node(std::move(item), height);
} }
template <class... Args>
static Node *emplace(uint8_t height, Args &&... args)
{
auto node = allocate(height);
// we have raw memory and we need to construct an object
// of type Node on it
return new (node) Node(height, std::forward<Args>(args)...);
}
static void destroy(Node *node) static void destroy(Node *node)
{ {
node->~Node(); node->~Node();
@ -180,6 +190,12 @@ public:
new (&tower[i]) std::atomic<Node *>{nullptr}; new (&tower[i]) std::atomic<Node *>{nullptr};
} }
template <class... Args>
Node(uint8_t height, Args &&... args) : Node(height)
{
this->data.emplace(std::forward<Args>(args)...);
}
Node(T &&data, uint8_t height) : Node(height) Node(T &&data, uint8_t height) : Node(height)
{ {
this->data.set(std::move(data)); this->data.set(std::move(data));
@ -519,12 +535,19 @@ public:
std::pair<Iterator, bool> insert(const T &item) std::pair<Iterator, bool> insert(const T &item)
{ {
return skiplist->insert(item, preds, succs); return skiplist->insert(preds, succs, item);
} }
std::pair<Iterator, bool> insert(T &&item) std::pair<Iterator, bool> insert(T &&item)
{ {
return skiplist->insert(std::move(item), preds, succs); return skiplist->insert(preds, succs, std::move(item));
}
template <class K, class... Args>
std::pair<Iterator, bool> emplace(K &key, Args &&... args)
{
return skiplist->emplace(preds, succs, key,
std::forward<Args>(args)...);
} }
Iterator insert_non_unique(const T &item) Iterator insert_non_unique(const T &item)
@ -786,13 +809,15 @@ private:
// has the locks // has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue; if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return insert_here(std::forward<T>(data), preds, succs, height, return insert_here(Node::create(std::move(data), height), preds,
guards); succs, height, guards);
} }
} }
// Insert unique data // Insert unique data
std::pair<Iterator, bool> insert(T &&data, Node *preds[], Node *succs[]) // F - type of funct which will create new node if needed. Recieves height
// of node.
std::pair<Iterator, bool> insert(Node *preds[], Node *succs[], T &&data)
{ {
while (true) { while (true) {
// TODO: before here was data.first // TODO: before here was data.first
@ -817,18 +842,53 @@ private:
// has the locks // has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue; if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return {insert_here(std::move(data), preds, succs, height, guards), return {insert_here(Node::create(std::move(data), height), preds,
succs, height, guards),
true}; true};
} }
} }
// Inserts data to specified locked location. // Insert unique data
Iterator insert_here(T &&data, Node *preds[], Node *succs[], int height, // TODO: This is almost all duplicate code from insert
guard_t guards[]) template <class K, class... Args>
std::pair<Iterator, bool> emplace(Node *preds[], Node *succs[], K &key,
Args &&... args)
{ {
// you have the locks, create a new node while (true) {
auto new_node = Node::create(std::move(data), height); // TODO: before here was data.first
auto level = find_path(this, H - 1, key, preds, succs);
if (level != -1) {
auto found = succs[level];
if (found->flags.is_marked()) continue;
while (!found->flags.is_fully_linked())
usleep(250);
return {Iterator{succs[level]}, false};
}
auto height = rnd();
guard_t guards[H];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return {
insert_here(Node::emplace(height, std::forward<Args>(args)...),
preds, succs, height, guards),
true};
}
}
// Inserts data to specified locked location.
Iterator insert_here(Node *new_node, Node *preds[], Node *succs[],
int height, guard_t guards[])
{
// Node::create(std::move(data), height)
// link the predecessors and successors, e.g. // link the predecessors and successors, e.g.
// //
// 4 HEAD ... P ------------------------> S ... NULL // 4 HEAD ... P ------------------------> S ... NULL

View File

@ -28,6 +28,7 @@ public:
// I - type of function I:const tx::Transaction& -> // I - type of function I:const tx::Transaction& ->
// std::unique_ptr<IndexBase<TypeGroupVertex,std::nullptr_t>> // std::unique_ptr<IndexBase<TypeGroupVertex,std::nullptr_t>>
// G - type of collection (verrtex/edge) // G - type of collection (verrtex/edge)
// TODO: Currently only one index at a time can be created.
template <class TG, class I, class G> template <class TG, class I, class G>
bool create_index_on_vertex_property_family(const char *name, G &coll, bool create_index_on_vertex_property_family(const char *name, G &coll,
I &create_index); I &create_index);

View File

@ -17,6 +17,7 @@ class DbTransaction
friend DbAccessor; friend DbAccessor;
public: public:
DbTransaction(Db &db);
DbTransaction(Db &db, tx::Transaction &trans) : db(db), trans(trans) {} DbTransaction(Db &db, tx::Transaction &trans) : db(db), trans(trans) {}
// Global transactional algorithms,operations and general methods meant for // Global transactional algorithms,operations and general methods meant for
@ -24,6 +25,14 @@ public:
// This should provide cleaner hierarchy of operations on database. // This should provide cleaner hierarchy of operations on database.
// For example cleaner. // For example cleaner.
// Cleans edge part of database. MUST be called by one cleaner thread at
// one time.
void clean_edge_section();
// Cleans vertex part of database. MUST be called by one cleaner thread at
// one time..
void clean_vertex_section();
// Updates indexes of Vertex/Edges in index_updates. True if indexes are // Updates indexes of Vertex/Edges in index_updates. True if indexes are
// updated successfully. False means that transaction failed. // updated successfully. False means that transaction failed.
bool update_indexes(); bool update_indexes();

25
include/dbms/cleaner.hpp Normal file
View File

@ -0,0 +1,25 @@
#pragma once
#include "database/db.hpp"
class Thread;
// How much sec is a cleaning_cycle in which cleaner will clean at most
// once.
constexpr size_t cleaning_cycle = 60;
class Cleaning
{
public:
Cleaning(ConcurrentMap<std::string, Db> &dbs);
~Cleaning();
private:
ConcurrentMap<std::string, Db> &dbms;
std::vector<std::unique_ptr<Thread>> cleaners;
std::atomic<bool> cleaning = {true};
};

32
include/dbms/dbms.hpp Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/db.hpp"
#include "dbms/cleaner.hpp"
class Dbms
{
public:
Dbms() { create_default(); }
// returns active database
Db &active();
// set active database
// if active database doesn't exist create one
Db &active(const std::string &name);
// TODO: DELETE action
private:
// creates default database
Db &create_default() { return active("default"); }
// dbs container
ConcurrentMap<std::string, Db> dbs;
// currently active database
std::atomic<Db *> active_db;
Cleaning cleaning = {dbs};
};

View File

@ -102,6 +102,12 @@ public:
return committed(hints.cre, tx.cre(), t); return committed(hints.cre, tx.cre(), t);
} }
// True if record was deleted before id.
bool is_deleted_before(const Id &id)
{
return tx.exp() != Id(0) && tx.exp() < id;
}
// TODO: Test this // TODO: Test this
// True if this record is visible for write. // True if this record is visible for write.
bool is_visible_write(const tx::Transaction &t) bool is_visible_write(const tx::Transaction &t)

View File

@ -10,32 +10,35 @@ class Version
{ {
public: public:
Version() = default; Version() = default;
Version(T* older) : older(older) {} Version(T *older) : older(older) {}
~Version() ~Version() { delete older.load(std::memory_order_seq_cst); }
{
delete older.load(std::memory_order_seq_cst);
}
// return a pointer to an older version stored in this record // return a pointer to an older version stored in this record
T* next(std::memory_order order = std::memory_order_seq_cst) T *next(std::memory_order order = std::memory_order_seq_cst)
{ {
return older.load(order); return older.load(order);
} }
const T* next(std::memory_order order = std::memory_order_seq_cst) const const T *next(std::memory_order order = std::memory_order_seq_cst) const
{ {
return older.load(order); return older.load(order);
} }
// set the older version of this record // set the older version of this record
void next(T* value, std::memory_order order = std::memory_order_seq_cst) void next(T *value, std::memory_order order = std::memory_order_seq_cst)
{ {
older.store(value, order); older.store(value, order);
} }
private: // sets if as expected
std::atomic<T*> older {nullptr}; bool cas(T *expected, T *set,
}; std::memory_order order = std::memory_order_seq_cst)
{
return older.compare_exchange_strong(expected, set, order);
}
private:
std::atomic<T *> older{nullptr};
};
} }

View File

@ -11,7 +11,7 @@ namespace mvcc
{ {
template <class T> template <class T>
class VersionList : public LazyGC<VersionList<T>> class VersionList
{ {
friend class Accessor; friend class Accessor;
@ -51,6 +51,56 @@ public:
auto gc_lock_acquire() { return std::unique_lock<RecordLock>(lock); } auto gc_lock_acquire() { return std::unique_lock<RecordLock>(lock); }
// Frees all records which are deleted by transaction older than given id.
// EXPECTS THAT THERE IS NO ACTIVE TRANSACTION WITH ID LESS THAN GIVEN ID.
// EXPECTS THAT THERE WON'T BE SIMULATAIUS CALLS FROM DIFFERENT THREADS OF
// THIS METHOD.
// True if this whole version list isn't needed any more. There is still
// possibilty that someone is reading it at this moment but he cant change
// it or get anything from it.
// TODO: Validate this method
bool gc_deleted(const Id &id)
{
auto r = head.load(std::memory_order_seq_cst);
T *bef = nullptr;
// nullptr
// |
// [v1] ...
// |
// [v2] <------+
// | |
// [v3] <------+
// | | Jump backwards until you find a first old deleted
// [VerList] ----+ version, or you reach the end of the list
//
while (r != nullptr && !r->is_deleted_before(id)) {
bef = r;
r = r->next(std::memory_order_seq_cst);
}
if (bef == nullptr) {
// if r==nullptr he is needed and it is expecting insert.
// if r!=nullptr vertex has been explicitly deleted. It can't be
// updated because for update, visible record is needed and at this
// point whe know that there is no visible record for any
// transaction. Also it cant be inserted because head isn't nullptr.
// Remove also requires visible record. Find wont return any record
// because none is visible.
return r != nullptr;
} else {
if (r != nullptr) {
// Bef is possible visible to some transaction but r is not and
// the implementation of this version list guarantees that
// record r and older records aren't accessed.
bef->next(nullptr, std::memory_order_seq_cst);
delete r; // THIS IS ISSUE IF MULTIPLE THREADS TRY TO DO THIS
}
return false;
}
}
void vacuum() {} void vacuum() {}
T *find(const tx::Transaction &t) const T *find(const tx::Transaction &t) const

View File

@ -9,6 +9,10 @@
class EdgeTypeStore class EdgeTypeStore
{ {
public: public:
using store_t = ConcurrentMap<CharStr, std::unique_ptr<EdgeType>>;
store_t::Accessor access();
const EdgeType &find_or_create(const char *name); const EdgeType &find_or_create(const char *name);
bool contains(const char *name); // TODO: const bool contains(const char *name); // TODO: const
@ -24,5 +28,5 @@ public:
// templetize the two of them // templetize the two of them
private: private:
ConcurrentMap<CharStr, std::unique_ptr<EdgeType>> edge_types; store_t edge_types;
}; };

View File

@ -19,17 +19,22 @@ using EdgeIndexBase = IndexBase<TypeGroupEdge, K>;
class Edges class Edges
{ {
using prop_familys_t = ConcurrentMap<std::string, EdgePropertyFamily *>; using prop_familys_t = ConcurrentMap<std::string, EdgePropertyFamily *>;
using store_t = ConcurrentMap<uint64_t, EdgeRecord>;
public: public:
store_t::Accessor access();
Option<const EdgeAccessor> find(DbTransaction &t, const Id &id); Option<const EdgeAccessor> find(DbTransaction &t, const Id &id);
// Creates new Edge and returns filled EdgeAccessor. // Creates new Edge and returns filled EdgeAccessor.
EdgeAccessor insert(DbTransaction &t, VertexRecord *from, VertexRecord *to); EdgeAccessor insert(DbTransaction &t, VertexRecord *from, VertexRecord *to);
prop_familys_t::Accessor property_family_access();
EdgePropertyFamily &property_family_find_or_create(const std::string &name); EdgePropertyFamily &property_family_find_or_create(const std::string &name);
private: private:
ConcurrentMap<uint64_t, EdgeRecord> edges; store_t edges;
// TODO: Because familys wont be removed this could be done with more // TODO: Because familys wont be removed this could be done with more
// efficent // efficent
// data structure. // data structure.

View File

@ -9,6 +9,7 @@ template <class TG, class K>
class NonUniqueUnorderedIndex : public IndexBase<TG, K> class NonUniqueUnorderedIndex : public IndexBase<TG, K>
{ {
public: public:
using store_t = List<IndexRecord<TG, K>>;
// typedef T value_type; // typedef T value_type;
// typedef K key_type; // typedef K key_type;
@ -33,9 +34,9 @@ public:
// Removes for all transactions obsolete Records. // Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for // Cleaner has to call this method when he decideds that it is time for
// cleaning. // cleaning. Id must be id of oldest active transaction.
void clean(DbTransaction &) final; void clean(const Id &id) final;
private: private:
List<IndexRecord<TG, K>> list; store_t list;
}; };

View File

@ -32,8 +32,8 @@ public:
// Removes for all transactions obsolete Records. // Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for // Cleaner has to call this method when he decideds that it is time for
// cleaning. // cleaning. Id must be id of oldest active transaction.
void clean(DbTransaction &) final; void clean(const Id &id) final;
private: private:
ConcurrentSet<IndexRecord<T, K>> set; ConcurrentSet<IndexRecord<T, K>> set;

View File

@ -56,8 +56,8 @@ public:
// Removes for all transactions obsolete Records. // Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for // Cleaner has to call this method when he decideds that it is time for
// cleaning. // cleaning. Id must be id of oldest active transaction.
virtual void clean(DbTransaction &) = 0; virtual void clean(const Id &id) = 0;
// Activates index for readers. // Activates index for readers.
void activate(); void activate();

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include "mvcc/id.hpp"
#include "utils/border.hpp" #include "utils/border.hpp"
#include "utils/total_ordering.hpp" #include "utils/total_ordering.hpp"
@ -52,6 +53,10 @@ public:
bool is_valid(tx::Transaction &t) const; bool is_valid(tx::Transaction &t) const;
// True if it can be removed.
bool to_clean(const Id &oldest_active) const;
// This method is valid only if is_valid is true.
const auto access(DbTransaction &db) const; const auto access(DbTransaction &db) const;
const K key; const K key;

View File

@ -9,6 +9,10 @@
class LabelStore class LabelStore
{ {
public: public:
using store_t = ConcurrentMap<CharStr, std::unique_ptr<Label>>;
store_t::Accessor access();
const Label &find_or_create(const char *name); const Label &find_or_create(const char *name);
bool contains(const char *name); // TODO: const bool contains(const char *name); // TODO: const
@ -17,5 +21,5 @@ public:
// return { Label, is_found } // return { Label, is_found }
private: private:
ConcurrentMap<CharStr, std::unique_ptr<Label>> labels; store_t labels;
}; };

View File

@ -33,6 +33,8 @@ public:
VertexPropertyFamily & VertexPropertyFamily &
property_family_find_or_create(const std::string &name); property_family_find_or_create(const std::string &name);
prop_familys_t::Accessor property_family_access();
private: private:
vertices_t vertices; vertices_t vertices;
// TODO: Because families wont be removed this could be done with more // TODO: Because families wont be removed this could be done with more

View File

@ -2,5 +2,5 @@
namespace this_thread namespace this_thread
{ {
thread_local unsigned id = 0; // thread_local unsigned id = 0;
}; };

View File

@ -1,11 +1,11 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <thread>
#include <cassert> #include <cassert>
#include <thread>
#include "threading/id.hpp"
#include "utils/underlying_cast.hpp" #include "utils/underlying_cast.hpp"
#include "id.hpp"
class Thread class Thread
{ {
@ -28,27 +28,20 @@ public:
} }
Thread() = default; Thread() = default;
Thread(const Thread&) = delete; Thread(const Thread &) = delete;
Thread(Thread&& other) Thread(Thread &&other);
{
assert(thread_id == UNINITIALIZED);
thread_id = other.thread_id;
thread = std::move(other.thread);
}
void join() { return thread.join(); } void join();
private: private:
unsigned thread_id = UNINITIALIZED; unsigned thread_id = UNINITIALIZED;
std::thread thread; std::thread thread;
template <class F, class... Args> template <class F, class... Args>
void start_thread(F&& f) void start_thread(F &&f)
{ {
this_thread::id = thread_id; // this_thread::id = thread_id;
f(); f();
} }
}; };
std::atomic<unsigned> Thread::thread_counter {1};

View File

@ -3,6 +3,8 @@
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include "utils/option.hpp"
namespace tx namespace tx
{ {
@ -23,6 +25,25 @@ public:
return std::binary_search(active.begin(), active.end(), xid); return std::binary_search(active.begin(), active.end(), xid);
} }
// Return id of oldest transaction. None if there is no transactions in
// snapshot.
Option<Id> oldest_active()
{
auto n = active.size();
if (n > 0) {
Id min = active[0];
for (auto i = 1; i < n; i++) {
if (active[i] < min) {
min = active[i];
}
}
return Option<Id>(min);
} else {
return Option<Id>();
}
}
void insert(const id_t &id) { active.push_back(id); } void insert(const id_t &id) { active.push_back(id); }
void remove(const id_t &id) void remove(const id_t &id)

View File

@ -33,6 +33,9 @@ public:
// snapshot will be empty. // snapshot will be empty.
void wait_for_active(); void wait_for_active();
// Return id of oldest transaction from snapshot.
Id oldest_active();
// True if id is in snapshot. // True if id is in snapshot.
bool is_active(const Id &id) const; bool is_active(const Id &id) const;
void take_lock(RecordLock &lock); void take_lock(RecordLock &lock);

View File

@ -131,6 +131,17 @@ public:
return std::move(*data._M_ptr()); return std::move(*data._M_ptr());
} }
// Takes if it exists otherwise returns given value.
T take_or(T &&value)
{
if (initialized) {
initialized = false;
return std::move(*data._M_ptr());
} else {
return std::move(value);
}
}
explicit operator bool() const { return initialized; } explicit operator bool() const { return initialized; }
private: private:

View File

@ -43,6 +43,13 @@ public:
initialized = true; initialized = true;
} }
template <class... Args>
void emplace(Args &&... args)
{
new (data._M_addr()) T(args...);
initialized = true;
}
private: private:
__gnu_cxx::__aligned_buffer<T> data; __gnu_cxx::__aligned_buffer<T> data;
bool initialized = false; bool initialized = false;

View File

@ -1,5 +1,6 @@
#include "database/db_transaction.hpp" #include "database/db_transaction.hpp"
#include "database/db.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/edge_type/edge_type.hpp" #include "storage/edge_type/edge_type.hpp"
#include "storage/label/label.hpp" #include "storage/label/label.hpp"
@ -10,6 +11,79 @@
return false; \ return false; \
} }
DbTransaction::DbTransaction(Db &db) : db(db), trans(db.tx_engine.begin()) {}
// Cleaning for indexes in labels and edge_type
template <class A>
void clean_indexes(A &&acc, Id oldest_active)
{
for (auto &l : acc) {
l.second.get()->index().clean(oldest_active);
}
}
// Cleaning for version lists
template <class A>
void clean_version_lists(A &&acc, Id oldest_active)
{
for (auto &vlist : acc) {
if (vlist.second.gc_deleted(oldest_active)) {
// TODO: Optimization, iterator with remove method.
bool succ = acc.remove(vlist.first);
assert(succ); // There is other cleaner here
}
}
}
// Cleaning for indexes in properties.
template <class A>
void clean_property_indexes(A &&acc, Id oldest_active)
{
for (auto &family : acc) {
auto oi = family.second->index.get_read();
if (oi.is_present()) {
oi.get()->clean(oldest_active);
}
}
// TODO: Code for cleaning other indexes which are not yet coded into
// the database.
}
// Cleans edge part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_edge_section()
{
Id oldest_active = trans.oldest_active();
// Clean edge_type index
clean_indexes(db.graph.edge_type_store.access(), oldest_active);
// Clean family_type_s edge index
clean_property_indexes(db.graph.edges.property_family_access(),
oldest_active);
// Clean Edge list
clean_version_lists(db.graph.edges.access(), oldest_active);
}
// Cleans vertex part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_vertex_section()
{
Id oldest_active = trans.oldest_active();
// Clean label index
clean_indexes(db.graph.label_store.access(), oldest_active);
// Clean family_type_s vertex index
clean_property_indexes(db.graph.vertices.property_family_access(),
oldest_active);
// Clean vertex list
clean_version_lists(db.graph.vertices.access(), oldest_active);
}
template <class TG, class IU> template <class TG, class IU>
bool update_property_indexes(IU &iu, const tx::Transaction &t) bool update_property_indexes(IU &iu, const tx::Transaction &t)
{ {

37
src/dbms/cleaner.cpp Normal file
View File

@ -0,0 +1,37 @@
#include "dbms/cleaner.hpp"
#include <chrono>
#include <ctime>
#include <thread>
#include "database/db_transaction.hpp"
#include "threading/thread.hpp"
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
{
cleaners.push_back(std::make_unique<Thread>([&]() {
std::time_t last_clean = std::time(nullptr);
while (cleaning.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
if (now >= last_clean + cleaning_cycle) {
for (auto &db : dbs.access()) {
DbTransaction t(db.second);
t.clean_edge_section();
t.clean_vertex_section();
}
last_clean = now;
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}));
}
Cleaning::~Cleaning()
{
cleaning.store(false, std::memory_order_release);
for (auto &t : cleaners) {
t.get()->join();
}
}

31
src/dbms/dbms.cpp Normal file
View File

@ -0,0 +1,31 @@
#include "dbms/dbms.hpp"
// returns active database
Db &Dbms::active()
{
Db *active = active_db.load(std::memory_order_acquire);
if (UNLIKELY(active == nullptr)) {
return create_default();
} else {
return *active;
}
}
// set active database
// if active database doesn't exist create one
Db &Dbms::active(const std::string &name)
{
auto acc = dbs.access();
// create db if it doesn't exist
auto it = acc.find(name);
if (it == acc.end()) {
it = acc.emplace(name, std::forward_as_tuple(name),
std::forward_as_tuple(name))
.first;
}
// set and return active db
auto &db = it->second;
active_db.store(&db, std::memory_order_release);
return db;
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <map>
#include "database/db.hpp"
class Dbms
{
public:
Dbms() { create_default(); }
// returns active database
Db &active()
{
if (UNLIKELY(active_db == nullptr)) create_default();
return *active_db;
}
// set active database
// if active database doesn't exist create one
Db &active(const std::string &name)
{
// create db if it doesn't exist
if (dbs.find(name) == dbs.end()) {
dbs.emplace(std::piecewise_construct, std::forward_as_tuple(name),
std::forward_as_tuple(name));
}
// set and return active db
auto &db = dbs.at(name);
return active_db = &db, *active_db;
}
// TODO: DELETE action
private:
// dbs container
std::map<std::string, Db> dbs;
// currently active database
Db *active_db;
// creates default database
void create_default() { active("default"); }
};

View File

@ -1,5 +1,10 @@
#include "storage/edge_type/edge_type_store.hpp" #include "storage/edge_type/edge_type_store.hpp"
EdgeTypeStore::store_t::Accessor EdgeTypeStore::access()
{
return edge_types.access();
}
const EdgeType &EdgeTypeStore::find_or_create(const char *name) const EdgeType &EdgeTypeStore::find_or_create(const char *name)
{ {
auto accessor = edge_types.access(); auto accessor = edge_types.access();

View File

@ -3,6 +3,8 @@
#include "storage/edge_accessor.hpp" #include "storage/edge_accessor.hpp"
#include "utils/iterator/iterator.hpp" #include "utils/iterator/iterator.hpp"
Edges::store_t::Accessor Edges::access() { return edges.access(); }
Option<const EdgeAccessor> Edges::find(DbTransaction &t, const Id &id) Option<const EdgeAccessor> Edges::find(DbTransaction &t, const Id &id)
{ {
auto edges_accessor = edges.access(); auto edges_accessor = edges.access();
@ -35,6 +37,11 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,
return EdgeAccessor(edge, &inserted_edge_record->second, t); return EdgeAccessor(edge, &inserted_edge_record->second, t);
} }
Edges::prop_familys_t::Accessor Edges::property_family_access()
{
return prop_familys.access();
}
EdgePropertyFamily & EdgePropertyFamily &
Edges::property_family_find_or_create(const std::string &name) Edges::property_family_find_or_create(const std::string &name)
{ {

View File

@ -65,9 +65,14 @@ auto NonUniqueUnorderedIndex<T, K>::for_range_exact(DbAccessor &t_v,
} }
template <class T, class K> template <class T, class K>
void NonUniqueUnorderedIndex<T, K>::clean(DbTransaction &) void NonUniqueUnorderedIndex<T, K>::clean(const Id &id)
{ {
// TODO: Actual cleaning auto end = list.end();
for (auto it = list.begin(); it != end; it++) {
if (it->to_clean(id)) {
it.remove();
}
}
} }
template class NonUniqueUnorderedIndex<TypeGroupEdge, std::nullptr_t>; template class NonUniqueUnorderedIndex<TypeGroupEdge, std::nullptr_t>;

View File

@ -84,9 +84,15 @@ auto UniqueOrderedIndex<T, K>::for_range_exact(DbAccessor &t_v,
} }
template <class T, class K> template <class T, class K>
void UniqueOrderedIndex<T, K>::clean(DbTransaction &) void UniqueOrderedIndex<T, K>::clean(const Id &id)
{ {
// TODO: Actual cleaning auto acc = set.access();
for (auto ir : acc) {
if (ir.to_clean(id)) {
// TODO: Optimization, iterator with remove method.
acc.remove(ir);
}
}
} }
template class UniqueOrderedIndex<TypeGroupEdge, std::nullptr_t>; template class UniqueOrderedIndex<TypeGroupEdge, std::nullptr_t>;

View File

@ -31,6 +31,13 @@ bool IndexRecord<TG, K>::is_valid(tx::Transaction &t) const
return record == vlist->find(t); return record == vlist->find(t);
} }
template <class TG, class K>
bool IndexRecord<TG, K>::to_clean(const Id &oldest_active) const
{
assert(!empty());
return record->is_deleted_before(oldest_active);
}
template <class TG, class K> template <class TG, class K>
const auto IndexRecord<TG, K>::access(DbTransaction &db) const const auto IndexRecord<TG, K>::access(DbTransaction &db) const
{ {

View File

@ -1,5 +1,7 @@
#include "storage/label/label_store.hpp" #include "storage/label/label_store.hpp"
LabelStore::store_t::Accessor LabelStore::access() { return labels.access(); }
const Label &LabelStore::find_or_create(const char *name) const Label &LabelStore::find_or_create(const char *name)
{ {
auto accessor = labels.access(); auto accessor = labels.access();

View File

@ -37,6 +37,11 @@ VertexAccessor Vertices::insert(DbTransaction &t)
return VertexAccessor(vertex, &inserted_vertex_record->second, t); return VertexAccessor(vertex, &inserted_vertex_record->second, t);
} }
Vertices::prop_familys_t::Accessor Vertices::property_family_access()
{
return prop_familys.access();
}
VertexPropertyFamily & VertexPropertyFamily &
Vertices::property_family_find_or_create(const std::string &name) Vertices::property_family_find_or_create(const std::string &name)
{ {

12
src/threading/thread.cpp Normal file
View File

@ -0,0 +1,12 @@
#include "threading/thread.hpp"
Thread::Thread(Thread &&other)
{
assert(thread_id == UNINITIALIZED);
thread_id = other.thread_id;
thread = std::move(other.thread);
}
void Thread::join() { return thread.join(); }
std::atomic<unsigned> Thread::thread_counter{1};

View File

@ -31,6 +31,11 @@ bool Transaction::is_active(const Id &id) const
return snapshot.is_active(id); return snapshot.is_active(id);
} }
Id Transaction::oldest_active()
{
return snapshot.oldest_active().take_or(Id(id));
}
void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); } void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); }
void Transaction::commit() { engine.commit(*this); } void Transaction::commit() { engine.commit(*this); }