Allow inheritance of storage durability decoder
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2810
This commit is contained in:
parent
63de0b5db4
commit
c12a87e9ca
@ -55,8 +55,26 @@ class Encoder final : public BaseEncoder {
|
||||
utils::OutputFile file_;
|
||||
};
|
||||
|
||||
/// Decoder interface class. Used to implement streams from different sources
|
||||
/// (e.g. file and network).
|
||||
class BaseDecoder {
|
||||
protected:
|
||||
~BaseDecoder() {}
|
||||
|
||||
public:
|
||||
virtual std::optional<Marker> ReadMarker() = 0;
|
||||
virtual std::optional<bool> ReadBool() = 0;
|
||||
virtual std::optional<uint64_t> ReadUint() = 0;
|
||||
virtual std::optional<double> ReadDouble() = 0;
|
||||
virtual std::optional<std::string> ReadString() = 0;
|
||||
virtual std::optional<PropertyValue> ReadPropertyValue() = 0;
|
||||
|
||||
virtual bool SkipString() = 0;
|
||||
virtual bool SkipPropertyValue() = 0;
|
||||
};
|
||||
|
||||
/// Decoder that is used to read a generated snapshot/WAL.
|
||||
class Decoder final {
|
||||
class Decoder final : public BaseDecoder {
|
||||
public:
|
||||
std::optional<uint64_t> Initialize(const std::filesystem::path &path,
|
||||
const std::string &magic);
|
||||
@ -68,15 +86,15 @@ class Decoder final {
|
||||
|
||||
std::optional<Marker> PeekMarker();
|
||||
|
||||
std::optional<Marker> ReadMarker();
|
||||
std::optional<bool> ReadBool();
|
||||
std::optional<uint64_t> ReadUint();
|
||||
std::optional<double> ReadDouble();
|
||||
std::optional<std::string> ReadString();
|
||||
std::optional<PropertyValue> ReadPropertyValue();
|
||||
std::optional<Marker> ReadMarker() override;
|
||||
std::optional<bool> ReadBool() override;
|
||||
std::optional<uint64_t> ReadUint() override;
|
||||
std::optional<double> ReadDouble() override;
|
||||
std::optional<std::string> ReadString() override;
|
||||
std::optional<PropertyValue> ReadPropertyValue() override;
|
||||
|
||||
bool SkipString();
|
||||
bool SkipPropertyValue();
|
||||
bool SkipString() override;
|
||||
bool SkipPropertyValue() override;
|
||||
|
||||
std::optional<uint64_t> GetSize();
|
||||
std::optional<uint64_t> GetPosition();
|
||||
|
@ -211,69 +211,69 @@ bool IsWalDeltaDataTypeTransactionEnd(WalDeltaData::Type type) {
|
||||
// be used.
|
||||
// @throw RecoveryFailure
|
||||
template <bool read_data>
|
||||
WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
WalDeltaData ReadSkipWalDeltaData(BaseDecoder *decoder) {
|
||||
WalDeltaData delta;
|
||||
|
||||
auto action = wal->ReadMarker();
|
||||
auto action = decoder->ReadMarker();
|
||||
if (!action) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.type = MarkerToWalDeltaDataType(*action);
|
||||
|
||||
switch (delta.type) {
|
||||
case WalDeltaData::Type::VERTEX_CREATE:
|
||||
case WalDeltaData::Type::VERTEX_DELETE: {
|
||||
auto gid = wal->ReadUint();
|
||||
auto gid = decoder->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_create_delete.gid = Gid::FromUint(*gid);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_ADD_LABEL:
|
||||
case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
|
||||
auto gid = wal->ReadUint();
|
||||
auto gid = decoder->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_add_remove_label.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
auto label = decoder->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_add_remove_label.label = std::move(*label);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
|
||||
auto gid = wal->ReadUint();
|
||||
auto gid = decoder->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto property = wal->ReadString();
|
||||
auto property = decoder->ReadString();
|
||||
if (!property) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.property = std::move(*property);
|
||||
auto value = wal->ReadPropertyValue();
|
||||
auto value = decoder->ReadPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.value = std::move(*value);
|
||||
} else {
|
||||
if (!wal->SkipString() || !wal->SkipPropertyValue())
|
||||
if (!decoder->SkipString() || !decoder->SkipPropertyValue())
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_CREATE:
|
||||
case WalDeltaData::Type::EDGE_DELETE: {
|
||||
auto gid = wal->ReadUint();
|
||||
auto gid = decoder->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto edge_type = wal->ReadString();
|
||||
auto edge_type = decoder->ReadString();
|
||||
if (!edge_type) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.edge_type = std::move(*edge_type);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
auto from_gid = wal->ReadUint();
|
||||
auto from_gid = decoder->ReadUint();
|
||||
if (!from_gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.from_vertex = Gid::FromUint(*from_gid);
|
||||
auto to_gid = wal->ReadUint();
|
||||
auto to_gid = decoder->ReadUint();
|
||||
if (!to_gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.to_vertex = Gid::FromUint(*to_gid);
|
||||
break;
|
||||
@ -283,11 +283,11 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
case WalDeltaData::Type::LABEL_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_INDEX_DROP: {
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
auto label = decoder->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label.label = std::move(*label);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -296,14 +296,14 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
auto label = decoder->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_property.label = std::move(*label);
|
||||
auto property = wal->ReadString();
|
||||
auto property = decoder->ReadString();
|
||||
if (!property) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_property.property = std::move(*property);
|
||||
} else {
|
||||
if (!wal->SkipString() || !wal->SkipString())
|
||||
if (!decoder->SkipString() || !decoder->SkipString())
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
@ -311,23 +311,24 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
|
||||
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
auto label = decoder->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_properties.label = std::move(*label);
|
||||
auto properties_count = wal->ReadUint();
|
||||
auto properties_count = decoder->ReadUint();
|
||||
if (!properties_count) throw RecoveryFailure("Invalid WAL data!");
|
||||
for (uint64_t i = 0; i < *properties_count; ++i) {
|
||||
auto property = wal->ReadString();
|
||||
auto property = decoder->ReadString();
|
||||
if (!property) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_properties.properties.emplace(
|
||||
std::move(*property));
|
||||
}
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
auto properties_count = wal->ReadUint();
|
||||
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
auto properties_count = decoder->ReadUint();
|
||||
if (!properties_count) throw RecoveryFailure("Invalid WAL data!");
|
||||
for (uint64_t i = 0; i < *properties_count; ++i) {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
if (!decoder->SkipString())
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -493,26 +494,26 @@ bool operator!=(const WalDeltaData &a, const WalDeltaData &b) {
|
||||
|
||||
// Function used to read the WAL delta header. The function returns the delta
|
||||
// timestamp.
|
||||
uint64_t ReadWalDeltaHeader(Decoder *wal) {
|
||||
auto marker = wal->ReadMarker();
|
||||
uint64_t ReadWalDeltaHeader(BaseDecoder *decoder) {
|
||||
auto marker = decoder->ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_DELTA)
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
|
||||
auto timestamp = wal->ReadUint();
|
||||
auto timestamp = decoder->ReadUint();
|
||||
if (!timestamp) throw RecoveryFailure("Invalid WAL data!");
|
||||
return *timestamp;
|
||||
}
|
||||
|
||||
// Function used to read the current WAL delta data. The WAL delta header must
|
||||
// be read before calling this function.
|
||||
WalDeltaData ReadWalDeltaData(Decoder *wal) {
|
||||
return ReadSkipWalDeltaData<true>(wal);
|
||||
WalDeltaData ReadWalDeltaData(BaseDecoder *decoder) {
|
||||
return ReadSkipWalDeltaData<true>(decoder);
|
||||
}
|
||||
|
||||
// Function used to skip the current WAL delta data. The WAL delta header must
|
||||
// be read before calling this function.
|
||||
WalDeltaData::Type SkipWalDeltaData(Decoder *wal) {
|
||||
auto delta = ReadSkipWalDeltaData<false>(wal);
|
||||
WalDeltaData::Type SkipWalDeltaData(BaseDecoder *decoder) {
|
||||
auto delta = ReadSkipWalDeltaData<false>(decoder);
|
||||
return delta.type;
|
||||
}
|
||||
|
||||
|
@ -113,19 +113,19 @@ WalInfo ReadWalInfo(const std::filesystem::path &path);
|
||||
/// Function used to read the WAL delta header. The function returns the delta
|
||||
/// timestamp.
|
||||
/// @throw RecoveryFailure
|
||||
uint64_t ReadWalDeltaHeader(Decoder *wal);
|
||||
uint64_t ReadWalDeltaHeader(BaseDecoder *decoder);
|
||||
|
||||
/// Function used to read the current WAL delta data. The function returns the
|
||||
/// read delta data. The WAL delta header must be read before calling this
|
||||
/// function.
|
||||
/// @throw RecoveryFailure
|
||||
WalDeltaData ReadWalDeltaData(Decoder *wal);
|
||||
WalDeltaData ReadWalDeltaData(BaseDecoder *decoder);
|
||||
|
||||
/// Function used to skip the current WAL delta data. The function returns the
|
||||
/// skipped delta type. The WAL delta header must be read before calling this
|
||||
/// function.
|
||||
/// @throw RecoveryFailure
|
||||
WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
|
||||
WalDeltaData::Type SkipWalDeltaData(BaseDecoder *decoder);
|
||||
|
||||
/// Function used to encode a `Delta` that originated from a `Vertex`.
|
||||
void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
|
Loading…
Reference in New Issue
Block a user