refactored storage model and engine
This commit is contained in:
parent
335f42ef43
commit
22ee84de0d
@ -5,19 +5,14 @@
|
||||
|
||||
#include "record.hpp"
|
||||
|
||||
template <class id_t, class lock_t>
|
||||
struct Vertex;
|
||||
|
||||
template <class id_t,
|
||||
class lock_t>
|
||||
struct Edge : public Record<Edge<id_t, lock_t>, id_t, lock_t>
|
||||
struct Edge : public Record<Edge>
|
||||
{
|
||||
Edge(uint64_t id) : Record<Edge<id_t, lock_t>, id_t, lock_t>(id) {}
|
||||
|
||||
using vertex_t = Vertex<id_t, lock_t>;
|
||||
Edge(uint64_t id) : Record<Edge>(id) {}
|
||||
|
||||
// pointer to the vertex this edge points to
|
||||
vertex_t* to;
|
||||
Vertex* to;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -1,13 +1,16 @@
|
||||
#ifndef MEMGRAPH_DATA_MODEL_GRAPH_HPP
|
||||
#define MEMGRAPH_DATA_MODEL_GRAPH_HPP
|
||||
|
||||
#include <vector>
|
||||
#include <list>
|
||||
|
||||
#include "node.hpp"
|
||||
#include "vertex.hpp"
|
||||
#include "edge.hpp"
|
||||
|
||||
struct Graph
|
||||
class Graph
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
std::list<
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -3,94 +3,29 @@
|
||||
|
||||
#include <mutex>
|
||||
|
||||
#include "sync/spinlock.hpp"
|
||||
#include "utils/crtp.hpp"
|
||||
|
||||
template <class Derived,
|
||||
class xid_t,
|
||||
class lock_t>
|
||||
#include "sync/spinlock.hpp"
|
||||
#include "sync/lockable.hpp"
|
||||
|
||||
#include "storage/model/utils/mvcc.hpp"
|
||||
|
||||
template <class Derived>
|
||||
class Record
|
||||
: public Crtp<Derived>,
|
||||
public Mvcc<Derived>,
|
||||
Lockable<SpinLock>
|
||||
{
|
||||
public:
|
||||
Record(uint64_t id = 0) : id(id), xmin_(0), xmax_(0), cmax_(0), cmin_(0),
|
||||
newer_(nullptr) {}
|
||||
|
||||
using record_t = Record<xid_t, lock_t, Derived>;
|
||||
Record(uint64_t id) : id(id) {}
|
||||
|
||||
// every node has a unique id. 2^64 = 1.8 x 10^19. that should be enough
|
||||
// for a looong time :) but keep in mind that some vacuuming would be nice
|
||||
// to reuse indices for deleted nodes.
|
||||
uint64_t id;
|
||||
|
||||
// acquire an exclusive guard on this node, used by mvcc to guard
|
||||
// concurrent writes to this record (update - update, update - delete)
|
||||
std::unique_lock<lock_t> guard()
|
||||
{
|
||||
return std::unique_lock<lock_t>(lock);
|
||||
}
|
||||
|
||||
xid_t xmin()
|
||||
{
|
||||
return xmin_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void xmin(xid_t value)
|
||||
{
|
||||
xmin_.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
xid_t xmax()
|
||||
{
|
||||
return xmax_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void xmax(xid_t value)
|
||||
{
|
||||
return xmax_.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
uint8_t cmin()
|
||||
{
|
||||
return cmin_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void cmin(uint8_t value)
|
||||
{
|
||||
cmin_.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
uint8_t cmax()
|
||||
{
|
||||
return cmax_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void cmax(uint8_t value)
|
||||
{
|
||||
return cmax_.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
record_t* newer()
|
||||
{
|
||||
return newer_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void newer(record_t* value)
|
||||
{
|
||||
newer_.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
Derived& derived()
|
||||
{
|
||||
return *static_cast<Derived*>(this);
|
||||
}
|
||||
|
||||
private:
|
||||
// used by MVCC to keep track of what's visible to transactions
|
||||
std::atomic<xid_t> xmin_, xmax_;
|
||||
std::atomic<uint8_t> cmin_, cmax_;
|
||||
|
||||
std::atomic<record_t*> newer_;
|
||||
|
||||
lock_t lock;
|
||||
// TODO add real data here
|
||||
};
|
||||
|
||||
#endif
|
||||
|
38
storage/model/utils/minmax.hpp
Normal file
38
storage/model/utils/minmax.hpp
Normal file
@ -0,0 +1,38 @@
|
||||
#ifndef MEMGRAPH_STORAGE_MODEL_UTILS_MINMAX_HPP
|
||||
#define MEMGRAPH_STORAGE_MODEL_UTILS_MINMAX_HPP
|
||||
|
||||
#include <atomic>
|
||||
|
||||
// Holds two values of type T, a minimum and a maximum, each kept in its own
// std::atomic<T>. Every access is a relaxed atomic load or store: a single
// value is never torn, but no ordering with other memory operations is
// implied and the two members are not updated together as one atomic unit.
template <class T>
class MinMax
{
public:
    // both values start at zero
    MinMax() : min_(0), max_(0) {}

    // start from the given values
    MinMax(T min, T max) : min_(min), max_(max) {}

    // returns the stored minimum. const-qualified so that it can be read
    // through a const MinMax
    T min() const
    {
        return min_.load(std::memory_order_relaxed);
    }

    // overwrites the stored minimum with value
    void min(T value)
    {
        min_.store(value, std::memory_order_relaxed);
    }

    // returns the stored maximum. const-qualified so that it can be read
    // through a const MinMax
    T max() const
    {
        return max_.load(std::memory_order_relaxed);
    }

    // overwrites the stored maximum with value
    void max(T value)
    {
        max_.store(value, std::memory_order_relaxed);
    }

private:
    std::atomic<T> min_;
    std::atomic<T> max_;
};
|
||||
|
||||
#endif
|
87
storage/model/utils/mvcc.hpp
Normal file
87
storage/model/utils/mvcc.hpp
Normal file
@ -0,0 +1,87 @@
|
||||
#ifndef MEMGRAPH_STORAGE_MODEL_UTILS_MVCC_HPP
|
||||
#define MEMGRAPH_STORAGE_MODEL_UTILS_MVCC_HPP
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "transaction/transaction.hpp"
|
||||
#include "storage/model/utils/minmax.hpp"
|
||||
#include "storage/model/utils/version.hpp"
|
||||
|
||||
#include "transaction/commit_log.hpp"
|
||||
|
||||
// the mvcc implementation used here is very much like postgresql's
|
||||
// more info: https://momjian.us/main/writings/pgsql/mvcc.pdf
|
||||
|
||||
template <class T>
|
||||
class Mvcc : public Version<T>
|
||||
{
|
||||
public:
|
||||
Mvcc() {}
|
||||
|
||||
// tx.min is the id of the transaction that created the record
|
||||
// and tx.max is the id of the transaction that deleted the record
|
||||
// these values are used to determine the visibility of the record
|
||||
// to the current transaction
|
||||
MinMax<uint64_t> tx;
|
||||
|
||||
// cmd.min is the id of the command in this transaction that created the
|
||||
// record and cmd.max is the id of the command in this transaction that
|
||||
// deleted the record. these values are used to determine the visibility
|
||||
// of the record to the current command in the running transaction
|
||||
MinMax<uint8_t> cmd;
|
||||
|
||||
// check if this record is visible to the transaction t
|
||||
bool visible(const Transaction& t)
|
||||
{
|
||||
// TODO check if the record was created by a transaction that has been
|
||||
// aborted. one might implement this by checking the hints in mvcc
|
||||
// anc/or consulting the commit log
|
||||
|
||||
// Mike Olson says 17 march 1993: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. i know, it happened to me.
|
||||
|
||||
return ((tx.min() == t.id && // inserted by the current transaction
|
||||
cmd.min() < t.cid && // before this command, and
|
||||
(tx.max() == 0 || // the row has not been deleted, or
|
||||
(tx.max() == t.id && // it was deleted by the current
|
||||
// transaction
|
||||
cmd.max() >= t.cid))) // but not before this command,
|
||||
|| // or
|
||||
(t.committed(tx.min()) && // the record was inserted by a
|
||||
// committed transaction, and
|
||||
(tx.max() == 0 || // the record has not been deleted, or
|
||||
(tx.max() == t.id && // the row is being deleted by this
|
||||
// transaction
|
||||
cmd.max() >= t.cid) || // but it's not deleted "yet", or
|
||||
(tx.max() != t.id && // the row was deleted by another
|
||||
// transaction
|
||||
!t.committed(tx.max()) // that has not been committed
|
||||
))));
|
||||
}
|
||||
|
||||
// inspects the record change history and returns the record version visible
|
||||
// to the current transaction if it exists, otherwise it returns nullptr
|
||||
T* latest(const Transaction& t)
|
||||
{
|
||||
T* record = this, newer = this->newer();
|
||||
|
||||
// move down through the versions of the nodes until you find the first
|
||||
// one visible to this transaction. if no visible records are found,
|
||||
// the function returns a nullptr
|
||||
while(newer != nullptr && !newer->visible(t))
|
||||
record = newer, newer = record->newer();
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
protected:
|
||||
// known committed and known aborted for both xmax and xmin
|
||||
// this hints are used to quickly check the commit/abort status of the
|
||||
// transaction that created this record. if these are not set, one should
|
||||
// consult the commit log to find the status and update the status here
|
||||
// more info https://wiki.postgresql.org/wiki/Hint_Bits
|
||||
std::atomic<uint8_t> hints;
|
||||
};
|
||||
|
||||
#endif
|
34
storage/model/utils/version.hpp
Normal file
34
storage/model/utils/version.hpp
Normal file
@ -0,0 +1,34 @@
|
||||
#ifndef MEMGRAPH_STORAGE_MODEL_UTILS_VERSION_HPP
|
||||
#define MEMGRAPH_STORAGE_MODEL_UTILS_VERSION_HPP
|
||||
|
||||
#include <atomic>
|
||||
|
||||
// One link of an atomic singly-linked list of record versions. Each object
// stores a pointer to the next newer version of type T, or nullptr when no
// newer version exists.
template <class T>
class Version
{
public:
    // no newer version yet
    Version() : versions(nullptr) {}

    // start with value as the newer version. left implicit, as before, so
    // code relying on the conversion keeps compiling
    Version(T* value) : versions(value) {}

    // return a pointer to a newer version stored in this record
    //
    // acquire load: pairs with the release store in newer(T*), so a reader
    // that observes the pointer also observes everything written to the
    // pointee before it was published
    T* newer() const
    {
        return versions.load(std::memory_order_acquire);
    }

    // set a newer version of this record
    //
    // release store: writes made to *value before this call become visible
    // to any thread that subsequently reads the pointer through newer()
    void newer(T* value)
    {
        versions.store(value, std::memory_order_release);
    }

private:
    // this is an atomic singly-linked list of all versions. this pointer
    // points to a newer version of this record. the newer version also has
    // this pointer which points to an even more recent version. if no newer
    // version is present, this value is nullptr
    std::atomic<T*> versions;
};
|
||||
|
||||
#endif
|
@ -6,16 +6,12 @@
|
||||
#include "record.hpp"
|
||||
#include "edge.hpp"
|
||||
|
||||
template <class id_t,
|
||||
class lock_t>
|
||||
struct Vertex : public Record<Vertex<id_t, lock_t>, id_t, lock_t>
|
||||
struct Vertex : public Record<Vertex>
|
||||
{
|
||||
Vertex(uint64_t id) : Record<Vertex<id_t, lock_t>, id_t, lock_t>(id) {}
|
||||
|
||||
using edge_t = Edge<id_t, lock_t>;
|
||||
Vertex(uint64_t id) : Record<Vertex>(id) {}
|
||||
|
||||
// adjacency list containing pointers to outgoing edges from this vertex
|
||||
std::vector<edge_t*> out;
|
||||
std::vector<Edge*> out;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -8,54 +8,61 @@
|
||||
#include "storage/model/record.hpp"
|
||||
#include "storage/visible.hpp"
|
||||
#include "memory/memory_engine.hpp"
|
||||
#include "utils/counters/atomic_counter.hpp"
|
||||
|
||||
template <class id_t,
|
||||
class lock_t>
|
||||
class StorageEngine
|
||||
{
|
||||
template <class T>
|
||||
using record_t = Record<T, id_t, lock_t>;
|
||||
using memory_engine_t = MemoryEngine<id_t, lock_t>;
|
||||
|
||||
public:
|
||||
StorageEngine(memory_engine_t& memory) : memory(memory) {}
|
||||
StorageEngine(MemoryEngine& memory) : vertex_counter(0), edge_counter(0),
|
||||
memory(memory) {}
|
||||
|
||||
template <class T>
|
||||
bool insert(record_t<T>** record,
|
||||
const Transaction<id_t>& t)
|
||||
bool insert(T** record, const Transaction& t)
|
||||
{
|
||||
auto n = next<T>();
|
||||
*record = memory.create<T>(n);
|
||||
|
||||
// set creating transaction
|
||||
(*record)->tx.min(t.id);
|
||||
|
||||
}
|
||||
|
||||
template <class T>
|
||||
bool update(record_t<T>* record,
|
||||
record_t<T>** updated,
|
||||
const Transaction<id_t>& t)
|
||||
{
|
||||
// put a lock on the node to prevent other writers from modifying it
|
||||
auto guard = record->guard();
|
||||
|
||||
// find the newest visible version of the record about to be updated
|
||||
auto newest = max_visible(record, t);
|
||||
|
||||
if(newest == nullptr)
|
||||
return false; // another transaction just deleted it!
|
||||
|
||||
*updated = memory.allocate<T>();
|
||||
*updated = *newest; // copy the data in the current node TODO memset
|
||||
|
||||
newest->newer(latest);
|
||||
*updated_record = newest
|
||||
|
||||
// set creating command of this transaction
|
||||
(*record)->cmd.min(t.cid);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
bool remove(record_t<T>& record,
|
||||
const Transaction<id_t>& t)
|
||||
bool update(T* record,
|
||||
T** updated,
|
||||
const Transaction& t)
|
||||
{
|
||||
// put a lock on the node to prevent other writers from modifying it
|
||||
auto guard = record.guard();
|
||||
auto guard = record->guard();
|
||||
|
||||
// find the record visible version of the record about to be updated
|
||||
record = record->latest(t);
|
||||
|
||||
if(record == nullptr)
|
||||
return false; // another transaction just deleted it!
|
||||
|
||||
*updated = memory.allocate<T>();
|
||||
**updated = *record; // copy the data in the current node TODO memset?
|
||||
|
||||
record->newer(*updated);
|
||||
return true;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
bool remove(T& record,
|
||||
const Transaction& t)
|
||||
{
|
||||
// put a lock on the node to prevent other writers from modifying it
|
||||
auto guard = record->guard();
|
||||
|
||||
auto latest = record.latest(t);
|
||||
|
||||
if(record == nullptr)
|
||||
return false;
|
||||
|
||||
// only mark the record as deleted if it isn't already deleted
|
||||
// this prevents phantom reappearance of the deleted nodes
|
||||
@ -68,23 +75,35 @@ public:
|
||||
// running and determined that the record hasn't been deleted yet
|
||||
// even though T1 already committed before T3 even started!
|
||||
|
||||
if(record.xmax())
|
||||
return false; // another transaction just deleted it!
|
||||
if(record->xmax())
|
||||
return false; // another transaction deleted it!
|
||||
|
||||
record.xmax(t.id);
|
||||
record->xmax(t.id);
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
std::unique_lock<lock_t> acquire()
|
||||
{
|
||||
return std::unique_lock<lock_t>(lock);
|
||||
}
|
||||
|
||||
memory_engine_t& memory;
|
||||
template<class T>
|
||||
uint64_t next();
|
||||
|
||||
lock_t lock;
|
||||
AtomicCounter<uint64_t> vertex_counter;
|
||||
AtomicCounter<uint64_t> edge_counter;
|
||||
|
||||
MemoryEngine& memory;
|
||||
};
|
||||
|
||||
|
||||
template<>
|
||||
uint64_t StorageEngine::next<Vertex>()
|
||||
{
|
||||
return vertex_counter.next();
|
||||
}
|
||||
|
||||
template<>
|
||||
uint64_t StorageEngine::next<Edge>()
|
||||
{
|
||||
return edge_counter.next();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -1,52 +0,0 @@
|
||||
#ifndef MEMGRAPH_STORAGE_VISIBLE_HPP
|
||||
#define MEMGRAPH_STORAGE_VISIBLE_HPP
|
||||
|
||||
#include "transaction/transaction.hpp"
|
||||
#include "model/record.hpp"
|
||||
#include "model/vertex.hpp"
|
||||
#include "model/edge.hpp"
|
||||
|
||||
template <class T,
|
||||
class id_t,
|
||||
class lock_t>
|
||||
bool visible(const Record<T, id_t, lock_t>& r, const Transaction<id_t>& t)
|
||||
{
|
||||
// Mike Olson says 17 march 1993: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. i know, it happened to me.
|
||||
|
||||
return ((r.xmin() == t.id && // inserted by the current transaction
|
||||
r.cmin() < t.cid && // before this command, and
|
||||
(r.xmax() == 0 || // the row has not been deleted, or
|
||||
(r.xmax() == t.id && // it was deleted by the current
|
||||
// transaction
|
||||
r.cmax() >= t.cid))) // but not before this command,
|
||||
|| // or
|
||||
(t.committed(r.xmin()) && // the record was inserted by a
|
||||
// committed transaction, and
|
||||
(r.xmax() > 0 || // the record has not been deleted, or
|
||||
(r.xmax() == t.id && // the row is being deleted by this
|
||||
// transaction
|
||||
r.cmax() >= t.cid) || // but it's not deleted "yet", or
|
||||
(r.xmax() != t.id && // the row was deleted by another
|
||||
// transaction
|
||||
!t.committed(r.xmax()))))); // that has not been committed
|
||||
}
|
||||
|
||||
// inspects the record change history and returns the record version visible
|
||||
// to the current transaction if it exists, otherwise it returns nullptr
|
||||
template <class T,
|
||||
class id_t,
|
||||
class lock_t>
|
||||
T* max_visible(Record<T, id_t, lock_t>* record, const Transaction<id_t>& t)
|
||||
{
|
||||
// move down through the versions of the nodes until you find the first
|
||||
// one visible to this transaction
|
||||
while(record != nullptr && !visible(*record, t))
|
||||
record = record->newer();
|
||||
|
||||
// if no visible nodes were found, return nullptr
|
||||
return record == nullptr ? record : &record->derived();
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user