Add support for triggers in database dump (#1610)
This commit is contained in:
parent
bd11266f82
commit
4bb9238679
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -11,6 +11,7 @@
|
||||
|
||||
#include "query/dump.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iomanip>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
@ -21,9 +22,11 @@
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "dbms/database.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/stream.hpp"
|
||||
#include "query/trigger_context.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
@ -260,10 +263,20 @@ void DumpUniqueConstraint(std::ostream *os, query::DbAccessor *dba, storage::Lab
|
||||
*os << " IS UNIQUE;";
|
||||
}
|
||||
|
||||
const char *triggerPhaseToString(TriggerPhase phase) {
|
||||
switch (phase) {
|
||||
case TriggerPhase::BEFORE_COMMIT:
|
||||
return "BEFORE COMMIT EXECUTE";
|
||||
case TriggerPhase::AFTER_COMMIT:
|
||||
return "AFTER COMMIT EXECUTE";
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
PullPlanDump::PullPlanDump(DbAccessor *dba)
|
||||
PullPlanDump::PullPlanDump(DbAccessor *dba, dbms::DatabaseAccess db_acc)
|
||||
: dba_(dba),
|
||||
db_acc_(db_acc),
|
||||
vertices_iterable_(dba->Vertices(storage::View::OLD)),
|
||||
pull_chunks_{// Dump all label indices
|
||||
CreateLabelIndicesPullChunk(),
|
||||
@ -282,7 +295,9 @@ PullPlanDump::PullPlanDump(DbAccessor *dba)
|
||||
// Drop the internal index
|
||||
CreateDropInternalIndexPullChunk(),
|
||||
// Internal index cleanup
|
||||
CreateInternalIndexCleanupPullChunk()} {}
|
||||
CreateInternalIndexCleanupPullChunk(),
|
||||
// Dump all triggers
|
||||
CreateTriggersPullChunk()} {}
|
||||
|
||||
bool PullPlanDump::Pull(AnyStream *stream, std::optional<int> n) {
|
||||
// Iterate all functions that stream some results.
|
||||
@ -536,6 +551,23 @@ PullPlanDump::PullChunk PullPlanDump::CreateInternalIndexCleanupPullChunk() {
|
||||
};
|
||||
}
|
||||
|
||||
void DumpDatabaseToCypherQueries(query::DbAccessor *dba, AnyStream *stream) { PullPlanDump(dba).Pull(stream, {}); }
|
||||
PullPlanDump::PullChunk PullPlanDump::CreateTriggersPullChunk() {
|
||||
return [this](AnyStream *stream, std::optional<int>) {
|
||||
auto triggers = db_acc_->trigger_store()->GetTriggerInfo();
|
||||
for (const auto &trigger : triggers) {
|
||||
std::ostringstream os;
|
||||
auto trigger_statement_copy = trigger.statement;
|
||||
std::replace(trigger_statement_copy.begin(), trigger_statement_copy.end(), '\n', ' ');
|
||||
os << "CREATE TRIGGER " << trigger.name << " ON " << memgraph::query::TriggerEventTypeToString(trigger.event_type)
|
||||
<< " " << triggerPhaseToString(trigger.phase) << " " << trigger_statement_copy << ";";
|
||||
stream->Result({TypedValue(os.str())});
|
||||
}
|
||||
return 0;
|
||||
};
|
||||
}
|
||||
|
||||
void DumpDatabaseToCypherQueries(query::DbAccessor *dba, AnyStream *stream, dbms::DatabaseAccess db_acc) {
|
||||
PullPlanDump(dba, db_acc).Pull(stream, {});
|
||||
}
|
||||
|
||||
} // namespace memgraph::query
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -11,18 +11,17 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ostream>
|
||||
|
||||
#include "dbms/database.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/stream.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
|
||||
namespace memgraph::query {
|
||||
|
||||
void DumpDatabaseToCypherQueries(query::DbAccessor *dba, AnyStream *stream);
|
||||
void DumpDatabaseToCypherQueries(query::DbAccessor *dba, AnyStream *stream, dbms::DatabaseAccess db_acc);
|
||||
|
||||
struct PullPlanDump {
|
||||
explicit PullPlanDump(query::DbAccessor *dba);
|
||||
explicit PullPlanDump(query::DbAccessor *dba, dbms::DatabaseAccess db_acc);
|
||||
|
||||
/// Pull the dump results lazily
|
||||
/// @return true if all results were returned, false otherwise
|
||||
@ -30,6 +29,7 @@ struct PullPlanDump {
|
||||
|
||||
private:
|
||||
query::DbAccessor *dba_ = nullptr;
|
||||
dbms::DatabaseAccess db_acc_;
|
||||
|
||||
std::optional<storage::IndicesInfo> indices_info_ = std::nullopt;
|
||||
std::optional<storage::ConstraintsInfo> constraints_info_ = std::nullopt;
|
||||
@ -62,5 +62,6 @@ struct PullPlanDump {
|
||||
PullChunk CreateEdgePullChunk();
|
||||
PullChunk CreateDropInternalIndexPullChunk();
|
||||
PullChunk CreateInternalIndexCleanupPullChunk();
|
||||
PullChunk CreateTriggersPullChunk();
|
||||
};
|
||||
} // namespace memgraph::query
|
||||
|
@ -1818,7 +1818,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, CurrentDB ¤t_db) {
|
||||
MG_ASSERT(current_db.execution_db_accessor_, "Dump query expects a current DB transaction");
|
||||
auto *dba = &*current_db.execution_db_accessor_;
|
||||
auto plan = std::make_shared<PullPlanDump>(dba);
|
||||
auto plan = std::make_shared<PullPlanDump>(dba, *current_db.db_acc_);
|
||||
return PreparedQuery{
|
||||
{"QUERY"},
|
||||
std::move(parsed_query.required_privileges),
|
||||
|
@ -9,10 +9,12 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gtest/gtest-typed-test.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
@ -24,6 +26,8 @@
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/interpreter_context.hpp"
|
||||
#include "query/stream/streams.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/trigger_context.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/disk/storage.hpp"
|
||||
@ -331,7 +335,7 @@ TYPED_TEST(DumpTest, EmptyGraph) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
ASSERT_EQ(stream.GetResults().size(), 0);
|
||||
}
|
||||
@ -350,7 +354,7 @@ TYPED_TEST(DumpTest, SingleVertex) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0});",
|
||||
kDropInternalIndex, kRemoveInternalLabelProperty);
|
||||
@ -371,7 +375,7 @@ TYPED_TEST(DumpTest, VertexWithSingleLabel) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__:`Label1` {__mg_id__: 0});",
|
||||
kDropInternalIndex, kRemoveInternalLabelProperty);
|
||||
@ -392,7 +396,7 @@ TYPED_TEST(DumpTest, VertexWithMultipleLabels) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex,
|
||||
"CREATE (:__mg_vertex__:`Label1`:`Label 2` {__mg_id__: 0});", kDropInternalIndex,
|
||||
@ -414,7 +418,7 @@ TYPED_TEST(DumpTest, VertexWithSingleProperty) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0, `prop`: 42});",
|
||||
kDropInternalIndex, kRemoveInternalLabelProperty);
|
||||
@ -437,7 +441,7 @@ TYPED_TEST(DumpTest, MultipleVertices) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0});",
|
||||
"CREATE (:__mg_vertex__ {__mg_id__: 1});", "CREATE (:__mg_vertex__ {__mg_id__: 2});",
|
||||
@ -475,7 +479,7 @@ TYPED_TEST(DumpTest, PropertyValue) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex,
|
||||
"CREATE (:__mg_vertex__ {__mg_id__: 0, `p1`: [{`prop 1`: 13, "
|
||||
@ -502,7 +506,7 @@ TYPED_TEST(DumpTest, SingleEdge) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0});",
|
||||
"CREATE (:__mg_vertex__ {__mg_id__: 1});",
|
||||
@ -531,7 +535,7 @@ TYPED_TEST(DumpTest, MultipleEdges) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0});",
|
||||
"CREATE (:__mg_vertex__ {__mg_id__: 1});", "CREATE (:__mg_vertex__ {__mg_id__: 2});",
|
||||
@ -561,7 +565,7 @@ TYPED_TEST(DumpTest, EdgeWithProperties) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), kCreateInternalIndex, "CREATE (:__mg_vertex__ {__mg_id__: 0});",
|
||||
"CREATE (:__mg_vertex__ {__mg_id__: 1});",
|
||||
@ -601,7 +605,7 @@ TYPED_TEST(DumpTest, IndicesKeys) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), "CREATE INDEX ON :`Label1`(`prop`);", "CREATE INDEX ON :`Label 2`(`prop ```);",
|
||||
kCreateInternalIndex, "CREATE (:__mg_vertex__:`Label1`:`Label 2` {__mg_id__: 0, `p`: 1});",
|
||||
@ -630,7 +634,7 @@ TYPED_TEST(DumpTest, ExistenceConstraints) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(), "CREATE CONSTRAINT ON (u:`L``abel 1`) ASSERT EXISTS (u.`prop`);",
|
||||
kCreateInternalIndex, "CREATE (:__mg_vertex__:`L``abel 1` {__mg_id__: 0, `prop`: 1});",
|
||||
@ -665,7 +669,7 @@ TYPED_TEST(DumpTest, UniqueConstraints) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
VerifyQueries(stream.GetResults(),
|
||||
"CREATE CONSTRAINT ON (u:`Label`) ASSERT u.`prop`, u.`prop2` "
|
||||
@ -724,7 +728,7 @@ TYPED_TEST(DumpTest, CheckStateVertexWithMultipleProperties) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
const auto &results = stream.GetResults();
|
||||
ASSERT_GE(results.size(), 1);
|
||||
@ -841,7 +845,7 @@ TYPED_TEST(DumpTest, CheckStateSimpleGraph) {
|
||||
{
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream);
|
||||
memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db);
|
||||
}
|
||||
const auto &results = stream.GetResults();
|
||||
// Indices and constraints are 4 queries and there must be at least one more
|
||||
@ -1056,7 +1060,7 @@ TYPED_TEST(DumpTest, MultiplePartialPulls) {
|
||||
auto acc = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
|
||||
memgraph::query::PullPlanDump pullPlan{&dba};
|
||||
memgraph::query::PullPlanDump pullPlan{&dba, this->db};
|
||||
|
||||
auto offset_index = 0U;
|
||||
auto check_next = [&](const std::string &expected_row) mutable {
|
||||
@ -1095,3 +1099,30 @@ TYPED_TEST(DumpTest, MultiplePartialPulls) {
|
||||
check_next(kDropInternalIndex);
|
||||
check_next(kRemoveInternalLabelProperty);
|
||||
}
|
||||
|
||||
TYPED_TEST(DumpTest, DumpDatabaseWithTriggers) {
|
||||
auto acc = this->db->storage()->Access(memgraph::replication::ReplicationRole::MAIN);
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
{
|
||||
auto trigger_store = this->db.get()->trigger_store();
|
||||
const std::string trigger_name = "test_trigger";
|
||||
const std::string trigger_statement = "UNWIND createdVertices AS newNodes SET newNodes.created = timestamp()";
|
||||
memgraph::query::TriggerEventType trigger_event_type = memgraph::query::TriggerEventType::VERTEX_CREATE;
|
||||
memgraph::query::TriggerPhase trigger_phase = memgraph::query::TriggerPhase::AFTER_COMMIT;
|
||||
memgraph::utils::SkipList<memgraph::query::QueryCacheEntry> ast_cache;
|
||||
memgraph::query::AllowEverythingAuthChecker auth_checker;
|
||||
memgraph::query::InterpreterConfig::Query query_config;
|
||||
memgraph::query::DbAccessor dba(acc.get());
|
||||
const std::map<std::string, memgraph::storage::PropertyValue> props;
|
||||
trigger_store->AddTrigger(trigger_name, trigger_statement, props, trigger_event_type, trigger_phase, &ast_cache,
|
||||
&dba, query_config, std::nullopt, &auth_checker);
|
||||
}
|
||||
{
|
||||
ResultStreamFaker stream(this->db->storage());
|
||||
memgraph::query::AnyStream query_stream(&stream, memgraph::utils::NewDeleteResource());
|
||||
{ memgraph::query::DumpDatabaseToCypherQueries(&dba, &query_stream, this->db); }
|
||||
VerifyQueries(stream.GetResults(),
|
||||
"CREATE TRIGGER test_trigger ON () CREATE AFTER COMMIT EXECUTE UNWIND createdVertices AS newNodes "
|
||||
"SET newNodes.created = timestamp();");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user