Refactor state deltas call

Summary:
Add label test

Index gets updated after remote update

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1246
This commit is contained in:
Dominik Gleich 2018-02-26 14:55:55 +01:00
parent c8dc07ad0e
commit ef6cfc4c0e
7 changed files with 176 additions and 118 deletions

View File

@ -2,7 +2,6 @@
#include <mutex>
#include <unordered_map>
#include <unordered_map>
#include <utility>
#include <vector>
@ -112,9 +111,54 @@ class RemoteUpdatesRpcServer {
RemoteUpdateResult Apply() {
std::lock_guard<SpinLock> guard{lock_};
for (auto &kv : deltas_) {
auto &record_accessor = kv.second.first;
// We need to reconstruct the record as in the meantime some local
// update might have updated it.
record_accessor.Reconstruct();
for (database::StateDelta &delta : kv.second.second) {
try {
kv.second.first.ProcessDelta(delta);
auto &updated = record_accessor.update();
auto &dba = db_accessor_;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
LOG(FATAL) << "Can only apply record update deltas for remote "
"graph element";
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
record_accessor.PropsSet(delta.property, delta.value);
break;
case database::StateDelta::Type::ADD_LABEL:
// It is only possible that ADD_LABEL gets called on a
// VertexAccessor.
reinterpret_cast<VertexAccessor &>(record_accessor)
.add_label(delta.label);
break;
case database::StateDelta::Type::REMOVE_LABEL: {
// It is only possible that REMOVE_LABEL gets called on a
// VertexAccessor.
reinterpret_cast<VertexAccessor &>(record_accessor)
.remove_label(delta.label);
} break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(updated).out_.emplace(
dba.LocalizedAddress(delta.vertex_to_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(updated).in_.emplace(
dba.LocalizedAddress(delta.vertex_from_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
dba.wal().Emplace(delta);
break;
}
} catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) {

View File

@ -27,52 +27,52 @@ template <>
void RecordAccessor<Vertex>::PropsSet(storage::Property key,
PropertyValue value) {
auto &dba = db_accessor();
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value));
auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
if (is_local()) {
dba.UpdatePropertyIndex(key, *this, &update());
}
ProcessDelta(delta);
}
template <>
void RecordAccessor<Edge>::PropsSet(storage::Property key,
PropertyValue value) {
auto &dba = db_accessor();
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value));
auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
ProcessDelta(delta);
}
template <>
void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
auto &dba = db_accessor();
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key),
PropertyValue::Null));
auto delta =
StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
template <>
void RecordAccessor<Edge>::PropsErase(storage::Property key) {
auto &dba = db_accessor();
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key),
PropertyValue::Null));
auto delta =
StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
template <typename TRecord>
void RecordAccessor<TRecord>::PropsClear() {
auto &dba = db_accessor();
std::vector<storage::Property> to_remove;
for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first);
for (const auto &prop : to_remove) {
if (std::is_same<TRecord, Vertex>::value) {
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), prop,
dba.PropertyName(prop),
PropertyValue::Null));
} else {
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), prop,
dba.PropertyName(prop),
PropertyValue::Null));
}
PropsErase(prop);
}
}
@ -196,69 +196,33 @@ const TRecord &RecordAccessor<TRecord>::current() const {
return *current_;
}
template <typename TRecord>
void RecordAccessor<TRecord>::SendDelta(
const database::StateDelta &delta) const {
DCHECK(!is_local())
<< "Only a delta created on a remote accessor should be sent";
auto result = db_accessor().db().remote_updates_clients().RemoteUpdate(
address().worker_id(), delta);
switch (result) {
case distributed::RemoteUpdateResult::DONE:
break;
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException("Lock timeout on remote worker");
}
}
template <typename TRecord>
void RecordAccessor<TRecord>::ProcessDelta(
const database::StateDelta &delta) const {
auto &dba = db_accessor();
// We need to reconstruct the record as in the meantime some local update
// have updated it.
Reconstruct();
// Apply the delta both on local and remote data. We need to see the changes
// we make to remote data, even if it's not applied immediately.
auto &updated = update();
switch (delta.type) {
case StateDelta::Type::TRANSACTION_BEGIN:
case StateDelta::Type::TRANSACTION_COMMIT:
case StateDelta::Type::TRANSACTION_ABORT:
case StateDelta::Type::CREATE_VERTEX:
case StateDelta::Type::CREATE_EDGE:
case StateDelta::Type::REMOVE_VERTEX:
case StateDelta::Type::REMOVE_EDGE:
case StateDelta::Type::BUILD_INDEX:
LOG(FATAL)
<< "Can only apply record update deltas for remote graph element";
case StateDelta::Type::SET_PROPERTY_VERTEX:
case StateDelta::Type::SET_PROPERTY_EDGE:
updated.properties_.set(delta.property, delta.value);
break;
case StateDelta::Type::ADD_LABEL:
// It is only possible that ADD_LABEL gets calld on a VertexAccessor.
reinterpret_cast<Vertex &>(updated).labels_.emplace_back(delta.label);
break;
case StateDelta::Type::REMOVE_LABEL: {
// It is only possible that REMOVE_LABEL gets calld on a VertexAccessor.
auto &labels = reinterpret_cast<Vertex &>(updated).labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
} break;
case StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(updated).out_.emplace(
dba.LocalizedAddress(delta.vertex_to_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
break;
case StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(updated).in_.emplace(
dba.LocalizedAddress(delta.vertex_from_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
break;
}
if (is_local()) {
dba.wal().Emplace(delta);
db_accessor().wal().Emplace(delta);
} else {
auto result = dba.db().remote_updates_clients().RemoteUpdate(
address().worker_id(), delta);
switch (result) {
case distributed::RemoteUpdateResult::DONE:
break;
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException("Lock timeout on remote worker");
}
SendDelta(delta);
}
}

View File

@ -13,7 +13,7 @@
namespace database {
class GraphDbAccessor;
struct StateDelta;
};
}; // namespace database
/**
* An accessor to a database record (an Edge or a Vertex).
@ -146,15 +146,17 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
(current_state && new_ && !new_->is_expired_by(t));
}
protected:
/**
* Processes the delta that's a consequence of changes in this accessor. If
* the accessor is local that means writing the delta to the write-ahead log.
* If it's remote, then the delta needs to be sent to it's owner for
* processing.
* Sends delta for remote processing.
*/
void SendDelta(const database::StateDelta &delta) const;
/**
* Processes delta by either adding it to WAL, or by sending it remotely.
*/
void ProcessDelta(const database::StateDelta &delta) const;
protected:
/**
* Pointer to the version (either old_ or new_) that READ operations
* in the accessor should take data from. Note that WRITE operations

View File

@ -10,29 +10,39 @@ size_t VertexAccessor::out_degree() const { return current().out_.size(); }
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
bool VertexAccessor::add_label(storage::Label label) {
auto &updated = update();
if (utils::Contains(updated.labels_, label)) return false;
// not a duplicate label, add it
void VertexAccessor::add_label(storage::Label label) {
auto &dba = db_accessor();
ProcessDelta(database::StateDelta::AddLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label)));
auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
Vertex &vertex = update();
if (is_local()) {
dba.UpdateLabelIndices(label, *this, &vertex);
// not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) {
vertex.labels_.emplace_back(label);
if (is_local()) {
dba.wal().Emplace(delta);
dba.UpdateLabelIndices(label, *this, &vertex);
}
}
return true;
if (!is_local()) SendDelta(delta);
}
size_t VertexAccessor::remove_label(storage::Label label) {
if (!utils::Contains(update().labels_, label)) return 0;
void VertexAccessor::remove_label(storage::Label label) {
auto &dba = db_accessor();
ProcessDelta(database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label)));
return 1;
auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
Vertex &vertex = update();
if (utils::Contains(vertex.labels_, label)) {
auto &labels = vertex.labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
if (is_local()) {
dba.wal().Emplace(delta);
}
}
if (!is_local()) SendDelta(delta);
}
bool VertexAccessor::has_label(storage::Label label) const {
@ -50,9 +60,10 @@ std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) {
stream << va.db_accessor().LabelName(label);
});
os << " {";
utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream,
const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second;
});
utils::PrintIterable(os, va.Properties(), ", ",
[&](auto &stream, const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first)
<< ": " << pair.second;
});
return os << "})";
}

View File

@ -62,12 +62,10 @@ class VertexAccessor : public RecordAccessor<Vertex> {
/** Adds a label to the Vertex. If the Vertex already has that label the call
* has no effect. */
// TODO revise return value, is it necessary?
bool add_label(storage::Label label);
void add_label(storage::Label label);
/** Removes a label from the Vertex. Return number of removed (0, 1). */
// TODO reves return value, is it necessary?
size_t remove_label(storage::Label label);
/** Removes a label from the Vertex. */
void remove_label(storage::Label label);
/** Indicates if the Vertex has the given label. */
bool has_label(storage::Label label) const;

View File

@ -157,6 +157,45 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {
}
}
TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) {
auto v_address = InsertVertex(worker(1));
{
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v_local = dba1.FindVertexChecked(v_address.gid(), false);
auto v_remote = VertexAccessor(v_address, dba0);
auto l1 = dba1.Label("label");
v_remote.add_label(l1);
v_local.add_label(l1);
worker(1).remote_updates_server().Apply(dba0.transaction_id());
dba0.Commit();
}
{
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v = dba1.FindVertexChecked(v_address.gid(), false);
EXPECT_EQ(v.labels().size(), 1);
}
}
TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) {
storage::VertexAddress v_remote = InsertVertex(worker(1));
storage::Label label;
{
database::GraphDbAccessor dba0{master()};
label = dba0.Label("label");
VertexAccessor va(v_remote, dba0);
va.add_label(label);
worker(1).remote_updates_server().Apply(dba0.transaction_id());
dba0.Commit();
}
{
database::GraphDbAccessor dba1{worker(1)};
auto vertices = dba1.Vertices(label, false);
EXPECT_EQ(std::distance(vertices.begin(), vertices.end()), 1);
}
}
class DistributedEdgeCreateTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress w1_a;

View File

@ -156,31 +156,31 @@ TEST(RecordAccessor, VertexLabels) {
// adding labels
EXPECT_FALSE(v1.has_label(l1));
EXPECT_TRUE(v1.add_label(l1));
v1.add_label(l1);
EXPECT_TRUE(v1.has_label(l1));
EXPECT_EQ(v1.labels().size(), 1);
EXPECT_EQ(labels.size(), 1);
EXPECT_FALSE(v1.add_label(l1));
v1.add_label(l1);
EXPECT_EQ(v1.labels().size(), 1);
EXPECT_EQ(labels.size(), 1);
EXPECT_FALSE(v1.has_label(l2));
EXPECT_TRUE(v1.add_label(l2));
v1.add_label(l2);
EXPECT_TRUE(v1.has_label(l2));
EXPECT_EQ(v1.labels().size(), 2);
EXPECT_EQ(labels.size(), 2);
// removing labels
storage::Label l3 = dba.Label("label3");
EXPECT_EQ(v1.remove_label(l3), 0);
v1.remove_label(l3);
EXPECT_EQ(labels.size(), 2);
EXPECT_EQ(v1.remove_label(l1), 1);
v1.remove_label(l1);
EXPECT_FALSE(v1.has_label(l1));
EXPECT_EQ(v1.labels().size(), 1);
EXPECT_EQ(v1.remove_label(l1), 0);
v1.remove_label(l1);
EXPECT_TRUE(v1.has_label(l2));
}