Clean-up TypedValue misuse

Summary:
In a bunch of places `TypedValue` was used where `PropertyValue` should be. A lot of times it was only because `TypedValue` serialization code could be reused for `PropertyValue`, only without providing callbacks for `VERTEX`, `EDGE` and `PATH`. So first I wrote separate serialization code for `PropertyValue` and put it into storage folder. Then I fixed all the places where `TypedValue` was incorrectly used instead of `PropertyValue`. I also disabled implicit `TypedValue` to `PropertyValue` conversion in hopes of preventing misuse in the future.

After that, I wrote code for `VertexAccessor` and `EdgeAccessor` serialization and put it into `storage` folder because it was almost duplicated in distributed BFS and pull produce RPC messages. On the sender side, some subset of records (old or new or both) is serialized, and on the reciever side, records are deserialized and immediately put into transaction cache.

Then I rewrote the `TypedValue` serialization functions (`SaveCapnpTypedValue` and `LoadCapnpTypedValue`) to not take callbacks for `VERTEX`, `EDGE` and `PATH`, but use accessor serialization functions instead. That means that any code that wants to use `TypedValue` serialization must hold a reference to `GraphDbAccessor` and `DataManager`, so that should make clients reconsider if they really want to use `TypedValue` instead of `PropertyValue`.

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1598
This commit is contained in:
Marin Tomic 2018-09-13 12:12:07 +02:00
parent 503f8a7224
commit b5cdf6b476
60 changed files with 957 additions and 850 deletions

View File

@ -38,7 +38,6 @@ set(memgraph_src_files
distributed/plan_dispatcher.cpp
distributed/produce_rpc_server.cpp
distributed/pull_rpc_clients.cpp
distributed/serialization.cpp
distributed/updates_rpc_clients.cpp
distributed/updates_rpc_server.cpp
durability/paths.cpp
@ -64,6 +63,7 @@ set(memgraph_src_files
query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp
query/repl.cpp
query/serialization.cpp
query/typed_value.cpp
storage/concurrent_id_mapper_master.cpp
storage/concurrent_id_mapper_worker.cpp
@ -72,6 +72,7 @@ set(memgraph_src_files
storage/property_value.cpp
storage/property_value_store.cpp
storage/record_accessor.cpp
storage/serialization.cpp
storage/vertex_accessor.cpp
transactions/distributed/engine_master.cpp
transactions/distributed/engine_worker.cpp
@ -128,12 +129,11 @@ add_custom_target(generate_lcp DEPENDS ${generated_lcp_files})
# Registering capnp must come after registering lcp files.
add_capnp(communication/rpc/messages.capnp)
add_capnp(distributed/serialization.capnp)
add_capnp(durability/recovery.capnp)
add_capnp(query/common.capnp)
add_capnp(query/context.capnp)
add_capnp(query/frontend/ast/ast.capnp)
add_capnp(query/frontend/semantic/symbol.capnp)
add_capnp(query/serialization.capnp)
add_capnp(storage/serialization.capnp)
add_custom_target(generate_capnp DEPENDS generate_lcp ${generated_capnp_files})

View File

@ -856,7 +856,8 @@ distributed::IndexRpcClients &Master::index_rpc_clients() {
VertexAccessor InsertVertexIntoRemote(
GraphDbAccessor *dba, int worker_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> &properties,
const std::unordered_map<storage::Property, PropertyValue>
&properties,
std::experimental::optional<int64_t> cypher_id) {
// TODO: Replace this with virtual call or some other mechanism.
auto *distributed_db =

View File

@ -132,7 +132,8 @@ class Worker final : public DistributedGraphDb {
VertexAccessor InsertVertexIntoRemote(
GraphDbAccessor *dba, int worker_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> &properties,
const std::unordered_map<storage::Property, PropertyValue>
&properties,
std::experimental::optional<int64_t> cypher_id);
} // namespace database

View File

@ -4,20 +4,23 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/state_delta.capnp.h"
#include "distributed/serialization.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
#include "storage/property_value.hpp"
#include "storage/serialization.hpp"
cpp<#
(lcp:namespace database)
#>cpp
class GraphDbAccessor;
cpp<#
(lcp:capnp-namespace "database")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
(lcp:capnp-type-conversion "gid::Gid" "UInt64")
@ -48,18 +51,16 @@ cpp<#
(property "storage::Property")
(property-name "std::string")
(value "PropertyValue" :initval "PropertyValue::Null"
:capnp-type "Dis.TypedValue"
:capnp-type "Storage.PropertyValue"
:capnp-save
(lambda (builder member)
#>cpp
distributed::SaveCapnpTypedValue(${member}, &${builder});
storage::SaveCapnpPropertyValue(${member}, &${builder});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
query::TypedValue tv;
distributed::LoadCapnpTypedValue(${reader}, &tv);
${member} = tv;
storage::LoadCapnpPropertyValue(${reader}, &${member});
cpp<#))
(label "storage::Label")
(label-name "std::string")

View File

@ -77,17 +77,15 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
return subcursor_storage_->Get(subcursor_id)->Pull();
}
auto res =
clients_->GetClientPool(worker_id).Call<SubcursorPullRpc>(subcursor_id);
auto res = clients_->GetClientPool(worker_id).CallWithLoad<SubcursorPullRpc>(
[this, dba](const auto &reader) {
SubcursorPullRes res;
res.Load(reader, dba, this->data_manager_);
return res;
},
subcursor_id);
CHECK(res) << "SubcursorPull RPC failed!";
if (!res->vertex) return std::experimental::nullopt;
data_manager_->Emplace<Vertex>(
dba->transaction_id(), res->vertex->global_address.gid(),
distributed::CachedRecordData<Vertex>(
res->cypher_id, std::move(res->vertex->old_element_output),
std::move(res->vertex->new_element_output)));
return VertexAccessor(res->vertex->global_address, *dba);
return res->vertex;
}
bool BfsRpcClients::ExpandLevel(
@ -137,22 +135,6 @@ bool BfsRpcClients::ExpandToRemoteVertex(
return res->member;
}
PathSegment BuildPathSegment(ReconstructPathRes *res,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager) {
std::vector<EdgeAccessor> edges;
for (auto &edge : res->edges) {
data_manager->Emplace<Edge>(
dba->transaction_id(), edge.global_address.gid(),
distributed::CachedRecordData<Edge>(
edge.cypher_id, std::move(edge.old_element_output),
std::move(edge.new_element_output)));
edges.emplace_back(edge.global_address, *dba);
}
return PathSegment{edges, res->next_vertex, res->next_edge};
}
PathSegment BfsRpcClients::ReconstructPath(
const std::unordered_map<int16_t, int64_t> &subcursor_ids,
storage::VertexAddress vertex, database::GraphDbAccessor *dba) {
@ -162,9 +144,16 @@ PathSegment BfsRpcClients::ReconstructPath(
->ReconstructPath(vertex);
}
auto res = clients_->GetClientPool(worker_id).Call<ReconstructPathRpc>(
subcursor_ids.at(worker_id), vertex);
return BuildPathSegment(&res.value(), dba, data_manager_);
auto res =
clients_->GetClientPool(worker_id).CallWithLoad<ReconstructPathRpc>(
[this, dba](const auto &reader) {
ReconstructPathRes res;
res.Load(reader, dba, this->data_manager_);
return res;
},
subcursor_ids.at(worker_id), vertex);
CHECK(res) << "ReconstructPath RPC failed!";
return PathSegment{res->edges, res->next_vertex, res->next_edge};
}
PathSegment BfsRpcClients::ReconstructPath(
@ -175,9 +164,16 @@ PathSegment BfsRpcClients::ReconstructPath(
return subcursor_storage_->Get(subcursor_ids.at(worker_id))
->ReconstructPath(edge);
}
auto res = clients_->GetClientPool(worker_id).Call<ReconstructPathRpc>(
subcursor_ids.at(worker_id), edge);
return BuildPathSegment(&res.value(), dba, data_manager_);
auto res =
clients_->GetClientPool(worker_id).CallWithLoad<ReconstructPathRpc>(
[this, dba](const auto &reader) {
ReconstructPathRes res;
res.Load(reader, dba, this->data_manager_);
return res;
},
subcursor_ids.at(worker_id), edge);
CHECK(res) << "ReconstructPath RPC failed!";
return PathSegment{res->edges, res->next_vertex, res->next_edge};
}
void BfsRpcClients::PrepareForExpand(

View File

@ -7,6 +7,7 @@
#include "distributed/bfs_rpc_messages.capnp.h"
#include "distributed/bfs_subcursor.hpp"
#include "query/plan/operator.hpp"
#include "storage/serialization.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
@ -16,7 +17,6 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'query "/query/common.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
@ -24,80 +24,6 @@ cpp<#
(lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address")
(defun save-element (builder member)
#>cpp
if (${member}) {
if constexpr (std::is_same<TElement, Vertex>::value) {
auto builder = ${builder}.initVertex();
SaveVertex(*${member}, &builder, worker_id);
} else {
auto builder = ${builder}.initEdge();
SaveEdge(*${member}, &builder, worker_id);
}
} else {
${builder}.setNull();
}
cpp<#)
(defun load-element (reader member)
(let ((output-member (cl-ppcre:regex-replace "input$" member "output")))
#>cpp
if (!${reader}.isNull()) {
if constexpr (std::is_same<TElement, Vertex>::value) {
const auto reader = ${reader}.getVertex();
${output-member} = LoadVertex(reader);
} else {
const auto reader = ${reader}.getEdge();
${output-member} = LoadEdge(reader);
}
}
cpp<#))
(lcp:define-struct (serialized-graph-element t-element) ()
((cypher-id :int64_t)
(global-address "storage::Address<mvcc::VersionList<TElement>>"
:capnp-type "Storage.Address")
(old-element-input "TElement *"
:capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge"))
:capnp-save #'save-element :capnp-load #'load-element)
(old-element-output "std::unique_ptr<TElement>" :capnp-save :dont-save)
(new-element-input "TElement *"
:capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge"))
:capnp-save #'save-element :capnp-load #'load-element)
(new-element-output "std::unique_ptr<TElement>" :capnp-save :dont-save)
(worker-id :int16_t :capnp-save :dont-save))
(:public
#>cpp
SerializedGraphElement(int64_t cypher_id,
storage::Address<mvcc::VersionList<TElement>> global_address,
TElement *old_element_input, TElement *new_element_input,
int16_t worker_id)
: cypher_id(cypher_id),
global_address(global_address),
old_element_input(old_element_input),
old_element_output(nullptr),
new_element_input(new_element_input),
new_element_output(nullptr),
worker_id(worker_id) {
CHECK(global_address.is_remote())
<< "Only global addresses should be used with SerializedGraphElement";
}
SerializedGraphElement(const RecordAccessor<TElement> &accessor, int16_t worker_id)
: SerializedGraphElement(accessor.CypherId(),
accessor.GlobalAddress(),
accessor.GetOld(), accessor.GetNew(),
worker_id) {}
SerializedGraphElement() {}
cpp<#)
(:serialize :capnp :type-args '(vertex edge)))
#>cpp
using SerializedVertex = SerializedGraphElement<Vertex>;
using SerializedEdge = SerializedGraphElement<Edge>;
cpp<#
(lcp:define-rpc create-bfs-subcursor
(:request
((tx-id "tx::TransactionId" :capnp-type "UInt64")
@ -163,13 +89,24 @@ cpp<#
(:response ((member :bool))))
(lcp:define-rpc subcursor-pull
(:request ((member :int64_t)))
(:request ((member :int64_t)))
(:response
((cypher-id :int64_t)
(vertex "std::experimental::optional<SerializedVertex>" :initarg :move
:capnp-type "Utils.Optional(SerializedGraphElement)"
:capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex")
:capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex")))))
((vertex "std::experimental::optional<VertexAccessor>"
:capnp-type "Utils.Optional(Storage.VertexAccessor)"
:capnp-save (lcp:capnp-save-optional
"storage::capnp::VertexAccessor"
"VertexAccessor"
"[](auto *builder, const auto &vertex) {
storage::SaveVertexAccessor(vertex, builder, storage::SendVersions::BOTH);
}")
:capnp-load (lcp:capnp-load-optional
"storage::capnp::VertexAccessor"
"VertexAccessor"
"[dba, data_manager](const auto &reader) {
return storage::LoadVertexAccessor(reader, dba, data_manager);
}")))
(:serialize :capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))))
(lcp:define-rpc set-source
(:request
@ -211,17 +148,25 @@ cpp<#
edge(edge) {}
cpp<#))
(:response
((subcursor-id :int64_t ;; TODO(mtomic): Unused?
:capnp-save :dont-save)
(edges "std::vector<SerializedEdge>" :capnp-type "List(SerializedGraphElement)"
:capnp-save (lcp:capnp-save-vector "capnp::SerializedGraphElement" "SerializedEdge")
:capnp-load (lcp:capnp-load-vector "capnp::SerializedGraphElement" "SerializedEdge"))
((edges "std::vector<EdgeAccessor>" :capnp-type "List(Storage.EdgeAccessor)"
:capnp-save (lcp:capnp-save-vector "storage::capnp::EdgeAccessor"
"EdgeAccessor"
"[](auto *builder, const auto &edge) {
storage::SaveEdgeAccessor(edge, builder, storage::SendVersions::BOTH);
}")
:capnp-load (lcp:capnp-load-vector "storage::capnp::EdgeAccessor"
"EdgeAccessor"
"[dba, data_manager](const auto &reader) {
return storage::LoadEdgeAccessor(reader, dba, data_manager);
}"))
(next-vertex "std::experimental::optional<storage::VertexAddress>"
:capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::VertexAddress")
:capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::VertexAddress"))
(next-edge "std::experimental::optional<storage::EdgeAddress>"
:capnp-save (lcp:capnp-save-optional "storage::capnp::Address" "storage::EdgeAddress")
:capnp-load (lcp:capnp-load-optional "storage::capnp::Address" "storage::EdgeAddress")))
(:serialize :capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *")))
(:public
#>cpp
using Capnp = capnp::ReconstructPathRes;
@ -230,16 +175,12 @@ cpp<#
ReconstructPathRes() {}
ReconstructPathRes(
const std::vector<EdgeAccessor> &edge_accessors,
const std::vector<EdgeAccessor> &edges,
std::experimental::optional<storage::VertexAddress> next_vertex,
std::experimental::optional<storage::EdgeAddress> next_edge,
int16_t worker_id)
: next_vertex(std::move(next_vertex)), next_edge(std::move(next_edge)) {
std::experimental::optional<storage::EdgeAddress> next_edge)
: edges(edges), next_vertex(std::move(next_vertex)), next_edge(std::move(next_edge)) {
CHECK(!static_cast<bool>(next_vertex) || !static_cast<bool>(next_edge))
<< "At most one of `next_vertex` and `next_edge` should be set";
for (const auto &edge : edge_accessors) {
edges.emplace_back(edge, worker_id);
}
}
cpp<#)))

View File

@ -80,13 +80,7 @@ class BfsRpcServer {
SubcursorPullReq req;
req.Load(req_reader);
auto vertex = subcursor_storage_->Get(req.member)->Pull();
if (!vertex) {
SubcursorPullRes res;
res.Save(res_builder);
return;
}
SubcursorPullRes res(vertex->CypherId(),
SerializedVertex(*vertex, db_->WorkerId()));
SubcursorPullRes res(vertex);
res.Save(res_builder);
});
@ -113,8 +107,7 @@ class BfsRpcServer {
} else {
LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq";
}
ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge,
db_->WorkerId());
ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge);
res.Save(res_builder);
});

View File

@ -77,6 +77,8 @@ class ExpandBfsSubcursor {
/// Reconstruct the part of path to given vertex stored on this worker.
PathSegment ReconstructPath(storage::VertexAddress vertex_addr);
database::GraphDbAccessor *db_accessor() { return dba_.get(); }
/// Used to reset subcursor state before starting expansion from new source.
void Reset();

View File

@ -2,6 +2,7 @@
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"

View File

@ -2,15 +2,18 @@
#pragma once
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include "distributed/rpc_worker_clients.hpp"
#include "storage/gid.hpp"
#include "transactions/type.hpp"
namespace distributed {
class RpcWorkerClients;
template <typename TRecord>
struct RemoteElementInfo {
RemoteElementInfo() = delete;

View File

@ -6,10 +6,10 @@
#include "communication/rpc/messages.hpp"
#include "distributed/data_rpc_messages.capnp.h"
#include "distributed/serialization.hpp"
#include "storage/edge.hpp"
#include "storage/gid.hpp"
#include "storage/vertex.hpp"
#include "storage/serialization.hpp"
#include "transactions/type.hpp"
cpp<#
@ -18,7 +18,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-import 'dist "/distributed/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:define-struct tx-gid-pair ()
((tx-id "tx::TransactionId" :capnp-type "UInt64")
@ -30,17 +30,17 @@ cpp<#
(:response
((cypher-id :int64_t)
(vertex-input "const Vertex *"
:capnp-type "Dist.Vertex"
:capnp-type "Storage.Vertex"
:capnp-save
(lambda (builder member)
#>cpp
SaveVertex(*${member}, &${builder}, worker_id);
storage::SaveVertex(*${member}, &${builder}, worker_id);
cpp<#)
:capnp-load
(lambda (reader member)
(declare (ignore member))
#>cpp
vertex_output = LoadVertex(${reader});
vertex_output = storage::LoadVertex(${reader});
cpp<#))
(worker-id :int64_t :capnp-save :dont-save)
(vertex-output "std::unique_ptr<Vertex>" :initarg nil
@ -51,17 +51,17 @@ cpp<#
(:response
((cypher-id :int64_t)
(edge-input "const Edge *"
:capnp-type "Dist.Edge"
:capnp-type "Storage.Edge"
:capnp-save
(lambda (builder member)
#>cpp
SaveEdge(*${member}, &${builder}, worker_id);
storage::SaveEdge(*${member}, &${builder}, worker_id);
cpp<#)
:capnp-load
(lambda (reader member)
(declare (ignore member))
#>cpp
edge_output = LoadEdge(${reader});
edge_output = storage::LoadEdge(${reader});
cpp<#))
(worker-id :int64_t :capnp-save :dont-save)
(edge-output "std::unique_ptr<Edge>" :initarg nil

View File

@ -10,7 +10,7 @@ VertexMigrator::VertexMigrator(database::GraphDbAccessor *dba) : dba_(dba) {}
void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) {
auto get_props = [](auto &record) {
std::unordered_map<storage::Property, query::TypedValue> properties;
std::unordered_map<storage::Property, PropertyValue> properties;
for (auto prop : record.Properties()) {
properties[prop.first] = prop.second;
}

View File

@ -5,8 +5,9 @@
#include <string>
#include "communication/rpc/messages.hpp"
#include "distributed/serialization.hpp"
#include "distributed/index_rpc_messages.capnp.h"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
cpp<#
(lcp:namespace distributed)

View File

@ -170,7 +170,7 @@ ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce(
PullResData ProduceRpcServer::Pull(const PullReq &req) {
auto &ongoing_produce = GetOngoingProduce(req);
PullResData result(db_->WorkerId(), req.send_old, req.send_new);
PullResData result(db_->WorkerId(), req.send_versions);
result.pull_state = PullState::CURSOR_IN_PROGRESS;
if (req.accumulate) {

View File

@ -7,10 +7,10 @@
#include "communication/rpc/messages.hpp"
#include "distributed/pull_produce_rpc_messages.capnp.h"
#include "distributed/serialization.hpp"
#include "query/context.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/parameters.hpp"
#include "query/serialization.hpp"
#include "storage/address_types.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
@ -29,8 +29,8 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'ctx "/query/context.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
@ -70,7 +70,7 @@ the relevant parts of the response, ready for use."))
(lcp:define-struct pull-res-data ()
((pull-state "PullState")
(frames "std::vector<std::vector<query::TypedValue>>"
:capnp-type "List(List(Dis.TypedValue))"
:capnp-type "List(List(Query.TypedValue))"
:capnp-save
(lambda (builder member)
#>cpp
@ -80,11 +80,7 @@ the relevant parts of the response, ready for use."))
for (size_t val_i = 0; val_i < frame.size(); ++val_i) {
const auto &value = frame[val_i];
auto value_builder = frame_builder[val_i];
distributed::SaveCapnpTypedValue(
value, &value_builder,
[this](const auto &value, auto *builder) {
this->SaveGraphElement(value, builder);
});
query::SaveCapnpTypedValue(value, &value_builder, send_versions);
}
}
cpp<#)
@ -97,11 +93,8 @@ the relevant parts of the response, ready for use."))
current_frame.reserve(frame_reader.size());
for (const auto &value_reader : frame_reader) {
query::TypedValue value;
distributed::LoadCapnpTypedValue(
value_reader, &value,
[this, dba, data_manager](const auto &reader, auto *value) {
this->LoadGraphElement(dba, reader, value, data_manager);
});
query::LoadCapnpTypedValue(value_reader, &value, dba,
data_manager);
current_frame.emplace_back(value);
}
${member}.emplace_back(current_frame);
@ -112,8 +105,7 @@ the relevant parts of the response, ready for use."))
"Id of the worker on which the response is created, used for
serializing vertices (converting local to global addresses). Indicates which
of (old, new) records of a graph element should be sent.")
(send-old :bool :capnp-save :dont-save)
(send-new :bool :capnp-save :dont-save)
(send-versions "storage::SendVersions" :capnp-save :dont-save)
;; Temporary caches used between deserialization and post-processing
;; (transfering the ownership of this data to a Cache).
(vertices "std::vector<GraphElementData<Vertex>>" :capnp-save :dont-save)
@ -173,153 +165,17 @@ to the appropriate value. Not used on side that generates the response.")
#>cpp
public:
PullResData() {} // Default constructor required for serialization.
PullResData(int worker_id, bool send_old, bool send_new)
: worker_id(worker_id), send_old(send_old), send_new(send_new) {}
PullResData(int worker_id, storage::SendVersions send_versions)
: worker_id(worker_id), send_versions(send_versions) {}
PullResData(const PullResData &) = delete;
PullResData &operator=(const PullResData &) = delete;
PullResData(PullResData &&) = default;
PullResData &operator=(PullResData &&) = default;
cpp<#)
(:private
#>cpp
void SaveGraphElement(const query::TypedValue &,
distributed::capnp::TypedValue::Builder *) const;
void LoadGraphElement(database::GraphDbAccessor *,
const distributed::capnp::TypedValue::Reader &,
query::TypedValue *, distributed::DataManager *);
cpp<#)
(:serialize :capnp :load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))
(lcp:in-impl
#>cpp
void PullResData::SaveGraphElement(
const query::TypedValue &value,
distributed::capnp::TypedValue::Builder *builder) const {
auto save_element = [this](auto accessor, auto *builder) {
builder->setCypherId(accessor.CypherId());
builder->setAddress(accessor.GlobalAddress().raw());
// If both old and new are null, we need to reconstruct
if (!(accessor.GetOld() || accessor.GetNew())) {
bool result = accessor.Reconstruct();
CHECK(result) << "Attempting to serialize an element not visible to "
"current transaction.";
}
auto *old_rec = accessor.GetOld();
if (send_old && old_rec) {
auto old_builder = builder->initOld();
distributed::SaveElement(*old_rec, &old_builder, worker_id);
}
if (send_new) {
// Must call SwitchNew as that will trigger a potentially necesary
// Reconstruct.
accessor.SwitchNew();
auto *new_rec = accessor.GetNew();
if (new_rec) {
auto new_builder = builder->initNew();
distributed::SaveElement(*new_rec, &new_builder, worker_id);
}
}
};
switch (value.type()) {
case query::TypedValue::Type::Vertex: {
auto vertex_builder = builder->initVertex();
save_element(value.ValueVertex(), &vertex_builder);
break;
}
case query::TypedValue::Type::Edge: {
auto edge_builder = builder->initEdge();
save_element(value.ValueEdge(), &edge_builder);
break;
}
case query::TypedValue::Type::Path: {
const auto &path = value.ValuePath();
auto path_builder = builder->initPath();
auto vertices_builder = path_builder.initVertices(path.vertices().size());
for (size_t i = 0; i < path.vertices().size(); ++i) {
auto vertex_builder = vertices_builder[i];
save_element(path.vertices()[i], &vertex_builder);
}
auto edges_builder = path_builder.initEdges(path.edges().size());
for (size_t i = 0; i < path.edges().size(); ++i) {
auto edge_builder = edges_builder[i];
save_element(path.edges()[i], &edge_builder);
}
break;
}
default:
LOG(FATAL) << "Unsupported graph element type: " << value.type();
}
}
void PullResData::LoadGraphElement(
database::GraphDbAccessor *dba,
const distributed::capnp::TypedValue::Reader &reader,
query::TypedValue *value, distributed::DataManager *data_manager) {
auto load_vertex = [dba, data_manager](const auto &vertex_reader) {
int64_t cypher_id = vertex_reader.getCypherId();
storage::VertexAddress global_address(vertex_reader.getAddress());
auto old_record =
vertex_reader.hasOld()
? distributed::LoadVertex(vertex_reader.getOld())
: nullptr;
auto new_record =
vertex_reader.hasNew()
? distributed::LoadVertex(vertex_reader.getNew())
: nullptr;
data_manager->Emplace<Vertex>(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Vertex>(cypher_id,
std::move(old_record),
std::move(new_record)));
// We don't need to pass cypher_id here because cypher_id is going to be
// fetched from the cache.
return VertexAccessor(global_address, *dba);
};
auto load_edge = [dba, data_manager](const auto &edge_reader) {
int64_t cypher_id = edge_reader.getCypherId();
storage::EdgeAddress global_address(edge_reader.getAddress());
auto old_record =
edge_reader.hasOld()
? distributed::LoadEdge(edge_reader.getOld())
: nullptr;
auto new_record =
edge_reader.hasNew()
? distributed::LoadEdge(edge_reader.getNew())
: nullptr;
data_manager->Emplace<Edge>(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Edge>(cypher_id,
std::move(old_record),
std::move(new_record)));
return EdgeAccessor(global_address, *dba);
};
switch (reader.which()) {
case distributed::capnp::TypedValue::VERTEX:
*value = load_vertex(reader.getVertex());
break;
case distributed::capnp::TypedValue::EDGE:
*value = load_edge(reader.getEdge());
break;
case distributed::capnp::TypedValue::PATH: {
auto vertices_reader = reader.getPath().getVertices();
auto edges_reader = reader.getPath().getEdges();
query::Path path(load_vertex(vertices_reader[0]));
for (size_t i = 0; i < edges_reader.size(); ++i) {
path.Expand(load_edge(edges_reader[i]));
path.Expand(load_vertex(vertices_reader[i + 1]));
}
*value = path;
break;
}
default:
LOG(FATAL) << "Unsupported graph element type.";
}
}
cpp<#)
(lcp:define-rpc pull
(:request
((tx-id "tx::TransactionId")
@ -331,7 +187,7 @@ cpp<#)
(plan-id :int64_t)
(command-id "tx::CommandId")
(evaluation-context "query::EvaluationContext"
:capnp-type "Ctx.EvaluationContext"
:capnp-type "Query.EvaluationContext"
:capnp-save
(lambda (builder member)
#>cpp
@ -343,7 +199,7 @@ cpp<#)
auto key_builder = builder.initKey();
key_builder.setValue(entry.first);
auto value_builder = builder.initValue();
distributed::SaveCapnpTypedValue(entry.second, &value_builder);
storage::SaveCapnpPropertyValue(entry.second, &value_builder);
++i;
}
cpp<#)
@ -352,8 +208,8 @@ cpp<#)
#>cpp
${member}.timestamp = ${reader}.getTimestamp();
for (const auto &entry_reader : ${reader}.getParams().getEntries()) {
query::TypedValue value;
distributed::LoadCapnpTypedValue(entry_reader.getValue(), &value);
PropertyValue value;
storage::LoadCapnpPropertyValue(entry_reader.getValue(), &value);
${member}.parameters.Add(entry_reader.getKey().getValue(), value);
}
cpp<#))
@ -361,8 +217,14 @@ cpp<#)
(accumulate :bool)
(batch-size :int64_t)
;; Indicates which of (old, new) records of a graph element should be sent.
(send-old :bool)
(send-new :bool)))
(send-versions "storage::SendVersions"
:capnp-type "Storage.SendVersions" :capnp-init nil
:capnp-save (lcp:capnp-save-enum "storage::capnp::SendVersions"
"storage::SendVersions"
'(both only-old only-new))
:capnp-load (lcp:capnp-load-enum "storage::capnp::SendVersions"
"storage::SendVersions"
'(both only-old only-new)))))
(:response
((data "PullResData" :initarg :move))
(:serialize :capnp :base t :load-args '((dba "database::GraphDbAccessor *")

View File

@ -24,7 +24,7 @@ utils::Future<PullData> PullRpcClients::Pull(
auto result = client_pool.CallWithLoad<PullRpc>(
load_pull_res, dba->transaction_id(), dba->transaction().snapshot(),
plan_id, command_id, evaluation_context, symbols, accumulate,
batch_size, true, true);
batch_size, storage::SendVersions::BOTH);
return PullData{result->data.pull_state, std::move(result->data.frames)};
});
}

View File

@ -1,73 +0,0 @@
@0xccb448f0b998d9c8;
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("distributed::capnp");
struct Address {
gid @0 :UInt64;
workerId @1 :Int16;
}
struct PropertyValue {
id @0 :UInt16;
value @1 :TypedValue;
}
struct Edge {
from @0 :Address;
to @1 :Address;
typeId @2 :UInt16;
properties @3 :List(PropertyValue);
}
struct Vertex {
outEdges @0 :List(EdgeEntry);
inEdges @1 :List(EdgeEntry);
labelIds @2 :List(UInt16);
properties @3 :List(PropertyValue);
struct EdgeEntry {
vertexAddress @0 :Address;
edgeAddress @1 :Address;
edgeTypeId @2 :UInt16;
}
}
struct TypedValue {
union {
nullType @0 :Void;
bool @1 :Bool;
integer @2 :Int64;
double @3 :Float64;
string @4 :Text;
list @5 :List(TypedValue);
map @6 :List(Entry);
vertex @7 :VertexAccessor;
edge @8 :EdgeAccessor;
path @9 :Path;
}
struct Entry {
key @0 :Text;
value @1 :TypedValue;
}
struct VertexAccessor {
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Vertex;
new @3 :Vertex;
}
struct EdgeAccessor {
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Edge;
new @3 :Edge;
}
struct Path {
vertices @0 :List(VertexAccessor);
edges @1 :List(EdgeAccessor);
}
}

View File

@ -1,118 +0,0 @@
#include "distributed/serialization.hpp"
namespace {
template <class TAddress>
void SaveAddress(TAddress address,
distributed::capnp::Address::Builder *builder,
int16_t worker_id) {
builder->setGid(address.is_local() ? address.local()->gid_ : address.gid());
builder->setWorkerId(address.is_local() ? worker_id : address.worker_id());
}
storage::VertexAddress LoadVertexAddress(
const distributed::capnp::Address::Reader &reader) {
return {reader.getGid(), reader.getWorkerId()};
}
storage::EdgeAddress LoadEdgeAddress(
const distributed::capnp::Address::Reader &reader) {
return {reader.getGid(), reader.getWorkerId()};
}
void SaveProperties(
const PropertyValueStore &props,
::capnp::List<distributed::capnp::PropertyValue>::Builder *builder) {
int64_t i = 0;
for (const auto &kv : props) {
auto prop_builder = (*builder)[i];
prop_builder.setId(kv.first.Id());
auto value_builder = prop_builder.initValue();
distributed::SaveCapnpTypedValue(kv.second, &value_builder);
++i;
}
}
PropertyValueStore LoadProperties(
const ::capnp::List<distributed::capnp::PropertyValue>::Reader &reader) {
PropertyValueStore props;
for (const auto &prop_reader : reader) {
query::TypedValue value;
distributed::LoadCapnpTypedValue(prop_reader.getValue(), &value);
props.set(storage::Property(prop_reader.getId()), value);
}
return props;
}
} // namespace
namespace distributed {
void SaveVertex(const Vertex &vertex, capnp::Vertex::Builder *builder,
int16_t worker_id) {
auto save_edges = [worker_id](const auto &edges, auto *edges_builder) {
int64_t i = 0;
for (const auto &edge : edges) {
auto edge_builder = (*edges_builder)[i];
auto vertex_addr_builder = edge_builder.initVertexAddress();
SaveAddress(edge.vertex, &vertex_addr_builder, worker_id);
auto edge_addr_builder = edge_builder.initEdgeAddress();
SaveAddress(edge.edge, &edge_addr_builder, worker_id);
edge_builder.setEdgeTypeId(edge.edge_type.Id());
++i;
}
};
auto out_builder = builder->initOutEdges(vertex.out_.size());
save_edges(vertex.out_, &out_builder);
auto in_builder = builder->initInEdges(vertex.in_.size());
save_edges(vertex.in_, &in_builder);
auto labels_builder = builder->initLabelIds(vertex.labels_.size());
for (size_t i = 0; i < vertex.labels_.size(); ++i) {
labels_builder.set(i, vertex.labels_[i].Id());
}
auto properties_builder = builder->initProperties(vertex.properties_.size());
SaveProperties(vertex.properties_, &properties_builder);
}
std::unique_ptr<Vertex> LoadVertex(const capnp::Vertex::Reader &reader) {
auto vertex = std::make_unique<Vertex>();
auto load_edges = [](const auto &edges_reader) {
Edges edges;
for (const auto &edge_reader : edges_reader) {
auto vertex_address = LoadVertexAddress(edge_reader.getVertexAddress());
auto edge_address = LoadEdgeAddress(edge_reader.getEdgeAddress());
storage::EdgeType edge_type(edge_reader.getEdgeTypeId());
edges.emplace(vertex_address, edge_address, edge_type);
}
return edges;
};
vertex->out_ = load_edges(reader.getOutEdges());
vertex->in_ = load_edges(reader.getInEdges());
for (const auto &label_id : reader.getLabelIds()) {
vertex->labels_.emplace_back(label_id);
}
vertex->properties_ = LoadProperties(reader.getProperties());
return vertex;
}
void SaveEdge(const Edge &edge, capnp::Edge::Builder *builder,
int16_t worker_id) {
auto from_builder = builder->initFrom();
SaveAddress(edge.from_, &from_builder, worker_id);
auto to_builder = builder->initTo();
SaveAddress(edge.to_, &to_builder, worker_id);
builder->setTypeId(edge.edge_type_.Id());
auto properties_builder = builder->initProperties(edge.properties_.size());
SaveProperties(edge.properties_, &properties_builder);
}
std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader) {
auto from = LoadVertexAddress(reader.getFrom());
auto to = LoadVertexAddress(reader.getTo());
auto edge =
std::make_unique<Edge>(from, to, storage::EdgeType{reader.getTypeId()});
edge->properties_ = LoadProperties(reader.getProperties());
return edge;
}
} // namespace distributed

View File

@ -1,143 +0,0 @@
#pragma once
#include <cstdint>
#include <memory>
#include "distributed/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "utils/exceptions.hpp"
namespace distributed {
void SaveVertex(const Vertex &vertex, capnp::Vertex::Builder *builder,
int16_t worker_id);
void SaveEdge(const Edge &edge, capnp::Edge::Builder *builder,
int16_t worker_id);
/// Alias for `SaveEdge` allowing for param type resolution.
inline void SaveElement(const Edge &record, capnp::Edge::Builder *builder,
int16_t worker_id) {
return SaveEdge(record, builder, worker_id);
}
/// Alias for `SaveVertex` allowing for param type resolution.
inline void SaveElement(const Vertex &record, capnp::Vertex::Builder *builder,
int16_t worker_id) {
return SaveVertex(record, builder, worker_id);
}
std::unique_ptr<Vertex> LoadVertex(const capnp::Vertex::Reader &reader);
std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader);
inline void SaveCapnpTypedValue(
const query::TypedValue &value, capnp::TypedValue::Builder *builder,
std::function<void(const query::TypedValue &, capnp::TypedValue::Builder *)>
save_graph_element = nullptr) {
switch (value.type()) {
case query::TypedValue::Type::Null:
builder->setNullType();
return;
case query::TypedValue::Type::Bool:
builder->setBool(value.Value<bool>());
return;
case query::TypedValue::Type::Int:
builder->setInteger(value.Value<int64_t>());
return;
case query::TypedValue::Type::Double:
builder->setDouble(value.Value<double>());
return;
case query::TypedValue::Type::String:
builder->setString(value.Value<std::string>());
return;
case query::TypedValue::Type::List: {
const auto &values = value.Value<std::vector<query::TypedValue>>();
auto list_builder = builder->initList(values.size());
for (size_t i = 0; i < values.size(); ++i) {
auto value_builder = list_builder[i];
SaveCapnpTypedValue(values[i], &value_builder, save_graph_element);
}
return;
}
case query::TypedValue::Type::Map: {
const auto &map = value.Value<std::map<std::string, query::TypedValue>>();
auto map_builder = builder->initMap(map.size());
size_t i = 0;
for (const auto &kv : map) {
auto kv_builder = map_builder[i];
kv_builder.setKey(kv.first);
auto value_builder = kv_builder.initValue();
SaveCapnpTypedValue(kv.second, &value_builder, save_graph_element);
++i;
}
return;
}
case query::TypedValue::Type::Vertex:
case query::TypedValue::Type::Edge:
case query::TypedValue::Type::Path:
if (save_graph_element) {
save_graph_element(value, builder);
} else {
throw utils::BasicException(
"Unable to serialize TypedValue of type: {}", value.type());
}
}
}
inline void LoadCapnpTypedValue(
const capnp::TypedValue::Reader &reader, query::TypedValue *value,
std::function<void(const capnp::TypedValue::Reader &, query::TypedValue *)>
load_graph_element = nullptr) {
switch (reader.which()) {
case distributed::capnp::TypedValue::NULL_TYPE:
*value = query::TypedValue::Null;
return;
case distributed::capnp::TypedValue::BOOL:
*value = reader.getBool();
return;
case distributed::capnp::TypedValue::INTEGER:
*value = reader.getInteger();
return;
case distributed::capnp::TypedValue::DOUBLE:
*value = reader.getDouble();
return;
case distributed::capnp::TypedValue::STRING:
*value = reader.getString().cStr();
return;
case distributed::capnp::TypedValue::LIST: {
std::vector<query::TypedValue> list;
list.reserve(reader.getList().size());
for (const auto &value_reader : reader.getList()) {
list.emplace_back();
LoadCapnpTypedValue(value_reader, &list.back(), load_graph_element);
}
*value = list;
return;
}
case distributed::capnp::TypedValue::MAP: {
std::map<std::string, query::TypedValue> map;
for (const auto &kv_reader : reader.getMap()) {
auto key = kv_reader.getKey().cStr();
LoadCapnpTypedValue(kv_reader.getValue(), &map[key],
load_graph_element);
}
*value = map;
return;
}
case distributed::capnp::TypedValue::VERTEX:
case distributed::capnp::TypedValue::EDGE:
case distributed::capnp::TypedValue::PATH:
if (load_graph_element) {
load_graph_element(reader, value);
} else {
throw utils::BasicException(
"Unexpected TypedValue type '{}' when loading from archive",
reader.which());
}
}
}
} // namespace distributed

View File

@ -5,7 +5,6 @@
#include <string>
#include "communication/rpc/messages.hpp"
#include "distributed/serialization.hpp"
#include "distributed/token_sharing_rpc_messages.capnp.h"
cpp<#

View File

@ -35,7 +35,7 @@ UpdateResult UpdatesRpcClients::Update(int worker_id,
CreatedVertexInfo UpdatesRpcClients::CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> &properties,
const std::unordered_map<storage::Property, PropertyValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
auto res = worker_clients_.GetClientPool(worker_id).Call<CreateVertexRpc>(
CreateVertexReqData{tx_id, labels, properties, cypher_id});

View File

@ -30,7 +30,7 @@ class UpdatesRpcClients {
CreatedVertexInfo CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
const std::unordered_map<storage::Property, PropertyValue>
&properties,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);

View File

@ -17,7 +17,6 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'db "/database/state_delta.capnp")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
@ -57,30 +56,30 @@ cpp<#
(labels "std::vector<storage::Label>"
:capnp-save (lcp:capnp-save-vector "storage::capnp::Common" "storage::Label")
:capnp-load (lcp:capnp-load-vector "storage::capnp::Common" "storage::Label"))
(properties "std::unordered_map<storage::Property, query::TypedValue>"
:capnp-type "Utils.Map(Storage.Common, Dis.TypedValue)"
(properties "std::unordered_map<storage::Property, PropertyValue>"
:capnp-type "Utils.Map(Storage.Common, Storage.PropertyValue)"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveMap<storage::capnp::Common, capnp::TypedValue>(
utils::SaveMap<storage::capnp::Common, storage::capnp::PropertyValue>(
${member}, &${builder},
[](auto *builder, const auto &entry) {
auto key_builder = builder->initKey();
entry.first.Save(&key_builder);
auto value_builder = builder->initValue();
distributed::SaveCapnpTypedValue(entry.second, &value_builder);
storage::SaveCapnpPropertyValue(entry.second, &value_builder);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
utils::LoadMap<storage::capnp::Common, capnp::TypedValue>(
utils::LoadMap<storage::capnp::Common, storage::capnp::PropertyValue>(
&${member}, ${reader},
[](const auto &reader) {
storage::Property prop;
prop.Load(reader.getKey());
query::TypedValue value;
distributed::LoadCapnpTypedValue(reader.getValue(), &value);
PropertyValue value;
storage::LoadCapnpPropertyValue(reader.getValue(), &value);
return std::make_pair(prop, value);
});
cpp<#))

View File

@ -62,7 +62,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
template <typename TRecordAccessor>
CreatedInfo UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> &properties,
const std::unordered_map<storage::Property, PropertyValue> &properties,
std::experimental::optional<int64_t> cypher_id) {
auto result =
db_accessor_->InsertVertex(std::experimental::nullopt, cypher_id);

View File

@ -46,8 +46,7 @@ class UpdatesRpcServer {
/// Creates a new vertex and returns it's cypher_id and gid.
CreatedInfo CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties,
const std::unordered_map<storage::Property, PropertyValue> &properties,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);

View File

@ -174,7 +174,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
}
for (const auto &property_pair : vertex->properties) {
vertex_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToTypedValue(property_pair.second));
glue::ToPropertyValue(property_pair.second));
}
auto vertex_record = vertex_accessor.GetNew();
for (const auto &edge : vertex->in) {
@ -244,7 +244,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToTypedValue(property_pair.second));
glue::ToPropertyValue(property_pair.second));
}
// Vertex and edge counts are included in the hash. Re-read them to update the

View File

@ -38,11 +38,11 @@ using TEncoder =
std::vector<std::string> BoltSession::Interpret(
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
std::map<std::string, query::TypedValue> params_tv;
std::map<std::string, PropertyValue> params_pv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
auto result = transaction_engine_.Interpret(query, params_tv);
auto result = transaction_engine_.Interpret(query, params_pv);
if (user_) {
const auto &permissions = user_->GetPermissions();
for (const auto &privilege : result.second) {
@ -108,11 +108,11 @@ void KafkaStreamWriter(
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db->Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
std::map<std::string, PropertyValue> params_pv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
(*session_data.interpreter)(query, *dba, params_tv, false).PullAll(stream);
(*session_data.interpreter)(query, *dba, params_pv, false).PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "

View File

@ -72,7 +72,7 @@ template <class TRecordAccessor>
void PropsSetChecked(TRecordAccessor *record, const storage::Property &key,
const TypedValue &value) {
try {
record->PropsSet(key, value);
record->PropsSet(key, PropertyValue(value));
} catch (const TypedValueException &) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());

View File

@ -1,12 +0,0 @@
@0xf2d47a8877eb7f4f;
using Cxx = import "/capnp/c++.capnp";
using Dis = import "/distributed/serialization.capnp";
using Utils = import "/utils/serialization.capnp";
$Cxx.namespace("query::capnp");
struct EvaluationContext {
timestamp @0 : Int64;
params @1 : Utils.Map(Utils.BoxInt64, Dis.TypedValue);
}

View File

@ -3,7 +3,6 @@
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("query::capnp");
using Dis = import "/distributed/serialization.capnp";
using Storage = import "/storage/serialization.capnp";
using Symbols = import "/query/frontend/semantic/symbol.capnp";
@ -236,7 +235,7 @@ struct BaseLiteral {
struct PrimitiveLiteral {
tokenPosition @0 :Int32;
value @1 :Dis.TypedValue;
value @1 :Storage.PropertyValue;
}
struct ListLiteral {

View File

@ -2,7 +2,7 @@
#include <algorithm>
#include "distributed/serialization.hpp"
#include "storage/serialization.hpp"
#include "utils/serialization.capnp.h"
namespace query {
@ -238,8 +238,8 @@ void PrimitiveLiteral::Save(capnp::BaseLiteral::Builder *base_literal_builder,
BaseLiteral::Save(base_literal_builder, saved_uids);
auto primitive_literal_builder = base_literal_builder->initPrimitiveLiteral();
primitive_literal_builder.setTokenPosition(token_position_);
auto typed_value_builder = primitive_literal_builder.getValue();
distributed::SaveCapnpTypedValue(value_, &typed_value_builder);
auto property_value_builder = primitive_literal_builder.getValue();
storage::SaveCapnpPropertyValue(value_, &property_value_builder);
}
void PrimitiveLiteral::Load(const capnp::Tree::Reader &reader,
@ -249,7 +249,7 @@ void PrimitiveLiteral::Load(const capnp::Tree::Reader &reader,
auto pl_reader =
reader.getExpression().getBaseLiteral().getPrimitiveLiteral();
auto typed_value_reader = pl_reader.getValue();
distributed::LoadCapnpTypedValue(typed_value_reader, &value_);
storage::LoadCapnpPropertyValue(typed_value_reader, &value_);
token_position_ = pl_reader.getTokenPosition();
}

View File

@ -8,6 +8,7 @@
#include "query/frontend/semantic/symbol.hpp"
#include "query/interpret/awesome_memgraph_functions.hpp"
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "utils/serialization.hpp"
@ -754,7 +755,7 @@ class PrimitiveLiteral : public BaseLiteral {
static PrimitiveLiteral *Construct(
const capnp::PrimitiveLiteral::Reader &reader, AstStorage *storage);
TypedValue value_;
PropertyValue value_;
// This field contains token position of literal used to create
// PrimitiveLiteral object. If PrimitiveLiteral object is not created from
// query leave its value at -1.

View File

@ -66,7 +66,7 @@ StrippedQuery::StrippedQuery(const std::string &query) : original_(query) {
// literals_. In stripped query text literal is replaced with a new_value.
// new_value can be any value that is lexed as a literal.
auto replace_stripped = [this, &token_strings](int position,
const TypedValue &value,
const PropertyValue &value,
const std::string &new_value) {
literals_.Add(position, value);
token_strings.push_back(new_value);

View File

@ -4,7 +4,6 @@
#include <unordered_map>
#include "query/parameters.hpp"
#include "query/typed_value.hpp"
#include "utils/hashing/fnv.hpp"
namespace query {

View File

@ -25,7 +25,7 @@ Interpreter::CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan)
Interpreter::Results Interpreter::operator()(
const std::string &query, database::GraphDbAccessor &db_accessor,
const std::map<std::string, TypedValue> &params,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction) {
utils::Timer frontend_timer;

View File

@ -169,7 +169,7 @@ class Interpreter {
*/
Results operator()(const std::string &query,
database::GraphDbAccessor &db_accessor,
const std::map<std::string, TypedValue> &params,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction);
auth::Auth *auth_ = nullptr;

View File

@ -1,16 +1,17 @@
#pragma once
#include "glog/logging.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
/**
* Encapsulates user provided parameters (and stripped literals)
* and provides ways of obtaining them by position.
*/
// TODO move to namespace query::
namespace query {
struct Parameters {
@ -21,7 +22,7 @@ struct Parameters {
* @param position Token position in query of value.
* @param value
*/
void Add(int position, const query::TypedValue &value) {
void Add(int position, const PropertyValue &value) {
storage_.emplace_back(position, value);
}
@ -31,9 +32,9 @@ struct Parameters {
* @param position Token position in query of value.
* @return Value for the given token position.
*/
const query::TypedValue &AtTokenPosition(int position) const {
const PropertyValue &AtTokenPosition(int position) const {
auto found = std::find_if(storage_.begin(), storage_.end(),
[&](const std::pair<int, query::TypedValue> a) {
[&](const std::pair<int, PropertyValue> a) {
return a.first == position;
});
CHECK(found != storage_.end())
@ -48,7 +49,7 @@ struct Parameters {
* @param position Which stripped param is sought.
* @return Token position and value for sought param.
*/
const std::pair<int, query::TypedValue> &At(int position) const {
const std::pair<int, PropertyValue> &At(int position) const {
CHECK(position < static_cast<int>(storage_.size())) << "Invalid position";
return storage_[position];
}
@ -60,7 +61,7 @@ struct Parameters {
auto end() const { return storage_.end(); }
private:
std::vector<std::pair<int, query::TypedValue>> storage_;
std::vector<std::pair<int, PropertyValue>> storage_;
};
} // namespace query

View File

@ -231,12 +231,10 @@ class CostEstimator : public HierarchicalLogicalOperatorVisitor {
std::experimental::optional<PropertyValue> ConstPropertyValue(
const Expression *expression) {
if (auto *literal = dynamic_cast<const PrimitiveLiteral *>(expression)) {
if (literal->value_.IsPropertyValue())
return static_cast<PropertyValue>(literal->value_);
return literal->value_;
} else if (auto *param_lookup =
dynamic_cast<const ParameterLookup *>(expression)) {
auto value = parameters.AtTokenPosition(param_lookup->token_position_);
if (value.IsPropertyValue()) return static_cast<PropertyValue>(value);
return parameters.AtTokenPosition(param_lookup->token_position_);
}
return std::experimental::nullopt;
}

View File

@ -1181,7 +1181,7 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
if (worker_id == current_worker_id)
return CreateLocalVertex(node_atom, frame, context);
std::unordered_map<storage::Property, query::TypedValue> properties;
std::unordered_map<storage::Property, PropertyValue> properties;
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.

View File

@ -352,7 +352,7 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
auto value = bound->value()->Accept(evaluator);
try {
return std::experimental::make_optional(
utils::Bound<PropertyValue>(value, bound->type()));
utils::Bound<PropertyValue>(PropertyValue(value), bound->type()));
} catch (const TypedValueException &) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
@ -391,19 +391,19 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyValue::MakeCursor(
database::GraphDbAccessor &db) const {
auto vertices = [this, &db](Frame &frame, Context &context)
-> std::experimental::optional<decltype(
db.Vertices(label_, property_, TypedValue::Null, false))> {
db.Vertices(label_, property_, PropertyValue::Null, false))> {
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
context.evaluation_context_,
&context.db_accessor_, graph_view_);
auto value = expression_->Accept(evaluator);
if (value.IsNull()) return std::experimental::nullopt;
try {
return std::experimental::make_optional(
db.Vertices(label_, property_, value, graph_view_ == GraphView::NEW));
} catch (const TypedValueException &) {
if (!value.IsPropertyValue()) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
}
return std::experimental::make_optional(
db.Vertices(label_, property_, PropertyValue(value),
graph_view_ == GraphView::NEW));
};
return std::make_unique<ScanAllCursor<decltype(vertices)>>(
output_symbol_, input_->MakeCursor(db), std::move(vertices), db);

View File

@ -0,0 +1,37 @@
@0xf47e119e21912f20;
using Cxx = import "/capnp/c++.capnp";
using Storage = import "/storage/serialization.capnp";
using Utils = import "/utils/serialization.capnp";
$Cxx.namespace("query::capnp");
struct EvaluationContext {
timestamp @0 : Int64;
params @1 : Utils.Map(Utils.BoxInt64, Storage.PropertyValue);
}
struct TypedValue {
union {
nullType @0 :Void;
bool @1 :Bool;
integer @2 :Int64;
double @3 :Float64;
string @4 :Text;
list @5 :List(TypedValue);
map @6 :List(Entry);
vertex @7 :Storage.VertexAccessor;
edge @8 :Storage.EdgeAccessor;
path @9 :Path;
}
struct Entry {
key @0 :Text;
value @1 :TypedValue;
}
struct Path {
vertices @0 :List(Storage.VertexAccessor);
edges @1 :List(Storage.EdgeAccessor);
}
}

140
src/query/serialization.cpp Normal file
View File

@ -0,0 +1,140 @@
#include "serialization.hpp"
#include "distributed/data_manager.hpp"
namespace query {
void SaveCapnpTypedValue(const TypedValue &value,
capnp::TypedValue::Builder *builder,
storage::SendVersions versions) {
switch (value.type()) {
case TypedValue::Type::Null:
builder->setNullType();
return;
case TypedValue::Type::Bool:
builder->setBool(value.Value<bool>());
return;
case TypedValue::Type::Int:
builder->setInteger(value.Value<int64_t>());
return;
case TypedValue::Type::Double:
builder->setDouble(value.Value<double>());
return;
case TypedValue::Type::String:
builder->setString(value.Value<std::string>());
return;
case TypedValue::Type::List: {
const auto &values = value.Value<std::vector<TypedValue>>();
auto list_builder = builder->initList(values.size());
for (size_t i = 0; i < values.size(); ++i) {
auto value_builder = list_builder[i];
SaveCapnpTypedValue(values[i], &value_builder, versions);
}
return;
}
case TypedValue::Type::Map: {
const auto &map = value.Value<std::map<std::string, TypedValue>>();
auto map_builder = builder->initMap(map.size());
size_t i = 0;
for (const auto &kv : map) {
auto kv_builder = map_builder[i];
kv_builder.setKey(kv.first);
auto value_builder = kv_builder.initValue();
SaveCapnpTypedValue(kv.second, &value_builder, versions);
++i;
}
return;
}
case TypedValue::Type::Vertex: {
auto vertex_builder = builder->initVertex();
storage::SaveVertexAccessor(value.ValueVertex(), &vertex_builder,
versions);
return;
}
case TypedValue::Type::Edge: {
auto edge_builder = builder->initEdge();
storage::SaveEdgeAccessor(value.ValueEdge(), &edge_builder, versions);
return;
}
case TypedValue::Type::Path: {
auto path_builder = builder->initPath();
const auto &path = value.ValuePath();
auto vertices_builder = path_builder.initVertices(path.vertices().size());
for (size_t i = 0; i < path.vertices().size(); ++i) {
auto vertex_builder = vertices_builder[i];
storage::SaveVertexAccessor(path.vertices()[i], &vertex_builder,
versions);
}
auto edges_builder = path_builder.initEdges(path.edges().size());
for (size_t i = 0; i < path.edges().size(); ++i) {
auto edge_builder = edges_builder[i];
storage::SaveEdgeAccessor(path.edges()[i], &edge_builder, versions);
}
return;
}
}
}
void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader,
TypedValue *value, database::GraphDbAccessor *dba,
distributed::DataManager *data_manager) {
switch (reader.which()) {
case capnp::TypedValue::NULL_TYPE:
*value = TypedValue::Null;
return;
case capnp::TypedValue::BOOL:
*value = reader.getBool();
return;
case capnp::TypedValue::INTEGER:
*value = reader.getInteger();
return;
case capnp::TypedValue::DOUBLE:
*value = reader.getDouble();
return;
case capnp::TypedValue::STRING:
*value = reader.getString().cStr();
return;
case capnp::TypedValue::LIST: {
std::vector<TypedValue> list;
list.reserve(reader.getList().size());
for (const auto &value_reader : reader.getList()) {
list.emplace_back();
LoadCapnpTypedValue(value_reader, &list.back(), dba, data_manager);
}
*value = list;
return;
}
case capnp::TypedValue::MAP: {
std::map<std::string, TypedValue> map;
for (const auto &kv_reader : reader.getMap()) {
auto key = kv_reader.getKey().cStr();
LoadCapnpTypedValue(kv_reader.getValue(), &map[key], dba, data_manager);
}
*value = map;
return;
}
case capnp::TypedValue::VERTEX:
*value =
storage::LoadVertexAccessor(reader.getVertex(), dba, data_manager);
return;
case capnp::TypedValue::EDGE:
*value = storage::LoadEdgeAccessor(reader.getEdge(), dba, data_manager);
return;
case capnp::TypedValue::PATH: {
auto vertices_reader = reader.getPath().getVertices();
auto edges_reader = reader.getPath().getEdges();
query::Path path(
storage::LoadVertexAccessor(vertices_reader[0], dba, data_manager));
for (size_t i = 0; i < edges_reader.size(); ++i) {
path.Expand(
storage::LoadEdgeAccessor(edges_reader[i], dba, data_manager));
path.Expand(storage::LoadVertexAccessor(vertices_reader[i + 1], dba,
data_manager));
}
*value = path;
return;
}
}
}
} // namespace query

View File

@ -0,0 +1,22 @@
#pragma once
#include "query/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/serialization.hpp"
namespace distributed {
class DataManager;
}
namespace query {
void SaveCapnpTypedValue(const query::TypedValue &value,
capnp::TypedValue::Builder *builder,
storage::SendVersions versions);
void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader,
query::TypedValue *value,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager);
} // namespace query

View File

@ -19,7 +19,7 @@ class TransactionEngine final {
std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
Interpret(const std::string &query,
const std::map<std::string, TypedValue> &params) {
const std::map<std::string, PropertyValue> &params) {
// Clear pending results.
results_ = std::experimental::nullopt;

View File

@ -84,7 +84,7 @@ class TypedValue
TypedValue(double value) : type_(Type::Double) { double_v = value; }
// conversion function to PropertyValue
operator PropertyValue() const;
explicit operator PropertyValue() const;
/// constructors for non-primitive types
TypedValue(const std::string &value) : type_(Type::String) {

View File

@ -17,10 +17,6 @@
* PropertyValue::Type. Each such type corresponds to exactly one C++ type.
*/
class PropertyValue {
private:
/** Private default constructor, makes Null */
PropertyValue() : type_(Type::Null) {}
public:
/** A value type. Each type corresponds to exactly one C++ type */
enum class Type : unsigned { Null, String, Bool, Int, Double, List, Map };
@ -37,6 +33,9 @@ class PropertyValue {
return a == b || (is_numeric(a) && is_numeric(b));
}
// default constructor, makes Null
PropertyValue() : type_(Type::Null) {}
// constructors for primitive types
PropertyValue(bool value) : type_(Type::Bool) { bool_v = value; }
PropertyValue(int value) : type_(Type::Int) { int_v = value; }

View File

@ -19,3 +19,70 @@ struct Property {}
struct Address {
storage @0 :UInt64;
}
struct PropertyValue {
union {
nullType @0 :Void;
bool @1 :Bool;
integer @2 :Int64;
double @3 :Float64;
string @4 :Text;
list @5 :List(PropertyValue);
map @6 :List(MapEntry);
}
struct MapEntry {
key @0 :Text;
value @1 :PropertyValue;
}
}
struct PropertyValueStore {
properties @0 :List(Entry);
struct Entry {
id @0 :Common;
value @1 :PropertyValue;
}
}
struct Edge {
from @0 :Address;
to @1 :Address;
typeId @2 :UInt16;
properties @3 :PropertyValueStore;
}
struct Vertex {
outEdges @0 :List(EdgeEntry);
inEdges @1 :List(EdgeEntry);
labelIds @2 :List(UInt16);
properties @3 :PropertyValueStore;
struct EdgeEntry {
vertexAddress @0 :Address;
edgeAddress @1 :Address;
edgeTypeId @2 :UInt16;
}
}
enum SendVersions {
both @0;
onlyOld @1;
onlyNew @2;
}
struct VertexAccessor {
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Vertex;
new @3 :Vertex;
}
struct EdgeAccessor {
cypherId @0 :Int64;
address @1 :UInt64;
old @2 :Edge;
new @3 :Edge;
}

View File

@ -0,0 +1,292 @@
#include "serialization.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/data_manager.hpp"
namespace storage {
void SaveCapnpPropertyValue(const PropertyValue &value,
capnp::PropertyValue::Builder *builder) {
switch (value.type()) {
case PropertyValue::Type::Null:
builder->setNullType();
return;
case PropertyValue::Type::Bool:
builder->setBool(value.Value<bool>());
return;
case PropertyValue::Type::Int:
builder->setInteger(value.Value<int64_t>());
return;
case PropertyValue::Type::Double:
builder->setDouble(value.Value<double>());
return;
case PropertyValue::Type::String:
builder->setString(value.Value<std::string>());
return;
case PropertyValue::Type::List: {
const auto &values = value.Value<std::vector<PropertyValue>>();
auto list_builder = builder->initList(values.size());
for (size_t i = 0; i < values.size(); ++i) {
auto value_builder = list_builder[i];
SaveCapnpPropertyValue(values[i], &value_builder);
}
return;
}
case PropertyValue::Type::Map: {
const auto &map = value.Value<std::map<std::string, PropertyValue>>();
auto map_builder = builder->initMap(map.size());
size_t i = 0;
for (const auto &kv : map) {
auto kv_builder = map_builder[i];
kv_builder.setKey(kv.first);
auto value_builder = kv_builder.initValue();
SaveCapnpPropertyValue(kv.second, &value_builder);
++i;
}
return;
}
}
}
void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader,
PropertyValue *value) {
switch (reader.which()) {
case capnp::PropertyValue::NULL_TYPE:
*value = PropertyValue::Null;
return;
case capnp::PropertyValue::BOOL:
*value = reader.getBool();
return;
case capnp::PropertyValue::INTEGER:
*value = reader.getInteger();
return;
case capnp::PropertyValue::DOUBLE:
*value = reader.getDouble();
return;
case capnp::PropertyValue::STRING:
*value = reader.getString().cStr();
return;
case capnp::PropertyValue::LIST: {
std::vector<PropertyValue> list;
list.reserve(reader.getList().size());
for (const auto &value_reader : reader.getList()) {
list.emplace_back();
LoadCapnpPropertyValue(value_reader, &list.back());
}
*value = list;
return;
}
case capnp::PropertyValue::MAP: {
std::map<std::string, PropertyValue> map;
for (const auto &kv_reader : reader.getMap()) {
auto key = kv_reader.getKey();
LoadCapnpPropertyValue(kv_reader.getValue(), &map[key]);
}
*value = map;
return;
}
}
}
void SaveProperties(const PropertyValueStore &properties,
capnp::PropertyValueStore::Builder *builder) {
size_t i = 0;
auto props_builder = builder->initProperties(properties.size());
for (const auto &kv : properties) {
auto kv_builder = props_builder[i++];
auto id_builder = kv_builder.initId();
kv.first.Save(&id_builder);
auto value_builder = kv_builder.initValue();
SaveCapnpPropertyValue(kv.second, &value_builder);
}
}
void LoadProperties(const capnp::PropertyValueStore::Reader &reader,
PropertyValueStore *properties) {
properties->clear();
auto props_reader = reader.getProperties();
for (const auto &kv_reader : props_reader) {
storage::Property id;
id.Load(kv_reader.getId());
PropertyValue value;
LoadCapnpPropertyValue(kv_reader.getValue(), &value);
properties->set(id, value);
}
}
template <class TAddress>
void SaveAddress(TAddress address, capnp::Address::Builder *builder,
int16_t worker_id) {
TAddress global_address =
address.is_local() ? TAddress(address.local()->gid_, worker_id) : address;
builder->setStorage(global_address.raw());
}
EdgeAddress LoadEdgeAddress(const capnp::Address::Reader &reader) {
return EdgeAddress(reader.getStorage());
}
VertexAddress LoadVertexAddress(const capnp::Address::Reader &reader) {
return VertexAddress(reader.getStorage());
}
void SaveVertex(const Vertex &vertex, capnp::Vertex::Builder *builder,
int16_t worker_id) {
auto save_edges = [worker_id](const auto &edges, auto *edges_builder) {
int64_t i = 0;
for (const auto &edge : edges) {
auto edge_builder = (*edges_builder)[i];
auto vertex_addr_builder = edge_builder.initVertexAddress();
SaveAddress(edge.vertex, &vertex_addr_builder, worker_id);
auto edge_addr_builder = edge_builder.initEdgeAddress();
SaveAddress(edge.edge, &edge_addr_builder, worker_id);
edge_builder.setEdgeTypeId(edge.edge_type.Id());
++i;
}
};
auto out_builder = builder->initOutEdges(vertex.out_.size());
save_edges(vertex.out_, &out_builder);
auto in_builder = builder->initInEdges(vertex.in_.size());
save_edges(vertex.in_, &in_builder);
auto labels_builder = builder->initLabelIds(vertex.labels_.size());
for (size_t i = 0; i < vertex.labels_.size(); ++i) {
labels_builder.set(i, vertex.labels_[i].Id());
}
auto properties_builder = builder->initProperties();
storage::SaveProperties(vertex.properties_, &properties_builder);
}
void SaveEdge(const Edge &edge, capnp::Edge::Builder *builder,
int16_t worker_id) {
auto from_builder = builder->initFrom();
SaveAddress(edge.from_, &from_builder, worker_id);
auto to_builder = builder->initTo();
SaveAddress(edge.to_, &to_builder, worker_id);
builder->setTypeId(edge.edge_type_.Id());
auto properties_builder = builder->initProperties();
storage::SaveProperties(edge.properties_, &properties_builder);
}
/// Alias for `SaveEdge` allowing for param type resolution.
void SaveElement(const Edge &record, capnp::Edge::Builder *builder,
int16_t worker_id) {
return SaveEdge(record, builder, worker_id);
}
/// Alias for `SaveVertex` allowing for param type resolution.
void SaveElement(const Vertex &record, capnp::Vertex::Builder *builder,
int16_t worker_id) {
return SaveVertex(record, builder, worker_id);
}
std::unique_ptr<Vertex> LoadVertex(const capnp::Vertex::Reader &reader) {
auto vertex = std::make_unique<Vertex>();
auto load_edges = [](const auto &edges_reader) {
Edges edges;
for (const auto &edge_reader : edges_reader) {
auto vertex_address = LoadVertexAddress(edge_reader.getVertexAddress());
auto edge_address = LoadEdgeAddress(edge_reader.getEdgeAddress());
storage::EdgeType edge_type(edge_reader.getEdgeTypeId());
edges.emplace(vertex_address, edge_address, edge_type);
}
return edges;
};
vertex->out_ = load_edges(reader.getOutEdges());
vertex->in_ = load_edges(reader.getInEdges());
for (const auto &label_id : reader.getLabelIds()) {
vertex->labels_.emplace_back(label_id);
}
storage::LoadProperties(reader.getProperties(), &vertex->properties_);
return vertex;
}
std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader) {
auto from = LoadVertexAddress(reader.getFrom());
auto to = LoadVertexAddress(reader.getTo());
auto edge =
std::make_unique<Edge>(from, to, storage::EdgeType{reader.getTypeId()});
storage::LoadProperties(reader.getProperties(), &edge->properties_);
return edge;
}
template <class TRecord, class TCapnpRecord>
void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor,
typename TCapnpRecord::Builder *builder,
SendVersions versions) {
builder->setCypherId(accessor.CypherId());
builder->setAddress(accessor.GlobalAddress().raw());
bool reconstructed = false;
if (!accessor.GetOld() && !accessor.GetNew()) {
reconstructed = true;
bool result = accessor.Reconstruct();
CHECK(result) << "Attempting to serialize an element not visible to "
"current transaction.";
}
int worker_id = accessor.GlobalAddress().worker_id();
auto old_rec = accessor.GetOld();
if (old_rec && versions != SendVersions::ONLY_NEW) {
auto old_builder = builder->initOld();
storage::SaveElement(*old_rec, &old_builder, worker_id);
}
if (versions != SendVersions::ONLY_OLD) {
if (!reconstructed && !accessor.GetNew()) {
bool result = accessor.Reconstruct();
CHECK(result) << "Attempting to serialize an element not visible to "
"current transaction.";
}
auto *new_rec = accessor.GetNew();
if (new_rec) {
auto new_builder = builder->initNew();
storage::SaveElement(*new_rec, &new_builder, worker_id);
}
}
}
void SaveVertexAccessor(const VertexAccessor &vertex_accessor,
capnp::VertexAccessor::Builder *builder,
SendVersions versions) {
SaveRecordAccessor<Vertex, capnp::VertexAccessor>(vertex_accessor, builder,
versions);
}
void SaveEdgeAccessor(const EdgeAccessor &edge_accessor,
capnp::EdgeAccessor::Builder *builder,
SendVersions versions) {
SaveRecordAccessor<Edge, capnp::EdgeAccessor>(edge_accessor, builder,
versions);
}
VertexAccessor LoadVertexAccessor(const capnp::VertexAccessor::Reader &reader,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager) {
int64_t cypher_id = reader.getCypherId();
storage::VertexAddress global_address(reader.getAddress());
auto old_record =
reader.hasOld() ? storage::LoadVertex(reader.getOld()) : nullptr;
auto new_record =
reader.hasNew() ? storage::LoadVertex(reader.getNew()) : nullptr;
data_manager->Emplace(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Vertex>(cypher_id, std::move(old_record),
std::move(new_record)));
return VertexAccessor(global_address, *dba);
}
EdgeAccessor LoadEdgeAccessor(const capnp::EdgeAccessor::Reader &reader,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager) {
int64_t cypher_id = reader.getCypherId();
storage::EdgeAddress global_address(reader.getAddress());
auto old_record =
reader.hasOld() ? storage::LoadEdge(reader.getOld()) : nullptr;
auto new_record =
reader.hasNew() ? storage::LoadEdge(reader.getNew()) : nullptr;
data_manager->Emplace(
dba->transaction_id(), global_address.gid(),
distributed::CachedRecordData<Edge>(cypher_id, std::move(old_record),
std::move(new_record)));
return EdgeAccessor(global_address, *dba);
}
} // namespace storage

View File

@ -0,0 +1,69 @@
#pragma once
#include "storage/edge.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/property_value.hpp"
#include "storage/property_value_store.hpp"
#include "storage/serialization.capnp.h"
#include "storage/vertex.hpp"
#include "storage/vertex_accessor.hpp"
namespace database {
class GraphDbAccessor;
}
namespace distributed {
class DataManager;
}
namespace storage {
void SaveCapnpPropertyValue(const PropertyValue &value,
capnp::PropertyValue::Builder *builder);
void LoadCapnpPropertyValue(const capnp::PropertyValue::Reader &reader,
PropertyValue *value);
void SaveProperties(const PropertyValueStore &properties,
capnp::PropertyValueStore::Builder *builder);
void LoadProperties(const capnp::PropertyValueStore::Reader &reader,
PropertyValueStore *properties);
void SaveVertex(const Vertex &vertex, capnp::Vertex::Builder *builder,
int16_t worker_id);
void SaveEdge(const Edge &edge, capnp::Edge::Builder *builder,
int16_t worker_id);
/// Alias for `SaveEdge` allowing for param type resolution.
void SaveElement(const Edge &record, capnp::Edge::Builder *builder,
int16_t worker_id);
/// Alias for `SaveVertex` allowing for param type resolution.
void SaveElement(const Vertex &record, capnp::Vertex::Builder *builder,
int16_t worker_id);
std::unique_ptr<Vertex> LoadVertex(const capnp::Vertex::Reader &reader);
std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader);
enum class SendVersions { BOTH, ONLY_OLD, ONLY_NEW };
void SaveVertexAccessor(const VertexAccessor &vertex_accessor,
capnp::VertexAccessor::Builder *builder,
SendVersions versions);
VertexAccessor LoadVertexAccessor(const capnp::VertexAccessor::Reader &reader,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager);
void SaveEdgeAccessor(const EdgeAccessor &edge_accessor,
capnp::EdgeAccessor::Builder *builder,
SendVersions versions);
EdgeAccessor LoadEdgeAccessor(const capnp::EdgeAccessor::Reader &reader,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager);
} // namespace storage

View File

@ -64,7 +64,7 @@ class Cluster {
}
auto Execute(const std::string &query,
std::map<std::string, query::TypedValue> params = {}) {
std::map<std::string, PropertyValue> params = {}) {
auto dba = master_->Access();
ResultStreamFaker<query::TypedValue> result;
(*interpreter_)(query, *dba, params, false).PullAll(result);

View File

@ -38,7 +38,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
}
auto RunWithDba(const std::string &query, GraphDbAccessor &dba) {
std::map<std::string, query::TypedValue> params = {};
std::map<std::string, PropertyValue> params = {};
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_.value()(query, dba, params, false);
stream.Header(results.header());

View File

@ -3,11 +3,11 @@
#include <capnp/message.h>
#include "distributed/serialization.hpp"
#include "mvcc/version_list.hpp"
#include "query/typed_value.hpp"
#include "storage/edge.hpp"
#include "storage/property_value_store.hpp"
#include "storage/serialization.hpp"
#include "storage/types.hpp"
#include "storage/vertex.hpp"
#include "transactions/single_node/engine_single_node.hpp"
@ -68,14 +68,14 @@ bool CheckEdge(const Edge &e1, int w1, const Edge &e2, int w2) {
#undef CHECK_RETURN
#define SAVE_AND_LOAD(type, name, element) \
std::unique_ptr<type> name; \
{ \
::capnp::MallocMessageBuilder message; \
auto builder = message.initRoot<distributed::capnp::type>(); \
distributed::Save##type(element, &builder, 0); \
auto reader = message.getRoot<distributed::capnp::type>(); \
name = distributed::Load##type(reader); \
#define SAVE_AND_LOAD(type, name, element) \
std::unique_ptr<type> name; \
{ \
::capnp::MallocMessageBuilder message; \
auto builder = message.initRoot<storage::capnp::type>(); \
storage::Save##type(element, &builder, 0); \
auto reader = message.getRoot<storage::capnp::type>(); \
name = storage::Load##type(reader); \
}
TEST(DistributedSerialization, Empty) {

View File

@ -18,7 +18,7 @@ class InterpreterTest : public ::testing::Test {
query::Interpreter interpreter_;
auto Interpret(const std::string &query,
const std::map<std::string, query::TypedValue> &params = {}) {
const std::map<std::string, PropertyValue> &params = {}) {
auto dba = db_.Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_(query, *dba, params, false);
@ -116,7 +116,7 @@ TEST_F(InterpreterTest, Parameters) {
{
// Non-primitive literal.
auto stream = Interpret("RETURN $2",
{{"2", std::vector<query::TypedValue>{5, 2, 3}}});
{{"2", std::vector<PropertyValue>{5, 2, 3}}});
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
auto result = query::test_common::ToList<int64_t>(

View File

@ -552,6 +552,8 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
storage.Create<query::NotEqualOperator>((expr1), (expr2))
#define AND(expr1, expr2) storage.Create<query::AndOperator>((expr1), (expr2))
#define OR(expr1, expr2) storage.Create<query::OrOperator>((expr1), (expr2))
#define IN_LIST(expr1, expr2) \
storage.Create<query::InListOperator>((expr1), (expr2))
// Function call
#define FN(function_name, ...) \
storage.Create<query::Function>( \

View File

@ -77,7 +77,7 @@ class QueryCostEstimator : public ::testing::Test {
return storage_.Create<PrimitiveLiteral>(value);
}
Expression *Parameter(const TypedValue &value) {
Expression *Parameter(const PropertyValue &value) {
int token_position = parameters_.size();
parameters_.Add(token_position, value);
return storage_.Create<ParameterLookup>(token_position);

View File

@ -39,6 +39,15 @@ class ExpressionEvaluatorTest : public ::testing::Test {
Frame frame{128};
ExpressionEvaluator eval{&frame, symbol_table, ctx, dba.get(),
GraphView::OLD};
Identifier *CreateIdentifierWithValue(std::string name,
const TypedValue &value) {
auto id = storage.Create<Identifier>(name, true);
auto symbol = symbol_table.CreateSymbol(name, true);
symbol_table[*id] = symbol;
frame[symbol] = value;
return id;
}
};
TEST_F(ExpressionEvaluatorTest, OrOperator) {
@ -101,20 +110,20 @@ TEST_F(ExpressionEvaluatorTest, AndOperatorNull) {
{
// Null doesn't short circuit
auto *op = storage.Create<AndOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(5));
EXPECT_THROW(op->Accept(eval), QueryRuntimeException);
}
{
auto *op = storage.Create<AndOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(true));
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
}
{
auto *op = storage.Create<AndOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(false));
auto value = op->Accept(eval);
ASSERT_TRUE(value.IsBool());
@ -275,7 +284,7 @@ TEST_F(ExpressionEvaluatorTest, InListOperator) {
}
{
auto *list_literal = storage.Create<ListLiteral>(std::vector<Expression *>{
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(2),
storage.Create<PrimitiveLiteral>("a")});
// Element doesn't exist in list with null element.
@ -288,21 +297,21 @@ TEST_F(ExpressionEvaluatorTest, InListOperator) {
// Null list.
auto *op = storage.Create<InListOperator>(
storage.Create<PrimitiveLiteral>("x"),
storage.Create<PrimitiveLiteral>(TypedValue::Null));
storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
}
{
// Null literal.
auto *op = storage.Create<InListOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null), list_literal);
storage.Create<PrimitiveLiteral>(PropertyValue::Null), list_literal);
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
}
{
// Null literal, empty list.
auto *op = storage.Create<InListOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<ListLiteral>(std::vector<Expression *>()));
auto value = op->Accept(eval);
EXPECT_FALSE(value.ValueBool());
@ -345,7 +354,7 @@ TEST_F(ExpressionEvaluatorTest, ListIndexing) {
{
// Indexing with one operator being null.
auto *op = storage.Create<SubscriptOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(-2));
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
@ -390,7 +399,7 @@ TEST_F(ExpressionEvaluatorTest, MapIndexing) {
{
// Indexing with Null.
auto *op = storage.Create<SubscriptOperator>(
map_literal, storage.Create<PrimitiveLiteral>(TypedValue::Null));
map_literal, storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
}
@ -404,51 +413,51 @@ TEST_F(ExpressionEvaluatorTest, VertexAndEdgeIndexing) {
v1.PropsSet(prop, 42);
e11.PropsSet(prop, 43);
auto *vertex_literal = storage.Create<PrimitiveLiteral>(v1);
auto *edge_literal = storage.Create<PrimitiveLiteral>(e11);
auto *vertex_id = CreateIdentifierWithValue("v1", v1);
auto *edge_id = CreateIdentifierWithValue("e11", e11);
{
// Legal indexing.
auto *op1 = storage.Create<SubscriptOperator>(
vertex_literal, storage.Create<PrimitiveLiteral>("prop"));
vertex_id, storage.Create<PrimitiveLiteral>("prop"));
auto value1 = op1->Accept(eval);
EXPECT_EQ(value1.ValueInt(), 42);
auto *op2 = storage.Create<SubscriptOperator>(
edge_literal, storage.Create<PrimitiveLiteral>("prop"));
edge_id, storage.Create<PrimitiveLiteral>("prop"));
auto value2 = op2->Accept(eval);
EXPECT_EQ(value2.ValueInt(), 43);
}
{
// Legal indexing, non-existing key.
auto *op1 = storage.Create<SubscriptOperator>(
vertex_literal, storage.Create<PrimitiveLiteral>("blah"));
vertex_id, storage.Create<PrimitiveLiteral>("blah"));
auto value1 = op1->Accept(eval);
EXPECT_TRUE(value1.IsNull());
auto *op2 = storage.Create<SubscriptOperator>(
edge_literal, storage.Create<PrimitiveLiteral>("blah"));
edge_id, storage.Create<PrimitiveLiteral>("blah"));
auto value2 = op2->Accept(eval);
EXPECT_TRUE(value2.IsNull());
}
{
// Wrong key type.
auto *op1 = storage.Create<SubscriptOperator>(
vertex_literal, storage.Create<PrimitiveLiteral>(1));
vertex_id, storage.Create<PrimitiveLiteral>(1));
EXPECT_THROW(op1->Accept(eval), QueryRuntimeException);
auto *op2 = storage.Create<SubscriptOperator>(
edge_literal, storage.Create<PrimitiveLiteral>(1));
edge_id, storage.Create<PrimitiveLiteral>(1));
EXPECT_THROW(op2->Accept(eval), QueryRuntimeException);
}
{
// Indexing with Null.
auto *op1 = storage.Create<SubscriptOperator>(
vertex_literal, storage.Create<PrimitiveLiteral>(TypedValue::Null));
vertex_id, storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto value1 = op1->Accept(eval);
EXPECT_TRUE(value1.IsNull());
auto *op2 = storage.Create<SubscriptOperator>(
edge_literal, storage.Create<PrimitiveLiteral>(TypedValue::Null));
edge_id, storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto value2 = op2->Accept(eval);
EXPECT_TRUE(value2.IsNull());
}
@ -516,7 +525,7 @@ TEST_F(ExpressionEvaluatorTest, ListSlicingOperator) {
{
// Bound of illegal type and null value bound.
auto *op = storage.Create<ListSlicingOperator>(
list_literal, storage.Create<PrimitiveLiteral>(TypedValue::Null),
list_literal, storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>("mirko"));
EXPECT_THROW(op->Accept(eval), QueryRuntimeException);
}
@ -530,7 +539,7 @@ TEST_F(ExpressionEvaluatorTest, ListSlicingOperator) {
{
// Null value list with undefined upper bound.
auto *op = storage.Create<ListSlicingOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null),
storage.Create<PrimitiveLiteral>(PropertyValue::Null),
storage.Create<PrimitiveLiteral>(-2), nullptr);
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
@ -540,7 +549,7 @@ TEST_F(ExpressionEvaluatorTest, ListSlicingOperator) {
// Null value index.
auto *op = storage.Create<ListSlicingOperator>(
list_literal, storage.Create<PrimitiveLiteral>(-2),
storage.Create<PrimitiveLiteral>(TypedValue::Null));
storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto value = op->Accept(eval);
EXPECT_TRUE(value.IsNull());
;
@ -605,7 +614,7 @@ TEST_F(ExpressionEvaluatorTest, IsNullOperator) {
auto val1 = op->Accept(eval);
ASSERT_EQ(val1.ValueBool(), false);
op = storage.Create<IsNullOperator>(
storage.Create<PrimitiveLiteral>(TypedValue::Null));
storage.Create<PrimitiveLiteral>(PropertyValue::Null));
auto val2 = op->Accept(eval);
ASSERT_EQ(val2.ValueBool(), true);
}
@ -695,7 +704,7 @@ TEST_F(ExpressionEvaluatorTest, All) {
TEST_F(ExpressionEvaluatorTest, FunctionAllNullList) {
AstStorage storage;
auto *all = ALL("x", LITERAL(TypedValue::Null), WHERE(LITERAL(true)));
auto *all = ALL("x", LITERAL(PropertyValue::Null), WHERE(LITERAL(true)));
const auto x_sym = symbol_table.CreateSymbol("x", true);
symbol_table[*all->identifier_] = x_sym;
auto value = all->Accept(eval);
@ -738,7 +747,8 @@ TEST_F(ExpressionEvaluatorTest, FunctionSingle2) {
TEST_F(ExpressionEvaluatorTest, FunctionSingleNullList) {
AstStorage storage;
auto *single = SINGLE("x", LITERAL(TypedValue::Null), WHERE(LITERAL(true)));
auto *single =
SINGLE("x", LITERAL(PropertyValue::Null), WHERE(LITERAL(true)));
const auto x_sym = symbol_table.CreateSymbol("x", true);
symbol_table[*single->identifier_] = x_sym;
auto value = single->Accept(eval);
@ -766,7 +776,7 @@ TEST_F(ExpressionEvaluatorTest, FunctionExtract) {
AstStorage storage;
auto *ident_x = IDENT("x");
auto *extract =
EXTRACT("x", LIST(LITERAL(1), LITERAL(2), LITERAL(TypedValue::Null)),
EXTRACT("x", LIST(LITERAL(1), LITERAL(2), LITERAL(PropertyValue::Null)),
ADD(ident_x, LITERAL(1)));
const auto x_sym = symbol_table.CreateSymbol("x", true);
symbol_table[*extract->identifier_] = x_sym;
@ -784,7 +794,7 @@ TEST_F(ExpressionEvaluatorTest, FunctionExtractNull) {
AstStorage storage;
auto *ident_x = IDENT("x");
auto *extract =
EXTRACT("x", LITERAL(TypedValue::Null), ADD(ident_x, LITERAL(1)));
EXTRACT("x", LITERAL(PropertyValue::Null), ADD(ident_x, LITERAL(1)));
const auto x_sym = symbol_table.CreateSymbol("x", true);
symbol_table[*extract->identifier_] = x_sym;
symbol_table[*ident_x] = x_sym;
@ -853,8 +863,13 @@ class FunctionTest : public ExpressionEvaluatorTest {
TypedValue EvaluateFunction(const std::string &function_name,
const std::vector<TypedValue> &args) {
std::vector<Expression *> expressions;
for (const auto &arg : args) {
expressions.push_back(storage.Create<PrimitiveLiteral>(arg));
for (size_t i = 0; i < args.size(); ++i) {
auto *ident =
storage.Create<Identifier>("arg_" + std::to_string(i), true);
auto sym = symbol_table.CreateSymbol("arg_" + std::to_string(i), true);
symbol_table[*ident] = sym;
frame[sym] = args[i];
expressions.push_back(ident);
}
auto *op = storage.Create<Function>(function_name, expressions);
return op->Accept(eval);

View File

@ -294,9 +294,9 @@ TEST(QueryPlan, AggregateGroupByValues) {
auto dba_ptr = db.Access();
auto &dba = *dba_ptr;
// a vector of TypedValue to be set as property values on vertices
// a vector of PropertyValue to be set as property values on vertices
// most of them should result in a distinct group (commented where not)
std::vector<TypedValue> group_by_vals;
std::vector<PropertyValue> group_by_vals;
group_by_vals.emplace_back(4);
group_by_vals.emplace_back(7);
group_by_vals.emplace_back(7.3);
@ -306,14 +306,14 @@ TEST(QueryPlan, AggregateGroupByValues) {
group_by_vals.emplace_back("1");
group_by_vals.emplace_back(true);
group_by_vals.emplace_back(false);
group_by_vals.emplace_back(std::vector<TypedValue>{1});
group_by_vals.emplace_back(std::vector<TypedValue>{1, 2});
group_by_vals.emplace_back(std::vector<TypedValue>{2, 1});
group_by_vals.emplace_back(TypedValue::Null);
group_by_vals.emplace_back(std::vector<PropertyValue>{1});
group_by_vals.emplace_back(std::vector<PropertyValue>{1, 2});
group_by_vals.emplace_back(std::vector<PropertyValue>{2, 1});
group_by_vals.emplace_back(PropertyValue::Null);
// should NOT result in another group because 7.0 == 7
group_by_vals.emplace_back(7.0);
// should NOT result in another group
group_by_vals.emplace_back(std::vector<TypedValue>{1, 2.0});
group_by_vals.emplace_back(std::vector<PropertyValue>{1, 2.0});
// generate a lot of vertices and set props on them
auto prop = dba.Property("prop");
@ -587,9 +587,9 @@ TEST(QueryPlan, Unwind) {
SymbolTable symbol_table;
// UNWIND [ [1, true, "x"], [], ["bla"] ] AS x UNWIND x as y RETURN x, y
auto input_expr = storage.Create<PrimitiveLiteral>(std::vector<TypedValue>{
std::vector<TypedValue>{1, true, "x"}, std::vector<TypedValue>{},
std::vector<TypedValue>{"bla"}});
auto input_expr = storage.Create<PrimitiveLiteral>(std::vector<PropertyValue>{
std::vector<PropertyValue>{1, true, "x"}, std::vector<PropertyValue>{},
std::vector<PropertyValue>{"bla"}});
auto x = symbol_table.CreateSymbol("x", true);
auto unwind_0 = std::make_shared<plan::Unwind>(nullptr, input_expr, x);

View File

@ -113,8 +113,8 @@ TEST(QueryPlan, OrderBy) {
// contains a series of tests
// each test defines the ordering a vector of values in the desired order
auto Null = TypedValue::Null;
std::vector<std::pair<Ordering, std::vector<TypedValue>>> orderable{
auto Null = PropertyValue::Null;
std::vector<std::pair<Ordering, std::vector<PropertyValue>>> orderable{
{Ordering::ASC, {0, 0, 0.5, 1, 2, 12.6, 42, Null, Null}},
{Ordering::ASC, {false, false, true, true, Null, Null}},
{Ordering::ASC, {"A", "B", "a", "a", "aa", "ab", "aba", Null, Null}},
@ -132,7 +132,7 @@ TEST(QueryPlan, OrderBy) {
// take some effort to shuffle the values
// because we are testing that something not ordered gets ordered
// and need to take care it does not happen by accident
std::vector<TypedValue> shuffled(values.begin(), values.end());
std::vector<PropertyValue> shuffled(values.begin(), values.end());
auto order_equal = [&values, &shuffled]() {
return std::equal(values.begin(), values.end(), shuffled.begin(),
TypedValue::BoolEqual{});
@ -233,15 +233,15 @@ TEST(QueryPlan, OrderByExceptions) {
// a vector of pairs of typed values that should result
// in an exception when trying to order on them
std::vector<std::pair<TypedValue, TypedValue>> exception_pairs{
std::vector<std::pair<PropertyValue, PropertyValue>> exception_pairs{
{42, true},
{42, "bla"},
{42, std::vector<TypedValue>{42}},
{42, std::vector<PropertyValue>{42}},
{true, "bla"},
{true, std::vector<TypedValue>{true}},
{"bla", std::vector<TypedValue>{"bla"}},
{true, std::vector<PropertyValue>{true}},
{"bla", std::vector<PropertyValue>{"bla"}},
// illegal comparisons of same-type values
{std::vector<TypedValue>{42}, std::vector<TypedValue>{42}}};
{std::vector<PropertyValue>{42}, std::vector<PropertyValue>{42}}};
for (const auto &pair : exception_pairs) {
// empty database

View File

@ -16,6 +16,7 @@
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
@ -1058,15 +1059,20 @@ TEST_F(STShortestPathTest, ExpandLambda) {
std::tie(expression, length) = test;
auto source =
MakeUnwind(symbol_table, "s", nullptr, LIST(LITERAL(vertices[0])));
auto sink =
MakeUnwind(symbol_table, "t", source.op_, LIST(LITERAL(vertices[3])));
auto results =
ShortestPaths(sink.op_, source.sym_, sink.sym_,
EdgeAtom::Direction::BOTH, nullptr, nullptr,
ExpandVariable::Lambda{inner_edge_symbol,
inner_node_symbol, expression});
auto s = MakeScanAll(storage, symbol_table, "s");
auto source = std::make_shared<Filter>(
s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")),
LITERAL(0)));
auto t = MakeScanAll(storage, symbol_table, "t", source);
auto sink = std::make_shared<Filter>(
t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")),
LITERAL(3)));
auto results = ShortestPaths(
sink, s.sym_, t.sym_, EdgeAtom::Direction::BOTH, nullptr, nullptr,
ExpandVariable::Lambda{inner_edge_symbol, inner_node_symbol,
expression});
if (length == -1) {
EXPECT_EQ(results.size(), 0);
@ -1079,14 +1085,21 @@ TEST_F(STShortestPathTest, ExpandLambda) {
TEST_F(STShortestPathTest, OptionalMatch) {
for (int i = 0; i <= 2; ++i) {
auto source = MakeUnwind(
symbol_table, "s", nullptr,
LIST(i == 0 ? LITERAL(vertices[0]) : LITERAL(TypedValue::Null)));
auto sink = MakeUnwind(
symbol_table, "t", source.op_,
LIST(i == 1 ? LITERAL(vertices[3]) : LITERAL(TypedValue::Null)));
auto results = ShortestPaths(sink.op_, source.sym_, sink.sym_,
EdgeAtom::Direction::BOTH);
auto s = MakeScanAll(storage, symbol_table, "s");
auto source = std::make_shared<Filter>(
s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")),
LITERAL(i == 0 ? 0 : -1)));
auto source_opt = std::make_shared<Optional>(
std::make_shared<Once>(), source, std::vector<Symbol>{s.sym_});
auto t = MakeScanAll(storage, symbol_table, "t");
auto sink = std::make_shared<Filter>(
t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")),
LITERAL(i == 1 ? 3 : -1)));
auto sink_opt = std::make_shared<Optional>(source_opt, sink,
std::vector<Symbol>{t.sym_});
auto results =
ShortestPaths(sink_opt, s.sym_, t.sym_, EdgeAtom::Direction::BOTH);
EXPECT_EQ(results.size(), 0);
}
}
@ -1139,6 +1152,8 @@ class QueryPlanExpandBfs
Symbol inner_edge = symbol_table.CreateSymbol("inner_edge", true);
Symbol inner_node = symbol_table.CreateSymbol("inner_node", true);
int plan_id_{0};
void SetUp() {
for (auto p : iter::enumerate(vertices)) {
int id, worker;
@ -1168,36 +1183,48 @@ class QueryPlanExpandBfs
// Defines and performs a breadth-first expansion with the given parameters.
// Returns a vector of pairs. Each pair is (vector-of-edges, vertex).
auto ExpandBF(EdgeAtom::Direction direction, int min_depth, int max_depth,
Expression *where, const std::vector<TypedValue> &sources = {},
Expression *where, std::vector<int> source_ids = {},
std::experimental::optional<TypedValue> existing_node =
std::experimental::nullopt) {
auto source_sym = symbol_table.CreateSymbol("source", true);
// Wrap all sources in a list and unwind it.
std::vector<Expression *> source_literals;
for (auto &source : sources) {
source_literals.emplace_back(LITERAL(source));
// If source_ids are empty, we set ID to -1 so no nodes are matched (used
// for optional match test case).
if (source_ids.empty()) {
source_ids = std::vector<int>{-1};
}
auto source_expr = storage.Create<ListLiteral>(source_literals);
std::shared_ptr<LogicalOperator> last_op =
std::make_shared<query::plan::Unwind>(nullptr, source_expr, source_sym);
std::vector<PropertyValue> sources;
for (const auto id : source_ids) sources.emplace_back(id);
auto s = MakeScanAll(storage, symbol_table, "s");
std::shared_ptr<LogicalOperator> last_op = std::make_shared<Filter>(
s.op_, IN_LIST(PROPERTY_LOOKUP(s.node_->identifier_, prop.second),
LITERAL(sources)));
auto node_sym = symbol_table.CreateSymbol("node", true);
auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true);
if (GetParam().first == TestType::DISTRIBUTED) {
cluster_->master()->plan_dispatcher().DispatchPlan(plan_id_, last_op,
symbol_table);
last_op = std::make_shared<PullRemote>(last_op, plan_id_,
std::vector<Symbol>{s.sym_});
plan_id_++;
}
last_op = std::make_shared<Optional>(std::make_shared<Once>(), last_op,
std::vector<Symbol>{s.sym_});
if (GetParam().first == TestType::DISTRIBUTED) {
last_op = std::make_shared<DistributedExpandBfs>(
node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{},
last_op, source_sym, !!existing_node, GraphView::OLD,
LITERAL(min_depth), LITERAL(max_depth),
last_op, s.sym_, !!existing_node, GraphView::OLD, LITERAL(min_depth),
LITERAL(max_depth),
ExpandVariable::Lambda{inner_edge, inner_node, where});
} else {
last_op = std::make_shared<ExpandVariable>(
node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
std::vector<storage::EdgeType>{}, false, LITERAL(min_depth),
LITERAL(max_depth), last_op, source_sym,
static_cast<bool>(existing_node),
LITERAL(max_depth), last_op, s.sym_, !!existing_node,
ExpandVariable::Lambda{inner_edge, inner_node, where},
std::experimental::nullopt, std::experimental::nullopt,
GraphView::OLD);
@ -1251,8 +1278,7 @@ class QueryPlanExpandBfs
};
TEST_P(QueryPlanExpandBfs, Basic) {
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba)});
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0});
ASSERT_EQ(results.size(), 5);
@ -1282,8 +1308,7 @@ TEST_P(QueryPlanExpandBfs, Basic) {
TEST_P(QueryPlanExpandBfs, EdgeDirection) {
{
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr,
{VertexAccessor(v[4], dba)});
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, {4});
ASSERT_EQ(results.size(), 4);
if (GetProp(results[0].second) == 5) {
@ -1308,8 +1333,7 @@ TEST_P(QueryPlanExpandBfs, EdgeDirection) {
}
{
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr,
{VertexAccessor(v[4], dba)});
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, {4});
ASSERT_EQ(results.size(), 4);
if (GetProp(results[0].second) == 5) {
@ -1340,8 +1364,8 @@ TEST_P(QueryPlanExpandBfs, Where) {
{
symbol_table[*ident] = inner_node;
auto filter_expr = LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(4));
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr,
{VertexAccessor(v[0], dba)});
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0});
ASSERT_EQ(results.size(), 2);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 2);
@ -1350,8 +1374,8 @@ TEST_P(QueryPlanExpandBfs, Where) {
symbol_table[*ident] = inner_edge;
auto filter_expr = AND(LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(50)),
NEQ(PROPERTY_LOOKUP(ident, prop), LITERAL(12)));
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr,
{VertexAccessor(v[0], dba)});
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0});
ASSERT_EQ(results.size(), 4);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 4);
@ -1366,9 +1390,7 @@ TEST_P(QueryPlanExpandBfs, Where) {
}
TEST_P(QueryPlanExpandBfs, MultipleInputs) {
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba), VertexAccessor(v[3], dba)});
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0, 3});
// Expect that each vertex has been returned 2 times.
EXPECT_EQ(results.size(), 10);
std::vector<int> found(5, 0);
@ -1382,21 +1404,15 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
using testing::ElementsAre;
using testing::WhenSorted;
std::vector<TypedValue> sources;
for (int i = 0; i < 5; ++i) {
sources.push_back(VertexAccessor(v[i], dba));
}
{
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba)}, VertexAccessor(v[3], dba));
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0},
VertexAccessor(v[3], dba));
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(GetProp(results[0].second), 3);
}
{
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, sources,
VertexAccessor(v[5], dba));
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr,
{0, 1, 2, 3, 4}, VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
@ -1406,8 +1422,8 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
EXPECT_THAT(nodes, WhenSorted(ElementsAre(1, 2, 3, 4)));
}
{
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, sources,
VertexAccessor(v[5], dba));
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr,
{0, 1, 2, 3, 4}, VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
@ -1420,21 +1436,19 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
TEST_P(QueryPlanExpandBfs, OptionalMatch) {
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{TypedValue::Null});
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {});
EXPECT_EQ(results.size(), 0);
}
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba)}, TypedValue::Null);
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0},
TypedValue::Null);
EXPECT_EQ(results.size(), 0);
}
}
TEST_P(QueryPlanExpandBfs, ExpansionDepth) {
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr,
{VertexAccessor(v[0], dba)});
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr, {0});
EXPECT_EQ(results.size(), 3);
if (GetProp(results[0].second) == 4) {
std::swap(results[0], results[1]);
@ -2298,10 +2312,10 @@ TEST(QueryPlan, ScanAllByLabelProperty) {
auto prop = db.Access()->Property("prop");
// vertex property values that will be stored into the DB
// clang-format off
std::vector<TypedValue> values{
std::vector<PropertyValue> values{
true, false, "a", "b", "c", 0, 1, 2, 0.5, 1.5, 2.5,
std::vector<TypedValue>{0}, std::vector<TypedValue>{1},
std::vector<TypedValue>{2}};
std::vector<PropertyValue>{0}, std::vector<PropertyValue>{1},
std::vector<PropertyValue>{2}};
// clang-format on
{
auto dba = db.Access();

View File

@ -12,7 +12,6 @@
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "distributed/serialization.hpp"
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"