Filter deleted edges during edge prefetch (#1145)
This commit is contained in:
parent
762fe6a65d
commit
1fe2190747
@ -80,4 +80,38 @@ int ComparatorWithU64TsImpl::CompareTimestamp(const rocksdb::Slice &ts1, const r
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Builds the serialized key of the edge behind `edge_acc`.
///
/// The key is the '|'-joined sequence: serialized source-vertex gid, serialized destination-vertex gid,
/// utils::outEdgeDirection, serialized edge type, serialized edge gid.
///
/// @param edge_acc accessor of the edge to encode; it is dereferenced without a null check.
DiskEdgeKey::DiskEdgeKey(storage::EdgeAccessor *edge_acc) {
  const auto src_vertex_part = utils::SerializeIdType(edge_acc->FromVertex().Gid());
  const auto dst_vertex_part = utils::SerializeIdType(edge_acc->ToVertex().Gid());
  const auto type_part = utils::SerializeIdType(edge_acc->EdgeType());
  const auto gid_part = utils::SerializeIdType(edge_acc->Gid());

  key = fmt::format("{}|{}|{}|{}|{}", src_vertex_part, dst_vertex_part, utils::outEdgeDirection, type_part,
                    gid_part);
}
|
||||
|
||||
/// Builds the serialized key of an edge from its individual components.
///
/// The key is the '|'-joined sequence: serialized source-vertex gid, serialized destination-vertex gid,
/// utils::outEdgeDirection, serialized edge type, serialized edge gid.
///
/// @param src_vertex_gid gid of the source vertex
/// @param dest_vertex_gid gid of the destination vertex
/// @param edge_type_id type of the edge
/// @param edge_ref reference to the edge whose gid is encoded
/// @param properties_on_edges when true the gid is read through `edge_ref.ptr`, otherwise from `edge_ref.gid`
DiskEdgeKey::DiskEdgeKey(storage::Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id,
                         const storage::EdgeRef edge_ref, bool properties_on_edges) {
  const auto src_vertex_part = utils::SerializeIdType(src_vertex_gid);
  const auto dst_vertex_part = utils::SerializeIdType(dest_vertex_gid);
  const auto type_part = utils::SerializeIdType(edge_type_id);

  // Only one of the two EdgeRef members is read; the flag decides which.
  const std::string gid_part = utils::SerializeIdType(properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid);

  key = fmt::format("{}|{}|{}|{}|{}", src_vertex_part, dst_vertex_part, utils::outEdgeDirection, type_part,
                    gid_part);
}
|
||||
|
||||
/// Returns the first '|'-delimited field of the key, i.e. the serialized gid of the source vertex.
/// When the key contains no '|' the whole key is returned.
std::string DiskEdgeKey::GetVertexOutGid() const {
  const auto first_separator = key.find('|');
  return key.substr(0, first_separator);
}
|
||||
|
||||
std::string DiskEdgeKey::GetVertexInGid() const {
|
||||
auto vertex_in_start = key.find('|') + 1;
|
||||
return key.substr(vertex_in_start, key.find('|', vertex_in_start) - vertex_in_start);
|
||||
}
|
||||
|
||||
/// Returns everything after the last '|' of the key, i.e. the serialized edge gid.
std::string DiskEdgeKey::GetEdgeGid() const {
  const auto last_separator = key.rfind('|');
  return key.substr(last_separator + 1);
}
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -18,12 +18,15 @@
|
||||
#include <rocksdb/status.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/property_store.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
/// Side of an edge that a vertex is matched against: OUT refers to the vertex an edge leaves (its source),
/// IN to the vertex it enters (its destination).
enum class EdgeDirection : uint8_t { OUT = 0, IN = 1 };
|
||||
|
||||
/// TODO: Ideally this would be encapsulated inside the storage class; exposing it here is not great from a
/// software-engineering perspective, but it is acceptable for now.
|
||||
/// Wraps RocksDB objects inside a struct. Vertex_chandle and edge_chandle are column family handles that may be
|
||||
@ -86,4 +89,27 @@ class ComparatorWithU64TsImpl : public rocksdb::Comparator {
|
||||
const Comparator *cmp_without_ts_{nullptr};
|
||||
};
|
||||
|
||||
struct DiskEdgeKey {
|
||||
DiskEdgeKey(const std::string_view keyView) : key(keyView) {}
|
||||
|
||||
DiskEdgeKey(EdgeAccessor *edge_acc);
|
||||
|
||||
/// @tparam src_vertex_gid, dest_vertex_gid: Gid of the source and destination vertices
|
||||
/// @tparam edge_type_id: EdgeTypeId of the edge
|
||||
/// @tparam edge_ref: Edge to be serialized
|
||||
DiskEdgeKey(Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id,
|
||||
const EdgeRef edge_ref, bool properties_on_edges);
|
||||
|
||||
std::string GetSerializedKey() const { return key; }
|
||||
|
||||
std::string GetVertexOutGid() const;
|
||||
std::string GetVertexInGid() const;
|
||||
std::string GetEdgeGid() const;
|
||||
|
||||
private:
|
||||
// vertex_gid_1 | vertex_gid_2 | direction | edge_type | GID | commit_timestamp
|
||||
// Currently direction is only out.
|
||||
std::string key;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <limits>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
|
||||
#include <rocksdb/comparator.h>
|
||||
@ -933,7 +934,33 @@ DiskStorage::DiskAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
||||
std::move(deleted_edges));
|
||||
}
|
||||
|
||||
void DiskStorage::DiskAccessor::PrefetchEdges(const auto &prefetch_edge_filter) {
|
||||
bool DiskStorage::DiskAccessor::PrefetchEdgeFilter(const std::string_view disk_edge_key_str,
|
||||
const VertexAccessor &vertex_acc, EdgeDirection edge_direction) {
|
||||
bool isOutEdge = (edge_direction == EdgeDirection::OUT);
|
||||
DiskEdgeKey disk_edge_key(disk_edge_key_str);
|
||||
auto edges_res = (isOutEdge ? vertex_acc.OutEdges(storage::View::NEW) : vertex_acc.InEdges(storage::View::NEW));
|
||||
const std::string disk_vertex_gid = (isOutEdge ? disk_edge_key.GetVertexOutGid() : disk_edge_key.GetVertexInGid());
|
||||
auto edge_gid = disk_edge_key.GetEdgeGid();
|
||||
|
||||
if (disk_vertex_gid != utils::SerializeIdType(vertex_acc.Gid())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We need to search in edges_to_delete_ because removed edges are not presented in edges_res
|
||||
if (auto edgeIt = edges_to_delete_.find(disk_edge_key.GetSerializedKey()); edgeIt != edges_to_delete_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
MG_ASSERT(edges_res.HasValue());
|
||||
auto edges = edges_res.GetValue();
|
||||
bool isEdgeAlreadyInMemory = std::any_of(edges.begin(), edges.end(), [edge_gid](const auto &edge_acc) {
|
||||
return utils::SerializeIdType(edge_acc.Gid()) == edge_gid;
|
||||
});
|
||||
|
||||
return !isEdgeAlreadyInMemory;
|
||||
}
|
||||
|
||||
void DiskStorage::DiskAccessor::PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction) {
|
||||
rocksdb::ReadOptions read_opts;
|
||||
auto strTs = utils::StringTimestamp(transaction_.start_timestamp);
|
||||
rocksdb::Slice ts(strTs);
|
||||
@ -943,45 +970,19 @@ void DiskStorage::DiskAccessor::PrefetchEdges(const auto &prefetch_edge_filter)
|
||||
disk_transaction_->GetIterator(read_opts, disk_storage->kvstore_->edge_chandle));
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
const rocksdb::Slice &key = it->key();
|
||||
const auto edge_parts = utils::Split(key.ToStringView(), "|");
|
||||
if (prefetch_edge_filter(edge_parts)) {
|
||||
auto keyStr = key.ToStringView();
|
||||
if (PrefetchEdgeFilter(keyStr, vertex_acc, edge_direction)) {
|
||||
DeserializeEdge(key, it->value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Prefetches the incoming edges of `vertex_acc` by delegating to PrefetchEdges with EdgeDirection::IN.
///
/// Only the direction-based call is kept: the former lambda-based pass never consulted edges_to_delete_, so it
/// could load edges that were already deleted through this accessor.
void DiskStorage::DiskAccessor::PrefetchInEdges(const VertexAccessor &vertex_acc) {
  PrefetchEdges(vertex_acc, EdgeDirection::IN);
}
|
||||
|
||||
/// Prefetches the outgoing edges of `vertex_acc` by delegating to PrefetchEdges with EdgeDirection::OUT.
///
/// Only the direction-based call is kept: the former lambda-based pass never consulted edges_to_delete_, so it
/// could load edges that were already deleted through this accessor.
void DiskStorage::DiskAccessor::PrefetchOutEdges(const VertexAccessor &vertex_acc) {
  PrefetchEdges(vertex_acc, EdgeDirection::OUT);
}
|
||||
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor *from, const VertexAccessor *to,
|
||||
@ -1166,9 +1167,8 @@ Result<std::optional<EdgeAccessor>> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc
|
||||
const auto op1 = delete_edge_from_storage(to_vertex, &from_vertex->out_edges);
|
||||
const auto op2 = delete_edge_from_storage(from_vertex, &to_vertex->in_edges);
|
||||
|
||||
const std::string src_dest_del_key{
|
||||
utils::SerializeEdge(from_vertex->gid, to_vertex->gid, edge_type, edge_ref, config_.properties_on_edges)};
|
||||
edges_to_delete_.emplace_back(src_dest_del_key);
|
||||
const DiskEdgeKey disk_edge_key(from_vertex->gid, to_vertex->gid, edge_type, edge_ref, config_.properties_on_edges);
|
||||
edges_to_delete_.emplace(disk_edge_key.GetSerializedKey());
|
||||
|
||||
if (config_.properties_on_edges) {
|
||||
MG_ASSERT((op1 && op2), "Invalid database state!");
|
||||
@ -1331,8 +1331,8 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit(
|
||||
|
||||
for (auto &edge_entry : vertex.out_edges) {
|
||||
EdgeRef edge = std::get<2>(edge_entry);
|
||||
auto src_dest_key = utils::SerializeEdge(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry), edge,
|
||||
config_.properties_on_edges);
|
||||
const DiskEdgeKey src_dest_key(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry), edge,
|
||||
config_.properties_on_edges);
|
||||
|
||||
/// TODO: expose temporal coupling
|
||||
/// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted
|
||||
@ -1344,7 +1344,7 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit(
|
||||
}
|
||||
}
|
||||
|
||||
if (!WriteEdgeToDisk(edge, src_dest_key)) {
|
||||
if (!WriteEdgeToDisk(edge, src_dest_key.GetSerializedKey())) {
|
||||
return StorageDataManipulationError{SerializationError{}};
|
||||
}
|
||||
|
||||
@ -1412,10 +1412,10 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit(
|
||||
|
||||
for (auto &edge_entry : vertex.out_edges) {
|
||||
EdgeRef edge = std::get<2>(edge_entry);
|
||||
auto src_dest_key = utils::SerializeEdge(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry),
|
||||
edge, config_.properties_on_edges);
|
||||
DiskEdgeKey src_dest_key(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry), edge,
|
||||
config_.properties_on_edges);
|
||||
|
||||
if (!WriteEdgeToDisk(edge, src_dest_key)) {
|
||||
if (!WriteEdgeToDisk(edge, src_dest_key.GetSerializedKey())) {
|
||||
return StorageDataManipulationError{SerializationError{}};
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "utils/rw_lock.hpp"
|
||||
|
||||
#include <rocksdb/db.h>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
@ -211,7 +212,9 @@ class DiskStorage final : public Storage {
|
||||
VertexAccessor CreateVertex(utils::SkipList<Vertex>::Accessor &accessor, storage::Gid gid,
|
||||
const std::vector<LabelId> &label_ids, PropertyStore &&properties, Delta *delta);
|
||||
|
||||
void PrefetchEdges(const auto &prefetch_edge_filter);
|
||||
bool PrefetchEdgeFilter(const std::string_view disk_edge_key_str, const VertexAccessor &vertex_acc,
|
||||
EdgeDirection edge_direction);
|
||||
void PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction);
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type,
|
||||
storage::Gid gid, std::string_view properties, const std::string &old_disk_key);
|
||||
@ -240,7 +243,7 @@ class DiskStorage final : public Storage {
|
||||
std::vector<std::list<Delta>> index_deltas_storage_;
|
||||
utils::SkipList<storage::Edge> edges_;
|
||||
Config::Items config_;
|
||||
std::vector<std::string> edges_to_delete_;
|
||||
std::unordered_set<std::string> edges_to_delete_;
|
||||
std::vector<std::pair<std::string, std::string>> vertices_to_delete_;
|
||||
rocksdb::Transaction *disk_transaction_;
|
||||
};
|
||||
|
@ -223,52 +223,6 @@ inline storage::PropertyStore DeserializePropertiesFromLabelPropertyIndexStorage
|
||||
return DeserializePropertiesFromAuxiliaryStorages(value);
|
||||
}
|
||||
|
||||
/// Serialize edge as two KV entries
|
||||
/// vertex_gid_1 | vertex_gid_2 | direction | edge_type | GID | commit_timestamp
|
||||
inline std::string SerializeEdge(storage::EdgeAccessor *edge_acc) {
|
||||
// Serialized objects
|
||||
auto from_gid = utils::SerializeIdType(edge_acc->FromVertex().Gid());
|
||||
auto to_gid = utils::SerializeIdType(edge_acc->ToVertex().Gid());
|
||||
auto edge_type = utils::SerializeIdType(edge_acc->EdgeType());
|
||||
auto edge_gid = utils::SerializeIdType(edge_acc->Gid());
|
||||
// source->destination key
|
||||
std::string src_dest_key = from_gid + "|";
|
||||
src_dest_key += to_gid + "|";
|
||||
src_dest_key += outEdgeDirection;
|
||||
src_dest_key += "|" + edge_type + "|";
|
||||
src_dest_key += edge_gid;
|
||||
return src_dest_key;
|
||||
}
|
||||
|
||||
/// Serialize edge as two KV entries
|
||||
/// vertex_gid_1 | vertex_gid_2 | direction | edge_type | GID | commit_timestamp
|
||||
/// @tparam src_vertex_gid, dest_vertex_gid: Gid of the source and destination vertices
|
||||
/// @tparam edge: Edge to be serialized
|
||||
/// @tparam edge_type_id: EdgeTypeId of the edge
|
||||
inline std::string SerializeEdge(storage::Gid src_vertex_gid, storage::Gid dest_vertex_gid,
|
||||
storage::EdgeTypeId edge_type_id, const storage::EdgeRef edge_ref,
|
||||
bool properties_on_edges) {
|
||||
// Serialized objects
|
||||
auto from_gid = utils::SerializeIdType(src_vertex_gid);
|
||||
auto to_gid = utils::SerializeIdType(dest_vertex_gid);
|
||||
auto edge_type = utils::SerializeIdType(edge_type_id);
|
||||
std::string edge_gid;
|
||||
|
||||
if (properties_on_edges) {
|
||||
edge_gid = utils::SerializeIdType(edge_ref.ptr->gid);
|
||||
} else {
|
||||
edge_gid = utils::SerializeIdType(edge_ref.gid);
|
||||
}
|
||||
|
||||
// source->destination key
|
||||
std::string src_dest_key = from_gid + "|";
|
||||
src_dest_key += to_gid + "|";
|
||||
src_dest_key += outEdgeDirection;
|
||||
src_dest_key += "|" + edge_type + "|";
|
||||
src_dest_key += edge_gid;
|
||||
return src_dest_key;
|
||||
}
|
||||
|
||||
/// TODO: (andi): This can potentially be a problem on big-endian machines.
|
||||
inline void PutFixed64(std::string *dst, uint64_t value) {
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
|
||||
|
@ -25,7 +25,7 @@ disk_storage_properties_edges_false: &disk_storage_properties_edges_false
|
||||
main:
|
||||
args: ["--bolt-port", *bolt_port, "--log-level=TRACE", "--also-log-to-stderr", "--storage-properties-on-edges=False"]
|
||||
log_file: "triggers-e2e-disk.log"
|
||||
setup_queries: []
|
||||
setup_queries: ["storage mode on_disk_transactional"]
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
|
Loading…
Reference in New Issue
Block a user