From 81e2e8f64f0b3bdc747294e62d9853a26a37fec2 Mon Sep 17 00:00:00 2001 From: florijan Date: Tue, 6 Feb 2018 16:26:29 +0100 Subject: [PATCH] Add remote updates RPC Summary: Updates are supported, insertions and removals not in this diff. The test is a bit overdesigned, it happens. Reviewers: teon.banek, dgleich, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1176 --- src/communication/rpc/messages-inl.hpp | 8 +- src/database/graph_db.cpp | 36 ++- src/database/graph_db.hpp | 6 + src/database/state_delta.hpp | 40 ++++ src/distributed/remote_data_rpc_server.hpp | 5 +- .../remote_updates_rpc_clients.hpp | 45 ++++ .../remote_updates_rpc_messages.hpp | 35 +++ src/distributed/remote_updates_rpc_server.hpp | 221 ++++++++++++++++++ src/storage/record_accessor.cpp | 28 ++- src/storage/record_accessor.hpp | 46 ++-- tests/unit/distributed_common.hpp | 2 +- tests/unit/distributed_updates.cpp | 80 +++++-- 12 files changed, 488 insertions(+), 64 deletions(-) create mode 100644 src/distributed/remote_updates_rpc_clients.hpp create mode 100644 src/distributed/remote_updates_rpc_messages.hpp create mode 100644 src/distributed/remote_updates_rpc_server.hpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index b583852f7..269ac7996 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -4,11 +4,13 @@ #include "boost/archive/binary_oarchive.hpp" #include "boost/serialization/export.hpp" +#include "database/state_delta.hpp" #include "distributed/coordination_rpc_messages.hpp" #include "distributed/index_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "distributed/remote_updates_rpc_messages.hpp" #include "stats/stats_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp" @@ -25,7 +27,6 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property) #undef ID_VALUE_EXPORT_BOOST_TYPE - // Distributed transaction engine. BOOST_CLASS_EXPORT(tx::TxAndSnapshot); BOOST_CLASS_EXPORT(tx::BeginReq); @@ -80,3 +81,8 @@ BOOST_CLASS_EXPORT(stats::StatsReq); BOOST_CLASS_EXPORT(stats::StatsRes); BOOST_CLASS_EXPORT(stats::BatchStatsReq); BOOST_CLASS_EXPORT(stats::BatchStatsRes); + +// Remote updates. +BOOST_CLASS_EXPORT(database::StateDelta); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 6e071d9dd..f74924ffa 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -11,6 +11,8 @@ #include "distributed/remote_data_rpc_server.hpp" #include "distributed/remote_produce_rpc_server.hpp" #include "distributed/remote_pull_rpc_clients.hpp" +#include "distributed/remote_updates_rpc_clients.hpp" +#include "distributed/remote_updates_rpc_server.hpp" #include "durability/paths.hpp" #include "durability/recovery.hpp" #include "durability/snapshooter.hpp" @@ -108,14 +110,26 @@ class SingleNode : public PrivateBase { distributed::PlanConsumer &plan_consumer() override { LOG(FATAL) << "Plan Consumer not available in single-node."; } + distributed::RemoteUpdatesRpcServer &remote_updates_server() override { + LOG(FATAL) << "Remote updates server not available in single-node."; + } + distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { + LOG(FATAL) << "Remote updates clients not available in single-node."; + } }; -#define IMPL_DISTRIBUTED_GETTERS \ - distributed::RemoteDataRpcServer &remote_data_server() override { \ - return remote_data_server_; \ - } \ - distributed::RemoteDataRpcClients &remote_data_clients() override { \ - return remote_data_clients_; \ +#define IMPL_DISTRIBUTED_GETTERS \ + distributed::RemoteDataRpcServer &remote_data_server() override { \ + return remote_data_server_; \ + } \ + distributed::RemoteDataRpcClients &remote_data_clients() override { \ + return remote_data_clients_; \ + } \ + distributed::RemoteUpdatesRpcServer &remote_updates_server() override { \ + return remote_updates_server_; \ + } \ + distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { \ + return remote_updates_clients_; \ } class Master : public PrivateBase { @@ -148,6 +162,8 @@ class Master : public PrivateBase { distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RpcWorkerClients index_rpc_clients_{coordination_, distributed::kIndexRpcName}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; }; class Worker : public PrivateBase { @@ -179,6 +195,8 @@ class Worker : public PrivateBase { distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, system_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; }; #undef IMPL_GETTERS @@ -241,6 +259,12 @@ distributed::RemotePullRpcClients &PublicBase::remote_pull_clients() { distributed::RemoteProduceRpcServer &PublicBase::remote_produce_server() { return impl_->remote_produce_server(); } +distributed::RemoteUpdatesRpcServer &PublicBase::remote_updates_server() { + return impl_->remote_updates_server(); +} +distributed::RemoteUpdatesRpcClients &PublicBase::remote_updates_clients() { + return impl_->remote_updates_clients(); +} void PublicBase::MakeSnapshot() { const bool status = durability::MakeSnapshot( diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 1f8b7bc83..bd64cfffd 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -21,6 +21,8 @@ class PlanDispatcher; class PlanConsumer; class RemotePullRpcClients; class RemoteProduceRpcServer; +class RemoteUpdatesRpcServer; +class RemoteUpdatesRpcClients; } namespace database { @@ -91,6 +93,8 @@ class GraphDb { // Supported only in distributed master and worker, not in single-node. virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; + virtual distributed::RemoteUpdatesRpcServer &remote_updates_server() = 0; + virtual distributed::RemoteUpdatesRpcClients &remote_updates_clients() = 0; // Supported only in distributed master. virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; @@ -135,6 +139,8 @@ class PublicBase : public GraphDb { distributed::PlanConsumer &plan_consumer() override; distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::RemoteProduceRpcServer &remote_produce_server() override; + distributed::RemoteUpdatesRpcServer &remote_updates_server() override; + distributed::RemoteUpdatesRpcClients &remote_updates_clients() override; protected: explicit PublicBase(std::unique_ptr impl); diff --git a/src/database/state_delta.hpp b/src/database/state_delta.hpp index e5f84eba2..6f346b385 100644 --- a/src/database/state_delta.hpp +++ b/src/database/state_delta.hpp @@ -6,6 +6,7 @@ #include "durability/hashed_file_writer.hpp" #include "storage/gid.hpp" #include "storage/property_value.hpp" +#include "utils/serialization.hpp" namespace database { /** Describes single change to the database state. Used for durability (WAL) and @@ -107,5 +108,44 @@ struct StateDelta { PropertyValue value = PropertyValue::Null; storage::Label label; std::string label_name; + + private: + friend class boost::serialization::access; + BOOST_SERIALIZATION_SPLIT_MEMBER(); + template + void save(TArchive &ar, const unsigned int) const { + ar &type; + ar &transaction_id; + ar &vertex_id; + ar &edge_id; + ar &vertex_from_id; + ar &vertex_to_id; + ar &edge_type; + ar &edge_type_name; + ar &property; + ar &property_name; + utils::SaveTypedValue(ar, value); + ar &label; + ar &label_name; + } + + template + void load(TArchive &ar, const unsigned int) { + ar &type; + ar &transaction_id; + ar &vertex_id; + ar &edge_id; + ar &vertex_from_id; + ar &vertex_to_id; + ar &edge_type; + ar &edge_type_name; + ar &property; + ar &property_name; + query::TypedValue tv; + utils::LoadTypedValue(ar, tv); + value = tv; + ar &label; + ar &label_name; + } }; } // namespace database diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/remote_data_rpc_server.hpp index dbe83a8a1..c4990f119 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/remote_data_rpc_server.hpp @@ -17,7 +17,7 @@ class RemoteDataRpcServer { // invalidation. public: RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system) - : db_(db), system_(system) { + : db_(db), rpc_server_(system, kRemoteDataRpcName) { rpc_server_.Register([this](const RemoteVertexReq &req) { database::GraphDbAccessor dba(db_, req.member.tx_id); auto vertex = dba.FindVertexChecked(req.member.gid, false); @@ -36,7 +36,6 @@ class RemoteDataRpcServer { private: database::GraphDb &db_; - communication::rpc::System &system_; - communication::rpc::Server rpc_server_{system_, kRemoteDataRpcName}; + communication::rpc::Server rpc_server_; }; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp new file mode 100644 index 000000000..d45ca2f1f --- /dev/null +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include "database/state_delta.hpp" +#include "distributed/coordination.hpp" +#include "distributed/remote_updates_rpc_messages.hpp" +#include "distributed/rpc_worker_clients.hpp" +#include "transactions/type.hpp" + +namespace distributed { + +/// Exposes the functionality to send updates to other workers (that own the +/// graph element we are updating). Also enables us to call for a worker to +/// apply the accumulated deferred updates, or discard them. +class RemoteUpdatesRpcClients { + public: + explicit RemoteUpdatesRpcClients(distributed::Coordination &coordination) + : worker_clients_(coordination, kRemoteUpdatesRpc) {} + + /// Sends an update delta to the given worker. + RemoteUpdateResult RemoteUpdate(int worker_id, + const database::StateDelta &delta) { + return worker_clients_.GetClientPool(worker_id) + .Call(delta) + ->member; + } + + /// Calls for the worker with the given ID to apply remote updates. Returns + /// the results of that operation. + RemoteUpdateResult RemoteUpdateApply(int worker_id, + tx::transaction_id_t tx_id) { + return worker_clients_.GetClientPool(worker_id) + .Call(tx_id) + ->member; + } + + /// Calls for the worker with the given ID to discard remote updates. + void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) { + worker_clients_.GetClientPool(worker_id).Call( + tx_id); + } + + private: + RpcWorkerClients worker_clients_; +}; +} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/remote_updates_rpc_messages.hpp new file mode 100644 index 000000000..53a18cdcd --- /dev/null +++ b/src/distributed/remote_updates_rpc_messages.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include "communication/rpc/messages.hpp" +#include "database/state_delta.hpp" +#include "transactions/type.hpp" + +namespace distributed { + +const std::string kRemoteUpdatesRpc = "RemoteUpdatesRpc"; + +/// The result of sending or applying a deferred update to a worker. +enum class RemoteUpdateResult { + DONE, + SERIALIZATION_ERROR, + LOCK_TIMEOUT_ERROR, + UPDATE_DELETED_ERROR +}; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta); +RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateRes, RemoteUpdateResult); +using RemoteUpdateRpc = + communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); +using RemoteUpdateApplyRpc = + communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateDiscardReq, tx::transaction_id_t); +RPC_NO_MEMBER_MESSAGE(RemoteUpdateDiscardRes); +using RemoteUpdateDiscardRpc = + communication::rpc::RequestResponse; +} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp new file mode 100644 index 000000000..e0c0487ff --- /dev/null +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -0,0 +1,221 @@ +#pragma once + +#include +#include +#include +#include + +#include "glog/logging.h" + +#include "communication/rpc/server.hpp" +#include "data_structures/concurrent/concurrent_map.hpp" +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "database/state_delta.hpp" +#include "distributed/remote_updates_rpc_messages.hpp" +#include "mvcc/version_list.hpp" +#include "storage/gid.hpp" +#include "storage/record_accessor.hpp" +#include "storage/vertex_accessor.hpp" +#include "threading/sync/lock_timeout_exception.hpp" +#include "threading/sync/spinlock.hpp" +#include "transactions/type.hpp" + +namespace distributed { + +/// An RPC server that accepts and holds deferred updates (deltas) until it's +/// told to apply or discard them. The updates are organized and applied per +/// transaction in this single updates server. +/// +/// Attempts to get serialization and update-after-delete errors to happen as +/// soon as possible during query execution (fail fast). +class RemoteUpdatesRpcServer { + // Remote updates for one transaction. + template + class TransactionUpdates { + public: + TransactionUpdates(database::GraphDb &db, tx::transaction_id_t tx_id) + : db_accessor_(db, tx_id) {} + + /// Adds a delta and returns the result. Does not modify the state (data) of + /// the graph element the update is for, but calls the `update` method to + /// fail-fast on serialization and update-after-delete errors. + RemoteUpdateResult Emplace(const database::StateDelta &delta) { + auto gid = std::is_same::value + ? delta.vertex_id + : delta.edge_id; + std::lock_guard guard{lock_}; + auto found = deltas_.find(gid); + if (found == deltas_.end()) { + found = deltas_ + .emplace(gid, std::make_pair( + FindAccessor(gid), + std::vector{})) + .first; + } + + found->second.second.emplace_back(delta); + + // TODO call `RecordAccessor::update` to force serialization errors to + // fail-fast (as opposed to when all the deltas get applied). + // + // This is problematic because `VersionList::update` needs to become + // thread-safe within the same transaction. Note that the concurrency is + // possible both between the owner worker interpretation thread and an RPC + // thread (current thread), as well as multiple RPC threads if this + // object's lock is released (perhaps desirable). + // + // A potential solution *might* be that `LockStore::Lock` returns a `bool` + // indicating if the caller was the one obtaining the lock (not the same + // as lock already being held by the same transaction). + // + // Another thing that needs to be done (if we do this) is ensuring that + // `LockStore::Take` is thread-safe when called in parallel in the same + // transaction. Currently it's thread-safe only when called in parallel + // from different transactions (only one manages to take the RecordLock). + // + // Deferring the implementation of this as it's tricky, and essentially an + // optimization. + // + // try { + // found->second.first.update(); + // } catch (const mvcc::SerializationError &) { + // return RemoteUpdateResult::SERIALIZATION_ERROR; + // } catch (const RecordDeletedError &) { + // return RemoteUpdateResult::UPDATE_DELETED_ERROR; + // } catch (const LockTimeoutException &) { + // return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + // } + return RemoteUpdateResult::DONE; + } + + /// Applies all the deltas on the record. + RemoteUpdateResult Apply() { + std::lock_guard guard{lock_}; + for (auto &kv : deltas_) { + for (database::StateDelta &delta : kv.second.second) { + try { + kv.second.first.ProcessDelta(delta); + } catch (const mvcc::SerializationError &) { + return RemoteUpdateResult::SERIALIZATION_ERROR; + } catch (const RecordDeletedError &) { + return RemoteUpdateResult::UPDATE_DELETED_ERROR; + } catch (const LockTimeoutException &) { + return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + } + } + } + return RemoteUpdateResult::DONE; + } + + private: + database::GraphDbAccessor db_accessor_; + std::unordered_map< + gid::Gid, std::pair>> + deltas_; + // Multiple workers might be sending remote updates concurrently. + SpinLock lock_; + + // Helper method specialized for [Vertex|Edge]Accessor. + TRecordAccessor FindAccessor(gid::Gid gid); + }; + + public: + RemoteUpdatesRpcServer(database::GraphDb &db, + communication::rpc::System &system) + : db_(db), server_(system, kRemoteUpdatesRpc) { + server_.Register([this](const RemoteUpdateReq &req) { + using DeltaType = database::StateDelta::Type; + switch (req.member.type) { + case DeltaType::SET_PROPERTY_VERTEX: + case DeltaType::ADD_LABEL: + case DeltaType::REMOVE_LABEL: + return std::make_unique( + Process(vertex_updates_, req.member)); + case DeltaType::SET_PROPERTY_EDGE: + return std::make_unique( + Process(edge_updates_, req.member)); + default: + LOG(FATAL) << "Can't perform a remote update with delta type: " + << static_cast(req.member.type); + } + }); + + server_.Register( + [this](const RemoteUpdateApplyReq &req) { + return std::make_unique(Apply(req.member)); + }); + + server_.Register( + [this](const RemoteUpdateDiscardReq &req) { + Discard(req.member); + return std::make_unique(); + }); + } + + /// Applies all existsing updates for the given transaction ID. If there are + /// no updates for that transaction, nothing happens. Clears the updates cache + /// after applying them, regardless of the result. + RemoteUpdateResult Apply(tx::transaction_id_t tx_id) { + auto apply = [tx_id](auto &collection) { + auto access = collection.access(); + auto found = access.find(tx_id); + if (found == access.end()) { + return RemoteUpdateResult::DONE; + } + auto result = found->second.Apply(); + access.remove(tx_id); + return result; + }; + + auto vertex_result = apply(vertex_updates_); + auto edge_result = apply(edge_updates_); + if (vertex_result != RemoteUpdateResult::DONE) return vertex_result; + if (edge_result != RemoteUpdateResult::DONE) return edge_result; + return RemoteUpdateResult::DONE; + } + + /// Discards all the existing updates for the given transaction ID. + void Discard(tx::transaction_id_t tx_id) { + vertex_updates_.access().remove(tx_id); + edge_updates_.access().remove(tx_id); + } + + private: + database::GraphDb &db_; + communication::rpc::Server server_; + ConcurrentMap> + vertex_updates_; + ConcurrentMap> + edge_updates_; + + // Processes a single delta recieved in the RPC request. + template + RemoteUpdateResult Process(TCollection &updates, + const database::StateDelta &delta) { + auto tx_id = delta.transaction_id; + auto access = updates.access(); + auto &transaction_updates = + access + .emplace(tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(db_), tx_id)) + .first->second; + + return transaction_updates.Emplace(delta); + } +}; + +template <> +inline VertexAccessor +RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( + gid::Gid gid) { + return db_accessor_.FindVertexChecked(gid, false); +} + +template <> +inline EdgeAccessor +RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( + gid::Gid gid) { + return db_accessor_.FindEdgeChecked(gid, false); +} +} // namespace distributed diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 3a67e5ac5..581f6bd10 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -2,9 +2,11 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" +#include "distributed/remote_updates_rpc_clients.hpp" #include "storage/edge.hpp" #include "storage/record_accessor.hpp" #include "storage/vertex.hpp" +#include "threading/sync/lock_timeout_exception.hpp" using database::StateDelta; @@ -160,14 +162,10 @@ TRecord &RecordAccessor::update() const { } const auto &t = db_accessor_->transaction(); - { - const std::string err = - "Can't update a record deleted in the current transaction+commad"; - if (!new_ && old_->is_expired_by(t)) - throw RecordDeletedError(err); - else if (new_ && new_->is_expired_by(t)) - throw RecordDeletedError(err); - } + if (!new_ && old_->is_expired_by(t)) + throw RecordDeletedError(); + else if (new_ && new_->is_expired_by(t)) + throw RecordDeletedError(); if (new_) return *new_; @@ -227,8 +225,18 @@ void RecordAccessor::ProcessDelta( if (is_local()) { db_accessor().wal().Emplace(delta); } else { - // TODO use the delta to perform a remote update. - // TODO check for results (success, serialization_error, ...) + auto result = db_accessor().db().remote_updates_clients().RemoteUpdate( + address().worker_id(), delta); + switch (result) { + case distributed::RemoteUpdateResult::DONE: + break; + case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError(); + case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: + throw RecordDeletedError(); + case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException("Lock timeout on remote worker"); + } } } diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 6e4272a90..259eec7cc 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -121,6 +121,18 @@ class RecordAccessor : public TotalOrdering> { */ bool Reconstruct() const; + /** + * Ensures there is an updateable version of the record in the version_list, + * and that the `new_` pointer points to it. Returns a reference to that + * version. + * + * It is not legal to call this function on a Vertex/Edge that has been + * deleted in the current transaction+command. + * + * @throws RecordDeletedError + */ + TRecord &update() const; + /** * Returns true if the given accessor is visible to the given transaction. * @@ -134,6 +146,14 @@ class RecordAccessor : public TotalOrdering> { (current_state && new_ && !new_->is_expired_by(t)); } + /** + * Processes the delta that's a consequence of changes in this accessor. If + * the accessor is local that means writing the delta to the write-ahead log. + * If it's remote, then the delta needs to be sent to it's owner for + * processing. + */ + void ProcessDelta(const database::StateDelta &delta) const; + protected: /** * Pointer to the version (either old_ or new_) that READ operations @@ -145,18 +165,6 @@ class RecordAccessor : public TotalOrdering> { */ mutable TRecord *current_{nullptr}; - /** - * Ensures there is an updateable version of the record in the version_list, - * and that the `new_` pointer points to it. Returns a reference to that - * version. - * - * It is not legal to call this function on a Vertex/Edge that has been - * deleted in the current transaction+command. - * - * @throws RecordDeletedError - */ - TRecord &update() const; - /** Returns the current version (either new_ or old_) set on this * RecordAccessor. */ const TRecord ¤t() const; @@ -165,14 +173,6 @@ class RecordAccessor : public TotalOrdering> { * owner is some other worker in a distributed system. */ bool is_local() const { return address_.is_local(); } - /** - * Processes the delta that's a consequence of changes in this accessor. If - * the accessor is local that means writing the delta to the write-ahead log. - * If it's remote, then the delta needs to be sent to it's owner for - * processing. - */ - void ProcessDelta(const database::StateDelta &delta) const; - private: // The database accessor for which this record accessor is created // Provides means of getting to the transaction and database functions. @@ -212,5 +212,9 @@ class RecordAccessor : public TotalOrdering> { /** Error when trying to update a deleted record */ class RecordDeletedError : public utils::BasicException { - using utils::BasicException::BasicException; + public: + RecordDeletedError() + : utils::BasicException( + "Can't update a record deleted in the current transaction+commad") { + } }; diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 92b1a1401..1080de58f 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -11,7 +11,7 @@ class DistributedGraphDbTest : public ::testing::Test { class WorkerInThread { public: - WorkerInThread(database::Config config) : worker_(config) { + explicit WorkerInThread(database::Config config) : worker_(config) { thread_ = std::thread([this, config] { worker_.WaitForShutdown(); }); } diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 090eba153..c9637bdb6 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -1,31 +1,67 @@ #include #include "database/graph_db_accessor.hpp" +#include "distributed/remote_updates_rpc_server.hpp" #include "distributed_common.hpp" -TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) { - database::GraphDbAccessor dba_tx1{worker(1)}; - auto v = dba_tx1.InsertVertex(); - auto v_ga = v.GlobalAddress(); - dba_tx1.Commit(); +class DistributedUpdateTest : public DistributedGraphDbTest { + protected: + std::unique_ptr dba1; + std::unique_ptr dba2; + storage::Label label; + std::unique_ptr v1_dba1; + std::unique_ptr v1_dba2; - database::GraphDbAccessor dba_tx2_w2{worker(2)}; - v = VertexAccessor(v_ga, dba_tx2_w2); - ASSERT_FALSE(v.address().is_local()); - auto label = dba_tx2_w2.Label("l"); - EXPECT_FALSE(v.has_label(label)); - v.add_label(label); - v.SwitchNew(); - EXPECT_TRUE(v.has_label(label)); - v.SwitchOld(); - EXPECT_FALSE(v.has_label(label)); + void SetUp() override { + DistributedGraphDbTest::SetUp(); - // In the same transaction on the owning worker there is no label. - database::GraphDbAccessor dba_tx2_w1{worker(1), dba_tx2_w2.transaction_id()}; - v = VertexAccessor(v_ga, dba_tx2_w1); - v.SwitchOld(); - EXPECT_FALSE(v.has_label(label)); - v.SwitchNew(); - EXPECT_FALSE(v.has_label(label)); + database::GraphDbAccessor dba_tx1{worker(1)}; + auto v = dba_tx1.InsertVertex(); + auto v_ga = v.GlobalAddress(); + dba_tx1.Commit(); + + dba1 = std::make_unique(worker(1)); + dba2 = std::make_unique(worker(2), + dba1->transaction_id()); + + v1_dba1 = std::make_unique(v_ga, *dba1); + v1_dba2 = std::make_unique(v_ga, *dba2); + ASSERT_FALSE(v1_dba2->address().is_local()); + label = dba1->Label("l"); + v1_dba2->add_label(label); + } + + void TearDown() override { + dba2 = nullptr; + dba1 = nullptr; + DistributedGraphDbTest::TearDown(); + } +}; + +#define EXPECT_LABEL(var, old_result, new_result) \ + { \ + var->SwitchOld(); \ + EXPECT_EQ(var->has_label(label), old_result); \ + var->SwitchNew(); \ + EXPECT_EQ(var->has_label(label), new_result); \ + } + +TEST_F(DistributedUpdateTest, RemoteUpdateLocalOnly) { + EXPECT_LABEL(v1_dba2, false, true); + EXPECT_LABEL(v1_dba1, false, false); } + +TEST_F(DistributedUpdateTest, RemoteUpdateApply) { + EXPECT_LABEL(v1_dba1, false, false); + worker(1).remote_updates_server().Apply(dba1->transaction_id()); + EXPECT_LABEL(v1_dba1, false, true); +} + +TEST_F(DistributedUpdateTest, RemoteUpdateDiscard) { + EXPECT_LABEL(v1_dba1, false, false); + worker(1).remote_updates_server().Discard(dba1->transaction_id()); + EXPECT_LABEL(v1_dba1, false, false); +} + +#undef EXPECT_LABEL