diff --git a/src/database/single_node/dump.cpp b/src/database/single_node/dump.cpp index 0ac8ad038..582b151b4 100644 --- a/src/database/single_node/dump.cpp +++ b/src/database/single_node/dump.cpp @@ -162,11 +162,4 @@ bool CypherDumpGenerator::NextQuery(std::ostream *os) { return false; } -void DumpToCypher(std::ostream *os, GraphDbAccessor *dba) { - CHECK(os && dba); - - CypherDumpGenerator dump(dba); - while (dump.NextQuery(os)) continue; -} - } // namespace database diff --git a/src/database/single_node/dump.hpp b/src/database/single_node/dump.hpp index 40e94bb12..c27e377d4 100644 --- a/src/database/single_node/dump.hpp +++ b/src/database/single_node/dump.hpp @@ -17,6 +17,13 @@ class CypherDumpGenerator { public: explicit CypherDumpGenerator(GraphDbAccessor *dba); + CypherDumpGenerator(const CypherDumpGenerator &other) = delete; + // NOLINTNEXTLINE(performance-noexcept-move-constructor) + CypherDumpGenerator(CypherDumpGenerator &&other) = default; + CypherDumpGenerator &operator=(const CypherDumpGenerator &other) = delete; + CypherDumpGenerator &operator=(CypherDumpGenerator &&other) = delete; + ~CypherDumpGenerator() = default; + bool NextQuery(std::ostream *os); private: @@ -30,6 +37,13 @@ class CypherDumpGenerator { end_(container_.end()), empty_(current_ == end_) {} + ContainerState(const ContainerState &other) = delete; + // NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor) + ContainerState(ContainerState &&other) = default; + ContainerState &operator=(const ContainerState &other) = delete; + ContainerState &operator=(ContainerState &&other) = delete; + ~ContainerState() = default; + auto GetCurrentAndAdvance() { auto to_be_returned = current_; if (current_ != end_) ++current_; @@ -64,10 +78,4 @@ class CypherDumpGenerator { std::optionalEdges(false))>> edges_state_; }; -/// Dumps database state to output stream as openCypher queries. -/// -/// Currently, it only dumps vertices and edges of the graph. In the future, -/// it should also dump indexes, constraints, roles, etc. -void DumpToCypher(std::ostream *os, GraphDbAccessor *dba); - } // namespace database diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 74457d5ab..ba736b750 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -5,6 +5,9 @@ #include #include "auth/auth.hpp" +#ifdef MG_SINGLE_NODE +#include "database/single_node/dump.hpp" +#endif #include "glue/auth.hpp" #include "glue/communication.hpp" #include "integrations/kafka/exceptions.hpp" @@ -34,6 +37,40 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, namespace query { +#ifdef MG_SINGLE_NODE +namespace { + +class DumpClosure final { + public: + explicit DumpClosure(database::GraphDbAccessor *dba) : dump_generator_(dba) {} + + // Please note that this copy constructor actually moves the other object. We + // want this because lambdas are not movable, i.e. its move constructor + // actually copies the lambda. + DumpClosure(const DumpClosure &other) + : dump_generator_(std::move(other.dump_generator_)) {} + + DumpClosure(DumpClosure &&other) = default; + DumpClosure &operator=(const DumpClosure &other) = delete; + DumpClosure &operator=(DumpClosure &&other) = delete; + ~DumpClosure() {} + + std::optional> operator()(Frame *frame, + ExecutionContext *context) { + std::ostringstream oss; + if (dump_generator_.NextQuery(&oss)) { + return std::make_optional(std::vector{oss.str()}); + } + return std::nullopt; + } + + private: + mutable database::CypherDumpGenerator dump_generator_; +}; + +} // namespace +#endif + class SingleNodeLogicalPlan final : public LogicalPlan { public: SingleNodeLogicalPlan(std::unique_ptr root, @@ -728,10 +765,6 @@ Callback HandleConstraintQuery(ConstraintQuery *constraint_query, #endif } -Callback HandleDumpQuery(database::GraphDbAccessor *db_accessor) { - throw utils::NotYetImplemented("Dump database"); -} - Interpreter::Interpreter() : is_tsc_available_(utils::CheckAvailableTSC()) {} Interpreter::Results Interpreter::operator()( @@ -950,6 +983,32 @@ Interpreter::Results Interpreter::operator()( /* is_profile_query */ true, /* should_abort_query */ true); } + if (auto *dump_query = utils::Downcast(parsed_query.query)) { +#ifdef MG_SINGLE_NODE + database::CypherDumpGenerator dump(&db_accessor); + + SymbolTable symbol_table; + auto query_symbol = symbol_table.CreateSymbol("QUERY", false); + + std::vector output_symbols = {query_symbol}; + std::vector header = {query_symbol.name()}; + + auto output_plan = std::make_unique( + output_symbols, DumpClosure(&db_accessor)); + plan = std::make_shared(std::make_unique( + std::move(output_plan), 0.0, AstStorage{}, symbol_table)); + + summary["planning_time"] = planning_timer.Elapsed().count(); + + return Results(&db_accessor, parameters, plan, output_symbols, header, + summary, parsed_query.required_privileges, + /* is_profile_query */ false, + /* should_abort_query */ false); +#else + throw utils::NotYetImplemented("Dump database"); +#endif + } + Callback callback; if (auto *index_query = utils::Downcast(parsed_query.query)) { if (in_explicit_transaction) { @@ -994,9 +1053,6 @@ Interpreter::Results Interpreter::operator()( } else if (auto *constraint_query = utils::Downcast(parsed_query.query)) { callback = HandleConstraintQuery(constraint_query, &db_accessor); - } else if (auto *dump_query = - utils::Downcast(parsed_query.query)) { - callback = HandleDumpQuery(&db_accessor); } else { LOG(FATAL) << "Should not get here -- unknown query type!"; } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 7c3ad4233..fab7bd85b 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -3502,4 +3502,50 @@ UniqueCursorPtr OutputTable::MakeCursor(database::GraphDbAccessor *, return MakeUniqueCursorPtr(mem, *this); } +OutputTableStream::OutputTableStream( + std::vector output_symbols, + std::function>(Frame *, + ExecutionContext *)> + callback) + : output_symbols_(std::move(output_symbols)), + callback_(std::move(callback)) {} + +WITHOUT_SINGLE_INPUT(OutputTableStream); + +class OutputTableStreamCursor : public Cursor { + public: + explicit OutputTableStreamCursor(const OutputTableStream *self) + : self_(self) {} + + bool Pull(Frame &frame, ExecutionContext &context) override { + const auto row = self_->callback_(&frame, &context); + if (row) { + CHECK(row->size() == self_->output_symbols_.size()) + << "Wrong number of columns in row!"; + for (size_t i = 0; i < self_->output_symbols_.size(); ++i) { + frame[self_->output_symbols_[i]] = row->at(i); + } + return true; + } + return false; + } + + // TODO(tsabolcec): Come up with better approach for handling `Reset()`. + // One possibility is to implement a custom closure utility class with + // `Reset()` method. + void Reset() override { + throw utils::NotYetImplemented("OutputTableStreamCursor::Reset"); + } + + void Shutdown() override {} + + private: + const OutputTableStream *self_; +}; + +UniqueCursorPtr OutputTableStream::MakeCursor( + database::GraphDbAccessor *, utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr(mem, this); +} + } // namespace query::plan diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 1eba32281..76836aac4 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -2102,5 +2102,40 @@ of symbols used by each of the inputs.") (:serialize (:slk)) (:clone)) +(lcp:define-class output-table-stream (logical-operator) + ((output-symbols "std::vector" :scope :public :dont-save t) + (callback "std::function>(Frame *, ExecutionContext *)>" + :scope :public :dont-save t :clone :copy)) + (:documentation "An operator that outputs a table, producing a single row on each pull. +This class is different from @c OutputTable in that its callback doesn't fetch all rows +at once. Instead, each call of the callback should return a single row of the table.") + (:public + #>cpp + OutputTableStream() {} + OutputTableStream( + std::vector output_symbols, + std::function>(Frame *, ExecutionContext *)> + callback); + + bool Accept(HierarchicalLogicalOperatorVisitor &) override { + LOG(FATAL) << "OutputTableStream operator should not be visited!"; + } + + UniqueCursorPtr MakeCursor( + database::GraphDbAccessor *, utils::MemoryResource *) const override; + std::vector OutputSymbols(const SymbolTable &) const override { + return output_symbols_; + } + std::vector ModifiedSymbols(const SymbolTable &) const override { + return output_symbols_; + } + + bool HasSingleInput() const override; + std::shared_ptr input() const override; + void set_input(std::shared_ptr input) override; + cpp<#) + (:serialize (:slk)) + (:clone)) + (lcp:pop-namespace) ;; plan (lcp:pop-namespace) ;; query diff --git a/tests/unit/database_dump.cpp b/tests/unit/database_dump.cpp index 4f3cf6b91..d6b224961 100644 --- a/tests/unit/database_dump.cpp +++ b/tests/unit/database_dump.cpp @@ -105,13 +105,6 @@ std::string DumpNext(CypherDumpGenerator *dump) { class DatabaseEnvironment { public: - std::string DumpStr() { - auto dba = db_.Access(); - std::ostringstream oss; - database::DumpToCypher(&oss, &dba); - return oss.str(); - } - GraphDbAccessor Access() { return db_.Access(); } DatabaseState GetState() { @@ -494,3 +487,28 @@ TEST(DumpTest, CheckStateSimpleGraph) { // Make sure that dump function doesn't make changes on the database. EXPECT_EQ(db.GetState(), db_initial_state); } + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST(DumpTest, ExecuteDumpDatabase) { + DatabaseEnvironment db; + { + auto dba = db.Access(); + CreateVertex(&dba, {}, {}, false); + dba.Commit(); + } + + { + auto dba = db.Access(); + const std::string query = "DUMP DATABASE"; + ResultStreamFaker stream; + auto results = query::Interpreter()(query, dba, {}, false); + + stream.Header(results.header()); + results.PullAll(stream); + stream.Summary(results.summary()); + + EXPECT_EQ(stream.GetResults().size(), 4U); + ASSERT_EQ(stream.GetHeader().size(), 1U); + EXPECT_EQ(stream.GetHeader()[0], "QUERY"); + } +}