Prepare RecordAccessor for distributed, part one

Summary:
This diff consolidates local and remote update handling. It ensures, and
tests, that updates to remote elements are visible locally (on the worker
performing the update).

The next part will accumulate remote updates and apply them on the owner.

Also extracted a common testing fixture.
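
The core of the consolidation is a single ProcessDelta entry point: mutators build a database::StateDelta and hand it over, and ProcessDelta applies it to the updated record (so the change is visible on this worker) before dispatching it to the WAL (local element) or, in the next part, to the owner (remote element). A minimal, self-contained sketch of that flow, using simplified stand-in types rather than the real Memgraph classes:

#include <iostream>
#include <map>
#include <string>

struct StateDelta {
  std::string property;
  int value;
};

struct Record {
  std::map<std::string, int> properties;
};

class RecordAccessorSketch {
 public:
  RecordAccessorSketch(Record &record, bool is_local)
      : record_(record), is_local_(is_local) {}

  void PropsSet(const std::string &key, int value) {
    // Mutators no longer touch the WAL directly; they only build a delta.
    ProcessDelta(StateDelta{key, value});
  }

 private:
  void ProcessDelta(const StateDelta &delta) {
    // Always apply the delta to the record this accessor sees, so the change
    // is visible on the updating worker.
    record_.properties[delta.property] = delta.value;
    if (is_local_) {
      // Local element: durability through the write-ahead log.
      std::cout << "WAL <- set " << delta.property << "\n";
    } else {
      // Remote element: part two will accumulate this delta and apply it on
      // the owning worker.
    }
  }

  Record &record_;
  bool is_local_;
};

int main() {
  Record cached_copy;
  RecordAccessorSketch accessor(cached_copy, /*is_local=*/false);
  accessor.PropsSet("age", 42);
  std::cout << cached_copy.properties["age"] << "\n";  // prints 42
}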

Reviewers: dgleich, buda, mtomic

Reviewed By: mtomic

Subscribers: mtomic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1169
florijan 2018-02-05 09:48:45 +01:00
parent dca1e9eebc
commit f808252142
9 changed files with 217 additions and 181 deletions

View File

@@ -29,19 +29,15 @@ class RemoteCache {
RemoteCache(distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients) {}
/**
* Returns the "new" Vertex/Edge for the given gid.
*
* @param gid - global ID.
* @param init_if_necessary - If "new" is not initialized and this flag is
* set, then "new" is initialized with a copy of "old" before returning.
*/
// TODO most likely remove this function in the new remote_data_comm arch
TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
/// Returns the new data for the given ID. Creates it (as copy of old) if
/// necessary.
TRecord *FindNew(gid::Gid gid) {
std::lock_guard<std::mutex> guard{lock_};
auto found = cache_.find(gid);
DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
DCHECK(found != cache_.end())
<< "FindNew for uninitialized remote Vertex/Edge";
auto &pair = found->second;
if (!pair.second && init_if_necessary) {
if (!pair.second) {
pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
}
return pair.second.get();
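
For context, the old/new pair the cache keeps per global ID can be modeled like this: "old" is the committed record fetched from the owner, and FindNew() lazily clones it into a locally writable "new" copy on first update. A toy, self-contained model of that behavior (simplified types, keyed by int instead of gid::Gid, and without the mutex the real class locks):

#include <cassert>
#include <map>
#include <memory>
#include <utility>

struct RecordSketch {
  int value = 0;
  RecordSketch *CloneData() const { return new RecordSketch(*this); }
};

class RemoteCacheSketch {
 public:
  // Called when the committed ("old") record is fetched from its owner.
  void CacheOld(int gid, std::unique_ptr<RecordSketch> old_rec) {
    cache_[gid] = {std::move(old_rec), nullptr};
  }

  // Hands out the locally writable ("new") record, cloning "old" on first use.
  RecordSketch *FindNew(int gid) {
    auto found = cache_.find(gid);
    assert(found != cache_.end() && "uninitialized remote record");
    auto &pair = found->second;
    if (!pair.second)
      pair.second = std::unique_ptr<RecordSketch>(pair.first->CloneData());
    return pair.second.get();
  }

 private:
  // gid -> (old committed record, new locally updated record)
  std::map<int, std::pair<std::unique_ptr<RecordSketch>,
                          std::unique_ptr<RecordSketch>>>
      cache_;
};

int main() {
  RemoteCacheSketch cache;
  cache.CacheOld(1, std::make_unique<RecordSketch>());
  cache.FindNew(1)->value = 42;           // first call clones, then mutates
  assert(cache.FindNew(1)->value == 42);  // later calls return the same copy
}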

View File

@@ -103,4 +103,7 @@ void WriteAheadLog::Emplace(database::StateDelta &&delta) {
deltas_.emplace(std::move(delta));
}
void WriteAheadLog::Emplace(const database::StateDelta &delta) {
if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) deltas_.emplace(delta);
}
} // namespace durability

View File

@@ -38,9 +38,12 @@ class WriteAheadLog {
* (optional) recovery. */
void Enable() { enabled_ = true; }
// Emplaces the given StateDelta onto the buffer, if the WAL is enabled.
/// Emplaces the given StateDelta onto the buffer, if the WAL is enabled.
void Emplace(database::StateDelta &&delta);
/// Emplaces the given StateDelta onto the buffer, if the WAL is enabled.
void Emplace(const database::StateDelta &delta);
private:
/** Groups the logic of WAL file handling (flushing, naming, rotating) */
class WalFile {

View File

@@ -1,6 +1,7 @@
#include "glog/logging.h"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "storage/edge.hpp"
#include "storage/record_accessor.hpp"
#include "storage/vertex.hpp"
@@ -21,69 +22,54 @@ const PropertyValue &RecordAccessor<TRecord>::PropsAt(
template <>
void RecordAccessor<Vertex>::PropsSet(storage::Property key,
PropertyValue value) {
Vertex &vertex = update();
vertex.properties_.set(key, value);
auto &dba = db_accessor();
// TODO use the delta for handling.
dba.wal().Emplace(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value));
if (is_local()) {
db_accessor().UpdatePropertyIndex(key, *this, &vertex);
dba.UpdatePropertyIndex(key, *this, &update());
}
}
template <>
void RecordAccessor<Edge>::PropsSet(storage::Property key,
PropertyValue value) {
update().properties_.set(key, value);
auto &dba = db_accessor();
// TODO use the delta for handling.
dba.wal().Emplace(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value));
}
template <>
size_t RecordAccessor<Vertex>::PropsErase(storage::Property key) {
void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
auto &dba = db_accessor();
// TODO use the delta for handling.
dba.wal().Emplace(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key),
PropertyValue::Null));
return update().properties_.erase(key);
}
template <>
size_t RecordAccessor<Edge>::PropsErase(storage::Property key) {
void RecordAccessor<Edge>::PropsErase(storage::Property key) {
auto &dba = db_accessor();
// TODO use the delta for handling.
dba.wal().Emplace(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key),
PropertyValue::Null));
return update().properties_.erase(key);
}
template <>
void RecordAccessor<Vertex>::PropsClear() {
auto &updated = update();
// TODO use the delta for handling.
template <typename TRecord>
void RecordAccessor<TRecord>::PropsClear() {
auto &dba = db_accessor();
for (const auto &kv : updated.properties_)
dba.wal().Emplace(StateDelta::PropsSetVertex(
dba.transaction_id(), gid(), kv.first, dba.PropertyName(kv.first),
std::vector<storage::Property> to_remove;
for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first);
for (const auto &prop : to_remove) {
if (std::is_same<TRecord, Vertex>::value) {
ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), prop,
dba.PropertyName(prop),
PropertyValue::Null));
} else {
ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), prop,
dba.PropertyName(prop),
PropertyValue::Null));
updated.properties_.clear();
}
template <>
void RecordAccessor<Edge>::PropsClear() {
auto &updated = update();
auto &dba = db_accessor();
// TODO use the delta for handling.
for (const auto &kv : updated.properties_)
dba.wal().Emplace(StateDelta::PropsSetEdge(
dba.transaction_id(), gid(), kv.first, dba.PropertyName(kv.first),
PropertyValue::Null));
updated.properties_.clear();
}
}
template <typename TRecord>
@@ -187,11 +173,11 @@ TRecord &RecordAccessor<TRecord>::update() const {
if (is_local()) {
new_ = address_.local()->update(t);
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
} else {
// TODO implement
throw std::runtime_error("Not yet implemented");
new_ = db_accessor().template remote_elements<TRecord>().FindNew(
address_.gid());
}
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
return *new_;
}
@@ -205,10 +191,41 @@ const TRecord &RecordAccessor<TRecord>::current() const {
}
template <typename TRecord>
void RecordAccessor<TRecord>::ProcessDelta(const GraphStateDelta &) const {
LOG(ERROR) << "Delta processing not yet implemented";
void RecordAccessor<TRecord>::ProcessDelta(
const database::StateDelta &delta) const {
// Apply the delta to both local and remote data. We need to see the changes
// we make to remote data, even if they are not applied on the owner immediately.
auto &updated = update();
switch (delta.type) {
case StateDelta::Type::TRANSACTION_BEGIN:
case StateDelta::Type::TRANSACTION_COMMIT:
case StateDelta::Type::TRANSACTION_ABORT:
case StateDelta::Type::CREATE_VERTEX:
case StateDelta::Type::CREATE_EDGE:
case StateDelta::Type::REMOVE_VERTEX:
case StateDelta::Type::REMOVE_EDGE:
case StateDelta::Type::BUILD_INDEX:
LOG(FATAL)
<< "Can only apply record update deltas for remote graph element";
case StateDelta::Type::SET_PROPERTY_VERTEX:
case StateDelta::Type::SET_PROPERTY_EDGE:
updated.properties_.set(delta.property, delta.value);
break;
case StateDelta::Type::ADD_LABEL:
// It is only possible that ADD_LABEL gets called on a VertexAccessor.
reinterpret_cast<Vertex &>(updated).labels_.emplace_back(delta.label);
break;
case StateDelta::Type::REMOVE_LABEL: {
// It is only possible that REMOVE_LABEL gets called on a VertexAccessor.
auto &labels = reinterpret_cast<Vertex &>(updated).labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
} break;
}
if (is_local()) {
// TODO write delta to WAL
db_accessor().wal().Emplace(delta);
} else {
// TODO use the delta to perform a remote update.
// TODO check for results (success, serialization_error, ...)
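
The else branch above is the hook for the part described in the summary as coming next: deltas produced for remote elements get accumulated and later applied on the owning worker, with per-delta results checked as the TODO notes. A purely hypothetical sketch of that accumulation step, with invented names that are not part of this diff:

#include <iostream>
#include <map>
#include <string>
#include <vector>

struct StateDelta {
  int gid;
  std::string description;
};

// Per-worker buffer of deltas produced for remote elements. ApplyOnOwners()
// stands in for the future RPC that ships each batch to its owning worker
// and checks per-delta results (success, serialization error, timeout, ...).
class RemoteUpdateAccumulatorSketch {
 public:
  void Emplace(int owner_worker_id, const StateDelta &delta) {
    pending_[owner_worker_id].push_back(delta);
  }

  void ApplyOnOwners() {
    for (auto &kv : pending_)
      std::cout << "send " << kv.second.size() << " deltas to worker "
                << kv.first << "\n";
    pending_.clear();
  }

 private:
  std::map<int, std::vector<StateDelta>> pending_;
};

int main() {
  RemoteUpdateAccumulatorSketch acc;
  acc.Emplace(1, {10, "set property"});
  acc.Emplace(1, {10, "add label"});
  acc.ApplyOnOwners();  // prints: send 2 deltas to worker 1
}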

View File

@@ -12,20 +12,7 @@
namespace database {
class GraphDbAccessor;
};
/// Mock class for a DB delta.
// TODO replace with the real thing.
class GraphStateDelta {
public:
/// Indicates what the result of applying the delta to the remote worker
/// (owner of the Vertex/Edge the delta affects).
enum class RemoteResult {
SUCCES,
SERIALIZATION_ERROR,
LOCK_TIMEOUT_ERROR
// TODO: network error?
};
struct StateDelta;
};
/**
@@ -74,7 +61,7 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
void PropsSet(storage::Property key, PropertyValue value);
/** Erases the property for the given key. */
size_t PropsErase(storage::Property key);
void PropsErase(storage::Property key);
/** Removes all the properties from this record. */
void PropsClear();
@@ -183,10 +170,8 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
* the accessor is local that means writing the delta to the write-ahead log.
* If it's remote, then the delta needs to be sent to its owner for
* processing.
*
* @param delta The delta to process.
*/
void ProcessDelta(const GraphStateDelta &delta) const;
void ProcessDelta(const database::StateDelta &delta) const;
private:
// The database accessor for which this record accessor is created

View File

@@ -3,6 +3,7 @@
#include <algorithm>
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "utils/algorithm.hpp"
size_t VertexAccessor::out_degree() const { return current().out_.size(); }
@@ -10,32 +11,27 @@ size_t VertexAccessor::out_degree() const { return current().out_.size(); }
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
bool VertexAccessor::add_label(storage::Label label) {
auto &labels_view = current().labels_;
auto found = std::find(labels_view.begin(), labels_view.end(), label);
if (found != labels_view.end()) return false;
auto &updated = update();
if (utils::Contains(updated.labels_, label)) return false;
// not a duplicate label, add it
Vertex &vertex = update();
vertex.labels_.emplace_back(label);
auto &dba = db_accessor();
ProcessDelta(database::StateDelta::AddLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label)));
Vertex &vertex = update();
if (is_local()) {
dba.UpdateLabelIndices(label, *this, &vertex);
// TODO support distributed.
dba.wal().Emplace(database::StateDelta::AddLabel(
dba.transaction_id(), gid(), label, dba.LabelName(label)));
}
return true;
}
size_t VertexAccessor::remove_label(storage::Label label) {
auto &labels = update().labels_;
auto found = std::find(labels.begin(), labels.end(), label);
if (found == labels.end()) return 0;
if (!utils::Contains(update().labels_, label)) return 0;
std::swap(*found, labels.back());
labels.pop_back();
auto &dba = db_accessor();
// TODO support distributed.
dba.wal().Emplace(database::StateDelta::RemoveLabel(
dba.transaction_id(), gid(), label, dba.LabelName(label)));
ProcessDelta(database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label)));
return 1;
}

View File

@@ -0,0 +1,70 @@
#include <experimental/optional>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "transactions/engine_master.hpp"
template <typename T>
using optional = std::experimental::optional<T>;
class DistributedGraphDbTest : public ::testing::Test {
const std::string kLocal = "127.0.0.1";
class WorkerInThread {
public:
WorkerInThread(database::Config config) : worker_(config) {
thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
}
~WorkerInThread() {
if (thread_.joinable()) thread_.join();
}
database::Worker worker_;
std::thread thread_;
};
protected:
void SetUp() override {
const auto kInitTime = 200ms;
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_.emplace(master_config);
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_->endpoint();
config.worker_endpoint = {kLocal, 0};
return config;
};
worker1_.emplace(worker_config(1));
std::this_thread::sleep_for(kInitTime);
worker2_.emplace(worker_config(2));
std::this_thread::sleep_for(kInitTime);
}
void TearDown() override {
// Kill master first because it will expect a shutdown response from the
// workers.
master_ = std::experimental::nullopt;
worker2_ = std::experimental::nullopt;
worker1_ = std::experimental::nullopt;
}
database::Master &master() { return *master_; }
auto &master_tx_engine() {
return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
}
database::Worker &worker1() { return worker1_->worker_; }
database::Worker &worker2() { return worker2_->worker_; }
private:
optional<database::Master> master_;
optional<WorkerInThread> worker1_;
optional<WorkerInThread> worker2_;
};

View File

@@ -1,4 +1,3 @@
#include <experimental/optional>
#include <memory>
#include <thread>
#include <unordered_set>
@@ -23,74 +22,12 @@
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
#include "distributed_common.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
template <typename T>
using optional = std::experimental::optional<T>;
using namespace distributed;
class DistributedGraphDbTest : public ::testing::Test {
const std::string kLocal = "127.0.0.1";
class WorkerInThread {
public:
WorkerInThread(database::Config config) : worker_(config) {
thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
}
~WorkerInThread() {
if (thread_.joinable()) thread_.join();
}
database::Worker worker_;
std::thread thread_;
};
protected:
void SetUp() override {
const auto kInitTime = 200ms;
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_.emplace(master_config);
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_->endpoint();
config.worker_endpoint = {kLocal, 0};
return config;
};
worker1_.emplace(worker_config(1));
std::this_thread::sleep_for(kInitTime);
worker2_.emplace(worker_config(2));
std::this_thread::sleep_for(kInitTime);
}
void TearDown() override {
// Kill master first because it will expect a shutdown response from the
// workers.
master_ = std::experimental::nullopt;
worker2_ = std::experimental::nullopt;
worker1_ = std::experimental::nullopt;
}
database::Master &master() { return *master_; }
auto &master_tx_engine() {
return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
}
database::Worker &worker1() { return worker1_->worker_; }
database::Worker &worker2() { return worker2_->worker_; }
private:
optional<database::Master> master_;
optional<WorkerInThread> worker1_;
optional<WorkerInThread> worker2_;
};
using namespace database;
TEST_F(DistributedGraphDbTest, Coordination) {
EXPECT_NE(master().endpoint().port(), 0);
@@ -162,7 +99,6 @@ TEST_F(DistributedGraphDbTest, Counters) {
}
TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
using GraphDbAccessor = database::GraphDbAccessor;
// Only old data is visible remotely, so create and commit some data.
gid::Gid v1_id, v2_id, e1_id;
@@ -186,10 +122,10 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
}
// The master must start a transaction before workers can work in it.
database::GraphDbAccessor master_dba{master()};
GraphDbAccessor master_dba{master()};
{
database::GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()};
GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()};
VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba};
EXPECT_NE(v1_in_w1.GetOld(), nullptr);
EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
@@ -198,7 +134,7 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
}
{
database::GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()};
GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()};
VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba};
EXPECT_NE(v2_in_w2.GetOld(), nullptr);
EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
@@ -237,7 +173,7 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
database::GraphDbAccessor dba{master()};
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
@@ -263,7 +199,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
auto remote_pull = [this, &params, &symbols](database::GraphDbAccessor &dba,
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
@@ -285,8 +221,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
EXPECT_EQ(batch.frames[1][0].ValueInt(), 2);
};
database::GraphDbAccessor dba_1{master()};
database::GraphDbAccessor dba_2{master()};
GraphDbAccessor dba_1{master()};
GraphDbAccessor dba_2{master()};
for (int worker_id : {1, 2}) {
// TODO flor, proper test async here.
auto tx1_batch1 = remote_pull(dba_1, worker_id).get();
@@ -310,9 +246,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
// sequence ID, so we can check we retrieved all.
storage::Property prop;
{
database::GraphDbAccessor dba{master()};
GraphDbAccessor dba{master()};
prop = dba.Property("prop");
auto create_data = [prop](database::GraphDbAccessor &dba, int worker_id) {
auto create_data = [prop](GraphDbAccessor &dba, int worker_id) {
auto v1 = dba.InsertVertex();
v1.PropsSet(prop, worker_id * 10);
auto v2 = dba.InsertVertex();
@@ -321,14 +257,14 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
e12.PropsSet(prop, worker_id * 10 + 2);
};
create_data(dba, 0);
database::GraphDbAccessor dba_w1{worker1(), dba.transaction_id()};
GraphDbAccessor dba_w1{worker1(), dba.transaction_id()};
create_data(dba_w1, 1);
database::GraphDbAccessor dba_w2{worker2(), dba.transaction_id()};
GraphDbAccessor dba_w2{worker2(), dba.transaction_id()};
create_data(dba_w2, 2);
dba.Commit();
}
database::GraphDbAccessor dba{master()};
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
@@ -358,9 +294,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
auto produce = MakeProduce(p, return_n_r, return_m, return_p);
auto check_result = [prop](int worker_id,
const std::vector<std::vector<query::TypedValue>>
&frames) {
auto check_result = [prop](
int worker_id,
const std::vector<std::vector<query::TypedValue>> &frames) {
int offset = worker_id * 10;
ASSERT_EQ(frames.size(), 1);
auto &row = frames[0];
@@ -387,7 +323,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
ctx.symbol_table_[*return_m], p_sym};
auto remote_pull = [this, &params, &symbols](database::GraphDbAccessor &dba,
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
@@ -402,7 +338,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
}
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
using GraphDbAccessor = database::GraphDbAccessor;
storage::Label label;
storage::Property property;
@@ -448,14 +383,14 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
}
TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
database::GraphDbAccessor dba_w1(worker1());
GraphDbAccessor dba_w1(worker1());
auto v = dba_w1.InsertVertex();
auto prop = dba_w1.Property("p");
v.PropsSet(prop, 42);
auto v_ga = v.GlobalAddress();
dba_w1.Commit();
database::GraphDbAccessor dba_w2(worker2());
GraphDbAccessor dba_w2(worker2());
VertexAccessor v_in_w2{v_ga, dba_w2};
EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
}

View File

@@ -0,0 +1,31 @@
#include <gtest/gtest.h>
#include "database/graph_db_accessor.hpp"
#include "distributed_common.hpp"
TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) {
database::GraphDbAccessor dba_tx1{worker1()};
auto v = dba_tx1.InsertVertex();
auto v_ga = v.GlobalAddress();
dba_tx1.Commit();
database::GraphDbAccessor dba_tx2_w2{worker2()};
v = VertexAccessor(v_ga, dba_tx2_w2);
ASSERT_FALSE(v.address().is_local());
auto label = dba_tx2_w2.Label("l");
EXPECT_FALSE(v.has_label(label));
v.add_label(label);
v.SwitchNew();
EXPECT_TRUE(v.has_label(label));
v.SwitchOld();
EXPECT_FALSE(v.has_label(label));
// In the same transaction, the label is not yet visible on the owning worker.
database::GraphDbAccessor dba_tx2_w1{worker1(), dba_tx2_w2.transaction_id()};
v = VertexAccessor(v_ga, dba_tx2_w1);
v.SwitchOld();
EXPECT_FALSE(v.has_label(label));
v.SwitchNew();
EXPECT_FALSE(v.has_label(label));
}