diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt
index 2cc96690b..dffc93f28 100644
--- a/src/storage/v2/CMakeLists.txt
+++ b/src/storage/v2/CMakeLists.txt
@@ -1,4 +1,5 @@
 set(storage_v2_src_files
+    durability.cpp
     edge_accessor.cpp
     indices.cpp
     vertex_accessor.cpp
diff --git a/src/storage/v2/durability.cpp b/src/storage/v2/durability.cpp
new file mode 100644
index 000000000..aa69e7707
--- /dev/null
+++ b/src/storage/v2/durability.cpp
@@ -0,0 +1,377 @@
+#include "storage/v2/durability.hpp"
+
+#include "utils/endian.hpp"
+
+namespace storage::durability {
+
+namespace {
+std::optional<Marker> CastToMarker(uint8_t value) {
+  for (auto marker : kMarkersAll) {
+    if (static_cast<uint8_t>(marker) == value) {
+      return marker;
+    }
+  }
+  return std::nullopt;
+}
+
+void WriteSize(Encoder *encoder, uint64_t size) {
+  size = utils::HostToLittleEndian(size);
+  encoder->Write(reinterpret_cast<const uint8_t *>(&size), sizeof(size));
+}
+
+std::optional<uint64_t> ReadSize(Decoder *decoder) {
+  uint64_t size;
+  if (!decoder->Read(reinterpret_cast<uint8_t *>(&size), sizeof(size)))
+    return std::nullopt;
+  size = utils::LittleEndianToHost(size);
+  return size;
+}
+}  // namespace
+
+void Encoder::Initialize(const std::filesystem::path &path,
+                         const std::string_view &magic, uint64_t version) {
+  file_.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
+  Write(reinterpret_cast<const uint8_t *>(magic.data()), magic.size());
+  auto version_encoded = utils::HostToLittleEndian(version);
+  Write(reinterpret_cast<const uint8_t *>(&version_encoded),
+        sizeof(version_encoded));
+}
+
+void Encoder::Write(const uint8_t *data, uint64_t size) {
+  file_.Write(data, size);
+}
+
+void Encoder::WriteMarker(Marker marker) {
+  auto value = static_cast<uint8_t>(marker);
+  Write(&value, sizeof(value));
+}
+
+void Encoder::WriteBool(bool value) {
+  WriteMarker(Marker::TYPE_BOOL);
+  if (value) {
+    WriteMarker(Marker::VALUE_TRUE);
+  } else {
+    WriteMarker(Marker::VALUE_FALSE);
+  }
+}
+
+void Encoder::WriteUint(uint64_t value) {
+  value = utils::HostToLittleEndian(value);
+  WriteMarker(Marker::TYPE_INT);
+  Write(reinterpret_cast<const uint8_t *>(&value), sizeof(value));
+}
+
+void Encoder::WriteDouble(double value) {
+  auto value_uint = utils::MemcpyCast<uint64_t>(value);
+  value_uint = utils::HostToLittleEndian(value_uint);
+  WriteMarker(Marker::TYPE_DOUBLE);
+  Write(reinterpret_cast<const uint8_t *>(&value_uint), sizeof(value_uint));
+}
+
+void Encoder::WriteString(const std::string_view &value) {
+  WriteMarker(Marker::TYPE_STRING);
+  WriteSize(this, value.size());
+  Write(reinterpret_cast<const uint8_t *>(value.data()), value.size());
+}
+
+void Encoder::WritePropertyValue(const PropertyValue &value) {
+  WriteMarker(Marker::TYPE_PROPERTY_VALUE);
+  switch (value.type()) {
+    case PropertyValue::Type::Null: {
+      WriteMarker(Marker::TYPE_NULL);
+      break;
+    }
+    case PropertyValue::Type::Bool: {
+      WriteBool(value.ValueBool());
+      break;
+    }
+    case PropertyValue::Type::Int: {
+      WriteUint(utils::MemcpyCast<uint64_t>(value.ValueInt()));
+      break;
+    }
+    case PropertyValue::Type::Double: {
+      WriteDouble(value.ValueDouble());
+      break;
+    }
+    case PropertyValue::Type::String: {
+      WriteString(value.ValueString());
+      break;
+    }
+    case PropertyValue::Type::List: {
+      const auto &list = value.ValueList();
+      WriteMarker(Marker::TYPE_LIST);
+      WriteSize(this, list.size());
+      for (const auto &item : list) {
+        WritePropertyValue(item);
+      }
+      break;
+    }
+    case PropertyValue::Type::Map: {
+      const auto &map = value.ValueMap();
+      WriteMarker(Marker::TYPE_MAP);
+      WriteSize(this, map.size());
+      for (const auto &item : map) {
+        WriteString(item.first);
+        WritePropertyValue(item.second);
+      }
+      break;
+    }
+  }
+}
+
+uint64_t Encoder::GetPosition() { return file_.GetPosition(); }
+
+void Encoder::SetPosition(uint64_t position) {
+  file_.SetPosition(utils::OutputFile::Position::SET, position);
+}
+
+void Encoder::Finalize() {
+  file_.Sync();
+  file_.Close();
+}
+
+std::optional<uint64_t> Decoder::Initialize(const std::filesystem::path &path,
+                                            const std::string &magic) {
+  file_.Open(path);
+  std::string file_magic(magic.size(), '\0');
+  if (!Read(reinterpret_cast<uint8_t *>(file_magic.data()), file_magic.size()))
+    return std::nullopt;
+  if (file_magic != magic) return std::nullopt;
+  uint64_t version_encoded;
+  if (!Read(reinterpret_cast<uint8_t *>(&version_encoded),
+            sizeof(version_encoded)))
+    return std::nullopt;
+  return utils::LittleEndianToHost(version_encoded);
+}
+
+bool Decoder::Read(uint8_t *data, size_t size) {
+  return file_.Read(data, size);
+}
+
+bool Decoder::Peek(uint8_t *data, size_t size) {
+  return file_.Peek(data, size);
+}
+
+std::optional<Marker> Decoder::PeekMarker() {
+  uint8_t value;
+  if (!Peek(&value, sizeof(value))) return std::nullopt;
+  auto marker = CastToMarker(value);
+  if (!marker) return std::nullopt;
+  return *marker;
+}
+
+std::optional<Marker> Decoder::ReadMarker() {
+  uint8_t value;
+  if (!Read(&value, sizeof(value))) return std::nullopt;
+  auto marker = CastToMarker(value);
+  if (!marker) return std::nullopt;
+  return *marker;
+}
+
+std::optional<bool> Decoder::ReadBool() {
+  auto marker = ReadMarker();
+  if (!marker || *marker != Marker::TYPE_BOOL) return std::nullopt;
+  auto value = ReadMarker();
+  if (!value || (*value != Marker::VALUE_FALSE && *value != Marker::VALUE_TRUE))
+    return std::nullopt;
+  return *value == Marker::VALUE_TRUE;
+}
+
+std::optional<uint64_t> Decoder::ReadUint() {
+  auto marker = ReadMarker();
+  if (!marker || *marker != Marker::TYPE_INT) return std::nullopt;
+  uint64_t value;
+  if (!Read(reinterpret_cast<uint8_t *>(&value), sizeof(value)))
+    return std::nullopt;
+  value = utils::LittleEndianToHost(value);
+  return value;
+}
+
+std::optional<double> Decoder::ReadDouble() {
+  auto marker = ReadMarker();
+  if (!marker || *marker != Marker::TYPE_DOUBLE) return std::nullopt;
+  uint64_t value_int;
+  if (!Read(reinterpret_cast<uint8_t *>(&value_int), sizeof(value_int)))
+    return std::nullopt;
+  value_int = utils::LittleEndianToHost(value_int);
+  auto value = utils::MemcpyCast<double>(value_int);
+  return value;
+}
+
+std::optional<std::string> Decoder::ReadString() {
+  auto marker = ReadMarker();
+  if (!marker || *marker != Marker::TYPE_STRING) return std::nullopt;
+  auto size = ReadSize(this);
+  if (!size) return std::nullopt;
+  std::string value(*size, '\0');
+  if (!Read(reinterpret_cast<uint8_t *>(value.data()), *size))
+    return std::nullopt;
+  return value;
+}
+
+std::optional<PropertyValue> Decoder::ReadPropertyValue() {
+  auto pv_marker = ReadMarker();
+  if (!pv_marker || *pv_marker != Marker::TYPE_PROPERTY_VALUE)
+    return std::nullopt;
+
+  auto marker = PeekMarker();
+  if (!marker) return std::nullopt;
+  switch (*marker) {
+    case Marker::TYPE_NULL: {
+      auto inner_marker = ReadMarker();
+      if (!inner_marker || *inner_marker != Marker::TYPE_NULL)
+        return std::nullopt;
+      return PropertyValue();
+    }
+    case Marker::TYPE_BOOL: {
+      auto value = ReadBool();
+      if (!value) return std::nullopt;
+      return PropertyValue(*value);
+    }
+    case Marker::TYPE_INT: {
+      auto value = ReadUint();
+      if (!value) return std::nullopt;
+      return PropertyValue(utils::MemcpyCast<int64_t>(*value));
+    }
+    case Marker::TYPE_DOUBLE: {
+      auto value = ReadDouble();
+      if (!value) return std::nullopt;
+      return PropertyValue(*value);
+    }
+    case Marker::TYPE_STRING: {
+      auto value = ReadString();
+      if (!value) return std::nullopt;
+      return PropertyValue(std::move(*value));
+    }
+    case Marker::TYPE_LIST: {
+      auto inner_marker = ReadMarker();
+      if (!inner_marker || *inner_marker != Marker::TYPE_LIST)
+        return std::nullopt;
+      auto size = ReadSize(this);
+      if (!size) return std::nullopt;
+      std::vector<PropertyValue> value;
+      value.reserve(*size);
+      for (uint64_t i = 0; i < *size; ++i) {
+        auto item = ReadPropertyValue();
+        if (!item) return std::nullopt;
+        value.emplace_back(std::move(*item));
+      }
+      return PropertyValue(std::move(value));
+    }
+    case Marker::TYPE_MAP: {
+      auto inner_marker = ReadMarker();
+      if (!inner_marker || *inner_marker != Marker::TYPE_MAP)
+        return std::nullopt;
+      auto size = ReadSize(this);
+      if (!size) return std::nullopt;
+      std::map<std::string, PropertyValue> value;
+      for (uint64_t i = 0; i < *size; ++i) {
+        auto key = ReadString();
+        if (!key) return std::nullopt;
+        auto item = ReadPropertyValue();
+        if (!item) return std::nullopt;
+        value.emplace(std::move(*key), std::move(*item));
+      }
+      return PropertyValue(std::move(value));
+    }
+
+    case Marker::TYPE_PROPERTY_VALUE:
+    case Marker::SECTION_VERTEX:
+    case Marker::SECTION_EDGE:
+    case Marker::SECTION_MAPPER:
+    case Marker::SECTION_METADATA:
+    case Marker::SECTION_INDICES:
+    case Marker::SECTION_CONSTRAINTS:
+    case Marker::SECTION_OFFSETS:
+    case Marker::VALUE_FALSE:
+    case Marker::VALUE_TRUE:
+      return std::nullopt;
+  }
+}
+
+bool Decoder::SkipString() {
+  auto marker = ReadMarker();
+  if (!marker || *marker != Marker::TYPE_STRING) return false;
+  auto maybe_size = ReadSize(this);
+  if (!maybe_size) return false;
+
+  const uint64_t kBufferSize = 262144;
+  uint8_t buffer[kBufferSize];
+  uint64_t size = *maybe_size;
+  while (size > 0) {
+    uint64_t to_read = size < kBufferSize ? size : kBufferSize;
+    if (!Read(reinterpret_cast<uint8_t *>(&buffer), to_read)) return false;
+    size -= to_read;
+  }
+
+  return true;
+}
+
+bool Decoder::SkipPropertyValue() {
+  auto pv_marker = ReadMarker();
+  if (!pv_marker || *pv_marker != Marker::TYPE_PROPERTY_VALUE) return false;
+
+  auto marker = PeekMarker();
+  if (!marker) return false;
+  switch (*marker) {
+    case Marker::TYPE_NULL: {
+      auto inner_marker = ReadMarker();
+      return inner_marker && *inner_marker == Marker::TYPE_NULL;
+    }
+    case Marker::TYPE_BOOL: {
+      return !!ReadBool();
+    }
+    case Marker::TYPE_INT: {
+      return !!ReadUint();
+    }
+    case Marker::TYPE_DOUBLE: {
+      return !!ReadDouble();
+    }
+    case Marker::TYPE_STRING: {
+      return SkipString();
+    }
+    case Marker::TYPE_LIST: {
+      auto inner_marker = ReadMarker();
+      if (!inner_marker || *inner_marker != Marker::TYPE_LIST) return false;
+      auto size = ReadSize(this);
+      if (!size) return false;
+      for (uint64_t i = 0; i < *size; ++i) {
+        if (!SkipPropertyValue()) return false;
+      }
+      return true;
+    }
+    case Marker::TYPE_MAP: {
+      auto inner_marker = ReadMarker();
+      if (!inner_marker || *inner_marker != Marker::TYPE_MAP) return false;
+      auto size = ReadSize(this);
+      if (!size) return false;
+      for (uint64_t i = 0; i < *size; ++i) {
+        if (!SkipString()) return false;
+        if (!SkipPropertyValue()) return false;
+      }
+      return true;
+    }
+
+    case Marker::TYPE_PROPERTY_VALUE:
+    case Marker::SECTION_VERTEX:
+    case Marker::SECTION_EDGE:
+    case Marker::SECTION_MAPPER:
+    case Marker::SECTION_METADATA:
+    case Marker::SECTION_INDICES:
+    case Marker::SECTION_CONSTRAINTS:
+    case Marker::SECTION_OFFSETS:
+    case Marker::VALUE_FALSE:
+    case Marker::VALUE_TRUE:
+      return false;
+  }
+}
+
+uint64_t Decoder::GetSize() { return file_.GetSize(); }
+
+uint64_t Decoder::GetPosition() { return file_.GetPosition(); }
+
+void Decoder::SetPosition(uint64_t position) {
+  file_.SetPosition(utils::InputFile::Position::SET, position);
+}
+
+}  // namespace storage::durability
diff --git a/src/storage/v2/durability.hpp b/src/storage/v2/durability.hpp
new file mode 100644
index 000000000..6b65c39e5
--- /dev/null
+++ b/src/storage/v2/durability.hpp
@@ -0,0 +1,113 @@
+#pragma once
+
+#include <cstdint>
+#include <filesystem>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <type_traits>
+
+#include "storage/v2/property_value.hpp"
+#include "utils/file.hpp"
+
+namespace storage::durability {
+
+static_assert(std::is_same_v<uint8_t, unsigned char>);
+
+/// Markers that are used to indicate crucial parts of the snapshot/WAL.
+/// IMPORTANT: Don't forget to update the list of all markers `kMarkersAll` when
+/// you add a new Marker.
+enum class Marker : uint8_t {
+  TYPE_NULL = 0x10,
+  TYPE_BOOL = 0x11,
+  TYPE_INT = 0x12,
+  TYPE_DOUBLE = 0x13,
+  TYPE_STRING = 0x14,
+  TYPE_LIST = 0x15,
+  TYPE_MAP = 0x16,
+  TYPE_PROPERTY_VALUE = 0x17,
+
+  SECTION_VERTEX = 0x20,
+  SECTION_EDGE = 0x21,
+  SECTION_MAPPER = 0x22,
+  SECTION_METADATA = 0x23,
+  SECTION_INDICES = 0x24,
+  SECTION_CONSTRAINTS = 0x25,
+  SECTION_OFFSETS = 0x42,
+
+  VALUE_FALSE = 0x00,
+  VALUE_TRUE = 0xff,
+};
+
+/// List of all available markers.
+/// IMPORTANT: Don't forget to update this list when you add a new Marker.
+static const Marker kMarkersAll[] = {
+    Marker::TYPE_NULL,       Marker::TYPE_BOOL,
+    Marker::TYPE_INT,        Marker::TYPE_DOUBLE,
+    Marker::TYPE_STRING,     Marker::TYPE_LIST,
+    Marker::TYPE_MAP,        Marker::TYPE_PROPERTY_VALUE,
+    Marker::SECTION_VERTEX,  Marker::SECTION_EDGE,
+    Marker::SECTION_MAPPER,  Marker::SECTION_METADATA,
+    Marker::SECTION_INDICES, Marker::SECTION_CONSTRAINTS,
+    Marker::SECTION_OFFSETS, Marker::VALUE_FALSE,
+    Marker::VALUE_TRUE,
+};
+
+/// Encoder that is used to generate a snapshot/WAL.
+class Encoder final {
+ public:
+  void Initialize(const std::filesystem::path &path,
+                  const std::string_view &magic, uint64_t version);
+
+  // Main write function, the only one that is allowed to write to the `file_`
+  // directly.
+  void Write(const uint8_t *data, uint64_t size);
+
+  void WriteMarker(Marker marker);
+  void WriteBool(bool value);
+  void WriteUint(uint64_t value);
+  void WriteDouble(double value);
+  void WriteString(const std::string_view &value);
+  void WritePropertyValue(const PropertyValue &value);
+
+  uint64_t GetPosition();
+  void SetPosition(uint64_t position);
+
+  void Finalize();
+
+ private:
+  utils::OutputFile file_;
+};
+
+/// Decoder that is used to read a generated snapshot/WAL.
+class Decoder final {
+ public:
+  std::optional<uint64_t> Initialize(const std::filesystem::path &path,
+                                     const std::string &magic);
+
+  // Main read functions, the only one that are allowed to read from the `file_`
+  // directly.
+  bool Read(uint8_t *data, size_t size);
+  bool Peek(uint8_t *data, size_t size);
+
+  std::optional<Marker> PeekMarker();
+
+  std::optional<Marker> ReadMarker();
+  std::optional<bool> ReadBool();
+  std::optional<uint64_t> ReadUint();
+  std::optional<double> ReadDouble();
+  std::optional<std::string> ReadString();
+  std::optional<PropertyValue> ReadPropertyValue();
+
+  bool SkipString();
+  bool SkipPropertyValue();
+
+  uint64_t GetSize();
+  uint64_t GetPosition();
+  void SetPosition(uint64_t position);
+
+ private:
+  utils::InputFile file_;
+};
+
+}  // namespace storage::durability
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index b4913029a..babf65947 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -341,6 +341,9 @@ target_link_libraries(${test_prefix}property_value_v2 mg-utils)
 add_unit_test(storage_v2_constraints.cpp)
 target_link_libraries(${test_prefix}storage_v2_constraints mg-storage-v2)
 
+add_unit_test(storage_v2_decoder_encoder.cpp)
+target_link_libraries(${test_prefix}storage_v2_decoder_encoder mg-storage-v2)
+
 add_unit_test(storage_v2_edge.cpp)
 target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2)
 
diff --git a/tests/unit/storage_v2_decoder_encoder.cpp b/tests/unit/storage_v2_decoder_encoder.cpp
new file mode 100644
index 000000000..26362e765
--- /dev/null
+++ b/tests/unit/storage_v2_decoder_encoder.cpp
@@ -0,0 +1,421 @@
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <limits>
+
+#include "storage/v2/durability.hpp"
+
+static const std::string kTestMagic{"MGtest"};
+static const uint64_t kTestVersion{1};
+
+class DecoderEncoderTest : public ::testing::Test {
+ public:
+  void SetUp() override { Clear(); }
+
+  void TearDown() override { Clear(); }
+
+  std::filesystem::path storage_file{
+      std::filesystem::temp_directory_path() /
+      "MG_test_unit_storage_v2_decoder_encoder.bin"};
+
+  std::filesystem::path alternate_file{
+      std::filesystem::temp_directory_path() /
+      "MG_test_unit_storage_v2_decoder_encoder_alternate.bin"};
+
+ private:
+  void Clear() {
+    if (std::filesystem::exists(storage_file)) {
+      std::filesystem::remove(storage_file);
+    }
+    if (std::filesystem::exists(alternate_file)) {
+      std::filesystem::remove(alternate_file);
+    }
+  }
+};
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_F(DecoderEncoderTest, ReadMarker) {
+  {
+    storage::durability::Encoder encoder;
+    encoder.Initialize(storage_file, kTestMagic, kTestVersion);
+    for (const auto &item : storage::durability::kMarkersAll) {
+      encoder.WriteMarker(item);
+    }
+    {
+      uint8_t invalid = 1;
+      encoder.Write(&invalid, sizeof(invalid));
+    }
+    encoder.Finalize();
+  }
+  {
+    storage::durability::Decoder decoder;
+    auto version = decoder.Initialize(storage_file, kTestMagic);
+    ASSERT_TRUE(version);
+    ASSERT_EQ(*version, kTestVersion);
+    for (const auto &item : storage::durability::kMarkersAll) {
+      auto decoded = decoder.ReadMarker();
+      ASSERT_TRUE(decoded);
+      ASSERT_EQ(*decoded, item);
+    }
+    ASSERT_FALSE(decoder.ReadMarker());
+    ASSERT_FALSE(decoder.ReadMarker());
+    ASSERT_EQ(decoder.GetPosition(), decoder.GetSize());
+  }
+}
+
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define GENERATE_READ_TEST(name, type, ...)                        \
+  TEST_F(DecoderEncoderTest, Read##name) {                         \
+    std::vector<type> dataset{__VA_ARGS__};                        \
+    {                                                              \
+      storage::durability::Encoder encoder;                        \
+      encoder.Initialize(storage_file, kTestMagic, kTestVersion);  \
+      for (const auto &item : dataset) {                           \
+        encoder.Write##name(item);                                 \
+      }                                                            \
+      {                                                            \
+        uint8_t invalid = 1;                                       \
+        encoder.Write(&invalid, sizeof(invalid));                  \
+      }                                                            \
+      encoder.Finalize();                                          \
+    }                                                              \
+    {                                                              \
+      storage::durability::Decoder decoder;                        \
+      auto version = decoder.Initialize(storage_file, kTestMagic); \
+      ASSERT_TRUE(version);                                        \
+      ASSERT_EQ(*version, kTestVersion);                           \
+      for (const auto &item : dataset) {                           \
+        auto decoded = decoder.Read##name();                       \
+        ASSERT_TRUE(decoded);                                      \
+        ASSERT_EQ(*decoded, item);                                 \
+      }                                                            \
+      ASSERT_FALSE(decoder.Read##name());                          \
+      ASSERT_FALSE(decoder.Read##name());                          \
+      ASSERT_EQ(decoder.GetPosition(), decoder.GetSize());         \
+    }                                                              \
+  }
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_READ_TEST(Bool, bool, false, true);
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_READ_TEST(Uint, uint64_t, 0, 1, 1000, 123123123,
+                   std::numeric_limits<uint64_t>::max());
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_READ_TEST(Double, double, 1.123, 3.1415926535, 0, -505.505,
+                   std::numeric_limits<double>::infinity(),
+                   -std::numeric_limits<double>::infinity());
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_READ_TEST(String, std::string, "hello", "world", "nandare",
+                   "haihaihai", std::string(), std::string(100000, 'a'));
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_READ_TEST(
+    PropertyValue, storage::PropertyValue, storage::PropertyValue(),
+    storage::PropertyValue(false), storage::PropertyValue(true),
+    storage::PropertyValue(123L), storage::PropertyValue(123.5),
+    storage::PropertyValue("nandare"),
+    storage::PropertyValue(std::vector<storage::PropertyValue>{
+        storage::PropertyValue("nandare"), storage::PropertyValue(123L)}),
+    storage::PropertyValue(std::map<std::string, storage::PropertyValue>{
+        {"nandare", storage::PropertyValue(123)}}));
+
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define GENERATE_SKIP_TEST(name, type, ...)                        \
+  TEST_F(DecoderEncoderTest, Skip##name) {                         \
+    std::vector<type> dataset{__VA_ARGS__};                        \
+    {                                                              \
+      storage::durability::Encoder encoder;                        \
+      encoder.Initialize(storage_file, kTestMagic, kTestVersion);  \
+      for (const auto &item : dataset) {                           \
+        encoder.Write##name(item);                                 \
+      }                                                            \
+      {                                                            \
+        uint8_t invalid = 1;                                       \
+        encoder.Write(&invalid, sizeof(invalid));                  \
+      }                                                            \
+      encoder.Finalize();                                          \
+    }                                                              \
+    {                                                              \
+      storage::durability::Decoder decoder;                        \
+      auto version = decoder.Initialize(storage_file, kTestMagic); \
+      ASSERT_TRUE(version);                                        \
+      ASSERT_EQ(*version, kTestVersion);                           \
+      for (auto it = dataset.begin(); it != dataset.end(); ++it) { \
+        ASSERT_TRUE(decoder.Skip##name());                         \
+      }                                                            \
+      ASSERT_FALSE(decoder.Skip##name());                          \
+      ASSERT_FALSE(decoder.Skip##name());                          \
+      ASSERT_EQ(decoder.GetPosition(), decoder.GetSize());         \
+    }                                                              \
+  }
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_SKIP_TEST(String, std::string, "hello", "world", "nandare",
+                   "haihaihai", std::string(500000, 'a'));
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_SKIP_TEST(
+    PropertyValue, storage::PropertyValue, storage::PropertyValue(),
+    storage::PropertyValue(false), storage::PropertyValue(true),
+    storage::PropertyValue(123L), storage::PropertyValue(123.5),
+    storage::PropertyValue("nandare"),
+    storage::PropertyValue(std::vector<storage::PropertyValue>{
+        storage::PropertyValue("nandare"), storage::PropertyValue(123L)}),
+    storage::PropertyValue(std::map<std::string, storage::PropertyValue>{
+        {"nandare", storage::PropertyValue(123)}}));
+
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define GENERATE_PARTIAL_READ_TEST(name, value)                                \
+  TEST_F(DecoderEncoderTest, PartialRead##name) {                              \
+    {                                                                          \
+      storage::durability::Encoder encoder;                                    \
+      encoder.Initialize(storage_file, kTestMagic, kTestVersion);              \
+      encoder.Write##name(value);                                              \
+      encoder.Finalize();                                                      \
+    }                                                                          \
+    {                                                                          \
+      utils::InputFile ifile;                                                  \
+      utils::OutputFile ofile;                                                 \
+      ifile.Open(storage_file);                                                \
+      ofile.Open(alternate_file, utils::OutputFile::Mode::OVERWRITE_EXISTING); \
+      auto size = ifile.GetSize();                                             \
+      for (size_t i = 0; i <= size; ++i) {                                     \
+        if (i != 0) {                                                          \
+          uint8_t byte;                                                        \
+          ASSERT_TRUE(ifile.Read(&byte, sizeof(byte)));                        \
+          ofile.Write(&byte, sizeof(byte));                                    \
+          ofile.Sync();                                                        \
+        }                                                                      \
+        storage::durability::Decoder decoder;                                  \
+        auto version = decoder.Initialize(alternate_file, kTestMagic);         \
+        if (i < kTestMagic.size() + sizeof(kTestVersion)) {                    \
+          ASSERT_FALSE(version);                                               \
+        } else {                                                               \
+          ASSERT_TRUE(version);                                                \
+          ASSERT_EQ(*version, kTestVersion);                                   \
+        }                                                                      \
+        if (i != size) {                                                       \
+          ASSERT_FALSE(decoder.Read##name());                                  \
+        } else {                                                               \
+          auto decoded = decoder.Read##name();                                 \
+          ASSERT_TRUE(decoded);                                                \
+          ASSERT_EQ(*decoded, value);                                          \
+        }                                                                      \
+      }                                                                        \
+    }                                                                          \
+  }
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(Marker, storage::durability::Marker::SECTION_VERTEX);
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(Bool, false);
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(Uint, 123123123);
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(Double, 3.1415926535);
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(String, "nandare");
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_READ_TEST(
+    PropertyValue,
+    storage::PropertyValue(std::vector<storage::PropertyValue>{
+        storage::PropertyValue(), storage::PropertyValue(true),
+        storage::PropertyValue(123L), storage::PropertyValue(123.5),
+        storage::PropertyValue("nandare"),
+        storage::PropertyValue{std::map<std::string, storage::PropertyValue>{
+            {"haihai", storage::PropertyValue()}}}}));
+
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define GENERATE_PARTIAL_SKIP_TEST(name, value)                                \
+  TEST_F(DecoderEncoderTest, PartialSkip##name) {                              \
+    {                                                                          \
+      storage::durability::Encoder encoder;                                    \
+      encoder.Initialize(storage_file, kTestMagic, kTestVersion);              \
+      encoder.Write##name(value);                                              \
+      encoder.Finalize();                                                      \
+    }                                                                          \
+    {                                                                          \
+      utils::InputFile ifile;                                                  \
+      utils::OutputFile ofile;                                                 \
+      ifile.Open(storage_file);                                                \
+      ofile.Open(alternate_file, utils::OutputFile::Mode::OVERWRITE_EXISTING); \
+      auto size = ifile.GetSize();                                             \
+      for (size_t i = 0; i <= size; ++i) {                                     \
+        if (i != 0) {                                                          \
+          uint8_t byte;                                                        \
+          ASSERT_TRUE(ifile.Read(&byte, sizeof(byte)));                        \
+          ofile.Write(&byte, sizeof(byte));                                    \
+          ofile.Sync();                                                        \
+        }                                                                      \
+        storage::durability::Decoder decoder;                                  \
+        auto version = decoder.Initialize(alternate_file, kTestMagic);         \
+        if (i < kTestMagic.size() + sizeof(kTestVersion)) {                    \
+          ASSERT_FALSE(version);                                               \
+        } else {                                                               \
+          ASSERT_TRUE(version);                                                \
+          ASSERT_EQ(*version, kTestVersion);                                   \
+        }                                                                      \
+        if (i != size) {                                                       \
+          ASSERT_FALSE(decoder.Skip##name());                                  \
+        } else {                                                               \
+          ASSERT_TRUE(decoder.Skip##name());                                   \
+        }                                                                      \
+      }                                                                        \
+    }                                                                          \
+  }
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_SKIP_TEST(String, "nandare");
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+GENERATE_PARTIAL_SKIP_TEST(
+    PropertyValue,
+    storage::PropertyValue(std::vector<storage::PropertyValue>{
+        storage::PropertyValue(), storage::PropertyValue(true),
+        storage::PropertyValue(123L), storage::PropertyValue(123.5),
+        storage::PropertyValue("nandare"),
+        storage::PropertyValue{std::map<std::string, storage::PropertyValue>{
+            {"haihai", storage::PropertyValue()}}}}));
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
+  {
+    storage::durability::Encoder encoder;
+    encoder.Initialize(storage_file, kTestMagic, kTestVersion);
+    encoder.WritePropertyValue(storage::PropertyValue(123L));
+    encoder.Finalize();
+  }
+  {
+    utils::OutputFile file;
+    file.Open(storage_file, utils::OutputFile::Mode::OVERWRITE_EXISTING);
+    for (auto marker : storage::durability::kMarkersAll) {
+      bool valid_marker;
+      switch (marker) {
+        case storage::durability::Marker::TYPE_NULL:
+        case storage::durability::Marker::TYPE_BOOL:
+        case storage::durability::Marker::TYPE_INT:
+        case storage::durability::Marker::TYPE_DOUBLE:
+        case storage::durability::Marker::TYPE_STRING:
+        case storage::durability::Marker::TYPE_LIST:
+        case storage::durability::Marker::TYPE_MAP:
+        case storage::durability::Marker::TYPE_PROPERTY_VALUE:
+          valid_marker = true;
+          break;
+
+        case storage::durability::Marker::SECTION_VERTEX:
+        case storage::durability::Marker::SECTION_EDGE:
+        case storage::durability::Marker::SECTION_MAPPER:
+        case storage::durability::Marker::SECTION_METADATA:
+        case storage::durability::Marker::SECTION_INDICES:
+        case storage::durability::Marker::SECTION_CONSTRAINTS:
+        case storage::durability::Marker::SECTION_OFFSETS:
+        case storage::durability::Marker::VALUE_FALSE:
+        case storage::durability::Marker::VALUE_TRUE:
+          valid_marker = false;
+          break;
+      }
+      // We only run this test with invalid markers.
+      if (valid_marker) continue;
+      {
+        file.SetPosition(
+            utils::OutputFile::Position::RELATIVE_TO_END,
+            -(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
+        auto byte = static_cast<uint8_t>(marker);
+        file.Write(&byte, sizeof(byte));
+        file.Sync();
+      }
+      {
+        storage::durability::Decoder decoder;
+        auto version = decoder.Initialize(storage_file, kTestMagic);
+        ASSERT_TRUE(version);
+        ASSERT_EQ(*version, kTestVersion);
+        ASSERT_FALSE(decoder.SkipPropertyValue());
+      }
+      {
+        storage::durability::Decoder decoder;
+        auto version = decoder.Initialize(storage_file, kTestMagic);
+        ASSERT_TRUE(version);
+        ASSERT_EQ(*version, kTestVersion);
+        ASSERT_FALSE(decoder.ReadPropertyValue());
+      }
+    }
+    {
+      {
+        file.SetPosition(
+            utils::OutputFile::Position::RELATIVE_TO_END,
+            -(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
+        uint8_t byte = 1;
+        file.Write(&byte, sizeof(byte));
+        file.Sync();
+      }
+      {
+        storage::durability::Decoder decoder;
+        auto version = decoder.Initialize(storage_file, kTestMagic);
+        ASSERT_TRUE(version);
+        ASSERT_EQ(*version, kTestVersion);
+        ASSERT_FALSE(decoder.SkipPropertyValue());
+      }
+      {
+        storage::durability::Decoder decoder;
+        auto version = decoder.Initialize(storage_file, kTestMagic);
+        ASSERT_TRUE(version);
+        ASSERT_EQ(*version, kTestVersion);
+        ASSERT_FALSE(decoder.ReadPropertyValue());
+      }
+    }
+  }
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_F(DecoderEncoderTest, DecoderPosition) {
+  {
+    storage::durability::Encoder encoder;
+    encoder.Initialize(storage_file, kTestMagic, kTestVersion);
+    encoder.WriteBool(true);
+    encoder.Finalize();
+  }
+  {
+    storage::durability::Decoder decoder;
+    auto version = decoder.Initialize(storage_file, kTestMagic);
+    ASSERT_TRUE(version);
+    ASSERT_EQ(*version, kTestVersion);
+    for (int i = 0; i < 10; ++i) {
+      decoder.SetPosition(kTestMagic.size() + sizeof(kTestVersion));
+      auto decoded = decoder.ReadBool();
+      ASSERT_TRUE(decoded);
+      ASSERT_TRUE(*decoded);
+      ASSERT_EQ(decoder.GetPosition(), decoder.GetSize());
+    }
+  }
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_F(DecoderEncoderTest, EncoderPosition) {
+  {
+    storage::durability::Encoder encoder;
+    encoder.Initialize(storage_file, kTestMagic, kTestVersion);
+    encoder.WriteBool(false);
+    encoder.SetPosition(kTestMagic.size() + sizeof(kTestVersion));
+    ASSERT_EQ(encoder.GetPosition(), kTestMagic.size() + sizeof(kTestVersion));
+    encoder.WriteBool(true);
+    encoder.Finalize();
+  }
+  {
+    storage::durability::Decoder decoder;
+    auto version = decoder.Initialize(storage_file, kTestMagic);
+    ASSERT_TRUE(version);
+    ASSERT_EQ(*version, kTestVersion);
+    auto decoded = decoder.ReadBool();
+    ASSERT_TRUE(decoded);
+    ASSERT_TRUE(*decoded);
+    ASSERT_EQ(decoder.GetPosition(), decoder.GetSize());
+  }
+}