Implement single function for WAL loading
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2513
This commit is contained in:
parent
4c6eb0746e
commit
2e0586e182
@ -434,10 +434,6 @@ bool Decoder::SetPosition(uint64_t position) {
|
||||
/////////////////////////////
|
||||
|
||||
namespace {
|
||||
// Magic values written to the start of a snapshot/WAL file to identify it.
|
||||
const std::string kSnapshotMagic{"MGsn"};
|
||||
const std::string kWalMagic{"MGwl"};
|
||||
|
||||
// The current version of snapshot and WAL encoding / decoding.
|
||||
// IMPORTANT: Please bump this version for every snapshot and/or WAL format
|
||||
// change!!!
|
||||
@ -614,6 +610,204 @@ Marker VertexActionToMarker(Delta::Action action) {
|
||||
return Marker::DELTA_EDGE_CREATE;
|
||||
}
|
||||
}
|
||||
|
||||
// This function convertes a Marker to a WalDeltaData::Type. It checks for the
|
||||
// validity of the marker and throws if an invalid marker is specified.
|
||||
// @throw RecoveryFailure
|
||||
WalDeltaData::Type MarkerToWalDeltaDataType(Marker marker) {
|
||||
switch (marker) {
|
||||
case Marker::DELTA_VERTEX_CREATE:
|
||||
return WalDeltaData::Type::VERTEX_CREATE;
|
||||
case Marker::DELTA_VERTEX_DELETE:
|
||||
return WalDeltaData::Type::VERTEX_DELETE;
|
||||
case Marker::DELTA_VERTEX_ADD_LABEL:
|
||||
return WalDeltaData::Type::VERTEX_ADD_LABEL;
|
||||
case Marker::DELTA_VERTEX_REMOVE_LABEL:
|
||||
return WalDeltaData::Type::VERTEX_REMOVE_LABEL;
|
||||
case Marker::DELTA_EDGE_CREATE:
|
||||
return WalDeltaData::Type::EDGE_CREATE;
|
||||
case Marker::DELTA_EDGE_DELETE:
|
||||
return WalDeltaData::Type::EDGE_DELETE;
|
||||
case Marker::DELTA_VERTEX_SET_PROPERTY:
|
||||
return WalDeltaData::Type::VERTEX_SET_PROPERTY;
|
||||
case Marker::DELTA_EDGE_SET_PROPERTY:
|
||||
return WalDeltaData::Type::EDGE_SET_PROPERTY;
|
||||
case Marker::DELTA_TRANSACTION_END:
|
||||
return WalDeltaData::Type::TRANSACTION_END;
|
||||
case Marker::DELTA_LABEL_INDEX_CREATE:
|
||||
return WalDeltaData::Type::LABEL_INDEX_CREATE;
|
||||
case Marker::DELTA_LABEL_INDEX_DROP:
|
||||
return WalDeltaData::Type::LABEL_INDEX_DROP;
|
||||
case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
|
||||
return WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE;
|
||||
case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
|
||||
return WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP;
|
||||
case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
|
||||
return WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE;
|
||||
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
|
||||
return WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP;
|
||||
|
||||
case Marker::TYPE_NULL:
|
||||
case Marker::TYPE_BOOL:
|
||||
case Marker::TYPE_INT:
|
||||
case Marker::TYPE_DOUBLE:
|
||||
case Marker::TYPE_STRING:
|
||||
case Marker::TYPE_LIST:
|
||||
case Marker::TYPE_MAP:
|
||||
case Marker::TYPE_PROPERTY_VALUE:
|
||||
case Marker::SECTION_VERTEX:
|
||||
case Marker::SECTION_EDGE:
|
||||
case Marker::SECTION_MAPPER:
|
||||
case Marker::SECTION_METADATA:
|
||||
case Marker::SECTION_INDICES:
|
||||
case Marker::SECTION_CONSTRAINTS:
|
||||
case Marker::SECTION_DELTA:
|
||||
case Marker::SECTION_OFFSETS:
|
||||
case Marker::VALUE_FALSE:
|
||||
case Marker::VALUE_TRUE:
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
}
|
||||
|
||||
bool IsWalDeltaDataTypeTransactionEnd(WalDeltaData::Type type) {
|
||||
switch (type) {
|
||||
// These delta actions are all found inside transactions so they don't
|
||||
// indicate a transaction end.
|
||||
case WalDeltaData::Type::VERTEX_CREATE:
|
||||
case WalDeltaData::Type::VERTEX_DELETE:
|
||||
case WalDeltaData::Type::VERTEX_ADD_LABEL:
|
||||
case WalDeltaData::Type::VERTEX_REMOVE_LABEL:
|
||||
case WalDeltaData::Type::EDGE_CREATE:
|
||||
case WalDeltaData::Type::EDGE_DELETE:
|
||||
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY:
|
||||
return false;
|
||||
|
||||
// This delta explicitly indicates that a transaction is done.
|
||||
case WalDeltaData::Type::TRANSACTION_END:
|
||||
return true;
|
||||
|
||||
// These operations aren't transactional and they are encoded only using
|
||||
// a single delta, so they each individually mark the end of their
|
||||
// 'transaction'.
|
||||
case WalDeltaData::Type::LABEL_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_INDEX_DROP:
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Function used to either read or skip the current WAL delta data. The WAL
|
||||
// delta header must be read before calling this function. If the delta data is
|
||||
// read then the data returned is valid, if the delta data is skipped then the
|
||||
// returned data is not guaranteed to be set (it could be empty) and shouldn't
|
||||
// be used.
|
||||
// @throw RecoveryFailure
|
||||
template <bool read_data>
|
||||
WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
WalDeltaData delta;
|
||||
|
||||
auto action = wal->ReadMarker();
|
||||
if (!action) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.type = MarkerToWalDeltaDataType(*action);
|
||||
|
||||
switch (delta.type) {
|
||||
case WalDeltaData::Type::VERTEX_CREATE:
|
||||
case WalDeltaData::Type::VERTEX_DELETE: {
|
||||
auto gid = wal->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_create_delete.gid = Gid::FromUint(*gid);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_ADD_LABEL:
|
||||
case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
|
||||
auto gid = wal->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_add_remove_label.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_add_remove_label.label = std::move(*label);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
|
||||
auto gid = wal->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto property = wal->ReadString();
|
||||
if (!property) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.property = std::move(*property);
|
||||
auto value = wal->ReadPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.vertex_edge_set_property.value = std::move(*value);
|
||||
} else {
|
||||
if (!wal->SkipString() || !wal->SkipPropertyValue())
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_CREATE:
|
||||
case WalDeltaData::Type::EDGE_DELETE: {
|
||||
auto gid = wal->ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.gid = Gid::FromUint(*gid);
|
||||
if constexpr (read_data) {
|
||||
auto edge_type = wal->ReadString();
|
||||
if (!edge_type) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.edge_type = std::move(*edge_type);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
auto from_gid = wal->ReadUint();
|
||||
if (!from_gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.from_vertex = Gid::FromUint(*from_gid);
|
||||
auto to_gid = wal->ReadUint();
|
||||
if (!to_gid) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.edge_create_delete.to_vertex = Gid::FromUint(*to_gid);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::TRANSACTION_END:
|
||||
break;
|
||||
case WalDeltaData::Type::LABEL_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_INDEX_DROP: {
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label.label = std::move(*label);
|
||||
} else {
|
||||
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
|
||||
if constexpr (read_data) {
|
||||
auto label = wal->ReadString();
|
||||
if (!label) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_property.label = std::move(*label);
|
||||
auto property = wal->ReadString();
|
||||
if (!property) throw RecoveryFailure("Invalid WAL data!");
|
||||
delta.operation_label_property.property = std::move(*property);
|
||||
} else {
|
||||
if (!wal->SkipString() || !wal->SkipString())
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return delta;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// Function used to read information about the snapshot file.
|
||||
@ -737,87 +931,13 @@ WalInfo ReadWalInfo(const std::filesystem::path &path) {
|
||||
// Read deltas.
|
||||
info.num_deltas = 0;
|
||||
auto validate_delta = [&wal]() -> std::optional<std::pair<uint64_t, bool>> {
|
||||
auto marker = wal.ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_DELTA) return std::nullopt;
|
||||
|
||||
auto timestamp = wal.ReadUint();
|
||||
if (!timestamp) return std::nullopt;
|
||||
|
||||
auto action = wal.ReadMarker();
|
||||
if (!action) return std::nullopt;
|
||||
|
||||
bool is_transaction_end;
|
||||
switch (*action) {
|
||||
// These delta actions are all found inside transactions so they don't
|
||||
// indicate a transaction end.
|
||||
case Marker::DELTA_VERTEX_CREATE:
|
||||
case Marker::DELTA_VERTEX_DELETE:
|
||||
if (!wal.ReadUint()) return std::nullopt;
|
||||
is_transaction_end = false;
|
||||
break;
|
||||
case Marker::DELTA_VERTEX_ADD_LABEL:
|
||||
case Marker::DELTA_VERTEX_REMOVE_LABEL:
|
||||
if (!wal.ReadUint() || !wal.SkipString()) return std::nullopt;
|
||||
is_transaction_end = false;
|
||||
break;
|
||||
case Marker::DELTA_EDGE_CREATE:
|
||||
case Marker::DELTA_EDGE_DELETE:
|
||||
if (!wal.ReadUint() || !wal.SkipString() || !wal.ReadUint() ||
|
||||
!wal.ReadUint())
|
||||
return std::nullopt;
|
||||
is_transaction_end = false;
|
||||
break;
|
||||
case Marker::DELTA_VERTEX_SET_PROPERTY:
|
||||
case Marker::DELTA_EDGE_SET_PROPERTY:
|
||||
if (!wal.ReadUint() || !wal.SkipString() || !wal.SkipPropertyValue())
|
||||
return std::nullopt;
|
||||
is_transaction_end = false;
|
||||
break;
|
||||
|
||||
// This delta explicitly indicates that a transaction is done.
|
||||
case Marker::DELTA_TRANSACTION_END:
|
||||
is_transaction_end = true;
|
||||
break;
|
||||
|
||||
// These operations aren't transactional and they are encoded only using
|
||||
// a single delta, so they each individually mark the end of their
|
||||
// 'transaction'.
|
||||
case Marker::DELTA_LABEL_INDEX_CREATE:
|
||||
case Marker::DELTA_LABEL_INDEX_DROP:
|
||||
if (!wal.SkipString()) return std::nullopt;
|
||||
is_transaction_end = true;
|
||||
break;
|
||||
case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
|
||||
case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
|
||||
case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
|
||||
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
|
||||
if (!wal.SkipString() || !wal.SkipString()) return std::nullopt;
|
||||
is_transaction_end = true;
|
||||
break;
|
||||
|
||||
// These markers aren't delta actions.
|
||||
case Marker::TYPE_NULL:
|
||||
case Marker::TYPE_BOOL:
|
||||
case Marker::TYPE_INT:
|
||||
case Marker::TYPE_DOUBLE:
|
||||
case Marker::TYPE_STRING:
|
||||
case Marker::TYPE_LIST:
|
||||
case Marker::TYPE_MAP:
|
||||
case Marker::TYPE_PROPERTY_VALUE:
|
||||
case Marker::SECTION_VERTEX:
|
||||
case Marker::SECTION_EDGE:
|
||||
case Marker::SECTION_MAPPER:
|
||||
case Marker::SECTION_METADATA:
|
||||
case Marker::SECTION_INDICES:
|
||||
case Marker::SECTION_CONSTRAINTS:
|
||||
case Marker::SECTION_DELTA:
|
||||
case Marker::SECTION_OFFSETS:
|
||||
case Marker::VALUE_FALSE:
|
||||
case Marker::VALUE_TRUE:
|
||||
try {
|
||||
auto timestamp = ReadWalDeltaHeader(&wal);
|
||||
auto type = SkipWalDeltaData(&wal);
|
||||
return {{timestamp, IsWalDeltaDataTypeTransactionEnd(type)}};
|
||||
} catch (const RecoveryFailure &) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return {{*timestamp, is_transaction_end}};
|
||||
};
|
||||
auto size = wal.GetSize();
|
||||
// Here we read the whole file and determine the number of valid deltas. A
|
||||
@ -855,6 +975,80 @@ WalInfo ReadWalInfo(const std::filesystem::path &path) {
|
||||
return info;
|
||||
}
|
||||
|
||||
bool operator==(const WalDeltaData &a, const WalDeltaData &b) {
|
||||
if (a.type != b.type) return false;
|
||||
switch (a.type) {
|
||||
case WalDeltaData::Type::VERTEX_CREATE:
|
||||
case WalDeltaData::Type::VERTEX_DELETE:
|
||||
return a.vertex_create_delete.gid == b.vertex_create_delete.gid;
|
||||
|
||||
case WalDeltaData::Type::VERTEX_ADD_LABEL:
|
||||
case WalDeltaData::Type::VERTEX_REMOVE_LABEL:
|
||||
return a.vertex_add_remove_label.gid == b.vertex_add_remove_label.gid &&
|
||||
a.vertex_add_remove_label.label == b.vertex_add_remove_label.label;
|
||||
|
||||
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY:
|
||||
return a.vertex_edge_set_property.gid == b.vertex_edge_set_property.gid &&
|
||||
a.vertex_edge_set_property.property ==
|
||||
b.vertex_edge_set_property.property &&
|
||||
a.vertex_edge_set_property.value ==
|
||||
b.vertex_edge_set_property.value;
|
||||
|
||||
case WalDeltaData::Type::EDGE_CREATE:
|
||||
case WalDeltaData::Type::EDGE_DELETE:
|
||||
return a.edge_create_delete.gid == b.edge_create_delete.gid &&
|
||||
a.edge_create_delete.edge_type == b.edge_create_delete.edge_type &&
|
||||
a.edge_create_delete.from_vertex ==
|
||||
b.edge_create_delete.from_vertex &&
|
||||
a.edge_create_delete.to_vertex == b.edge_create_delete.to_vertex;
|
||||
|
||||
case WalDeltaData::Type::TRANSACTION_END:
|
||||
return true;
|
||||
|
||||
case WalDeltaData::Type::LABEL_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_INDEX_DROP:
|
||||
return a.operation_label.label == b.operation_label.label;
|
||||
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
|
||||
return a.operation_label_property.label ==
|
||||
b.operation_label_property.label &&
|
||||
a.operation_label_property.property ==
|
||||
b.operation_label_property.property;
|
||||
}
|
||||
}
|
||||
bool operator!=(const WalDeltaData &a, const WalDeltaData &b) {
|
||||
return !(a == b);
|
||||
}
|
||||
|
||||
// Function used to read the WAL delta header. The function returns the delta
|
||||
// timestamp.
|
||||
uint64_t ReadWalDeltaHeader(Decoder *wal) {
|
||||
auto marker = wal->ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_DELTA)
|
||||
throw RecoveryFailure("Invalid WAL data!");
|
||||
|
||||
auto timestamp = wal->ReadUint();
|
||||
if (!timestamp) throw RecoveryFailure("Invalid WAL data!");
|
||||
return *timestamp;
|
||||
}
|
||||
|
||||
// Function used to either read the current WAL delta data. The WAL delta header
|
||||
// must be read before calling this function.
|
||||
WalDeltaData ReadWalDeltaData(Decoder *wal) {
|
||||
return ReadSkipWalDeltaData<true>(wal);
|
||||
}
|
||||
|
||||
// Function used to either skip the current WAL delta data. The WAL delta header
|
||||
// must be read before calling this function.
|
||||
WalDeltaData::Type SkipWalDeltaData(Decoder *wal) {
|
||||
auto delta = ReadSkipWalDeltaData<false>(wal);
|
||||
return delta.type;
|
||||
}
|
||||
|
||||
WalFile::WalFile(const std::filesystem::path &wal_directory,
|
||||
const std::string &uuid, Config::Items items,
|
||||
NameIdMapper *name_id_mapper, uint64_t seq_num)
|
||||
|
@ -27,6 +27,10 @@ namespace storage {
|
||||
static const std::string kSnapshotDirectory{"snapshots"};
|
||||
static const std::string kWalDirectory{"wal"};
|
||||
|
||||
// Magic values written to the start of a snapshot/WAL file to identify it.
|
||||
const std::string kSnapshotMagic{"MGsn"};
|
||||
const std::string kWalMagic{"MGwl"};
|
||||
|
||||
static_assert(std::is_same_v<uint8_t, unsigned char>);
|
||||
|
||||
/// Markers that are used to indicate crucial parts of the snapshot/WAL.
|
||||
@ -208,6 +212,80 @@ struct WalInfo {
|
||||
/// @throw RecoveryFailure
|
||||
WalInfo ReadWalInfo(const std::filesystem::path &path);
|
||||
|
||||
/// Structure used to return loaded WAL delta data.
|
||||
struct WalDeltaData {
|
||||
enum class Type {
|
||||
VERTEX_CREATE,
|
||||
VERTEX_DELETE,
|
||||
VERTEX_ADD_LABEL,
|
||||
VERTEX_REMOVE_LABEL,
|
||||
VERTEX_SET_PROPERTY,
|
||||
EDGE_CREATE,
|
||||
EDGE_DELETE,
|
||||
EDGE_SET_PROPERTY,
|
||||
TRANSACTION_END,
|
||||
LABEL_INDEX_CREATE,
|
||||
LABEL_INDEX_DROP,
|
||||
LABEL_PROPERTY_INDEX_CREATE,
|
||||
LABEL_PROPERTY_INDEX_DROP,
|
||||
EXISTENCE_CONSTRAINT_CREATE,
|
||||
EXISTENCE_CONSTRAINT_DROP,
|
||||
};
|
||||
|
||||
Type type{Type::TRANSACTION_END};
|
||||
|
||||
struct {
|
||||
Gid gid;
|
||||
} vertex_create_delete;
|
||||
|
||||
struct {
|
||||
Gid gid;
|
||||
std::string label;
|
||||
} vertex_add_remove_label;
|
||||
|
||||
struct {
|
||||
Gid gid;
|
||||
std::string property;
|
||||
PropertyValue value;
|
||||
} vertex_edge_set_property;
|
||||
|
||||
struct {
|
||||
Gid gid;
|
||||
std::string edge_type;
|
||||
Gid from_vertex;
|
||||
Gid to_vertex;
|
||||
} edge_create_delete;
|
||||
|
||||
struct {
|
||||
std::string label;
|
||||
} operation_label;
|
||||
|
||||
struct {
|
||||
std::string label;
|
||||
std::string property;
|
||||
} operation_label_property;
|
||||
};
|
||||
|
||||
bool operator==(const WalDeltaData &a, const WalDeltaData &b);
|
||||
bool operator!=(const WalDeltaData &a, const WalDeltaData &b);
|
||||
|
||||
/// Function used to read the WAL delta header. The function returns the delta
|
||||
/// timestamp.
|
||||
/// @throw RecoveryFailure
|
||||
uint64_t ReadWalDeltaHeader(Decoder *wal);
|
||||
|
||||
/// Function used to either read the current WAL delta data. The function
|
||||
/// returns the read delta data. The WAL delta header must be read before
|
||||
/// calling this function.
|
||||
/// @throw RecoveryFailure
|
||||
WalDeltaData ReadWalDeltaData(Decoder *wal);
|
||||
|
||||
/// Function used to either skip the current WAL delta data. The function
|
||||
/// returns the skipped delta type. The WAL delta header must be read before
|
||||
/// calling this function.
|
||||
/// @throw RecoveryFailure
|
||||
WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
|
||||
|
||||
/// Enum used to indicate a global database operation that isn't transactional.
|
||||
enum class StorageGlobalOperation {
|
||||
LABEL_INDEX_CREATE,
|
||||
|
@ -12,6 +12,25 @@
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/uuid.hpp"
|
||||
|
||||
// Helper function used to convert between enum types.
|
||||
storage::WalDeltaData::Type StorageGlobalOperationToWalDeltaDataType(
|
||||
storage::StorageGlobalOperation operation) {
|
||||
switch (operation) {
|
||||
case storage::StorageGlobalOperation::LABEL_INDEX_CREATE:
|
||||
return storage::WalDeltaData::Type::LABEL_INDEX_CREATE;
|
||||
case storage::StorageGlobalOperation::LABEL_INDEX_DROP:
|
||||
return storage::WalDeltaData::Type::LABEL_INDEX_DROP;
|
||||
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
|
||||
return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE;
|
||||
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
|
||||
return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP;
|
||||
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
|
||||
return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE;
|
||||
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
|
||||
return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP;
|
||||
}
|
||||
}
|
||||
|
||||
// This class mimics the internals of the storage to generate the deltas.
|
||||
class DeltaGenerator final {
|
||||
public:
|
||||
@ -28,12 +47,24 @@ class DeltaGenerator final {
|
||||
auto delta = storage::CreateDeleteObjectDelta(&transaction_);
|
||||
auto &it = gen_->vertices_.emplace_back(gid, delta);
|
||||
delta->prev.Set(&it);
|
||||
{
|
||||
storage::WalDeltaData data;
|
||||
data.type = storage::WalDeltaData::Type::VERTEX_CREATE;
|
||||
data.vertex_create_delete.gid = gid;
|
||||
data_.push_back(data);
|
||||
}
|
||||
return ⁢
|
||||
}
|
||||
|
||||
void DeleteVertex(storage::Vertex *vertex) {
|
||||
storage::CreateAndLinkDelta(&transaction_, &*vertex,
|
||||
storage::Delta::RecreateObjectTag());
|
||||
{
|
||||
storage::WalDeltaData data;
|
||||
data.type = storage::WalDeltaData::Type::VERTEX_DELETE;
|
||||
data.vertex_create_delete.gid = vertex->gid;
|
||||
data_.push_back(data);
|
||||
}
|
||||
}
|
||||
|
||||
void AddLabel(storage::Vertex *vertex, const std::string &label) {
|
||||
@ -41,6 +72,13 @@ class DeltaGenerator final {
|
||||
vertex->labels.push_back(label_id);
|
||||
storage::CreateAndLinkDelta(&transaction_, &*vertex,
|
||||
storage::Delta::RemoveLabelTag(), label_id);
|
||||
{
|
||||
storage::WalDeltaData data;
|
||||
data.type = storage::WalDeltaData::Type::VERTEX_ADD_LABEL;
|
||||
data.vertex_add_remove_label.gid = vertex->gid;
|
||||
data.vertex_add_remove_label.label = label;
|
||||
data_.push_back(data);
|
||||
}
|
||||
}
|
||||
|
||||
void RemoveLabel(storage::Vertex *vertex, const std::string &label) {
|
||||
@ -49,6 +87,13 @@ class DeltaGenerator final {
|
||||
std::find(vertex->labels.begin(), vertex->labels.end(), label_id));
|
||||
storage::CreateAndLinkDelta(&transaction_, &*vertex,
|
||||
storage::Delta::AddLabelTag(), label_id);
|
||||
{
|
||||
storage::WalDeltaData data;
|
||||
data.type = storage::WalDeltaData::Type::VERTEX_REMOVE_LABEL;
|
||||
data.vertex_add_remove_label.gid = vertex->gid;
|
||||
data.vertex_add_remove_label.label = label;
|
||||
data_.push_back(data);
|
||||
}
|
||||
}
|
||||
|
||||
void SetProperty(storage::Vertex *vertex, const std::string &property,
|
||||
@ -74,6 +119,17 @@ class DeltaGenerator final {
|
||||
props.erase(it);
|
||||
}
|
||||
}
|
||||
{
|
||||
storage::WalDeltaData data;
|
||||
data.type = storage::WalDeltaData::Type::VERTEX_SET_PROPERTY;
|
||||
data.vertex_edge_set_property.gid = vertex->gid;
|
||||
data.vertex_edge_set_property.property = property;
|
||||
// We don't store the property value here. That is because the storage
|
||||
// generates multiple `SetProperty` deltas using only the final values
|
||||
// of the property. The intermediate values aren't encoded. The value is
|
||||
// later determined in the `Finalize` function.
|
||||
data_.push_back(data);
|
||||
}
|
||||
}
|
||||
|
||||
void Finalize(bool append_transaction_end = true) {
|
||||
@ -93,7 +149,33 @@ class DeltaGenerator final {
|
||||
}
|
||||
if (append_transaction_end) {
|
||||
gen_->wal_file_.AppendTransactionEnd(commit_timestamp);
|
||||
if (gen_->valid_) {
|
||||
gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1);
|
||||
for (auto &data : data_) {
|
||||
if (data.type == storage::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
|
||||
// We need to put the final property value into the SET_PROPERTY
|
||||
// delta.
|
||||
auto vertex =
|
||||
std::find(gen_->vertices_.begin(), gen_->vertices_.end(),
|
||||
data.vertex_edge_set_property.gid);
|
||||
ASSERT_NE(vertex, gen_->vertices_.end());
|
||||
auto property_id =
|
||||
storage::PropertyId::FromUint(gen_->mapper_.NameToId(
|
||||
data.vertex_edge_set_property.property));
|
||||
auto &props = vertex->properties;
|
||||
auto it = props.find(property_id);
|
||||
if (it == props.end()) {
|
||||
data.vertex_edge_set_property.value = storage::PropertyValue();
|
||||
} else {
|
||||
data.vertex_edge_set_property.value = it->second;
|
||||
}
|
||||
}
|
||||
gen_->data_.emplace_back(commit_timestamp, data);
|
||||
}
|
||||
storage::WalDeltaData data{
|
||||
.type = storage::WalDeltaData::Type::TRANSACTION_END};
|
||||
gen_->data_.emplace_back(commit_timestamp, data);
|
||||
}
|
||||
} else {
|
||||
gen_->valid_ = false;
|
||||
}
|
||||
@ -102,8 +184,11 @@ class DeltaGenerator final {
|
||||
private:
|
||||
DeltaGenerator *gen_;
|
||||
storage::Transaction transaction_;
|
||||
std::vector<storage::WalDeltaData> data_;
|
||||
};
|
||||
|
||||
using DataT = std::vector<std::pair<uint64_t, storage::WalDeltaData>>;
|
||||
|
||||
DeltaGenerator(const std::filesystem::path &data_directory,
|
||||
bool properties_on_edges, uint64_t seq_num)
|
||||
: uuid_(utils::GenerateUUID()),
|
||||
@ -129,7 +214,24 @@ class DeltaGenerator final {
|
||||
property_id = storage::PropertyId::FromUint(mapper_.NameToId(*property));
|
||||
}
|
||||
wal_file_.AppendOperation(operation, label_id, property_id, timestamp_);
|
||||
if (valid_) {
|
||||
UpdateStats(timestamp_, 1);
|
||||
storage::WalDeltaData data;
|
||||
data.type = StorageGlobalOperationToWalDeltaDataType(operation);
|
||||
switch (operation) {
|
||||
case storage::StorageGlobalOperation::LABEL_INDEX_CREATE:
|
||||
case storage::StorageGlobalOperation::LABEL_INDEX_DROP:
|
||||
data.operation_label.label = label;
|
||||
break;
|
||||
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
|
||||
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
|
||||
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
|
||||
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
|
||||
data.operation_label_property.label = label;
|
||||
data.operation_label_property.property = *property;
|
||||
}
|
||||
data_.emplace_back(timestamp_, data);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t GetPosition() { return wal_file_.GetSize(); }
|
||||
@ -144,9 +246,10 @@ class DeltaGenerator final {
|
||||
.num_deltas = deltas_count_};
|
||||
}
|
||||
|
||||
DataT GetData() { return data_; }
|
||||
|
||||
private:
|
||||
void UpdateStats(uint64_t timestamp, uint64_t count) {
|
||||
if (!valid_) return;
|
||||
if (deltas_count_ == 0) {
|
||||
tx_from_ = timestamp;
|
||||
}
|
||||
@ -165,6 +268,8 @@ class DeltaGenerator final {
|
||||
|
||||
storage::WalFile wal_file_;
|
||||
|
||||
DataT data_;
|
||||
|
||||
uint64_t deltas_count_{0};
|
||||
uint64_t tx_from_{0};
|
||||
uint64_t tx_to_{0};
|
||||
@ -191,6 +296,21 @@ void AssertWalInfoEqual(const storage::WalInfo &a, const storage::WalInfo &b) {
|
||||
ASSERT_EQ(a.num_deltas, b.num_deltas);
|
||||
}
|
||||
|
||||
void AssertWalDataEqual(const DeltaGenerator::DataT &data,
|
||||
const std::filesystem::path &path) {
|
||||
auto info = storage::ReadWalInfo(path);
|
||||
storage::Decoder wal;
|
||||
wal.Initialize(path, storage::kWalMagic);
|
||||
wal.SetPosition(info.offset_deltas);
|
||||
DeltaGenerator::DataT current;
|
||||
for (uint64_t i = 0; i < info.num_deltas; ++i) {
|
||||
auto timestamp = storage::ReadWalDeltaHeader(&wal);
|
||||
current.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
|
||||
}
|
||||
ASSERT_EQ(data.size(), current.size());
|
||||
ASSERT_EQ(data, current);
|
||||
}
|
||||
|
||||
class WalFileTest : public ::testing::TestWithParam<bool> {
|
||||
public:
|
||||
WalFileTest() {}
|
||||
@ -236,11 +356,13 @@ TEST_P(WalFileTest, EmptyFile) {
|
||||
#define GENERATE_SIMPLE_TEST(name, ops) \
|
||||
TEST_P(WalFileTest, name) { \
|
||||
storage::WalInfo info; \
|
||||
DeltaGenerator::DataT data; \
|
||||
\
|
||||
{ \
|
||||
DeltaGenerator gen(storage_directory, GetParam(), 5); \
|
||||
ops; \
|
||||
info = gen.GetInfo(); \
|
||||
data = gen.GetData(); \
|
||||
} \
|
||||
\
|
||||
auto wal_files = GetFilesList(); \
|
||||
@ -251,6 +373,7 @@ TEST_P(WalFileTest, EmptyFile) {
|
||||
storage::RecoveryFailure); \
|
||||
} else { \
|
||||
AssertWalInfoEqual(info, storage::ReadWalInfo(wal_files.front())); \
|
||||
AssertWalDataEqual(data, wal_files.front()); \
|
||||
} \
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user