Add CachedDataLock throughout the code

Summary:
CachedDataLock is necessary for lru cache as remote data is no longer
persistent. Most methods internally handle this, but for methods that
return pointers or references to remote data, we need to manually
lock data.

Reviewers: msantl, ipaljak

Reviewed By: msantl

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1869
This commit is contained in:
Vinko Kasljevic 2019-02-15 11:18:44 +01:00
parent 7dba861534
commit 3ad5c8069c
7 changed files with 153 additions and 5 deletions

View File

@ -0,0 +1,38 @@
/// @file
#pragma once
namespace storage {
/// CachedDataLock wrapps RecordAccessors HoldCachedData and ReleaseCachedData
/// methods and provides a RAII style mechanism for releasing the lock at the
/// end of the scope.
template <typename TRecordAccessor>
class CachedDataLock {
explicit CachedDataLock(const TRecordAccessor &accessor)
: accessor_(&accessor) {
accessor_->HoldCachedData();
}
template <typename URecordAccessor>
friend CachedDataLock<URecordAccessor> GetDataLock(const URecordAccessor &);
public:
~CachedDataLock() { Release(); }
void Release() {
if (accessor_) {
accessor_->ReleaseCachedData();
accessor_ = nullptr;
}
}
private:
const TRecordAccessor *accessor_;
};
/// Constructs CachedDataLock and then lockes given accessor.
template <typename TRecordAccessor>
CachedDataLock<TRecordAccessor> GetDataLock(const TRecordAccessor &accessor) {
return CachedDataLock<TRecordAccessor>(accessor);
}
} // namespace storage

View File

@ -6,6 +6,7 @@
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/cached_data_lock.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/vertex.hpp"
@ -20,6 +21,7 @@ RecordAccessor<TRecord>::RecordAccessor(AddressT address,
template <typename TRecord>
PropertyValue RecordAccessor<TRecord>::PropsAt(storage::Property key) const {
auto guard = storage::GetDataLock(*this);
return current().properties_.at(key);
}
@ -29,6 +31,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
auto &dba = db_accessor();
auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, value);
if (is_local()) {
dba.UpdatePropertyIndex(key, *this, &update());
@ -43,6 +46,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, value);
ProcessDelta(delta);
}
@ -53,6 +57,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
auto delta =
StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
@ -63,6 +68,8 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
auto delta =
StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
@ -70,6 +77,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
template <typename TRecord>
void RecordAccessor<TRecord>::PropsClear() {
std::vector<storage::Property> to_remove;
auto guard = storage::GetDataLock(*this);
for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first);
for (const auto &prop : to_remove) {
PropsErase(prop);
@ -78,6 +86,7 @@ void RecordAccessor<TRecord>::PropsClear() {
template <typename TRecord>
const PropertyValueStore &RecordAccessor<TRecord>::Properties() const {
auto guard = storage::GetDataLock(*this);
return current().properties_;
}
@ -114,6 +123,7 @@ RecordAccessor<TRecord>::GlobalAddress() const {
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
auto guard = storage::GetDataLock(*this);
if (is_local()) {
if (!new_) {
// if new_ is not set yet, look for it
@ -132,15 +142,34 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
return *this;
}
template <typename TRecord>
TRecord *RecordAccessor<TRecord>::GetNew() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
}
return new_;
}
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
current_ = old_ ? old_ : new_;
return *this;
}
template <typename TRecord>
TRecord *RecordAccessor<TRecord>::GetOld() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
}
return old_;
}
template <typename TRecord>
bool RecordAccessor<TRecord>::Reconstruct() const {
auto &dba = db_accessor();
auto guard = storage::GetDataLock(*this);
if (is_local()) {
address().local()->find_set_old_new(dba.transaction(), &old_, &new_);
} else {
@ -159,6 +188,7 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
template <typename TRecord>
TRecord &RecordAccessor<TRecord>::update() const {
auto &dba = db_accessor();
auto guard = storage::GetDataLock(*this);
// Edges have lazily initialize mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
bool reconstructed = Reconstruct();
@ -203,8 +233,40 @@ int64_t RecordAccessor<TRecord>::CypherId() const {
.cypher_id;
}
template <typename TRecord>
void RecordAccessor<TRecord>::HoldCachedData() const {
if (!is_local()) {
if (remote_.lock_counter == 0) {
// TODO (vkasljevic) uncomment once Remote has beed implemented
// remote_.data = db_accessor_->data_manager().template Find<TRecord>(
// db_accessor_->transaction().id_, Address().worker_id(),
// Address().gid(), remote_.has_updated);
}
++remote_.lock_counter;
DCHECK(remote_.lock_counter <= 10000)
<< "Something wrong with lock counter";
}
}
template <typename TRecord>
void RecordAccessor<TRecord>::ReleaseCachedData() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Lock should exist at this point";
--remote_.lock_counter;
if (remote_.lock_counter == 0) {
// TODO (vkasljevic) uncomment once Remote has beed implemented
// remote_.data = nullptr;
}
}
}
template <typename TRecord>
const TRecord &RecordAccessor<TRecord>::current() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
}
// Edges have lazily initialize mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
bool reconstructed = Reconstruct();

View File

@ -98,7 +98,7 @@ class RecordAccessor {
RecordAccessor<TRecord> &SwitchNew();
/** Returns the new record pointer. */
TRecord *GetNew() const { return new_; }
TRecord *GetNew() const;
/**
* Attempts to switch this accessor to use the latest version not updated by
@ -111,7 +111,7 @@ class RecordAccessor {
RecordAccessor<TRecord> &SwitchOld();
/** Returns the old record pointer. */
TRecord *GetOld() const { return old_; }
TRecord *GetOld() const;
/**
* Reconstructs the internal state of the record accessor so it uses the
@ -158,6 +158,19 @@ class RecordAccessor {
*/
int64_t CypherId() const;
/**
* If accessor holds remote record, this method will hold remote data until
* released. This is needed for methods that return pointers.
* This method can be called multiple times.
*/
void HoldCachedData() const;
/**
* If accessor holds remote record, this method will release remote data.
* This is needed for methods that return pointers.
*/
void ReleaseCachedData() const;
protected:
/**
* The database::GraphDbAccessor is friend to this accessor so it can
@ -183,6 +196,7 @@ class RecordAccessor {
* This pointer can be null if created by an accessor which lazily reads from
* mvcc.
*/
// TODO (vkasljevic) remove this
mutable TRecord *current_{nullptr};
/** Returns the current version (either new_ or old_) set on this
@ -197,6 +211,13 @@ class RecordAccessor {
AddressT address_;
struct Remote {
/* Keeps track of how many times HoldRemoteData was called. */
mutable unsigned short lock_counter{0};
};
Remote remote_;
/**
* Latest version which is visible to the current transaction+command
* but has not been created nor modified by the current transaction+command.

View File

@ -241,6 +241,7 @@ void SaveRecordAccessor(const RecordAccessor<TRecord> &accessor,
builder->setAddress(accessor.GlobalAddress().raw());
bool reconstructed = false;
auto guard = storage::GetDataLock(accessor);
if (!accessor.GetOld() && !accessor.GetNew()) {
reconstructed = true;
bool result = accessor.Reconstruct();

View File

@ -20,6 +20,7 @@ void VertexAccessor::add_label(storage::Label label) {
auto &dba = db_accessor();
auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
auto guard = storage::GetDataLock(*this);
auto &vertex = update();
// not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) {
@ -37,7 +38,8 @@ void VertexAccessor::remove_label(storage::Label label) {
auto &dba = db_accessor();
auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
Vertex &vertex = update();
auto guard = storage::GetDataLock(*this);
auto &vertex = update();
if (utils::Contains(vertex.labels_, label)) {
auto &labels = vertex.labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
@ -52,11 +54,13 @@ void VertexAccessor::remove_label(storage::Label label) {
}
bool VertexAccessor::has_label(storage::Label label) const {
auto guard = storage::GetDataLock(*this);
auto &labels = this->current().labels_;
return std::find(labels.begin(), labels.end(), label) != labels.end();
}
const std::vector<storage::Label> &VertexAccessor::labels() const {
std::vector<storage::Label> VertexAccessor::labels() const {
auto guard = storage::GetDataLock(*this);
return this->current().labels_;
}
@ -66,6 +70,7 @@ void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) {
dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge));
SwitchNew();
auto guard = storage::GetDataLock(*this);
if (current().is_expired_by(dba.transaction())) return;
update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
@ -78,6 +83,7 @@ void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) {
dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge));
SwitchNew();
auto guard = storage::GetDataLock(*this);
if (current().is_expired_by(dba.transaction())) return;
update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));

View File

@ -8,6 +8,7 @@
#include <cppitertools/chain.hpp>
#include <cppitertools/imap.hpp>
#include "storage/distributed/cached_data_lock.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/record_accessor.hpp"
#include "storage/distributed/vertex.hpp"
@ -69,10 +70,12 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
bool has_label(storage::Label label) const;
/** Returns all the Labels of the Vertex. */
const std::vector<storage::Label> &labels() const;
// TODO (vkasljevic) add proxy for labels
std::vector<storage::Label> labels() const;
/** Returns EdgeAccessors for all incoming edges. */
auto in() const {
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().in_.begin(), current().in_.end(),
false, address(), db_accessor());
}
@ -86,6 +89,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
*/
auto in(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types),
current().in_.end(), false, address(),
db_accessor());
@ -98,6 +104,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* or empty, the parameter is ignored.
*/
auto in(const std::vector<storage::EdgeType> *edge_types) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().in_.begin(std::experimental::nullopt, edge_types),
current().in_.end(), false, address(), db_accessor());
@ -105,6 +114,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
/** Returns EdgeAccessors for all outgoing edges. */
auto out() const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(current().out_.begin(), current().out_.end(),
true, address(), db_accessor());
}
@ -119,6 +131,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
*/
auto out(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().out_.begin(dest.address(), edge_types), current().out_.end(),
true, address(), db_accessor());
@ -131,6 +146,9 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
* or empty, the parameter is ignored.
*/
auto out(const std::vector<storage::EdgeType> *edge_types) const {
// This is temporary
// TODO (vkasljevic) prepare iterators for lru cache
auto guard = storage::GetDataLock(*this);
return MakeAccessorIterator(
current().out_.begin(std::experimental::nullopt, edge_types),
current().out_.end(), true, address(), db_accessor());

View File

@ -45,6 +45,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) {
{
auto w1_dba = worker(1).Access(master_dba->transaction_id());
VertexAccessor v1_in_w1{{v1_id, 0}, *w1_dba};
auto guard = storage::GetDataLock(v1_in_w1);
EXPECT_NE(v1_in_w1.GetOld(), nullptr);
EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
EXPECT_EQ(v1_in_w1.PropsAt(w1_dba->Property("p1")).Value<int64_t>(), 42);
@ -54,6 +55,7 @@ TEST_F(DistributedDataExchangeTest, RemoteDataGetting) {
{
auto w2_dba = worker(2).Access(master_dba->transaction_id());
VertexAccessor v2_in_w2{{v2_id, 0}, *w2_dba};
auto guard = storage::GetDataLock(v2_in_w2);
EXPECT_NE(v2_in_w2.GetOld(), nullptr);
EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
EXPECT_EQ(v2_in_w2.PropsAt(w2_dba->Property("p2")).Value<std::string>(),