Compare commits

..

19 Commits

Author SHA1 Message Date
gvolfing
61b76044c1 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-25 08:21:16 +01:00
gvolfing
0a9d6475c0 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-22 16:01:13 +01:00
gvolfing
878b6cfe4f Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-22 11:42:02 +01:00
gvolfing
6b7e1d8a9f Assert only if metadata flag is enabled 2024-03-22 11:41:02 +01:00
gvolfing
6f18117b0a Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-20 11:39:36 +01:00
gvolfing
da1189b960 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-19 16:21:50 +01:00
gvolfing
494b4d2d83 Undo outcommented tests 2024-03-19 16:21:16 +01:00
gvolfing
527d4c864f React to PR comments 2024-03-19 16:18:23 +01:00
gvolfing
71fdace31d Generic refactor 2024-03-18 12:52:31 +01:00
gvolfing
9dff306782 Merge branch 'master' into Fetch-edge-by-edge-id 2024-03-18 11:10:21 +01:00
gvolfing
5d233c2486 Conform cpp-api when changing edges on the fly 2024-03-18 11:02:43 +01:00
gvolfing
db6b1fc265 Implement edge-visibility checking 2024-03-18 09:48:00 +01:00
gvolfing
9825bdb41b Add snapshot based recovery capabilites 2024-03-15 15:59:01 +01:00
gvolfing
4f6d8c10ec Add metadata flag + runtime check during id scan 2024-03-15 13:56:43 +01:00
gvolfing
96651170a0 Add query plan test 2024-03-12 08:50:51 +01:00
gvolfing
a5ee0fad08 Enhance planner for edge-id indexing 2024-03-11 19:39:32 +01:00
gvolfing
956705293e Add logical operator for id based edge scans 2024-03-11 09:56:00 +01:00
gvolfing
5167c6ee37 Add temporary edge retrieval code 2024-03-10 19:39:30 +01:00
gvolfing
a58f2887b3 Create storage level call-chain 2024-03-10 18:56:15 +01:00
34 changed files with 490 additions and 87 deletions

View File

@ -119,6 +119,10 @@ modifications:
value: "false"
override: true
- name: "storage_enable_edges_metadata"
value: "false"
override: true
- name: "query_callable_mappings_path"
value: "/etc/memgraph/apoc_compatibility_mappings.json"
override: true

View File

@ -284,8 +284,8 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
try {
spdlog::debug("Loading snapshot");
auto recovered_snapshot = storage::durability::LoadSnapshot(
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->repl_storage_state_.history,
storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->edges_metadata_,
&storage->repl_storage_state_.history, storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost

View File

@ -131,6 +131,11 @@ DEFINE_uint64(storage_recovery_thread_count,
DEFINE_bool(storage_enable_schema_metadata, false,
"Controls whether metadata should be collected about the resident labels and edge types.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_enable_edges_metadata, false,
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on "
"certain queries.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_delta_on_identical_property_update, true,
"Controls whether updating a property with the same value should create a delta object.");

View File

@ -85,6 +85,8 @@ DECLARE_uint64(storage_recovery_thread_count);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_enable_schema_metadata);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_enable_edges_metadata);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_delta_on_identical_property_update);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -332,9 +332,16 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
.enable_edges_metadata =
FLAGS_storage_properties_on_edges ? FLAGS_storage_enable_edges_metadata : false,
.enable_schema_metadata = FLAGS_storage_enable_schema_metadata,
.delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update},
.salient.storage_mode = memgraph::flags::ParseStorageMode()};
if (!FLAGS_storage_properties_on_edges && FLAGS_storage_enable_edges_metadata) {
spdlog::warn(
"Properties on edges were not enabled, hence edges metadata will also be disabled. If you wish to utilize "
"extra metadata on edges, enable properties on edges as well.");
}
spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
memgraph::utils::Scheduler jemalloc_purge_scheduler;

View File

@ -716,8 +716,7 @@ int main(int argc, char *argv[]) {
.recover_on_startup = false,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED,
.snapshot_on_exit = true},
.salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}},
};
.salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
auto store = memgraph::dbms::CreateInMemoryStorage(config, repl_state);

View File

@ -439,6 +439,12 @@ class DbAccessor final {
return std::nullopt;
}
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view) {
auto maybe_edge = accessor_->FindEdge(gid, view);
if (maybe_edge) return EdgeAccessor(*maybe_edge);
return std::nullopt;
}
void FinalizeTransaction() { accessor_->FinalizeTransaction(); }
void TrackCurrentThreadAllocations() {
@ -813,6 +819,8 @@ class SubgraphDbAccessor final {
std::optional<VertexAccessor> FindVertex(storage::Gid gid, storage::View view);
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view);
Graph *getGraph();
storage::StorageMode GetStorageMode() const noexcept;

View File

@ -117,6 +117,9 @@ class PlanHintsProvider final : public HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
bool PostVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
bool PreVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
bool PostVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
bool PreVisit(ConstructNamedPath & /*unused*/) override { return true; }
bool PostVisit(ConstructNamedPath & /*unused*/) override { return true; }

View File

@ -110,6 +110,7 @@ extern const Event ScanAllByLabelPropertyValueOperator;
extern const Event ScanAllByLabelPropertyOperator;
extern const Event ScanAllByIdOperator;
extern const Event ScanAllByEdgeTypeOperator;
extern const Event ScanAllByEdgeIdOperator;
extern const Event ExpandOperator;
extern const Event ExpandVariableOperator;
extern const Event ConstructNamedPathOperator;
@ -544,7 +545,7 @@ class ScanAllCursor : public Cursor {
template <typename TEdgesFun>
class ScanAllByEdgeTypeCursor : public Cursor {
public:
explicit ScanAllByEdgeTypeCursor(const ScanAllByEdgeType &self, Symbol output_symbol, UniqueCursorPtr input_cursor,
explicit ScanAllByEdgeTypeCursor(const ScanAll &self, Symbol output_symbol, UniqueCursorPtr input_cursor,
storage::View view, TEdgesFun get_edges, const char *op_name)
: self_(self),
output_symbol_(std::move(output_symbol)),
@ -584,7 +585,7 @@ class ScanAllByEdgeTypeCursor : public Cursor {
}
private:
const ScanAllByEdgeType &self_;
const ScanAll &self_;
const Symbol output_symbol_;
const UniqueCursorPtr input_cursor_;
storage::View view_;
@ -636,10 +637,7 @@ UniqueCursorPtr ScanAllByLabel::MakeCursor(utils::MemoryResource *mem) const {
ScanAllByEdgeType::ScanAllByEdgeType(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
storage::EdgeTypeId edge_type, storage::View view)
: input_(input ? input : std::make_shared<Once>()),
output_symbol_(std::move(output_symbol)),
view_(view),
edge_type_(edge_type) {}
: ScanAll(input, output_symbol, view), edge_type_(edge_type) {}
ACCEPT_WITH_INPUT(ScanAllByEdgeType)
@ -805,6 +803,38 @@ UniqueCursorPtr ScanAllById::MakeCursor(utils::MemoryResource *mem) const {
view_, std::move(vertices), "ScanAllById");
}
ScanAllByEdgeId::ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
Expression *expression, storage::View view)
: ScanAll(input, output_symbol, view), expression_(expression) {
MG_ASSERT(expression);
}
ACCEPT_WITH_INPUT(ScanAllByEdgeId)
UniqueCursorPtr ScanAllByEdgeId::MakeCursor(utils::MemoryResource *mem) const {
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByEdgeIdOperator);
auto edges = [this](Frame &frame, ExecutionContext &context) -> std::optional<std::vector<EdgeAccessor>> {
auto *db = context.db_accessor;
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, view_);
auto value = expression_->Accept(evaluator);
if (!value.IsNumeric()) return std::nullopt;
int64_t id = value.IsInt() ? value.ValueInt() : value.ValueDouble();
if (value.IsDouble() && id != value.ValueDouble()) return std::nullopt;
auto maybe_edge = db->FindEdge(storage::Gid::FromInt(id), view_);
if (!maybe_edge) return std::nullopt;
return std::vector<EdgeAccessor>{*maybe_edge};
};
return MakeUniqueCursorPtr<ScanAllByEdgeTypeCursor<decltype(edges)>>(
mem, *this, output_symbol_, input_->MakeCursor(mem), view_, std::move(edges), "ScanAllByEdgeId");
}
std::vector<Symbol> ScanAllByEdgeId::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(output_symbol_);
return symbols;
}
namespace {
bool CheckExistingNode(const VertexAccessor &new_node, const Symbol &existing_node_sym, Frame &frame) {
const TypedValue &existing_node = frame[existing_node_sym];

View File

@ -95,6 +95,7 @@ class ScanAllByLabelPropertyValue;
class ScanAllByLabelProperty;
class ScanAllById;
class ScanAllByEdgeType;
class ScanAllByEdgeId;
class Expand;
class ExpandVariable;
class ConstructNamedPath;
@ -128,13 +129,12 @@ class IndexedJoin;
class HashJoin;
class RollUpApply;
using LogicalOperatorCompositeVisitor =
utils::CompositeVisitor<Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange,
ScanAllByLabelPropertyValue, ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, Expand,
ExpandVariable, ConstructNamedPath, Filter, Produce, Delete, SetProperty, SetProperties,
SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter, Accumulate, Aggregate, Skip,
Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure, LoadCsv,
Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
using LogicalOperatorCompositeVisitor = utils::CompositeVisitor<
Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange, ScanAllByLabelPropertyValue,
ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, ScanAllByEdgeId, Expand, ExpandVariable, ConstructNamedPath,
Filter, Produce, Delete, SetProperty, SetProperties, SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter,
Accumulate, Aggregate, Skip, Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure,
LoadCsv, Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
using LogicalOperatorLeafVisitor = utils::LeafVisitor<Once>;
@ -590,7 +590,7 @@ class ScanAllByLabel : public memgraph::query::plan::ScanAll {
}
};
class ScanAllByEdgeType : public memgraph::query::plan::LogicalOperator {
class ScanAllByEdgeType : public memgraph::query::plan::ScanAll {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
@ -610,10 +610,6 @@ class ScanAllByEdgeType : public memgraph::query::plan::LogicalOperator {
return fmt::format("ScanAllByEdgeType ({} :{})", output_symbol_.name(), dba_->EdgeTypeToName(edge_type_));
}
std::shared_ptr<memgraph::query::plan::LogicalOperator> input_;
Symbol output_symbol_;
storage::View view_;
storage::EdgeTypeId edge_type_;
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
@ -816,6 +812,35 @@ class ScanAllById : public memgraph::query::plan::ScanAll {
return object;
}
};
class ScanAllByEdgeId : public memgraph::query::plan::ScanAll {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
ScanAllByEdgeId() = default;
ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, Expression *expression,
storage::View view = storage::View::OLD);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override { input_ = input; }
std::string ToString() const override { return fmt::format("ScanAllByEdgeId ({})", output_symbol_.name()); }
Expression *expression_;
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
auto object = std::make_unique<ScanAllByEdgeId>();
object->input_ = input_ ? input_->Clone(storage) : nullptr;
object->output_symbol_ = output_symbol_;
object->view_ = view_;
object->expression_ = expression_ ? expression_->Clone(storage) : nullptr;
return object;
}
};
struct ExpandCommon {
static const utils::TypeInfo kType;

View File

@ -49,9 +49,13 @@ constexpr utils::TypeInfo query::plan::ScanAllByLabelProperty::kType{
constexpr utils::TypeInfo query::plan::ScanAllById::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllById",
&query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ScanAllByEdgeType::kType{utils::TypeId::SCAN_ALL_BY_EDGE_TYPE,
"ScanAllByEdgeType", &query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ScanAllByEdgeId::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllByEdgeId",
&query::plan::ScanAll::kType};
constexpr utils::TypeInfo query::plan::ExpandCommon::kType{utils::TypeId::EXPAND_COMMON, "ExpandCommon", nullptr};
constexpr utils::TypeInfo query::plan::Expand::kType{utils::TypeId::EXPAND, "Expand",

View File

@ -83,6 +83,11 @@ bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeType &op) {
return true;
}
bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeId &op) {
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
return true;
}
bool PlanPrinter::PreVisit(query::plan::Expand &op) {
op.dba_ = dba_;
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
@ -496,6 +501,16 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeType &op) {
return false;
}
bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeId &op) {
json self;
self["name"] = "ScanAllByEdgeId";
self["output_symbol"] = ToJson(op.output_symbol_);
op.input_->Accept(*this);
self["input"] = PopOutput();
output_ = std::move(self);
return false;
}
bool PlanToJsonVisitor::PreVisit(CreateNode &op) {
json self;
self["name"] = "CreateNode";

View File

@ -68,6 +68,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllById &) override;
bool PreVisit(ScanAllByEdgeType &) override;
bool PreVisit(ScanAllByEdgeId &) override;
bool PreVisit(Expand &) override;
bool PreVisit(ExpandVariable &) override;
@ -206,6 +207,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllById &) override;
bool PreVisit(ScanAllByEdgeType &) override;
bool PreVisit(ScanAllByEdgeId &) override;
bool PreVisit(EmptyResult &) override;
bool PreVisit(Produce &) override;

View File

@ -48,11 +48,26 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PreVisit(Filter &op) override {
prev_ops_.push_back(&op);
filters_.CollectFilterExpression(op.expression_, *symbol_table_);
return true;
}
bool PostVisit(Filter & /*op*/) override {
bool PostVisit(Filter &op) override {
prev_ops_.pop_back();
ExpressionRemovalResult removal = RemoveExpressions(op.expression_, filter_exprs_for_removal_);
op.expression_ = removal.trimmed_expression;
if (op.expression_) {
Filters leftover_filters;
leftover_filters.CollectFilterExpression(op.expression_, *symbol_table_);
op.all_filters_ = std::move(leftover_filters);
}
if (!op.expression_) {
SetOnParent(op.input());
}
return true;
}
@ -70,7 +85,7 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PostVisit(ScanAll &op) override {
prev_ops_.pop_back();
if (EdgeTypeIndexingPossible()) {
if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
SetOnParent(op.input());
}
@ -88,6 +103,25 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
edge_type_index_exist = only_one_edge_type ? db_->EdgeTypeIndexExists(op.common_.edge_types.front()) : false;
scanall_under_expand_ = only_one_edge_type && expansion_is_named && expdanded_node_not_named;
const auto &output_symbol = op.common_.edge_symbol;
const auto &modified_symbols = op.ModifiedSymbols(*symbol_table_);
std::unordered_set<Symbol> bound_symbols(modified_symbols.begin(), modified_symbols.end());
auto are_bound = [&bound_symbols](const auto &used_symbols) {
for (const auto &used_symbol : used_symbols) {
if (!utils::Contains(bound_symbols, used_symbol)) {
return false;
}
}
return true;
};
for (const auto &filter : filters_.IdFilters(output_symbol)) {
if (filter.id_filter->is_symbol_in_value_ || !are_bound(filter.used_symbols)) continue;
maybe_id_lookup_value_ = filter.id_filter->value_;
filter_exprs_for_removal_.insert(filter.expression);
filters_.EraseFilter(filter);
}
}
return true;
@ -96,9 +130,10 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PostVisit(Expand &op) override {
prev_ops_.pop_back();
if (EdgeTypeIndexingPossible()) {
if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
auto indexed_scan = GenEdgeTypeScan(op);
SetOnParent(std::move(indexed_scan));
return true;
}
return true;
@ -254,6 +289,15 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
return true;
}
bool PreVisit(ScanAllByEdgeId &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(ScanAllByEdgeId &) override {
prev_ops_.pop_back();
return true;
}
bool PreVisit(ConstructNamedPath &op) override {
prev_ops_.push_back(&op);
return true;
@ -491,9 +535,12 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
std::vector<LogicalOperator *> prev_ops_;
std::unordered_set<Symbol> cartesian_symbols_;
memgraph::query::Expression *maybe_id_lookup_value_ = nullptr;
bool EdgeTypeIndexingPossible() const {
return expand_under_produce_ && scanall_under_expand_ && once_under_scanall_ && edge_type_index_exist;
}
bool expand_under_produce_ = false;
bool scanall_under_expand_ = false;
bool once_under_scanall_ = false;
@ -503,14 +550,21 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
throw utils::NotYetImplemented("Operator not yet covered by EdgeTypeIndexRewriter");
}
std::unique_ptr<ScanAllByEdgeType> GenEdgeTypeScan(const Expand &expand) {
std::unique_ptr<ScanAll> GenEdgeTypeScan(const Expand &expand) {
const auto &input = expand.input();
const auto &output_symbol = expand.common_.edge_symbol;
const auto &view = expand.view_;
// Extract edge_type from symbol
auto edge_type = expand.common_.edge_types.front();
return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view);
if (EdgeTypeIndexingPossible()) {
auto edge_type = expand.common_.edge_types.front();
return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view);
}
if (maybe_id_lookup_value_) {
return std::make_unique<ScanAllByEdgeId>(input, output_symbol, maybe_id_lookup_value_, view);
}
LOG_FATAL("Fatal error while rewriting query plan.");
}
void SetOnParent(const std::shared_ptr<LogicalOperator> &input) {

View File

@ -36,6 +36,7 @@ struct SalientConfig {
StorageMode storage_mode{StorageMode::IN_MEMORY_TRANSACTIONAL};
struct Items {
bool properties_on_edges{true};
bool enable_edges_metadata{false};
bool enable_schema_metadata{false};
bool delta_on_identical_property_update{true};
friend bool operator==(const Items &lrh, const Items &rhs) = default;
@ -46,11 +47,13 @@ struct SalientConfig {
inline void to_json(nlohmann::json &data, SalientConfig::Items const &items) {
data = nlohmann::json{{"properties_on_edges", items.properties_on_edges},
{"enable_edges_metadata", items.enable_edges_metadata},
{"enable_schema_metadata", items.enable_schema_metadata}};
}
inline void from_json(const nlohmann::json &data, SalientConfig::Items &items) {
data.at("properties_on_edges").get_to(items.properties_on_edges);
data.at("enable_edges_metadata").get_to(items.enable_edges_metadata);
data.at("enable_schema_metadata").get_to(items.enable_schema_metadata);
}

View File

@ -916,6 +916,10 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
return disk_storage->FindVertex(gid, &transaction_, view);
}
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::FindEdge(storage::Gid gid, View view) {
throw utils::NotYetImplemented("Id based lookup for on-disk storage mode is not yet implemented on edges.");
}
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
bool detach) {

View File

@ -72,6 +72,8 @@ class DiskStorage final : public Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
uint64_t ApproximateVertexCount() const override;

View File

@ -300,6 +300,7 @@ std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const R
std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num) {
@ -334,7 +335,8 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
}
spdlog::info("Starting snapshot recovery from {}.", path);
try {
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
recovered_snapshot =
LoadSnapshot(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config);
spdlog::info("Snapshot recovery successful!");
break;
} catch (const RecoveryFailure &e) {

View File

@ -134,6 +134,7 @@ struct Recovery {
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num);

View File

@ -427,6 +427,7 @@ struct LoadPartialConnectivityResult {
template <typename TEdgeTypeFromIdFunc>
LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path,
utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
utils::SkipList<EdgeMetadata> &edges_metadata,
const uint64_t from_offset, const uint64_t vertices_count,
const SalientConfig::Items items, const bool snapshot_has_edges,
TEdgeTypeFromIdFunc get_edge_type_from_id) {
@ -437,6 +438,7 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto vertex_acc = vertices.access();
auto edge_acc = edges.access();
auto edge_metadata_acc = edges_metadata.access();
// Read the first gid to find the necessary iterator in vertices
const auto first_vertex_gid = std::invoke([&]() mutable {
@ -580,6 +582,9 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
edge_ref = EdgeRef(&*edge);
}
if (items.enable_edges_metadata) {
edge_metadata_acc.insert(EdgeMetadata{Gid::FromUint(*edge_gid), &vertex});
}
}
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
// Increment edge count. We only increment the count here because the
@ -626,7 +631,7 @@ void RecoverOnMultipleThreads(size_t thread_count, const TFunc &func, const std:
}
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
SalientConfig::Items items) {
@ -644,6 +649,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1098,7 +1104,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) {
@ -1118,6 +1124,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1226,10 +1233,10 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {
@ -1387,7 +1394,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) {
@ -1407,6 +1414,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1515,10 +1523,10 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {
@ -1730,7 +1738,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) {
RecoveryInfo recovery_info;
@ -1742,14 +1750,16 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
if (*version == 14U) {
return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count,
return LoadSnapshotVersion14(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config.salient.items);
}
if (*version == 15U) {
return LoadSnapshotVersion15(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
return LoadSnapshotVersion15(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
}
if (*version == 16U) {
return LoadSnapshotVersion16(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
return LoadSnapshotVersion16(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
}
// Cleanup of loaded data in case of failure.
@ -1758,6 +1768,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1866,10 +1877,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {

View File

@ -64,7 +64,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
/// Function used to load the snapshot data into the storage.
/// @throw RecoveryFailure
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config);

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -49,4 +49,18 @@ inline bool operator<(const Edge &first, const Edge &second) { return first.gid
inline bool operator==(const Edge &first, const Gid &second) { return first.gid == second; }
inline bool operator<(const Edge &first, const Gid &second) { return first.gid < second; }
struct EdgeMetadata {
EdgeMetadata(Gid gid, Vertex *from_vertex) : gid(gid), from_vertex(from_vertex) {}
Gid gid;
Vertex *from_vertex;
};
static_assert(alignof(Edge) >= 8, "The Edge should be aligned to at least 8!");
inline bool operator==(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid == second.gid; }
inline bool operator<(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid < second.gid; }
inline bool operator==(const EdgeMetadata &first, const Gid &second) { return first.gid == second; }
inline bool operator<(const EdgeMetadata &first, const Gid &second) { return first.gid < second; }
} // namespace memgraph::storage

View File

@ -16,6 +16,7 @@
#include <tuple>
#include "storage/v2/delta.hpp"
#include "storage/v2/edge_info_helpers.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/property_store.hpp"
#include "storage/v2/property_value.hpp"
@ -26,6 +27,15 @@
#include "utils/memory_tracker.hpp"
namespace memgraph::storage {
std::optional<EdgeAccessor> EdgeAccessor::Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex,
Vertex *to_vertex, Storage *storage, Transaction *transaction,
View view, bool for_deleted) {
if (!IsEdgeVisible(edge.ptr, transaction, view)) {
return std::nullopt;
}
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage, transaction, for_deleted);
}
bool EdgeAccessor::IsDeleted() const {
if (!storage_->config_.salient.items.properties_on_edges) {

View File

@ -44,6 +44,10 @@ class EdgeAccessor final {
transaction_(transaction),
for_deleted_(for_deleted) {}
static std::optional<EdgeAccessor> Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex,
Storage *storage, Transaction *transaction, View view,
bool for_deleted = false);
bool IsDeleted() const;
/// @return true if the object is visible from the current transaction

View File

@ -0,0 +1,56 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/delta.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
#include <shared_mutex>
namespace memgraph::storage {
inline bool IsEdgeVisible(Edge *edge, const Transaction *transaction, View view) {
bool exists = true;
bool deleted = true;
Delta *delta = nullptr;
{
auto guard = std::shared_lock{edge->lock};
deleted = edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
}
});
return exists && !deleted;
}
} // namespace memgraph::storage

View File

@ -12,6 +12,7 @@
#include "storage/v2/inmemory/edge_type_index.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/edge_info_helpers.hpp"
#include "storage/v2/indices/indices_utils.hpp"
#include "utils/counter.hpp"
@ -25,39 +26,6 @@ using EdgeTypeId = memgraph::storage::EdgeTypeId;
using Transaction = memgraph::storage::Transaction;
using View = memgraph::storage::View;
bool IsIndexEntryVisible(Edge *edge, const Transaction *transaction, View view) {
bool exists = true;
bool deleted = true;
Delta *delta = nullptr;
{
auto guard = std::shared_lock{edge->lock};
deleted = edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
}
});
return exists && !deleted;
}
using ReturnType = std::optional<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;
ReturnType VertexDeletedConnectedEdges(Vertex *vertex, Edge *edge, const Transaction *transaction, View view) {
ReturnType link;
@ -242,7 +210,7 @@ void InMemoryEdgeTypeIndex::Iterable::Iterator::AdvanceUntilValid() {
auto *from_vertex = index_iterator_->from_vertex;
auto *to_vertex = index_iterator_->to_vertex;
if (!IsIndexEntryVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted ||
if (!IsEdgeVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted ||
to_vertex->deleted) {
continue;
}

View File

@ -104,7 +104,7 @@ InMemoryStorage::InMemoryStorage(Config config)
config_.durability.storage_directory);
}
if (config_.durability.recover_on_startup) {
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_,
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edges_metadata_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;
@ -341,6 +341,11 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
if (delta) {
delta->prev.Set(&*it);
}
if (config_.enable_edges_metadata) {
auto acc = mem_storage->edges_metadata_.access();
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
MG_ASSERT(inserted, "The edge must be inserted here!");
}
}
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
@ -443,6 +448,11 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
if (delta) {
delta->prev.Set(&*it);
}
if (config_.enable_edges_metadata) {
auto acc = mem_storage->edges_metadata_.access();
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
MG_ASSERT(inserted, "The edge must be inserted here!");
}
}
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
@ -464,6 +474,15 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
}
void InMemoryStorage::UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex) {
auto edge_metadata_acc = edges_metadata_.access();
auto edge_to_modify = edge_metadata_acc.find(edge->gid);
if (edge_to_modify == edge_metadata_acc.end()) {
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
}
edge_to_modify->from_vertex = from_vertex;
}
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) {
MG_ASSERT(edge->transaction_ == new_from->transaction_,
"EdgeAccessor must be from the same transaction as the new from vertex "
@ -564,6 +583,10 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
mem_edge_type_index->UpdateOnEdgeModification(old_from_vertex, to_vertex, new_from_vertex, to_vertex, edge_ref,
edge_type, transaction_);
if (config_.enable_edges_metadata) {
in_memory->UpdateEdgesMetadataOnModification(edge_ref.ptr, new_from_vertex);
}
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
@ -987,8 +1010,10 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
if (!current_deleted_edges.empty()) {
// 3.c) remove from edge skip_list
auto edge_acc = mem_storage->edges_.access();
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
for (auto gid : current_deleted_edges) {
edge_acc.remove(gid);
edge_metadata_acc.remove(gid);
}
}
}
@ -1234,8 +1259,10 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
// EDGES
{
auto edges_acc = mem_storage->edges_.access();
auto edges_metadata_acc = mem_storage->edges_metadata_.access();
for (auto gid : my_deleted_edges) {
edges_acc.remove(gid);
edges_metadata_acc.remove(gid);
}
}
}
@ -1438,6 +1465,62 @@ EdgesIterable InMemoryStorage::InMemoryAccessor::Edges(EdgeTypeId edge_type, Vie
return EdgesIterable(mem_edge_type_index->Edges(edge_type, view, storage_, &transaction_));
}
std::optional<EdgeAccessor> InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, View view) {
using EdgeInfo = std::optional<std::tuple<EdgeRef, EdgeTypeId, Vertex *, Vertex *>>;
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
auto edge_acc = mem_storage->edges_.access();
auto edge_it = edge_acc.find(gid);
if (edge_it == edge_acc.end()) {
return std::nullopt;
}
auto *edge_ptr = &(*edge_it);
auto vertices_acc = mem_storage->vertices_.access();
auto extract_edge_info = [&](Vertex *from_vertex) -> EdgeInfo {
for (auto &out_edge : from_vertex->out_edges) {
if (std::get<2>(out_edge).ptr == edge_ptr) {
return std::tuple(std::get<2>(out_edge), std::get<0>(out_edge), from_vertex, std::get<1>(out_edge));
}
}
return std::nullopt;
};
auto edge_accessor_from_info = [this, view](EdgeInfo &maybe_edge_info) -> std::optional<EdgeAccessor> {
if (!maybe_edge_info) {
return std::nullopt;
}
auto &edge_info = *maybe_edge_info;
return EdgeAccessor::Create(std::get<0>(edge_info), std::get<1>(edge_info), std::get<2>(edge_info),
std::get<3>(edge_info), storage_, &transaction_, view);
};
if (mem_storage->config_.salient.items.enable_edges_metadata) {
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
auto edge_metadata_it = edge_metadata_acc.find(gid);
MG_ASSERT(edge_metadata_it != edge_metadata_acc.end(), "Invalid database state!");
auto maybe_edge_info = extract_edge_info(edge_metadata_it->from_vertex);
return edge_accessor_from_info(maybe_edge_info);
}
// If metadata on edges is not enabled we will have to do
// a full scan.
auto maybe_edge_info = std::invoke([&]() -> EdgeInfo {
for (auto &from_vertex : vertices_acc) {
auto maybe_edge_info = extract_edge_info(&from_vertex);
if (maybe_edge_info) {
return maybe_edge_info;
}
}
return std::nullopt;
});
return edge_accessor_from_info(maybe_edge_info);
}
Transaction InMemoryStorage::CreateTransaction(
IsolationLevel isolation_level, StorageMode storage_mode,
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
@ -1786,8 +1869,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
{
auto edge_acc = edges_.access();
auto edge_metadata_acc = edges_metadata_.access();
for (auto edge : current_deleted_edges) {
MG_ASSERT(edge_acc.remove(edge), "Invalid database state!");
if (config_.salient.items.enable_edges_metadata) {
MG_ASSERT(edge_metadata_acc.remove(edge), "Invalid database state!");
}
}
}
@ -1809,10 +1896,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions
if (need_full_scan_edges) {
auto edge_acc = edges_.access();
auto edge_metadata_acc = edges_metadata_.access();
for (auto &edge : edge_acc) {
// a deleted edge which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion
if (edge.delta == nullptr && edge.deleted) {
edge_acc.remove(edge);
edge_metadata_acc.remove(edge.gid);
}
}
}

View File

@ -109,6 +109,8 @@ class InMemoryStorage final : public Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
/// Return approximate number of all vertices in the database.
@ -424,9 +426,12 @@ class InMemoryStorage final : public Storage {
void PrepareForNewEpoch() override;
void UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex);
// Main object storage
utils::SkipList<storage::Vertex> vertices_;
utils::SkipList<storage::Edge> edges_;
utils::SkipList<storage::EdgeMetadata> edges_metadata_;
// Durability
durability::Recovery recovery_;

View File

@ -179,6 +179,8 @@ class Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) = 0;
virtual std::optional<EdgeAccessor> FindEdge(Gid gid, View view) = 0;
virtual EdgesIterable Edges(EdgeTypeId edge_type, View view) = 0;
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);

View File

@ -27,6 +27,7 @@
M(ScanAllByLabelPropertyOperator, Operator, "Number of times ScanAllByLabelProperty operator was used.") \
M(ScanAllByIdOperator, Operator, "Number of times ScanAllById operator was used.") \
M(ScanAllByEdgeTypeOperator, Operator, "Number of times ScanAllByEdgeTypeOperator operator was used.") \
M(ScanAllByEdgeIdOperator, Operator, "Number of times ScanAllByEdgeIdOperator operator was used.") \
M(ExpandOperator, Operator, "Number of times Expand operator was used.") \
M(ExpandVariableOperator, Operator, "Number of times ExpandVariable operator was used.") \
M(ConstructNamedPathOperator, Operator, "Number of times ConstructNamedPath operator was used.") \

View File

@ -124,6 +124,11 @@ startup_config_dict = {
"false",
"Controls whether metadata should be collected about the resident labels and edge types.",
),
"storage_enable_edges_metadata": (
"false",
"false",
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on certain queries.",
),
"password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."),
"pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."),
"query_execution_timeout_sec": (

View File

@ -1704,6 +1704,13 @@ TYPED_TEST(TestPlanner, ScanAllById) {
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllById(), ExpectProduce());
}
TYPED_TEST(TestPlanner, ScanAllByEdgeId) {
// Test MATCH ()-[r]->() WHERE id(r) = 42 RETURN r
auto *query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("anon1"), EDGE("r"), NODE("anon2"))),
WHERE(EQ(FN("id", IDENT("r")), LITERAL(42))), RETURN("r")));
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllByEdgeId(), ExpectProduce());
}
TYPED_TEST(TestPlanner, BfsToExisting) {
// Test MATCH (n)-[r *bfs]-(m) WHERE id(m) = 42 RETURN r
auto *bfs = this->storage.template Create<memgraph::query::EdgeAtom>(

View File

@ -66,6 +66,7 @@ class PlanChecker : public virtual HierarchicalLogicalOperatorVisitor {
PRE_VISIT(ScanAllByLabelPropertyRange);
PRE_VISIT(ScanAllByLabelProperty);
PRE_VISIT(ScanAllByEdgeType);
PRE_VISIT(ScanAllByEdgeId);
PRE_VISIT(ScanAllById);
PRE_VISIT(Expand);
PRE_VISIT(ExpandVariable);
@ -172,6 +173,7 @@ using ExpectDelete = OpChecker<Delete>;
using ExpectScanAll = OpChecker<ScanAll>;
using ExpectScanAllByLabel = OpChecker<ScanAllByLabel>;
using ExpectScanAllByEdgeType = OpChecker<ScanAllByEdgeType>;
using ExpectScanAllByEdgeId = OpChecker<ScanAllByEdgeId>;
using ExpectScanAllById = OpChecker<ScanAllById>;
using ExpectExpand = OpChecker<Expand>;
using ExpectConstructNamedPath = OpChecker<ConstructNamedPath>;

View File

@ -2965,6 +2965,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::utils::SkipList<memgraph::storage::Vertex> vertices;
memgraph::utils::SkipList<memgraph::storage::Edge> edges;
memgraph::utils::SkipList<memgraph::storage::EdgeMetadata> edges_metadata;
std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>();
std::atomic<uint64_t> edge_count{0};
uint64_t wal_seq_num{0};
@ -2978,7 +2979,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
config.durability.storage_directory / memgraph::storage::durability::kWalDirectory};
// Recover snapshot.
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count,
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edges_metadata, &edge_count,
name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num);
MG_ASSERT(info.has_value(), "Info doesn't have value present");
@ -3044,3 +3045,61 @@ TEST_P(DurabilityTest, EdgeTypeIndexRecovered) {
ASSERT_FALSE(acc->Commit().HasError());
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, EdgeMetadataRecovered) {
if (GetParam() == false) {
return;
}
// Create snapshot.
{
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
CreateBaseDataset(db.storage(), GetParam());
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
}
ASSERT_EQ(GetSnapshotsList().size(), 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
// Recover snapshot.
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam(), .enable_edges_metadata = true},
.durability = {.storage_directory = storage_directory, .recover_on_startup = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
// Check if data has been loaded correctly.
{
auto acc = db.Access();
for (auto i{0U}; i < kNumBaseEdges; ++i) {
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(i), memgraph::storage::View::OLD);
ASSERT_TRUE(edge.has_value());
}
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
auto vertex = acc->CreateVertex();
auto new_edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et"));
ASSERT_TRUE(new_edge.HasValue());
ASSERT_FALSE(acc->Commit().HasError());
}
{
auto acc = db.Access();
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
ASSERT_TRUE(edge.has_value());
edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges + 1), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
ASSERT_FALSE(acc->Commit().HasError());
}
}