Fix a mistake with singleton inheritence
Reviewers: sasa.stanko Reviewed By: sasa.stanko Differential Revision: https://phabricator.memgraph.io/D697
This commit is contained in:
parent
591d086013
commit
10e98b5c2e
@ -107,7 +107,7 @@ class Network {
|
|||||||
bool success =
|
bool success =
|
||||||
Protocol::SendMessage(nm.address, nm.port, nm.reactor,
|
Protocol::SendMessage(nm.address, nm.port, nm.reactor,
|
||||||
nm.channel, std::move(nm.message));
|
nm.channel, std::move(nm.message));
|
||||||
std::cout << "Network client message send status: " << success << std::endl;
|
DLOG(INFO) << "Network client message send status: " << success << std::endl;
|
||||||
} else {
|
} else {
|
||||||
this->mutex_.unlock();
|
this->mutex_.unlock();
|
||||||
}
|
}
|
||||||
@ -277,8 +277,9 @@ class ChannelResolvedMessage : public Message {
|
|||||||
* E.g. resolve remote channels by memgraph node id, etc.
|
* E.g. resolve remote channels by memgraph node id, etc.
|
||||||
* Alive through the entire process lifetime.
|
* Alive through the entire process lifetime.
|
||||||
* Singleton class. Created automatically on first use.
|
* Singleton class. Created automatically on first use.
|
||||||
|
* Final (can't extend) because it's a singleton. Please be careful if you're changing this.
|
||||||
*/
|
*/
|
||||||
class Distributed {
|
class Distributed final {
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Get the (singleton) instance of Distributed.
|
* Get the (singleton) instance of Distributed.
|
||||||
|
@ -466,8 +466,9 @@ class Reactor {
|
|||||||
* E.g. holds set of reactors, channels for all reactors.
|
* E.g. holds set of reactors, channels for all reactors.
|
||||||
* Alive through the entire process lifetime.
|
* Alive through the entire process lifetime.
|
||||||
* Singleton class. Created automatically on first use.
|
* Singleton class. Created automatically on first use.
|
||||||
|
* Final (can't extend) because it's a singleton. Please be careful if you're changing this.
|
||||||
*/
|
*/
|
||||||
class System {
|
class System final {
|
||||||
public:
|
public:
|
||||||
friend class Reactor;
|
friend class Reactor;
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future
|
DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future
|
||||||
DEFINE_string(config_filename, "", "File containing list of all processes");
|
DEFINE_string(config_filename, "", "File containing list of all processes");
|
||||||
|
|
||||||
class MemgraphDistributed : public Distributed {
|
class MemgraphDistributed {
|
||||||
private:
|
private:
|
||||||
using Location = std::pair<std::string, uint16_t>;
|
using Location = std::pair<std::string, uint16_t>;
|
||||||
|
|
||||||
@ -19,8 +19,8 @@ class MemgraphDistributed : public Distributed {
|
|||||||
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
|
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
|
||||||
*/
|
*/
|
||||||
static MemgraphDistributed &GetInstance() {
|
static MemgraphDistributed &GetInstance() {
|
||||||
static MemgraphDistributed distributed; // guaranteed to be destroyed, initialized on first use
|
static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use
|
||||||
return distributed;
|
return memgraph;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Register memgraph node id to the given location. */
|
/** Register memgraph node id to the given location. */
|
||||||
@ -34,7 +34,7 @@ class MemgraphDistributed : public Distributed {
|
|||||||
const std::string &channel) {
|
const std::string &channel) {
|
||||||
std::unique_lock<std::recursive_mutex> lock(mutex_);
|
std::unique_lock<std::recursive_mutex> lock(mutex_);
|
||||||
const auto &location = mnodes_.at(mnid);
|
const auto &location = mnodes_.at(mnid);
|
||||||
return Distributed::FindChannel(location.first, location.second, reactor, channel);
|
return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -74,13 +74,13 @@ std::pair<int64_t, std::vector<int64_t>>
|
|||||||
std::string address;
|
std::string address;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
file >> master_mnid >> address >> port;
|
file >> master_mnid >> address >> port;
|
||||||
MemgraphDistributed &distributed = MemgraphDistributed::GetInstance();
|
MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
|
||||||
distributed.RegisterMemgraphNode(master_mnid, address, port);
|
memgraph.RegisterMemgraphNode(master_mnid, address, port);
|
||||||
while (file.good()) {
|
while (file.good()) {
|
||||||
file >> mnid >> address >> port;
|
file >> mnid >> address >> port;
|
||||||
if (file.eof())
|
if (file.eof())
|
||||||
break ;
|
break ;
|
||||||
distributed.RegisterMemgraphNode(mnid, address, port);
|
memgraph.RegisterMemgraphNode(mnid, address, port);
|
||||||
worker_mnids.push_back(mnid);
|
worker_mnids.push_back(mnid);
|
||||||
}
|
}
|
||||||
file.close();
|
file.close();
|
||||||
@ -116,7 +116,9 @@ class Master : public Reactor {
|
|||||||
worker_mnids_(std::move(worker_mnids)) {}
|
worker_mnids_(std::move(worker_mnids)) {}
|
||||||
|
|
||||||
virtual void Run() {
|
virtual void Run() {
|
||||||
MemgraphDistributed &distributed = MemgraphDistributed::GetInstance();
|
MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
|
||||||
|
Distributed &distributed = Distributed::GetInstance();
|
||||||
|
|
||||||
std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address()
|
std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address()
|
||||||
<< ":" << distributed.network().Port() << std::endl;
|
<< ":" << distributed.network().Port() << std::endl;
|
||||||
|
|
||||||
@ -139,7 +141,7 @@ class Master : public Reactor {
|
|||||||
|
|
||||||
// send a TextMessage to each worker
|
// send a TextMessage to each worker
|
||||||
for (auto wmnid : worker_mnids_) {
|
for (auto wmnid : worker_mnids_) {
|
||||||
auto stream = distributed.FindChannel(wmnid, "worker", "main");
|
auto stream = memgraph.FindChannel(wmnid, "worker", "main");
|
||||||
stream->OnEventOnce()
|
stream->OnEventOnce()
|
||||||
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
|
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
|
||||||
msg.channel()->Send<TextMessage>("master", "main", "hi from master");
|
msg.channel()->Send<TextMessage>("master", "main", "hi from master");
|
||||||
@ -161,7 +163,8 @@ class Worker : public Reactor {
|
|||||||
master_mnid_(master_mnid) {}
|
master_mnid_(master_mnid) {}
|
||||||
|
|
||||||
virtual void Run() {
|
virtual void Run() {
|
||||||
MemgraphDistributed &distributed = MemgraphDistributed::GetInstance();
|
Distributed &distributed = Distributed::GetInstance();
|
||||||
|
|
||||||
std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address()
|
std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address()
|
||||||
<< ":" << distributed.network().Port() << std::endl;
|
<< ":" << distributed.network().Port() << std::endl;
|
||||||
|
|
||||||
@ -191,15 +194,14 @@ int main(int argc, char *argv[]) {
|
|||||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
|
|
||||||
System &system = System::GetInstance();
|
System &system = System::GetInstance();
|
||||||
MemgraphDistributed& distributed = MemgraphDistributed::GetInstance();
|
|
||||||
auto mnids = ParseConfigAndRegister(FLAGS_config_filename);
|
auto mnids = ParseConfigAndRegister(FLAGS_config_filename);
|
||||||
distributed.StartServices();
|
Distributed::GetInstance().StartServices();
|
||||||
if (FLAGS_my_mnid == mnids.first)
|
if (FLAGS_my_mnid == mnids.first)
|
||||||
system.Spawn<Master>("master", FLAGS_my_mnid, std::move(mnids.second));
|
system.Spawn<Master>("master", FLAGS_my_mnid, std::move(mnids.second));
|
||||||
else
|
else
|
||||||
system.Spawn<Worker>("worker", FLAGS_my_mnid, mnids.first);
|
system.Spawn<Worker>("worker", FLAGS_my_mnid, mnids.first);
|
||||||
system.AwaitShutdown();
|
system.AwaitShutdown();
|
||||||
distributed.StopServices();
|
Distributed::GetInstance().StopServices();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ import os
|
|||||||
command = 'gnome-terminal'
|
command = 'gnome-terminal'
|
||||||
program = './distributed_test'
|
program = './distributed_test'
|
||||||
config_filename = 'config'
|
config_filename = 'config'
|
||||||
flags = '--minloglevel=2'
|
flags = '-alsologtostderr --minloglevel=2'
|
||||||
|
|
||||||
f = open(config_filename, 'r')
|
f = open(config_filename, 'r')
|
||||||
for line in f:
|
for line in f:
|
||||||
|
Loading…
Reference in New Issue
Block a user