Snapshot store and load works.

This commit is contained in:
Kruno Tomola Fabro 2016-09-09 16:14:20 +01:00
parent 74b9305285
commit 911eeff1d9
28 changed files with 1068 additions and 228 deletions

View File

@ -437,6 +437,7 @@ set(memgraph_src_files
${src_dir}/mvcc/id.cpp
${src_dir}/snapshot/snapshoter.cpp
${src_dir}/snapshot/snapshot_encoder.cpp
${src_dir}/snapshot/snapshot_decoder.cpp
${src_dir}/storage/vertices.cpp
${src_dir}/storage/edges.cpp
${src_dir}/storage/label/label.cpp
@ -452,6 +453,7 @@ set(memgraph_src_files
${src_dir}/storage/model/properties/double.cpp
${src_dir}/storage/model/properties/string.cpp
${src_dir}/storage/model/properties/array.cpp
${src_dir}/storage/model/properties/property.cpp
${src_dir}/storage/model/properties/properties.cpp
${src_dir}/storage/model/properties/stored_property.cpp
${src_dir}/storage/model/properties/property_family.cpp

View File

@ -2,11 +2,11 @@
#include <string>
#include "communication/bolt/v1/packing/codes.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
#include "utils/types/byte.hpp"
#include "utils/bswap.hpp"
#include "communication/bolt/v1/packing/codes.hpp"
#include "logging/default.hpp"
#include "utils/bswap.hpp"
#include "utils/types/byte.hpp"
namespace bolt
{
@ -14,24 +14,21 @@ namespace bolt
template <class Stream>
class BoltEncoder
{
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
static constexpr int64_t plus_2_to_the_15 = 32768L;
static constexpr int64_t plus_2_to_the_7 = 128L;
static constexpr int64_t minus_2_to_the_4 = -16L;
static constexpr int64_t minus_2_to_the_7 = -128L;
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
static constexpr int64_t plus_2_to_the_15 = 32768L;
static constexpr int64_t plus_2_to_the_7 = 128L;
static constexpr int64_t minus_2_to_the_4 = -16L;
static constexpr int64_t minus_2_to_the_7 = -128L;
static constexpr int64_t minus_2_to_the_15 = -32768L;
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
BoltEncoder(Stream& stream) : stream(stream)
BoltEncoder(Stream &stream) : stream(stream)
{
logger = logging::log->logger("Bolt Encoder");
}
void write(byte value)
{
write_byte(value);
}
void write(byte value) { write_byte(value); }
void write_byte(byte value)
{
@ -39,185 +36,129 @@ public:
stream.write(value);
}
void write(const byte* values, size_t n)
{
stream.write(values, n);
}
void write(const byte *values, size_t n) { stream.write(values, n); }
void write_null()
{
stream.write(pack::Null);
}
void write_null() { stream.write(pack::Null); }
void write(bool value)
{
write_bool(value);
}
void write(bool value) { write_bool(value); }
void write_bool(bool value)
{
if(value) write_true(); else write_false();
if (value)
write_true();
else
write_false();
}
void write_true()
{
stream.write(pack::True);
}
void write_true() { stream.write(pack::True); }
void write_false()
{
stream.write(pack::False);
}
void write_false() { stream.write(pack::False); }
template <class T>
void write_value(T value)
{
value = bswap(value);
stream.write(reinterpret_cast<const byte*>(&value), sizeof(value));
stream.write(reinterpret_cast<const byte *>(&value), sizeof(value));
}
void write_integer(int64_t value)
{
if(value >= minus_2_to_the_4 && value < plus_2_to_the_7)
{
if (value >= minus_2_to_the_4 && value < plus_2_to_the_7) {
write(static_cast<byte>(value));
}
else if(value >= minus_2_to_the_7 && value < minus_2_to_the_4)
{
} else if (value >= minus_2_to_the_7 && value < minus_2_to_the_4) {
write(pack::Int8);
write(static_cast<byte>(value));
}
else if(value >= minus_2_to_the_15 && value < plus_2_to_the_15)
{
} else if (value >= minus_2_to_the_15 && value < plus_2_to_the_15) {
write(pack::Int16);
write_value(static_cast<int16_t>(value));
}
else if(value >= minus_2_to_the_31 && value < plus_2_to_the_31)
{
} else if (value >= minus_2_to_the_31 && value < plus_2_to_the_31) {
write(pack::Int32);
write_value(static_cast<int32_t>(value));
}
else
{
} else {
write(pack::Int64);
write_value(value);
}
}
void write(double value)
{
write_double(value);
}
void write(double value) { write_double(value); }
void write_double(double value)
{
write(pack::Float64);
write_value(*reinterpret_cast<const int64_t*>(&value));
write_value(*reinterpret_cast<const int64_t *>(&value));
}
void write_map_header(size_t size)
{
if(size < 0x10)
{
if (size < 0x10) {
write(static_cast<byte>(pack::TinyMap | size));
}
else if(size <= 0xFF)
{
} else if (size <= 0xFF) {
write(pack::Map8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
} else if (size <= 0xFFFF) {
write(pack::Map16);
write_value<uint16_t>(size);
}
else
{
} else {
write(pack::Map32);
write_value<uint32_t>(size);
}
}
void write_empty_map()
{
write(pack::TinyMap);
}
void write_empty_map() { write(pack::TinyMap); }
void write_list_header(size_t size)
{
if(size < 0x10)
{
if (size < 0x10) {
write(static_cast<byte>(pack::TinyList | size));
}
else if(size <= 0xFF)
{
} else if (size <= 0xFF) {
write(pack::List8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
} else if (size <= 0xFFFF) {
write(pack::List16);
write_value<uint16_t>(size);
}
else
{
} else {
write(pack::List32);
write_value<uint32_t>(size);
}
}
void write_empty_list()
{
write(pack::TinyList);
}
void write_empty_list() { write(pack::TinyList); }
void write_string_header(size_t size)
{
if(size < 0x10)
{
if (size < 0x10) {
write(static_cast<byte>(pack::TinyString | size));
}
else if(size <= 0xFF)
{
} else if (size <= 0xFF) {
write(pack::String8);
write(static_cast<byte>(size));
}
else if(size <= 0xFFFF)
{
} else if (size <= 0xFFFF) {
write(pack::String16);
write_value<uint16_t>(size);
}
else
{
} else {
write(pack::String32);
write_value<uint32_t>(size);
}
}
void write_string(const std::string& str)
void write_string(const std::string &str)
{
write_string(str.c_str(), str.size());
}
void write_string(const char* str, size_t len)
void write_string(const char *str, size_t len)
{
write_string_header(len);
write(reinterpret_cast<const byte*>(str), len);
write(reinterpret_cast<const byte *>(str), len);
}
void write_struct_header(size_t size)
{
if(size < 0x10)
{
if (size < 0x10) {
write(static_cast<byte>(pack::TinyStruct | size));
}
else if(size <= 0xFF)
{
} else if (size <= 0xFF) {
write(pack::Struct8);
write(static_cast<byte>(size));
}
else
{
} else {
write(pack::Struct16);
write_value<uint16_t>(size);
}
@ -269,7 +210,6 @@ protected:
Logger logger;
private:
Stream& stream;
Stream &stream;
};
}

View File

@ -0,0 +1,330 @@
#pragma once
#include <string>
#include "communication/bolt/v1/packing/codes.hpp"
#include "query_engine/exceptions/exceptions.hpp"
#include "utils/bswap.hpp"
#include "utils/types/byte.hpp"
namespace bolt
{
// BoltDecoder for streams. Meant for use in SnapshotDecoder.
// This should be recoded to recieve the current caller so that decoder can
// based on a current type call it.
template <class STREAM>
class StreamedBoltDecoder
{
static constexpr int64_t plus_2_to_the_31 = 2147483648L;
static constexpr int64_t plus_2_to_the_15 = 32768L;
static constexpr int64_t plus_2_to_the_7 = 128L;
static constexpr int64_t minus_2_to_the_4 = -16L;
static constexpr int64_t minus_2_to_the_7 = -128L;
static constexpr int64_t minus_2_to_the_15 = -32768L;
static constexpr int64_t minus_2_to_the_31 = -2147483648L;
public:
StreamedBoltDecoder(STREAM &stream) : stream(stream) {}
// Returns mark of a data.
size_t mark() { return peek_byte(); }
// Calls handle with current primitive data. Throws DecoderException if it
// isn't a primitive.
template <class H, class T>
T accept_primitive(H &handle)
{
switch (byte()) {
case pack::False: {
return handle.handle(false);
}
case pack::True: {
return handle.handle(true);
}
case pack::Float64: {
return handle.handle(read_double());
}
default: {
return handle.handle(integer());
}
};
}
// Reads map header. Throws DecoderException if it isn't map header.
size_t map_header()
{
auto marker = byte();
size_t size;
if ((marker & 0xF0) == pack::TinyMap) {
size = marker & 0x0F;
} else if (marker == pack::Map8) {
size = byte();
} else if (marker == pack::Map16) {
size = read<uint16_t>();
} else if (marker == pack::Map32) {
size = read<uint32_t>();
} else {
// Error
throw DecoderException(
"StreamedBoltDecoder: Tryed to read map header but found ",
marker);
}
return size;
}
bool is_list()
{
auto marker = peek_byte();
if ((marker & 0xF0) == pack::TinyList) {
return true;
} else if (marker == pack::List8) {
return true;
} else if (marker == pack::List16) {
return true;
} else if (marker == pack::List32) {
return true;
} else {
return false;
}
}
// Reads list header. Throws DecoderException if it isn't list header.
size_t list_header()
{
auto marker = byte();
if ((marker & 0xF0) == pack::TinyList) {
return marker & 0x0F;
} else if (marker == pack::List8) {
return byte();
} else if (marker == pack::List16) {
return read<uint16_t>();
} else if (marker == pack::List32) {
return read<uint32_t>();
} else {
// Error
throw DecoderException(
"StreamedBoltDecoder: Tryed to read list header but found ",
marker);
}
}
bool is_bool()
{
auto marker = peek_byte();
if (marker == pack::True) {
return true;
} else if (marker == pack::False) {
return true;
} else {
return false;
}
}
// Reads bool.Throws DecoderException if it isn't bool.
bool read_bool()
{
auto marker = byte();
if (marker == pack::True) {
return true;
} else if (marker == pack::False) {
return false;
} else {
throw DecoderException(
"StreamedBoltDecoder: Tryed to read bool header but found ",
marker);
}
}
bool is_integer()
{
auto marker = peek_byte();
if (marker >= minus_2_to_the_4 && marker < plus_2_to_the_7) {
return true;
} else if (marker == pack::Int8) {
return true;
} else if (marker == pack::Int16) {
return true;
} else if (marker == pack::Int32) {
return true;
} else if (marker == pack::Int64) {
return true;
} else {
return false;
}
}
// Reads integer.Throws DecoderException if it isn't integer.
int64_t integer()
{
auto marker = byte();
if (marker >= minus_2_to_the_4 && marker < plus_2_to_the_7) {
return marker;
} else if (marker == pack::Int8) {
return byte();
} else if (marker == pack::Int16) {
return read<int16_t>();
} else if (marker == pack::Int32) {
return read<int32_t>();
} else if (marker == pack::Int64) {
return read<int64_t>();
} else {
throw DecoderException(
"StreamedBoltDecoder: Tryed to read integer but found ",
marker);
}
}
bool is_double()
{
auto marker = peek_byte();
return marker == pack::Float64;
}
// Reads double.Throws DecoderException if it isn't double.
double read_double()
{
auto marker = byte();
if (marker == pack::Float64) {
auto tmp = read<int64_t>();
return *reinterpret_cast<const double *>(&tmp);
} else {
throw DecoderException(
"StreamedBoltDecoder: Tryed to read double but found ", marker);
}
}
bool is_string()
{
auto marker = peek_byte();
// if the first 4 bits equal to 1000 (0x8), this is a tiny string
if ((marker & 0xF0) == pack::TinyString) {
return true;
}
// if the marker is 0xD0, size is an 8-bit unsigned integer
else if (marker == pack::String8) {
return true;
}
// if the marker is 0xD1, size is a 16-bit big-endian unsigned integer
else if (marker == pack::String16) {
return true;
}
// if the marker is 0xD2, size is a 32-bit big-endian unsigned integer
else if (marker == pack::String32) {
return true;
} else {
return false;
}
}
// Reads string into res. Throws DecoderException if it isn't string.
void string(std::string &res)
{
if (!string_try(res)) {
throw DecoderException(
"StreamedBoltDecoder: Tryed to read string but found ",
std::to_string(peek_byte()));
}
}
// Try-s to read string. Retunrns true on success. If it didn't succed
// stream remains unchanged
bool string_try(std::string &res)
{
auto marker = peek_byte();
uint32_t size;
// if the first 4 bits equal to 1000 (0x8), this is a tiny string
if ((marker & 0xF0) == pack::TinyString) {
byte();
// size is stored in the lower 4 bits of the marker byte
size = marker & 0x0F;
}
// if the marker is 0xD0, size is an 8-bit unsigned integer
else if (marker == pack::String8) {
byte();
size = byte();
}
// if the marker is 0xD1, size is a 16-bit big-endian unsigned integer
else if (marker == pack::String16) {
byte();
size = read<uint16_t>();
}
// if the marker is 0xD2, size is a 32-bit big-endian unsigned integer
else if (marker == pack::String32) {
byte();
size = read<uint32_t>();
} else {
// Error
return false;
}
if (size > 0) {
res.resize(size);
stream.read(&res.front(), size);
} else {
res.clear();
}
return true;
}
private:
// Reads T from stream. It doens't care for alligment so this is valid only
// for primitives.
template <class T>
T read()
{
buffer.resize(sizeof(T));
// Load value
stream.read(&buffer.front(), sizeof(T));
// reinterpret bytes as the target value
auto value = reinterpret_cast<const T *>(&buffer.front());
// swap values to little endian
return bswap(*value);
}
::byte byte() { return stream.get(); }
::byte peek_byte() { return stream.peek(); }
STREAM &stream;
std::string buffer;
};
};

View File

@ -26,14 +26,12 @@ constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";
// -- all possible Memgraph's keys --
inline size_t to_int(std::string &&s) { return stoull(s); }
}
// code uses this define for key access
// _KEY_ is value from all possible keys that are listed above
#define CONFIG(_KEY_) config::Config<config::MemgraphConfig>::instance()[_KEY_]
namespace stupid
{
size_t from(std::string &&s) { return stoull(s); }
};
#define CONFIG_INTEGER(_KEY_) stupid::from(CONFIG(_KEY_))
#define CONFIG_INTEGER(_KEY_) config::to_int(CONFIG(_KEY_))

View File

@ -8,6 +8,7 @@
#include "transactions/engine.hpp"
class Indexes;
class Snapshoter;
class Db
{
@ -16,6 +17,8 @@ public:
Db();
Db(const std::string &name);
// Loads newest snapshot
Db(const std::string &name, Snapshoter &snap);
Db(const Db &db) = delete;
Graph graph;

View File

@ -80,6 +80,10 @@ public:
Option<const EdgeAccessor> edge_find(const Id &id);
// Creates new Edge and returns filled EdgeAccessor.
// Slighlty faster than const version.
EdgeAccessor edge_insert(VertexAccessor &from, VertexAccessor &to);
// Creates new Edge and returns filled EdgeAccessor.
EdgeAccessor edge_insert(VertexAccessor const &from,
VertexAccessor const &to);

View File

@ -11,3 +11,8 @@ class CppGeneratorException : public BasicException
{
using BasicException::BasicException;
};
class DecoderException : public BasicException
{
using BasicException::BasicException;
};

View File

@ -0,0 +1,47 @@
#pragma once
#include "utils/array_store.hpp"
#include "utils/void.hpp"
// Common menthods for translating Vertex/Edge representations from serialized
// form into database form.
// Implementor should override those methods which he needs, and ignore the
// rest.
// Caller is responisble to structure his calls as following:
//
//
// End goal would be to enforce these rules during compile time.
class GraphDecoder
{
public:
// Starts reading vertex.
Id vertex_start();
// Returns number of stored labels.
size_t label_count();
// Wiil read label into given storage.
std::string const &label();
// Ends reading vertex
void vertex_end() {}
// Starts reading edge. Return from to ids of connected vertices.
std::pair<Id, Id> edge_start();
// Reads edge_type into given storage.
std::string const &edge_type();
// Ends reading edge.
void edge_end() {}
// Returns number of stored propertys.
size_t property_count();
// Reads property name into given storage.
std::string const &property_name();
// Reads property and calls T::handle for that property .
template <class T>
T property();
};

View File

@ -40,7 +40,7 @@ public:
void end_vertex() {}
// Starts writing edge from vertex to vertex
void start_edge(Id id, Id from, Id to) {}
void start_edge(Id from, Id to) {}
// Type of currently started edge
void edge_type(std::string const &et) {}

View File

@ -1,5 +1,6 @@
#pragma once
#include "database/db_accessor.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/edge_type/edge_type.hpp"
#include "storage/label/label.hpp"
@ -34,7 +35,7 @@ void serialize_vertex(VertexAccessor const &v, W &writer)
template <class W>
void serialize_edge(EdgeAccessor const &e, W &writer)
{
writer.start_edge(e.id(), e.from().id(), e.to().id());
writer.start_edge(e.from().id(), e.to().id());
writer.edge_type(e.edge_type().str());
@ -47,4 +48,55 @@ void serialize_edge(EdgeAccessor const &e, W &writer)
writer.end_edge();
}
// Deserializes vertex from reader into database db. Returns Id which vertex had
// in the reader and VertexAccessor.
template <class D>
std::pair<Id, VertexAccessor> deserialize_vertex(DbAccessor &db, D &reader)
{
auto v = db.vertex_insert();
auto old_id = reader.vertex_start();
std::string s;
for (auto i = reader.label_count(); i > 0; i--) {
auto &label_key = db.label_find_or_create(reader.label().c_str());
v.add_label(label_key);
}
for (auto i = reader.property_count(); i > 0; i--) {
auto &family =
db.vertex_property_family_get(reader.property_name().c_str());
v.set(StoredProperty<TypeGroupVertex>(
family, reader.template property<Property>()));
}
reader.vertex_end();
return std::make_pair(old_id, v);
}
// Deserializes edge from reader into database db. Returns loaded EdgeAccessor.
// S - is the storage with at() method for accesing VertexAccessor under thers
// deserialization local id returnd from deserialize_vertex.
template <class D, class S>
EdgeAccessor deserialize_edge(DbAccessor &db, D &reader, S &store)
{
auto ids = reader.edge_start();
VertexAccessor &from = store.at(ids.first);
VertexAccessor &to = store.at(ids.second);
auto e = db.edge_insert(from, to);
auto &edge_type_key = db.type_find_or_create(reader.edge_type().c_str());
e.edge_type(edge_type_key);
for (auto i = reader.property_count(); i > 0; i--) {
auto &family =
db.edge_property_family_get(reader.property_name().c_str());
e.set(StoredProperty<TypeGroupEdge>(
family, reader.template property<Property>()));
}
reader.edge_end();
return e;
}
};

View File

@ -1,8 +1,135 @@
#pragma once
#include <fstream>
#include <unordered_map>
#include "communication/bolt/v1/transport/streamed_bolt_decoder.hpp"
#include "mvcc/id.hpp"
#include "serialization/graph_decoder.hpp"
#include "storage/indexes/index_definition.hpp"
#include "storage/model/properties/property.hpp"
// Decodes stored snapshot.
class SnapshotDecoder
// Caller must respect loading order to be same as stored order with
// SnapshotEncoder.
class SnapshotDecoder : public GraphDecoder
{
public:
SnapshotDecoder(std::ifstream &snap_file);
// Loads propert names, label names, edge_type names.
void load_init();
// Begins process of reading vertices
void begin_vertices();
// True if it is end of vertices
bool end_vertices();
// Begins process of loading edges
void begin_edges();
// True if it is end of edges
bool end_edges();
// Begins process of reading indexes.
void start_indexes();
// Loads IndexDefinition.
IndexDefinition load_index();
// True if it is end.
bool end();
// ***************** from GraphDecoder
// Starts reading vertex.
Id vertex_start();
// Returns number of stored labels.
size_t label_count();
// Wiil read label into given storage.
std::string const &label();
// Starts reading edge. Return from to ids of connected vertices.
std::pair<Id, Id> edge_start();
// Reads edge_type into given storage.
std::string const &edge_type();
// Returns number of stored propertys.
size_t property_count();
// Reads property name into given storage.
std::string const &property_name();
// Reads property and calls T::handle for that property .
template <class T>
T property()
{
if (decoder.is_list()) {
auto size = decoder.list_header();
if (decoder.is_bool()) {
ArrayStore<bool> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.read_bool());
}
return T::handle(std::move(store));
} else if (decoder.is_integer()) {
ArrayStore<int64_t> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.integer());
}
return T::handle(std::move(store));
} else if (decoder.is_double()) {
ArrayStore<double> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.read_double());
}
return T::handle(std::move(store));
} else if (decoder.is_string()) {
ArrayStore<std::string> store;
for (auto i = 0; i < size; i++) {
std::string s;
decoder.string(s);
store.push_back(std::move(s));
}
return T::handle(std::move(store));
}
} else {
if (decoder.is_bool()) {
return T::handle(decoder.read_bool());
} else if (decoder.is_integer()) {
return T::handle(decoder.integer());
} else if (decoder.is_double()) {
return T::handle(decoder.read_double());
} else if (decoder.is_string()) {
std::string s;
decoder.string(s);
return T::handle(std::move(s));
}
}
throw DecoderException("Tryed to read property but found "
"unknown type in bolt marked as: ",
decoder.mark());
}
private:
bolt::StreamedBoltDecoder<std::ifstream> decoder;
// Contains for every property_name here snapshot local id.
std::unordered_map<size_t, std::string> property_name_map;
// Contains for every label_name here snapshot local id.
std::unordered_map<size_t, std::string> label_name_map;
// Contains for every edge_type here snapshot local id.
std::unordered_map<size_t, std::string> edge_type_name_map;
};

View File

@ -15,6 +15,7 @@
// write. Caller is responisble to structure his calls as following:
// * property_name_init
// * label_name_init
// * edge_type_name_init
// 1 start_vertices
// * <vertex>
// 1 start_edges
@ -68,7 +69,7 @@ public:
void label(std::string const &l);
// Starts writing edge from vertex to vertex
void start_edge(Id id, Id from, Id to);
void start_edge(Id from, Id to);
// Type of currently started edge
void edge_type(std::string const &et);

View File

@ -4,8 +4,8 @@
#include "database/db.hpp"
#include "logging/default.hpp"
#include "threading/thread.hpp"
class Thread;
class SnapshotEncoder;
class SnapshotDecoder;
@ -21,7 +21,7 @@ public:
~Snapshoter();
// Imports latest snapshot into the databse
// Imports latest snapshot into the databse. Multi thread safe.
void import(Db &db);
private:
@ -65,6 +65,7 @@ private:
Logger logger;
const size_t snapshot_cycle;
const size_t max_old_snapshots;
const std::string snapshot_folder;
std::unique_ptr<Thread> thread = {nullptr};

View File

@ -2,6 +2,7 @@
#include "utils/option.hpp"
#include "utils/order.hpp"
#include "utils/underlying_cast.hpp"
enum DbSide : uint8_t
{
@ -32,6 +33,47 @@ public:
struct IndexDefinition
{
public:
// Serializes self which can be deserialized
template <class E>
void serialize(E &encoder) const
{
std::string empty;
encoder.write_integer(underlying_cast(loc.side));
encoder.write_string(loc.property_name.get_or(empty));
encoder.write_string(loc.label_name.get_or(empty));
encoder.write_bool(type.unique);
encoder.write_integer(underlying_cast(type.order));
}
// Deserializes self.
template <class D>
static IndexDefinition deserialize(D &decoder)
{
auto side = decoder.integer() == 0 ? EdgeSide : VertexSide;
std::string property_name_s;
decoder.string(property_name_s);
auto property_name =
property_name_s.empty()
? Option<std::string>()
: Option<std::string>(std::move(property_name_s));
std::string label_name_s;
decoder.string(label_name_s);
auto label_name = label_name_s.empty()
? Option<std::string>()
: Option<std::string>(std::move(label_name_s));
bool unique = decoder.read_bool();
auto order_v = decoder.integer();
auto order =
order_v == 0 ? None : (order_v == 1 ? Ascending : Descending);
return IndexDefinition{IndexLocation{side, property_name, label_name},
IndexType{unique, order}};
}
const IndexLocation loc;
const IndexType type;
};

View File

@ -8,6 +8,13 @@ class Indexes
public:
Indexes(Db &d) : db(d) {}
bool add_index(IndexDefinition id)
{
// TODO: Not yet implemented
// assert(false);
return true;
}
// Calls F over all vertex indexes in the database which are readable.
template <class F>
void vertex_indexes(F &&f)

View File

@ -16,6 +16,32 @@ class Property : public PropertyHolder<Type>
{
public:
using PropertyHolder<Type>::PropertyHolder;
static Property handle(Void &&v);
static Property handle(bool &&prop);
static Property handle(float &&prop);
static Property handle(double &&prop);
static Property handle(int32_t &&prop);
static Property handle(int64_t &&prop);
static Property handle(std::string &&value);
static Property handle(ArrayStore<bool> &&);
static Property handle(ArrayStore<int32_t> &&);
static Property handle(ArrayStore<int64_t> &&);
static Property handle(ArrayStore<float> &&);
static Property handle(ArrayStore<double> &&);
static Property handle(ArrayStore<std::string> &&);
};
using properties_t = std::vector<Property>;

View File

@ -21,4 +21,7 @@ public:
const static class StoredProperty<TG> null;
using PropertyHolder<property_key<TG>>::PropertyHolder;
template <class P>
StoredProperty(PropertyFamily<TG> &family, P &&property);
};

View File

@ -13,37 +13,19 @@
namespace sys
{
// Code from stackoverflow
// http://stackoverflow.com/questions/109449/getting-a-file-from-a-stdfstream
// Code from stackoverflow:
// http://stackoverflow.com/questions/676787/how-to-do-fsync-on-an-ofstream
// Extracts FILE* from streams in std.
template <class STREAM>
struct STDIOAdapter
inline int GetFileDescriptor(std::filebuf &filebuf)
{
static FILE *yield(STREAM *stream)
class my_filebuf : public std::filebuf
{
assert(stream != NULL);
public:
int handle() { return _M_file.fd(); }
};
static cookie_io_functions_t Cookies = {.read = NULL,
.write = cookieWrite,
.seek = NULL,
.close = cookieClose};
return fopencookie(stream, "w", Cookies);
}
ssize_t static cookieWrite(void *cookie, const char *buf, size_t size)
{
if (cookie == NULL) return -1;
STREAM *writer = static_cast<STREAM *>(cookie);
writer->write(buf, size);
return size;
}
int static cookieClose(void *cookie) { return EOF; }
}; // STDIOAdapter
return static_cast<my_filebuf &>(filebuf).handle();
}
inline int futex(void *addr1, int op, int val1, const struct timespec *timeout,
void *addr2, int val3)
@ -57,8 +39,7 @@ template <class STREAM>
inline size_t flush_file_to_disk(STREAM &file)
{
file.flush();
FILE *f = STDIOAdapter<STREAM>::yield(&file);
if (fsync(fileno(f)) == 0) {
if (fsync(GetFileDescriptor(*file.rdbuf())) == 0) {
return 0;
}

View File

@ -1,11 +1,18 @@
#include "database/db.hpp"
#include "snapshot/snapshoter.hpp"
#include "storage/indexes/indexes.hpp"
#include "storage/model/properties/property_family.hpp"
Db::Db() = default;
Db::Db(const std::string &name) : name_(name) {}
Db::Db(const std::string &name, Snapshoter &snap) : name_(name)
{
snap.import(*this);
}
std::string const &Db::name() const { return name_; }
Indexes Db::indexes() { return Indexes(*this); }

View File

@ -32,6 +32,15 @@ Option<const EdgeAccessor> DbAccessor::edge_find(const Id &id)
return db_transaction.db.graph.edges.find(db_transaction, id);
}
EdgeAccessor DbAccessor::edge_insert(VertexAccessor &from, VertexAccessor &to)
{
auto edge_accessor = db_transaction.db.graph.edges.insert(
db_transaction, from.vlist, to.vlist);
from->data.out.add(edge_accessor.vlist);
to->data.in.add(edge_accessor.vlist);
return edge_accessor;
}
EdgeAccessor DbAccessor::edge_insert(VertexAccessor const &from,
VertexAccessor const &to)
{

View File

@ -12,37 +12,41 @@
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle)
: dbms(dbs), cleaning_cycle(cleaning_cycle)
{
cleaners.push_back(std::make_unique<Thread>([&]() {
Logger logger = logging::log->logger("Cleaner");
std::time_t last_clean = std::time(nullptr);
while (cleaning.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
cleaners.push_back(
std::make_unique<Thread>([&, cleaning_cycle = cleaning_cycle ]() {
Logger logger = logging::log->logger("Cleaner");
logger.info("Started with cleaning cycle of {} sec",
cleaning_cycle);
if (now >= last_clean + cleaning_cycle) {
logger.info("Started cleaning cyle");
for (auto &db : dbs.access()) {
logger.info("Cleaning database \"{}\"", db.first);
DbTransaction t(db.second);
try {
logger.info("Cleaning edges");
t.clean_edge_section();
logger.info("Cleaning vertices");
t.clean_vertex_section();
} catch (const std::exception &e) {
logger.error(
"Error occured while cleaning database \"{}\"",
db.first);
logger.error("{}", e.what());
std::time_t last_clean = std::time(nullptr);
while (cleaning.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
if (now >= last_clean + cleaning_cycle) {
logger.info("Started cleaning cyle");
for (auto &db : dbs.access()) {
logger.info("Cleaning database \"{}\"", db.first);
DbTransaction t(db.second);
try {
logger.info("Cleaning edges");
t.clean_edge_section();
logger.info("Cleaning vertices");
t.clean_vertex_section();
} catch (const std::exception &e) {
logger.error(
"Error occured while cleaning database \"{}\"",
db.first);
logger.error("{}", e.what());
}
t.trans.commit();
}
t.trans.commit();
last_clean = now;
logger.info("Finished cleaning cyle");
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
last_clean = now;
logger.info("Finished cleaning cyle");
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}));
}));
}
Cleaning::~Cleaning()

View File

@ -19,8 +19,9 @@ Db &Dbms::active(const std::string &name)
// create db if it doesn't exist
auto it = acc.find(name);
if (it == acc.end()) {
Snapshoter &snap = snapshoter;
it = acc.emplace(name, std::forward_as_tuple(name),
std::forward_as_tuple(name))
std::forward_as_tuple(name, snap))
.first;
}

View File

@ -0,0 +1,134 @@
#include "snapshot/snapshot_decoder.hpp"
SnapshotDecoder::SnapshotDecoder(std::ifstream &snap_file) : decoder(snap_file)
{
}
// Loads propert names, label names, edge_type names.
void SnapshotDecoder::load_init()
{
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
property_name_map.insert(std::make_pair(id, std::move(name)));
}
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
label_name_map.insert(std::make_pair(id, std::move(name)));
}
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
edge_type_name_map.insert(std::make_pair(id, std::move(name)));
}
}
// Begins process of reading vertices
void SnapshotDecoder::begin_vertices()
{
std::string name;
decoder.string(name);
if (name != "vertices") {
throw DecoderException(
"Tryed to start reading vertices on illegal position marked as: " +
name);
}
}
// True if it is end of vertices
bool SnapshotDecoder::end_vertices()
{
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "edges") {
throw DecoderException(
"Tryed to end reading vertices on illegal position marked as: " +
name);
}
return ret;
}
// Begins process of loading edges
void SnapshotDecoder::begin_edges()
{
// EMPTY
}
// True if it is end of edges
bool SnapshotDecoder::end_edges()
{
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "indexes") {
throw DecoderException(
"Tryed to end reading edges on illegal position marked as: " +
name);
}
return ret;
}
// Begins process of reading indexes.
void SnapshotDecoder::start_indexes()
{
// EMPTY
}
// Loads IndexDefinition.
IndexDefinition SnapshotDecoder::load_index()
{
return IndexDefinition::deserialize(decoder);
}
// True if it is end.
bool SnapshotDecoder::end()
{
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "end") {
throw DecoderException("Tryed to end on illegal position marked as: " +
name);
}
return ret;
}
// ***************** from GraphDecoder
// Starts reading vertex.
Id SnapshotDecoder::vertex_start() { return Id(decoder.integer()); }
// Returns number of stored labels.
size_t SnapshotDecoder::label_count() { return decoder.list_header(); }
// Wiil read label into given storage.
std::string const &SnapshotDecoder::label()
{
return label_name_map.at(decoder.integer());
}
// Starts reading edge. Return from to ids of connected vertices.
std::pair<Id, Id> SnapshotDecoder::edge_start()
{
auto from = Id(decoder.integer());
auto to = Id(decoder.integer());
return std::make_pair(from, to);
}
// Reads edge_type into given storage.
std::string const &SnapshotDecoder::edge_type()
{
return edge_type_name_map.at(decoder.integer());
}
// Returns number of stored propertys.
size_t SnapshotDecoder::property_count() { return decoder.map_header(); }
// Reads property name into given storage.
std::string const &SnapshotDecoder::property_name()
{
return property_name_map.at(decoder.integer());
}

View File

@ -29,18 +29,12 @@ void SnapshotEncoder::end() { encoder.write_string("end"); }
// **************** INDEX
// Prepares for indexes
void SnapshotEncoder::start_indexes() { encoder.write_string("vertices"); }
void SnapshotEncoder::start_indexes() { encoder.write_string("indexes"); }
// Writes index definition
void SnapshotEncoder::index(IndexDefinition const &def)
{
std::string empty;
encoder.write_byte(underlying_cast(def.loc.side));
encoder.write_string(def.loc.property_name.get_or(empty));
encoder.write_string(def.loc.label_name.get_or(empty));
encoder.write_bool(def.type.unique);
encoder.write_byte(underlying_cast(def.type.order));
def.serialize(encoder);
}
// ************* VERTEX
@ -85,9 +79,8 @@ void SnapshotEncoder::label(std::string const &l)
void SnapshotEncoder::start_edges() { encoder.write_string("edges"); }
// Starts writing edge from vertex to vertex
void SnapshotEncoder::start_edge(Id id, Id from, Id to)
void SnapshotEncoder::start_edge(Id from, Id to)
{
encoder.write_integer(id);
encoder.write_integer(from);
encoder.write_integer(to);
}

View File

@ -15,6 +15,8 @@ Snapshoter::Snapshoter(ConcurrentMap<std::string, Db> &dbs,
{
thread = std::make_unique<Thread>([&]() {
logger = logging::log->logger("Snapshoter");
logger.info("Started with snapshoot cycle of {} sec",
this->snapshot_cycle);
try {
run(logger);
@ -113,49 +115,9 @@ void Snapshoter::make_snapshot(std::time_t now, const char *type)
logger.info(std::string("Finished ") + type + " snapshot cycle");
}
void Snapshoter::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans)
{
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
void Snapshoter::import(Db &db)
{
Logger logger = logging::log->logger("Snapshot import");
logger.info("Started import for database \"{}\"", db.name());
try {
@ -216,7 +178,82 @@ void Snapshoter::import(Db &db)
logger.info("Finished import for database \"{}\"", db.name());
}
void Snapshoter::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans)
{
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Anounce edge_type names
for (auto &et : db.graph.edge_type_store.access()) {
snap.edge_type_name_init(et.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
bool Snapshoter::snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap)
{
// TODO
std::unordered_map<uint64_t, VertexAccessor> vertices;
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Load names
snap.load_init();
// Load vertices
snap.begin_vertices();
while (!snap.end_vertices()) {
vertices.insert(serialization::deserialize_vertex(t, snap));
}
// Load edges
snap.begin_edges();
while (!snap.end_edges()) {
serialization::deserialize_edge(t, snap, vertices);
}
// Load indexes
snap.start_indexes();
auto indexs = db.indexes();
while (!snap.end()) {
// This will add index. It is alright for now to ignore if add_index
// return false.
indexs.add_index(snap.load_index());
}
return true;
}

View File

@ -1,5 +1,7 @@
#include "storage/edge_accessor.hpp"
#include <cassert>
#include "storage/vertex_record.hpp"
void EdgeAccessor::remove() const
@ -24,6 +26,7 @@ void EdgeAccessor::edge_type(const EdgeType &edge_type)
const EdgeType &EdgeAccessor::edge_type() const
{
assert(this->record->data.edge_type != nullptr);
runtime_assert(this->record->data.edge_type != nullptr, "EdgeType is null");
return *this->record->data.edge_type;
}

View File

@ -0,0 +1,68 @@
#include "storage/model/properties/property.hpp"
#include "storage/model/properties/all.hpp"
Property Property::handle(Void &&v)
{
return Property(Null(), Type(Flags::Null));
}
Property Property::handle(bool &&prop)
{
return Property(Bool(prop), Type(Flags::Bool));
}
Property Property::handle(float &&prop)
{
return Property(Float(prop), Type(Flags::Float));
}
Property Property::handle(double &&prop)
{
return Property(Double(prop), Type(Flags::Double));
}
Property Property::handle(int32_t &&prop)
{
return Property(Int32(prop), Type(Flags::Int32));
}
Property Property::handle(int64_t &&prop)
{
return Property(Int64(prop), Type(Flags::Int64));
}
Property Property::handle(std::string &&value)
{
return Property(String(std::move(value)), Type(Flags::String));
}
Property Property::handle(ArrayStore<bool> &&a)
{
return Property(ArrayBool(std::move(a)), Type(Flags::ArrayBool));
}
Property Property::handle(ArrayStore<int32_t> &&a)
{
return Property(ArrayInt32(std::move(a)), Type(Flags::ArrayInt32));
}
Property Property::handle(ArrayStore<int64_t> &&a)
{
return Property(ArrayInt64(std::move(a)), Type(Flags::ArrayInt64));
}
Property Property::handle(ArrayStore<float> &&a)
{
return Property(ArrayFloat(std::move(a)), Type(Flags::ArrayFloat));
}
Property Property::handle(ArrayStore<double> &&a)
{
return Property(ArrayDouble(std::move(a)), Type(Flags::ArrayDouble));
}
Property Property::handle(ArrayStore<std::string> &&a)
{
return Property(ArrayString(std::move(a)), Type(Flags::ArrayString));
}

View File

@ -1,3 +1,4 @@
#include "storage/model/properties/property.hpp"
#include "storage/model/properties/stored_property.hpp"
template <class TG>
@ -7,5 +8,19 @@ template <class TG>
const StoredProperty<TG> StoredProperty<TG>::null = {
Null(), StoredProperty<TG>::null_family.getNull().family_key()};
template <class TG>
template <class P>
StoredProperty<TG>::StoredProperty(PropertyFamily<TG> &family, P &&property)
: StoredProperty(std::move(property),
family.get(property.key.get_type()).family_key())
{
}
template class StoredProperty<TypeGroupVertex>;
template class StoredProperty<TypeGroupEdge>;
template StoredProperty<TypeGroupVertex>::StoredProperty(
PropertyFamily<TypeGroupVertex> &family, Property &&property);
template StoredProperty<TypeGroupEdge>::StoredProperty(
PropertyFamily<TypeGroupEdge> &family, Property &&property);