moved transactions back to threading
This commit is contained in:
parent
b8fa8fd904
commit
49dc08b9b3
mvcc
threading/sync
transactions
@ -1,44 +0,0 @@
|
||||
#ifndef MEMGRAPH_TRANSACTION_COMMIT_LOG_HPP
|
||||
#define MEMGRAPH_TRANSACTION_COMMIT_LOG_HPP
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include <boost/dynamic_bitset/dynamic_bitset.hpp>
|
||||
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "threading/sync/lockable.hpp"
|
||||
|
||||
// optimize allocation performance by preallocating chunks in like 2MB or so
|
||||
// optimize memory by purging old transactions after vacuuming or something
|
||||
// this is not sustainable in the long run
|
||||
|
||||
class CommitLog : Lockable<SpinLock>
|
||||
{
|
||||
void begin()
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
log.push_back(0);
|
||||
}
|
||||
|
||||
void mark_aborted(uint64_t)
|
||||
{
|
||||
// this would be useful in a different implementation
|
||||
}
|
||||
|
||||
void mark_committed(uint64_t id)
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
log.set(id, true);
|
||||
}
|
||||
|
||||
bool is_committed(uint64_t id)
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
return log[id];
|
||||
}
|
||||
|
||||
private:
|
||||
boost::dynamic_bitset<uint64_t> log;
|
||||
};
|
||||
|
||||
#endif
|
@ -8,7 +8,12 @@ template <class lock_t = SpinLock>
|
||||
class Lockable
|
||||
{
|
||||
protected:
|
||||
std::unique_lock<lock_t> acquire()
|
||||
std::lock_guard<lock_t> acquire_guard()
|
||||
{
|
||||
return std::lock_guard<lock_t>(lock);
|
||||
}
|
||||
|
||||
std::unique_lock<lock_t> acquire_unique()
|
||||
{
|
||||
return std::unique_lock<lock_t>(lock);
|
||||
}
|
||||
|
@ -5,7 +5,8 @@
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
|
||||
#include "commit_log.hpp"
|
||||
namespace tx
|
||||
{
|
||||
|
||||
struct Transaction
|
||||
{
|
||||
@ -26,7 +27,7 @@ struct Transaction
|
||||
|
||||
// check weather the transaction with the xid looks committed from the
|
||||
// database snapshot given to this transaction
|
||||
bool committed(uint64_t xid) const
|
||||
bool looks_committed(uint64_t xid) const
|
||||
{
|
||||
// transaction xid is newer than id and therefore not visible at all
|
||||
if (xid > id)
|
||||
@ -44,4 +45,6 @@ struct Transaction
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
39
transactions/transaction_cache.hpp
Normal file
39
transactions/transaction_cache.hpp
Normal file
@ -0,0 +1,39 @@
|
||||
#ifndef MEMGRAPH_TRANSACTIONS_TRANSACTION_CACHE_HPP
|
||||
#define MEMGRAPH_TRANSACTIONS_TRANSACTION_CACHE_HPP
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
|
||||
#include "transaction.hpp"
|
||||
|
||||
namespace tx
|
||||
{
|
||||
|
||||
template <class id_t>
|
||||
class TransactionCache
|
||||
{
|
||||
public:
|
||||
|
||||
Transaction* get(id_t id) const
|
||||
{
|
||||
auto it = cache.find(id);
|
||||
return it != cache.end() ? it->second : nullptr;
|
||||
}
|
||||
|
||||
void put(id_t id, Transaction* transaction)
|
||||
{
|
||||
cache.emplace(std::make_pair(id, transaction));
|
||||
}
|
||||
|
||||
void del(id_t id)
|
||||
{
|
||||
cache.erase(id);
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<id_t, std::unique_ptr<Transaction>> cache;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -10,26 +10,53 @@
|
||||
#include <algorithm>
|
||||
|
||||
#include "transaction.hpp"
|
||||
#include "transaction_cache.hpp"
|
||||
|
||||
#include "utils/counters/simple_counter.hpp"
|
||||
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "threading/sync/lockable.hpp"
|
||||
|
||||
namespace tx
|
||||
{
|
||||
|
||||
class TransactionError : std::runtime_error
|
||||
{
|
||||
public:
|
||||
using std::runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
class TransactionEngine : Lockable<SpinLock>
|
||||
{
|
||||
public:
|
||||
TransactionEngine(uint64_t n) : counter(n) {}
|
||||
|
||||
Transaction begin()
|
||||
const Transaction& begin()
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
|
||||
auto id = counter.next();
|
||||
auto t = Transaction(id, active);
|
||||
auto t = new Transaction(id, active);
|
||||
|
||||
active.push_back(id);
|
||||
cache.put(id, t);
|
||||
|
||||
return t;
|
||||
return *t;
|
||||
}
|
||||
|
||||
const Transaction& advance(uint64_t id)
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
|
||||
auto* t = cache.get(id);
|
||||
|
||||
if(t == nullptr)
|
||||
throw TransactionError("transaction does not exist");
|
||||
|
||||
// this is a new command
|
||||
t->cid++;
|
||||
|
||||
return *t;
|
||||
}
|
||||
|
||||
void commit(const Transaction& t)
|
||||
@ -42,7 +69,6 @@ public:
|
||||
void rollback(const Transaction& t)
|
||||
{
|
||||
auto guard = this->acquire();
|
||||
// what to do here?
|
||||
|
||||
finalize(t);
|
||||
}
|
||||
@ -70,16 +96,20 @@ public:
|
||||
private:
|
||||
void finalize(const Transaction& t)
|
||||
{
|
||||
auto x = t.id;
|
||||
|
||||
// remove transaction from the active transactions list
|
||||
auto last = std::remove(active.begin(), active.end(), x);
|
||||
auto last = std::remove(active.begin(), active.end(), t.id);
|
||||
active.erase(last, active.end());
|
||||
|
||||
// remove transaction from cache
|
||||
cache.del(t.id);
|
||||
}
|
||||
|
||||
SimpleCounter<uint64_t> counter;
|
||||
|
||||
std::vector<uint64_t> active;
|
||||
TransactionCache<uint64_t> cache;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user