diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index 923406e00..a842d798d 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -26,6 +26,7 @@ #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/schemas.hpp" +#include "storage/v3/temporal.hpp" namespace memgraph::coordinator { diff --git a/src/io/messages.hpp b/src/io/messages.hpp index fc74a309a..46c2665ce 100644 --- a/src/io/messages.hpp +++ b/src/io/messages.hpp @@ -15,7 +15,7 @@ #include #include -#include +#include "query/v2/requests.hpp" #include "utils/concepts.hpp" namespace memgraph::io::messages { @@ -23,10 +23,8 @@ namespace memgraph::io::messages { using memgraph::coordinator::CoordinatorReadRequests; using memgraph::coordinator::CoordinatorWriteRequests; using memgraph::coordinator::CoordinatorWriteResponses; - -// TODO(everbody) change these to the real shard messages -using memgraph::io::rsm::StorageReadRequest; -using memgraph::io::rsm::StorageWriteRequest; +using StorageReadRequest = msgs::ReadRequests; +using StorageWriteRequest = msgs::WriteRequests; using memgraph::io::rsm::AppendRequest; using memgraph::io::rsm::AppendResponse; diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 16b2b71a1..fe4bc8c1f 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -38,7 +38,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre server_addresses_.insert(address); while (true) { - const size_t blocked_servers = blocked_on_receive_; + const size_t blocked_servers = blocked_on_receive_.size(); const bool all_servers_blocked = blocked_servers == server_addresses_.size(); @@ -53,7 +53,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre bool SimulatorHandle::MaybeTickSimulator() { std::unique_lock lock(mu_); - const size_t blocked_servers = blocked_on_receive_; + const size_t blocked_servers = blocked_on_receive_.size(); if (blocked_servers < server_addresses_.size()) { // we only need to advance the simulator when all diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index a673b917c..ba64da3a5 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -48,7 +48,7 @@ class SimulatorHandle { Time cluster_wide_time_microseconds_; bool should_shut_down_ = false; SimulatorStats stats_; - size_t blocked_on_receive_ = 0; + std::set
blocked_on_receive_; std::set
server_addresses_; std::mt19937 rng_; SimulatorConfig config_; @@ -115,7 +115,7 @@ class SimulatorHandle { requires(sizeof...(Ms) > 0) RequestResult Receive(const Address &receiver, Duration timeout) { std::unique_lock lock(mu_); - blocked_on_receive_ += 1; + blocked_on_receive_.emplace(receiver); const Time deadline = cluster_wide_time_microseconds_ + timeout; @@ -130,7 +130,7 @@ class SimulatorHandle { // than asserting that the last item in can_rx matches. auto m_opt = std::move(message).Take(); - blocked_on_receive_ -= 1; + blocked_on_receive_.erase(receiver); return std::move(m_opt).value(); } @@ -144,7 +144,7 @@ class SimulatorHandle { } } - blocked_on_receive_ -= 1; + blocked_on_receive_.erase(receiver); return TimedOut{}; } diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp index 909d7235b..fe18b18b2 100644 --- a/src/machine_manager/machine_manager.hpp +++ b/src/machine_manager/machine_manager.hpp @@ -11,8 +11,6 @@ #pragma once -#include - #include #include #include @@ -36,6 +34,8 @@ using memgraph::io::Time; using memgraph::io::messages::CoordinatorMessages; using memgraph::io::messages::ShardManagerMessages; using memgraph::io::messages::ShardMessages; +using memgraph::io::messages::StorageReadRequest; +using memgraph::io::messages::StorageWriteRequest; using memgraph::io::rsm::AppendRequest; using memgraph::io::rsm::AppendResponse; using memgraph::io::rsm::ReadRequest; @@ -45,9 +45,6 @@ using memgraph::io::rsm::WriteRequest; using memgraph::io::rsm::WriteResponse; using memgraph::storage::v3::ShardManager; -using memgraph::io::rsm::StorageReadRequest; -using memgraph::io::rsm::StorageWriteRequest; - /// The MachineManager is responsible for: /// * starting the entire system and ensuring that high-level /// operational requirements continue to be met diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index 2f0ec2d49..6288c011d 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -21,9 +21,12 @@ #include #include #include -#include #include #include +#include +#include +#include +#include "storage/v3/config.hpp" namespace memgraph::storage::v3 { @@ -43,20 +46,19 @@ using memgraph::io::messages::CoordinatorMessages; using memgraph::io::messages::ShardManagerMessages; using memgraph::io::messages::ShardMessages; using memgraph::io::rsm::Raft; -using memgraph::io::rsm::ShardRsm; -using memgraph::io::rsm::StorageReadRequest; -using memgraph::io::rsm::StorageReadResponse; -using memgraph::io::rsm::StorageWriteRequest; -using memgraph::io::rsm::StorageWriteResponse; using memgraph::io::rsm::WriteRequest; using memgraph::io::rsm::WriteResponse; +using memgraph::msgs::ReadRequests; +using memgraph::msgs::ReadResponses; +using memgraph::msgs::WriteRequests; +using memgraph::msgs::WriteResponses; +using memgraph::storage::v3::ShardRsm; using ShardManagerOrRsmMessage = std::variant; using TimeUuidPair = std::pair; template -using ShardRaft = - Raft; +using ShardRaft = Raft; using namespace std::chrono_literals; static constexpr Duration kMinimumCronInterval = 1000ms; @@ -208,8 +210,10 @@ class ShardManager { // TODO(tyler) get geers from Coordinator in HeartbeatResponse std::vector
rsm_peers = {}; - // TODO(everbody) change this to storage::Shard - ShardRsm rsm_state{}; + std::unique_ptr shard = + std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, to_init.config); + + ShardRsm rsm_state{std::move(shard)}; ShardRaft rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)}; diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 3c3a69ff1..93f2cef23 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -25,8 +25,8 @@ #include #include #include +#include #include "io/rsm/rsm_client.hpp" -#include "io/rsm/shard_rsm.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/schemas.hpp" @@ -45,19 +45,19 @@ using memgraph::coordinator::ShardMap; using memgraph::io::Io; using memgraph::io::local_transport::LocalSystem; using memgraph::io::local_transport::LocalTransport; -using memgraph::io::rsm::RsmClient; -using memgraph::io::rsm::StorageReadRequest; -using memgraph::io::rsm::StorageReadResponse; -using memgraph::io::rsm::StorageWriteRequest; -using memgraph::io::rsm::StorageWriteResponse; using memgraph::machine_manager::MachineConfig; using memgraph::machine_manager::MachineManager; using memgraph::storage::v3::LabelId; using memgraph::storage::v3::SchemaProperty; +using memgraph::io::rsm::RsmClient; +using memgraph::msgs::ReadRequests; +using memgraph::msgs::ReadResponses; +using memgraph::msgs::WriteRequests; +using memgraph::msgs::WriteResponses; + using CompoundKey = std::vector; -using ShardClient = - RsmClient; +using ShardClient = RsmClient; ShardMap TestShardMap() { ShardMap sm{}; @@ -192,16 +192,22 @@ TEST(MachineManager, BasicFunctionality) { // submit a read request and assert that the requested key does not yet exist LabelId label_id = sm.labels.at(label_name); - StorageReadRequest storage_get_req; + + ReadRequests storage_get_req; + /* + TODO(tyler,kostas) set this to a real request storage_get_req.label_id = label_id; storage_get_req.key = compound_key; storage_get_req.transaction_id = hlc_response.new_hlc; + */ auto get_response_result = shard_client.SendReadRequest(storage_get_req); auto get_response = get_response_result.GetValue(); + /* auto val = get_response.value; MG_ASSERT(!val.has_value()); + */ local_system.ShutDown(); };