Fix BFS subcursor cleanup
Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1748
This commit is contained in:
parent
d819c7b48c
commit
809e1779b1
@ -3,6 +3,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
#include "distributed/bfs_rpc_messages.hpp"
|
#include "distributed/bfs_rpc_messages.hpp"
|
||||||
#include "distributed/bfs_subcursor.hpp"
|
#include "distributed/bfs_subcursor.hpp"
|
||||||
@ -30,7 +31,10 @@ class BfsRpcServer {
|
|||||||
db_accessor.get(), req.direction, req.edge_types,
|
db_accessor.get(), req.direction, req.edge_types,
|
||||||
std::move(req.symbol_table), std::move(ast_storage),
|
std::move(req.symbol_table), std::move(ast_storage),
|
||||||
req.filter_lambda, std::move(req.evaluation_context));
|
req.filter_lambda, std::move(req.evaluation_context));
|
||||||
db_accessors_[id] = std::move(db_accessor);
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(lock_);
|
||||||
|
db_accessors_[id] = std::move(db_accessor);
|
||||||
|
}
|
||||||
CreateBfsSubcursorRes res(id);
|
CreateBfsSubcursorRes res(id);
|
||||||
Save(res, res_builder);
|
Save(res, res_builder);
|
||||||
});
|
});
|
||||||
@ -58,7 +62,10 @@ class BfsRpcServer {
|
|||||||
[this](const auto &req_reader, auto *res_builder) {
|
[this](const auto &req_reader, auto *res_builder) {
|
||||||
RemoveBfsSubcursorReq req;
|
RemoveBfsSubcursorReq req;
|
||||||
Load(&req, req_reader);
|
Load(&req, req_reader);
|
||||||
db_accessors_.erase(req.member);
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(lock_);
|
||||||
|
db_accessors_.erase(req.member);
|
||||||
|
}
|
||||||
subcursor_storage_->Erase(req.member);
|
subcursor_storage_->Erase(req.member);
|
||||||
RemoveBfsSubcursorRes res;
|
RemoveBfsSubcursorRes res;
|
||||||
Save(res, res_builder);
|
Save(res, res_builder);
|
||||||
@ -109,7 +116,7 @@ class BfsRpcServer {
|
|||||||
});
|
});
|
||||||
|
|
||||||
coordination->Register<ReconstructPathRpc>([this](const auto &req_reader,
|
coordination->Register<ReconstructPathRpc>([this](const auto &req_reader,
|
||||||
auto *res_builder) {
|
auto *res_builder) {
|
||||||
ReconstructPathReq req;
|
ReconstructPathReq req;
|
||||||
Load(&req, req_reader);
|
Load(&req, req_reader);
|
||||||
auto subcursor = subcursor_storage_->Get(req.subcursor_id);
|
auto subcursor = subcursor_storage_->Get(req.subcursor_id);
|
||||||
@ -127,7 +134,7 @@ class BfsRpcServer {
|
|||||||
});
|
});
|
||||||
|
|
||||||
coordination->Register<PrepareForExpandRpc>([this](const auto &req_reader,
|
coordination->Register<PrepareForExpandRpc>([this](const auto &req_reader,
|
||||||
auto *res_builder) {
|
auto *res_builder) {
|
||||||
PrepareForExpandReq req;
|
PrepareForExpandReq req;
|
||||||
auto subcursor_id = req_reader.getSubcursorId();
|
auto subcursor_id = req_reader.getSubcursorId();
|
||||||
auto *subcursor = subcursor_storage_->Get(subcursor_id);
|
auto *subcursor = subcursor_storage_->Get(subcursor_id);
|
||||||
@ -141,6 +148,7 @@ class BfsRpcServer {
|
|||||||
private:
|
private:
|
||||||
database::DistributedGraphDb *db_;
|
database::DistributedGraphDb *db_;
|
||||||
|
|
||||||
|
std::mutex lock_;
|
||||||
std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_;
|
std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_;
|
||||||
BfsSubcursorStorage *subcursor_storage_;
|
BfsSubcursorStorage *subcursor_storage_;
|
||||||
};
|
};
|
||||||
|
@ -1062,11 +1062,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
|
|||||||
<< "ExpandVariable should only be planned with GraphView::OLD";
|
<< "ExpandVariable should only be planned with GraphView::OLD";
|
||||||
}
|
}
|
||||||
|
|
||||||
~DistributedExpandBfsCursor() {
|
|
||||||
VLOG(10) << "Removing BFS subcursors";
|
|
||||||
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void InitSubcursors(database::GraphDbAccessor *dba,
|
void InitSubcursors(database::GraphDbAccessor *dba,
|
||||||
const query::SymbolTable &symbol_table,
|
const query::SymbolTable &symbol_table,
|
||||||
const EvaluationContext &evaluation_context) {
|
const EvaluationContext &evaluation_context) {
|
||||||
@ -1212,7 +1207,13 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
void Shutdown() override {
|
||||||
|
input_cursor_->Shutdown();
|
||||||
|
VLOG(10) << "Removing BFS subcursors";
|
||||||
|
// TODO: This should be done using the
|
||||||
|
// `RegisterForTransactionalCacheCleanup` mechanism.
|
||||||
|
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
|
||||||
|
}
|
||||||
|
|
||||||
void Reset() override {
|
void Reset() override {
|
||||||
input_cursor_->Reset();
|
input_cursor_->Reset();
|
||||||
|
Loading…
Reference in New Issue
Block a user