Merge branch 'project-pineapples' into T1149-MG-split-shard

This commit is contained in:
jbajic 2023-01-16 13:54:16 +01:00
commit 8a1dd54735
17 changed files with 163 additions and 190 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -283,7 +283,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
// TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info
bool machine_contains_shard = false;
for (auto &aas : shard) {
for (auto &aas : shard.peers) {
if (initialized.contains(aas.address.unique_id)) {
machine_contains_shard = true;
if (aas.status != Status::CONSENSUS_PARTICIPANT) {
@ -311,7 +311,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
}
}
if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
if (!machine_contains_shard && shard.peers.size() < label_space.replication_factor) {
// increment version for each new uuid for deterministic creation
IncrementShardMapVersion();
@ -337,7 +337,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
.status = Status::INITIALIZING,
};
shard.emplace_back(aas);
shard.peers.emplace_back(aas);
}
}
}
@ -360,9 +360,9 @@ bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, cons
MG_ASSERT(!shards_in_map.contains(key));
MG_ASSERT(label_spaces.contains(label_id));
// Finding the Shard that the new PrimaryKey should map to.
// Finding the ShardMetadata that the new PrimaryKey should map to.
auto prev = std::prev(shards_in_map.upper_bound(key));
Shard duplicated_shard = prev->second;
ShardMetadata duplicated_shard = prev->second;
// Apply the split
shards_in_map[key] = duplicated_shard;
@ -383,7 +383,7 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std:
labels.emplace(std::move(label_name), label_id);
PrimaryKey initial_key = SchemaToMinKey(schema);
Shard empty_shard = {};
ShardMetadata empty_shard = {};
Shards shards = {
{initial_key, empty_shard},
@ -479,7 +479,7 @@ Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey
return shards;
}
Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const {
ShardMetadata ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const {
MG_ASSERT(labels.contains(label_name));
LabelId label_id = labels.at(label_name);
@ -492,7 +492,7 @@ Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &ke
return std::prev(label_space.shards.upper_bound(key))->second;
}
Shard ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const {
ShardMetadata ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const {
MG_ASSERT(label_spaces.contains(label_id));
const auto &label_space = label_spaces.at(label_id);
@ -556,12 +556,12 @@ EdgeTypeIdMap ShardMap::AllocateEdgeTypeIds(const std::vector<EdgeTypeName> &new
bool ShardMap::ClusterInitialized() const {
for (const auto &[label_id, label_space] : label_spaces) {
for (const auto &[low_key, shard] : label_space.shards) {
if (shard.size() < label_space.replication_factor) {
if (shard.peers.size() < label_space.replication_factor) {
spdlog::info("label_space below desired replication factor");
return false;
}
for (const auto &aas : shard) {
for (const auto &aas : shard.peers) {
if (aas.status != Status::CONSENSUS_PARTICIPANT) {
spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT");
return false;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -76,8 +76,35 @@ struct AddressAndStatus {
};
using PrimaryKey = std::vector<PropertyValue>;
using Shard = std::vector<AddressAndStatus>;
using Shards = std::map<PrimaryKey, Shard>;
struct ShardMetadata {
std::vector<AddressAndStatus> peers;
uint64_t version;
friend std::ostream &operator<<(std::ostream &in, const ShardMetadata &shard) {
using utils::print_helpers::operator<<;
in << "ShardMetadata { peers: ";
in << shard.peers;
in << " version: ";
in << shard.version;
in << " }";
return in;
}
friend bool operator==(const ShardMetadata &lhs, const ShardMetadata &rhs) = default;
friend bool operator<(const ShardMetadata &lhs, const ShardMetadata &rhs) {
if (lhs.peers != rhs.peers) {
return lhs.peers < rhs.peers;
}
return lhs.version < rhs.version;
}
};
using Shards = std::map<PrimaryKey, ShardMetadata>;
using LabelName = std::string;
using PropertyName = std::string;
using EdgeTypeName = std::string;
@ -99,7 +126,7 @@ PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema);
struct LabelSpace {
std::vector<SchemaProperty> schema;
// Maps between the smallest primary key stored in the shard and the shard
std::map<PrimaryKey, Shard> shards;
std::map<PrimaryKey, ShardMetadata> shards;
size_t replication_factor;
friend std::ostream &operator<<(std::ostream &in, const LabelSpace &label_space) {
@ -160,9 +187,9 @@ struct ShardMap {
Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const;
Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const;
ShardMetadata GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const;
Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const;
ShardMetadata GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const;
PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -50,7 +50,7 @@ template <typename TStorageClient>
class RsmStorageClientManager {
public:
using CompoundKey = io::rsm::ShardRsmKey;
using Shard = coordinator::Shard;
using ShardMetadata = coordinator::ShardMetadata;
RsmStorageClientManager() = default;
RsmStorageClientManager(const RsmStorageClientManager &) = delete;
RsmStorageClientManager(RsmStorageClientManager &&) = delete;
@ -58,25 +58,25 @@ class RsmStorageClientManager {
RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete;
~RsmStorageClientManager() = default;
void AddClient(Shard key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); }
void AddClient(ShardMetadata key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); }
bool Exists(const Shard &key) { return cli_cache_.contains(key); }
bool Exists(const ShardMetadata &key) { return cli_cache_.contains(key); }
void PurgeCache() { cli_cache_.clear(); }
TStorageClient &GetClient(const Shard &key) {
TStorageClient &GetClient(const ShardMetadata &key) {
auto it = cli_cache_.find(key);
MG_ASSERT(it != cli_cache_.end(), "Non-existing shard client");
return it->second;
}
private:
std::map<Shard, TStorageClient> cli_cache_;
std::map<ShardMetadata, TStorageClient> cli_cache_;
};
template <typename TRequest>
struct ShardRequestState {
memgraph::coordinator::Shard shard;
memgraph::coordinator::ShardMetadata shard;
TRequest request;
};
@ -125,7 +125,7 @@ class RequestRouter : public RequestRouterInterface {
using CoordinatorWriteRequests = coordinator::CoordinatorWriteRequests;
using CoordinatorClient = coordinator::CoordinatorClient<TTransport>;
using Address = io::Address;
using Shard = coordinator::Shard;
using ShardMetadata = coordinator::ShardMetadata;
using ShardMap = coordinator::ShardMap;
using CompoundKey = coordinator::PrimaryKey;
using VertexAccessor = query::v2::accessors::VertexAccessor;
@ -403,7 +403,7 @@ class RequestRouter : public RequestRouterInterface {
private:
std::vector<ShardRequestState<msgs::CreateVerticesRequest>> RequestsForCreateVertices(
const std::vector<msgs::NewVertex> &new_vertices) {
std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
std::map<ShardMetadata, msgs::CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
MG_ASSERT(!new_vertex.label_ids.empty(), "No label_ids provided for new vertex in RequestRouter::CreateVertices");
@ -431,9 +431,9 @@ class RequestRouter : public RequestRouterInterface {
std::vector<ShardRequestState<msgs::CreateExpandRequest>> RequestsForCreateExpand(
const std::vector<msgs::NewExpand> &new_expands) {
std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
std::map<ShardMetadata, msgs::CreateExpandRequest> per_shard_request_table;
auto ensure_shard_exists_in_table = [&per_shard_request_table,
transaction_id = transaction_id_](const Shard &shard) {
transaction_id = transaction_id_](const ShardMetadata &shard) {
if (!per_shard_request_table.contains(shard)) {
msgs::CreateExpandRequest create_expand_request{.transaction_id = transaction_id};
per_shard_request_table.insert({shard, std::move(create_expand_request)});
@ -484,7 +484,7 @@ class RequestRouter : public RequestRouterInterface {
for (auto &shards : multi_shards) {
for (auto &[key, shard] : shards) {
MG_ASSERT(!shard.empty());
MG_ASSERT(!shard.peers.empty());
msgs::ScanVerticesRequest request;
request.transaction_id = transaction_id_;
@ -503,7 +503,7 @@ class RequestRouter : public RequestRouterInterface {
}
std::vector<ShardRequestState<msgs::ExpandOneRequest>> RequestsForExpandOne(const msgs::ExpandOneRequest &request) {
std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
std::map<ShardMetadata, msgs::ExpandOneRequest> per_shard_request_table;
msgs::ExpandOneRequest top_level_rqst_template = request;
top_level_rqst_template.transaction_id = transaction_id_;
top_level_rqst_template.src_vertices.clear();
@ -533,7 +533,7 @@ class RequestRouter : public RequestRouterInterface {
std::vector<ShardRequestState<msgs::GetPropertiesRequest>> RequestsForGetProperties(
msgs::GetPropertiesRequest &&request) {
std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
std::map<ShardMetadata, msgs::GetPropertiesRequest> per_shard_request_table;
auto top_level_rqst_template = request;
top_level_rqst_template.transaction_id = transaction_id_;
top_level_rqst_template.vertex_ids.clear();
@ -571,7 +571,7 @@ class RequestRouter : public RequestRouterInterface {
return requests;
}
StorageClient &GetStorageClientForShard(Shard shard) {
StorageClient &GetStorageClientForShard(ShardMetadata shard) {
if (!storage_cli_manager_.Exists(shard)) {
AddStorageClientToManager(shard);
}
@ -583,12 +583,12 @@ class RequestRouter : public RequestRouterInterface {
return GetStorageClientForShard(std::move(shard));
}
void AddStorageClientToManager(Shard target_shard) {
MG_ASSERT(!target_shard.empty());
auto leader_addr = target_shard.front();
void AddStorageClientToManager(ShardMetadata target_shard) {
MG_ASSERT(!target_shard.peers.empty());
auto leader_addr = target_shard.peers.front();
std::vector<Address> addresses;
addresses.reserve(target_shard.size());
for (auto &address : target_shard) {
addresses.reserve(target_shard.peers.size());
for (auto &address : target_shard.peers) {
addresses.push_back(std::move(address.address));
}
auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -162,23 +162,23 @@ class DbAccessor final {
return accessor_->LabelPropertyIndexExists(label, prop);
}
int64_t VerticesCount() const { return accessor_->VertexCount(); }
int64_t VerticesCount() const { return accessor_->ApproximateVertexCount(); }
int64_t VerticesCount(storage::v3::LabelId label) const { return accessor_->ApproximateVertexCount(label); }
int64_t VerticesCount(storage::v3::LabelId label, storage::v3::PropertyId property) const {
return accessor_->VertexCount(label, property);
return accessor_->ApproximateVertexCount(label, property);
}
int64_t VerticesCount(storage::v3::LabelId label, storage::v3::PropertyId property,
const storage::v3::PropertyValue &value) const {
return accessor_->VertexCount(label, property, value);
return accessor_->ApproximateVertexCount(label, property, value);
}
int64_t VerticesCount(storage::v3::LabelId label, storage::v3::PropertyId property,
const std::optional<utils::Bound<storage::v3::PropertyValue>> &lower,
const std::optional<utils::Bound<storage::v3::PropertyValue>> &upper) const {
return accessor_->VertexCount(label, property, lower, upper);
return accessor_->ApproximateVertexCount(label, property, lower, upper);
}
storage::v3::IndicesInfo ListAllIndices() const { return accessor_->ListAllIndices(); }

View File

@ -11,6 +11,7 @@
#pragma once
#include <cstdint>
#include <memory>
#include <boost/uuid/uuid.hpp>
@ -136,7 +137,7 @@ struct Delta {
// TODO Replace this with int identifier
boost::uuids::uuid uuid{boost::uuids::uuid()};
enum class Action {
enum class Action : uint8_t {
// Used for both Vertex and Edge
DELETE_OBJECT,
RECREATE_OBJECT,

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,8 +12,6 @@
#include "indices.hpp"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
@ -271,8 +269,7 @@ bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, Propert
void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) {
auto it = index_.find(label);
if (it == index_.end()) return;
auto acc = it->second.access();
acc.insert(Entry{vertex, tx.start_timestamp.logical_id});
it->second.insert(Entry{vertex, tx.start_timestamp.logical_id});
}
bool LabelIndex::CreateIndex(LabelId label, VertexContainer &vertices) {
@ -283,12 +280,11 @@ bool LabelIndex::CreateIndex(LabelId label, VertexContainer &vertices) {
return false;
}
try {
auto acc = it->second.access();
for ([[maybe_unused]] auto &vertex : vertices) {
for (auto &vertex : vertices) {
if (vertex.second.deleted || !VertexHasLabel(vertex, label)) {
continue;
}
acc.insert(Entry{&vertex, 0});
it->second.insert(Entry{&vertex, 0});
}
} catch (const utils::OutOfMemoryException &) {
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
@ -309,7 +305,7 @@ std::vector<LabelId> LabelIndex::ListIndices() const {
void LabelIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_timestamp) {
for (auto &label_storage : index_) {
auto vertices_acc = label_storage.second.access();
auto &vertices_acc = label_storage.second;
for (auto it = vertices_acc.begin(); it != vertices_acc.end();) {
auto next_it = it;
++next_it;
@ -321,7 +317,7 @@ void LabelIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_timestamp)
if ((next_it != vertices_acc.end() && it->vertex == next_it->vertex) ||
!AnyVersionHasLabel(*it->vertex, label_storage.first, clean_up_before_timestamp)) {
vertices_acc.remove(*it);
vertices_acc.erase(*it);
}
it = next_it;
@ -329,7 +325,7 @@ void LabelIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_timestamp)
}
}
LabelIndex::Iterable::Iterator::Iterator(Iterable *self, utils::SkipList<Entry>::Iterator index_iterator)
LabelIndex::Iterable::Iterator::Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator)
: self_(self),
index_iterator_(index_iterator),
current_vertex_accessor_(nullptr, nullptr, nullptr, self_->config_, *self_->vertex_validator_),
@ -344,7 +340,7 @@ LabelIndex::Iterable::Iterator &LabelIndex::Iterable::Iterator::operator++() {
}
void LabelIndex::Iterable::Iterator::AdvanceUntilValid() {
for (; index_iterator_ != self_->index_accessor_.end(); ++index_iterator_) {
for (; index_iterator_ != self_->index_container_->end(); ++index_iterator_) {
if (index_iterator_->vertex == current_vertex_) {
continue;
}
@ -357,10 +353,9 @@ void LabelIndex::Iterable::Iterator::AdvanceUntilValid() {
}
}
LabelIndex::Iterable::Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view,
Transaction *transaction, Indices *indices, Config::Items config,
const VertexValidator &vertex_validator)
: index_accessor_(std::move(index_accessor)),
LabelIndex::Iterable::Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction,
Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: index_container_(&index_container),
label_(label),
view_(view),
transaction_(transaction),
@ -422,7 +417,7 @@ bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, VertexC
return false;
}
try {
for ([[maybe_unused]] auto &vertex : vertices) {
for (auto &vertex : vertices) {
if (vertex.second.deleted || !VertexHasLabel(vertex, label)) {
continue;
}
@ -485,7 +480,7 @@ LabelPropertyIndex::Iterable::Iterator &LabelPropertyIndex::Iterable::Iterator::
}
void LabelPropertyIndex::Iterable::Iterator::AdvanceUntilValid() {
for (; index_iterator_ != self_->index_accessor_->end(); ++index_iterator_) {
for (; index_iterator_ != self_->index_container_->end(); ++index_iterator_) {
if (index_iterator_->vertex == current_vertex_) {
continue;
}
@ -500,11 +495,11 @@ void LabelPropertyIndex::Iterable::Iterator::AdvanceUntilValid() {
}
if (self_->upper_bound_) {
if (self_->upper_bound_->value() < index_iterator_->value) {
index_iterator_ = self_->index_accessor_->end();
index_iterator_ = self_->index_container_->end();
break;
}
if (!self_->upper_bound_->IsInclusive() && index_iterator_->value == self_->upper_bound_->value()) {
index_iterator_ = self_->index_accessor_->end();
index_iterator_ = self_->index_container_->end();
break;
}
}
@ -531,12 +526,12 @@ const PropertyValue kSmallestMap = PropertyValue(std::map<std::string, PropertyV
const PropertyValue kSmallestTemporalData =
PropertyValue(TemporalData{static_cast<TemporalType>(0), std::numeric_limits<int64_t>::min()});
LabelPropertyIndex::Iterable::Iterable(LabelPropertyIndexContainer &index_accessor, LabelId label, PropertyId property,
LabelPropertyIndex::Iterable::Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view,
Transaction *transaction, Indices *indices, Config::Items config,
const VertexValidator &vertex_validator)
: index_accessor_(&index_accessor),
: index_container_(&index_container),
label_(label),
property_(property),
lower_bound_(lower_bound),
@ -644,29 +639,25 @@ LabelPropertyIndex::Iterable::Iterator LabelPropertyIndex::Iterable::begin() {
// If the bounds are set and don't have comparable types we don't yield any
// items from the index.
if (!bounds_valid_) {
return {this, index_accessor_->end()};
return {this, index_container_->end()};
}
auto index_iterator = index_accessor_->begin();
if (lower_bound_) {
index_iterator = std::ranges::find_if(*index_accessor_, [lower_bound = lower_bound_->value()](const auto &pair) {
return pair.value >= lower_bound;
});
return {this, std::ranges::lower_bound(*index_container_, lower_bound_->value(), std::less{}, &Entry::value)};
}
return {this, index_iterator};
return {this, index_container_->begin()};
}
LabelPropertyIndex::Iterable::Iterator LabelPropertyIndex::Iterable::end() { return {this, index_accessor_->end()}; }
LabelPropertyIndex::Iterable::Iterator LabelPropertyIndex::Iterable::end() { return {this, index_container_->end()}; }
int64_t LabelPropertyIndex::VertexCount(LabelId label, PropertyId property, const PropertyValue &value) const {
auto it = index_.find({label, property});
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
if (!value.IsNull()) {
return static_cast<int64_t>(
std::ranges::count_if(it->second, [&value](const auto &elem) { return elem.value == value; }));
}
// TODO Do check this
MG_ASSERT(!value.IsNull(), "Null is not supported!");
// TODO(jbajic) This can be improved by exiting early
auto start_it = std::ranges::lower_bound(it->second, value, std::less{}, &Entry::value);
return static_cast<int64_t>(
std::ranges::count_if(it->second, [&value](const auto &elem) { return elem.value == value; }));
std::ranges::count_if(start_it, it->second.end(), [&value](const auto &elem) { return elem.value == value; }));
}
int64_t LabelPropertyIndex::VertexCount(LabelId label, PropertyId property,
@ -678,11 +669,9 @@ int64_t LabelPropertyIndex::VertexCount(LabelId label, PropertyId property,
[&index = it->second](const auto value, const auto def) {
if (value) {
if (value->IsInclusive()) {
return std::lower_bound(index.begin(), index.end(), value->value(),
[](const Entry &elem, const PropertyValue &val) { return elem.value < val; });
return std::ranges::lower_bound(index, value->value(), std::less{}, &Entry::value);
}
return std::upper_bound(index.begin(), index.end(), value->value(),
[](const PropertyValue &val, const Entry &elem) { return val < elem.value; });
return std::ranges::upper_bound(index, value->value(), std::less{}, &Entry::value);
}
return def;
},
@ -691,11 +680,9 @@ int64_t LabelPropertyIndex::VertexCount(LabelId label, PropertyId property,
[&index = it->second](const auto value, const auto def) {
if (value) {
if (value->IsInclusive()) {
return std::upper_bound(index.begin(), index.end(), value->value(),
[](const PropertyValue &val, const Entry &elem) { return val < elem.value; });
return std::ranges::upper_bound(index, value->value(), std::less{}, &Entry::value);
}
return std::lower_bound(index.begin(), index.end(), value->value(),
[](const Entry &elem, const PropertyValue &val) { return elem.value < val; });
return std::ranges::lower_bound(index, value->value(), std::less{}, &Entry::value);
}
return def;
},

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -30,7 +30,6 @@ namespace memgraph::storage::v3 {
struct Indices;
class LabelIndex {
private:
struct Entry {
Vertex *vertex;
uint64_t timestamp;
@ -41,17 +40,9 @@ class LabelIndex {
bool operator==(const Entry &rhs) const { return vertex == rhs.vertex && timestamp == rhs.timestamp; }
};
struct LabelStorage {
LabelId label;
utils::SkipList<Entry> vertices;
bool operator<(const LabelStorage &rhs) const { return label < rhs.label; }
bool operator<(LabelId rhs) const { return label < rhs; }
bool operator==(const LabelStorage &rhs) const { return label == rhs.label; }
bool operator==(LabelId rhs) const { return label == rhs; }
};
public:
using LabelIndexContainer = std::set<Entry>;
LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
@ -72,12 +63,12 @@ class LabelIndex {
class Iterable {
public:
Iterable(utils::SkipList<Entry>::Accessor index_accessor, LabelId label, View view, Transaction *transaction,
Indices *indices, Config::Items config, const VertexValidator &vertex_validator);
Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices,
Config::Items config, const VertexValidator &vertex_validator);
class Iterator {
public:
Iterator(Iterable *self, utils::SkipList<Entry>::Iterator index_iterator);
Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator);
VertexAccessor operator*() const { return current_vertex_accessor_; }
@ -90,16 +81,16 @@ class LabelIndex {
void AdvanceUntilValid();
Iterable *self_;
utils::SkipList<Entry>::Iterator index_iterator_;
LabelIndexContainer::iterator index_iterator_;
VertexAccessor current_vertex_accessor_;
Vertex *current_vertex_;
};
Iterator begin() { return {this, index_accessor_.begin()}; }
Iterator end() { return {this, index_accessor_.end()}; }
Iterator begin() { return {this, index_container_->begin()}; }
Iterator end() { return {this, index_container_->end()}; }
private:
utils::SkipList<Entry>::Accessor index_accessor_;
LabelIndexContainer *index_container_;
LabelId label_;
View view_;
Transaction *transaction_;
@ -112,7 +103,7 @@ class LabelIndex {
Iterable Vertices(LabelId label, View view, Transaction *transaction) {
auto it = index_.find(label);
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
return {it->second.access(), label, view, transaction, indices_, config_, *vertex_validator_};
return {it->second, label, view, transaction, indices_, config_, *vertex_validator_};
}
int64_t ApproximateVertexCount(LabelId label) {
@ -124,7 +115,7 @@ class LabelIndex {
void Clear() { index_.clear(); }
private:
std::map<LabelId, utils::SkipList<Entry>> index_;
std::map<LabelId, LabelIndexContainer> index_;
Indices *indices_;
Config::Items config_;
const VertexValidator *vertex_validator_;
@ -168,7 +159,7 @@ class LabelPropertyIndex {
class Iterable {
public:
Iterable(LabelPropertyIndexContainer &index_accessor, LabelId label, PropertyId property,
Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction,
Indices *indices, Config::Items config, const VertexValidator &vertex_validator);
@ -197,7 +188,7 @@ class LabelPropertyIndex {
Iterator end();
private:
LabelPropertyIndexContainer *index_accessor_;
LabelPropertyIndexContainer *index_container_;
LabelId label_;
PropertyId property_;
std::optional<utils::Bound<PropertyValue>> lower_bound_;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,6 +12,7 @@
#pragma once
#include <type_traits>
#include "storage/v3/edge.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/transaction.hpp"
@ -21,6 +22,10 @@
namespace memgraph::storage::v3 {
inline VertexData *GetDeltaHolder(Vertex *vertex) { return &vertex->second; }
inline Edge *GetDeltaHolder(Edge *edge) { return edge; }
/// This function iterates through the undo buffers from an object (starting
/// from the supplied delta) and determines what deltas should be applied to get
/// the currently visible version of the object. When the function finds a delta
@ -83,13 +88,7 @@ inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta, Vie
template <typename TObj>
requires utils::SameAsAnyOf<TObj, Edge, Vertex>
inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
auto *delta_holder = std::invoke([object]() -> auto * {
if constexpr (std::is_same_v<TObj, Vertex>) {
return &object->second;
} else {
return object;
}
});
auto *delta_holder = GetDeltaHolder(object);
if (delta_holder->delta == nullptr) return true;
const auto &delta_commit_info = *delta_holder->delta->commit_info;
@ -121,13 +120,7 @@ requires utils::SameAsAnyOf<TObj, Edge, Vertex>
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) {
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_info.get(),
transaction->command_id);
auto *delta_holder = std::invoke([object]() -> auto * {
if constexpr (std::is_same_v<TObj, Vertex>) {
return &object->second;
} else {
return object;
}
});
auto *delta_holder = GetDeltaHolder(object);
// The operations are written in such order so that both `next` and `prev`
// chains are valid at all times. The chains must be valid at all times

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -330,36 +330,6 @@ inline bool operator<(const PropertyValue &first, const PropertyValue &second) {
}
}
inline bool operator>=(const PropertyValue &first, const PropertyValue &second) {
if (!PropertyValue::AreComparableTypes(first.type(), second.type())) return first.type() >= second.type();
switch (first.type()) {
case PropertyValue::Type::Null:
return false;
case PropertyValue::Type::Bool:
return first.ValueBool() >= second.ValueBool();
case PropertyValue::Type::Int:
if (second.type() == PropertyValue::Type::Double) {
return static_cast<double>(first.ValueInt()) >= second.ValueDouble();
} else {
return first.ValueInt() >= second.ValueInt();
}
case PropertyValue::Type::Double:
if (second.type() == PropertyValue::Type::Double) {
return first.ValueDouble() >= second.ValueDouble();
} else {
return first.ValueDouble() >= static_cast<double>(second.ValueInt());
}
case PropertyValue::Type::String:
return first.ValueString() >= second.ValueString();
case PropertyValue::Type::List:
return first.ValueList() >= second.ValueList();
case PropertyValue::Type::Map:
return first.ValueMap() >= second.ValueMap();
case PropertyValue::Type::TemporalData:
return first.ValueTemporalData() >= second.ValueTemporalData();
}
}
inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.type_) {
switch (other.type_) {
case Type::Null:

View File

@ -1014,7 +1014,7 @@ void Shard::CollectGarbage(const io::Time current_time) {
RemoveObsoleteEntries(&indices_, clean_up_before_timestamp);
}
for (const auto &vertex : deleted_vertices_) {
for (const auto *vertex : deleted_vertices_) {
MG_ASSERT(vertices_.erase(*vertex), "Invalid database state!");
}
deleted_vertices_.clear();

View File

@ -239,30 +239,34 @@ class Shard final {
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view);
/// Return number of all vertices in the database.
int64_t VertexCount() const { return static_cast<int64_t>(shard_->vertices_.size()); }
/// Return approximate number of all vertices in the database.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount() const { return static_cast<int64_t>(shard_->vertices_.size()); }
/// Return approximate number of vertices with the given label.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label) const {
return shard_->indices_.label_index.ApproximateVertexCount(label);
}
/// Return number of vertices with the given label and property.
int64_t VertexCount(LabelId label, PropertyId property) const {
/// Return approximate number of vertices with the given label and property.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property) const {
return shard_->indices_.label_property_index.VertexCount(label, property);
}
/// Return number of vertices with the given label and the given
int64_t VertexCount(LabelId label, PropertyId property, const PropertyValue &value) const {
/// Return approximate number of vertices with the given label and the given
/// value for the given property.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property, const PropertyValue &value) const {
return shard_->indices_.label_property_index.VertexCount(label, property, value);
}
/// Return number of vertices with the given label and value for
/// Return approximate number of vertices with the given label and value for
/// the given property in the range defined by provided upper and lower
/// bounds.
int64_t VertexCount(LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower,
const std::optional<utils::Bound<PropertyValue>> &upper) const {
int64_t ApproximateVertexCount(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower,
const std::optional<utils::Bound<PropertyValue>> &upper) const {
return shard_->indices_.label_property_index.VertexCount(label, property, lower, upper);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -46,8 +46,8 @@ using coordinator::CoordinatorClient;
using coordinator::CoordinatorRsm;
using coordinator::HlcRequest;
using coordinator::HlcResponse;
using coordinator::Shard;
using coordinator::ShardMap;
using coordinator::ShardMetadata;
using coordinator::Shards;
using coordinator::Status;
using io::Address;
@ -113,7 +113,7 @@ ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
ShardMetadata shard1 = ShardMetadata{.peers = {aas1_1, aas1_2, aas1_3}, .version = 1};
auto key1 = storage::v3::PropertyValue(0);
auto key2 = storage::v3::PropertyValue(0);
@ -125,7 +125,7 @@ ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
ShardMetadata shard2 = ShardMetadata{.peers = {aas2_1, aas2_2, aas2_3}, .version = 1};
auto key3 = storage::v3::PropertyValue(12);
auto key4 = storage::v3::PropertyValue(13);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -40,8 +40,8 @@ using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::HlcRequest;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::PrimaryKey;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::ShardMetadata;
using memgraph::coordinator::Shards;
using memgraph::coordinator::Status;
using memgraph::io::Address;
@ -109,7 +109,7 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
ShardMetadata shard1 = ShardMetadata{.peers = {aas1_1, aas1_2, aas1_3}, .version = 1};
const auto key1 = PropertyValue(0);
const auto key2 = PropertyValue(0);
@ -121,7 +121,7 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
ShardMetadata shard2 = ShardMetadata{.peers = {aas2_1, aas2_2, aas2_3}, .version = 1};
auto key3 = PropertyValue(12);
auto key4 = PropertyValue(13);
@ -131,10 +131,10 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
return sm;
}
std::optional<ShardClient *> DetermineShardLocation(const Shard &target_shard, const std::vector<Address> &a_addrs,
ShardClient &a_client, const std::vector<Address> &b_addrs,
ShardClient &b_client) {
for (const auto &addr : target_shard) {
std::optional<ShardClient *> DetermineShardLocation(const ShardMetadata &target_shard,
const std::vector<Address> &a_addrs, ShardClient &a_client,
const std::vector<Address> &b_addrs, ShardClient &b_client) {
for (const auto &addr : target_shard.peers) {
if (addr.address == b_addrs[0]) {
return &b_client;
}
@ -275,7 +275,7 @@ int main() {
const PrimaryKey compound_key = {cm_key_1, cm_key_2};
// Look for Shard
// Look for ShardMetadata
BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res =
coordinator_client.SendWriteRequest(req);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -47,8 +47,8 @@ using coordinator::GetShardMapRequest;
using coordinator::GetShardMapResponse;
using coordinator::Hlc;
using coordinator::HlcResponse;
using coordinator::Shard;
using coordinator::ShardMap;
using coordinator::ShardMetadata;
using io::Address;
using io::Io;
using io::rsm::RsmClient;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -44,8 +44,8 @@ using coordinator::GetShardMapRequest;
using coordinator::GetShardMapResponse;
using coordinator::Hlc;
using coordinator::HlcResponse;
using coordinator::Shard;
using coordinator::ShardMap;
using coordinator::ShardMetadata;
using io::Address;
using io::Io;
using io::local_transport::LocalSystem;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -45,8 +45,8 @@ using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::coordinator::Hlc;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::ShardMetadata;
using memgraph::io::Io;
using memgraph::io::local_transport::LocalSystem;
using memgraph::io::local_transport::LocalTransport;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -652,13 +652,13 @@ TEST_F(IndexTest, LabelPropertyIndexCountEstimate) {
}
}
EXPECT_EQ(acc.VertexCount(label1, prop_val), 55);
EXPECT_EQ(acc.ApproximateVertexCount(label1, prop_val), 55);
for (int i = 1; i <= 10; ++i) {
EXPECT_EQ(acc.VertexCount(label1, prop_val, PropertyValue(i)), i);
EXPECT_EQ(acc.ApproximateVertexCount(label1, prop_val, PropertyValue(i)), i);
}
EXPECT_EQ(acc.VertexCount(label1, prop_val, memgraph::utils::MakeBoundInclusive(PropertyValue(2)),
memgraph::utils::MakeBoundInclusive(PropertyValue(6))),
EXPECT_EQ(acc.ApproximateVertexCount(label1, prop_val, memgraph::utils::MakeBoundInclusive(PropertyValue(2)),
memgraph::utils::MakeBoundInclusive(PropertyValue(6))),
2 + 3 + 4 + 5 + 6);
}