Finished creating part of snapshot.

This commit is contained in:
Kruno Tomola Fabro 2016-09-08 13:25:52 +01:00
parent e1e7f3759d
commit 4443cb8b7b
61 changed files with 1336 additions and 250 deletions

View File

@ -240,7 +240,6 @@ FILE(COPY ${include_dir}/storage/label/label.hpp DESTINATION ${build_include_dir
FILE(COPY ${include_dir}/storage/label/label_collection.hpp DESTINATION ${build_include_dir}/storage/label)
FILE(COPY ${include_dir}/storage/label/label_store.hpp DESTINATION ${build_include_dir}/storage/label)
FILE(COPY ${include_dir}/storage/indexes/index.hpp DESTINATION ${build_include_dir}/storage/indexes)
FILE(COPY ${include_dir}/storage/indexes/index_record.hpp DESTINATION ${build_include_dir}/storage/indexes)
FILE(COPY ${include_dir}/storage/indexes/index_record_collection.hpp DESTINATION ${build_include_dir}/storage/indexes)
FILE(COPY ${include_dir}/storage/indexes/index_base.hpp DESTINATION ${build_include_dir}/storage/indexes)
@ -422,6 +421,7 @@ set(memgraph_src_files
${src_dir}/utils/string/transform.cpp
${src_dir}/utils/string/join.cpp
${src_dir}/utils/string/file.cpp
${src_dir}/utils/numerics/saturate.cpp
${src_dir}/query_engine/util.cpp
${src_dir}/communication/bolt/v1/bolt.cpp
${src_dir}/communication/bolt/v1/states.cpp
@ -435,6 +435,8 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp
${src_dir}/threading/thread.cpp
${src_dir}/mvcc/id.cpp
${src_dir}/snapshot/snapshoter.cpp
${src_dir}/snapshot/snapshot_encoder.cpp
${src_dir}/storage/vertices.cpp
${src_dir}/storage/edges.cpp
${src_dir}/storage/label/label.cpp
@ -463,6 +465,7 @@ set(memgraph_src_files
${src_dir}/storage/garbage/garbage.cpp
${src_dir}/storage/vertex_accessor.cpp
${src_dir}/transactions/transaction.cpp
${src_dir}/transactions/transaction_id.cpp
${src_dir}/template_engine/engine.cpp
${src_dir}/logging/streams/stdout.cpp
${src_dir}/logging/levels.cpp

View File

@ -2,3 +2,6 @@ compile_cpu_path: "./compiled/cpu/"
template_cpu_cpp_path: "./template/template_code_cpu.cpp"
barrier_template_cpu_cpp_path: "./template/barrier_template_code_cpu.cpp"
template_cpu_hpp_path: "./template/template_code_cpu.hpp"
snapshots_path: "./snapshots"
cleaning_cycle_sec: "60"
snapshot_cycle_sec: "60"

View File

@ -551,9 +551,7 @@ public:
VertexIterator for_range(DbAccessor &, Border<K> from = Border<K>(),
Border<K> to = Border<K>());
bool unique();
Order order();
IndexType type();
};
template <class K>
@ -565,9 +563,7 @@ public:
EdgeIterator for_range(DbAccessor &, Border<K> from = Border<K>(),
Border<K> to = Border<K>());
bool unique();
Order order();
IndexType type();
};
class Db : protected Unsized

View File

@ -8,13 +8,13 @@
// THis shoul be the only place to include code from memgraph other than
// barrier.cpp
#include "mvcc/id.hpp"
#include "storage/indexes/index_definition.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/model/properties/property.hpp"
#include "storage/model/properties/stored_property.hpp"
#include "utils/border.hpp"
#include "utils/iterator/iterator.hpp"
#include "utils/option_ptr.hpp"
#include "utils/order.hpp"
#include "utils/reference_wrapper.hpp"
// Contains common classes and functions for barrier.hpp and barrier.cpp.

View File

@ -22,10 +22,18 @@ constexpr const char *COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char *TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *BARRIER_TEMPLATE_CPU_CPP_PATH =
"barrier_template_cpu_cpp_path";
constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";
// -- all possible Memgraph's keys --
}
// code uses this define for key access
// _KEY_ is value from all possible keys that are listed above
#define CONFIG(_KEY_) config::Config<config::MemgraphConfig>::instance()[_KEY_]
namespace stupid
{
size_t from(std::string &&s) { return stoull(s); }
};
#define CONFIG_INTEGER(_KEY_) stupid::from(CONFIG(_KEY_))

View File

@ -7,6 +7,8 @@
#include "storage/graph.hpp"
#include "transactions/engine.hpp"
class Indexes;
class Db
{
public:
@ -20,7 +22,9 @@ public:
tx::Engine tx_engine;
Garbage garbage;
std::string &name();
std::string const &name() const;
Indexes indexes();
// INDEXES

View File

@ -5,6 +5,7 @@
class Db;
class DbAccessor;
class SnapshotEncoder;
using index_updates_t = std::vector<IndexUpdate>;

View File

@ -1,24 +1,25 @@
#pragma once
#include "database/db.hpp"
#include "threading/thread.hpp"
class Thread;
// How much sec is a cleaning_cycle in which cleaner will clean at most
// once.
constexpr size_t cleaning_cycle = 60;
class Cleaning
{
public:
Cleaning(ConcurrentMap<std::string, Db> &dbs);
// How much sec is a cleaning_cycle in which cleaner will clean at most
// once.
Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle);
~Cleaning();
private:
ConcurrentMap<std::string, Db> &dbms;
const size_t cleaning_cycle;
std::vector<std::unique_ptr<Thread>> cleaners;
std::atomic<bool> cleaning = {true};

View File

@ -1,8 +1,10 @@
#pragma once
#include "config/config.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/db.hpp"
#include "dbms/cleaner.hpp"
#include "snapshot/snapshoter.hpp"
class Dbms
{
@ -28,5 +30,8 @@ private:
// currently active database
std::atomic<Db *> active_db;
Cleaning cleaning = {dbs};
Cleaning cleaning = {dbs, CONFIG_INTEGER(config::CLEANING_CYCLE_SEC)};
Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC),
CONFIG(config::SNAPSHOTS_PATH)};
};

View File

@ -3,6 +3,7 @@
#include "database/db_accessor.hpp"
#include "import/fillings/common.hpp"
#include "import/fillings/filler.hpp"
#include "utils/array_store.hpp"
template <class TG, class T, class A>
class ArrayFiller : public Filler

View File

@ -5,7 +5,7 @@
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "transactions/transaction_id.hpp"
#include "mvcc/cre_exp.hpp"
#include "mvcc/hints.hpp"
@ -41,7 +41,7 @@ public:
RecordLock lock;
// check if this record is visible to the transaction t
bool visible(const tx::Transaction &t)
bool visible(const tx::TransactionId &t)
{
// TODO check if the record was created by a transaction that has been
// aborted. one might implement this by checking the hints in mvcc
@ -70,34 +70,34 @@ public:
))));
}
void mark_created(const tx::Transaction &t)
void mark_created(const tx::TransactionId &t)
{
tx.cre(t.id);
cmd.cre(t.cid);
}
void mark_deleted(const tx::Transaction &t)
void mark_deleted(const tx::TransactionId &t)
{
tx.exp(t.id);
cmd.exp(t.cid);
}
bool exp_committed(const Id &id, const tx::Transaction &t)
bool exp_committed(const Id &id, const tx::TransactionId &t)
{
return committed(hints.exp, id, t);
}
bool exp_committed(const tx::Transaction &t)
bool exp_committed(const tx::TransactionId &t)
{
return committed(hints.exp, tx.exp(), t);
}
bool cre_committed(const Id &id, const tx::Transaction &t)
bool cre_committed(const Id &id, const tx::TransactionId &t)
{
return committed(hints.cre, id, t);
}
bool cre_committed(const tx::Transaction &t)
bool cre_committed(const tx::TransactionId &t)
{
return committed(hints.cre, tx.cre(), t);
}
@ -110,7 +110,7 @@ public:
// TODO: Test this
// True if this record is visible for write.
bool is_visible_write(const tx::Transaction &t)
bool is_visible_write(const tx::TransactionId &t)
{
return (tx.cre() == t.id && // inserted by the current transaction
cmd.cre() <= t.cid && // before this command, and
@ -122,7 +122,7 @@ public:
protected:
template <class U>
bool committed(U &hints, const Id &id, const tx::Transaction &t)
bool committed(U &hints, const Id &id, const tx::TransactionId &t)
{
// you certainly can't see the transaction with id greater than yours
// as that means it started after this transaction and if it committed,
@ -130,7 +130,7 @@ protected:
if (id > t.id) return false;
// The creating transaction is still in progress (examine snapshot)
if (t.is_active(id)) return false;
if (t.in_snapshot(id)) return false;
auto hint_bits = hints.load();

View File

@ -103,7 +103,7 @@ public:
void vacuum() {}
T *find(const tx::Transaction &t) const
T *find(const tx::TransactionId &t) const
{
auto r = head.load(std::memory_order_seq_cst);

View File

@ -0,0 +1,82 @@
#pragma once
#include "utils/array_store.hpp"
#include "utils/void.hpp"
// Common menthods for translating Vertex/Edge representations in database to
// other format for extern usage.
// Implementor should override those methods which he needs, and ignore the
// rest.
// Caller is responisble to structure his calls as following:
// * start_vertex
// 1 label_count
// * label
// 1 property_count
// * property_name
// 1 handle
// 1 end_vertex
// or
// * start_edge
// 0-1 edge_type
// 1 property_count
// * property_name
// 1 handle
// 1 end_edge
//
// End goal would be to enforce these rules during compile time.
class GraphEncoder
{
public:
// Starts writing vertex with given id.
void start_vertex(Id id) {}
// Number of following label calls.
void label_count(size_t n);
// Label of currently started vertex.
void label(std::string const &l) {}
// Ends writing vertex
void end_vertex() {}
// Starts writing edge from vertex to vertex
void start_edge(Id id, Id from, Id to) {}
// Type of currently started edge
void edge_type(std::string const &et) {}
// Ends writing edge
void end_edge() {}
// Number of following paired property_name,handle calls.
void property_count(size_t n);
// Property family name of next property for currently started element.
void property_name(std::string const &name) {}
void handle(const Void &v) {}
void handle(const bool &prop) {}
void handle(const float &prop) {}
void handle(const double &prop) {}
void handle(const int32_t &prop) {}
void handle(const int64_t &prop) {}
void handle(const std::string &value) {}
void handle(const ArrayStore<bool> &) {}
void handle(const ArrayStore<int32_t> &) {}
void handle(const ArrayStore<int64_t> &) {}
void handle(const ArrayStore<float> &) {}
void handle(const ArrayStore<double> &) {}
void handle(const ArrayStore<std::string> &) {}
};

View File

@ -0,0 +1,50 @@
#pragma once
#include "storage/edge_accessor.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/label/label.hpp"
#include "storage/vertex_accessor.hpp"
namespace serialization
{
// Serializes Vertex to given writer which implements GraphEncoder.
template <class W>
void serialize_vertex(VertexAccessor const &v, W &writer)
{
writer.start_vertex(v.id());
auto const &labels = v.labels();
writer.label_count(labels.size());
for (auto &label : labels) {
writer.label(label.get().str());
}
auto const &propertys = v.properties();
writer.property_count(propertys.size());
for (auto &prop : propertys) {
writer.property_name(prop.key.family_name());
prop.accept_primitive(writer);
}
writer.end_vertex();
}
// Serializes Edge to given writer which implements GraphEncoder.
template <class W>
void serialize_edge(EdgeAccessor const &e, W &writer)
{
writer.start_edge(e.id(), e.from().id(), e.to().id());
writer.edge_type(e.edge_type().str());
auto const &propertys = e.properties();
writer.property_count(propertys.size());
for (auto &prop : propertys) {
writer.property_name(prop.key.family_name());
prop.accept_primitive(writer);
}
writer.end_edge();
}
};

View File

@ -0,0 +1,8 @@
#pragma once
// Decodes stored snapshot.
class SnapshotDecoder
{
public:
SnapshotDecoder(std::ifstream &snap_file);
};

View File

@ -0,0 +1,121 @@
#pragma once
#include <string>
#include <unordered_map>
#include "communication/bolt/v1/transport/bolt_encoder.hpp"
#include "mvcc/id.hpp"
#include "serialization/graph_encoder.hpp"
#include "serialization/serialization.hpp"
#include "storage/indexes/index_definition.hpp"
#include "utils/stream_wrapper.hpp"
// Represents creation of a snapshot. Contains all necessary informations
// for
// write. Caller is responisble to structure his calls as following:
// * property_name_init
// * label_name_init
// 1 start_vertices
// * <vertex>
// 1 start_edges
// * <edges>
// 1 start_indexes
// * index
// 1 end
class SnapshotEncoder : public GraphEncoder
{
public:
SnapshotEncoder(std::ofstream &stream) : stream(stream) {}
SnapshotEncoder(SnapshotEncoder const &) = delete;
SnapshotEncoder(SnapshotEncoder &&) = delete;
SnapshotEncoder &operator=(SnapshotEncoder const &) = delete;
SnapshotEncoder &operator=(SnapshotEncoder &&) = delete;
// Tells in advance which names will be used.
void property_name_init(std::string const &name);
// Tells in advance which labels will be used.
void label_name_init(std::string const &name);
// Tells in advance which edge_type will be used.
void edge_type_name_init(std::string const &name);
// Prepares for vertices
void start_vertices();
// Prepares for edges
void start_edges();
// Prepares for indexes
void start_indexes();
// Writes index definition
void index(IndexDefinition const &);
// Finishes snapshot
void end();
// *********************From graph encoder
// Starts writing vertex with given id.
void start_vertex(Id id);
// Number of following label calls.
void label_count(size_t n);
// Label of currently started vertex.
void label(std::string const &l);
// Starts writing edge from vertex to vertex
void start_edge(Id id, Id from, Id to);
// Type of currently started edge
void edge_type(std::string const &et);
// Number of following paired property_name,handle calls.
void property_count(size_t n);
// Property family name of next property for currently started element.
void property_name(std::string const &name);
void handle(const Void &v);
void handle(const bool &prop);
void handle(const float &prop);
void handle(const double &prop);
void handle(const int32_t &prop);
void handle(const int64_t &prop);
void handle(const std::string &value);
void handle(const ArrayStore<bool> &);
void handle(const ArrayStore<int32_t> &);
void handle(const ArrayStore<int64_t> &);
void handle(const ArrayStore<float> &);
void handle(const ArrayStore<double> &);
void handle(const ArrayStore<std::string> &);
private:
std::ofstream &stream;
StreamWrapper<std::ofstream> wrapped = {stream};
bolt::BoltEncoder<StreamWrapper<std::ofstream>> encoder = {wrapped};
// Contains for every property_name here snapshot local id.
std::unordered_map<std::string, size_t> property_name_map;
// Contains for every label_name here snapshot local id.
std::unordered_map<std::string, size_t> label_name_map;
// Contains for every edge_type here snapshot local id.
std::unordered_map<std::string, size_t> edge_type_name_map;
};

View File

@ -0,0 +1,75 @@
#pragma once
#include <unordered_map>
#include "database/db.hpp"
#include "logging/default.hpp"
class Thread;
class SnapshotEncoder;
class SnapshotDecoder;
// Captures snapshots.
class Snapshoter
{
public:
// How much sec is between snapshots
// snapshot_folder is path to common folder for all snapshots.
Snapshoter(ConcurrentMap<std::string, Db> &dbs, size_t snapshot_cycle,
std::string &&snapshot_folder);
~Snapshoter();
// Imports latest snapshot into the databse
void import(Db &db);
private:
void run(Logger &logger);
// Makes snapshot of given type
void make_snapshot(std::time_t now, const char *type);
// Makes snapshot. It only saves records which have changed since old_trans.
void snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans);
// Loads snapshot. True if success
bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap);
std::string snapshot_file(Db &db, std::time_t const &now, const char *type)
{
return snapshot_db_dir(db) + "/" + std::to_string(now) + "_" + type;
}
std::string snapshot_commit_file(Db &db)
{
return snapshot_db_dir(db) + "/snapshot_commit.txt";
}
// Path to directory of database. Ensures that all necessary directorys
// exist.
std::string snapshot_db_dir(Db &db)
{
if (!sys::ensure_directory_exists(snapshot_folder)) {
logger.error("Error while creating directory \"{}\"",
snapshot_folder);
}
auto db_path = snapshot_folder + "/" + db.name();
if (!sys::ensure_directory_exists(db_path)) {
logger.error("Error while creating directory \"{}\"", db_path);
}
return db_path;
}
Logger logger;
const size_t snapshot_cycle;
const std::string snapshot_folder;
std::unique_ptr<Thread> thread = {nullptr};
ConcurrentMap<std::string, Db> &dbms;
std::atomic<bool> snapshoting = {true};
};

View File

@ -35,6 +35,8 @@ public:
operator const std::string &() const;
std::string const &str() const { return id; }
CharStr char_str() { return CharStr(&id[0]); }
type_index_t &index() const;

View File

@ -14,9 +14,9 @@ public:
// typedef K key_type;
// Created with the database
NonUniqueUnorderedIndex();
NonUniqueUnorderedIndex(IndexLocation &&loc);
NonUniqueUnorderedIndex(tx::Transaction const &t);
NonUniqueUnorderedIndex(IndexLocation &&loc, tx::Transaction const &t);
// Insert's value.
// nonunique => always succeds.

View File

@ -12,9 +12,10 @@ public:
// typedef K key_type;
// Created with the database
UniqueOrderedIndex(Order order);
UniqueOrderedIndex(IndexLocation loc, Order order);
UniqueOrderedIndex(Order order, tx::Transaction const &t);
UniqueOrderedIndex(IndexLocation loc, Order order,
tx::Transaction const &t);
// Insert's value.
// nonunique => always succeds.

View File

@ -1,44 +0,0 @@
// #pragma once
//
// #include <memory>
//
// #include "data_structures/concurrent/concurrent_map.hpp"
// #include "storage/indexes/index_record.hpp"
// #include "storage/indexes/index_record_collection.hpp"
// #include "storage/label/label.hpp"
//
// template <class Key, class Item>
// class Index
// {
// public:
// using container_t = ConcurrentMap<Key, Item>;
//
// Index() : index(std::make_unique<container_t>()) {}
//
// auto update(const Label &label, VertexIndexRecord &&index_record)
// {
// auto accessor = index->access();
// auto label_ref = label_ref_t(label);
//
// // create Index Record Collection if it doesn't exist
// if (!accessor.contains(label_ref)) {
// accessor.insert(label_ref, std::move(VertexIndexRecordCollection()));
// }
//
// // add Vertex Index Record to the Record Collection
// auto &record_collection = (*accessor.find(label_ref)).second;
// record_collection.add(std::forward<VertexIndexRecord>(index_record));
// }
//
// VertexIndexRecordCollection &find(const Label &label)
// {
// // TODO: accessor should be outside?
// // bacause otherwise GC could delete record that has just be returned
// auto label_ref = label_ref_t(label);
// auto accessor = index->access();
// return (*accessor.find(label_ref)).second;
// }
//
// private:
// std::unique_ptr<container_t> index;
// };

View File

@ -1,3 +1,4 @@
#pragma once
#include <atomic>
@ -7,9 +8,9 @@
// #include "storage/indexes/index_record.hpp"
#include "storage/garbage/delete_sensitive.hpp"
#include "storage/indexes/index_definition.hpp"
#include "utils/border.hpp"
#include "utils/iterator/virtual_iter.hpp"
#include "utils/order.hpp"
template <class TG, class K>
class IndexRecord;
@ -32,9 +33,9 @@ public:
// typedef K key_type;
// Created with the database
IndexBase(bool unique, Order order);
IndexBase(IndexDefinition &&it);
IndexBase(bool unique, Order order, const tx::Transaction &t);
IndexBase(IndexDefinition &&it, const tx::Transaction &t);
virtual ~IndexBase(){};
@ -68,15 +69,12 @@ public:
// True if transaction is obliged to insert T into index.
bool is_obliged_to_insert(const tx::Transaction &t);
bool unique() { return _unique; }
IndexType type() const { return it.type; }
Order order() { return _order; }
const IndexDefinition &definition() const { return it; }
private:
// Are the records unique
const bool _unique;
// Ordering of the records.
const Order _order;
const IndexDefinition it;
// Id of transaction which created this index.
const Id created;
// Active state

View File

@ -0,0 +1,37 @@
#pragma once
#include "utils/option.hpp"
#include "utils/order.hpp"
enum DbSide : uint8_t
{
EdgeSide = 0,
VertexSide = 1,
};
struct IndexType
{
public:
// Are the records unique
const bool unique;
// Ordering of the records.
const Order order;
};
struct IndexLocation
{
public:
const DbSide side;
const Option<std::string> property_name;
const Option<std::string> label_name;
};
// Fully answers:
// On what index?
// What kind of index?
struct IndexDefinition
{
public:
const IndexLocation loc;
const IndexType type;
};

View File

@ -0,0 +1,73 @@
#pragma once
#include "database/db.hpp"
// Operation on indexes in the Db
class Indexes
{
public:
Indexes(Db &d) : db(d) {}
// Calls F over all vertex indexes in the database which are readable.
template <class F>
void vertex_indexes(F &&f)
{
for (auto &l : db.graph.label_store.access()) {
f(l.second.get()->index());
}
for_all_property_indexes_read(
db.graph.vertices.property_family_access(), f);
}
// Calls F over all edge indexes in the database which are readable.
template <class F>
void edge_indexes(F &&f)
{
for (auto &l : db.graph.edge_type_store.access()) {
f(l.second.get()->index());
}
for_all_property_indexes_read(db.graph.edges.property_family_access(),
f);
}
// Updates property indexes for given TypeGroup TG and IU index update
template <class TG, class IU>
bool update_property_indexes(IU &iu, const tx::Transaction &t)
{
for (auto kp : iu.record->data.props) {
// FamilyProperty index
auto opi = kp.key.get_family().index.get_write(t);
if (opi.is_present()) {
if (!opi.get()->insert(IndexRecord<TG, std::nullptr_t>(
std::nullptr_t(), iu.record, iu.vlist))) {
return false;
}
}
// TODO: other properti indexes
}
return true;
}
private:
// Calls F for all * property indexes which are readable.
template <class A, class F>
void for_all_property_indexes_read(A &&acc, F &f)
{
for (auto &family : acc) {
auto oi = family.second->index.get_read();
if (oi.is_present()) {
f(*oi.get());
}
}
// TODO: Code for reaching other property indexes which are not yet
// coded into the database.
}
Db &db;
};

View File

@ -4,13 +4,12 @@
#include <stdint.h>
#include "storage/indexes/impl/nonunique_unordered_index.hpp"
#include "storage/type_group_vertex.hpp"
#include "storage/vertex.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/char_str.hpp"
#include "utils/reference_wrapper.hpp"
#include "utils/total_ordering.hpp"
// #include "storage/type_group_edge.hpp"
#include "storage/type_group_vertex.hpp"
using LabelIndexRecord = IndexRecord<TypeGroupVertex, std::nullptr_t>;
@ -39,6 +38,8 @@ public:
operator const std::string &() const;
std::string const &str() const { return name; }
CharStr char_str() const { return CharStr(name.c_str()); }
label_index_t &index() const;

View File

@ -11,13 +11,13 @@ using label_ref_t = ReferenceWrapper<const Label>;
class LabelCollection
{
public:
auto begin();
auto begin() const;
auto cbegin() const;
auto begin() { return _labels.begin(); }
auto begin() const { return _labels.begin(); }
auto cbegin() const { return _labels.begin(); }
auto end();
auto end() const;
auto cend() const;
auto end() { return _labels.end(); }
auto end() const { return _labels.end(); }
auto cend() const { return _labels.end(); }
bool add(const Label &label);
bool has(const Label &label) const;

View File

@ -4,13 +4,7 @@
#include <vector>
#include "storage/model/properties/flags.hpp"
// TODO: more bytes can be saved if this is array with exact size as number
// of elements.
// TODO: even more bytes can be saved if this is one ptr to structure which
// holds len followed by len sized array.
template <class T>
using ArrayStore = std::vector<T>;
#include "utils/array_store.hpp"
template <class T, Flags flag_t>
class Array

View File

@ -10,8 +10,8 @@ class Null
public:
const static Type type;
Void &value() { return Void::_void; }
Void const &value() const { return Void::_void; }
Void &value() { return _void; }
Void const &value() const { return _void; }
std::ostream &print(std::ostream &stream) const;

View File

@ -43,6 +43,14 @@
break; \
}
// Genrates case claus for Flags::type_name to handle primitives of type_name
// object from field of union_name
#define GENERATE_CASE_CLAUSE_FOR_HANDLER_PRIMITIVE(type_name, union_name) \
case Flags::type_name: { \
h.handle(this->union_name.value()); \
break; \
}
// Genrates case claus for Flags::type_name to print type_name object from
// field of union_name
#define GENERATE_CASE_CLAUSE_FOR_PRINT(type_name, union_name) \
@ -147,6 +155,8 @@ public:
return *this;
}
// Calls appropiate handler with a database property.
// (String,Int64,ArrayBool,...)
template <class Handler>
void accept(Handler &h) const
{
@ -157,6 +167,17 @@ public:
}
}
// Calls appropiate handler with a primitive.(std::string,bool,int64_t,...)
template <class Handler>
void accept_primitive(Handler &h) const
{
switch (key.get_type().flags()) {
GENERATE_FOR_ALL_PROPERTYS(CASE_CLAUSE_FOR_HANDLER_PRIMITIVE);
default:
assert(false);
}
}
std::ostream &print(std::ostream &stream) const
{
switch (key.get_type().flags()) {

View File

@ -51,6 +51,13 @@ public:
const Id &id() const { return vlist->id; }
// True if record visible for current transaction is visible to given
// transaction id.
bool is_visble_to(tx::TransactionId const &id)
{
return record->visible(id);
}
// TODO: Test this
Derived update() const
{

View File

@ -9,44 +9,31 @@
#include "storage/locking/record_lock.hpp"
#include "transactions/lock_store.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/transaction_id.hpp"
namespace tx
{
class Engine;
class Transaction
class Transaction : public TransactionId
{
friend class Engine;
public:
Transaction(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
Transaction(const Transaction &) = delete;
Transaction(Transaction &&) = delete;
// index of this transaction
const Id id;
// index of the current command in the current transaction;
uint8_t cid;
// a snapshot of currently active transactions
// Returns copy of transaction_id
TransactionId transaction_id();
// Blocks until all transactions from snapshot finish. After this method,
// snapshot will be empty.
void wait_for_active();
// Return id of oldest transaction from snapshot.
Id oldest_active();
// True if id is in snapshot.
bool is_active(const Id &id) const;
void take_lock(RecordLock &lock);
void commit();
void abort();
Engine &engine;
private:
Snapshot<Id> snapshot;
LockStore<RecordLock> locks;
};
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <cstdint>
#include <cstdlib>
#include <vector>
#include "mvcc/id.hpp"
#include "transactions/snapshot.hpp"
namespace tx
{
class Engine;
class TransactionId
{
friend class Engine;
public:
TransactionId(Engine &engine);
TransactionId(const Id &&id, const Snapshot<Id> &&snapshot, Engine &engine);
TransactionId(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
// Return id of oldest transaction from snapshot.
Id oldest_active();
// True if id is in snapshot.
bool in_snapshot(const Id &id) const;
// index of this transaction
const Id id;
// index of the current command in the current transaction;
uint8_t cid;
Engine &engine;
protected:
// a snapshot of currently active transactions
Snapshot<Id> snapshot;
};
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <vector>
// TODO: more bytes can be saved if this is array with exact size as number
// of elements.
// TODO: even more bytes can be saved if this is one ptr to structure which
// holds len followed by len sized array.
template <class T>
using ArrayStore = std::vector<T>;

View File

@ -8,6 +8,8 @@ class CharStr : public TotalOrdering<CharStr>
public:
CharStr(const char *str) : str(str) {}
std::string to_string() const { return std::string(str); }
friend bool operator==(const CharStr &lhs, const CharStr &rhs)
{
return strcmp(lhs.str, rhs.str) == 0;

View File

@ -0,0 +1,63 @@
#pragma once
#include "utils/iterator/composable.hpp"
#include "utils/iterator/iterator_base.hpp"
namespace iter
{
// Class which Combined two iterators IT1 and IT2. Both return values T
// T - type of return value
// IT1 - first iterator type
// IT2 - second iterator type
template <class T, class IT1, class IT2>
class Combined : public IteratorBase<T>,
public Composable<T, Combined<T, IT1, IT2>>
{
public:
Combined() = delete;
// Combined operation is designed to be used in chained calls which operate
// on a iterator. Combined will in that usecase receive other iterator by
// value and std::move is a optimization for it.
Combined(IT1 &&iter1, IT2 &&iter2)
: iter1(Option<IT1>(std::move(iter1))),
iter2(Option<IT2>(std::move(iter2)))
{
}
// Return values first from first iterator then from second.
Option<T> next() final
{
if (iter1.is_present()) {
auto ret = iter1.get().next();
if (ret.is_present()) {
return std::move(ret);
} else {
iter1.take();
}
}
return iter2.next();
}
Count count() final
{
return iter1.map_or([](auto &it) { return it.count(); }, 0) +
iter2.count();
}
private:
Option<IT1> iter1;
IT2 iter2;
};
template <class IT1, class IT2>
auto make_combined(IT1 &&iter1, IT2 &&iter2)
{
// Compiler cant deduce type T. decltype is here to help with it.
return Combined<decltype(iter1.next().take()), IT1, IT2>(std::move(iter1),
std::move(iter2));
}
}

View File

@ -29,6 +29,9 @@ auto make_limited_map(I &&iter, OP &&op);
template <class I, class OP>
auto make_virtual(I &&iter);
template <class IT1, class IT2>
auto make_combined(IT1 &&iter1, IT2 &&iter2);
// Class for creating easy composable iterators fo querying.
// Derived - type of derived class
// T - return type
@ -40,6 +43,12 @@ class Composable : public Crtp<Derived>
public:
auto virtualize() { return iter::make_virtual(move()); }
template <class IT>
auto combine(IT &&it)
{
return iter::make_combined<Derived, IT>(move(), std::move(it));
}
template <class OP>
auto map(OP &&op)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include "utils/numerics/saturate.hpp"
#include "utils/total_ordering.hpp"
// Represents number of to be returned elements from iterator. Where acutal
@ -26,6 +27,12 @@ public:
return lhs.avg() == rhs.avg();
}
friend Count operator+(const Count &lhs, const Count &rhs)
{
return Count(num::saturating_add(lhs.min, rhs.min),
num::saturating_add(lhs.max, rhs.max));
}
size_t min;
size_t max;
};

View File

@ -1,6 +1,7 @@
#pragma once
#include "utils/iterator/accessor.hpp"
#include "utils/iterator/combined.hpp"
#include "utils/iterator/count.hpp"
#include "utils/iterator/filter.hpp"
#include "utils/iterator/flat_map.hpp"

View File

@ -35,6 +35,13 @@ private:
size_t _count;
};
// Wraps lambda which returns options as an iterator.
template <class F>
auto make_iterator(F &&f)
{
return make_iterator<F>(std::move(f), ~((size_t)0));
}
// Wraps lambda which returns options as an iterator.
template <class F>
auto make_iterator(F &&f, size_t count)

View File

@ -6,7 +6,7 @@ namespace num
{
template <class T,
typename std::enable_if<std::is_integral<T>::value>::type* = nullptr>
typename std::enable_if<std::is_integral<T>::value>::type * = nullptr>
T iceil(T x, T y)
{
// this may seem inefficient, but on x86_64, when you already perform
@ -14,5 +14,4 @@ T iceil(T x, T y)
// is basically free!
return x / y + (x % y != 0);
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <type_traits>
namespace num
{
constexpr std::size_t size_t_HIGHEST_BIT_SETED =
((std::size_t)1) << ((sizeof(std::size_t) * 8) - 1);
std::size_t saturating_add(std::size_t a, std::size_t b);
};

View File

@ -98,6 +98,24 @@ public:
return *data._M_ptr();
}
T &get_or(T &other)
{
if (is_present()) {
return get();
} else {
return other;
}
}
T const &get_or(T const &other) const
{
if (is_present()) {
return get();
} else {
return other;
}
}
const T &get() const noexcept
{
assert(initialized);
@ -124,6 +142,26 @@ public:
}
}
template <class U, class F>
U map_or(F f, U &&def)
{
if (is_present()) {
return f(take());
} else {
return std::move(def);
}
}
template <class U, class F>
U call_or(F f, U &&def)
{
if (is_present()) {
return f(get());
} else {
return std::move(def);
}
}
T take()
{
assert(initialized);

View File

@ -1,7 +1,7 @@
#pragma once
// Defines ordering of data
enum Order
enum Order : uint8_t
{
None = 0,
Ascending = 1,

View File

@ -0,0 +1,24 @@
#pragma once
// Wraps stream with convinient methods which need only one method:
// write (const char* s, n);
template <class STREAM>
class StreamWrapper
{
public:
StreamWrapper() = delete;
StreamWrapper(STREAM &s) : stream(s) {}
void write(const unsigned char value)
{
stream.write(reinterpret_cast<const char *>(&value), 1);
}
void write(const unsigned char *value, size_t n)
{
stream.write(reinterpret_cast<const char *>(value), n);
}
private:
STREAM &stream;
};

View File

@ -1,17 +1,78 @@
#pragma once
#include <sys/syscall.h>
#include <cassert>
#include <errno.h>
#include <fstream>
#include <linux/futex.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
namespace sys
{
// Code from stackoverflow
// http://stackoverflow.com/questions/109449/getting-a-file-from-a-stdfstream
// Extracts FILE* from streams in std.
template <class STREAM>
struct STDIOAdapter
{
static FILE *yield(STREAM *stream)
{
assert(stream != NULL);
inline int futex(void* addr1, int op, int val1, const struct timespec* timeout,
void* addr2, int val3)
static cookie_io_functions_t Cookies = {.read = NULL,
.write = cookieWrite,
.seek = NULL,
.close = cookieClose};
return fopencookie(stream, "w", Cookies);
}
ssize_t static cookieWrite(void *cookie, const char *buf, size_t size)
{
if (cookie == NULL) return -1;
STREAM *writer = static_cast<STREAM *>(cookie);
writer->write(buf, size);
return size;
}
int static cookieClose(void *cookie) { return EOF; }
}; // STDIOAdapter
inline int futex(void *addr1, int op, int val1, const struct timespec *timeout,
void *addr2, int val3)
{
return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
}
};
// Ensures that everything written to file will be writen on disk when the
// function call returns. !=0 if error occured
template <class STREAM>
inline size_t flush_file_to_disk(STREAM &file)
{
file.flush();
FILE *f = STDIOAdapter<STREAM>::yield(&file);
if (fsync(fileno(f)) == 0) {
return 0;
}
return errno;
};
// True if succesffull
inline bool ensure_directory_exists(std::string const &path)
{
struct stat st = {0};
if (stat(path.c_str(), &st) == -1) {
return mkdir(path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0;
}
return true;
}
};

View File

@ -5,9 +5,9 @@
class Void : public TotalOrdering<Void>
{
public:
static Void _void;
friend bool operator<(const Void &lhs, const Void &rhs) { return false; }
friend bool operator==(const Void &lhs, const Void &rhs) { return true; }
};
static Void _void = {};

View File

@ -101,15 +101,9 @@ VertexIterator VertexIndex<K>::for_range(DbAccessor &t, Border<K> from,
}
template <class K>
bool VertexIndex<K>::unique()
IndexType VertexIndex<K>::type()
{
return HALF_CALL(unique());
}
template <class K>
Order VertexIndex<K>::order()
{
return HALF_CALL(order());
return HALF_CALL(type());
}
// INSTANCES OF VERTEX INDEX
@ -124,15 +118,9 @@ EdgeIterator EdgeIndex<K>::for_range(DbAccessor &t, Border<K> from,
}
template <class K>
bool EdgeIndex<K>::unique()
IndexType EdgeIndex<K>::type()
{
return HALF_CALL(unique());
}
template <class K>
Order EdgeIndex<K>::order()
{
return HALF_CALL(order());
return HALF_CALL(type());
}
// INSTANCES OF EDGE INDEX

View File

@ -1,10 +1,14 @@
#include "database/db.hpp"
#include "storage/indexes/indexes.hpp"
#include "storage/model/properties/property_family.hpp"
Db::Db() = default;
Db::Db(const std::string &name) : name_(name) {}
std::string &Db::name() { return name_; }
std::string const &Db::name() const { return name_; }
Indexes Db::indexes() { return Indexes(*this); }
template <class TG, class I, class G>
bool Db::create_index_on_vertex_property_family(const char *name, G &coll,

View File

@ -1,8 +1,10 @@
#include "database/db_transaction.hpp"
#include "database/db.hpp"
#include "serialization/serialization.hpp"
#include "storage/edge.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/indexes/indexes.hpp"
#include "storage/label/label.hpp"
#include "storage/vertex.hpp"
@ -13,15 +15,6 @@
DbTransaction::DbTransaction(Db &db) : db(db), trans(db.tx_engine.begin()) {}
// Cleaning for indexes in labels and edge_type
template <class A>
void clean_indexes(A &&acc, Id oldest_active)
{
for (auto &l : acc) {
l.second.get()->index().clean(oldest_active);
}
}
// Cleaning for version lists
template <class A>
void clean_version_lists(A &&acc, Id oldest_active)
@ -35,33 +28,14 @@ void clean_version_lists(A &&acc, Id oldest_active)
}
}
// Cleaning for indexes in properties.
template <class A>
void clean_property_indexes(A &&acc, Id oldest_active)
{
for (auto &family : acc) {
auto oi = family.second->index.get_read();
if (oi.is_present()) {
oi.get()->clean(oldest_active);
}
}
// TODO: Code for cleaning other indexes which are not yet coded into
// the database.
}
// Cleans edge part of database. Should be called by one cleaner thread at
// one time.
void DbTransaction::clean_edge_section()
{
Id oldest_active = trans.oldest_active();
// Clean edge_type index
clean_indexes(db.graph.edge_type_store.access(), oldest_active);
// Clean family_type_s edge index
clean_property_indexes(db.graph.edges.property_family_access(),
oldest_active);
// Clean indexes
db.indexes().edge_indexes([&](auto &in) { in.clean(oldest_active); });
// Clean Edge list
clean_version_lists(db.graph.edges.access(), oldest_active);
@ -73,35 +47,13 @@ void DbTransaction::clean_vertex_section()
{
Id oldest_active = trans.oldest_active();
// Clean label index
clean_indexes(db.graph.label_store.access(), oldest_active);
// Clean family_type_s vertex index
clean_property_indexes(db.graph.vertices.property_family_access(),
oldest_active);
// Clean indexes
db.indexes().vertex_indexes([&](auto &in) { in.clean(oldest_active); });
// Clean vertex list
clean_version_lists(db.graph.vertices.access(), oldest_active);
}
template <class TG, class IU>
bool update_property_indexes(IU &iu, const tx::Transaction &t)
{
for (auto kp : iu.record->data.props) {
// FamilyProperty index
auto opi = kp.key.get_family().index.get_write(t);
if (opi.is_present()) {
TRY(opi.get()->insert(IndexRecord<TG, std::nullptr_t>(
std::nullptr_t(), iu.record, iu.vlist)));
}
// TODO: other properti indexes
}
return true;
}
bool DbTransaction::update_indexes()
{
while (!index_updates.empty()) {
@ -115,7 +67,7 @@ bool DbTransaction::update_indexes()
TRY(e.record->data.edge_type->index().insert(
EdgeTypeIndexRecord(std::nullptr_t(), e.record, e.vlist)));
TRY(update_property_indexes<TypeGroupEdge>(e, trans));
TRY(db.indexes().update_property_indexes<TypeGroupEdge>(e, trans));
} else {
auto v = iu.v;
@ -128,7 +80,8 @@ bool DbTransaction::update_indexes()
LabelIndexRecord(std::nullptr_t(), v.record, v.vlist)));
}
TRY(update_property_indexes<TypeGroupVertex>(v, trans));
TRY(db.indexes().update_property_indexes<TypeGroupVertex>(v,
trans));
}
index_updates.pop_back();

View File

@ -9,7 +9,8 @@
#include "logging/default.hpp"
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle)
: dbms(dbs), cleaning_cycle(cleaning_cycle)
{
cleaners.push_back(std::make_unique<Thread>([&]() {
Logger logger = logging::log->logger("Cleaner");

View File

@ -0,0 +1,178 @@
#include "snapshot/snapshot_encoder.hpp"
void SnapshotEncoder::property_name_init(std::string const &name)
{
if (property_name_map.find(name) == property_name_map.end()) {
auto id = property_name_map.size();
property_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::label_name_init(std::string const &name)
{
if (label_name_map.find(name) == label_name_map.end()) {
auto id = label_name_map.size();
label_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::edge_type_name_init(std::string const &name)
{
if (edge_type_name_map.find(name) == edge_type_name_map.end()) {
auto id = edge_type_name_map.size();
edge_type_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::end() { encoder.write_string("end"); }
// **************** INDEX
// Prepares for indexes
void SnapshotEncoder::start_indexes() { encoder.write_string("vertices"); }
// Writes index definition
void SnapshotEncoder::index(IndexDefinition const &def)
{
std::string empty;
encoder.write_byte(underlying_cast(def.loc.side));
encoder.write_string(def.loc.property_name.get_or(empty));
encoder.write_string(def.loc.label_name.get_or(empty));
encoder.write_bool(def.type.unique);
encoder.write_byte(underlying_cast(def.type.order));
}
// ************* VERTEX
// Prepares for vertices
void SnapshotEncoder::start_vertices()
{
encoder.write_map_header(property_name_map.size());
for (auto p : property_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_map_header(label_name_map.size());
for (auto p : label_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_map_header(edge_type_name_map.size());
for (auto p : edge_type_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_string("vertices");
}
// Starts writing vertex with given id.
void SnapshotEncoder::start_vertex(Id id) { encoder.write_integer(id); }
// Number of following label calls.
void SnapshotEncoder::label_count(size_t n) { encoder.write_list_header(n); }
// Label of currently started vertex.
void SnapshotEncoder::label(std::string const &l)
{
encoder.write_integer(label_name_map.at(l));
}
// ************* EDGE
// Prepares for edges
void SnapshotEncoder::start_edges() { encoder.write_string("edges"); }
// Starts writing edge from vertex to vertex
void SnapshotEncoder::start_edge(Id id, Id from, Id to)
{
encoder.write_integer(id);
encoder.write_integer(from);
encoder.write_integer(to);
}
// Type of currently started edge
void SnapshotEncoder::edge_type(std::string const &et)
{
encoder.write_integer(edge_type_name_map.at(et));
}
// ******* PROPERTY
void SnapshotEncoder::property_count(size_t n) { encoder.write_map_header(n); }
void SnapshotEncoder::property_name(std::string const &name)
{
encoder.write_integer(property_name_map.at(name));
}
void SnapshotEncoder::handle(const Void &v) { encoder.write_null(); }
void SnapshotEncoder::handle(const bool &prop) { encoder.write_bool(prop); }
void SnapshotEncoder::handle(const float &prop) { encoder.write_double(prop); }
void SnapshotEncoder::handle(const double &prop) { encoder.write_double(prop); }
void SnapshotEncoder::handle(const int32_t &prop)
{
encoder.write_integer(prop);
}
void SnapshotEncoder::handle(const int64_t &prop)
{
encoder.write_integer(prop);
}
void SnapshotEncoder::handle(const std::string &value)
{
encoder.write_string(value);
}
void SnapshotEncoder::handle(const ArrayStore<bool> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_bool(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<int32_t> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_integer(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<int64_t> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_integer(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<float> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_double(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<double> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_double(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<std::string> &a)
{
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_string(e);
}
}

222
src/snapshot/snapshoter.cpp Normal file
View File

@ -0,0 +1,222 @@
#include "snapshot/snapshoter.hpp"
#include "database/db_accessor.hpp"
#include "logging/default.hpp"
#include "snapshot/snapshot_decoder.hpp"
#include "snapshot/snapshot_encoder.hpp"
#include "storage/indexes/indexes.hpp"
#include "threading/thread.hpp"
#include "utils/sys.hpp"
Snapshoter::Snapshoter(ConcurrentMap<std::string, Db> &dbs,
size_t snapshot_cycle, std::string &&snapshot_folder)
: snapshot_cycle(snapshot_cycle), snapshot_folder(snapshot_folder),
dbms(dbs)
{
thread = std::make_unique<Thread>([&]() {
logger = logging::log->logger("Snapshoter");
try {
run(logger);
} catch (const std::exception &e) {
logger.error("Irreversible error occured in snapshoter");
logger.error("{}", e.what());
}
logger.info("Shutting down snapshoter");
});
}
Snapshoter::~Snapshoter()
{
snapshoting.store(false, std::memory_order_release);
thread.get()->join();
}
void Snapshoter::run(Logger &logger)
{
std::time_t last_snapshot = std::time(nullptr);
while (snapshoting.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
if (now >= last_snapshot + snapshot_cycle) {
// It's time for snapshot
make_snapshot(now, "full");
last_snapshot = now;
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
void Snapshoter::make_snapshot(std::time_t now, const char *type)
{
logger.info(std::string("Started ") + type + " snapshot cycle");
for (auto &db : dbms.access()) {
auto snapshot_file_name = snapshot_file(db.second, now, type);
logger.info(std::string("Writing ") + type + " snapshot of database "
"\"{}\" to file \"{}\"",
db.first, snapshot_file_name);
DbTransaction t(db.second);
bool success = false;
try {
std::ofstream snapshot_file(
snapshot_file_name, std::fstream::binary | std::fstream::trunc);
SnapshotEncoder snap(snapshot_file);
auto old_trans = tx::TransactionId(db.second.tx_engine);
snapshot(t, snap, old_trans);
auto res = sys::flush_file_to_disk(snapshot_file);
if (res == 0) {
t.trans.commit();
success = true;
} else {
logger.error("Error {} occured while flushing snapshot file",
res);
t.trans.abort();
}
} catch (const std::exception &e) {
logger.error(std::string("Error occured while creating ") + type +
" "
"snapshot of database \"{}\"",
db.first);
logger.error("{}", e.what());
t.trans.abort();
}
if (success) {
std::ofstream commit_file(snapshot_commit_file(db.second),
std::fstream::app);
commit_file << snapshot_file_name << std::endl;
auto res = sys::flush_file_to_disk(commit_file);
if (res == 0) {
commit_file.close();
} else {
logger.error("Error {} occured while flushing commit file",
res);
}
}
}
logger.info(std::string("Finished ") + type + " snapshot cycle");
}
void Snapshoter::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans)
{
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
void Snapshoter::import(Db &db)
{
logger.info("Started import for database \"{}\"", db.name());
try {
std::ifstream commit_file(snapshot_commit_file(db));
std::vector<std::string> snapshots;
std::string line;
while (std::getline(commit_file, line)) {
snapshots.push_back(line);
}
while (snapshots.size() > 0) {
logger.info("Importing data from snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
DbTransaction t(db);
try {
std::ifstream snapshot_file(snapshots.back(),
std::fstream::binary);
SnapshotDecoder decoder(snapshot_file);
if (snapshot_load(t, decoder)) {
t.trans.commit();
logger.info("Succesfully imported snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
break;
} else {
t.trans.abort();
logger.info(
"Unuccesfully tryed to import snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
}
} catch (const std::exception &e) {
logger.error(
"Error occured while importing snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
logger.error("{}", e.what());
t.trans.abort();
}
snapshots.pop_back();
}
} catch (const std::exception &e) {
logger.error(
"Error occured while importing snapshot for database \"{}\"",
db.name());
logger.error("{}", e.what());
}
logger.info("Finished import for database \"{}\"", db.name());
}
bool Snapshoter::snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap)
{
// TODO
}

View File

@ -1,17 +1,10 @@
#include "storage/edge_type/edge_type.hpp"
EdgeType::EdgeType(const std::string &id)
: id(id), index_v(std::unique_ptr<type_index_t>(new type_index_t()))
{
}
EdgeType::EdgeType(const char *id)
: id(std::string(id)),
index_v(std::unique_ptr<type_index_t>(new type_index_t()))
{
}
EdgeType::EdgeType(const std::string &id) : EdgeType(std::string(id)) {}
EdgeType::EdgeType(const char *id) : EdgeType(std::string(id)) {}
EdgeType::EdgeType(std::string &&id)
: id(std::move(id)),
index_v(std::unique_ptr<type_index_t>(new type_index_t()))
: id(id), index_v(std::make_unique<type_index_t>(IndexLocation{
EdgeSide, Option<std::string>(), Option<std::string>(id)}))
{
}

View File

@ -13,14 +13,15 @@
#include "storage/indexes/index_record.cpp"
template <class T, class K>
NonUniqueUnorderedIndex<T, K>::NonUniqueUnorderedIndex()
: IndexBase<T, K>(false, None)
NonUniqueUnorderedIndex<T, K>::NonUniqueUnorderedIndex(IndexLocation &&loc)
: IndexBase<T, K>(IndexDefinition{loc, IndexType{false, None}})
{
}
template <class T, class K>
NonUniqueUnorderedIndex<T, K>::NonUniqueUnorderedIndex(tx::Transaction const &t)
: IndexBase<T, K>(false, None, t)
NonUniqueUnorderedIndex<T, K>::NonUniqueUnorderedIndex(IndexLocation &&loc,
tx::Transaction const &t)
: IndexBase<T, K>(IndexDefinition{loc, IndexType{false, None}}, t)
{
}

View File

@ -17,22 +17,22 @@
#include "storage/indexes/index_record.cpp"
template <class T, class K>
UniqueOrderedIndex<T, K>::UniqueOrderedIndex(Order order)
: IndexBase<T, K>(true, order)
UniqueOrderedIndex<T, K>::UniqueOrderedIndex(IndexLocation loc, Order order)
: IndexBase<T, K>(IndexDefinition{loc, IndexType{true, order}})
{
}
template <class T, class K>
UniqueOrderedIndex<T, K>::UniqueOrderedIndex(Order order,
UniqueOrderedIndex<T, K>::UniqueOrderedIndex(IndexLocation loc, Order order,
tx::Transaction const &t)
: IndexBase<T, K>(true, order, t)
: IndexBase<T, K>(IndexDefinition{loc, IndexType{true, order}}, t)
{
}
template <class T, class K>
bool UniqueOrderedIndex<T, K>::insert(IndexRecord<T, K> &&value)
{
if (this->order() == Descending) {
if (this->type().order == Descending) {
value.set_descending();
}
return set.access().insert(std::move(value)).second;
@ -55,13 +55,13 @@ auto UniqueOrderedIndex<T, K>::for_range_exact(DbAccessor &t_v,
auto end = to_v;
// Sorted order must be checked
if (this->order() == Ascending && from_v.key.is_present()) {
if (this->type().order == Ascending && from_v.key.is_present()) {
begin = acc.cfind_or_larger(from_v);
} else if (this->order() == Descending && to_v.key.is_present()) {
} else if (this->type().order == Descending && to_v.key.is_present()) {
begin = acc.cfind_or_larger(to_v);
end = from_v;
} else {
assert(this->order() != None);
assert(this->type().order != None);
}
// TODO: determine size on fact of border size.
auto size = acc.size();

View File

@ -5,14 +5,14 @@
#include "transactions/transaction.hpp"
template <class TG, class K>
IndexBase<TG, K>::IndexBase(bool unique, Order order)
: _unique(unique), _order(order), created(Id(0)), active(true)
IndexBase<TG, K>::IndexBase(IndexDefinition &&it)
: it(it), created(Id(0)), active(true)
{
}
template <class TG, class K>
IndexBase<TG, K>::IndexBase(bool unique, Order order, const tx::Transaction &t)
: _unique(unique), _order(order), created(t.id)
IndexBase<TG, K>::IndexBase(IndexDefinition &&it, const tx::Transaction &t)
: it(it), created(t.id)
{
}

View File

@ -2,7 +2,10 @@
#include "storage/label/label.hpp"
Label::Label(const char *name)
: name(std::string(name)), index_v(std::make_unique<label_index_t>())
: name(std::string(name)),
index_v(std::make_unique<label_index_t>(
IndexLocation{VertexSide, Option<std::string>(),
Option<std::string>(std::string(name))}))
{
}

View File

@ -2,14 +2,6 @@
#include "storage/label/label.hpp"
auto LabelCollection::begin() { return _labels.begin(); }
auto LabelCollection::begin() const { return _labels.begin(); }
auto LabelCollection::cbegin() const { return _labels.begin(); }
auto LabelCollection::end() { return _labels.end(); }
auto LabelCollection::end() const { return _labels.end(); }
auto LabelCollection::cend() const { return _labels.end(); }
bool LabelCollection::add(const Label &label)
{
if (has(label)) {

View File

@ -11,10 +11,17 @@ namespace tx
Transaction::Transaction(const Id &id, const Snapshot<Id> &snapshot,
Engine &engine)
: id(id), cid(1), snapshot(snapshot), engine(engine)
: TransactionId(id, snapshot, engine)
{
}
// Returns copy of transaction_id
TransactionId Transaction::transaction_id()
{
TransactionId const &t = *this;
return t;
}
void Transaction::wait_for_active()
{
while (snapshot.size() > 0) {
@ -26,16 +33,6 @@ void Transaction::wait_for_active()
}
}
bool Transaction::is_active(const Id &id) const
{
return snapshot.is_active(id);
}
Id Transaction::oldest_active()
{
return snapshot.oldest_active().take_or(Id(id));
}
void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); }
void Transaction::commit() { engine.commit(*this); }

View File

@ -0,0 +1,32 @@
#include "transactions/transaction_id.hpp"
namespace tx
{
TransactionId::TransactionId(Engine &engine)
: TransactionId(Id(), Snapshot<Id>(), engine)
{
}
TransactionId::TransactionId(const Id &&id, const Snapshot<Id> &&snapshot,
Engine &engine)
: id(id), cid(1), snapshot(std::move(snapshot)), engine(engine)
{
}
TransactionId::TransactionId(const Id &id, const Snapshot<Id> &snapshot,
Engine &engine)
: id(id), cid(1), snapshot(snapshot), engine(engine)
{
}
bool TransactionId::in_snapshot(const Id &id) const
{
return snapshot.is_active(id);
}
Id TransactionId::oldest_active()
{
return snapshot.oldest_active().take_or(Id(id));
}
}

View File

@ -0,0 +1,9 @@
#include "utils/numerics/saturate.hpp"
std::size_t num::saturating_add(std::size_t a, std::size_t b)
{
a = a >= size_t_HIGHEST_BIT_SETED ? size_t_HIGHEST_BIT_SETED - 1 : a;
b = b >= size_t_HIGHEST_BIT_SETED ? size_t_HIGHEST_BIT_SETED - 1 : b;
return a + b;
}