Impl of Scan with filters (unfinished)

This commit is contained in:
jeremy 2022-09-23 09:52:48 +02:00
parent 93460f0fee
commit 82957ef727
2 changed files with 70 additions and 3 deletions

View File

@ -2703,10 +2703,27 @@ bool Foreach::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
/*
#NoCommit #JBA we will need to also modify the request sent (attributes?) + the client of storage + the storage side
for the string version we need to stringify the expression
perhaps not needed now for customer?
Note: operators are not yet connected to anything are they? For now I think the plan still use the non-distrib version
*/
class DistributedScanAllCursor : public Cursor {
public:
explicit DistributedScanAllCursor(Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name)
: output_symbol_(output_symbol), input_cursor_(std::move(input_cursor)), op_name_(op_name) {}
explicit DistributedScanAllCursor(
Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name,
std::optional<storage::v3::LabelId> label_to_filter, std::optional<storage::v3::PropertyId> property_to_filter,
std::optional<std::vector<std::pair<storage::v3::PropertyId, Expression *>>> property_value_pairs_to_filter)
: output_symbol_(output_symbol),
input_cursor_(std::move(input_cursor)),
op_name_(op_name),
label_to_filter_(label_to_filter),
property_to_filter_(property_to_filter),
property_value_pairs_to_filter_(property_value_pairs_to_filter) {
ResetExecutionState();
}
using VertexAccessor = accessors::VertexAccessor;
@ -2744,6 +2761,47 @@ class DistributedScanAllCursor : public Cursor {
current_batch.clear();
current_vertex_it = current_batch.end();
request_state_ = msgs::ExecutionState<msgs::ScanVerticesRequest>{};
/*
Many questions here:
Where are all attributes of ScanVerticesRequest init?
Then why vector of request? What do we expect?
How do we build the vector? What are the several elements corresponding to?
Where do we iterate on those requests?
Convert the Expression* to string version of ast? or something?
We gonna have to break down the expression at some point somewhere
Is there a way to Ut something? Perhaps we can UT the creatiuon of request and compare we exactly get the
ScanVerticesRequest we want
*/
auto request = msgs::ScanVerticesRequest{};
if (label_to_filter_.has_value()) {
request.label_to_filter_ = msgs::Label{.id = label_to_filter_.value()};
}
if (property_to_filter_.has_value()) {
request.property_to_filter_ = property_to_filter_;
}
if (property_value_pairs_to_filter_.has_value()) {
auto res = std::vector<std::pair<storage::v3::PropertyId, std::string>>{}; // #NoCommit not int
res.reserve(property_value_pairs_to_filter_->size());
std::transform(
property_value_pairs_to_filter_->begin(), property_value_pairs_to_filter_->end(), std::back_inserter(res),
[](auto &property_value_pair) {
return std::make_pair(
property_value_pair.first,
std::string("") /* ToString(property_value_pair.second)*/); // #NoCommit call expression.tostring? see
// the json thing?
// #NoCommit we need to replace the node by a special character (sequence?) that the storage will
// identify as a node to know where is the symbol on the frame #NoCommit not bool but something
// with property_value_pair.second?
});
request.property_value_pairs_to_filter_ = res;
}
request_state_.requests.emplace_back(request);
}
void Reset() override {
@ -2758,6 +2816,9 @@ class DistributedScanAllCursor : public Cursor {
std::vector<VertexAccessor> current_batch;
decltype(std::vector<VertexAccessor>().begin()) current_vertex_it;
msgs::ExecutionState<msgs::ScanVerticesRequest> request_state_;
std::optional<storage::v3::LabelId> label_to_filter_;
std::optional<storage::v3::PropertyId> property_to_filter_;
std::optional<std::vector<std::pair<storage::v3::PropertyId, Expression *>>> property_value_pairs_to_filter_;
};
class DistributedCreateNodeCursor : public Cursor {

View File

@ -376,13 +376,19 @@ struct OrderBy {
enum class StorageView { OLD = 0, NEW = 1 };
// #NoCommit #JBA to modify
struct ScanVerticesRequest {
Hlc transaction_id;
VertexId start_id;
std::optional<std::vector<PropertyId>> props_to_return;
std::optional<std::vector<std::string>> filter_expressions;
std::optional<std::vector<std::string>> filter_expressions; // #NoCommit what the use of that exactly?
std::optional<size_t> batch_limit;
StorageView storage_view;
std::optional<Label> label_to_filter_;
std::optional<PropertyId> property_to_filter_;
std::optional<std::vector<std::pair<storage::v3::PropertyId, std::string>>> property_value_pairs_to_filter_;
// #NoCommit not bool but the string version of the ast? to double check
};
struct ScanResultRow {