From 29ba055b64a24f8a824c7f7c9b900e4413e6bdcc Mon Sep 17 00:00:00 2001
From: Matija Santl
Date: Mon, 12 Mar 2018 09:24:31 +0100
Subject: [PATCH] Add custom VLOGs for distributed memgraph

Summary:
Add different priority VLOGs for distributed memgraph.
For level 3 you'll get logs for dispatching/consuming plans.
For level 4 you'll get logs for tx start/commit/abort, remote produce, remote
pull, and remote result consume.
For level 5 there will be a log for each request/response made by the RPC
client.

Master log snippet P9
Worker log snippet P10

Reviewers: florijan, teon.banek

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1296
---
 src/communication/bolt/v1/decoder/decoder.hpp | 66 +++++++++----------
 src/communication/rpc/client.cpp              |  4 +-
 src/communication/rpc/client.hpp              | 17 ++++-
 src/communication/rpc/protocol.cpp            | 11 ++++
 src/communication/rpc/server.hpp              |  9 +++
 src/database/storage_gc.hpp                   | 28 ++++----
 src/durability/snapshot_decoder.hpp           |  6 +-
 src/query/interpreter.cpp                     |  2 +
 src/query/plan/operator.cpp                   | 49 +++++++++++++-
 src/stats/stats.cpp                           |  2 +-
 src/transactions/engine_single_node.cpp       |  3 +
 src/transactions/engine_worker.cpp            |  3 +
 12 files changed, 144 insertions(+), 56 deletions(-)

diff --git a/src/communication/bolt/v1/decoder/decoder.hpp b/src/communication/bolt/v1/decoder/decoder.hpp index c6ca2800c..e0154abe2 100644 --- a/src/communication/bolt/v1/decoder/decoder.hpp +++ b/src/communication/bolt/v1/decoder/decoder.hpp @@ -33,7 +33,7 @@ class Decoder { bool ReadValue(DecodedValue *data) { uint8_t value; - VLOG(1) << "[ReadValue] Start"; + VLOG(20) << "[ReadValue] Start"; if (!buffer_.Read(&value, 1)) { DLOG(WARNING) << "[ReadValue] Marker data missing!"; @@ -148,7 +148,7 @@ class Decoder { bool ReadMessageHeader(Signature *signature, Marker *marker) { uint8_t values[2]; - VLOG(1) << "[ReadMessageHeader] Start"; + VLOG(20) << "[ReadMessageHeader] Start"; if (!buffer_.Read(values, 2)) { DLOG(WARNING) << "[ReadMessageHeader] Marker data missing!"; @@ -166,15 +166,15 @@ class Decoder { private: bool ReadNull(const Marker &marker, DecodedValue *data) { - VLOG(1) << "[ReadNull] Start"; + VLOG(20) << "[ReadNull] Start"; DCHECK(marker == Marker::Null) << "Received invalid marker!"; *data = DecodedValue(); - VLOG(1) << "[ReadNull] Success"; + VLOG(20) << "[ReadNull] Success"; return true; } bool ReadBool(const Marker &marker, DecodedValue *data) { - VLOG(1) << "[ReadBool] Start"; + VLOG(20) << "[ReadBool] Start"; DCHECK(marker == Marker::False || marker == Marker::True) << "Received invalid marker!"; if (marker == Marker::False) { @@ -182,20 +182,20 @@ class Decoder { } else { *data = DecodedValue(true); } - VLOG(1) << "[ReadBool] Success"; + VLOG(20) << "[ReadBool] Success"; return true; } bool ReadInt(const Marker &marker, DecodedValue *data) { uint8_t value = underlying_cast(marker); int64_t ret; - VLOG(1) << "[ReadInt] Start"; + VLOG(20) << "[ReadInt] Start"; if (value >= 240 || value <= 127) { - VLOG(1) << "[ReadInt] Found a TinyInt"; + VLOG(20) << "[ReadInt] Found a TinyInt"; ret = value; if (value >= 240) ret -= 256; } else if (marker == Marker::Int8) { - VLOG(1) << "[ReadInt] Found an Int8"; + VLOG(20) << "[ReadInt] Found an Int8"; int8_t tmp; if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadInt] Int8 missing data!"; @@ -203,7 +203,7 @@ class Decoder { } ret = tmp; } else if (marker == Marker::Int16) { - VLOG(1) << "[ReadInt] Found an Int16"; + VLOG(20) << "[ReadInt] Found an Int16"; int16_t tmp; if
(!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadInt] Int16 missing data!"; @@ -211,7 +211,7 @@ class Decoder { } ret = bswap(tmp); } else if (marker == Marker::Int32) { - VLOG(1) << "[ReadInt] Found an Int32"; + VLOG(20) << "[ReadInt] Found an Int32"; int32_t tmp; if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadInt] Int32 missing data!"; @@ -219,7 +219,7 @@ class Decoder { } ret = bswap(tmp); } else if (marker == Marker::Int64) { - VLOG(1) << "[ReadInt] Found an Int64"; + VLOG(20) << "[ReadInt] Found an Int64"; if (!buffer_.Read(reinterpret_cast(&ret), sizeof(ret))) { DLOG(WARNING) << "[ReadInt] Int64 missing data!"; return false; @@ -231,14 +231,14 @@ class Decoder { return false; } *data = DecodedValue(ret); - VLOG(1) << "[ReadInt] Success"; + VLOG(20) << "[ReadInt] Success"; return true; } bool ReadDouble(const Marker marker, DecodedValue *data) { uint64_t value; double ret; - VLOG(1) << "[ReadDouble] Start"; + VLOG(20) << "[ReadDouble] Start"; DCHECK(marker == Marker::Float64) << "Received invalid marker!"; if (!buffer_.Read(reinterpret_cast(&value), sizeof(value))) { DLOG(WARNING) << "[ReadDouble] Missing data!"; @@ -248,17 +248,17 @@ class Decoder { // cppcheck-suppress invalidPointerCast ret = *reinterpret_cast(&value); *data = DecodedValue(ret); - VLOG(1) << "[ReadDouble] Success"; + VLOG(20) << "[ReadDouble] Success"; return true; } int64_t ReadTypeSize(const Marker &marker, const uint8_t type) { uint8_t value = underlying_cast(marker); if ((value & 0xF0) == underlying_cast(MarkerTiny[type])) { - VLOG(1) << "[ReadTypeSize] Found a TinyType"; + VLOG(20) << "[ReadTypeSize] Found a TinyType"; return value & 0x0F; } else if (marker == Marker8[type]) { - VLOG(1) << "[ReadTypeSize] Found a Type8"; + VLOG(20) << "[ReadTypeSize] Found a Type8"; uint8_t tmp; if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadTypeSize] Type8 missing data!"; @@ -266,7 +266,7 @@ class Decoder { } return tmp; } else if (marker == Marker16[type]) { - VLOG(1) << "[ReadTypeSize] Found a Type16"; + VLOG(20) << "[ReadTypeSize] Found a Type16"; uint16_t tmp; if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadTypeSize] Type16 missing data!"; @@ -275,7 +275,7 @@ class Decoder { tmp = bswap(tmp); return tmp; } else if (marker == Marker32[type]) { - VLOG(1) << "[ReadTypeSize] Found a Type32"; + VLOG(20) << "[ReadTypeSize] Found a Type32"; uint32_t tmp; if (!buffer_.Read(reinterpret_cast(&tmp), sizeof(tmp))) { DLOG(WARNING) << "[ReadTypeSize] Type32 missing data!"; @@ -291,7 +291,7 @@ class Decoder { } bool ReadString(const Marker &marker, DecodedValue *data) { - VLOG(1) << "[ReadString] Start"; + VLOG(20) << "[ReadString] Start"; auto size = ReadTypeSize(marker, MarkerString); if (size == -1) { DLOG(WARNING) << "[ReadString] Couldn't get size!"; @@ -304,12 +304,12 @@ class Decoder { } *data = DecodedValue(std::string(reinterpret_cast(ret.get()), size)); - VLOG(1) << "[ReadString] Success"; + VLOG(20) << "[ReadString] Success"; return true; } bool ReadList(const Marker &marker, DecodedValue *data) { - VLOG(1) << "[ReadList] Start"; + VLOG(20) << "[ReadList] Start"; auto size = ReadTypeSize(marker, MarkerList); if (size == -1) { DLOG(WARNING) << "[ReadList] Couldn't get size!"; @@ -323,12 +323,12 @@ class Decoder { } } *data = DecodedValue(ret); - VLOG(1) << "[ReadList] Success"; + VLOG(20) << "[ReadList] Success"; return true; } bool ReadMap(const Marker &marker, DecodedValue *data) { - VLOG(1) << 
"[ReadMap] Start"; + VLOG(20) << "[ReadMap] Start"; auto size = ReadTypeSize(marker, MarkerMap); if (size == -1) { DLOG(WARNING) << "[ReadMap] Couldn't get size!"; @@ -362,7 +362,7 @@ class Decoder { } *data = DecodedValue(ret); - VLOG(1) << "[ReadMap] Success"; + VLOG(20) << "[ReadMap] Success"; return true; } @@ -370,7 +370,7 @@ class Decoder { DecodedValue dv; DecodedVertex vertex; - VLOG(1) << "[ReadVertex] Start"; + VLOG(20) << "[ReadVertex] Start"; // read ID if (!ReadValue(&dv, DecodedValue::Type::Int)) { @@ -403,7 +403,7 @@ class Decoder { *data = DecodedValue(vertex); - VLOG(1) << "[ReadVertex] Success"; + VLOG(20) << "[ReadVertex] Success"; return true; } @@ -413,7 +413,7 @@ class Decoder { DecodedValue dv; DecodedEdge edge; - VLOG(1) << "[ReadEdge] Start"; + VLOG(20) << "[ReadEdge] Start"; if (!buffer_.Read(&value, 1)) { DLOG(WARNING) << "[ReadEdge] Missing marker and/or signature data!"; @@ -468,7 +468,7 @@ class Decoder { *data = DecodedValue(edge); - VLOG(1) << "[ReadEdge] Success"; + VLOG(20) << "[ReadEdge] Success"; return true; } @@ -477,7 +477,7 @@ class Decoder { DecodedValue dv; DecodedUnboundedEdge edge; - VLOG(1) << "[ReadUnboundedEdge] Start"; + VLOG(20) << "[ReadUnboundedEdge] Start"; // read ID if (!ReadValue(&dv, DecodedValue::Type::Int)) { @@ -502,7 +502,7 @@ class Decoder { *data = DecodedValue(edge); - VLOG(1) << "[ReadUnboundedEdge] Success"; + VLOG(20) << "[ReadUnboundedEdge] Success"; return true; } @@ -511,7 +511,7 @@ class Decoder { DecodedValue dv; DecodedPath path; - VLOG(1) << "[ReadPath] Start"; + VLOG(20) << "[ReadPath] Start"; // vertices if (!ReadValue(&dv, DecodedValue::Type::List)) { @@ -557,7 +557,7 @@ class Decoder { *data = DecodedValue(path); - VLOG(1) << "[ReadPath] Success"; + VLOG(20) << "[ReadPath] Success"; return true; } diff --git a/src/communication/rpc/client.cpp b/src/communication/rpc/client.cpp index 705e163cc..43f83c0f2 100644 --- a/src/communication/rpc/client.cpp +++ b/src/communication/rpc/client.cpp @@ -59,8 +59,8 @@ std::unique_ptr Client::Call(const Message &request) { const std::string &request_buffer = request_stream.str(); CHECK(request_buffer.size() <= std::numeric_limits::max()) << fmt::format( - "Trying to send message of size {}, max message size is {}", - request_buffer.size(), std::numeric_limits::max()); + "Trying to send message of size {}, max message size is {}", + request_buffer.size(), std::numeric_limits::max()); MessageSize request_data_size = request_buffer.size(); if (!socket_->Write(reinterpret_cast(&request_data_size), diff --git a/src/communication/rpc/client.hpp b/src/communication/rpc/client.hpp index f11411aca..555976a9f 100644 --- a/src/communication/rpc/client.hpp +++ b/src/communication/rpc/client.hpp @@ -11,6 +11,7 @@ #include "communication/rpc/messages.hpp" #include "io/network/endpoint.hpp" #include "io/network/socket.hpp" +#include "utils/demangle.hpp" namespace communication::rpc { @@ -29,7 +30,14 @@ class Client { "TRequestResponse::Request must be derived from Message"); static_assert(std::is_base_of::value, "TRequestResponse::Response must be derived from Message"); - std::unique_ptr response = Call(Req(std::forward(args)...)); + auto request = Req(std::forward(args)...); + + if (VLOG_IS_ON(12)) { + auto req_type = utils::Demangle(request.type_index().name()); + LOG(INFO) << "[RpcClient] sent " << (req_type ? 
req_type.value() : ""); + } + + std::unique_ptr response = Call(request); auto *real_response = dynamic_cast(response.get()); if (!real_response && response) { // Since message_id was checked in private Call function, this means @@ -38,6 +46,13 @@ class Client { socket_ = std::experimental::nullopt; return nullptr; } + + if (VLOG_IS_ON(12)) { + auto res_type = utils::Demangle(response->type_index().name()); + LOG(INFO) << "[RpcClient] received " + << (res_type ? res_type.value() : ""); + } + response.release(); return std::unique_ptr(real_response); } diff --git a/src/communication/rpc/protocol.cpp b/src/communication/rpc/protocol.cpp index 4e167507f..507fbd405 100644 --- a/src/communication/rpc/protocol.cpp +++ b/src/communication/rpc/protocol.cpp @@ -9,6 +9,7 @@ #include "communication/rpc/messages.hpp" #include "communication/rpc/protocol.hpp" #include "communication/rpc/server.hpp" +#include "utils/demangle.hpp" namespace communication::rpc { @@ -44,6 +45,11 @@ void Session::Execute() { "Session trying to execute an unregistered RPC call!"); } + if (VLOG_IS_ON(12)) { + auto req_type = utils::Demangle(request->type_index().name()); + LOG(INFO) << "[RpcServer] received " << (req_type ? req_type.value() : ""); + } + std::unique_ptr response = it->second(*(request.get())); if (!response) { @@ -73,6 +79,11 @@ void Session::Execute() { if (!socket_.Write(buffer)) { throw SessionException("Couldn't send response data!"); } + + if (VLOG_IS_ON(12)) { + auto res_type = utils::Demangle(response->type_index().name()); + LOG(INFO) << "[RpcServer] sent " << (res_type ? res_type.value() : ""); + } } StreamBuffer Session::Allocate() { return buffer_.Allocate(); } diff --git a/src/communication/rpc/server.hpp b/src/communication/rpc/server.hpp index 0577c268b..b663b6e92 100644 --- a/src/communication/rpc/server.hpp +++ b/src/communication/rpc/server.hpp @@ -10,6 +10,7 @@ #include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/queue.hpp" #include "io/network/endpoint.hpp" +#include "utils/demangle.hpp" namespace communication::rpc { @@ -47,6 +48,14 @@ class Server { return callback(message); }); CHECK(got.second) << "Callback for that message type already registered"; + if (VLOG_IS_ON(12)) { + auto req_type = + utils::Demangle(typeid(typename TRequestResponse::Request).name()); + auto res_type = + utils::Demangle(typeid(typename TRequestResponse::Response).name()); + LOG(INFO) << "[RpcServer] register " << (req_type ? req_type.value() : "") + << " -> " << (res_type ? 
res_type.value() : ""); + } } private: diff --git a/src/database/storage_gc.hpp b/src/database/storage_gc.hpp index 852791d45..7fdd6db5f 100644 --- a/src/database/storage_gc.hpp +++ b/src/database/storage_gc.hpp @@ -75,7 +75,7 @@ class StorageGc { vertices_.gc_.Run(snapshot, tx_engine_); edges_.gc_.Run(snapshot, tx_engine_); - VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count(); + VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count(); } // This has to be run sequentially after gc because gc modifies // version_lists and changes the oldest visible record, on which Refresh @@ -85,7 +85,7 @@ class StorageGc { utils::Timer x; storage_.labels_index_.Refresh(snapshot, tx_engine_); storage_.label_property_index_.Refresh(snapshot, tx_engine_); - VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count(); + VLOG(21) << "Garbage collector index phase time: " << x.Elapsed().count(); } { // We free expired objects with snapshot.back(), which is @@ -100,21 +100,21 @@ class StorageGc { vertices_.record_deleter_.FreeExpiredObjects(snapshot.back()); edges_.version_list_deleter_.FreeExpiredObjects(snapshot.back()); vertices_.version_list_deleter_.FreeExpiredObjects(snapshot.back()); - VLOG(1) << "Garbage collector deferred deletion phase time: " - << x.Elapsed().count(); + VLOG(21) << "Garbage collector deferred deletion phase time: " + << x.Elapsed().count(); } LOG(INFO) << "Garbage collector finished"; - VLOG(2) << "gc snapshot: " << snapshot; - VLOG(2) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count(); - VLOG(2) << "vertex record deleter_ size: " - << vertices_.record_deleter_.Count(); - VLOG(2) << "edge_version_list_deleter_ size: " - << edges_.version_list_deleter_.Count(); - VLOG(2) << "vertex_version_list_deleter_ size: " - << vertices_.version_list_deleter_.Count(); - VLOG(2) << "vertices_ size: " << storage_.vertices_.access().size(); - VLOG(2) << "edges_ size: " << storage_.edges_.access().size(); + VLOG(21) << "gc snapshot: " << snapshot; + VLOG(21) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count(); + VLOG(21) << "vertex record deleter_ size: " + << vertices_.record_deleter_.Count(); + VLOG(21) << "edge_version_list_deleter_ size: " + << edges_.version_list_deleter_.Count(); + VLOG(21) << "vertex_version_list_deleter_ size: " + << vertices_.version_list_deleter_.Count(); + VLOG(21) << "vertices_ size: " << storage_.vertices_.access().size(); + VLOG(21) << "edges_ size: " << storage_.edges_.access().size(); } private: diff --git a/src/durability/snapshot_decoder.hpp b/src/durability/snapshot_decoder.hpp index a6f8d530d..ab287eaea 100644 --- a/src/durability/snapshot_decoder.hpp +++ b/src/durability/snapshot_decoder.hpp @@ -52,7 +52,7 @@ class SnapshotDecoder : public Decoder { vertex.out.emplace_back(*edge); } - VLOG(1) << "[ReadSnapshotVertex] Success"; + VLOG(20) << "[ReadSnapshotVertex] Success"; return vertex; } @@ -61,7 +61,7 @@ class SnapshotDecoder : public Decoder { DecodedValue dv; DecodedInlinedVertexEdge edge; - VLOG(1) << "[ReadSnapshotEdge] Start"; + VLOG(20) << "[ReadSnapshotEdge] Start"; // read ID if (!Decoder::ReadValue(&dv, DecodedValue::Type::Int)) { @@ -84,7 +84,7 @@ class SnapshotDecoder : public Decoder { } edge.type = dv.ValueString(); - VLOG(1) << "[ReadSnapshotEdge] Success"; + VLOG(20) << "[ReadSnapshotEdge] Success"; return edge; } diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 94660fb53..9245ed3fc 100644 --- a/src/query/interpreter.cpp +++ 
b/src/query/interpreter.cpp @@ -140,6 +140,8 @@ std::shared_ptr Interpreter::QueryToPlan( database::GraphDb::Type::DISTRIBUTED_MASTER) { auto distributed_plan = MakeDistributedPlan( *tmp_logical_plan, ctx.symbol_table_, next_plan_id_); + VLOG(10) << "[Interpreter] Created plan for distributed execution " + << next_plan_id_ - 1; return std::make_shared(std::move(distributed_plan), query_plan_cost_estimation, plan_dispatcher_); diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 620f97d90..59eddddf5 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -3021,6 +3021,8 @@ class RemotePuller { void Initialize(Context &context) { if (!remote_pulls_initialized_) { + VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id() + << "] [" << plan_id_ << "] initialized"; for (auto &worker_id : worker_ids_) { UpdatePullForWorker(worker_id, context); } @@ -3032,7 +3034,9 @@ class RemotePuller { // If we don't have results for a worker, check if his remote pull // finished and save results locally. - auto move_frames = [this](int worker_id, auto remote_results) { + auto move_frames = [this, &context](int worker_id, auto remote_results) { + VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id() + << "] [" << plan_id_ << "] received results from " << worker_id; remote_results_[worker_id] = std::move(remote_results.frames); // Since we return and remove results from the back of the vector, // reverse the results so the first to return is on the end of the @@ -3053,10 +3057,16 @@ class RemotePuller { auto remote_results = remote_pull.get(); switch (remote_results.pull_state) { case distributed::PullState::CURSOR_EXHAUSTED: + VLOG(10) << "[RemotePuller] [" + << context.db_accessor_.transaction_id() << "] [" << plan_id_ + << "] cursor exhausted from " << worker_id; move_frames(worker_id, remote_results); remote_pulls_.erase(found_it); break; case distributed::PullState::CURSOR_IN_PROGRESS: + VLOG(10) << "[RemotePuller] [" + << context.db_accessor_.transaction_id() << "] [" << plan_id_ + << "] cursor in progress from " << worker_id; move_frames(worker_id, remote_results); UpdatePullForWorker(worker_id, context); break; @@ -3170,9 +3180,15 @@ class PullRemoteCursor : public Cursor { // If there are no remote results available, try to pull and return // local results. if (input_cursor_ && input_cursor_->Pull(frame, context)) { + VLOG(10) << "[PullRemoteCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] producing local results "; return true; } + VLOG(10) << "[PullRemoteCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] no results available, sleeping "; // If there aren't any local/remote results available, sleep. std::this_thread::sleep_for( std::chrono::microseconds(FLAGS_remote_pull_sleep_micros)); @@ -3182,6 +3198,9 @@ class PullRemoteCursor : public Cursor { // No more remote results, make sure local results get exhausted. 
if (!have_remote_results) { if (input_cursor_ && input_cursor_->Pull(frame, context)) { + VLOG(10) << "[PullRemoteCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] producing local results "; return true; } return false; @@ -3189,6 +3208,10 @@ class PullRemoteCursor : public Cursor { { int worker_id = remote_puller_.GetWorkerId(last_pulled_worker_id_index_); + VLOG(10) << "[PullRemoteCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] producing results from worker " + << worker_id; auto result = remote_puller_.PopResultFromWorker(worker_id); for (size_t i = 0; i < self_.symbols().size(); ++i) { frame[self_.symbols()[i]] = std::move(result[i]); @@ -3224,6 +3247,9 @@ class SynchronizeCursor : public Cursor { } // Yield local stuff while available. if (!local_frames_.empty()) { + VLOG(10) << "[SynchronizeCursor] [" + << context.db_accessor_.transaction_id() + << "] producing local results"; auto &result = local_frames_.back(); for (size_t i = 0; i < frame.elems().size(); ++i) { if (self_.advance_command()) { @@ -3236,8 +3262,12 @@ class SynchronizeCursor : public Cursor { } // We're out of local stuff, yield from pull_remote if available. - if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) + if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) { + VLOG(10) << "[SynchronizeCursor] [" + << context.db_accessor_.transaction_id() + << "] producing remote results"; return true; + } return false; } @@ -3254,6 +3284,8 @@ class SynchronizeCursor : public Cursor { std::vector> local_frames_; void InitialPull(Frame &frame, Context &context) { + VLOG(10) << "[SynchronizeCursor] [" << context.db_accessor_.transaction_id() + << "] initial pull"; auto &db = context.db_accessor_.db(); // Tell all workers to accumulate, only if there is a remote pull. @@ -3453,6 +3485,9 @@ class PullRemoteOrderByCursor : public Cursor { }; if (!merge_initialized_) { + VLOG(10) << "[PullRemoteOrderBy] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] initialize"; remote_puller_.Initialize(context); missing_results_from_ = remote_puller_.Workers(); missing_master_result_ = true; @@ -3487,6 +3522,9 @@ class PullRemoteOrderByCursor : public Cursor { } if (!has_all_result) { + VLOG(10) << "[PullRemoteOrderByCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] missing results, sleep"; // If we don't have results from all workers, sleep before continuing. 
std::this_thread::sleep_for( std::chrono::microseconds(FLAGS_remote_pull_sleep_micros)); @@ -3516,8 +3554,15 @@ class PullRemoteOrderByCursor : public Cursor { restore_frame(result_it->remote_result); if (result_it->worker_id) { + VLOG(10) << "[PullRemoteOrderByCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] producing results from worker " + << result_it->worker_id.value(); missing_results_from_.push_back(result_it->worker_id.value()); } else { + VLOG(10) << "[PullRemoteOrderByCursor] [" + << context.db_accessor_.transaction_id() << "] [" + << self_.plan_id() << "] producing local results"; missing_master_result_ = true; } diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp index 2768f30d2..2abc28c63 100644 --- a/src/stats/stats.cpp +++ b/src/stats/stats.cpp @@ -74,7 +74,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) { flush_batch(); } - VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent, + VLOG(30) << fmt::format("Sent {} out of {} events from queue.", sent, total); last.delete_tail(); std::this_thread::sleep_for( diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index 5995afe68..2c63cca61 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -13,6 +13,7 @@ SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal) : wal_(wal) {} Transaction *SingleNodeEngine::Begin() { + VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; std::lock_guard guard(lock_); transaction_id_t id{++counter_}; @@ -50,6 +51,7 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { } void SingleNodeEngine::Commit(const Transaction &t) { + VLOG(11) << "[Tx] Commiting transaction " << t.id_; std::lock_guard guard(lock_); clog_.set_committed(t.id_); active_.remove(t.id_); @@ -60,6 +62,7 @@ void SingleNodeEngine::Commit(const Transaction &t) { } void SingleNodeEngine::Abort(const Transaction &t) { + VLOG(11) << "[Tx] Aborting transaction " << t.id_; std::lock_guard guard(lock_); clog_.set_aborted(t.id_); active_.remove(t.id_); diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index f89736dd2..fbb872386 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -25,6 +25,7 @@ Transaction *WorkerEngine::Begin() { Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this); auto insertion = active_.access().insert(data.tx_id, tx); CHECK(insertion.second) << "Failed to start creation from worker"; + VLOG(11) << "[Tx] Starting worker transaction " << data.tx_id; return tx; } @@ -63,12 +64,14 @@ void WorkerEngine::Commit(const Transaction &t) { auto res = master_client_pool_.Call(t.id_); CHECK(res) << "CommitRpc failed"; ClearSingleTransaction(t.id_); + VLOG(11) << "[Tx] Commiting worker transaction " << t.id_; } void WorkerEngine::Abort(const Transaction &t) { auto res = master_client_pool_.Call(t.id_); CHECK(res) << "AbortRpc failed"; ClearSingleTransaction(t.id_); + VLOG(11) << "[Tx] Aborting worker transaction " << t.id_; } CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {