Extract distributed create operators

Reviewers: mtomic, msantl

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1575
This commit is contained in:
Teon Banek 2018-08-30 13:31:50 +02:00
parent 2c732f3ea1
commit 80f649bcd1
13 changed files with 411 additions and 180 deletions

View File

@ -1,10 +1,14 @@
/// @file
#pragma once
#include <cstdint>
#include <string>
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/typed_value.hpp"
#include "storage/types.hpp"
#include "query/common.capnp.h"
@ -17,27 +21,21 @@ std::string ParseStringLiteral(const std::string &s);
double ParseDoubleLiteral(const std::string &s);
std::string ParseParameter(const std::string &s);
/**
* Indicates that some part of query execution should
* see the OLD graph state (the latest state before the
* current transaction+command), or NEW (state as
* changed by the current transaction+command).
*/
/// Indicates that some part of query execution should see the OLD graph state
/// (the latest state before the current transaction+command), or NEW (state as
/// changed by the current transaction+command).
enum class GraphView { OLD, NEW };
/**
* Helper function for recursively reconstructing all the accessors in the
* given TypedValue.
*
* @returns - If the reconstruction succeeded.
*/
/// Recursively reconstruct all the accessors in the given TypedValue.
///
/// @throw ReconstructionException if any reconstruction failed.
void ReconstructTypedValue(TypedValue &value);
// Custom Comparator type for comparing vectors of TypedValues.
//
// Does lexicographical ordering of elements based on the above
// defined TypedValueCompare, and also accepts a vector of Orderings
// the define how respective elements compare.
/// Custom Comparator type for comparing vectors of TypedValues.
///
/// Does lexicographical ordering of elements based on the above
/// defined TypedValueCompare, and also accepts a vector of Orderings
/// the define how respective elements compare.
class TypedValueVectorCompare final {
public:
TypedValueVectorCompare() {}
@ -55,8 +53,33 @@ class TypedValueVectorCompare final {
std::vector<Ordering> ordering_;
};
// Switch the given [Vertex/Edge]Accessor to the desired state.
/// Switch the given [Vertex/Edge]Accessor to the desired state.
template <class TAccessor>
void SwitchAccessor(TAccessor &accessor, GraphView graph_view);
/// Raise QueryRuntimeException if the value for symbol isn't of expected type.
inline void ExpectType(const Symbol &symbol, const TypedValue &value,
TypedValue::Type expected) {
if (value.type() != expected)
throw QueryRuntimeException("Expected a {} for '{}', but got {}.", expected,
symbol.name(), value.type());
}
/// Set a property `value` mapped with given `key` on a `record`.
///
/// @throw QueryRuntimeException if value cannot be set as a property value
template <class TRecordAccessor>
void PropsSetChecked(TRecordAccessor *record, const storage::Property &key,
const TypedValue &value) {
try {
record->PropsSet(key, value);
} catch (const TypedValueException &) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
} catch (const RecordDeletedError &) {
throw QueryRuntimeException(
"Trying to set properties on a deleted graph element.");
}
}
} // namespace query

View File

@ -478,11 +478,11 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor {
// symbols. This is the case when we are planning *another* Cartesian after
// Produce.
bool PreVisit(CreateNode &op) override {
bool PreVisit(DistributedCreateNode &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateNode &op) override {
bool PostVisit(DistributedCreateNode &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom()->identifier_))));
for (auto &kv : op.node_atom()->properties_) {
@ -493,11 +493,11 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor {
return true;
}
bool PreVisit(CreateExpand &op) override {
bool PreVisit(DistributedCreateExpand &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateExpand &op) override {
bool PostVisit(DistributedCreateExpand &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(op.input_symbol()));
CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom()->identifier_))));
@ -1490,9 +1490,13 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
// Creation needs to be modified if running on master, so as to distribute
// node creation to workers.
if (!ShouldSplit()) {
op.set_on_random_worker(true);
}
bool create_on_random_worker = !ShouldSplit();
auto distributed_create = std::make_unique<DistributedCreateNode>(
op.input(), op.node_atom(), create_on_random_worker);
if (prev_ops_.empty())
distributed_plan_.master_plan = std::move(distributed_create);
else
SetOnPrevious(std::move(distributed_create));
needs_synchronize_ = true;
return true;
}
@ -1506,6 +1510,13 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
auto distributed_create = std::make_unique<DistributedCreateExpand>(
op.node_atom(), op.edge_atom(), op.input(), op.input_symbol(),
op.existing_node());
if (prev_ops_.empty())
distributed_plan_.master_plan = std::move(distributed_create);
else
SetOnPrevious(std::move(distributed_create));
needs_synchronize_ = true;
return true;
}
@ -1652,10 +1663,10 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
void SetOnPrevious(std::unique_ptr<LogicalOperator> input_op) {
auto *prev_op = prev_ops_.back();
CHECK(prev_op)
CHECK(!prev_ops_.empty())
<< "SetOnPrevious should only be called when there is a previously "
"visited operation";
auto *prev_op = prev_ops_.back();
if (!prev_op->HasSingleInput())
throw utils::NotYetImplemented("distributed planning");
prev_op->set_input(std::move(input_op));

View File

@ -29,6 +29,10 @@ DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10,
namespace query::plan {
// Create a vertex on this GraphDb and return it. Defined in operator.cpp
VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame,
Context &context);
bool PullRemote::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
auto *distributed_visitor =
dynamic_cast<DistributedOperatorVisitor *>(&visitor);
@ -126,6 +130,41 @@ std::vector<Symbol> DistributedExpandBfs::ModifiedSymbols(
return symbols;
}
DistributedCreateNode::DistributedCreateNode(
const std::shared_ptr<LogicalOperator> &input, NodeAtom *node_atom,
bool on_random_worker)
: input_(input),
node_atom_(node_atom),
on_random_worker_(on_random_worker) {}
ACCEPT_WITH_INPUT(DistributedCreateNode);
std::vector<Symbol> DistributedCreateNode::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(table.at(*node_atom_->identifier_));
return symbols;
}
DistributedCreateExpand::DistributedCreateExpand(
NodeAtom *node_atom, EdgeAtom *edge_atom,
const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol,
bool existing_node)
: node_atom_(node_atom),
edge_atom_(edge_atom),
input_(input ? input : std::make_shared<Once>()),
input_symbol_(input_symbol),
existing_node_(existing_node) {}
ACCEPT_WITH_INPUT(DistributedCreateExpand);
std::vector<Symbol> DistributedCreateExpand::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(table.at(*node_atom_->identifier_));
symbols.emplace_back(table.at(*edge_atom_->identifier_));
return symbols;
}
//////////////////////////////////////////////////////////////////////
// Cursors
//////////////////////////////////////////////////////////////////////
@ -866,7 +905,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
const DistributedExpandBfs &self_;
database::GraphDbAccessor &db_;
distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
const std::unique_ptr<query::plan::Cursor> input_cursor_;
std::unique_ptr<query::plan::Cursor> input_cursor_;
// Depth bounds. Calculated on each pull from the input, the initial value
// is irrelevant.
@ -887,6 +926,167 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
};
// Returns a random worker id. Worker ID is obtained from the Db.
int RandomWorkerId(const database::DistributedGraphDb &db) {
thread_local std::mt19937 gen_{std::random_device{}()};
thread_local std::uniform_int_distribution<int> rand_;
auto worker_ids = db.GetWorkerIds();
return worker_ids[rand_(gen_) % worker_ids.size()];
}
// Creates a vertex on the GraphDb with the given worker_id. Can be this worker.
VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
Frame &frame, Context &context) {
auto &dba = context.db_accessor_;
int current_worker_id = 0;
// TODO: Figure out a better solution.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
current_worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(&dba.db()));
}
if (worker_id == current_worker_id)
return CreateLocalVertex(node_atom, frame, context);
std::unordered_map<storage::Property, query::TypedValue> properties;
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, &context, GraphView::NEW);
for (auto &kv : node_atom->properties_) {
auto value = kv.second->Accept(evaluator);
if (!value.IsPropertyValue()) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
}
properties.emplace(kv.first.second, std::move(value));
}
auto new_node = database::InsertVertexIntoRemote(
&dba, worker_id, node_atom->labels_, properties);
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}
class DistributedCreateNodeCursor : public query::plan::Cursor {
public:
DistributedCreateNodeCursor(const DistributedCreateNode *self,
database::GraphDbAccessor *dba)
: input_cursor_(self->input()->MakeCursor(*dba)),
// TODO: Replace this with some other mechanism
db_(dynamic_cast<database::DistributedGraphDb *>(&dba->db())),
node_atom_(self->node_atom()),
on_random_worker_(self->on_random_worker()) {
CHECK(db_);
CHECK(node_atom_);
}
bool Pull(Frame &frame, Context &context) {
if (input_cursor_->Pull(frame, context)) {
if (on_random_worker_) {
CreateVertexOnWorker(RandomWorkerId(*db_), node_atom_, frame, context);
} else {
CreateLocalVertex(node_atom_, frame, context);
}
return true;
}
return false;
}
void Reset() { input_cursor_->Reset(); }
private:
std::unique_ptr<query::plan::Cursor> input_cursor_;
database::DistributedGraphDb *db_{nullptr};
NodeAtom *node_atom_{nullptr};
bool on_random_worker_{false};
};
class DistributedCreateExpandCursor : public query::plan::Cursor {
public:
DistributedCreateExpandCursor(const DistributedCreateExpand *self,
database::GraphDbAccessor *dba)
: input_cursor_(self->input()->MakeCursor(*dba)),
self_(self),
// TODO: Replace this with some other mechanism
db_(dynamic_cast<database::DistributedGraphDb *>(&dba->db())) {
CHECK(db_);
}
bool Pull(Frame &frame, Context &context) {
if (!input_cursor_->Pull(frame, context)) return false;
// get the origin vertex
TypedValue &vertex_value = frame[self_->input_symbol()];
ExpectType(self_->input_symbol(), vertex_value, TypedValue::Type::Vertex);
auto &v1 = vertex_value.Value<VertexAccessor>();
// Similarly to CreateNode, newly created edges and nodes should use the
// latest accesors.
ExpressionEvaluator evaluator(frame, &context, GraphView::NEW);
// E.g. we pickup new properties: `CREATE (n {p: 42}) -[:r {ep: n.p}]-> ()`
v1.SwitchNew();
// get the destination vertex (possibly an existing node)
auto &v2 = OtherVertex(v1.GlobalAddress().worker_id(), frame, context);
v2.SwitchNew();
auto *dba = &context.db_accessor_;
// create an edge between the two nodes
switch (self_->edge_atom()->direction_) {
case EdgeAtom::Direction::IN:
CreateEdge(&v2, &v1, &frame, context.symbol_table_, &evaluator, dba);
break;
case EdgeAtom::Direction::OUT:
CreateEdge(&v1, &v2, &frame, context.symbol_table_, &evaluator, dba);
break;
case EdgeAtom::Direction::BOTH:
// in the case of an undirected CreateExpand we choose an arbitrary
// direction. this is used in the MERGE clause
// it is not allowed in the CREATE clause, and the semantic
// checker needs to ensure it doesn't reach this point
CreateEdge(&v1, &v2, &frame, context.symbol_table_, &evaluator, dba);
}
return true;
}
void Reset() { input_cursor_->Reset(); }
VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context) {
if (self_->existing_node()) {
const auto &dest_node_symbol =
context.symbol_table_.at(*self_->node_atom()->identifier_);
TypedValue &dest_node_value = frame[dest_node_symbol];
ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex);
return dest_node_value.Value<VertexAccessor>();
} else {
return CreateVertexOnWorker(worker_id, self_->node_atom(), frame,
context);
}
}
void CreateEdge(VertexAccessor *from, VertexAccessor *to, Frame *frame,
const SymbolTable &symbol_table,
ExpressionEvaluator *evaluator,
database::GraphDbAccessor *dba) {
EdgeAccessor edge =
dba->InsertEdge(*from, *to, self_->edge_atom()->edge_types_[0]);
for (auto kv : self_->edge_atom()->properties_)
PropsSetChecked(&edge, kv.first.second, kv.second->Accept(*evaluator));
(*frame)[symbol_table.at(*self_->edge_atom()->identifier_)] = edge;
}
private:
std::unique_ptr<query::plan::Cursor> input_cursor_;
const DistributedCreateExpand *self_{nullptr};
database::DistributedGraphDb *db_{nullptr};
};
} // namespace
std::unique_ptr<Cursor> PullRemote::MakeCursor(
@ -909,4 +1109,14 @@ std::unique_ptr<Cursor> DistributedExpandBfs::MakeCursor(
return std::make_unique<DistributedExpandBfsCursor>(*this, db);
}
std::unique_ptr<Cursor> DistributedCreateNode::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<DistributedCreateNodeCursor>(this, &db);
}
std::unique_ptr<Cursor> DistributedCreateExpand::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<DistributedCreateExpandCursor>(this, &db);
}
} // namespace query::plan

View File

@ -16,10 +16,13 @@ class PullRemote;
class Synchronize;
class PullRemoteOrderBy;
class DistributedExpandBfs;
class DistributedCreateNode;
class DistributedCreateExpand;
using DistributedOperatorCompositeVisitor =
::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy,
DistributedExpandBfs>;
DistributedExpandBfs, DistributedCreateNode,
DistributedCreateExpand>;
/// Base class for visiting regular and distributed LogicalOperator instances.
class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor,
@ -211,5 +214,65 @@ by having only one result from each worker.")
(:private #>cpp DistributedExpandBfs() {} cpp<#)
(:serialize :capnp :inherit-compose '(expand-common)))
(lcp:define-class distributed-create-node (logical-operator)
((input "std::shared_ptr<LogicalOperator>"
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(node-atom "NodeAtom *" :initval "nullptr" :reader t
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *"))
(on-random-worker :bool :initval "false" :reader t))
(:documentation "Create nodes in distributed environment.")
(:public
#>cpp
DistributedCreateNode(const std::shared_ptr<LogicalOperator> &input,
NodeAtom *node_atom, bool on_random_worker);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
cpp<#)
(:private #>cpp DistributedCreateNode() {} cpp<#)
(:serialize :capnp))
(lcp:define-class distributed-create-expand (logical-operator)
((node-atom "NodeAtom *" :reader t
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *"))
(edge-atom "EdgeAtom *" :reader t
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "EdgeAtom *"))
(input "std::shared_ptr<LogicalOperator>"
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(input-symbol "Symbol" :reader t)
(existing-node :bool :reader t))
(:documentation "Distributed version of CreateExpand")
(:public
#>cpp
DistributedCreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom,
const std::shared_ptr<LogicalOperator> &input,
Symbol input_symbol, bool existing_node);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
cpp<#)
(:private #>cpp DistributedCreateExpand() {} cpp<#)
(:serialize :capnp))
(lcp:pop-namespace)
(lcp:pop-namespace)

View File

@ -15,7 +15,6 @@
#include "auth/auth.hpp"
#include "communication/result_stream_faker.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "glue/auth.hpp"
#include "glue/communication.hpp"
@ -60,31 +59,6 @@ namespace query::plan {
namespace {
// Sets a property on a record accessor from a TypedValue. In cases when the
// TypedValue cannot be converted to PropertyValue,
// QueryRuntimeException is raised.
template <class TRecordAccessor>
void PropsSetChecked(TRecordAccessor &record, storage::Property key,
TypedValue value) {
try {
record.PropsSet(key, value);
} catch (const TypedValueException &) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
} catch (const RecordDeletedError &) {
throw QueryRuntimeException(
"Trying to set properties on a deleted graph element.");
}
}
// Checks if the given value of the symbol has the expected type. If not, raises
// QueryRuntimeException.
void ExpectType(Symbol symbol, TypedValue value, TypedValue::Type expected) {
if (value.type() != expected)
throw QueryRuntimeException("Expected a {} for '{}', but got {}.", expected,
symbol.name(), value.type());
}
// Returns boolean result of evaluating filter expression. Null is treated as
// false. Other non boolean values raise a QueryRuntimeException.
bool EvaluateFilter(ExpressionEvaluator &evaluator, Expression *filter) {
@ -117,21 +91,8 @@ WITHOUT_SINGLE_INPUT(Once);
void Once::OnceCursor::Reset() { did_pull_ = false; }
CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input,
NodeAtom *node_atom, bool on_random_worker)
: input_(input ? input : std::make_shared<Once>()),
node_atom_(node_atom),
on_random_worker_(on_random_worker) {}
namespace {
// Returns a random worker id. Worker ID is obtained from the Db.
int RandomWorkerId(const database::DistributedGraphDb &db) {
thread_local std::mt19937 gen_{std::random_device{}()};
thread_local std::uniform_int_distribution<int> rand_;
auto worker_ids = db.GetWorkerIds();
return worker_ids[rand_(gen_) % worker_ids.size()];
}
NodeAtom *node_atom)
: input_(input ? input : std::make_shared<Once>()), node_atom_(node_atom) {}
// Creates a vertex on this GraphDb. Returns a reference to vertex placed on the
// frame.
@ -145,50 +106,11 @@ VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame,
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, &context, GraphView::NEW);
for (auto &kv : node_atom->properties_)
PropsSetChecked(new_node, kv.first.second, kv.second->Accept(evaluator));
PropsSetChecked(&new_node, kv.first.second, kv.second->Accept(evaluator));
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}
// Creates a vertex on the GraphDb with the given worker_id. Can be this worker.
VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
Frame &frame, Context &context) {
auto &dba = context.db_accessor_;
int current_worker_id = 0;
// TODO: Figure out a better solution.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
current_worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(&dba.db()));
}
if (worker_id == current_worker_id)
return CreateLocalVertex(node_atom, frame, context);
std::unordered_map<storage::Property, query::TypedValue> properties;
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, &context, GraphView::NEW);
for (auto &kv : node_atom->properties_) {
auto value = kv.second->Accept(evaluator);
if (!value.IsPropertyValue()) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
}
properties.emplace(kv.first.second, std::move(value));
}
auto new_node = database::InsertVertexIntoRemote(
&dba, worker_id, node_atom->labels_, properties);
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}
} // namespace
ACCEPT_WITH_INPUT(CreateNode)
std::unique_ptr<Cursor> CreateNode::MakeCursor(
@ -205,20 +127,11 @@ std::vector<Symbol> CreateNode::ModifiedSymbols(
CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
if (input_cursor_->Pull(frame, context)) {
if (self_.on_random_worker_) {
// TODO: Replace this with some other mechanism
auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db_.db());
CHECK(distributed_db);
CreateVertexOnWorker(RandomWorkerId(*distributed_db), self_.node_atom_,
frame, context);
} else {
CreateLocalVertex(self_.node_atom_, frame, context);
}
CreateLocalVertex(self_.node_atom_, frame, context);
return true;
}
return false;
@ -261,6 +174,8 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
TypedValue &vertex_value = frame[self_.input_symbol_];
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
auto &v1 = vertex_value.Value<VertexAccessor>();
CHECK(v1.GlobalAddress().worker_id() == 0 && v1.is_local())
<< "Expected CreateExpand only in single node execution";
// Similarly to CreateNode, newly created edges and nodes should use the
// latest accesors.
@ -269,7 +184,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
v1.SwitchNew();
// get the destination vertex (possibly an existing node)
auto &v2 = OtherVertex(v1.GlobalAddress().worker_id(), frame, context);
auto &v2 = OtherVertex(frame, context);
v2.SwitchNew();
// create an edge between the two nodes
@ -294,7 +209,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); }
VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
int worker_id, Frame &frame, Context &context) {
Frame &frame, Context &context) {
if (self_.existing_node_) {
const auto &dest_node_symbol =
context.symbol_table_.at(*self_.node_atom_->identifier_);
@ -302,7 +217,7 @@ VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex);
return dest_node_value.Value<VertexAccessor>();
} else {
return CreateVertexOnWorker(worker_id, self_.node_atom_, frame, context);
return CreateLocalVertex(self_.node_atom(), frame, context);
}
}
@ -312,7 +227,7 @@ void CreateExpand::CreateExpandCursor::CreateEdge(
EdgeAccessor edge =
db_.InsertEdge(from, to, self_.edge_atom_->edge_types_[0]);
for (auto kv : self_.edge_atom_->properties_)
PropsSetChecked(edge, kv.first.second, kv.second->Accept(evaluator));
PropsSetChecked(&edge, kv.first.second, kv.second->Accept(evaluator));
frame[symbol_table.at(*self_.edge_atom_->identifier_)] = edge;
}
@ -1918,10 +1833,10 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, Context &context) {
switch (lhs.type()) {
case TypedValue::Type::Vertex:
PropsSetChecked(lhs.Value<VertexAccessor>(), self_.lhs_->property_, rhs);
PropsSetChecked(&lhs.Value<VertexAccessor>(), self_.lhs_->property_, rhs);
break;
case TypedValue::Type::Edge:
PropsSetChecked(lhs.Value<EdgeAccessor>(), self_.lhs_->property_, rhs);
PropsSetChecked(&lhs.Value<EdgeAccessor>(), self_.lhs_->property_, rhs);
break;
case TypedValue::Type::Null:
// Skip setting properties on Null (can occur in optional match).
@ -2021,7 +1936,7 @@ void SetProperties::SetPropertiesCursor::Set(TRecordAccessor &record,
break;
case TypedValue::Type::Map: {
for (const auto &kv : rhs.Value<std::map<std::string, TypedValue>>())
PropsSetChecked(record, db_.Property(kv.first), kv.second);
PropsSetChecked(&record, db_.Property(kv.first), kv.second);
break;
}
default:

View File

@ -331,8 +331,7 @@ and false on every following Pull.")
:capnp-load #'load-operator-pointer)
(node-atom "NodeAtom *" :initval "nullptr" :reader t
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *"))
(on-random-worker :bool :initval "false"))
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")))
(:documentation
"Operator for creating a node.
@ -352,8 +351,7 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or
* @param on_random_worker If the node should be created locally or on random
* worker.
*/
CreateNode(const std::shared_ptr<LogicalOperator> &input, NodeAtom *node_atom,
bool on_random_worker);
CreateNode(const std::shared_ptr<LogicalOperator> &input, NodeAtom *node_atom);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
@ -364,9 +362,6 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
auto on_random_worker() const { return on_random_worker_; }
void set_on_random_worker(bool v) { on_random_worker_ = v; }
cpp<#)
(:private
#>cpp
@ -380,7 +375,6 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or
private:
const CreateNode &self_;
database::GraphDbAccessor &db_;
const std::unique_ptr<Cursor> input_cursor_;
};
cpp<#)
@ -401,7 +395,7 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(input-symbol "Symbol" :reader t)
(existing-node :bool :documentation
(existing-node :bool :reader t :documentation
"if the given node atom refers to an existing node (either matched or created)"))
(:documentation
"Operator for creating edges and destination nodes.
@ -459,9 +453,8 @@ chained in cases when longer paths need creating.
database::GraphDbAccessor &db_;
const std::unique_ptr<Cursor> input_cursor_;
/** Gets the existing node (if existing_node_ == true), or creates a new
* node (on the given worker) and returns it. */
VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context);
// Get the existing node (if existing_node_ == true), or create a new node
VertexAccessor &OtherVertex(Frame &frame, Context &context);
/**
* Helper function for creating an edge and adding it

View File

@ -266,6 +266,9 @@ class PlanPrinter final : public DistributedOperatorVisitor {
--depth_;
return true;
}
PRE_VISIT(DistributedCreateNode);
PRE_VISIT(DistributedCreateExpand);
#undef PRE_VISIT
private:

View File

@ -645,7 +645,7 @@ std::unique_ptr<LogicalOperator> GenCreateForPattern(
std::unordered_set<Symbol> &bound_symbols) {
auto base = [&](NodeAtom *node) -> std::unique_ptr<LogicalOperator> {
if (bound_symbols.insert(symbol_table.at(*node->identifier_)).second)
return std::make_unique<CreateNode>(std::move(input_op), node, false);
return std::make_unique<CreateNode>(std::move(input_op), node);
else
return std::move(input_op);
};

View File

@ -263,7 +263,8 @@ TEST_F(DistributedQueryPlan, Create) {
auto node = NODE("n");
ctx.symbol_table_[*node->identifier_] =
ctx.symbol_table_.CreateSymbol("n", true);
auto create = std::make_shared<query::plan::CreateNode>(unwind, node, true);
auto create =
std::make_shared<query::plan::DistributedCreateNode>(unwind, node, true);
PullAll(create, *dba, ctx.symbol_table_);
dba->Commit();

View File

@ -96,7 +96,7 @@ TEST(QueryPlan, AccumulateAdvance) {
auto node = NODE("n");
auto sym_n = symbol_table.CreateSymbol("n", true);
symbol_table[*node->identifier_] = sym_n;
auto create = std::make_shared<CreateNode>(nullptr, node, false);
auto create = std::make_shared<CreateNode>(nullptr, node);
auto accumulate = std::make_shared<Accumulate>(
create, std::vector<Symbol>{sym_n}, advance);
auto match = MakeScanAll(storage, symbol_table, "m", accumulate);

View File

@ -95,7 +95,7 @@ TEST(QueryPlan, CreateLimit) {
auto n = MakeScanAll(storage, symbol_table, "n1");
auto m = NODE("m");
symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true);
auto c = std::make_shared<CreateNode>(n.op_, m, false);
auto c = std::make_shared<CreateNode>(n.op_, m);
auto skip = std::make_shared<plan::Limit>(c, LITERAL(1));
EXPECT_EQ(1, PullAll(skip, *dba, symbol_table));

View File

@ -32,7 +32,7 @@ TEST(QueryPlan, CreateNodeWithAttributes) {
node->labels_.emplace_back(label);
node->properties_[property] = LITERAL(42);
auto create = std::make_shared<CreateNode>(nullptr, node, false);
auto create = std::make_shared<CreateNode>(nullptr, node);
PullAll(create, dba, symbol_table);
dba.AdvanceCommand();
@ -68,7 +68,7 @@ TEST(QueryPlan, CreateReturn) {
node->labels_.emplace_back(label);
node->properties_[property] = LITERAL(42);
auto create = std::make_shared<CreateNode>(nullptr, node, false);
auto create = std::make_shared<CreateNode>(nullptr, node);
auto named_expr_n = NEXPR("n", IDENT("n"));
symbol_table[*named_expr_n] = symbol_table.CreateSymbol("named_expr_n", true);
symbol_table[*named_expr_n->expression_] = sym_n;
@ -132,7 +132,7 @@ TEST(QueryPlan, CreateExpand) {
r->edge_types_.emplace_back(edge_type);
r->properties_[property] = LITERAL(3);
auto create_op = std::make_shared<CreateNode>(nullptr, n, false);
auto create_op = std::make_shared<CreateNode>(nullptr, n);
auto create_expand =
std::make_shared<CreateExpand>(m, r, create_op, n_sym, cycle);
PullAll(create_expand, dba, symbol_table);
@ -187,7 +187,7 @@ TEST(QueryPlan, MatchCreateNode) {
auto m = NODE("m");
symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true);
// creation op
auto create_node = std::make_shared<CreateNode>(n_scan_all.op_, m, false);
auto create_node = std::make_shared<CreateNode>(n_scan_all.op_, m);
EXPECT_EQ(CountIterable(dba->Vertices(false)), 3);
PullAll(create_node, *dba, symbol_table);
@ -858,7 +858,7 @@ TEST(QueryPlan, MergeNoInput) {
auto node = NODE("n");
auto sym_n = symbol_table.CreateSymbol("n", true);
symbol_table[*node->identifier_] = sym_n;
auto create = std::make_shared<CreateNode>(nullptr, node, false);
auto create = std::make_shared<CreateNode>(nullptr, node);
auto merge = std::make_shared<plan::Merge>(nullptr, create, create);
EXPECT_EQ(0, CountIterable(dba->Vertices(false)));

View File

@ -132,6 +132,8 @@ class PlanChecker : public DistributedOperatorVisitor {
PRE_VISIT(PullRemoteOrderBy);
PRE_VISIT(DistributedExpandBfs);
PRE_VISIT(DistributedCreateNode);
PRE_VISIT(DistributedCreateExpand);
VISIT(AuthHandler);
@ -171,6 +173,7 @@ class OpChecker : public BaseOpChecker {
virtual void ExpectOp(TOp &, const SymbolTable &) {}
};
using ExpectCreateNode = OpChecker<CreateNode>;
using ExpectCreateExpand = OpChecker<CreateExpand>;
using ExpectDelete = OpChecker<Delete>;
using ExpectScanAll = OpChecker<ScanAll>;
@ -194,6 +197,7 @@ using ExpectUnwind = OpChecker<Unwind>;
using ExpectDistinct = OpChecker<Distinct>;
using ExpectShowStreams = OpChecker<ShowStreams>;
using ExpectDistributedExpandBfs = OpChecker<DistributedExpandBfs>;
using ExpectDistributedCreateExpand = OpChecker<DistributedCreateExpand>;
class ExpectExpandVariable : public OpChecker<ExpandVariable> {
public:
@ -487,12 +491,12 @@ class ExpectCartesian : public OpChecker<Cartesian> {
const std::list<std::unique_ptr<BaseOpChecker>> &right_;
};
class ExpectCreateNode : public OpChecker<CreateNode> {
class ExpectDistributedCreateNode : public OpChecker<DistributedCreateNode> {
public:
ExpectCreateNode(bool on_random_worker = false)
ExpectDistributedCreateNode(bool on_random_worker = false)
: on_random_worker_(on_random_worker) {}
void ExpectOp(CreateNode &op, const SymbolTable &) override {
void ExpectOp(DistributedCreateNode &op, const SymbolTable &) override {
EXPECT_EQ(op.on_random_worker(), on_random_worker_);
}
@ -940,8 +944,9 @@ TYPED_TEST(TestPlanner, CreateNodeReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce());
{
auto expected = ExpectDistributed(MakeCheckers(
ExpectCreateNode(true), ExpectSynchronize(false), ExpectProduce()));
auto expected = ExpectDistributed(
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectSynchronize(false), ExpectProduce()));
std::atomic<int64_t> next_plan_id{0};
auto distributed_plan =
MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id);
@ -958,8 +963,8 @@ TYPED_TEST(TestPlanner, CreateExpand) {
NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectSynchronize(false)),
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateExpand(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -970,8 +975,8 @@ TYPED_TEST(TestPlanner, CreateMultipleNode) {
QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n")), PATTERN(NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectCreateNode(true),
ExpectSynchronize(false)),
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateNode(true), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -987,8 +992,9 @@ TYPED_TEST(TestPlanner, CreateNodeExpandNode) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectCreateNode(true), ExpectSynchronize(false)),
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateExpand(),
ExpectDistributedCreateNode(true), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1003,8 +1009,9 @@ TYPED_TEST(TestPlanner, CreateNamedPattern) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectConstructNamedPath());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectConstructNamedPath(), ExpectSynchronize(false)),
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateExpand(), ExpectConstructNamedPath(),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1020,8 +1027,9 @@ TYPED_TEST(TestPlanner, MatchCreateExpand) {
NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectCreateExpand());
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectCreateExpand(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectCreateExpand()));
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateExpand(),
ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateExpand()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1365,8 +1373,9 @@ TYPED_TEST(TestPlanner, CreateMultiExpand) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectCreateExpand(), ExpectSynchronize(false)),
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateExpand(),
ExpectDistributedCreateExpand(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1449,9 +1458,10 @@ TYPED_TEST(TestPlanner, MatchWithCreate) {
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectProduce(),
ExpectCreateExpand());
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand(),
ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand()));
MakeCheckers(ExpectScanAll(), ExpectProduce(),
ExpectDistributedCreateExpand(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectProduce(),
ExpectDistributedCreateExpand()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1493,7 +1503,7 @@ TYPED_TEST(TestPlanner, CreateWithSkipReturnLimit) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(true), ExpectSynchronize(true),
MakeCheckers(ExpectDistributedCreateNode(true), ExpectSynchronize(true),
ExpectProduce(), ExpectSkip(), ExpectProduce(),
ExpectLimit()),
{}};
@ -1571,9 +1581,10 @@ TYPED_TEST(TestPlanner, CreateWithOrderByWhere) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(),
ExpectCreateExpand(), acc, ExpectProduce(), ExpectOrderBy(),
ExpectFilter());
auto expected = ExpectDistributed(MakeCheckers(
ExpectCreateNode(true), ExpectCreateExpand(), ExpectSynchronize(true),
ExpectProduce(), ExpectOrderBy(), ExpectFilter()));
auto expected = ExpectDistributed(
MakeCheckers(ExpectDistributedCreateNode(true),
ExpectDistributedCreateExpand(), ExpectSynchronize(true),
ExpectProduce(), ExpectOrderBy(), ExpectFilter()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -2603,10 +2614,10 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectCreateNode(),
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode(),
ExpectSynchronize({symbol_table.at(*ident_m)}),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectCreateNode()));
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -2630,8 +2641,9 @@ TYPED_TEST(TestPlanner, DistributedCartesianCreateExpand) {
MakeCheckers(ExpectScanAll(),
ExpectPullRemote({symbol_table.at(*node_b->identifier_)}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectCreateExpand(),
ExpectSynchronize(false), ExpectProduce()),
MakeCheckers(ExpectCartesian(left_cart, right_cart),
ExpectDistributedCreateExpand(), ExpectSynchronize(false),
ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckDistributedPlan(planner.plan(), symbol_table, expected);
@ -3003,14 +3015,14 @@ TYPED_TEST(TestPlanner, DistributedCartesianCreateNode) {
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_b = symbol_table.at(*node_b->identifier_);
auto left_cart =
MakeCheckers(ExpectScanAll(), ExpectCreateNode(),
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode(),
ExpectSynchronize({sym_b}, true), ExpectProduce());
auto sym_c = symbol_table.at(*node_c->identifier_);
auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart),
ExpectCreateNode(true), ExpectSynchronize(false)),
MakeCheckers(ExpectScanAll(), ExpectCreateNode()),
ExpectDistributedCreateNode(true), ExpectSynchronize(false)),
MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode()),
MakeCheckers(ExpectScanAll()));
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);