Rename NetworkEndpoint

Summary:
Rename redunant port str

Add endpoint << operator

Migrate everything to endpoint

Reviewers: mferencevic, florijan

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1100
This commit is contained in:
Dominik Gleich 2018-01-15 14:03:07 +01:00
parent 41679b6ec5
commit 5418dfb19e
49 changed files with 342 additions and 348 deletions

View File

@ -21,7 +21,7 @@ set(memgraph_src_files
durability/snapshooter.cpp
durability/wal.cpp
io/network/addrinfo.cpp
io/network/network_endpoint.cpp
io/network/endpoint.cpp
io/network/socket.cpp
query/common.cpp
query/console.cpp
@ -47,6 +47,7 @@ set(memgraph_src_files
transactions/engine_master.cpp
transactions/engine_single_node.cpp
transactions/engine_worker.cpp
utils/network.cpp
utils/watchdog.cpp
)
# -----------------------------------------------------------------------------

View File

@ -2,16 +2,12 @@
namespace communication::messaging {
System::System(const std::string &address, uint16_t port)
: endpoint_(address, port) {
System::System(const io::network::Endpoint &endpoint) : endpoint_(endpoint) {
// Numbers of workers is quite arbitrary at this point.
StartClient(4);
StartServer(4);
}
System::System(const io::network::NetworkEndpoint &endpoint)
: System(endpoint.address(), endpoint.port()) {}
System::~System() {
queue_.Shutdown();
for (size_t i = 0; i < pool_.size(); ++i) {
@ -26,7 +22,7 @@ void System::StartClient(int worker_count) {
while (true) {
auto message = queue_.AwaitPop();
if (message == std::experimental::nullopt) break;
SendMessage(message->address, message->port, message->channel,
SendMessage(message->endpoint, message->channel,
std::move(message->message));
}
}));
@ -47,11 +43,11 @@ std::shared_ptr<EventStream> System::Open(const std::string &name) {
return system_.Open(name);
}
Writer::Writer(System &system, const std::string &address, uint16_t port,
Writer::Writer(System &system, const Endpoint &endpoint,
const std::string &name)
: system_(system), address_(address), port_(port), name_(name) {}
: system_(system), endpoint_(endpoint), name_(name) {}
void Writer::Send(std::unique_ptr<Message> message) {
system_.queue_.Emplace(address_, port_, name_, std::move(message));
system_.queue_.Emplace(endpoint_, name_, std::move(message));
}
} // namespace communication::messaging

View File

@ -19,7 +19,7 @@
#include "protocol.hpp"
#include "communication/server.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "threading/sync/spinlock.hpp"
namespace communication::messaging {
@ -29,8 +29,7 @@ class System;
// Writes message to remote event stream.
class Writer {
public:
Writer(System &system, const std::string &address, uint16_t port,
const std::string &name);
Writer(System &system, const Endpoint &endpoint, const std::string &name);
Writer(const Writer &) = delete;
void operator=(const Writer &) = delete;
Writer(Writer &&) = delete;
@ -46,19 +45,15 @@ class Writer {
private:
System &system_;
std::string address_;
uint16_t port_;
Endpoint endpoint_;
std::string name_;
};
class System {
using Endpoint = io::network::NetworkEndpoint;
public:
friend class Writer;
System(const std::string &address, uint16_t port);
explicit System(const Endpoint &endpoint);
System(const Endpoint &endpoint);
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
@ -67,7 +62,7 @@ class System {
std::shared_ptr<EventStream> Open(const std::string &name);
const io::network::NetworkEndpoint &endpoint() const { return endpoint_; }
const Endpoint &endpoint() const { return endpoint_; }
private:
using Socket = Socket;
@ -76,19 +71,14 @@ class System {
struct NetworkMessage {
NetworkMessage() {}
NetworkMessage(const std::string &address, uint16_t port,
const std::string &channel,
NetworkMessage(const Endpoint &endpoint, const std::string &channel,
std::unique_ptr<Message> &&message)
: address(address),
port(port),
channel(channel),
message(std::move(message)) {}
: endpoint(endpoint), channel(channel), message(std::move(message)) {}
NetworkMessage(NetworkMessage &&nm) = default;
NetworkMessage &operator=(NetworkMessage &&nm) = default;
std::string address;
uint16_t port = 0;
Endpoint endpoint;
std::string channel;
std::unique_ptr<Message> message;
};
@ -108,7 +98,7 @@ class System {
// Server variables.
SessionData protocol_data_;
std::unique_ptr<ServerT> server_{nullptr};
io::network::NetworkEndpoint endpoint_;
Endpoint endpoint_;
LocalSystem &system_ = protocol_data_.system;
};

View File

@ -75,30 +75,27 @@ struct PairHash {
}
};
void SendMessage(const std::string &address, uint16_t port,
const std::string &channel, std::unique_ptr<Message> message) {
void SendMessage(const Endpoint &endpoint, const std::string &channel,
std::unique_ptr<Message> message) {
static thread_local std::unordered_map<std::pair<std::string, uint16_t>,
Socket, PairHash>
cache;
CHECK(message) << "Trying to send nullptr instead of message";
auto it = cache.find({address, port});
auto it = cache.find({endpoint.address(), endpoint.port()});
if (it == cache.end()) {
// Initialize endpoint.
Endpoint endpoint(address.c_str(), port);
Socket socket;
if (!socket.Connect(endpoint)) {
LOG(INFO) << "Couldn't connect to remote address: " << address << ":"
<< port;
LOG(INFO) << "Couldn't connect to endpoint: " << endpoint;
return;
}
it = cache
.emplace(std::piecewise_construct,
std::forward_as_tuple(address, port),
std::forward_as_tuple(std::move(socket)))
.first;
it =
cache
.emplace(std::piecewise_construct,
std::forward_as_tuple(endpoint.address(), endpoint.port()),
std::forward_as_tuple(std::move(socket)))
.first;
}
auto &socket = it->second;
@ -131,4 +128,4 @@ void SendMessage(const std::string &address, uint16_t port,
return;
}
}
}
} // namespace communication::messaging

View File

@ -5,8 +5,8 @@
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "communication/messaging/local.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/epoll.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/socket.hpp"
#include "io/network/stream_buffer.hpp"
@ -23,7 +23,7 @@ namespace communication::messaging {
class Message;
using Endpoint = io::network::NetworkEndpoint;
using Endpoint = io::network::Endpoint;
using Socket = io::network::Socket;
using StreamBuffer = io::network::StreamBuffer;
@ -101,6 +101,6 @@ class Session {
/**
* Distributed Protocol Send Message
*/
void SendMessage(const std::string &address, uint16_t port,
const std::string &channel, std::unique_ptr<Message> message);
}
void SendMessage(const Endpoint &endpoint, const std::string &channel,
std::unique_ptr<Message> message);
} // namespace communication::messaging

View File

@ -8,7 +8,7 @@
#include "communication/raft/network_common.hpp"
#include "communication/raft/raft.hpp"
#include "communication/rpc/rpc.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
/* Implementation of `RaftNetworkInterface` using RPC. Raft RPC requests and
* responses are wrapped in `PeerRpcRequest` and `PeerRpcReply`. */
@ -28,9 +28,8 @@ using PeerProtocol =
template <class State>
class RpcNetwork : public RaftNetworkInterface<State> {
public:
RpcNetwork(
communication::messaging::System &system,
std::unordered_map<std::string, io::network::NetworkEndpoint> directory)
RpcNetwork(communication::messaging::System &system,
std::unordered_map<std::string, io::network::Endpoint> directory)
: system_(system),
directory_(std::move(directory)),
server_(system, kRaftChannelName) {}
@ -110,17 +109,14 @@ class RpcNetwork : public RaftNetworkInterface<State> {
auto it = clients_.find(id);
if (it == clients_.end()) {
auto ne = directory_[id];
it = clients_
.try_emplace(id, system_, ne.address(), ne.port(),
kRaftChannelName)
.first;
it = clients_.try_emplace(id, system_, ne, kRaftChannelName).first;
}
return it->second;
}
communication::messaging::System &system_;
// TODO(mtomic): how to update and distribute this?
std::unordered_map<MemberId, io::network::NetworkEndpoint> directory_;
std::unordered_map<MemberId, io::network::Endpoint> directory_;
rpc::Server server_;
std::unordered_map<MemberId, communication::rpc::Client> clients_;

View File

@ -9,24 +9,25 @@
#include "boost/serialization/unique_ptr.hpp"
#include "communication/rpc/rpc.hpp"
#include "io/network/endpoint.hpp"
#include "utils/string.hpp"
namespace communication::rpc {
const char kProtocolStreamPrefix[] = "rpc-";
using Endpoint = io::network::Endpoint;
class Request : public messaging::Message {
public:
Request(const std::string &address, uint16_t port, const std::string &stream,
Request(const Endpoint &endpoint, const std::string &stream,
std::unique_ptr<Message> message)
: address_(address),
port_(port),
: endpoint_(endpoint),
stream_(stream),
message_id_(utils::RandomString(20)),
message_(std::move(message)) {}
const std::string &address() const { return address_; }
uint16_t port() const { return port_; }
const Endpoint &endpoint() const { return endpoint_; }
const std::string &stream() const { return stream_; }
const std::string &message_id() const { return message_id_; }
const messaging::Message &message() const { return *message_; }
@ -38,15 +39,13 @@ class Request : public messaging::Message {
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &boost::serialization::base_object<messaging::Message>(*this);
ar &address_;
ar &port_;
ar &endpoint_;
ar &stream_;
ar &message_id_;
ar &message_;
}
std::string address_;
uint16_t port_;
io::network::Endpoint endpoint_;
std::string stream_;
std::string message_id_;
std::unique_ptr<messaging::Message> message_;
@ -76,16 +75,11 @@ class Response : public messaging::Message {
std::unique_ptr<messaging::Message> message_;
};
Client::Client(messaging::System &system, const std::string &address,
uint16_t port, const std::string &name)
: system_(system),
writer_(system, address, port, kProtocolStreamPrefix + name),
stream_(system.Open(utils::RandomString(20))) {}
Client::Client(messaging::System &system,
const io::network::NetworkEndpoint &endpoint,
Client::Client(messaging::System &system, const io::network::Endpoint &endpoint,
const std::string &name)
: Client(system, endpoint.address(), endpoint.port(), name) {}
: system_(system),
writer_(system, endpoint, kProtocolStreamPrefix + name),
stream_(system.Open(utils::RandomString(20))) {}
// Because of the way Call is implemented it can fail without reporting (it will
// just block indefinately). This is why you always need to provide reasonable
@ -95,9 +89,8 @@ Client::Client(messaging::System &system,
std::unique_ptr<messaging::Message> Client::Call(
std::chrono::system_clock::duration timeout,
std::unique_ptr<messaging::Message> message) {
auto request = std::make_unique<Request>(system_.endpoint().address(),
system_.endpoint().port(),
stream_->name(), std::move(message));
auto request = std::make_unique<Request>(system_.endpoint(), stream_->name(),
std::move(message));
auto message_id = request->message_id();
writer_.Send(std::move(request));
@ -136,8 +129,7 @@ Server::Server(messaging::System &system, const std::string &name)
auto it = callbacks_accessor.find(real_request.type_index());
if (it == callbacks_accessor.end()) continue;
auto response = it->second(real_request);
messaging::Writer writer(system_, request->address(), request->port(),
request->stream());
messaging::Writer writer(system_, request->endpoint(), request->stream());
writer.Send<Response>(request->message_id(), std::move(response));
}
});

View File

@ -4,7 +4,7 @@
#include "communication/messaging/distributed.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
namespace communication::rpc {
@ -17,10 +17,8 @@ struct RequestResponse {
// Client is thread safe.
class Client {
public:
Client(messaging::System &system, const std::string &address, uint16_t port,
Client(messaging::System &system, const io::network::Endpoint &endpoint,
const std::string &name);
Client(messaging::System &system,
const io::network::NetworkEndpoint &endpoint, const std::string &name);
// Call function can initiate only one request at the time. Function blocks
// until there is a response or timeout was reached. If timeout was reached

View File

@ -45,7 +45,7 @@ class Server {
* Constructs and binds server to endpoint, operates on session data and
* invokes n workers
*/
Server(const io::network::NetworkEndpoint &endpoint,
Server(const io::network::Endpoint &endpoint,
TSessionData &session_data, size_t n)
: session_data_(session_data) {
// Without server we can't continue with application so we can just

View File

@ -46,7 +46,7 @@ MasterCounters::MasterCounters(communication::messaging::System &system)
WorkerCounters::WorkerCounters(
communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint)
const io::network::Endpoint &master_endpoint)
: rpc_client_(system, master_endpoint, kCountersRpc) {}
int64_t WorkerCounters::Get(const std::string &name) {

View File

@ -55,7 +55,7 @@ class MasterCounters : public SingleNodeCounters {
class WorkerCounters : public Counters {
public:
WorkerCounters(communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint);
const io::network::Endpoint &master_endpoint);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;

View File

@ -11,7 +11,7 @@
#include "database/storage_gc.hpp"
#include "database/types.hpp"
#include "durability/wal.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
@ -35,8 +35,8 @@ struct Config {
// Distributed master/worker flags.
int worker_id;
io::network::NetworkEndpoint master_endpoint;
io::network::NetworkEndpoint worker_endpoint;
io::network::Endpoint master_endpoint;
io::network::Endpoint worker_endpoint;
};
namespace impl {

View File

@ -5,10 +5,10 @@
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
namespace distributed {
using Endpoint = io::network::NetworkEndpoint;
using Endpoint = io::network::Endpoint;
/** Handles worker registration, getting of other workers' endpoints and
* coordinated shutdown in a distributed memgraph. Master side. */

View File

@ -5,7 +5,7 @@
#include "communication/messaging/local.hpp"
#include "communication/rpc/rpc.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "utils/rpc_pimp.hpp"
namespace distributed {
@ -13,7 +13,7 @@ namespace distributed {
const std::string kCoordinationServerName = "CoordinationRpc";
using communication::messaging::Message;
using Endpoint = io::network::NetworkEndpoint;
using Endpoint = io::network::Endpoint;
struct RegisterWorkerReq : public Message {
RegisterWorkerReq() {}

View File

@ -2,10 +2,10 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/coordination_rpc_messages.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
namespace distributed {
using Endpoint = io::network::NetworkEndpoint;
using Endpoint = io::network::Endpoint;
/** Handles worker registration, getting of other workers' endpoints and
* coordinated shutdown in a distributed memgraph. Worker side. */

View File

@ -28,4 +28,4 @@ AddrInfo AddrInfo::Get(const char *addr, const char *port) {
}
AddrInfo::operator struct addrinfo *() { return info; }
}
} // namespace io::network

View File

@ -0,0 +1,36 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <algorithm>
#include "glog/logging.h"
#include "io/network/endpoint.hpp"
namespace io::network {
Endpoint::Endpoint() {}
Endpoint::Endpoint(const std::string &address, uint16_t port)
: address_(address), port_(port) {
in_addr addr4;
in6_addr addr6;
int ipv4_result = inet_pton(AF_INET, address_.c_str(), &addr4);
int ipv6_result = inet_pton(AF_INET6, address_.c_str(), &addr6);
if (ipv4_result == 1)
family_ = 4;
else if (ipv6_result == 1)
family_ = 6;
CHECK(family_ != 0) << "Not a valid IPv4 or IPv6 address: " << address;
}
bool Endpoint::operator==(const Endpoint &other) const {
return address_ == other.address_ && port_ == other.port_ &&
family_ == other.family_;
}
std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint) {
return os << "Address:" << endpoint.address() << "Port:" << endpoint.port();
}
} // namespace io::network

View File

@ -2,6 +2,7 @@
#include <netinet/in.h>
#include <cstdint>
#include <iostream>
#include <string>
#include "boost/serialization/access.hpp"
@ -15,19 +16,17 @@ namespace io::network {
* It is used when connecting to an address and to get the current
* connection address.
*/
class NetworkEndpoint {
class Endpoint {
public:
NetworkEndpoint();
NetworkEndpoint(const std::string &addr, const std::string &port);
NetworkEndpoint(const char *addr, const char *port);
NetworkEndpoint(const std::string &addr, uint16_t port);
Endpoint();
Endpoint(const std::string &address, uint16_t port);
const char *address() const { return address_; }
const char *port_str() const { return port_str_; }
std::string address() const { return address_; }
uint16_t port() const { return port_; }
unsigned char family() const { return family_; }
bool operator==(const NetworkEndpoint &other) const;
bool operator==(const Endpoint &other) const;
friend std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint);
private:
friend class boost::serialization::access;
@ -35,15 +34,13 @@ class NetworkEndpoint {
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &address_;
ar &port_str_;
ar &port_;
ar &family_;
}
char address_[INET6_ADDRSTRLEN];
char port_str_[6];
uint16_t port_;
unsigned char family_;
std::string address_;
uint16_t port_{0};
unsigned char family_{0};
};
} // namespace io::network

View File

@ -1,53 +0,0 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <algorithm>
#include "glog/logging.h"
#include "io/network/network_endpoint.hpp"
namespace io::network {
NetworkEndpoint::NetworkEndpoint() : port_(0), family_(0) {
memset(address_, 0, sizeof address_);
memset(port_str_, 0, sizeof port_str_);
}
NetworkEndpoint::NetworkEndpoint(const char *addr, const char *port) {
if (!addr || !port) LOG(FATAL) << "Address or port is nullptr";
// strncpy isn't used because it does not guarantee an ending null terminator
snprintf(address_, sizeof address_, "%s", addr);
snprintf(port_str_, sizeof port_str_, "%s", port);
in_addr addr4;
in6_addr addr6;
int ret = inet_pton(AF_INET, address_, &addr4);
if (ret != 1) {
ret = inet_pton(AF_INET6, address_, &addr6);
if (ret != 1) {
LOG(FATAL) << "Not a valid IPv4 or IPv6 address: " << *addr;
}
family_ = 6;
} else {
family_ = 4;
}
ret = sscanf(port, "%hu", &port_);
if (ret != 1) LOG(FATAL) << "Not a valid port: " << *port;
}
NetworkEndpoint::NetworkEndpoint(const std::string &addr,
const std::string &port)
: NetworkEndpoint(addr.c_str(), port.c_str()) {}
NetworkEndpoint::NetworkEndpoint(const std::string &addr, uint16_t port)
: NetworkEndpoint(addr.c_str(), std::to_string(port)) {}
bool NetworkEndpoint::operator==(const NetworkEndpoint &other) const {
return std::equal(std::begin(address_), std::end(address_),
std::begin(other.address_)) &&
port_ == other.port_ && family_ == other.family_;
}
} // namespace io::network

View File

@ -52,10 +52,11 @@ void Socket::Close() {
bool Socket::IsOpen() const { return socket_ != -1; }
bool Socket::Connect(const NetworkEndpoint &endpoint) {
bool Socket::Connect(const Endpoint &endpoint) {
if (socket_ != -1) return false;
auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str());
auto info = AddrInfo::Get(endpoint.address().c_str(),
std::to_string(endpoint.port()).c_str());
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
@ -71,10 +72,11 @@ bool Socket::Connect(const NetworkEndpoint &endpoint) {
return true;
}
bool Socket::Bind(const NetworkEndpoint &endpoint) {
bool Socket::Bind(const Endpoint &endpoint) {
if (socket_ != -1) return false;
auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str());
auto info = AddrInfo::Get(endpoint.address().c_str(),
std::to_string(endpoint.port()).c_str());
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
@ -99,7 +101,7 @@ bool Socket::Bind(const NetworkEndpoint &endpoint) {
return false;
}
endpoint_ = NetworkEndpoint(endpoint.address(), ntohs(portdata.sin6_port));
endpoint_ = Endpoint(endpoint.address(), ntohs(portdata.sin6_port));
return true;
}
@ -173,7 +175,7 @@ std::experimental::optional<Socket> Socket::Accept() {
inet_ntop(addr.ss_family, addr_src, addr_decoded, INET6_ADDRSTRLEN);
NetworkEndpoint endpoint(addr_decoded, port);
Endpoint endpoint(addr_decoded, port);
return Socket(sfd, endpoint);
}

View File

@ -4,13 +4,13 @@
#include <functional>
#include <iostream>
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
namespace io::network {
/**
* This class creates a network socket.
* It is used to connect/bind/listen on a NetworkEndpoint (address + port).
* It is used to connect/bind/listen on a Endpoint (address + port).
* It has wrappers for setting network socket flags and wrappers for
* reading/writing data from/to the socket.
*/
@ -40,24 +40,24 @@ class Socket {
/**
* Connects the socket to the specified endpoint.
*
* @param endpoint NetworkEndpoint to which to connect to
* @param endpoint Endpoint to which to connect to
*
* @return connection success status:
* true if the connect succeeded
* false if the connect failed
*/
bool Connect(const NetworkEndpoint &endpoint);
bool Connect(const Endpoint &endpoint);
/**
* Binds the socket to the specified endpoint.
*
* @param endpoint NetworkEndpoint to which to bind to
* @param endpoint Endpoint to which to bind to
*
* @return bind success status:
* true if the bind succeeded
* false if the bind failed
*/
bool Bind(const NetworkEndpoint &endpoint);
bool Bind(const Endpoint &endpoint);
/**
* Start listening on the bound socket.
@ -112,7 +112,7 @@ class Socket {
/**
* Returns the currently active endpoint of the socket.
*/
const NetworkEndpoint &endpoint() const { return endpoint_; }
const Endpoint &endpoint() const { return endpoint_; }
/**
* Write data to the socket.
@ -157,10 +157,10 @@ class Socket {
int Read(void *buffer, size_t len);
private:
Socket(int fd, const NetworkEndpoint &endpoint)
Socket(int fd, const Endpoint &endpoint)
: socket_(fd), endpoint_(endpoint) {}
int socket_ = -1;
NetworkEndpoint endpoint_;
Endpoint endpoint_;
};
}
} // namespace io::network

View File

@ -12,7 +12,7 @@
#include "database/graph_db.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/network_error.hpp"
#include "io/network/socket.hpp"
#include "utils/flag_validation.hpp"
@ -24,16 +24,17 @@
#include "version.hpp"
namespace fs = std::experimental::filesystem;
using io::network::NetworkEndpoint;
using io::network::Socket;
using communication::bolt::SessionData;
using io::network::Endpoint;
using io::network::Socket;
using SessionT = communication::bolt::Session<Socket>;
using ServerT = communication::Server<SessionT, SessionData>;
// General purpose flags.
DEFINE_string(interface, "0.0.0.0",
"Communication interface on which to listen.");
DEFINE_string(port, "7687", "Communication port on which to listen.");
DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_VALIDATED_int32(num_workers,
std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers", FLAG_IN_RANGE(1, INT32_MAX));
@ -103,8 +104,8 @@ void MasterMain() {
database::Master db;
SessionData session_data{db};
ServerT server({FLAGS_interface, FLAGS_port}, session_data,
FLAGS_num_workers);
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
session_data, FLAGS_num_workers);
// Handler for regular termination signals
auto shutdown = [&server] {
@ -131,8 +132,8 @@ void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
database::SingleNode db;
SessionData session_data{db};
ServerT server({FLAGS_interface, FLAGS_port}, session_data,
FLAGS_num_workers);
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
session_data, FLAGS_num_workers);
// Handler for regular termination signals
auto shutdown = [&server] {

View File

@ -34,7 +34,7 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint)
const io::network::Endpoint &master_endpoint)
: rpc_client_(system, master_endpoint, kConcurrentIdMapperRpc) {}
template <typename TId>

View File

@ -3,7 +3,7 @@
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
namespace storage {
@ -19,7 +19,7 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
public:
WorkerConcurrentIdMapper(communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint);
const io::network::Endpoint &master_endpoint);
TId value_to_id(const std::string &value) override;
const std::string &id_to_value(const TId &id) override;

View File

@ -10,7 +10,7 @@ static const auto kRpcTimeout = 100ms;
} // namespace
WorkerEngine::WorkerEngine(communication::messaging::System &system,
const io::network::NetworkEndpoint &endpoint)
const io::network::Endpoint &endpoint)
: rpc_client_(system, endpoint, kTransactionEngineRpc) {}
Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {

View File

@ -6,7 +6,7 @@
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
@ -18,7 +18,7 @@ namespace tx {
class WorkerEngine : public Engine {
public:
WorkerEngine(communication::messaging::System &system,
const io::network::NetworkEndpoint &endpoint);
const io::network::Endpoint &endpoint);
Transaction *LocalBegin(transaction_id_t tx_id);

44
src/utils/network.cpp Normal file
View File

@ -0,0 +1,44 @@
#include "utils/network.hpp"
#include <arpa/inet.h>
#include <netdb.h>
#include <cstring>
#include <string>
#include "glog/logging.h"
namespace utils {
/// Resolves hostname to ip, if already an ip, just returns it
std::string ResolveHostname(std::string hostname) {
addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
hints.ai_socktype = SOCK_STREAM;
int addr_result;
addrinfo *servinfo;
CHECK((addr_result =
getaddrinfo(hostname.c_str(), NULL, &hints, &servinfo)) == 0)
<< "Error with getaddrinfo:" << gai_strerror(addr_result);
CHECK(servinfo) << "Could not resolve address: " << hostname;
std::string address;
if (servinfo->ai_family == AF_INET) {
sockaddr_in *hipv4 = (sockaddr_in *)servinfo->ai_addr;
char astring[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(hipv4->sin_addr), astring, INET_ADDRSTRLEN);
address = astring;
} else {
sockaddr_in6 *hipv6 = (sockaddr_in6 *)servinfo->ai_addr;
char astring[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &(hipv6->sin6_addr), astring, INET6_ADDRSTRLEN);
address = astring;
}
freeaddrinfo(servinfo);
return address;
}
}; // namespace utils

10
src/utils/network.hpp Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#include <string>
namespace utils {
/// Resolves hostname to ip, if already an ip, just returns it
std::string ResolveHostname(std::string hostname);
}; // namespace utils

View File

@ -19,7 +19,7 @@
static constexpr const int SIZE = 60000;
static constexpr const int REPLY = 10;
using io::network::NetworkEndpoint;
using io::network::Endpoint;
using io::network::Socket;
class TestData {};
@ -66,13 +66,13 @@ class TestSession {
using ServerT = communication::Server<TestSession, TestData>;
void client_run(int num, const char *interface, const char *port,
void client_run(int num, const char *interface, uint16_t port,
const unsigned char *data, int lo, int hi) {
std::stringstream name;
name << "Client " << num;
unsigned char buffer[SIZE * REPLY], head[2];
int have, read;
NetworkEndpoint endpoint(interface, port);
Endpoint endpoint(interface, port);
Socket socket;
ASSERT_TRUE(socket.Connect(endpoint));
socket.SetTimeout(2, 0);

View File

@ -20,7 +20,7 @@
static constexpr const char interface[] = "127.0.0.1";
using io::network::NetworkEndpoint;
using io::network::Endpoint;
using io::network::Socket;
class TestData {};
@ -53,8 +53,8 @@ class TestSession {
std::atomic<bool> run{true};
void client_run(int num, const char *interface, const char *port) {
NetworkEndpoint endpoint(interface, port);
void client_run(int num, const char *interface, uint16_t port) {
Endpoint endpoint(interface, port);
Socket socket;
uint8_t data = 0x00;
ASSERT_TRUE(socket.Connect(endpoint));
@ -70,9 +70,9 @@ void client_run(int num, const char *interface, const char *port) {
TEST(Network, SocketReadHangOnConcurrentConnections) {
// initialize listen socket
NetworkEndpoint endpoint(interface, "0");
Endpoint endpoint(interface, 0);
printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port());
std::cout << endpoint << std::endl;
// initialize server
TestData data;
@ -84,7 +84,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
// start clients
std::vector<std::thread> clients;
for (int i = 0; i < Nc; ++i)
clients.push_back(std::thread(client_run, i, interface, ep.port_str()));
clients.push_back(std::thread(client_run, i, interface, ep.port()));
// wait for 2s and stop clients
std::this_thread::sleep_for(std::chrono::seconds(2));

View File

@ -2,6 +2,8 @@
#define NDEBUG
#endif
#include <iostream>
#include "network_common.hpp"
static constexpr const char interface[] = "127.0.0.1";
@ -13,8 +15,8 @@ TEST(Network, Server) {
initialize_data(data, SIZE);
// initialize listen socket
NetworkEndpoint endpoint(interface, "0");
printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port());
Endpoint endpoint(interface, 0);
std::cout << endpoint << std::endl;
// initialize server
TestData session_data;
@ -25,8 +27,8 @@ TEST(Network, Server) {
// start clients
std::vector<std::thread> clients;
for (int i = 0; i < N; ++i)
clients.push_back(std::thread(client_run, i, interface, ep.port_str(), data,
30000, SIZE));
clients.push_back(
std::thread(client_run, i, interface, ep.port(), data, 30000, SIZE));
// cleanup clients
for (int i = 0; i < N; ++i) clients[i].join();

View File

@ -3,6 +3,7 @@
#endif
#include <chrono>
#include <iostream>
#include "network_common.hpp"
@ -17,8 +18,8 @@ TEST(Network, SessionLeak) {
initialize_data(data, SIZE);
// initialize listen socket
NetworkEndpoint endpoint(interface, "0");
printf("ADDRESS: %s, PORT: %d\n", endpoint.address(), endpoint.port());
Endpoint endpoint(interface, 0);
std::cout << endpoint << std::endl;
// initialize server
TestData session_data;
@ -31,7 +32,7 @@ TEST(Network, SessionLeak) {
const auto &ep = server.endpoint();
int testlen = 3000;
for (int i = 0; i < N; ++i) {
clients.push_back(std::thread(client_run, i, interface, ep.port_str(), data,
clients.push_back(std::thread(client_run, i, interface, ep.port(), data,
testlen, testlen));
std::this_thread::sleep_for(10ms);
}

View File

@ -8,7 +8,9 @@
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "io/network/endpoint.hpp"
#include "messages.hpp"
#include "utils/network.hpp"
#include "utils/signals/handler.hpp"
#include "utils/terminate_handler.hpp"
@ -18,10 +20,10 @@ using namespace communication::rpc;
using namespace std::literals::chrono_literals;
DEFINE_string(interface, "127.0.0.1", "Client system interface.");
DEFINE_string(port, "8020", "Client system port.");
DEFINE_int32(port, 8020, "Client system port.");
DEFINE_string(server_interface, "127.0.0.1",
"Server interface on which to communicate.");
DEFINE_string(server_port, "8010", "Server port on which to communicate.");
DEFINE_int32(server_port, 8010, "Server port on which to communicate.");
volatile sig_atomic_t is_shutting_down = 0;
@ -32,9 +34,12 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
// Initialize client.
System client_system(FLAGS_interface, stoul(FLAGS_port));
Client client(client_system, FLAGS_server_interface, stoul(FLAGS_server_port),
"main");
System client_system(io::network::Endpoint(FLAGS_interface, FLAGS_port));
Client client(
client_system,
io::network::Endpoint(utils::ResolveHostname(FLAGS_server_interface),
FLAGS_server_port),
"main");
// Try to send 100 values to server.
// If requests timeout, try to resend it.

View File

@ -32,7 +32,8 @@ int main(int argc, char **argv) {
// Unhandled exception handler init.
std::set_terminate(&terminate_handler);
System server_system(FLAGS_interface, stoul(FLAGS_port));
System server_system(
io::network::Endpoint(FLAGS_interface, stoul(FLAGS_port)));
Server server(server_system, "main");
std::ofstream log(FLAGS_log, std::ios_base::app);

View File

@ -5,18 +5,18 @@
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/socket.hpp"
using SocketT = io::network::Socket;
using EndpointT = io::network::NetworkEndpoint;
using EndpointT = io::network::Endpoint;
using ClientT = communication::bolt::Client<SocketT>;
using QueryDataT = communication::bolt::QueryData;
using communication::bolt::DecodedValue;
class BoltClient {
public:
BoltClient(const std::string &address, const std::string &port,
BoltClient(const std::string &address, uint16_t port,
const std::string &username, const std::string &password,
const std::string & = "") {
SocketT socket;

View File

@ -17,19 +17,19 @@
#include "common.hpp"
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/socket.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/algorithm.hpp"
#include "utils/algorithm.hpp"
#include "utils/network.hpp"
#include "utils/timer.hpp"
using communication::bolt::DecodedEdge;
using communication::bolt::DecodedValue;
using communication::bolt::DecodedVertex;
using communication::bolt::DecodedEdge;
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_string(port, "7687", "Server port");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(num_workers, 1, "Number of workers");
DEFINE_string(output, "", "Output file");
DEFINE_string(username, "", "Username for the database");
@ -49,7 +49,7 @@ std::atomic<int64_t> executed_queries;
class Session {
public:
Session(const nlohmann::json &config, const std::string &address,
const std::string &port, const std::string &username,
uint16_t port, const std::string &username,
const std::string &password)
: config_(config), client_(address, port, username, password) {}
@ -269,10 +269,11 @@ int64_t NumNodes(BoltClient &client, const std::string &label) {
std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
int64_t id) {
auto result = ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id: " + std::to_string(id) +
"})-[e]-(m) RETURN m.id",
{}, MAX_RETRIES);
auto result = ExecuteNTimesTillSuccess(client,
"MATCH (n :" + label +
" {id: " + std::to_string(id) +
"})-[e]-(m) RETURN m.id",
{}, MAX_RETRIES);
std::vector<int64_t> ret;
for (const auto &record : result.records) {
ret.push_back(record[0].ValueInt());
@ -319,8 +320,8 @@ int main(int argc, char **argv) {
const std::string independent_label = config["independent_label"];
auto independent_nodes_ids = [&] {
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username,
FLAGS_password);
BoltClient client(utils::ResolveHostname(FLAGS_address), FLAGS_port,
FLAGS_username, FLAGS_password);
return IndependentSet(client, independent_label);
}();
@ -387,7 +388,10 @@ int main(int argc, char **argv) {
// little bit chaotic. Think about refactoring this part to only use json
// and write DecodedValue to json converter.
const std::vector<std::string> fields = {
"wall_time", "parsing_time", "planning_time", "plan_execution_time",
"wall_time",
"parsing_time",
"planning_time",
"plan_execution_time",
};
for (const auto &query_stats : stats) {
std::map<std::string, double> new_aggregated_query_stats;
@ -417,12 +421,13 @@ int main(int argc, char **argv) {
out << "{\"num_executed_queries\": " << executed_queries << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream,
const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
utils::PrintIterable(
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first)
<< ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
out << "]}" << std::endl;
out.flush();
std::this_thread::sleep_for(1s);

View File

@ -21,7 +21,7 @@ DEFINE_string(input, "", "Input file");
DEFINE_string(output, "", "Output file");
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_string(port, "", "Server port");
DEFINE_int32(port, 0, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_string(database, "", "Database for the database");
@ -51,9 +51,9 @@ void PrintSummary(
template <typename ClientT>
void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
std::ostream &ostream, std::string &address,
std::string &port, std::string &username,
std::string &password, std::string &database) {
std::ostream &ostream, std::string &address, uint16_t port,
std::string &username, std::string &password,
std::string &database) {
std::vector<std::thread> threads;
SpinLock spinlock;
@ -121,11 +121,11 @@ int main(int argc, char **argv) {
ostream = &ofile;
}
std::string port = FLAGS_port;
uint16_t port = FLAGS_port;
if (FLAGS_protocol == "bolt") {
if (port == "") port = "7687";
if (port == 0) port = 7687;
} else if (FLAGS_protocol == "postgres") {
if (port == "") port = "5432";
if (port == 0) port = 5432;
}
while (!istream->eof()) {

View File

@ -2,16 +2,17 @@
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/socket.hpp"
#include "utils/network.hpp"
#include "utils/timer.hpp"
using SocketT = io::network::Socket;
using EndpointT = io::network::NetworkEndpoint;
using EndpointT = io::network::Endpoint;
using ClientT = communication::bolt::Client<SocketT>;
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_string(port, "7687", "Server port");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
@ -20,7 +21,7 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
// TODO: handle endpoint exception
EndpointT endpoint(FLAGS_address, FLAGS_port);
EndpointT endpoint(utils::ResolveHostname(FLAGS_address), FLAGS_port);
SocketT socket;
if (!socket.Connect(endpoint)) return 1;

View File

@ -12,7 +12,7 @@
namespace raft = communication::raft;
using io::network::NetworkEndpoint;
using io::network::Endpoint;
using raft::RaftConfig;
using raft::RpcNetwork;
using raft::test_utils::DummyState;
@ -35,13 +35,12 @@ int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
std::unordered_map<std::string, NetworkEndpoint> directory = {
{"a", NetworkEndpoint("127.0.0.1", 12345)},
{"b", NetworkEndpoint("127.0.0.1", 12346)},
{"c", NetworkEndpoint("127.0.0.1", 12347)}};
std::unordered_map<std::string, Endpoint> directory = {
{"a", Endpoint("127.0.0.1", 12345)},
{"b", Endpoint("127.0.0.1", 12346)},
{"c", Endpoint("127.0.0.1", 12347)}};
communication::messaging::System my_system("127.0.0.1",
directory[FLAGS_member_id].port());
communication::messaging::System my_system(directory[FLAGS_member_id]);
RpcNetwork<DummyState> network(my_system, directory);
raft::test_utils::InMemoryStorageInterface<DummyState> storage(0, {}, {});

View File

@ -3,20 +3,20 @@
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/socket.hpp"
#include "utils/exceptions.hpp"
#include "utils/timer.hpp"
using SocketT = io::network::Socket;
using EndpointT = io::network::NetworkEndpoint;
using EndpointT = io::network::Endpoint;
using ClientT = communication::bolt::Client<SocketT>;
using DecodedValueT = communication::bolt::DecodedValue;
using QueryDataT = communication::bolt::QueryData;
using ExceptionT = communication::bolt::ClientQueryException;
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_string(port, "7687", "Server port");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");

View File

@ -16,9 +16,9 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
const std::string kLocal{"127.0.0.1"};
protected:
System master_system_{kLocal, 0};
System master_system_{{kLocal, 0}};
std::experimental::optional<MasterConcurrentIdMapper<TId>> master_mapper_;
System worker_system_{kLocal, 0};
System worker_system_{{kLocal, 0}};
std::experimental::optional<WorkerConcurrentIdMapper<TId>> worker_mapper_;
void SetUp() override {

View File

@ -6,13 +6,13 @@
const std::string kLocal = "127.0.0.1";
TEST(CountersDistributed, All) {
communication::messaging::System master_sys(kLocal, 0);
communication::messaging::System master_sys({kLocal, 0});
database::MasterCounters master(master_sys);
communication::messaging::System w1_sys(kLocal, 0);
communication::messaging::System w1_sys({kLocal, 0});
database::WorkerCounters w1(w1_sys, master_sys.endpoint());
communication::messaging::System w2_sys(kLocal, 0);
communication::messaging::System w2_sys({kLocal, 0});
database::WorkerCounters w2(w2_sys, master_sys.endpoint());
EXPECT_EQ(w1.Get("a"), 0);

View File

@ -9,7 +9,7 @@
#include "communication/messaging/distributed.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
using communication::messaging::System;
using namespace distributed;
@ -19,10 +19,9 @@ const std::string kLocal = "127.0.0.1";
class WorkerInThread {
public:
WorkerInThread(io::network::NetworkEndpoint master_endpoint,
int desired_id = -1) {
WorkerInThread(io::network::Endpoint master_endpoint, int desired_id = -1) {
worker_thread_ = std::thread([this, master_endpoint, desired_id] {
system_.emplace(kLocal, 0);
system_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*system_, master_endpoint);
worker_id_ = coord_->RegisterWorker(desired_id);
coord_->WaitForShutdown();
@ -42,7 +41,7 @@ class WorkerInThread {
};
TEST(Distributed, Coordination) {
System master_system(kLocal, 0);
System master_system({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
@ -71,7 +70,7 @@ TEST(Distributed, Coordination) {
}
TEST(Distributed, DesiredAndUniqueId) {
System master_system(kLocal, 0);
System master_system({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);

View File

@ -43,17 +43,16 @@ BOOST_CLASS_EXPORT(MessageInt);
* Test do the services start up without crashes.
*/
TEST(SimpleTests, StartAndShutdown) {
System system("127.0.0.1", 0);
System system({"127.0.0.1", 0});
// do nothing
std::this_thread::sleep_for(500ms);
}
TEST(Messaging, Pop) {
System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 0);
System master_system({"127.0.0.1", 0});
System slave_system({"127.0.0.1", 0});
auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
Writer writer(slave_system, master_system.endpoint(), "main");
std::this_thread::sleep_for(100ms);
EXPECT_EQ(stream->Poll(), nullptr);
@ -62,11 +61,10 @@ TEST(Messaging, Pop) {
}
TEST(Messaging, Await) {
System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 0);
System master_system({"127.0.0.1", 0});
System slave_system({"127.0.0.1", 0});
auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
Writer writer(slave_system, master_system.endpoint(), "main");
std::this_thread::sleep_for(100ms);
std::thread t([&] {
@ -82,11 +80,10 @@ TEST(Messaging, Await) {
}
TEST(Messaging, RecreateChannelAfterClosing) {
System master_system("127.0.0.1", 0);
System slave_system("127.0.0.1", 0);
System master_system({"127.0.0.1", 0});
System slave_system({"127.0.0.1", 0});
auto stream = master_system.Open("main");
Writer writer(slave_system, "127.0.0.1", master_system.endpoint().port(),
"main");
Writer writer(slave_system, master_system.endpoint(), "main");
std::this_thread::sleep_for(100ms);
writer.Send<MessageInt>(10);

View File

@ -2,76 +2,35 @@
#include "gtest/gtest.h"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/network_error.hpp"
using endpoint_t = io::network::NetworkEndpoint;
using endpoint_t = io::network::Endpoint;
TEST(NetworkEndpoint, IPv4) {
TEST(Endpoint, IPv4) {
endpoint_t endpoint;
// test first constructor
endpoint = endpoint_t("127.0.0.1", "12345");
EXPECT_STREQ(endpoint.address(), "127.0.0.1");
EXPECT_STREQ(endpoint.port_str(), "12345");
EXPECT_EQ(endpoint.port(), 12345);
EXPECT_EQ(endpoint.family(), 4);
// test second constructor
std::string addr("127.0.0.2"), port("12346");
endpoint = endpoint_t(addr, port);
EXPECT_STREQ(endpoint.address(), "127.0.0.2");
EXPECT_STREQ(endpoint.port_str(), "12346");
EXPECT_EQ(endpoint.port(), 12346);
EXPECT_EQ(endpoint.family(), 4);
// test third constructor
// test constructor
endpoint = endpoint_t("127.0.0.1", 12347);
EXPECT_STREQ(endpoint.address(), "127.0.0.1");
EXPECT_STREQ(endpoint.port_str(), "12347");
EXPECT_EQ(endpoint.address(), "127.0.0.1");
EXPECT_EQ(endpoint.port(), 12347);
EXPECT_EQ(endpoint.family(), 4);
// test address null
EXPECT_DEATH(endpoint_t(nullptr, nullptr), "null");
// test address invalid
EXPECT_DEATH(endpoint_t("invalid", "12345"), "addres");
// test port invalid
EXPECT_DEATH(endpoint_t("127.0.0.1", "invalid"), "port");
EXPECT_DEATH(endpoint_t("invalid", 12345), "address");
}
TEST(NetworkEndpoint, IPv6) {
TEST(Endpoint, IPv6) {
endpoint_t endpoint;
// test first constructor
endpoint = endpoint_t("ab:cd:ef::1", "12345");
EXPECT_STREQ(endpoint.address(), "ab:cd:ef::1");
EXPECT_STREQ(endpoint.port_str(), "12345");
EXPECT_EQ(endpoint.port(), 12345);
EXPECT_EQ(endpoint.family(), 6);
// test second constructor
std::string addr("ab:cd:ef::2"), port("12346");
endpoint = endpoint_t(addr, port);
EXPECT_STREQ(endpoint.address(), "ab:cd:ef::2");
EXPECT_STREQ(endpoint.port_str(), "12346");
EXPECT_EQ(endpoint.port(), 12346);
EXPECT_EQ(endpoint.family(), 6);
// test third constructor
// test constructor
endpoint = endpoint_t("ab:cd:ef::3", 12347);
EXPECT_STREQ(endpoint.address(), "ab:cd:ef::3");
EXPECT_STREQ(endpoint.port_str(), "12347");
EXPECT_EQ(endpoint.address(), "ab:cd:ef::3");
EXPECT_EQ(endpoint.port(), 12347);
EXPECT_EQ(endpoint.family(), 6);
// test address invalid
EXPECT_DEATH(endpoint_t("::g", "12345"), "address");
// test port invalid
EXPECT_DEATH(endpoint_t("::1", "invalid"), "port");
EXPECT_DEATH(endpoint_t("::g", 12345), "address");
}
int main(int argc, char **argv) {

View File

@ -9,8 +9,7 @@
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/session.hpp"
#include "communication/server.hpp"
#include "database/graph_db.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/socket.hpp"
DECLARE_int32(query_execution_time_sec);
@ -20,7 +19,7 @@ using namespace std::chrono_literals;
class TestClientSocket;
using communication::bolt::ClientException;
using communication::bolt::SessionData;
using io::network::NetworkEndpoint;
using io::network::Endpoint;
using io::network::Socket;
using SessionT = communication::bolt::Session<Socket>;
using ResultStreamT = SessionT::ResultStreamT;
@ -31,13 +30,13 @@ class RunningServer {
public:
database::SingleNode db_;
SessionData session_data_{db_};
NetworkEndpoint endpoint_{"127.0.0.1", "0"};
Endpoint endpoint_{"127.0.0.1", 0};
ServerT server_{endpoint_, session_data_, 1};
};
class TestClient : public ClientT {
public:
TestClient(NetworkEndpoint endpoint)
TestClient(Endpoint endpoint)
: ClientT(
[&] {
Socket socket;

View File

@ -55,22 +55,21 @@ BOOST_CLASS_EXPORT(SumRes);
using Sum = RequestResponse<SumReq, SumRes>;
TEST(Rpc, Call) {
System server_system("127.0.0.1", 0);
System server_system({"127.0.0.1", 0});
Server server(server_system, "main");
server.Register<Sum>([](const SumReq &request) {
return std::make_unique<SumRes>(request.x + request.y);
});
std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 0);
Client client(client_system, "127.0.0.1", server_system.endpoint().port(),
"main");
System client_system({"127.0.0.1", 0});
Client client(client_system, server_system.endpoint(), "main");
auto sum = client.Call<Sum>(300ms, 10, 20);
EXPECT_EQ(sum->sum, 30);
}
TEST(Rpc, Timeout) {
System server_system("127.0.0.1", 0);
System server_system({"127.0.0.1", 0});
Server server(server_system, "main");
server.Register<Sum>([](const SumReq &request) {
std::this_thread::sleep_for(300ms);
@ -78,9 +77,8 @@ TEST(Rpc, Timeout) {
});
std::this_thread::sleep_for(100ms);
System client_system("127.0.0.1", 0);
Client client(client_system, "127.0.0.1", server_system.endpoint().port(),
"main");
System client_system({"127.0.0.1", 0});
Client client(client_system, server_system.endpoint(), "main");
auto sum = client.Call<Sum>(100ms, 10, 20);
EXPECT_FALSE(sum);
}

View File

@ -3,7 +3,7 @@
#include "gtest/gtest.h"
#include "communication/messaging/distributed.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
@ -15,10 +15,10 @@ class WorkerEngineTest : public testing::Test {
protected:
const std::string local{"127.0.0.1"};
System master_system_{local, 0};
System master_system_{{local, 0}};
MasterEngine master_{master_system_};
System worker_system_{local, 0};
System worker_system_{{local, 0}};
WorkerEngine worker_{worker_system_, master_system_.endpoint()};
};

View File

@ -0,0 +1,21 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "utils/network.hpp"
using namespace utils;
TEST(ResolveHostname, Simple) {
auto result = ResolveHostname("localhost");
EXPECT_TRUE(result == "127.0.0.1" || result == "::1");
}
TEST(ResolveHostname, PassThroughIpv4) {
auto result = ResolveHostname("127.0.0.1");
EXPECT_EQ(result, "127.0.0.1");
}
TEST(ResolveHostname, PassThroughIpv6) {
auto result = ResolveHostname("::1");
EXPECT_EQ(result, "::1");
}