Compare commits
1 Commits
master
...
edges-supe
Author | SHA1 | Date | |
---|---|---|---|
|
4e6ca5df01 |
@ -176,9 +176,9 @@ InMemoryStorage::~InMemoryStorage() {
|
||||
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
|
||||
}
|
||||
|
||||
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level,
|
||||
StorageMode storage_mode,
|
||||
memgraph::replication_coordination_glue::ReplicationRole replication_role)
|
||||
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(
|
||||
auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode,
|
||||
memgraph::replication_coordination_glue::ReplicationRole replication_role)
|
||||
: Accessor(tag, storage, isolation_level, storage_mode, replication_role),
|
||||
config_(storage->config_.salient.items) {}
|
||||
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) noexcept
|
||||
@ -882,6 +882,9 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
std::list<Gid> my_deleted_vertices;
|
||||
std::list<Gid> my_deleted_edges;
|
||||
|
||||
std::unordered_map<Gid, uint64_t> added_in_edges;
|
||||
std::unordered_map<Gid, uint64_t> added_out_edges;
|
||||
|
||||
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
|
||||
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
|
||||
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
|
||||
@ -904,6 +907,10 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
auto *vertex = prev.vertex;
|
||||
auto guard = std::unique_lock{vertex->lock};
|
||||
Delta *current = vertex->delta;
|
||||
|
||||
added_in_edges.clear();
|
||||
added_out_edges.clear();
|
||||
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
||||
switch (current->action) {
|
||||
@ -955,19 +962,43 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
if (added_in_edges.empty()) {
|
||||
uint64_t idx = 0;
|
||||
for (auto &[type, opposing_vertex, ref] : vertex->in_edges) {
|
||||
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
|
||||
added_in_edges.emplace(gid, idx);
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
|
||||
auto wanted_gid =
|
||||
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
|
||||
MG_ASSERT(!added_in_edges.contains(wanted_gid), "Invalid database state!");
|
||||
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
|
||||
current->vertex_edge.vertex, current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
MG_ASSERT(it == vertex->in_edges.end(), "Invalid database state!");
|
||||
vertex->in_edges.push_back(link);
|
||||
added_in_edges.emplace(wanted_gid, vertex->in_edges.size() - 1);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE: {
|
||||
if (added_out_edges.empty()) {
|
||||
uint64_t idx = 0;
|
||||
for (auto &[type, opposing_vertex, ref] : vertex->out_edges) {
|
||||
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
|
||||
added_out_edges.emplace(gid, idx);
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
|
||||
auto wanted_gid =
|
||||
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
|
||||
MG_ASSERT(!added_out_edges.contains(wanted_gid), "Invalid database state!");
|
||||
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
|
||||
current->vertex_edge.vertex, current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link);
|
||||
MG_ASSERT(it == vertex->out_edges.end(), "Invalid database state!");
|
||||
vertex->out_edges.push_back(link);
|
||||
added_out_edges.emplace(wanted_gid, vertex->out_edges.size() - 1);
|
||||
// Increment edge count. We only increment the count here because
|
||||
// the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is
|
||||
// redundant. Also, `Edge/RECREATE_OBJECT` isn't available when
|
||||
@ -976,21 +1007,55 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
|
||||
current->vertex_edge.vertex, current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
MG_ASSERT(it != vertex->in_edges.end(), "Invalid database state!");
|
||||
std::swap(*it, *vertex->in_edges.rbegin());
|
||||
if (added_in_edges.empty()) {
|
||||
uint64_t idx = 0;
|
||||
for (auto &[type, opposing_vertex, ref] : vertex->in_edges) {
|
||||
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
|
||||
added_in_edges.emplace(gid, idx);
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
|
||||
auto wanted_gid =
|
||||
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
|
||||
|
||||
MG_ASSERT(added_in_edges.count(wanted_gid), "Invalid database state!");
|
||||
|
||||
auto pop_index = added_in_edges[wanted_gid];
|
||||
|
||||
std::swap(vertex->in_edges[pop_index], *vertex->in_edges.rbegin());
|
||||
vertex->in_edges.pop_back();
|
||||
added_in_edges.erase(wanted_gid);
|
||||
|
||||
auto switched_vertex = std::get<2>(vertex->in_edges[pop_index]);
|
||||
auto switch_gid = config_.properties_on_edges ? switched_vertex.ptr->gid : switched_vertex.gid;
|
||||
added_in_edges[switch_gid] = pop_index;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{current->vertex_edge.edge_type,
|
||||
current->vertex_edge.vertex, current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link);
|
||||
MG_ASSERT(it != vertex->out_edges.end(), "Invalid database state!");
|
||||
std::swap(*it, *vertex->out_edges.rbegin());
|
||||
if (added_out_edges.empty()) {
|
||||
uint64_t idx = 0;
|
||||
for (auto &[type, opposing_vertex, ref] : vertex->out_edges) {
|
||||
auto gid = config_.properties_on_edges ? ref.ptr->gid : ref.gid;
|
||||
added_out_edges.emplace(gid, idx);
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
|
||||
auto wanted_gid =
|
||||
config_.properties_on_edges ? current->vertex_edge.edge.ptr->gid : current->vertex_edge.edge.gid;
|
||||
|
||||
MG_ASSERT(added_out_edges.count(wanted_gid), "Invalid database state!");
|
||||
|
||||
auto pop_index = added_out_edges[wanted_gid];
|
||||
|
||||
std::swap(vertex->out_edges[pop_index], *vertex->out_edges.rbegin());
|
||||
vertex->out_edges.pop_back();
|
||||
added_out_edges.erase(wanted_gid);
|
||||
|
||||
auto switched_vertex = std::get<2>(vertex->out_edges[pop_index]);
|
||||
auto switch_gid = config_.properties_on_edges ? switched_vertex.ptr->gid : switched_vertex.gid;
|
||||
added_out_edges[switch_gid] = pop_index;
|
||||
// Decrement edge count. We only decrement the count here because
|
||||
// the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is
|
||||
// redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge
|
||||
|
Loading…
Reference in New Issue
Block a user