Compare commits
No commits in common. "Fetch-edge-by-edge-id" and "master" have entirely different histories.
Fetch-edge
...
master
@ -119,10 +119,6 @@ modifications:
|
||||
value: "false"
|
||||
override: true
|
||||
|
||||
- name: "storage_enable_edges_metadata"
|
||||
value: "false"
|
||||
override: true
|
||||
|
||||
- name: "query_callable_mappings_path"
|
||||
value: "/etc/memgraph/apoc_compatibility_mappings.json"
|
||||
override: true
|
||||
|
@ -284,8 +284,8 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
|
||||
try {
|
||||
spdlog::debug("Loading snapshot");
|
||||
auto recovered_snapshot = storage::durability::LoadSnapshot(
|
||||
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->edges_metadata_,
|
||||
&storage->repl_storage_state_.history, storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
|
||||
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->repl_storage_state_.history,
|
||||
storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
|
||||
spdlog::debug("Snapshot loaded successfully");
|
||||
// If this step is present it should always be the first step of
|
||||
// the recovery so we use the UUID we read from snasphost
|
||||
|
@ -131,11 +131,6 @@ DEFINE_uint64(storage_recovery_thread_count,
|
||||
DEFINE_bool(storage_enable_schema_metadata, false,
|
||||
"Controls whether metadata should be collected about the resident labels and edge types.");
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_bool(storage_enable_edges_metadata, false,
|
||||
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on "
|
||||
"certain queries.");
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_bool(storage_delta_on_identical_property_update, true,
|
||||
"Controls whether updating a property with the same value should create a delta object.");
|
||||
|
@ -85,8 +85,6 @@ DECLARE_uint64(storage_recovery_thread_count);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_bool(storage_enable_schema_metadata);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_bool(storage_enable_edges_metadata);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_bool(storage_delta_on_identical_property_update);
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
@ -332,16 +332,9 @@ int main(int argc, char **argv) {
|
||||
.durability_directory = FLAGS_data_directory + "/rocksdb_durability",
|
||||
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
|
||||
.salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
|
||||
.enable_edges_metadata =
|
||||
FLAGS_storage_properties_on_edges ? FLAGS_storage_enable_edges_metadata : false,
|
||||
.enable_schema_metadata = FLAGS_storage_enable_schema_metadata,
|
||||
.delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update},
|
||||
.salient.storage_mode = memgraph::flags::ParseStorageMode()};
|
||||
if (!FLAGS_storage_properties_on_edges && FLAGS_storage_enable_edges_metadata) {
|
||||
spdlog::warn(
|
||||
"Properties on edges were not enabled, hence edges metadata will also be disabled. If you wish to utilize "
|
||||
"extra metadata on edges, enable properties on edges as well.");
|
||||
}
|
||||
spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
|
||||
FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
|
||||
memgraph::utils::Scheduler jemalloc_purge_scheduler;
|
||||
|
@ -716,7 +716,8 @@ int main(int argc, char *argv[]) {
|
||||
.recover_on_startup = false,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED,
|
||||
.snapshot_on_exit = true},
|
||||
.salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}}};
|
||||
.salient = {.items = {.properties_on_edges = FLAGS_storage_properties_on_edges}},
|
||||
};
|
||||
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
|
||||
auto store = memgraph::dbms::CreateInMemoryStorage(config, repl_state);
|
||||
|
||||
|
@ -439,12 +439,6 @@ class DbAccessor final {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view) {
|
||||
auto maybe_edge = accessor_->FindEdge(gid, view);
|
||||
if (maybe_edge) return EdgeAccessor(*maybe_edge);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void FinalizeTransaction() { accessor_->FinalizeTransaction(); }
|
||||
|
||||
void TrackCurrentThreadAllocations() {
|
||||
@ -819,8 +813,6 @@ class SubgraphDbAccessor final {
|
||||
|
||||
std::optional<VertexAccessor> FindVertex(storage::Gid gid, storage::View view);
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(storage::Gid gid, storage::View view);
|
||||
|
||||
Graph *getGraph();
|
||||
|
||||
storage::StorageMode GetStorageMode() const noexcept;
|
||||
|
@ -117,9 +117,6 @@ class PlanHintsProvider final : public HierarchicalLogicalOperatorVisitor {
|
||||
bool PreVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
|
||||
bool PostVisit(ScanAllByEdgeType & /*unused*/) override { return true; }
|
||||
|
||||
bool PreVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
|
||||
bool PostVisit(ScanAllByEdgeId & /*unused*/) override { return true; }
|
||||
|
||||
bool PreVisit(ConstructNamedPath & /*unused*/) override { return true; }
|
||||
bool PostVisit(ConstructNamedPath & /*unused*/) override { return true; }
|
||||
|
||||
|
@ -110,7 +110,6 @@ extern const Event ScanAllByLabelPropertyValueOperator;
|
||||
extern const Event ScanAllByLabelPropertyOperator;
|
||||
extern const Event ScanAllByIdOperator;
|
||||
extern const Event ScanAllByEdgeTypeOperator;
|
||||
extern const Event ScanAllByEdgeIdOperator;
|
||||
extern const Event ExpandOperator;
|
||||
extern const Event ExpandVariableOperator;
|
||||
extern const Event ConstructNamedPathOperator;
|
||||
@ -545,7 +544,7 @@ class ScanAllCursor : public Cursor {
|
||||
template <typename TEdgesFun>
|
||||
class ScanAllByEdgeTypeCursor : public Cursor {
|
||||
public:
|
||||
explicit ScanAllByEdgeTypeCursor(const ScanAll &self, Symbol output_symbol, UniqueCursorPtr input_cursor,
|
||||
explicit ScanAllByEdgeTypeCursor(const ScanAllByEdgeType &self, Symbol output_symbol, UniqueCursorPtr input_cursor,
|
||||
storage::View view, TEdgesFun get_edges, const char *op_name)
|
||||
: self_(self),
|
||||
output_symbol_(std::move(output_symbol)),
|
||||
@ -585,7 +584,7 @@ class ScanAllByEdgeTypeCursor : public Cursor {
|
||||
}
|
||||
|
||||
private:
|
||||
const ScanAll &self_;
|
||||
const ScanAllByEdgeType &self_;
|
||||
const Symbol output_symbol_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
storage::View view_;
|
||||
@ -637,7 +636,10 @@ UniqueCursorPtr ScanAllByLabel::MakeCursor(utils::MemoryResource *mem) const {
|
||||
|
||||
ScanAllByEdgeType::ScanAllByEdgeType(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
|
||||
storage::EdgeTypeId edge_type, storage::View view)
|
||||
: ScanAll(input, output_symbol, view), edge_type_(edge_type) {}
|
||||
: input_(input ? input : std::make_shared<Once>()),
|
||||
output_symbol_(std::move(output_symbol)),
|
||||
view_(view),
|
||||
edge_type_(edge_type) {}
|
||||
|
||||
ACCEPT_WITH_INPUT(ScanAllByEdgeType)
|
||||
|
||||
@ -803,38 +805,6 @@ UniqueCursorPtr ScanAllById::MakeCursor(utils::MemoryResource *mem) const {
|
||||
view_, std::move(vertices), "ScanAllById");
|
||||
}
|
||||
|
||||
ScanAllByEdgeId::ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol,
|
||||
Expression *expression, storage::View view)
|
||||
: ScanAll(input, output_symbol, view), expression_(expression) {
|
||||
MG_ASSERT(expression);
|
||||
}
|
||||
|
||||
ACCEPT_WITH_INPUT(ScanAllByEdgeId)
|
||||
|
||||
UniqueCursorPtr ScanAllByEdgeId::MakeCursor(utils::MemoryResource *mem) const {
|
||||
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByEdgeIdOperator);
|
||||
|
||||
auto edges = [this](Frame &frame, ExecutionContext &context) -> std::optional<std::vector<EdgeAccessor>> {
|
||||
auto *db = context.db_accessor;
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, view_);
|
||||
auto value = expression_->Accept(evaluator);
|
||||
if (!value.IsNumeric()) return std::nullopt;
|
||||
int64_t id = value.IsInt() ? value.ValueInt() : value.ValueDouble();
|
||||
if (value.IsDouble() && id != value.ValueDouble()) return std::nullopt;
|
||||
auto maybe_edge = db->FindEdge(storage::Gid::FromInt(id), view_);
|
||||
if (!maybe_edge) return std::nullopt;
|
||||
return std::vector<EdgeAccessor>{*maybe_edge};
|
||||
};
|
||||
return MakeUniqueCursorPtr<ScanAllByEdgeTypeCursor<decltype(edges)>>(
|
||||
mem, *this, output_symbol_, input_->MakeCursor(mem), view_, std::move(edges), "ScanAllByEdgeId");
|
||||
}
|
||||
|
||||
std::vector<Symbol> ScanAllByEdgeId::ModifiedSymbols(const SymbolTable &table) const {
|
||||
auto symbols = input_->ModifiedSymbols(table);
|
||||
symbols.emplace_back(output_symbol_);
|
||||
return symbols;
|
||||
}
|
||||
|
||||
namespace {
|
||||
bool CheckExistingNode(const VertexAccessor &new_node, const Symbol &existing_node_sym, Frame &frame) {
|
||||
const TypedValue &existing_node = frame[existing_node_sym];
|
||||
|
@ -95,7 +95,6 @@ class ScanAllByLabelPropertyValue;
|
||||
class ScanAllByLabelProperty;
|
||||
class ScanAllById;
|
||||
class ScanAllByEdgeType;
|
||||
class ScanAllByEdgeId;
|
||||
class Expand;
|
||||
class ExpandVariable;
|
||||
class ConstructNamedPath;
|
||||
@ -129,12 +128,13 @@ class IndexedJoin;
|
||||
class HashJoin;
|
||||
class RollUpApply;
|
||||
|
||||
using LogicalOperatorCompositeVisitor = utils::CompositeVisitor<
|
||||
Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange, ScanAllByLabelPropertyValue,
|
||||
ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, ScanAllByEdgeId, Expand, ExpandVariable, ConstructNamedPath,
|
||||
Filter, Produce, Delete, SetProperty, SetProperties, SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter,
|
||||
Accumulate, Aggregate, Skip, Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure,
|
||||
LoadCsv, Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
|
||||
using LogicalOperatorCompositeVisitor =
|
||||
utils::CompositeVisitor<Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, ScanAllByLabelPropertyRange,
|
||||
ScanAllByLabelPropertyValue, ScanAllByLabelProperty, ScanAllById, ScanAllByEdgeType, Expand,
|
||||
ExpandVariable, ConstructNamedPath, Filter, Produce, Delete, SetProperty, SetProperties,
|
||||
SetLabels, RemoveProperty, RemoveLabels, EdgeUniquenessFilter, Accumulate, Aggregate, Skip,
|
||||
Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, Cartesian, CallProcedure, LoadCsv,
|
||||
Foreach, EmptyResult, EvaluatePatternFilter, Apply, IndexedJoin, HashJoin, RollUpApply>;
|
||||
|
||||
using LogicalOperatorLeafVisitor = utils::LeafVisitor<Once>;
|
||||
|
||||
@ -590,7 +590,7 @@ class ScanAllByLabel : public memgraph::query::plan::ScanAll {
|
||||
}
|
||||
};
|
||||
|
||||
class ScanAllByEdgeType : public memgraph::query::plan::ScanAll {
|
||||
class ScanAllByEdgeType : public memgraph::query::plan::LogicalOperator {
|
||||
public:
|
||||
static const utils::TypeInfo kType;
|
||||
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
|
||||
@ -610,6 +610,10 @@ class ScanAllByEdgeType : public memgraph::query::plan::ScanAll {
|
||||
return fmt::format("ScanAllByEdgeType ({} :{})", output_symbol_.name(), dba_->EdgeTypeToName(edge_type_));
|
||||
}
|
||||
|
||||
std::shared_ptr<memgraph::query::plan::LogicalOperator> input_;
|
||||
Symbol output_symbol_;
|
||||
storage::View view_;
|
||||
|
||||
storage::EdgeTypeId edge_type_;
|
||||
|
||||
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
|
||||
@ -812,35 +816,6 @@ class ScanAllById : public memgraph::query::plan::ScanAll {
|
||||
return object;
|
||||
}
|
||||
};
|
||||
class ScanAllByEdgeId : public memgraph::query::plan::ScanAll {
|
||||
public:
|
||||
static const utils::TypeInfo kType;
|
||||
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
|
||||
|
||||
ScanAllByEdgeId() = default;
|
||||
ScanAllByEdgeId(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, Expression *expression,
|
||||
storage::View view = storage::View::OLD);
|
||||
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
|
||||
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
|
||||
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
|
||||
|
||||
bool HasSingleInput() const override { return true; }
|
||||
std::shared_ptr<LogicalOperator> input() const override { return input_; }
|
||||
void set_input(std::shared_ptr<LogicalOperator> input) override { input_ = input; }
|
||||
|
||||
std::string ToString() const override { return fmt::format("ScanAllByEdgeId ({})", output_symbol_.name()); }
|
||||
|
||||
Expression *expression_;
|
||||
|
||||
std::unique_ptr<LogicalOperator> Clone(AstStorage *storage) const override {
|
||||
auto object = std::make_unique<ScanAllByEdgeId>();
|
||||
object->input_ = input_ ? input_->Clone(storage) : nullptr;
|
||||
object->output_symbol_ = output_symbol_;
|
||||
object->view_ = view_;
|
||||
object->expression_ = expression_ ? expression_->Clone(storage) : nullptr;
|
||||
return object;
|
||||
}
|
||||
};
|
||||
|
||||
struct ExpandCommon {
|
||||
static const utils::TypeInfo kType;
|
||||
|
@ -49,13 +49,9 @@ constexpr utils::TypeInfo query::plan::ScanAllByLabelProperty::kType{
|
||||
|
||||
constexpr utils::TypeInfo query::plan::ScanAllById::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllById",
|
||||
&query::plan::ScanAll::kType};
|
||||
|
||||
constexpr utils::TypeInfo query::plan::ScanAllByEdgeType::kType{utils::TypeId::SCAN_ALL_BY_EDGE_TYPE,
|
||||
"ScanAllByEdgeType", &query::plan::ScanAll::kType};
|
||||
|
||||
constexpr utils::TypeInfo query::plan::ScanAllByEdgeId::kType{utils::TypeId::SCAN_ALL_BY_ID, "ScanAllByEdgeId",
|
||||
&query::plan::ScanAll::kType};
|
||||
|
||||
constexpr utils::TypeInfo query::plan::ExpandCommon::kType{utils::TypeId::EXPAND_COMMON, "ExpandCommon", nullptr};
|
||||
|
||||
constexpr utils::TypeInfo query::plan::Expand::kType{utils::TypeId::EXPAND, "Expand",
|
||||
|
@ -83,11 +83,6 @@ bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeType &op) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PlanPrinter::PreVisit(query::plan::ScanAllByEdgeId &op) {
|
||||
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PlanPrinter::PreVisit(query::plan::Expand &op) {
|
||||
op.dba_ = dba_;
|
||||
WithPrintLn([&op](auto &out) { out << "* " << op.ToString(); });
|
||||
@ -501,16 +496,6 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeType &op) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PlanToJsonVisitor::PreVisit(ScanAllByEdgeId &op) {
|
||||
json self;
|
||||
self["name"] = "ScanAllByEdgeId";
|
||||
self["output_symbol"] = ToJson(op.output_symbol_);
|
||||
op.input_->Accept(*this);
|
||||
self["input"] = PopOutput();
|
||||
output_ = std::move(self);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PlanToJsonVisitor::PreVisit(CreateNode &op) {
|
||||
json self;
|
||||
self["name"] = "CreateNode";
|
||||
|
@ -68,7 +68,6 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
bool PreVisit(ScanAllByLabelProperty &) override;
|
||||
bool PreVisit(ScanAllById &) override;
|
||||
bool PreVisit(ScanAllByEdgeType &) override;
|
||||
bool PreVisit(ScanAllByEdgeId &) override;
|
||||
|
||||
bool PreVisit(Expand &) override;
|
||||
bool PreVisit(ExpandVariable &) override;
|
||||
@ -207,7 +206,6 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
bool PreVisit(ScanAllByLabelProperty &) override;
|
||||
bool PreVisit(ScanAllById &) override;
|
||||
bool PreVisit(ScanAllByEdgeType &) override;
|
||||
bool PreVisit(ScanAllByEdgeId &) override;
|
||||
|
||||
bool PreVisit(EmptyResult &) override;
|
||||
bool PreVisit(Produce &) override;
|
||||
|
@ -48,26 +48,11 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
|
||||
bool PreVisit(Filter &op) override {
|
||||
prev_ops_.push_back(&op);
|
||||
filters_.CollectFilterExpression(op.expression_, *symbol_table_);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PostVisit(Filter &op) override {
|
||||
bool PostVisit(Filter & /*op*/) override {
|
||||
prev_ops_.pop_back();
|
||||
|
||||
ExpressionRemovalResult removal = RemoveExpressions(op.expression_, filter_exprs_for_removal_);
|
||||
op.expression_ = removal.trimmed_expression;
|
||||
if (op.expression_) {
|
||||
Filters leftover_filters;
|
||||
leftover_filters.CollectFilterExpression(op.expression_, *symbol_table_);
|
||||
op.all_filters_ = std::move(leftover_filters);
|
||||
}
|
||||
|
||||
if (!op.expression_) {
|
||||
SetOnParent(op.input());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -85,7 +70,7 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
bool PostVisit(ScanAll &op) override {
|
||||
prev_ops_.pop_back();
|
||||
|
||||
if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
|
||||
if (EdgeTypeIndexingPossible()) {
|
||||
SetOnParent(op.input());
|
||||
}
|
||||
|
||||
@ -103,25 +88,6 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
edge_type_index_exist = only_one_edge_type ? db_->EdgeTypeIndexExists(op.common_.edge_types.front()) : false;
|
||||
|
||||
scanall_under_expand_ = only_one_edge_type && expansion_is_named && expdanded_node_not_named;
|
||||
|
||||
const auto &output_symbol = op.common_.edge_symbol;
|
||||
const auto &modified_symbols = op.ModifiedSymbols(*symbol_table_);
|
||||
std::unordered_set<Symbol> bound_symbols(modified_symbols.begin(), modified_symbols.end());
|
||||
auto are_bound = [&bound_symbols](const auto &used_symbols) {
|
||||
for (const auto &used_symbol : used_symbols) {
|
||||
if (!utils::Contains(bound_symbols, used_symbol)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
for (const auto &filter : filters_.IdFilters(output_symbol)) {
|
||||
if (filter.id_filter->is_symbol_in_value_ || !are_bound(filter.used_symbols)) continue;
|
||||
maybe_id_lookup_value_ = filter.id_filter->value_;
|
||||
filter_exprs_for_removal_.insert(filter.expression);
|
||||
filters_.EraseFilter(filter);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -130,10 +96,9 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
bool PostVisit(Expand &op) override {
|
||||
prev_ops_.pop_back();
|
||||
|
||||
if (EdgeTypeIndexingPossible() || maybe_id_lookup_value_) {
|
||||
if (EdgeTypeIndexingPossible()) {
|
||||
auto indexed_scan = GenEdgeTypeScan(op);
|
||||
SetOnParent(std::move(indexed_scan));
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -289,15 +254,6 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PreVisit(ScanAllByEdgeId &op) override {
|
||||
prev_ops_.push_back(&op);
|
||||
return true;
|
||||
}
|
||||
bool PostVisit(ScanAllByEdgeId &) override {
|
||||
prev_ops_.pop_back();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PreVisit(ConstructNamedPath &op) override {
|
||||
prev_ops_.push_back(&op);
|
||||
return true;
|
||||
@ -535,12 +491,9 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
std::vector<LogicalOperator *> prev_ops_;
|
||||
std::unordered_set<Symbol> cartesian_symbols_;
|
||||
|
||||
memgraph::query::Expression *maybe_id_lookup_value_ = nullptr;
|
||||
|
||||
bool EdgeTypeIndexingPossible() const {
|
||||
return expand_under_produce_ && scanall_under_expand_ && once_under_scanall_ && edge_type_index_exist;
|
||||
}
|
||||
|
||||
bool expand_under_produce_ = false;
|
||||
bool scanall_under_expand_ = false;
|
||||
bool once_under_scanall_ = false;
|
||||
@ -550,21 +503,14 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
throw utils::NotYetImplemented("Operator not yet covered by EdgeTypeIndexRewriter");
|
||||
}
|
||||
|
||||
std::unique_ptr<ScanAll> GenEdgeTypeScan(const Expand &expand) {
|
||||
std::unique_ptr<ScanAllByEdgeType> GenEdgeTypeScan(const Expand &expand) {
|
||||
const auto &input = expand.input();
|
||||
const auto &output_symbol = expand.common_.edge_symbol;
|
||||
const auto &view = expand.view_;
|
||||
|
||||
if (EdgeTypeIndexingPossible()) {
|
||||
auto edge_type = expand.common_.edge_types.front();
|
||||
return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view);
|
||||
}
|
||||
|
||||
if (maybe_id_lookup_value_) {
|
||||
return std::make_unique<ScanAllByEdgeId>(input, output_symbol, maybe_id_lookup_value_, view);
|
||||
}
|
||||
|
||||
LOG_FATAL("Fatal error while rewriting query plan.");
|
||||
// Extract edge_type from symbol
|
||||
auto edge_type = expand.common_.edge_types.front();
|
||||
return std::make_unique<ScanAllByEdgeType>(input, output_symbol, edge_type, view);
|
||||
}
|
||||
|
||||
void SetOnParent(const std::shared_ptr<LogicalOperator> &input) {
|
||||
|
@ -36,7 +36,6 @@ struct SalientConfig {
|
||||
StorageMode storage_mode{StorageMode::IN_MEMORY_TRANSACTIONAL};
|
||||
struct Items {
|
||||
bool properties_on_edges{true};
|
||||
bool enable_edges_metadata{false};
|
||||
bool enable_schema_metadata{false};
|
||||
bool delta_on_identical_property_update{true};
|
||||
friend bool operator==(const Items &lrh, const Items &rhs) = default;
|
||||
@ -47,13 +46,11 @@ struct SalientConfig {
|
||||
|
||||
inline void to_json(nlohmann::json &data, SalientConfig::Items const &items) {
|
||||
data = nlohmann::json{{"properties_on_edges", items.properties_on_edges},
|
||||
{"enable_edges_metadata", items.enable_edges_metadata},
|
||||
{"enable_schema_metadata", items.enable_schema_metadata}};
|
||||
}
|
||||
|
||||
inline void from_json(const nlohmann::json &data, SalientConfig::Items &items) {
|
||||
data.at("properties_on_edges").get_to(items.properties_on_edges);
|
||||
data.at("enable_edges_metadata").get_to(items.enable_edges_metadata);
|
||||
data.at("enable_schema_metadata").get_to(items.enable_schema_metadata);
|
||||
}
|
||||
|
||||
|
@ -916,10 +916,6 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
|
||||
return disk_storage->FindVertex(gid, &transaction_, view);
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::FindEdge(storage::Gid gid, View view) {
|
||||
throw utils::NotYetImplemented("Id based lookup for on-disk storage mode is not yet implemented on edges.");
|
||||
}
|
||||
|
||||
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
|
||||
DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
|
||||
bool detach) {
|
||||
|
@ -72,8 +72,6 @@ class DiskStorage final : public Storage {
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
|
||||
|
||||
EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
|
||||
|
||||
uint64_t ApproximateVertexCount() const override;
|
||||
|
@ -300,7 +300,6 @@ std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const R
|
||||
|
||||
std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
|
||||
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
|
||||
utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
|
||||
Indices *indices, Constraints *constraints, const Config &config,
|
||||
uint64_t *wal_seq_num) {
|
||||
@ -335,8 +334,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
|
||||
}
|
||||
spdlog::info("Starting snapshot recovery from {}.", path);
|
||||
try {
|
||||
recovered_snapshot =
|
||||
LoadSnapshot(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config);
|
||||
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
|
||||
spdlog::info("Snapshot recovery successful!");
|
||||
break;
|
||||
} catch (const RecoveryFailure &e) {
|
||||
|
@ -134,7 +134,6 @@ struct Recovery {
|
||||
/// @throw std::bad_alloc
|
||||
std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
|
||||
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
|
||||
utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
|
||||
Indices *indices, Constraints *constraints, const Config &config,
|
||||
uint64_t *wal_seq_num);
|
||||
|
@ -427,7 +427,6 @@ struct LoadPartialConnectivityResult {
|
||||
template <typename TEdgeTypeFromIdFunc>
|
||||
LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path,
|
||||
utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
|
||||
utils::SkipList<EdgeMetadata> &edges_metadata,
|
||||
const uint64_t from_offset, const uint64_t vertices_count,
|
||||
const SalientConfig::Items items, const bool snapshot_has_edges,
|
||||
TEdgeTypeFromIdFunc get_edge_type_from_id) {
|
||||
@ -438,7 +437,6 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
|
||||
|
||||
auto vertex_acc = vertices.access();
|
||||
auto edge_acc = edges.access();
|
||||
auto edge_metadata_acc = edges_metadata.access();
|
||||
|
||||
// Read the first gid to find the necessary iterator in vertices
|
||||
const auto first_vertex_gid = std::invoke([&]() mutable {
|
||||
@ -582,9 +580,6 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
|
||||
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
|
||||
edge_ref = EdgeRef(&*edge);
|
||||
}
|
||||
if (items.enable_edges_metadata) {
|
||||
edge_metadata_acc.insert(EdgeMetadata{Gid::FromUint(*edge_gid), &vertex});
|
||||
}
|
||||
}
|
||||
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
|
||||
// Increment edge count. We only increment the count here because the
|
||||
@ -631,7 +626,7 @@ void RecoverOnMultipleThreads(size_t thread_count, const TFunc &func, const std:
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
|
||||
SalientConfig::Items items) {
|
||||
@ -649,7 +644,6 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
if (!success) {
|
||||
edges->clear();
|
||||
vertices->clear();
|
||||
edges_metadata->clear();
|
||||
epoch_history->clear();
|
||||
}
|
||||
});
|
||||
@ -1104,7 +1098,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
|
||||
const Config &config) {
|
||||
@ -1124,7 +1118,6 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
|
||||
if (!success) {
|
||||
edges->clear();
|
||||
vertices->clear();
|
||||
edges_metadata->clear();
|
||||
epoch_history->clear();
|
||||
}
|
||||
});
|
||||
@ -1233,10 +1226,10 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
|
||||
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
|
||||
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
|
||||
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
|
||||
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
|
||||
snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(result.edge_count);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < result.highest_edge_id) {
|
||||
@ -1394,7 +1387,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
|
||||
const Config &config) {
|
||||
@ -1414,7 +1407,6 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
|
||||
if (!success) {
|
||||
edges->clear();
|
||||
vertices->clear();
|
||||
edges_metadata->clear();
|
||||
epoch_history->clear();
|
||||
}
|
||||
});
|
||||
@ -1523,10 +1515,10 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
|
||||
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
|
||||
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
|
||||
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
|
||||
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
|
||||
snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(result.edge_count);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < result.highest_edge_id) {
|
||||
@ -1738,7 +1730,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) {
|
||||
RecoveryInfo recovery_info;
|
||||
@ -1750,16 +1742,14 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
|
||||
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
|
||||
if (*version == 14U) {
|
||||
return LoadSnapshotVersion14(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
|
||||
return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count,
|
||||
config.salient.items);
|
||||
}
|
||||
if (*version == 15U) {
|
||||
return LoadSnapshotVersion15(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
|
||||
config);
|
||||
return LoadSnapshotVersion15(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
|
||||
}
|
||||
if (*version == 16U) {
|
||||
return LoadSnapshotVersion16(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
|
||||
config);
|
||||
return LoadSnapshotVersion16(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
|
||||
}
|
||||
|
||||
// Cleanup of loaded data in case of failure.
|
||||
@ -1768,7 +1758,6 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
if (!success) {
|
||||
edges->clear();
|
||||
vertices->clear();
|
||||
edges_metadata->clear();
|
||||
epoch_history->clear();
|
||||
}
|
||||
});
|
||||
@ -1877,10 +1866,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
|
||||
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
|
||||
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
|
||||
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
|
||||
snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(result.edge_count);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < result.highest_edge_id) {
|
||||
|
@ -64,7 +64,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
|
||||
/// Function used to load the snapshot data into the storage.
|
||||
/// @throw RecoveryFailure
|
||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config);
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -49,18 +49,4 @@ inline bool operator<(const Edge &first, const Edge &second) { return first.gid
|
||||
inline bool operator==(const Edge &first, const Gid &second) { return first.gid == second; }
|
||||
inline bool operator<(const Edge &first, const Gid &second) { return first.gid < second; }
|
||||
|
||||
struct EdgeMetadata {
|
||||
EdgeMetadata(Gid gid, Vertex *from_vertex) : gid(gid), from_vertex(from_vertex) {}
|
||||
|
||||
Gid gid;
|
||||
Vertex *from_vertex;
|
||||
};
|
||||
|
||||
static_assert(alignof(Edge) >= 8, "The Edge should be aligned to at least 8!");
|
||||
|
||||
inline bool operator==(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid == second.gid; }
|
||||
inline bool operator<(const EdgeMetadata &first, const EdgeMetadata &second) { return first.gid < second.gid; }
|
||||
inline bool operator==(const EdgeMetadata &first, const Gid &second) { return first.gid == second; }
|
||||
inline bool operator<(const EdgeMetadata &first, const Gid &second) { return first.gid < second; }
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -16,7 +16,6 @@
|
||||
#include <tuple>
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge_info_helpers.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/property_store.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
@ -27,15 +26,6 @@
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
std::optional<EdgeAccessor> EdgeAccessor::Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex,
|
||||
Vertex *to_vertex, Storage *storage, Transaction *transaction,
|
||||
View view, bool for_deleted) {
|
||||
if (!IsEdgeVisible(edge.ptr, transaction, view)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage, transaction, for_deleted);
|
||||
}
|
||||
|
||||
bool EdgeAccessor::IsDeleted() const {
|
||||
if (!storage_->config_.salient.items.properties_on_edges) {
|
||||
|
@ -44,10 +44,6 @@ class EdgeAccessor final {
|
||||
transaction_(transaction),
|
||||
for_deleted_(for_deleted) {}
|
||||
|
||||
static std::optional<EdgeAccessor> Create(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex,
|
||||
Storage *storage, Transaction *transaction, View view,
|
||||
bool for_deleted = false);
|
||||
|
||||
bool IsDeleted() const;
|
||||
|
||||
/// @return true if the object is visible from the current transaction
|
||||
|
@ -1,56 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
#pragma once
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
inline bool IsEdgeVisible(Edge *edge, const Transaction *transaction, View view) {
|
||||
bool exists = true;
|
||||
bool deleted = true;
|
||||
Delta *delta = nullptr;
|
||||
{
|
||||
auto guard = std::shared_lock{edge->lock};
|
||||
deleted = edge->deleted;
|
||||
delta = edge->delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
exists = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
return exists && !deleted;
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage
|
@ -12,7 +12,6 @@
|
||||
#include "storage/v2/inmemory/edge_type_index.hpp"
|
||||
|
||||
#include "storage/v2/constraints/constraints.hpp"
|
||||
#include "storage/v2/edge_info_helpers.hpp"
|
||||
#include "storage/v2/indices/indices_utils.hpp"
|
||||
#include "utils/counter.hpp"
|
||||
|
||||
@ -26,6 +25,39 @@ using EdgeTypeId = memgraph::storage::EdgeTypeId;
|
||||
using Transaction = memgraph::storage::Transaction;
|
||||
using View = memgraph::storage::View;
|
||||
|
||||
bool IsIndexEntryVisible(Edge *edge, const Transaction *transaction, View view) {
|
||||
bool exists = true;
|
||||
bool deleted = true;
|
||||
Delta *delta = nullptr;
|
||||
{
|
||||
auto guard = std::shared_lock{edge->lock};
|
||||
deleted = edge->deleted;
|
||||
delta = edge->delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
exists = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
return exists && !deleted;
|
||||
}
|
||||
|
||||
using ReturnType = std::optional<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;
|
||||
ReturnType VertexDeletedConnectedEdges(Vertex *vertex, Edge *edge, const Transaction *transaction, View view) {
|
||||
ReturnType link;
|
||||
@ -210,7 +242,7 @@ void InMemoryEdgeTypeIndex::Iterable::Iterator::AdvanceUntilValid() {
|
||||
auto *from_vertex = index_iterator_->from_vertex;
|
||||
auto *to_vertex = index_iterator_->to_vertex;
|
||||
|
||||
if (!IsEdgeVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted ||
|
||||
if (!IsIndexEntryVisible(index_iterator_->edge, self_->transaction_, self_->view_) || from_vertex->deleted ||
|
||||
to_vertex->deleted) {
|
||||
continue;
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ InMemoryStorage::InMemoryStorage(Config config)
|
||||
config_.durability.storage_directory);
|
||||
}
|
||||
if (config_.durability.recover_on_startup) {
|
||||
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edges_metadata_, &edge_count_,
|
||||
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_,
|
||||
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
|
||||
if (info) {
|
||||
vertex_id_ = info->next_vertex_id;
|
||||
@ -341,11 +341,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
|
||||
if (delta) {
|
||||
delta->prev.Set(&*it);
|
||||
}
|
||||
if (config_.enable_edges_metadata) {
|
||||
auto acc = mem_storage->edges_metadata_.access();
|
||||
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
|
||||
MG_ASSERT(inserted, "The edge must be inserted here!");
|
||||
}
|
||||
}
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
|
||||
@ -448,11 +443,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
|
||||
if (delta) {
|
||||
delta->prev.Set(&*it);
|
||||
}
|
||||
if (config_.enable_edges_metadata) {
|
||||
auto acc = mem_storage->edges_metadata_.access();
|
||||
auto [_, inserted] = acc.insert(EdgeMetadata(gid, from->vertex_));
|
||||
MG_ASSERT(inserted, "The edge must be inserted here!");
|
||||
}
|
||||
}
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
|
||||
@ -474,15 +464,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
|
||||
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
|
||||
}
|
||||
|
||||
void InMemoryStorage::UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex) {
|
||||
auto edge_metadata_acc = edges_metadata_.access();
|
||||
auto edge_to_modify = edge_metadata_acc.find(edge->gid);
|
||||
if (edge_to_modify == edge_metadata_acc.end()) {
|
||||
throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__);
|
||||
}
|
||||
edge_to_modify->from_vertex = from_vertex;
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) {
|
||||
MG_ASSERT(edge->transaction_ == new_from->transaction_,
|
||||
"EdgeAccessor must be from the same transaction as the new from vertex "
|
||||
@ -583,10 +564,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
|
||||
mem_edge_type_index->UpdateOnEdgeModification(old_from_vertex, to_vertex, new_from_vertex, to_vertex, edge_ref,
|
||||
edge_type, transaction_);
|
||||
|
||||
if (config_.enable_edges_metadata) {
|
||||
in_memory->UpdateEdgesMetadataOnModification(edge_ref.ptr, new_from_vertex);
|
||||
}
|
||||
|
||||
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
|
||||
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
|
||||
@ -1010,10 +987,8 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
|
||||
if (!current_deleted_edges.empty()) {
|
||||
// 3.c) remove from edge skip_list
|
||||
auto edge_acc = mem_storage->edges_.access();
|
||||
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
|
||||
for (auto gid : current_deleted_edges) {
|
||||
edge_acc.remove(gid);
|
||||
edge_metadata_acc.remove(gid);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1259,10 +1234,8 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
// EDGES
|
||||
{
|
||||
auto edges_acc = mem_storage->edges_.access();
|
||||
auto edges_metadata_acc = mem_storage->edges_metadata_.access();
|
||||
for (auto gid : my_deleted_edges) {
|
||||
edges_acc.remove(gid);
|
||||
edges_metadata_acc.remove(gid);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1465,62 +1438,6 @@ EdgesIterable InMemoryStorage::InMemoryAccessor::Edges(EdgeTypeId edge_type, Vie
|
||||
return EdgesIterable(mem_edge_type_index->Edges(edge_type, view, storage_, &transaction_));
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, View view) {
|
||||
using EdgeInfo = std::optional<std::tuple<EdgeRef, EdgeTypeId, Vertex *, Vertex *>>;
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||
|
||||
auto edge_acc = mem_storage->edges_.access();
|
||||
auto edge_it = edge_acc.find(gid);
|
||||
if (edge_it == edge_acc.end()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto *edge_ptr = &(*edge_it);
|
||||
auto vertices_acc = mem_storage->vertices_.access();
|
||||
|
||||
auto extract_edge_info = [&](Vertex *from_vertex) -> EdgeInfo {
|
||||
for (auto &out_edge : from_vertex->out_edges) {
|
||||
if (std::get<2>(out_edge).ptr == edge_ptr) {
|
||||
return std::tuple(std::get<2>(out_edge), std::get<0>(out_edge), from_vertex, std::get<1>(out_edge));
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
auto edge_accessor_from_info = [this, view](EdgeInfo &maybe_edge_info) -> std::optional<EdgeAccessor> {
|
||||
if (!maybe_edge_info) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto &edge_info = *maybe_edge_info;
|
||||
return EdgeAccessor::Create(std::get<0>(edge_info), std::get<1>(edge_info), std::get<2>(edge_info),
|
||||
std::get<3>(edge_info), storage_, &transaction_, view);
|
||||
};
|
||||
|
||||
if (mem_storage->config_.salient.items.enable_edges_metadata) {
|
||||
auto edge_metadata_acc = mem_storage->edges_metadata_.access();
|
||||
auto edge_metadata_it = edge_metadata_acc.find(gid);
|
||||
MG_ASSERT(edge_metadata_it != edge_metadata_acc.end(), "Invalid database state!");
|
||||
|
||||
auto maybe_edge_info = extract_edge_info(edge_metadata_it->from_vertex);
|
||||
return edge_accessor_from_info(maybe_edge_info);
|
||||
}
|
||||
|
||||
// If metadata on edges is not enabled we will have to do
|
||||
// a full scan.
|
||||
auto maybe_edge_info = std::invoke([&]() -> EdgeInfo {
|
||||
for (auto &from_vertex : vertices_acc) {
|
||||
auto maybe_edge_info = extract_edge_info(&from_vertex);
|
||||
if (maybe_edge_info) {
|
||||
return maybe_edge_info;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
});
|
||||
|
||||
return edge_accessor_from_info(maybe_edge_info);
|
||||
}
|
||||
|
||||
Transaction InMemoryStorage::CreateTransaction(
|
||||
IsolationLevel isolation_level, StorageMode storage_mode,
|
||||
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
|
||||
@ -1869,12 +1786,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
}
|
||||
{
|
||||
auto edge_acc = edges_.access();
|
||||
auto edge_metadata_acc = edges_metadata_.access();
|
||||
for (auto edge : current_deleted_edges) {
|
||||
MG_ASSERT(edge_acc.remove(edge), "Invalid database state!");
|
||||
if (config_.salient.items.enable_edges_metadata) {
|
||||
MG_ASSERT(edge_metadata_acc.remove(edge), "Invalid database state!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1896,12 +1809,10 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
// EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions
|
||||
if (need_full_scan_edges) {
|
||||
auto edge_acc = edges_.access();
|
||||
auto edge_metadata_acc = edges_metadata_.access();
|
||||
for (auto &edge : edge_acc) {
|
||||
// a deleted edge which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion
|
||||
if (edge.delta == nullptr && edge.deleted) {
|
||||
edge_acc.remove(edge);
|
||||
edge_metadata_acc.remove(edge.gid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -109,8 +109,6 @@ class InMemoryStorage final : public Storage {
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
|
||||
|
||||
std::optional<EdgeAccessor> FindEdge(Gid gid, View view) override;
|
||||
|
||||
EdgesIterable Edges(EdgeTypeId edge_type, View view) override;
|
||||
|
||||
/// Return approximate number of all vertices in the database.
|
||||
@ -426,12 +424,9 @@ class InMemoryStorage final : public Storage {
|
||||
|
||||
void PrepareForNewEpoch() override;
|
||||
|
||||
void UpdateEdgesMetadataOnModification(Edge *edge, Vertex *from_vertex);
|
||||
|
||||
// Main object storage
|
||||
utils::SkipList<storage::Vertex> vertices_;
|
||||
utils::SkipList<storage::Edge> edges_;
|
||||
utils::SkipList<storage::EdgeMetadata> edges_metadata_;
|
||||
|
||||
// Durability
|
||||
durability::Recovery recovery_;
|
||||
|
@ -179,8 +179,6 @@ class Storage {
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) = 0;
|
||||
|
||||
virtual std::optional<EdgeAccessor> FindEdge(Gid gid, View view) = 0;
|
||||
|
||||
virtual EdgesIterable Edges(EdgeTypeId edge_type, View view) = 0;
|
||||
|
||||
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);
|
||||
|
@ -27,7 +27,6 @@
|
||||
M(ScanAllByLabelPropertyOperator, Operator, "Number of times ScanAllByLabelProperty operator was used.") \
|
||||
M(ScanAllByIdOperator, Operator, "Number of times ScanAllById operator was used.") \
|
||||
M(ScanAllByEdgeTypeOperator, Operator, "Number of times ScanAllByEdgeTypeOperator operator was used.") \
|
||||
M(ScanAllByEdgeIdOperator, Operator, "Number of times ScanAllByEdgeIdOperator operator was used.") \
|
||||
M(ExpandOperator, Operator, "Number of times Expand operator was used.") \
|
||||
M(ExpandVariableOperator, Operator, "Number of times ExpandVariable operator was used.") \
|
||||
M(ConstructNamedPathOperator, Operator, "Number of times ConstructNamedPath operator was used.") \
|
||||
|
@ -124,11 +124,6 @@ startup_config_dict = {
|
||||
"false",
|
||||
"Controls whether metadata should be collected about the resident labels and edge types.",
|
||||
),
|
||||
"storage_enable_edges_metadata": (
|
||||
"false",
|
||||
"false",
|
||||
"Controls whether additional metadata should be stored about the edges in order to do faster traversals on certain queries.",
|
||||
),
|
||||
"password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."),
|
||||
"pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."),
|
||||
"query_execution_timeout_sec": (
|
||||
|
@ -1704,13 +1704,6 @@ TYPED_TEST(TestPlanner, ScanAllById) {
|
||||
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllById(), ExpectProduce());
|
||||
}
|
||||
|
||||
TYPED_TEST(TestPlanner, ScanAllByEdgeId) {
|
||||
// Test MATCH ()-[r]->() WHERE id(r) = 42 RETURN r
|
||||
auto *query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("anon1"), EDGE("r"), NODE("anon2"))),
|
||||
WHERE(EQ(FN("id", IDENT("r")), LITERAL(42))), RETURN("r")));
|
||||
CheckPlan<TypeParam>(query, this->storage, ExpectScanAllByEdgeId(), ExpectProduce());
|
||||
}
|
||||
|
||||
TYPED_TEST(TestPlanner, BfsToExisting) {
|
||||
// Test MATCH (n)-[r *bfs]-(m) WHERE id(m) = 42 RETURN r
|
||||
auto *bfs = this->storage.template Create<memgraph::query::EdgeAtom>(
|
||||
|
@ -66,7 +66,6 @@ class PlanChecker : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
PRE_VISIT(ScanAllByLabelPropertyRange);
|
||||
PRE_VISIT(ScanAllByLabelProperty);
|
||||
PRE_VISIT(ScanAllByEdgeType);
|
||||
PRE_VISIT(ScanAllByEdgeId);
|
||||
PRE_VISIT(ScanAllById);
|
||||
PRE_VISIT(Expand);
|
||||
PRE_VISIT(ExpandVariable);
|
||||
@ -173,7 +172,6 @@ using ExpectDelete = OpChecker<Delete>;
|
||||
using ExpectScanAll = OpChecker<ScanAll>;
|
||||
using ExpectScanAllByLabel = OpChecker<ScanAllByLabel>;
|
||||
using ExpectScanAllByEdgeType = OpChecker<ScanAllByEdgeType>;
|
||||
using ExpectScanAllByEdgeId = OpChecker<ScanAllByEdgeId>;
|
||||
using ExpectScanAllById = OpChecker<ScanAllById>;
|
||||
using ExpectExpand = OpChecker<Expand>;
|
||||
using ExpectConstructNamedPath = OpChecker<ConstructNamedPath>;
|
||||
|
@ -2965,7 +2965,6 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
|
||||
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
|
||||
memgraph::utils::SkipList<memgraph::storage::Vertex> vertices;
|
||||
memgraph::utils::SkipList<memgraph::storage::Edge> edges;
|
||||
memgraph::utils::SkipList<memgraph::storage::EdgeMetadata> edges_metadata;
|
||||
std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>();
|
||||
std::atomic<uint64_t> edge_count{0};
|
||||
uint64_t wal_seq_num{0};
|
||||
@ -2979,7 +2978,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
|
||||
config.durability.storage_directory / memgraph::storage::durability::kWalDirectory};
|
||||
|
||||
// Recover snapshot.
|
||||
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edges_metadata, &edge_count,
|
||||
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count,
|
||||
name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num);
|
||||
|
||||
MG_ASSERT(info.has_value(), "Info doesn't have value present");
|
||||
@ -3045,61 +3044,3 @@ TEST_P(DurabilityTest, EdgeTypeIndexRecovered) {
|
||||
ASSERT_FALSE(acc->Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, EdgeMetadataRecovered) {
|
||||
if (GetParam() == false) {
|
||||
return;
|
||||
}
|
||||
// Create snapshot.
|
||||
{
|
||||
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}};
|
||||
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
|
||||
memgraph::dbms::Database db{config, repl_state};
|
||||
CreateBaseDataset(db.storage(), GetParam());
|
||||
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam(), .enable_edges_metadata = true},
|
||||
.durability = {.storage_directory = storage_directory, .recover_on_startup = true}};
|
||||
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
|
||||
memgraph::dbms::Database db{config, repl_state};
|
||||
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
|
||||
|
||||
// Check if data has been loaded correctly.
|
||||
{
|
||||
auto acc = db.Access();
|
||||
|
||||
for (auto i{0U}; i < kNumBaseEdges; ++i) {
|
||||
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(i), memgraph::storage::View::OLD);
|
||||
ASSERT_TRUE(edge.has_value());
|
||||
}
|
||||
|
||||
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
|
||||
ASSERT_FALSE(edge.has_value());
|
||||
|
||||
auto vertex = acc->CreateVertex();
|
||||
auto new_edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et"));
|
||||
ASSERT_TRUE(new_edge.HasValue());
|
||||
|
||||
ASSERT_FALSE(acc->Commit().HasError());
|
||||
}
|
||||
{
|
||||
auto acc = db.Access();
|
||||
|
||||
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
|
||||
ASSERT_TRUE(edge.has_value());
|
||||
|
||||
edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges + 1), memgraph::storage::View::OLD);
|
||||
ASSERT_FALSE(edge.has_value());
|
||||
|
||||
ASSERT_FALSE(acc->Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user