Remove bug-prone inverted ownership of ExecutionState as a consideration of operators

This commit is contained in:
Tyler Neely 2022-11-29 14:30:59 +00:00
parent ec529da8d2
commit 9144d2dccd
3 changed files with 33 additions and 84 deletions

View File

@ -180,7 +180,7 @@ class DistributedCreateNodeCursor : public Cursor {
auto &request_router = context.request_router;
{
SCOPED_REQUEST_WAIT_PROFILE;
request_router->Request(state_, NodeCreationInfoToRequest(context, frame));
request_router->Request(NodeCreationInfoToRequest(context, frame));
}
PlaceNodeOnTheFrame(frame, context);
return true;
@ -191,7 +191,7 @@ class DistributedCreateNodeCursor : public Cursor {
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override { state_ = {}; }
void Reset() override {}
void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
// TODO(kostasrim) Make this work with batching
@ -252,7 +252,6 @@ class DistributedCreateNodeCursor : public Cursor {
std::vector<const NodeCreationInfo *> nodes_info_;
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
std::vector<msgs::PrimaryKey> primary_keys_;
ExecutionState<msgs::CreateVerticesRequest> state_;
};
bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
@ -365,7 +364,6 @@ class ScanAllCursor : public Cursor {
std::optional<decltype(vertices_.value().begin())> vertices_it_;
const char *op_name_;
std::vector<msgs::ScanVerticesResponse> current_batch;
ExecutionState<msgs::ScanVerticesRequest> request_state;
};
class DistributedScanAllAndFilterCursor : public Cursor {
@ -384,14 +382,21 @@ class DistributedScanAllAndFilterCursor : public Cursor {
ResetExecutionState();
}
enum class State : int8_t { INITIALIZING, COMPLETED };
using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
{
SCOPED_REQUEST_WAIT_PROFILE;
current_batch = request_router.Request(request_state_);
std::optional<std::string> request_label = std::nullopt;
if (label_.has_value()) {
request_label = request_router.LabelToName(*label_);
}
current_batch = request_router.Request(request_label);
}
current_vertex_it = current_batch.begin();
request_state_ = State::COMPLETED;
return !current_batch.empty();
}
@ -403,19 +408,15 @@ class DistributedScanAllAndFilterCursor : public Cursor {
if (MustAbort(context)) {
throw HintedAbortError();
}
using State = ExecutionState<msgs::ScanVerticesRequest>;
if (request_state_.state == State::INITIALIZING) {
if (request_state_ == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
}
}
request_state_.label =
label_.has_value() ? std::make_optional(request_router.LabelToName(*label_)) : std::nullopt;
if (current_vertex_it == current_batch.end() &&
(request_state_.state == State::COMPLETED || !MakeRequest(request_router, context))) {
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
ResetExecutionState();
continue;
}
@ -431,7 +432,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
void ResetExecutionState() {
current_batch.clear();
current_vertex_it = current_batch.end();
request_state_ = ExecutionState<msgs::ScanVerticesRequest>{};
request_state_ = State::INITIALIZING;
}
void Reset() override {
@ -445,7 +446,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
const char *op_name_;
std::vector<VertexAccessor> current_batch;
std::vector<VertexAccessor>::iterator current_vertex_it;
ExecutionState<msgs::ScanVerticesRequest> request_state_;
State request_state_ = State::INITIALIZING;
std::optional<storage::v3::LabelId> label_;
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
std::optional<std::vector<Expression *>> filter_expressions_;
@ -2343,7 +2344,7 @@ class DistributedCreateExpandCursor : public Cursor {
ResetExecutionState();
{
SCOPED_REQUEST_WAIT_PROFILE;
request_router->Request(state_, ExpandCreationInfoToRequest(context, frame));
request_router->Request(ExpandCreationInfoToRequest(context, frame));
}
return true;
}
@ -2423,11 +2424,10 @@ class DistributedCreateExpandCursor : public Cursor {
}
private:
void ResetExecutionState() { state_ = {}; }
void ResetExecutionState() {}
const UniqueCursorPtr input_cursor_;
const CreateExpand &self_;
ExecutionState<msgs::CreateExpandRequest> state_;
};
class DistributedExpandCursor : public Cursor {
@ -2474,8 +2474,7 @@ class DistributedExpandCursor : public Cursor {
request.edge_properties.emplace();
request.src_vertices.push_back(get_dst_vertex(edge, direction));
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.request_router->Request(request_state, std::move(request));
auto result_rows = context.request_router->Request(std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
@ -2500,10 +2499,9 @@ class DistributedExpandCursor : public Cursor {
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
auto result_rows = std::invoke([&context, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return context.request_router->Request(request_state, std::move(request));
return context.request_router->Request(std::move(request));
});
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();

View File

@ -83,7 +83,6 @@ struct ExecutionState {
using CompoundKey = io::rsm::ShardRsmKey;
using Shard = coordinator::Shard;
enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED };
// label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label
// on the request itself.
std::optional<std::string> label;
@ -97,7 +96,6 @@ struct ExecutionState {
// it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes
// empty, it means that all of the requests have completed succefully.
std::vector<ShardRequestState<TRequest>> requests;
State state = INITIALIZING;
};
class RequestRouterInterface {
@ -113,13 +111,10 @@ class RequestRouterInterface {
virtual void StartTransaction() = 0;
virtual void Commit() = 0;
virtual std::vector<VertexAccessor> Request(ExecutionState<msgs::ScanVerticesRequest> &state) = 0;
virtual std::vector<msgs::CreateVerticesResponse> Request(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) = 0;
virtual std::vector<msgs::ExpandOneResultRow> Request(ExecutionState<msgs::ExpandOneRequest> &state,
msgs::ExpandOneRequest request) = 0;
virtual std::vector<msgs::CreateExpandResponse> Request(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_edges) = 0;
virtual std::vector<VertexAccessor> Request(std::optional<std::string> &label) = 0;
virtual std::vector<msgs::CreateVerticesResponse> Request(std::vector<msgs::NewVertex> new_vertices) = 0;
virtual std::vector<msgs::ExpandOneResultRow> Request(msgs::ExpandOneRequest request) = 0;
virtual std::vector<msgs::CreateExpandResponse> Request(std::vector<msgs::NewExpand> new_edges) = 0;
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
@ -245,7 +240,9 @@ class RequestRouter : public RequestRouterInterface {
bool IsPrimaryLabel(storage::v3::LabelId label) const override { return shards_map_.label_spaces.contains(label); }
// TODO(kostasrim) Simplify return result
std::vector<VertexAccessor> Request(ExecutionState<msgs::ScanVerticesRequest> &state) override {
std::vector<VertexAccessor> Request(std::optional<std::string> &label) override {
ExecutionState<msgs::ScanVerticesRequest> state = {};
state.label = label;
MaybeInitializeExecutionState(state);
std::vector<msgs::ScanVerticesResponse> responses;
@ -255,14 +252,13 @@ class RequestRouter : public RequestRouterInterface {
DriveReadResponses(state, responses);
} while (!state.requests.empty());
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return PostProcess(std::move(responses));
}
std::vector<msgs::CreateVerticesResponse> Request(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) override {
std::vector<msgs::CreateVerticesResponse> Request(std::vector<msgs::NewVertex> new_vertices) override {
ExecutionState<msgs::CreateVerticesRequest> state = {};
MG_ASSERT(!new_vertices.empty());
MaybeInitializeExecutionState(state, new_vertices);
std::vector<msgs::CreateVerticesResponse> responses;
@ -275,14 +271,13 @@ class RequestRouter : public RequestRouterInterface {
DriveWriteResponses(state, responses);
} while (!state.requests.empty());
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return responses;
}
std::vector<msgs::CreateExpandResponse> Request(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_edges) override {
std::vector<msgs::CreateExpandResponse> Request(std::vector<msgs::NewExpand> new_edges) override {
ExecutionState<msgs::CreateExpandRequest> state = {};
MG_ASSERT(!new_edges.empty());
MaybeInitializeExecutionState(state, new_edges);
std::vector<msgs::CreateExpandResponse> responses;
@ -303,12 +298,11 @@ class RequestRouter : public RequestRouterInterface {
}
// We are done with this state
state.requests.clear();
MaybeCompleteState(state);
return responses;
}
std::vector<msgs::ExpandOneResultRow> Request(ExecutionState<msgs::ExpandOneRequest> &state,
msgs::ExpandOneRequest request) override {
std::vector<msgs::ExpandOneResultRow> Request(msgs::ExpandOneRequest request) override {
ExecutionState<msgs::ExpandOneRequest> state = {};
// TODO(kostasrim)Update to limit the batch size here
// Expansions of the destination must be handled by the caller. For example
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
@ -335,7 +329,6 @@ class RequestRouter : public RequestRouterInterface {
result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()),
std::make_move_iterator(response.result.end()));
}
MaybeCompleteState(state);
return result_rows;
}
@ -362,31 +355,8 @@ class RequestRouter : public RequestRouterInterface {
return accessors;
}
template <typename ExecutionState>
void ThrowIfStateCompleted(ExecutionState &state) const {
if (state.state == ExecutionState::COMPLETED) [[unlikely]] {
throw std::runtime_error("State is completed and must be reset");
}
}
template <typename ExecutionState>
void MaybeCompleteState(ExecutionState &state) const {
if (state.requests.empty()) {
state.state = ExecutionState::COMPLETED;
}
}
template <typename ExecutionState>
bool ShallNotInitializeState(ExecutionState &state) const {
return state.state != ExecutionState::INITIALIZING;
}
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
@ -410,15 +380,10 @@ class RequestRouter : public RequestRouterInterface {
};
state.requests.emplace_back(std::move(shard_request_state));
}
state.state = ExecutionState<msgs::CreateVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_expands) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
@ -453,15 +418,9 @@ class RequestRouter : public RequestRouterInterface {
};
state.requests.emplace_back(std::move(shard_request_state));
}
state.state = ExecutionState<msgs::CreateExpandRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<msgs::ScanVerticesRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
std::vector<coordinator::Shards> multi_shards;
state.transaction_id = transaction_id_;
if (!state.label) {
@ -489,14 +448,9 @@ class RequestRouter : public RequestRouterInterface {
state.requests.emplace_back(std::move(shard_request_state));
}
}
state.state = ExecutionState<msgs::ScanVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<msgs::ExpandOneRequest> &state, msgs::ExpandOneRequest request) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
@ -522,7 +476,6 @@ class RequestRouter : public RequestRouterInterface {
state.requests.emplace_back(std::move(shard_request_state));
}
state.state = ExecutionState<msgs::ExpandOneRequest>::EXECUTING;
}
StorageClient &GetStorageClientForShard(Shard shard) {

View File

@ -174,8 +174,6 @@ void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::se
return;
}
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
auto label_id = request_router.NameToLabel("test_label");
msgs::NewVertex nv{.primary_key = primary_key};
@ -184,7 +182,7 @@ void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::se
std::vector<msgs::NewVertex> new_vertices;
new_vertices.push_back(std::move(nv));
auto result = request_router.Request(state, std::move(new_vertices));
auto result = request_router.Request(std::move(new_vertices));
MG_ASSERT(result.size() == 1);
MG_ASSERT(!result[0].error.has_value());