// memgraph/transactions/engine.hpp

#ifndef MEMGRAPH_TRANSACTIONS_ENGINE_HPP
#define MEMGRAPH_TRANSACTIONS_ENGINE_HPP
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <vector>

#include "transaction.hpp"
#include "transaction_cache.hpp"
#include "commit_log.hpp"

#include "utils/counters/simple_counter.hpp"
#include "threading/sync/spinlock.hpp"
#include "threading/sync/lockable.hpp"

namespace tx
{
/**
 * Error type thrown by the transaction engine (for example when a
 * transaction id cannot be found).
 *
 * The base class is inherited *publicly*: with the default (private)
 * inheritance of a `class`, handlers such as
 * `catch (const std::exception&)` or `catch (const std::runtime_error&)`
 * would not match a thrown TransactionError.
 *
 * All std::runtime_error constructors are inherited, so the error is
 * constructed from a message string.
 */
class TransactionError : public std::runtime_error
{
public:
    using std::runtime_error::runtime_error;
};
/**
 * Transaction engine.
 *
 * Hands out transaction ids from a counter, keeps the list of ids of the
 * transactions that are currently active, keeps the Transaction objects in
 * a TransactionCache and records the outcome of each transaction in the
 * CommitLog.
 *
 * Every public member function calls acquire_unique() (inherited from
 * Lockable<SpinLock>) and keeps the returned guard alive until it returns.
 */
class Engine : Lockable<SpinLock>
{
public:
    using sptr = std::shared_ptr<Engine>;

    /** Creates an engine whose id counter is initialised with 0. */
    Engine() : counter(0) {}

    /**
     * Starts a new transaction.
     *
     * The Transaction is constructed with the next counter value and with
     * the active list as it is *before* the new id is appended, so the new
     * transaction's own id is not part of the list it is constructed from.
     *
     * @return reference to the newly created transaction
     */
    const Transaction& begin()
    {
        auto guard = this->acquire_unique();

        auto id = counter.next();

        // keep the object in a unique_ptr until it is handed to the cache,
        // so it is not leaked if push_back throws
        std::unique_ptr<Transaction> owner(new Transaction(id, active));
        active.push_back(id);

        // NOTE(review): cache.put() presumably takes ownership of the raw
        // pointer -- confirm against transaction_cache.hpp
        auto* t = owner.release();
        cache.put(id, t);

        return *t;
    }

    /**
     * Advances an existing transaction to its next command by incrementing
     * its cid.
     *
     * @param id id of the transaction to advance
     * @return reference to the advanced transaction
     * @throws TransactionError if the cache holds no transaction for `id`
     */
    const Transaction& advance(uint64_t id)
    {
        auto guard = this->acquire_unique();

        auto* t = cache.get(id);

        if(t == nullptr)
            throw TransactionError("transaction does not exist");

        // this is a new command
        t->cid++;

        return *t;
    }

    /**
     * Marks the transaction as committed in the CommitLog and removes it
     * from the active list and from the cache.
     *
     * NOTE(review): if the cache owns the Transaction object, `t` must not
     * be used by the caller after this call -- confirm.
     *
     * @param t transaction to commit
     */
    void commit(const Transaction& t)
    {
        auto guard = this->acquire_unique();
        CommitLog::get().set_committed(t.id);

        finalize(t);
    }

    /**
     * Marks the transaction as aborted in the CommitLog and removes it
     * from the active list and from the cache.
     *
     * NOTE(review): if the cache owns the Transaction object, `t` must not
     * be used by the caller after this call -- confirm.
     *
     * @param t transaction to abort
     */
    void abort(const Transaction& t)
    {
        auto guard = this->acquire_unique();
        CommitLog::get().set_aborted(t.id);

        finalize(t);
    }

    /**
     * Returns the first id in the active list. Ids are appended in begin()
     * in the order they are handed out, so this is the longest-running
     * transaction that is still active.
     *
     * @return id at the front of the active list
     * @throws TransactionError if there is no active transaction (calling
     *         front() on an empty vector would be undefined behaviour)
     */
    uint64_t last_known_active()
    {
        auto guard = this->acquire_unique();

        if(active.empty())
            throw TransactionError("no active transactions");

        return active.front();
    }

    /**
     * Total number of transactions started from the beginning of time.
     *
     * @return the current value reported by the id counter
     */
    uint64_t count()
    {
        auto guard = this->acquire_unique();
        return counter.count();
    }

    /**
     * The number of currently active transactions.
     *
     * @return number of ids in the active list
     */
    size_t size()
    {
        auto guard = this->acquire_unique();
        return active.size();
    }

private:
    /**
     * Common tail of commit() and abort(); it does not acquire the lock
     * itself, both callers already hold the guard when they call it.
     *
     * @param t transaction that has just been committed or aborted
     */
    void finalize(const Transaction& t)
    {
        // remove transaction from the active transactions list
        active.erase(std::remove(active.begin(), active.end(), t.id),
                     active.end());

        // remove transaction from cache
        cache.del(t.id);
    }

    // source of transaction ids
    SimpleCounter<uint64_t> counter;

    // ids of the transactions that have begun but not yet finished,
    // in the order they were started
    std::vector<uint64_t> active;

    // Transaction objects looked up by transaction id
    TransactionCache<uint64_t> cache;
};
// end of namespace tx
}
#endif