Implement the rest of the KV-on-Raft RSM test
This commit is contained in:
parent
aebac2c519
commit
d1c5aead61
@ -44,7 +44,7 @@ struct CasRequest {
|
||||
};
|
||||
|
||||
struct CasResponse {
|
||||
bool success;
|
||||
bool cas_success;
|
||||
std::optional<int> last_value;
|
||||
};
|
||||
|
||||
@ -53,7 +53,7 @@ struct GetRequest {
|
||||
};
|
||||
|
||||
struct GetResponse {
|
||||
int value;
|
||||
std::optional<int> value;
|
||||
};
|
||||
|
||||
class TestState {
|
||||
@ -62,16 +62,17 @@ class TestState {
|
||||
public:
|
||||
GetResponse read(GetRequest request) {
|
||||
GetResponse ret;
|
||||
ret.value = state_.at(request.key);
|
||||
if (state_.contains(request.key)) {
|
||||
ret.value = state_[request.key];
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
CasResponse apply(CasRequest request) {
|
||||
CasResponse ret;
|
||||
|
||||
// TODO(gabor) remove my annoying-ass comments
|
||||
// Key exist
|
||||
if (state_.find(request.key) != state_.end()) {
|
||||
if (state_.contains(request.key)) {
|
||||
auto &val = state_[request.key];
|
||||
|
||||
/*
|
||||
@ -79,7 +80,7 @@ class TestState {
|
||||
*/
|
||||
if (!request.new_value) {
|
||||
ret.last_value = val;
|
||||
ret.success = true;
|
||||
ret.cas_success = true;
|
||||
|
||||
state_.erase(state_.find(request.key));
|
||||
}
|
||||
@ -90,12 +91,12 @@ class TestState {
|
||||
// Does old_value match?
|
||||
if (request.old_value == val) {
|
||||
ret.last_value = val;
|
||||
ret.success = true;
|
||||
ret.cas_success = true;
|
||||
|
||||
val = request.new_value.value();
|
||||
} else {
|
||||
ret.last_value = val;
|
||||
ret.success = false;
|
||||
ret.cas_success = false;
|
||||
}
|
||||
}
|
||||
/*
|
||||
@ -103,9 +104,9 @@ class TestState {
|
||||
*/
|
||||
else {
|
||||
ret.last_value = std::nullopt;
|
||||
ret.success = true;
|
||||
ret.cas_success = true;
|
||||
|
||||
state_[request.key] = request.new_value.value();
|
||||
state_.emplace(request.key, std::move(request.new_value).value());
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -162,45 +163,99 @@ void RunSimulation() {
|
||||
|
||||
std::mt19937 cli_rng_{0};
|
||||
Address server_addrs[]{srv_addr_1, srv_addr_2, srv_addr_3};
|
||||
bool success = false;
|
||||
Address leader = server_addrs[0];
|
||||
|
||||
while (true) {
|
||||
const int key = 0;
|
||||
std::optional<int> last_known_value;
|
||||
|
||||
bool success = false;
|
||||
|
||||
for (int i = 0; !success; i++) {
|
||||
// send request
|
||||
CasRequest cas_req;
|
||||
cas_req.key = 0;
|
||||
cas_req.old_value = std::nullopt;
|
||||
cas_req.new_value = 12;
|
||||
cas_req.key = key;
|
||||
|
||||
cas_req.old_value = last_known_value;
|
||||
|
||||
cas_req.new_value = i;
|
||||
|
||||
WriteRequest<CasRequest> cli_req;
|
||||
cli_req.operation = cas_req;
|
||||
|
||||
// TODO(tyler / gabor) replace Replication* with Cas/Read
|
||||
|
||||
std::cout << "client sending ReplicationRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<WriteResponse<CasResponse>> response_future =
|
||||
std::cout << "client sending CasRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<WriteResponse<CasResponse>> cas_response_future =
|
||||
cli_io.RequestWithTimeout<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req, 50000);
|
||||
|
||||
// receive response
|
||||
ResponseResult<WriteResponse<CasResponse>> response_result = response_future.Wait();
|
||||
// receive cas_response
|
||||
ResponseResult<WriteResponse<CasResponse>> cas_response_result = cas_response_future.Wait();
|
||||
|
||||
if (response_result.HasError()) {
|
||||
if (cas_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
ResponseEnvelope<WriteResponse<CasResponse>> response_envelope = response_result.GetValue();
|
||||
WriteResponse<CasResponse> response = response_envelope.message;
|
||||
ResponseEnvelope<WriteResponse<CasResponse>> cas_response_envelope = cas_response_result.GetValue();
|
||||
WriteResponse<CasResponse> write_cas_response = cas_response_envelope.message;
|
||||
|
||||
if (response.success) {
|
||||
success = true;
|
||||
break;
|
||||
if (write_cas_response.retry_leader) {
|
||||
MG_ASSERT(!write_cas_response.success, "retry_leader should never be set for successful responses");
|
||||
leader = write_cas_response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
|
||||
} else if (!write_cas_response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader = server_addrs[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader.last_known_port << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (response.retry_leader) {
|
||||
leader = response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
|
||||
CasResponse cas_response = write_cas_response.write_return;
|
||||
|
||||
bool cas_succeeded = cas_response.cas_success;
|
||||
|
||||
std::cout << "Client received CasResponse! success: " << cas_succeeded
|
||||
<< " last_known_value: " << (int)*last_known_value << std::endl;
|
||||
|
||||
if (cas_succeeded) {
|
||||
last_known_value = i;
|
||||
} else {
|
||||
last_known_value = cas_response.last_value;
|
||||
continue;
|
||||
}
|
||||
|
||||
GetRequest get_req;
|
||||
get_req.key = key;
|
||||
|
||||
ReadRequest<GetRequest> read_req;
|
||||
read_req.operation = get_req;
|
||||
|
||||
std::cout << "client sending GetRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<ReadResponse<GetResponse>> get_response_future =
|
||||
cli_io.RequestWithTimeout<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req, 50000);
|
||||
|
||||
// receive response
|
||||
ResponseResult<ReadResponse<GetResponse>> get_response_result = get_response_future.Wait();
|
||||
|
||||
if (get_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
ResponseEnvelope<ReadResponse<GetResponse>> get_response_envelope = get_response_result.GetValue();
|
||||
ReadResponse<GetResponse> read_get_response = get_response_envelope.message;
|
||||
|
||||
if (!read_get_response.success) {
|
||||
// sent to a non-leader
|
||||
continue;
|
||||
}
|
||||
|
||||
if (read_get_response.retry_leader) {
|
||||
MG_ASSERT(!read_get_response.success, "retry_leader should never be set for successful responses");
|
||||
leader = read_get_response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
|
||||
} else if (!read_get_response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader = server_addrs[addr_index];
|
||||
@ -208,6 +263,14 @@ void RunSimulation() {
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader.last_known_port << std::endl;
|
||||
}
|
||||
|
||||
GetResponse get_response = read_get_response.read_return;
|
||||
|
||||
MG_ASSERT(get_response.value == i);
|
||||
|
||||
std::cout << "client successfully cas'd a value and read it back! value: " << i << std::endl;
|
||||
|
||||
success = true;
|
||||
}
|
||||
|
||||
MG_ASSERT(success);
|
||||
|
Loading…
Reference in New Issue
Block a user