Refactor transaction inside db_accessor. Also introduce advance_command.
Summary: Modification to MVCC - CHECK! Refactor transaction_read Add automatic commit/abort functionality to graph_db_accessor. Fix failing test in graph_db_accessor. Reviewers: buda, dtomicevic, florijan Reviewed By: buda, florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D87
This commit is contained in:
parent
91bd7ccfb6
commit
57f1b40d8d
@ -318,7 +318,6 @@ set(memgraph_src_files
|
||||
# ${src_dir}/storage/record_accessor.cpp
|
||||
${src_dir}/transactions/snapshot.cpp
|
||||
${src_dir}/transactions/transaction.cpp
|
||||
${src_dir}/transactions/transaction_read.cpp
|
||||
${src_dir}/template_engine/engine.cpp
|
||||
${src_dir}/logging/streams/stdout.cpp
|
||||
${src_dir}/logging/levels.cpp
|
||||
|
@ -8,14 +8,38 @@
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb& db)
|
||||
: db_(db), transaction_(std::move(db.tx_engine.begin())) {}
|
||||
: db_(db), transaction_(db.tx_engine.begin()) {}
|
||||
|
||||
GraphDbAccessor::~GraphDbAccessor() {
|
||||
if (!commited_ && !aborted_) {
|
||||
this->abort();
|
||||
}
|
||||
}
|
||||
|
||||
const std::string& GraphDbAccessor::name() const { return db_.name_; }
|
||||
|
||||
void GraphDbAccessor::advance_command() {
|
||||
transaction_->engine.advance(transaction_->id);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::commit() {
|
||||
debug_assert(commited_ == false && aborted_ == false,
|
||||
"Already aborted or commited transaction.");
|
||||
transaction_->commit();
|
||||
commited_ = true;
|
||||
}
|
||||
|
||||
void GraphDbAccessor::abort() {
|
||||
debug_assert(commited_ == false && aborted_ == false,
|
||||
"Already aborted or commited transaction.");
|
||||
transaction_->abort();
|
||||
aborted_ = true;
|
||||
}
|
||||
|
||||
VertexAccessor GraphDbAccessor::insert_vertex() {
|
||||
// create a vertex
|
||||
auto vertex_vlist = new mvcc::VersionList<Vertex>();
|
||||
Vertex* vertex = vertex_vlist->insert(transaction_);
|
||||
Vertex* vertex = vertex_vlist->insert(*transaction_);
|
||||
|
||||
// insert the newly created record into the main storage
|
||||
// TODO make the number of tries configurable
|
||||
@ -33,7 +57,7 @@ bool GraphDbAccessor::remove_vertex(VertexAccessor& vertex_accessor) {
|
||||
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
|
||||
return false;
|
||||
|
||||
vertex_accessor.vlist_.remove(&vertex_accessor.update(), transaction_);
|
||||
vertex_accessor.vlist_.remove(&vertex_accessor.update(), *transaction_);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -46,7 +70,7 @@ void GraphDbAccessor::detach_remove_vertex(VertexAccessor& vertex_accessor) {
|
||||
for (auto edge_accessor : vertex_accessor.out()) remove_edge(edge_accessor);
|
||||
|
||||
// mvcc removal of the vertex
|
||||
vertex_accessor.vlist_.remove(&vertex_accessor.update(), transaction_);
|
||||
vertex_accessor.vlist_.remove(&vertex_accessor.update(), *transaction_);
|
||||
}
|
||||
|
||||
EdgeAccessor GraphDbAccessor::insert_edge(VertexAccessor& from,
|
||||
@ -55,7 +79,7 @@ EdgeAccessor GraphDbAccessor::insert_edge(VertexAccessor& from,
|
||||
// create an edge
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>();
|
||||
Edge* edge =
|
||||
edge_vlist->insert(transaction_, from.vlist_, to.vlist_, edge_type);
|
||||
edge_vlist->insert(*transaction_, from.vlist_, to.vlist_, edge_type);
|
||||
|
||||
// set the vertex connections to this edge
|
||||
from.update().out_.emplace_back(edge_vlist);
|
||||
@ -87,7 +111,7 @@ void swap_out_edge(std::vector<mvcc::VersionList<Edge>*>& edges,
|
||||
void GraphDbAccessor::remove_edge(EdgeAccessor& edge_accessor) {
|
||||
swap_out_edge(edge_accessor.from().update().out_, &edge_accessor.vlist_);
|
||||
swap_out_edge(edge_accessor.to().update().in_, &edge_accessor.vlist_);
|
||||
edge_accessor.vlist_.remove(&edge_accessor.update(), transaction_);
|
||||
edge_accessor.vlist_.remove(&edge_accessor.update(), *transaction_);
|
||||
}
|
||||
|
||||
GraphDb::Label GraphDbAccessor::label(const std::string& label_name) {
|
||||
|
@ -5,14 +5,14 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "cppitertools/imap.hpp"
|
||||
#include "cppitertools/filter.hpp"
|
||||
#include "cppitertools/imap.hpp"
|
||||
|
||||
#include "graph_db.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
|
||||
#include "storage/vertex_accessor.hpp"
|
||||
#include "storage/edge_accessor.hpp"
|
||||
#include "storage/vertex_accessor.hpp"
|
||||
|
||||
/**
|
||||
* An accessor for the database object: exposes functions
|
||||
@ -22,6 +22,7 @@
|
||||
* a new Vertex should take care of all the book-keeping around
|
||||
* the creation.
|
||||
*/
|
||||
|
||||
class GraphDbAccessor {
|
||||
public:
|
||||
/**
|
||||
@ -30,6 +31,7 @@ class GraphDbAccessor {
|
||||
* @param db The database
|
||||
*/
|
||||
GraphDbAccessor(GraphDb& db);
|
||||
~GraphDbAccessor();
|
||||
|
||||
/**
|
||||
* Returns the name of the database of this accessor.
|
||||
@ -67,14 +69,14 @@ class GraphDbAccessor {
|
||||
*/
|
||||
auto vertices() {
|
||||
// filter out the accessors not visible to the current transaction
|
||||
auto filtered = iter::filter([this](auto vlist) {
|
||||
return vlist->find(this->transaction_) != nullptr;
|
||||
}, db_.vertices_.access());
|
||||
auto filtered = iter::filter(
|
||||
[this](auto vlist) { return vlist->find(*transaction_) != nullptr; },
|
||||
db_.vertices_.access());
|
||||
|
||||
// return accessors of the filtered out vlists
|
||||
return iter::imap([this](auto vlist) {
|
||||
return VertexAccessor(*vlist, *this);
|
||||
}, std::move(filtered));
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
|
||||
std::move(filtered));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,14 +103,14 @@ class GraphDbAccessor {
|
||||
*/
|
||||
auto edges() {
|
||||
// filter out the accessors not visible to the current transaction
|
||||
auto filtered = iter::filter([this](auto vlist) {
|
||||
return vlist->find(transaction_) != nullptr;
|
||||
}, db_.edges_.access());
|
||||
auto filtered = iter::filter(
|
||||
[this](auto vlist) { return vlist->find(*transaction_) != nullptr; },
|
||||
db_.edges_.access());
|
||||
|
||||
// return accessors of the filtered out vlists
|
||||
return iter::imap([this](auto vlist) {
|
||||
return EdgeAccessor(*vlist, *this);
|
||||
}, std::move(filtered));
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return EdgeAccessor(*vlist, *this); },
|
||||
std::move(filtered));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,9 +155,46 @@ class GraphDbAccessor {
|
||||
*/
|
||||
std::string& property_name(const GraphDb::Property property) const;
|
||||
|
||||
/** The current transaction */
|
||||
tx::Transaction transaction_;
|
||||
/**
|
||||
* Advances transaction's command id by 1.
|
||||
*/
|
||||
void advance_command();
|
||||
|
||||
/**
|
||||
* Commit transaction.
|
||||
*/
|
||||
void commit();
|
||||
|
||||
/**
|
||||
* Abort transaction.
|
||||
*/
|
||||
void abort();
|
||||
|
||||
/**
|
||||
* Init accessor record with vlist.
|
||||
* @args accessor whose record to initialize.
|
||||
*/
|
||||
template <typename TRecord>
|
||||
void init_record(RecordAccessor<TRecord>& accessor) {
|
||||
accessor.record_ = accessor.vlist_.find(*transaction_);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update accessor record with vlist.
|
||||
* @args accessor whose record to update if possible.
|
||||
*/
|
||||
template <typename TRecord>
|
||||
void update(RecordAccessor<TRecord>& accessor) {
|
||||
if (!accessor.record_->is_visible_write(*transaction_))
|
||||
accessor.record_ = accessor.vlist_.update(*transaction_);
|
||||
}
|
||||
|
||||
private:
|
||||
GraphDb& db_;
|
||||
|
||||
/** The current transaction */
|
||||
tx::Transaction* const transaction_;
|
||||
|
||||
bool commited_{false};
|
||||
bool aborted_{false};
|
||||
};
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
#include "transactions/commit_log.hpp"
|
||||
#include "transactions/engine.hpp"
|
||||
#include "transactions/transaction_read.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
|
||||
#include "mvcc/cre_exp.hpp"
|
||||
#include "mvcc/hints.hpp"
|
||||
@ -51,7 +51,7 @@ class Record : public Version<T> {
|
||||
// RecordLock lock;
|
||||
|
||||
// check if this record is visible to the transaction t
|
||||
bool visible(const tx::TransactionRead &t) {
|
||||
bool visible(const tx::Transaction &t) {
|
||||
// TODO check if the record was created by a transaction that has been
|
||||
// aborted. one might implement this by checking the hints in mvcc
|
||||
// anc/or consulting the commit log
|
||||
@ -59,9 +59,11 @@ class Record : public Version<T> {
|
||||
// Mike Olson says 17 march 1993: the tests in this routine are correct;
|
||||
// if you think they're not, you're wrong, and you should think about it
|
||||
// again. i know, it happened to me.
|
||||
// This implementation is different than the original one by the <= between
|
||||
// cmd.cre() and t.cid, instead of just <. This is the behaviour we want.
|
||||
|
||||
return ((tx.cre() == t.id && // inserted by the current transaction
|
||||
cmd.cre() <= t.cid && // before this command, and
|
||||
cmd.cre() <= t.cid && // before or during this command, and
|
||||
(tx.exp() == Id(0) || // the row has not been deleted, or
|
||||
(tx.exp() == t.id && // it was deleted by the current
|
||||
// transaction
|
||||
@ -79,29 +81,29 @@ class Record : public Version<T> {
|
||||
))));
|
||||
}
|
||||
|
||||
void mark_created(const tx::TransactionRead &t) {
|
||||
void mark_created(const tx::Transaction &t) {
|
||||
tx.cre(t.id);
|
||||
cmd.cre(t.cid);
|
||||
}
|
||||
|
||||
void mark_deleted(const tx::TransactionRead &t) {
|
||||
void mark_deleted(const tx::Transaction &t) {
|
||||
tx.exp(t.id);
|
||||
cmd.exp(t.cid);
|
||||
}
|
||||
|
||||
bool exp_committed(const Id &id, const tx::TransactionRead &t) {
|
||||
bool exp_committed(const Id &id, const tx::Transaction &t) {
|
||||
return committed(hints.exp, id, t);
|
||||
}
|
||||
|
||||
bool exp_committed(const tx::TransactionRead &t) {
|
||||
bool exp_committed(const tx::Transaction &t) {
|
||||
return committed(hints.exp, tx.exp(), t);
|
||||
}
|
||||
|
||||
bool cre_committed(const Id &id, const tx::TransactionRead &t) {
|
||||
bool cre_committed(const Id &id, const tx::Transaction &t) {
|
||||
return committed(hints.cre, id, t);
|
||||
}
|
||||
|
||||
bool cre_committed(const tx::TransactionRead &t) {
|
||||
bool cre_committed(const tx::Transaction &t) {
|
||||
return committed(hints.cre, tx.cre(), t);
|
||||
}
|
||||
|
||||
@ -112,7 +114,7 @@ class Record : public Version<T> {
|
||||
|
||||
// TODO: Test this
|
||||
// True if this record is visible for write.
|
||||
bool is_visible_write(const tx::TransactionRead &t) {
|
||||
bool is_visible_write(const tx::Transaction &t) {
|
||||
return (tx.cre() == t.id && // inserted by the current transaction
|
||||
cmd.cre() <= t.cid && // before this command, and
|
||||
(tx.exp() == Id(0) || // the row has not been deleted, or
|
||||
@ -123,11 +125,11 @@ class Record : public Version<T> {
|
||||
|
||||
protected:
|
||||
template <class U>
|
||||
bool committed(U &hints, const Id &id, const tx::TransactionRead &t) {
|
||||
bool committed(U &hints, const Id &id, const tx::Transaction &t) {
|
||||
// you certainly can't see the transaction with id greater than yours
|
||||
// as that means it started after this transaction and if it committed,
|
||||
// it committed after this transaction had started.
|
||||
if (id > t.id) return false;
|
||||
if (id >= t.id) return false;
|
||||
|
||||
// The creating transaction is still in progress (examine snapshot)
|
||||
if (t.in_snapshot(id)) return false;
|
||||
|
@ -101,7 +101,7 @@ class VersionList {
|
||||
|
||||
void vacuum() {}
|
||||
|
||||
T *find(const tx::TransactionRead &t) const {
|
||||
T *find(const tx::Transaction &t) const {
|
||||
auto r = head.load(std::memory_order_seq_cst);
|
||||
|
||||
// nullptr
|
||||
|
@ -7,9 +7,8 @@
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(mvcc::VersionList<TRecord> &vlist,
|
||||
GraphDbAccessor &db_accessor)
|
||||
: vlist_(vlist),
|
||||
record_(vlist_.find(db_accessor.transaction_)),
|
||||
db_accessor_(db_accessor) {
|
||||
: db_accessor_(db_accessor), vlist_(vlist), record_(nullptr) {
|
||||
db_accessor_.init_record(*this);
|
||||
debug_assert(record_ != nullptr, "Record is nullptr.");
|
||||
}
|
||||
|
||||
@ -17,7 +16,7 @@ template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(mvcc::VersionList<TRecord> &vlist,
|
||||
TRecord &record,
|
||||
GraphDbAccessor &db_accessor)
|
||||
: vlist_(vlist), record_(&record), db_accessor_(db_accessor) {
|
||||
: db_accessor_(db_accessor), vlist_(vlist), record_(&record) {
|
||||
debug_assert(record_ != nullptr, "Record is nullptr.");
|
||||
}
|
||||
|
||||
@ -33,8 +32,8 @@ size_t RecordAccessor<TRecord>::PropsErase(GraphDb::Property key) {
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
const PropertyValueStore<GraphDb::Property> &RecordAccessor<TRecord>::Properties()
|
||||
const {
|
||||
const PropertyValueStore<GraphDb::Property>
|
||||
&RecordAccessor<TRecord>::Properties() const {
|
||||
return view().properties_;
|
||||
}
|
||||
|
||||
@ -58,9 +57,7 @@ const GraphDbAccessor &RecordAccessor<TRecord>::db_accessor() const {
|
||||
|
||||
template <typename TRecord>
|
||||
TRecord &RecordAccessor<TRecord>::update() {
|
||||
if (!record_->is_visible_write(db_accessor_.transaction_))
|
||||
record_ = vlist_.update(db_accessor_.transaction_);
|
||||
|
||||
db_accessor_.update(*this);
|
||||
return *record_;
|
||||
}
|
||||
|
||||
|
@ -84,10 +84,10 @@ class RecordAccessor {
|
||||
*/
|
||||
const PropertyValueStore<GraphDb::Property>& Properties() const;
|
||||
|
||||
void PropertiesAccept(
|
||||
std::function<void(const GraphDb::Property key, const PropertyValue& prop)>
|
||||
handler,
|
||||
std::function<void()> finish = {}) const;
|
||||
void PropertiesAccept(std::function<void(const GraphDb::Property key,
|
||||
const PropertyValue& prop)>
|
||||
handler,
|
||||
std::function<void()> finish = {}) const;
|
||||
|
||||
/**
|
||||
* This should be used with care as it's comparing vlist_ pointer records and
|
||||
|
@ -26,7 +26,7 @@ class Engine : Lockable<SpinLock> {
|
||||
// Begins transaction and runs given functions in same atomic step.
|
||||
// Functions will be given Transaction&
|
||||
template <class... F>
|
||||
Transaction &begin(F... fun) {
|
||||
Transaction *begin(F... fun) {
|
||||
auto guard = this->acquire_unique();
|
||||
|
||||
auto id = Id(counter.next());
|
||||
@ -37,7 +37,7 @@ class Engine : Lockable<SpinLock> {
|
||||
|
||||
call(*t, fun...);
|
||||
|
||||
return *t;
|
||||
return t;
|
||||
}
|
||||
|
||||
Transaction &advance(const Id &id) {
|
||||
@ -48,6 +48,9 @@ class Engine : Lockable<SpinLock> {
|
||||
if (t == nullptr) throw TransactionError("transaction does not exist");
|
||||
|
||||
// this is a new command
|
||||
if (t->cid == 255)
|
||||
throw TransactionError(
|
||||
"Reached maximum number of commands in this transaction.");
|
||||
t->cid++;
|
||||
|
||||
return *t;
|
||||
|
@ -7,16 +7,16 @@
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
namespace tx {
|
||||
Transaction::Transaction(Engine &engine)
|
||||
: Transaction(Id(), Snapshot<Id>(), engine) {}
|
||||
|
||||
Transaction::Transaction(const Id &&id, const Snapshot<Id> &&snapshot,
|
||||
Engine &engine)
|
||||
: id(id), cid(1), engine(engine), snapshot(std::move(snapshot)) {}
|
||||
|
||||
Transaction::Transaction(const Id &id, const Snapshot<Id> &snapshot,
|
||||
Engine &engine)
|
||||
: TransactionRead(id, snapshot, engine) {}
|
||||
|
||||
// Returns copy of transaction_id
|
||||
TransactionRead Transaction::transaction_read() {
|
||||
TransactionRead const &t = *this;
|
||||
return t;
|
||||
}
|
||||
: id(id), cid(1), engine(engine), snapshot(snapshot) {}
|
||||
|
||||
void Transaction::wait_for_active() {
|
||||
while (snapshot.size() > 0) {
|
||||
@ -33,4 +33,16 @@ void Transaction::take_lock(RecordLock &lock) { locks.take(&lock, id); }
|
||||
void Transaction::commit() { engine.commit(*this); }
|
||||
|
||||
void Transaction::abort() { engine.abort(*this); }
|
||||
|
||||
bool Transaction::all_finished() {
|
||||
return !engine.clog.is_active(id) && snapshot.all_finished(engine);
|
||||
}
|
||||
|
||||
bool Transaction::in_snapshot(const Id &id) const {
|
||||
return snapshot.is_active(id);
|
||||
}
|
||||
|
||||
Id Transaction::oldest_active() {
|
||||
return snapshot.oldest_active().take_or(Id(id));
|
||||
}
|
||||
}
|
||||
|
@ -9,19 +9,21 @@
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
#include "transactions/lock_store.hpp"
|
||||
#include "transactions/snapshot.hpp"
|
||||
#include "transactions/transaction_read.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
class Transaction : public TransactionRead {
|
||||
class Engine;
|
||||
|
||||
class Transaction {
|
||||
friend class Engine;
|
||||
|
||||
public:
|
||||
Transaction(Engine &engine);
|
||||
Transaction(const Id &&id, const Snapshot<Id> &&snapshot, Engine &engine);
|
||||
Transaction(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
|
||||
Transaction(const Transaction &) = delete;
|
||||
Transaction(Transaction &&) = default;
|
||||
|
||||
// Returns copy of transaction_read
|
||||
TransactionRead transaction_read();
|
||||
|
||||
// Blocks until all transactions from snapshot finish. After this method,
|
||||
// snapshot will be empty.
|
||||
void wait_for_active();
|
||||
@ -30,7 +32,27 @@ class Transaction : public TransactionRead {
|
||||
void commit();
|
||||
void abort();
|
||||
|
||||
// True if this transaction and every transaction from snapshot have
|
||||
// finished.
|
||||
bool all_finished();
|
||||
|
||||
// Return id of oldest transaction from snapshot.
|
||||
Id oldest_active();
|
||||
|
||||
// True if id is in snapshot.
|
||||
bool in_snapshot(const Id &id) const;
|
||||
|
||||
// index of this transaction
|
||||
const Id id;
|
||||
|
||||
// index of the current command in the current transaction;
|
||||
uint8_t cid;
|
||||
|
||||
Engine &engine;
|
||||
|
||||
private:
|
||||
// a snapshot of currently active transactions
|
||||
Snapshot<Id> snapshot;
|
||||
LockStore<RecordLock> locks;
|
||||
};
|
||||
}
|
||||
|
@ -1,29 +0,0 @@
|
||||
#include "transactions/transaction_read.hpp"
|
||||
|
||||
#include "transactions/engine.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
TransactionRead::TransactionRead(Engine &engine)
|
||||
: TransactionRead(Id(), Snapshot<Id>(), engine) {}
|
||||
|
||||
TransactionRead::TransactionRead(const Id &&id, const Snapshot<Id> &&snapshot,
|
||||
Engine &engine)
|
||||
: id(id), cid(1), snapshot(std::move(snapshot)), engine(engine) {}
|
||||
|
||||
TransactionRead::TransactionRead(const Id &id, const Snapshot<Id> &snapshot,
|
||||
Engine &engine)
|
||||
: id(id), cid(1), snapshot(snapshot), engine(engine) {}
|
||||
|
||||
bool TransactionRead::all_finished() {
|
||||
return !engine.clog.is_active(id) && snapshot.all_finished(engine);
|
||||
}
|
||||
|
||||
bool TransactionRead::in_snapshot(const Id &id) const {
|
||||
return snapshot.is_active(id);
|
||||
}
|
||||
|
||||
Id TransactionRead::oldest_active() {
|
||||
return snapshot.oldest_active().take_or(Id(id));
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <vector>
|
||||
|
||||
#include "mvcc/id.hpp"
|
||||
#include "transactions/snapshot.hpp"
|
||||
|
||||
namespace tx {
|
||||
|
||||
class Engine;
|
||||
|
||||
// Has read only capbilities.
|
||||
// TODO: Not all applicable methods in code have been changed to accept
|
||||
// TransactionRead instead of a Transaction.
|
||||
class TransactionRead {
|
||||
friend class Engine;
|
||||
|
||||
public:
|
||||
TransactionRead(Engine &engine);
|
||||
|
||||
TransactionRead(const Id &&id, const Snapshot<Id> &&snapshot, Engine &engine);
|
||||
|
||||
TransactionRead(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
|
||||
|
||||
// True if this transaction and every transaction from snapshot have
|
||||
// finished.
|
||||
bool all_finished();
|
||||
|
||||
// Return id of oldest transaction from snapshot.
|
||||
Id oldest_active();
|
||||
|
||||
// True if id is in snapshot.
|
||||
bool in_snapshot(const Id &id) const;
|
||||
|
||||
// index of this transaction
|
||||
const Id id;
|
||||
|
||||
// index of the current command in the current transaction;
|
||||
uint8_t cid;
|
||||
|
||||
Engine &engine;
|
||||
|
||||
protected:
|
||||
// a snapshot of currently active transactions
|
||||
Snapshot<Id> snapshot;
|
||||
};
|
||||
}
|
@ -18,9 +18,9 @@ int main() {
|
||||
uint64_t sum = 0;
|
||||
|
||||
for (int i = 0; i < n; ++i) {
|
||||
auto& t = engine.begin();
|
||||
sum += t.id;
|
||||
engine.commit(t);
|
||||
auto t = engine.begin();
|
||||
sum += t->id;
|
||||
engine.commit(*t);
|
||||
}
|
||||
|
||||
sums[idx] = sum;
|
||||
|
@ -158,8 +158,8 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
edges_indexed.push_back(&edges[i]);
|
||||
}
|
||||
const int n = vertices_indexed.size();
|
||||
auto cmp_vertex = [](const VertexAccessor *a, const VertexAccessor *b)
|
||||
-> bool { return *a < *b; };
|
||||
auto cmp_vertex = [](const VertexAccessor *a,
|
||||
const VertexAccessor *b) -> bool { return *a < *b; };
|
||||
auto cmp_edge = [](const EdgeAccessor *a, const EdgeAccessor *b) -> bool {
|
||||
if (a->from() != b->from()) return a->from() < b->from();
|
||||
return a->to() < b->to();
|
||||
@ -169,15 +169,15 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
* @param v VertexAccessor to a vertex.
|
||||
* @return position of vertex or -1 if it doesn't exist.
|
||||
*/
|
||||
auto query =
|
||||
[&vertices_indexed, &cmp_vertex](const VertexAccessor &v) -> int {
|
||||
int pos = lower_bound(vertices_indexed.begin(), vertices_indexed.end(),
|
||||
&v, cmp_vertex) -
|
||||
vertices_indexed.begin();
|
||||
if (pos == (int)vertices_indexed.size() || *vertices_indexed[pos] != v)
|
||||
return -1;
|
||||
return pos;
|
||||
};
|
||||
auto query = [&vertices_indexed,
|
||||
&cmp_vertex](const VertexAccessor &v) -> int {
|
||||
int pos = lower_bound(vertices_indexed.begin(), vertices_indexed.end(), &v,
|
||||
cmp_vertex) -
|
||||
vertices_indexed.begin();
|
||||
if (pos == (int)vertices_indexed.size() || *vertices_indexed[pos] != v)
|
||||
return -1;
|
||||
return pos;
|
||||
};
|
||||
/**
|
||||
* Update bitset of neighbours. Set bit to 1 for index of every vertex
|
||||
* endpoint of edges with type default_outfit.
|
||||
@ -235,16 +235,15 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
* @return EdgeAccessor* if it exists, nullptr otherwise.
|
||||
*/
|
||||
auto get_edge = [&edges_indexed](
|
||||
const VertexAccessor &first,
|
||||
const VertexAccessor &second) -> EdgeAccessor *{
|
||||
auto cmp_edge_to_pair =
|
||||
[](const EdgeAccessor *edge,
|
||||
const pair<const VertexAccessor *, const VertexAccessor *> &e)
|
||||
-> bool {
|
||||
if (edge->from() != *e.first) return edge->from() < *e.first;
|
||||
if (edge->to() != *e.second) return edge->to() < *e.second;
|
||||
return false;
|
||||
};
|
||||
const VertexAccessor &first,
|
||||
const VertexAccessor &second) -> EdgeAccessor * {
|
||||
auto cmp_edge_to_pair = [](
|
||||
const EdgeAccessor *edge,
|
||||
const pair<const VertexAccessor *, const VertexAccessor *> &e) -> bool {
|
||||
if (edge->from() != *e.first) return edge->from() < *e.first;
|
||||
if (edge->to() != *e.second) return edge->to() < *e.second;
|
||||
return false;
|
||||
};
|
||||
auto pos = lower_bound(edges_indexed.begin(), edges_indexed.end(),
|
||||
std::make_pair(&first, &second), cmp_edge_to_pair) -
|
||||
edges_indexed.begin();
|
||||
@ -268,19 +267,18 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
* @param V index of clique vertices in vertices_indexed.
|
||||
* @return score if profile_index exists, else 0.
|
||||
*/
|
||||
auto calc_score =
|
||||
[&db_accessor, &vertices, &profile_index, &vertices_indexed, &get_edge](
|
||||
const std::vector<int> &V) -> int {
|
||||
int res = 0;
|
||||
if (profile_index == -1) return 0;
|
||||
for (auto x : V) {
|
||||
auto edge = get_edge(vertices[profile_index], *vertices_indexed[x]);
|
||||
if (edge == nullptr) continue;
|
||||
auto prop = TypedValue(edge->PropsAt(db_accessor.property("score")));
|
||||
if (prop.type() == TypedValue::Type::Int) res += prop.Value<int>();
|
||||
}
|
||||
return res;
|
||||
};
|
||||
auto calc_score = [&db_accessor, &vertices, &profile_index, &vertices_indexed,
|
||||
&get_edge](const std::vector<int> &V) -> int {
|
||||
int res = 0;
|
||||
if (profile_index == -1) return 0;
|
||||
for (auto x : V) {
|
||||
auto edge = get_edge(vertices[profile_index], *vertices_indexed[x]);
|
||||
if (edge == nullptr) continue;
|
||||
auto prop = TypedValue(edge->PropsAt(db_accessor.property("score")));
|
||||
if (prop.type() == TypedValue::Type::Int) res += prop.Value<int>();
|
||||
}
|
||||
return res;
|
||||
};
|
||||
if (query_type == CliqueQuery::SCORE_AND_LIMIT) {
|
||||
auto cmp_results = [&calc_score](const std::vector<int> &first,
|
||||
const std::vector<int> &second) {
|
||||
@ -305,6 +303,6 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
stream.write(calc_score(results[i]));
|
||||
}
|
||||
stream.write_meta("r");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("p");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("p");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("p");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("g");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("g");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_field("g");
|
||||
stream.write_vertex_record(v);
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
for (auto v : db_accessor.vertices()) db_accessor.detach_remove_vertex(v);
|
||||
stream.write_empty_fields();
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
}
|
||||
}
|
||||
stream.write_meta("r");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
~CPUPlan() {}
|
||||
|
@ -46,7 +46,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_edge_record(e);
|
||||
}
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,6 @@ bool run_general_query(GraphDbAccessor &db_accessor,
|
||||
}
|
||||
}
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
}
|
||||
}
|
||||
stream.write_meta("r");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
~CPUPlan() {}
|
||||
|
@ -48,7 +48,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
}
|
||||
}
|
||||
stream.write_meta("r");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
stream.write_edge_record(e);
|
||||
}
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
}
|
||||
}
|
||||
stream.write_meta("rw");
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@ class CPUPlan : public PlanInterface<Stream> {
|
||||
public:
|
||||
bool run(GraphDbAccessor &db_accessor, const PropertyValueStore<> &args,
|
||||
Stream &stream) {
|
||||
db_accessor.transaction_.commit();
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,8 @@ TEST(GraphDbAccessorTest, RemoveVertexSameTransaction) {
|
||||
EXPECT_EQ(CountVertices(accessor), 1);
|
||||
|
||||
EXPECT_TRUE(accessor.remove_vertex(va1));
|
||||
EXPECT_EQ(CountVertices(accessor), 1);
|
||||
accessor.advance_command();
|
||||
EXPECT_EQ(CountVertices(accessor), 0);
|
||||
}
|
||||
|
||||
@ -59,14 +61,14 @@ TEST(GraphDbAccessorTest, RemoveVertexDifferentTransaction) {
|
||||
// first transaction creates a vertex
|
||||
GraphDbAccessor accessor1 = dbms.active();
|
||||
accessor1.insert_vertex();
|
||||
accessor1.transaction_.commit();
|
||||
accessor1.commit();
|
||||
|
||||
// second transaction checks that it sees it, and deletes it
|
||||
GraphDbAccessor accessor2 = dbms.active();
|
||||
EXPECT_EQ(CountVertices(accessor2), 1);
|
||||
for (auto vertex_accessor : accessor2.vertices())
|
||||
accessor2.remove_vertex(vertex_accessor);
|
||||
accessor2.transaction_.commit();
|
||||
accessor2.commit();
|
||||
|
||||
// third transaction checks that it does not see the vertex
|
||||
GraphDbAccessor accessor3 = dbms.active();
|
||||
@ -120,14 +122,14 @@ TEST(GraphDbAccessorTest, RemoveEdge) {
|
||||
EXPECT_EQ(CountEdges(dba1), 2);
|
||||
|
||||
// remove all [:hates] edges
|
||||
dba1.transaction_.commit();
|
||||
dba1.commit();
|
||||
GraphDbAccessor dba2 = dbms.active();
|
||||
EXPECT_EQ(CountEdges(dba2), 2);
|
||||
for (auto edge : dba2.edges())
|
||||
if (edge.edge_type() == dba2.edge_type("hates")) dba2.remove_edge(edge);
|
||||
|
||||
// current state: (v1) - [:likes] -> (v2), (v3)
|
||||
dba2.transaction_.commit();
|
||||
dba2.commit();
|
||||
GraphDbAccessor dba3 = dbms.active();
|
||||
EXPECT_EQ(CountEdges(dba3), 1);
|
||||
EXPECT_EQ(CountVertices(dba3), 3);
|
||||
@ -177,14 +179,14 @@ TEST(GraphDbAccessorTest, DetachRemoveVertex) {
|
||||
// DETACH REMOVE V3
|
||||
// new situation: (v1) - [:likes] -> (v2)
|
||||
dba1.detach_remove_vertex(va3);
|
||||
dba1.transaction_.commit();
|
||||
dba1.commit();
|
||||
GraphDbAccessor dba2 = dbms.active();
|
||||
|
||||
EXPECT_EQ(CountVertices(dba2), 2);
|
||||
EXPECT_EQ(CountEdges(dba2), 1);
|
||||
for (auto va : dba2.vertices()) EXPECT_FALSE(dba2.remove_vertex(va));
|
||||
|
||||
dba2.transaction_.commit();
|
||||
dba2.commit();
|
||||
GraphDbAccessor dba3 = dbms.active();
|
||||
EXPECT_EQ(CountVertices(dba3), 2);
|
||||
EXPECT_EQ(CountEdges(dba3), 1);
|
||||
@ -195,7 +197,7 @@ TEST(GraphDbAccessorTest, DetachRemoveVertex) {
|
||||
break;
|
||||
}
|
||||
|
||||
dba3.transaction_.commit();
|
||||
dba3.commit();
|
||||
GraphDbAccessor dba4 = dbms.active();
|
||||
EXPECT_EQ(CountVertices(dba4), 1);
|
||||
EXPECT_EQ(CountEdges(dba4), 0);
|
||||
@ -204,7 +206,7 @@ TEST(GraphDbAccessorTest, DetachRemoveVertex) {
|
||||
// so that should work
|
||||
for (auto va : dba4.vertices()) EXPECT_TRUE(dba4.remove_vertex(va));
|
||||
|
||||
dba4.transaction_.commit();
|
||||
dba4.commit();
|
||||
GraphDbAccessor dba5 = dbms.active();
|
||||
EXPECT_EQ(CountVertices(dba5), 0);
|
||||
EXPECT_EQ(CountEdges(dba5), 0);
|
||||
|
Loading…
Reference in New Issue
Block a user