diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index df7584a9e..1b4944521 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,7 +19,7 @@ set(mg_single_node_sources database/single_node/graph_db.cpp database/single_node/graph_db_accessor.cpp durability/single_node/state_delta.cpp - durability/paths.cpp + durability/single_node/paths.cpp durability/single_node/recovery.cpp durability/single_node/snapshooter.cpp durability/single_node/wal.cpp @@ -123,7 +123,7 @@ set(mg_distributed_sources database/distributed/config.cpp database/distributed/graph_db_accessor.cpp durability/distributed/state_delta.cpp - durability/paths.cpp + durability/distributed/paths.cpp durability/distributed/recovery.cpp durability/distributed/snapshooter.cpp durability/distributed/wal.cpp diff --git a/src/database/single_node/graph_db.cpp b/src/database/single_node/graph_db.cpp index 1fc079656..15aed8eb1 100644 --- a/src/database/single_node/graph_db.cpp +++ b/src/database/single_node/graph_db.cpp @@ -6,7 +6,7 @@ #include "database/single_node/counters.hpp" #include "database/single_node/graph_db_accessor.hpp" -#include "durability/paths.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/recovery.hpp" #include "durability/single_node/snapshooter.hpp" #include "storage/single_node/concurrent_id_mapper.hpp" @@ -17,8 +17,6 @@ namespace database { GraphDb::GraphDb(Config config) : config_(config) { - CHECK(config.worker_id == 0) - << "Worker ID should only be set in distributed GraphDb"; if (config_.durability_enabled) utils::CheckDir(config_.durability_directory); // Durability recovery. @@ -33,7 +31,7 @@ GraphDb::GraphDb(Config config) : config_(config) { recovery_info = durability::RecoverOnlySnapshot( config_.durability_directory, this, &recovery_data, - std::experimental::nullopt, 0); + std::experimental::nullopt); // Post-recovery setup and checking. if (recovery_info) { @@ -132,7 +130,7 @@ void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); } bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) { const bool status = durability::MakeSnapshot( - *this, accessor, 0, fs::path(config_.durability_directory), + *this, accessor, fs::path(config_.durability_directory), config_.snapshot_max_retained); if (status) { LOG(INFO) << "Snapshot created successfully."; @@ -145,8 +143,7 @@ bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) { void GraphDb::ReinitializeStorage() { // Release gc scheduler to stop it from touching storage storage_gc_ = nullptr; - storage_ = - std::make_unique(config_.worker_id, config_.properties_on_disk); + storage_ = std::make_unique(config_.properties_on_disk); storage_gc_ = std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); } diff --git a/src/database/single_node/graph_db.hpp b/src/database/single_node/graph_db.hpp index ba99ba124..e68dd7fd3 100644 --- a/src/database/single_node/graph_db.hpp +++ b/src/database/single_node/graph_db.hpp @@ -37,15 +37,6 @@ struct Config { // set of properties which will be stored on disk std::vector properties_on_disk; - - // Distributed master/worker flags. - bool dynamic_graph_partitioner_enabled{false}; - int rpc_num_client_workers{0}; - int rpc_num_server_workers{0}; - int worker_id{0}; - io::network::Endpoint master_endpoint{"0.0.0.0", 0}; - io::network::Endpoint worker_endpoint{"0.0.0.0", 0}; - int recovering_cluster_size{0}; }; class GraphDbAccessor; @@ -115,10 +106,10 @@ class GraphDb { Config config_; std::unique_ptr storage_ = - std::make_unique(config_.worker_id, config_.properties_on_disk); - durability::WriteAheadLog wal_{ - config_.worker_id, config_.durability_directory, - config_.durability_enabled, config_.synchronous_commit}; + std::make_unique(config_.properties_on_disk); + durability::WriteAheadLog wal_{config_.durability_directory, + config_.durability_enabled, + config_.synchronous_commit}; tx::Engine tx_engine_{&wal_}; std::unique_ptr storage_gc_ = diff --git a/src/database/single_node/graph_db_accessor.cpp b/src/database/single_node/graph_db_accessor.cpp index 1b73adcc6..bbaedabad 100644 --- a/src/database/single_node/graph_db_accessor.cpp +++ b/src/database/single_node/graph_db_accessor.cpp @@ -6,7 +6,6 @@ #include #include "durability/single_node/state_delta.hpp" -#include "storage/single_node/address_types.hpp" #include "storage/single_node/edge.hpp" #include "storage/single_node/edge_accessor.hpp" #include "storage/single_node/vertex.hpp" @@ -61,29 +60,26 @@ bool GraphDbAccessor::should_abort() const { durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal(); } VertexAccessor GraphDbAccessor::InsertVertex( - std::experimental::optional requested_gid, - std::experimental::optional cypher_id) { + std::experimental::optional requested_gid) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; auto gid = db_.storage().vertex_generator_.Next(requested_gid); - if (!cypher_id) cypher_id = utils::MemcpyCast(gid); - auto vertex_vlist = - new mvcc::VersionList(transaction_, gid, *cypher_id); + auto vertex_vlist = new mvcc::VersionList(transaction_, gid); bool success = db_.storage().vertices_.access().insert(gid, vertex_vlist).second; CHECK(success) << "Attempting to insert a vertex with an existing GID: " << gid; - wal().Emplace(database::StateDelta::CreateVertex( - transaction_.id_, vertex_vlist->gid_, vertex_vlist->cypher_id())); - auto va = VertexAccessor(storage::VertexAddress(vertex_vlist), *this); + wal().Emplace( + database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_)); + auto va = VertexAccessor(vertex_vlist, *this); return va; } std::experimental::optional GraphDbAccessor::FindVertexOptional( gid::Gid gid, bool current_state) { - VertexAccessor record_accessor( - storage::VertexAddress(db_.storage().LocalAddress(gid)), *this); + VertexAccessor record_accessor(db_.storage().LocalAddress(gid), + *this); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; @@ -97,8 +93,7 @@ VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) { std::experimental::optional GraphDbAccessor::FindEdgeOptional( gid::Gid gid, bool current_state) { - EdgeAccessor record_accessor( - storage::EdgeAddress(db_.storage().LocalAddress(gid)), *this); + EdgeAccessor record_accessor(db_.storage().LocalAddress(gid), *this); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; @@ -131,8 +126,6 @@ void GraphDbAccessor::BuildIndex(storage::Label label, "Index is either being created by another transaction or already " "exists."); } - // Call the hook for inherited classes. - PostCreateIndex(key); // Everything that happens after the line above ended will be added to the // index automatically, but we still have to add to index everything that @@ -171,7 +164,7 @@ void GraphDbAccessor::BuildIndex(storage::Label label, DCHECK(removed) << "Index building (read) transaction should be inside set"; }); - dba->PopulateIndexFromBuildIndex(key); + dba->PopulateIndex(key); dba->EnableIndex(key); dba->Commit(); @@ -197,8 +190,8 @@ void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) { for (auto vertex : Vertices(key.label_, false)) { if (vertex.PropsAt(key.property_).type() == PropertyValue::Type::Null) continue; - db_.storage().label_property_index_.UpdateOnLabelProperty( - vertex.address().local(), vertex.current_); + db_.storage().label_property_index_.UpdateOnLabelProperty(vertex.address(), + vertex.current_); } } @@ -206,8 +199,7 @@ void GraphDbAccessor::UpdateLabelIndices(storage::Label label, const VertexAccessor &vertex_accessor, const Vertex *const vertex) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes"; - auto *vlist_ptr = vertex_accessor.address().local(); + auto *vlist_ptr = vertex_accessor.address(); db_.storage().labels_index_.Update(label, vlist_ptr, vertex); db_.storage().label_property_index_.UpdateOnLabel(label, vlist_ptr, vertex); } @@ -216,9 +208,8 @@ void GraphDbAccessor::UpdatePropertyIndex( storage::Property property, const RecordAccessor &vertex_accessor, const Vertex *const vertex) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes"; db_.storage().label_property_index_.UpdateOnProperty( - property, vertex_accessor.address().local(), vertex); + property, vertex_accessor.address(), vertex); } int64_t GraphDbAccessor::VerticesCount() const { @@ -305,7 +296,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor, vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0) return false; - auto *vlist_ptr = vertex_accessor.address().local(); + auto *vlist_ptr = vertex_accessor.address(); wal().Emplace(database::StateDelta::RemoveVertex( transaction_.id_, vlist_ptr->gid_, check_empty)); vlist_ptr->remove(vertex_accessor.current_, transaction_); @@ -331,76 +322,34 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) { EdgeAccessor GraphDbAccessor::InsertEdge( VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type, - std::experimental::optional requested_gid, - std::experimental::optional cypher_id) { + std::experimental::optional requested_gid) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - - auto edge_address = - InsertEdgeOnFrom(&from, &to, edge_type, requested_gid, cypher_id); - - InsertEdgeOnTo(&from, &to, edge_type, edge_address); - return EdgeAccessor(edge_address, *this, from.address(), to.address(), - edge_type); -} - -storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom( - VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const std::experimental::optional &requested_gid, - const std::experimental::optional &cypher_id) { - auto edge_accessor = InsertOnlyEdge(from->address(), to->address(), edge_type, - requested_gid, cypher_id); - auto edge_address = edge_accessor.address(); - - from->SwitchNew(); - auto from_updated = &from->update(); - - // TODO when preparing WAL for distributed, most likely never use - // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, - // in/out modification). - wal().Emplace(database::StateDelta::CreateEdge( - transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(), - from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type))); - - from_updated->out_.emplace( - db_.storage().LocalizedAddressIfPossible(to->address()), edge_address, - edge_type); - return edge_address; -} - -void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const storage::EdgeAddress &edge_address) { - // Ensure that the "to" accessor has the latest version (switch new). - // WARNING: Must do that after the above "from->update()" for cases when - // we are creating a cycle and "from" and "to" are the same vlist. - to->SwitchNew(); - auto *to_updated = &to->update(); - to_updated->in_.emplace( - db_.storage().LocalizedAddressIfPossible(from->address()), edge_address, - edge_type); -} - -EdgeAccessor GraphDbAccessor::InsertOnlyEdge( - storage::VertexAddress from, storage::VertexAddress to, - storage::EdgeType edge_type, - std::experimental::optional requested_gid, - std::experimental::optional cypher_id) { - CHECK(from.is_local()) - << "`from` address should be local when calling InsertOnlyEdge"; auto gid = db_.storage().edge_generator_.Next(requested_gid); - if (!cypher_id) cypher_id = utils::MemcpyCast(gid); - auto edge_vlist = new mvcc::VersionList(transaction_, gid, *cypher_id, - from, to, edge_type); + auto edge_vlist = new mvcc::VersionList( + transaction_, gid, from.address(), to.address(), edge_type); // We need to insert edge_vlist to edges_ before calling update since update // can throw and edge_vlist will not be garbage collected if it is not in // edges_ skiplist. bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second; CHECK(success) << "Attempting to insert an edge with an existing GID: " << gid; - auto ea = EdgeAccessor(storage::EdgeAddress(edge_vlist), *this, from, to, - edge_type); - return ea; + + // ensure that the "from" accessor has the latest version + from.SwitchNew(); + from.update().out_.emplace(to.address(), edge_vlist, edge_type); + + // ensure that the "to" accessor has the latest version (Switch new) + // WARNING: must do that after the above "from.update()" for cases when + // we are creating a cycle and "from" and "to" are the same vlist + to.SwitchNew(); + to.update().in_.emplace(from.address(), edge_vlist, edge_type); + + wal().Emplace(database::StateDelta::CreateEdge( + transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type, + EdgeTypeName(edge_type))); + + return EdgeAccessor(edge_vlist, *this, from.address(), to.address(), + edge_type); } int64_t GraphDbAccessor::EdgesCount() const { @@ -419,7 +368,7 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge, if (remove_out_edge) edge.from().RemoveOutEdge(edge.address()); if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); - edge.address().local()->remove(edge.current_, transaction_); + edge.address()->remove(edge.current_, transaction_); wal().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid())); } diff --git a/src/database/single_node/graph_db_accessor.hpp b/src/database/single_node/graph_db_accessor.hpp index cf7d90b2b..4a6ffaa6d 100644 --- a/src/database/single_node/graph_db_accessor.hpp +++ b/src/database/single_node/graph_db_accessor.hpp @@ -12,7 +12,6 @@ #include "database/single_node/graph_db.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address_types.hpp" #include "storage/single_node/edge_accessor.hpp" #include "storage/single_node/vertex_accessor.hpp" #include "transactions/transaction.hpp" @@ -77,14 +76,11 @@ class GraphDbAccessor { * * @param requested_gid The requested GID. Should only be provided when * recovering from durability. - * @param cypher_id Take a look under mvcc::VersionList::cypher_id * * @return See above. */ VertexAccessor InsertVertex(std::experimental::optional - requested_gid = std::experimental::nullopt, - std::experimental::optional cypher_id = - std::experimental::nullopt); + requested_gid = std::experimental::nullopt); /** * Removes the vertex of the given accessor. If the vertex has any outgoing or @@ -150,7 +146,7 @@ class GraphDbAccessor { // wrap version lists into accessors, which will look for visible versions auto accessors = iter::imap( [this](auto id_vlist) { - return VertexAccessor(storage::VertexAddress(id_vlist.second), *this); + return VertexAccessor(id_vlist.second, *this); }, db_.storage().vertices_.access()); @@ -176,7 +172,7 @@ class GraphDbAccessor { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; return iter::imap( [this](auto vlist) { - return VertexAccessor(storage::VertexAddress(vlist), *this); + return VertexAccessor(vlist, *this); }, db_.storage().labels_index_.GetVlists(label, transaction_, current_state)); @@ -202,7 +198,7 @@ class GraphDbAccessor { << "Label+property index doesn't exist."; return iter::imap( [this](auto vlist) { - return VertexAccessor(storage::VertexAddress(vlist), *this); + return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), transaction_, @@ -232,7 +228,7 @@ class GraphDbAccessor { << "Can't query index for propery value type null."; return iter::imap( [this](auto vlist) { - return VertexAccessor(storage::VertexAddress(vlist), *this); + return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), value, transaction_, @@ -277,7 +273,7 @@ class GraphDbAccessor { << "Label+property index doesn't exist."; return iter::imap( [this](auto vlist) { - return VertexAccessor(storage::VertexAddress(vlist), *this); + return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), lower, upper, @@ -300,31 +296,14 @@ class GraphDbAccessor { * @param type Edge type. * @param requested_gid The requested GID. Should only be provided when * recovering from durability. - * @param cypher_id Take a look under mvcc::VersionList::cypher_id * * @return An accessor to the edge. */ EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to, storage::EdgeType type, std::experimental::optional requested_gid = - std::experimental::nullopt, - std::experimental::optional cypher_id = std::experimental::nullopt); - /** - * Insert edge into main storage, but don't insert it into from and to - * vertices edge lists. - * - * @param cypher_id Take a look under mvcc::VersionList::cypher_id - */ - EdgeAccessor InsertOnlyEdge(storage::VertexAddress from, - storage::VertexAddress to, - storage::EdgeType edge_type, - std::experimental::optional - requested_gid = std::experimental::nullopt, - std::experimental::optional cypher_id = - std::experimental::nullopt); - /** * Removes an edge from the graph. Parameters can indicate if the edge should * be removed from data structures in vertices it connects. When removing an @@ -382,7 +361,7 @@ class GraphDbAccessor { // wrap version lists into accessors, which will look for visible versions auto accessors = iter::imap( [this](auto id_vlist) { - return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this); + return EdgeAccessor(id_vlist.second, *this); }, db_.storage().edges_.access()); @@ -616,34 +595,6 @@ class GraphDbAccessor { const VertexAccessor &vertex_accessor, const Vertex *const vertex); - protected: - /** Called in `BuildIndex` after creating an index, but before populating. */ - void PostCreateIndex(const LabelPropertyIndex::Key &key) {} - - /** Populates the index from a *new* transaction after creating the index. */ - void PopulateIndexFromBuildIndex(const LabelPropertyIndex::Key &key) { - PopulateIndex(key); - } - - /** - * Insert a new edge to `from` vertex and return the address. - * Called from `InsertEdge` as the first step in edge insertion. - * */ - storage::EdgeAddress InsertEdgeOnFrom( - VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const std::experimental::optional &requested_gid, - const std::experimental::optional &cypher_id); - - /** - * Set the newly created edge on `to` vertex. - * Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address` - * is from the created edge, returned by `InsertEdgeOnFrom`. - */ - void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, - const storage::EdgeType &edge_type, - const storage::EdgeAddress &edge_address); - private: GraphDb &db_; tx::Transaction &transaction_; diff --git a/src/durability/paths.cpp b/src/durability/distributed/paths.cpp similarity index 98% rename from src/durability/paths.cpp rename to src/durability/distributed/paths.cpp index f773a9bd0..e673c6da7 100644 --- a/src/durability/paths.cpp +++ b/src/durability/distributed/paths.cpp @@ -1,4 +1,4 @@ -#include "durability/paths.hpp" +#include "durability/distributed/paths.hpp" #include #include diff --git a/src/durability/paths.hpp b/src/durability/distributed/paths.hpp similarity index 100% rename from src/durability/paths.hpp rename to src/durability/distributed/paths.hpp diff --git a/src/durability/distributed/recovery.cpp b/src/durability/distributed/recovery.cpp index 1401b0255..89fb6fd8a 100644 --- a/src/durability/distributed/recovery.cpp +++ b/src/durability/distributed/recovery.cpp @@ -5,12 +5,12 @@ #include #include "database/distributed/graph_db_accessor.hpp" +#include "durability/distributed/paths.hpp" #include "durability/distributed/snapshot_decoder.hpp" #include "durability/distributed/snapshot_value.hpp" #include "durability/distributed/version.hpp" #include "durability/distributed/wal.hpp" #include "durability/hashed_file_reader.hpp" -#include "durability/paths.hpp" #include "glue/communication.hpp" // TODO: WTF is typed value doing here?! #include "query/typed_value.hpp" diff --git a/src/durability/distributed/snapshooter.cpp b/src/durability/distributed/snapshooter.cpp index e9899e6bd..550c3fa5d 100644 --- a/src/durability/distributed/snapshooter.cpp +++ b/src/durability/distributed/snapshooter.cpp @@ -5,10 +5,10 @@ #include #include "database/distributed/graph_db_accessor.hpp" +#include "durability/distributed/paths.hpp" #include "durability/distributed/snapshot_encoder.hpp" #include "durability/distributed/version.hpp" #include "durability/hashed_file_writer.hpp" -#include "durability/paths.hpp" #include "utils/file.hpp" namespace fs = std::experimental::filesystem; diff --git a/src/durability/distributed/wal.cpp b/src/durability/distributed/wal.cpp index d7c95b093..6ac4ddae8 100644 --- a/src/durability/distributed/wal.cpp +++ b/src/durability/distributed/wal.cpp @@ -1,7 +1,7 @@ #include "durability/distributed/wal.hpp" #include "durability/distributed/version.hpp" -#include "durability/paths.hpp" +#include "durability/distributed/paths.hpp" #include "utils/file.hpp" #include "utils/flag_validation.hpp" diff --git a/src/durability/single_node/paths.cpp b/src/durability/single_node/paths.cpp new file mode 100644 index 000000000..6a85cd40f --- /dev/null +++ b/src/durability/single_node/paths.cpp @@ -0,0 +1,90 @@ +#include "durability/single_node/paths.hpp" + +#include +#include +#include + +#include "glog/logging.h" + +#include "transactions/type.hpp" +#include "utils/string.hpp" +#include "utils/timestamp.hpp" + +namespace durability { + +namespace fs = std::experimental::filesystem; + +std::experimental::optional TransactionIdFromWalFilename( + const std::string &name) { + auto nullopt = std::experimental::nullopt; + // Get the max_transaction_id from the file name that has format + // "XXXXX__max_transaction_" + auto file_name_split = utils::RSplit(name, "__", 1); + if (file_name_split.size() != 2) { + LOG(WARNING) << "Unable to parse WAL file name: " << name; + return nullopt; + } + if (utils::StartsWith(file_name_split[1], "current")) + return std::numeric_limits::max(); + file_name_split = utils::Split(file_name_split[1], "_"); + if (file_name_split.size() != 3) { + LOG(WARNING) << "Unable to parse WAL file name: " << name; + return nullopt; + } + auto &tx_id_str = file_name_split[2]; + try { + return std::stoll(tx_id_str); + } catch (std::invalid_argument &) { + LOG(WARNING) << "Unable to parse WAL file name tx ID: " << tx_id_str; + return nullopt; + } catch (std::out_of_range &) { + LOG(WARNING) << "WAL file name tx ID too large: " << tx_id_str; + return nullopt; + } +} + +/// Generates a file path for a write-ahead log file. If given a transaction ID +/// the file name will contain it. Otherwise the file path is for the "current" +/// WAL file for which the max tx id is still unknown. +fs::path WalFilenameForTransactionId( + const std::experimental::filesystem::path &wal_dir, + std::experimental::optional tx_id) { + auto file_name = utils::Timestamp::Now().ToIso8601(); + if (tx_id) { + file_name += "__max_transaction_" + std::to_string(*tx_id); + } else { + file_name += "__current"; + } + return wal_dir / file_name; +} + +fs::path MakeSnapshotPath(const fs::path &durability_dir, + tx::TransactionId tx_id) { + std::string date_str = + utils::Timestamp(utils::Timestamp::Now()) + .ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}"); + auto file_name = date_str + "_tx_" + std::to_string(tx_id); + return durability_dir / kSnapshotDir / file_name; +} + +std::experimental::optional +TransactionIdFromSnapshotFilename(const std::string &name) { + auto nullopt = std::experimental::nullopt; + auto file_name_split = utils::RSplit(name, "_tx_", 1); + if (file_name_split.size() != 2) { + LOG(WARNING) << "Unable to parse snapshot file name: " << name; + return nullopt; + } + try { + return std::stoll(file_name_split[1]); + } catch (std::invalid_argument &) { + LOG(WARNING) << "Unable to parse snapshot file name tx ID: " + << file_name_split[1]; + return nullopt; + } catch (std::out_of_range &) { + LOG(WARNING) << "Unable to parse snapshot file name tx ID: " + << file_name_split[1]; + return nullopt; + } +} +} // namespace durability diff --git a/src/durability/single_node/paths.hpp b/src/durability/single_node/paths.hpp new file mode 100644 index 000000000..a0a45604e --- /dev/null +++ b/src/durability/single_node/paths.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include "transactions/type.hpp" + +namespace durability { +const std::string kSnapshotDir = "snapshots"; +const std::string kWalDir = "wal"; +const std::string kBackupDir = ".backup"; + +/// Returns the transaction id contained in the file name. If the filename is +/// not a parseable WAL file name, nullopt is returned. If the filename +/// represents the "current" WAL file, then the maximum possible transaction ID +/// is returned because that's appropriate for the recovery logic (the current +/// WAL does not yet have a maximum transaction ID and can't be discarded by +/// the recovery regardless of the snapshot from which the transaction starts). +std::experimental::optional TransactionIdFromWalFilename( + const std::string &name); + +/// Generates a file path for a write-ahead log file. If given a transaction ID +/// the file name will contain it. Otherwise the file path is for the "current" +/// WAL file for which the max tx id is still unknown. +std::experimental::filesystem::path WalFilenameForTransactionId( + const std::experimental::filesystem::path &wal_dir, + std::experimental::optional tx_id = + std::experimental::nullopt); + +/// Generates a path for a DB snapshot in the given folder in a well-defined +/// sortable format with transaction from which the snapshot is created appended +/// to the file name. +std::experimental::filesystem::path MakeSnapshotPath( + const std::experimental::filesystem::path &durability_dir, + tx::TransactionId tx_id); + +/// Returns the transaction id contained in the file name. If the filename is +/// not a parseable WAL file name, nullopt is returned. +std::experimental::optional +TransactionIdFromSnapshotFilename(const std::string &name); +} // namespace durability diff --git a/src/durability/single_node/recovery.cpp b/src/durability/single_node/recovery.cpp index 1f95a8127..06b5fc000 100644 --- a/src/durability/single_node/recovery.cpp +++ b/src/durability/single_node/recovery.cpp @@ -4,17 +4,13 @@ #include #include +#include "communication/bolt/v1/decoder/decoder.hpp" #include "database/single_node/graph_db_accessor.hpp" #include "durability/hashed_file_reader.hpp" -#include "durability/paths.hpp" -#include "durability/single_node/snapshot_decoder.hpp" -#include "durability/single_node/snapshot_value.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/version.hpp" #include "durability/single_node/wal.hpp" #include "glue/communication.hpp" -// TODO: WTF is typed value doing here?! -#include "query/typed_value.hpp" -#include "storage/single_node/address_types.hpp" #include "storage/single_node/indexes/label_property_index.hpp" #include "transactions/type.hpp" #include "utils/algorithm.hpp" @@ -44,7 +40,7 @@ bool VersionConsistency(const fs::path &durability_dir) { for (const auto &file : fs::directory_iterator(recovery_dir)) { HashedFileReader reader; - SnapshotDecoder decoder(reader); + communication::bolt::Decoder decoder(reader); // The following checks are ok because we are only trying to detect // version inconsistencies. @@ -108,9 +104,9 @@ using communication::bolt::Value; } bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, - RecoveryData *recovery_data, int worker_id) { + RecoveryData *recovery_data) { HashedFileReader reader; - SnapshotDecoder decoder(reader); + communication::bolt::Decoder decoder(reader); RETURN_IF_NOT(reader.Open(snapshot_file)); @@ -129,20 +125,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) && dv.ValueInt() == durability::kVersion); - // Checks worker id was set correctly - RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) && - dv.ValueInt() == worker_id); - - // Vertex and edge generator ids - RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int)); - uint64_t vertex_generator_cnt = dv.ValueInt(); - db->storage().VertexGenerator().SetId(std::max( - db->storage().VertexGenerator().LocalCount(), vertex_generator_cnt)); - RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int)); - uint64_t edge_generator_cnt = dv.ValueInt(); - db->storage().EdgeGenerator().SetId( - std::max(db->storage().EdgeGenerator().LocalCount(), edge_generator_cnt)); - RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int)); recovery_data->snapshooter_tx_id = dv.ValueInt(); // Transaction snapshot of the transaction that created the snapshot. @@ -165,87 +147,33 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, } auto dba = db->Access(); - std::unordered_map> - edge_gid_endpoints_mapping; - + std::unordered_map vertices; for (int64_t i = 0; i < vertex_count; ++i) { - auto vertex = decoder.ReadSnapshotVertex(); - RETURN_IF_NOT(vertex); + Value vertex_dv; + RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, Value::Type::Vertex)); + auto &vertex = vertex_dv.ValueVertex(); + auto vertex_accessor = dba->InsertVertex(vertex.id.AsUint()); - auto vertex_accessor = dba->InsertVertex(vertex->gid, vertex->cypher_id); - for (const auto &label : vertex->labels) { + for (const auto &label : vertex.labels) { vertex_accessor.add_label(dba->Label(label)); } - for (const auto &property_pair : vertex->properties) { + for (const auto &property_pair : vertex.properties) { vertex_accessor.PropsSet(dba->Property(property_pair.first), glue::ToPropertyValue(property_pair.second)); } - auto vertex_record = vertex_accessor.GetNew(); - for (const auto &edge : vertex->in) { - vertex_record->in_.emplace(edge.vertex, edge.address, - dba->EdgeType(edge.type)); - edge_gid_endpoints_mapping[edge.address.gid()] = { - edge.vertex, vertex_accessor.GlobalAddress()}; - } - for (const auto &edge : vertex->out) { - vertex_record->out_.emplace(edge.vertex, edge.address, - dba->EdgeType(edge.type)); - edge_gid_endpoints_mapping[edge.address.gid()] = { - vertex_accessor.GlobalAddress(), edge.vertex}; - } + vertices.insert({vertex.id.AsUint(), vertex_accessor}); } - auto vertex_transform_to_local_if_possible = - [&dba, worker_id](storage::VertexAddress &address) { - if (address.is_local()) return; - // If the worker id matches it should be a local apperance - if (address.worker_id() == worker_id) { - address = storage::VertexAddress( - dba->db().storage().LocalAddress(address.gid())); - CHECK(address.is_local()) << "Address should be local but isn't"; - } - }; - - auto edge_transform_to_local_if_possible = - [&dba, worker_id](storage::EdgeAddress &address) { - if (address.is_local()) return; - // If the worker id matches it should be a local apperance - if (address.worker_id() == worker_id) { - address = storage::EdgeAddress( - dba->db().storage().LocalAddress(address.gid())); - CHECK(address.is_local()) << "Address should be local but isn't"; - } - }; - - Value dv_cypher_id; - for (int64_t i = 0; i < edge_count; ++i) { - RETURN_IF_NOT( - decoder.ReadValue(&dv, communication::bolt::Value::Type::Edge)); - auto &edge = dv.ValueEdge(); - - // Read cypher_id - RETURN_IF_NOT(decoder.ReadValue(&dv_cypher_id, - communication::bolt::Value::Type::Int)); - auto cypher_id = dv_cypher_id.ValueInt(); - - // We have to take full edge endpoints from vertices since the endpoints - // found here don't containt worker_id, and this can't be changed since this - // edges must be bolt-compliant - auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id.AsUint()]; - - storage::VertexAddress from; - storage::VertexAddress to; - std::tie(from, to) = edge_endpoints; - - // From and to are written in the global_address format and we should - // convert them back to local format for speedup - if possible - vertex_transform_to_local_if_possible(from); - vertex_transform_to_local_if_possible(to); - - auto edge_accessor = dba->InsertOnlyEdge(from, to, dba->EdgeType(edge.type), - edge.id.AsUint(), cypher_id); + Value edge_dv; + RETURN_IF_NOT(decoder.ReadValue(&edge_dv, Value::Type::Edge)); + auto &edge = edge_dv.ValueEdge(); + auto it_from = vertices.find(edge.from.AsUint()); + auto it_to = vertices.find(edge.to.AsUint()); + RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end()); + auto edge_accessor = + dba->InsertEdge(it_from->second, it_to->second, + dba->EdgeType(edge.type), edge.id.AsUint()); for (const auto &property_pair : edge.properties) edge_accessor.PropsSet(dba->Property(property_pair.first), @@ -261,32 +189,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, return false; } - // We have to replace global_ids with local ids where possible for all edges - // in every vertex and this can only be done after we inserted the edges; this - // is to speedup execution - for (auto &vertex_accessor : dba->Vertices(true)) { - auto vertex = vertex_accessor.GetNew(); - auto iterate_and_transform = - [vertex_transform_to_local_if_possible, - edge_transform_to_local_if_possible](Edges &edges) { - Edges transformed; - for (auto &element : edges) { - auto vertex = element.vertex; - vertex_transform_to_local_if_possible(vertex); - - auto edge = element.edge; - edge_transform_to_local_if_possible(edge); - - transformed.emplace(vertex, edge, element.edge_type); - } - - return transformed; - }; - - vertex->in_ = iterate_and_transform(vertex->in_); - vertex->out_ = iterate_and_transform(vertex->out_); - } - // Ensure that the next transaction ID in the recovered DB will be greater // than the latest one we have recovered. Do this to make sure that // subsequently created snapshots and WAL files will have transactional info @@ -374,8 +276,7 @@ std::vector ReadWalRecoverableTransactions( RecoveryInfo RecoverOnlySnapshot( const fs::path &durability_dir, database::GraphDb *db, RecoveryData *recovery_data, - std::experimental::optional required_snapshot_tx_id, - int worker_id) { + std::experimental::optional required_snapshot_tx_id) { // Attempt to recover from snapshot files in reverse order (from newest // backwards). const auto snapshot_dir = durability_dir / kSnapshotDir; @@ -397,7 +298,7 @@ RecoveryInfo RecoverOnlySnapshot( } } LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file; - if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) { + if (!RecoverSnapshot(snapshot_file, db, recovery_data)) { db->ReinitializeStorage(); recovery_data->Clear(); LOG(WARNING) << "Snapshot recovery failed, trying older snapshot..."; diff --git a/src/durability/single_node/recovery.hpp b/src/durability/single_node/recovery.hpp index 93d26c350..6b40a6c62 100644 --- a/src/durability/single_node/recovery.hpp +++ b/src/durability/single_node/recovery.hpp @@ -110,13 +110,12 @@ void MoveToBackup(const std::experimental::filesystem::path &durability_dir); RecoveryInfo RecoverOnlySnapshot( const std::experimental::filesystem::path &durability_dir, database::GraphDb *db, durability::RecoveryData *recovery_data, - std::experimental::optional required_snapshot_tx_id, - int worker_id); + std::experimental::optional required_snapshot_tx_id); /** Interface for accessing transactions during WAL recovery. */ class RecoveryTransactions { public: - RecoveryTransactions(database::GraphDb *db); + explicit RecoveryTransactions(database::GraphDb *db); void Begin(const tx::TransactionId &tx_id); diff --git a/src/durability/single_node/snapshooter.cpp b/src/durability/single_node/snapshooter.cpp index 862a6ee49..c878eb661 100644 --- a/src/durability/single_node/snapshooter.cpp +++ b/src/durability/single_node/snapshooter.cpp @@ -4,11 +4,12 @@ #include +#include "communication/bolt/v1/encoder/base_encoder.hpp" #include "database/single_node/graph_db_accessor.hpp" #include "durability/hashed_file_writer.hpp" -#include "durability/paths.hpp" -#include "durability/single_node/snapshot_encoder.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/version.hpp" +#include "glue/communication.hpp" #include "utils/file.hpp" namespace fs = std::experimental::filesystem; @@ -16,30 +17,21 @@ namespace fs = std::experimental::filesystem; namespace durability { // Snapshot layout is described in durability/version.hpp -static_assert(durability::kVersion == 6, +static_assert(durability::kVersion == 7, "Wrong snapshot version, please update!"); namespace { bool Encode(const fs::path &snapshot_file, database::GraphDb &db, - database::GraphDbAccessor &dba, int worker_id) { + database::GraphDbAccessor &dba) { try { HashedFileWriter buffer(snapshot_file); - SnapshotEncoder encoder(buffer); + communication::bolt::BaseEncoder encoder(buffer); int64_t vertex_num = 0, edge_num = 0; encoder.WriteRAW(durability::kSnapshotMagic.data(), durability::kSnapshotMagic.size()); encoder.WriteInt(durability::kVersion); - // Writes the worker id to snapshot, used to guarantee consistent cluster - // state after recovery - encoder.WriteInt(worker_id); - - // Write the number of generated vertex and edges, used to recover - // generators internal states - encoder.WriteInt(db.storage().VertexGenerator().LocalCount()); - encoder.WriteInt(db.storage().EdgeGenerator().LocalCount()); - // Write the ID of the transaction doing the snapshot. encoder.WriteInt(dba.transaction_id()); @@ -63,12 +55,11 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, } for (const auto &vertex : dba.Vertices(false)) { - encoder.WriteSnapshotVertex(vertex); + encoder.WriteVertex(glue::ToBoltVertex(vertex)); vertex_num++; } for (const auto &edge : dba.Edges(false)) { encoder.WriteEdge(glue::ToBoltEdge(edge)); - encoder.WriteInt(edge.CypherId()); edge_num++; } buffer.WriteValue(vertex_num); @@ -122,13 +113,12 @@ void RemoveOldWals(const fs::path &wal_dir, } // namespace bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba, - int worker_id, const fs::path &durability_dir, - int snapshot_max_retained) { + const fs::path &durability_dir, int snapshot_max_retained) { if (!utils::EnsureDir(durability_dir / kSnapshotDir)) return false; const auto snapshot_file = - MakeSnapshotPath(durability_dir, worker_id, dba.transaction_id()); + MakeSnapshotPath(durability_dir, dba.transaction_id()); if (fs::exists(snapshot_file)) return false; - if (Encode(snapshot_file, db, dba, worker_id)) { + if (Encode(snapshot_file, db, dba)) { RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained); RemoveOldWals(durability_dir / kWalDir, dba.transaction()); return true; diff --git a/src/durability/single_node/snapshooter.hpp b/src/durability/single_node/snapshooter.hpp index 1d7f19a8d..1ad7568b5 100644 --- a/src/durability/single_node/snapshooter.hpp +++ b/src/durability/single_node/snapshooter.hpp @@ -14,7 +14,6 @@ namespace durability { * @param snapshot_max_retained - maximum number of snapshots to retain. */ bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba, - int worker_id, const std::experimental::filesystem::path &durability_dir, int snapshot_max_retained); diff --git a/src/durability/single_node/snapshot_decoder.hpp b/src/durability/single_node/snapshot_decoder.hpp deleted file mode 100644 index 243f014cd..000000000 --- a/src/durability/single_node/snapshot_decoder.hpp +++ /dev/null @@ -1,105 +0,0 @@ -#pragma once - -#include - -#include "communication/bolt/v1/decoder/decoder.hpp" -#include "durability/single_node/snapshot_value.hpp" - -namespace durability { - -template -class SnapshotDecoder : public communication::bolt::Decoder { - public: - explicit SnapshotDecoder(Buffer &buffer) - : communication::bolt::Decoder(buffer) {} - - std::experimental::optional ReadSnapshotVertex() { - communication::bolt::Value dv; - SnapshotVertex vertex; - - // Read global id, labels and properties of the vertex - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Vertex)) { - DLOG(WARNING) << "Unable to read snapshot vertex"; - return std::experimental::nullopt; - } - auto &read_vertex = dv.ValueVertex(); - vertex.gid = read_vertex.id.AsUint(); - vertex.labels = read_vertex.labels; - vertex.properties = read_vertex.properties; - - // Read cypher_id - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Int)) { - DLOG(WARNING) << "Unable to read vertex cypher_id"; - return std::experimental::nullopt; - } - vertex.cypher_id = dv.ValueInt(); - - // Read in edges - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in " - "edges in vertex!"; - return std::experimental::nullopt; - } - for (int i = 0; i < dv.ValueInt(); ++i) { - auto edge = ReadSnapshotEdge(); - if (!edge) return std::experimental::nullopt; - vertex.in.emplace_back(*edge); - } - - // Read out edges - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out " - "edges in vertex!"; - return std::experimental::nullopt; - } - for (int i = 0; i < dv.ValueInt(); ++i) { - auto edge = ReadSnapshotEdge(); - if (!edge) return std::experimental::nullopt; - vertex.out.emplace_back(*edge); - } - - VLOG(20) << "[ReadSnapshotVertex] Success"; - return vertex; - } - - private: - std::experimental::optional ReadSnapshotEdge() { - communication::bolt::Value dv; - InlinedVertexEdge edge; - - VLOG(20) << "[ReadSnapshotEdge] Start"; - - // Read global id of this edge - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read Global ID!"; - return std::experimental::nullopt; - } - edge.address = storage::EdgeAddress(static_cast(dv.ValueInt())); - - // Read global vertex id of the other side of the edge - // (global id of from/to vertexes). - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from/to address!"; - return std::experimental::nullopt; - } - edge.vertex = storage::VertexAddress(static_cast(dv.ValueInt())); - - // Read edge type - if (!communication::bolt::Decoder::ReadValue( - &dv, communication::bolt::Value::Type::String)) { - DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!"; - return std::experimental::nullopt; - } - edge.type = dv.ValueString(); - - VLOG(20) << "[ReadSnapshotEdge] Success"; - return edge; - } -}; -}; // namespace durability diff --git a/src/durability/single_node/snapshot_encoder.hpp b/src/durability/single_node/snapshot_encoder.hpp deleted file mode 100644 index 1b05a58e8..000000000 --- a/src/durability/single_node/snapshot_encoder.hpp +++ /dev/null @@ -1,58 +0,0 @@ -#pragma once - -#include "communication/bolt/v1/encoder/base_encoder.hpp" -#include "database/single_node/graph_db_accessor.hpp" -#include "glue/communication.hpp" -#include "utils/cast.hpp" - -namespace durability { - -template -class SnapshotEncoder : public communication::bolt::BaseEncoder { - public: - explicit SnapshotEncoder(Buffer &buffer) - : communication::bolt::BaseEncoder(buffer) {} - void WriteSnapshotVertex(const VertexAccessor &vertex) { - communication::bolt::BaseEncoder::WriteVertex( - glue::ToBoltVertex(vertex)); - - // Write cypher_id - this->WriteInt(vertex.CypherId()); - - // Write in edges without properties - this->WriteUInt(vertex.in_degree()); - auto edges_in = vertex.in(); - for (const auto &edge : edges_in) { - this->WriteSnapshotEdge(edge, true); - } - - // Write out edges without properties - this->WriteUInt(vertex.out_degree()); - auto edges_out = vertex.out(); - for (const auto &edge : edges_out) { - this->WriteSnapshotEdge(edge, false); - } - } - - private: - void WriteUInt(const uint64_t &value) { - this->WriteInt(utils::MemcpyCast(value)); - } - - // Writes edge without properties - void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) { - // Write global id of the edge - WriteUInt(edge.GlobalAddress().raw()); - - // Write to/from global id - if (write_from) - WriteUInt(edge.from().GlobalAddress().raw()); - else - WriteUInt(edge.to().GlobalAddress().raw()); - - // Write type - this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType())); - } -}; - -} // namespace durability diff --git a/src/durability/single_node/snapshot_value.hpp b/src/durability/single_node/snapshot_value.hpp deleted file mode 100644 index 26ea3c804..000000000 --- a/src/durability/single_node/snapshot_value.hpp +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "communication/bolt/v1/value.hpp" -// TODO: WTF is this doing here? -#include "query/typed_value.hpp" -#include "storage/common/property_value.hpp" -#include "storage/single_node/address_types.hpp" -#include "utils/algorithm.hpp" -#include "utils/exceptions.hpp" - -namespace durability { - -/** Forward declartion of SnapshotEdge. */ -struct InlinedVertexEdge; - -/** - * Structure used when reading a Vertex with the decoder. - * The decoder writes data into this structure. - */ -struct SnapshotVertex { - gid::Gid gid; - int64_t cypher_id; - std::vector labels; - std::map properties; - // Vector of edges without properties - std::vector in; - std::vector out; -}; - -/** - * Structure used when reading an Edge with the snapshot decoder. - * The decoder writes data into this structure. - */ -struct InlinedVertexEdge { - // Addresses down below must always be global_address and never direct - // pointers to a record. - storage::EdgeAddress address; - storage::VertexAddress vertex; - std::string type; -}; - -} // namespace durability diff --git a/src/durability/single_node/state_delta.cpp b/src/durability/single_node/state_delta.cpp index dd092d203..18c653813 100644 --- a/src/durability/single_node/state_delta.cpp +++ b/src/durability/single_node/state_delta.cpp @@ -20,22 +20,20 @@ StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) { return {StateDelta::Type::TRANSACTION_ABORT, tx_id}; } -StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id, - int64_t cypher_id) { +StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, + gid::Gid vertex_id) { StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id); op.vertex_id = vertex_id; - op.cypher_id = cypher_id; return op; } StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, - int64_t cypher_id, gid::Gid vertex_from_id, + gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name) { StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id); op.edge_id = edge_id; - op.cypher_id = cypher_id; op.vertex_from_id = vertex_from_id; op.vertex_to_id = vertex_to_id; op.edge_type = edge_type; @@ -43,53 +41,6 @@ StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, return op; } -StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::VertexAddress vertex_to_address, - storage::EdgeAddress edge_address, - storage::EdgeType edge_type) { - CHECK(vertex_to_address.is_remote() && edge_address.is_remote()) - << "WAL can only contain global addresses."; - StateDelta op(StateDelta::Type::ADD_OUT_EDGE, tx_id); - op.vertex_id = vertex_id; - op.vertex_to_address = vertex_to_address; - op.edge_address = edge_address; - op.edge_type = edge_type; - return op; -} - -StateDelta StateDelta::RemoveOutEdge(tx::TransactionId tx_id, - gid::Gid vertex_id, - storage::EdgeAddress edge_address) { - CHECK(edge_address.is_remote()) << "WAL can only contain global addresses."; - StateDelta op(StateDelta::Type::REMOVE_OUT_EDGE, tx_id); - op.vertex_id = vertex_id; - op.edge_address = edge_address; - return op; -} - -StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::VertexAddress vertex_from_address, - storage::EdgeAddress edge_address, - storage::EdgeType edge_type) { - CHECK(vertex_from_address.is_remote() && edge_address.is_remote()) - << "WAL can only contain global addresses."; - StateDelta op(StateDelta::Type::ADD_IN_EDGE, tx_id); - op.vertex_id = vertex_id; - op.vertex_from_address = vertex_from_address; - op.edge_address = edge_address; - op.edge_type = edge_type; - return op; -} - -StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::EdgeAddress edge_address) { - CHECK(edge_address.is_remote()) << "WAL can only contain global addresses."; - StateDelta op(StateDelta::Type::REMOVE_IN_EDGE, tx_id); - op.vertex_id = vertex_id; - op.edge_address = edge_address; - return op; -} - StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Property property, @@ -174,36 +125,14 @@ void StateDelta::Encode( break; case Type::CREATE_VERTEX: encoder.WriteInt(vertex_id); - encoder.WriteInt(cypher_id); break; case Type::CREATE_EDGE: encoder.WriteInt(edge_id); - encoder.WriteInt(cypher_id); encoder.WriteInt(vertex_from_id); encoder.WriteInt(vertex_to_id); encoder.WriteInt(edge_type.Id()); encoder.WriteString(edge_type_name); break; - case Type::ADD_OUT_EDGE: - encoder.WriteInt(vertex_id); - encoder.WriteInt(vertex_to_address.raw()); - encoder.WriteInt(edge_address.raw()); - encoder.WriteInt(edge_type.Id()); - break; - case Type::REMOVE_OUT_EDGE: - encoder.WriteInt(vertex_id); - encoder.WriteInt(edge_address.raw()); - break; - case Type::ADD_IN_EDGE: - encoder.WriteInt(vertex_id); - encoder.WriteInt(vertex_from_address.raw()); - encoder.WriteInt(edge_address.raw()); - encoder.WriteInt(edge_type.Id()); - break; - case Type::REMOVE_IN_EDGE: - encoder.WriteInt(vertex_id); - encoder.WriteInt(edge_address.raw()); - break; case Type::SET_PROPERTY_VERTEX: encoder.WriteInt(vertex_id); encoder.WriteInt(property.Id()); @@ -268,37 +197,14 @@ std::experimental::optional StateDelta::Decode( break; case Type::CREATE_VERTEX: DECODE_MEMBER(vertex_id, ValueInt) - DECODE_MEMBER(cypher_id, ValueInt) break; case Type::CREATE_EDGE: DECODE_MEMBER(edge_id, ValueInt) - DECODE_MEMBER(cypher_id, ValueInt) DECODE_MEMBER(vertex_from_id, ValueInt) DECODE_MEMBER(vertex_to_id, ValueInt) DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) DECODE_MEMBER(edge_type_name, ValueString) break; - case Type::ADD_OUT_EDGE: - DECODE_MEMBER(vertex_id, ValueInt) - DECODE_MEMBER_CAST(vertex_to_address, ValueInt, storage::VertexAddress) - DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) - DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) - break; - case Type::REMOVE_OUT_EDGE: - DECODE_MEMBER(vertex_id, ValueInt) - DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) - break; - case Type::ADD_IN_EDGE: - DECODE_MEMBER(vertex_id, ValueInt) - DECODE_MEMBER_CAST(vertex_from_address, ValueInt, - storage::VertexAddress) - DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) - DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) - break; - case Type::REMOVE_IN_EDGE: - DECODE_MEMBER(vertex_id, ValueInt) - DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress) - break; case Type::SET_PROPERTY_VERTEX: DECODE_MEMBER(vertex_id, ValueInt) DECODE_MEMBER_CAST(property, ValueInt, storage::Property) @@ -357,20 +263,14 @@ void StateDelta::Apply(GraphDbAccessor &dba) const { LOG(FATAL) << "Transaction handling not handled in Apply"; break; case Type::CREATE_VERTEX: - dba.InsertVertex(vertex_id, cypher_id); + dba.InsertVertex(vertex_id); break; case Type::CREATE_EDGE: { auto from = dba.FindVertex(vertex_from_id, true); auto to = dba.FindVertex(vertex_to_id, true); - dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id, - cypher_id); + dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id); break; } - case Type::ADD_OUT_EDGE: - case Type::REMOVE_OUT_EDGE: - case Type::ADD_IN_EDGE: - case Type::REMOVE_IN_EDGE: - LOG(FATAL) << "Partial edge creation/deletion not yet supported in Apply"; case Type::SET_PROPERTY_VERTEX: { auto vertex = dba.FindVertex(vertex_id, true); vertex.PropsSet(dba.Property(property_name), value); diff --git a/src/durability/single_node/state_delta.lcp b/src/durability/single_node/state_delta.lcp index 774fcc39e..5285d1ac8 100644 --- a/src/durability/single_node/state_delta.lcp +++ b/src/durability/single_node/state_delta.lcp @@ -5,10 +5,13 @@ #include "communication/bolt/v1/encoder/base_encoder.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/hashed_file_writer.hpp" +#include "mvcc/single_node/version_list.hpp" #include "storage/common/property_value.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address_types.hpp" #include "storage/single_node/gid.hpp" + +class Vertex; +class Edge; cpp<# (lcp:namespace database) @@ -27,12 +30,11 @@ cpp<# ;; only keep addresses. (vertex-id "gid::Gid") (edge-id "gid::Gid") - (cypher-id :int64_t) - (edge-address "storage::EdgeAddress") + (edge-address "mvcc::VersionList *") (vertex-from-id "gid::Gid") - (vertex-from-address "storage::VertexAddress") + (vertex-from-address "mvcc::VersionList *") (vertex-to-id "gid::Gid") - (vertex-to-address "storage::VertexAddress") + (vertex-to-address "mvcc::VersionList *") (edge-type "storage::EdgeType") (edge-type-name "std::string") (property "storage::Property") @@ -60,10 +62,6 @@ in StateDeltas.") transaction-abort create-vertex ;; vertex_id create-edge ;; edge_id, from_vertex_id, to_vertex_id, edge_type, edge_type_name - add-out-edge ;; vertex_id, edge_address, vertex_to_address, edge_type - remove-out-edge ;; vertex_id, edge_address - add-in-edge ;; vertex_id, edge_address, vertex_from_address, edge_type - remove-in-edge ;; vertex_id, edge_address set-property-vertex ;; vertex_id, property, property_name, property_value set-property-edge ;; edge_id, property, property_name, property_value ;; remove property is done by setting a PropertyValue::Null @@ -98,27 +96,12 @@ omitted in the comment.")) static StateDelta TxCommit(tx::TransactionId tx_id); static StateDelta TxAbort(tx::TransactionId tx_id); static StateDelta CreateVertex(tx::TransactionId tx_id, - gid::Gid vertex_id, - int64_t cypher_id); + gid::Gid vertex_id); static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, - int64_t cypher_id, gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name); - static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::VertexAddress vertex_to_address, - storage::EdgeAddress edge_address, - storage::EdgeType edge_type); - static StateDelta RemoveOutEdge(tx::TransactionId tx_id, - gid::Gid vertex_id, - storage::EdgeAddress edge_address); - static StateDelta AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::VertexAddress vertex_from_address, - storage::EdgeAddress edge_address, - storage::EdgeType edge_type); - static StateDelta RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, - storage::EdgeAddress edge_address); static StateDelta PropsSetVertex(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Property property, diff --git a/src/durability/single_node/version.hpp b/src/durability/single_node/version.hpp index 549ffc212..e3bf86772 100644 --- a/src/durability/single_node/version.hpp +++ b/src/durability/single_node/version.hpp @@ -15,38 +15,29 @@ constexpr std::array kSnapshotMagic{{'M', 'G', 's', 'n'}}; constexpr std::array kWalMagic{{'M', 'G', 'w', 'l'}}; // The current default version of snapshot and WAL encoding / decoding. -constexpr int64_t kVersion{6}; +constexpr int64_t kVersion{7}; -// Snapshot format (version 6): +// Snapshot format (version 7): // 1) Magic number + snapshot version -// 2) Distributed worker ID -// -// The following two entries indicate the starting points for generating new -// vertex/edge IDs in the DB. They are important when there are vertices/edges -// that were moved to another worker (in distributed Memgraph). -// 3) Vertex generator ID -// 4) Edge generator ID // // The following two entries are required when recovering from snapshot combined // with WAL to determine record visibility. -// 5) Transactional ID of the snapshooter -// 6) Transactional snapshot of the snapshooter +// 2) Transactional ID of the snapshooter +// 3) Transactional snapshot of the snapshooter // -// 7) A list of label+property indices. +// 4) A list of label+property indices. // // We must inline edges with nodes because some edges might be stored on other // worker (edges are always stored only on the worker of the edge source). -// 8) Bolt encoded nodes. Each node is written in the following format: +// 5) Bolt encoded nodes. Each node is written in the following format: // * gid, labels, properties -// * cypher_id // * inlined edges (edge address, other endpoint address and edge type) -// 9) Bolt encoded edges. Each edge is written in the following format: +// 6) Bolt encoded edges. Each edge is written in the following format: // * gid // * from, to // * edge_type // * properties -// * cypher_id // -// 10) Snapshot summary (number of nodes, number of edges, hash) +// 7) Snapshot summary (number of nodes, number of edges, hash) } // namespace durability diff --git a/src/durability/single_node/wal.cpp b/src/durability/single_node/wal.cpp index 8977ac2c6..902494d7b 100644 --- a/src/durability/single_node/wal.cpp +++ b/src/durability/single_node/wal.cpp @@ -1,6 +1,6 @@ #include "wal.hpp" -#include "durability/paths.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/version.hpp" #include "utils/file.hpp" #include "utils/flag_validation.hpp" @@ -19,11 +19,12 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, FLAG_IN_RANGE(1, 1 << 30)); namespace durability { + WriteAheadLog::WriteAheadLog( - int worker_id, const std::experimental::filesystem::path &durability_dir, + const std::experimental::filesystem::path &durability_dir, bool durability_enabled, bool synchronous_commit) : deltas_{FLAGS_wal_buffer_size}, - wal_file_{worker_id, durability_dir}, + wal_file_{durability_dir}, durability_enabled_(durability_enabled), synchronous_commit_(synchronous_commit) { if (durability_enabled_) { @@ -39,8 +40,8 @@ WriteAheadLog::~WriteAheadLog() { } WriteAheadLog::WalFile::WalFile( - int worker_id, const std::experimental::filesystem::path &durability_dir) - : worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {} + const std::experimental::filesystem::path &durability_dir) + : wal_dir_{durability_dir / kWalDir} {} WriteAheadLog::WalFile::~WalFile() { if (!current_wal_file_.empty()) writer_.Close(); @@ -51,7 +52,7 @@ void WriteAheadLog::WalFile::Init() { LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_; current_wal_file_ = std::experimental::filesystem::path(); } else { - current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_); + current_wal_file_ = WalFilenameForTransactionId(wal_dir_); // TODO: Fix error handling, the encoder_ returns `true` or `false`. try { writer_.Open(current_wal_file_); @@ -104,7 +105,7 @@ void WriteAheadLog::WalFile::RotateFile() { writer_.Close(); std::experimental::filesystem::rename( current_wal_file_, - WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_)); + WalFilenameForTransactionId(wal_dir_, latest_tx_)); Init(); } @@ -138,10 +139,6 @@ bool WriteAheadLog::IsStateDeltaTransactionEnd( case database::StateDelta::Type::TRANSACTION_BEGIN: case database::StateDelta::Type::CREATE_VERTEX: case database::StateDelta::Type::CREATE_EDGE: - case database::StateDelta::Type::ADD_OUT_EDGE: - case database::StateDelta::Type::REMOVE_OUT_EDGE: - case database::StateDelta::Type::ADD_IN_EDGE: - case database::StateDelta::Type::REMOVE_IN_EDGE: case database::StateDelta::Type::SET_PROPERTY_VERTEX: case database::StateDelta::Type::SET_PROPERTY_EDGE: case database::StateDelta::Type::ADD_LABEL: diff --git a/src/durability/single_node/wal.hpp b/src/durability/single_node/wal.hpp index 5eee6bb52..b7eb6d20d 100644 --- a/src/durability/single_node/wal.hpp +++ b/src/durability/single_node/wal.hpp @@ -26,8 +26,7 @@ namespace durability { /// indeterminism. class WriteAheadLog { public: - WriteAheadLog(int worker_id, - const std::experimental::filesystem::path &durability_dir, + WriteAheadLog(const std::experimental::filesystem::path &durability_dir, bool durability_enabled, bool synchronous_commit); ~WriteAheadLog(); @@ -48,7 +47,7 @@ class WriteAheadLog { /// Groups the logic of WAL file handling (flushing, naming, rotating) class WalFile { public: - WalFile(int worker_id, const std::experimental::filesystem::path &wal__dir); + explicit WalFile(const std::experimental::filesystem::path &durability_dir); ~WalFile(); /// Initializes the WAL file. Must be called before first flush. Can be @@ -62,7 +61,6 @@ class WriteAheadLog { private: /// Mutex used for flushing wal data std::mutex flush_mutex_; - int worker_id_; const std::experimental::filesystem::path wal_dir_; HashedFileWriter writer_; communication::bolt::BaseEncoder encoder_{writer_}; diff --git a/src/mvcc/single_node/version_list.hpp b/src/mvcc/single_node/version_list.hpp index 86ae7ed8d..1ac3bfed1 100644 --- a/src/mvcc/single_node/version_list.hpp +++ b/src/mvcc/single_node/version_list.hpp @@ -3,6 +3,7 @@ #include "storage/single_node/gid.hpp" #include "storage/locking/record_lock.hpp" #include "transactions/transaction.hpp" +#include "utils/cast.hpp" #include "utils/exceptions.hpp" namespace mvcc { @@ -25,14 +26,12 @@ class VersionList { * @param t - transaction * @param gid - Version list identifier. Uniqueness guaranteed by the code * creating this version list. - * @param cypher_id - Number returned from the id function. * @param args - args forwarded to constructor of item T (for * creating the first Record (Version) in this VersionList. */ template - VersionList(const tx::Transaction &t, gid::Gid gid, int64_t cypher_id, - Args &&... args) - : gid_(gid), cypher_id_(cypher_id) { + VersionList(const tx::Transaction &t, gid::Gid gid, Args &&... args) + : gid_(gid) { // TODO replace 'new' with something better auto *v1 = new T(std::forward(args)...); v1->mark_created(t); @@ -226,7 +225,7 @@ class VersionList { const gid::Gid gid_; - auto cypher_id() { return cypher_id_; } + int64_t cypher_id() { return utils::MemcpyCast(gid_); } private: void lock_and_validate(T *record, const tx::Transaction &t) { @@ -265,18 +264,6 @@ class VersionList { return updated; } - /** - * The following member is here because Memgraph supports ID function from - * the Cypher query language. If you have plans to change this you have to - * consider the following: - * * If the id has to be durable. -> Snapshot and WAL have to be updated. - * * Impact on query execution. | - * * Impact on the communication stack. |-> The id has to be returned - * to the client. - * * Import tools bacause of the dependencies on the durability stack. - * * Implications on the distributed system. - */ - int64_t cypher_id_{0}; std::atomic head_{nullptr}; RecordLock lock_; }; diff --git a/src/query/interpret/awesome_memgraph_functions.cpp b/src/query/interpret/awesome_memgraph_functions.cpp index fe685bd9e..afdbc222f 100644 --- a/src/query/interpret/awesome_memgraph_functions.cpp +++ b/src/query/interpret/awesome_memgraph_functions.cpp @@ -625,6 +625,7 @@ TypedValue IndexInfo(TypedValue *, int64_t nargs, const EvaluationContext &, return std::vector(info.begin(), info.end()); } +#ifdef MG_DISTRIBUTED TypedValue WorkerId(TypedValue *args, int64_t nargs, const EvaluationContext &, database::GraphDbAccessor *) { if (nargs != 1) { @@ -641,6 +642,7 @@ TypedValue WorkerId(TypedValue *args, int64_t nargs, const EvaluationContext &, "'workerId' argument must be a node or an edge."); } } +#endif TypedValue Id(TypedValue *args, int64_t nargs, const EvaluationContext &, database::GraphDbAccessor *dba) { @@ -919,7 +921,9 @@ NameToFunction(const std::string &function_name) { if (function_name == "COUNTER") return Counter; if (function_name == "COUNTERSET") return CounterSet; if (function_name == "INDEXINFO") return IndexInfo; +#ifdef MG_DISTRIBUTED if (function_name == "WORKERID") return WorkerId; +#endif return nullptr; } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 06bbf20c2..51fa6e880 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -176,8 +176,6 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) { TypedValue &vertex_value = frame[self_.input_symbol_]; ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &v1 = vertex_value.Value(); - CHECK(v1.GlobalAddress().worker_id() == 0 && v1.is_local()) - << "Expected CreateExpand only in single node execution"; // Similarly to CreateNode, newly created edges and nodes should use the // latest accesors. @@ -485,8 +483,6 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) { // attempt to get a value from the incoming edges if (in_edges_ && *in_edges_it_ != in_edges_->end()) { auto edge = *(*in_edges_it_)++; - CHECK(edge.address().is_local() && edge.from().address().is_local()) - << "Expected Expand only in single node execution"; frame[self_.edge_symbol_] = edge; pull_node(edge, EdgeAtom::Direction::IN); return true; @@ -495,8 +491,6 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) { // attempt to get a value from the outgoing edges if (out_edges_ && *out_edges_it_ != out_edges_->end()) { auto edge = *(*out_edges_it_)++; - CHECK(edge.address().is_local() && edge.to().address().is_local()) - << "Expected Expand only in single node execution"; // when expanding in EdgeAtom::Direction::BOTH directions // we should do only one expansion for cycles, and it was // already done in the block above diff --git a/src/storage/single_node/address.hpp b/src/storage/single_node/address.hpp deleted file mode 100644 index 003840a80..000000000 --- a/src/storage/single_node/address.hpp +++ /dev/null @@ -1,94 +0,0 @@ -#pragma once - -#include - -#include - -#include "storage/single_node/gid.hpp" - -namespace storage { - -/** - * A data structure that tracks a Vertex/Edge location (address) that's either - * local or remote. The remote address is a global id alongside the id of the - * worker on which it's currently stored, while the local address is simply the - * memory address in the current local process. Both types of address are stored - * in the same storage space, so an Address always takes as much memory as a - * pointer does. - * - * The memory layout for storage is on x64 architecture is the following: - * - the lowest bit stores 0 if address is local and 1 if address is global - * - if the address is local all 64 bits store the local memory address - * - if the address is global then lowest bit stores 1. the following - * kWorkerIdSize bits contain the worker id and the final (upper) 64 - 1 - - * kWorkerIdSize bits contain the global id. - * - * @tparam TRecord - Type of record this address points to. Either Vertex or - * Edge. - */ -template -class Address { - static constexpr uint64_t kTypeMaskSize{1}; - static constexpr uint64_t kTypeMask{(1ULL << kTypeMaskSize) - 1}; - static constexpr uint64_t kWorkerIdSize{gid::kWorkerIdSize}; - static constexpr uint64_t kLocal{0}; - static constexpr uint64_t kRemote{1}; - - public: - using StorageT = uint64_t; - - Address() {} - - // Constructor for raw address value - explicit Address(StorageT storage) : storage_(storage) {} - - // Constructor for local Address. - explicit Address(TLocalObj *ptr) { - uintptr_t ptr_no_type = reinterpret_cast(ptr); - DCHECK((ptr_no_type & kTypeMask) == 0) << "Ptr has type_mask bit set"; - storage_ = ptr_no_type | kLocal; - } - - // Constructor for remote Address, takes worker_id which specifies the worker - // that is storing that vertex/edge - Address(gid::Gid global_id, int worker_id) { - CHECK(global_id < - (1ULL << (sizeof(StorageT) * 8 - kWorkerIdSize - kTypeMaskSize))) - << "Too large global id"; - CHECK(worker_id < (1ULL << kWorkerIdSize)) << "Too larger worker id"; - - storage_ = kRemote; - storage_ |= global_id << (kTypeMaskSize + kWorkerIdSize); - storage_ |= worker_id << kTypeMaskSize; - } - - bool is_local() const { return (storage_ & kTypeMask) == kLocal; } - bool is_remote() const { return (storage_ & kTypeMask) == kRemote; } - - TLocalObj *local() const { - DCHECK(is_local()) << "Attempting to get local address from global"; - return reinterpret_cast(storage_); - } - - gid::Gid gid() const { - DCHECK(is_remote()) << "Attempting to get global ID from local address"; - return storage_ >> (kTypeMaskSize + kWorkerIdSize); - } - - /// Returns worker id where the object is located - int worker_id() const { - DCHECK(is_remote()) << "Attempting to get worker ID from local address"; - return (storage_ >> 1) & ((1ULL << kWorkerIdSize) - 1); - } - - /// Returns raw address value - StorageT raw() const { return storage_; } - - bool operator==(const Address &other) const { - return storage_ == other.storage_; - } - - StorageT storage_{0}; -}; - -} // namespace storage diff --git a/src/storage/single_node/address_types.hpp b/src/storage/single_node/address_types.hpp deleted file mode 100644 index 939a7afa8..000000000 --- a/src/storage/single_node/address_types.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include "mvcc/single_node/version_list.hpp" -#include "storage/single_node/address.hpp" - -class Edge; -class Vertex; -namespace storage { -using VertexAddress = Address>; -using EdgeAddress = Address>; - -} // namespace storage diff --git a/src/storage/single_node/edge.hpp b/src/storage/single_node/edge.hpp index f34a01756..3237a6765 100644 --- a/src/storage/single_node/edge.hpp +++ b/src/storage/single_node/edge.hpp @@ -4,23 +4,21 @@ #include "mvcc/single_node/version_list.hpp" #include "storage/common/property_value_store.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address.hpp" class Vertex; class Edge : public mvcc::Record { - using VertexAddress = storage::Address>; - public: - Edge(VertexAddress from, VertexAddress to, storage::EdgeType edge_type) + Edge(mvcc::VersionList *from, mvcc::VersionList *to, + storage::EdgeType edge_type) : from_(from), to_(to), edge_type_(edge_type) {} // Returns new Edge with copy of data stored in this Edge, but without // copying superclass' members. Edge *CloneData() { return new Edge(*this); } - VertexAddress from_; - VertexAddress to_; + mvcc::VersionList *from_; + mvcc::VersionList *to_; storage::EdgeType edge_type_; PropertyValueStore properties_; diff --git a/src/storage/single_node/edge_accessor.cpp b/src/storage/single_node/edge_accessor.cpp index a12fad91a..7f0554be9 100644 --- a/src/storage/single_node/edge_accessor.cpp +++ b/src/storage/single_node/edge_accessor.cpp @@ -4,7 +4,7 @@ #include "storage/vertex_accessor.hpp" #include "utils/algorithm.hpp" -EdgeAccessor::EdgeAccessor(EdgeAddress address, +EdgeAccessor::EdgeAccessor(mvcc::VersionList *address, database::GraphDbAccessor &db_accessor) : RecordAccessor(address, db_accessor), from_(nullptr), @@ -18,9 +18,10 @@ EdgeAccessor::EdgeAccessor(EdgeAddress address, } } -EdgeAccessor::EdgeAccessor(EdgeAddress address, +EdgeAccessor::EdgeAccessor(mvcc::VersionList *address, database::GraphDbAccessor &db_accessor, - VertexAddress from, VertexAddress to, + mvcc::VersionList *from, + mvcc::VersionList *to, storage::EdgeType edge_type) : RecordAccessor(address, db_accessor), from_(from), diff --git a/src/storage/single_node/edge_accessor.hpp b/src/storage/single_node/edge_accessor.hpp index e2eaeda25..2692b3c35 100644 --- a/src/storage/single_node/edge_accessor.hpp +++ b/src/storage/single_node/edge_accessor.hpp @@ -1,6 +1,5 @@ #pragma once -#include "storage/single_node/address_types.hpp" #include "storage/single_node/edge.hpp" #include "storage/single_node/record_accessor.hpp" @@ -20,20 +19,19 @@ class VertexAccessor; * location, which is often a performance bottleneck in traversals. */ class EdgeAccessor final : public RecordAccessor { - using EdgeAddress = storage::EdgeAddress; - using VertexAddress = storage::VertexAddress; - public: /** Constructor that reads data from the random memory location (lower * performance, see class docs). */ - EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor); + EdgeAccessor(mvcc::VersionList *address, + database::GraphDbAccessor &db_accessor); /** * Constructor that does NOT read data from the random memory location * (better performance, see class docs). */ - EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor, - VertexAddress from, VertexAddress to, + EdgeAccessor(mvcc::VersionList *address, + database::GraphDbAccessor &db_accessor, + mvcc::VersionList *from, mvcc::VersionList *to, storage::EdgeType edge_type); storage::EdgeType EdgeType() const; @@ -63,8 +61,8 @@ class EdgeAccessor final : public RecordAccessor { bool is_cycle() const; private: - VertexAddress from_; - VertexAddress to_; + mvcc::VersionList *from_; + mvcc::VersionList *to_; storage::EdgeType edge_type_; }; diff --git a/src/storage/single_node/edges.hpp b/src/storage/single_node/edges.hpp index b12d42e71..99b02530c 100644 --- a/src/storage/single_node/edges.hpp +++ b/src/storage/single_node/edges.hpp @@ -7,8 +7,6 @@ #include "mvcc/single_node/version_list.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address.hpp" -#include "storage/single_node/address_types.hpp" #include "utils/algorithm.hpp" /** @@ -19,8 +17,8 @@ class Edges { private: struct Element { - storage::VertexAddress vertex; - storage::EdgeAddress edge; + mvcc::VersionList *vertex; + mvcc::VersionList *edge; storage::EdgeType edge_type; }; @@ -49,7 +47,7 @@ class Edges { */ Iterator(std::vector::const_iterator position, std::vector::const_iterator end, - storage::VertexAddress vertex, + mvcc::VersionList *vertex, const std::vector *edge_types) : position_(position), end_(end), @@ -80,14 +78,14 @@ class Edges { // Optional predicates. If set they define which edges are skipped by the // iterator. Only one can be not-null in the current implementation. - storage::VertexAddress vertex_{nullptr}; + mvcc::VersionList *vertex_{nullptr}; // For edge types we use a vector pointer because it's optional. const std::vector *edge_types_ = nullptr; /** Helper function that skips edges that don't satisfy the predicate * present in this iterator. */ void update_position() { - if (vertex_.local()) { + if (vertex_) { position_ = std::find_if(position_, end_, [v = this->vertex_](const Element &e) { return e.vertex == v; @@ -110,7 +108,7 @@ class Edges { * @param edge - The edge. * @param edge_type - Type of the edge. */ - void emplace(storage::VertexAddress vertex, storage::EdgeAddress edge, + void emplace(mvcc::VersionList *vertex, mvcc::VersionList *edge, storage::EdgeType edge_type) { storage_.emplace_back(Element{vertex, edge, edge_type}); } @@ -118,7 +116,7 @@ class Edges { /** * Removes an edge from this structure. */ - void RemoveEdge(storage::EdgeAddress edge) { + void RemoveEdge(mvcc::VersionList *edge) { auto found = std::find_if( storage_.begin(), storage_.end(), [edge](const Element &element) { return edge == element.edge; }); @@ -143,7 +141,7 @@ class Edges { * @param edge_types - The edge types at least one of which must be matched. * If nullptr edges are not filtered on type. */ - auto begin(storage::VertexAddress vertex, + auto begin(mvcc::VersionList *vertex, const std::vector *edge_types) const { if (edge_types && edge_types->empty()) edge_types = nullptr; return Iterator(storage_.begin(), storage_.end(), vertex, edge_types); diff --git a/src/storage/single_node/gid.hpp b/src/storage/single_node/gid.hpp index 80ad99a37..fa0fd6c9e 100644 --- a/src/storage/single_node/gid.hpp +++ b/src/storage/single_node/gid.hpp @@ -16,16 +16,6 @@ namespace gid { */ using Gid = uint64_t; -static constexpr std::size_t kWorkerIdSize{10}; - -/// Returns `local` id from global id. -static inline uint64_t LocalId(Gid gid) { return gid >> kWorkerIdSize; } - -/// Returns id of the worker that created this gid. -static inline int CreatorWorker(Gid gid) { - return gid & ((1ULL << kWorkerIdSize) - 1); -} - /** * Threadsafe generation of new global ids which belong to the * worker_id machine. Never call SetId after calling Next without an Id you are @@ -36,8 +26,6 @@ static inline int CreatorWorker(Gid gid) { */ class Generator { public: - Generator(int worker_id) : worker_id_(worker_id) {} - /** * Returns a globally unique identifier. * @@ -47,29 +35,14 @@ class Generator { gid::Gid Next(std::experimental::optional requested_gid = std::experimental::nullopt) { if (requested_gid) { - if (gid::CreatorWorker(*requested_gid) == worker_id_) - utils::EnsureAtomicGe(next_local_id_, gid::LocalId(*requested_gid) + 1); + utils::EnsureAtomicGe(next_local_id_, *requested_gid + 1); return *requested_gid; } else { - generated_id_ = true; - return worker_id_ | next_local_id_++ << kWorkerIdSize; + return next_local_id_++; } } - /// Returns number of locally generated ids - uint64_t LocalCount() const { return next_local_id_; }; - - // Sets a new id from which every new gid will be generated, should only be - // set before first Next is called - void SetId(uint64_t id) { - DCHECK(!generated_id_) - << "Id should be set only before first id is generated"; - next_local_id_ = id; - } - private: - bool generated_id_{false}; - int worker_id_; std::atomic next_local_id_{0}; }; } // namespace gid diff --git a/src/storage/single_node/record_accessor.cpp b/src/storage/single_node/record_accessor.cpp index d9dc580d8..77bc8fcd6 100644 --- a/src/storage/single_node/record_accessor.cpp +++ b/src/storage/single_node/record_accessor.cpp @@ -10,11 +10,9 @@ using database::StateDelta; template -RecordAccessor::RecordAccessor(AddressT address, +RecordAccessor::RecordAccessor(mvcc::VersionList *address, database::GraphDbAccessor &db_accessor) - : db_accessor_(&db_accessor), - address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) { -} + : db_accessor_(&db_accessor), address_(address) {} template PropertyValue RecordAccessor::PropsAt(storage::Property key) const { @@ -28,10 +26,8 @@ void RecordAccessor::PropsSet(storage::Property key, auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); update().properties_.set(key, value); - if (is_local()) { - dba.UpdatePropertyIndex(key, *this, &update()); - } - ProcessDelta(delta); + dba.UpdatePropertyIndex(key, *this, &update()); + db_accessor().wal().Emplace(delta); } template <> @@ -42,7 +38,7 @@ void RecordAccessor::PropsSet(storage::Property key, dba.PropertyName(key), value); update().properties_.set(key, value); - ProcessDelta(delta); + db_accessor().wal().Emplace(delta); } template <> @@ -52,7 +48,7 @@ void RecordAccessor::PropsErase(storage::Property key) { StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); update().properties_.set(key, PropertyValue::Null); - ProcessDelta(delta); + db_accessor().wal().Emplace(delta); } template <> @@ -62,7 +58,7 @@ void RecordAccessor::PropsErase(storage::Property key) { StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); update().properties_.set(key, PropertyValue::Null); - ProcessDelta(delta); + db_accessor().wal().Emplace(delta); } template @@ -93,39 +89,24 @@ database::GraphDbAccessor &RecordAccessor::db_accessor() const { template gid::Gid RecordAccessor::gid() const { - return is_local() ? address_.local()->gid_ : address_.gid(); + return address_->gid_; } template -typename RecordAccessor::AddressT RecordAccessor::address() - const { +typename mvcc::VersionList *RecordAccessor::address() const { return address_; } -template -typename RecordAccessor::AddressT -RecordAccessor::GlobalAddress() const { - // TODO: This is still coupled to distributed storage, albeit loosely. - int worker_id = 0; - CHECK(is_local()); - return storage::Address>(gid(), worker_id); -} - template RecordAccessor &RecordAccessor::SwitchNew() { - if (is_local()) { - if (!new_) { - // if new_ is not set yet, look for it - // we can just Reconstruct the pointers, old_ will get initialized - // to the same value as it has now, and the amount of work is the - // same as just looking for a new_ record - if (!Reconstruct()) - DLOG(FATAL) - << "RecordAccessor::SwitchNew - accessor invalid after Reconstruct"; - } - } else { - // A remote record only sees local updates, until the command is advanced. - // So this does nothing, as the old/new switch happens below. + if (!new_) { + // if new_ is not set yet, look for it + // we can just Reconstruct the pointers, old_ will get initialized + // to the same value as it has now, and the amount of work is the + // same as just looking for a new_ record + if (!Reconstruct()) + DLOG(FATAL) + << "RecordAccessor::SwitchNew - accessor invalid after Reconstruct"; } current_ = new_ ? new_ : old_; return *this; @@ -141,8 +122,7 @@ template bool RecordAccessor::Reconstruct() const { auto &dba = db_accessor(); const auto &addr = address(); - CHECK(is_local()); - addr.local()->find_set_old_new(dba.transaction(), &old_, &new_); + addr->find_set_old_new(dba.transaction(), &old_, &new_); current_ = old_ ? old_ : new_; return old_ != nullptr || new_ != nullptr; } @@ -165,8 +145,7 @@ TRecord &RecordAccessor::update() const { if (new_) return *new_; const auto &addr = address(); - CHECK(addr.is_local()); - new_ = addr.local()->update(dba.transaction()); + new_ = addr->update(dba.transaction()); DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update"; return *new_; @@ -174,7 +153,7 @@ TRecord &RecordAccessor::update() const { template int64_t RecordAccessor::CypherId() const { - return address().local()->cypher_id(); + return address()->cypher_id(); } template @@ -188,12 +167,5 @@ const TRecord &RecordAccessor::current() const { return *current_; } -template -void RecordAccessor::ProcessDelta( - const database::StateDelta &delta) const { - CHECK(is_local()); - db_accessor().wal().Emplace(delta); -} - template class RecordAccessor; template class RecordAccessor; diff --git a/src/storage/single_node/record_accessor.hpp b/src/storage/single_node/record_accessor.hpp index df2a7ea76..b36653548 100644 --- a/src/storage/single_node/record_accessor.hpp +++ b/src/storage/single_node/record_accessor.hpp @@ -7,7 +7,6 @@ #include "storage/common/property_value.hpp" #include "storage/common/property_value_store.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address.hpp" #include "storage/single_node/gid.hpp" #include "utils/total_ordering.hpp" @@ -28,8 +27,6 @@ struct StateDelta; template class RecordAccessor : public utils::TotalOrdering> { protected: - using AddressT = storage::Address>; - /** * The database::GraphDbAccessor is friend to this accessor so it can * operate on it's data (mvcc version-list and the record itself). @@ -47,7 +44,7 @@ class RecordAccessor : public utils::TotalOrdering> { * accessor. * @param db_accessor The DB accessor that "owns" this record accessor. */ - RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor); + RecordAccessor(mvcc::VersionList *address, database::GraphDbAccessor &db_accessor); // this class is default copyable, movable and assignable RecordAccessor(const RecordAccessor &other) = default; @@ -82,9 +79,7 @@ class RecordAccessor : public utils::TotalOrdering> { */ gid::Gid gid() const; - AddressT address() const; - - AddressT GlobalAddress() const; + mvcc::VersionList *address() const; /* * Switches this record accessor to use the latest version visible to the @@ -146,20 +141,12 @@ class RecordAccessor : public utils::TotalOrdering> { (current_state && new_ && !new_->is_expired_by(t)); } - // TODO: This shouldn't be here, because it's only relevant in distributed. - /** Indicates if this accessor represents a local Vertex/Edge, or one whose - * owner is some other worker in a distributed system. */ - bool is_local() const { return address_.is_local(); } - /** * Returns Cypher Id of this record. */ int64_t CypherId() const; protected: - /** Process a change delta, e.g. by writing WAL. */ - void ProcessDelta(const database::StateDelta &delta) const; - /** * Pointer to the version (either old_ or new_) that READ operations * in the accessor should take data from. Note that WRITE operations @@ -180,7 +167,7 @@ class RecordAccessor : public utils::TotalOrdering> { // Immutable, set in the constructor and never changed. database::GraphDbAccessor *db_accessor_; - AddressT address_; + mvcc::VersionList *address_; /** * Latest version which is visible to the current transaction+command diff --git a/src/storage/single_node/storage.hpp b/src/storage/single_node/storage.hpp index 979c0f579..46d08a78b 100644 --- a/src/storage/single_node/storage.hpp +++ b/src/storage/single_node/storage.hpp @@ -8,17 +8,12 @@ #include "mvcc/single_node/version_list.hpp" #include "storage/common/types.hpp" #include "storage/kvstore/kvstore.hpp" -#include "storage/single_node/address.hpp" #include "storage/single_node/edge.hpp" #include "storage/single_node/indexes/key_index.hpp" #include "storage/single_node/indexes/label_property_index.hpp" #include "storage/single_node/vertex.hpp" #include "transactions/type.hpp" -namespace distributed { -class IndexRpcServer; -}; - namespace database { class GraphDb; }; @@ -28,11 +23,8 @@ namespace database { /** A data structure containing the main data members of a graph database. */ class Storage { public: - Storage(int worker_id, const std::vector &properties_on_disk) - : worker_id_(worker_id), - vertex_generator_{worker_id}, - edge_generator_{worker_id}, - properties_on_disk_{properties_on_disk} {} + explicit Storage(const std::vector &properties_on_disk) + : properties_on_disk_{properties_on_disk} {} public: ~Storage() { @@ -64,41 +56,13 @@ class Storage { return found->second; } - /// Converts an address to local, if possible. Returns the same address if - /// not. - template - storage::Address> LocalizedAddressIfPossible( - storage::Address> address) const { - if (address.is_local()) return address; - if (address.worker_id() == worker_id_) { - auto vlist = LocalAddress(address.gid()); - if constexpr (std::is_same::value) - return storage::VertexAddress(vlist); - else - return storage::EdgeAddress(vlist); - } - return address; - } - - /// Returns remote address for the given local or remote address. - template - TAddress GlobalizedAddress(TAddress address) const { - if (address.is_remote()) return address; - return {address.local()->gid_, worker_id_}; - } - - /// Gets the local edge address for the given gid. Fails if not present. - mvcc::VersionList *LocalEdgeAddress(gid::Gid gid) const; - /// Gets names of properties stored on disk std::vector &PropertiesOnDisk() { return properties_on_disk_; } private: friend class GraphDbAccessor; friend class StorageGc; - friend class distributed::IndexRpcServer; - int worker_id_; gid::Generator vertex_generator_; gid::Generator edge_generator_; diff --git a/src/storage/single_node/vertex.hpp b/src/storage/single_node/vertex.hpp index 36a560eb4..71d989b73 100644 --- a/src/storage/single_node/vertex.hpp +++ b/src/storage/single_node/vertex.hpp @@ -4,7 +4,6 @@ #include "mvcc/single_node/version_list.hpp" #include "storage/common/property_value_store.hpp" #include "storage/common/types.hpp" -#include "storage/single_node/address.hpp" #include "storage/single_node/edges.hpp" class Vertex : public mvcc::Record { diff --git a/src/storage/single_node/vertex_accessor.cpp b/src/storage/single_node/vertex_accessor.cpp index f350bb75a..e2c4ed21f 100644 --- a/src/storage/single_node/vertex_accessor.cpp +++ b/src/storage/single_node/vertex_accessor.cpp @@ -6,7 +6,7 @@ #include "durability/single_node/state_delta.hpp" #include "utils/algorithm.hpp" -VertexAccessor::VertexAccessor(VertexAddress address, +VertexAccessor::VertexAccessor(mvcc::VersionList *address, database::GraphDbAccessor &db_accessor) : RecordAccessor(address, db_accessor) { Reconstruct(); @@ -17,7 +17,6 @@ size_t VertexAccessor::out_degree() const { return current().out_.size(); } size_t VertexAccessor::in_degree() const { return current().in_.size(); } void VertexAccessor::add_label(storage::Label label) { - CHECK(is_local()); auto &dba = db_accessor(); auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); @@ -31,7 +30,6 @@ void VertexAccessor::add_label(storage::Label label) { } void VertexAccessor::remove_label(storage::Label label) { - CHECK(is_local()); auto &dba = db_accessor(); auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(), label, dba.LabelName(label)); @@ -54,28 +52,22 @@ const std::vector &VertexAccessor::labels() const { return this->current().labels_; } -void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) { +void VertexAccessor::RemoveOutEdge(mvcc::VersionList *edge) { auto &dba = db_accessor(); - auto delta = database::StateDelta::RemoveOutEdge( - dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); if (current().is_expired_by(dba.transaction())) return; - update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); - ProcessDelta(delta); + update().out_.RemoveEdge(edge); } -void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) { +void VertexAccessor::RemoveInEdge(mvcc::VersionList *edge) { auto &dba = db_accessor(); - auto delta = database::StateDelta::RemoveInEdge( - dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge)); SwitchNew(); if (current().is_expired_by(dba.transaction())) return; - update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge)); - ProcessDelta(delta); + update().in_.RemoveEdge(edge); } std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) { diff --git a/src/storage/single_node/vertex_accessor.hpp b/src/storage/single_node/vertex_accessor.hpp index 6efeefc00..3cdfa6202 100644 --- a/src/storage/single_node/vertex_accessor.hpp +++ b/src/storage/single_node/vertex_accessor.hpp @@ -20,7 +20,6 @@ * takes care of MVCC versioning. */ class VertexAccessor final : public RecordAccessor { - using VertexAddress = storage::Address>; // Helper function for creating an iterator over edges. // @param begin - begin iterator // @param end - end iterator @@ -31,7 +30,8 @@ class VertexAccessor final : public RecordAccessor { // @return - Iterator over EdgeAccessors template static inline auto MakeAccessorIterator( - TIterator &&begin, TIterator &&end, bool from, VertexAddress vertex, + TIterator &&begin, TIterator &&end, bool from, + mvcc::VersionList *vertex, database::GraphDbAccessor &db_accessor) { return iter::imap( [from, vertex, &db_accessor](auto &edges_element) { @@ -49,7 +49,8 @@ class VertexAccessor final : public RecordAccessor { } public: - VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor); + VertexAccessor(mvcc::VersionList *address, + database::GraphDbAccessor &db_accessor); /** Returns the number of outgoing edges. */ size_t out_degree() const; @@ -97,9 +98,9 @@ class VertexAccessor final : public RecordAccessor { * or empty, the parameter is ignored. */ auto in(const std::vector *edge_types) const { - return MakeAccessorIterator( - current().in_.begin(storage::VertexAddress(nullptr), edge_types), - current().in_.end(), false, address(), db_accessor()); + return MakeAccessorIterator(current().in_.begin(nullptr, edge_types), + current().in_.end(), false, address(), + db_accessor()); } /** Returns EdgeAccessors for all outgoing edges. */ @@ -130,20 +131,20 @@ class VertexAccessor final : public RecordAccessor { * or empty, the parameter is ignored. */ auto out(const std::vector *edge_types) const { - return MakeAccessorIterator( - current().out_.begin(storage::VertexAddress(nullptr), edge_types), - current().out_.end(), true, address(), db_accessor()); + return MakeAccessorIterator(current().out_.begin(nullptr, edge_types), + current().out_.end(), true, address(), + db_accessor()); } /** Removes the given edge from the outgoing edges of this vertex. Note that * this operation should always be accompanied by the removal of the edge from * the incoming edges on the other side and edge deletion. */ - void RemoveOutEdge(storage::EdgeAddress edge); + void RemoveOutEdge(mvcc::VersionList *edge); /** Removes the given edge from the incoming edges of this vertex. Note that * this operation should always be accompanied by the removal of the edge from * the outgoing edges on the other side and edge deletion. */ - void RemoveInEdge(storage::EdgeAddress edge); + void RemoveInEdge(mvcc::VersionList *edge); }; std::ostream &operator<<(std::ostream &, const VertexAccessor &); diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp index a79d59a87..24f3c3881 100644 --- a/tests/benchmark/mvcc.cpp +++ b/tests/benchmark/mvcc.cpp @@ -21,7 +21,7 @@ void MvccMix(benchmark::State &state) { state.PauseTiming(); tx::Engine engine; auto t1 = engine.Begin(); - mvcc::VersionList version_list(*t1, 0, 0); + mvcc::VersionList version_list(*t1, 0); engine.Commit(*t1); auto t2 = engine.Begin(); diff --git a/tests/manual/card_fraud_generate_snapshot.cpp b/tests/manual/card_fraud_generate_snapshot.cpp index e9211fd57..21e74584d 100644 --- a/tests/manual/card_fraud_generate_snapshot.cpp +++ b/tests/manual/card_fraud_generate_snapshot.cpp @@ -12,7 +12,7 @@ #include "communication/bolt/v1/encoder/base_encoder.hpp" #include "durability/distributed/snapshot_encoder.hpp" #include "durability/distributed/version.hpp" -#include "durability/paths.hpp" +#include "durability/distributed/paths.hpp" #include "storage/distributed/address_types.hpp" #include "utils/string.hpp" #include "utils/timer.hpp" diff --git a/tests/manual/snapshot_explorer.cpp b/tests/manual/snapshot_explorer.cpp index 772fabf96..d235e2c99 100644 --- a/tests/manual/snapshot_explorer.cpp +++ b/tests/manual/snapshot_explorer.cpp @@ -4,10 +4,9 @@ #include #include +#include "communication/bolt/v1/decoder/decoder.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/single_node/recovery.hpp" -#include "durability/single_node/snapshot_decoder.hpp" -#include "durability/single_node/snapshot_value.hpp" #include "durability/single_node/version.hpp" DEFINE_string(snapshot_file, "", "Snapshot file location"); @@ -19,16 +18,16 @@ int main(int argc, char *argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - // At the time this was written, the version was 6. This makes sure we update + // At the time this was written, the version was 7. This makes sure we update // the explorer when we bump the snapshot version. - static_assert(durability::kVersion == 6, + static_assert(durability::kVersion == 7, "Wrong snapshot version, please update!"); fs::path snapshot_path(FLAGS_snapshot_file); CHECK(fs::exists(snapshot_path)) << "File doesn't exist!"; HashedFileReader reader; - durability::SnapshotDecoder decoder(reader); + communication::bolt::Decoder decoder(reader); CHECK(reader.Open(snapshot_path)) << "Couldn't open snapshot file!"; @@ -53,9 +52,6 @@ int main(int argc, char *argv[]) { << "Snapshot version mismatch" << ", got " << dv.ValueInt() << " expected " << durability::kVersion; - decoder.ReadValue(&dv, Value::Type::Int); - LOG(INFO) << "Snapshot was generated for worker id: " << dv.ValueInt(); - decoder.ReadValue(&dv, Value::Type::Int); LOG(INFO) << "Vertex generator last id: " << dv.ValueInt(); @@ -87,7 +83,7 @@ int main(int argc, char *argv[]) { } for (int64_t i = 0; i < vertex_count; ++i) { - auto vertex = decoder.ReadSnapshotVertex(); + auto vertex = decoder.ReadValue(&dv, Value::Type::Vertex); CHECK(vertex) << "Failed to read vertex " << i; } diff --git a/tests/manual/snapshot_generation/snapshot_writer.hpp b/tests/manual/snapshot_generation/snapshot_writer.hpp index fa2b530a8..f15c58cc9 100644 --- a/tests/manual/snapshot_generation/snapshot_writer.hpp +++ b/tests/manual/snapshot_generation/snapshot_writer.hpp @@ -4,9 +4,9 @@ #include #include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "durability/distributed/paths.hpp" #include "durability/distributed/version.hpp" #include "durability/hashed_file_writer.hpp" -#include "durability/paths.hpp" #include "glue/communication.hpp" #include "query/typed_value.hpp" #include "utils/file.hpp" diff --git a/tests/manual/wal_explorer.cpp b/tests/manual/wal_explorer.cpp index 9c39f041a..3c69163f0 100644 --- a/tests/manual/wal_explorer.cpp +++ b/tests/manual/wal_explorer.cpp @@ -29,14 +29,6 @@ std::string StateDeltaTypeToString(database::StateDelta::Type type) { return "CREATE_VERTEX"; case database::StateDelta::Type::CREATE_EDGE: return "CREATE_EDGE"; - case database::StateDelta::Type::ADD_OUT_EDGE: - return "ADD_OUT_EDGE"; - case database::StateDelta::Type::REMOVE_OUT_EDGE: - return "REMOVE_OUT_EDGE"; - case database::StateDelta::Type::ADD_IN_EDGE: - return "ADD_IN_EDGE"; - case database::StateDelta::Type::REMOVE_IN_EDGE: - return "REMOVE_IN_EDGE"; case database::StateDelta::Type::SET_PROPERTY_VERTEX: return "SET_PROPERTY_VERTEX"; case database::StateDelta::Type::SET_PROPERTY_EDGE: diff --git a/tests/qa/tck_engine/tests/memgraph_V1/features/functions.feature b/tests/qa/tck_engine/tests/memgraph_V1/features/functions.feature index f4afd9429..7f560f720 100644 --- a/tests/qa/tck_engine/tests/memgraph_V1/features/functions.feature +++ b/tests/qa/tck_engine/tests/memgraph_V1/features/functions.feature @@ -809,7 +809,7 @@ Feature: Functions MATCH (n) WITH n ORDER BY id(n) WITH COLLECT(id(n)) AS node_ids UNWIND node_ids AS node_id - RETURN (node_id - node_ids[0]) / 1024 AS id; + RETURN node_id - node_ids[0] AS id; """ Then the result should be: | id | @@ -828,7 +828,7 @@ Feature: Functions MATCH ()-[e]->() WITH e ORDER BY id(e) WITH COLLECT(id(e)) AS edge_ids UNWIND edge_ids AS edge_id - RETURN (edge_id - edge_ids[0]) / 1024 AS id; + RETURN edge_id - edge_ids[0] AS id; """ Then the result should be: | id | diff --git a/tests/unit/bfs_common.hpp b/tests/unit/bfs_common.hpp index 2e815fcf2..c21a21284 100644 --- a/tests/unit/bfs_common.hpp +++ b/tests/unit/bfs_common.hpp @@ -24,6 +24,15 @@ void PrintTo(const query::EdgeAtom::Direction &dir, std::ostream *os) { } } // namespace query +#ifdef MG_SINGLE_NODE +using VertexAddress = mvcc::VersionList *; +using EdgeAddress = mvcc::VersionList *; +#endif +#ifdef MG_DISTRIBUTED +using VertexAddress = storage::Address>; +using EdgeAddress = storage::Address>; +#endif + const auto kVertexCount = 6; // Maps vertices to workers const std::vector kVertexLocations = {0, 1, 1, 0, 2, 2}; @@ -195,8 +204,8 @@ class Database { bool existing_node, query::Expression *lower_bound, query::Expression *upper_bound, const query::plan::ExpansionLambda &filter_lambda) = 0; - virtual std::pair, - std::vector> + virtual std::pair, + std::vector> BuildGraph(database::GraphDbAccessor *dba, const std::vector &vertex_locations, const std::vector> &edges) = 0; @@ -208,7 +217,7 @@ class Database { // include query::TypedValue::Null to account for the optional match case. std::unique_ptr YieldVertices( database::GraphDbAccessor *dba, - std::vector vertices, query::Symbol symbol, + std::vector vertices, query::Symbol symbol, std::shared_ptr input_op) { std::vector> frames; frames.push_back(std::vector{query::TypedValue::Null}); @@ -223,8 +232,8 @@ std::unique_ptr YieldVertices( // Returns an operator that yields edges and vertices given by their address. std::unique_ptr YieldEntities( database::GraphDbAccessor *dba, - std::vector vertices, - std::vector edges, query::Symbol symbol, + std::vector vertices, + std::vector edges, query::Symbol symbol, std::shared_ptr input_op) { std::vector> frames; for (const auto &vertex : vertices) { @@ -312,8 +321,8 @@ void BfsTest(Database *db, int lower_bound, int upper_bound, context.symbol_table_[*inner_node] = inner_node_sym; context.symbol_table_[*inner_edge] = inner_edge_sym; - std::vector vertices; - std::vector edges; + std::vector vertices; + std::vector edges; std::tie(vertices, edges) = db->BuildGraph(dba_ptr.get(), kVertexLocations, kEdges); diff --git a/tests/unit/bfs_single_node.cpp b/tests/unit/bfs_single_node.cpp index 8f761b6b6..577e63e57 100644 --- a/tests/unit/bfs_single_node.cpp +++ b/tests/unit/bfs_single_node.cpp @@ -30,18 +30,18 @@ class SingleNodeDb : public Database { std::experimental::nullopt, GraphView::OLD); } - std::pair, - std::vector> + std::pair, + std::vector> BuildGraph( database::GraphDbAccessor *dba, const std::vector &vertex_locations, const std::vector> &edges) override { - std::vector vertex_addr; - std::vector edge_addr; + std::vector vertex_addr; + std::vector edge_addr; for (size_t id = 0; id < vertex_locations.size(); ++id) { auto vertex = dba->InsertVertex(); vertex.PropsSet(dba->Property("id"), (int64_t)id); - vertex_addr.push_back(vertex.GlobalAddress()); + vertex_addr.push_back(vertex.address()); } for (auto e : edges) { @@ -53,7 +53,7 @@ class SingleNodeDb : public Database { auto edge = dba->InsertEdge(from, to, dba->EdgeType(type)); edge.PropsSet(dba->Property("from"), u); edge.PropsSet(dba->Property("to"), v); - edge_addr.push_back(edge.GlobalAddress()); + edge_addr.push_back(edge.address()); } return std::make_pair(vertex_addr, edge_addr); diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp index 0b61f491e..5a23ba777 100644 --- a/tests/unit/database_key_index.cpp +++ b/tests/unit/database_key_index.cpp @@ -19,7 +19,7 @@ TEST(LabelsIndex, UniqueInsert) { tx::Engine engine; auto t1 = engine.Begin(); - mvcc::VersionList vlist(*t1, 0, 0); + mvcc::VersionList vlist(*t1, 0); engine.Commit(*t1); auto t2 = engine.Begin(); @@ -48,8 +48,8 @@ TEST(LabelsIndex, UniqueFilter) { tx::Engine engine; auto t1 = engine.Begin(); - mvcc::VersionList vlist1(*t1, 0, 0); - mvcc::VersionList vlist2(*t1, 1, 1); + mvcc::VersionList vlist1(*t1, 0); + mvcc::VersionList vlist2(*t1, 1); engine.Advance(t1->id_); auto r1v1 = vlist1.find(*t1); auto r1v2 = vlist2.find(*t1); @@ -89,8 +89,8 @@ TEST(LabelsIndex, Refresh) { // add two vertices to database auto t1 = engine.Begin(); - mvcc::VersionList vlist1(*t1, 0, 0); - mvcc::VersionList vlist2(*t1, 1, 1); + mvcc::VersionList vlist1(*t1, 0); + mvcc::VersionList vlist2(*t1, 1); engine.Advance(t1->id_); auto v1r1 = vlist1.find(*t1); diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index 947b0dbac..3d77e4887 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -25,7 +25,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test { index.IndexFinishedBuilding(*key); t = engine.Begin(); - vlist = new mvcc::VersionList(*t, 0, 0); + vlist = new mvcc::VersionList(*t, 0); engine.Advance(t->id_); vertex = vlist->find(*t); diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp index 35074b450..138425b2f 100644 --- a/tests/unit/distributed_durability.cpp +++ b/tests/unit/distributed_durability.cpp @@ -5,7 +5,7 @@ #include "database/distributed/graph_db_accessor.hpp" #include "durability/distributed/snapshooter.hpp" #include "durability/distributed/version.hpp" -#include "durability/paths.hpp" +#include "durability/distributed/paths.hpp" #include "utils/string.hpp" std::vector DirFiles(fs::path dir) { diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 22a239c1d..83cba1c90 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -13,15 +13,17 @@ // TODO: FIXME // #include "database/distributed/distributed_graph_db.hpp" +#include "communication/bolt/v1/decoder/decoder.hpp" #include "database/single_node/graph_db.hpp" #include "database/single_node/graph_db_accessor.hpp" #include "durability/hashed_file_reader.hpp" -#include "durability/paths.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/recovery.hpp" #include "durability/single_node/snapshooter.hpp" -#include "durability/single_node/snapshot_decoder.hpp" #include "durability/single_node/state_delta.hpp" #include "durability/single_node/version.hpp" +// TODO: Why do we depend on TypedValue here? +#include "query/typed_value.hpp" #include "utils/string.hpp" DECLARE_int32(wal_flush_interval_millis); @@ -314,10 +316,9 @@ class Durability : public ::testing::Test { return config; } - void MakeSnapshot(int worker_id, database::GraphDb &db, - int snapshot_max_retained = -1) { + void MakeSnapshot(database::GraphDb &db, int snapshot_max_retained = -1) { auto dba = db.Access(); - ASSERT_TRUE(durability::MakeSnapshot(db, *dba, worker_id, durability_dir_, + ASSERT_TRUE(durability::MakeSnapshot(db, *dba, durability_dir_, snapshot_max_retained)); } @@ -337,7 +338,7 @@ class Durability : public ::testing::Test { // Tests wal encoder to encode correctly non-CRUD deltas, and that all deltas // are written in the correct order TEST_F(Durability, WalEncoding) { - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); auto gid1 = generator.Next(); { @@ -413,7 +414,7 @@ TEST_F(Durability, WalEncoding) { } TEST_F(Durability, SnapshotEncoding) { - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); auto gid1 = generator.Next(); auto gid2 = generator.Next(); @@ -439,12 +440,12 @@ TEST_F(Durability, SnapshotEncoding) { ASSERT_EQ(e1.gid(), gid1); dba->BuildIndex(dba->Label("l1"), dba->Property("p1")); dba->Commit(); - MakeSnapshot(0, db); + MakeSnapshot(db); } auto snapshot = GetLastFile(snapshot_dir_); HashedFileReader buffer; - durability::SnapshotDecoder decoder(buffer); + communication::bolt::Decoder decoder(buffer); int64_t vertex_count, edge_count; uint64_t hash; @@ -461,15 +462,6 @@ TEST_F(Durability, SnapshotEncoding) { communication::bolt::Value dv; decoder.ReadValue(&dv); ASSERT_EQ(dv.ValueInt(), durability::kVersion); - // Worker id - decoder.ReadValue(&dv); - ASSERT_EQ(dv.ValueInt(), 0); - // Number of generated vertex ids. - decoder.ReadValue(&dv); - ASSERT_TRUE(dv.IsInt()); - // Number of generated edge ids. - decoder.ReadValue(&dv); - ASSERT_TRUE(dv.IsInt()); // Transaction ID. decoder.ReadValue(&dv); ASSERT_TRUE(dv.IsInt()); @@ -482,13 +474,14 @@ TEST_F(Durability, SnapshotEncoding) { EXPECT_EQ(dv.ValueList()[0].ValueString(), "l1"); EXPECT_EQ(dv.ValueList()[1].ValueString(), "p1"); - std::map decoded_vertices; + std::map decoded_vertices; // Decode vertices. for (int i = 0; i < vertex_count; ++i) { - auto vertex = decoder.ReadSnapshotVertex(); - ASSERT_NE(vertex, std::experimental::nullopt); - decoded_vertices.emplace(vertex->gid, *vertex); + ASSERT_TRUE( + decoder.ReadValue(&dv, communication::bolt::Value::Type::Vertex)); + auto &vertex = dv.ValueVertex(); + decoded_vertices.emplace(vertex.id.AsUint(), vertex); } ASSERT_EQ(decoded_vertices.size(), 3); ASSERT_EQ(decoded_vertices[gid0].labels.size(), 1); @@ -504,13 +497,9 @@ TEST_F(Durability, SnapshotEncoding) { // Decode edges. for (int i = 0; i < edge_count; ++i) { - decoder.ReadValue(&dv); - ASSERT_EQ(dv.type(), communication::bolt::Value::Type::Edge); + ASSERT_TRUE(decoder.ReadValue(&dv, communication::bolt::Value::Type::Edge)); auto &edge = dv.ValueEdge(); decoded_edges.emplace(edge.id.AsUint(), edge); - // Read cypher_id. - decoder.ReadValue(&dv); - ASSERT_EQ(dv.type(), communication::bolt::Value::Type::Int); } EXPECT_EQ(decoded_edges.size(), 2); EXPECT_EQ(decoded_edges[gid0].from.AsUint(), gid0); @@ -536,7 +525,7 @@ TEST_F(Durability, SnapshotRecovery) { MakeDb(db, 300, {0, 1, 2}); MakeDb(db, 300); MakeDb(db, 300, {3, 4}); - MakeSnapshot(0, db); + MakeSnapshot(db); { auto recovered_config = DbConfig(); recovered_config.db_recover_on_startup = true; @@ -545,71 +534,6 @@ TEST_F(Durability, SnapshotRecovery) { } } -TEST_F(Durability, SnapshotNoVerticesIdRecovery) { - database::GraphDb db{DbConfig()}; - MakeDb(db, 10); - - // Erase all vertices, this should cause snapshot to not have any more - // vertices which should make it not change any id after snapshot recovery, - // but we still have to make sure that the id for generators is recovered - { - auto dba = db.Access(); - for (auto vertex : dba->Vertices(false)) dba->RemoveVertex(vertex); - dba->Commit(); - } - - MakeSnapshot(0, db); - { - auto recovered_config = DbConfig(); - recovered_config.db_recover_on_startup = true; - database::GraphDb recovered{recovered_config}; - EXPECT_EQ(db.storage().VertexGenerator().LocalCount(), - recovered.storage().VertexGenerator().LocalCount()); - EXPECT_EQ(db.storage().EdgeGenerator().LocalCount(), - recovered.storage().EdgeGenerator().LocalCount()); - } -} - -TEST_F(Durability, SnapshotAndWalIdRecovery) { - auto config = DbConfig(); - config.durability_enabled = true; - database::GraphDb db{config}; - MakeDb(db, 300); - MakeSnapshot(0, db); - MakeDb(db, 300); - db.wal().Flush(); - ASSERT_EQ(DirFiles(snapshot_dir_).size(), 1); - EXPECT_GT(DirFiles(wal_dir_).size(), 1); - { - auto recovered_config = DbConfig(); - recovered_config.db_recover_on_startup = true; - database::GraphDb recovered{recovered_config}; - EXPECT_EQ(db.storage().VertexGenerator().LocalCount(), - recovered.storage().VertexGenerator().LocalCount()); - EXPECT_EQ(db.storage().EdgeGenerator().LocalCount(), - recovered.storage().EdgeGenerator().LocalCount()); - } -} - -TEST_F(Durability, OnlyWalIdRecovery) { - auto config = DbConfig(); - config.durability_enabled = true; - database::GraphDb db{config}; - MakeDb(db, 300); - db.wal().Flush(); - ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0); - EXPECT_GT(DirFiles(wal_dir_).size(), 1); - { - auto recovered_config = DbConfig(); - recovered_config.db_recover_on_startup = true; - database::GraphDb recovered{recovered_config}; - EXPECT_EQ(db.storage().VertexGenerator().LocalCount(), - recovered.storage().VertexGenerator().LocalCount()); - EXPECT_EQ(db.storage().EdgeGenerator().LocalCount(), - recovered.storage().EdgeGenerator().LocalCount()); - } -} - TEST_F(Durability, WalRecovery) { auto modify_config = [](database::Config config, bool durability_enabled, bool synchronous_commit) { @@ -649,7 +573,7 @@ TEST_F(Durability, SnapshotAndWalRecovery) { database::GraphDb db{config}; MakeDb(db, 300, {0, 1, 2}); MakeDb(db, 300); - MakeSnapshot(0, db); + MakeSnapshot(db); MakeDb(db, 300, {3, 4}); MakeDb(db, 300); MakeDb(db, 300, {5}); @@ -685,7 +609,7 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) { MakeDb(*dba_3, 100); dba_3->Commit(); - MakeSnapshot(0, db); // Snapshooter takes the fourth transaction. + MakeSnapshot(db); // Snapshooter takes the fourth transaction. dba_2->Commit(); // The fifth transaction starts and commits after snapshot. @@ -747,7 +671,7 @@ TEST_F(Durability, SnapshotRetention) { // Track the added snapshots to ensure the correct ones are pruned. std::unordered_set snapshots; for (int i = 0; i < count; ++i) { - MakeSnapshot(0, db, retain); + MakeSnapshot(db, retain); auto latest = GetLastFile(snapshot_dir_); snapshots.emplace(GetLastFile(snapshot_dir_)); // Ensures that the latest snapshot was not in the snapshots collection @@ -767,13 +691,13 @@ TEST_F(Durability, WalRetention) { config.durability_enabled = true; database::GraphDb db{config}; MakeDb(db, 100); - MakeSnapshot(0, db); + MakeSnapshot(db); MakeDb(db, 100); EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1); db.wal().Flush(); // 1 current WAL file, plus retained ones EXPECT_GT(DirFiles(wal_dir_).size(), 1); - MakeSnapshot(0, db); + MakeSnapshot(db); db.wal().Flush(); } @@ -882,13 +806,13 @@ TEST_F(Durability, SequentialRecovery) { auto update_theads = run_updates(db, keep_running); std::this_thread::sleep_for(25ms); if (snapshot_during) { - MakeSnapshot(0, db); + MakeSnapshot(db); } std::this_thread::sleep_for(25ms); keep_running = false; for (auto &t : update_theads) t.join(); if (snapshot_after) { - MakeSnapshot(0, db); + MakeSnapshot(db); } db.wal().Flush(); @@ -926,7 +850,7 @@ TEST_F(Durability, ContainsDurabilityFilesSnapshot) { database::GraphDb db{DbConfig()}; auto dba = db.Access(); auto v0 = dba->InsertVertex(); - MakeSnapshot(0, db); + MakeSnapshot(db); } ASSERT_TRUE(durability::ContainsDurabilityFiles(durability_dir_)); } @@ -949,7 +873,7 @@ TEST_F(Durability, MoveToBackupSnapshot) { database::GraphDb db{DbConfig()}; auto dba = db.Access(); auto v0 = dba->InsertVertex(); - MakeSnapshot(0, db); + MakeSnapshot(db); } // durability-enabled=true, db-recover-on-startup=false diff --git a/tests/unit/graph_db_accessor.cpp b/tests/unit/graph_db_accessor.cpp index 963169384..c2c323b42 100644 --- a/tests/unit/graph_db_accessor.cpp +++ b/tests/unit/graph_db_accessor.cpp @@ -19,7 +19,7 @@ auto Count(TIterable iterable) { TEST(GraphDbAccessorTest, InsertVertex) { GraphDb db; auto accessor = db.Access(); - gid::Generator generator(0); + gid::Generator generator; EXPECT_EQ(Count(accessor->Vertices(false)), 0); diff --git a/tests/unit/kvstore.cpp b/tests/unit/kvstore.cpp index 125fc1841..b2e96cac9 100644 --- a/tests/unit/kvstore.cpp +++ b/tests/unit/kvstore.cpp @@ -3,7 +3,6 @@ #include #include -#include "durability/paths.hpp" #include "storage/kvstore/kvstore.hpp" #include "utils/file.hpp" diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index 55027db1b..bc88f07d6 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -15,8 +15,8 @@ TEST(MVCC, Deadlock) { tx::Engine engine; auto t0 = engine.Begin(); - mvcc::VersionList version_list1(*t0, 0, 0); - mvcc::VersionList version_list2(*t0, 1, 1); + mvcc::VersionList version_list1(*t0, 0); + mvcc::VersionList version_list2(*t0, 1); engine.Commit(*t0); auto t1 = engine.Begin(); @@ -34,7 +34,7 @@ TEST(MVCC, UpdateDontDelete) { { tx::Engine engine; auto t1 = engine.Begin(); - mvcc::VersionList version_list(*t1, 0, 0, count); + mvcc::VersionList version_list(*t1, 0, count); engine.Commit(*t1); auto t2 = engine.Begin(); @@ -58,7 +58,7 @@ TEST(MVCC, UpdateDontDelete) { TEST(MVCC, Oldest) { tx::Engine engine; auto t1 = engine.Begin(); - mvcc::VersionList version_list(*t1, 0, 0); + mvcc::VersionList version_list(*t1, 0); auto first = version_list.Oldest(); EXPECT_NE(first, nullptr); // TODO Gleich: no need to do 10 checks of the same thing diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index b03236c88..f5d29b7ae 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -60,7 +60,7 @@ class Mvcc : public ::testing::Test { int version_list_size = 0; tx::Engine engine; tx::Transaction *t1 = engine.Begin(); - mvcc::VersionList version_list{*t1, 0, 0, version_list_size}; + mvcc::VersionList version_list{*t1, 0, version_list_size}; TestClass *v1 = nullptr; tx::Transaction *t2 = nullptr; tx::TransactionId id0, id1, id2; diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index 29e833251..50e3d133a 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -24,7 +24,7 @@ class MvccGcTest : public ::testing::Test { protected: std::atomic record_destruction_count{0}; - mvcc::VersionList version_list{*t0, 0, 0, + mvcc::VersionList version_list{*t0, 0, record_destruction_count}; std::vector transactions{t0}; @@ -125,7 +125,7 @@ TEST(GarbageCollector, GcClean) { auto t1 = engine.Begin(); std::atomic record_destruction_count{0}; auto vl = - new mvcc::VersionList(*t1, 0, 0, record_destruction_count); + new mvcc::VersionList(*t1, 0, record_destruction_count); auto access = collection.access(); access.insert(0, vl); engine.Commit(*t1); diff --git a/tests/unit/query_expression_evaluator.cpp b/tests/unit/query_expression_evaluator.cpp index 35104e71e..a544f0f44 100644 --- a/tests/unit/query_expression_evaluator.cpp +++ b/tests/unit/query_expression_evaluator.cpp @@ -1351,22 +1351,26 @@ TEST_F(FunctionTest, Id) { auto vb = dba->InsertVertex(); EXPECT_EQ(EvaluateFunction("ID", {va}).ValueInt(), 0); EXPECT_EQ(EvaluateFunction("ID", {ea}).ValueInt(), 0); - EXPECT_EQ(EvaluateFunction("ID", {vb}).ValueInt(), 1024); + EXPECT_EQ(EvaluateFunction("ID", {vb}).ValueInt(), 1); EXPECT_THROW(EvaluateFunction("ID", {}), QueryRuntimeException); EXPECT_THROW(EvaluateFunction("ID", {0}), QueryRuntimeException); EXPECT_THROW(EvaluateFunction("ID", {va, ea}), QueryRuntimeException); } +/* TODO: FIXME TEST_F(FunctionTest, WorkerIdException) { auto va = dba->InsertVertex(); EXPECT_THROW(EvaluateFunction("WORKERID", {}), QueryRuntimeException); EXPECT_THROW(EvaluateFunction("WORKERID", {va, va}), QueryRuntimeException); } +*/ +/* TODO: FIXME TEST_F(FunctionTest, WorkerIdSingleNode) { auto va = dba->InsertVertex(); EXPECT_EQ(EvaluateFunction("WORKERID", {va}).ValueInt(), 0); } +*/ TEST_F(FunctionTest, ToStringNull) { EXPECT_TRUE(EvaluateFunction("TOSTRING", {TypedValue::Null}).IsNull()); diff --git a/tests/unit/record_edge_vertex_accessor.cpp b/tests/unit/record_edge_vertex_accessor.cpp index 525a816ec..03d83b961 100644 --- a/tests/unit/record_edge_vertex_accessor.cpp +++ b/tests/unit/record_edge_vertex_accessor.cpp @@ -7,7 +7,6 @@ #include "database/single_node/graph_db_accessor.hpp" #include "mvcc/single_node/version_list.hpp" #include "storage/common/property_value.hpp" -#include "storage/single_node/address.hpp" #include "storage/single_node/edge_accessor.hpp" #include "storage/single_node/vertex.hpp" #include "storage/single_node/vertex_accessor.hpp" @@ -60,18 +59,6 @@ TEST(RecordAccessor, RecordEquality) { EXPECT_NE(e1, e2); } -TEST(RecordAccessor, GlobalToLocalAddressConversion) { - database::GraphDb db; - auto dba = db.Access(); - - auto v1 = dba->InsertVertex(); - storage::Address> global_address{v1.gid(), 0}; - EXPECT_FALSE(global_address.is_local()); - auto v1_from_global = VertexAccessor(global_address, *dba); - EXPECT_TRUE(v1_from_global.address().is_local()); - EXPECT_EQ(v1_from_global.address(), v1.address()); -} - TEST(RecordAccessor, SwitchOldAndSwitchNewMemberFunctionTest) { database::GraphDb db; diff --git a/tests/unit/state_delta.cpp b/tests/unit/state_delta.cpp index 764193508..0e858b3a5 100644 --- a/tests/unit/state_delta.cpp +++ b/tests/unit/state_delta.cpp @@ -6,12 +6,12 @@ TEST(StateDelta, CreateVertex) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); { auto dba = db.Access(); auto delta = - database::StateDelta::CreateVertex(dba->transaction_id(), gid0, 0); + database::StateDelta::CreateVertex(dba->transaction_id(), gid0); delta.Apply(*dba); dba->Commit(); } @@ -25,7 +25,7 @@ TEST(StateDelta, CreateVertex) { TEST(StateDelta, RemoveVertex) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); { auto dba = db.Access(); @@ -48,7 +48,7 @@ TEST(StateDelta, RemoveVertex) { TEST(StateDelta, CreateEdge) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); auto gid1 = generator.Next(); auto gid2 = generator.Next(); @@ -61,7 +61,7 @@ TEST(StateDelta, CreateEdge) { { auto dba = db.Access(); auto delta = - database::StateDelta::CreateEdge(dba->transaction_id(), gid2, 0, gid0, + database::StateDelta::CreateEdge(dba->transaction_id(), gid2, gid0, gid1, dba->EdgeType("edge"), "edge"); delta.Apply(*dba); dba->Commit(); @@ -75,7 +75,7 @@ TEST(StateDelta, CreateEdge) { TEST(StateDelta, RemoveEdge) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); auto gid1 = generator.Next(); auto gid2 = generator.Next(); @@ -101,7 +101,7 @@ TEST(StateDelta, RemoveEdge) { TEST(StateDelta, AddLabel) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); { auto dba = db.Access(); @@ -127,7 +127,7 @@ TEST(StateDelta, AddLabel) { TEST(StateDelta, RemoveLabel) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); { auto dba = db.Access(); @@ -153,7 +153,7 @@ TEST(StateDelta, RemoveLabel) { TEST(StateDelta, SetPropertyVertex) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); { auto dba = db.Access(); @@ -179,7 +179,7 @@ TEST(StateDelta, SetPropertyVertex) { TEST(StateDelta, SetPropertyEdge) { database::GraphDb db; - gid::Generator generator(0); + gid::Generator generator; auto gid0 = generator.Next(); auto gid1 = generator.Next(); auto gid2 = generator.Next(); diff --git a/tools/apollo/apollo_runs.yaml b/tools/apollo/apollo_runs.yaml index 50d6e84c0..f45f0095e 100644 --- a/tools/apollo/apollo_runs.yaml +++ b/tools/apollo/apollo_runs.yaml @@ -11,6 +11,7 @@ - \./memgraph/tools/apollo/\.cppcheck_errors - name: code_coverage + project: ^NEVER$ # TODO (mferencevic): remove when the coverage is split into two (single node and distributed) type: data process require_runs: ^unit__.+ # regex to match all unit runs commands: TIMEOUT=300 ./coverage_convert diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index 0385855f6..113c41e83 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -10,13 +10,11 @@ #include #include "config.hpp" +#include "communication/bolt/v1/encoder/base_encoder.hpp" #include "durability/hashed_file_writer.hpp" -#include "durability/paths.hpp" +#include "durability/single_node/paths.hpp" #include "durability/single_node/snapshooter.hpp" -#include "durability/single_node/snapshot_encoder.hpp" -#include "durability/single_node/snapshot_value.hpp" #include "durability/single_node/version.hpp" -#include "storage/single_node/address_types.hpp" #include "utils/cast.hpp" #include "utils/string.hpp" #include "utils/timer.hpp" @@ -156,7 +154,7 @@ class MemgraphNodeIdMap { } private: - gid::Generator generator_{0}; + gid::Generator generator_; std::unordered_map node_id_to_mg_; }; @@ -262,7 +260,7 @@ std::string GetIdSpace(const std::string &type) { } void WriteNodeRow( - std::unordered_map &partial_vertices, + std::unordered_map &partial_vertices, const std::vector &fields, const std::vector &row, const std::vector &additional_labels, MemgraphNodeIdMap &node_id_map) { @@ -298,12 +296,12 @@ void WriteNodeRow( labels.insert(labels.end(), additional_labels.begin(), additional_labels.end()); CHECK(id) << "Node ID must be specified"; - partial_vertices[*id] = { - *id, utils::MemcpyCast(*id), labels, properties, {}}; + partial_vertices[*id] = {communication::bolt::Id::FromUint(*id), labels, + properties}; } auto PassNodes( - std::unordered_map &partial_vertices, + std::unordered_map &partial_vertices, const std::string &nodes_path, MemgraphNodeIdMap &node_id_map, const std::vector &additional_labels) { int64_t node_count = 0; @@ -395,10 +393,10 @@ void Convert(const std::vector &nodes, const std::string &output_path) { try { HashedFileWriter buffer(output_path); - durability::SnapshotEncoder encoder(buffer); + communication::bolt::BaseEncoder encoder(buffer); int64_t node_count = 0; int64_t edge_count = 0; - gid::Generator relationship_id_generator(0); + gid::Generator relationship_id_generator; MemgraphNodeIdMap node_id_map; // Snapshot file has the following contents in order: // 1) Magic number. @@ -413,20 +411,10 @@ void Convert(const std::vector &nodes, durability::kSnapshotMagic.size()); encoder.WriteValue(durability::kVersion); - encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0 - // since we are using a single-node version of - // memgraph here - // The following two entries indicate the starting points for generating new - // Vertex/Edge IDs in the DB. They are only important when there are - // vertices/edges that were moved to another worker (in distributed - // Memgraph), so it's safe to set them to 0 in snapshot generation. - encoder.WriteInt(0); // Internal Id of vertex generator - encoder.WriteInt(0); // Internal Id of edge generator - encoder.WriteInt(0); // Id of transaction that is snapshooting. encoder.WriteList({}); // Transactional snapshot. encoder.WriteList({}); // Label + property indexes. - std::unordered_map vertices; + std::unordered_map vertices; std::unordered_map edges; for (const auto &nodes_file : nodes) { node_count += @@ -436,66 +424,15 @@ void Convert(const std::vector &nodes, edge_count += PassRelationships(edges, relationships_file, node_id_map, relationship_id_generator); } - for (auto edge : edges) { - auto encoded = edge.second; - auto edge_address = storage::EdgeAddress(encoded.id.AsUint(), 0); - vertices[encoded.from.AsUint()].out.push_back( - {edge_address, storage::VertexAddress(encoded.to.AsUint(), 0), - encoded.type}); - vertices[encoded.to.AsUint()].in.push_back( - {edge_address, storage::VertexAddress(encoded.from.AsUint(), 0), - encoded.type}); - } + for (auto vertex_pair : vertices) { auto &vertex = vertex_pair.second; - // write node - encoder.WriteRAW( - utils::UnderlyingCast(communication::bolt::Marker::TinyStruct) + 3); - encoder.WriteRAW( - utils::UnderlyingCast(communication::bolt::Signature::Node)); - - encoder.WriteInt(vertex.gid); - auto &labels = vertex.labels; - std::vector transformed; - std::transform(labels.begin(), labels.end(), - std::back_inserter(transformed), - [](const std::string &str) { - return communication::bolt::Value(str); - }); - encoder.WriteList(transformed); - encoder.WriteMap(vertex.properties); - - encoder.WriteInt(vertex.cypher_id); - - encoder.WriteInt(vertex.in.size()); - for (auto edge : vertex.in) { - encoder.WriteInt(edge.address.raw()); - encoder.WriteInt(edge.vertex.raw()); - encoder.WriteString(edge.type); - } - encoder.WriteInt(vertex.out.size()); - for (auto edge : vertex.out) { - encoder.WriteInt(edge.address.raw()); - encoder.WriteInt(edge.vertex.raw()); - encoder.WriteString(edge.type); - } + encoder.WriteVertex(vertex); } for (auto edge_pair : edges) { auto &edge = edge_pair.second; - // write relationship - encoder.WriteRAW( - utils::UnderlyingCast(communication::bolt::Marker::TinyStruct) + 5); - encoder.WriteRAW( - utils::UnderlyingCast(communication::bolt::Signature::Relationship)); - encoder.WriteInt(edge.id.AsInt()); - encoder.WriteInt(edge.from.AsInt()); - encoder.WriteInt(edge.to.AsInt()); - encoder.WriteString(edge.type); - encoder.WriteMap(edge.properties); - - // cypher_id - encoder.WriteInt(edge.id.AsInt()); + encoder.WriteEdge(edge); } buffer.WriteValue(node_count); @@ -557,10 +494,8 @@ std::string GetOutputPath() { } catch (const std::experimental::filesystem::filesystem_error &error) { LOG(FATAL) << error.what(); } - int worker_id = 0; - // TODO(dgleich): Remove this transaction id hack return std::string( - durability::MakeSnapshotPath(durability_dir, worker_id, 0)); + durability::MakeSnapshotPath(durability_dir, 0)); } int main(int argc, char *argv[]) { diff --git a/tools/tests/mg_recovery_check.cpp b/tools/tests/mg_recovery_check.cpp index 24e51765c..9c4416ee6 100644 --- a/tools/tests/mg_recovery_check.cpp +++ b/tools/tests/mg_recovery_check.cpp @@ -23,7 +23,7 @@ class RecoveryTest : public ::testing::Test { std::string durability_dir(FLAGS_durability_dir); durability::RecoveryData recovery_data; durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data, - std::experimental::nullopt, 0); + std::experimental::nullopt); durability::RecoveryTransactions recovery_transactions(&db_); durability::RecoverWal(durability_dir, &db_, &recovery_data, &recovery_transactions);