Add custom VLOGs for distributed memgraph
Summary: Add different priority VLOGs for distributed memgraph. For level 3 you'll get logs for dispatching/consuming plans. For level 4 you'll get logs for tx start/commit/abort, remote produce, remote pull, remote result consume, For level 5 there will be a log for each request/response made by the RPC client. Master log snippet P9 Worker log snippet P10 Reviewers: florijan, teon.banek Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1296
This commit is contained in:
parent
ac8c96ccc2
commit
29ba055b64
@ -33,7 +33,7 @@ class Decoder {
|
||||
bool ReadValue(DecodedValue *data) {
|
||||
uint8_t value;
|
||||
|
||||
VLOG(1) << "[ReadValue] Start";
|
||||
VLOG(20) << "[ReadValue] Start";
|
||||
|
||||
if (!buffer_.Read(&value, 1)) {
|
||||
DLOG(WARNING) << "[ReadValue] Marker data missing!";
|
||||
@ -148,7 +148,7 @@ class Decoder {
|
||||
bool ReadMessageHeader(Signature *signature, Marker *marker) {
|
||||
uint8_t values[2];
|
||||
|
||||
VLOG(1) << "[ReadMessageHeader] Start";
|
||||
VLOG(20) << "[ReadMessageHeader] Start";
|
||||
|
||||
if (!buffer_.Read(values, 2)) {
|
||||
DLOG(WARNING) << "[ReadMessageHeader] Marker data missing!";
|
||||
@ -166,15 +166,15 @@ class Decoder {
|
||||
|
||||
private:
|
||||
bool ReadNull(const Marker &marker, DecodedValue *data) {
|
||||
VLOG(1) << "[ReadNull] Start";
|
||||
VLOG(20) << "[ReadNull] Start";
|
||||
DCHECK(marker == Marker::Null) << "Received invalid marker!";
|
||||
*data = DecodedValue();
|
||||
VLOG(1) << "[ReadNull] Success";
|
||||
VLOG(20) << "[ReadNull] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadBool(const Marker &marker, DecodedValue *data) {
|
||||
VLOG(1) << "[ReadBool] Start";
|
||||
VLOG(20) << "[ReadBool] Start";
|
||||
DCHECK(marker == Marker::False || marker == Marker::True)
|
||||
<< "Received invalid marker!";
|
||||
if (marker == Marker::False) {
|
||||
@ -182,20 +182,20 @@ class Decoder {
|
||||
} else {
|
||||
*data = DecodedValue(true);
|
||||
}
|
||||
VLOG(1) << "[ReadBool] Success";
|
||||
VLOG(20) << "[ReadBool] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadInt(const Marker &marker, DecodedValue *data) {
|
||||
uint8_t value = underlying_cast(marker);
|
||||
int64_t ret;
|
||||
VLOG(1) << "[ReadInt] Start";
|
||||
VLOG(20) << "[ReadInt] Start";
|
||||
if (value >= 240 || value <= 127) {
|
||||
VLOG(1) << "[ReadInt] Found a TinyInt";
|
||||
VLOG(20) << "[ReadInt] Found a TinyInt";
|
||||
ret = value;
|
||||
if (value >= 240) ret -= 256;
|
||||
} else if (marker == Marker::Int8) {
|
||||
VLOG(1) << "[ReadInt] Found an Int8";
|
||||
VLOG(20) << "[ReadInt] Found an Int8";
|
||||
int8_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadInt] Int8 missing data!";
|
||||
@ -203,7 +203,7 @@ class Decoder {
|
||||
}
|
||||
ret = tmp;
|
||||
} else if (marker == Marker::Int16) {
|
||||
VLOG(1) << "[ReadInt] Found an Int16";
|
||||
VLOG(20) << "[ReadInt] Found an Int16";
|
||||
int16_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadInt] Int16 missing data!";
|
||||
@ -211,7 +211,7 @@ class Decoder {
|
||||
}
|
||||
ret = bswap(tmp);
|
||||
} else if (marker == Marker::Int32) {
|
||||
VLOG(1) << "[ReadInt] Found an Int32";
|
||||
VLOG(20) << "[ReadInt] Found an Int32";
|
||||
int32_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadInt] Int32 missing data!";
|
||||
@ -219,7 +219,7 @@ class Decoder {
|
||||
}
|
||||
ret = bswap(tmp);
|
||||
} else if (marker == Marker::Int64) {
|
||||
VLOG(1) << "[ReadInt] Found an Int64";
|
||||
VLOG(20) << "[ReadInt] Found an Int64";
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&ret), sizeof(ret))) {
|
||||
DLOG(WARNING) << "[ReadInt] Int64 missing data!";
|
||||
return false;
|
||||
@ -231,14 +231,14 @@ class Decoder {
|
||||
return false;
|
||||
}
|
||||
*data = DecodedValue(ret);
|
||||
VLOG(1) << "[ReadInt] Success";
|
||||
VLOG(20) << "[ReadInt] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadDouble(const Marker marker, DecodedValue *data) {
|
||||
uint64_t value;
|
||||
double ret;
|
||||
VLOG(1) << "[ReadDouble] Start";
|
||||
VLOG(20) << "[ReadDouble] Start";
|
||||
DCHECK(marker == Marker::Float64) << "Received invalid marker!";
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&value), sizeof(value))) {
|
||||
DLOG(WARNING) << "[ReadDouble] Missing data!";
|
||||
@ -248,17 +248,17 @@ class Decoder {
|
||||
// cppcheck-suppress invalidPointerCast
|
||||
ret = *reinterpret_cast<double *>(&value);
|
||||
*data = DecodedValue(ret);
|
||||
VLOG(1) << "[ReadDouble] Success";
|
||||
VLOG(20) << "[ReadDouble] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
int64_t ReadTypeSize(const Marker &marker, const uint8_t type) {
|
||||
uint8_t value = underlying_cast(marker);
|
||||
if ((value & 0xF0) == underlying_cast(MarkerTiny[type])) {
|
||||
VLOG(1) << "[ReadTypeSize] Found a TinyType";
|
||||
VLOG(20) << "[ReadTypeSize] Found a TinyType";
|
||||
return value & 0x0F;
|
||||
} else if (marker == Marker8[type]) {
|
||||
VLOG(1) << "[ReadTypeSize] Found a Type8";
|
||||
VLOG(20) << "[ReadTypeSize] Found a Type8";
|
||||
uint8_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadTypeSize] Type8 missing data!";
|
||||
@ -266,7 +266,7 @@ class Decoder {
|
||||
}
|
||||
return tmp;
|
||||
} else if (marker == Marker16[type]) {
|
||||
VLOG(1) << "[ReadTypeSize] Found a Type16";
|
||||
VLOG(20) << "[ReadTypeSize] Found a Type16";
|
||||
uint16_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadTypeSize] Type16 missing data!";
|
||||
@ -275,7 +275,7 @@ class Decoder {
|
||||
tmp = bswap(tmp);
|
||||
return tmp;
|
||||
} else if (marker == Marker32[type]) {
|
||||
VLOG(1) << "[ReadTypeSize] Found a Type32";
|
||||
VLOG(20) << "[ReadTypeSize] Found a Type32";
|
||||
uint32_t tmp;
|
||||
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
|
||||
DLOG(WARNING) << "[ReadTypeSize] Type32 missing data!";
|
||||
@ -291,7 +291,7 @@ class Decoder {
|
||||
}
|
||||
|
||||
bool ReadString(const Marker &marker, DecodedValue *data) {
|
||||
VLOG(1) << "[ReadString] Start";
|
||||
VLOG(20) << "[ReadString] Start";
|
||||
auto size = ReadTypeSize(marker, MarkerString);
|
||||
if (size == -1) {
|
||||
DLOG(WARNING) << "[ReadString] Couldn't get size!";
|
||||
@ -304,12 +304,12 @@ class Decoder {
|
||||
}
|
||||
*data =
|
||||
DecodedValue(std::string(reinterpret_cast<char *>(ret.get()), size));
|
||||
VLOG(1) << "[ReadString] Success";
|
||||
VLOG(20) << "[ReadString] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadList(const Marker &marker, DecodedValue *data) {
|
||||
VLOG(1) << "[ReadList] Start";
|
||||
VLOG(20) << "[ReadList] Start";
|
||||
auto size = ReadTypeSize(marker, MarkerList);
|
||||
if (size == -1) {
|
||||
DLOG(WARNING) << "[ReadList] Couldn't get size!";
|
||||
@ -323,12 +323,12 @@ class Decoder {
|
||||
}
|
||||
}
|
||||
*data = DecodedValue(ret);
|
||||
VLOG(1) << "[ReadList] Success";
|
||||
VLOG(20) << "[ReadList] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadMap(const Marker &marker, DecodedValue *data) {
|
||||
VLOG(1) << "[ReadMap] Start";
|
||||
VLOG(20) << "[ReadMap] Start";
|
||||
auto size = ReadTypeSize(marker, MarkerMap);
|
||||
if (size == -1) {
|
||||
DLOG(WARNING) << "[ReadMap] Couldn't get size!";
|
||||
@ -362,7 +362,7 @@ class Decoder {
|
||||
}
|
||||
|
||||
*data = DecodedValue(ret);
|
||||
VLOG(1) << "[ReadMap] Success";
|
||||
VLOG(20) << "[ReadMap] Success";
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -370,7 +370,7 @@ class Decoder {
|
||||
DecodedValue dv;
|
||||
DecodedVertex vertex;
|
||||
|
||||
VLOG(1) << "[ReadVertex] Start";
|
||||
VLOG(20) << "[ReadVertex] Start";
|
||||
|
||||
// read ID
|
||||
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
|
||||
@ -403,7 +403,7 @@ class Decoder {
|
||||
|
||||
*data = DecodedValue(vertex);
|
||||
|
||||
VLOG(1) << "[ReadVertex] Success";
|
||||
VLOG(20) << "[ReadVertex] Success";
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -413,7 +413,7 @@ class Decoder {
|
||||
DecodedValue dv;
|
||||
DecodedEdge edge;
|
||||
|
||||
VLOG(1) << "[ReadEdge] Start";
|
||||
VLOG(20) << "[ReadEdge] Start";
|
||||
|
||||
if (!buffer_.Read(&value, 1)) {
|
||||
DLOG(WARNING) << "[ReadEdge] Missing marker and/or signature data!";
|
||||
@ -468,7 +468,7 @@ class Decoder {
|
||||
|
||||
*data = DecodedValue(edge);
|
||||
|
||||
VLOG(1) << "[ReadEdge] Success";
|
||||
VLOG(20) << "[ReadEdge] Success";
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -477,7 +477,7 @@ class Decoder {
|
||||
DecodedValue dv;
|
||||
DecodedUnboundedEdge edge;
|
||||
|
||||
VLOG(1) << "[ReadUnboundedEdge] Start";
|
||||
VLOG(20) << "[ReadUnboundedEdge] Start";
|
||||
|
||||
// read ID
|
||||
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
|
||||
@ -502,7 +502,7 @@ class Decoder {
|
||||
|
||||
*data = DecodedValue(edge);
|
||||
|
||||
VLOG(1) << "[ReadUnboundedEdge] Success";
|
||||
VLOG(20) << "[ReadUnboundedEdge] Success";
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -511,7 +511,7 @@ class Decoder {
|
||||
DecodedValue dv;
|
||||
DecodedPath path;
|
||||
|
||||
VLOG(1) << "[ReadPath] Start";
|
||||
VLOG(20) << "[ReadPath] Start";
|
||||
|
||||
// vertices
|
||||
if (!ReadValue(&dv, DecodedValue::Type::List)) {
|
||||
@ -557,7 +557,7 @@ class Decoder {
|
||||
|
||||
*data = DecodedValue(path);
|
||||
|
||||
VLOG(1) << "[ReadPath] Success";
|
||||
VLOG(20) << "[ReadPath] Success";
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -59,8 +59,8 @@ std::unique_ptr<Message> Client::Call(const Message &request) {
|
||||
const std::string &request_buffer = request_stream.str();
|
||||
CHECK(request_buffer.size() <= std::numeric_limits<MessageSize>::max())
|
||||
<< fmt::format(
|
||||
"Trying to send message of size {}, max message size is {}",
|
||||
request_buffer.size(), std::numeric_limits<MessageSize>::max());
|
||||
"Trying to send message of size {}, max message size is {}",
|
||||
request_buffer.size(), std::numeric_limits<MessageSize>::max());
|
||||
|
||||
MessageSize request_data_size = request_buffer.size();
|
||||
if (!socket_->Write(reinterpret_cast<uint8_t *>(&request_data_size),
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "utils/demangle.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
|
||||
@ -29,7 +30,14 @@ class Client {
|
||||
"TRequestResponse::Request must be derived from Message");
|
||||
static_assert(std::is_base_of<Message, Res>::value,
|
||||
"TRequestResponse::Response must be derived from Message");
|
||||
std::unique_ptr<Message> response = Call(Req(std::forward<Args>(args)...));
|
||||
auto request = Req(std::forward<Args>(args)...);
|
||||
|
||||
if (VLOG_IS_ON(12)) {
|
||||
auto req_type = utils::Demangle(request.type_index().name());
|
||||
LOG(INFO) << "[RpcClient] sent " << (req_type ? req_type.value() : "");
|
||||
}
|
||||
|
||||
std::unique_ptr<Message> response = Call(request);
|
||||
auto *real_response = dynamic_cast<Res *>(response.get());
|
||||
if (!real_response && response) {
|
||||
// Since message_id was checked in private Call function, this means
|
||||
@ -38,6 +46,13 @@ class Client {
|
||||
socket_ = std::experimental::nullopt;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (VLOG_IS_ON(12)) {
|
||||
auto res_type = utils::Demangle(response->type_index().name());
|
||||
LOG(INFO) << "[RpcClient] received "
|
||||
<< (res_type ? res_type.value() : "");
|
||||
}
|
||||
|
||||
response.release();
|
||||
return std::unique_ptr<Res>(real_response);
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "communication/rpc/protocol.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "utils/demangle.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
|
||||
@ -44,6 +45,11 @@ void Session::Execute() {
|
||||
"Session trying to execute an unregistered RPC call!");
|
||||
}
|
||||
|
||||
if (VLOG_IS_ON(12)) {
|
||||
auto req_type = utils::Demangle(request->type_index().name());
|
||||
LOG(INFO) << "[RpcServer] received " << (req_type ? req_type.value() : "");
|
||||
}
|
||||
|
||||
std::unique_ptr<Message> response = it->second(*(request.get()));
|
||||
|
||||
if (!response) {
|
||||
@ -73,6 +79,11 @@ void Session::Execute() {
|
||||
if (!socket_.Write(buffer)) {
|
||||
throw SessionException("Couldn't send response data!");
|
||||
}
|
||||
|
||||
if (VLOG_IS_ON(12)) {
|
||||
auto res_type = utils::Demangle(response->type_index().name());
|
||||
LOG(INFO) << "[RpcServer] sent " << (res_type ? res_type.value() : "");
|
||||
}
|
||||
}
|
||||
|
||||
StreamBuffer Session::Allocate() { return buffer_.Allocate(); }
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "data_structures/queue.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "utils/demangle.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
|
||||
@ -47,6 +48,14 @@ class Server {
|
||||
return callback(message);
|
||||
});
|
||||
CHECK(got.second) << "Callback for that message type already registered";
|
||||
if (VLOG_IS_ON(12)) {
|
||||
auto req_type =
|
||||
utils::Demangle(typeid(typename TRequestResponse::Request).name());
|
||||
auto res_type =
|
||||
utils::Demangle(typeid(typename TRequestResponse::Response).name());
|
||||
LOG(INFO) << "[RpcServer] register " << (req_type ? req_type.value() : "")
|
||||
<< " -> " << (res_type ? res_type.value() : "");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -75,7 +75,7 @@ class StorageGc {
|
||||
vertices_.gc_.Run(snapshot, tx_engine_);
|
||||
edges_.gc_.Run(snapshot, tx_engine_);
|
||||
|
||||
VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
|
||||
VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
|
||||
}
|
||||
// This has to be run sequentially after gc because gc modifies
|
||||
// version_lists and changes the oldest visible record, on which Refresh
|
||||
@ -85,7 +85,7 @@ class StorageGc {
|
||||
utils::Timer x;
|
||||
storage_.labels_index_.Refresh(snapshot, tx_engine_);
|
||||
storage_.label_property_index_.Refresh(snapshot, tx_engine_);
|
||||
VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count();
|
||||
VLOG(21) << "Garbage collector index phase time: " << x.Elapsed().count();
|
||||
}
|
||||
{
|
||||
// We free expired objects with snapshot.back(), which is
|
||||
@ -100,21 +100,21 @@ class StorageGc {
|
||||
vertices_.record_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
edges_.version_list_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
vertices_.version_list_deleter_.FreeExpiredObjects(snapshot.back());
|
||||
VLOG(1) << "Garbage collector deferred deletion phase time: "
|
||||
<< x.Elapsed().count();
|
||||
VLOG(21) << "Garbage collector deferred deletion phase time: "
|
||||
<< x.Elapsed().count();
|
||||
}
|
||||
|
||||
LOG(INFO) << "Garbage collector finished";
|
||||
VLOG(2) << "gc snapshot: " << snapshot;
|
||||
VLOG(2) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count();
|
||||
VLOG(2) << "vertex record deleter_ size: "
|
||||
<< vertices_.record_deleter_.Count();
|
||||
VLOG(2) << "edge_version_list_deleter_ size: "
|
||||
<< edges_.version_list_deleter_.Count();
|
||||
VLOG(2) << "vertex_version_list_deleter_ size: "
|
||||
<< vertices_.version_list_deleter_.Count();
|
||||
VLOG(2) << "vertices_ size: " << storage_.vertices_.access().size();
|
||||
VLOG(2) << "edges_ size: " << storage_.edges_.access().size();
|
||||
VLOG(21) << "gc snapshot: " << snapshot;
|
||||
VLOG(21) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count();
|
||||
VLOG(21) << "vertex record deleter_ size: "
|
||||
<< vertices_.record_deleter_.Count();
|
||||
VLOG(21) << "edge_version_list_deleter_ size: "
|
||||
<< edges_.version_list_deleter_.Count();
|
||||
VLOG(21) << "vertex_version_list_deleter_ size: "
|
||||
<< vertices_.version_list_deleter_.Count();
|
||||
VLOG(21) << "vertices_ size: " << storage_.vertices_.access().size();
|
||||
VLOG(21) << "edges_ size: " << storage_.edges_.access().size();
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -52,7 +52,7 @@ class SnapshotDecoder : public Decoder<Buffer> {
|
||||
vertex.out.emplace_back(*edge);
|
||||
}
|
||||
|
||||
VLOG(1) << "[ReadSnapshotVertex] Success";
|
||||
VLOG(20) << "[ReadSnapshotVertex] Success";
|
||||
return vertex;
|
||||
}
|
||||
|
||||
@ -61,7 +61,7 @@ class SnapshotDecoder : public Decoder<Buffer> {
|
||||
DecodedValue dv;
|
||||
DecodedInlinedVertexEdge edge;
|
||||
|
||||
VLOG(1) << "[ReadSnapshotEdge] Start";
|
||||
VLOG(20) << "[ReadSnapshotEdge] Start";
|
||||
|
||||
// read ID
|
||||
if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
|
||||
@ -84,7 +84,7 @@ class SnapshotDecoder : public Decoder<Buffer> {
|
||||
}
|
||||
edge.type = dv.ValueString();
|
||||
|
||||
VLOG(1) << "[ReadSnapshotEdge] Success";
|
||||
VLOG(20) << "[ReadSnapshotEdge] Success";
|
||||
|
||||
return edge;
|
||||
}
|
||||
|
@ -140,6 +140,8 @@ std::shared_ptr<Interpreter::CachedPlan> Interpreter::QueryToPlan(
|
||||
database::GraphDb::Type::DISTRIBUTED_MASTER) {
|
||||
auto distributed_plan = MakeDistributedPlan(
|
||||
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
|
||||
VLOG(10) << "[Interpreter] Created plan for distributed execution "
|
||||
<< next_plan_id_ - 1;
|
||||
return std::make_shared<CachedPlan>(std::move(distributed_plan),
|
||||
query_plan_cost_estimation,
|
||||
plan_dispatcher_);
|
||||
|
@ -3021,6 +3021,8 @@ class RemotePuller {
|
||||
|
||||
void Initialize(Context &context) {
|
||||
if (!remote_pulls_initialized_) {
|
||||
VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id()
|
||||
<< "] [" << plan_id_ << "] initialized";
|
||||
for (auto &worker_id : worker_ids_) {
|
||||
UpdatePullForWorker(worker_id, context);
|
||||
}
|
||||
@ -3032,7 +3034,9 @@ class RemotePuller {
|
||||
// If we don't have results for a worker, check if his remote pull
|
||||
// finished and save results locally.
|
||||
|
||||
auto move_frames = [this](int worker_id, auto remote_results) {
|
||||
auto move_frames = [this, &context](int worker_id, auto remote_results) {
|
||||
VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id()
|
||||
<< "] [" << plan_id_ << "] received results from " << worker_id;
|
||||
remote_results_[worker_id] = std::move(remote_results.frames);
|
||||
// Since we return and remove results from the back of the vector,
|
||||
// reverse the results so the first to return is on the end of the
|
||||
@ -3053,10 +3057,16 @@ class RemotePuller {
|
||||
auto remote_results = remote_pull.get();
|
||||
switch (remote_results.pull_state) {
|
||||
case distributed::PullState::CURSOR_EXHAUSTED:
|
||||
VLOG(10) << "[RemotePuller] ["
|
||||
<< context.db_accessor_.transaction_id() << "] [" << plan_id_
|
||||
<< "] cursor exhausted from " << worker_id;
|
||||
move_frames(worker_id, remote_results);
|
||||
remote_pulls_.erase(found_it);
|
||||
break;
|
||||
case distributed::PullState::CURSOR_IN_PROGRESS:
|
||||
VLOG(10) << "[RemotePuller] ["
|
||||
<< context.db_accessor_.transaction_id() << "] [" << plan_id_
|
||||
<< "] cursor in progress from " << worker_id;
|
||||
move_frames(worker_id, remote_results);
|
||||
UpdatePullForWorker(worker_id, context);
|
||||
break;
|
||||
@ -3170,9 +3180,15 @@ class PullRemoteCursor : public Cursor {
|
||||
// If there are no remote results available, try to pull and return
|
||||
// local results.
|
||||
if (input_cursor_ && input_cursor_->Pull(frame, context)) {
|
||||
VLOG(10) << "[PullRemoteCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] producing local results ";
|
||||
return true;
|
||||
}
|
||||
|
||||
VLOG(10) << "[PullRemoteCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] no results available, sleeping ";
|
||||
// If there aren't any local/remote results available, sleep.
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
|
||||
@ -3182,6 +3198,9 @@ class PullRemoteCursor : public Cursor {
|
||||
// No more remote results, make sure local results get exhausted.
|
||||
if (!have_remote_results) {
|
||||
if (input_cursor_ && input_cursor_->Pull(frame, context)) {
|
||||
VLOG(10) << "[PullRemoteCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] producing local results ";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -3189,6 +3208,10 @@ class PullRemoteCursor : public Cursor {
|
||||
|
||||
{
|
||||
int worker_id = remote_puller_.GetWorkerId(last_pulled_worker_id_index_);
|
||||
VLOG(10) << "[PullRemoteCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] producing results from worker "
|
||||
<< worker_id;
|
||||
auto result = remote_puller_.PopResultFromWorker(worker_id);
|
||||
for (size_t i = 0; i < self_.symbols().size(); ++i) {
|
||||
frame[self_.symbols()[i]] = std::move(result[i]);
|
||||
@ -3224,6 +3247,9 @@ class SynchronizeCursor : public Cursor {
|
||||
}
|
||||
// Yield local stuff while available.
|
||||
if (!local_frames_.empty()) {
|
||||
VLOG(10) << "[SynchronizeCursor] ["
|
||||
<< context.db_accessor_.transaction_id()
|
||||
<< "] producing local results";
|
||||
auto &result = local_frames_.back();
|
||||
for (size_t i = 0; i < frame.elems().size(); ++i) {
|
||||
if (self_.advance_command()) {
|
||||
@ -3236,8 +3262,12 @@ class SynchronizeCursor : public Cursor {
|
||||
}
|
||||
|
||||
// We're out of local stuff, yield from pull_remote if available.
|
||||
if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context))
|
||||
if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) {
|
||||
VLOG(10) << "[SynchronizeCursor] ["
|
||||
<< context.db_accessor_.transaction_id()
|
||||
<< "] producing remote results";
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -3254,6 +3284,8 @@ class SynchronizeCursor : public Cursor {
|
||||
std::vector<std::vector<TypedValue>> local_frames_;
|
||||
|
||||
void InitialPull(Frame &frame, Context &context) {
|
||||
VLOG(10) << "[SynchronizeCursor] [" << context.db_accessor_.transaction_id()
|
||||
<< "] initial pull";
|
||||
auto &db = context.db_accessor_.db();
|
||||
|
||||
// Tell all workers to accumulate, only if there is a remote pull.
|
||||
@ -3453,6 +3485,9 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
};
|
||||
|
||||
if (!merge_initialized_) {
|
||||
VLOG(10) << "[PullRemoteOrderBy] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] initialize";
|
||||
remote_puller_.Initialize(context);
|
||||
missing_results_from_ = remote_puller_.Workers();
|
||||
missing_master_result_ = true;
|
||||
@ -3487,6 +3522,9 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
}
|
||||
|
||||
if (!has_all_result) {
|
||||
VLOG(10) << "[PullRemoteOrderByCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] missing results, sleep";
|
||||
// If we don't have results from all workers, sleep before continuing.
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
|
||||
@ -3516,8 +3554,15 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
restore_frame(result_it->remote_result);
|
||||
|
||||
if (result_it->worker_id) {
|
||||
VLOG(10) << "[PullRemoteOrderByCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] producing results from worker "
|
||||
<< result_it->worker_id.value();
|
||||
missing_results_from_.push_back(result_it->worker_id.value());
|
||||
} else {
|
||||
VLOG(10) << "[PullRemoteOrderByCursor] ["
|
||||
<< context.db_accessor_.transaction_id() << "] ["
|
||||
<< self_.plan_id() << "] producing local results";
|
||||
missing_master_result_ = true;
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) {
|
||||
flush_batch();
|
||||
}
|
||||
|
||||
VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent,
|
||||
VLOG(30) << fmt::format("Sent {} out of {} events from queue.", sent,
|
||||
total);
|
||||
last.delete_tail();
|
||||
std::this_thread::sleep_for(
|
||||
|
@ -13,6 +13,7 @@ SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal)
|
||||
: wal_(wal) {}
|
||||
|
||||
Transaction *SingleNodeEngine::Begin() {
|
||||
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
|
||||
std::lock_guard<SpinLock> guard(lock_);
|
||||
|
||||
transaction_id_t id{++counter_};
|
||||
@ -50,6 +51,7 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) {
|
||||
}
|
||||
|
||||
void SingleNodeEngine::Commit(const Transaction &t) {
|
||||
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
|
||||
std::lock_guard<SpinLock> guard(lock_);
|
||||
clog_.set_committed(t.id_);
|
||||
active_.remove(t.id_);
|
||||
@ -60,6 +62,7 @@ void SingleNodeEngine::Commit(const Transaction &t) {
|
||||
}
|
||||
|
||||
void SingleNodeEngine::Abort(const Transaction &t) {
|
||||
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
|
||||
std::lock_guard<SpinLock> guard(lock_);
|
||||
clog_.set_aborted(t.id_);
|
||||
active_.remove(t.id_);
|
||||
|
@ -25,6 +25,7 @@ Transaction *WorkerEngine::Begin() {
|
||||
Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this);
|
||||
auto insertion = active_.access().insert(data.tx_id, tx);
|
||||
CHECK(insertion.second) << "Failed to start creation from worker";
|
||||
VLOG(11) << "[Tx] Starting worker transaction " << data.tx_id;
|
||||
return tx;
|
||||
}
|
||||
|
||||
@ -63,12 +64,14 @@ void WorkerEngine::Commit(const Transaction &t) {
|
||||
auto res = master_client_pool_.Call<CommitRpc>(t.id_);
|
||||
CHECK(res) << "CommitRpc failed";
|
||||
ClearSingleTransaction(t.id_);
|
||||
VLOG(11) << "[Tx] Commiting worker transaction " << t.id_;
|
||||
}
|
||||
|
||||
void WorkerEngine::Abort(const Transaction &t) {
|
||||
auto res = master_client_pool_.Call<AbortRpc>(t.id_);
|
||||
CHECK(res) << "AbortRpc failed";
|
||||
ClearSingleTransaction(t.id_);
|
||||
VLOG(11) << "[Tx] Aborting worker transaction " << t.id_;
|
||||
}
|
||||
|
||||
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
|
||||
|
Loading…
Reference in New Issue
Block a user