From 4443cb8b7b02f7975041e01de7f858b895c4b276 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Thu, 8 Sep 2016 13:25:52 +0100 Subject: [PATCH] Finished creating part of snapshot. --- CMakeLists.txt | 5 +- config/memgraph.yaml | 3 + include/barrier/barrier.hpp | 8 +- include/barrier/common.hpp | 2 +- include/config/config.hpp | 10 +- include/database/db.hpp | 6 +- include/database/db_transaction.hpp | 1 + include/dbms/cleaner.hpp | 11 +- include/dbms/dbms.hpp | 7 +- include/import/fillings/array.hpp | 1 + include/mvcc/record.hpp | 22 +- include/mvcc/version_list.hpp | 2 +- include/serialization/graph_encoder.hpp | 82 +++++++ include/serialization/serialization.hpp | 50 ++++ include/snapshot/snapshot_decoder.hpp | 8 + include/snapshot/snapshot_encoder.hpp | 121 ++++++++++ include/snapshot/snapshoter.hpp | 75 ++++++ include/storage/edge_type/edge_type.hpp | 2 + .../impl/nonunique_unordered_index.hpp | 4 +- .../indexes/impl/unique_ordered_index.hpp | 5 +- include/storage/indexes/index.hpp | 44 ---- include/storage/indexes/index_base.hpp | 16 +- include/storage/indexes/index_definition.hpp | 37 +++ include/storage/indexes/indexes.hpp | 73 ++++++ include/storage/label/label.hpp | 5 +- include/storage/label/label_collection.hpp | 12 +- include/storage/model/properties/array.hpp | 8 +- include/storage/model/properties/null.hpp | 4 +- .../model/properties/property_holder.hpp | 21 ++ include/storage/record_accessor.hpp | 7 + include/transactions/transaction.hpp | 21 +- include/transactions/transaction_id.hpp | 45 ++++ include/utils/array_store.hpp | 10 + include/utils/char_str.hpp | 2 + include/utils/iterator/combined.hpp | 63 +++++ include/utils/iterator/composable.hpp | 9 + include/utils/iterator/count.hpp | 7 + include/utils/iterator/iterator.hpp | 1 + include/utils/iterator/lambda_iterator.hpp | 7 + include/utils/numerics/ceil.hpp | 3 +- include/utils/numerics/saturate.hpp | 12 + include/utils/option.hpp | 38 +++ include/utils/order.hpp | 2 +- include/utils/stream_wrapper.hpp | 24 ++ include/utils/sys.hpp | 71 +++++- include/utils/void.hpp | 4 +- src/barrier/barrier.cpp | 20 +- src/database/db.cpp | 6 +- src/database/db_transaction.cpp | 65 +---- src/dbms/cleaner.cpp | 3 +- src/snapshot/snapshot_encoder.cpp | 178 ++++++++++++++ src/snapshot/snapshoter.cpp | 222 ++++++++++++++++++ src/storage/edge_type/edge_type.cpp | 15 +- .../impl/nonunique_unordered_index.cpp | 9 +- .../indexes/impl/unique_ordered_index.cpp | 16 +- src/storage/indexes/index_base.cpp | 8 +- src/storage/label/label.cpp | 5 +- src/storage/label/label_collection.cpp | 8 - src/transactions/transaction.cpp | 19 +- src/transactions/transaction_id.cpp | 32 +++ src/utils/numerics/saturate.cpp | 9 + 61 files changed, 1336 insertions(+), 250 deletions(-) create mode 100644 include/serialization/graph_encoder.hpp create mode 100644 include/serialization/serialization.hpp create mode 100644 include/snapshot/snapshot_decoder.hpp create mode 100644 include/snapshot/snapshot_encoder.hpp create mode 100644 include/snapshot/snapshoter.hpp delete mode 100644 include/storage/indexes/index.hpp create mode 100644 include/storage/indexes/index_definition.hpp create mode 100644 include/storage/indexes/indexes.hpp create mode 100644 include/transactions/transaction_id.hpp create mode 100644 include/utils/array_store.hpp create mode 100644 include/utils/iterator/combined.hpp create mode 100644 include/utils/numerics/saturate.hpp create mode 100644 include/utils/stream_wrapper.hpp create mode 100644 src/snapshot/snapshot_encoder.cpp create mode 100644 src/snapshot/snapshoter.cpp create mode 100644 src/transactions/transaction_id.cpp create mode 100644 src/utils/numerics/saturate.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e9d9de83..64c5d7b1e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -240,7 +240,6 @@ FILE(COPY ${include_dir}/storage/label/label.hpp DESTINATION ${build_include_dir FILE(COPY ${include_dir}/storage/label/label_collection.hpp DESTINATION ${build_include_dir}/storage/label) FILE(COPY ${include_dir}/storage/label/label_store.hpp DESTINATION ${build_include_dir}/storage/label) -FILE(COPY ${include_dir}/storage/indexes/index.hpp DESTINATION ${build_include_dir}/storage/indexes) FILE(COPY ${include_dir}/storage/indexes/index_record.hpp DESTINATION ${build_include_dir}/storage/indexes) FILE(COPY ${include_dir}/storage/indexes/index_record_collection.hpp DESTINATION ${build_include_dir}/storage/indexes) FILE(COPY ${include_dir}/storage/indexes/index_base.hpp DESTINATION ${build_include_dir}/storage/indexes) @@ -422,6 +421,7 @@ set(memgraph_src_files ${src_dir}/utils/string/transform.cpp ${src_dir}/utils/string/join.cpp ${src_dir}/utils/string/file.cpp + ${src_dir}/utils/numerics/saturate.cpp ${src_dir}/query_engine/util.cpp ${src_dir}/communication/bolt/v1/bolt.cpp ${src_dir}/communication/bolt/v1/states.cpp @@ -435,6 +435,8 @@ set(memgraph_src_files ${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp ${src_dir}/threading/thread.cpp ${src_dir}/mvcc/id.cpp + ${src_dir}/snapshot/snapshoter.cpp + ${src_dir}/snapshot/snapshot_encoder.cpp ${src_dir}/storage/vertices.cpp ${src_dir}/storage/edges.cpp ${src_dir}/storage/label/label.cpp @@ -463,6 +465,7 @@ set(memgraph_src_files ${src_dir}/storage/garbage/garbage.cpp ${src_dir}/storage/vertex_accessor.cpp ${src_dir}/transactions/transaction.cpp + ${src_dir}/transactions/transaction_id.cpp ${src_dir}/template_engine/engine.cpp ${src_dir}/logging/streams/stdout.cpp ${src_dir}/logging/levels.cpp diff --git a/config/memgraph.yaml b/config/memgraph.yaml index 18858d113..7049cf781 100644 --- a/config/memgraph.yaml +++ b/config/memgraph.yaml @@ -2,3 +2,6 @@ compile_cpu_path: "./compiled/cpu/" template_cpu_cpp_path: "./template/template_code_cpu.cpp" barrier_template_cpu_cpp_path: "./template/barrier_template_code_cpu.cpp" template_cpu_hpp_path: "./template/template_code_cpu.hpp" +snapshots_path: "./snapshots" +cleaning_cycle_sec: "60" +snapshot_cycle_sec: "60" diff --git a/include/barrier/barrier.hpp b/include/barrier/barrier.hpp index b0bf0e8e9..2d5edf002 100644 --- a/include/barrier/barrier.hpp +++ b/include/barrier/barrier.hpp @@ -551,9 +551,7 @@ public: VertexIterator for_range(DbAccessor &, Border from = Border(), Border to = Border()); - bool unique(); - - Order order(); + IndexType type(); }; template @@ -565,9 +563,7 @@ public: EdgeIterator for_range(DbAccessor &, Border from = Border(), Border to = Border()); - bool unique(); - - Order order(); + IndexType type(); }; class Db : protected Unsized diff --git a/include/barrier/common.hpp b/include/barrier/common.hpp index 366077364..ff89a9b1e 100644 --- a/include/barrier/common.hpp +++ b/include/barrier/common.hpp @@ -8,13 +8,13 @@ // THis shoul be the only place to include code from memgraph other than // barrier.cpp #include "mvcc/id.hpp" +#include "storage/indexes/index_definition.hpp" #include "storage/model/properties/all.hpp" #include "storage/model/properties/property.hpp" #include "storage/model/properties/stored_property.hpp" #include "utils/border.hpp" #include "utils/iterator/iterator.hpp" #include "utils/option_ptr.hpp" -#include "utils/order.hpp" #include "utils/reference_wrapper.hpp" // Contains common classes and functions for barrier.hpp and barrier.cpp. diff --git a/include/config/config.hpp b/include/config/config.hpp index 36d55b638..8f9bbd7d1 100644 --- a/include/config/config.hpp +++ b/include/config/config.hpp @@ -22,10 +22,18 @@ constexpr const char *COMPILE_CPU_PATH = "compile_cpu_path"; constexpr const char *TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path"; constexpr const char *BARRIER_TEMPLATE_CPU_CPP_PATH = "barrier_template_cpu_cpp_path"; +constexpr const char *SNAPSHOTS_PATH = "snapshots_path"; +constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec"; +constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec"; // -- all possible Memgraph's keys -- - } // code uses this define for key access // _KEY_ is value from all possible keys that are listed above #define CONFIG(_KEY_) config::Config::instance()[_KEY_] + +namespace stupid +{ +size_t from(std::string &&s) { return stoull(s); } +}; +#define CONFIG_INTEGER(_KEY_) stupid::from(CONFIG(_KEY_)) diff --git a/include/database/db.hpp b/include/database/db.hpp index c95def98a..07aa5f1fa 100644 --- a/include/database/db.hpp +++ b/include/database/db.hpp @@ -7,6 +7,8 @@ #include "storage/graph.hpp" #include "transactions/engine.hpp" +class Indexes; + class Db { public: @@ -20,7 +22,9 @@ public: tx::Engine tx_engine; Garbage garbage; - std::string &name(); + std::string const &name() const; + + Indexes indexes(); // INDEXES diff --git a/include/database/db_transaction.hpp b/include/database/db_transaction.hpp index 8e38e140c..b6fabf86b 100644 --- a/include/database/db_transaction.hpp +++ b/include/database/db_transaction.hpp @@ -5,6 +5,7 @@ class Db; class DbAccessor; +class SnapshotEncoder; using index_updates_t = std::vector; diff --git a/include/dbms/cleaner.hpp b/include/dbms/cleaner.hpp index 4b1ca9453..17930160c 100644 --- a/include/dbms/cleaner.hpp +++ b/include/dbms/cleaner.hpp @@ -1,24 +1,25 @@ #pragma once #include "database/db.hpp" +#include "threading/thread.hpp" class Thread; -// How much sec is a cleaning_cycle in which cleaner will clean at most -// once. -constexpr size_t cleaning_cycle = 60; - class Cleaning { public: - Cleaning(ConcurrentMap &dbs); + // How much sec is a cleaning_cycle in which cleaner will clean at most + // once. + Cleaning(ConcurrentMap &dbs, size_t cleaning_cycle); ~Cleaning(); private: ConcurrentMap &dbms; + const size_t cleaning_cycle; + std::vector> cleaners; std::atomic cleaning = {true}; diff --git a/include/dbms/dbms.hpp b/include/dbms/dbms.hpp index 60e35a1bf..423d3068e 100644 --- a/include/dbms/dbms.hpp +++ b/include/dbms/dbms.hpp @@ -1,8 +1,10 @@ #pragma once +#include "config/config.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "database/db.hpp" #include "dbms/cleaner.hpp" +#include "snapshot/snapshoter.hpp" class Dbms { @@ -28,5 +30,8 @@ private: // currently active database std::atomic active_db; - Cleaning cleaning = {dbs}; + Cleaning cleaning = {dbs, CONFIG_INTEGER(config::CLEANING_CYCLE_SEC)}; + + Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC), + CONFIG(config::SNAPSHOTS_PATH)}; }; diff --git a/include/import/fillings/array.hpp b/include/import/fillings/array.hpp index 4c4b2379f..773578fd3 100644 --- a/include/import/fillings/array.hpp +++ b/include/import/fillings/array.hpp @@ -3,6 +3,7 @@ #include "database/db_accessor.hpp" #include "import/fillings/common.hpp" #include "import/fillings/filler.hpp" +#include "utils/array_store.hpp" template class ArrayFiller : public Filler diff --git a/include/mvcc/record.hpp b/include/mvcc/record.hpp index aab7c63e9..05f79fb10 100644 --- a/include/mvcc/record.hpp +++ b/include/mvcc/record.hpp @@ -5,7 +5,7 @@ #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" -#include "transactions/transaction.hpp" +#include "transactions/transaction_id.hpp" #include "mvcc/cre_exp.hpp" #include "mvcc/hints.hpp" @@ -41,7 +41,7 @@ public: RecordLock lock; // check if this record is visible to the transaction t - bool visible(const tx::Transaction &t) + bool visible(const tx::TransactionId &t) { // TODO check if the record was created by a transaction that has been // aborted. one might implement this by checking the hints in mvcc @@ -70,34 +70,34 @@ public: )))); } - void mark_created(const tx::Transaction &t) + void mark_created(const tx::TransactionId &t) { tx.cre(t.id); cmd.cre(t.cid); } - void mark_deleted(const tx::Transaction &t) + void mark_deleted(const tx::TransactionId &t) { tx.exp(t.id); cmd.exp(t.cid); } - bool exp_committed(const Id &id, const tx::Transaction &t) + bool exp_committed(const Id &id, const tx::TransactionId &t) { return committed(hints.exp, id, t); } - bool exp_committed(const tx::Transaction &t) + bool exp_committed(const tx::TransactionId &t) { return committed(hints.exp, tx.exp(), t); } - bool cre_committed(const Id &id, const tx::Transaction &t) + bool cre_committed(const Id &id, const tx::TransactionId &t) { return committed(hints.cre, id, t); } - bool cre_committed(const tx::Transaction &t) + bool cre_committed(const tx::TransactionId &t) { return committed(hints.cre, tx.cre(), t); } @@ -110,7 +110,7 @@ public: // TODO: Test this // True if this record is visible for write. - bool is_visible_write(const tx::Transaction &t) + bool is_visible_write(const tx::TransactionId &t) { return (tx.cre() == t.id && // inserted by the current transaction cmd.cre() <= t.cid && // before this command, and @@ -122,7 +122,7 @@ public: protected: template - bool committed(U &hints, const Id &id, const tx::Transaction &t) + bool committed(U &hints, const Id &id, const tx::TransactionId &t) { // you certainly can't see the transaction with id greater than yours // as that means it started after this transaction and if it committed, @@ -130,7 +130,7 @@ protected: if (id > t.id) return false; // The creating transaction is still in progress (examine snapshot) - if (t.is_active(id)) return false; + if (t.in_snapshot(id)) return false; auto hint_bits = hints.load(); diff --git a/include/mvcc/version_list.hpp b/include/mvcc/version_list.hpp index ede6f2825..bbb1d2701 100644 --- a/include/mvcc/version_list.hpp +++ b/include/mvcc/version_list.hpp @@ -103,7 +103,7 @@ public: void vacuum() {} - T *find(const tx::Transaction &t) const + T *find(const tx::TransactionId &t) const { auto r = head.load(std::memory_order_seq_cst); diff --git a/include/serialization/graph_encoder.hpp b/include/serialization/graph_encoder.hpp new file mode 100644 index 000000000..59ca2a272 --- /dev/null +++ b/include/serialization/graph_encoder.hpp @@ -0,0 +1,82 @@ +#pragma once + +#include "utils/array_store.hpp" +#include "utils/void.hpp" + +// Common menthods for translating Vertex/Edge representations in database to +// other format for extern usage. +// Implementor should override those methods which he needs, and ignore the +// rest. +// Caller is responisble to structure his calls as following: +// * start_vertex +// 1 label_count +// * label +// 1 property_count +// * property_name +// 1 handle +// 1 end_vertex +// or +// * start_edge +// 0-1 edge_type +// 1 property_count +// * property_name +// 1 handle +// 1 end_edge +// +// End goal would be to enforce these rules during compile time. +class GraphEncoder +{ +public: + // Starts writing vertex with given id. + void start_vertex(Id id) {} + + // Number of following label calls. + void label_count(size_t n); + + // Label of currently started vertex. + void label(std::string const &l) {} + + // Ends writing vertex + void end_vertex() {} + + // Starts writing edge from vertex to vertex + void start_edge(Id id, Id from, Id to) {} + + // Type of currently started edge + void edge_type(std::string const &et) {} + + // Ends writing edge + void end_edge() {} + + // Number of following paired property_name,handle calls. + void property_count(size_t n); + + // Property family name of next property for currently started element. + void property_name(std::string const &name) {} + + void handle(const Void &v) {} + + void handle(const bool &prop) {} + + void handle(const float &prop) {} + + void handle(const double &prop) {} + + void handle(const int32_t &prop) {} + + void handle(const int64_t &prop) {} + + void handle(const std::string &value) {} + + void handle(const ArrayStore &) {} + + void handle(const ArrayStore &) {} + + void handle(const ArrayStore &) {} + + void handle(const ArrayStore &) {} + + void handle(const ArrayStore &) {} + + void handle(const ArrayStore &) {} +}; diff --git a/include/serialization/serialization.hpp b/include/serialization/serialization.hpp new file mode 100644 index 000000000..eb11c292e --- /dev/null +++ b/include/serialization/serialization.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include "storage/edge_accessor.hpp" +#include "storage/edge_type/edge_type.hpp" +#include "storage/label/label.hpp" +#include "storage/vertex_accessor.hpp" + +namespace serialization +{ + +// Serializes Vertex to given writer which implements GraphEncoder. +template +void serialize_vertex(VertexAccessor const &v, W &writer) +{ + writer.start_vertex(v.id()); + + auto const &labels = v.labels(); + writer.label_count(labels.size()); + for (auto &label : labels) { + writer.label(label.get().str()); + } + + auto const &propertys = v.properties(); + writer.property_count(propertys.size()); + for (auto &prop : propertys) { + writer.property_name(prop.key.family_name()); + prop.accept_primitive(writer); + } + + writer.end_vertex(); +} + +// Serializes Edge to given writer which implements GraphEncoder. +template +void serialize_edge(EdgeAccessor const &e, W &writer) +{ + writer.start_edge(e.id(), e.from().id(), e.to().id()); + + writer.edge_type(e.edge_type().str()); + + auto const &propertys = e.properties(); + writer.property_count(propertys.size()); + for (auto &prop : propertys) { + writer.property_name(prop.key.family_name()); + prop.accept_primitive(writer); + } + + writer.end_edge(); +} +}; diff --git a/include/snapshot/snapshot_decoder.hpp b/include/snapshot/snapshot_decoder.hpp new file mode 100644 index 000000000..5a1495f4d --- /dev/null +++ b/include/snapshot/snapshot_decoder.hpp @@ -0,0 +1,8 @@ +#pragma once + +// Decodes stored snapshot. +class SnapshotDecoder +{ +public: + SnapshotDecoder(std::ifstream &snap_file); +}; diff --git a/include/snapshot/snapshot_encoder.hpp b/include/snapshot/snapshot_encoder.hpp new file mode 100644 index 000000000..ea6e90e26 --- /dev/null +++ b/include/snapshot/snapshot_encoder.hpp @@ -0,0 +1,121 @@ +#pragma once + +#include +#include + +#include "communication/bolt/v1/transport/bolt_encoder.hpp" +#include "mvcc/id.hpp" +#include "serialization/graph_encoder.hpp" +#include "serialization/serialization.hpp" +#include "storage/indexes/index_definition.hpp" +#include "utils/stream_wrapper.hpp" + +// Represents creation of a snapshot. Contains all necessary informations +// for +// write. Caller is responisble to structure his calls as following: +// * property_name_init +// * label_name_init +// 1 start_vertices +// * +// 1 start_edges +// * +// 1 start_indexes +// * index +// 1 end +class SnapshotEncoder : public GraphEncoder +{ +public: + SnapshotEncoder(std::ofstream &stream) : stream(stream) {} + + SnapshotEncoder(SnapshotEncoder const &) = delete; + SnapshotEncoder(SnapshotEncoder &&) = delete; + + SnapshotEncoder &operator=(SnapshotEncoder const &) = delete; + SnapshotEncoder &operator=(SnapshotEncoder &&) = delete; + + // Tells in advance which names will be used. + void property_name_init(std::string const &name); + + // Tells in advance which labels will be used. + void label_name_init(std::string const &name); + + // Tells in advance which edge_type will be used. + void edge_type_name_init(std::string const &name); + + // Prepares for vertices + void start_vertices(); + + // Prepares for edges + void start_edges(); + + // Prepares for indexes + void start_indexes(); + + // Writes index definition + void index(IndexDefinition const &); + + // Finishes snapshot + void end(); + + // *********************From graph encoder + // Starts writing vertex with given id. + void start_vertex(Id id); + + // Number of following label calls. + void label_count(size_t n); + + // Label of currently started vertex. + void label(std::string const &l); + + // Starts writing edge from vertex to vertex + void start_edge(Id id, Id from, Id to); + + // Type of currently started edge + void edge_type(std::string const &et); + + // Number of following paired property_name,handle calls. + void property_count(size_t n); + + // Property family name of next property for currently started element. + void property_name(std::string const &name); + + void handle(const Void &v); + + void handle(const bool &prop); + + void handle(const float &prop); + + void handle(const double &prop); + + void handle(const int32_t &prop); + + void handle(const int64_t &prop); + + void handle(const std::string &value); + + void handle(const ArrayStore &); + + void handle(const ArrayStore &); + + void handle(const ArrayStore &); + + void handle(const ArrayStore &); + + void handle(const ArrayStore &); + + void handle(const ArrayStore &); + +private: + std::ofstream &stream; + StreamWrapper wrapped = {stream}; + bolt::BoltEncoder> encoder = {wrapped}; + + // Contains for every property_name here snapshot local id. + std::unordered_map property_name_map; + + // Contains for every label_name here snapshot local id. + std::unordered_map label_name_map; + + // Contains for every edge_type here snapshot local id. + std::unordered_map edge_type_name_map; +}; diff --git a/include/snapshot/snapshoter.hpp b/include/snapshot/snapshoter.hpp new file mode 100644 index 000000000..8ae580f8e --- /dev/null +++ b/include/snapshot/snapshoter.hpp @@ -0,0 +1,75 @@ +#pragma once + +#include + +#include "database/db.hpp" +#include "logging/default.hpp" + +class Thread; +class SnapshotEncoder; +class SnapshotDecoder; + +// Captures snapshots. +class Snapshoter +{ + +public: + // How much sec is between snapshots + // snapshot_folder is path to common folder for all snapshots. + Snapshoter(ConcurrentMap &dbs, size_t snapshot_cycle, + std::string &&snapshot_folder); + + ~Snapshoter(); + + // Imports latest snapshot into the databse + void import(Db &db); + +private: + void run(Logger &logger); + + // Makes snapshot of given type + void make_snapshot(std::time_t now, const char *type); + + // Makes snapshot. It only saves records which have changed since old_trans. + void snapshot(DbTransaction const &dt, SnapshotEncoder &snap, + tx::TransactionId const &old_trans); + + // Loads snapshot. True if success + bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap); + + std::string snapshot_file(Db &db, std::time_t const &now, const char *type) + { + return snapshot_db_dir(db) + "/" + std::to_string(now) + "_" + type; + } + + std::string snapshot_commit_file(Db &db) + { + return snapshot_db_dir(db) + "/snapshot_commit.txt"; + } + + // Path to directory of database. Ensures that all necessary directorys + // exist. + std::string snapshot_db_dir(Db &db) + { + if (!sys::ensure_directory_exists(snapshot_folder)) { + logger.error("Error while creating directory \"{}\"", + snapshot_folder); + } + auto db_path = snapshot_folder + "/" + db.name(); + if (!sys::ensure_directory_exists(db_path)) { + logger.error("Error while creating directory \"{}\"", db_path); + } + return db_path; + } + + Logger logger; + + const size_t snapshot_cycle; + const std::string snapshot_folder; + + std::unique_ptr thread = {nullptr}; + + ConcurrentMap &dbms; + + std::atomic snapshoting = {true}; +}; diff --git a/include/storage/edge_type/edge_type.hpp b/include/storage/edge_type/edge_type.hpp index 104174d5d..ca2a7bf01 100644 --- a/include/storage/edge_type/edge_type.hpp +++ b/include/storage/edge_type/edge_type.hpp @@ -35,6 +35,8 @@ public: operator const std::string &() const; + std::string const &str() const { return id; } + CharStr char_str() { return CharStr(&id[0]); } type_index_t &index() const; diff --git a/include/storage/indexes/impl/nonunique_unordered_index.hpp b/include/storage/indexes/impl/nonunique_unordered_index.hpp index 6fba48e75..55fb1c057 100644 --- a/include/storage/indexes/impl/nonunique_unordered_index.hpp +++ b/include/storage/indexes/impl/nonunique_unordered_index.hpp @@ -14,9 +14,9 @@ public: // typedef K key_type; // Created with the database - NonUniqueUnorderedIndex(); + NonUniqueUnorderedIndex(IndexLocation &&loc); - NonUniqueUnorderedIndex(tx::Transaction const &t); + NonUniqueUnorderedIndex(IndexLocation &&loc, tx::Transaction const &t); // Insert's value. // nonunique => always succeds. diff --git a/include/storage/indexes/impl/unique_ordered_index.hpp b/include/storage/indexes/impl/unique_ordered_index.hpp index bdc0f560f..845321f4e 100644 --- a/include/storage/indexes/impl/unique_ordered_index.hpp +++ b/include/storage/indexes/impl/unique_ordered_index.hpp @@ -12,9 +12,10 @@ public: // typedef K key_type; // Created with the database - UniqueOrderedIndex(Order order); + UniqueOrderedIndex(IndexLocation loc, Order order); - UniqueOrderedIndex(Order order, tx::Transaction const &t); + UniqueOrderedIndex(IndexLocation loc, Order order, + tx::Transaction const &t); // Insert's value. // nonunique => always succeds. diff --git a/include/storage/indexes/index.hpp b/include/storage/indexes/index.hpp deleted file mode 100644 index 87007f0a7..000000000 --- a/include/storage/indexes/index.hpp +++ /dev/null @@ -1,44 +0,0 @@ -// #pragma once -// -// #include -// -// #include "data_structures/concurrent/concurrent_map.hpp" -// #include "storage/indexes/index_record.hpp" -// #include "storage/indexes/index_record_collection.hpp" -// #include "storage/label/label.hpp" -// -// template -// class Index -// { -// public: -// using container_t = ConcurrentMap; -// -// Index() : index(std::make_unique()) {} -// -// auto update(const Label &label, VertexIndexRecord &&index_record) -// { -// auto accessor = index->access(); -// auto label_ref = label_ref_t(label); -// -// // create Index Record Collection if it doesn't exist -// if (!accessor.contains(label_ref)) { -// accessor.insert(label_ref, std::move(VertexIndexRecordCollection())); -// } -// -// // add Vertex Index Record to the Record Collection -// auto &record_collection = (*accessor.find(label_ref)).second; -// record_collection.add(std::forward(index_record)); -// } -// -// VertexIndexRecordCollection &find(const Label &label) -// { -// // TODO: accessor should be outside? -// // bacause otherwise GC could delete record that has just be returned -// auto label_ref = label_ref_t(label); -// auto accessor = index->access(); -// return (*accessor.find(label_ref)).second; -// } -// -// private: -// std::unique_ptr index; -// }; diff --git a/include/storage/indexes/index_base.hpp b/include/storage/indexes/index_base.hpp index 08270d5ab..951c16861 100644 --- a/include/storage/indexes/index_base.hpp +++ b/include/storage/indexes/index_base.hpp @@ -1,3 +1,4 @@ + #pragma once #include @@ -7,9 +8,9 @@ // #include "storage/indexes/index_record.hpp" #include "storage/garbage/delete_sensitive.hpp" +#include "storage/indexes/index_definition.hpp" #include "utils/border.hpp" #include "utils/iterator/virtual_iter.hpp" -#include "utils/order.hpp" template class IndexRecord; @@ -32,9 +33,9 @@ public: // typedef K key_type; // Created with the database - IndexBase(bool unique, Order order); + IndexBase(IndexDefinition &&it); - IndexBase(bool unique, Order order, const tx::Transaction &t); + IndexBase(IndexDefinition &&it, const tx::Transaction &t); virtual ~IndexBase(){}; @@ -68,15 +69,12 @@ public: // True if transaction is obliged to insert T into index. bool is_obliged_to_insert(const tx::Transaction &t); - bool unique() { return _unique; } + IndexType type() const { return it.type; } - Order order() { return _order; } + const IndexDefinition &definition() const { return it; } private: - // Are the records unique - const bool _unique; - // Ordering of the records. - const Order _order; + const IndexDefinition it; // Id of transaction which created this index. const Id created; // Active state diff --git a/include/storage/indexes/index_definition.hpp b/include/storage/indexes/index_definition.hpp new file mode 100644 index 000000000..5bb53012c --- /dev/null +++ b/include/storage/indexes/index_definition.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include "utils/option.hpp" +#include "utils/order.hpp" + +enum DbSide : uint8_t +{ + EdgeSide = 0, + VertexSide = 1, +}; + +struct IndexType +{ +public: + // Are the records unique + const bool unique; + // Ordering of the records. + const Order order; +}; + +struct IndexLocation +{ +public: + const DbSide side; + const Option property_name; + const Option label_name; +}; + +// Fully answers: +// On what index? +// What kind of index? +struct IndexDefinition +{ +public: + const IndexLocation loc; + const IndexType type; +}; diff --git a/include/storage/indexes/indexes.hpp b/include/storage/indexes/indexes.hpp new file mode 100644 index 000000000..c3d24c780 --- /dev/null +++ b/include/storage/indexes/indexes.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include "database/db.hpp" + +// Operation on indexes in the Db +class Indexes +{ +public: + Indexes(Db &d) : db(d) {} + + // Calls F over all vertex indexes in the database which are readable. + template + void vertex_indexes(F &&f) + { + for (auto &l : db.graph.label_store.access()) { + f(l.second.get()->index()); + } + + for_all_property_indexes_read( + db.graph.vertices.property_family_access(), f); + } + + // Calls F over all edge indexes in the database which are readable. + template + void edge_indexes(F &&f) + { + for (auto &l : db.graph.edge_type_store.access()) { + f(l.second.get()->index()); + } + + for_all_property_indexes_read(db.graph.edges.property_family_access(), + f); + } + + // Updates property indexes for given TypeGroup TG and IU index update + template + bool update_property_indexes(IU &iu, const tx::Transaction &t) + { + for (auto kp : iu.record->data.props) { + + // FamilyProperty index + auto opi = kp.key.get_family().index.get_write(t); + if (opi.is_present()) { + if (!opi.get()->insert(IndexRecord( + std::nullptr_t(), iu.record, iu.vlist))) { + return false; + } + } + + // TODO: other properti indexes + } + + return true; + } + +private: + // Calls F for all * property indexes which are readable. + template + void for_all_property_indexes_read(A &&acc, F &f) + { + for (auto &family : acc) { + auto oi = family.second->index.get_read(); + if (oi.is_present()) { + f(*oi.get()); + } + } + + // TODO: Code for reaching other property indexes which are not yet + // coded into the database. + } + + Db &db; +}; diff --git a/include/storage/label/label.hpp b/include/storage/label/label.hpp index f0e04c2d2..3273f5978 100644 --- a/include/storage/label/label.hpp +++ b/include/storage/label/label.hpp @@ -4,13 +4,12 @@ #include #include "storage/indexes/impl/nonunique_unordered_index.hpp" +#include "storage/type_group_vertex.hpp" #include "storage/vertex.hpp" #include "storage/vertex_accessor.hpp" #include "utils/char_str.hpp" #include "utils/reference_wrapper.hpp" #include "utils/total_ordering.hpp" -// #include "storage/type_group_edge.hpp" -#include "storage/type_group_vertex.hpp" using LabelIndexRecord = IndexRecord; @@ -39,6 +38,8 @@ public: operator const std::string &() const; + std::string const &str() const { return name; } + CharStr char_str() const { return CharStr(name.c_str()); } label_index_t &index() const; diff --git a/include/storage/label/label_collection.hpp b/include/storage/label/label_collection.hpp index a6963c49c..831578da4 100644 --- a/include/storage/label/label_collection.hpp +++ b/include/storage/label/label_collection.hpp @@ -11,13 +11,13 @@ using label_ref_t = ReferenceWrapper; class LabelCollection { public: - auto begin(); - auto begin() const; - auto cbegin() const; + auto begin() { return _labels.begin(); } + auto begin() const { return _labels.begin(); } + auto cbegin() const { return _labels.begin(); } - auto end(); - auto end() const; - auto cend() const; + auto end() { return _labels.end(); } + auto end() const { return _labels.end(); } + auto cend() const { return _labels.end(); } bool add(const Label &label); bool has(const Label &label) const; diff --git a/include/storage/model/properties/array.hpp b/include/storage/model/properties/array.hpp index caae8995d..0818df24b 100644 --- a/include/storage/model/properties/array.hpp +++ b/include/storage/model/properties/array.hpp @@ -4,13 +4,7 @@ #include #include "storage/model/properties/flags.hpp" - -// TODO: more bytes can be saved if this is array with exact size as number -// of elements. -// TODO: even more bytes can be saved if this is one ptr to structure which -// holds len followed by len sized array. -template -using ArrayStore = std::vector; +#include "utils/array_store.hpp" template class Array diff --git a/include/storage/model/properties/null.hpp b/include/storage/model/properties/null.hpp index 6f7cf1614..56034e48b 100644 --- a/include/storage/model/properties/null.hpp +++ b/include/storage/model/properties/null.hpp @@ -10,8 +10,8 @@ class Null public: const static Type type; - Void &value() { return Void::_void; } - Void const &value() const { return Void::_void; } + Void &value() { return _void; } + Void const &value() const { return _void; } std::ostream &print(std::ostream &stream) const; diff --git a/include/storage/model/properties/property_holder.hpp b/include/storage/model/properties/property_holder.hpp index c8a342061..40aab3f4a 100644 --- a/include/storage/model/properties/property_holder.hpp +++ b/include/storage/model/properties/property_holder.hpp @@ -43,6 +43,14 @@ break; \ } +// Genrates case claus for Flags::type_name to handle primitives of type_name +// object from field of union_name +#define GENERATE_CASE_CLAUSE_FOR_HANDLER_PRIMITIVE(type_name, union_name) \ + case Flags::type_name: { \ + h.handle(this->union_name.value()); \ + break; \ + } + // Genrates case claus for Flags::type_name to print type_name object from // field of union_name #define GENERATE_CASE_CLAUSE_FOR_PRINT(type_name, union_name) \ @@ -147,6 +155,8 @@ public: return *this; } + // Calls appropiate handler with a database property. + // (String,Int64,ArrayBool,...) template void accept(Handler &h) const { @@ -157,6 +167,17 @@ public: } } + // Calls appropiate handler with a primitive.(std::string,bool,int64_t,...) + template + void accept_primitive(Handler &h) const + { + switch (key.get_type().flags()) { + GENERATE_FOR_ALL_PROPERTYS(CASE_CLAUSE_FOR_HANDLER_PRIMITIVE); + default: + assert(false); + } + } + std::ostream &print(std::ostream &stream) const { switch (key.get_type().flags()) { diff --git a/include/storage/record_accessor.hpp b/include/storage/record_accessor.hpp index c7845193d..bcc2f35b8 100644 --- a/include/storage/record_accessor.hpp +++ b/include/storage/record_accessor.hpp @@ -51,6 +51,13 @@ public: const Id &id() const { return vlist->id; } + // True if record visible for current transaction is visible to given + // transaction id. + bool is_visble_to(tx::TransactionId const &id) + { + return record->visible(id); + } + // TODO: Test this Derived update() const { diff --git a/include/transactions/transaction.hpp b/include/transactions/transaction.hpp index 64306d6cd..b201eaceb 100644 --- a/include/transactions/transaction.hpp +++ b/include/transactions/transaction.hpp @@ -9,44 +9,31 @@ #include "storage/locking/record_lock.hpp" #include "transactions/lock_store.hpp" #include "transactions/snapshot.hpp" +#include "transactions/transaction_id.hpp" namespace tx { -class Engine; - -class Transaction +class Transaction : public TransactionId { - friend class Engine; public: Transaction(const Id &id, const Snapshot &snapshot, Engine &engine); Transaction(const Transaction &) = delete; Transaction(Transaction &&) = delete; - // index of this transaction - const Id id; - // index of the current command in the current transaction; - uint8_t cid; - // a snapshot of currently active transactions + // Returns copy of transaction_id + TransactionId transaction_id(); // Blocks until all transactions from snapshot finish. After this method, // snapshot will be empty. void wait_for_active(); - // Return id of oldest transaction from snapshot. - Id oldest_active(); - - // True if id is in snapshot. - bool is_active(const Id &id) const; void take_lock(RecordLock &lock); void commit(); void abort(); - Engine &engine; - private: - Snapshot snapshot; LockStore locks; }; } diff --git a/include/transactions/transaction_id.hpp b/include/transactions/transaction_id.hpp new file mode 100644 index 000000000..e97add53a --- /dev/null +++ b/include/transactions/transaction_id.hpp @@ -0,0 +1,45 @@ + +#pragma once + +#include +#include +#include + +#include "mvcc/id.hpp" +#include "transactions/snapshot.hpp" + +namespace tx +{ + +class Engine; + +class TransactionId +{ + friend class Engine; + +public: + TransactionId(Engine &engine); + + TransactionId(const Id &&id, const Snapshot &&snapshot, Engine &engine); + + TransactionId(const Id &id, const Snapshot &snapshot, Engine &engine); + + // Return id of oldest transaction from snapshot. + Id oldest_active(); + + // True if id is in snapshot. + bool in_snapshot(const Id &id) const; + + // index of this transaction + const Id id; + + // index of the current command in the current transaction; + uint8_t cid; + + Engine &engine; + +protected: + // a snapshot of currently active transactions + Snapshot snapshot; +}; +} diff --git a/include/utils/array_store.hpp b/include/utils/array_store.hpp new file mode 100644 index 000000000..60b20a2e1 --- /dev/null +++ b/include/utils/array_store.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include + +// TODO: more bytes can be saved if this is array with exact size as number +// of elements. +// TODO: even more bytes can be saved if this is one ptr to structure which +// holds len followed by len sized array. +template +using ArrayStore = std::vector; diff --git a/include/utils/char_str.hpp b/include/utils/char_str.hpp index 71118989a..6d76952b7 100644 --- a/include/utils/char_str.hpp +++ b/include/utils/char_str.hpp @@ -8,6 +8,8 @@ class CharStr : public TotalOrdering public: CharStr(const char *str) : str(str) {} + std::string to_string() const { return std::string(str); } + friend bool operator==(const CharStr &lhs, const CharStr &rhs) { return strcmp(lhs.str, rhs.str) == 0; diff --git a/include/utils/iterator/combined.hpp b/include/utils/iterator/combined.hpp new file mode 100644 index 000000000..adb3aa73b --- /dev/null +++ b/include/utils/iterator/combined.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include "utils/iterator/composable.hpp" +#include "utils/iterator/iterator_base.hpp" + +namespace iter +{ + +// Class which Combined two iterators IT1 and IT2. Both return values T +// T - type of return value +// IT1 - first iterator type +// IT2 - second iterator type +template +class Combined : public IteratorBase, + public Composable> +{ + +public: + Combined() = delete; + + // Combined operation is designed to be used in chained calls which operate + // on a iterator. Combined will in that usecase receive other iterator by + // value and std::move is a optimization for it. + Combined(IT1 &&iter1, IT2 &&iter2) + : iter1(Option(std::move(iter1))), + iter2(Option(std::move(iter2))) + { + } + + // Return values first from first iterator then from second. + Option next() final + { + if (iter1.is_present()) { + auto ret = iter1.get().next(); + if (ret.is_present()) { + return std::move(ret); + } else { + iter1.take(); + } + } + + return iter2.next(); + } + + Count count() final + { + return iter1.map_or([](auto &it) { return it.count(); }, 0) + + iter2.count(); + } + +private: + Option iter1; + IT2 iter2; +}; + +template +auto make_combined(IT1 &&iter1, IT2 &&iter2) +{ + // Compiler cant deduce type T. decltype is here to help with it. + return Combined(std::move(iter1), + std::move(iter2)); +} +} diff --git a/include/utils/iterator/composable.hpp b/include/utils/iterator/composable.hpp index 857dbd4f8..98efecb26 100644 --- a/include/utils/iterator/composable.hpp +++ b/include/utils/iterator/composable.hpp @@ -29,6 +29,9 @@ auto make_limited_map(I &&iter, OP &&op); template auto make_virtual(I &&iter); +template +auto make_combined(IT1 &&iter1, IT2 &&iter2); + // Class for creating easy composable iterators fo querying. // Derived - type of derived class // T - return type @@ -40,6 +43,12 @@ class Composable : public Crtp public: auto virtualize() { return iter::make_virtual(move()); } + template + auto combine(IT &&it) + { + return iter::make_combined(move(), std::move(it)); + } + template auto map(OP &&op) { diff --git a/include/utils/iterator/count.hpp b/include/utils/iterator/count.hpp index 7beffb1a0..24cd17615 100644 --- a/include/utils/iterator/count.hpp +++ b/include/utils/iterator/count.hpp @@ -1,5 +1,6 @@ #pragma once +#include "utils/numerics/saturate.hpp" #include "utils/total_ordering.hpp" // Represents number of to be returned elements from iterator. Where acutal @@ -26,6 +27,12 @@ public: return lhs.avg() == rhs.avg(); } + friend Count operator+(const Count &lhs, const Count &rhs) + { + return Count(num::saturating_add(lhs.min, rhs.min), + num::saturating_add(lhs.max, rhs.max)); + } + size_t min; size_t max; }; diff --git a/include/utils/iterator/iterator.hpp b/include/utils/iterator/iterator.hpp index a540dff0d..e47748a7c 100644 --- a/include/utils/iterator/iterator.hpp +++ b/include/utils/iterator/iterator.hpp @@ -1,6 +1,7 @@ #pragma once #include "utils/iterator/accessor.hpp" +#include "utils/iterator/combined.hpp" #include "utils/iterator/count.hpp" #include "utils/iterator/filter.hpp" #include "utils/iterator/flat_map.hpp" diff --git a/include/utils/iterator/lambda_iterator.hpp b/include/utils/iterator/lambda_iterator.hpp index cfae1c3d5..d1bd24f4a 100644 --- a/include/utils/iterator/lambda_iterator.hpp +++ b/include/utils/iterator/lambda_iterator.hpp @@ -35,6 +35,13 @@ private: size_t _count; }; +// Wraps lambda which returns options as an iterator. +template +auto make_iterator(F &&f) +{ + return make_iterator(std::move(f), ~((size_t)0)); +} + // Wraps lambda which returns options as an iterator. template auto make_iterator(F &&f, size_t count) diff --git a/include/utils/numerics/ceil.hpp b/include/utils/numerics/ceil.hpp index 3b173fb74..3a1ee94bc 100644 --- a/include/utils/numerics/ceil.hpp +++ b/include/utils/numerics/ceil.hpp @@ -6,7 +6,7 @@ namespace num { template ::value>::type* = nullptr> + typename std::enable_if::value>::type * = nullptr> T iceil(T x, T y) { // this may seem inefficient, but on x86_64, when you already perform @@ -14,5 +14,4 @@ T iceil(T x, T y) // is basically free! return x / y + (x % y != 0); } - } diff --git a/include/utils/numerics/saturate.hpp b/include/utils/numerics/saturate.hpp new file mode 100644 index 000000000..a5c85fb51 --- /dev/null +++ b/include/utils/numerics/saturate.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include + +namespace num +{ + +constexpr std::size_t size_t_HIGHEST_BIT_SETED = + ((std::size_t)1) << ((sizeof(std::size_t) * 8) - 1); + +std::size_t saturating_add(std::size_t a, std::size_t b); +}; diff --git a/include/utils/option.hpp b/include/utils/option.hpp index 56fa51d06..751c73b03 100644 --- a/include/utils/option.hpp +++ b/include/utils/option.hpp @@ -98,6 +98,24 @@ public: return *data._M_ptr(); } + T &get_or(T &other) + { + if (is_present()) { + return get(); + } else { + return other; + } + } + + T const &get_or(T const &other) const + { + if (is_present()) { + return get(); + } else { + return other; + } + } + const T &get() const noexcept { assert(initialized); @@ -124,6 +142,26 @@ public: } } + template + U map_or(F f, U &&def) + { + if (is_present()) { + return f(take()); + } else { + return std::move(def); + } + } + + template + U call_or(F f, U &&def) + { + if (is_present()) { + return f(get()); + } else { + return std::move(def); + } + } + T take() { assert(initialized); diff --git a/include/utils/order.hpp b/include/utils/order.hpp index 645609320..ceb744d16 100644 --- a/include/utils/order.hpp +++ b/include/utils/order.hpp @@ -1,7 +1,7 @@ #pragma once // Defines ordering of data -enum Order +enum Order : uint8_t { None = 0, Ascending = 1, diff --git a/include/utils/stream_wrapper.hpp b/include/utils/stream_wrapper.hpp new file mode 100644 index 000000000..1f64bf093 --- /dev/null +++ b/include/utils/stream_wrapper.hpp @@ -0,0 +1,24 @@ +#pragma once + +// Wraps stream with convinient methods which need only one method: +// write (const char* s, n); +template +class StreamWrapper +{ +public: + StreamWrapper() = delete; + StreamWrapper(STREAM &s) : stream(s) {} + + void write(const unsigned char value) + { + stream.write(reinterpret_cast(&value), 1); + } + + void write(const unsigned char *value, size_t n) + { + stream.write(reinterpret_cast(value), n); + } + +private: + STREAM &stream; +}; diff --git a/include/utils/sys.hpp b/include/utils/sys.hpp index 8e4ff35a2..57ec1e28e 100644 --- a/include/utils/sys.hpp +++ b/include/utils/sys.hpp @@ -1,17 +1,78 @@ #pragma once -#include +#include +#include +#include #include -#include +#include +#include +#include #include +#include +#include namespace sys { +// Code from stackoverflow +// http://stackoverflow.com/questions/109449/getting-a-file-from-a-stdfstream +// Extracts FILE* from streams in std. +template +struct STDIOAdapter +{ + static FILE *yield(STREAM *stream) + { + assert(stream != NULL); -inline int futex(void* addr1, int op, int val1, const struct timespec* timeout, - void* addr2, int val3) + static cookie_io_functions_t Cookies = {.read = NULL, + .write = cookieWrite, + .seek = NULL, + .close = cookieClose}; + + return fopencookie(stream, "w", Cookies); + } + + ssize_t static cookieWrite(void *cookie, const char *buf, size_t size) + { + if (cookie == NULL) return -1; + + STREAM *writer = static_cast(cookie); + + writer->write(buf, size); + + return size; + } + + int static cookieClose(void *cookie) { return EOF; } +}; // STDIOAdapter + +inline int futex(void *addr1, int op, int val1, const struct timespec *timeout, + void *addr2, int val3) { return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -} +}; +// Ensures that everything written to file will be writen on disk when the +// function call returns. !=0 if error occured +template +inline size_t flush_file_to_disk(STREAM &file) +{ + file.flush(); + FILE *f = STDIOAdapter::yield(&file); + if (fsync(fileno(f)) == 0) { + return 0; + } + + return errno; +}; + +// True if succesffull +inline bool ensure_directory_exists(std::string const &path) +{ + struct stat st = {0}; + + if (stat(path.c_str(), &st) == -1) { + return mkdir(path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) == 0; + } + return true; } +}; diff --git a/include/utils/void.hpp b/include/utils/void.hpp index 6ba0328f7..b151e66a1 100644 --- a/include/utils/void.hpp +++ b/include/utils/void.hpp @@ -5,9 +5,9 @@ class Void : public TotalOrdering { public: - static Void _void; - friend bool operator<(const Void &lhs, const Void &rhs) { return false; } friend bool operator==(const Void &lhs, const Void &rhs) { return true; } }; + +static Void _void = {}; diff --git a/src/barrier/barrier.cpp b/src/barrier/barrier.cpp index 7e9dc6d97..0c21fbef2 100644 --- a/src/barrier/barrier.cpp +++ b/src/barrier/barrier.cpp @@ -101,15 +101,9 @@ VertexIterator VertexIndex::for_range(DbAccessor &t, Border from, } template -bool VertexIndex::unique() +IndexType VertexIndex::type() { - return HALF_CALL(unique()); -} - -template -Order VertexIndex::order() -{ - return HALF_CALL(order()); + return HALF_CALL(type()); } // INSTANCES OF VERTEX INDEX @@ -124,15 +118,9 @@ EdgeIterator EdgeIndex::for_range(DbAccessor &t, Border from, } template -bool EdgeIndex::unique() +IndexType EdgeIndex::type() { - return HALF_CALL(unique()); -} - -template -Order EdgeIndex::order() -{ - return HALF_CALL(order()); + return HALF_CALL(type()); } // INSTANCES OF EDGE INDEX diff --git a/src/database/db.cpp b/src/database/db.cpp index e00c772ca..78612bfad 100644 --- a/src/database/db.cpp +++ b/src/database/db.cpp @@ -1,10 +1,14 @@ #include "database/db.hpp" + +#include "storage/indexes/indexes.hpp" #include "storage/model/properties/property_family.hpp" Db::Db() = default; Db::Db(const std::string &name) : name_(name) {} -std::string &Db::name() { return name_; } +std::string const &Db::name() const { return name_; } + +Indexes Db::indexes() { return Indexes(*this); } template bool Db::create_index_on_vertex_property_family(const char *name, G &coll, diff --git a/src/database/db_transaction.cpp b/src/database/db_transaction.cpp index a7d5bfa72..c7d46ac19 100644 --- a/src/database/db_transaction.cpp +++ b/src/database/db_transaction.cpp @@ -1,8 +1,10 @@ #include "database/db_transaction.hpp" #include "database/db.hpp" +#include "serialization/serialization.hpp" #include "storage/edge.hpp" #include "storage/edge_type/edge_type.hpp" +#include "storage/indexes/indexes.hpp" #include "storage/label/label.hpp" #include "storage/vertex.hpp" @@ -13,15 +15,6 @@ DbTransaction::DbTransaction(Db &db) : db(db), trans(db.tx_engine.begin()) {} -// Cleaning for indexes in labels and edge_type -template -void clean_indexes(A &&acc, Id oldest_active) -{ - for (auto &l : acc) { - l.second.get()->index().clean(oldest_active); - } -} - // Cleaning for version lists template void clean_version_lists(A &&acc, Id oldest_active) @@ -35,33 +28,14 @@ void clean_version_lists(A &&acc, Id oldest_active) } } -// Cleaning for indexes in properties. -template -void clean_property_indexes(A &&acc, Id oldest_active) -{ - for (auto &family : acc) { - auto oi = family.second->index.get_read(); - if (oi.is_present()) { - oi.get()->clean(oldest_active); - } - } - - // TODO: Code for cleaning other indexes which are not yet coded into - // the database. -} - // Cleans edge part of database. Should be called by one cleaner thread at // one time. void DbTransaction::clean_edge_section() { Id oldest_active = trans.oldest_active(); - // Clean edge_type index - clean_indexes(db.graph.edge_type_store.access(), oldest_active); - - // Clean family_type_s edge index - clean_property_indexes(db.graph.edges.property_family_access(), - oldest_active); + // Clean indexes + db.indexes().edge_indexes([&](auto &in) { in.clean(oldest_active); }); // Clean Edge list clean_version_lists(db.graph.edges.access(), oldest_active); @@ -73,35 +47,13 @@ void DbTransaction::clean_vertex_section() { Id oldest_active = trans.oldest_active(); - // Clean label index - clean_indexes(db.graph.label_store.access(), oldest_active); - - // Clean family_type_s vertex index - clean_property_indexes(db.graph.vertices.property_family_access(), - oldest_active); + // Clean indexes + db.indexes().vertex_indexes([&](auto &in) { in.clean(oldest_active); }); // Clean vertex list clean_version_lists(db.graph.vertices.access(), oldest_active); } -template -bool update_property_indexes(IU &iu, const tx::Transaction &t) -{ - for (auto kp : iu.record->data.props) { - - // FamilyProperty index - auto opi = kp.key.get_family().index.get_write(t); - if (opi.is_present()) { - TRY(opi.get()->insert(IndexRecord( - std::nullptr_t(), iu.record, iu.vlist))); - } - - // TODO: other properti indexes - } - - return true; -} - bool DbTransaction::update_indexes() { while (!index_updates.empty()) { @@ -115,7 +67,7 @@ bool DbTransaction::update_indexes() TRY(e.record->data.edge_type->index().insert( EdgeTypeIndexRecord(std::nullptr_t(), e.record, e.vlist))); - TRY(update_property_indexes(e, trans)); + TRY(db.indexes().update_property_indexes(e, trans)); } else { auto v = iu.v; @@ -128,7 +80,8 @@ bool DbTransaction::update_indexes() LabelIndexRecord(std::nullptr_t(), v.record, v.vlist))); } - TRY(update_property_indexes(v, trans)); + TRY(db.indexes().update_property_indexes(v, + trans)); } index_updates.pop_back(); diff --git a/src/dbms/cleaner.cpp b/src/dbms/cleaner.cpp index 0a3f21649..26ce9f0ea 100644 --- a/src/dbms/cleaner.cpp +++ b/src/dbms/cleaner.cpp @@ -9,7 +9,8 @@ #include "logging/default.hpp" -Cleaning::Cleaning(ConcurrentMap &dbs) : dbms(dbs) +Cleaning::Cleaning(ConcurrentMap &dbs, size_t cleaning_cycle) + : dbms(dbs), cleaning_cycle(cleaning_cycle) { cleaners.push_back(std::make_unique([&]() { Logger logger = logging::log->logger("Cleaner"); diff --git a/src/snapshot/snapshot_encoder.cpp b/src/snapshot/snapshot_encoder.cpp new file mode 100644 index 000000000..30cd06f0b --- /dev/null +++ b/src/snapshot/snapshot_encoder.cpp @@ -0,0 +1,178 @@ +#include "snapshot/snapshot_encoder.hpp" + +void SnapshotEncoder::property_name_init(std::string const &name) +{ + + if (property_name_map.find(name) == property_name_map.end()) { + auto id = property_name_map.size(); + property_name_map.insert(std::make_pair(name, id)); + } +} + +void SnapshotEncoder::label_name_init(std::string const &name) +{ + if (label_name_map.find(name) == label_name_map.end()) { + auto id = label_name_map.size(); + label_name_map.insert(std::make_pair(name, id)); + } +} + +void SnapshotEncoder::edge_type_name_init(std::string const &name) +{ + if (edge_type_name_map.find(name) == edge_type_name_map.end()) { + auto id = edge_type_name_map.size(); + edge_type_name_map.insert(std::make_pair(name, id)); + } +} + +void SnapshotEncoder::end() { encoder.write_string("end"); } + +// **************** INDEX +// Prepares for indexes +void SnapshotEncoder::start_indexes() { encoder.write_string("vertices"); } + +// Writes index definition +void SnapshotEncoder::index(IndexDefinition const &def) +{ + std::string empty; + + encoder.write_byte(underlying_cast(def.loc.side)); + encoder.write_string(def.loc.property_name.get_or(empty)); + encoder.write_string(def.loc.label_name.get_or(empty)); + encoder.write_bool(def.type.unique); + encoder.write_byte(underlying_cast(def.type.order)); +} + +// ************* VERTEX +// Prepares for vertices +void SnapshotEncoder::start_vertices() +{ + encoder.write_map_header(property_name_map.size()); + for (auto p : property_name_map) { + encoder.write_string(p.first); + encoder.write_integer(p.second); + } + + encoder.write_map_header(label_name_map.size()); + for (auto p : label_name_map) { + encoder.write_string(p.first); + encoder.write_integer(p.second); + } + + encoder.write_map_header(edge_type_name_map.size()); + for (auto p : edge_type_name_map) { + encoder.write_string(p.first); + encoder.write_integer(p.second); + } + + encoder.write_string("vertices"); +} + +// Starts writing vertex with given id. +void SnapshotEncoder::start_vertex(Id id) { encoder.write_integer(id); } + +// Number of following label calls. +void SnapshotEncoder::label_count(size_t n) { encoder.write_list_header(n); } + +// Label of currently started vertex. +void SnapshotEncoder::label(std::string const &l) +{ + encoder.write_integer(label_name_map.at(l)); +} + +// ************* EDGE +// Prepares for edges +void SnapshotEncoder::start_edges() { encoder.write_string("edges"); } + +// Starts writing edge from vertex to vertex +void SnapshotEncoder::start_edge(Id id, Id from, Id to) +{ + encoder.write_integer(id); + encoder.write_integer(from); + encoder.write_integer(to); +} + +// Type of currently started edge +void SnapshotEncoder::edge_type(std::string const &et) +{ + encoder.write_integer(edge_type_name_map.at(et)); +} + +// ******* PROPERTY +void SnapshotEncoder::property_count(size_t n) { encoder.write_map_header(n); } + +void SnapshotEncoder::property_name(std::string const &name) +{ + encoder.write_integer(property_name_map.at(name)); +} + +void SnapshotEncoder::handle(const Void &v) { encoder.write_null(); } + +void SnapshotEncoder::handle(const bool &prop) { encoder.write_bool(prop); } + +void SnapshotEncoder::handle(const float &prop) { encoder.write_double(prop); } + +void SnapshotEncoder::handle(const double &prop) { encoder.write_double(prop); } + +void SnapshotEncoder::handle(const int32_t &prop) +{ + encoder.write_integer(prop); +} + +void SnapshotEncoder::handle(const int64_t &prop) +{ + encoder.write_integer(prop); +} + +void SnapshotEncoder::handle(const std::string &value) +{ + encoder.write_string(value); +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_bool(e); + } +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_integer(e); + } +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_integer(e); + } +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_double(e); + } +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_double(e); + } +} + +void SnapshotEncoder::handle(const ArrayStore &a) +{ + encoder.write_list_header(a.size()); + for (auto const &e : a) { + encoder.write_string(e); + } +} diff --git a/src/snapshot/snapshoter.cpp b/src/snapshot/snapshoter.cpp new file mode 100644 index 000000000..fad6409ec --- /dev/null +++ b/src/snapshot/snapshoter.cpp @@ -0,0 +1,222 @@ +#include "snapshot/snapshoter.hpp" + +#include "database/db_accessor.hpp" +#include "logging/default.hpp" +#include "snapshot/snapshot_decoder.hpp" +#include "snapshot/snapshot_encoder.hpp" +#include "storage/indexes/indexes.hpp" +#include "threading/thread.hpp" +#include "utils/sys.hpp" + +Snapshoter::Snapshoter(ConcurrentMap &dbs, + size_t snapshot_cycle, std::string &&snapshot_folder) + : snapshot_cycle(snapshot_cycle), snapshot_folder(snapshot_folder), + dbms(dbs) +{ + thread = std::make_unique([&]() { + logger = logging::log->logger("Snapshoter"); + + try { + run(logger); + } catch (const std::exception &e) { + logger.error("Irreversible error occured in snapshoter"); + logger.error("{}", e.what()); + } + + logger.info("Shutting down snapshoter"); + }); +} + +Snapshoter::~Snapshoter() +{ + snapshoting.store(false, std::memory_order_release); + thread.get()->join(); +} + +void Snapshoter::run(Logger &logger) +{ + std::time_t last_snapshot = std::time(nullptr); + + while (snapshoting.load(std::memory_order_acquire)) { + std::time_t now = std::time(nullptr); + + if (now >= last_snapshot + snapshot_cycle) { + // It's time for snapshot + make_snapshot(now, "full"); + + last_snapshot = now; + } else { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } +} + +void Snapshoter::make_snapshot(std::time_t now, const char *type) +{ + logger.info(std::string("Started ") + type + " snapshot cycle"); + + for (auto &db : dbms.access()) { + auto snapshot_file_name = snapshot_file(db.second, now, type); + + logger.info(std::string("Writing ") + type + " snapshot of database " + "\"{}\" to file \"{}\"", + db.first, snapshot_file_name); + + DbTransaction t(db.second); + + bool success = false; + try { + std::ofstream snapshot_file( + snapshot_file_name, std::fstream::binary | std::fstream::trunc); + + SnapshotEncoder snap(snapshot_file); + + auto old_trans = tx::TransactionId(db.second.tx_engine); + snapshot(t, snap, old_trans); + + auto res = sys::flush_file_to_disk(snapshot_file); + if (res == 0) { + t.trans.commit(); + success = true; + } else { + logger.error("Error {} occured while flushing snapshot file", + res); + t.trans.abort(); + } + + } catch (const std::exception &e) { + logger.error(std::string("Error occured while creating ") + type + + " " + "snapshot of database \"{}\"", + db.first); + logger.error("{}", e.what()); + + t.trans.abort(); + } + + if (success) { + std::ofstream commit_file(snapshot_commit_file(db.second), + std::fstream::app); + + commit_file << snapshot_file_name << std::endl; + + auto res = sys::flush_file_to_disk(commit_file); + if (res == 0) { + commit_file.close(); + } else { + logger.error("Error {} occured while flushing commit file", + res); + } + } + } + + logger.info(std::string("Finished ") + type + " snapshot cycle"); +} + +void Snapshoter::snapshot(DbTransaction const &dt, SnapshotEncoder &snap, + tx::TransactionId const &old_trans) +{ + Db &db = dt.db; + DbAccessor t(db, dt.trans); + + // Anounce property names + for (auto &family : db.graph.vertices.property_family_access()) { + snap.property_name_init(family.first); + } + for (auto &family : db.graph.edges.property_family_access()) { + snap.property_name_init(family.first); + } + + // Anounce label names + for (auto &labels : db.graph.label_store.access()) { + snap.label_name_init(labels.first.to_string()); + } + + // Store vertices + snap.start_vertices(); + t.vertex_access() + .fill() + .filter([&](auto va) { return !va.is_visble_to(old_trans); }) + .for_all([&](auto va) { serialization::serialize_vertex(va, snap); }); + + // Store edges + snap.start_edges(); + t.edge_access() + .fill() + .filter([&](auto va) { return !va.is_visble_to(old_trans); }) + .for_all([&](auto ea) { serialization::serialize_edge(ea, snap); }); + + // Store info on existing indexes. + snap.start_indexes(); + db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); }); + db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); }); + + snap.end(); +} + +void Snapshoter::import(Db &db) +{ + logger.info("Started import for database \"{}\"", db.name()); + + try { + + std::ifstream commit_file(snapshot_commit_file(db)); + + std::vector snapshots; + std::string line; + while (std::getline(commit_file, line)) { + snapshots.push_back(line); + } + + while (snapshots.size() > 0) { + logger.info("Importing data from snapshot \"{}\" into " + "database \"{}\"", + snapshots.back(), db.name()); + + DbTransaction t(db); + + try { + std::ifstream snapshot_file(snapshots.back(), + std::fstream::binary); + SnapshotDecoder decoder(snapshot_file); + + if (snapshot_load(t, decoder)) { + t.trans.commit(); + logger.info("Succesfully imported snapshot \"{}\" into " + "database \"{}\"", + snapshots.back(), db.name()); + break; + } else { + t.trans.abort(); + logger.info( + "Unuccesfully tryed to import snapshot \"{}\" into " + "database \"{}\"", + snapshots.back(), db.name()); + } + + } catch (const std::exception &e) { + logger.error( + "Error occured while importing snapshot \"{}\" into " + "database \"{}\"", + snapshots.back(), db.name()); + logger.error("{}", e.what()); + t.trans.abort(); + } + + snapshots.pop_back(); + } + + } catch (const std::exception &e) { + logger.error( + "Error occured while importing snapshot for database \"{}\"", + db.name()); + logger.error("{}", e.what()); + } + + logger.info("Finished import for database \"{}\"", db.name()); +} + +bool Snapshoter::snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap) +{ + // TODO +} diff --git a/src/storage/edge_type/edge_type.cpp b/src/storage/edge_type/edge_type.cpp index 2e448f3f2..ce8b56386 100644 --- a/src/storage/edge_type/edge_type.cpp +++ b/src/storage/edge_type/edge_type.cpp @@ -1,17 +1,10 @@ #include "storage/edge_type/edge_type.hpp" -EdgeType::EdgeType(const std::string &id) - : id(id), index_v(std::unique_ptr(new type_index_t())) -{ -} -EdgeType::EdgeType(const char *id) - : id(std::string(id)), - index_v(std::unique_ptr(new type_index_t())) -{ -} +EdgeType::EdgeType(const std::string &id) : EdgeType(std::string(id)) {} +EdgeType::EdgeType(const char *id) : EdgeType(std::string(id)) {} EdgeType::EdgeType(std::string &&id) - : id(std::move(id)), - index_v(std::unique_ptr(new type_index_t())) + : id(id), index_v(std::make_unique(IndexLocation{ + EdgeSide, Option(), Option(id)})) { } diff --git a/src/storage/indexes/impl/nonunique_unordered_index.cpp b/src/storage/indexes/impl/nonunique_unordered_index.cpp index 5a11ee8f5..ca05249e9 100644 --- a/src/storage/indexes/impl/nonunique_unordered_index.cpp +++ b/src/storage/indexes/impl/nonunique_unordered_index.cpp @@ -13,14 +13,15 @@ #include "storage/indexes/index_record.cpp" template -NonUniqueUnorderedIndex::NonUniqueUnorderedIndex() - : IndexBase(false, None) +NonUniqueUnorderedIndex::NonUniqueUnorderedIndex(IndexLocation &&loc) + : IndexBase(IndexDefinition{loc, IndexType{false, None}}) { } template -NonUniqueUnorderedIndex::NonUniqueUnorderedIndex(tx::Transaction const &t) - : IndexBase(false, None, t) +NonUniqueUnorderedIndex::NonUniqueUnorderedIndex(IndexLocation &&loc, + tx::Transaction const &t) + : IndexBase(IndexDefinition{loc, IndexType{false, None}}, t) { } diff --git a/src/storage/indexes/impl/unique_ordered_index.cpp b/src/storage/indexes/impl/unique_ordered_index.cpp index 9d4add4d0..ba5a98dd0 100644 --- a/src/storage/indexes/impl/unique_ordered_index.cpp +++ b/src/storage/indexes/impl/unique_ordered_index.cpp @@ -17,22 +17,22 @@ #include "storage/indexes/index_record.cpp" template -UniqueOrderedIndex::UniqueOrderedIndex(Order order) - : IndexBase(true, order) +UniqueOrderedIndex::UniqueOrderedIndex(IndexLocation loc, Order order) + : IndexBase(IndexDefinition{loc, IndexType{true, order}}) { } template -UniqueOrderedIndex::UniqueOrderedIndex(Order order, +UniqueOrderedIndex::UniqueOrderedIndex(IndexLocation loc, Order order, tx::Transaction const &t) - : IndexBase(true, order, t) + : IndexBase(IndexDefinition{loc, IndexType{true, order}}, t) { } template bool UniqueOrderedIndex::insert(IndexRecord &&value) { - if (this->order() == Descending) { + if (this->type().order == Descending) { value.set_descending(); } return set.access().insert(std::move(value)).second; @@ -55,13 +55,13 @@ auto UniqueOrderedIndex::for_range_exact(DbAccessor &t_v, auto end = to_v; // Sorted order must be checked - if (this->order() == Ascending && from_v.key.is_present()) { + if (this->type().order == Ascending && from_v.key.is_present()) { begin = acc.cfind_or_larger(from_v); - } else if (this->order() == Descending && to_v.key.is_present()) { + } else if (this->type().order == Descending && to_v.key.is_present()) { begin = acc.cfind_or_larger(to_v); end = from_v; } else { - assert(this->order() != None); + assert(this->type().order != None); } // TODO: determine size on fact of border size. auto size = acc.size(); diff --git a/src/storage/indexes/index_base.cpp b/src/storage/indexes/index_base.cpp index 749bc85b2..eec8dd57c 100644 --- a/src/storage/indexes/index_base.cpp +++ b/src/storage/indexes/index_base.cpp @@ -5,14 +5,14 @@ #include "transactions/transaction.hpp" template -IndexBase::IndexBase(bool unique, Order order) - : _unique(unique), _order(order), created(Id(0)), active(true) +IndexBase::IndexBase(IndexDefinition &&it) + : it(it), created(Id(0)), active(true) { } template -IndexBase::IndexBase(bool unique, Order order, const tx::Transaction &t) - : _unique(unique), _order(order), created(t.id) +IndexBase::IndexBase(IndexDefinition &&it, const tx::Transaction &t) + : it(it), created(t.id) { } diff --git a/src/storage/label/label.cpp b/src/storage/label/label.cpp index b532c2d52..e4e70c0d7 100644 --- a/src/storage/label/label.cpp +++ b/src/storage/label/label.cpp @@ -2,7 +2,10 @@ #include "storage/label/label.hpp" Label::Label(const char *name) - : name(std::string(name)), index_v(std::make_unique()) + : name(std::string(name)), + index_v(std::make_unique( + IndexLocation{VertexSide, Option(), + Option(std::string(name))})) { } diff --git a/src/storage/label/label_collection.cpp b/src/storage/label/label_collection.cpp index 1932e528d..9a02fd3e9 100644 --- a/src/storage/label/label_collection.cpp +++ b/src/storage/label/label_collection.cpp @@ -2,14 +2,6 @@ #include "storage/label/label.hpp" -auto LabelCollection::begin() { return _labels.begin(); } -auto LabelCollection::begin() const { return _labels.begin(); } -auto LabelCollection::cbegin() const { return _labels.begin(); } - -auto LabelCollection::end() { return _labels.end(); } -auto LabelCollection::end() const { return _labels.end(); } -auto LabelCollection::cend() const { return _labels.end(); } - bool LabelCollection::add(const Label &label) { if (has(label)) { diff --git a/src/transactions/transaction.cpp b/src/transactions/transaction.cpp index d8ac4673b..3b6aa309a 100644 --- a/src/transactions/transaction.cpp +++ b/src/transactions/transaction.cpp @@ -11,10 +11,17 @@ namespace tx Transaction::Transaction(const Id &id, const Snapshot &snapshot, Engine &engine) - : id(id), cid(1), snapshot(snapshot), engine(engine) + : TransactionId(id, snapshot, engine) { } +// Returns copy of transaction_id +TransactionId Transaction::transaction_id() +{ + TransactionId const &t = *this; + return t; +} + void Transaction::wait_for_active() { while (snapshot.size() > 0) { @@ -26,16 +33,6 @@ void Transaction::wait_for_active() } } -bool Transaction::is_active(const Id &id) const -{ - return snapshot.is_active(id); -} - -Id Transaction::oldest_active() -{ - return snapshot.oldest_active().take_or(Id(id)); -} - void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); } void Transaction::commit() { engine.commit(*this); } diff --git a/src/transactions/transaction_id.cpp b/src/transactions/transaction_id.cpp new file mode 100644 index 000000000..e3bc7cec3 --- /dev/null +++ b/src/transactions/transaction_id.cpp @@ -0,0 +1,32 @@ +#include "transactions/transaction_id.hpp" + +namespace tx +{ + +TransactionId::TransactionId(Engine &engine) + : TransactionId(Id(), Snapshot(), engine) +{ +} + +TransactionId::TransactionId(const Id &&id, const Snapshot &&snapshot, + Engine &engine) + : id(id), cid(1), snapshot(std::move(snapshot)), engine(engine) +{ +} + +TransactionId::TransactionId(const Id &id, const Snapshot &snapshot, + Engine &engine) + : id(id), cid(1), snapshot(snapshot), engine(engine) +{ +} + +bool TransactionId::in_snapshot(const Id &id) const +{ + return snapshot.is_active(id); +} + +Id TransactionId::oldest_active() +{ + return snapshot.oldest_active().take_or(Id(id)); +} +} diff --git a/src/utils/numerics/saturate.cpp b/src/utils/numerics/saturate.cpp new file mode 100644 index 000000000..13a24db2b --- /dev/null +++ b/src/utils/numerics/saturate.cpp @@ -0,0 +1,9 @@ +#include "utils/numerics/saturate.hpp" + +std::size_t num::saturating_add(std::size_t a, std::size_t b) +{ + a = a >= size_t_HIGHEST_BIT_SETED ? size_t_HIGHEST_BIT_SETED - 1 : a; + b = b >= size_t_HIGHEST_BIT_SETED ? size_t_HIGHEST_BIT_SETED - 1 : b; + + return a + b; +}