From 5418dfb19e13280a94c0a33dacb773bd35600905 Mon Sep 17 00:00:00 2001 From: Dominik Gleich <dominik.gleich@memgraph.io> Date: Mon, 15 Jan 2018 14:03:07 +0100 Subject: [PATCH] Rename NetworkEndpoint Summary: Rename redunant port str Add endpoint << operator Migrate everything to endpoint Reviewers: mferencevic, florijan Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1100 --- src/CMakeLists.txt | 3 +- src/communication/messaging/distributed.cpp | 14 ++--- src/communication/messaging/distributed.hpp | 28 +++------ src/communication/messaging/protocol.cpp | 25 ++++---- src/communication/messaging/protocol.hpp | 10 +-- src/communication/raft/rpc.hpp | 14 ++--- src/communication/rpc/rpc.cpp | 38 +++++------- src/communication/rpc/rpc.hpp | 6 +- src/communication/server.hpp | 2 +- src/database/counters.cpp | 2 +- src/database/counters.hpp | 2 +- src/database/graph_db.hpp | 6 +- src/distributed/coordination_master.hpp | 4 +- src/distributed/coordination_rpc_messages.hpp | 4 +- src/distributed/coordination_worker.hpp | 4 +- src/io/network/addrinfo.cpp | 2 +- src/io/network/endpoint.cpp | 36 +++++++++++ .../{network_endpoint.hpp => endpoint.hpp} | 23 +++---- src/io/network/network_endpoint.cpp | 53 ---------------- src/io/network/socket.cpp | 14 +++-- src/io/network/socket.hpp | 20 +++--- src/memgraph_bolt.cpp | 17 +++--- src/storage/concurrent_id_mapper_worker.cpp | 2 +- src/storage/concurrent_id_mapper_worker.hpp | 4 +- src/transactions/engine_worker.cpp | 2 +- src/transactions/engine_worker.hpp | 4 +- src/utils/network.cpp | 44 +++++++++++++ src/utils/network.hpp | 10 +++ tests/concurrent/network_common.hpp | 6 +- tests/concurrent/network_read_hang.cpp | 12 ++-- tests/concurrent/network_server.cpp | 10 +-- tests/concurrent/network_session_leak.cpp | 7 ++- tests/distributed/raft/example_client.cpp | 15 +++-- tests/distributed/raft/example_server.cpp | 3 +- tests/macro_benchmark/clients/bolt_client.hpp | 6 +- .../clients/long_running_client.cpp | 41 +++++++------ .../macro_benchmark/clients/query_client.cpp | 14 ++--- tests/manual/bolt_client.cpp | 9 +-- tests/manual/raft_rpc.cpp | 13 ++-- tests/stress/long_running.cpp | 6 +- .../unit/concurrent_id_mapper_distributed.cpp | 4 +- tests/unit/counters.cpp | 6 +- tests/unit/distributed_coordination.cpp | 11 ++-- tests/unit/messaging_distributed.cpp | 23 +++---- tests/unit/network_endpoint.cpp | 61 +++---------------- tests/unit/network_timeouts.cpp | 9 ++- tests/unit/rpc.cpp | 14 ++--- tests/unit/transaction_engine_distributed.cpp | 6 +- tests/unit/utils_network.cpp | 21 +++++++ 49 files changed, 342 insertions(+), 348 deletions(-) create mode 100644 src/io/network/endpoint.cpp rename src/io/network/{network_endpoint.hpp => endpoint.hpp} (56%) delete mode 100644 src/io/network/network_endpoint.cpp create mode 100644 src/utils/network.cpp create mode 100644 src/utils/network.hpp create mode 100644 tests/unit/utils_network.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad68fffe2..0e100dcc9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,7 +21,7 @@ set(memgraph_src_files durability/snapshooter.cpp durability/wal.cpp io/network/addrinfo.cpp - io/network/network_endpoint.cpp + io/network/endpoint.cpp io/network/socket.cpp query/common.cpp query/console.cpp @@ -47,6 +47,7 @@ set(memgraph_src_files transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp + utils/network.cpp utils/watchdog.cpp ) # ----------------------------------------------------------------------------- diff --git a/src/communication/messaging/distributed.cpp b/src/communication/messaging/distributed.cpp index d944b2d06..6bd92af55 100644 --- a/src/communication/messaging/distributed.cpp +++ b/src/communication/messaging/distributed.cpp @@ -2,16 +2,12 @@ namespace communication::messaging { -System::System(const std::string &address, uint16_t port) - : endpoint_(address, port) { +System::System(const io::network::Endpoint &endpoint) : endpoint_(endpoint) { // Numbers of workers is quite arbitrary at this point. StartClient(4); StartServer(4); } -System::System(const io::network::NetworkEndpoint &endpoint) - : System(endpoint.address(), endpoint.port()) {} - System::~System() { queue_.Shutdown(); for (size_t i = 0; i < pool_.size(); ++i) { @@ -26,7 +22,7 @@ void System::StartClient(int worker_count) { while (true) { auto message = queue_.AwaitPop(); if (message == std::experimental::nullopt) break; - SendMessage(message->address, message->port, message->channel, + SendMessage(message->endpoint, message->channel, std::move(message->message)); } })); @@ -47,11 +43,11 @@ std::shared_ptr<EventStream> System::Open(const std::string &name) { return system_.Open(name); } -Writer::Writer(System &system, const std::string &address, uint16_t port, +Writer::Writer(System &system, const Endpoint &endpoint, const std::string &name) - : system_(system), address_(address), port_(port), name_(name) {} + : system_(system), endpoint_(endpoint), name_(name) {} void Writer::Send(std::unique_ptr<Message> message) { - system_.queue_.Emplace(address_, port_, name_, std::move(message)); + system_.queue_.Emplace(endpoint_, name_, std::move(message)); } } // namespace communication::messaging diff --git a/src/communication/messaging/distributed.hpp b/src/communication/messaging/distributed.hpp index 294d2f9a2..47ff9e54a 100644 --- a/src/communication/messaging/distributed.hpp +++ b/src/communication/messaging/distributed.hpp @@ -19,7 +19,7 @@ #include "protocol.hpp" #include "communication/server.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "threading/sync/spinlock.hpp" namespace communication::messaging { @@ -29,8 +29,7 @@ class System; // Writes message to remote event stream. class Writer { public: - Writer(System &system, const std::string &address, uint16_t port, - const std::string &name); + Writer(System &system, const Endpoint &endpoint, const std::string &name); Writer(const Writer &) = delete; void operator=(const Writer &) = delete; Writer(Writer &&) = delete; @@ -46,19 +45,15 @@ class Writer { private: System &system_; - std::string address_; - uint16_t port_; + Endpoint endpoint_; std::string name_; }; class System { - using Endpoint = io::network::NetworkEndpoint; - public: friend class Writer; - System(const std::string &address, uint16_t port); - explicit System(const Endpoint &endpoint); + System(const Endpoint &endpoint); System(const System &) = delete; System(System &&) = delete; System &operator=(const System &) = delete; @@ -67,7 +62,7 @@ class System { std::shared_ptr<EventStream> Open(const std::string &name); - const io::network::NetworkEndpoint &endpoint() const { return endpoint_; } + const Endpoint &endpoint() const { return endpoint_; } private: using Socket = Socket; @@ -76,19 +71,14 @@ class System { struct NetworkMessage { NetworkMessage() {} - NetworkMessage(const std::string &address, uint16_t port, - const std::string &channel, + NetworkMessage(const Endpoint &endpoint, const std::string &channel, std::unique_ptr<Message> &&message) - : address(address), - port(port), - channel(channel), - message(std::move(message)) {} + : endpoint(endpoint), channel(channel), message(std::move(message)) {} NetworkMessage(NetworkMessage &&nm) = default; NetworkMessage &operator=(NetworkMessage &&nm) = default; - std::string address; - uint16_t port = 0; + Endpoint endpoint; std::string channel; std::unique_ptr<Message> message; }; @@ -108,7 +98,7 @@ class System { // Server variables. SessionData protocol_data_; std::unique_ptr<ServerT> server_{nullptr}; - io::network::NetworkEndpoint endpoint_; + Endpoint endpoint_; LocalSystem &system_ = protocol_data_.system; }; diff --git a/src/communication/messaging/protocol.cpp b/src/communication/messaging/protocol.cpp index eaba214bc..fbd647493 100644 --- a/src/communication/messaging/protocol.cpp +++ b/src/communication/messaging/protocol.cpp @@ -75,30 +75,27 @@ struct PairHash { } }; -void SendMessage(const std::string &address, uint16_t port, - const std::string &channel, std::unique_ptr<Message> message) { +void SendMessage(const Endpoint &endpoint, const std::string &channel, + std::unique_ptr<Message> message) { static thread_local std::unordered_map<std::pair<std::string, uint16_t>, Socket, PairHash> cache; CHECK(message) << "Trying to send nullptr instead of message"; - auto it = cache.find({address, port}); + auto it = cache.find({endpoint.address(), endpoint.port()}); if (it == cache.end()) { - // Initialize endpoint. - Endpoint endpoint(address.c_str(), port); - Socket socket; if (!socket.Connect(endpoint)) { - LOG(INFO) << "Couldn't connect to remote address: " << address << ":" - << port; + LOG(INFO) << "Couldn't connect to endpoint: " << endpoint; return; } - it = cache - .emplace(std::piecewise_construct, - std::forward_as_tuple(address, port), - std::forward_as_tuple(std::move(socket))) - .first; + it = + cache + .emplace(std::piecewise_construct, + std::forward_as_tuple(endpoint.address(), endpoint.port()), + std::forward_as_tuple(std::move(socket))) + .first; } auto &socket = it->second; @@ -131,4 +128,4 @@ void SendMessage(const std::string &address, uint16_t port, return; } } -} +} // namespace communication::messaging diff --git a/src/communication/messaging/protocol.hpp b/src/communication/messaging/protocol.hpp index 3e8d6da9b..c7794a04e 100644 --- a/src/communication/messaging/protocol.hpp +++ b/src/communication/messaging/protocol.hpp @@ -5,8 +5,8 @@ #include "communication/bolt/v1/decoder/buffer.hpp" #include "communication/messaging/local.hpp" +#include "io/network/endpoint.hpp" #include "io/network/epoll.hpp" -#include "io/network/network_endpoint.hpp" #include "io/network/socket.hpp" #include "io/network/stream_buffer.hpp" @@ -23,7 +23,7 @@ namespace communication::messaging { class Message; -using Endpoint = io::network::NetworkEndpoint; +using Endpoint = io::network::Endpoint; using Socket = io::network::Socket; using StreamBuffer = io::network::StreamBuffer; @@ -101,6 +101,6 @@ class Session { /** * Distributed Protocol Send Message */ -void SendMessage(const std::string &address, uint16_t port, - const std::string &channel, std::unique_ptr<Message> message); -} +void SendMessage(const Endpoint &endpoint, const std::string &channel, + std::unique_ptr<Message> message); +} // namespace communication::messaging diff --git a/src/communication/raft/rpc.hpp b/src/communication/raft/rpc.hpp index ec51b1281..fff1539b7 100644 --- a/src/communication/raft/rpc.hpp +++ b/src/communication/raft/rpc.hpp @@ -8,7 +8,7 @@ #include "communication/raft/network_common.hpp" #include "communication/raft/raft.hpp" #include "communication/rpc/rpc.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" /* Implementation of `RaftNetworkInterface` using RPC. Raft RPC requests and * responses are wrapped in `PeerRpcRequest` and `PeerRpcReply`. */ @@ -28,9 +28,8 @@ using PeerProtocol = template <class State> class RpcNetwork : public RaftNetworkInterface<State> { public: - RpcNetwork( - communication::messaging::System &system, - std::unordered_map<std::string, io::network::NetworkEndpoint> directory) + RpcNetwork(communication::messaging::System &system, + std::unordered_map<std::string, io::network::Endpoint> directory) : system_(system), directory_(std::move(directory)), server_(system, kRaftChannelName) {} @@ -110,17 +109,14 @@ class RpcNetwork : public RaftNetworkInterface<State> { auto it = clients_.find(id); if (it == clients_.end()) { auto ne = directory_[id]; - it = clients_ - .try_emplace(id, system_, ne.address(), ne.port(), - kRaftChannelName) - .first; + it = clients_.try_emplace(id, system_, ne, kRaftChannelName).first; } return it->second; } communication::messaging::System &system_; // TODO(mtomic): how to update and distribute this? - std::unordered_map<MemberId, io::network::NetworkEndpoint> directory_; + std::unordered_map<MemberId, io::network::Endpoint> directory_; rpc::Server server_; std::unordered_map<MemberId, communication::rpc::Client> clients_; diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index dc9ec21f3..2f95af546 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -9,24 +9,25 @@ #include "boost/serialization/unique_ptr.hpp" #include "communication/rpc/rpc.hpp" +#include "io/network/endpoint.hpp" #include "utils/string.hpp" namespace communication::rpc { const char kProtocolStreamPrefix[] = "rpc-"; +using Endpoint = io::network::Endpoint; + class Request : public messaging::Message { public: - Request(const std::string &address, uint16_t port, const std::string &stream, + Request(const Endpoint &endpoint, const std::string &stream, std::unique_ptr<Message> message) - : address_(address), - port_(port), + : endpoint_(endpoint), stream_(stream), message_id_(utils::RandomString(20)), message_(std::move(message)) {} - const std::string &address() const { return address_; } - uint16_t port() const { return port_; } + const Endpoint &endpoint() const { return endpoint_; } const std::string &stream() const { return stream_; } const std::string &message_id() const { return message_id_; } const messaging::Message &message() const { return *message_; } @@ -38,15 +39,13 @@ class Request : public messaging::Message { template <class TArchive> void serialize(TArchive &ar, unsigned int) { ar &boost::serialization::base_object<messaging::Message>(*this); - ar &address_; - ar &port_; + ar &endpoint_; ar &stream_; ar &message_id_; ar &message_; } - std::string address_; - uint16_t port_; + io::network::Endpoint endpoint_; std::string stream_; std::string message_id_; std::unique_ptr<messaging::Message> message_; @@ -76,16 +75,11 @@ class Response : public messaging::Message { std::unique_ptr<messaging::Message> message_; }; -Client::Client(messaging::System &system, const std::string &address, - uint16_t port, const std::string &name) - : system_(system), - writer_(system, address, port, kProtocolStreamPrefix + name), - stream_(system.Open(utils::RandomString(20))) {} - -Client::Client(messaging::System &system, - const io::network::NetworkEndpoint &endpoint, +Client::Client(messaging::System &system, const io::network::Endpoint &endpoint, const std::string &name) - : Client(system, endpoint.address(), endpoint.port(), name) {} + : system_(system), + writer_(system, endpoint, kProtocolStreamPrefix + name), + stream_(system.Open(utils::RandomString(20))) {} // Because of the way Call is implemented it can fail without reporting (it will // just block indefinately). This is why you always need to provide reasonable @@ -95,9 +89,8 @@ Client::Client(messaging::System &system, std::unique_ptr<messaging::Message> Client::Call( std::chrono::system_clock::duration timeout, std::unique_ptr<messaging::Message> message) { - auto request = std::make_unique<Request>(system_.endpoint().address(), - system_.endpoint().port(), - stream_->name(), std::move(message)); + auto request = std::make_unique<Request>(system_.endpoint(), stream_->name(), + std::move(message)); auto message_id = request->message_id(); writer_.Send(std::move(request)); @@ -136,8 +129,7 @@ Server::Server(messaging::System &system, const std::string &name) auto it = callbacks_accessor.find(real_request.type_index()); if (it == callbacks_accessor.end()) continue; auto response = it->second(real_request); - messaging::Writer writer(system_, request->address(), request->port(), - request->stream()); + messaging::Writer writer(system_, request->endpoint(), request->stream()); writer.Send<Response>(request->message_id(), std::move(response)); } }); diff --git a/src/communication/rpc/rpc.hpp b/src/communication/rpc/rpc.hpp index 278da664e..be1a4aa92 100644 --- a/src/communication/rpc/rpc.hpp +++ b/src/communication/rpc/rpc.hpp @@ -4,7 +4,7 @@ #include "communication/messaging/distributed.hpp" #include "data_structures/concurrent/concurrent_map.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" namespace communication::rpc { @@ -17,10 +17,8 @@ struct RequestResponse { // Client is thread safe. class Client { public: - Client(messaging::System &system, const std::string &address, uint16_t port, + Client(messaging::System &system, const io::network::Endpoint &endpoint, const std::string &name); - Client(messaging::System &system, - const io::network::NetworkEndpoint &endpoint, const std::string &name); // Call function can initiate only one request at the time. Function blocks // until there is a response or timeout was reached. If timeout was reached diff --git a/src/communication/server.hpp b/src/communication/server.hpp index ab5ac2fd0..e7c00fb22 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -45,7 +45,7 @@ class Server { * Constructs and binds server to endpoint, operates on session data and * invokes n workers */ - Server(const io::network::NetworkEndpoint &endpoint, + Server(const io::network::Endpoint &endpoint, TSessionData &session_data, size_t n) : session_data_(session_data) { // Without server we can't continue with application so we can just diff --git a/src/database/counters.cpp b/src/database/counters.cpp index e4fbb2cbf..483ec44f9 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -46,7 +46,7 @@ MasterCounters::MasterCounters(communication::messaging::System &system) WorkerCounters::WorkerCounters( communication::messaging::System &system, - const io::network::NetworkEndpoint &master_endpoint) + const io::network::Endpoint &master_endpoint) : rpc_client_(system, master_endpoint, kCountersRpc) {} int64_t WorkerCounters::Get(const std::string &name) { diff --git a/src/database/counters.hpp b/src/database/counters.hpp index c79e8b358..431dcde7a 100644 --- a/src/database/counters.hpp +++ b/src/database/counters.hpp @@ -55,7 +55,7 @@ class MasterCounters : public SingleNodeCounters { class WorkerCounters : public Counters { public: WorkerCounters(communication::messaging::System &system, - const io::network::NetworkEndpoint &master_endpoint); + const io::network::Endpoint &master_endpoint); int64_t Get(const std::string &name) override; void Set(const std::string &name, int64_t value) override; diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 934f4fad8..7fbf9bcf8 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -11,7 +11,7 @@ #include "database/storage_gc.hpp" #include "database/types.hpp" #include "durability/wal.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "storage/concurrent_id_mapper.hpp" #include "transactions/engine.hpp" #include "utils/scheduler.hpp" @@ -35,8 +35,8 @@ struct Config { // Distributed master/worker flags. int worker_id; - io::network::NetworkEndpoint master_endpoint; - io::network::NetworkEndpoint worker_endpoint; + io::network::Endpoint master_endpoint; + io::network::Endpoint worker_endpoint; }; namespace impl { diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index 5545bf6f2..6e95d6787 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -5,10 +5,10 @@ #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" namespace distributed { -using Endpoint = io::network::NetworkEndpoint; +using Endpoint = io::network::Endpoint; /** Handles worker registration, getting of other workers' endpoints and * coordinated shutdown in a distributed memgraph. Master side. */ diff --git a/src/distributed/coordination_rpc_messages.hpp b/src/distributed/coordination_rpc_messages.hpp index 524f74192..2758d8c18 100644 --- a/src/distributed/coordination_rpc_messages.hpp +++ b/src/distributed/coordination_rpc_messages.hpp @@ -5,7 +5,7 @@ #include "communication/messaging/local.hpp" #include "communication/rpc/rpc.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "utils/rpc_pimp.hpp" namespace distributed { @@ -13,7 +13,7 @@ namespace distributed { const std::string kCoordinationServerName = "CoordinationRpc"; using communication::messaging::Message; -using Endpoint = io::network::NetworkEndpoint; +using Endpoint = io::network::Endpoint; struct RegisterWorkerReq : public Message { RegisterWorkerReq() {} diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index ad615eb8a..fb6280bb8 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -2,10 +2,10 @@ #include "data_structures/concurrent/concurrent_map.hpp" #include "distributed/coordination_rpc_messages.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" namespace distributed { -using Endpoint = io::network::NetworkEndpoint; +using Endpoint = io::network::Endpoint; /** Handles worker registration, getting of other workers' endpoints and * coordinated shutdown in a distributed memgraph. Worker side. */ diff --git a/src/io/network/addrinfo.cpp b/src/io/network/addrinfo.cpp index 0dd92449a..5d64e90d7 100644 --- a/src/io/network/addrinfo.cpp +++ b/src/io/network/addrinfo.cpp @@ -28,4 +28,4 @@ AddrInfo AddrInfo::Get(const char *addr, const char *port) { } AddrInfo::operator struct addrinfo *() { return info; } -} +} // namespace io::network diff --git a/src/io/network/endpoint.cpp b/src/io/network/endpoint.cpp new file mode 100644 index 000000000..845bb59de --- /dev/null +++ b/src/io/network/endpoint.cpp @@ -0,0 +1,36 @@ +#include <arpa/inet.h> +#include <netdb.h> +#include <sys/socket.h> + +#include <algorithm> + +#include "glog/logging.h" + +#include "io/network/endpoint.hpp" + +namespace io::network { + +Endpoint::Endpoint() {} +Endpoint::Endpoint(const std::string &address, uint16_t port) + : address_(address), port_(port) { + in_addr addr4; + in6_addr addr6; + int ipv4_result = inet_pton(AF_INET, address_.c_str(), &addr4); + int ipv6_result = inet_pton(AF_INET6, address_.c_str(), &addr6); + if (ipv4_result == 1) + family_ = 4; + else if (ipv6_result == 1) + family_ = 6; + CHECK(family_ != 0) << "Not a valid IPv4 or IPv6 address: " << address; +} + +bool Endpoint::operator==(const Endpoint &other) const { + return address_ == other.address_ && port_ == other.port_ && + family_ == other.family_; +} + +std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint) { + return os << "Address:" << endpoint.address() << "Port:" << endpoint.port(); +} + +} // namespace io::network diff --git a/src/io/network/network_endpoint.hpp b/src/io/network/endpoint.hpp similarity index 56% rename from src/io/network/network_endpoint.hpp rename to src/io/network/endpoint.hpp index 7e5166e84..5c7e8a477 100644 --- a/src/io/network/network_endpoint.hpp +++ b/src/io/network/endpoint.hpp @@ -2,6 +2,7 @@ #include <netinet/in.h> #include <cstdint> +#include <iostream> #include <string> #include "boost/serialization/access.hpp" @@ -15,19 +16,17 @@ namespace io::network { * It is used when connecting to an address and to get the current * connection address. */ -class NetworkEndpoint { +class Endpoint { public: - NetworkEndpoint(); - NetworkEndpoint(const std::string &addr, const std::string &port); - NetworkEndpoint(const char *addr, const char *port); - NetworkEndpoint(const std::string &addr, uint16_t port); + Endpoint(); + Endpoint(const std::string &address, uint16_t port); - const char *address() const { return address_; } - const char *port_str() const { return port_str_; } + std::string address() const { return address_; } uint16_t port() const { return port_; } unsigned char family() const { return family_; } - bool operator==(const NetworkEndpoint &other) const; + bool operator==(const Endpoint &other) const; + friend std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint); private: friend class boost::serialization::access; @@ -35,15 +34,13 @@ class NetworkEndpoint { template <class TArchive> void serialize(TArchive &ar, unsigned int) { ar &address_; - ar &port_str_; ar &port_; ar &family_; } - char address_[INET6_ADDRSTRLEN]; - char port_str_[6]; - uint16_t port_; - unsigned char family_; + std::string address_; + uint16_t port_{0}; + unsigned char family_{0}; }; } // namespace io::network diff --git a/src/io/network/network_endpoint.cpp b/src/io/network/network_endpoint.cpp deleted file mode 100644 index d3d74c4b9..000000000 --- a/src/io/network/network_endpoint.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include <arpa/inet.h> -#include <netdb.h> -#include <algorithm> - -#include "glog/logging.h" - -#include "io/network/network_endpoint.hpp" - -namespace io::network { - -NetworkEndpoint::NetworkEndpoint() : port_(0), family_(0) { - memset(address_, 0, sizeof address_); - memset(port_str_, 0, sizeof port_str_); -} - -NetworkEndpoint::NetworkEndpoint(const char *addr, const char *port) { - if (!addr || !port) LOG(FATAL) << "Address or port is nullptr"; - - // strncpy isn't used because it does not guarantee an ending null terminator - snprintf(address_, sizeof address_, "%s", addr); - snprintf(port_str_, sizeof port_str_, "%s", port); - - in_addr addr4; - in6_addr addr6; - int ret = inet_pton(AF_INET, address_, &addr4); - if (ret != 1) { - ret = inet_pton(AF_INET6, address_, &addr6); - if (ret != 1) { - LOG(FATAL) << "Not a valid IPv4 or IPv6 address: " << *addr; - } - family_ = 6; - } else { - family_ = 4; - } - - ret = sscanf(port, "%hu", &port_); - if (ret != 1) LOG(FATAL) << "Not a valid port: " << *port; -} - -NetworkEndpoint::NetworkEndpoint(const std::string &addr, - const std::string &port) - : NetworkEndpoint(addr.c_str(), port.c_str()) {} - -NetworkEndpoint::NetworkEndpoint(const std::string &addr, uint16_t port) - : NetworkEndpoint(addr.c_str(), std::to_string(port)) {} - -bool NetworkEndpoint::operator==(const NetworkEndpoint &other) const { - return std::equal(std::begin(address_), std::end(address_), - std::begin(other.address_)) && - port_ == other.port_ && family_ == other.family_; -} - -} // namespace io::network diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp index 52cfef417..3f5106c29 100644 --- a/src/io/network/socket.cpp +++ b/src/io/network/socket.cpp @@ -52,10 +52,11 @@ void Socket::Close() { bool Socket::IsOpen() const { return socket_ != -1; } -bool Socket::Connect(const NetworkEndpoint &endpoint) { +bool Socket::Connect(const Endpoint &endpoint) { if (socket_ != -1) return false; - auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str()); + auto info = AddrInfo::Get(endpoint.address().c_str(), + std::to_string(endpoint.port()).c_str()); for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) { int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol); @@ -71,10 +72,11 @@ bool Socket::Connect(const NetworkEndpoint &endpoint) { return true; } -bool Socket::Bind(const NetworkEndpoint &endpoint) { +bool Socket::Bind(const Endpoint &endpoint) { if (socket_ != -1) return false; - auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str()); + auto info = AddrInfo::Get(endpoint.address().c_str(), + std::to_string(endpoint.port()).c_str()); for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) { int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol); @@ -99,7 +101,7 @@ bool Socket::Bind(const NetworkEndpoint &endpoint) { return false; } - endpoint_ = NetworkEndpoint(endpoint.address(), ntohs(portdata.sin6_port)); + endpoint_ = Endpoint(endpoint.address(), ntohs(portdata.sin6_port)); return true; } @@ -173,7 +175,7 @@ std::experimental::optional<Socket> Socket::Accept() { inet_ntop(addr.ss_family, addr_src, addr_decoded, INET6_ADDRSTRLEN); - NetworkEndpoint endpoint(addr_decoded, port); + Endpoint endpoint(addr_decoded, port); return Socket(sfd, endpoint); } diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp index 29536eb6a..d72cd41a3 100644 --- a/src/io/network/socket.hpp +++ b/src/io/network/socket.hpp @@ -4,13 +4,13 @@ #include <functional> #include <iostream> -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" namespace io::network { /** * This class creates a network socket. - * It is used to connect/bind/listen on a NetworkEndpoint (address + port). + * It is used to connect/bind/listen on a Endpoint (address + port). * It has wrappers for setting network socket flags and wrappers for * reading/writing data from/to the socket. */ @@ -40,24 +40,24 @@ class Socket { /** * Connects the socket to the specified endpoint. * - * @param endpoint NetworkEndpoint to which to connect to + * @param endpoint Endpoint to which to connect to * * @return connection success status: * true if the connect succeeded * false if the connect failed */ - bool Connect(const NetworkEndpoint &endpoint); + bool Connect(const Endpoint &endpoint); /** * Binds the socket to the specified endpoint. * - * @param endpoint NetworkEndpoint to which to bind to + * @param endpoint Endpoint to which to bind to * * @return bind success status: * true if the bind succeeded * false if the bind failed */ - bool Bind(const NetworkEndpoint &endpoint); + bool Bind(const Endpoint &endpoint); /** * Start listening on the bound socket. @@ -112,7 +112,7 @@ class Socket { /** * Returns the currently active endpoint of the socket. */ - const NetworkEndpoint &endpoint() const { return endpoint_; } + const Endpoint &endpoint() const { return endpoint_; } /** * Write data to the socket. @@ -157,10 +157,10 @@ class Socket { int Read(void *buffer, size_t len); private: - Socket(int fd, const NetworkEndpoint &endpoint) + Socket(int fd, const Endpoint &endpoint) : socket_(fd), endpoint_(endpoint) {} int socket_ = -1; - NetworkEndpoint endpoint_; + Endpoint endpoint_; }; -} +} // namespace io::network diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index e44e4eac8..97e690f85 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -12,7 +12,7 @@ #include "database/graph_db.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/network_error.hpp" #include "io/network/socket.hpp" #include "utils/flag_validation.hpp" @@ -24,16 +24,17 @@ #include "version.hpp" namespace fs = std::experimental::filesystem; -using io::network::NetworkEndpoint; -using io::network::Socket; using communication::bolt::SessionData; +using io::network::Endpoint; +using io::network::Socket; using SessionT = communication::bolt::Session<Socket>; using ServerT = communication::Server<SessionT, SessionData>; // General purpose flags. DEFINE_string(interface, "0.0.0.0", "Communication interface on which to listen."); -DEFINE_string(port, "7687", "Communication port on which to listen."); +DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.", + FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max())); DEFINE_VALIDATED_int32(num_workers, std::max(std::thread::hardware_concurrency(), 1U), "Number of workers", FLAG_IN_RANGE(1, INT32_MAX)); @@ -103,8 +104,8 @@ void MasterMain() { database::Master db; SessionData session_data{db}; - ServerT server({FLAGS_interface, FLAGS_port}, session_data, - FLAGS_num_workers); + ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)}, + session_data, FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server] { @@ -131,8 +132,8 @@ void SingleNodeMain() { google::SetUsageMessage("Memgraph single-node database server"); database::SingleNode db; SessionData session_data{db}; - ServerT server({FLAGS_interface, FLAGS_port}, session_data, - FLAGS_num_workers); + ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)}, + session_data, FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server] { diff --git a/src/storage/concurrent_id_mapper_worker.cpp b/src/storage/concurrent_id_mapper_worker.cpp index 285a8a916..d6c36dd05 100644 --- a/src/storage/concurrent_id_mapper_worker.cpp +++ b/src/storage/concurrent_id_mapper_worker.cpp @@ -34,7 +34,7 @@ ID_VALUE_RPC_CALLS(Property) template <typename TId> WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper( communication::messaging::System &system, - const io::network::NetworkEndpoint &master_endpoint) + const io::network::Endpoint &master_endpoint) : rpc_client_(system, master_endpoint, kConcurrentIdMapperRpc) {} template <typename TId> diff --git a/src/storage/concurrent_id_mapper_worker.hpp b/src/storage/concurrent_id_mapper_worker.hpp index 31561f8c8..97bae133f 100644 --- a/src/storage/concurrent_id_mapper_worker.hpp +++ b/src/storage/concurrent_id_mapper_worker.hpp @@ -3,7 +3,7 @@ #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" #include "data_structures/concurrent/concurrent_map.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "storage/concurrent_id_mapper.hpp" namespace storage { @@ -19,7 +19,7 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> { public: WorkerConcurrentIdMapper(communication::messaging::System &system, - const io::network::NetworkEndpoint &master_endpoint); + const io::network::Endpoint &master_endpoint); TId value_to_id(const std::string &value) override; const std::string &id_to_value(const TId &id) override; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index f4815def7..5b871d86c 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -10,7 +10,7 @@ static const auto kRpcTimeout = 100ms; } // namespace WorkerEngine::WorkerEngine(communication::messaging::System &system, - const io::network::NetworkEndpoint &endpoint) + const io::network::Endpoint &endpoint) : rpc_client_(system, endpoint, kTransactionEngineRpc) {} Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 84d05dbd5..d74d3cbfa 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -6,7 +6,7 @@ #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" #include "data_structures/concurrent/concurrent_map.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" @@ -18,7 +18,7 @@ namespace tx { class WorkerEngine : public Engine { public: WorkerEngine(communication::messaging::System &system, - const io::network::NetworkEndpoint &endpoint); + const io::network::Endpoint &endpoint); Transaction *LocalBegin(transaction_id_t tx_id); diff --git a/src/utils/network.cpp b/src/utils/network.cpp new file mode 100644 index 000000000..4149345f1 --- /dev/null +++ b/src/utils/network.cpp @@ -0,0 +1,44 @@ +#include "utils/network.hpp" + +#include <arpa/inet.h> +#include <netdb.h> + +#include <cstring> +#include <string> + +#include "glog/logging.h" + +namespace utils { + +/// Resolves hostname to ip, if already an ip, just returns it +std::string ResolveHostname(std::string hostname) { + addrinfo hints; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6 + hints.ai_socktype = SOCK_STREAM; + + int addr_result; + addrinfo *servinfo; + CHECK((addr_result = + getaddrinfo(hostname.c_str(), NULL, &hints, &servinfo)) == 0) + << "Error with getaddrinfo:" << gai_strerror(addr_result); + CHECK(servinfo) << "Could not resolve address: " << hostname; + + std::string address; + if (servinfo->ai_family == AF_INET) { + sockaddr_in *hipv4 = (sockaddr_in *)servinfo->ai_addr; + char astring[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(hipv4->sin_addr), astring, INET_ADDRSTRLEN); + address = astring; + } else { + sockaddr_in6 *hipv6 = (sockaddr_in6 *)servinfo->ai_addr; + char astring[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &(hipv6->sin6_addr), astring, INET6_ADDRSTRLEN); + address = astring; + } + + freeaddrinfo(servinfo); + return address; +} + +}; // namespace utils diff --git a/src/utils/network.hpp b/src/utils/network.hpp new file mode 100644 index 000000000..7e0333a1b --- /dev/null +++ b/src/utils/network.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include <string> + +namespace utils { + +/// Resolves hostname to ip, if already an ip, just returns it +std::string ResolveHostname(std::string hostname); + +}; // namespace utils diff --git a/tests/concurrent/network_common.hpp b/tests/concurrent/network_common.hpp index 1a3427240..85d29deb0 100644 --- a/tests/concurrent/network_common.hpp +++ b/tests/concurrent/network_common.hpp @@ -19,7 +19,7 @@ static constexpr const int SIZE = 60000; static constexpr const int REPLY = 10; -using io::network::NetworkEndpoint; +using io::network::Endpoint; using io::network::Socket; class TestData {}; @@ -66,13 +66,13 @@ class TestSession { using ServerT = communication::Server<TestSession, TestData>; -void client_run(int num, const char *interface, const char *port, +void client_run(int num, const char *interface, uint16_t port, const unsigned char *data, int lo, int hi) { std::stringstream name; name << "Client " << num; unsigned char buffer[SIZE * REPLY], head[2]; int have, read; - NetworkEndpoint endpoint(interface, port); + Endpoint endpoint(interface, port); Socket socket; ASSERT_TRUE(socket.Connect(endpoint)); socket.SetTimeout(2, 0); diff --git a/tests/concurrent/network_read_hang.cpp b/tests/concurrent/network_read_hang.cpp index e7373838f..028782bcc 100644 --- a/tests/concurrent/network_read_hang.cpp +++ b/tests/concurrent/network_read_hang.cpp @@ -20,7 +20,7 @@ static constexpr const char interface[] = "127.0.0.1"; -using io::network::NetworkEndpoint; +using io::network::Endpoint; using io::network::Socket; class TestData {}; @@ -53,8 +53,8 @@ class TestSession { std::atomic<bool> run{true}; -void client_run(int num, const char *interface, const char *port) { - NetworkEndpoint endpoint(interface, port); +void client_run(int num, const char *interface, uint16_t port) { + Endpoint endpoint(interface, port); Socket socket; uint8_t data = 0x00; ASSERT_TRUE(socket.Connect(endpoint)); @@ -70,9 +70,9 @@ void client_run(int num, const char *interface, const char *port) { TEST(Network, SocketReadHangOnConcurrentConnections) { // initialize listen socket - NetworkEndpoint endpoint(interface, "0"); + Endpoint endpoint(interface, 0); - printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port()); + std::cout << endpoint << std::endl; // initialize server TestData data; @@ -84,7 +84,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) { // start clients std::vector<std::thread> clients; for (int i = 0; i < Nc; ++i) - clients.push_back(std::thread(client_run, i, interface, ep.port_str())); + clients.push_back(std::thread(client_run, i, interface, ep.port())); // wait for 2s and stop clients std::this_thread::sleep_for(std::chrono::seconds(2)); diff --git a/tests/concurrent/network_server.cpp b/tests/concurrent/network_server.cpp index 75b6878f1..e130fa405 100644 --- a/tests/concurrent/network_server.cpp +++ b/tests/concurrent/network_server.cpp @@ -2,6 +2,8 @@ #define NDEBUG #endif +#include <iostream> + #include "network_common.hpp" static constexpr const char interface[] = "127.0.0.1"; @@ -13,8 +15,8 @@ TEST(Network, Server) { initialize_data(data, SIZE); // initialize listen socket - NetworkEndpoint endpoint(interface, "0"); - printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port()); + Endpoint endpoint(interface, 0); + std::cout << endpoint << std::endl; // initialize server TestData session_data; @@ -25,8 +27,8 @@ TEST(Network, Server) { // start clients std::vector<std::thread> clients; for (int i = 0; i < N; ++i) - clients.push_back(std::thread(client_run, i, interface, ep.port_str(), data, - 30000, SIZE)); + clients.push_back( + std::thread(client_run, i, interface, ep.port(), data, 30000, SIZE)); // cleanup clients for (int i = 0; i < N; ++i) clients[i].join(); diff --git a/tests/concurrent/network_session_leak.cpp b/tests/concurrent/network_session_leak.cpp index 36c550e9a..98284d71d 100644 --- a/tests/concurrent/network_session_leak.cpp +++ b/tests/concurrent/network_session_leak.cpp @@ -3,6 +3,7 @@ #endif #include <chrono> +#include <iostream> #include "network_common.hpp" @@ -17,8 +18,8 @@ TEST(Network, SessionLeak) { initialize_data(data, SIZE); // initialize listen socket - NetworkEndpoint endpoint(interface, "0"); - printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port()); + Endpoint endpoint(interface, 0); + std::cout << endpoint << std::endl; // initialize server TestData session_data; @@ -31,7 +32,7 @@ TEST(Network, SessionLeak) { const auto &ep = server.endpoint(); int testlen = 3000; for (int i = 0; i < N; ++i) { - clients.push_back(std::thread(client_run, i, interface, ep.port_str(), data, + clients.push_back(std::thread(client_run, i, interface, ep.port(), data, testlen, testlen)); std::this_thread::sleep_for(10ms); } diff --git a/tests/distributed/raft/example_client.cpp b/tests/distributed/raft/example_client.cpp index be91091c1..159ee858a 100644 --- a/tests/distributed/raft/example_client.cpp +++ b/tests/distributed/raft/example_client.cpp @@ -8,7 +8,9 @@ #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" +#include "io/network/endpoint.hpp" #include "messages.hpp" +#include "utils/network.hpp" #include "utils/signals/handler.hpp" #include "utils/terminate_handler.hpp" @@ -18,10 +20,10 @@ using namespace communication::rpc; using namespace std::literals::chrono_literals; DEFINE_string(interface, "127.0.0.1", "Client system interface."); -DEFINE_string(port, "8020", "Client system port."); +DEFINE_int32(port, 8020, "Client system port."); DEFINE_string(server_interface, "127.0.0.1", "Server interface on which to communicate."); -DEFINE_string(server_port, "8010", "Server port on which to communicate."); +DEFINE_int32(server_port, 8010, "Server port on which to communicate."); volatile sig_atomic_t is_shutting_down = 0; @@ -32,9 +34,12 @@ int main(int argc, char **argv) { google::InitGoogleLogging(argv[0]); // Initialize client. - System client_system(FLAGS_interface, stoul(FLAGS_port)); - Client client(client_system, FLAGS_server_interface, stoul(FLAGS_server_port), - "main"); + System client_system(io::network::Endpoint(FLAGS_interface, FLAGS_port)); + Client client( + client_system, + io::network::Endpoint(utils::ResolveHostname(FLAGS_server_interface), + FLAGS_server_port), + "main"); // Try to send 100 values to server. // If requests timeout, try to resend it. diff --git a/tests/distributed/raft/example_server.cpp b/tests/distributed/raft/example_server.cpp index 43610c128..89d9fec90 100644 --- a/tests/distributed/raft/example_server.cpp +++ b/tests/distributed/raft/example_server.cpp @@ -32,7 +32,8 @@ int main(int argc, char **argv) { // Unhandled exception handler init. std::set_terminate(&terminate_handler); - System server_system(FLAGS_interface, stoul(FLAGS_port)); + System server_system( + io::network::Endpoint(FLAGS_interface, stoul(FLAGS_port))); Server server(server_system, "main"); std::ofstream log(FLAGS_log, std::ios_base::app); diff --git a/tests/macro_benchmark/clients/bolt_client.hpp b/tests/macro_benchmark/clients/bolt_client.hpp index b24acbb80..a9fb5e68f 100644 --- a/tests/macro_benchmark/clients/bolt_client.hpp +++ b/tests/macro_benchmark/clients/bolt_client.hpp @@ -5,18 +5,18 @@ #include "communication/bolt/client.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/socket.hpp" using SocketT = io::network::Socket; -using EndpointT = io::network::NetworkEndpoint; +using EndpointT = io::network::Endpoint; using ClientT = communication::bolt::Client<SocketT>; using QueryDataT = communication::bolt::QueryData; using communication::bolt::DecodedValue; class BoltClient { public: - BoltClient(const std::string &address, const std::string &port, + BoltClient(const std::string &address, uint16_t port, const std::string &username, const std::string &password, const std::string & = "") { SocketT socket; diff --git a/tests/macro_benchmark/clients/long_running_client.cpp b/tests/macro_benchmark/clients/long_running_client.cpp index 87d18a9ae..08eaa841d 100644 --- a/tests/macro_benchmark/clients/long_running_client.cpp +++ b/tests/macro_benchmark/clients/long_running_client.cpp @@ -17,19 +17,19 @@ #include "common.hpp" #include "communication/bolt/client.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/socket.hpp" #include "threading/sync/spinlock.hpp" #include "utils/algorithm.hpp" -#include "utils/algorithm.hpp" +#include "utils/network.hpp" #include "utils/timer.hpp" +using communication::bolt::DecodedEdge; using communication::bolt::DecodedValue; using communication::bolt::DecodedVertex; -using communication::bolt::DecodedEdge; DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_string(port, "7687", "Server port"); +DEFINE_int32(port, 7687, "Server port"); DEFINE_int32(num_workers, 1, "Number of workers"); DEFINE_string(output, "", "Output file"); DEFINE_string(username, "", "Username for the database"); @@ -49,7 +49,7 @@ std::atomic<int64_t> executed_queries; class Session { public: Session(const nlohmann::json &config, const std::string &address, - const std::string &port, const std::string &username, + uint16_t port, const std::string &username, const std::string &password) : config_(config), client_(address, port, username, password) {} @@ -269,10 +269,11 @@ int64_t NumNodes(BoltClient &client, const std::string &label) { std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label, int64_t id) { - auto result = ExecuteNTimesTillSuccess( - client, "MATCH (n :" + label + " {id: " + std::to_string(id) + - "})-[e]-(m) RETURN m.id", - {}, MAX_RETRIES); + auto result = ExecuteNTimesTillSuccess(client, + "MATCH (n :" + label + + " {id: " + std::to_string(id) + + "})-[e]-(m) RETURN m.id", + {}, MAX_RETRIES); std::vector<int64_t> ret; for (const auto &record : result.records) { ret.push_back(record[0].ValueInt()); @@ -319,8 +320,8 @@ int main(int argc, char **argv) { const std::string independent_label = config["independent_label"]; auto independent_nodes_ids = [&] { - BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, - FLAGS_password); + BoltClient client(utils::ResolveHostname(FLAGS_address), FLAGS_port, + FLAGS_username, FLAGS_password); return IndependentSet(client, independent_label); }(); @@ -387,7 +388,10 @@ int main(int argc, char **argv) { // little bit chaotic. Think about refactoring this part to only use json // and write DecodedValue to json converter. const std::vector<std::string> fields = { - "wall_time", "parsing_time", "planning_time", "plan_execution_time", + "wall_time", + "parsing_time", + "planning_time", + "plan_execution_time", }; for (const auto &query_stats : stats) { std::map<std::string, double> new_aggregated_query_stats; @@ -417,12 +421,13 @@ int main(int argc, char **argv) { out << "{\"num_executed_queries\": " << executed_queries << ", " << "\"elapsed_time\": " << timer.Elapsed().count() << ", \"queries\": ["; - utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream, - const auto &x) { - stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": "; - PrintJsonDecodedValue(stream, DecodedValue(x.second)); - stream << "}"; - }); + utils::PrintIterable( + out, aggregated_stats, ", ", [](auto &stream, const auto &x) { + stream << "{\"query\": " << nlohmann::json(x.first) + << ", \"stats\": "; + PrintJsonDecodedValue(stream, DecodedValue(x.second)); + stream << "}"; + }); out << "]}" << std::endl; out.flush(); std::this_thread::sleep_for(1s); diff --git a/tests/macro_benchmark/clients/query_client.cpp b/tests/macro_benchmark/clients/query_client.cpp index 1dc21221e..8bf9c10ff 100644 --- a/tests/macro_benchmark/clients/query_client.cpp +++ b/tests/macro_benchmark/clients/query_client.cpp @@ -21,7 +21,7 @@ DEFINE_string(input, "", "Input file"); DEFINE_string(output, "", "Output file"); DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_string(port, "", "Server port"); +DEFINE_int32(port, 0, "Server port"); DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); DEFINE_string(database, "", "Database for the database"); @@ -51,9 +51,9 @@ void PrintSummary( template <typename ClientT> void ExecuteQueries(const std::vector<std::string> &queries, int num_workers, - std::ostream &ostream, std::string &address, - std::string &port, std::string &username, - std::string &password, std::string &database) { + std::ostream &ostream, std::string &address, uint16_t port, + std::string &username, std::string &password, + std::string &database) { std::vector<std::thread> threads; SpinLock spinlock; @@ -121,11 +121,11 @@ int main(int argc, char **argv) { ostream = &ofile; } - std::string port = FLAGS_port; + uint16_t port = FLAGS_port; if (FLAGS_protocol == "bolt") { - if (port == "") port = "7687"; + if (port == 0) port = 7687; } else if (FLAGS_protocol == "postgres") { - if (port == "") port = "5432"; + if (port == 0) port = 5432; } while (!istream->eof()) { diff --git a/tests/manual/bolt_client.cpp b/tests/manual/bolt_client.cpp index 5437d7811..b1b50e974 100644 --- a/tests/manual/bolt_client.cpp +++ b/tests/manual/bolt_client.cpp @@ -2,16 +2,17 @@ #include <glog/logging.h> #include "communication/bolt/client.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/socket.hpp" +#include "utils/network.hpp" #include "utils/timer.hpp" using SocketT = io::network::Socket; -using EndpointT = io::network::NetworkEndpoint; +using EndpointT = io::network::Endpoint; using ClientT = communication::bolt::Client<SocketT>; DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_string(port, "7687", "Server port"); +DEFINE_int32(port, 7687, "Server port"); DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); @@ -20,7 +21,7 @@ int main(int argc, char **argv) { google::InitGoogleLogging(argv[0]); // TODO: handle endpoint exception - EndpointT endpoint(FLAGS_address, FLAGS_port); + EndpointT endpoint(utils::ResolveHostname(FLAGS_address), FLAGS_port); SocketT socket; if (!socket.Connect(endpoint)) return 1; diff --git a/tests/manual/raft_rpc.cpp b/tests/manual/raft_rpc.cpp index 1f064b70a..730684d64 100644 --- a/tests/manual/raft_rpc.cpp +++ b/tests/manual/raft_rpc.cpp @@ -12,7 +12,7 @@ namespace raft = communication::raft; -using io::network::NetworkEndpoint; +using io::network::Endpoint; using raft::RaftConfig; using raft::RpcNetwork; using raft::test_utils::DummyState; @@ -35,13 +35,12 @@ int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); - std::unordered_map<std::string, NetworkEndpoint> directory = { - {"a", NetworkEndpoint("127.0.0.1", 12345)}, - {"b", NetworkEndpoint("127.0.0.1", 12346)}, - {"c", NetworkEndpoint("127.0.0.1", 12347)}}; + std::unordered_map<std::string, Endpoint> directory = { + {"a", Endpoint("127.0.0.1", 12345)}, + {"b", Endpoint("127.0.0.1", 12346)}, + {"c", Endpoint("127.0.0.1", 12347)}}; - communication::messaging::System my_system("127.0.0.1", - directory[FLAGS_member_id].port()); + communication::messaging::System my_system(directory[FLAGS_member_id]); RpcNetwork<DummyState> network(my_system, directory); raft::test_utils::InMemoryStorageInterface<DummyState> storage(0, {}, {}); diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index 476f4347b..ef81a5365 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -3,20 +3,20 @@ #include <glog/logging.h> #include "communication/bolt/client.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/socket.hpp" #include "utils/exceptions.hpp" #include "utils/timer.hpp" using SocketT = io::network::Socket; -using EndpointT = io::network::NetworkEndpoint; +using EndpointT = io::network::Endpoint; using ClientT = communication::bolt::Client<SocketT>; using DecodedValueT = communication::bolt::DecodedValue; using QueryDataT = communication::bolt::QueryData; using ExceptionT = communication::bolt::ClientQueryException; DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_string(port, "7687", "Server port"); +DEFINE_int32(port, 7687, "Server port"); DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); diff --git a/tests/unit/concurrent_id_mapper_distributed.cpp b/tests/unit/concurrent_id_mapper_distributed.cpp index 53e8706f7..34aa6fd18 100644 --- a/tests/unit/concurrent_id_mapper_distributed.cpp +++ b/tests/unit/concurrent_id_mapper_distributed.cpp @@ -16,9 +16,9 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test { const std::string kLocal{"127.0.0.1"}; protected: - System master_system_{kLocal, 0}; + System master_system_{{kLocal, 0}}; std::experimental::optional<MasterConcurrentIdMapper<TId>> master_mapper_; - System worker_system_{kLocal, 0}; + System worker_system_{{kLocal, 0}}; std::experimental::optional<WorkerConcurrentIdMapper<TId>> worker_mapper_; void SetUp() override { diff --git a/tests/unit/counters.cpp b/tests/unit/counters.cpp index f0e1bdad4..7d034846d 100644 --- a/tests/unit/counters.cpp +++ b/tests/unit/counters.cpp @@ -6,13 +6,13 @@ const std::string kLocal = "127.0.0.1"; TEST(CountersDistributed, All) { - communication::messaging::System master_sys(kLocal, 0); + communication::messaging::System master_sys({kLocal, 0}); database::MasterCounters master(master_sys); - communication::messaging::System w1_sys(kLocal, 0); + communication::messaging::System w1_sys({kLocal, 0}); database::WorkerCounters w1(w1_sys, master_sys.endpoint()); - communication::messaging::System w2_sys(kLocal, 0); + communication::messaging::System w2_sys({kLocal, 0}); database::WorkerCounters w2(w2_sys, master_sys.endpoint()); EXPECT_EQ(w1.Get("a"), 0); diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index a434bc197..f672fe09c 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -9,7 +9,7 @@ #include "communication/messaging/distributed.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" using communication::messaging::System; using namespace distributed; @@ -19,10 +19,9 @@ const std::string kLocal = "127.0.0.1"; class WorkerInThread { public: - WorkerInThread(io::network::NetworkEndpoint master_endpoint, - int desired_id = -1) { + WorkerInThread(io::network::Endpoint master_endpoint, int desired_id = -1) { worker_thread_ = std::thread([this, master_endpoint, desired_id] { - system_.emplace(kLocal, 0); + system_.emplace(Endpoint(kLocal, 0)); coord_.emplace(*system_, master_endpoint); worker_id_ = coord_->RegisterWorker(desired_id); coord_->WaitForShutdown(); @@ -42,7 +41,7 @@ class WorkerInThread { }; TEST(Distributed, Coordination) { - System master_system(kLocal, 0); + System master_system({kLocal, 0}); std::vector<std::unique_ptr<WorkerInThread>> workers; { MasterCoordination master_coord(master_system); @@ -71,7 +70,7 @@ TEST(Distributed, Coordination) { } TEST(Distributed, DesiredAndUniqueId) { - System master_system(kLocal, 0); + System master_system({kLocal, 0}); std::vector<std::unique_ptr<WorkerInThread>> workers; { MasterCoordination master_coord(master_system); diff --git a/tests/unit/messaging_distributed.cpp b/tests/unit/messaging_distributed.cpp index 9dc47c093..706c928cb 100644 --- a/tests/unit/messaging_distributed.cpp +++ b/tests/unit/messaging_distributed.cpp @@ -43,17 +43,16 @@ BOOST_CLASS_EXPORT(MessageInt); * Test do the services start up without crashes. */ TEST(SimpleTests, StartAndShutdown) { - System system("127.0.0.1", 0); + System system({"127.0.0.1", 0}); // do nothing std::this_thread::sleep_for(500ms); } TEST(Messaging, Pop) { - System master_system("127.0.0.1", 0); - System slave_system("127.0.0.1", 0); + System master_system({"127.0.0.1", 0}); + System slave_system({"127.0.0.1", 0}); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), - "main"); + Writer writer(slave_system, master_system.endpoint(), "main"); std::this_thread::sleep_for(100ms); EXPECT_EQ(stream->Poll(), nullptr); @@ -62,11 +61,10 @@ TEST(Messaging, Pop) { } TEST(Messaging, Await) { - System master_system("127.0.0.1", 0); - System slave_system("127.0.0.1", 0); + System master_system({"127.0.0.1", 0}); + System slave_system({"127.0.0.1", 0}); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), - "main"); + Writer writer(slave_system, master_system.endpoint(), "main"); std::this_thread::sleep_for(100ms); std::thread t([&] { @@ -82,11 +80,10 @@ TEST(Messaging, Await) { } TEST(Messaging, RecreateChannelAfterClosing) { - System master_system("127.0.0.1", 0); - System slave_system("127.0.0.1", 0); + System master_system({"127.0.0.1", 0}); + System slave_system({"127.0.0.1", 0}); auto stream = master_system.Open("main"); - Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(), - "main"); + Writer writer(slave_system, master_system.endpoint(), "main"); std::this_thread::sleep_for(100ms); writer.Send<MessageInt>(10); diff --git a/tests/unit/network_endpoint.cpp b/tests/unit/network_endpoint.cpp index dc72e7e80..c7112f6e0 100644 --- a/tests/unit/network_endpoint.cpp +++ b/tests/unit/network_endpoint.cpp @@ -2,76 +2,35 @@ #include "gtest/gtest.h" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/network_error.hpp" -using endpoint_t = io::network::NetworkEndpoint; +using endpoint_t = io::network::Endpoint; -TEST(NetworkEndpoint, IPv4) { +TEST(Endpoint, IPv4) { endpoint_t endpoint; - // test first constructor - endpoint = endpoint_t("127.0.0.1", "12345"); - EXPECT_STREQ(endpoint.address(), "127.0.0.1"); - EXPECT_STREQ(endpoint.port_str(), "12345"); - EXPECT_EQ(endpoint.port(), 12345); - EXPECT_EQ(endpoint.family(), 4); - - // test second constructor - std::string addr("127.0.0.2"), port("12346"); - endpoint = endpoint_t(addr, port); - EXPECT_STREQ(endpoint.address(), "127.0.0.2"); - EXPECT_STREQ(endpoint.port_str(), "12346"); - EXPECT_EQ(endpoint.port(), 12346); - EXPECT_EQ(endpoint.family(), 4); - - // test third constructor + // test constructor endpoint = endpoint_t("127.0.0.1", 12347); - EXPECT_STREQ(endpoint.address(), "127.0.0.1"); - EXPECT_STREQ(endpoint.port_str(), "12347"); + EXPECT_EQ(endpoint.address(), "127.0.0.1"); EXPECT_EQ(endpoint.port(), 12347); EXPECT_EQ(endpoint.family(), 4); - // test address null - EXPECT_DEATH(endpoint_t(nullptr, nullptr), "null"); - // test address invalid - EXPECT_DEATH(endpoint_t("invalid", "12345"), "addres"); - - // test port invalid - EXPECT_DEATH(endpoint_t("127.0.0.1", "invalid"), "port"); + EXPECT_DEATH(endpoint_t("invalid", 12345), "address"); } -TEST(NetworkEndpoint, IPv6) { +TEST(Endpoint, IPv6) { endpoint_t endpoint; - // test first constructor - endpoint = endpoint_t("ab:cd:ef::1", "12345"); - EXPECT_STREQ(endpoint.address(), "ab:cd:ef::1"); - EXPECT_STREQ(endpoint.port_str(), "12345"); - EXPECT_EQ(endpoint.port(), 12345); - EXPECT_EQ(endpoint.family(), 6); - - // test second constructor - std::string addr("ab:cd:ef::2"), port("12346"); - endpoint = endpoint_t(addr, port); - EXPECT_STREQ(endpoint.address(), "ab:cd:ef::2"); - EXPECT_STREQ(endpoint.port_str(), "12346"); - EXPECT_EQ(endpoint.port(), 12346); - EXPECT_EQ(endpoint.family(), 6); - - // test third constructor + // test constructor endpoint = endpoint_t("ab:cd:ef::3", 12347); - EXPECT_STREQ(endpoint.address(), "ab:cd:ef::3"); - EXPECT_STREQ(endpoint.port_str(), "12347"); + EXPECT_EQ(endpoint.address(), "ab:cd:ef::3"); EXPECT_EQ(endpoint.port(), 12347); EXPECT_EQ(endpoint.family(), 6); // test address invalid - EXPECT_DEATH(endpoint_t("::g", "12345"), "address"); - - // test port invalid - EXPECT_DEATH(endpoint_t("::1", "invalid"), "port"); + EXPECT_DEATH(endpoint_t("::g", 12345), "address"); } int main(int argc, char **argv) { diff --git a/tests/unit/network_timeouts.cpp b/tests/unit/network_timeouts.cpp index af73eff8f..a803d07b1 100644 --- a/tests/unit/network_timeouts.cpp +++ b/tests/unit/network_timeouts.cpp @@ -9,8 +9,7 @@ #include "communication/bolt/client.hpp" #include "communication/bolt/v1/session.hpp" #include "communication/server.hpp" -#include "database/graph_db.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "io/network/socket.hpp" DECLARE_int32(query_execution_time_sec); @@ -20,7 +19,7 @@ using namespace std::chrono_literals; class TestClientSocket; using communication::bolt::ClientException; using communication::bolt::SessionData; -using io::network::NetworkEndpoint; +using io::network::Endpoint; using io::network::Socket; using SessionT = communication::bolt::Session<Socket>; using ResultStreamT = SessionT::ResultStreamT; @@ -31,13 +30,13 @@ class RunningServer { public: database::SingleNode db_; SessionData session_data_{db_}; - NetworkEndpoint endpoint_{"127.0.0.1", "0"}; + Endpoint endpoint_{"127.0.0.1", 0}; ServerT server_{endpoint_, session_data_, 1}; }; class TestClient : public ClientT { public: - TestClient(NetworkEndpoint endpoint) + TestClient(Endpoint endpoint) : ClientT( [&] { Socket socket; diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index 40ba4c5d8..be8e9e5cb 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -55,22 +55,21 @@ BOOST_CLASS_EXPORT(SumRes); using Sum = RequestResponse<SumReq, SumRes>; TEST(Rpc, Call) { - System server_system("127.0.0.1", 0); + System server_system({"127.0.0.1", 0}); Server server(server_system, "main"); server.Register<Sum>([](const SumReq &request) { return std::make_unique<SumRes>(request.x + request.y); }); std::this_thread::sleep_for(100ms); - System client_system("127.0.0.1", 0); - Client client(client_system, "127.0.0.1", server_system.endpoint().port(), - "main"); + System client_system({"127.0.0.1", 0}); + Client client(client_system, server_system.endpoint(), "main"); auto sum = client.Call<Sum>(300ms, 10, 20); EXPECT_EQ(sum->sum, 30); } TEST(Rpc, Timeout) { - System server_system("127.0.0.1", 0); + System server_system({"127.0.0.1", 0}); Server server(server_system, "main"); server.Register<Sum>([](const SumReq &request) { std::this_thread::sleep_for(300ms); @@ -78,9 +77,8 @@ TEST(Rpc, Timeout) { }); std::this_thread::sleep_for(100ms); - System client_system("127.0.0.1", 0); - Client client(client_system, "127.0.0.1", server_system.endpoint().port(), - "main"); + System client_system({"127.0.0.1", 0}); + Client client(client_system, server_system.endpoint(), "main"); auto sum = client.Call<Sum>(100ms, 10, 20); EXPECT_FALSE(sum); } diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 17d6e73f3..b83df7927 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -3,7 +3,7 @@ #include "gtest/gtest.h" #include "communication/messaging/distributed.hpp" -#include "io/network/network_endpoint.hpp" +#include "io/network/endpoint.hpp" #include "transactions/engine_master.hpp" #include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_worker.hpp" @@ -15,10 +15,10 @@ class WorkerEngineTest : public testing::Test { protected: const std::string local{"127.0.0.1"}; - System master_system_{local, 0}; + System master_system_{{local, 0}}; MasterEngine master_{master_system_}; - System worker_system_{local, 0}; + System worker_system_{{local, 0}}; WorkerEngine worker_{worker_system_, master_system_.endpoint()}; }; diff --git a/tests/unit/utils_network.cpp b/tests/unit/utils_network.cpp new file mode 100644 index 000000000..7b2fcc7a7 --- /dev/null +++ b/tests/unit/utils_network.cpp @@ -0,0 +1,21 @@ +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "utils/network.hpp" + +using namespace utils; + +TEST(ResolveHostname, Simple) { + auto result = ResolveHostname("localhost"); + EXPECT_TRUE(result == "127.0.0.1" || result == "::1"); +} + +TEST(ResolveHostname, PassThroughIpv4) { + auto result = ResolveHostname("127.0.0.1"); + EXPECT_EQ(result, "127.0.0.1"); +} + +TEST(ResolveHostname, PassThroughIpv6) { + auto result = ResolveHostname("::1"); + EXPECT_EQ(result, "::1"); +}