Add remote updates RPC

Summary:
Updates are supported, insertions and removals not in this diff. The
test is a bit overdesigned, it happens.

Reviewers: teon.banek, dgleich, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1176
This commit is contained in:
florijan 2018-02-06 16:26:29 +01:00
parent fa55c130ba
commit 81e2e8f64f
12 changed files with 488 additions and 64 deletions

View File

@ -4,11 +4,13 @@
#include "boost/archive/binary_oarchive.hpp" #include "boost/archive/binary_oarchive.hpp"
#include "boost/serialization/export.hpp" #include "boost/serialization/export.hpp"
#include "database/state_delta.hpp"
#include "distributed/coordination_rpc_messages.hpp" #include "distributed/coordination_rpc_messages.hpp"
#include "distributed/index_rpc_messages.hpp" #include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp"
#include "distributed/remote_data_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "stats/stats_rpc_messages.hpp" #include "stats/stats_rpc_messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp"
@ -25,7 +27,6 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property)
#undef ID_VALUE_EXPORT_BOOST_TYPE #undef ID_VALUE_EXPORT_BOOST_TYPE
// Distributed transaction engine. // Distributed transaction engine.
BOOST_CLASS_EXPORT(tx::TxAndSnapshot); BOOST_CLASS_EXPORT(tx::TxAndSnapshot);
BOOST_CLASS_EXPORT(tx::BeginReq); BOOST_CLASS_EXPORT(tx::BeginReq);
@ -80,3 +81,8 @@ BOOST_CLASS_EXPORT(stats::StatsReq);
BOOST_CLASS_EXPORT(stats::StatsRes); BOOST_CLASS_EXPORT(stats::StatsRes);
BOOST_CLASS_EXPORT(stats::BatchStatsReq); BOOST_CLASS_EXPORT(stats::BatchStatsReq);
BOOST_CLASS_EXPORT(stats::BatchStatsRes); BOOST_CLASS_EXPORT(stats::BatchStatsRes);
// Remote updates.
BOOST_CLASS_EXPORT(database::StateDelta);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes);

View File

@ -11,6 +11,8 @@
#include "distributed/remote_data_rpc_server.hpp" #include "distributed/remote_data_rpc_server.hpp"
#include "distributed/remote_produce_rpc_server.hpp" #include "distributed/remote_produce_rpc_server.hpp"
#include "distributed/remote_pull_rpc_clients.hpp" #include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "durability/paths.hpp" #include "durability/paths.hpp"
#include "durability/recovery.hpp" #include "durability/recovery.hpp"
#include "durability/snapshooter.hpp" #include "durability/snapshooter.hpp"
@ -108,6 +110,12 @@ class SingleNode : public PrivateBase {
distributed::PlanConsumer &plan_consumer() override { distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node."; LOG(FATAL) << "Plan Consumer not available in single-node.";
} }
distributed::RemoteUpdatesRpcServer &remote_updates_server() override {
LOG(FATAL) << "Remote updates server not available in single-node.";
}
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override {
LOG(FATAL) << "Remote updates clients not available in single-node.";
}
}; };
#define IMPL_DISTRIBUTED_GETTERS \ #define IMPL_DISTRIBUTED_GETTERS \
@ -116,6 +124,12 @@ class SingleNode : public PrivateBase {
} \ } \
distributed::RemoteDataRpcClients &remote_data_clients() override { \ distributed::RemoteDataRpcClients &remote_data_clients() override { \
return remote_data_clients_; \ return remote_data_clients_; \
} \
distributed::RemoteUpdatesRpcServer &remote_updates_server() override { \
return remote_updates_server_; \
} \
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { \
return remote_updates_clients_; \
} }
class Master : public PrivateBase { class Master : public PrivateBase {
@ -148,6 +162,8 @@ class Master : public PrivateBase {
distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_, distributed::RpcWorkerClients index_rpc_clients_{coordination_,
distributed::kIndexRpcName}; distributed::kIndexRpcName};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
}; };
class Worker : public PrivateBase { class Worker : public PrivateBase {
@ -179,6 +195,8 @@ class Worker : public PrivateBase {
distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_,
plan_consumer_}; plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, system_}; distributed::IndexRpcServer index_rpc_server_{*this, system_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
}; };
#undef IMPL_GETTERS #undef IMPL_GETTERS
@ -241,6 +259,12 @@ distributed::RemotePullRpcClients &PublicBase::remote_pull_clients() {
distributed::RemoteProduceRpcServer &PublicBase::remote_produce_server() { distributed::RemoteProduceRpcServer &PublicBase::remote_produce_server() {
return impl_->remote_produce_server(); return impl_->remote_produce_server();
} }
distributed::RemoteUpdatesRpcServer &PublicBase::remote_updates_server() {
return impl_->remote_updates_server();
}
distributed::RemoteUpdatesRpcClients &PublicBase::remote_updates_clients() {
return impl_->remote_updates_clients();
}
void PublicBase::MakeSnapshot() { void PublicBase::MakeSnapshot() {
const bool status = durability::MakeSnapshot( const bool status = durability::MakeSnapshot(

View File

@ -21,6 +21,8 @@ class PlanDispatcher;
class PlanConsumer; class PlanConsumer;
class RemotePullRpcClients; class RemotePullRpcClients;
class RemoteProduceRpcServer; class RemoteProduceRpcServer;
class RemoteUpdatesRpcServer;
class RemoteUpdatesRpcClients;
} }
namespace database { namespace database {
@ -91,6 +93,8 @@ class GraphDb {
// Supported only in distributed master and worker, not in single-node. // Supported only in distributed master and worker, not in single-node.
virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; virtual distributed::RemoteDataRpcServer &remote_data_server() = 0;
virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0;
virtual distributed::RemoteUpdatesRpcServer &remote_updates_server() = 0;
virtual distributed::RemoteUpdatesRpcClients &remote_updates_clients() = 0;
// Supported only in distributed master. // Supported only in distributed master.
virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0;
@ -135,6 +139,8 @@ class PublicBase : public GraphDb {
distributed::PlanConsumer &plan_consumer() override; distributed::PlanConsumer &plan_consumer() override;
distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::RemotePullRpcClients &remote_pull_clients() override;
distributed::RemoteProduceRpcServer &remote_produce_server() override; distributed::RemoteProduceRpcServer &remote_produce_server() override;
distributed::RemoteUpdatesRpcServer &remote_updates_server() override;
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override;
protected: protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl); explicit PublicBase(std::unique_ptr<PrivateBase> impl);

View File

@ -6,6 +6,7 @@
#include "durability/hashed_file_writer.hpp" #include "durability/hashed_file_writer.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
#include "storage/property_value.hpp" #include "storage/property_value.hpp"
#include "utils/serialization.hpp"
namespace database { namespace database {
/** Describes single change to the database state. Used for durability (WAL) and /** Describes single change to the database state. Used for durability (WAL) and
@ -107,5 +108,44 @@ struct StateDelta {
PropertyValue value = PropertyValue::Null; PropertyValue value = PropertyValue::Null;
storage::Label label; storage::Label label;
std::string label_name; std::string label_name;
private:
friend class boost::serialization::access;
BOOST_SERIALIZATION_SPLIT_MEMBER();
template <class TArchive>
void save(TArchive &ar, const unsigned int) const {
ar &type;
ar &transaction_id;
ar &vertex_id;
ar &edge_id;
ar &vertex_from_id;
ar &vertex_to_id;
ar &edge_type;
ar &edge_type_name;
ar &property;
ar &property_name;
utils::SaveTypedValue(ar, value);
ar &label;
ar &label_name;
}
template <class TArchive>
void load(TArchive &ar, const unsigned int) {
ar &type;
ar &transaction_id;
ar &vertex_id;
ar &edge_id;
ar &vertex_from_id;
ar &vertex_to_id;
ar &edge_type;
ar &edge_type_name;
ar &property;
ar &property_name;
query::TypedValue tv;
utils::LoadTypedValue(ar, tv);
value = tv;
ar &label;
ar &label_name;
}
}; };
} // namespace database } // namespace database

View File

@ -17,7 +17,7 @@ class RemoteDataRpcServer {
// invalidation. // invalidation.
public: public:
RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system) RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system)
: db_(db), system_(system) { : db_(db), rpc_server_(system, kRemoteDataRpcName) {
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) { rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id); database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertexChecked(req.member.gid, false); auto vertex = dba.FindVertexChecked(req.member.gid, false);
@ -36,7 +36,6 @@ class RemoteDataRpcServer {
private: private:
database::GraphDb &db_; database::GraphDb &db_;
communication::rpc::System &system_; communication::rpc::Server rpc_server_;
communication::rpc::Server rpc_server_{system_, kRemoteDataRpcName};
}; };
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,45 @@
#pragma once
#include "database/state_delta.hpp"
#include "distributed/coordination.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "transactions/type.hpp"
namespace distributed {
/// Exposes the functionality to send updates to other workers (that own the
/// graph element we are updating). Also enables us to call for a worker to
/// apply the accumulated deferred updates, or discard them.
class RemoteUpdatesRpcClients {
public:
explicit RemoteUpdatesRpcClients(distributed::Coordination &coordination)
: worker_clients_(coordination, kRemoteUpdatesRpc) {}
/// Sends an update delta to the given worker.
RemoteUpdateResult RemoteUpdate(int worker_id,
const database::StateDelta &delta) {
return worker_clients_.GetClientPool(worker_id)
.Call<RemoteUpdateRpc>(delta)
->member;
}
/// Calls for the worker with the given ID to apply remote updates. Returns
/// the results of that operation.
RemoteUpdateResult RemoteUpdateApply(int worker_id,
tx::transaction_id_t tx_id) {
return worker_clients_.GetClientPool(worker_id)
.Call<RemoteUpdateApplyRpc>(tx_id)
->member;
}
/// Calls for the worker with the given ID to discard remote updates.
void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) {
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateDiscardRpc>(
tx_id);
}
private:
RpcWorkerClients worker_clients_;
};
} // namespace distributed

View File

@ -0,0 +1,35 @@
#pragma once
#include "communication/rpc/messages.hpp"
#include "database/state_delta.hpp"
#include "transactions/type.hpp"
namespace distributed {
const std::string kRemoteUpdatesRpc = "RemoteUpdatesRpc";
/// The result of sending or applying a deferred update to a worker.
enum class RemoteUpdateResult {
DONE,
SERIALIZATION_ERROR,
LOCK_TIMEOUT_ERROR,
UPDATE_DELETED_ERROR
};
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta);
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateRes, RemoteUpdateResult);
using RemoteUpdateRpc =
communication::rpc::RequestResponse<RemoteUpdateReq, RemoteUpdateRes>;
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult);
using RemoteUpdateApplyRpc =
communication::rpc::RequestResponse<RemoteUpdateApplyReq,
RemoteUpdateApplyRes>;
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateDiscardReq, tx::transaction_id_t);
RPC_NO_MEMBER_MESSAGE(RemoteUpdateDiscardRes);
using RemoteUpdateDiscardRpc =
communication::rpc::RequestResponse<RemoteUpdateDiscardReq,
RemoteUpdateDiscardRes>;
} // namespace distributed

View File

@ -0,0 +1,221 @@
#pragma once
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>
#include "glog/logging.h"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "mvcc/version_list.hpp"
#include "storage/gid.hpp"
#include "storage/record_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/type.hpp"
namespace distributed {
/// An RPC server that accepts and holds deferred updates (deltas) until it's
/// told to apply or discard them. The updates are organized and applied per
/// transaction in this single updates server.
///
/// Attempts to get serialization and update-after-delete errors to happen as
/// soon as possible during query execution (fail fast).
class RemoteUpdatesRpcServer {
// Remote updates for one transaction.
template <typename TRecordAccessor>
class TransactionUpdates {
public:
TransactionUpdates(database::GraphDb &db, tx::transaction_id_t tx_id)
: db_accessor_(db, tx_id) {}
/// Adds a delta and returns the result. Does not modify the state (data) of
/// the graph element the update is for, but calls the `update` method to
/// fail-fast on serialization and update-after-delete errors.
RemoteUpdateResult Emplace(const database::StateDelta &delta) {
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id
: delta.edge_id;
std::lock_guard<SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) {
found = deltas_
.emplace(gid, std::make_pair(
FindAccessor(gid),
std::vector<database::StateDelta>{}))
.first;
}
found->second.second.emplace_back(delta);
// TODO call `RecordAccessor::update` to force serialization errors to
// fail-fast (as opposed to when all the deltas get applied).
//
// This is problematic because `VersionList::update` needs to become
// thread-safe within the same transaction. Note that the concurrency is
// possible both between the owner worker interpretation thread and an RPC
// thread (current thread), as well as multiple RPC threads if this
// object's lock is released (perhaps desirable).
//
// A potential solution *might* be that `LockStore::Lock` returns a `bool`
// indicating if the caller was the one obtaining the lock (not the same
// as lock already being held by the same transaction).
//
// Another thing that needs to be done (if we do this) is ensuring that
// `LockStore::Take` is thread-safe when called in parallel in the same
// transaction. Currently it's thread-safe only when called in parallel
// from different transactions (only one manages to take the RecordLock).
//
// Deferring the implementation of this as it's tricky, and essentially an
// optimization.
//
// try {
// found->second.first.update();
// } catch (const mvcc::SerializationError &) {
// return RemoteUpdateResult::SERIALIZATION_ERROR;
// } catch (const RecordDeletedError &) {
// return RemoteUpdateResult::UPDATE_DELETED_ERROR;
// } catch (const LockTimeoutException &) {
// return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
// }
return RemoteUpdateResult::DONE;
}
/// Applies all the deltas on the record.
RemoteUpdateResult Apply() {
std::lock_guard<SpinLock> guard{lock_};
for (auto &kv : deltas_) {
for (database::StateDelta &delta : kv.second.second) {
try {
kv.second.first.ProcessDelta(delta);
} catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) {
return RemoteUpdateResult::UPDATE_DELETED_ERROR;
} catch (const LockTimeoutException &) {
return RemoteUpdateResult::LOCK_TIMEOUT_ERROR;
}
}
}
return RemoteUpdateResult::DONE;
}
private:
database::GraphDbAccessor db_accessor_;
std::unordered_map<
gid::Gid, std::pair<TRecordAccessor, std::vector<database::StateDelta>>>
deltas_;
// Multiple workers might be sending remote updates concurrently.
SpinLock lock_;
// Helper method specialized for [Vertex|Edge]Accessor.
TRecordAccessor FindAccessor(gid::Gid gid);
};
public:
RemoteUpdatesRpcServer(database::GraphDb &db,
communication::rpc::System &system)
: db_(db), server_(system, kRemoteUpdatesRpc) {
server_.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
switch (req.member.type) {
case DeltaType::SET_PROPERTY_VERTEX:
case DeltaType::ADD_LABEL:
case DeltaType::REMOVE_LABEL:
return std::make_unique<RemoteUpdateRes>(
Process(vertex_updates_, req.member));
case DeltaType::SET_PROPERTY_EDGE:
return std::make_unique<RemoteUpdateRes>(
Process(edge_updates_, req.member));
default:
LOG(FATAL) << "Can't perform a remote update with delta type: "
<< static_cast<int>(req.member.type);
}
});
server_.Register<RemoteUpdateApplyRpc>(
[this](const RemoteUpdateApplyReq &req) {
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server_.Register<RemoteUpdateDiscardRpc>(
[this](const RemoteUpdateDiscardReq &req) {
Discard(req.member);
return std::make_unique<RemoteUpdateDiscardRes>();
});
}
/// Applies all existsing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates cache
/// after applying them, regardless of the result.
RemoteUpdateResult Apply(tx::transaction_id_t tx_id) {
auto apply = [tx_id](auto &collection) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found == access.end()) {
return RemoteUpdateResult::DONE;
}
auto result = found->second.Apply();
access.remove(tx_id);
return result;
};
auto vertex_result = apply(vertex_updates_);
auto edge_result = apply(edge_updates_);
if (vertex_result != RemoteUpdateResult::DONE) return vertex_result;
if (edge_result != RemoteUpdateResult::DONE) return edge_result;
return RemoteUpdateResult::DONE;
}
/// Discards all the existing updates for the given transaction ID.
void Discard(tx::transaction_id_t tx_id) {
vertex_updates_.access().remove(tx_id);
edge_updates_.access().remove(tx_id);
}
private:
database::GraphDb &db_;
communication::rpc::Server server_;
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<VertexAccessor>>
vertex_updates_;
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<EdgeAccessor>>
edge_updates_;
// Processes a single delta recieved in the RPC request.
template <typename TCollection>
RemoteUpdateResult Process(TCollection &updates,
const database::StateDelta &delta) {
auto tx_id = delta.transaction_id;
auto access = updates.access();
auto &transaction_updates =
access
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_), tx_id))
.first->second;
return transaction_updates.Emplace(delta);
}
};
template <>
inline VertexAccessor
RemoteUpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindVertexChecked(gid, false);
}
template <>
inline EdgeAccessor
RemoteUpdatesRpcServer::TransactionUpdates<EdgeAccessor>::FindAccessor(
gid::Gid gid) {
return db_accessor_.FindEdgeChecked(gid, false);
}
} // namespace distributed

View File

@ -2,9 +2,11 @@
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/record_accessor.hpp" #include "storage/record_accessor.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
using database::StateDelta; using database::StateDelta;
@ -160,14 +162,10 @@ TRecord &RecordAccessor<TRecord>::update() const {
} }
const auto &t = db_accessor_->transaction(); const auto &t = db_accessor_->transaction();
{
const std::string err =
"Can't update a record deleted in the current transaction+commad";
if (!new_ && old_->is_expired_by(t)) if (!new_ && old_->is_expired_by(t))
throw RecordDeletedError(err); throw RecordDeletedError();
else if (new_ && new_->is_expired_by(t)) else if (new_ && new_->is_expired_by(t))
throw RecordDeletedError(err); throw RecordDeletedError();
}
if (new_) return *new_; if (new_) return *new_;
@ -227,8 +225,18 @@ void RecordAccessor<TRecord>::ProcessDelta(
if (is_local()) { if (is_local()) {
db_accessor().wal().Emplace(delta); db_accessor().wal().Emplace(delta);
} else { } else {
// TODO use the delta to perform a remote update. auto result = db_accessor().db().remote_updates_clients().RemoteUpdate(
// TODO check for results (success, serialization_error, ...) address().worker_id(), delta);
switch (result) {
case distributed::RemoteUpdateResult::DONE:
break;
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException("Lock timeout on remote worker");
}
} }
} }

View File

@ -121,6 +121,18 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
*/ */
bool Reconstruct() const; bool Reconstruct() const;
/**
* Ensures there is an updateable version of the record in the version_list,
* and that the `new_` pointer points to it. Returns a reference to that
* version.
*
* It is not legal to call this function on a Vertex/Edge that has been
* deleted in the current transaction+command.
*
* @throws RecordDeletedError
*/
TRecord &update() const;
/** /**
* Returns true if the given accessor is visible to the given transaction. * Returns true if the given accessor is visible to the given transaction.
* *
@ -134,6 +146,14 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
(current_state && new_ && !new_->is_expired_by(t)); (current_state && new_ && !new_->is_expired_by(t));
} }
/**
* Processes the delta that's a consequence of changes in this accessor. If
* the accessor is local that means writing the delta to the write-ahead log.
* If it's remote, then the delta needs to be sent to it's owner for
* processing.
*/
void ProcessDelta(const database::StateDelta &delta) const;
protected: protected:
/** /**
* Pointer to the version (either old_ or new_) that READ operations * Pointer to the version (either old_ or new_) that READ operations
@ -145,18 +165,6 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
*/ */
mutable TRecord *current_{nullptr}; mutable TRecord *current_{nullptr};
/**
* Ensures there is an updateable version of the record in the version_list,
* and that the `new_` pointer points to it. Returns a reference to that
* version.
*
* It is not legal to call this function on a Vertex/Edge that has been
* deleted in the current transaction+command.
*
* @throws RecordDeletedError
*/
TRecord &update() const;
/** Returns the current version (either new_ or old_) set on this /** Returns the current version (either new_ or old_) set on this
* RecordAccessor. */ * RecordAccessor. */
const TRecord &current() const; const TRecord &current() const;
@ -165,14 +173,6 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
* owner is some other worker in a distributed system. */ * owner is some other worker in a distributed system. */
bool is_local() const { return address_.is_local(); } bool is_local() const { return address_.is_local(); }
/**
* Processes the delta that's a consequence of changes in this accessor. If
* the accessor is local that means writing the delta to the write-ahead log.
* If it's remote, then the delta needs to be sent to it's owner for
* processing.
*/
void ProcessDelta(const database::StateDelta &delta) const;
private: private:
// The database accessor for which this record accessor is created // The database accessor for which this record accessor is created
// Provides means of getting to the transaction and database functions. // Provides means of getting to the transaction and database functions.
@ -212,5 +212,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
/** Error when trying to update a deleted record */ /** Error when trying to update a deleted record */
class RecordDeletedError : public utils::BasicException { class RecordDeletedError : public utils::BasicException {
using utils::BasicException::BasicException; public:
RecordDeletedError()
: utils::BasicException(
"Can't update a record deleted in the current transaction+commad") {
}
}; };

View File

@ -11,7 +11,7 @@ class DistributedGraphDbTest : public ::testing::Test {
class WorkerInThread { class WorkerInThread {
public: public:
WorkerInThread(database::Config config) : worker_(config) { explicit WorkerInThread(database::Config config) : worker_(config) {
thread_ = std::thread([this, config] { worker_.WaitForShutdown(); }); thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
} }

View File

@ -1,31 +1,67 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "distributed_common.hpp" #include "distributed_common.hpp"
TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) { class DistributedUpdateTest : public DistributedGraphDbTest {
protected:
std::unique_ptr<database::GraphDbAccessor> dba1;
std::unique_ptr<database::GraphDbAccessor> dba2;
storage::Label label;
std::unique_ptr<VertexAccessor> v1_dba1;
std::unique_ptr<VertexAccessor> v1_dba2;
void SetUp() override {
DistributedGraphDbTest::SetUp();
database::GraphDbAccessor dba_tx1{worker(1)}; database::GraphDbAccessor dba_tx1{worker(1)};
auto v = dba_tx1.InsertVertex(); auto v = dba_tx1.InsertVertex();
auto v_ga = v.GlobalAddress(); auto v_ga = v.GlobalAddress();
dba_tx1.Commit(); dba_tx1.Commit();
database::GraphDbAccessor dba_tx2_w2{worker(2)}; dba1 = std::make_unique<database::GraphDbAccessor>(worker(1));
v = VertexAccessor(v_ga, dba_tx2_w2); dba2 = std::make_unique<database::GraphDbAccessor>(worker(2),
ASSERT_FALSE(v.address().is_local()); dba1->transaction_id());
auto label = dba_tx2_w2.Label("l");
EXPECT_FALSE(v.has_label(label));
v.add_label(label);
v.SwitchNew();
EXPECT_TRUE(v.has_label(label));
v.SwitchOld();
EXPECT_FALSE(v.has_label(label));
// In the same transaction on the owning worker there is no label. v1_dba1 = std::make_unique<VertexAccessor>(v_ga, *dba1);
database::GraphDbAccessor dba_tx2_w1{worker(1), dba_tx2_w2.transaction_id()}; v1_dba2 = std::make_unique<VertexAccessor>(v_ga, *dba2);
v = VertexAccessor(v_ga, dba_tx2_w1); ASSERT_FALSE(v1_dba2->address().is_local());
v.SwitchOld(); label = dba1->Label("l");
EXPECT_FALSE(v.has_label(label)); v1_dba2->add_label(label);
v.SwitchNew(); }
EXPECT_FALSE(v.has_label(label));
void TearDown() override {
dba2 = nullptr;
dba1 = nullptr;
DistributedGraphDbTest::TearDown();
}
};
#define EXPECT_LABEL(var, old_result, new_result) \
{ \
var->SwitchOld(); \
EXPECT_EQ(var->has_label(label), old_result); \
var->SwitchNew(); \
EXPECT_EQ(var->has_label(label), new_result); \
}
TEST_F(DistributedUpdateTest, RemoteUpdateLocalOnly) {
EXPECT_LABEL(v1_dba2, false, true);
EXPECT_LABEL(v1_dba1, false, false);
} }
TEST_F(DistributedUpdateTest, RemoteUpdateApply) {
EXPECT_LABEL(v1_dba1, false, false);
worker(1).remote_updates_server().Apply(dba1->transaction_id());
EXPECT_LABEL(v1_dba1, false, true);
}
TEST_F(DistributedUpdateTest, RemoteUpdateDiscard) {
EXPECT_LABEL(v1_dba1, false, false);
worker(1).remote_updates_server().Discard(dba1->transaction_id());
EXPECT_LABEL(v1_dba1, false, false);
}
#undef EXPECT_LABEL