merge with dev

This commit is contained in:
Marko Budiselic 2016-08-30 05:34:08 +01:00
commit 32ca896425
69 changed files with 839 additions and 194 deletions

View File

@ -144,6 +144,7 @@ endforeach()
# -----------------------------------------------------------------------------
# COPY header files required by query engine (query compiler)
# TODO: somehow automate (in destination dir should be only required include files)
FILE(COPY ${include_dir}/barrier/barrier.hpp DESTINATION ${build_include_dir}/barrier)
FILE(COPY ${include_dir}/database/db.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/database/db_transaction.hpp DESTINATION ${build_include_dir}/database)
FILE(COPY ${include_dir}/database/db_accessor.hpp DESTINATION ${build_include_dir}/database)
@ -382,7 +383,7 @@ message(STATUS "TESTS binaries: ${TESTS}")
# development the barrier can be turned off because it is much easier to
# debug
option(BARRIER "Barrier" ON)
message(STATUS "Barrier: ${BARRIER} (Source code isolation)")
message(STATUS "BARRIER: ${BARRIER} (Source code isolation)")
if(BARRIER)
add_definitions( -DBARRIER )
endif()
@ -412,6 +413,8 @@ EXECUTE_PROCESS(
# TODO: create separate static library from bolt code
set(memgraph_src_files
${src_dir}/dbms/dbms.cpp
${src_dir}/dbms/cleaner.cpp
${src_dir}/utils/string/transform.cpp
${src_dir}/utils/string/join.cpp
${src_dir}/utils/string/file.cpp
@ -426,6 +429,7 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/transport/bolt_decoder.cpp
${src_dir}/communication/bolt/v1/transport/buffer.cpp
${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp
${src_dir}/threading/thread.cpp
${src_dir}/mvcc/id.cpp
${src_dir}/storage/vertices.cpp
${src_dir}/storage/edges.cpp
@ -466,7 +470,6 @@ set(memgraph_src_files
${src_dir}/storage/record_accessor.cpp
)
# STATIC library used by memgraph executables
add_library(memgraph STATIC ${memgraph_src_files})
@ -474,6 +477,14 @@ add_library(memgraph STATIC ${memgraph_src_files})
add_library(memgraph_pic STATIC ${memgraph_src_files})
set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
if (BARRIER)
# create static barrier lib
add_library(barrier STATIC ${memgraph_src_files})
# create pic static barrier lib
add_library(barrier_pic STATIC ${memgraph_src_files})
set_property(TARGET barrier_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
endif()
# tests
if (TESTS)
@ -509,7 +520,13 @@ set(MEMGRAPH_BUILD_NAME
# memgraph main executable
if (MEMGRAPH)
add_executable(${MEMGRAPH_BUILD_NAME} ${src_dir}/memgraph_bolt.cpp)
target_link_libraries(${MEMGRAPH_BUILD_NAME} memgraph)
if (BARRIER)
target_link_libraries(${MEMGRAPH_BUILD_NAME} barrier)
elseif (NOT BARRIER)
target_link_libraries(${MEMGRAPH_BUILD_NAME} memgraph)
endif ()
target_link_libraries(${MEMGRAPH_BUILD_NAME} Threads::Threads)
target_link_libraries(${MEMGRAPH_BUILD_NAME} cypher_lib)
if (UNIX)
@ -519,12 +536,3 @@ if (MEMGRAPH)
target_link_libraries(${MEMGRAPH_BUILD_NAME} dl)
endif (UNIX)
endif()
# # memgraph executable HTTP TODO: DEPRICATED
# add_executable(memgraph_http src/memgraph.cpp)
# add_dependencies(memgraph_http cypher_lib)
# target_link_libraries(memgraph_http Threads::Threads)
# target_link_libraries(memgraph_http pcre)
# target_link_libraries(memgraph_http ${libuv_static_lib})
# target_link_libraries(memgraph_http ${r3_static_lib})
# target_link_libraries(memgraph_http ${http_parser_static_lib})

View File

@ -38,7 +38,7 @@ class EdgePropertyType;
// BOLT
template <class Stream>
class BoltSerializer;
class RecordStream;
// ************ Here should be forward declarations of Unsized barrier classes
// COMMON
@ -413,43 +413,41 @@ public:
};
template <class Stream>
class BoltSerializer : private Sized<8, 8>
class RecordStream : private Sized<8, 8>
{
public:
template <class T>
BoltSerializer(T &&d);
RecordStream(T &&d);
BoltSerializer(const BoltSerializer &other) = default;
BoltSerializer(BoltSerializer &&other) = default;
~BoltSerializer();
RecordStream(const RecordStream &other) = default;
RecordStream(RecordStream &&other) = default;
~RecordStream();
BoltSerializer &operator=(const BoltSerializer &other) = default;
BoltSerializer &operator=(BoltSerializer &&other) = default;
RecordStream &operator=(const RecordStream &other) = default;
RecordStream &operator=(RecordStream &&other) = default;
void write(const VertexAccessor &vertex);
void write(const EdgeAccessor &edge);
void write(const Property &prop);
void write_null();
void write(const Bool &prop);
void write(const Float &prop);
void write(const Double &prop);
void write(const Int32 &prop);
void write(const Int64 &prop);
void write(const std::string &value);
void write(const String &prop);
template <class T>
void handle(const T &prop);
void write_success();
void write_success_empty();
void write_ignored();
void write_fields(const std::vector<std::string> &fields);
void write_field(const std::string& field);
void write_list_header(size_t size);
void write_record();
void write_meta(const std::string& type);
void send();
void chunk();
};
// ************ Here should be definitions of Unsized barrier classes

View File

@ -1,8 +1,11 @@
#pragma once
#include "barrier/barrier.hpp"
// This is the place for imports from memgraph .hpp
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "io/network/socket.hpp"
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#include "storage/edge_type/edge_type.hpp"
@ -165,7 +168,7 @@ TRANSFORM_REF_TEMPLATED(VertexIndex<T>, VertexIndexBase<T>);
template <class T>
TRANSFORM_REF_TEMPLATED(EdgeIndex<T>, EdgeIndexBase<T>);
template <class T>
TRANSFORM_REF_TEMPLATED(BoltSerializer<T>, ::bolt::BoltSerializer<T>);
TRANSFORM_REF_TEMPLATED(RecordStream<T>, ::bolt::RecordStream<T>);
template <class T>
TRANSFORM_REF_TEMPLATED(
@ -209,7 +212,7 @@ TRANSFORM_VALUE_ONE_RAW(EdgePropertyType<T>,
::EdgePropertyFamily::PropertyType::PropertyTypeKey<T>)
template <class T>
TRANSFORM_VALUE_ONE_RAW(BoltSerializer<T>, ::bolt::BoltSerializer<T>)
TRANSFORM_VALUE_ONE_RAW(RecordStream<T>, ::bolt::RecordStream<T>)
// ********************* SPECIAL CONSTRUCTORS
#define VertexPropertyType_constructor(x) \

View File

@ -1,8 +1,8 @@
#pragma once
#include "communication/bolt/v1/states.hpp"
#include "io/network/socket.hpp"
#include "dbms/dbms.hpp"
#include "io/network/socket.hpp"
namespace bolt
{
@ -16,11 +16,10 @@ class Bolt
public:
Bolt();
Session* create_session(io::Socket&& socket);
void close(Session* session);
Session *create_session(io::Socket &&socket);
void close(Session *session);
States states;
Dbms dbms;
};
}

View File

@ -8,6 +8,9 @@
#include "storage/model/properties/all.hpp"
#include "storage/model/properties/properties.hpp"
#include "storage/label/label.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/vertex_record.hpp"
namespace bolt
{

View File

@ -22,6 +22,8 @@ public:
logger = logging::log->logger("Record Stream");
}
~RecordStream() = default;
// TODO: create apstract methods that are not bolt specific ---------------
void write_success()
{

View File

@ -5,7 +5,8 @@
namespace config
{
constexpr const char * COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char * TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char *TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *BARRIER_TEMPLATE_CPU_CPP_PATH =
"barrier_template_cpu_cpp_path";
}

View File

@ -33,7 +33,7 @@ public:
std::pair<list_it, bool> insert(const K &key, T &&data)
{
return accessor.insert(item_t(key, std::forward<T>(data)));
return accessor.insert(item_t(key, std::move(data)));
}
std::pair<list_it, bool> insert(K &&key, T &&data)
@ -42,6 +42,17 @@ public:
item_t(std::forward<K>(key), std::forward<T>(data)));
}
template <class... Args1, class... Args2>
std::pair<list_it, bool> emplace(const K &key,
std::tuple<Args1...> first_args,
std::tuple<Args2...> second_args)
{
return accessor.emplace(
key, std::piecewise_construct,
std::forward<std::tuple<Args1...>>(first_args),
std::forward<std::tuple<Args2...>>(second_args));
}
list_it_con find(const K &key) const { return accessor.find(key); }
list_it find(const K &key) { return accessor.find(key); }

View File

@ -148,7 +148,7 @@ public:
static Node *create(const T &item, uint8_t height)
{
return create(item, height);
return create(height, item);
}
static Node *create(T &&item, uint8_t height)
@ -160,6 +160,16 @@ public:
return new (node) Node(std::move(item), height);
}
template <class... Args>
static Node *emplace(uint8_t height, Args &&... args)
{
auto node = allocate(height);
// we have raw memory and we need to construct an object
// of type Node on it
return new (node) Node(height, std::forward<Args>(args)...);
}
static void destroy(Node *node)
{
node->~Node();
@ -180,6 +190,12 @@ public:
new (&tower[i]) std::atomic<Node *>{nullptr};
}
template <class... Args>
Node(uint8_t height, Args &&... args) : Node(height)
{
this->data.emplace(std::forward<Args>(args)...);
}
Node(T &&data, uint8_t height) : Node(height)
{
this->data.set(std::move(data));
@ -519,12 +535,19 @@ public:
std::pair<Iterator, bool> insert(const T &item)
{
return skiplist->insert(item, preds, succs);
return skiplist->insert(preds, succs, item);
}
std::pair<Iterator, bool> insert(T &&item)
{
return skiplist->insert(std::move(item), preds, succs);
return skiplist->insert(preds, succs, std::move(item));
}
template <class K, class... Args>
std::pair<Iterator, bool> emplace(K &key, Args &&... args)
{
return skiplist->emplace(preds, succs, key,
std::forward<Args>(args)...);
}
Iterator insert_non_unique(const T &item)
@ -786,13 +809,15 @@ private:
// has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return insert_here(std::forward<T>(data), preds, succs, height,
guards);
return insert_here(Node::create(std::move(data), height), preds,
succs, height, guards);
}
}
// Insert unique data
std::pair<Iterator, bool> insert(T &&data, Node *preds[], Node *succs[])
// F - type of funct which will create new node if needed. Recieves height
// of node.
std::pair<Iterator, bool> insert(Node *preds[], Node *succs[], T &&data)
{
while (true) {
// TODO: before here was data.first
@ -817,18 +842,53 @@ private:
// has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return {insert_here(std::move(data), preds, succs, height, guards),
return {insert_here(Node::create(std::move(data), height), preds,
succs, height, guards),
true};
}
}
// Inserts data to specified locked location.
Iterator insert_here(T &&data, Node *preds[], Node *succs[], int height,
guard_t guards[])
// Insert unique data
// TODO: This is almost all duplicate code from insert
template <class K, class... Args>
std::pair<Iterator, bool> emplace(Node *preds[], Node *succs[], K &key,
Args &&... args)
{
// you have the locks, create a new node
auto new_node = Node::create(std::move(data), height);
while (true) {
// TODO: before here was data.first
auto level = find_path(this, H - 1, key, preds, succs);
if (level != -1) {
auto found = succs[level];
if (found->flags.is_marked()) continue;
while (!found->flags.is_fully_linked())
usleep(250);
return {Iterator{succs[level]}, false};
}
auto height = rnd();
guard_t guards[H];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if (!lock_nodes<true>(height, guards, preds, succs)) continue;
return {
insert_here(Node::emplace(height, std::forward<Args>(args)...),
preds, succs, height, guards),
true};
}
}
// Inserts data to specified locked location.
Iterator insert_here(Node *new_node, Node *preds[], Node *succs[],
int height, guard_t guards[])
{
// Node::create(std::move(data), height)
// link the predecessors and successors, e.g.
//
// 4 HEAD ... P ------------------------> S ... NULL

View File

@ -28,6 +28,7 @@ public:
// I - type of function I:const tx::Transaction& ->
// std::unique_ptr<IndexBase<TypeGroupVertex,std::nullptr_t>>
// G - type of collection (verrtex/edge)
// TODO: Currently only one index at a time can be created.
template <class TG, class I, class G>
bool create_index_on_vertex_property_family(const char *name, G &coll,
I &create_index);

View File

@ -17,6 +17,7 @@ class DbTransaction
friend DbAccessor;
public:
DbTransaction(Db &db);
DbTransaction(Db &db, tx::Transaction &trans) : db(db), trans(trans) {}
// Global transactional algorithms,operations and general methods meant for
@ -24,6 +25,14 @@ public:
// This should provide cleaner hierarchy of operations on database.
// For example cleaner.
// Cleans edge part of database. MUST be called by one cleaner thread at
// one time.
void clean_edge_section();
// Cleans vertex part of database. MUST be called by one cleaner thread at
// one time..
void clean_vertex_section();
// Updates indexes of Vertex/Edges in index_updates. True if indexes are
// updated successfully. False means that transaction failed.
bool update_indexes();

25
include/dbms/cleaner.hpp Normal file
View File

@ -0,0 +1,25 @@
#pragma once
#include "database/db.hpp"
class Thread;
// How much sec is a cleaning_cycle in which cleaner will clean at most
// once.
constexpr size_t cleaning_cycle = 60;
class Cleaning
{
public:
Cleaning(ConcurrentMap<std::string, Db> &dbs);
~Cleaning();
private:
ConcurrentMap<std::string, Db> &dbms;
std::vector<std::unique_ptr<Thread>> cleaners;
std::atomic<bool> cleaning = {true};
};

32
include/dbms/dbms.hpp Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/db.hpp"
#include "dbms/cleaner.hpp"
class Dbms
{
public:
Dbms() { create_default(); }
// returns active database
Db &active();
// set active database
// if active database doesn't exist create one
Db &active(const std::string &name);
// TODO: DELETE action
private:
// creates default database
Db &create_default() { return active("default"); }
// dbs container
ConcurrentMap<std::string, Db> dbs;
// currently active database
std::atomic<Db *> active_db;
Cleaning cleaning = {dbs};
};

View File

@ -102,6 +102,12 @@ public:
return committed(hints.cre, tx.cre(), t);
}
// True if record was deleted before id.
bool is_deleted_before(const Id &id)
{
return tx.exp() != Id(0) && tx.exp() < id;
}
// TODO: Test this
// True if this record is visible for write.
bool is_visible_write(const tx::Transaction &t)

View File

@ -10,32 +10,35 @@ class Version
{
public:
Version() = default;
Version(T* older) : older(older) {}
Version(T *older) : older(older) {}
~Version()
{
delete older.load(std::memory_order_seq_cst);
}
~Version() { delete older.load(std::memory_order_seq_cst); }
// return a pointer to an older version stored in this record
T* next(std::memory_order order = std::memory_order_seq_cst)
T *next(std::memory_order order = std::memory_order_seq_cst)
{
return older.load(order);
}
const T* next(std::memory_order order = std::memory_order_seq_cst) const
const T *next(std::memory_order order = std::memory_order_seq_cst) const
{
return older.load(order);
}
// set the older version of this record
void next(T* value, std::memory_order order = std::memory_order_seq_cst)
void next(T *value, std::memory_order order = std::memory_order_seq_cst)
{
older.store(value, order);
}
private:
std::atomic<T*> older {nullptr};
};
// sets if as expected
bool cas(T *expected, T *set,
std::memory_order order = std::memory_order_seq_cst)
{
return older.compare_exchange_strong(expected, set, order);
}
private:
std::atomic<T *> older{nullptr};
};
}

View File

@ -11,7 +11,7 @@ namespace mvcc
{
template <class T>
class VersionList : public LazyGC<VersionList<T>>
class VersionList
{
friend class Accessor;
@ -51,6 +51,56 @@ public:
auto gc_lock_acquire() { return std::unique_lock<RecordLock>(lock); }
// Frees all records which are deleted by transaction older than given id.
// EXPECTS THAT THERE IS NO ACTIVE TRANSACTION WITH ID LESS THAN GIVEN ID.
// EXPECTS THAT THERE WON'T BE SIMULATAIUS CALLS FROM DIFFERENT THREADS OF
// THIS METHOD.
// True if this whole version list isn't needed any more. There is still
// possibilty that someone is reading it at this moment but he cant change
// it or get anything from it.
// TODO: Validate this method
bool gc_deleted(const Id &id)
{
auto r = head.load(std::memory_order_seq_cst);
T *bef = nullptr;
// nullptr
// |
// [v1] ...
// |
// [v2] <------+
// | |
// [v3] <------+
// | | Jump backwards until you find a first old deleted
// [VerList] ----+ version, or you reach the end of the list
//
while (r != nullptr && !r->is_deleted_before(id)) {
bef = r;
r = r->next(std::memory_order_seq_cst);
}
if (bef == nullptr) {
// if r==nullptr he is needed and it is expecting insert.
// if r!=nullptr vertex has been explicitly deleted. It can't be
// updated because for update, visible record is needed and at this
// point whe know that there is no visible record for any
// transaction. Also it cant be inserted because head isn't nullptr.
// Remove also requires visible record. Find wont return any record
// because none is visible.
return r != nullptr;
} else {
if (r != nullptr) {
// Bef is possible visible to some transaction but r is not and
// the implementation of this version list guarantees that
// record r and older records aren't accessed.
bef->next(nullptr, std::memory_order_seq_cst);
delete r; // THIS IS ISSUE IF MULTIPLE THREADS TRY TO DO THIS
}
return false;
}
}
void vacuum() {}
T *find(const tx::Transaction &t) const

View File

@ -20,6 +20,9 @@ public:
std::string flags;
// TODO: sync this with cmake configuration
#ifdef BARRIER
flags += " -DBARRIER";
#endif
#ifdef NDEBUG
flags += " -DNDEBUG -O2";
#endif
@ -51,6 +54,9 @@ public:
"-I../libs/fmt", // TODO: load from config
"-I../../libs/fmt",
"-L./ -L../",
#ifdef BARRIER
"-lbarrier_pic",
#endif
"-lmemgraph_pic",
"-shared -fPIC" // shared library flags
);

View File

@ -3,11 +3,11 @@
#include "config/config.hpp"
#include "cypher/ast/ast.hpp"
#include "cypher/compiler.hpp"
#include "logging/default.hpp"
#include "query_engine/exceptions/errors.hpp"
#include "template_engine/engine.hpp"
#include "traverser/cpp_traverser.hpp"
#include "utils/string/file.hpp"
#include "logging/default.hpp"
#include "utils/type_discovery.hpp"
using std::string;
@ -27,7 +27,11 @@ public:
CppTraverser cpp_traverser;
// get paths
#ifdef BARRIER
string template_path = CONFIG(config::BARRIER_TEMPLATE_CPU_CPP_PATH);
#else
string template_path = CONFIG(config::TEMPLATE_CPU_CPP_PATH);
#endif
string template_file = utils::read_file(template_path.c_str());
// syntax tree generation
@ -55,7 +59,11 @@ public:
template_file, {{"class_name", "CodeCPU"},
{"stripped_hash", std::to_string(stripped_hash)},
{"query", query},
#ifdef BARRIER
{"stream", "RecordStream<io::Socket>"},
#else
{"stream", type_name<Stream>().to_string()},
#endif
{"code", cpp_traverser.code}});
logger.debug("generated code: {}", generated);

View File

@ -3,9 +3,11 @@
#include <cstdint>
#include <map>
#include <string>
#include <vector>
#include "query_engine/code_generator/namer.hpp"
#include "storage/model/properties/flags.hpp"
#include "query_engine/exceptions/exceptions.hpp"
// main states that are used while ast is traversed
// in order to generate ActionSequence
@ -49,6 +51,7 @@ private:
std::map<std::string, EntityStatus> entity_status;
std::map<std::string, EntityType> entity_type;
std::map<std::string, EntitySource> entity_source;
std::map<std::string, std::vector<std::string>> entity_tags;
// TODO: container that keeps track about c++ variable names
@ -81,6 +84,13 @@ public:
return entity_source.at(name);
}
auto tags(const std::string& name) const
{
if (entity_tags.find(name) == entity_tags.end())
throw CppGeneratorException("No tags for specified entity");
return entity_tags.at(name);
}
const std::map<std::string, EntityType> &all_typed_enteties()
{
return entity_type;
@ -114,4 +124,9 @@ public:
{
entity_source[name] = source;
}
void tags(const std::string& name, std::vector<std::string> tags)
{
entity_tags[name] = tags;
}
};

View File

@ -41,10 +41,18 @@ auto match_query_action =
if (place == entity_search::search_internal_id) {
auto index = fetch_internal_index(action_data, name);
code += code_line(code::match_vertex_by_id, name, index);
cypher_data.source(name, EntitySource::InternalId);
}
if (place == entity_search::search_main_storage) {
cypher_data.source(name, EntitySource::MainStorage);
}
if (place == entity_search::search_label_index) {
if (action_data.entity_data.at(name).tags.size() > 1) {
throw SemanticError("Multiple label match (currently NOT supported)");
}
cypher_data.source(name, EntitySource::LabelIndex);
cypher_data.tags(name, action_data.entity_data.at(name).tags);
}
}
// find relationship
@ -56,6 +64,10 @@ auto match_query_action =
if (place == entity_search::search_internal_id) {
auto index = fetch_internal_index(action_data, name);
code += code_line(code::match_edge_by_id, name, index);
cypher_data.source(name, EntitySource::InternalId);
}
if (place == entity_search::search_main_storage) {
cypher_data.source(name, EntitySource::MainStorage);
}
}
}

View File

@ -32,11 +32,21 @@ auto return_query_action =
// the client has to receive all elements from the main storage
if (cypher_data.source(entity) == EntitySource::MainStorage)
{
code += code_line(code::write_all_vertices, entity);
if (cypher_data.type(entity) == EntityType::Node)
code += code_line(code::write_all_vertices, entity);
else if (cypher_data.type(entity) == EntityType::Relationship)
code += code_line(code::write_all_edges, entity);
}
if (cypher_data.source(entity) == EntitySource::LabelIndex)
{
// TODO: fetch somehow label name
if (cypher_data.type(entity) == EntityType::Node) {
if (cypher_data.tags(entity).size() == 0)
throw CppGeneratorException("entity has no tags");
auto label = cypher_data.tags(entity).at(0);
code += code_line(code::fine_and_write_vertices_by_label,
entity, label);
}
// TODO: code_line
}
}

View File

@ -151,6 +151,14 @@ struct QueryActionData
return entity_data.at(entity);
}
auto get_tags(const std::string& entity) const
{
if (entity_data.find(entity) == entity_data.end())
throw CppGeneratorException("Entity " + entity + "doesn't exist");
return entity_data.at(entity).tags;
}
void print() const
{
for (auto const &action : actions) {

View File

@ -1,16 +1,24 @@
#pragma once
#include "communication/communication.hpp"
#include "query_engine/query_stripped.hpp"
#ifdef BARRIER
#include "barrier/barrier.hpp"
#else
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#include "query_engine/query_stripped.hpp"
#endif
template <typename Stream>
class ICodeCPU
{
public:
virtual bool run(Db &db, code_args_t &args,
Stream &stream) = 0;
#ifdef BARRIER
virtual bool run(barrier::Db &db, code_args_t &args, Stream &stream) = 0;
#else
virtual bool run(Db &db, code_args_t &args, Stream &stream) = 0;
#endif
virtual ~ICodeCPU() {}
};

View File

@ -11,18 +11,30 @@
// execution
// postprocess the results
// BARRIER!
#ifdef BARRIER
namespace barrier
{
Db &trans(::Db &ref);
}
#endif
template <typename Stream>
class ProgramExecutor
{
public:
// QueryProgram has to know about the Stream
// Stream has to be passed in this function for every execution
auto execute(QueryProgram<Stream> &program, Db &db,
Stream &stream)
auto execute(QueryProgram<Stream> &program, Db &db, Stream &stream)
{
try {
// TODO: return result of query/code exection
#ifdef BARRIER
return program.code->run(barrier::trans(db),
program.stripped.arguments, stream);
#else
return program.code->run(db, program.stripped.arguments, stream);
#endif
} catch (...) {
// TODO: return more information about the error
throw QueryEngineException("code execution error");

View File

@ -13,6 +13,7 @@
#include "utils/hashing/fnv.hpp"
#include "utils/string/transform.hpp"
#include "utils/variadic/variadic.hpp"
#include "logging/default.hpp"
template <class T, class V>
void store_query_param(code_args_t &arguments, V &&v)
@ -25,7 +26,8 @@ class QueryStripper
{
public:
QueryStripper(Ts &&... strip_types)
: strip_types(std::make_tuple(std::forward<Ts>(strip_types)...)),
: logger(logging::log->logger("QueryStripper")),
strip_types(std::make_tuple(std::forward<Ts>(strip_types)...)),
lexer(std::make_unique<CypherLexer>())
{
}
@ -70,6 +72,10 @@ public:
std::stol(token.value));
break;
case TK_STR:
// TODO: remove quotes view lexertl
token.value.erase(0, 1);
token.value.erase(token.value.length() - 1, 1);
// TODO: remove
store_query_param<String>(stripped_arguments, token.value);
break;
case TK_BOOL: {
@ -94,6 +100,7 @@ public:
}
private:
Logger logger;
std::tuple<Ts...> strip_types;
CypherLexer::uptr lexer;

View File

@ -71,6 +71,29 @@ const std::string write_all_vertices =
" }});\n"
" stream.write_meta(\"rw\");\n";
const std::string fine_and_write_vertices_by_label =
"auto &label = t.label_find_or_create(\"{1}\");\n"
" stream.write_field(\"{0}\");\n"
" label.index().for_range(t).for_all([&](auto vertex) {\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write(vertex);\n"
" stream.chunk();\n"
" });\n"
" stream.write_meta(\"rw\");\n";
const std::string write_all_edges =
"stream.write_field(\"{0}\");\n"
" iter::for_all(t.edge_access(), [&](auto edge) {{\n"
" if (edge.fill()) {{\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write(edge);\n"
" stream.chunk();\n"
" }}\n"
" }});\n"
" stream.write_meta(\"rw\");\n";
const std::string return_true = "return true;";
const std::string todo = "// TODO: {}";

View File

@ -366,13 +366,15 @@ public:
void visit(ast::LabelList &ast_label_list) override
{
auto &data = generator.action_data();
auto &action_data = generator.action_data();
if (!ast_label_list.has_value()) return;
auto label = ast_label_list.value->name;
data.add_entity_tag(entity, label);
action_data.add_entity_tag(entity, label);
action_data.csm.search_cost(entity, entity_search::search_label_index,
entity_search::label_cost);
Traverser::visit(ast_label_list);
}

View File

@ -3,6 +3,7 @@
#include "storage/edge.hpp"
#include "storage/edge_record.hpp"
#include "storage/record_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/assert.hpp"
#include "utils/reference_wrapper.hpp"
@ -23,7 +24,7 @@ public:
const EdgeType &edge_type() const;
auto from() const;
VertexAccessor from() const;
auto to() const;
VertexAccessor to() const;
};

View File

@ -9,6 +9,10 @@
class EdgeTypeStore
{
public:
using store_t = ConcurrentMap<CharStr, std::unique_ptr<EdgeType>>;
store_t::Accessor access();
const EdgeType &find_or_create(const char *name);
bool contains(const char *name); // TODO: const
@ -24,5 +28,5 @@ public:
// templetize the two of them
private:
ConcurrentMap<CharStr, std::unique_ptr<EdgeType>> edge_types;
store_t edge_types;
};

View File

@ -5,16 +5,6 @@
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"
auto EdgeAccessor::from() const
{
return VertexAccessor(this->vlist->from(), this->db);
}
auto EdgeAccessor::to() const
{
return VertexAccessor(this->vlist->to(), this->db);
}
auto VertexAccessor::out() const
{
DbTransaction &t = this->db;

View File

@ -19,17 +19,22 @@ using EdgeIndexBase = IndexBase<TypeGroupEdge, K>;
class Edges
{
using prop_familys_t = ConcurrentMap<std::string, EdgePropertyFamily *>;
using store_t = ConcurrentMap<uint64_t, EdgeRecord>;
public:
store_t::Accessor access();
Option<const EdgeAccessor> find(DbTransaction &t, const Id &id);
// Creates new Edge and returns filled EdgeAccessor.
EdgeAccessor insert(DbTransaction &t, VertexRecord *from, VertexRecord *to);
prop_familys_t::Accessor property_family_access();
EdgePropertyFamily &property_family_find_or_create(const std::string &name);
private:
ConcurrentMap<uint64_t, EdgeRecord> edges;
store_t edges;
// TODO: Because familys wont be removed this could be done with more
// efficent
// data structure.

View File

@ -9,6 +9,7 @@ template <class TG, class K>
class NonUniqueUnorderedIndex : public IndexBase<TG, K>
{
public:
using store_t = List<IndexRecord<TG, K>>;
// typedef T value_type;
// typedef K key_type;
@ -33,9 +34,9 @@ public:
// Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for
// cleaning.
void clean(DbTransaction &) final;
// cleaning. Id must be id of oldest active transaction.
void clean(const Id &id) final;
private:
List<IndexRecord<TG, K>> list;
store_t list;
};

View File

@ -32,8 +32,8 @@ public:
// Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for
// cleaning.
void clean(DbTransaction &) final;
// cleaning. Id must be id of oldest active transaction.
void clean(const Id &id) final;
private:
ConcurrentSet<IndexRecord<T, K>> set;

View File

@ -56,8 +56,8 @@ public:
// Removes for all transactions obsolete Records.
// Cleaner has to call this method when he decideds that it is time for
// cleaning.
virtual void clean(DbTransaction &) = 0;
// cleaning. Id must be id of oldest active transaction.
virtual void clean(const Id &id) = 0;
// Activates index for readers.
void activate();

View File

@ -1,5 +1,6 @@
#pragma once
#include "mvcc/id.hpp"
#include "utils/border.hpp"
#include "utils/total_ordering.hpp"
@ -52,6 +53,10 @@ public:
bool is_valid(tx::Transaction &t) const;
// True if it can be removed.
bool to_clean(const Id &oldest_active) const;
// This method is valid only if is_valid is true.
const auto access(DbTransaction &db) const;
const K key;

View File

@ -9,6 +9,10 @@
class LabelStore
{
public:
using store_t = ConcurrentMap<CharStr, std::unique_ptr<Label>>;
store_t::Accessor access();
const Label &find_or_create(const char *name);
bool contains(const char *name); // TODO: const
@ -17,5 +21,5 @@ public:
// return { Label, is_found }
private:
ConcurrentMap<CharStr, std::unique_ptr<Label>> labels;
store_t labels;
};

View File

@ -36,6 +36,8 @@ public:
VertexPropertyFamily &
property_family_find_or_create(const std::string &name);
prop_familys_t::Accessor property_family_access();
private:
vertices_t vertices;
// TODO: Because families wont be removed this could be done with more

View File

@ -2,5 +2,5 @@
namespace this_thread
{
thread_local unsigned id = 0;
// thread_local unsigned id = 0;
};

View File

@ -1,11 +1,11 @@
#pragma once
#include <atomic>
#include <thread>
#include <cassert>
#include <thread>
#include "threading/id.hpp"
#include "utils/underlying_cast.hpp"
#include "id.hpp"
class Thread
{
@ -28,27 +28,20 @@ public:
}
Thread() = default;
Thread(const Thread&) = delete;
Thread(const Thread &) = delete;
Thread(Thread&& other)
{
assert(thread_id == UNINITIALIZED);
thread_id = other.thread_id;
thread = std::move(other.thread);
}
Thread(Thread &&other);
void join() { return thread.join(); }
void join();
private:
unsigned thread_id = UNINITIALIZED;
std::thread thread;
template <class F, class... Args>
void start_thread(F&& f)
void start_thread(F &&f)
{
this_thread::id = thread_id;
// this_thread::id = thread_id;
f();
}
};
std::atomic<unsigned> Thread::thread_counter {1};

View File

@ -3,6 +3,8 @@
#include <algorithm>
#include <vector>
#include "utils/option.hpp"
namespace tx
{
@ -23,6 +25,25 @@ public:
return std::binary_search(active.begin(), active.end(), xid);
}
// Return id of oldest transaction. None if there is no transactions in
// snapshot.
Option<Id> oldest_active()
{
auto n = active.size();
if (n > 0) {
Id min = active[0];
for (auto i = 1; i < n; i++) {
if (active[i] < min) {
min = active[i];
}
}
return Option<Id>(min);
} else {
return Option<Id>();
}
}
void insert(const id_t &id) { active.push_back(id); }
void remove(const id_t &id)

View File

@ -33,6 +33,9 @@ public:
// snapshot will be empty.
void wait_for_active();
// Return id of oldest transaction from snapshot.
Id oldest_active();
// True if id is in snapshot.
bool is_active(const Id &id) const;
void take_lock(RecordLock &lock);

View File

@ -51,6 +51,9 @@ public:
if (std::strcmp(key, "template_cpu_cpp_path") == 0)
return "./template/template_code_cpu.cpp";
if (std::strcmp(key, "barrier_template_cpu_cpp_path") == 0)
return "./template/barrier_template_code_cpu.cpp";
if (std::strcmp(key, "template_cpu_hpp_path") == 0)
return "./template/template_code_cpu.hpp";

View File

@ -131,6 +131,17 @@ public:
return std::move(*data._M_ptr());
}
// Takes if it exists otherwise returns given value.
T take_or(T &&value)
{
if (initialized) {
initialized = false;
return std::move(*data._M_ptr());
} else {
return std::move(value);
}
}
explicit operator bool() const { return initialized; }
private:

View File

@ -43,6 +43,13 @@ public:
initialized = true;
}
template <class... Args>
void emplace(Args &&... args)
{
new (data._M_addr()) T(args...);
initialized = true;
}
private:
__gnu_cxx::__aligned_buffer<T> data;
bool initialized = false;

View File

@ -25,6 +25,7 @@
#include "storage/vertices.cpp"
#include "storage/vertices.hpp"
#include "utils/command_line/arguments.hpp"
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
const int max_score = 1000000;

View File

@ -12,6 +12,7 @@
#include <unordered_map>
#include "import/csv_import.hpp"
#include "utils/command_line/arguments.hpp"
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
using namespace std;

View File

@ -499,83 +499,140 @@ OptionPtr<EdgeIndex<std::nullptr_t>> EdgePropertyFamily::index()
// ************************* BOLT SERIALIZER
template <class Stream>
BoltSerializer<Stream>::~BoltSerializer()
RecordStream<Stream>::~RecordStream()
{
THIS->~BoltSerializer();
// TODO: solve this
// THIS->~RecordStream();
}
template <class Stream>
void BoltSerializer<Stream>::write(const VertexAccessor &vertex)
void RecordStream<Stream>::write(const VertexAccessor &vertex)
{
HALF_CALL(write(trans(vertex)));
}
template <class Stream>
void BoltSerializer<Stream>::write(const EdgeAccessor &edge)
void RecordStream<Stream>::write(const EdgeAccessor &edge)
{
HALF_CALL(write(trans(edge)));
}
template <class Stream>
void BoltSerializer<Stream>::write(const Property &prop)
void RecordStream<Stream>::write(const Property &prop)
{
HALF_CALL(write(prop));
}
// template <class Stream>
// void RecordStream<Stream>::write_null()
// {
// HALF_CALL(write_null());
// }
template <class Stream>
void RecordStream<Stream>::write(const Bool &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write_null()
{
HALF_CALL(write_null());
}
template <class Stream>
void BoltSerializer<Stream>::write(const Bool &prop)
void RecordStream<Stream>::write(const Float &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write(const Float &prop)
void RecordStream<Stream>::write(const Double &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write(const Double &prop)
void RecordStream<Stream>::write(const Int32 &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write(const Int32 &prop)
void RecordStream<Stream>::write(const Int64 &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write(const Int64 &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
void BoltSerializer<Stream>::write(const std::string &value)
void RecordStream<Stream>::write(const std::string &value)
{
HALF_CALL(write(value));
}
template <class Stream>
void BoltSerializer<Stream>::write(const String &prop)
void RecordStream<Stream>::write(const String &prop)
{
HALF_CALL(write(prop));
}
template <class Stream>
template <class T>
void BoltSerializer<Stream>::handle(const T &prop)
void RecordStream<Stream>::write_success()
{
HALF_CALL(template handle<T>(prop));
HALF_CALL(write_success());
}
template <class Stream>
void RecordStream<Stream>::write_success_empty()
{
HALF_CALL(write_success_empty());
}
template <class Stream>
void RecordStream<Stream>::write_ignored()
{
HALF_CALL(write_ignored());
}
template <class Stream>
void RecordStream<Stream>::write_fields(const std::vector<std::string> &fields)
{
HALF_CALL(write_fields(fields));
}
template <class Stream>
void RecordStream<Stream>::write_field(const std::string& field)
{
HALF_CALL(write_field(field));
}
template <class Stream>
void RecordStream<Stream>::write_list_header(size_t size)
{
HALF_CALL(write_list_header(size));
}
template <class Stream>
void RecordStream<Stream>::write_record()
{
HALF_CALL(write_record());
}
template <class Stream>
void RecordStream<Stream>::write_meta(const std::string& type)
{
HALF_CALL(write_meta(type));
}
template <class Stream>
void RecordStream<Stream>::send()
{
HALF_CALL(send());
}
template <class Stream>
void RecordStream<Stream>::chunk()
{
HALF_CALL(chunk());
}
template class RecordStream<io::Socket>;
}
// **************************** ERROR EXAMPLES ****************************** //

View File

@ -3,7 +3,7 @@
#include "communication/bolt/v1/transport/chunked_buffer.hpp"
#include "communication/bolt/v1/transport/chunked_encoder.hpp"
#include "communication/bolt/v1/transport/socket_stream.hpp"
#include "storage/edge_x_vertex.hpp"
#include "io/network/socket.hpp"
template <class Stream>
void bolt::BoltSerializer<Stream>::write(const EdgeAccessor &edge)
@ -32,5 +32,5 @@ void bolt::BoltSerializer<Stream>::write(const EdgeAccessor &edge)
}
}
// template class bolt::BoltSerializer<bolt::BoltEncoder<
// bolt::ChunkedEncoder<bolt::ChunkedBuffer<bolt::SocketStream<io::Socket>>>>>;
template class bolt::BoltSerializer<bolt::BoltEncoder<
bolt::ChunkedEncoder<bolt::ChunkedBuffer<bolt::SocketStream<io::Socket>>>>>;

View File

@ -1,6 +1,10 @@
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
#ifdef BARRIER
#include "barrier/barrier.cpp"
#endif
namespace bolt
{

View File

@ -63,6 +63,12 @@ public:
// string literal TODO double quote escape
rule("\\\"(.*?)\\\"", TK_STR);
// ALL BELOW COMBNATIONS DON'T WORK
// rule("(?#\\\")(.*?)(?#\\\")", TK_STR);
// rule("[\"](.*?)[\"]", TK_STR);
// rule("(?:\")(.*?)(?:\")", TK_STR);
// rule("(?#:\")(.*?)(?#:\")", TK_STR);
// rule("(?#\")(.*?)(?#\")", TK_STR);
// number
rule("\\d+", TK_LONG);

View File

@ -1,5 +1,6 @@
#include "database/db_transaction.hpp"
#include "database/db.hpp"
#include "storage/edge.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/label/label.hpp"
@ -10,6 +11,79 @@
return false; \
}
DbTransaction::DbTransaction(Db &db) : db(db), trans(db.tx_engine.begin()) {}
// Cleaning for indexes in labels and edge_type
template <class A>
void clean_indexes(A &&acc, Id oldest_active)
{
for (auto &l : acc) {
l.second.get()->index().clean(oldest_active);
}
}
// Cleaning for version lists
template <class A>
void clean_version_lists(A &&acc, Id oldest_active)
{
for (auto &vlist : acc) {
if (vlist.second.gc_deleted(oldest_active)) {
// TODO: Optimization, iterator with remove method.
bool succ = acc.remove(vlist.first);
assert(succ); // There is other cleaner here
}
}
}
// Cleaning for indexes in properties.
template <class A>
void clean_property_indexes(A &&acc, Id oldest_active)
{
for (auto &family : acc) {
auto oi = family.second->index.get_read();
if (oi.is_present()) {
oi.get()->clean(oldest_active);
}
}
// TODO: Code for cleaning other indexes which are not yet coded into
// the database.
}
// Cleans edge part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_edge_section()
{
Id oldest_active = trans.oldest_active();
// Clean edge_type index
clean_indexes(db.graph.edge_type_store.access(), oldest_active);
// Clean family_type_s edge index
clean_property_indexes(db.graph.edges.property_family_access(),
oldest_active);
// Clean Edge list
clean_version_lists(db.graph.edges.access(), oldest_active);
}
// Cleans vertex part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_vertex_section()
{
Id oldest_active = trans.oldest_active();
// Clean label index
clean_indexes(db.graph.label_store.access(), oldest_active);
// Clean family_type_s vertex index
clean_property_indexes(db.graph.vertices.property_family_access(),
oldest_active);
// Clean vertex list
clean_version_lists(db.graph.vertices.access(), oldest_active);
}
template <class TG, class IU>
bool update_property_indexes(IU &iu, const tx::Transaction &t)
{

37
src/dbms/cleaner.cpp Normal file
View File

@ -0,0 +1,37 @@
#include "dbms/cleaner.hpp"
#include <chrono>
#include <ctime>
#include <thread>
#include "database/db_transaction.hpp"
#include "threading/thread.hpp"
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
{
cleaners.push_back(std::make_unique<Thread>([&]() {
std::time_t last_clean = std::time(nullptr);
while (cleaning.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
if (now >= last_clean + cleaning_cycle) {
for (auto &db : dbs.access()) {
DbTransaction t(db.second);
t.clean_edge_section();
t.clean_vertex_section();
}
last_clean = now;
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}));
}
Cleaning::~Cleaning()
{
cleaning.store(false, std::memory_order_release);
for (auto &t : cleaners) {
t.get()->join();
}
}

31
src/dbms/dbms.cpp Normal file
View File

@ -0,0 +1,31 @@
#include "dbms/dbms.hpp"
// returns active database
Db &Dbms::active()
{
Db *active = active_db.load(std::memory_order_acquire);
if (UNLIKELY(active == nullptr)) {
return create_default();
} else {
return *active;
}
}
// set active database
// if active database doesn't exist create one
Db &Dbms::active(const std::string &name)
{
auto acc = dbs.access();
// create db if it doesn't exist
auto it = acc.find(name);
if (it == acc.end()) {
it = acc.emplace(name, std::forward_as_tuple(name),
std::forward_as_tuple(name))
.first;
}
// set and return active db
auto &db = it->second;
active_db.store(&db, std::memory_order_release);
return db;
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <map>
#include "database/db.hpp"
class Dbms
{
public:
Dbms() { create_default(); }
// returns active database
Db &active()
{
if (UNLIKELY(active_db == nullptr)) create_default();
return *active_db;
}
// set active database
// if active database doesn't exist create one
Db &active(const std::string &name)
{
// create db if it doesn't exist
if (dbs.find(name) == dbs.end()) {
dbs.emplace(std::piecewise_construct, std::forward_as_tuple(name),
std::forward_as_tuple(name));
}
// set and return active db
auto &db = dbs.at(name);
return active_db = &db, *active_db;
}
// TODO: DELETE action
private:
// dbs container
std::map<std::string, Db> dbs;
// currently active database
Db *active_db;
// creates default database
void create_default() { active("default"); }
};

View File

@ -0,0 +1,42 @@
#include <iostream>
#include <string>
#include "query_engine/util.hpp"
#include "query_engine/i_code_cpu.hpp"
#include "storage/model/properties/all.hpp"
using std::cout;
using std::endl;
// query: {{query}}
// BARRIER!
namespace barrier
{
class {{class_name}} : public ICodeCPU<{{stream}}>
{
public:
bool run(Db &db, code_args_t &args,
{{stream}} &stream) override
{
{{code}}
}
~{{class_name}}() {}
};
}
extern "C" ICodeCPU<barrier::{{stream}}>* produce()
{
// BARRIER!
return new barrier::{{class_name}}();
}
extern "C" void destruct(ICodeCPU<barrier::{{stream}}>* p)
{
delete p;
}

View File

@ -23,7 +23,6 @@ public:
~{{class_name}}() {}
};
extern "C" ICodeCPU<{{stream}}>* produce()
{
return new {{class_name}}();

View File

@ -10,3 +10,13 @@ const EdgeType &EdgeAccessor::edge_type() const
runtime_assert(this->record->data.edge_type != nullptr, "EdgeType is null");
return *this->record->data.edge_type;
}
VertexAccessor EdgeAccessor::from() const
{
return VertexAccessor(this->vlist->from(), this->db);
}
VertexAccessor EdgeAccessor::to() const
{
return VertexAccessor(this->vlist->to(), this->db);
}

View File

@ -1,5 +1,10 @@
#include "storage/edge_type/edge_type_store.hpp"
EdgeTypeStore::store_t::Accessor EdgeTypeStore::access()
{
return edge_types.access();
}
const EdgeType &EdgeTypeStore::find_or_create(const char *name)
{
auto accessor = edge_types.access();

View File

@ -3,6 +3,8 @@
#include "storage/edge_accessor.hpp"
#include "utils/iterator/iterator.hpp"
Edges::store_t::Accessor Edges::access() { return edges.access(); }
Option<const EdgeAccessor> Edges::find(DbTransaction &t, const Id &id)
{
auto edges_accessor = edges.access();
@ -35,6 +37,11 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,
return EdgeAccessor(edge, &inserted_edge_record->second, t);
}
Edges::prop_familys_t::Accessor Edges::property_family_access()
{
return prop_familys.access();
}
EdgePropertyFamily &
Edges::property_family_find_or_create(const std::string &name)
{

View File

@ -65,9 +65,14 @@ auto NonUniqueUnorderedIndex<T, K>::for_range_exact(DbAccessor &t_v,
}
template <class T, class K>
void NonUniqueUnorderedIndex<T, K>::clean(DbTransaction &)
void NonUniqueUnorderedIndex<T, K>::clean(const Id &id)
{
// TODO: Actual cleaning
auto end = list.end();
for (auto it = list.begin(); it != end; it++) {
if (it->to_clean(id)) {
it.remove();
}
}
}
template class NonUniqueUnorderedIndex<TypeGroupEdge, std::nullptr_t>;

View File

@ -84,9 +84,15 @@ auto UniqueOrderedIndex<T, K>::for_range_exact(DbAccessor &t_v,
}
template <class T, class K>
void UniqueOrderedIndex<T, K>::clean(DbTransaction &)
void UniqueOrderedIndex<T, K>::clean(const Id &id)
{
// TODO: Actual cleaning
auto acc = set.access();
for (auto ir : acc) {
if (ir.to_clean(id)) {
// TODO: Optimization, iterator with remove method.
acc.remove(ir);
}
}
}
template class UniqueOrderedIndex<TypeGroupEdge, std::nullptr_t>;

View File

@ -31,6 +31,13 @@ bool IndexRecord<TG, K>::is_valid(tx::Transaction &t) const
return record == vlist->find(t);
}
template <class TG, class K>
bool IndexRecord<TG, K>::to_clean(const Id &oldest_active) const
{
assert(!empty());
return record->is_deleted_before(oldest_active);
}
template <class TG, class K>
const auto IndexRecord<TG, K>::access(DbTransaction &db) const
{

View File

@ -1,5 +1,7 @@
#include "storage/label/label_store.hpp"
LabelStore::store_t::Accessor LabelStore::access() { return labels.access(); }
const Label &LabelStore::find_or_create(const char *name)
{
auto accessor = labels.access();

View File

@ -37,6 +37,11 @@ VertexAccessor Vertices::insert(DbTransaction &t)
return VertexAccessor(vertex, &inserted_vertex_record->second, t);
}
Vertices::prop_familys_t::Accessor Vertices::property_family_access()
{
return prop_familys.access();
}
VertexPropertyFamily &
Vertices::property_family_find_or_create(const std::string &name)
{

12
src/threading/thread.cpp Normal file
View File

@ -0,0 +1,12 @@
#include "threading/thread.hpp"
Thread::Thread(Thread &&other)
{
assert(thread_id == UNINITIALIZED);
thread_id = other.thread_id;
thread = std::move(other.thread);
}
void Thread::join() { return thread.join(); }
std::atomic<unsigned> Thread::thread_counter{1};

View File

@ -31,6 +31,11 @@ bool Transaction::is_active(const Id &id) const
return snapshot.is_active(id);
}
Id Transaction::oldest_active()
{
return snapshot.oldest_active().take_or(Id(id));
}
void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); }
void Transaction::commit() { engine.commit(*this); }

View File

@ -111,6 +111,7 @@ set_property(TARGET manual_query_engine PROPERTY CXX_STANDARD 14)
add_executable(manual_query_hasher manual/query_hasher.cpp)
target_link_libraries(manual_query_hasher memgraph)
target_link_libraries(manual_query_hasher ${fmt_static_lib})
target_link_libraries(manual_query_hasher Threads::Threads)
set_property(TARGET manual_query_hasher PROPERTY CXX_STANDARD 14)
# query_action_processor

View File

@ -4,6 +4,7 @@
#include "database/db.hpp"
#include "query_engine/query_stripper.hpp"
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
// #include "storage/edges.cpp"
// #include "storage/edges.hpp"
// #include "storage/vertices.cpp"

View File

@ -10,6 +10,7 @@
#include "storage/edges.hpp"
#include "storage/vertices.cpp"
#include "storage/vertices.hpp"
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
using namespace std;