Merge branch 'project-pineapples' of github.com:memgraph/memgraph into T1122-MG-ShardManager-ThreadPool
This commit is contained in:
commit
b685a21171
@ -12,8 +12,8 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "storage/v3/property_value.hpp"
|
#include "storage/v3/property_value.hpp"
|
||||||
#include "utils/logging.hpp"
|
#include "utils/logging.hpp"
|
||||||
@ -32,7 +32,7 @@ struct Parameters {
|
|||||||
* @param position Token position in query of value.
|
* @param position Token position in query of value.
|
||||||
* @param value
|
* @param value
|
||||||
*/
|
*/
|
||||||
void Add(int position, const storage::v3::PropertyValue &value) { storage_.emplace_back(position, value); }
|
void Add(int position, const storage::v3::PropertyValue &value) { storage_.emplace(position, value); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the value found for the given token position.
|
* Returns the value found for the given token position.
|
||||||
@ -41,23 +41,11 @@ struct Parameters {
|
|||||||
* @return Value for the given token position.
|
* @return Value for the given token position.
|
||||||
*/
|
*/
|
||||||
const storage::v3::PropertyValue &AtTokenPosition(int position) const {
|
const storage::v3::PropertyValue &AtTokenPosition(int position) const {
|
||||||
auto found = std::find_if(storage_.begin(), storage_.end(), [&](const auto &a) { return a.first == position; });
|
auto found = storage_.find(position);
|
||||||
MG_ASSERT(found != storage_.end(), "Token position must be present in container");
|
MG_ASSERT(found != storage_.end(), "Token position must be present in container");
|
||||||
return found->second;
|
return found->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the position-th stripped value. Asserts that this
|
|
||||||
* container has at least (position + 1) elements.
|
|
||||||
*
|
|
||||||
* @param position Which stripped param is sought.
|
|
||||||
* @return Token position and value for sought param.
|
|
||||||
*/
|
|
||||||
const std::pair<int, storage::v3::PropertyValue> &At(int position) const {
|
|
||||||
MG_ASSERT(position < static_cast<int>(storage_.size()), "Invalid position");
|
|
||||||
return storage_[position];
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the number of arguments in this container */
|
/** Returns the number of arguments in this container */
|
||||||
auto size() const { return storage_.size(); }
|
auto size() const { return storage_.size(); }
|
||||||
|
|
||||||
@ -65,7 +53,7 @@ struct Parameters {
|
|||||||
auto end() const { return storage_.end(); }
|
auto end() const { return storage_.end(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<std::pair<int, storage::v3::PropertyValue>> storage_;
|
std::unordered_map<int, storage::v3::PropertyValue> storage_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace memgraph::query::v2
|
} // namespace memgraph::query::v2
|
||||||
|
@ -2458,13 +2458,13 @@ class DistributedCreateExpandCursor : public Cursor {
|
|||||||
std::invoke([&]() {
|
std::invoke([&]() {
|
||||||
switch (edge_info.direction) {
|
switch (edge_info.direction) {
|
||||||
case EdgeAtom::Direction::IN: {
|
case EdgeAtom::Direction::IN: {
|
||||||
set_vertex(v1, request.src_vertex);
|
set_vertex(v2, request.src_vertex);
|
||||||
set_vertex(v2, request.dest_vertex);
|
set_vertex(v1, request.dest_vertex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case EdgeAtom::Direction::OUT: {
|
case EdgeAtom::Direction::OUT: {
|
||||||
set_vertex(v1, request.dest_vertex);
|
set_vertex(v1, request.src_vertex);
|
||||||
set_vertex(v2, request.src_vertex);
|
set_vertex(v2, request.dest_vertex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case EdgeAtom::Direction::BOTH:
|
case EdgeAtom::Direction::BOTH:
|
||||||
@ -2513,7 +2513,8 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
}
|
}
|
||||||
MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
|
MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
|
||||||
const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
|
const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
|
||||||
static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) {
|
static constexpr auto get_dst_vertex = [](const EdgeAccessor &edge,
|
||||||
|
const EdgeAtom::Direction direction) -> msgs::VertexId {
|
||||||
switch (direction) {
|
switch (direction) {
|
||||||
case EdgeAtom::Direction::IN:
|
case EdgeAtom::Direction::IN:
|
||||||
return edge.From().Id();
|
return edge.From().Id();
|
||||||
@ -2526,7 +2527,7 @@ class DistributedExpandCursor : public Cursor {
|
|||||||
msgs::ExpandOneRequest request;
|
msgs::ExpandOneRequest request;
|
||||||
// to not fetch any properties of the edges
|
// to not fetch any properties of the edges
|
||||||
request.edge_properties.emplace();
|
request.edge_properties.emplace();
|
||||||
request.src_vertices.push_back(get_dst_vertex(direction));
|
request.src_vertices.push_back(get_dst_vertex(edge, direction));
|
||||||
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
|
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
|
||||||
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
||||||
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
||||||
|
@ -29,6 +29,7 @@ struct ProfilingStats {
|
|||||||
static constexpr std::string_view kNumCycles{"num_cycles"};
|
static constexpr std::string_view kNumCycles{"num_cycles"};
|
||||||
static constexpr std::string_view kRelativeTime{"relative_time"};
|
static constexpr std::string_view kRelativeTime{"relative_time"};
|
||||||
static constexpr std::string_view kAbsoluteTime{"absolute_time"};
|
static constexpr std::string_view kAbsoluteTime{"absolute_time"};
|
||||||
|
static constexpr std::string_view kActualHits{"actual_hits"};
|
||||||
|
|
||||||
int64_t actual_hits{0};
|
int64_t actual_hits{0};
|
||||||
uint64_t num_cycles{0};
|
uint64_t num_cycles{0};
|
||||||
|
@ -42,13 +42,18 @@ class ScopedCustomProfile {
|
|||||||
custom_data = nlohmann::json::object();
|
custom_data = nlohmann::json::object();
|
||||||
}
|
}
|
||||||
const auto elapsed = utils::ReadTSC() - start_time_;
|
const auto elapsed = utils::ReadTSC() - start_time_;
|
||||||
auto &num_cycles_json = custom_data[ProfilingStats::kNumCycles];
|
IncreaseCustomData(custom_data, ProfilingStats::kNumCycles, elapsed);
|
||||||
const auto num_cycles = num_cycles_json.is_null() ? 0 : num_cycles_json.get<uint64_t>();
|
IncreaseCustomData(custom_data, ProfilingStats::kActualHits, 1);
|
||||||
num_cycles_json = num_cycles + elapsed;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
static void IncreaseCustomData(nlohmann::json &custom_data, const std::string_view key, const uint64_t increment) {
|
||||||
|
auto &json_data = custom_data[key];
|
||||||
|
const auto numerical_data = json_data.is_null() ? 0 : json_data.get<uint64_t>();
|
||||||
|
json_data = numerical_data + increment;
|
||||||
|
}
|
||||||
|
|
||||||
std::string_view custom_data_name_;
|
std::string_view custom_data_name_;
|
||||||
uint64_t start_time_;
|
uint64_t start_time_;
|
||||||
ExecutionContext *context_;
|
ExecutionContext *context_;
|
||||||
|
@ -42,7 +42,7 @@ struct Parameters {
|
|||||||
* @param position Token position in query of value.
|
* @param position Token position in query of value.
|
||||||
* @param value
|
* @param value
|
||||||
*/
|
*/
|
||||||
void Add(int position, const storage::v3::PropertyValue &value) { storage_.emplace_back(position, value); }
|
void Add(int position, const storage::v3::PropertyValue &value) { storage_.emplace(position, value); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the value found for the given token position.
|
* Returns the value found for the given token position.
|
||||||
@ -51,23 +51,11 @@ struct Parameters {
|
|||||||
* @return Value for the given token position.
|
* @return Value for the given token position.
|
||||||
*/
|
*/
|
||||||
const storage::v3::PropertyValue &AtTokenPosition(int position) const {
|
const storage::v3::PropertyValue &AtTokenPosition(int position) const {
|
||||||
auto found = std::find_if(storage_.begin(), storage_.end(), [&](const auto &a) { return a.first == position; });
|
auto found = storage_.find(position);
|
||||||
MG_ASSERT(found != storage_.end(), "Token position must be present in container");
|
MG_ASSERT(found != storage_.end(), "Token position must be present in container");
|
||||||
return found->second;
|
return found->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the position-th stripped value. Asserts that this
|
|
||||||
* container has at least (position + 1) elements.
|
|
||||||
*
|
|
||||||
* @param position Which stripped param is sought.
|
|
||||||
* @return Token position and value for sought param.
|
|
||||||
*/
|
|
||||||
const std::pair<int, storage::v3::PropertyValue> &At(int position) const {
|
|
||||||
MG_ASSERT(position < static_cast<int>(storage_.size()), "Invalid position");
|
|
||||||
return storage_[position];
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the number of arguments in this container */
|
/** Returns the number of arguments in this container */
|
||||||
auto size() const { return storage_.size(); }
|
auto size() const { return storage_.size(); }
|
||||||
|
|
||||||
@ -75,7 +63,7 @@ struct Parameters {
|
|||||||
auto end() const { return storage_.end(); }
|
auto end() const { return storage_.end(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<std::pair<int, storage::v3::PropertyValue>> storage_;
|
std::unordered_map<int, storage::v3::PropertyValue> storage_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct EvaluationContext {
|
struct EvaluationContext {
|
||||||
|
Loading…
Reference in New Issue
Block a user