Durability - snapshot summary refactor

Summary:
- Removed durability::Summary because it was wired into reader and stopped me from recovering WAL files.
- Refactored and renamed BufferedFile(Reader/Writer) to HashedFile(Reader/Writer).
- Vertex and edge counts in the snapshot are now hashed.

Breaking snapshot compatibility again (hashing), but since the previous version was not released, and we are not caching snapshots, the previous version does not need to be supported.

Reviewers: teon.banek, mislav.bradac, buda

Reviewed By: teon.banek, mislav.bradac

Subscribers: dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D932
This commit is contained in:
florijan 2017-10-25 13:01:53 +02:00
parent 8d1e463e6b
commit be9c875fa9
13 changed files with 245 additions and 273 deletions

View File

@ -1,97 +0,0 @@
#pragma once
#include <fstream>
#include "durability/summary.hpp"
#include "hasher.hpp"
#include "utils/bswap.hpp"
/**
 * Buffer that reads data from a file and calculates a hash of the data read.
 * Implements the template param Buffer interface from the BaseDecoder class.
 * Should be closed before destructing.
 */
class FileReaderBuffer {
 public:
  /**
   * Opens the file in binary mode and reads the summary stored at its end.
   * After a successful call the hash is reset and the stream is positioned at
   * the beginning of the file.
   *
   * @param file - Path of the file to read from.
   * @param summary - Object that receives the summary read from the file.
   * @return - False if opening the file or reading the summary fails.
   */
  bool Open(const std::string &file, snapshot::Summary &summary) {
    // The file holds raw binary data. Text mode may translate line endings on
    // some platforms, which would corrupt both the data and the end-relative
    // seek performed by ReadSummary.
    input_stream_.open(file, std::ios::in | std::ios::binary);
    if (input_stream_.fail()) return false;
    return ReadSummary(summary);
  }

  /** Closes the ifstream. Returns false if closing fails. */
  bool Close() {
    input_stream_.close();
    return !input_stream_.fail();
  }

  /**
   * Reads raw data from the stream and, on success, adds it to the hash.
   *
   * @param data - Pointer to where the data should be stored.
   * @param n - Number of bytes to read.
   * @return - False if the stream reports failure after the read.
   */
  bool Read(uint8_t *data, size_t n) {
    input_stream_.read(reinterpret_cast<char *>(data), n);
    if (input_stream_.fail()) return false;
    hasher_.Update(data, n);
    return true;
  }

  /** Returns the hash of the data read so far. */
  uint64_t hash() const { return hasher_.hash(); }

 private:
  /**
   * Reads a value of type T from the stream and byte-swaps it. Data is stored
   * in the file in big endian format; the expected system endianness is little
   * endian.
   *
   * @param val - The value to read into.
   * @return - False if the underlying read fails (val is then not swapped).
   */
  template <typename T>
  bool ReadType(T &val) {
    if (!Read(reinterpret_cast<uint8_t *>(&val), sizeof(T))) return false;
    // TODO: must be platform specific in the future
    val = bswap(val);
    return true;
  }

  /**
   * Reads the summary from the end of the file, then rewinds the stream to
   * the beginning and resets the hash so that the summary bytes read here do
   * not contribute to it. Must be called right after opening the stream.
   *
   * @param summary - Object that receives the summary read from the file.
   * @return - False if seeking or reading fails.
   */
  bool ReadSummary(snapshot::Summary &summary) {
    DCHECK(input_stream_.tellg() == 0)
        << "Summary should be read before other data!";
    input_stream_.seekg(-static_cast<int64_t>(sizeof(snapshot::Summary)),
                        std::ios::end);
    if (input_stream_.fail()) return false;
    if (!ReadType(summary.vertex_num_) || !ReadType(summary.edge_num_) ||
        !ReadType(summary.hash_))
      return false;
    input_stream_.seekg(0, std::ios::beg);
    // ReadType went through Read, which updated the hash; discard that.
    hasher_.Reset();
    return !input_stream_.fail();
  }

  /** Used for calculating the hash of the data read. */
  Hasher hasher_;
  /** Ifstream used for reading from the file. */
  std::ifstream input_stream_;
};

View File

@ -1,91 +0,0 @@
#pragma once
#include <fstream>
#include "hasher.hpp"
#include "utils/bswap.hpp"
/**
 * Buffer that writes data to a file and calculates a hash of the written
 * data. Implements the template param Buffer interface from the BaseEncoder
 * class. The hash is updated when Write is called. The stream is configured to
 * throw std::ios_base::failure when an operation sets failbit or badbit.
 */
class FileWriterBuffer {
 public:
  /** Constructor, configures the ofstream to throw an exception on failure. */
  FileWriterBuffer() {
    output_stream_.exceptions(std::ios_base::failbit | std::ios_base::badbit);
  }

  /** Constructor which also takes a file path and opens it immediately. */
  FileWriterBuffer(const std::string &path) : FileWriterBuffer() { Open(path); }

  /**
   * Opens the ofstream for binary output.
   *
   * @param file - Path of the file to write to.
   */
  void Open(const std::string &file) {
    output_stream_.open(file, std::ios::out | std::ios::binary);
  }

  /** Closes the ofstream. */
  void Close() { output_stream_.close(); }

  /**
   * Updates the hash with the given data and writes the data to the stream.
   *
   * @param data - Pointer to the data.
   * @param n - Data length in bytes.
   */
  void Write(const uint8_t *data, size_t n) {
    hasher_.Update(data, n);
    output_stream_.write(reinterpret_cast<const char *>(data), n);
  }

  /** BaseEncoder needs this method, it is not needed in this buffer. */
  void Chunk() {}

  /** Flushes data to the stream. */
  void Flush() { output_stream_.flush(); }

  /**
   * Writes the summary (vertex count, edge count and the current hash) to the
   * ofstream, each value byte-swapped by WriteLong. Should be called once
   * writing of all other data to the file is done. The summary itself is
   * written through WriteLong and therefore does not update the hash.
   *
   * @param vertex_num - Number of vertices written; must not be negative.
   * @param edge_num - Number of edges written; must not be negative.
   */
  void WriteSummary(int64_t vertex_num, int64_t edge_num) {
    DCHECK(vertex_num >= 0) << "Number of vertices shouldn't be negative";
    DCHECK(edge_num >= 0) << "Number of edges shouldn't be negative";
    WriteLong(vertex_num);
    WriteLong(edge_num);
    WriteLong(hasher_.hash());
  }

 private:
  /** Writes a byte-swapped uint64_t to the ofstream, bypassing the hash. */
  void WriteLong(uint64_t val) {
    uint64_t bval = bswap(val);
    output_stream_.write(reinterpret_cast<const char *>(&bval), sizeof(bval));
  }

  /** Stream used to write data to the file. */
  std::ofstream output_stream_;
  /** Used to calculate the hash of the written data. */
  Hasher hasher_;
};

View File

@ -0,0 +1,72 @@
#pragma once
#include <fstream>
#include "hasher.hpp"
#include "utils/bswap.hpp"
/**
 * Buffer that reads data from a file and calculates a hash of the data read.
 * Implements the template param Buffer interface from the BaseDecoder class.
 */
class HashedFileReader {
 public:
  /**
   * Opens the file for binary reading and resets the hash.
   *
   * @param file - Path of the file to read from.
   * @return - True if the stream is not in a failed state after opening.
   */
  bool Open(const std::string &file) {
    input_stream_.open(file, std::ios::in | std::ios::binary);
    hasher_ = Hasher();
    return !input_stream_.fail();
  }

  /** Closes the ifstream. Returns false if closing fails. */
  bool Close() {
    input_stream_.close();
    return !input_stream_.fail();
  }

  /**
   * Reads raw data from the stream.
   *
   * @param data - Pointer to where the data should be stored.
   * @param n - Number of bytes to read.
   * @param hash - If the read should be included in the hash calculation.
   * @return - False if the stream reports failure after the read; in that
   *           case the hash is not updated.
   */
  bool Read(uint8_t *data, size_t n, bool hash = true) {
    input_stream_.read(reinterpret_cast<char *>(data), n);
    if (input_stream_.fail()) return false;
    if (hash) hasher_.Update(data, n);
    return true;
  }

  /**
   * Reads a TValue value from the stream and byte-swaps it.
   *
   * @param val - The value to read into.
   * @param hash - If the read should be included in the hash calculation.
   * @tparam TValue - Type of value being read.
   * @return - If the read was successful.
   */
  template <typename TValue>
  bool ReadType(TValue &val, bool hash = true) {
    if (!Read(reinterpret_cast<uint8_t *>(&val), sizeof(TValue), hash))
      return false;
    // TODO: must be platform specific in the future
    val = bswap(val);
    return true;
  }

  /**
   * Moves the read position by `offset` relative to `way`.
   *
   * Uses std::streamoff directly: the equivalent member typedef inherited
   * from std::ios_base was deprecated and is removed in C++17.
   */
  void Seek(std::streamoff offset, std::ios_base::seekdir way) {
    input_stream_.seekg(offset, way);
  }

  /** Moves the read position to the absolute position `pos`. */
  void Seek(std::streampos pos) { input_stream_.seekg(pos); }

  /** Returns the current read position of the underlying stream. */
  auto Tellg() { return input_stream_.tellg(); }

  /** Returns the hash of the data read so far from the stream. */
  uint64_t hash() const { return hasher_.hash(); }

 private:
  Hasher hasher_;
  std::ifstream input_stream_;
};

View File

@ -0,0 +1,67 @@
#pragma once
#include <fstream>
#include "hasher.hpp"
#include "utils/bswap.hpp"
/**
 * File-backed output buffer that keeps a running hash of what it writes.
 * Satisfies the Buffer template parameter expected by the BaseEncoder class.
 */
class HashedFileWriter {
 public:
  /** Sets up the ofstream so that failbit / badbit raise an exception. */
  HashedFileWriter() {
    output_stream_.exceptions(std::ios_base::badbit | std::ios_base::failbit);
  }

  /** Same as the default constructor, then opens `path` for binary output. */
  HashedFileWriter(const std::string &path) : HashedFileWriter() {
    output_stream_.open(path, std::ios::binary | std::ios::out);
  }

  /** Closes the ofstream. */
  void Close() { output_stream_.close(); }

  /**
   * Writes raw bytes to the stream, then optionally folds them into the hash.
   *
   * @param data - Pointer to the bytes to write.
   * @param n - Number of bytes.
   * @param hash - Whether the written bytes should update the hash.
   */
  void Write(const uint8_t *data, size_t n, bool hash = true) {
    const char *bytes = reinterpret_cast<const char *>(data);
    output_stream_.write(bytes, n);
    if (!hash) return;
    hasher_.Update(data, n);
  }

  /**
   * Byte-swaps a TValue and writes its bytes to the stream.
   *
   * @param val - The value to write.
   * @param hash - Whether the written bytes should update the hash.
   * @tparam TValue - Type of the value being written.
   */
  template <typename TValue>
  void WriteValue(const TValue &val, bool hash = true) {
    TValue swapped = bswap(val);
    const auto *raw = reinterpret_cast<const uint8_t *>(&swapped);
    Write(raw, sizeof(TValue), hash);
  }

  /** Does nothing. Just for API compatibility with the bolt buffer. */
  void Chunk() {}

  /** Flushes data to the stream. */
  void Flush() { output_stream_.flush(); }

  /** Returns the hash of the data written so far to the stream. */
  uint64_t hash() const { return hasher_.hash(); }

 private:
  std::ofstream output_stream_;
  Hasher hasher_;
};

View File

@ -1,37 +1,31 @@
#pragma once
#include <cstdint>
#include <cstdlib>
// TODO: implement better hash function
/**
* Class calculates hash of the data dynamically.
*/
class Hasher {
/** Prime number used in calculating hash. */
static constexpr uint64_t kPrime = 3137;
public:
Hasher() = default;
/**
* Sets hash to 0.
*/
void Reset() { hash_ = 0; }
/**
* Updates hash from given data.
* @param data data from which hash will be updated
* @param n length of the data
*
* @param data - Data from which hash will be updated.
* @param n - Length of the data.
*/
void Update(const uint8_t *data, size_t n) {
for (int i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1;
for (size_t i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1;
}
/**
* Returns current hash value.
*/
/** Returns current hash value. */
uint64_t hash() const { return hash_; }
private:
/**
* Prime number used in calculating hash.
*/
const uint64_t kPrime = 3137;
/**
* Hash of data.
*/
uint64_t hash_ = 0;
};

View File

@ -1,13 +1,27 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/recovery.hpp"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/version.hpp"
#include "query/typed_value.hpp"
#include "utils/string.hpp"
#include "durability/file_reader_buffer.hpp"
#include "durability/version.hpp"
using communication::bolt::DecodedValue;
namespace durability {
/**
 * Reads the vertex count, edge count and hash stored in the trailing bytes of
 * the file behind `buffer`. The reads are excluded from the buffer's hash, and
 * the original read position is restored afterwards.
 *
 * @param buffer - Reader to read the summary from.
 * @param vertex_count - Receives the vertex count.
 * @param edge_count - Receives the edge count.
 * @param hash - Receives the stored hash.
 * @return - True if all three values were read successfully.
 */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
                         int64_t &edge_count, uint64_t &hash) {
  const auto pos = buffer.Tellg();
  // Convert to a signed offset BEFORE negating. Negating the unsigned sizeof
  // sum wraps around, and where size_t is narrower than std::streamoff that
  // yields a large positive offset instead of a negative one.
  const auto summary_size = static_cast<std::streamoff>(
      sizeof(vertex_count) + sizeof(edge_count) + sizeof(hash));
  buffer.Seek(-summary_size, std::ios_base::end);
  const bool r_val = buffer.ReadType(vertex_count, false) &&
                     buffer.ReadType(edge_count, false) &&
                     buffer.ReadType(hash, false);
  buffer.Seek(pos);
  return r_val;
}
}
bool Recovery::Recover(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor) {
if (!fs::exists(snapshot_file)) return false;
@ -27,17 +41,23 @@ bool Recovery::Recover(const fs::path &snapshot_file,
bool Recovery::Decode(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor) {
FileReaderBuffer buffer;
communication::bolt::Decoder<FileReaderBuffer> decoder(buffer);
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
snapshot::Summary summary;
RETURN_IF_NOT(buffer.Open(snapshot_file, summary));
RETURN_IF_NOT(buffer.Open(snapshot_file));
std::unordered_map<uint64_t, VertexAccessor> vertices;
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kMagicNumber);
// Read the vertex and edge count, and the hash, from the end of the snapshot.
int64_t vertex_count;
int64_t edge_count;
uint64_t hash;
RETURN_IF_NOT(
durability::ReadSnapshotSummary(buffer, vertex_count, edge_count, hash));
DecodedValue dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
@ -59,7 +79,7 @@ bool Recovery::Decode(const fs::path &snapshot_file,
db_accessor.Property(property.ValueString()));
}
for (int64_t i = 0; i < summary.vertex_num_; ++i) {
for (int64_t i = 0; i < vertex_count; ++i) {
DecodedValue vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
@ -73,7 +93,7 @@ bool Recovery::Decode(const fs::path &snapshot_file,
}
vertices.insert({vertex.id, vertex_accessor});
}
for (int64_t i = 0; i < summary.edge_num_; ++i) {
for (int64_t i = 0; i < edge_count; ++i) {
DecodedValue edge_dv;
RETURN_IF_NOT(decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge));
auto &edge = edge_dv.ValueEdge();
@ -88,9 +108,12 @@ bool Recovery::Decode(const fs::path &snapshot_file,
query::TypedValue(property_pair.second));
}
uint64_t hash = buffer.hash();
if (!buffer.Close()) return false;
return hash == summary.hash_;
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
return (buffer.Close() && buffer.hash() == hash);
}
#undef RETURN_IF_NOT

View File

@ -2,11 +2,20 @@
#include <experimental/filesystem>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "storage/vertex_accessor.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
// TODO review: replacement of Recovery class with a function is coming in
// another diff.
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
}
/**
* Class used to recover database from snapshot file.
*/

View File

@ -2,13 +2,13 @@
#include <glog/logging.h>
#include "durability/snapshooter.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/file_writer_buffer.hpp"
#include "utils/datetime/timestamp.hpp"
#include "durability/snapshooter.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/version.hpp"
#include "utils/datetime/timestamp.hpp"
bool Snapshooter::MakeSnapshot(GraphDbAccessor &db_accessor_,
const fs::path &snapshot_folder,
@ -30,11 +30,9 @@ bool Snapshooter::MakeSnapshot(GraphDbAccessor &db_accessor_,
bool Snapshooter::Encode(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor_) {
try {
FileWriterBuffer buffer;
// BaseEncoder encodes graph elements.
communication::bolt::BaseEncoder<FileWriterBuffer> encoder(buffer);
HashedFileWriter buffer(snapshot_file);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
int64_t vertex_num = 0, edge_num = 0;
buffer.Open(snapshot_file);
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
@ -67,7 +65,9 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
encoder.WriteEdge(edge);
edge_num++;
}
buffer.WriteSummary(vertex_num, edge_num);
buffer.WriteValue(vertex_num);
buffer.WriteValue(edge_num);
buffer.WriteValue(buffer.hash());
buffer.Close();
} catch (const std::ifstream::failure &) {
if (fs::exists(snapshot_file) && !fs::remove(snapshot_file)) {

View File

@ -1,12 +0,0 @@
#pragma once
namespace snapshot {
/**
* Struct represents graph summary in a snapshot file.
*/
struct Summary {
int64_t vertex_num_ = 0LL;
int64_t edge_num_ = 0LL;
uint64_t hash_ = 0ULL;
};
}

View File

@ -8,5 +8,5 @@ namespace durability {
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
// The current default version of snapshot and WAL encoding / decoding.
constexpr int64_t kVersion{1};
constexpr int64_t kVersion{2};
}

View File

@ -12,7 +12,7 @@
#include <json/json.hpp>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "durability/file_writer_buffer.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/version.hpp"
#include "transactions/type.hpp"
#include "utils/string.hpp"
@ -70,14 +70,7 @@ DEFINE_string(config, "", "Path to config JSON file");
// Utilities for writing to the snapshot file.
class Writer {
public:
/**
* Creates a writer.
*
* @param path - Path to the output file.
* @Param indexes - A list of (label, property) indexes to create, each in the
* "Label.property" form.
*/
Writer(const std::string &path) : buffer_(path), encoder_(buffer_) {
Writer(const std::string &path) : buffer_(path) {
encoder_.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder_.WriteTypedValue(durability::kVersion);
@ -98,7 +91,7 @@ class Writer {
encoder_.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
3);
encoder_.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
auto id = node_counter++;
auto id = node_counter_++;
encoder_.WriteInt(id);
encoder_.WriteList(
std::vector<query::TypedValue>{labels.begin(), labels.end()});
@ -123,13 +116,18 @@ class Writer {
return id;
}
void Close() { buffer_.WriteSummary(node_counter, edge_counter_); }
void Close() {
buffer_.WriteValue(node_counter_);
buffer_.WriteValue(edge_counter_);
buffer_.WriteValue(buffer_.hash());
buffer_.Close();
}
private:
int64_t node_counter{0};
int64_t node_counter_{0};
int64_t edge_counter_{0};
FileWriterBuffer buffer_;
communication::bolt::BaseEncoder<FileWriterBuffer> encoder_;
HashedFileWriter buffer_;
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{buffer_};
};
// Helper class for tracking info about the generated graph.

View File

@ -7,7 +7,7 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/dbms.hpp"
#include "durability/file_reader_buffer.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/recovery.hpp"
#include "durability/version.hpp"
@ -103,11 +103,14 @@ TEST_F(RecoveryTest, TestEncoding) {
TakeSnapshot(dbms, snapshot_max_retained_);
std::string snapshot = GetLatestSnapshot();
FileReaderBuffer buffer;
communication::bolt::Decoder<FileReaderBuffer> decoder(buffer);
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
snapshot::Summary summary;
buffer.Open(snapshot, summary);
int64_t vertex_count, edge_count;
uint64_t hash;
ASSERT_TRUE(buffer.Open(snapshot));
ASSERT_TRUE(
durability::ReadSnapshotSummary(buffer, vertex_count, edge_count, hash));
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
@ -124,14 +127,14 @@ TEST_F(RecoveryTest, TestEncoding) {
std::vector<int64_t> ids;
std::vector<std::string> edge_types;
for (int i = 0; i < summary.vertex_num_; ++i) {
for (int i = 0; i < vertex_count; ++i) {
communication::bolt::DecodedValue vertex_dv;
decoder.ReadValue(&vertex_dv);
auto &vertex = vertex_dv.ValueVertex();
ids.push_back(vertex.id);
}
std::vector<int64_t> from, to;
for (int i = 0; i < summary.edge_num_; ++i) {
for (int i = 0; i < edge_count; ++i) {
communication::bolt::DecodedValue edge_dv;
decoder.ReadValue(&edge_dv);
auto &edge = edge_dv.ValueEdge();
@ -139,11 +142,15 @@ TEST_F(RecoveryTest, TestEncoding) {
to.push_back(edge.to);
edge_types.push_back(edge.type);
}
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
buffer.Close();
ASSERT_EQ(to.size(), 2U);
ASSERT_EQ(from.size(), 2U);
EXPECT_EQ(buffer.hash(), summary.hash_);
EXPECT_EQ(buffer.hash(), hash);
EXPECT_NE(edge_types.end(),
std::find(edge_types.begin(), edge_types.end(), "hates"));
EXPECT_NE(edge_types.end(),
@ -168,7 +175,7 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) {
auto dba_recover = dbms_recover.active();
Recovery recovery;
EXPECT_TRUE(recovery.Recover(snapshot, *dba_recover));
ASSERT_TRUE(recovery.Recover(snapshot, *dba_recover));
std::vector<VertexAccessor> vertices;
std::vector<EdgeAccessor> edges;

View File

@ -10,7 +10,7 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "config.hpp"
#include "durability/file_writer_buffer.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/snapshooter.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
@ -196,7 +196,7 @@ std::string GetIdSpace(const std::string &type) {
void WriteNodeRow(const std::vector<Field> &fields,
const std::vector<std::string> &row,
MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<FileWriterBuffer> &encoder) {
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
std::experimental::optional<int64_t> id;
std::vector<query::TypedValue> labels;
std::map<std::string, query::TypedValue> properties;
@ -237,7 +237,7 @@ void WriteNodeRow(const std::vector<Field> &fields,
}
auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<FileWriterBuffer> &encoder) {
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
int64_t node_count = 0;
std::ifstream nodes_file(nodes_path);
CHECK(nodes_file) << fmt::format("Unable to open '{}'", nodes_path);
@ -257,7 +257,7 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
void WriteRelationshipsRow(
const std::vector<Field> &fields, const std::vector<std::string> &row,
const MemgraphNodeIdMap &node_id_map, int64_t relationship_id,
communication::bolt::BaseEncoder<FileWriterBuffer> &encoder) {
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
std::experimental::optional<int64_t> start_id;
std::experimental::optional<int64_t> end_id;
std::experimental::optional<std::string> relationship_type;
@ -302,7 +302,7 @@ void WriteRelationshipsRow(
auto ConvertRelationships(
const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<FileWriterBuffer> &encoder) {
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
int64_t relationship_count = 0;
std::ifstream relationships_file(relationships_path);
CHECK(relationships_file)
@ -325,8 +325,8 @@ void Convert(const std::vector<std::string> &nodes,
const std::vector<std::string> &relationships,
const std::string &output_path) {
try {
FileWriterBuffer buffer(output_path);
communication::bolt::BaseEncoder<FileWriterBuffer> encoder(buffer);
HashedFileWriter buffer(output_path);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
int64_t node_count = 0;
int64_t relationship_count = 0;
MemgraphNodeIdMap node_id_map;
@ -350,9 +350,11 @@ void Convert(const std::vector<std::string> &nodes,
relationship_count +=
ConvertRelationships(relationships_file, node_id_map, encoder);
}
buffer.WriteSummary(node_count, relationship_count);
buffer.WriteValue(node_count);
buffer.WriteValue(relationship_count);
buffer.WriteValue(buffer.hash());
} catch (const std::ios_base::failure &) {
// Only FileWriterBuffer sets the underlying fstream to throw.
// Only HashedFileWriter sets the underlying fstream to throw.
LOG(FATAL) << fmt::format("Unable to write to '{}'", output_path);
}
}