Compare commits

...

1 Commits

Author SHA1 Message Date
Josip Mrden
4e6ca5df01 Add cache for when rollbacking on edge adding and removing 2024-02-07 15:21:53 +01:00

View File

@ -176,9 +176,9 @@ InMemoryStorage::~InMemoryStorage() {
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
}
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level,
StorageMode storage_mode,
memgraph::replication_coordination_glue::ReplicationRole replication_role)
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(
auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode,
memgraph::replication_coordination_glue::ReplicationRole replication_role)
: Accessor(tag, storage, isolation_level, storage_mode, replication_role),
config_(storage->config_.salient.items) {}
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) noexcept
@ -882,6 +882,9 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
std::list<Gid> my_deleted_vertices;
std::list<Gid> my_deleted_edges;
std::unordered_map<Gid, uint64_t> added_in_edges;
std::unordered_map<Gid, uint64_t> added_out_edges;
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
@ -904,6 +907,10 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
auto *vertex = prev.vertex;
auto guard = std::unique_lock{vertex->lock};
Delta *current = vertex->delta;
added_in_edges.clear();
added_out_edges.clear();
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
switch (current->action) {
@ -955,19 +962,43 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
break;
}
case Delta::Action::ADD_IN_EDGE: {
if (added_in_edges.empty()) {
uint64_t idx = 0;
for (auto &[type, opposing_vertex, ref] : vertex->in_edges) {
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
added_in_edges.emplace(gid, idx);
idx++;
}
}
auto wanted_gid =
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
MG_ASSERT(!added_in_edges.contains(wanted_gid), "Invalid database state!");
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
current->vertex_edge.vertex, current->vertex_edge.edge};
auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
MG_ASSERT(it == vertex->in_edges.end(), "Invalid database state!");
vertex->in_edges.push_back(link);
added_in_edges.emplace(wanted_gid, vertex->in_edges.size() - 1);
break;
}
case Delta::Action::ADD_OUT_EDGE: {
if (added_out_edges.empty()) {
uint64_t idx = 0;
for (auto &[type, opposing_vertex, ref] : vertex->out_edges) {
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
added_out_edges.emplace(gid, idx);
idx++;
}
}
auto wanted_gid =
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
MG_ASSERT(!added_out_edges.contains(wanted_gid), "Invalid database state!");
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
current->vertex_edge.vertex, current->vertex_edge.edge};
auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link);
MG_ASSERT(it == vertex->out_edges.end(), "Invalid database state!");
vertex->out_edges.push_back(link);
added_out_edges.emplace(wanted_gid, vertex->out_edges.size() - 1);
// Increment edge count. We only increment the count here because
// the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is
// redundant. Also, `Edge/RECREATE_OBJECT` isn't available when
@ -976,21 +1007,55 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
break;
}
case Delta::Action::REMOVE_IN_EDGE: {
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
current->vertex_edge.vertex, current->vertex_edge.edge};
auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
MG_ASSERT(it != vertex->in_edges.end(), "Invalid database state!");
std::swap(*it, *vertex->in_edges.rbegin());
if (added_in_edges.empty()) {
uint64_t idx = 0;
for (auto &[type, opposing_vertex, ref] : vertex->in_edges) {
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
added_in_edges.emplace(gid, idx);
idx++;
}
}
auto wanted_gid =
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
MG_ASSERT(added_in_edges.count(wanted_gid), "Invalid database state!");
auto pop_index = added_in_edges[wanted_gid];
std::swap(vertex->in_edges[pop_index], *vertex->in_edges.rbegin());
vertex->in_edges.pop_back();
added_in_edges.erase(wanted_gid);
auto switched_vertex = std::get<2>(vertex->in_edges[pop_index]);
auto switch_gid = config_.properties_on_edges ? switched_vertex.ptr->gid : switched_vertex.gid;
added_in_edges[switch_gid] = pop_index;
break;
}
case Delta::Action::REMOVE_OUT_EDGE: {
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
current->vertex_edge.vertex, current->vertex_edge.edge};
auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link);
MG_ASSERT(it != vertex->out_edges.end(), "Invalid database state!");
std::swap(*it, *vertex->out_edges.rbegin());
if (added_out_edges.empty()) {
uint64_t idx = 0;
for (auto &[type, opposing_vertex, ref] : vertex->out_edges) {
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
added_out_edges.emplace(gid, idx);
idx++;
}
}
auto wanted_gid =
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
MG_ASSERT(added_out_edges.count(wanted_gid), "Invalid database state!");
auto pop_index = added_out_edges[wanted_gid];
std::swap(vertex->out_edges[pop_index], *vertex->out_edges.rbegin());
vertex->out_edges.pop_back();
added_out_edges.erase(wanted_gid);
auto switched_vertex = std::get<2>(vertex->out_edges[pop_index]);
auto switch_gid = config_.properties_on_edges ? switched_vertex.ptr->gid : switched_vertex.gid;
added_out_edges[switch_gid] = pop_index;
// Decrement edge count. We only decrement the count here because
// the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is
// redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge