Implement record lock deadlock breaker
Reviewers: florijan, buda Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D823
This commit is contained in:
parent
afff458afa
commit
3140f175fc
@ -6,6 +6,7 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
|
@ -1,17 +1,119 @@
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
|
||||
LockStatus RecordLock::Lock(tx::transaction_id_t id) {
|
||||
if (mutex_.try_lock()) {
|
||||
owner_ = id;
|
||||
#include <fmt/format.h>
|
||||
#include <glog/logging.h>
|
||||
#include <experimental/optional>
|
||||
#include <stack>
|
||||
#include <unordered_set>
|
||||
|
||||
#include "threading/sync/lock_timeout_exception.hpp"
|
||||
#include "transactions/engine.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
// Finds lock cycle that start transaction is a part of and returns id of oldest
|
||||
// transaction in that cycle. If start transaction is not in a cycle nullopt is
|
||||
// returned.
|
||||
std::experimental::optional<tx::transaction_id_t> FindOldestTxInLockCycle(
|
||||
tx::transaction_id_t start,
|
||||
ConcurrentMap<tx::transaction_id_t, tx::transaction_id_t>::Accessor
|
||||
&graph_accessor) {
|
||||
std::vector<tx::transaction_id_t> path;
|
||||
std::unordered_set<tx::transaction_id_t> visited;
|
||||
|
||||
auto current = start;
|
||||
|
||||
do {
|
||||
visited.insert(current);
|
||||
path.push_back(current);
|
||||
auto it = graph_accessor.find(current);
|
||||
if (it == graph_accessor.end()) return std::experimental::nullopt;
|
||||
current = it->second;
|
||||
} while (visited.find(current) == visited.end());
|
||||
|
||||
if (current == start) {
|
||||
// start is a part of the cycle, return oldest transaction.
|
||||
CHECK(path.size() >= 2U) << "Cycle must have at least two nodes";
|
||||
return *std::min(path.begin(), path.end());
|
||||
}
|
||||
|
||||
// There is a cycle, but start is not a part of it. Some transaction that is
|
||||
// in a cycle will find it and abort oldest transaction.
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
|
||||
if (lock_.try_lock()) {
|
||||
owner_ = tx.id_;
|
||||
return LockStatus::Acquired;
|
||||
}
|
||||
|
||||
if (owner_ == id) return LockStatus::AlreadyHeld;
|
||||
tx::transaction_id_t owner = owner_;
|
||||
if (owner_ == tx.id_) return LockStatus::AlreadyHeld;
|
||||
|
||||
mutex_.lock(&kTimeout);
|
||||
return LockStatus::Acquired;
|
||||
// Insert edge into lock_graph.
|
||||
auto accessor = engine.lock_graph().access();
|
||||
auto it = accessor.insert(tx.id_, owner).first;
|
||||
|
||||
auto abort_oldest_tx_in_lock_cycle = [&tx, &accessor, &engine]() {
|
||||
// Find oldest transaction in lock cycle if cycle exists and notify that
|
||||
// transaction that it should abort.
|
||||
// TODO: maybe we can be smarter and abort some other transaction and not
|
||||
// the
|
||||
// oldest one.
|
||||
auto oldest = FindOldestTxInLockCycle(tx.id_, accessor);
|
||||
if (oldest) {
|
||||
engine.ForEachActiveTransaction([&](tx::Transaction &t) {
|
||||
if (t.id_ == oldest) {
|
||||
t.set_should_abort();
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
abort_oldest_tx_in_lock_cycle();
|
||||
|
||||
// Make sure to erase edge on function exit. Either function will throw and
|
||||
// transaction will be killed so we should erase the edge because transaction
|
||||
// won't exist anymore or owner_ will finish and we will be able to acquire
|
||||
// the lock.
|
||||
utils::OnScopeExit cleanup{[&tx, &accessor] { accessor.remove(tx.id_); }};
|
||||
|
||||
utils::Timer t;
|
||||
while (t.Elapsed() < kTimeout) {
|
||||
if (tx.should_abort()) {
|
||||
// Message could be incorrect. Transaction could be aborted because it was
|
||||
// running for too long time, but that is unlikely and it is not very
|
||||
// important which exception (and message) we throw here.
|
||||
throw LockTimeoutException(
|
||||
"Transaction was aborted since it was oldest in a lock cycle");
|
||||
}
|
||||
if (lock_.try_lock()) {
|
||||
owner_ = tx.id_;
|
||||
return LockStatus::Acquired;
|
||||
}
|
||||
if (owner != owner_) {
|
||||
// Owner changed while we were spinlocking. Update the edge and rerun
|
||||
// cycle resolution routine.
|
||||
// TODO: we should make sure that first transaction that tries to acquire
|
||||
// already held lock succeeds in acquiring the lock once transaction that
|
||||
// was lock owner finishes. That would probably reduce number of aborted
|
||||
// transactions.
|
||||
owner = owner_;
|
||||
it->second = owner;
|
||||
abort_oldest_tx_in_lock_cycle();
|
||||
}
|
||||
cpu_relax();
|
||||
}
|
||||
|
||||
throw LockTimeoutException(fmt::format(
|
||||
"Transaction locked for more than {} seconds", kTimeout.count()));
|
||||
}
|
||||
|
||||
void RecordLock::Unlock() { mutex_.unlock(); }
|
||||
void RecordLock::Unlock() { lock_.unlock(); }
|
||||
|
||||
constexpr struct timespec RecordLock::kTimeout;
|
||||
constexpr std::chrono::duration<double> RecordLock::kTimeout;
|
||||
|
@ -1,18 +1,34 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <unordered_set>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "storage/locking/lock_status.hpp"
|
||||
#include "threading/sync/futex.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
class RecordLock {
|
||||
// TODO arbitrary constant, reconsider
|
||||
static constexpr struct timespec kTimeout { 2, 0 };
|
||||
namespace tx {
|
||||
class Engine;
|
||||
class Transaction;
|
||||
};
|
||||
|
||||
class RecordLock {
|
||||
public:
|
||||
LockStatus Lock(tx::transaction_id_t id);
|
||||
LockStatus Lock(const tx::Transaction &id, tx::Engine &engine);
|
||||
|
||||
void Unlock();
|
||||
|
||||
private:
|
||||
Futex mutex_;
|
||||
tx::transaction_id_t owner_;
|
||||
// Arbitrary choosen constant, postgresql uses 1 second so do we.
|
||||
constexpr static std::chrono::duration<double> kTimeout{
|
||||
std::chrono::seconds(1)};
|
||||
|
||||
// TODO: Because of the current architecture it is somewhat OK to use SpinLock
|
||||
// here. Once we reimplement worker architecture to execute some other
|
||||
// transaction in this thread while other is waiting for a lock this will had
|
||||
// to change to something else.
|
||||
SpinLock lock_;
|
||||
std::atomic<tx::transaction_id_t> owner_{0};
|
||||
};
|
||||
|
@ -2,8 +2,10 @@
|
||||
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
|
||||
#include "threading/sync/cpu_relax.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
/**
|
||||
* @class SpinLock
|
||||
@ -17,12 +19,16 @@
|
||||
class SpinLock {
|
||||
public:
|
||||
void lock() { // Before was memory_order_acquire
|
||||
while (lock_flag.test_and_set(std::memory_order_seq_cst)) cpu_relax();
|
||||
while (lock_flag_.test_and_set()) {
|
||||
cpu_relax();
|
||||
}
|
||||
}
|
||||
// Before was memory_order_release
|
||||
void unlock() { lock_flag.clear(std::memory_order_seq_cst); }
|
||||
void unlock() { lock_flag_.clear(); }
|
||||
|
||||
bool try_lock() { return !lock_flag_.test_and_set(); }
|
||||
|
||||
private:
|
||||
// guaranteed by standard to be lock free!
|
||||
mutable std::atomic_flag lock_flag = ATOMIC_FLAG_INIT;
|
||||
mutable std::atomic_flag lock_flag_ = ATOMIC_FLAG_INIT;
|
||||
};
|
||||
|
@ -1,4 +1,4 @@
|
||||
#pragma onces
|
||||
#pragma once
|
||||
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
|
@ -154,8 +154,10 @@ class Engine : Lockable<SpinLock> {
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns this engine's commit log */
|
||||
auto &clog() const { return clog_; }
|
||||
const auto &clog() const { return clog_; }
|
||||
|
||||
auto &lock_graph() { return lock_graph_; }
|
||||
const auto &lock_graph() const { return lock_graph_; }
|
||||
|
||||
private:
|
||||
// Commit log of this engine.
|
||||
@ -177,5 +179,13 @@ class Engine : Lockable<SpinLock> {
|
||||
|
||||
// Storage for the transactions.
|
||||
TransactionStore<transaction_id_t> store_;
|
||||
|
||||
// For each active transaction we store a transaction that holds a lock that
|
||||
// mentioned transaction is also trying to acquire. We can think of this
|
||||
// data structure as a graph: key being a start node of directed edges and
|
||||
// value being an end node of that edge. ConcurrentMap is used since it is
|
||||
// garbage collected and we are sure that we will not be having problems with
|
||||
// lifetimes of each object.
|
||||
ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_;
|
||||
};
|
||||
}
|
||||
|
@ -3,20 +3,24 @@
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include "storage/locking/lock_status.hpp"
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
template <class T>
|
||||
class Engine;
|
||||
class Transaction;
|
||||
|
||||
class LockStore {
|
||||
class LockHolder {
|
||||
public:
|
||||
LockHolder() = default;
|
||||
|
||||
template <class... Args>
|
||||
LockHolder(T *lock, Args &&... args) : lock_(lock) {
|
||||
LockHolder(RecordLock *lock, const Transaction &tx, tx::Engine &engine)
|
||||
: lock_(lock) {
|
||||
debug_assert(lock != nullptr, "Lock is nullptr.");
|
||||
auto status = lock_->Lock(std::forward<Args>(args)...);
|
||||
auto status = lock_->Lock(tx, engine);
|
||||
|
||||
if (status != LockStatus::Acquired) {
|
||||
lock_ = nullptr;
|
||||
@ -34,6 +38,7 @@ class LockStore {
|
||||
if (this == &other) return *this;
|
||||
lock_ = other.lock_;
|
||||
other.lock_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
~LockHolder() {
|
||||
@ -45,13 +50,12 @@ class LockStore {
|
||||
bool active() const { return lock_ != nullptr; }
|
||||
|
||||
private:
|
||||
T *lock_{nullptr};
|
||||
RecordLock *lock_{nullptr};
|
||||
};
|
||||
|
||||
public:
|
||||
template <class... Args>
|
||||
void Take(T *lock, Args &&... args) {
|
||||
locks_.emplace_back(LockHolder(lock, std::forward<Args>(args)...));
|
||||
void Take(RecordLock *lock, const tx::Transaction &tx, tx::Engine &engine) {
|
||||
locks_.emplace_back(LockHolder(lock, tx, engine));
|
||||
if (!locks_.back().active()) {
|
||||
locks_.pop_back();
|
||||
}
|
||||
|
@ -4,10 +4,9 @@
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
#include "transaction.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
#include "utils/option.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
|
@ -8,7 +8,9 @@ Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot,
|
||||
Engine &engine)
|
||||
: id_(id), engine_(engine), snapshot_(snapshot) {}
|
||||
|
||||
void Transaction::TakeLock(RecordLock &lock) { locks_.Take(&lock, id_); }
|
||||
void Transaction::TakeLock(RecordLock &lock) {
|
||||
locks_.Take(&lock, *this, engine_);
|
||||
}
|
||||
|
||||
void Transaction::Commit() { engine_.Commit(*this); }
|
||||
|
||||
|
@ -3,14 +3,16 @@
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
#include "threading/sync/lockable.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "transactions/lock_store.hpp"
|
||||
#include "transactions/snapshot.hpp"
|
||||
#include "type.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
@ -75,13 +77,17 @@ class Transaction {
|
||||
private:
|
||||
// Index of the current command in the current transaction.
|
||||
command_id_t cid_{1};
|
||||
|
||||
// A snapshot of currently active transactions.
|
||||
const Snapshot snapshot_;
|
||||
|
||||
// Record locks held by this transaction.
|
||||
LockStore<RecordLock> locks_;
|
||||
LockStore locks_;
|
||||
|
||||
// True if transaction should abort. Used to signal query executor that it
|
||||
// should stop execution, it is only a hint, transaction can disobey.
|
||||
std::atomic<bool> should_abort_{false};
|
||||
|
||||
// Creation time.
|
||||
const std::chrono::time_point<std::chrono::steady_clock> creation_time_{
|
||||
std::chrono::steady_clock::now()};
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
@ -66,8 +67,11 @@ communication::bolt::QueryData ExecuteNTimesTillSuccess(
|
||||
return ret;
|
||||
} catch (const utils::BasicException &e) {
|
||||
last_exception = e;
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(rand_dist_(pseudo_rand_gen_)));
|
||||
utils::Timer t;
|
||||
std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_));
|
||||
while (t.Elapsed() < to_sleep) {
|
||||
cpu_relax();
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG(WARNING) << query << " failed " << times << "times";
|
||||
|
Loading…
Reference in New Issue
Block a user