diff --git a/src/durability/version.hpp b/src/durability/version.hpp index d4517b032..0385695bf 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -26,12 +26,12 @@ constexpr int64_t kVersion{6}; // 3) Vertex generator ID // 4) Edge generator ID // -// 5) A list of label+property indices. -// // The following two entries are required when recovering from snapshot combined // with WAL to determine record visibility. -// 6) Transactional ID of the snapshooter -// 7) Transactional snapshot of the snapshooter +// 5) Transactional ID of the snapshooter +// 6) Transactional snapshot of the snapshooter +// +// 7) A list of label+property indices. // // We must inline edges with nodes because some edges might be stored on other // worker (edges are always stored only on the worker of the edge source). diff --git a/tests/manual/CMakeLists.txt b/tests/manual/CMakeLists.txt index 5fff30a64..3db23d003 100644 --- a/tests/manual/CMakeLists.txt +++ b/tests/manual/CMakeLists.txt @@ -65,6 +65,9 @@ target_link_libraries(${test_prefix}single_query memgraph_lib kvstore_dummy_lib) add_manual_test(sl_position_and_count.cpp) target_link_libraries(${test_prefix}sl_position_and_count memgraph_lib kvstore_dummy_lib) +add_manual_test(snapshot_explorer.cpp) +target_link_libraries(${test_prefix}snapshot_explorer memgraph_lib kvstore_dummy_lib) + add_manual_test(stripped_timing.cpp) target_link_libraries(${test_prefix}stripped_timing memgraph_lib kvstore_dummy_lib) @@ -74,6 +77,9 @@ target_link_libraries(${test_prefix}ssl_client mg-communication) add_manual_test(ssl_server.cpp) target_link_libraries(${test_prefix}ssl_server mg-communication) +add_manual_test(wal_explorer.cpp) +target_link_libraries(${test_prefix}wal_explorer memgraph_lib kvstore_dummy_lib) + add_manual_test(xorshift.cpp) target_link_libraries(${test_prefix}xorshift mg-utils) diff --git a/tests/manual/snapshot_explorer.cpp b/tests/manual/snapshot_explorer.cpp new file mode 100644 index 000000000..085296813 --- /dev/null +++ b/tests/manual/snapshot_explorer.cpp @@ -0,0 +1,109 @@ +#include +#include + +#include +#include + +#include "durability/hashed_file_reader.hpp" +#include "durability/recovery.hpp" +#include "durability/snapshot_decoder.hpp" +#include "durability/snapshot_value.hpp" +#include "durability/version.hpp" + +DEFINE_string(snapshot_file, "", "Snapshot file location"); + +using communication::bolt::Value; +namespace fs = std::experimental::filesystem; + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + // At the time this was written, the version was 6. This makes sure we update + // the explorer when we bump the snapshot version. + static_assert(durability::kVersion == 6, + "Wrong snapshot version, please update!"); + + fs::path snapshot_path(FLAGS_snapshot_file); + CHECK(fs::exists(snapshot_path)) << "File doesn't exist!"; + + HashedFileReader reader; + durability::SnapshotDecoder decoder(reader); + + CHECK(reader.Open(snapshot_path)) << "Couldn't open snapshot file!"; + + auto magic_number = durability::kMagicNumber; + reader.Read(magic_number.data(), magic_number.size()); + CHECK(magic_number == durability::kMagicNumber) << "Magic number mismatch"; + + int64_t vertex_count, edge_count; + uint64_t hash; + + CHECK(durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash)) + << "ReadSnapshotSummary failed"; + + LOG(INFO) << "Vertex count: " << vertex_count; + LOG(INFO) << "Edge count: " << edge_count; + LOG(INFO) << "Hash: " << hash; + + Value dv; + + decoder.ReadValue(&dv, Value::Type::Int); + CHECK(dv.ValueInt() == durability::kVersion) + << "Snapshot version mismatch" + << ", got " << dv.ValueInt() << " expected " << durability::kVersion; + + decoder.ReadValue(&dv, Value::Type::Int); + LOG(INFO) << "Snapshot was generated for worker id: " << dv.ValueInt(); + + decoder.ReadValue(&dv, Value::Type::Int); + LOG(INFO) << "Vertex generator last id: " << dv.ValueInt(); + + decoder.ReadValue(&dv, Value::Type::Int); + LOG(INFO) << "Edge generator last id: " << dv.ValueInt(); + + decoder.ReadValue(&dv, Value::Type::Int); + LOG(INFO) << "Transactional ID of the snapshooter " << dv.ValueInt(); + + decoder.ReadValue(&dv, Value::Type::List); + for (const auto &value : dv.ValueList()) { + CHECK(value.IsInt()) << "Transaction is not a number!"; + LOG(INFO) << "Transactional snapshot of the snapshooter " + << value.ValueInt(); + } + + decoder.ReadValue(&dv, Value::Type::List); + + auto index_value = dv.ValueList(); + for (auto it = index_value.begin(); it != index_value.end();) { + auto label = *it++; + CHECK(label.IsString()) << "Label is not a string!"; + CHECK(it != index_value.end()) << "Missing propery for label " + << label.ValueString(); + auto property = *it++; + CHECK(property.IsString()) << "Property is not a string!"; + LOG(INFO) << "Adding label " << label.ValueString() << " and property " + << property.ValueString(); + } + + for (int64_t i = 0; i < vertex_count; ++i) { + auto vertex = decoder.ReadSnapshotVertex(); + CHECK(vertex) << "Failed to read vertex " << i; + } + + for (int64_t i = 0; i < edge_count; ++i) { + auto edge = decoder.ReadValue(&dv, Value::Type::Edge); + CHECK(edge) << "Failed to read edge " << i; + } + + reader.ReadType(vertex_count); + LOG(INFO) << "Vertex count: " << vertex_count; + + reader.ReadType(edge_count); + LOG(INFO) << "Edge count:" << edge_count; + + LOG(INFO) << "Hash: " << reader.hash(); + + CHECK(reader.Close()) << "Failed to close the reader"; + return 0; +} diff --git a/tests/manual/wal_explorer.cpp b/tests/manual/wal_explorer.cpp new file mode 100644 index 000000000..1fdd8dac4 --- /dev/null +++ b/tests/manual/wal_explorer.cpp @@ -0,0 +1,86 @@ +#include +#include +#include + +#include +#include + +#include "database/state_delta.hpp" +#include "durability/hashed_file_reader.hpp" +#include "durability/recovery.hpp" +#include "durability/wal.hpp" +#include "transactions/type.hpp" + +DEFINE_string(wal_file, "", "WAL file location"); + +using communication::bolt::Value; +namespace fs = std::experimental::filesystem; + +std::string StateDeltaTypeToString(database::StateDelta::Type type) { + switch (type) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + return "TRANSACTION_BEGIN"; + case database::StateDelta::Type::TRANSACTION_COMMIT: + return "TRANSACTION_COMMIT"; + case database::StateDelta::Type::TRANSACTION_ABORT: + return "TRANSACTION_ABORT"; + case database::StateDelta::Type::CREATE_VERTEX: + return "CREATE_VERTEX"; + case database::StateDelta::Type::CREATE_EDGE: + return "CREATE_EDGE"; + case database::StateDelta::Type::ADD_OUT_EDGE: + return "ADD_OUT_EDGE"; + case database::StateDelta::Type::REMOVE_OUT_EDGE: + return "REMOVE_OUT_EDGE"; + case database::StateDelta::Type::ADD_IN_EDGE: + return "ADD_IN_EDGE"; + case database::StateDelta::Type::REMOVE_IN_EDGE: + return "REMOVE_IN_EDGE"; + case database::StateDelta::Type::SET_PROPERTY_VERTEX: + return "SET_PROPERTY_VERTEX"; + case database::StateDelta::Type::SET_PROPERTY_EDGE: + return "SET_PROPERTY_EDGE"; + case database::StateDelta::Type::ADD_LABEL: + return "ADD_LABEL"; + case database::StateDelta::Type::REMOVE_LABEL: + return "REMOVE_LABEL"; + case database::StateDelta::Type::REMOVE_VERTEX: + return "REMOVE_VERTEX"; + case database::StateDelta::Type::REMOVE_EDGE: + return "REMOVE_EDGE"; + case database::StateDelta::Type::BUILD_INDEX: + return "BUILD_INDEX"; + } +} + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + fs::path wal_path(FLAGS_wal_file); + CHECK(fs::exists(wal_path)) << "File doesn't exist!"; + + HashedFileReader wal_reader; + CHECK(wal_reader.Open(wal_path)) << "Couldn't open wal file!"; + + communication::bolt::Decoder decoder(wal_reader); + tx::TransactionId max_observed_tx_id{0}; + tx::TransactionId min_observed_tx_id{std::numeric_limits::max()}; + + std::vector wal_entries; + + while (true) { + auto delta = database::StateDelta::Decode(wal_reader, decoder); + if (!delta) break; + + max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id); + min_observed_tx_id = std::min(min_observed_tx_id, delta->transaction_id); + LOG(INFO) << "Found tx: " << delta->transaction_id << " " + << StateDeltaTypeToString(delta->type); + } + + LOG(INFO) << "Min tx " << min_observed_tx_id; + LOG(INFO) << "Max tx " << max_observed_tx_id; + + return 0; +}