Move Parameters into EvaluationContext
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1593
This commit is contained in:
parent
8a3f3b6c88
commit
503f8a7224
src
CMakeLists.txt
distributed
produce_rpc_server.cppproduce_rpc_server.hpppull_produce_rpc_messages.lcppull_rpc_clients.cpppull_rpc_clients.hpp
query
tests
benchmark/query
manual
unit
@ -131,6 +131,7 @@ add_capnp(communication/rpc/messages.capnp)
|
||||
add_capnp(distributed/serialization.capnp)
|
||||
add_capnp(durability/recovery.capnp)
|
||||
add_capnp(query/common.capnp)
|
||||
add_capnp(query/context.capnp)
|
||||
add_capnp(query/frontend/ast/ast.capnp)
|
||||
add_capnp(query/frontend/semantic/symbol.capnp)
|
||||
add_capnp(storage/serialization.capnp)
|
||||
|
@ -12,7 +12,8 @@ namespace distributed {
|
||||
ProduceRpcServer::OngoingProduce::OngoingProduce(
|
||||
database::Worker *db, tx::TransactionId tx_id,
|
||||
std::shared_ptr<query::plan::LogicalOperator> op,
|
||||
query::SymbolTable symbol_table, Parameters parameters, int64_t timestamp,
|
||||
query::SymbolTable symbol_table,
|
||||
query::EvaluationContext evaluation_context,
|
||||
std::vector<query::Symbol> pull_symbols)
|
||||
: dba_(db->Access(tx_id)),
|
||||
context_(*dba_),
|
||||
@ -20,8 +21,7 @@ ProduceRpcServer::OngoingProduce::OngoingProduce(
|
||||
frame_(symbol_table.max_position()),
|
||||
cursor_(op->MakeCursor(*dba_)) {
|
||||
context_.symbol_table_ = std::move(symbol_table);
|
||||
context_.parameters_ = std::move(parameters);
|
||||
context_.evaluation_context_.timestamp = timestamp;
|
||||
context_.evaluation_context_ = std::move(evaluation_context);
|
||||
}
|
||||
|
||||
std::pair<std::vector<query::TypedValue>, PullState>
|
||||
@ -162,8 +162,8 @@ ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce(
|
||||
return ongoing_produces_
|
||||
.emplace(std::piecewise_construct, std::forward_as_tuple(key_tuple),
|
||||
std::forward_as_tuple(db_, req.tx_id, plan_pack.plan,
|
||||
plan_pack.symbol_table, req.params,
|
||||
req.timestamp, req.symbols))
|
||||
plan_pack.symbol_table,
|
||||
req.evaluation_context, req.symbols))
|
||||
.first->second;
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
#include "query/context.hpp"
|
||||
#include "query/frontend/semantic/symbol_table.hpp"
|
||||
#include "query/interpret/frame.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "query/plan/operator.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
@ -45,8 +44,9 @@ class ProduceRpcServer {
|
||||
public:
|
||||
OngoingProduce(database::Worker *db, tx::TransactionId tx_id,
|
||||
std::shared_ptr<query::plan::LogicalOperator> op,
|
||||
query::SymbolTable symbol_table, Parameters parameters,
|
||||
int64_t timestamp, std::vector<query::Symbol> pull_symbols);
|
||||
query::SymbolTable symbol_table,
|
||||
query::EvaluationContext evaluation_context,
|
||||
std::vector<query::Symbol> pull_symbols);
|
||||
|
||||
/// Returns a vector of typed values (one for each `pull_symbol`), and an
|
||||
/// indication of the pull result. The result data is valid only if the
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "distributed/pull_produce_rpc_messages.capnp.h"
|
||||
#include "distributed/serialization.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/frontend/semantic/symbol.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
@ -28,6 +29,7 @@ cpp<#
|
||||
|
||||
(lcp:capnp-namespace "distributed")
|
||||
|
||||
(lcp:capnp-import 'ctx "/query/context.capnp")
|
||||
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
|
||||
(lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp")
|
||||
(lcp:capnp-import 'utils "/utils/serialization.capnp")
|
||||
@ -328,33 +330,34 @@ cpp<#)
|
||||
:capnp-load #'load-snapshot)
|
||||
(plan-id :int64_t)
|
||||
(command-id "tx::CommandId")
|
||||
(params "Parameters"
|
||||
:capnp-type "Utils.Map(Utils.BoxInt64, Dis.TypedValue)"
|
||||
:capnp-save
|
||||
(lambda (builder member)
|
||||
#>cpp
|
||||
auto entries_builder = ${builder}.initEntries(${member}.size());
|
||||
size_t i = 0;
|
||||
for (auto &entry : params) {
|
||||
auto builder = entries_builder[i];
|
||||
auto key_builder = builder.initKey();
|
||||
key_builder.setValue(entry.first);
|
||||
auto value_builder = builder.initValue();
|
||||
distributed::SaveCapnpTypedValue(entry.second, &value_builder);
|
||||
++i;
|
||||
}
|
||||
cpp<#)
|
||||
:capnp-load
|
||||
(lambda (reader member)
|
||||
#>cpp
|
||||
for (const auto &entry_reader : ${reader}.getEntries()) {
|
||||
query::TypedValue value;
|
||||
distributed::LoadCapnpTypedValue(entry_reader.getValue(), &value);
|
||||
${member}.Add(entry_reader.getKey().getValue(), value);
|
||||
}
|
||||
cpp<#))
|
||||
(evaluation-context "query::EvaluationContext"
|
||||
:capnp-type "Ctx.EvaluationContext"
|
||||
:capnp-save
|
||||
(lambda (builder member)
|
||||
#>cpp
|
||||
${builder}.setTimestamp(${member}.timestamp);
|
||||
auto params_builder = ${builder}.initParams().initEntries(${member}.parameters.size());
|
||||
size_t i = 0;
|
||||
for (auto &entry : ${member}.parameters) {
|
||||
auto builder = params_builder[i];
|
||||
auto key_builder = builder.initKey();
|
||||
key_builder.setValue(entry.first);
|
||||
auto value_builder = builder.initValue();
|
||||
distributed::SaveCapnpTypedValue(entry.second, &value_builder);
|
||||
++i;
|
||||
}
|
||||
cpp<#)
|
||||
:capnp-load
|
||||
(lambda (reader member)
|
||||
#>cpp
|
||||
${member}.timestamp = ${reader}.getTimestamp();
|
||||
for (const auto &entry_reader : ${reader}.getParams().getEntries()) {
|
||||
query::TypedValue value;
|
||||
distributed::LoadCapnpTypedValue(entry_reader.getValue(), &value);
|
||||
${member}.parameters.Add(entry_reader.getKey().getValue(), value);
|
||||
}
|
||||
cpp<#))
|
||||
(symbols "std::vector<query::Symbol>" :capnp-type "List(Sem.Symbol)")
|
||||
(timestamp :int64_t)
|
||||
(accumulate :bool)
|
||||
(batch-size :int64_t)
|
||||
;; Indicates which of (old, new) records of a graph element should be sent.
|
||||
|
@ -8,14 +8,14 @@ namespace distributed {
|
||||
|
||||
utils::Future<PullData> PullRpcClients::Pull(
|
||||
database::GraphDbAccessor *dba, int worker_id, int64_t plan_id,
|
||||
tx::CommandId command_id, const Parameters ¶ms,
|
||||
const std::vector<query::Symbol> &symbols, int64_t timestamp,
|
||||
bool accumulate, int batch_size) {
|
||||
return clients_->ExecuteOnWorker<
|
||||
PullData>(worker_id, [data_manager = data_manager_, dba, plan_id,
|
||||
command_id, params, symbols, timestamp, accumulate,
|
||||
batch_size](int worker_id,
|
||||
ClientPool &client_pool) {
|
||||
tx::CommandId command_id,
|
||||
const query::EvaluationContext &evaluation_context,
|
||||
const std::vector<query::Symbol> &symbols, bool accumulate,
|
||||
int batch_size) {
|
||||
return clients_->ExecuteOnWorker<PullData>(worker_id, [
|
||||
data_manager = data_manager_, dba, plan_id, command_id, evaluation_context,
|
||||
symbols, accumulate, batch_size
|
||||
](int worker_id, ClientPool &client_pool) {
|
||||
auto load_pull_res = [data_manager, dba](const auto &res_reader) {
|
||||
PullRes res;
|
||||
res.Load(res_reader, dba, data_manager);
|
||||
@ -23,8 +23,8 @@ utils::Future<PullData> PullRpcClients::Pull(
|
||||
};
|
||||
auto result = client_pool.CallWithLoad<PullRpc>(
|
||||
load_pull_res, dba->transaction_id(), dba->transaction().snapshot(),
|
||||
plan_id, command_id, params, symbols, timestamp, accumulate, batch_size,
|
||||
true, true);
|
||||
plan_id, command_id, evaluation_context, symbols, accumulate,
|
||||
batch_size, true, true);
|
||||
return PullData{result->data.pull_state, std::move(result->data.frames)};
|
||||
});
|
||||
}
|
||||
|
@ -5,8 +5,8 @@
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "distributed/pull_produce_rpc_messages.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/frontend/semantic/symbol.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/future.hpp"
|
||||
|
||||
@ -32,12 +32,12 @@ class PullRpcClients {
|
||||
/// @todo: it might be cleaner to split Pull into {InitRemoteCursor,
|
||||
/// Pull, RemoteAccumulate}, but that's a lot of refactoring and more
|
||||
/// RPC calls.
|
||||
utils::Future<PullData> Pull(database::GraphDbAccessor *dba, int worker_id,
|
||||
int64_t plan_id, tx::CommandId command_id,
|
||||
const Parameters ¶ms,
|
||||
const std::vector<query::Symbol> &symbols,
|
||||
int64_t timestamp, bool accumulate,
|
||||
int batch_size = kDefaultBatchSize);
|
||||
utils::Future<PullData> Pull(
|
||||
database::GraphDbAccessor *dba, int worker_id, int64_t plan_id,
|
||||
tx::CommandId command_id,
|
||||
const query::EvaluationContext &evaluation_context,
|
||||
const std::vector<query::Symbol> &symbols, bool accumulate,
|
||||
int batch_size = kDefaultBatchSize);
|
||||
|
||||
utils::Future<void> ResetCursor(database::GraphDbAccessor *dba, int worker_id,
|
||||
int64_t plan_id, tx::CommandId command_id);
|
||||
|
12
src/query/context.capnp
Normal file
12
src/query/context.capnp
Normal file
@ -0,0 +1,12 @@
|
||||
@0xf2d47a8877eb7f4f;
|
||||
|
||||
using Cxx = import "/capnp/c++.capnp";
|
||||
using Dis = import "/distributed/serialization.capnp";
|
||||
using Utils = import "/utils/serialization.capnp";
|
||||
|
||||
$Cxx.namespace("query::capnp");
|
||||
|
||||
struct EvaluationContext {
|
||||
timestamp @0 : Int64;
|
||||
params @1 : Utils.Map(Utils.BoxInt64, Dis.TypedValue);
|
||||
}
|
@ -17,6 +17,7 @@ namespace query {
|
||||
|
||||
struct EvaluationContext {
|
||||
int64_t timestamp{-1};
|
||||
query::Parameters parameters;
|
||||
};
|
||||
|
||||
class Context {
|
||||
@ -34,7 +35,6 @@ class Context {
|
||||
bool in_explicit_transaction_ = false;
|
||||
bool is_index_created_ = false;
|
||||
SymbolTable symbol_table_;
|
||||
Parameters parameters_;
|
||||
EvaluationContext evaluation_context_;
|
||||
|
||||
auth::Auth *auth_ = nullptr;
|
||||
|
@ -58,7 +58,8 @@ std::unique_ptr<LogicalPlan> DistributedInterpreter::MakeLogicalPlan(
|
||||
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
|
||||
double cost;
|
||||
std::tie(tmp_logical_plan, cost) = plan::MakeLogicalPlan(
|
||||
planning_context, context->parameters_, FLAGS_query_cost_planner);
|
||||
planning_context, context->evaluation_context_.parameters,
|
||||
FLAGS_query_cost_planner);
|
||||
auto plan = MakeDistributedPlan(*tmp_logical_plan, context->symbol_table_,
|
||||
next_plan_id_);
|
||||
VLOG(10) << "[Interpreter] Created plan for distributed execution "
|
||||
|
@ -13,7 +13,6 @@
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
#include "query/frontend/semantic/symbol_table.hpp"
|
||||
#include "query/interpret/frame.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
@ -22,12 +21,10 @@ namespace query {
|
||||
class ExpressionEvaluator : public TreeVisitor<TypedValue> {
|
||||
public:
|
||||
ExpressionEvaluator(Frame *frame, const SymbolTable &symbol_table,
|
||||
const Parameters ¶meters,
|
||||
const EvaluationContext &ctx,
|
||||
database::GraphDbAccessor *dba, GraphView graph_view)
|
||||
: frame_(frame),
|
||||
symbol_table_(&symbol_table),
|
||||
parameters_(¶meters),
|
||||
ctx_(&ctx),
|
||||
dba_(dba),
|
||||
graph_view_(graph_view) {}
|
||||
@ -498,7 +495,7 @@ class ExpressionEvaluator : public TreeVisitor<TypedValue> {
|
||||
}
|
||||
|
||||
TypedValue Visit(ParameterLookup ¶m_lookup) override {
|
||||
return parameters_->AtTokenPosition(param_lookup.token_position_);
|
||||
return ctx_->parameters.AtTokenPosition(param_lookup.token_position_);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -566,7 +563,6 @@ class ExpressionEvaluator : public TreeVisitor<TypedValue> {
|
||||
|
||||
Frame *frame_;
|
||||
const SymbolTable *symbol_table_;
|
||||
const Parameters *parameters_;
|
||||
const EvaluationContext *ctx_;
|
||||
database::GraphDbAccessor *dba_;
|
||||
// which switching approach should be used when evaluating
|
||||
|
@ -35,26 +35,26 @@ Interpreter::Results Interpreter::operator()(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
|
||||
Context ctx(db_accessor);
|
||||
ctx.in_explicit_transaction_ = in_explicit_transaction;
|
||||
ctx.auth_ = auth_;
|
||||
ctx.kafka_streams_ = kafka_streams_;
|
||||
ctx.evaluation_context_ = evaluation_context;
|
||||
|
||||
// query -> stripped query
|
||||
StrippedQuery stripped(query);
|
||||
|
||||
// Update context with provided parameters.
|
||||
ctx.parameters_ = stripped.literals();
|
||||
evaluation_context.parameters = stripped.literals();
|
||||
for (const auto ¶m_pair : stripped.parameters()) {
|
||||
auto param_it = params.find(param_pair.second);
|
||||
if (param_it == params.end()) {
|
||||
throw query::UnprovidedParameterError(
|
||||
fmt::format("Parameter ${} not provided.", param_pair.second));
|
||||
}
|
||||
ctx.parameters_.Add(param_pair.first, param_it->second);
|
||||
evaluation_context.parameters.Add(param_pair.first, param_it->second);
|
||||
}
|
||||
|
||||
Context ctx(db_accessor);
|
||||
ctx.in_explicit_transaction_ = in_explicit_transaction;
|
||||
ctx.auth_ = auth_;
|
||||
ctx.kafka_streams_ = kafka_streams_;
|
||||
ctx.evaluation_context_ = evaluation_context;
|
||||
|
||||
ParsingContext parsing_context;
|
||||
parsing_context.is_query_cached = true;
|
||||
AstStorage ast_storage = QueryToAst(stripped, parsing_context, &db_accessor);
|
||||
@ -201,7 +201,8 @@ std::unique_ptr<LogicalPlan> Interpreter::MakeLogicalPlan(
|
||||
std::unique_ptr<plan::LogicalOperator> root;
|
||||
double cost;
|
||||
std::tie(root, cost) = plan::MakeLogicalPlan(
|
||||
planning_context, context->parameters_, FLAGS_query_cost_planner);
|
||||
planning_context, context->evaluation_context_.parameters,
|
||||
FLAGS_query_cost_planner);
|
||||
return std::make_unique<SingleNodeLogicalPlan>(
|
||||
std::move(root), cost, std::move(ast_storage), context->symbol_table_);
|
||||
}
|
||||
|
@ -11,6 +11,8 @@
|
||||
* and provides ways of obtaining them by position.
|
||||
*/
|
||||
// TODO move to namespace query::
|
||||
namespace query {
|
||||
|
||||
struct Parameters {
|
||||
public:
|
||||
/**
|
||||
@ -60,3 +62,5 @@ struct Parameters {
|
||||
private:
|
||||
std::vector<std::pair<int, query::TypedValue>> storage_;
|
||||
};
|
||||
|
||||
} // namespace query
|
||||
|
@ -340,9 +340,9 @@ class RemotePuller {
|
||||
bool remote_pulls_initialized_ = false;
|
||||
|
||||
void UpdatePullForWorker(int worker_id, Context &context) {
|
||||
remote_pulls_[worker_id] = pull_clients_->Pull(
|
||||
&db_, worker_id, plan_id_, command_id_, context.parameters_, symbols_,
|
||||
context.evaluation_context_.timestamp, false);
|
||||
remote_pulls_[worker_id] =
|
||||
pull_clients_->Pull(&db_, worker_id, plan_id_, command_id_,
|
||||
context.evaluation_context_, symbols_, false);
|
||||
}
|
||||
};
|
||||
|
||||
@ -529,8 +529,8 @@ class SynchronizeCursor : public Cursor {
|
||||
if (worker_id == master_id_) continue;
|
||||
worker_accumulations.emplace_back(pull_clients_->Pull(
|
||||
&context.db_accessor_, worker_id, self_.pull_remote()->plan_id(),
|
||||
command_id_, context.parameters_, self_.pull_remote()->symbols(),
|
||||
context.evaluation_context_.timestamp, true, 0));
|
||||
command_id_, context.evaluation_context_,
|
||||
self_.pull_remote()->symbols(), true, 0));
|
||||
}
|
||||
}
|
||||
|
||||
@ -627,9 +627,9 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
|
||||
bool Pull(Frame &frame, Context &context) {
|
||||
if (context.db_accessor_.should_abort()) throw HintedAbortError();
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
|
||||
auto evaluate_result = [this, &evaluator]() {
|
||||
std::vector<TypedValue> order_by;
|
||||
@ -1007,9 +1007,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
// Evaluator for the filtering condition and expansion depth.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, self_.graph_view());
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, self_.graph_view());
|
||||
|
||||
while (true) {
|
||||
TypedValue last_vertex;
|
||||
@ -1185,9 +1185,9 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
|
||||
|
||||
// Evaluator should use the latest accessors, as modified in this query, when
|
||||
// setting properties on new nodes.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
for (auto &kv : node_atom->properties_) {
|
||||
auto value = kv.second->Accept(evaluator);
|
||||
if (!value.IsPropertyValue()) {
|
||||
@ -1259,9 +1259,9 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
|
||||
|
||||
// Similarly to CreateNode, newly created edges and nodes should use the
|
||||
// latest accesors.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
// E.g. we pickup new properties: `CREATE (n {p: 42}) -[:r {ep: n.p}]-> ()`
|
||||
v1.SwitchNew();
|
||||
|
||||
|
@ -101,9 +101,9 @@ VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame,
|
||||
|
||||
// Evaluator should use the latest accessors, as modified in this query, when
|
||||
// setting properties on new nodes.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
for (auto &kv : node_atom->properties_)
|
||||
PropsSetChecked(&new_node, kv.first.second, kv.second->Accept(evaluator));
|
||||
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
|
||||
@ -178,9 +178,9 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
|
||||
|
||||
// Similarly to CreateNode, newly created edges and nodes should use the
|
||||
// latest accesors.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
// E.g. we pickup new properties: `CREATE (n {p: 42}) -[:r {ep: n.p}]-> ()`
|
||||
v1.SwitchNew();
|
||||
|
||||
@ -343,9 +343,9 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
|
||||
-> std::experimental::optional<decltype(
|
||||
db.Vertices(label_, property_, std::experimental::nullopt,
|
||||
std::experimental::nullopt, false))> {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, graph_view_);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, graph_view_);
|
||||
auto convert = [&evaluator](const auto &bound)
|
||||
-> std::experimental::optional<utils::Bound<PropertyValue>> {
|
||||
if (!bound) return std::experimental::nullopt;
|
||||
@ -392,9 +392,9 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyValue::MakeCursor(
|
||||
auto vertices = [this, &db](Frame &frame, Context &context)
|
||||
-> std::experimental::optional<decltype(
|
||||
db.Vertices(label_, property_, TypedValue::Null, false))> {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, graph_view_);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, graph_view_);
|
||||
auto value = expression_->Accept(evaluator);
|
||||
if (value.IsNull()) return std::experimental::nullopt;
|
||||
try {
|
||||
@ -659,9 +659,9 @@ class ExpandVariableCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, self_.graph_view_);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, self_.graph_view_);
|
||||
while (true) {
|
||||
if (Expand(frame, context)) return true;
|
||||
|
||||
@ -734,7 +734,7 @@ class ExpandVariableCursor : public Cursor {
|
||||
|
||||
// Evaluate the upper and lower bounds.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.parameters_,
|
||||
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, self_.graph_view_);
|
||||
auto calc_bound = [&evaluator](auto &bound) {
|
||||
@ -795,9 +795,9 @@ class ExpandVariableCursor : public Cursor {
|
||||
* vertex and another Pull from the input cursor should be performed.
|
||||
*/
|
||||
bool Expand(Frame &frame, Context &context) {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, self_.graph_view_);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, self_.graph_view_);
|
||||
// Some expansions might not be valid due to edge uniqueness and
|
||||
// existing_node criterions, so expand in a loop until either the input
|
||||
// vertex is exhausted or a valid variable-length expansion is available.
|
||||
@ -890,9 +890,9 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
auto source_tv = frame[self_.input_symbol()];
|
||||
auto sink_tv = frame[self_.node_symbol()];
|
||||
@ -1119,9 +1119,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
|
||||
// for the given (edge, vertex) pair checks if they satisfy the
|
||||
// "where" condition. if so, places them in the to_visit_ structure.
|
||||
@ -1266,9 +1266,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
: self_(self), input_cursor_(self_.input_->MakeCursor(db)) {}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, self_.graph_view_);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, self_.graph_view_);
|
||||
auto create_state = [this](VertexAccessor vertex, int depth) {
|
||||
return std::make_pair(vertex, upper_bound_set_ ? depth : 0);
|
||||
};
|
||||
@ -1631,9 +1631,9 @@ Filter::FilterCursor::FilterCursor(const Filter &self,
|
||||
bool Filter::FilterCursor::Pull(Frame &frame, Context &context) {
|
||||
// Like all filters, newly set values should not affect filtering of old
|
||||
// nodes and edges.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
if (EvaluateFilter(evaluator, self_.expression_)) return true;
|
||||
}
|
||||
@ -1674,9 +1674,9 @@ Produce::ProduceCursor::ProduceCursor(const Produce &self,
|
||||
bool Produce::ProduceCursor::Pull(Frame &frame, Context &context) {
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
// Produce should always yield the latest results.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
for (auto named_expr : self_.named_expressions_)
|
||||
named_expr->Accept(evaluator);
|
||||
return true;
|
||||
@ -1711,9 +1711,9 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) {
|
||||
// Delete should get the latest information, this way it is also possible
|
||||
// to
|
||||
// delete newly added nodes and edges.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
// collect expressions results so edges can get deleted before vertices
|
||||
// this is necessary because an edge that gets deleted could block vertex
|
||||
// deletion
|
||||
@ -1781,9 +1781,9 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, Context &context) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// Set, just like Create needs to see the latest changes.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
TypedValue lhs = self_.lhs_->expression_->Accept(evaluator);
|
||||
TypedValue rhs = self_.rhs_->Accept(evaluator);
|
||||
|
||||
@ -1839,9 +1839,9 @@ bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, Context &context) {
|
||||
TypedValue &lhs = frame[self_.input_symbol_];
|
||||
|
||||
// Set, just like Create needs to see the latest changes.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
TypedValue rhs = self_.rhs_->Accept(evaluator);
|
||||
|
||||
switch (lhs.type()) {
|
||||
@ -1976,9 +1976,9 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame,
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// Remove, just like Delete needs to see the latest changes.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
TypedValue lhs = self_.lhs_->expression_->Accept(evaluator);
|
||||
|
||||
switch (lhs.type()) {
|
||||
@ -2276,9 +2276,9 @@ bool Aggregate::AggregateCursor::Pull(Frame &frame, Context &context) {
|
||||
}
|
||||
|
||||
void Aggregate::AggregateCursor::ProcessAll(Frame &frame, Context &context) {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::NEW);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::NEW);
|
||||
while (input_cursor_->Pull(frame, context))
|
||||
ProcessOne(frame, context.symbol_table_, evaluator);
|
||||
|
||||
@ -2505,9 +2505,9 @@ bool Skip::SkipCursor::Pull(Frame &frame, Context &context) {
|
||||
// First successful pull from the input, evaluate the skip expression.
|
||||
// The skip expression doesn't contain identifiers so graph view
|
||||
// parameter is not important.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
TypedValue to_skip = self_.expression_->Accept(evaluator);
|
||||
if (to_skip.type() != TypedValue::Type::Int)
|
||||
throw QueryRuntimeException(
|
||||
@ -2563,9 +2563,9 @@ bool Limit::LimitCursor::Pull(Frame &frame, Context &context) {
|
||||
if (limit_ == -1) {
|
||||
// Limit expression doesn't contain identifiers so graph view is not
|
||||
// important.
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
TypedValue limit = self_.expression_->Accept(evaluator);
|
||||
if (limit.type() != TypedValue::Type::Int)
|
||||
throw QueryRuntimeException(
|
||||
@ -2627,9 +2627,9 @@ OrderBy::OrderByCursor::OrderByCursor(const OrderBy &self,
|
||||
|
||||
bool OrderBy::OrderByCursor::Pull(Frame &frame, Context &context) {
|
||||
if (!did_pull_all_) {
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
// collect the order_by elements
|
||||
std::vector<TypedValue> order_by;
|
||||
@ -2858,9 +2858,9 @@ bool Unwind::UnwindCursor::Pull(Frame &frame, Context &context) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// successful pull from input, initialize value and iterator
|
||||
ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_, GraphView::OLD);
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
&context.db_accessor_, GraphView::OLD);
|
||||
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
||||
if (input_value.type() != TypedValue::Type::List)
|
||||
throw QueryRuntimeException(
|
||||
@ -3259,7 +3259,7 @@ class AuthHandlerCursor : public Cursor {
|
||||
throw UserModificationInMulticommandTxException();
|
||||
}
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_, ctx.parameters_,
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_,
|
||||
ctx.evaluation_context_, &ctx.db_accessor_,
|
||||
GraphView::OLD);
|
||||
std::experimental::optional<std::string> password;
|
||||
@ -3553,7 +3553,7 @@ class CreateStreamCursor : public Cursor {
|
||||
if (ctx.in_explicit_transaction_) {
|
||||
throw StreamClauseInMulticommandTxException();
|
||||
}
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_, ctx.parameters_,
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_,
|
||||
ctx.evaluation_context_, &ctx.db_accessor_,
|
||||
GraphView::OLD);
|
||||
|
||||
@ -3713,7 +3713,7 @@ class StartStopStreamCursor : public Cursor {
|
||||
throw StreamClauseInMulticommandTxException();
|
||||
}
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_, ctx.parameters_,
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_,
|
||||
ctx.evaluation_context_, &ctx.db_accessor_,
|
||||
GraphView::OLD);
|
||||
std::experimental::optional<int64_t> limit_batches;
|
||||
@ -3811,7 +3811,7 @@ class TestStreamCursor : public Cursor {
|
||||
}
|
||||
|
||||
if (!is_initialized_) {
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_, ctx.parameters_,
|
||||
ExpressionEvaluator evaluator(&frame, ctx.symbol_table_,
|
||||
ctx.evaluation_context_, &ctx.db_accessor_,
|
||||
GraphView::OLD);
|
||||
std::experimental::optional<int64_t> limit_batches;
|
||||
|
@ -17,9 +17,8 @@ static void BenchmarkCoalesceCallWithNulls(benchmark::State &state) {
|
||||
database::GraphDbAccessor *dba = nullptr;
|
||||
query::Context context(*dba);
|
||||
query::ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_,
|
||||
query::GraphView::OLD);
|
||||
&frame, context.symbol_table_, context.evaluation_context_,
|
||||
&context.db_accessor_, query::GraphView::OLD);
|
||||
while (state.KeepRunning()) {
|
||||
function->Accept(evaluator);
|
||||
}
|
||||
@ -39,9 +38,8 @@ static void BenchmarkCoalesceCallWithStrings(benchmark::State &state) {
|
||||
database::GraphDbAccessor *dba = nullptr;
|
||||
query::Context context(*dba);
|
||||
query::ExpressionEvaluator evaluator(
|
||||
&frame, context.symbol_table_, context.parameters_,
|
||||
context.evaluation_context_, &context.db_accessor_,
|
||||
query::GraphView::OLD);
|
||||
&frame, context.symbol_table_, context.evaluation_context_,
|
||||
&context.db_accessor_, query::GraphView::OLD);
|
||||
while (state.KeepRunning()) {
|
||||
function->Accept(evaluator);
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ static void BM_PlanAndEstimateIndexedMatching(benchmark::State &state) {
|
||||
int vertex_count = state.range(1);
|
||||
std::tie(label, prop) = CreateIndexedVertices(index_count, vertex_count, db);
|
||||
auto dba = db.Access();
|
||||
Parameters parameters;
|
||||
query::Parameters parameters;
|
||||
while (state.KeepRunning()) {
|
||||
state.PauseTiming();
|
||||
query::AstStorage storage;
|
||||
@ -142,7 +142,7 @@ static void BM_PlanAndEstimateIndexedMatchingWithCachedCounts(
|
||||
std::tie(label, prop) = CreateIndexedVertices(index_count, vertex_count, db);
|
||||
auto dba = db.Access();
|
||||
auto vertex_counts = query::plan::MakeVertexCountCache(*dba);
|
||||
Parameters parameters;
|
||||
query::Parameters parameters;
|
||||
while (state.KeepRunning()) {
|
||||
state.PauseTiming();
|
||||
query::AstStorage storage;
|
||||
|
@ -522,7 +522,7 @@ auto MakeLogicalPlans(query::AstStorage &ast, query::SymbolTable &symbol_table,
|
||||
auto plans = query::plan::MakeLogicalPlanForSingleQuery<
|
||||
query::plan::VariableStartPlanner>(
|
||||
query_parts.query_parts.at(0).single_query_parts, ctx);
|
||||
Parameters parameters;
|
||||
query::Parameters parameters;
|
||||
for (auto plan : plans) {
|
||||
query::plan::CostEstimator<InteractiveDbAccessor> estimator(dba,
|
||||
parameters);
|
||||
|
@ -85,12 +85,12 @@ TEST_F(DistributedQueryPlan, PullProduceRpc) {
|
||||
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
|
||||
|
||||
tx::CommandId command_id = dba->transaction().cid();
|
||||
Parameters params;
|
||||
EvaluationContext evaluation_context;
|
||||
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
|
||||
auto remote_pull = [this, &command_id, ¶ms, &symbols](
|
||||
auto remote_pull = [this, &command_id, &evaluation_context, &symbols](
|
||||
GraphDbAccessor &dba, int worker_id) {
|
||||
return master().pull_clients().Pull(&dba, worker_id, plan_id, command_id,
|
||||
params, symbols, 0, false, 3);
|
||||
evaluation_context, symbols, false, 3);
|
||||
};
|
||||
auto expect_first_batch = [](auto &batch) {
|
||||
EXPECT_EQ(batch.pull_state, distributed::PullState::CURSOR_IN_PROGRESS);
|
||||
@ -204,13 +204,13 @@ TEST_F(DistributedQueryPlan, PullProduceRpcWithGraphElements) {
|
||||
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
|
||||
|
||||
tx::CommandId command_id = dba->transaction().cid();
|
||||
Parameters params;
|
||||
EvaluationContext evaluation_context;
|
||||
std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
|
||||
ctx.symbol_table_[*return_m], p_sym};
|
||||
auto remote_pull = [this, &command_id, ¶ms, &symbols](
|
||||
auto remote_pull = [this, &command_id, &evaluation_context, &symbols](
|
||||
GraphDbAccessor &dba, int worker_id) {
|
||||
return master().pull_clients().Pull(&dba, worker_id, plan_id, command_id,
|
||||
params, symbols, 0, false, 3);
|
||||
evaluation_context, symbols, false, 3);
|
||||
};
|
||||
auto future_w1_results = remote_pull(*dba, 1);
|
||||
auto future_w2_results = remote_pull(*dba, 2);
|
||||
@ -380,12 +380,14 @@ TEST_F(DistributedTransactionTimeout, Timeout) {
|
||||
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
|
||||
tx::CommandId command_id = dba->transaction().cid();
|
||||
|
||||
Parameters params;
|
||||
EvaluationContext evaluation_context;
|
||||
std::vector<query::Symbol> symbols{ctx.symbol_table_[*output]};
|
||||
auto remote_pull = [this, &command_id, ¶ms, &symbols, &dba]() {
|
||||
auto remote_pull = [this, &command_id, &evaluation_context, &symbols,
|
||||
&dba]() {
|
||||
return master()
|
||||
.pull_clients()
|
||||
.Pull(dba.get(), 1, plan_id, command_id, params, symbols, 0, false, 1)
|
||||
.Pull(dba.get(), 1, plan_id, command_id, evaluation_context, symbols,
|
||||
false, 1)
|
||||
.get()
|
||||
.pull_state;
|
||||
};
|
||||
|
@ -35,11 +35,10 @@ class ExpressionEvaluatorTest : public ::testing::Test {
|
||||
AstStorage storage;
|
||||
EvaluationContext ctx;
|
||||
SymbolTable symbol_table;
|
||||
Parameters parameters;
|
||||
|
||||
Frame frame{128};
|
||||
ExpressionEvaluator eval{&frame, symbol_table, parameters,
|
||||
ctx, dba.get(), GraphView::OLD};
|
||||
ExpressionEvaluator eval{&frame, symbol_table, ctx, dba.get(),
|
||||
GraphView::OLD};
|
||||
};
|
||||
|
||||
TEST_F(ExpressionEvaluatorTest, OrOperator) {
|
||||
@ -652,7 +651,6 @@ TEST_F(ExpressionEvaluatorTest, Aggregation) {
|
||||
auto aggr_sym = symbol_table.CreateSymbol("aggr", true);
|
||||
symbol_table[*aggr] = aggr_sym;
|
||||
frame[aggr_sym] = TypedValue(1);
|
||||
Parameters parameters;
|
||||
auto value = aggr->Accept(eval);
|
||||
EXPECT_EQ(value.ValueInt(), 1);
|
||||
}
|
||||
@ -675,7 +673,7 @@ TEST_F(ExpressionEvaluatorTest, ListLiteral) {
|
||||
}
|
||||
|
||||
TEST_F(ExpressionEvaluatorTest, ParameterLookup) {
|
||||
parameters.Add(0, 42);
|
||||
ctx.parameters.Add(0, 42);
|
||||
auto *param_lookup = storage.Create<ParameterLookup>(0);
|
||||
auto value = param_lookup->Accept(eval);
|
||||
ASSERT_TRUE(value.IsInt());
|
||||
|
Loading…
Reference in New Issue
Block a user