Send tx info with pull remote

Reviewers: dgleich, teon.banek

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1219
This commit is contained in:
florijan 2018-02-21 16:06:56 +01:00
parent 452ce6a30c
commit 2a99b9c80e
5 changed files with 38 additions and 15 deletions

View File

@ -21,6 +21,7 @@
#include "query/plan/operator.hpp"
#include "query/typed_value.hpp"
#include "transactions/engine.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
@ -131,13 +132,13 @@ class RemoteProduceRpcServer {
};
public:
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &engine,
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::System &system,
const distributed::PlanConsumer &plan_consumer)
: db_(db),
remote_produce_rpc_server_(system, kRemotePullProduceRpcName),
plan_consumer_(plan_consumer),
engine_(engine) {
tx_engine_(tx_engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>(
[this](const RemotePullReq &req) {
return std::make_unique<RemotePullRes>(RemotePull(req));
@ -145,7 +146,7 @@ class RemoteProduceRpcServer {
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
db_.tx_engine().UpdateCommand(req.member);
tx_engine_.UpdateCommand(req.member);
db_.remote_data_manager().ClearCaches(req.member);
return std::make_unique<TransactionCommandAdvancedRes>();
});
@ -160,9 +161,9 @@ class RemoteProduceRpcServer {
ongoing_produces_;
std::mutex ongoing_produces_lock_;
tx::Engine &engine_;
tx::Engine &tx_engine_;
tx::TxEndListener tx_end_listener_{
engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
// Removes all onging pulls for the given tx_id (that transaction expired).
void ClearCache(tx::transaction_id_t tx_id) {
@ -182,7 +183,11 @@ class RemoteProduceRpcServer {
if (found != ongoing_produces_.end()) {
return found->second;
}
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) {
// On the worker cache the snapshot to have one RPC less.
dynamic_cast<tx::WorkerEngine &>(tx_engine_)
.RunningTransaction(req.tx_id, req.tx_snapshot);
}
auto &plan_pack = plan_consumer_.PlanForId(req.plan_id);
return ongoing_produces_
.emplace(std::piecewise_construct,

View File

@ -37,10 +37,12 @@ const std::string kRemotePullProduceRpcName = "RemotePullProduceRpc";
struct RemotePullReq : public communication::rpc::Message {
RemotePullReq() {}
RemotePullReq(tx::transaction_id_t tx_id, int64_t plan_id,
const Parameters &params, std::vector<query::Symbol> symbols,
bool accumulate, int batch_size, bool send_old, bool send_new)
RemotePullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot,
int64_t plan_id, const Parameters &params,
std::vector<query::Symbol> symbols, bool accumulate,
int batch_size, bool send_old, bool send_new)
: tx_id(tx_id),
tx_snapshot(tx_snapshot),
plan_id(plan_id),
params(params),
symbols(symbols),
@ -50,6 +52,7 @@ struct RemotePullReq : public communication::rpc::Message {
send_new(send_new) {}
tx::transaction_id_t tx_id;
tx::Snapshot tx_snapshot;
int64_t plan_id;
Parameters params;
std::vector<query::Symbol> symbols;
@ -66,6 +69,7 @@ struct RemotePullReq : public communication::rpc::Message {
void save(TArchive &ar, unsigned int) const {
ar << boost::serialization::base_object<communication::rpc::Message>(*this);
ar << tx_id;
ar << tx_snapshot;
ar << plan_id;
ar << params.size();
for (auto &kv : params) {
@ -84,6 +88,7 @@ struct RemotePullReq : public communication::rpc::Message {
void load(TArchive &ar, unsigned int) {
ar >> boost::serialization::base_object<communication::rpc::Message>(*this);
ar >> tx_id;
ar >> tx_snapshot;
ar >> plan_id;
size_t params_size;
ar >> params_size;

View File

@ -39,8 +39,8 @@ class RemotePullRpcClients {
worker_id, [&dba, plan_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) {
auto result = client_pool.Call<RemotePullRpc>(
dba.transaction_id(), plan_id, params, symbols, accumulate,
batch_size, true, true);
dba.transaction_id(), dba.transaction().snapshot(), plan_id,
params, symbols, accumulate, batch_size, true, true);
auto handle_vertex = [&dba](auto &v) {
dba.db()

View File

@ -109,20 +109,29 @@ bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
return rpc_client_pool_.Call<IsActiveRpc>(tid)->member;
}
tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
void WorkerEngine::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
for (auto pair : active_.access()) f(*pair.second);
}
tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot(
std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member));
return RunningTransaction(tx_id, snapshot);
}
Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id,
const Snapshot &snapshot) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
auto new_tx = new Transaction(tx_id, snapshot, *this);
auto insertion = accessor.insert(tx_id, new_tx);
if (!insertion.second) delete new_tx;

View File

@ -35,10 +35,14 @@ class WorkerEngine : public Engine {
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
bool GlobalIsActive(transaction_id_t tid) const override;
tx::transaction_id_t LocalLast() const override;
transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override;
Transaction *RunningTransaction(transaction_id_t tx_id) override;
// Caches the transaction for the given info an returs a ptr to it.
Transaction *RunningTransaction(transaction_id_t tx_id,
const Snapshot &snapshot);
private:
// Local caches.