Make ResultStreamFaker
a normal class
Summary: Store accumulated results as `communication::bolt::Value`s instead of `TypedValue`s. Add additional overloads for `Result` and `Summary` which accept `TypedValue`s but internally perform conversions. Reviewers: teon.banek, mferencevic Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2514
This commit is contained in:
parent
13ba9cc23e
commit
b0cc564f8d
@ -5,6 +5,9 @@
|
|||||||
#include "glog/logging.h"
|
#include "glog/logging.h"
|
||||||
|
|
||||||
#include "communication/bolt/v1/value.hpp"
|
#include "communication/bolt/v1/value.hpp"
|
||||||
|
#include "glue/communication.hpp"
|
||||||
|
#include "query/typed_value.hpp"
|
||||||
|
#include "storage/v2/view.hpp"
|
||||||
#include "utils/algorithm.hpp"
|
#include "utils/algorithm.hpp"
|
||||||
|
|
||||||
// TODO: Why is this here?! It's only used in tests and query/repl.cpp
|
// TODO: Why is this here?! It's only used in tests and query/repl.cpp
|
||||||
@ -15,7 +18,6 @@
|
|||||||
* sent to it in an acceptable order, and tracks
|
* sent to it in an acceptable order, and tracks
|
||||||
* the content of those messages.
|
* the content of those messages.
|
||||||
*/
|
*/
|
||||||
template <class TResultValue = communication::bolt::Value>
|
|
||||||
class ResultStreamFaker {
|
class ResultStreamFaker {
|
||||||
public:
|
public:
|
||||||
ResultStreamFaker() = default;
|
ResultStreamFaker() = default;
|
||||||
@ -26,14 +28,33 @@ class ResultStreamFaker {
|
|||||||
|
|
||||||
void Header(const std::vector<std::string> &fields) { header_ = fields; }
|
void Header(const std::vector<std::string> &fields) { header_ = fields; }
|
||||||
|
|
||||||
void Result(const std::vector<TResultValue> &values) {
|
void Result(const std::vector<communication::bolt::Value> &values) {
|
||||||
results_.push_back(values);
|
results_.push_back(values);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Summary(const std::map<std::string, TResultValue> &summary) {
|
void Result(const std::vector<query::TypedValue> &values) {
|
||||||
|
std::vector<communication::bolt::Value> bvalues;
|
||||||
|
bvalues.reserve(values.size());
|
||||||
|
for (const auto &value : values) {
|
||||||
|
bvalues.push_back(glue::ToBoltValue(value, storage::View::NEW));
|
||||||
|
}
|
||||||
|
results_.push_back(std::move(bvalues));
|
||||||
|
}
|
||||||
|
|
||||||
|
void Summary(
|
||||||
|
const std::map<std::string, communication::bolt::Value> &summary) {
|
||||||
summary_ = summary;
|
summary_ = summary;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Summary(const std::map<std::string, query::TypedValue> &summary) {
|
||||||
|
std::map<std::string, communication::bolt::Value> bsummary;
|
||||||
|
for (const auto &item : summary) {
|
||||||
|
bsummary.insert(
|
||||||
|
{item.first, glue::ToBoltValue(item.second, storage::View::NEW)});
|
||||||
|
}
|
||||||
|
summary_ = std::move(bsummary);
|
||||||
|
}
|
||||||
|
|
||||||
const auto &GetHeader() const { return header_; }
|
const auto &GetHeader() const { return header_; }
|
||||||
|
|
||||||
const auto &GetResults() const { return results_; }
|
const auto &GetResults() const { return results_; }
|
||||||
@ -110,6 +131,6 @@ class ResultStreamFaker {
|
|||||||
private:
|
private:
|
||||||
// the data that the record stream can accept
|
// the data that the record stream can accept
|
||||||
std::vector<std::string> header_;
|
std::vector<std::string> header_;
|
||||||
std::vector<std::vector<TResultValue>> results_;
|
std::vector<std::vector<communication::bolt::Value>> results_;
|
||||||
std::map<std::string, TResultValue> summary_;
|
std::map<std::string, communication::bolt::Value> summary_;
|
||||||
};
|
};
|
||||||
|
@ -61,7 +61,7 @@ void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) {
|
|||||||
|
|
||||||
// regular cypher queries
|
// regular cypher queries
|
||||||
try {
|
try {
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
ResultStreamFaker stream;
|
||||||
auto [header, _] = interpreter->Interpret(command, {});
|
auto [header, _] = interpreter->Interpret(command, {});
|
||||||
stream.Header(header);
|
stream.Header(header);
|
||||||
auto summary = interpreter->PullAll(&stream);
|
auto summary = interpreter->PullAll(&stream);
|
||||||
|
@ -49,7 +49,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
|
|||||||
auto query = "MATCH (s:Starting) return s";
|
auto query = "MATCH (s:Starting) return s";
|
||||||
|
|
||||||
while (state.KeepRunning()) {
|
while (state.KeepRunning()) {
|
||||||
ResultStreamFaker<query::TypedValue> results;
|
ResultStreamFaker results;
|
||||||
interpreter().Interpret(query, {});
|
interpreter().Interpret(query, {});
|
||||||
interpreter().PullAll(&results);
|
interpreter().PullAll(&results);
|
||||||
}
|
}
|
||||||
@ -64,7 +64,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
|
|||||||
auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)";
|
auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)";
|
||||||
|
|
||||||
while (state.KeepRunning()) {
|
while (state.KeepRunning()) {
|
||||||
ResultStreamFaker<query::TypedValue> results;
|
ResultStreamFaker results;
|
||||||
interpreter().Interpret(query, {});
|
interpreter().Interpret(query, {});
|
||||||
interpreter().PullAll(&results);
|
interpreter().PullAll(&results);
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ static void Distinct(benchmark::State &state) {
|
|||||||
cypher_query, &execution_dba);
|
cypher_query, &execution_dba);
|
||||||
auto plan_and_cost =
|
auto plan_and_cost =
|
||||||
query::plan::MakeLogicalPlan(&context, parameters, false);
|
query::plan::MakeLogicalPlan(&context, parameters, false);
|
||||||
ResultStreamFaker<query::TypedValue> results;
|
ResultStreamFaker results;
|
||||||
// We need to only set the memory for temporary (per pull) evaluations
|
// We need to only set the memory for temporary (per pull) evaluations
|
||||||
TMemory per_pull_memory;
|
TMemory per_pull_memory;
|
||||||
query::EvaluationContext evaluation_context{per_pull_memory.get()};
|
query::EvaluationContext evaluation_context{per_pull_memory.get()};
|
||||||
|
@ -16,7 +16,7 @@ int main(int argc, char *argv[]) {
|
|||||||
query::InterpreterContext interpreter_context{&db};
|
query::InterpreterContext interpreter_context{&db};
|
||||||
query::Interpreter interpreter{&interpreter_context};
|
query::Interpreter interpreter{&interpreter_context};
|
||||||
|
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
ResultStreamFaker stream;
|
||||||
auto [header, _] = interpreter.Interpret(argv[1], {});
|
auto [header, _] = interpreter.Interpret(argv[1], {});
|
||||||
stream.Header(header);
|
stream.Header(header);
|
||||||
auto summary = interpreter.PullAll(&stream);
|
auto summary = interpreter.PullAll(&stream);
|
||||||
|
@ -186,7 +186,7 @@ class DatabaseEnvironment {
|
|||||||
* Return the query stream.
|
* Return the query stream.
|
||||||
*/
|
*/
|
||||||
auto Execute(const std::string &query) {
|
auto Execute(const std::string &query) {
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
ResultStreamFaker stream;
|
||||||
|
|
||||||
auto [header, _] = interpreter_.Interpret(query, {});
|
auto [header, _] = interpreter_.Interpret(query, {});
|
||||||
stream.Header(header);
|
stream.Header(header);
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
|
||||||
|
#include "communication/bolt/v1/value.hpp"
|
||||||
#include "communication/result_stream_faker.hpp"
|
#include "communication/result_stream_faker.hpp"
|
||||||
#include "database/single_node/graph_db_accessor.hpp"
|
#include "database/single_node/graph_db_accessor.hpp"
|
||||||
|
#include "glue/communication.hpp"
|
||||||
#include "gmock/gmock.h"
|
#include "gmock/gmock.h"
|
||||||
#include "gtest/gtest.h"
|
#include "gtest/gtest.h"
|
||||||
#include "query/exceptions.hpp"
|
#include "query/exceptions.hpp"
|
||||||
@ -9,6 +11,18 @@
|
|||||||
#include "query/typed_value.hpp"
|
#include "query/typed_value.hpp"
|
||||||
#include "query_common.hpp"
|
#include "query_common.hpp"
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
auto ToEdgeList(const communication::bolt::Value &v) {
|
||||||
|
std::vector<communication::bolt::Edge> list;
|
||||||
|
for (auto x : v.ValueList()) {
|
||||||
|
list.push_back(x.ValueEdge());
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
// TODO: This is not a unit test, but tests/integration dir is chaotic at the
|
// TODO: This is not a unit test, but tests/integration dir is chaotic at the
|
||||||
// moment. After tests refactoring is done, move/rename this.
|
// moment. After tests refactoring is done, move/rename this.
|
||||||
|
|
||||||
@ -25,7 +39,7 @@ class InterpreterTest : public ::testing::Test {
|
|||||||
*/
|
*/
|
||||||
auto Interpret(const std::string &query,
|
auto Interpret(const std::string &query,
|
||||||
const std::map<std::string, PropertyValue> ¶ms = {}) {
|
const std::map<std::string, PropertyValue> ¶ms = {}) {
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
ResultStreamFaker stream;
|
||||||
|
|
||||||
auto [header, _] = interpreter_.Interpret(query, params);
|
auto [header, _] = interpreter_.Interpret(query, params);
|
||||||
stream.Header(header);
|
stream.Header(header);
|
||||||
@ -132,7 +146,8 @@ TEST_F(InterpreterTest, Parameters) {
|
|||||||
PropertyValue(5), PropertyValue(2), PropertyValue(3)})}});
|
PropertyValue(5), PropertyValue(2), PropertyValue(3)})}});
|
||||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||||
auto result = query::test_common::ToIntList(stream.GetResults()[0][0]);
|
auto result = query::test_common::ToIntList(
|
||||||
|
glue::ToTypedValue(stream.GetResults()[0][0]));
|
||||||
ASSERT_THAT(result, testing::ElementsAre(5, 2, 3));
|
ASSERT_THAT(result, testing::ElementsAre(5, 2, 3));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@ -214,10 +229,12 @@ TEST_F(InterpreterTest, Bfs) {
|
|||||||
|
|
||||||
auto stream = Interpret(
|
auto stream = Interpret(
|
||||||
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
|
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
|
||||||
"e.reachable)]->(m) RETURN r");
|
"e.reachable)]->(m) RETURN n, r, m");
|
||||||
|
|
||||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
ASSERT_EQ(stream.GetHeader().size(), 3U);
|
||||||
EXPECT_EQ(stream.GetHeader()[0], "r");
|
EXPECT_EQ(stream.GetHeader()[0], "n");
|
||||||
|
EXPECT_EQ(stream.GetHeader()[1], "r");
|
||||||
|
EXPECT_EQ(stream.GetHeader()[2], "m");
|
||||||
ASSERT_EQ(stream.GetResults().size(), 5 * kNumNodesPerLevel);
|
ASSERT_EQ(stream.GetResults().size(), 5 * kNumNodesPerLevel);
|
||||||
|
|
||||||
auto dba = db_.Access();
|
auto dba = db_.Access();
|
||||||
@ -226,18 +243,22 @@ TEST_F(InterpreterTest, Bfs) {
|
|||||||
std::unordered_set<int64_t> matched_ids;
|
std::unordered_set<int64_t> matched_ids;
|
||||||
|
|
||||||
for (const auto &result : stream.GetResults()) {
|
for (const auto &result : stream.GetResults()) {
|
||||||
const auto &edges = query::test_common::ToEdgeList(result[0]);
|
const auto &begin = result[0].ValueVertex();
|
||||||
|
const auto &edges = ToEdgeList(result[1]);
|
||||||
|
const auto &end = result[2].ValueVertex();
|
||||||
|
|
||||||
// Check that path is of expected length. Returned paths should be from
|
// Check that path is of expected length. Returned paths should be from
|
||||||
// shorter to longer ones.
|
// shorter to longer ones.
|
||||||
EXPECT_EQ(edges.size(), expected_level);
|
EXPECT_EQ(edges.size(), expected_level);
|
||||||
// Check that starting node is correct.
|
// Check that starting node is correct.
|
||||||
EXPECT_EQ(edges[0].impl_.from().PropsAt(dba.Property(kId)).ValueInt(), 0);
|
EXPECT_EQ(edges.front().from, begin.id);
|
||||||
|
EXPECT_EQ(begin.properties.at(kId).ValueInt(), 0);
|
||||||
for (int i = 1; i < static_cast<int>(edges.size()); ++i) {
|
for (int i = 1; i < static_cast<int>(edges.size()); ++i) {
|
||||||
// Check that edges form a connected path.
|
// Check that edges form a connected path.
|
||||||
EXPECT_EQ(edges[i - 1].To(), edges[i].From());
|
EXPECT_EQ(edges[i - 1].to.AsInt(), edges[i].from.AsInt());
|
||||||
}
|
}
|
||||||
auto matched_id =
|
auto matched_id = end.properties.at(kId).ValueInt();
|
||||||
edges.back().impl_.to().PropsAt(dba.Property(kId)).ValueInt();
|
EXPECT_EQ(edges.back().to, end.id);
|
||||||
// Check that we didn't match that node already.
|
// Check that we didn't match that node already.
|
||||||
EXPECT_TRUE(matched_ids.insert(matched_id).second);
|
EXPECT_TRUE(matched_ids.insert(matched_id).second);
|
||||||
// Check that shortest path was found.
|
// Check that shortest path was found.
|
||||||
@ -251,7 +272,6 @@ TEST_F(InterpreterTest, Bfs) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
|
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
|
||||||
interpreter_.Interpret("BEGIN", {});
|
interpreter_.Interpret("BEGIN", {});
|
||||||
ASSERT_THROW(interpreter_.Interpret("CREATE INDEX ON :X(y)", {}),
|
ASSERT_THROW(interpreter_.Interpret("CREATE INDEX ON :X(y)", {}),
|
||||||
query::IndexInMulticommandTxException);
|
query::IndexInMulticommandTxException);
|
||||||
@ -276,11 +296,13 @@ TEST_F(InterpreterTest, ShortestPath) {
|
|||||||
{"r1"}, {"r2"}, {"r1", "r2"}};
|
{"r1"}, {"r2"}, {"r1", "r2"}};
|
||||||
|
|
||||||
for (const auto &result : stream.GetResults()) {
|
for (const auto &result : stream.GetResults()) {
|
||||||
const auto &edges = query::test_common::ToEdgeList(result[0]);
|
const auto &edges = ToEdgeList(result[0]);
|
||||||
|
|
||||||
std::vector<std::string> datum;
|
std::vector<std::string> datum;
|
||||||
|
datum.reserve(edges.size());
|
||||||
|
|
||||||
for (const auto &edge : edges) {
|
for (const auto &edge : edges) {
|
||||||
datum.push_back(dba.EdgeTypeName(edge.EdgeType()));
|
datum.push_back(edge.type);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool any_match = false;
|
bool any_match = false;
|
||||||
|
@ -47,14 +47,6 @@ auto ToIntList(const TypedValue &t) {
|
|||||||
return list;
|
return list;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto ToEdgeList(const TypedValue &t) {
|
|
||||||
std::vector<EdgeAccessor> list;
|
|
||||||
for (auto x : t.ValueList()) {
|
|
||||||
list.push_back(x.ValueEdge());
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto ToIntMap(const TypedValue &t) {
|
auto ToIntMap(const TypedValue &t) {
|
||||||
std::map<std::string, int64_t> map;
|
std::map<std::string, int64_t> map;
|
||||||
for (const auto &kv : t.ValueMap())
|
for (const auto &kv : t.ValueMap())
|
||||||
|
@ -38,7 +38,7 @@ class QueryExecution : public testing::Test {
|
|||||||
* Return the query results.
|
* Return the query results.
|
||||||
*/
|
*/
|
||||||
auto Execute(const std::string &query) {
|
auto Execute(const std::string &query) {
|
||||||
ResultStreamFaker<query::TypedValue> stream;
|
ResultStreamFaker stream;
|
||||||
|
|
||||||
auto [header, _] = interpreter_->Interpret(query, {});
|
auto [header, _] = interpreter_->Interpret(query, {});
|
||||||
stream.Header(header);
|
stream.Header(header);
|
||||||
|
Loading…
Reference in New Issue
Block a user