diff --git a/src/distributed/bfs_rpc_server.hpp b/src/distributed/bfs_rpc_server.hpp index 636a01f47..946c79087 100644 --- a/src/distributed/bfs_rpc_server.hpp +++ b/src/distributed/bfs_rpc_server.hpp @@ -3,6 +3,7 @@ #pragma once #include +#include #include "distributed/bfs_rpc_messages.hpp" #include "distributed/bfs_subcursor.hpp" @@ -30,7 +31,10 @@ class BfsRpcServer { db_accessor.get(), req.direction, req.edge_types, std::move(req.symbol_table), std::move(ast_storage), req.filter_lambda, std::move(req.evaluation_context)); - db_accessors_[id] = std::move(db_accessor); + { + std::lock_guard guard(lock_); + db_accessors_[id] = std::move(db_accessor); + } CreateBfsSubcursorRes res(id); Save(res, res_builder); }); @@ -58,7 +62,10 @@ class BfsRpcServer { [this](const auto &req_reader, auto *res_builder) { RemoveBfsSubcursorReq req; Load(&req, req_reader); - db_accessors_.erase(req.member); + { + std::lock_guard guard(lock_); + db_accessors_.erase(req.member); + } subcursor_storage_->Erase(req.member); RemoveBfsSubcursorRes res; Save(res, res_builder); @@ -109,7 +116,7 @@ class BfsRpcServer { }); coordination->Register([this](const auto &req_reader, - auto *res_builder) { + auto *res_builder) { ReconstructPathReq req; Load(&req, req_reader); auto subcursor = subcursor_storage_->Get(req.subcursor_id); @@ -127,7 +134,7 @@ class BfsRpcServer { }); coordination->Register([this](const auto &req_reader, - auto *res_builder) { + auto *res_builder) { PrepareForExpandReq req; auto subcursor_id = req_reader.getSubcursorId(); auto *subcursor = subcursor_storage_->Get(subcursor_id); @@ -141,6 +148,7 @@ class BfsRpcServer { private: database::DistributedGraphDb *db_; + std::mutex lock_; std::map> db_accessors_; BfsSubcursorStorage *subcursor_storage_; }; diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index edcebfe60..c11f1e784 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -1062,11 +1062,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { << "ExpandVariable should only be planned with GraphView::OLD"; } - ~DistributedExpandBfsCursor() { - VLOG(10) << "Removing BFS subcursors"; - bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); - } - void InitSubcursors(database::GraphDbAccessor *dba, const query::SymbolTable &symbol_table, const EvaluationContext &evaluation_context) { @@ -1212,7 +1207,13 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { } } - void Shutdown() override { input_cursor_->Shutdown(); } + void Shutdown() override { + input_cursor_->Shutdown(); + VLOG(10) << "Removing BFS subcursors"; + // TODO: This should be done using the + // `RegisterForTransactionalCacheCleanup` mechanism. + bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); + } void Reset() override { input_cursor_->Reset();