Return an error and the latest known ShardMap version if the requested key is not possibly stored in the given shard

This commit is contained in:
gvolfing 2022-08-05 15:20:40 +02:00 committed by Tyler Neely
parent 92d69e080c
commit dd46cc407f

View File

@ -19,12 +19,15 @@
#include <thread>
#include <vector>
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/logging.hpp"
using memgraph::coordinator::Hlc;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
@ -53,6 +56,8 @@ struct StorageWriteRequest {
struct StorageWriteResponse {
bool shard_rsm_success;
std::optional<int> last_value;
// Only has a value if the given shard does not contain the requested key
std::optional<Hlc> latest_known_shard_map_version{std::nullopt};
};
struct StorageGetRequest {
@ -60,19 +65,37 @@ struct StorageGetRequest {
};
struct StorageGetResponse {
bool shard_rsm_success;
std::optional<int> value;
// Only has a value if the given shard does not contain the requested key
std::optional<Hlc> latest_known_shard_map_version{std::nullopt};
};
class StorageRsm {
std::map<ShardRsmKey, int> state_;
ShardRsmKey minimum_key_;
std::optional<ShardRsmKey> maximum_key_{std::nullopt};
Hlc shard_map_version_;
// The key is not located in this shard
bool IsKeyInRange(const ShardRsmKey &key) {
MG_ASSERT(maximum_key_);
return (key >= minimum_key_ && key <= maximum_key_);
}
public:
StorageGetResponse Read(StorageGetRequest request) {
StorageGetResponse ret;
if (state_.contains(request.key)) {
if (IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
} else if (state_.contains(request.key)) {
ret.value = state_[request.key];
ret.shard_rsm_success = true;
} else {
ret.shard_rsm_success = false;
ret.value = std::nullopt;
}
return ret;
}
@ -80,8 +103,12 @@ class StorageRsm {
StorageWriteResponse Apply(StorageWriteRequest request) {
StorageWriteResponse ret;
if (IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
}
// Key exist
if (state_.contains(request.key)) {
else if (state_.contains(request.key)) {
auto &val = state_[request.key];
/*