Produce logical op can now be streamed out by query/entry.hpp. Streaming, header and output functions removed from LogicalOperator

Summary: See above.

Reviewers: teon.banek, mislav.bradac, buda

Reviewed By: teon.banek, mislav.bradac, buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D117
This commit is contained in:
florijan 2017-03-14 09:38:20 +01:00
parent de9e4991ac
commit 2e0cc13813
2 changed files with 53 additions and 47 deletions

View File

@ -1,15 +1,38 @@
#pragma once
#include "query/frontend/interpret/interpret.hpp"
#include "query/frontend/logical/planner.hpp"
#include "query/context.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/context.hpp"
#include "query/frontend/interpret/interpret.hpp"
#include "query/frontend/logical/planner.hpp"
#include "query/frontend/opencypher/parser.hpp"
#include "query/frontend/typecheck/typecheck.hpp"
namespace query {
/**
* A Stream implementation that writes out to the
* console (for testing and debugging only).
*/
// TODO move somewhere to /test/manual or so
class ConsoleResultStream : public Loggable {
public:
ConsoleResultStream() : Loggable("ConsoleResultStream") {}
void Header(const std::vector<std::string> &) { logger.info("header"); }
void Result(std::vector<TypedValue> &values) {
for (auto value : values) {
auto va = value.Value<VertexAccessor>();
logger.info(" {}", va.labels().size());
}
}
void Summary(const std::map<std::string, TypedValue> &) {
logger.info("summary");
}
};
template <typename Stream>
class Engine {
public:
@ -38,18 +61,34 @@ class Engine {
// generate frame based on symbol table max_position
Frame frame(symbol_table.max_position());
// interpret
auto cursor = logical_plan->MakeCursor(db_accessor);
logical_plan->WriteHeader(stream);
auto symbols = logical_plan->OutputSymbols(symbol_table);
while (cursor->pull(frame, symbol_table)) {
std::vector<TypedValue> values;
for (auto symbol : symbols) {
values.emplace_back(frame[symbol.position_]);
auto *produce = dynamic_cast<Produce*>(logical_plan.get());
if (produce) {
// top level node in the operator tree is a produce (return)
// so stream out results
// generate header
std::vector<std::string> header;
for (auto named_expression : produce->named_expressions())
header.push_back(named_expression->name_);
stream.Header(header);
// collect the symbols from the return clause
std::vector<Symbol> symbols;
for (auto named_expression : produce->named_expressions())
symbols.emplace_back(symbol_table[*named_expression]);
// stream out results
auto cursor = produce->MakeCursor(db_accessor);
while (cursor->Pull(frame, symbol_table)) {
std::vector<TypedValue> values;
for (auto &symbol : symbols)
values.emplace_back(frame[symbol.position_]);
stream.Result(values);
}
stream.Result(values);
stream.Summary({{std::string("type"), TypedValue("r")}});
}
stream.Summary({{std::string("type"), TypedValue("r")}});
}
};
}

View File

@ -11,24 +11,6 @@
namespace query {
class ConsoleResultStream : public Loggable {
public:
ConsoleResultStream() : Loggable("ConsoleResultStream") {}
void Header(const std::vector<std::string>&) { logger.info("header"); }
void Result(std::vector<TypedValue>& values) {
for (auto value : values) {
auto va = value.Value<VertexAccessor>();
logger.info(" {}", va.labels().size());
}
}
void Summary(const std::map<std::string, TypedValue>&) {
logger.info("summary");
}
};
class Cursor {
public:
virtual bool Pull(Frame&, SymbolTable&) = 0;
@ -39,10 +21,6 @@ class LogicalOperator {
public:
auto children() { return children_; };
virtual std::unique_ptr<Cursor> MakeCursor(GraphDbAccessor& db) = 0;
virtual void WriteHeader(ConsoleResultStream&) {}
virtual std::vector<Symbol> OutputSymbols(SymbolTable& symbol_table) {
return {};
}
virtual ~LogicalOperator() {}
protected:
@ -106,22 +84,11 @@ class Produce : public LogicalOperator {
children_.emplace_back(input);
}
void WriteHeader(ConsoleResultStream& stream) override {
// TODO: write real result
stream.Header({"n"});
}
std::unique_ptr<Cursor> MakeCursor(GraphDbAccessor& db) override {
return std::make_unique<ProduceCursor>(*this, db);
}
std::vector<Symbol> OutputSymbols(SymbolTable& symbol_table) override {
std::vector<Symbol> result;
for (auto named_expr : named_expressions_) {
result.emplace_back(symbol_table[*named_expr]);
}
return result;
}
const auto& named_expressions() { return named_expressions_; }
private:
class ProduceCursor : public Cursor {