Implement sync operator

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1192
florijan 2018-02-14 09:44:48 +01:00
parent cc6f6f4b35
commit 796946ad1b
24 changed files with 501 additions and 150 deletions

View File

@@ -39,6 +39,8 @@ BOOST_CLASS_EXPORT(tx::AbortReq);
BOOST_CLASS_EXPORT(tx::AbortRes);
BOOST_CLASS_EXPORT(tx::SnapshotReq);
BOOST_CLASS_EXPORT(tx::SnapshotRes);
BOOST_CLASS_EXPORT(tx::CommandReq);
BOOST_CLASS_EXPORT(tx::CommandRes);
BOOST_CLASS_EXPORT(tx::GcSnapshotReq);
BOOST_CLASS_EXPORT(tx::ClogInfoReq);
BOOST_CLASS_EXPORT(tx::ClogInfoRes);
@@ -70,6 +72,8 @@ BOOST_CLASS_EXPORT(distributed::RemotePullReq);
BOOST_CLASS_EXPORT(distributed::RemotePullRes);
BOOST_CLASS_EXPORT(distributed::EndRemotePullReq);
BOOST_CLASS_EXPORT(distributed::EndRemotePullRes);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes);
// Distributed indexes.
BOOST_CLASS_EXPORT(distributed::BuildIndexReq);
@@ -86,3 +90,7 @@ BOOST_CLASS_EXPORT(stats::BatchStatsRes);
BOOST_CLASS_EXPORT(database::StateDelta);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardRes);

View File

@@ -89,6 +89,15 @@ class RemoteCache {
std::make_pair(std::move(old_record), std::move(new_record));
}
/// Removes all the cached data. All the pointers to that data still held by
/// RecordAccessors will become invalid and must never be dereferenced after
/// this call. To make a RecordAccessor valid again, Reconstruct must be called
/// on it. This is typically done after the command has advanced.
void ClearCache() {
std::lock_guard<std::mutex> guard{lock_};
cache_.clear();
}
private:
std::mutex lock_;
distributed::RemoteDataRpcClients &remote_data_clients_;
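The comment on ClearCache above describes a common cache/accessor contract: every pointer handed out before the clear dangles until the accessor is reconstructed. A minimal self-contained sketch of that pattern, using hypothetical stand-in types rather than the actual Memgraph classes:

#include <memory>
#include <mutex>
#include <unordered_map>

// Hypothetical stand-ins for RemoteCache / RecordAccessor.
struct Record { int value = 0; };

class Cache {
 public:
  // The returned pointer is valid only until the next Clear().
  Record *Get(int id) {
    std::lock_guard<std::mutex> guard{lock_};
    auto &slot = data_[id];
    if (!slot) slot = std::make_unique<Record>();
    return slot.get();
  }
  void Clear() {
    std::lock_guard<std::mutex> guard{lock_};
    data_.clear();  // every pointer obtained via Get() is now dangling
  }

 private:
  std::mutex lock_;
  std::unordered_map<int, std::unique_ptr<Record>> data_;
};

class Accessor {
 public:
  Accessor(Cache &cache, int id)
      : cache_(cache), id_(id), record_(cache.Get(id)) {}
  // Analogous to RecordAccessor::Reconstruct: must run after Cache::Clear()
  // before the record may be dereferenced again.
  void Reconstruct() { record_ = cache_.Get(id_); }
  int value() const { return record_->value; }

 private:
  Cache &cache_;
  int id_;
  Record *record_;
};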

View File

@@ -41,6 +41,12 @@ class RemoteDataManager {
template <typename TRecord>
auto &Elements(tx::transaction_id_t tx_id);
/// Calls RemoteCache::ClearCache on vertex and edge caches.
void ClearCaches(tx::transaction_id_t tx_id) {
Vertices(tx_id).ClearCache();
Edges(tx_id).ClearCache();
}
private:
RemoteDataRpcClients &remote_data_clients_;
SpinLock lock_;

View File

@@ -10,7 +10,9 @@
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "query/common.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
@@ -29,7 +31,10 @@ namespace distributed {
* identified.
*/
class RemoteProduceRpcServer {
/** Encapsulates an execution in progress. */
/// Encapsulates a Cursor execution in progress. Can be used for pulling a
/// single result from the execution, or for pulling all results and
/// accumulating them. Accumulation is used for synchronizing updates in
/// distributed MG (see query::plan::Synchronize).
class OngoingProduce {
public:
OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id,
@@ -45,12 +50,60 @@ class RemoteProduceRpcServer {
context_.parameters_ = std::move(parameters);
}
/** Returns a vector of typed values (one for each `pull_symbol`), and a
* `bool` indicating if the pull was successful (or the cursor is
* exhausted). */
/// Returns a vector of typed values (one for each `pull_symbol`), and an
/// indication of the pull result. The result data is valid only if the
/// returned state is CURSOR_IN_PROGRESS.
std::pair<std::vector<query::TypedValue>, RemotePullState> Pull() {
RemotePullState state = RemotePullState::CURSOR_IN_PROGRESS;
if (!accumulation_.empty()) {
auto results = std::move(accumulation_.back());
accumulation_.pop_back();
for (auto &element : results) {
try {
query::ReconstructTypedValue(element);
} catch (query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
return std::make_pair(std::move(results), cursor_state_);
}
}
return std::make_pair(std::move(results),
RemotePullState::CURSOR_IN_PROGRESS);
}
return PullOneFromCursor();
}
/// Accumulates all the frames pulled from the cursor and returns
/// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned.
RemotePullState Accumulate() {
while (true) {
auto result = PullOneFromCursor();
if (result.second != RemotePullState::CURSOR_IN_PROGRESS)
return result.second;
else
accumulation_.emplace_back(std::move(result.first));
}
}
private:
database::GraphDbAccessor dba_;
std::unique_ptr<query::plan::Cursor> cursor_;
query::Context context_;
std::vector<query::Symbol> pull_symbols_;
query::Frame frame_;
RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS};
std::vector<std::vector<query::TypedValue>> accumulation_;
std::pair<std::vector<query::TypedValue>, RemotePullState>
PullOneFromCursor() {
std::vector<query::TypedValue> results;
// Check if we already exhausted this cursor (or it entered an error
// state). This happens when we accumulate before normal pull.
if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) {
return std::make_pair(results, cursor_state_);
}
try {
if (cursor_->Pull(frame_, context_)) {
results.reserve(pull_symbols_.size());
@@ -58,32 +111,21 @@ class RemoteProduceRpcServer {
results.emplace_back(std::move(frame_[symbol]));
}
} else {
state = RemotePullState::CURSOR_EXHAUSTED;
cursor_state_ = RemotePullState::CURSOR_EXHAUSTED;
}
} catch (const mvcc::SerializationError &) {
state = RemotePullState::SERIALIZATION_ERROR;
cursor_state_ = RemotePullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) {
state = RemotePullState::LOCK_TIMEOUT_ERROR;
cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) {
state = RemotePullState::UPDATE_DELETED_ERROR;
cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) {
state = RemotePullState::RECONSTRUCTION_ERROR;
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
} catch (const query::QueryRuntimeException &) {
state = RemotePullState::QUERY_ERROR;
cursor_state_ = RemotePullState::QUERY_ERROR;
}
return std::make_pair(std::move(results), state);
return std::make_pair(std::move(results), cursor_state_);
}
private:
// TODO currently each OngoingProduce has its own GDBA. There is no sharing
// of them in the same transaction. This should be correct, but it's
// inefficient in multi-command queries, and when a single query gets
// broken down into multiple parts.
database::GraphDbAccessor dba_;
std::unique_ptr<query::plan::Cursor> cursor_;
query::Context context_;
std::vector<query::Symbol> pull_symbols_;
query::Frame frame_;
};
public:
@@ -106,6 +148,13 @@ class RemoteProduceRpcServer {
ongoing_produces_.erase(it);
return std::make_unique<EndRemotePullRes>();
});
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
db_.tx_engine().UpdateCommand(req.member);
db_.remote_data_manager().ClearCaches(req.member);
return std::make_unique<TransactionCommandAdvancedRes>();
});
}
private:
@@ -140,11 +189,19 @@ class RemoteProduceRpcServer {
RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new};
result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS;
if (req.accumulate) {
result.state_and_frames.pull_state = ongoing_produce.Accumulate();
// If an error occurred, we need to return that error.
if (result.state_and_frames.pull_state !=
RemotePullState::CURSOR_EXHAUSTED) {
return result;
}
}
for (int i = 0; i < req.batch_size; ++i) {
auto pull_result = ongoing_produce.Pull();
result.state_and_frames.pull_state = pull_result.second;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS)
break;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break;
result.state_and_frames.frames.emplace_back(std::move(pull_result.first));
}

View File

@@ -39,11 +39,12 @@ struct RemotePullReq : public communication::rpc::Message {
RemotePullReq() {}
RemotePullReq(tx::transaction_id_t tx_id, int64_t plan_id,
const Parameters &params, std::vector<query::Symbol> symbols,
int batch_size, bool send_old, bool send_new)
bool accumulate, int batch_size, bool send_old, bool send_new)
: tx_id(tx_id),
plan_id(plan_id),
params(params),
symbols(symbols),
accumulate(accumulate),
batch_size(batch_size),
send_old(send_old),
send_new(send_new) {}
@@ -52,6 +53,7 @@ struct RemotePullReq : public communication::rpc::Message {
int64_t plan_id;
Parameters params;
std::vector<query::Symbol> symbols;
bool accumulate;
int batch_size;
// Indicates which of (old, new) records of a graph element should be sent.
bool send_old;
@@ -72,6 +74,7 @@ struct RemotePullReq : public communication::rpc::Message {
utils::SaveTypedValue(ar, kv.second);
}
ar << symbols;
ar << accumulate;
ar << batch_size;
ar << send_old;
ar << send_new;
@@ -93,6 +96,7 @@ struct RemotePullReq : public communication::rpc::Message {
params.Add(token_pos, param);
}
ar >> symbols;
ar >> accumulate;
ar >> batch_size;
ar >> send_old;
ar >> send_new;
@@ -360,11 +364,16 @@ using RemotePullRpc =
// optimization not to have to send the full RemotePullReqData pack every
// time.
using EndRemotePullReqData = std::pair<tx::transaction_id_t, int64_t>;
RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullReqData);
using EndRemotePullData = std::pair<tx::transaction_id_t, int64_t>;
RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullData);
RPC_NO_MEMBER_MESSAGE(EndRemotePullRes);
using EndRemotePullRpc =
communication::rpc::RequestResponse<EndRemotePullReq, EndRemotePullRes>;
RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t);
RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes);
using TransactionCommandAdvancedRpc =
communication::rpc::RequestResponse<TransactionCommandAdvancedReq,
TransactionCommandAdvancedRes>;
} // namespace distributed
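The RPC_SINGLE_MEMBER_MESSAGE and RPC_NO_MEMBER_MESSAGE helpers are defined outside this diff. Judging from how the generated types are used here (req.member, ->member, Boost archives), a single-member message presumably expands to something close to the sketch below; the actual macro body may differ:

// Assumed shape of RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq,
//                                            tx::transaction_id_t):
struct TransactionCommandAdvancedReq : public communication::rpc::Message {
  TransactionCommandAdvancedReq() {}
  TransactionCommandAdvancedReq(const tx::transaction_id_t &member)
      : member(member) {}
  tx::transaction_id_t member;

 private:
  friend class boost::serialization::access;
  template <class TArchive>
  void serialize(TArchive &ar, const unsigned int) {
    ar &boost::serialization::base_object<communication::rpc::Message>(*this);
    ar &member;
  }
};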

View File

@@ -27,16 +27,20 @@ class RemotePullRpcClients {
/// Calls a remote pull asynchronously. IMPORTANT: take care not to call this
/// function for the same (tx_id, worker_id, plan_id) before the previous call
/// has ended.
///
/// @todo: it might be cleaner to split RemotePull into {InitRemoteCursor,
/// RemotePull, RemoteAccumulate}, but that's a lot of refactoring and more
/// RPC calls.
std::future<RemotePullData> RemotePull(
database::GraphDbAccessor &dba, int worker_id, int64_t plan_id,
const Parameters &params, const std::vector<query::Symbol> &symbols,
int batch_size = kDefaultBatchSize) {
bool accumulate, int batch_size = kDefaultBatchSize) {
return clients_.ExecuteOnWorker<RemotePullData>(
worker_id,
[&dba, plan_id, params, symbols, batch_size](ClientPool &client_pool) {
worker_id, [&dba, plan_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) {
auto result = client_pool.Call<RemotePullRpc>(
dba.transaction_id(), plan_id, params, symbols, batch_size, true,
true);
dba.transaction_id(), plan_id, params, symbols, accumulate,
batch_size, true, true);
auto handle_vertex = [&dba](auto &v) {
dba.db()
@@ -86,7 +90,7 @@ class RemotePullRpcClients {
// Notifies a worker that the given transaction/plan is done. Otherwise the
// server is left with potentially unconsumed Cursors that never get deleted.
//
// TODO - maybe this needs to be done with hooks into the transactional
// @todo - this needs to be done with hooks into the transactional
// engine, so that the Worker discards its state when the relevant
// transactions are done.
std::future<void> EndRemotePull(int worker_id, tx::transaction_id_t tx_id,
@@ -94,7 +98,7 @@ class RemotePullRpcClients {
return clients_.ExecuteOnWorker<void>(
worker_id, [tx_id, plan_id](ClientPool &client_pool) {
return client_pool.Call<EndRemotePullRpc>(
EndRemotePullReqData{tx_id, plan_id});
EndRemotePullData{tx_id, plan_id});
});
}
@@ -107,6 +111,13 @@ class RemotePullRpcClients {
for (auto &future : futures) future.wait();
}
std::vector<std::future<void>> NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
client.template Call<TransactionCommandAdvancedRpc>(tx_id);
});
}
private:
RpcWorkerClients clients_;
};
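The new accumulate flag is exercised by the Synchronize cursor later in this commit. A caller that only wants the workers to accumulate, without streaming results back yet, passes accumulate = true with a batch size of zero (adapted from SynchronizeCursor::InitialPull below; plan_id and pull_symbols stand in for the operator's actual values):

worker_accumulations.emplace_back(db.remote_pull_clients().RemotePull(
    context.db_accessor_, worker_id, plan_id, context.parameters_,
    pull_symbols, /* accumulate */ true, /* batch_size */ 0));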

View File

@@ -33,6 +33,16 @@ class RemoteUpdatesRpcClients {
->member;
}
/// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results.
std::vector<std::future<RemoteUpdateResult>> RemoteUpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id) {
return worker_clients_.ExecuteOnWorkers<RemoteUpdateResult>(
skip_worker_id, [tx_id](auto &client) {
return client.template Call<RemoteUpdateApplyRpc>(tx_id)->member;
});
}
/// Calls for the worker with the given ID to discard remote updates.
void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) {
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateDiscardRpc>(

View File

@@ -18,6 +18,8 @@ class Frame {
return elems_[symbol.position()];
}
auto &elems() { return elems_; }
private:
int size_;
std::vector<TypedValue> elems_;
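The new elems() accessor exists so a cursor can move a whole frame's contents out at once. The Synchronize cursor below uses it exactly that way when accumulating local results (excerpt, with frame and TypedValue as in the surrounding code):

std::vector<TypedValue> local_frame;
local_frame.reserve(frame.elems().size());
for (auto &elem : frame.elems()) local_frame.emplace_back(std::move(elem));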

View File

@@ -14,6 +14,8 @@
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
@@ -2720,18 +2722,6 @@ void Union::UnionCursor::Reset() {
right_cursor_->Reset();
}
ProduceRemote::ProduceRemote(const std::shared_ptr<LogicalOperator> &input,
const std::vector<Symbol> &symbols)
: input_(input ? input : std::make_shared<Once>()), symbols_(symbols) {}
ACCEPT_WITH_INPUT(ProduceRemote)
std::unique_ptr<Cursor> ProduceRemote::MakeCursor(
database::GraphDbAccessor &) const {
// TODO: Implement a concrete cursor.
return nullptr;
}
PullRemote::PullRemote(const std::shared_ptr<LogicalOperator> &input,
int64_t plan_id, const std::vector<Symbol> &symbols)
: input_(input), plan_id_(plan_id), symbols_(symbols) {}
@@ -2761,7 +2751,8 @@ void PullRemote::PullRemoteCursor::EndRemotePull() {
bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
auto insert_future_for_worker = [&](int worker_id) {
remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull(
db_, worker_id, self_.plan_id(), context.parameters_, self_.symbols());
db_, worker_id, self_.plan_id(), context.parameters_, self_.symbols(),
false);
};
if (!remote_pulls_initialized_) {
@@ -2804,7 +2795,8 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
"LockTimeout error occured during PullRemote !");
case distributed::RemotePullState::UPDATE_DELETED_ERROR:
EndRemotePull();
throw RecordDeletedError();
throw QueryRuntimeException(
"RecordDeleted error occurred during PullRemote!");
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
EndRemotePull();
throw query::ReconstructionException();
@@ -2896,10 +2888,141 @@ bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
namespace {
class SynchronizeCursor : public Cursor {
public:
SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor &db)
: self_(self),
input_cursor_(self.input()->MakeCursor(db)),
pull_remote_cursor_(
self.pull_remote() ? self.pull_remote()->MakeCursor(db) : nullptr) {
}
bool Pull(Frame &frame, Context &context) override {
if (!initial_pull_done_) {
InitialPull(frame, context);
initial_pull_done_ = true;
}
// Yield local results while available.
if (!local_frames_.empty()) {
auto &result = local_frames_.back();
for (size_t i = 0; i < frame.elems().size(); ++i) {
if (self_.advance_command()) {
query::ReconstructTypedValue(result[i]);
}
frame.elems()[i] = std::move(result[i]);
}
local_frames_.resize(local_frames_.size() - 1);
return true;
}
// Local results are exhausted; yield from the remote pull if available.
if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context))
return true;
return false;
}
void Reset() override {
throw QueryRuntimeException("Unsupported: Reset during Synchronize!");
}
private:
const Synchronize &self_;
const std::unique_ptr<Cursor> input_cursor_;
const std::unique_ptr<Cursor> pull_remote_cursor_;
bool initial_pull_done_{false};
std::vector<std::vector<TypedValue>> local_frames_;
void InitialPull(Frame &frame, Context &context) {
auto &db = context.db_accessor_.db();
// Tell all workers to accumulate, only if there is a remote pull.
std::vector<std::future<distributed::RemotePullData>> worker_accumulations;
if (pull_remote_cursor_) {
for (auto worker_id : db.remote_pull_clients().GetWorkerIds()) {
if (worker_id == db.WorkerId()) continue;
worker_accumulations.emplace_back(db.remote_pull_clients().RemotePull(
context.db_accessor_, worker_id, self_.pull_remote()->plan_id(),
context.parameters_, self_.pull_remote()->symbols(), true, 0));
}
}
// Accumulate local results
while (input_cursor_->Pull(frame, context)) {
local_frames_.emplace_back();
auto &local_frame = local_frames_.back();
local_frame.reserve(frame.elems().size());
for (auto &elem : frame.elems()) {
local_frame.emplace_back(std::move(elem));
}
}
// Wait for all workers to finish accumulation (first sync point).
for (auto &accu : worker_accumulations) {
switch (accu.get().pull_state) {
case distributed::RemotePullState::CURSOR_EXHAUSTED:
continue;
case distributed::RemotePullState::CURSOR_IN_PROGRESS:
throw QueryRuntimeException(
"Expected exhausted cursor after remote pull accumulate");
case distributed::RemotePullState::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to perform remote accumulate due to SerializationError");
case distributed::RemotePullState::UPDATE_DELETED_ERROR:
throw QueryRuntimeException(
"Failed to perform remote accumulate due to RecordDeletedError");
case distributed::RemotePullState::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Failed to perform remote accumulate due to "
"LockTimeoutException");
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
throw QueryRuntimeException(
"Failed to perform remote accumulate due to ReconstructionError");
case distributed::RemotePullState::QUERY_ERROR:
throw QueryRuntimeException(
"Failed to perform remote accumulate due to Query runtime error");
}
}
if (self_.advance_command()) {
context.db_accessor_.AdvanceCommand();
}
// Make all the workers apply their deltas.
auto tx_id = context.db_accessor_.transaction_id();
auto apply_futures =
db.remote_updates_clients().RemoteUpdateApplyAll(db.WorkerId(), tx_id);
db.remote_updates_server().Apply(tx_id);
for (auto &future : apply_futures) {
switch (future.get()) {
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to apply deferred updates due to SerializationError");
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::RemoteUpdateResult::DONE:
break;
}
}
// If the command advanced, let the workers know.
if (self_.advance_command()) {
auto futures =
db.remote_pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();
}
}
};
}
std::unique_ptr<Cursor> Synchronize::MakeCursor(
database::GraphDbAccessor &) const {
// TODO: Implement a concrete cursor.
return nullptr;
database::GraphDbAccessor &db) const {
return std::make_unique<SynchronizeCursor>(*this, db);
}
} // namespace query::plan
@@ -2936,6 +3059,5 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Unwind);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Distinct);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::CreateIndex);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Union);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::ProduceRemote);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::PullRemote);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Synchronize);

View File

@@ -97,7 +97,6 @@ class Unwind;
class Distinct;
class CreateIndex;
class Union;
class ProduceRemote;
class PullRemote;
class Synchronize;
@@ -108,8 +107,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
SetProperties, SetLabels, RemoveProperty, RemoveLabels,
ExpandUniquenessFilter<VertexAccessor>,
ExpandUniquenessFilter<EdgeAccessor>, Accumulate, Aggregate, Skip, Limit,
OrderBy, Merge, Optional, Unwind, Distinct, Union, ProduceRemote,
PullRemote, Synchronize>;
OrderBy, Merge, Optional, Unwind, Distinct, Union, PullRemote, Synchronize>;
using LogicalOperatorLeafVisitor = ::utils::LeafVisitor<Once, CreateIndex>;
@@ -2271,29 +2269,14 @@ class Union : public LogicalOperator {
}
};
class ProduceRemote : public LogicalOperator {
public:
ProduceRemote(const std::shared_ptr<LogicalOperator> &input,
const std::vector<Symbol> &symbols);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
private:
std::shared_ptr<LogicalOperator> input_;
std::vector<Symbol> symbols_;
ProduceRemote() {}
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, const unsigned int) {
ar &boost::serialization::base_object<LogicalOperator>(*this);
ar &input_;
ar &symbols_;
}
};
/**
* An operator in distributed Memgraph that yields both local and remote (from
* other workers) frames. Obtaining remote frames is done through RPC calls to
* `distributed::RemoteProduceRpcServer`s running on all the workers.
*
* This operator aims to yield results as fast as possible and loose minimal
* time on data transfer. It gives no guarantees on result order.
*/
class PullRemote : public LogicalOperator {
public:
PullRemote(const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
@@ -2346,12 +2329,30 @@ class PullRemote : public LogicalOperator {
}
};
/** Operator used to synchronize the execution of master and workers. */
/**
* Operator used to synchronize stages of plan execution between the master and
* all the workers. Synchronization is necessary in queries that update the
* graph state because updates (as well as creations and deletions) are
* deferred to avoid multithreaded modification of graph element data (as it's
* not thread-safe).
*
* The logic of the Synchronize operator is:
*
* 1. If there is a RemotePull, tell all the workers to pull on that plan and
* accumulate results without sending them to the master. This is async.
* 2. Accumulate local results, in parallel with 1. getting executed on workers.
* 3. Wait till the master and all the workers are done accumulating.
* 4. Advance the command, if necessary.
* 5. Tell all the workers to apply their updates. This is async.
* 6. Apply local updates, in parallel with 5. on the workers.
* 7. Notify workers that the command has advanced, if necessary.
* 8. Yield all the results, first local, then from RemotePull if available.
*/
class Synchronize : public LogicalOperator {
public:
Synchronize(const std::shared_ptr<LogicalOperator> &input,
const std::shared_ptr<PullRemote> &pull_remote,
bool advance_command = false)
bool advance_command)
: input_(input),
pull_remote_(pull_remote),
advance_command_(advance_command) {}
@@ -2413,6 +2414,5 @@ BOOST_CLASS_EXPORT_KEY(query::plan::Unwind);
BOOST_CLASS_EXPORT_KEY(query::plan::Distinct);
BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex);
BOOST_CLASS_EXPORT_KEY(query::plan::Union);
BOOST_CLASS_EXPORT_KEY(query::plan::ProduceRemote);
BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote);
BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize);
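For reference, this is how the operator gets wired into a plan in the new Synchronize test at the end of this commit (update_op and n_sym stand in for the test's SetProperty operator and output symbol):

auto pull_remote = std::make_shared<query::plan::PullRemote>(
    nullptr, plan_id, std::vector<Symbol>{n_sym});
auto synchronize = std::make_shared<query::plan::Synchronize>(
    update_op, pull_remote, /* advance_command */ true);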

View File

@@ -55,6 +55,19 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
tx::transaction_id_t owner = owner_;
if (owner_ == tx.id_) return LockStatus::AlreadyHeld;
// In a distributed worker the transaction objects (and the locks they own)
// are not destructed at the same time as on the master. Consequently a lock
// might be active for a dead transaction. By asking the transaction engine
// for transaction info, we'll make the worker refresh its knowledge about
// live transactions and release obsolete locks.
auto info = engine.Info(owner);
if (!info.is_active()) {
if (lock_.try_lock()) {
owner_ = tx.id_;
return LockStatus::Acquired;
}
}
// Insert edge into local lock_graph.
auto accessor = engine.local_lock_graph().access();
auto it = accessor.insert(tx.id_, owner).first;
@@ -63,8 +76,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
// Find oldest transaction in lock cycle if cycle exists and notify that
// transaction that it should abort.
// TODO: maybe we can be smarter and abort some other transaction and not
// the
// oldest one.
// the oldest one.
auto oldest = FindOldestTxInLockCycle(tx.id_, accessor);
if (oldest) {
engine.LocalForEachActiveTransaction([&](tx::Transaction &t) {

View File

@@ -28,6 +28,9 @@ class Engine {
/// Advances the command on the transaction with the given id.
virtual command_id_t Advance(transaction_id_t id) = 0;
/// Updates the command on the workers to the master's value.
virtual command_id_t UpdateCommand(transaction_id_t id) = 0;
/// Commits the given transaction. Deletes the transaction object; it is not
/// valid after this function executes.
virtual void Commit(const Transaction &t) = 0;

View File

@@ -13,7 +13,7 @@ MasterEngine::MasterEngine(communication::rpc::System &system,
durability::WriteAheadLog *wal)
: SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) {
rpc_server_.Register<BeginRpc>([this](const BeginReq &) {
auto tx = Begin();
return std::make_unique<BeginRes>(TxAndSnapshot{tx->id_, tx->snapshot()});
});
@@ -22,12 +22,12 @@ MasterEngine::MasterEngine(communication::rpc::System &system,
});
rpc_server_.Register<CommitRpc>([this](const CommitReq &req) {
Commit(*RunningTransaction(req.member));
return std::make_unique<CommitRes>();
});
rpc_server_.Register<AbortRpc>([this](const AbortReq &req) {
Abort(*RunningTransaction(req.member));
return std::make_unique<AbortRes>();
});
@@ -38,6 +38,12 @@ MasterEngine::MasterEngine(communication::rpc::System &system,
RunningTransaction(req.member)->snapshot());
});
rpc_server_.Register<CommandRpc>([this](const CommandReq &req) {
// It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here.
return std::make_unique<CommandRes>(RunningTransaction(req.member)->cid());
});
rpc_server_.Register<GcSnapshotRpc>(
[this](const communication::rpc::Message &) {
return std::make_unique<SnapshotRes>(GlobalGcSnapshot());
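Putting the command-advancing pieces of this commit together, the cross-cluster call chain is roughly as follows (a reading aid assembled from the handlers above and the Synchronize cursor, not a single code path in one file):

// Master, in SynchronizeCursor::InitialPull:
context.db_accessor_.AdvanceCommand();  // bump the command id locally
auto futures =
    db.remote_pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();

// Each worker, in the TransactionCommandAdvancedRpc handler:
db_.tx_engine().UpdateCommand(req.member);  // WorkerEngine fetches the new
                                            // command id via CommandRpc
db_.remote_data_manager().ClearCaches(req.member);  // drop stale records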

View File

@@ -23,8 +23,7 @@ struct TxAndSnapshot {
}
};
RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot);
using BeginRpc =
communication::rpc::RequestResponse<BeginReq, BeginRes>;
using BeginRpc = communication::rpc::RequestResponse<BeginReq, BeginRes>;
RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t);
@@ -43,6 +42,10 @@ RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot)
using SnapshotRpc =
communication::rpc::RequestResponse<SnapshotReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(CommandReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(CommandRes, command_id_t)
using CommandRpc = communication::rpc::RequestResponse<CommandReq, CommandRes>;
RPC_NO_MEMBER_MESSAGE(GcSnapshotReq)
using GcSnapshotRpc =
communication::rpc::RequestResponse<GcSnapshotReq, SnapshotRes>;

View File

@@ -41,6 +41,14 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) {
return ++(t->cid_);
}
command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::UpdateCommand on non-existing transaction";
return it->second->cid_;
}
void SingleNodeEngine::Commit(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);

View File

@@ -31,6 +31,7 @@ class SingleNodeEngine : public Engine {
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
command_id_t UpdateCommand(transaction_id_t id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tx) const override;

View File

@@ -34,16 +34,32 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
return res->member;
}
command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) {
command_id_t cmd_id = rpc_client_pool_.Call<CommandRpc>(tx_id)->member;
// Assume there is no concurrent work being done on this worker in the given
// transaction. This assumption is sound because command advancing needs to be
// done in a synchronized fashion, while no workers are executing in that
// transaction. That assumption lets us freely modify the command id in the
// cached transaction object, and ensures there are no race conditions on
// caching a transaction object if it wasn't cached already.
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
found->second->cid_ = cmd_id;
}
return cmd_id;
}
void WorkerEngine::Commit(const Transaction &t) {
auto res = rpc_client_pool_.Call<CommitRpc>(t.id_);
auto removal = active_.access().remove(t.id_);
CHECK(removal) << "Can't commit a transaction not in local cache";
ClearCache(t.id_);
}
void WorkerEngine::Abort(const Transaction &t) {
auto res = rpc_client_pool_.Call<AbortRpc>(t.id_);
auto removal = active_.access().remove(t.id_);
CHECK(removal) << "Can't abort a transaction not in local cache";
ClearCache(t.id_);
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
@@ -54,12 +70,11 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
// @review: this version of Call is just used because Info has no
// default constructor.
info = rpc_client_pool_.Call<ClogInfoRpc>(tid)->member;
DCHECK(info.is_committed() || info.is_aborted())
<< "It is expected that the transaction is not running anymore. This "
"function should be used only after the snapshot of the current "
"transaction is checked (in MVCC).";
if (info.is_committed()) clog_.set_committed(tid);
if (info.is_aborted()) clog_.set_aborted(tid);
if (!info.is_active()) {
if (info.is_committed()) clog_.set_committed(tid);
if (info.is_aborted()) clog_.set_aborted(tid);
ClearCache(tid);
}
}
return info;
@@ -91,10 +106,19 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
Snapshot snapshot(
std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member));
auto insertion =
accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
auto new_tx = new Transaction(tx_id, snapshot, *this);
auto insertion = accessor.insert(tx_id, new_tx);
if (!insertion.second) delete new_tx;
utils::EnsureAtomicGe(local_last_, tx_id);
return insertion.first->second;
}
void WorkerEngine::ClearCache(transaction_id_t tx_id) const {
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
delete found->second;
access.remove(tx_id);
}
}
} // namespace tx
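utils::EnsureAtomicGe (used in RunningTransaction above) is not shown in this diff; a plausible implementation (an assumption, not the actual helper) is a compare-and-swap loop that raises the atomic to at least the given value:

#include <atomic>

template <typename T>
void EnsureAtomicGe(std::atomic<T> &atomic, T value) {
  T current = atomic.load();
  // compare_exchange_weak reloads `current` on failure, so this loops until
  // the stored value is already >= `value` or our store succeeds.
  while (current < value && !atomic.compare_exchange_weak(current, value)) {
  }
}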

View File

@@ -22,6 +22,7 @@ class WorkerEngine : public Engine {
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
command_id_t UpdateCommand(transaction_id_t id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tid) const override;
@@ -35,13 +36,16 @@ class WorkerEngine : public Engine {
private:
// Local caches.
ConcurrentMap<transaction_id_t, Transaction *> active_;
mutable ConcurrentMap<transaction_id_t, Transaction *> active_;
std::atomic<transaction_id_t> local_last_{0};
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
// Communication to the transactional master.
mutable communication::rpc::ClientPool rpc_client_pool_;
};
// Removes (destructs) a Transaction that's expired. If there is no cached
// transaction for the given id, nothing is done.
void ClearCache(transaction_id_t tx_id) const;
};
} // namespace tx

View File

@@ -502,8 +502,6 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
return true;
}
PRE_VISIT(ProduceRemote);
bool PreVisit(query::plan::PullRemote &op) override {
WithPrintLn([&op](auto &out) {
out << "* PullRemote {";

View File

@@ -3,6 +3,7 @@
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "transactions/engine_master.hpp"
class DistributedGraphDbTest : public ::testing::Test {
@@ -63,6 +64,44 @@ class DistributedGraphDbTest : public ::testing::Test {
return workers_[worker_id - 1]->worker_;
}
/// Inserts a vertex and returns its global address. Does so in a new
/// transaction.
auto InsertVertex(database::GraphDb &db) {
database::GraphDbAccessor dba{db};
auto r_val = dba.InsertVertex().GlobalAddress();
dba.Commit();
return r_val;
}
/// Inserts an edge (on the 'from' side) and returns its global address.
auto InsertEdge(Edges::VertexAddress from, Edges::VertexAddress to,
const std::string &edge_type_name) {
database::GraphDbAccessor dba{worker(from.worker_id())};
auto from_v = dba.FindVertexChecked(from.gid(), false);
auto edge_type = dba.EdgeType(edge_type_name);
// If 'to' is on the same worker as 'from', create the edge locally.
if (to.worker_id() == from.worker_id()) {
auto to_v = dba.FindVertexChecked(to.gid(), false);
auto r_val = dba.InsertEdge(from_v, to_v, edge_type).GlobalAddress();
dba.Commit();
return r_val;
}
// 'to' is not on the same worker as 'from'
auto edge_ga = dba.InsertOnlyEdge(from, to, edge_type,
dba.db().storage().EdgeGenerator().Next())
.GlobalAddress();
from_v.update().out_.emplace(to, edge_ga, edge_type);
database::GraphDbAccessor dba_to{worker(to.worker_id()),
dba.transaction_id()};
auto to_v = dba_to.FindVertexChecked(to.gid(), false);
to_v.update().in_.emplace(from, edge_ga, edge_type);
dba.Commit();
return edge_ga;
}
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;

View File

@@ -65,36 +65,9 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
TEST_F(DistributedGraphDbTest, RemoteExpansion) {
// Model (v1)-->(v2), where each vertex is on one worker.
std::vector<Edges::VertexAddress> v_ga;
{
GraphDbAccessor dba{master()};
v_ga.emplace_back(GraphDbAccessor(worker(1), dba.transaction_id())
.InsertVertex()
.GlobalAddress());
v_ga.emplace_back(GraphDbAccessor(worker(2), dba.transaction_id())
.InsertVertex()
.GlobalAddress());
dba.Commit();
}
{
GraphDbAccessor dba{master()};
auto edge_type = dba.EdgeType("et");
auto prop = dba.Property("prop");
GraphDbAccessor dba1{worker(1), dba.transaction_id()};
auto edge_ga =
dba1.InsertOnlyEdge(v_ga[0], v_ga[1], edge_type,
dba1.db().storage().EdgeGenerator().Next())
.GlobalAddress();
for (int i : {0, 1}) {
GraphDbAccessor dba_w{worker(i + 1), dba.transaction_id()};
auto v = dba_w.FindVertexChecked(v_ga[i].gid(), false);
// Update the vertex to create a new record.
v.PropsSet(prop, 42);
auto &edges = i == 0 ? v.GetNew()->out_ : v.GetNew()->in_;
edges.emplace(v_ga[(i + 1) % 2], edge_ga, edge_type);
}
dba.Commit();
}
auto from = InsertVertex(worker(1));
auto to = InsertVertex(worker(2));
InsertEdge(from, to, "et");
{
// Expand on the master for three hops. Collect vertex gids.
GraphDbAccessor dba{master()};
@@ -107,11 +80,11 @@ TEST_F(DistributedGraphDbTest, RemoteExpansion) {
};
// Do a few hops back and forth, all on the master.
VertexAccessor v{v_ga[0], dba};
VertexAccessor v{from, dba};
for (int i = 0; i < 5; ++i) {
v = expand(v);
EXPECT_FALSE(v.address().is_local());
EXPECT_EQ(v.address(), v_ga[(i + 1) % 2]);
EXPECT_EQ(v.address(), i % 2 ? from : to);
}
}
}

View File

@@ -149,7 +149,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
params, symbols, false, 3);
};
auto expect_first_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state,
@@ -271,7 +271,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
params, symbols, false, 3);
};
auto future_w1_results = remote_pull(dba, 1);
auto future_w2_results = remote_pull(dba, 2);
@@ -339,3 +339,57 @@ TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
VertexAccessor v_in_w2{v_ga, dba_w2};
EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
}
TEST_F(DistributedGraphDbTest, Synchronize) {
auto from = InsertVertex(worker(1));
auto to = InsertVertex(worker(2));
InsertEdge(from, to, "et");
// Query: MATCH (n)--(m) SET m.prop = 42 RETURN n.prop
// This query ensures that a remote update gets applied and the local data
// gets reconstructed.
auto &db = master();
GraphDbAccessor dba{db};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// MATCH
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
// SET
auto literal = LITERAL(42);
auto prop = PROPERTY_PAIR("prop");
auto m_p = PROPERTY_LOOKUP("m", prop);
ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_;
auto set_m_p = std::make_shared<plan::SetProperty>(r_m.op_, m_p, literal);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_);
// Master-side PullRemote, Synchronize
auto pull_remote = std::make_shared<query::plan::PullRemote>(
nullptr, plan_id, std::vector<Symbol>{n.sym_});
auto synchronize =
std::make_shared<query::plan::Synchronize>(set_m_p, pull_remote, true);
// RETURN
auto n_p =
storage.Create<PropertyLookup>(storage.Create<Identifier>("n"), prop);
ctx.symbol_table_[*n_p->expression_] = n.sym_;
auto return_n_p = NEXPR("n.prop", n_p);
auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true);
ctx.symbol_table_[*return_n_p] = return_n_p_sym;
auto produce = MakeProduce(synchronize, return_n_p);
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
ASSERT_EQ(results.size(), 2);
ASSERT_EQ(results[0].size(), 1);
EXPECT_EQ(results[0][0].ValueInt(), 42);
ASSERT_EQ(results[1].size(), 1);
EXPECT_EQ(results[1][0].ValueInt(), 42);
// TODO test without advance command?
}

View File

@@ -1,8 +1,3 @@
//
// Copyright 2017 Memgraph
// Created by Florijan Stamenkovic on 14.03.17.
//
#include <experimental/optional>
#include <iterator>
#include <memory>

View File

@@ -113,7 +113,6 @@ class PlanChecker : public HierarchicalLogicalOperatorVisitor {
return true;
}
PRE_VISIT(ProduceRemote);
PRE_VISIT(PullRemote);
bool PreVisit(Synchronize &op) override {
@@ -1685,9 +1684,8 @@ TYPED_TEST(TestPlanner, WhereIndexedLabelPropertyRange) {
AstTreeStorage storage;
auto lit_42 = LITERAL(42);
auto n_prop = PROPERTY_LOOKUP("n", property);
auto check_planned_range = [&label, &property, &db](const auto &rel_expr,
auto lower_bound,
auto upper_bound) {
auto check_planned_range = [&label, &property, &db](
const auto &rel_expr, auto lower_bound, auto upper_bound) {
// Shadow the first storage, so that the query is created in this one.
AstTreeStorage storage;
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))), WHERE(rel_expr),
@@ -2020,5 +2018,4 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
MakeCheckers(ExpectScanAll(), ExpectCreateNode(), acc)};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
} // namespace