Durability utility
Summary: Added WAL and Snapshot explorer utility executables that read a wal or a snapshot file and print the content of it, but the main purpose is that they check it. This is useful when debugging durability and want to know where it gets stuck. Reviewers: dgleich, buda Reviewed By: buda Subscribers: ipaljak, vkasljevic, teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1422
This commit is contained in:
parent
4ee3db80b0
commit
cd3210fb9b
@ -26,12 +26,12 @@ constexpr int64_t kVersion{6};
|
||||
// 3) Vertex generator ID
|
||||
// 4) Edge generator ID
|
||||
//
|
||||
// 5) A list of label+property indices.
|
||||
//
|
||||
// The following two entries are required when recovering from snapshot combined
|
||||
// with WAL to determine record visibility.
|
||||
// 6) Transactional ID of the snapshooter
|
||||
// 7) Transactional snapshot of the snapshooter
|
||||
// 5) Transactional ID of the snapshooter
|
||||
// 6) Transactional snapshot of the snapshooter
|
||||
//
|
||||
// 7) A list of label+property indices.
|
||||
//
|
||||
// We must inline edges with nodes because some edges might be stored on other
|
||||
// worker (edges are always stored only on the worker of the edge source).
|
||||
|
@ -65,6 +65,9 @@ target_link_libraries(${test_prefix}single_query memgraph_lib kvstore_dummy_lib)
|
||||
add_manual_test(sl_position_and_count.cpp)
|
||||
target_link_libraries(${test_prefix}sl_position_and_count memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_manual_test(snapshot_explorer.cpp)
|
||||
target_link_libraries(${test_prefix}snapshot_explorer memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_manual_test(stripped_timing.cpp)
|
||||
target_link_libraries(${test_prefix}stripped_timing memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
@ -74,6 +77,9 @@ target_link_libraries(${test_prefix}ssl_client mg-communication)
|
||||
add_manual_test(ssl_server.cpp)
|
||||
target_link_libraries(${test_prefix}ssl_server mg-communication)
|
||||
|
||||
add_manual_test(wal_explorer.cpp)
|
||||
target_link_libraries(${test_prefix}wal_explorer memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_manual_test(xorshift.cpp)
|
||||
target_link_libraries(${test_prefix}xorshift mg-utils)
|
||||
|
||||
|
109
tests/manual/snapshot_explorer.cpp
Normal file
109
tests/manual/snapshot_explorer.cpp
Normal file
@ -0,0 +1,109 @@
|
||||
#include <experimental/filesystem>
|
||||
#include <iostream>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "durability/hashed_file_reader.hpp"
|
||||
#include "durability/recovery.hpp"
|
||||
#include "durability/snapshot_decoder.hpp"
|
||||
#include "durability/snapshot_value.hpp"
|
||||
#include "durability/version.hpp"
|
||||
|
||||
DEFINE_string(snapshot_file, "", "Snapshot file location");
|
||||
|
||||
using communication::bolt::Value;
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
// At the time this was written, the version was 6. This makes sure we update
|
||||
// the explorer when we bump the snapshot version.
|
||||
static_assert(durability::kVersion == 6,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
fs::path snapshot_path(FLAGS_snapshot_file);
|
||||
CHECK(fs::exists(snapshot_path)) << "File doesn't exist!";
|
||||
|
||||
HashedFileReader reader;
|
||||
durability::SnapshotDecoder<HashedFileReader> decoder(reader);
|
||||
|
||||
CHECK(reader.Open(snapshot_path)) << "Couldn't open snapshot file!";
|
||||
|
||||
auto magic_number = durability::kMagicNumber;
|
||||
reader.Read(magic_number.data(), magic_number.size());
|
||||
CHECK(magic_number == durability::kMagicNumber) << "Magic number mismatch";
|
||||
|
||||
int64_t vertex_count, edge_count;
|
||||
uint64_t hash;
|
||||
|
||||
CHECK(durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash))
|
||||
<< "ReadSnapshotSummary failed";
|
||||
|
||||
LOG(INFO) << "Vertex count: " << vertex_count;
|
||||
LOG(INFO) << "Edge count: " << edge_count;
|
||||
LOG(INFO) << "Hash: " << hash;
|
||||
|
||||
Value dv;
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::Int);
|
||||
CHECK(dv.ValueInt() == durability::kVersion)
|
||||
<< "Snapshot version mismatch"
|
||||
<< ", got " << dv.ValueInt() << " expected " << durability::kVersion;
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::Int);
|
||||
LOG(INFO) << "Snapshot was generated for worker id: " << dv.ValueInt();
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::Int);
|
||||
LOG(INFO) << "Vertex generator last id: " << dv.ValueInt();
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::Int);
|
||||
LOG(INFO) << "Edge generator last id: " << dv.ValueInt();
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::Int);
|
||||
LOG(INFO) << "Transactional ID of the snapshooter " << dv.ValueInt();
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::List);
|
||||
for (const auto &value : dv.ValueList()) {
|
||||
CHECK(value.IsInt()) << "Transaction is not a number!";
|
||||
LOG(INFO) << "Transactional snapshot of the snapshooter "
|
||||
<< value.ValueInt();
|
||||
}
|
||||
|
||||
decoder.ReadValue(&dv, Value::Type::List);
|
||||
|
||||
auto index_value = dv.ValueList();
|
||||
for (auto it = index_value.begin(); it != index_value.end();) {
|
||||
auto label = *it++;
|
||||
CHECK(label.IsString()) << "Label is not a string!";
|
||||
CHECK(it != index_value.end()) << "Missing propery for label "
|
||||
<< label.ValueString();
|
||||
auto property = *it++;
|
||||
CHECK(property.IsString()) << "Property is not a string!";
|
||||
LOG(INFO) << "Adding label " << label.ValueString() << " and property "
|
||||
<< property.ValueString();
|
||||
}
|
||||
|
||||
for (int64_t i = 0; i < vertex_count; ++i) {
|
||||
auto vertex = decoder.ReadSnapshotVertex();
|
||||
CHECK(vertex) << "Failed to read vertex " << i;
|
||||
}
|
||||
|
||||
for (int64_t i = 0; i < edge_count; ++i) {
|
||||
auto edge = decoder.ReadValue(&dv, Value::Type::Edge);
|
||||
CHECK(edge) << "Failed to read edge " << i;
|
||||
}
|
||||
|
||||
reader.ReadType(vertex_count);
|
||||
LOG(INFO) << "Vertex count: " << vertex_count;
|
||||
|
||||
reader.ReadType(edge_count);
|
||||
LOG(INFO) << "Edge count:" << edge_count;
|
||||
|
||||
LOG(INFO) << "Hash: " << reader.hash();
|
||||
|
||||
CHECK(reader.Close()) << "Failed to close the reader";
|
||||
return 0;
|
||||
}
|
86
tests/manual/wal_explorer.cpp
Normal file
86
tests/manual/wal_explorer.cpp
Normal file
@ -0,0 +1,86 @@
|
||||
#include <experimental/filesystem>
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "database/state_delta.hpp"
|
||||
#include "durability/hashed_file_reader.hpp"
|
||||
#include "durability/recovery.hpp"
|
||||
#include "durability/wal.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
DEFINE_string(wal_file, "", "WAL file location");
|
||||
|
||||
using communication::bolt::Value;
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
std::string StateDeltaTypeToString(database::StateDelta::Type type) {
|
||||
switch (type) {
|
||||
case database::StateDelta::Type::TRANSACTION_BEGIN:
|
||||
return "TRANSACTION_BEGIN";
|
||||
case database::StateDelta::Type::TRANSACTION_COMMIT:
|
||||
return "TRANSACTION_COMMIT";
|
||||
case database::StateDelta::Type::TRANSACTION_ABORT:
|
||||
return "TRANSACTION_ABORT";
|
||||
case database::StateDelta::Type::CREATE_VERTEX:
|
||||
return "CREATE_VERTEX";
|
||||
case database::StateDelta::Type::CREATE_EDGE:
|
||||
return "CREATE_EDGE";
|
||||
case database::StateDelta::Type::ADD_OUT_EDGE:
|
||||
return "ADD_OUT_EDGE";
|
||||
case database::StateDelta::Type::REMOVE_OUT_EDGE:
|
||||
return "REMOVE_OUT_EDGE";
|
||||
case database::StateDelta::Type::ADD_IN_EDGE:
|
||||
return "ADD_IN_EDGE";
|
||||
case database::StateDelta::Type::REMOVE_IN_EDGE:
|
||||
return "REMOVE_IN_EDGE";
|
||||
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
|
||||
return "SET_PROPERTY_VERTEX";
|
||||
case database::StateDelta::Type::SET_PROPERTY_EDGE:
|
||||
return "SET_PROPERTY_EDGE";
|
||||
case database::StateDelta::Type::ADD_LABEL:
|
||||
return "ADD_LABEL";
|
||||
case database::StateDelta::Type::REMOVE_LABEL:
|
||||
return "REMOVE_LABEL";
|
||||
case database::StateDelta::Type::REMOVE_VERTEX:
|
||||
return "REMOVE_VERTEX";
|
||||
case database::StateDelta::Type::REMOVE_EDGE:
|
||||
return "REMOVE_EDGE";
|
||||
case database::StateDelta::Type::BUILD_INDEX:
|
||||
return "BUILD_INDEX";
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
fs::path wal_path(FLAGS_wal_file);
|
||||
CHECK(fs::exists(wal_path)) << "File doesn't exist!";
|
||||
|
||||
HashedFileReader wal_reader;
|
||||
CHECK(wal_reader.Open(wal_path)) << "Couldn't open wal file!";
|
||||
|
||||
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
|
||||
tx::TransactionId max_observed_tx_id{0};
|
||||
tx::TransactionId min_observed_tx_id{std::numeric_limits<uint64_t>::max()};
|
||||
|
||||
std::vector<std::string> wal_entries;
|
||||
|
||||
while (true) {
|
||||
auto delta = database::StateDelta::Decode(wal_reader, decoder);
|
||||
if (!delta) break;
|
||||
|
||||
max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id);
|
||||
min_observed_tx_id = std::min(min_observed_tx_id, delta->transaction_id);
|
||||
LOG(INFO) << "Found tx: " << delta->transaction_id << " "
|
||||
<< StateDeltaTypeToString(delta->type);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Min tx " << min_observed_tx_id;
|
||||
LOG(INFO) << "Max tx " << max_observed_tx_id;
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user