Handle dump database
query
Summary: Stream queries to the output table. For effective output streaming, a simple operator, `OutputTableStream` is implemented which fetches and produces a single row on each Pull. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2099
This commit is contained in:
parent
fa62ea1920
commit
9cc2224ac3
@ -162,11 +162,4 @@ bool CypherDumpGenerator::NextQuery(std::ostream *os) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void DumpToCypher(std::ostream *os, GraphDbAccessor *dba) {
|
||||
CHECK(os && dba);
|
||||
|
||||
CypherDumpGenerator dump(dba);
|
||||
while (dump.NextQuery(os)) continue;
|
||||
}
|
||||
|
||||
} // namespace database
|
||||
|
@ -17,6 +17,13 @@ class CypherDumpGenerator {
|
||||
public:
|
||||
explicit CypherDumpGenerator(GraphDbAccessor *dba);
|
||||
|
||||
CypherDumpGenerator(const CypherDumpGenerator &other) = delete;
|
||||
// NOLINTNEXTLINE(performance-noexcept-move-constructor)
|
||||
CypherDumpGenerator(CypherDumpGenerator &&other) = default;
|
||||
CypherDumpGenerator &operator=(const CypherDumpGenerator &other) = delete;
|
||||
CypherDumpGenerator &operator=(CypherDumpGenerator &&other) = delete;
|
||||
~CypherDumpGenerator() = default;
|
||||
|
||||
bool NextQuery(std::ostream *os);
|
||||
|
||||
private:
|
||||
@ -30,6 +37,13 @@ class CypherDumpGenerator {
|
||||
end_(container_.end()),
|
||||
empty_(current_ == end_) {}
|
||||
|
||||
ContainerState(const ContainerState &other) = delete;
|
||||
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
|
||||
ContainerState(ContainerState &&other) = default;
|
||||
ContainerState &operator=(const ContainerState &other) = delete;
|
||||
ContainerState &operator=(ContainerState &&other) = delete;
|
||||
~ContainerState() = default;
|
||||
|
||||
auto GetCurrentAndAdvance() {
|
||||
auto to_be_returned = current_;
|
||||
if (current_ != end_) ++current_;
|
||||
@ -64,10 +78,4 @@ class CypherDumpGenerator {
|
||||
std::optional<ContainerState<decltype(dba_->Edges(false))>> edges_state_;
|
||||
};
|
||||
|
||||
/// Dumps database state to output stream as openCypher queries.
|
||||
///
|
||||
/// Currently, it only dumps vertices and edges of the graph. In the future,
|
||||
/// it should also dump indexes, constraints, roles, etc.
|
||||
void DumpToCypher(std::ostream *os, GraphDbAccessor *dba);
|
||||
|
||||
} // namespace database
|
||||
|
@ -5,6 +5,9 @@
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#ifdef MG_SINGLE_NODE
|
||||
#include "database/single_node/dump.hpp"
|
||||
#endif
|
||||
#include "glue/auth.hpp"
|
||||
#include "glue/communication.hpp"
|
||||
#include "integrations/kafka/exceptions.hpp"
|
||||
@ -34,6 +37,40 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
|
||||
|
||||
namespace query {
|
||||
|
||||
#ifdef MG_SINGLE_NODE
|
||||
namespace {
|
||||
|
||||
class DumpClosure final {
|
||||
public:
|
||||
explicit DumpClosure(database::GraphDbAccessor *dba) : dump_generator_(dba) {}
|
||||
|
||||
// Please note that this copy constructor actually moves the other object. We
|
||||
// want this because lambdas are not movable, i.e. its move constructor
|
||||
// actually copies the lambda.
|
||||
DumpClosure(const DumpClosure &other)
|
||||
: dump_generator_(std::move(other.dump_generator_)) {}
|
||||
|
||||
DumpClosure(DumpClosure &&other) = default;
|
||||
DumpClosure &operator=(const DumpClosure &other) = delete;
|
||||
DumpClosure &operator=(DumpClosure &&other) = delete;
|
||||
~DumpClosure() {}
|
||||
|
||||
std::optional<std::vector<TypedValue>> operator()(Frame *frame,
|
||||
ExecutionContext *context) {
|
||||
std::ostringstream oss;
|
||||
if (dump_generator_.NextQuery(&oss)) {
|
||||
return std::make_optional(std::vector<TypedValue>{oss.str()});
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
private:
|
||||
mutable database::CypherDumpGenerator dump_generator_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
#endif
|
||||
|
||||
class SingleNodeLogicalPlan final : public LogicalPlan {
|
||||
public:
|
||||
SingleNodeLogicalPlan(std::unique_ptr<plan::LogicalOperator> root,
|
||||
@ -728,10 +765,6 @@ Callback HandleConstraintQuery(ConstraintQuery *constraint_query,
|
||||
#endif
|
||||
}
|
||||
|
||||
Callback HandleDumpQuery(database::GraphDbAccessor *db_accessor) {
|
||||
throw utils::NotYetImplemented("Dump database");
|
||||
}
|
||||
|
||||
Interpreter::Interpreter() : is_tsc_available_(utils::CheckAvailableTSC()) {}
|
||||
|
||||
Interpreter::Results Interpreter::operator()(
|
||||
@ -950,6 +983,32 @@ Interpreter::Results Interpreter::operator()(
|
||||
/* is_profile_query */ true, /* should_abort_query */ true);
|
||||
}
|
||||
|
||||
if (auto *dump_query = utils::Downcast<DumpQuery>(parsed_query.query)) {
|
||||
#ifdef MG_SINGLE_NODE
|
||||
database::CypherDumpGenerator dump(&db_accessor);
|
||||
|
||||
SymbolTable symbol_table;
|
||||
auto query_symbol = symbol_table.CreateSymbol("QUERY", false);
|
||||
|
||||
std::vector<Symbol> output_symbols = {query_symbol};
|
||||
std::vector<std::string> header = {query_symbol.name()};
|
||||
|
||||
auto output_plan = std::make_unique<plan::OutputTableStream>(
|
||||
output_symbols, DumpClosure(&db_accessor));
|
||||
plan = std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
|
||||
std::move(output_plan), 0.0, AstStorage{}, symbol_table));
|
||||
|
||||
summary["planning_time"] = planning_timer.Elapsed().count();
|
||||
|
||||
return Results(&db_accessor, parameters, plan, output_symbols, header,
|
||||
summary, parsed_query.required_privileges,
|
||||
/* is_profile_query */ false,
|
||||
/* should_abort_query */ false);
|
||||
#else
|
||||
throw utils::NotYetImplemented("Dump database");
|
||||
#endif
|
||||
}
|
||||
|
||||
Callback callback;
|
||||
if (auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query)) {
|
||||
if (in_explicit_transaction) {
|
||||
@ -994,9 +1053,6 @@ Interpreter::Results Interpreter::operator()(
|
||||
} else if (auto *constraint_query =
|
||||
utils::Downcast<ConstraintQuery>(parsed_query.query)) {
|
||||
callback = HandleConstraintQuery(constraint_query, &db_accessor);
|
||||
} else if (auto *dump_query =
|
||||
utils::Downcast<DumpQuery>(parsed_query.query)) {
|
||||
callback = HandleDumpQuery(&db_accessor);
|
||||
} else {
|
||||
LOG(FATAL) << "Should not get here -- unknown query type!";
|
||||
}
|
||||
|
@ -3502,4 +3502,50 @@ UniqueCursorPtr OutputTable::MakeCursor(database::GraphDbAccessor *,
|
||||
return MakeUniqueCursorPtr<OutputTableCursor>(mem, *this);
|
||||
}
|
||||
|
||||
OutputTableStream::OutputTableStream(
|
||||
std::vector<Symbol> output_symbols,
|
||||
std::function<std::optional<std::vector<TypedValue>>(Frame *,
|
||||
ExecutionContext *)>
|
||||
callback)
|
||||
: output_symbols_(std::move(output_symbols)),
|
||||
callback_(std::move(callback)) {}
|
||||
|
||||
WITHOUT_SINGLE_INPUT(OutputTableStream);
|
||||
|
||||
class OutputTableStreamCursor : public Cursor {
|
||||
public:
|
||||
explicit OutputTableStreamCursor(const OutputTableStream *self)
|
||||
: self_(self) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
const auto row = self_->callback_(&frame, &context);
|
||||
if (row) {
|
||||
CHECK(row->size() == self_->output_symbols_.size())
|
||||
<< "Wrong number of columns in row!";
|
||||
for (size_t i = 0; i < self_->output_symbols_.size(); ++i) {
|
||||
frame[self_->output_symbols_[i]] = row->at(i);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO(tsabolcec): Come up with better approach for handling `Reset()`.
|
||||
// One possibility is to implement a custom closure utility class with
|
||||
// `Reset()` method.
|
||||
void Reset() override {
|
||||
throw utils::NotYetImplemented("OutputTableStreamCursor::Reset");
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
private:
|
||||
const OutputTableStream *self_;
|
||||
};
|
||||
|
||||
UniqueCursorPtr OutputTableStream::MakeCursor(
|
||||
database::GraphDbAccessor *, utils::MemoryResource *mem) const {
|
||||
return MakeUniqueCursorPtr<OutputTableStreamCursor>(mem, this);
|
||||
}
|
||||
|
||||
} // namespace query::plan
|
||||
|
@ -2102,5 +2102,40 @@ of symbols used by each of the inputs.")
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:define-class output-table-stream (logical-operator)
|
||||
((output-symbols "std::vector<Symbol>" :scope :public :dont-save t)
|
||||
(callback "std::function<std::optional<std::vector<TypedValue>>(Frame *, ExecutionContext *)>"
|
||||
:scope :public :dont-save t :clone :copy))
|
||||
(:documentation "An operator that outputs a table, producing a single row on each pull.
|
||||
This class is different from @c OutputTable in that its callback doesn't fetch all rows
|
||||
at once. Instead, each call of the callback should return a single row of the table.")
|
||||
(:public
|
||||
#>cpp
|
||||
OutputTableStream() {}
|
||||
OutputTableStream(
|
||||
std::vector<Symbol> output_symbols,
|
||||
std::function<std::optional<std::vector<TypedValue>>(Frame *, ExecutionContext *)>
|
||||
callback);
|
||||
|
||||
bool Accept(HierarchicalLogicalOperatorVisitor &) override {
|
||||
LOG(FATAL) << "OutputTableStream operator should not be visited!";
|
||||
}
|
||||
|
||||
UniqueCursorPtr MakeCursor(
|
||||
database::GraphDbAccessor *, utils::MemoryResource *) const override;
|
||||
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override {
|
||||
return output_symbols_;
|
||||
}
|
||||
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
|
||||
return output_symbols_;
|
||||
}
|
||||
|
||||
bool HasSingleInput() const override;
|
||||
std::shared_ptr<LogicalOperator> input() const override;
|
||||
void set_input(std::shared_ptr<LogicalOperator> input) override;
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:pop-namespace) ;; plan
|
||||
(lcp:pop-namespace) ;; query
|
||||
|
@ -105,13 +105,6 @@ std::string DumpNext(CypherDumpGenerator *dump) {
|
||||
|
||||
class DatabaseEnvironment {
|
||||
public:
|
||||
std::string DumpStr() {
|
||||
auto dba = db_.Access();
|
||||
std::ostringstream oss;
|
||||
database::DumpToCypher(&oss, &dba);
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
GraphDbAccessor Access() { return db_.Access(); }
|
||||
|
||||
DatabaseState GetState() {
|
||||
@ -494,3 +487,28 @@ TEST(DumpTest, CheckStateSimpleGraph) {
|
||||
// Make sure that dump function doesn't make changes on the database.
|
||||
EXPECT_EQ(db.GetState(), db_initial_state);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST(DumpTest, ExecuteDumpDatabase) {
|
||||
DatabaseEnvironment db;
|
||||
{
|
||||
auto dba = db.Access();
|
||||
CreateVertex(&dba, {}, {}, false);
|
||||
dba.Commit();
|
||||
}
|
||||
|
||||
{
|
||||
auto dba = db.Access();
|
||||
const std::string query = "DUMP DATABASE";
|
||||
ResultStreamFaker<query::TypedValue> stream;
|
||||
auto results = query::Interpreter()(query, dba, {}, false);
|
||||
|
||||
stream.Header(results.header());
|
||||
results.PullAll(stream);
|
||||
stream.Summary(results.summary());
|
||||
|
||||
EXPECT_EQ(stream.GetResults().size(), 4U);
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "QUERY");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user