Allow inheritance of storage durability encoder
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2809
This commit is contained in:
parent
dd9180da32
commit
63de0b5db4
@ -12,8 +12,23 @@
|
||||
|
||||
namespace storage::durability {
|
||||
|
||||
/// Encoder interface class. Used to implement streams to different targets
|
||||
/// (e.g. file and network).
|
||||
class BaseEncoder {
|
||||
protected:
|
||||
~BaseEncoder() {}
|
||||
|
||||
public:
|
||||
virtual void WriteMarker(Marker marker) = 0;
|
||||
virtual void WriteBool(bool value) = 0;
|
||||
virtual void WriteUint(uint64_t value) = 0;
|
||||
virtual void WriteDouble(double value) = 0;
|
||||
virtual void WriteString(const std::string_view &value) = 0;
|
||||
virtual void WritePropertyValue(const PropertyValue &value) = 0;
|
||||
};
|
||||
|
||||
/// Encoder that is used to generate a snapshot/WAL.
|
||||
class Encoder final {
|
||||
class Encoder final : public BaseEncoder {
|
||||
public:
|
||||
void Initialize(const std::filesystem::path &path,
|
||||
const std::string_view &magic, uint64_t version);
|
||||
@ -22,12 +37,12 @@ class Encoder final {
|
||||
// directly.
|
||||
void Write(const uint8_t *data, uint64_t size);
|
||||
|
||||
void WriteMarker(Marker marker);
|
||||
void WriteBool(bool value);
|
||||
void WriteUint(uint64_t value);
|
||||
void WriteDouble(double value);
|
||||
void WriteString(const std::string_view &value);
|
||||
void WritePropertyValue(const PropertyValue &value);
|
||||
void WriteMarker(Marker marker) override;
|
||||
void WriteBool(bool value) override;
|
||||
void WriteUint(uint64_t value) override;
|
||||
void WriteDouble(double value) override;
|
||||
void WriteString(const std::string_view &value) override;
|
||||
void WritePropertyValue(const PropertyValue &value) override;
|
||||
|
||||
uint64_t GetPosition();
|
||||
void SetPosition(uint64_t position);
|
||||
|
@ -516,6 +516,151 @@ WalDeltaData::Type SkipWalDeltaData(Decoder *wal) {
|
||||
return delta.type;
|
||||
}
|
||||
|
||||
void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
Config::Items items, const Delta &delta, const Vertex &vertex,
|
||||
uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
encoder->WriteMarker(Marker::SECTION_DELTA);
|
||||
encoder->WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
encoder->WriteMarker(VertexActionToMarker(delta.action));
|
||||
encoder->WriteUint(vertex.gid.AsUint());
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
encoder->WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
|
||||
encoder->WriteUint(vertex.gid.AsUint());
|
||||
encoder->WriteString(
|
||||
name_id_mapper->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// vertex.
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
encoder->WritePropertyValue(
|
||||
vertex.properties.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
encoder->WriteMarker(VertexActionToMarker(delta.action));
|
||||
encoder->WriteUint(vertex.gid.AsUint());
|
||||
encoder->WriteString(name_id_mapper->IdToName(delta.label.AsUint()));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
encoder->WriteMarker(VertexActionToMarker(delta.action));
|
||||
if (items.properties_on_edges) {
|
||||
encoder->WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint());
|
||||
} else {
|
||||
encoder->WriteUint(delta.vertex_edge.edge.gid.AsUint());
|
||||
}
|
||||
encoder->WriteString(
|
||||
name_id_mapper->IdToName(delta.vertex_edge.edge_type.AsUint()));
|
||||
encoder->WriteUint(vertex.gid.AsUint());
|
||||
encoder->WriteUint(delta.vertex_edge.vertex->gid.AsUint());
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
// These actions are already encoded in the *_OUT_EDGE actions. This
|
||||
// function should never be called for this type of deltas.
|
||||
LOG(FATAL) << "Invalid delta action!";
|
||||
}
|
||||
}
|
||||
|
||||
void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
const Delta &delta, const Edge &edge, uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
encoder->WriteMarker(Marker::SECTION_DELTA);
|
||||
encoder->WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(edge.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
|
||||
encoder->WriteUint(edge.gid.AsUint());
|
||||
encoder->WriteString(
|
||||
name_id_mapper->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// edge.
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
encoder->WritePropertyValue(
|
||||
edge.properties.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
// These actions are already encoded in vertex *_OUT_EDGE actions. Also,
|
||||
// these deltas don't contain any information about the from vertex, to
|
||||
// vertex or edge type so they are useless. This function should never
|
||||
// be called for this type of deltas.
|
||||
LOG(FATAL) << "Invalid delta action!";
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
// These deltas shouldn't appear for edges.
|
||||
LOG(FATAL) << "Invalid database state!";
|
||||
}
|
||||
}
|
||||
|
||||
void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp) {
|
||||
encoder->WriteMarker(Marker::SECTION_DELTA);
|
||||
encoder->WriteUint(timestamp);
|
||||
encoder->WriteMarker(Marker::DELTA_TRANSACTION_END);
|
||||
}
|
||||
|
||||
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
StorageGlobalOperation operation, LabelId label,
|
||||
const std::set<PropertyId> &properties,
|
||||
uint64_t timestamp) {
|
||||
encoder->WriteMarker(Marker::SECTION_DELTA);
|
||||
encoder->WriteUint(timestamp);
|
||||
switch (operation) {
|
||||
case StorageGlobalOperation::LABEL_INDEX_CREATE:
|
||||
case StorageGlobalOperation::LABEL_INDEX_DROP: {
|
||||
CHECK(properties.empty()) << "Invalid function call!";
|
||||
encoder->WriteMarker(OperationToMarker(operation));
|
||||
encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
|
||||
break;
|
||||
}
|
||||
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
|
||||
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: {
|
||||
CHECK(properties.size() == 1) << "Invalid function call!";
|
||||
encoder->WriteMarker(OperationToMarker(operation));
|
||||
encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
|
||||
encoder->WriteString(
|
||||
name_id_mapper->IdToName((*properties.begin()).AsUint()));
|
||||
break;
|
||||
}
|
||||
case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
|
||||
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: {
|
||||
CHECK(!properties.empty()) << "Invalid function call!";
|
||||
encoder->WriteMarker(OperationToMarker(operation));
|
||||
encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
|
||||
encoder->WriteUint(properties.size());
|
||||
for (const auto &property : properties) {
|
||||
encoder->WriteString(name_id_mapper->IdToName(property.AsUint()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RecoveryInfo LoadWal(const std::filesystem::path &path,
|
||||
RecoveredIndicesAndConstraints *indices_constraints,
|
||||
std::optional<uint64_t> snapshot_timestamp,
|
||||
@ -884,145 +1029,26 @@ WalFile::~WalFile() {
|
||||
|
||||
void WalFile::AppendDelta(const Delta &delta, const Vertex &vertex,
|
||||
uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
wal_.WriteMarker(VertexActionToMarker(delta.action));
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
wal_.WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// vertex.
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
wal_.WritePropertyValue(
|
||||
vertex.properties.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
wal_.WriteMarker(VertexActionToMarker(delta.action));
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.label.AsUint()));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
wal_.WriteMarker(VertexActionToMarker(delta.action));
|
||||
if (items_.properties_on_edges) {
|
||||
wal_.WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint());
|
||||
} else {
|
||||
wal_.WriteUint(delta.vertex_edge.edge.gid.AsUint());
|
||||
}
|
||||
wal_.WriteString(
|
||||
name_id_mapper_->IdToName(delta.vertex_edge.edge_type.AsUint()));
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteUint(delta.vertex_edge.vertex->gid.AsUint());
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
// These actions are already encoded in the *_OUT_EDGE actions. This
|
||||
// function should never be called for this type of deltas.
|
||||
LOG(FATAL) << "Invalid delta action!";
|
||||
}
|
||||
EncodeDelta(&wal_, name_id_mapper_, items_, delta, vertex, timestamp);
|
||||
UpdateStats(timestamp);
|
||||
}
|
||||
|
||||
void WalFile::AppendDelta(const Delta &delta, const Edge &edge,
|
||||
uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(edge.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
wal_.WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
|
||||
wal_.WriteUint(edge.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// edge.
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
wal_.WritePropertyValue(edge.properties.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
// These actions are already encoded in vertex *_OUT_EDGE actions. Also,
|
||||
// these deltas don't contain any information about the from vertex, to
|
||||
// vertex or edge type so they are useless. This function should never
|
||||
// be called for this type of deltas.
|
||||
LOG(FATAL) << "Invalid delta action!";
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
// These deltas shouldn't appear for edges.
|
||||
LOG(FATAL) << "Invalid database state!";
|
||||
}
|
||||
EncodeDelta(&wal_, name_id_mapper_, delta, edge, timestamp);
|
||||
UpdateStats(timestamp);
|
||||
}
|
||||
|
||||
void WalFile::AppendTransactionEnd(uint64_t timestamp) {
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
wal_.WriteMarker(Marker::DELTA_TRANSACTION_END);
|
||||
EncodeTransactionEnd(&wal_, timestamp);
|
||||
UpdateStats(timestamp);
|
||||
}
|
||||
|
||||
void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label,
|
||||
const std::set<PropertyId> &properties,
|
||||
uint64_t timestamp) {
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
switch (operation) {
|
||||
case StorageGlobalOperation::LABEL_INDEX_CREATE:
|
||||
case StorageGlobalOperation::LABEL_INDEX_DROP: {
|
||||
CHECK(properties.empty()) << "Invalid function call!";
|
||||
wal_.WriteMarker(OperationToMarker(operation));
|
||||
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
|
||||
break;
|
||||
}
|
||||
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
|
||||
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: {
|
||||
CHECK(properties.size() == 1) << "Invalid function call!";
|
||||
wal_.WriteMarker(OperationToMarker(operation));
|
||||
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
|
||||
wal_.WriteString(
|
||||
name_id_mapper_->IdToName((*properties.begin()).AsUint()));
|
||||
break;
|
||||
}
|
||||
case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
|
||||
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: {
|
||||
CHECK(!properties.empty()) << "Invalid function call!";
|
||||
wal_.WriteMarker(OperationToMarker(operation));
|
||||
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
|
||||
wal_.WriteUint(properties.size());
|
||||
for (const auto &property : properties) {
|
||||
wal_.WriteString(name_id_mapper_->IdToName(property.AsUint()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
EncodeOperation(&wal_, name_id_mapper_, operation, label, properties,
|
||||
timestamp);
|
||||
UpdateStats(timestamp);
|
||||
}
|
||||
|
||||
|
@ -127,6 +127,24 @@ WalDeltaData ReadWalDeltaData(Decoder *wal);
|
||||
/// @throw RecoveryFailure
|
||||
WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
|
||||
|
||||
/// Function used to encode a `Delta` that originated from a `Vertex`.
|
||||
void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
Config::Items items, const Delta &delta, const Vertex &vertex,
|
||||
uint64_t timestamp);
|
||||
|
||||
/// Function used to encode a `Delta` that originated from an `Edge`.
|
||||
void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
const Delta &delta, const Edge &edge, uint64_t timestamp);
|
||||
|
||||
/// Function used to encode the transaction end.
|
||||
void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp);
|
||||
|
||||
/// Function used to encode non-transactional operation.
|
||||
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
|
||||
StorageGlobalOperation operation, LabelId label,
|
||||
const std::set<PropertyId> &properties,
|
||||
uint64_t timestamp);
|
||||
|
||||
/// Function used to load the WAL data into the storage.
|
||||
/// @throw RecoveryFailure
|
||||
RecoveryInfo LoadWal(const std::filesystem::path &path,
|
||||
|
Loading…
Reference in New Issue
Block a user