diff --git a/.gitignore b/.gitignore
index 260c92a1c..7e1ef8dfd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -62,8 +62,6 @@ src/distributed/storage_gc_rpc_messages.capnp
 src/distributed/storage_gc_rpc_messages.hpp
 src/distributed/token_sharing_rpc_messages.capnp
 src/distributed/token_sharing_rpc_messages.hpp
-src/distributed/transactional_cache_cleaner_rpc_messages.capnp
-src/distributed/transactional_cache_cleaner_rpc_messages.hpp
 src/distributed/updates_rpc_messages.capnp
 src/distributed/updates_rpc_messages.hpp
 src/query/plan/distributed_ops.capnp
@@ -73,5 +71,5 @@ src/stats/stats_rpc_messages.capnp
 src/stats/stats_rpc_messages.hpp
 src/storage/concurrent_id_mapper_rpc_messages.capnp
 src/storage/concurrent_id_mapper_rpc_messages.hpp
-src/transactions/engine_rpc_messages.capnp
-src/transactions/engine_rpc_messages.hpp
+src/transactions/distributed/engine_rpc_messages.capnp
+src/transactions/distributed/engine_rpc_messages.hpp
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8db566170..f5bba8848 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -73,10 +73,9 @@ set(memgraph_src_files
     storage/property_value_store.cpp
     storage/record_accessor.cpp
     storage/vertex_accessor.cpp
-    transactions/engine_master.cpp
-    transactions/engine_single_node.cpp
-    transactions/engine_worker.cpp
-    transactions/snapshot.cpp
+    transactions/distributed/engine_master.cpp
+    transactions/distributed/engine_worker.cpp
+    transactions/single_node/engine_single_node.cpp
     memgraph_init.cpp
 )
 # -----------------------------------------------------------------------------
@@ -101,14 +100,13 @@ add_lcp(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6)
 add_capnp(distributed/index_rpc_messages.capnp)
 add_lcp(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28)
 add_capnp(distributed/plan_rpc_messages.capnp)
-add_lcp(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd)
+add_lcp(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd
+        DEPENDS transactions/distributed/serialization.lcp)
 add_capnp(distributed/pull_produce_rpc_messages.capnp)
 add_lcp(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81)
 add_capnp(distributed/storage_gc_rpc_messages.capnp)
 add_lcp(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec)
 add_capnp(distributed/token_sharing_rpc_messages.capnp)
-add_lcp(distributed/transactional_cache_cleaner_rpc_messages.lcp CAPNP_SCHEMA @0xe2be6183a1ff9e11)
-add_capnp(distributed/transactional_cache_cleaner_rpc_messages.capnp)
 add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
 add_capnp(distributed/updates_rpc_messages.capnp)
 
@@ -121,8 +119,9 @@ add_capnp(query/plan/distributed_ops.capnp)
 
 add_lcp(storage/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
 add_capnp(storage/concurrent_id_mapper_rpc_messages.capnp)
-add_lcp(transactions/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5)
-add_capnp(transactions/engine_rpc_messages.capnp)
+add_lcp(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5
+        DEPENDS transactions/distributed/serialization.lcp)
+add_capnp(transactions/distributed/engine_rpc_messages.capnp)
 
 add_custom_target(generate_lcp DEPENDS ${generated_lcp_files})
 
@@ -135,7 +134,6 @@ add_capnp(query/common.capnp)
 add_capnp(query/frontend/ast/ast.capnp)
 add_capnp(query/frontend/semantic/symbol.capnp)
 add_capnp(storage/serialization.capnp)
-add_capnp(transactions/common.capnp)
 
 add_custom_target(generate_capnp DEPENDS generate_lcp ${generated_capnp_files})
 
diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp
index c55114539..299a4916a 100644
--- a/src/database/distributed_graph_db.cpp
+++ b/src/database/distributed_graph_db.cpp
@@ -15,10 +15,11 @@
 #include "distributed/durability_rpc_master.hpp"
 #include "distributed/durability_rpc_worker.hpp"
 #include "distributed/index_rpc_server.hpp"
+#include "distributed/plan_consumer.hpp"
 #include "distributed/plan_dispatcher.hpp"
+#include "distributed/produce_rpc_server.hpp"
 #include "distributed/pull_rpc_clients.hpp"
 #include "distributed/token_sharing_rpc_server.hpp"
-#include "distributed/transactional_cache_cleaner.hpp"
 #include "distributed/updates_rpc_clients.hpp"
 #include "distributed/updates_rpc_server.hpp"
 #include "durability/snapshooter.hpp"
@@ -27,7 +28,8 @@
 #include "storage/concurrent_id_mapper.hpp"
 #include "storage/concurrent_id_mapper_master.hpp"
 #include "storage/concurrent_id_mapper_worker.hpp"
-#include "transactions/engine_master.hpp"
+#include "transactions/distributed/engine_master.hpp"
+#include "transactions/distributed/engine_worker.hpp"
 #include "utils/file.hpp"
 
 using namespace std::literals::chrono_literals;
@@ -472,17 +474,6 @@ class DistributedRecoveryTransactions
  public:
   explicit DistributedRecoveryTransactions(DistributedGraphDb *db) : db_(db) {}
 
-  void Begin(const tx::TransactionId &tx_id) override {
-    CHECK(accessors_.find(tx_id) == accessors_.end())
-        << "Double transaction start";
-    accessors_.emplace(tx_id, db_->Access());
-  }
-
-  void Abort(const tx::TransactionId &tx_id) final {
-    GetAccessor(tx_id)->Abort();
-    accessors_.erase(accessors_.find(tx_id));
-  }
-
   void Commit(const tx::TransactionId &tx_id) final {
     GetAccessor(tx_id)->Commit();
     accessors_.erase(accessors_.find(tx_id));
@@ -506,6 +497,17 @@ class MasterRecoveryTransactions final
   explicit MasterRecoveryTransactions(Master *db)
       : DistributedRecoveryTransactions(db) {}
 
+  void Begin(const tx::TransactionId &tx_id) final {
+    CHECK(accessors_.find(tx_id) == accessors_.end())
+        << "Double transaction start";
+    accessors_.emplace(tx_id, db_->Access());
+  }
+
+  void Abort(const tx::TransactionId &tx_id) final {
+    GetAccessor(tx_id)->Abort();
+    accessors_.erase(accessors_.find(tx_id));
+  }
+
  protected:
   virtual GraphDbAccessor *GetAccessor(
       const tx::TransactionId &tx_id) override {
@@ -526,6 +528,10 @@ class WorkerRecoveryTransactions final
     LOG(FATAL) << "Unexpected transaction begin on worker recovery.";
   }
 
+  void Abort(const tx::TransactionId &tx_id) override {
+    LOG(FATAL) << "Unexpected transaction abort on worker recovery.";
+  }
+
  protected:
   GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id) override {
     auto found = accessors_.find(tx_id);
@@ -586,7 +592,7 @@ class Master {
   database::Master *self_{nullptr};
   communication::rpc::Server server_{
       config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
-  tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
+  tx::EngineMaster tx_engine_{server_, rpc_worker_clients_, &wal_};
   distributed::MasterCoordination coordination_{server_.endpoint()};
   std::unique_ptr<StorageGcMaster> storage_gc_ =
       std::make_unique<StorageGcMaster>(
@@ -610,8 +616,6 @@ class Master {
   distributed::UpdatesRpcServer updates_server_{self_, &server_};
   distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
   distributed::DataManager data_manager_{*self_, data_clients_};
-  distributed::TransactionalCacheCleaner cache_cleaner_{
-      tx_engine_, updates_server_, data_manager_};
   distributed::ClusterDiscoveryMaster cluster_discovery_{
       server_, coordination_, rpc_worker_clients_,
       config_.durability_directory};
@@ -626,6 +630,14 @@ class Master {
 
 Master::Master(Config config)
     : impl_(std::make_unique<impl::Master>(config, this)) {
+  // Register all transaction based caches for cleanup.
+  impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
+      impl_->updates_server_);
+  impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
+
+  // Start transactional cache cleanup.
+  impl_->tx_engine_.StartTransactionalCacheCleanup();
+
   if (impl_->config_.durability_enabled)
     utils::CheckDir(impl_->config_.durability_directory);
 
@@ -722,6 +734,10 @@ Master::~Master() {
     auto dba = Access();
     MakeSnapshot(*dba);
   }
+
+  // Transactional cache cleanup must be stopped before all of the objects
+  // that were registered for cleanup are destructed.
+  impl_->tx_engine_.StopTransactionalCacheCleanup();
 }
 
 std::unique_ptr<GraphDbAccessor> Master::Access() {
@@ -900,7 +916,7 @@ class Worker {
   distributed::WorkerCoordination coordination_{server_,
                                                 config_.master_endpoint};
   distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
-  tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
+  tx::EngineWorker tx_engine_{server_, rpc_worker_clients_.GetClientPool(0), &wal_};
   std::unique_ptr<StorageGcWorker> storage_gc_ =
       std::make_unique<StorageGcWorker>(
           *storage_, tx_engine_, config_.gc_cycle_sec,
@@ -923,9 +939,6 @@ class Worker {
   distributed::UpdatesRpcServer updates_server_{self_, &server_};
   distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
   distributed::DataManager data_manager_{*self_, data_clients_};
-  distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
-      tx_engine_,      &wal_,           server_,
-      produce_server_, updates_server_, data_manager_};
   distributed::DurabilityRpcWorker durability_rpc_{self_, &server_};
   distributed::ClusterDiscoveryWorker cluster_discovery_{
       server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
@@ -940,6 +953,16 @@ class Worker {
 
 Worker::Worker(Config config)
     : impl_(std::make_unique<impl::Worker>(config, this)) {
+  // Register all transaction based caches for cleanup.
+  impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
+      impl_->updates_server_);
+  impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
+  impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
+      impl_->produce_server_);
+
+  // Start transactional cache cleanup.
+  impl_->tx_engine_.StartTransactionalCacheCleanup();
+
   if (impl_->config_.durability_enabled)
     utils::CheckDir(impl_->config_.durability_directory);
 
@@ -996,6 +1019,9 @@ Worker::~Worker() {
   is_accepting_transactions_ = false;
   impl_->tx_engine_.LocalForEachActiveTransaction(
       [](auto &t) { t.set_should_abort(); });
+  // Transactional cache cleanup must be stopped before all of the objects
+  // that were registered for cleanup are destructed.
+  impl_->tx_engine_.StopTransactionalCacheCleanup();
 }
 
 std::unique_ptr<GraphDbAccessor> Worker::Access() {
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index fa186a1fb..fe3cea9ac 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -11,7 +11,7 @@
 #include "durability/recovery.hpp"
 #include "durability/snapshooter.hpp"
 #include "storage/concurrent_id_mapper_single_node.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 #include "utils/file.hpp"
 
 namespace database {
@@ -197,7 +197,7 @@ class SingleNode {
       config_.worker_id, config_.durability_directory,
       config_.durability_enabled, config_.synchronous_commit};
 
-  tx::SingleNodeEngine tx_engine_{&wal_};
+  tx::EngineSingleNode tx_engine_{&wal_};
   std::unique_ptr<StorageGcSingleNode> storage_gc_ =
       std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
                                             config_.gc_cycle_sec);
diff --git a/src/database/storage_gc_worker.hpp b/src/database/storage_gc_worker.hpp
index 4d938dbb9..7219b412e 100644
--- a/src/database/storage_gc_worker.hpp
+++ b/src/database/storage_gc_worker.hpp
@@ -3,9 +3,7 @@
 #include "communication/rpc/client_pool.hpp"
 #include "database/storage_gc.hpp"
 #include "distributed/storage_gc_rpc_messages.hpp"
-
-#include "transactions/engine_worker.hpp"
-#include "transactions/transaction.hpp"
+#include "transactions/distributed/engine_worker.hpp"
 
 namespace database {
 class StorageGcWorker : public StorageGc {
@@ -30,7 +28,7 @@ class StorageGcWorker : public StorageGc {
     // try to acquire a lock which hasn't been released (if the transaction
     // cache cleaner was not scheduled at this time), and take a look into the
     // commit log which no longer contains that transaction id.
-    dynamic_cast<tx::WorkerEngine &>(tx_engine_)
+    dynamic_cast<tx::EngineWorker &>(tx_engine_)
         .ClearTransactionalCache(oldest_active);
     auto safe_to_delete = GetClogSafeTransaction(oldest_active);
     if (safe_to_delete) {
diff --git a/src/distributed/produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp
index 915ba8899..6f7d0eb78 100644
--- a/src/distributed/produce_rpc_server.cpp
+++ b/src/distributed/produce_rpc_server.cpp
@@ -5,7 +5,7 @@
 #include "distributed/pull_produce_rpc_messages.hpp"
 #include "query/common.hpp"
 #include "query/exceptions.hpp"
-#include "transactions/engine_worker.hpp"
+#include "transactions/distributed/engine_worker.hpp"
 
 namespace distributed {
 
@@ -98,7 +98,7 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
 }
 
 ProduceRpcServer::ProduceRpcServer(database::Worker *db,
-                                   tx::WorkerEngine *tx_engine,
+                                   tx::EngineWorker *tx_engine,
                                    communication::rpc::Server &server,
                                    const PlanConsumer &plan_consumer,
                                    DataManager *data_manager)
@@ -136,11 +136,11 @@ ProduceRpcServer::ProduceRpcServer(database::Worker *db,
       });
 }
 
-void ProduceRpcServer::FinishAndClearOngoingProducePlans(
-    tx::TransactionId tx_id) {
+void ProduceRpcServer::ClearTransactionalCache(
+    tx::TransactionId oldest_active) {
   std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
   for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
-    if (std::get<0>(it->first) == tx_id) {
+    if (std::get<0>(it->first) < oldest_active) {
       it = ongoing_produces_.erase(it);
     } else {
       ++it;
diff --git a/src/distributed/produce_rpc_server.hpp b/src/distributed/produce_rpc_server.hpp
index 8abea9cb8..131c416bf 100644
--- a/src/distributed/produce_rpc_server.hpp
+++ b/src/distributed/produce_rpc_server.hpp
@@ -25,7 +25,7 @@ class Worker;
 }
 
 namespace tx {
-class WorkerEngine;
+class EngineWorker;
 }
 
 namespace distributed {
@@ -73,14 +73,15 @@ class ProduceRpcServer {
   };
 
  public:
-  ProduceRpcServer(database::Worker *db, tx::WorkerEngine *tx_engine,
+  ProduceRpcServer(database::Worker *db, tx::EngineWorker *tx_engine,
                    communication::rpc::Server &server,
                    const PlanConsumer &plan_consumer,
                    DataManager *data_manager);
 
-  /// Finish and clear ongoing produces for all plans that are tied to a
-  /// transaction with tx_id.
-  void FinishAndClearOngoingProducePlans(tx::TransactionId tx_id);
+  /// Clears all ongoing produces that are older than the oldest active
+  /// transaction. This function should be registered in the transaction engine
+  /// for transactional cache cleanup.
+  void ClearTransactionalCache(tx::TransactionId oldest_active);
 
  private:
   std::mutex ongoing_produces_lock_;
@@ -93,7 +94,7 @@ class ProduceRpcServer {
   database::Worker *db_;
   communication::rpc::Server &produce_rpc_server_;
   const distributed::PlanConsumer &plan_consumer_;
-  tx::WorkerEngine *tx_engine_;
+  tx::EngineWorker *tx_engine_;
 
   /// Gets an ongoing produce for the given pull request. Creates a new one if
   /// there is none currently existing.
diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp
index dd9f62a83..35047b3f6 100644
--- a/src/distributed/pull_produce_rpc_messages.lcp
+++ b/src/distributed/pull_produce_rpc_messages.lcp
@@ -22,17 +22,17 @@ cpp<#
  #include "distributed/data_manager.hpp"
  cpp<#)
 
+(load "transactions/distributed/serialization.lcp")
+
 (lcp:namespace distributed)
 
 (lcp:capnp-namespace "distributed")
 
 (lcp:capnp-import 'dis "/distributed/serialization.capnp")
 (lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp")
-(lcp:capnp-import 'tx "/transactions/common.capnp")
 (lcp:capnp-import 'utils "/utils/serialization.capnp")
 
 (lcp:capnp-type-conversion "tx::CommandId" "UInt32")
-(lcp:capnp-type-conversion "tx::Snapshot" "Tx.Snapshot")
 (lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
 
 #>cpp
@@ -321,7 +321,11 @@ cpp<#)
 (lcp:define-rpc pull
     (:request
      ((tx-id "tx::TransactionId")
-      (tx-snapshot "tx::Snapshot")
+      (tx-snapshot "tx::Snapshot"
+                   :capnp-type "List(UInt64)"
+                   :capnp-init nil
+                   :capnp-save #'save-snapshot
+                   :capnp-load #'load-snapshot)
       (plan-id :int64_t)
       (command-id "tx::CommandId")
       (params "Parameters"
diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp
index 52ee3c8ee..25806cb47 100644
--- a/src/distributed/rpc_worker_clients.hpp
+++ b/src/distributed/rpc_worker_clients.hpp
@@ -8,7 +8,6 @@
 #include "distributed/coordination.hpp"
 #include "distributed/index_rpc_messages.hpp"
 #include "distributed/token_sharing_rpc_messages.hpp"
-#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
 #include "storage/types.hpp"
 #include "transactions/transaction.hpp"
 #include "utils/future.hpp"
@@ -132,36 +131,4 @@ class TokenSharingRpcClients {
   RpcWorkerClients *clients_;
 };
 
-/** Join ongoing produces on all workers.
- *
- * Sends a RPC request to all workers when a transaction is ending, notifying
- * them to end all ongoing produces tied to that transaction.
- */
-class OngoingProduceJoinerRpcClients {
- public:
-  OngoingProduceJoinerRpcClients(RpcWorkerClients &clients)
-      : clients_(clients) {}
-
-  void JoinOngoingProduces(tx::TransactionId tx_id, bool committed) {
-    auto futures = clients_.ExecuteOnWorkers<void>(
-        0, [tx_id, committed](int worker_id,
-                              communication::rpc::ClientPool &client_pool) {
-          auto result = client_pool.Call<distributed::WaitOnTransactionEndRpc>(
-              tx_id, committed);
-          CHECK(result)
-              << "[WaitOnTransactionEndRpc] failed to notify that transaction "
-              << tx_id << " ended";
-        });
-
-    // We need to wait for all workers to destroy pending futures to avoid
-    // using already destroyed (released) transaction objects.
-    for (auto &future : futures) {
-      future.wait();
-    }
-  }
-
- private:
-  RpcWorkerClients &clients_;
-};
-
 }  // namespace distributed
diff --git a/src/distributed/transactional_cache_cleaner.hpp b/src/distributed/transactional_cache_cleaner.hpp
deleted file mode 100644
index f45566009..000000000
--- a/src/distributed/transactional_cache_cleaner.hpp
+++ /dev/null
@@ -1,98 +0,0 @@
-#pragma once
-
-#include <functional>
-#include <vector>
-
-#include "communication/rpc/server.hpp"
-#include "distributed/produce_rpc_server.hpp"
-#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
-#include "transactions/engine.hpp"
-#include "transactions/engine_worker.hpp"
-#include "utils/scheduler.hpp"
-
-namespace distributed {
-
-/// Periodically calls `ClearTransactionalCache(oldest_transaction)` on all
-/// registered objects.
-class TransactionalCacheCleaner {
-  /// The wait time between two releases of local transaction objects that have
-  /// expired on the master.
-  static constexpr std::chrono::seconds kCacheReleasePeriod{1};
-
- public:
-  template <typename... T>
-  TransactionalCacheCleaner(tx::Engine &tx_engine, T &... caches)
-      : tx_engine_(tx_engine) {
-    Register(caches...);
-    cache_clearing_scheduler_.Run(
-        "DistrTxCacheGc", kCacheReleasePeriod,
-        [this]() { this->Clear(tx_engine_.GlobalGcSnapshot().back()); });
-  }
-
- protected:
-  /// Registers the given object for transactional cleaning. The object will
-  /// periodically get it's `ClearCache(tx::TransactionId)` method called
-  /// with the oldest active transaction id. Note that the ONLY guarantee for
-  /// the call param is that there are no transactions alive that have an id
-  /// lower than it.
-  template <typename TCache>
-  void Register(TCache &cache) {
-    functions_.emplace_back([&cache](tx::TransactionId oldest_active) {
-      cache.ClearTransactionalCache(oldest_active);
-    });
-  }
-
- private:
-  template <typename TCache, typename... T>
-  void Register(TCache &cache, T &... caches) {
-    Register(cache);
-    Register(caches...);
-  }
-
-  void Clear(tx::TransactionId oldest_active) {
-    for (auto &f : functions_) f(oldest_active);
-  }
-
-  tx::Engine &tx_engine_;
-  std::vector<std::function<void(tx::TransactionId &oldest_active)>> functions_;
-  utils::Scheduler cache_clearing_scheduler_;
-};
-
-/// Registers a RPC server that listens for `WaitOnTransactionEnd` requests
-/// that require all ongoing produces to finish. It also periodically calls
-/// `ClearTransactionalCache` on all registered objects.
-class WorkerTransactionalCacheCleaner : public TransactionalCacheCleaner {
- public:
-  template <class... T>
-  WorkerTransactionalCacheCleaner(tx::WorkerEngine &tx_engine,
-                                  durability::WriteAheadLog *wal,
-                                  communication::rpc::Server &server,
-                                  ProduceRpcServer &produce_server,
-                                  T &... caches)
-      : TransactionalCacheCleaner(tx_engine, caches...),
-        wal_(wal),
-        rpc_server_(server),
-        produce_server_(produce_server) {
-    Register(tx_engine);
-    rpc_server_.Register<WaitOnTransactionEndRpc>(
-        [this](const auto &req_reader, auto *res_builder) {
-          auto tx_id = req_reader.getTxId();
-          auto committed = req_reader.getCommitted();
-          produce_server_.FinishAndClearOngoingProducePlans(tx_id);
-          if (wal_) {
-            if (committed) {
-              wal_->Emplace(database::StateDelta::TxCommit(tx_id));
-            } else {
-              wal_->Emplace(database::StateDelta::TxAbort(tx_id));
-            }
-          }
-        });
-  }
-
- private:
-  durability::WriteAheadLog *wal_;
-  communication::rpc::Server &rpc_server_;
-  ProduceRpcServer &produce_server_;
-};
-
-}  // namespace distributed
diff --git a/src/distributed/transactional_cache_cleaner_rpc_messages.lcp b/src/distributed/transactional_cache_cleaner_rpc_messages.lcp
deleted file mode 100644
index 8cb6d65b4..000000000
--- a/src/distributed/transactional_cache_cleaner_rpc_messages.lcp
+++ /dev/null
@@ -1,18 +0,0 @@
-#>cpp
-#pragma once
-
-#include "distributed/transactional_cache_cleaner_rpc_messages.capnp.h"
-#include "communication/rpc/messages.hpp"
-#include "transactions/type.hpp"
-cpp<#
-
-(lcp:namespace distributed)
-
-(lcp:capnp-namespace "distributed")
-
-(lcp:define-rpc wait-on-transaction-end
-    (:request ((tx-id "tx::TransactionId" :capnp-type "UInt64")
-               (committed :bool)))
-  (:response ()))
-
-(lcp:pop-namespace)
diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp
index afa520934..10259a62d 100644
--- a/src/durability/wal.cpp
+++ b/src/durability/wal.cpp
@@ -52,11 +52,13 @@ void WriteAheadLog::WalFile::Init() {
     current_wal_file_ = std::experimental::filesystem::path();
   } else {
     current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
+    // TODO: Fix error handling, the encoder_ returns `true` or `false`.
     try {
       writer_.Open(current_wal_file_);
       encoder_.WriteRAW(durability::kWalMagic.data(),
                         durability::kWalMagic.size());
       encoder_.WriteInt(durability::kVersion);
+      writer_.Flush();
     } catch (std::ios_base::failure &) {
       LOG(ERROR) << "Failed to open write-ahead log file: "
                  << current_wal_file_;
@@ -81,6 +83,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
       if (!delta) break;
       latest_tx_ = std::max(latest_tx_, delta->transaction_id);
       delta->Encode(writer_, encoder_);
+      writer_.Flush();
       if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count)
         RotateFile();
     }
@@ -97,6 +100,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
 }
 
 void WriteAheadLog::WalFile::RotateFile() {
+  writer_.Flush();
   writer_.Close();
   std::experimental::filesystem::rename(
       current_wal_file_,
diff --git a/src/transactions/commit_log.hpp b/src/transactions/commit_log.hpp
index 6081cc2f7..b99a55ce4 100644
--- a/src/transactions/commit_log.hpp
+++ b/src/transactions/commit_log.hpp
@@ -1,16 +1,16 @@
 #pragma once
 
 #include "data_structures/bitset/dynamic_bitset.hpp"
-#include "transactions/common.capnp.h"
 #include "transactions/type.hpp"
 
 namespace tx {
 
 // This class is lock free. There is no need to acquire any lock when accessing
 // this class and this class doesn't acquire any lock on method calls.
-class CommitLog {
+class CommitLog final {
  public:
   static constexpr int kBitsetBlockSize = 32768;
+
   CommitLog() = default;
   CommitLog(const CommitLog &) = delete;
   CommitLog(CommitLog &&) = delete;
@@ -37,7 +37,7 @@ class CommitLog {
   // lower than `id`.
   void garbage_collect_older(TransactionId id) { log.delete_prefix(2 * id); }
 
-  class Info {
+  class Info final {
    public:
     Info() {}  // Needed for serialization.
     enum Status {
@@ -46,24 +46,27 @@ class CommitLog {
       ABORTED = 2,    // 10
     };
 
-    explicit Info(uint8_t flags) : flags_(flags) {}
+    explicit Info(uint8_t flags) {
+      if (flags & ABORTED) {
+        flags_ = ABORTED;
+      } else if (flags & COMMITTED) {
+        flags_ = COMMITTED;
+      } else {
+        flags_ = ACTIVE;
+      }
+    }
 
     bool is_active() const { return flags_ == ACTIVE; }
 
-    bool is_committed() const { return flags_ & COMMITTED; }
+    bool is_committed() const {
+      if (flags_ & ABORTED) return false;
+      return flags_ & COMMITTED;
+    }
 
     bool is_aborted() const { return flags_ & ABORTED; }
 
     operator uint8_t() const { return flags_; }
 
-    void Save(capnp::CommitLogInfo::Builder *builder) const {
-      builder->setFlags(flags_);
-    }
-
-    void Load(const capnp::CommitLogInfo::Reader &reader) {
-      flags_ = reader.getFlags();
-    }
-
    private:
     uint8_t flags_{0};
   };
diff --git a/src/transactions/common.capnp b/src/transactions/common.capnp
deleted file mode 100644
index f9999efe4..000000000
--- a/src/transactions/common.capnp
+++ /dev/null
@@ -1,12 +0,0 @@
-@0xcdbe169866471033;
-
-using Cxx = import "/capnp/c++.capnp";
-$Cxx.namespace("tx::capnp");
-
-struct Snapshot {
-  transactionIds @0 :List(UInt64);
-}
-
-struct CommitLogInfo {
-  flags @0 :UInt8;
-}
diff --git a/src/transactions/distributed/engine_distributed.hpp b/src/transactions/distributed/engine_distributed.hpp
new file mode 100644
index 000000000..2407199e8
--- /dev/null
+++ b/src/transactions/distributed/engine_distributed.hpp
@@ -0,0 +1,62 @@
+/// @file
+
+#pragma once
+
+#include "communication/rpc/server.hpp"
+#include "distributed/rpc_worker_clients.hpp"
+#include "transactions/engine.hpp"
+
+namespace tx {
+
+/// Distributed base transaction engine. Has only common functionality shared
+/// between the master and worker engines.
+class EngineDistributed : public Engine {
+  /// The wait time between two releases of local transaction objects that have
+  /// expired on the master.
+  static constexpr std::chrono::seconds kCacheReleasePeriod{1};
+
+ public:
+  /// Starts transactional cache cleanup. Transactional cache cleanup should be
+  /// started *after* all of the registered objects are constructed to avoid
+  /// segmentation faults.
+  void StartTransactionalCacheCleanup() {
+    cache_clearing_scheduler_.Run("TX cache GC", kCacheReleasePeriod, [this]() {
+      std::lock_guard<std::mutex> guard(lock_);
+      // TODO (mferencevic): this has to be aware that `GlobalGcSnapshot` can
+      // throw!
+      auto oldest_active = GlobalGcSnapshot().back();
+      // Call all registered functions for cleanup.
+      for (auto &f : functions_) f(oldest_active);
+      // Clean our cache.
+      ClearTransactionalCache(oldest_active);
+    });
+  }
+
+  /// Registers the given object for transactional cleaning. The object will
+  /// periodically get it's `ClearCache(tx::TransactionId)` method called
+  /// with the oldest active transaction id. Note that the ONLY guarantee for
+  /// the call param is that there are no transactions alive that have an id
+  /// lower than it.
+  template <typename TCache>
+  void RegisterForTransactionalCacheCleanup(TCache &cache) {
+    std::lock_guard<std::mutex> guard(lock_);
+    functions_.emplace_back([&cache](tx::TransactionId oldest_active) {
+      cache.ClearTransactionalCache(oldest_active);
+    });
+  }
+
+  /// Stops transactional cache cleanup. Transactional cache cleanup should be
+  /// stopped *before* all of the registered objects are destructed to avoid
+  /// segmentation faults.
+  void StopTransactionalCacheCleanup() { cache_clearing_scheduler_.Stop(); }
+
+  /// Clears the cache of local transactions that have expired. This function
+  /// has to be implemented by each class that inherits this class.
+  virtual void ClearTransactionalCache(TransactionId oldest_active) = 0;
+
+ private:
+  std::mutex lock_;
+  std::vector<std::function<void(tx::TransactionId)>> functions_;
+  utils::Scheduler cache_clearing_scheduler_;
+};
+}  // namespace tx
diff --git a/src/transactions/engine_master.cpp b/src/transactions/distributed/engine_master.cpp
similarity index 53%
rename from src/transactions/engine_master.cpp
rename to src/transactions/distributed/engine_master.cpp
index 68602f583..44bf2c7d4 100644
--- a/src/transactions/engine_master.cpp
+++ b/src/transactions/distributed/engine_master.cpp
@@ -4,17 +4,17 @@
 #include "glog/logging.h"
 
 #include "database/state_delta.hpp"
-#include "transactions/engine_master.hpp"
-#include "transactions/engine_rpc_messages.hpp"
+#include "transactions/distributed/engine_master.hpp"
+#include "transactions/distributed/engine_rpc_messages.hpp"
 
 namespace tx {
 
-MasterEngine::MasterEngine(communication::rpc::Server &server,
+EngineMaster::EngineMaster(communication::rpc::Server &server,
                            distributed::RpcWorkerClients &rpc_worker_clients,
                            durability::WriteAheadLog *wal)
-    : SingleNodeEngine(wal),
+    : engine_single_node_(wal),
       rpc_server_(server),
-      ongoing_produce_joiner_(rpc_worker_clients) {
+      rpc_worker_clients_(rpc_worker_clients) {
   rpc_server_.Register<BeginRpc>(
       [this](const auto &req_reader, auto *res_builder) {
         auto tx = this->Begin();
@@ -85,14 +85,78 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
       });
 }
 
-void MasterEngine::Commit(const Transaction &t) {
-  ongoing_produce_joiner_.JoinOngoingProduces(t.id_, true);
-  SingleNodeEngine::Commit(t);
+Transaction *EngineMaster::Begin() { return engine_single_node_.Begin(); }
+
+CommandId EngineMaster::Advance(TransactionId id) {
+  return engine_single_node_.Advance(id);
 }
 
-void MasterEngine::Abort(const Transaction &t) {
-  ongoing_produce_joiner_.JoinOngoingProduces(t.id_, false);
-  SingleNodeEngine::Abort(t);
+CommandId EngineMaster::UpdateCommand(TransactionId id) {
+  return engine_single_node_.UpdateCommand(id);
 }
 
+void EngineMaster::Commit(const Transaction &t) {
+  auto tx_id = t.id_;
+  auto futures = rpc_worker_clients_.ExecuteOnWorkers<void>(
+      0, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) {
+        auto result = client_pool.Call<NotifyCommittedRpc>(tx_id);
+        CHECK(result)
+            << "[NotifyCommittedRpc] failed to notify that transaction "
+            << tx_id << " committed";
+      });
+
+  // We need to wait for all workers to destroy pending futures to avoid
+  // using already destroyed (released) transaction objects.
+  for (auto &future : futures) {
+    future.wait();
+  }
+
+  engine_single_node_.Commit(t);
+}
+
+void EngineMaster::Abort(const Transaction &t) { engine_single_node_.Abort(t); }
+
+CommitLog::Info EngineMaster::Info(TransactionId tx) const {
+  return engine_single_node_.Info(tx);
+}
+
+Snapshot EngineMaster::GlobalGcSnapshot() {
+  return engine_single_node_.GlobalGcSnapshot();
+}
+
+Snapshot EngineMaster::GlobalActiveTransactions() {
+  return engine_single_node_.GlobalActiveTransactions();
+}
+
+TransactionId EngineMaster::LocalLast() const {
+  return engine_single_node_.LocalLast();
+}
+
+TransactionId EngineMaster::GlobalLast() const {
+  return engine_single_node_.GlobalLast();
+}
+
+TransactionId EngineMaster::LocalOldestActive() const {
+  return engine_single_node_.LocalOldestActive();
+}
+
+void EngineMaster::GarbageCollectCommitLog(TransactionId tx_id) {
+  return engine_single_node_.GarbageCollectCommitLog(tx_id);
+}
+
+void EngineMaster::LocalForEachActiveTransaction(
+    std::function<void(Transaction &)> f) {
+  return engine_single_node_.LocalForEachActiveTransaction(f);
+}
+
+Transaction *EngineMaster::RunningTransaction(TransactionId tx_id) {
+  return engine_single_node_.RunningTransaction(tx_id);
+}
+
+void EngineMaster::EnsureNextIdGreater(TransactionId tx_id) {
+  return engine_single_node_.EnsureNextIdGreater(tx_id);
+}
+
+void EngineMaster::ClearTransactionalCache(TransactionId oldest_active) {}
+
 }  // namespace tx
diff --git a/src/transactions/distributed/engine_master.hpp b/src/transactions/distributed/engine_master.hpp
new file mode 100644
index 000000000..627bb2296
--- /dev/null
+++ b/src/transactions/distributed/engine_master.hpp
@@ -0,0 +1,53 @@
+/// @file
+
+#pragma once
+
+#include "communication/rpc/server.hpp"
+#include "distributed/rpc_worker_clients.hpp"
+#include "transactions/distributed/engine_distributed.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
+
+namespace tx {
+
+/// Distributed master transaction engine. Has complete engine functionality and
+/// exposes an RPC server to be used by distributed Workers.
+class EngineMaster final : public EngineDistributed {
+ public:
+  /// @param server - Required. Used for rpc::Server construction.
+  /// @param rpc_worker_clients - Required. Used for
+  /// OngoingProduceJoinerRpcClients construction.
+  /// @param wal - Optional. If present, the Engine will write tx
+  /// Begin/Commit/Abort atomically (while under lock).
+  EngineMaster(communication::rpc::Server &server,
+               distributed::RpcWorkerClients &rpc_worker_clients,
+               durability::WriteAheadLog *wal = nullptr);
+
+  EngineMaster(const EngineMaster &) = delete;
+  EngineMaster(EngineMaster &&) = delete;
+  EngineMaster &operator=(const EngineMaster &) = delete;
+  EngineMaster &operator=(EngineMaster &&) = delete;
+
+  Transaction *Begin() override;
+  CommandId Advance(TransactionId id) override;
+  CommandId UpdateCommand(TransactionId id) override;
+  void Commit(const Transaction &t) override;
+  void Abort(const Transaction &t) override;
+  CommitLog::Info Info(TransactionId tx) const override;
+  Snapshot GlobalGcSnapshot() override;
+  Snapshot GlobalActiveTransactions() override;
+  TransactionId GlobalLast() const override;
+  TransactionId LocalLast() const override;
+  TransactionId LocalOldestActive() const override;
+  void LocalForEachActiveTransaction(
+      std::function<void(Transaction &)> f) override;
+  Transaction *RunningTransaction(TransactionId tx_id) override;
+  void EnsureNextIdGreater(TransactionId tx_id) override;
+  void GarbageCollectCommitLog(TransactionId tx_id) override;
+  void ClearTransactionalCache(TransactionId oldest_active) override;
+
+ private:
+  EngineSingleNode engine_single_node_;
+  communication::rpc::Server &rpc_server_;
+  distributed::RpcWorkerClients &rpc_worker_clients_;
+};
+}  // namespace tx
diff --git a/src/transactions/engine_rpc_messages.lcp b/src/transactions/distributed/engine_rpc_messages.lcp
similarity index 50%
rename from src/transactions/engine_rpc_messages.lcp
rename to src/transactions/distributed/engine_rpc_messages.lcp
index 839ad6c57..8ef17744b 100644
--- a/src/transactions/engine_rpc_messages.lcp
+++ b/src/transactions/distributed/engine_rpc_messages.lcp
@@ -3,23 +3,27 @@
 
 #include "communication/rpc/messages.hpp"
 #include "transactions/commit_log.hpp"
-#include "transactions/engine_rpc_messages.capnp.h"
+#include "transactions/distributed/engine_rpc_messages.capnp.h"
 #include "transactions/snapshot.hpp"
 #include "transactions/type.hpp"
 cpp<#
 
+(load "transactions/distributed/serialization.lcp")
+
 (lcp:namespace tx)
 
 (lcp:capnp-namespace "tx")
 
-(lcp:capnp-import 'tx "/transactions/common.capnp")
 (lcp:capnp-type-conversion "TransactionId" "UInt64")
 (lcp:capnp-type-conversion "CommandId" "UInt32")
-(lcp:capnp-type-conversion "Snapshot" "Tx.Snapshot")
 
 (lcp:define-struct tx-and-snapshot ()
   ((tx-id "TransactionId")
-   (snapshot "Snapshot"))
+   (snapshot "Snapshot"
+             :capnp-type "List(UInt64)"
+             :capnp-init nil
+             :capnp-save #'save-snapshot
+             :capnp-load #'load-snapshot))
   (:serialize :capnp))
 
 (lcp:define-rpc begin
@@ -40,7 +44,11 @@ cpp<#
 
 (lcp:define-rpc snapshot
     (:request ((member "TransactionId")))
-  (:response ((member "Snapshot"))))
+  (:response ((member "Snapshot"
+                      :capnp-type "List(UInt64)"
+                      :capnp-init nil
+                      :capnp-save #'save-snapshot
+                      :capnp-load #'load-snapshot))))
 
 (lcp:define-rpc command
     (:request ((member "TransactionId")))
@@ -48,15 +56,27 @@ cpp<#
 
 (lcp:define-rpc gc-snapshot
     (:request ())
-  (:response ((member "Snapshot"))))
+  (:response ((member "Snapshot"
+                      :capnp-type "List(UInt64)"
+                      :capnp-init nil
+                      :capnp-save #'save-snapshot
+                      :capnp-load #'load-snapshot))))
 
 (lcp:define-rpc clog-info
     (:request ((member "TransactionId")))
-  (:response ((member "CommitLog::Info" :capnp-type "Tx.CommitLogInfo"))))
+  (:response ((member "CommitLog::Info"
+                      :capnp-type "UInt8"
+                      :capnp-init nil
+                      :capnp-save #'save-commitlog-info
+                      :capnp-load #'load-commitlog-info))))
 
 (lcp:define-rpc active-transactions
     (:request ())
-  (:response ((member "Snapshot"))))
+  (:response ((member "Snapshot"
+                      :capnp-type "List(UInt64)"
+                      :capnp-init nil
+                      :capnp-save #'save-snapshot
+                      :capnp-load #'load-snapshot))))
 
 (lcp:define-rpc ensure-next-id-greater
     (:request ((member "TransactionId")))
@@ -66,4 +86,8 @@ cpp<#
     (:request ())
   (:response ((member "TransactionId"))))
 
+(lcp:define-rpc notify-committed
+    (:request ((member "TransactionId")))
+  (:response ()))
+
 (lcp:pop-namespace) ;; tx
diff --git a/src/transactions/engine_worker.cpp b/src/transactions/distributed/engine_worker.cpp
similarity index 60%
rename from src/transactions/engine_worker.cpp
rename to src/transactions/distributed/engine_worker.cpp
index d47419123..f3819fb6f 100644
--- a/src/transactions/engine_worker.cpp
+++ b/src/transactions/distributed/engine_worker.cpp
@@ -2,45 +2,70 @@
 
 #include "glog/logging.h"
 
-#include "transactions/engine_rpc_messages.hpp"
-#include "transactions/engine_worker.hpp"
+#include "transactions/distributed/engine_rpc_messages.hpp"
+#include "transactions/distributed/engine_worker.hpp"
 #include "utils/atomic.hpp"
 
 namespace tx {
 
-WorkerEngine::WorkerEngine(communication::rpc::ClientPool &master_client_pool)
-    : master_client_pool_(master_client_pool) {}
+EngineWorker::EngineWorker(communication::rpc::Server &server,
+                           communication::rpc::ClientPool &master_client_pool,
+                           durability::WriteAheadLog *wal)
+    : server_(server), master_client_pool_(master_client_pool), wal_(wal) {
+  // Register our `NotifyCommittedRpc` server. This RPC should only write the
+  // `TxCommit` operation into the WAL. It is only used to indicate that the
+  // transaction has succeeded on all workers and that it will be committed on
+  // the master. When recovering the cluster from WALs the `TxCommit` operation
+  // indicates that all operations for a given transaction were written to the
+  // WAL. If we wouldn't have the `TxCommit` after all operations for a given
+  // transaction in the WAL we couldn't be sure that all operations were saved
+  // (eg. some operations could have been written into one WAL file, some to
+  // another file). This way we ensure that if the `TxCommit` is written it was
+  // the last thing associated with that transaction and everything before was
+  // flushed to the disk.
+  // NOTE: We can't cache the commit state for this transaction because this
+  // RPC call could fail on other workers which will cause the transaction to be
+  // aborted. This mismatch in committed/aborted across workers is resolved by
+  // using the master as a single source of truth when doing recovery.
+  server_.Register<NotifyCommittedRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        auto tid = req_reader.getMember();
+        if (wal_) {
+          wal_->Emplace(database::StateDelta::TxCommit(tid));
+        }
+      });
+}
 
-WorkerEngine::~WorkerEngine() {
+EngineWorker::~EngineWorker() {
   for (auto &kv : active_.access()) {
     delete kv.second;
   }
 }
 
-Transaction *WorkerEngine::Begin() {
+Transaction *EngineWorker::Begin() {
   auto res = master_client_pool_.Call<BeginRpc>();
   CHECK(res) << "BeginRpc failed";
   auto &data = res->member;
   UpdateOldestActive(data.snapshot, data.tx_id);
-  Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this);
+  Transaction *tx = CreateTransaction(data.tx_id, data.snapshot);
   auto insertion = active_.access().insert(data.tx_id, tx);
   CHECK(insertion.second) << "Failed to start creation from worker";
   VLOG(11) << "[Tx] Starting worker transaction " << data.tx_id;
   return tx;
 }
 
-CommandId WorkerEngine::Advance(TransactionId tx_id) {
+CommandId EngineWorker::Advance(TransactionId tx_id) {
   auto res = master_client_pool_.Call<AdvanceRpc>(tx_id);
   CHECK(res) << "AdvanceRpc failed";
   auto access = active_.access();
   auto found = access.find(tx_id);
   CHECK(found != access.end())
       << "Can't advance a transaction not in local cache";
-  found->second->cid_ = res->member;
+  SetCommand(found->second, res->member);
   return res->member;
 }
 
-CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) {
+CommandId EngineWorker::UpdateCommand(TransactionId tx_id) {
   auto res = master_client_pool_.Call<CommandRpc>(tx_id);
   CHECK(res) << "CommandRpc failed";
   auto cmd_id = res->member;
@@ -55,26 +80,26 @@ CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) {
   auto access = active_.access();
   auto found = access.find(tx_id);
   if (found != access.end()) {
-    found->second->cid_ = cmd_id;
+    SetCommand(found->second, cmd_id);
   }
   return cmd_id;
 }
 
-void WorkerEngine::Commit(const Transaction &t) {
+void EngineWorker::Commit(const Transaction &t) {
   auto res = master_client_pool_.Call<CommitRpc>(t.id_);
   CHECK(res) << "CommitRpc failed";
   VLOG(11) << "[Tx] Commiting worker transaction " << t.id_;
   ClearSingleTransaction(t.id_);
 }
 
-void WorkerEngine::Abort(const Transaction &t) {
+void EngineWorker::Abort(const Transaction &t) {
   auto res = master_client_pool_.Call<AbortRpc>(t.id_);
   CHECK(res) << "AbortRpc failed";
   VLOG(11) << "[Tx] Aborting worker transaction " << t.id_;
   ClearSingleTransaction(t.id_);
 }
 
-CommitLog::Info WorkerEngine::Info(TransactionId tid) const {
+CommitLog::Info EngineWorker::Info(TransactionId tid) const {
   auto info = clog_.fetch_info(tid);
   // If we don't know the transaction to be commited nor aborted, ask the
   // master about it and update the local commit log.
@@ -94,7 +119,7 @@ CommitLog::Info WorkerEngine::Info(TransactionId tid) const {
   return info;
 }
 
-Snapshot WorkerEngine::GlobalGcSnapshot() {
+Snapshot EngineWorker::GlobalGcSnapshot() {
   auto res = master_client_pool_.Call<GcSnapshotRpc>();
   CHECK(res) << "GcSnapshotRpc failed";
   auto snapshot = std::move(res->member);
@@ -102,7 +127,7 @@ Snapshot WorkerEngine::GlobalGcSnapshot() {
   return snapshot;
 }
 
-Snapshot WorkerEngine::GlobalActiveTransactions() {
+Snapshot EngineWorker::GlobalActiveTransactions() {
   auto res = master_client_pool_.Call<ActiveTransactionsRpc>();
   CHECK(res) << "ActiveTransactionsRpc failed";
   auto snapshot = std::move(res->member);
@@ -110,21 +135,22 @@ Snapshot WorkerEngine::GlobalActiveTransactions() {
   return snapshot;
 }
 
-TransactionId WorkerEngine::LocalLast() const { return local_last_; }
-TransactionId WorkerEngine::GlobalLast() const {
+TransactionId EngineWorker::LocalLast() const { return local_last_; }
+
+TransactionId EngineWorker::GlobalLast() const {
   auto res = master_client_pool_.Call<GlobalLastRpc>();
   CHECK(res) << "GlobalLastRpc failed";
   return res->member;
 }
 
-void WorkerEngine::LocalForEachActiveTransaction(
+void EngineWorker::LocalForEachActiveTransaction(
     std::function<void(Transaction &)> f) {
   for (auto pair : active_.access()) f(*pair.second);
 }
 
-TransactionId WorkerEngine::LocalOldestActive() const { return oldest_active_; }
+TransactionId EngineWorker::LocalOldestActive() const { return oldest_active_; }
 
-Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) {
+Transaction *EngineWorker::RunningTransaction(TransactionId tx_id) {
   auto accessor = active_.access();
   auto found = accessor.find(tx_id);
   if (found != accessor.end()) return found->second;
@@ -136,20 +162,20 @@ Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) {
   return RunningTransaction(tx_id, snapshot);
 }
 
-Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id,
+Transaction *EngineWorker::RunningTransaction(TransactionId tx_id,
                                               const Snapshot &snapshot) {
   auto accessor = active_.access();
   auto found = accessor.find(tx_id);
   if (found != accessor.end()) return found->second;
 
-  auto new_tx = new Transaction(tx_id, snapshot, *this);
+  auto new_tx = CreateTransaction(tx_id, snapshot);
   auto insertion = accessor.insert(tx_id, new_tx);
   if (!insertion.second) delete new_tx;
   utils::EnsureAtomicGe(local_last_, tx_id);
   return insertion.first->second;
 }
 
-void WorkerEngine::ClearTransactionalCache(TransactionId oldest_active) const {
+void EngineWorker::ClearTransactionalCache(TransactionId oldest_active) {
   auto access = active_.access();
   for (auto kv : access) {
     if (kv.first < oldest_active) {
@@ -161,7 +187,7 @@ void WorkerEngine::ClearTransactionalCache(TransactionId oldest_active) const {
   }
 }
 
-void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const {
+void EngineWorker::ClearSingleTransaction(TransactionId tx_id) const {
   auto access = active_.access();
   auto found = access.find(tx_id);
   if (found != access.end()) {
@@ -172,7 +198,7 @@ void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const {
   }
 }
 
-void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
+void EngineWorker::UpdateOldestActive(const Snapshot &snapshot,
                                       TransactionId alternative) {
   if (snapshot.empty()) {
     oldest_active_.store(std::max(alternative, oldest_active_.load()));
@@ -181,11 +207,11 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
   }
 }
 
-void WorkerEngine::EnsureNextIdGreater(TransactionId tx_id) {
+void EngineWorker::EnsureNextIdGreater(TransactionId tx_id) {
   master_client_pool_.Call<EnsureNextIdGreaterRpc>(tx_id);
 }
 
-void WorkerEngine::GarbageCollectCommitLog(TransactionId tx_id) {
+void EngineWorker::GarbageCollectCommitLog(TransactionId tx_id) {
   clog_.garbage_collect_older(tx_id);
 }
 }  // namespace tx
diff --git a/src/transactions/engine_worker.hpp b/src/transactions/distributed/engine_worker.hpp
similarity index 72%
rename from src/transactions/engine_worker.hpp
rename to src/transactions/distributed/engine_worker.hpp
index 65023ee30..768529833 100644
--- a/src/transactions/engine_worker.hpp
+++ b/src/transactions/distributed/engine_worker.hpp
@@ -3,25 +3,30 @@
 #include <atomic>
 
 #include "communication/rpc/client_pool.hpp"
+#include "communication/rpc/server.hpp"
 #include "data_structures/concurrent/concurrent_map.hpp"
+#include "durability/wal.hpp"
 #include "io/network/endpoint.hpp"
 #include "transactions/commit_log.hpp"
-#include "transactions/engine.hpp"
+#include "transactions/distributed/engine_distributed.hpp"
 #include "transactions/transaction.hpp"
 
 namespace tx {
 
-/** Distributed worker transaction engine. Connects to a MasterEngine (single
+/** Distributed worker transaction engine. Connects to a EngineMaster (single
  * source of truth) to obtain transactional info. Caches most info locally. Can
  * begin/advance/end transactions on the master. */
-class WorkerEngine : public Engine {
+class EngineWorker final : public EngineDistributed {
  public:
-  /// The wait time between two releases of local transaction objects that have
-  /// expired on the master.
-  static constexpr std::chrono::seconds kCacheReleasePeriod{1};
+  EngineWorker(communication::rpc::Server &server,
+               communication::rpc::ClientPool &master_client_pool,
+               durability::WriteAheadLog *wal = nullptr);
+  ~EngineWorker();
 
-  explicit WorkerEngine(communication::rpc::ClientPool &master_client_pool);
-  ~WorkerEngine();
+  EngineWorker(const EngineWorker &) = delete;
+  EngineWorker(EngineWorker &&) = delete;
+  EngineWorker &operator=(const EngineWorker &) = delete;
+  EngineWorker &operator=(EngineWorker &&) = delete;
 
   Transaction *Begin() override;
   CommandId Advance(TransactionId id) override;
@@ -45,9 +50,7 @@ class WorkerEngine : public Engine {
   void EnsureNextIdGreater(TransactionId tx_id) override;
   void GarbageCollectCommitLog(tx::TransactionId tx_id) override;
 
-  /// Clears the cache of local transactions that have expired. The signature of
-  /// this method is dictated by `distributed::TransactionalCacheCleaner`.
-  void ClearTransactionalCache(TransactionId oldest_active) const;
+  void ClearTransactionalCache(TransactionId oldest_active) override;
 
  private:
   // Local caches.
@@ -56,9 +59,15 @@ class WorkerEngine : public Engine {
   // Mutable because just getting info can cause a cache fill.
   mutable CommitLog clog_;
 
+  // Our local RPC server.
+  communication::rpc::Server &server_;
+
   // Communication to the transactional master.
   communication::rpc::ClientPool &master_client_pool_;
 
+  // Write ahead log.
+  durability::WriteAheadLog *wal_;
+
   // Used for clearing of caches of transactions that have expired.
   // Initialize the oldest_active_ with 1 because there's never a tx with id=0
   std::atomic<TransactionId> oldest_active_{1};
diff --git a/src/transactions/distributed/serialization.lcp b/src/transactions/distributed/serialization.lcp
new file mode 100644
index 000000000..5dea0342a
--- /dev/null
+++ b/src/transactions/distributed/serialization.lcp
@@ -0,0 +1,20 @@
+;; This file doesn't need to be preprocessed. It only holds helper functions.
+
+(defun save-commitlog-info (builder member) #>cpp ${builder}->setMember(${member}); cpp<#)
+
+(defun load-commitlog-info (reader member) #>cpp ${member} = CommitLog::Info(${reader}.getMember()); cpp<#)
+
+(defun save-snapshot (builder member)
+  (let ((capnp-member (remove #\_ (string-capitalize member))))
+    #>cpp
+    auto list_builder = builder->init${capnp-member}(${member}.transaction_ids().size());
+    utils::SaveVector(${member}.transaction_ids(), &list_builder);
+    cpp<#))
+
+(defun load-snapshot (reader member)
+  (let ((capnp-member (remove #\_ (string-capitalize member))))
+    #>cpp
+    std::vector<uint64_t> transaction_ids;
+    utils::LoadVector(&transaction_ids, ${reader}.get${capnp-member}());
+    ${member} = tx::Snapshot(std::move(transaction_ids));
+    cpp<#))
diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp
index 00888a154..153b164ef 100644
--- a/src/transactions/engine.hpp
+++ b/src/transactions/engine.hpp
@@ -1,3 +1,5 @@
+/// @file
+
 #pragma once
 
 #include <algorithm>
@@ -11,18 +13,14 @@
 #include "transactions/type.hpp"
 
 namespace tx {
-/**
- * Database transaction engine. Used for managing transactions and the related
- * information such as transaction snapshots and the transaction state info.
- *
- * This is an abstract base class for implementing a single-node transactional
- * engine (MasterEngine), an engine for the master in a distributed system (also
- * MasterEngine), and for the worker in a distributed system (WorkerEngine).
- *
- * Methods in this class are often prefixed with "Global" or "Local", depending
- * on the guarantees that they need to satisfy. These guarantee requirements are
- * determined by the users of a particular method.
- */
+/// Database transaction engine. Used for managing transactions and the related
+/// information such as transaction snapshots and the transaction state info.
+///
+/// This is an abstract base class for implementing a transactional engine.
+///
+/// Methods in this class are often prefixed with "Global" or "Local", depending
+/// on the guarantees that they need to satisfy. These guarantee requirements
+/// are determined by the users of a particular method.
 class Engine {
  public:
   virtual ~Engine() = default;
@@ -44,58 +42,66 @@ class Engine {
   /// valid after this function executes.
   virtual void Abort(const Transaction &t) = 0;
 
-  /** Returns the commit log Info about the given transaction. */
+  /// Returns the commit log Info about the given transaction.
   virtual CommitLog::Info Info(TransactionId tx) const = 0;
 
-  /** Returns the snapshot relevant to garbage collection of database records.
-   *
-   * If there are no active transactions that means a snapshot containing only
-   * the next transaction ID.  If there are active transactions, that means the
-   * oldest active transaction's snapshot, with that transaction's ID appened as
-   * last.
-   *
-   * The idea is that data records can only be deleted if they were expired (and
-   * that was committed) by a transaction older than the older currently active.
-   * We need the full snapshot to prevent overlaps (see general GC
-   * documentation).
-   *
-   * The returned snapshot must be for the globally oldest active transaction.
-   * If we only looked at locally known transactions, it would be possible to
-   * delete something that and older active transaction can still see.
-   */
+  /// Returns the snapshot relevant to garbage collection of database records.
+  ///
+  /// If there are no active transactions that means a snapshot containing only
+  /// the next transaction ID.  If there are active transactions, that means the
+  /// oldest active transaction's snapshot, with that transaction's ID appened
+  /// as last.
+  ///
+  /// The idea is that data records can only be deleted if they were expired
+  /// (and that was committed) by a transaction older than the older currently
+  /// active. We need the full snapshot to prevent overlaps (see general GC
+  /// documentation).
+  ///
+  /// The returned snapshot must be for the globally oldest active transaction.
+  /// If we only looked at locally known transactions, it would be possible to
+  /// delete something that and older active transaction can still see.
   virtual Snapshot GlobalGcSnapshot() = 0;
 
-  /** Returns active transactions. */
+  /// Returns active transactions.
   virtual Snapshot GlobalActiveTransactions() = 0;
 
-  /** Returns the ID the last globally known transaction. */
+  /// Returns the ID the last globally known transaction.
   virtual tx::TransactionId GlobalLast() const = 0;
 
-  /** Returns the ID of last locally known transaction. */
+  /// Returns the ID of last locally known transaction.
   virtual tx::TransactionId LocalLast() const = 0;
 
-  /** Returns the ID of the oldest transaction locally known to be active. It is
-   * guaranteed that all the transactions older than the returned are globally
-   * not active. */
+  /// Returns the ID of the oldest transaction locally known to be active. It is
+  /// guaranteed that all the transactions older than the returned are globally
+  /// not active.
   virtual TransactionId LocalOldestActive() const = 0;
 
-  /** Calls function f on each locally active transaction. */
+  /// Calls function f on each locally active transaction.
   virtual void LocalForEachActiveTransaction(
       std::function<void(Transaction &)> f) = 0;
 
-  /** Gets a transaction object for a running transaction. */
+  /// Gets a transaction object for a running transaction.
   virtual tx::Transaction *RunningTransaction(TransactionId tx_id) = 0;
 
-  /** Ensures the next transaction that starts will have the ID greater than
-   * the given id. */
+  /// Ensures the next transaction that starts will have the ID greater than
+  /// the given id.
   virtual void EnsureNextIdGreater(TransactionId tx_id) = 0;
 
-  /** Garbage collects transactions older than tx_id from commit log. */
+  /// Garbage collects transactions older than tx_id from commit log.
   virtual void GarbageCollectCommitLog(TransactionId tx_id) = 0;
 
   auto &local_lock_graph() { return local_lock_graph_; }
   const auto &local_lock_graph() const { return local_lock_graph_; }
 
+ protected:
+  Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) {
+    return new Transaction(id, snapshot, *this);
+  }
+
+  CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); }
+
+  void SetCommand(Transaction *t, CommandId cid) { t->SetCommand(cid); }
+
  private:
   // Map lock dependencies. Each entry maps (tx_that_wants_lock,
   // tx_that_holds_lock). Used for local deadlock resolution.
diff --git a/src/transactions/engine_master.hpp b/src/transactions/engine_master.hpp
deleted file mode 100644
index 5dc9b9e95..000000000
--- a/src/transactions/engine_master.hpp
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include "communication/rpc/server.hpp"
-#include "distributed/rpc_worker_clients.hpp"
-#include "transactions/engine_single_node.hpp"
-
-namespace tx {
-
-/** Distributed master transaction engine. Has complete engine functionality and
- * exposes an RPC server to be used by distributed Workers. */
-class MasterEngine : public SingleNodeEngine {
- public:
-  /**
-   * @param server - Required. Used for rpc::Server construction.
-   * @param rpc_worker_clients - Required. Used for
-   * OngoingProduceJoinerRpcClients construction.
-   * @param wal - Optional. If present, the Engine will write tx
-   * Begin/Commit/Abort atomically (while under lock).
-   */
-  MasterEngine(communication::rpc::Server &server,
-               distributed::RpcWorkerClients &rpc_worker_clients,
-               durability::WriteAheadLog *wal = nullptr);
-  void Commit(const Transaction &t) override;
-  void Abort(const Transaction &t) override;
-
- private:
-  communication::rpc::Server &rpc_server_;
-  distributed::OngoingProduceJoinerRpcClients ongoing_produce_joiner_;
-};
-}  // namespace tx
diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/single_node/engine_single_node.cpp
similarity index 67%
rename from src/transactions/engine_single_node.cpp
rename to src/transactions/single_node/engine_single_node.cpp
index 71dedfffa..b685c1402 100644
--- a/src/transactions/engine_single_node.cpp
+++ b/src/transactions/single_node/engine_single_node.cpp
@@ -4,20 +4,19 @@
 #include "glog/logging.h"
 
 #include "database/state_delta.hpp"
-#include "transactions/engine_rpc_messages.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 namespace tx {
 
-SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal)
+EngineSingleNode::EngineSingleNode(durability::WriteAheadLog *wal)
     : wal_(wal) {}
 
-Transaction *SingleNodeEngine::Begin() {
+Transaction *EngineSingleNode::Begin() {
   VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
   std::lock_guard<utils::SpinLock> guard(lock_);
 
   TransactionId id{++counter_};
-  auto t = new Transaction(id, active_, *this);
+  auto t = CreateTransaction(id, active_);
   active_.insert(id);
   store_.emplace(id, t);
   if (wal_) {
@@ -26,7 +25,7 @@ Transaction *SingleNodeEngine::Begin() {
   return t;
 }
 
-CommandId SingleNodeEngine::Advance(TransactionId id) {
+CommandId EngineSingleNode::Advance(TransactionId id) {
   std::lock_guard<utils::SpinLock> guard(lock_);
 
   auto it = store_.find(id);
@@ -34,23 +33,18 @@ CommandId SingleNodeEngine::Advance(TransactionId id) {
       << "Transaction::advance on non-existing transaction";
 
   Transaction *t = it->second.get();
-  if (t->cid_ == std::numeric_limits<CommandId>::max())
-    throw TransactionError(
-        "Reached maximum number of commands in this "
-        "transaction.");
-
-  return ++(t->cid_);
+  return AdvanceCommand(t);
 }
 
-CommandId SingleNodeEngine::UpdateCommand(TransactionId id) {
+CommandId EngineSingleNode::UpdateCommand(TransactionId id) {
   std::lock_guard<utils::SpinLock> guard(lock_);
   auto it = store_.find(id);
   DCHECK(it != store_.end())
       << "Transaction::advance on non-existing transaction";
-  return it->second->cid_;
+  return it->second->cid();
 }
 
-void SingleNodeEngine::Commit(const Transaction &t) {
+void EngineSingleNode::Commit(const Transaction &t) {
   VLOG(11) << "[Tx] Commiting transaction " << t.id_;
   std::lock_guard<utils::SpinLock> guard(lock_);
   clog_.set_committed(t.id_);
@@ -61,7 +55,7 @@ void SingleNodeEngine::Commit(const Transaction &t) {
   store_.erase(store_.find(t.id_));
 }
 
-void SingleNodeEngine::Abort(const Transaction &t) {
+void EngineSingleNode::Abort(const Transaction &t) {
   VLOG(11) << "[Tx] Aborting transaction " << t.id_;
   std::lock_guard<utils::SpinLock> guard(lock_);
   clog_.set_aborted(t.id_);
@@ -72,11 +66,11 @@ void SingleNodeEngine::Abort(const Transaction &t) {
   store_.erase(store_.find(t.id_));
 }
 
-CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const {
+CommitLog::Info EngineSingleNode::Info(TransactionId tx) const {
   return clog_.fetch_info(tx);
 }
 
-Snapshot SingleNodeEngine::GlobalGcSnapshot() {
+Snapshot EngineSingleNode::GlobalGcSnapshot() {
   std::lock_guard<utils::SpinLock> guard(lock_);
 
   // No active transactions.
@@ -92,29 +86,29 @@ Snapshot SingleNodeEngine::GlobalGcSnapshot() {
   return snapshot_copy;
 }
 
-Snapshot SingleNodeEngine::GlobalActiveTransactions() {
+Snapshot EngineSingleNode::GlobalActiveTransactions() {
   std::lock_guard<utils::SpinLock> guard(lock_);
   Snapshot active_transactions = active_;
   return active_transactions;
 }
 
-TransactionId SingleNodeEngine::LocalLast() const {
+TransactionId EngineSingleNode::LocalLast() const {
   std::lock_guard<utils::SpinLock> guard(lock_);
   return counter_;
 }
 
-TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); }
+TransactionId EngineSingleNode::GlobalLast() const { return LocalLast(); }
 
-TransactionId SingleNodeEngine::LocalOldestActive() const {
+TransactionId EngineSingleNode::LocalOldestActive() const {
   std::lock_guard<utils::SpinLock> guard(lock_);
   return active_.empty() ? counter_ + 1 : active_.front();
 }
 
-void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) {
+void EngineSingleNode::GarbageCollectCommitLog(TransactionId tx_id) {
   clog_.garbage_collect_older(tx_id);
 }
 
-void SingleNodeEngine::LocalForEachActiveTransaction(
+void EngineSingleNode::LocalForEachActiveTransaction(
     std::function<void(Transaction &)> f) {
   std::lock_guard<utils::SpinLock> guard(lock_);
   for (auto transaction : active_) {
@@ -122,7 +116,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction(
   }
 }
 
-Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
+Transaction *EngineSingleNode::RunningTransaction(TransactionId tx_id) {
   std::lock_guard<utils::SpinLock> guard(lock_);
   auto found = store_.find(tx_id);
   CHECK(found != store_.end())
@@ -130,7 +124,7 @@ Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
   return found->second.get();
 }
 
-void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) {
+void EngineSingleNode::EnsureNextIdGreater(TransactionId tx_id) {
   std::lock_guard<utils::SpinLock> guard(lock_);
   counter_ = std::max(tx_id, counter_);
 }
diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/single_node/engine_single_node.hpp
similarity index 72%
rename from src/transactions/engine_single_node.hpp
rename to src/transactions/single_node/engine_single_node.hpp
index ddc09918a..fc7e7ce77 100644
--- a/src/transactions/engine_single_node.hpp
+++ b/src/transactions/single_node/engine_single_node.hpp
@@ -1,3 +1,5 @@
+/// @file
+
 #pragma once
 
 #include <atomic>
@@ -8,26 +10,21 @@
 #include "transactions/commit_log.hpp"
 #include "transactions/engine.hpp"
 #include "transactions/transaction.hpp"
-#include "utils/exceptions.hpp"
 #include "utils/thread/sync.hpp"
 
 namespace tx {
 
-/** Indicates an error in transaction handling (currently
- * only command id overflow). */
-class TransactionError : public utils::BasicException {
+/// Single-node deployment transaction engine. Has complete functionality.
+class EngineSingleNode final : public Engine {
  public:
-  using utils::BasicException::BasicException;
-};
+  /// @param wal - Optional. If present, the Engine will write tx
+  /// Begin/Commit/Abort atomically (while under lock).
+  explicit EngineSingleNode(durability::WriteAheadLog *wal = nullptr);
 
-/** Single-node deployment transaction engine. Has complete functionality. */
-class SingleNodeEngine : public Engine {
- public:
-  /**
-   * @param wal - Optional. If present, the Engine will write tx
-   * Begin/Commit/Abort atomically (while under lock).
-   */
-  explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr);
+  EngineSingleNode(const EngineSingleNode &) = delete;
+  EngineSingleNode(EngineSingleNode &&) = delete;
+  EngineSingleNode &operator=(const EngineSingleNode &) = delete;
+  EngineSingleNode &operator=(EngineSingleNode &&) = delete;
 
   Transaction *Begin() override;
   CommandId Advance(TransactionId id) override;
diff --git a/src/transactions/snapshot.cpp b/src/transactions/snapshot.cpp
deleted file mode 100644
index 134259566..000000000
--- a/src/transactions/snapshot.cpp
+++ /dev/null
@@ -1,16 +0,0 @@
-#include "transactions/snapshot.hpp"
-
-#include "utils/serialization.hpp"
-
-namespace tx {
-
-void Snapshot::Save(capnp::Snapshot::Builder *builder) const {
-  auto list_builder = builder->initTransactionIds(transaction_ids_.size());
-  utils::SaveVector(transaction_ids_, &list_builder);
-}
-
-void Snapshot::Load(const capnp::Snapshot::Reader &reader) {
-  utils::LoadVector(&transaction_ids_, reader.getTransactionIds());
-}
-
-}  // namespace tx
diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp
index bb2549282..ef6d493e5 100644
--- a/src/transactions/snapshot.hpp
+++ b/src/transactions/snapshot.hpp
@@ -1,3 +1,5 @@
+/// @file
+
 #pragma once
 
 #include <algorithm>
@@ -5,51 +7,49 @@
 #include <vector>
 
 #include "glog/logging.h"
-#include "transactions/common.capnp.h"
 #include "transactions/type.hpp"
 #include "utils/algorithm.hpp"
 
 namespace tx {
 
-class Engine;
-
-/** Ascendingly sorted collection of transaction ids.
- *
- * Represents the transactions that were active at
- * some point in the discrete transaction time.
- */
-class Snapshot {
+/// Ascendingly sorted collection of transaction ids.
+///
+/// Represents the transactions that were active at
+/// some point in the discrete transaction time.
+class Snapshot final {
  public:
   Snapshot() = default;
   Snapshot(std::vector<TransactionId> &&active)
       : transaction_ids_(std::move(active)) {}
-  // all the copy/move constructors/assignments act naturally
 
-  /** Returns true if this snapshot contains the given
-   * transaction id.
-   *
-   * @param xid - The transcation id in question
-   */
+  Snapshot(const Snapshot &) = default;
+  Snapshot(Snapshot &&) = default;
+  Snapshot &operator=(const Snapshot &) = default;
+  Snapshot &operator=(Snapshot &&) = default;
+
+  /// Returns true if this snapshot contains the given
+  /// transaction id.
+  ///
+  /// @param xid - The transcation id in question
   bool contains(TransactionId id) const {
     return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(),
                               id);
   }
 
-  /** Adds the given transaction id to the end of this Snapshot.
-   * The given id must be greater then all the existing ones,
-   * to maintain ascending sort order.
-   *
-   * @param id - the transaction id to add
-   */
+  /// Adds the given transaction id to the end of this Snapshot.
+  /// The given id must be greater then all the existing ones,
+  /// to maintain ascending sort order.
+  ///
+  /// @param id - the transaction id to add
   void insert(TransactionId id) {
     transaction_ids_.push_back(id);
     DCHECK(std::is_sorted(transaction_ids_.begin(), transaction_ids_.end()))
         << "Snapshot must be sorted";
   }
 
-  /** Removes the given transaction id from this Snapshot.
-   *
-   * @param id - the transaction id to remove */
+  /// Removes the given transaction id from this Snapshot.
+  ///
+  /// @param id - the transaction id to remove
   void remove(TransactionId id) {
     auto last =
         std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
@@ -84,8 +84,7 @@ class Snapshot {
     return stream;
   }
 
-  void Save(capnp::Snapshot::Builder *builder) const;
-  void Load(const capnp::Snapshot::Reader &reader);
+  const auto &transaction_ids() const { return transaction_ids_; }
 
  private:
   std::vector<TransactionId> transaction_ids_;
diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp
index 68d57bbcc..25757e4c7 100644
--- a/src/transactions/transaction.hpp
+++ b/src/transactions/transaction.hpp
@@ -1,3 +1,5 @@
+/// @file
+
 #pragma once
 
 #include <chrono>
@@ -11,22 +13,28 @@
 #include "transactions/lock_store.hpp"
 #include "transactions/snapshot.hpp"
 #include "transactions/type.hpp"
+#include "utils/exceptions.hpp"
 
 namespace tx {
 
-/** A database transaction. Encapsulates an atomic, abortable unit of work. Also
- * defines that all db ops are single-threaded within a single transaction */
-class Transaction {
+/// Indicates an error in transaction handling (currently
+/// only command id overflow).
+class TransactionError : public utils::BasicException {
  public:
-  /** Returns the maximum possible transcation id */
+  using utils::BasicException::BasicException;
+};
+
+/// A database transaction. Encapsulates an atomic, abortable unit of work. Also
+/// defines that all db ops are single-threaded within a single transaction
+class Transaction final {
+ public:
+  /// Returns the maximum possible transcation id
   static TransactionId MaxId() {
     return std::numeric_limits<TransactionId>::max();
   }
 
  private:
-  friend class SingleNodeEngine;
-  friend class MasterEngine;
-  friend class WorkerEngine;
+  friend class Engine;
 
   // The constructor is private, only the Engine ever uses it.
   Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine)
@@ -40,27 +48,26 @@ class Transaction {
   Transaction &operator=(Transaction &&) = delete;
 
  public:
-  /** Acquires the lock over the given RecordLock, preventing other transactions
-   * from doing the same */
+  /// Acquires the lock over the given RecordLock, preventing other transactions
+  /// from doing the same
   void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); }
 
-  /** Transaction's id. Unique in the engine that owns it */
+  /// Transaction's id. Unique in the engine that owns it
   const TransactionId id_;
 
-  /** The transaction engine to which this transaction belongs */
+  /// The transaction engine to which this transaction belongs
   Engine &engine_;
 
-  /** Returns the current transaction's current command id */
+  /// Returns the current transaction's current command id
   // TODO rename to cmd_id (variable and function
   auto cid() const { return cid_; }
 
-  /** Returns this transaction's snapshot. */
+  /// Returns this transaction's snapshot.
   const Snapshot &snapshot() const { return snapshot_; }
 
-  /** Signal to transaction that it should abort. It doesn't really enforce that
-   * transaction will abort, but it merely hints too the transaction that it is
-   * preferable to stop its execution.
-   */
+  /// Signal to transaction that it should abort. It doesn't really enforce that
+  /// transaction will abort, but it merely hints too the transaction that it is
+  /// preferable to stop its execution.
   void set_should_abort() { should_abort_ = true; }
 
   bool should_abort() const { return should_abort_; }
@@ -68,6 +75,19 @@ class Transaction {
   auto creation_time() const { return creation_time_; }
 
  private:
+  // Function used to advance the command.
+  CommandId AdvanceCommand() {
+    if (cid_ == std::numeric_limits<CommandId>::max()) {
+      throw TransactionError(
+          "Reached maximum number of commands in this "
+          "transaction.");
+    }
+    return ++cid_;
+  }
+
+  // Function used to set the command.
+  void SetCommand(CommandId cid) { cid_ = cid; }
+
   // Index of the current command in the current transaction.
   CommandId cid_{1};
 
diff --git a/src/transactions/type.hpp b/src/transactions/type.hpp
index 991ddc8fb..9dd1a6617 100644
--- a/src/transactions/type.hpp
+++ b/src/transactions/type.hpp
@@ -1,12 +1,14 @@
+/// @file
+
 #include <cstdint>
 
 // transcation and command types defined
 // in a separate header to avoid cyclic dependencies
 namespace tx {
 
-  /** Type of a tx::Transcation's id member */
+  /// Type of a tx::Transcation's id member
   using TransactionId = uint64_t;
 
-  /** Type of a tx::Transcation's command id member */
+  /// Type of a tx::Transcation's command id member
   using CommandId = uint32_t;
 }
diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp
index 1f3ad3b28..d886971a1 100644
--- a/tests/benchmark/mvcc.cpp
+++ b/tests/benchmark/mvcc.cpp
@@ -4,7 +4,7 @@
 
 #include "mvcc/record.hpp"
 #include "mvcc/version_list.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 class Prop : public mvcc::Record<Prop> {
  public:
@@ -19,7 +19,7 @@ class Prop : public mvcc::Record<Prop> {
 void MvccMix(benchmark::State &state) {
   while (state.KeepRunning()) {
     state.PauseTiming();
-    tx::SingleNodeEngine engine;
+    tx::EngineSingleNode engine;
     auto t1 = engine.Begin();
     mvcc::VersionList<Prop> version_list(*t1, 0, 0);
 
diff --git a/tests/benchmark/tx_engine.cpp b/tests/benchmark/tx_engine.cpp
index 409387bd9..a2a14d746 100644
--- a/tests/benchmark/tx_engine.cpp
+++ b/tests/benchmark/tx_engine.cpp
@@ -3,14 +3,14 @@
 
 #include <glog/logging.h>
 
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 #include "utils/timer.hpp"
 
 void Benchmark(int64_t num_threads, int64_t num_transactions) {
   LOG(INFO) << "Testing with " << num_threads << " threads and "
             << num_transactions << " transactions per thread...";
 
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   std::vector<std::thread> threads;
   utils::Timer timer;
   for (int i = 0; i < num_threads; ++i) {
diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp
index 394329377..5a18ad77e 100644
--- a/tests/unit/database_key_index.cpp
+++ b/tests/unit/database_key_index.cpp
@@ -5,7 +5,7 @@
 #include "database/graph_db_accessor.hpp"
 #include "storage/types.hpp"
 #include "storage/vertex.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 #include "mvcc_gc_common.hpp"
 
@@ -16,7 +16,7 @@ TEST(LabelsIndex, UniqueInsert) {
   database::KeyIndex<storage::Label, Vertex> index;
   database::SingleNode db;
   auto dba = db.Access();
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
 
   auto t1 = engine.Begin();
   mvcc::VersionList<Vertex> vlist(*t1, 0, 0);
@@ -45,7 +45,7 @@ TEST(LabelsIndex, UniqueFilter) {
   database::SingleNode db;
   database::KeyIndex<storage::Label, Vertex> index;
   auto dba = db.Access();
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
 
   auto t1 = engine.Begin();
   mvcc::VersionList<Vertex> vlist1(*t1, 0, 0);
@@ -85,7 +85,7 @@ TEST(LabelsIndex, Refresh) {
   database::KeyIndex<storage::Label, Vertex> index;
   database::SingleNode db;
   auto access = db.Access();
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
 
   // add two vertices to  database
   auto t1 = engine.Begin();
diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp
index b6400e370..36579d3eb 100644
--- a/tests/unit/database_label_property_index.cpp
+++ b/tests/unit/database_label_property_index.cpp
@@ -4,6 +4,7 @@
 #include "database/graph_db_accessor.hpp"
 #include "database/indexes/label_property_index.hpp"
 #include "storage/types.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 #include "mvcc_gc_common.hpp"
 
@@ -45,7 +46,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
   LabelPropertyIndex index;
   LabelPropertyIndex::Key *key;
 
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   tx::Transaction *t{nullptr};
 
   mvcc::VersionList<Vertex> *vlist;
diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp
index 62592f636..c43b0b95d 100644
--- a/tests/unit/distributed_common.hpp
+++ b/tests/unit/distributed_common.hpp
@@ -10,7 +10,7 @@
 #include "distributed/data_manager.hpp"
 #include "distributed/updates_rpc_server.hpp"
 #include "storage/address_types.hpp"
-#include "transactions/engine_master.hpp"
+#include "transactions/distributed/engine_master.hpp"
 #include "utils/file.hpp"
 
 DECLARE_string(durability_directory);
diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp
index ef196cb89..464ad5209 100644
--- a/tests/unit/distributed_durability.cpp
+++ b/tests/unit/distributed_durability.cpp
@@ -89,11 +89,15 @@ class DistributedDurability : public DistributedGraphDbTest {
         // The master always has TRANSACTION_BEGIN and `op`.
         ASSERT_EQ(deltas.size(), 2);
         EXPECT_EQ(deltas[1].type, op);
-      }
-      else {
-        // The workers only have `op`.
-        ASSERT_EQ(deltas.size(), 1);
-        EXPECT_EQ(deltas[0].type, op);
+      } else {
+        // The workers only have `op` if the op is `COMMITTED`, they don't have
+        // the `ABORTED` op.
+        if (op == database::StateDelta::Type::TRANSACTION_COMMIT) {
+          ASSERT_EQ(deltas.size(), 1);
+          EXPECT_EQ(deltas[0].type, op);
+        } else {
+          ASSERT_EQ(deltas.size(), 0);
+        }
       }
     }
   }
diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp
index 464119e45..f987d910c 100644
--- a/tests/unit/distributed_graph_db.cpp
+++ b/tests/unit/distributed_graph_db.cpp
@@ -24,7 +24,7 @@
 #include "query/typed_value.hpp"
 #include "query_common.hpp"
 #include "query_plan_common.hpp"
-#include "transactions/engine_master.hpp"
+#include "transactions/distributed/engine_master.hpp"
 
 using database::GraphDbAccessor;
 using namespace distributed;
diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp
index a2bc4f628..2274b4fbb 100644
--- a/tests/unit/distributed_query_plan.cpp
+++ b/tests/unit/distributed_query_plan.cpp
@@ -25,7 +25,6 @@
 #include "query/typed_value.hpp"
 #include "query_common.hpp"
 #include "query_plan_common.hpp"
-#include "transactions/engine_master.hpp"
 
 DECLARE_int32(query_execution_time_sec);
 
diff --git a/tests/unit/distributed_serialization.cpp b/tests/unit/distributed_serialization.cpp
index 0f43a1955..4f8a4067d 100644
--- a/tests/unit/distributed_serialization.cpp
+++ b/tests/unit/distributed_serialization.cpp
@@ -10,7 +10,7 @@
 #include "storage/property_value_store.hpp"
 #include "storage/types.hpp"
 #include "storage/vertex.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 using namespace storage;
 
@@ -117,7 +117,7 @@ TEST(DistributedSerialization, VertexProperties) {
 
 class DistributedSerializationMvcc : public ::testing::Test {
  protected:
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   tx::Transaction *tx = engine.Begin();
   mvcc::VersionList<Vertex> v1_vlist{*tx, 0, 0};
   Vertex &v1 = *v1_vlist.Oldest();
diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp
index 149f64f80..d321aff01 100644
--- a/tests/unit/mvcc.cpp
+++ b/tests/unit/mvcc.cpp
@@ -4,14 +4,14 @@
 #include "mvcc/record.hpp"
 #include "mvcc/version.hpp"
 #include "mvcc/version_list.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 #include "transactions/transaction.hpp"
 #include "utils/thread/sync.hpp"
 
 #include "mvcc_gc_common.hpp"
 
 TEST(MVCC, Deadlock) {
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
 
   auto t0 = engine.Begin();
   mvcc::VersionList<Prop> version_list1(*t0, 0, 0);
@@ -31,7 +31,7 @@ TEST(MVCC, Deadlock) {
 TEST(MVCC, UpdateDontDelete) {
   std::atomic<int> count{0};
   {
-    tx::SingleNodeEngine engine;
+    tx::EngineSingleNode engine;
     auto t1 = engine.Begin();
     mvcc::VersionList<DestrCountRec> version_list(*t1, 0, 0, count);
     engine.Commit(*t1);
@@ -55,7 +55,7 @@ TEST(MVCC, UpdateDontDelete) {
 
 // Check that we get the oldest record.
 TEST(MVCC, Oldest) {
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   auto t1 = engine.Begin();
   mvcc::VersionList<Prop> version_list(*t1, 0, 0);
   auto first = version_list.Oldest();
diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp
index 0586d38e6..cdd4c8632 100644
--- a/tests/unit/mvcc_find_update_common.hpp
+++ b/tests/unit/mvcc_find_update_common.hpp
@@ -4,7 +4,7 @@
 #include "mvcc/record.hpp"
 #include "mvcc/version.hpp"
 #include "mvcc/version_list.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 #include "transactions/transaction.hpp"
 
 class TestClass : public mvcc::Record<TestClass> {
@@ -57,7 +57,7 @@ class Mvcc : public ::testing::Test {
   }
   // variable where number of versions is stored
   int version_list_size = 0;
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   tx::Transaction *t1 = engine.Begin();
   mvcc::VersionList<TestClass> version_list{*t1, 0, 0, version_list_size};
   TestClass *v1 = nullptr;
diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp
index a3b2bea3c..b2ced902b 100644
--- a/tests/unit/mvcc_gc.cpp
+++ b/tests/unit/mvcc_gc.cpp
@@ -12,13 +12,13 @@
 #include "mvcc/version_list.hpp"
 #include "storage/garbage_collector.hpp"
 #include "storage/vertex.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 #include "mvcc_gc_common.hpp"
 
 class MvccGcTest : public ::testing::Test {
  protected:
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
 
  private:
   tx::Transaction *t0 = engine.Begin();
@@ -116,7 +116,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
  */
 TEST(GarbageCollector, GcClean) {
   ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection;
-  tx::SingleNodeEngine engine;
+  tx::EngineSingleNode engine;
   DeferredDeleter<DestrCountRec> deleter;
   DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter;
   GarbageCollector<decltype(collection), DestrCountRec> gc(collection, deleter,
diff --git a/tests/unit/mvcc_gc_common.hpp b/tests/unit/mvcc_gc_common.hpp
index 270dbc36c..e35ed2cbf 100644
--- a/tests/unit/mvcc_gc_common.hpp
+++ b/tests/unit/mvcc_gc_common.hpp
@@ -1,7 +1,7 @@
 #pragma once
 
 #include "mvcc/record.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 
 /**
  * @brief - Empty class which inherits from mvcc:Record.
@@ -29,7 +29,7 @@ class DestrCountRec : public mvcc::Record<DestrCountRec> {
 // helper function for creating a GC snapshot
 // if given a nullptr it makes a GC snapshot like there
 // are no active transactions
-auto GcSnapshot(tx::SingleNodeEngine &engine, tx::Transaction *t) {
+auto GcSnapshot(tx::EngineSingleNode &engine, tx::Transaction *t) {
   if (t != nullptr) {
     tx::Snapshot gc_snap = t->snapshot();
     gc_snap.insert(t->id_);
diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp
index ac9227de6..048e434a9 100644
--- a/tests/unit/transaction_engine_distributed.cpp
+++ b/tests/unit/transaction_engine_distributed.cpp
@@ -9,9 +9,8 @@
 #include "distributed/cluster_discovery_master.hpp"
 #include "distributed/coordination_master.hpp"
 #include "io/network/endpoint.hpp"
-#include "transactions/engine_master.hpp"
-#include "transactions/engine_rpc_messages.hpp"
-#include "transactions/engine_worker.hpp"
+#include "transactions/distributed/engine_master.hpp"
+#include "transactions/distributed/engine_worker.hpp"
 
 using namespace tx;
 using namespace communication::rpc;
@@ -22,13 +21,14 @@ class WorkerEngineTest : public testing::Test {
   const std::string local{"127.0.0.1"};
 
   Server master_server_{{local, 0}};
+  Server worker_server_{{local, 0}};
   MasterCoordination master_coordination_{master_server_.endpoint()};
   RpcWorkerClients rpc_worker_clients_{master_coordination_};
 
-  MasterEngine master_{master_server_, rpc_worker_clients_};
+  EngineMaster master_{master_server_, rpc_worker_clients_};
   ClientPool master_client_pool{master_server_.endpoint()};
 
-  WorkerEngine worker_{master_client_pool};
+  EngineWorker worker_{worker_server_, master_client_pool};
 };
 
 TEST_F(WorkerEngineTest, BeginOnWorker) {
diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp
index 2a34897ce..67d8ce2b7 100644
--- a/tests/unit/transaction_engine_single_node.cpp
+++ b/tests/unit/transaction_engine_single_node.cpp
@@ -4,13 +4,13 @@
 #include <vector>
 
 #include "data_structures/concurrent/skiplist.hpp"
-#include "transactions/engine_single_node.hpp"
+#include "transactions/single_node/engine_single_node.hpp"
 #include "transactions/transaction.hpp"
 
 using namespace tx;
 
 TEST(Engine, GcSnapshot) {
-  SingleNodeEngine engine;
+  EngineSingleNode engine;
   ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
 
   std::vector<Transaction *> transactions;
@@ -38,7 +38,7 @@ TEST(Engine, GcSnapshot) {
 }
 
 TEST(Engine, Advance) {
-  SingleNodeEngine engine;
+  EngineSingleNode engine;
 
   auto t0 = engine.Begin();
   auto t1 = engine.Begin();
@@ -51,7 +51,7 @@ TEST(Engine, Advance) {
 }
 
 TEST(Engine, ConcurrentBegin) {
-  SingleNodeEngine engine;
+  EngineSingleNode engine;
   std::vector<std::thread> threads;
   SkipList<TransactionId> tx_ids;
   for (int i = 0; i < 10; ++i) {
@@ -67,7 +67,7 @@ TEST(Engine, ConcurrentBegin) {
 }
 
 TEST(Engine, RunningTransaction) {
-  SingleNodeEngine engine;
+  EngineSingleNode engine;
   auto t0 = engine.Begin();
   auto t1 = engine.Begin();
   EXPECT_EQ(t0, engine.RunningTransaction(t0->id_));
@@ -76,7 +76,7 @@ TEST(Engine, RunningTransaction) {
 }
 
 TEST(Engine, EnsureTxIdGreater) {
-  SingleNodeEngine engine;
+  EngineSingleNode engine;
   ASSERT_LE(engine.Begin()->id_, 40);
   engine.EnsureNextIdGreater(42);
   EXPECT_EQ(engine.Begin()->id_, 43);