Fix race condition in simulator. Make leader election function more reliably in raft code
This commit is contained in:
parent
4876b7cd8c
commit
ad1d8637e5
@ -331,7 +331,7 @@ class SimulatorHandle {
|
||||
|
||||
uint64_t deadline = cluster_wide_time_microseconds_ + timeout_microseconds;
|
||||
|
||||
while (!(should_shut_down_ && cluster_wide_time_microseconds_ < deadline)) {
|
||||
while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) {
|
||||
if (can_receive_.contains(receiver)) {
|
||||
std::vector<OpaqueMessage> &can_rx = can_receive_.at(receiver);
|
||||
if (!can_rx.empty()) {
|
||||
@ -349,7 +349,7 @@ class SimulatorHandle {
|
||||
lock.unlock();
|
||||
bool made_progress = MaybeTickSimulator();
|
||||
lock.lock();
|
||||
if (!made_progress) {
|
||||
if (!should_shut_down_ && !made_progress) {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
blocked_on_receive_ -= 1;
|
||||
@ -390,7 +390,7 @@ class SimulatorHandle {
|
||||
// messages that are sent to servers that may later receive them
|
||||
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
|
||||
|
||||
uint64_t cluster_wide_time_microseconds_ = 100000000;
|
||||
uint64_t cluster_wide_time_microseconds_ = 1000000; // it's one million (microseconds) o'clock!
|
||||
bool should_shut_down_ = false;
|
||||
SimulatorStats stats_;
|
||||
size_t blocked_on_receive_ = 0;
|
||||
|
@ -9,6 +9,7 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
// TODO(tyler) add role and term to all log statements
|
||||
// TODO(tyler) buffer out-of-order Append buffers to reassemble more quickly
|
||||
// TODO(tyler) handle granular batch sizes based on simple flow control
|
||||
// TODO(tyler) add "application" test that asserts that all state machines apply the same items in-order
|
||||
@ -114,11 +115,11 @@ class Server {
|
||||
Server(Io<IoImpl> io, std::vector<Address> peers) : io_(io), peers_(peers) {}
|
||||
|
||||
void Run() {
|
||||
Time last_cron = 0;
|
||||
Time last_cron = io_.Now();
|
||||
|
||||
while (!io_.ShouldShutDown()) {
|
||||
auto now = io_.Now();
|
||||
Duration random_cron_interval = RandomTimeout(10000, 30000);
|
||||
Duration random_cron_interval = RandomTimeout(5000, 30000);
|
||||
if (now - last_cron > random_cron_interval) {
|
||||
Cron();
|
||||
last_cron = now;
|
||||
@ -557,7 +558,7 @@ void RunServer(Server<IoImpl> server) {
|
||||
server.Run();
|
||||
}
|
||||
|
||||
int main() {
|
||||
void RunSimulation() {
|
||||
auto config = SimulatorConfig{
|
||||
.drop_percent = 0,
|
||||
.perform_timeouts = true,
|
||||
@ -589,8 +590,6 @@ int main() {
|
||||
auto srv_thread_2 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_2));
|
||||
auto srv_thread_3 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_3));
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
|
||||
bool success = false;
|
||||
Address leader = srv_addr_1;
|
||||
for (int retries = 0; retries < 30; retries++) {
|
||||
@ -619,10 +618,20 @@ int main() {
|
||||
MG_ASSERT(success);
|
||||
|
||||
simulator.ShutDown();
|
||||
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
|
||||
|
||||
/*
|
||||
srv_thread_1.join();
|
||||
srv_thread_2.join();
|
||||
srv_thread_3.join();
|
||||
*/
|
||||
}
|
||||
|
||||
int main() {
|
||||
for (int i = 0; i < 1; i++) {
|
||||
std::cout << "========================== NEW SIMULATION ==========================" << std::endl;
|
||||
RunSimulation();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user