Separate distributed from single node transaction engine

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1637
This commit is contained in:
Matej Ferencevic 2018-10-05 12:48:57 +02:00
parent ade2593b51
commit 5097c10ba8
7 changed files with 192 additions and 17 deletions

View File

@ -153,7 +153,7 @@ set(mg_distributed_sources
storage/distributed/vertex_accessor.cpp storage/distributed/vertex_accessor.cpp
storage/locking/record_lock.cpp storage/locking/record_lock.cpp
memgraph_init.cpp memgraph_init.cpp
transactions/single_node/engine_single_node.cpp transactions/distributed/engine_single_node.cpp
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@ -5,7 +5,7 @@
#include "communication/rpc/server.hpp" #include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp" #include "distributed/coordination.hpp"
#include "transactions/distributed/engine_distributed.hpp" #include "transactions/distributed/engine_distributed.hpp"
#include "transactions/single_node/engine_single_node.hpp" #include "transactions/distributed/engine_single_node.hpp"
namespace tx { namespace tx {

View File

@ -0,0 +1,133 @@
#include "transactions/distributed/engine_single_node.hpp"
#include <limits>
#include <mutex>
#include "glog/logging.h"
#include "durability/distributed/state_delta.hpp"
namespace tx {
EngineSingleNode::EngineSingleNode(durability::WriteAheadLog *wal)
: wal_(wal) {}
Transaction *EngineSingleNode::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
std::lock_guard<utils::SpinLock> guard(lock_);
TransactionId id{++counter_};
auto t = CreateTransaction(id, active_);
active_.insert(id);
store_.emplace(id, t);
if (wal_) {
wal_->Emplace(database::StateDelta::TxBegin(id));
}
return t;
}
CommandId EngineSingleNode::Advance(TransactionId id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
return AdvanceCommand(t);
}
CommandId EngineSingleNode::UpdateCommand(TransactionId id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
return it->second->cid();
}
void EngineSingleNode::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
}
store_.erase(store_.find(t.id_));
}
void EngineSingleNode::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
}
store_.erase(store_.find(t.id_));
}
CommitLog::Info EngineSingleNode::Info(TransactionId tx) const {
return clog_.fetch_info(tx);
}
Snapshot EngineSingleNode::GlobalGcSnapshot() {
std::lock_guard<utils::SpinLock> guard(lock_);
// No active transactions.
if (active_.size() == 0) {
auto snapshot_copy = active_;
snapshot_copy.insert(counter_ + 1);
return snapshot_copy;
}
// There are active transactions.
auto snapshot_copy = store_.find(active_.front())->second->snapshot();
snapshot_copy.insert(active_.front());
return snapshot_copy;
}
Snapshot EngineSingleNode::GlobalActiveTransactions() {
std::lock_guard<utils::SpinLock> guard(lock_);
Snapshot active_transactions = active_;
return active_transactions;
}
TransactionId EngineSingleNode::LocalLast() const {
std::lock_guard<utils::SpinLock> guard(lock_);
return counter_;
}
TransactionId EngineSingleNode::GlobalLast() const { return LocalLast(); }
TransactionId EngineSingleNode::LocalOldestActive() const {
std::lock_guard<utils::SpinLock> guard(lock_);
return active_.empty() ? counter_ + 1 : active_.front();
}
void EngineSingleNode::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
}
void EngineSingleNode::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
std::lock_guard<utils::SpinLock> guard(lock_);
for (auto transaction : active_) {
f(*store_.find(transaction)->second);
}
}
Transaction *EngineSingleNode::RunningTransaction(TransactionId tx_id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto found = store_.find(tx_id);
CHECK(found != store_.end())
<< "Can't return snapshot for an inactive transaction";
return found->second.get();
}
void EngineSingleNode::EnsureNextIdGreater(TransactionId tx_id) {
std::lock_guard<utils::SpinLock> guard(lock_);
counter_ = std::max(tx_id, counter_);
}
} // namespace tx

View File

@ -0,0 +1,56 @@
/// @file
#pragma once
#include <atomic>
#include <experimental/optional>
#include <unordered_map>
#include "durability/distributed/wal.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/thread/sync.hpp"
namespace tx {
/// Single-node deployment transaction engine. Has complete functionality.
class EngineSingleNode final : public Engine {
public:
/// @param wal - Optional. If present, the Engine will write tx
/// Begin/Commit/Abort atomically (while under lock).
explicit EngineSingleNode(durability::WriteAheadLog *wal = nullptr);
EngineSingleNode(const EngineSingleNode &) = delete;
EngineSingleNode(EngineSingleNode &&) = delete;
EngineSingleNode &operator=(const EngineSingleNode &) = delete;
EngineSingleNode &operator=(EngineSingleNode &&) = delete;
Transaction *Begin() override;
CommandId Advance(TransactionId id) override;
CommandId UpdateCommand(TransactionId id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(TransactionId tx) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
TransactionId GlobalLast() const override;
TransactionId LocalLast() const override;
TransactionId LocalOldestActive() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
Transaction *RunningTransaction(TransactionId tx_id) override;
void EnsureNextIdGreater(TransactionId tx_id) override;
void GarbageCollectCommitLog(TransactionId tx_id) override;
private:
TransactionId counter_{0};
CommitLog clog_;
std::unordered_map<TransactionId, std::unique_ptr<Transaction>> store_;
Snapshot active_;
mutable utils::SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};
};
} // namespace tx

View File

@ -5,13 +5,7 @@
#include "glog/logging.h" #include "glog/logging.h"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/state_delta.hpp" #include "durability/single_node/state_delta.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/state_delta.hpp"
#endif
namespace tx { namespace tx {

View File

@ -6,14 +6,7 @@
#include <experimental/optional> #include <experimental/optional>
#include <unordered_map> #include <unordered_map>
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/wal.hpp" #include "durability/single_node/wal.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/wal.hpp"
#endif
#include "transactions/commit_log.hpp" #include "transactions/commit_log.hpp"
#include "transactions/engine.hpp" #include "transactions/engine.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"

View File

@ -10,8 +10,7 @@
#include "storage/distributed/edge.hpp" #include "storage/distributed/edge.hpp"
#include "storage/distributed/serialization.hpp" #include "storage/distributed/serialization.hpp"
#include "storage/distributed/vertex.hpp" #include "storage/distributed/vertex.hpp"
// TODO: THIS SHOULD BE CHANGED! #include "transactions/distributed/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
using namespace storage; using namespace storage;