Merge branch 'master' into Implement-constant-time-label-and-edge-type-retrieval

This commit is contained in:
gvolfing 2023-11-14 13:13:16 +01:00
commit a370b09d12
26 changed files with 540 additions and 105 deletions

294
.github/workflows/package_specific.yaml vendored Normal file
View File

@ -0,0 +1,294 @@
name: Package Specific

# Manually dispatched workflow that builds a Memgraph package for ONE selected
# target OS and, when a version is supplied, uploads the result to S3.
# TODO(gitbuda): Cleanup docker container if GHA job was canceled.

on:
  workflow_dispatch:
    inputs:
      memgraph_version:
        description: "Memgraph version to upload as. Leave this field empty if you don't want to upload binaries to S3. Format: 'X.Y.Z'"
        required: false
      build_type:
        type: choice
        description: "Memgraph Build type. Default value is Release."
        default: 'Release'
        options:
          - Release
          - RelWithDebInfo
      target_os:
        type: choice
        description: "Target OS for which memgraph will be packaged. Default is Ubuntu 22.04"
        default: 'ubuntu-22_04'
        options:
          - amzn-2
          - centos-7
          - centos-9
          - debian-10
          - debian-11
          - debian-11-arm
          - debian-11-platform
          - docker
          - fedora-36
          - ubuntu-18_04
          - ubuntu-20_04
          - ubuntu-22_04
          - ubuntu-22_04-arm

# Every packaging job below is guarded by the `target_os` input, so only the
# job matching the selected option runs per dispatch; the others are skipped.
jobs:
  amzn-2:
    if: ${{ github.event.inputs.target_os == 'amzn-2' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package amzn-2 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: amzn-2
          path: build/output/amzn-2/memgraph*.rpm

  centos-7:
    if: ${{ github.event.inputs.target_os == 'centos-7' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package centos-7 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: centos-7
          path: build/output/centos-7/memgraph*.rpm

  centos-9:
    if: ${{ github.event.inputs.target_os == 'centos-9' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package centos-9 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: centos-9
          path: build/output/centos-9/memgraph*.rpm

  debian-10:
    if: ${{ github.event.inputs.target_os == 'debian-10' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package debian-10 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: debian-10
          path: build/output/debian-10/memgraph*.deb

  debian-11:
    if: ${{ github.event.inputs.target_os == 'debian-11' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package debian-11 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: debian-11
          path: build/output/debian-11/memgraph*.deb

  debian-11-arm:
    if: ${{ github.event.inputs.target_os == 'debian-11-arm' }}
    runs-on: [self-hosted, DockerMgBuild, ARM64, strange]
    timeout-minutes: 120
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package debian-11-arm ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          # The artifact is published under the aarch64 name, not the job name.
          name: debian-11-aarch64
          path: build/output/debian-11-arm/memgraph*.deb

  debian-11-platform:
    if: ${{ github.event.inputs.target_os == 'debian-11-platform' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package debian-11 ${{ github.event.inputs.build_type }} --for-platform
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: debian-11-platform
          path: build/output/debian-11/memgraph*.deb

  docker:
    if: ${{ github.event.inputs.target_os == 'docker' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          cd release/package
          ./run.sh package debian-11 ${{ github.event.inputs.build_type }} --for-docker
          ./run.sh docker
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: docker
          path: build/output/docker/memgraph*.tar.gz

  fedora-36:
    if: ${{ github.event.inputs.target_os == 'fedora-36' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package fedora-36 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: fedora-36
          path: build/output/fedora-36/memgraph*.rpm

  ubuntu-18_04:
    if: ${{ github.event.inputs.target_os == 'ubuntu-18_04' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package ubuntu-18.04 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: ubuntu-18.04
          path: build/output/ubuntu-18.04/memgraph*.deb

  ubuntu-20_04:
    if: ${{ github.event.inputs.target_os == 'ubuntu-20_04' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package ubuntu-20.04 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: ubuntu-20.04
          path: build/output/ubuntu-20.04/memgraph*.deb

  ubuntu-22_04:
    if: ${{ github.event.inputs.target_os == 'ubuntu-22_04' }}
    runs-on: [self-hosted, DockerMgBuild, X64]
    timeout-minutes: 60
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package ubuntu-22.04 ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          name: ubuntu-22.04
          path: build/output/ubuntu-22.04/memgraph*.deb

  ubuntu-22_04-arm:
    if: ${{ github.event.inputs.target_os == 'ubuntu-22_04-arm' }}
    runs-on: [self-hosted, DockerMgBuild, ARM64, strange]
    timeout-minutes: 120
    steps:
      - name: "Set up repository"
        uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Required because of release/get_version.py
      - name: "Build package"
        run: |
          ./release/package/run.sh package ubuntu-22.04-arm ${{ github.event.inputs.build_type }}
      - name: "Upload package"
        uses: actions/upload-artifact@v3
        with:
          # The artifact is published under the aarch64 name, not the job name.
          name: ubuntu-22.04-aarch64
          path: build/output/ubuntu-22.04-arm/memgraph*.deb

  upload-to-s3:
    # Only run the upload if a version was specified. Allows for runs without upload.
    # All but one of the jobs listed in `needs` are skipped on every dispatch.
    # Without an explicit status check function GitHub applies an implicit
    # success() check, which skips a job whose `needs` contain skipped jobs, so
    # the upload would never run. `!cancelled() && !failure()` keeps the upload
    # enabled for skipped dependencies while still blocking it when the selected
    # packaging job failed or the run was cancelled.
    if: "${{ !cancelled() && !failure() && github.event.inputs.memgraph_version != '' }}"
    needs: [amzn-2, centos-7, centos-9, debian-10, debian-11, debian-11-arm, debian-11-platform, docker, fedora-36, ubuntu-18_04, ubuntu-20_04, ubuntu-22_04, ubuntu-22_04-arm]
    runs-on: ubuntu-latest
    steps:
      - name: Download artifacts
        uses: actions/download-artifact@v3
        with:
          # name: # if name input parameter is not provided, all artifacts are downloaded
          #       # and put in directories named after each one.
          path: build/output/release
      - name: Upload to S3
        uses: jakejarvis/s3-sync-action@v0.5.1
        env:
          AWS_S3_BUCKET: "download.memgraph.com"
          AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
          AWS_REGION: "eu-west-1"
          SOURCE_DIR: "build/output/release"
          DEST_DIR: "memgraph/v${{ github.event.inputs.memgraph_version }}/"

View File

@ -158,9 +158,9 @@ install() {
continue
fi
if [ "$pkg" == nodejs ]; then
curl -sL https://rpm.nodesource.com/setup_16.x | bash -
if ! yum list installed nodejs >/dev/null 2>/dev/null; then
yum install -y nodejs
yum install https://rpm.nodesource.com/pub_16.x/nodistro/repo/nodesource-release-nodistro-1.noarch.rpm -y
yum install nodejs -y --setopt=nodesource-nodejs.module_hotfixes=1
fi
continue
fi
@ -172,13 +172,6 @@ install() {
fi
continue
fi
if [ "$pkg" == nodejs ]; then
curl -sL https://rpm.nodesource.com/setup_16.x | bash -
if ! yum list installed nodejs >/dev/null 2>/dev/null; then
yum install -y nodejs
fi
continue
fi
if [ "$pkg" == java-11-openjdk ]; then
amazon-linux-extras install -y java-openjdk11
continue

View File

@ -13,6 +13,7 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
add_library(example_c SHARED example.c)
target_include_directories(example_c PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_compile_options(example_c PRIVATE -Wall)
target_link_libraries(example_c PRIVATE -static-libgcc -static-libstdc++)
# Strip C example in release build.
if (lower_build_type STREQUAL "release")
add_custom_command(TARGET example_c POST_BUILD
@ -28,6 +29,7 @@ install(FILES example.c DESTINATION lib/memgraph/query_modules/src)
add_library(example_cpp SHARED example.cpp)
target_include_directories(example_cpp PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_compile_options(example_cpp PRIVATE -Wall)
target_link_libraries(example_cpp PRIVATE -static-libgcc -static-libstdc++)
# Strip C++ example in release build.
if (lower_build_type STREQUAL "release")
add_custom_command(TARGET example_cpp POST_BUILD
@ -43,6 +45,7 @@ install(FILES example.cpp DESTINATION lib/memgraph/query_modules/src)
add_library(schema SHARED schema.cpp)
target_include_directories(schema PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_compile_options(schema PRIVATE -Wall)
target_link_libraries(schema PRIVATE -static-libgcc -static-libstdc++)
# Strip C++ example in release build.
if (lower_build_type STREQUAL "release")
add_custom_command(TARGET schema POST_BUILD

View File

@ -72,6 +72,10 @@ make_package () {
if [[ "$(git rev-parse --abbrev-ref HEAD)" != "master" ]]; then
git fetch origin master:master
fi
# Ensure we have a clean build directory
docker exec "$build_container" rm -rf /memgraph
docker exec "$build_container" mkdir -p /memgraph
# TODO(gitbuda): Revisit copying the whole repo -> makes sense under CI.
docker cp "$PROJECT_ROOT/." "$build_container:/memgraph/"

View File

@ -61,7 +61,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
// This needs to be before, to throw exception in case of too big alloc
if (*commit) [[likely]] {
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
}
}
@ -70,7 +70,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
if (ptr == nullptr) [[unlikely]] {
if (*commit) {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
}
@ -90,7 +90,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo
if (committed) [[likely]] {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
}
@ -101,7 +101,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo
static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
if (committed) [[likely]] {
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
}
@ -118,7 +118,7 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz
}
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
@ -135,7 +135,7 @@ static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, s
}
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
@ -152,7 +152,7 @@ static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t siz
}
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
}
@ -179,8 +179,11 @@ void SetHooks() {
return;
}
// Needs init due to error we might encounter otherwise
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=13684
[[maybe_unused]] const auto &queries_memory_control = GetQueriesMemoryControl();
for (int i = 0; i < n_arenas; i++) {
GetQueriesMemoryControl().InitializeArenaCounter(i);
std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
size_t hooks_len = sizeof(old_hooks);
@ -240,7 +243,6 @@ void UnsetHooks() {
}
for (int i = 0; i < n_arenas; i++) {
GetQueriesMemoryControl().InitializeArenaCounter(i);
std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
MG_ASSERT(old_hooks);

View File

@ -22,6 +22,7 @@
#include "query_memory_control.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/rw_spin_lock.hpp"
@ -33,20 +34,6 @@ namespace memgraph::memory {
#if USE_JEMALLOC
unsigned QueriesMemoryControl::GetArenaForThread() {
unsigned thread_arena{0};
size_t size_thread_arena = sizeof(thread_arena);
int err = mallctl("thread.arena", &thread_arena, &size_thread_arena, nullptr, 0);
if (err) {
LOG_FATAL("Can't get arena for thread.");
}
return thread_arena;
}
void QueriesMemoryControl::AddTrackingOnArena(unsigned arena_id) { arena_tracking[arena_id].fetch_add(1); }
void QueriesMemoryControl::RemoveTrackingOnArena(unsigned arena_id) { arena_tracking[arena_id].fetch_sub(1); }
void QueriesMemoryControl::UpdateThreadToTransactionId(const std::thread::id &thread_id, uint64_t transaction_id) {
auto accessor = thread_id_to_transaction_id.access();
accessor.insert({thread_id, transaction_id});
@ -119,14 +106,6 @@ bool QueriesMemoryControl::EraseTransactionIdTracker(uint64_t transaction_id) {
return removed;
}
bool QueriesMemoryControl::IsArenaTracked(unsigned arena_ind) {
return arena_tracking[arena_ind].load(std::memory_order_acquire) != 0;
}
void QueriesMemoryControl::InitializeArenaCounter(unsigned arena_ind) {
arena_tracking[arena_ind].store(0, std::memory_order_relaxed);
}
bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) {
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
return transaction_id_to_tracker_accessor.contains(transaction_id);
@ -166,19 +145,29 @@ void QueriesMemoryControl::PauseProcedureTracking(uint64_t transaction_id) {
query_tracker->tracker.StopProcTracking();
}
inline int &Get_Thread_Tracker() {
// store variable in bss segment for each thread
// https://cs-fundamentals.com/c-programming/memory-layout-of-c-program-code-data-segments#size-of-code-data-bss-segments
static thread_local int is_thread_tracked{0};
return is_thread_tracked;
}
bool QueriesMemoryControl::IsThreadTracked() { return Get_Thread_Tracker() == 1; }
#endif
void StartTrackingCurrentThreadTransaction(uint64_t transaction_id) {
#if USE_JEMALLOC
Get_Thread_Tracker() = 0;
GetQueriesMemoryControl().UpdateThreadToTransactionId(std::this_thread::get_id(), transaction_id);
GetQueriesMemoryControl().AddTrackingOnArena(QueriesMemoryControl::GetArenaForThread());
Get_Thread_Tracker() = 1;
#endif
}
void StopTrackingCurrentThreadTransaction(uint64_t transaction_id) {
#if USE_JEMALLOC
Get_Thread_Tracker() = 0;
GetQueriesMemoryControl().EraseThreadToTransactionId(std::this_thread::get_id(), transaction_id);
GetQueriesMemoryControl().RemoveTrackingOnArena(QueriesMemoryControl::GetArenaForThread());
#endif
}

View File

@ -33,25 +33,6 @@ static constexpr int64_t UNLIMITED_MEMORY{0};
// it is necessary to restart tracking at the beginning of new query for that transaction.
class QueriesMemoryControl {
public:
/*
Arena stats
*/
static unsigned GetArenaForThread();
// Add counter on threads allocating inside arena
void AddTrackingOnArena(unsigned);
// Remove counter on threads allocating in arena
void RemoveTrackingOnArena(unsigned);
// Are any threads using current arena for allocations
// Multiple threads can allocate inside one arena
bool IsArenaTracked(unsigned);
// Initialize arena counter
void InitializeArenaCounter(unsigned);
/*
Transaction id <-> tracker
*/
@ -94,9 +75,9 @@ class QueriesMemoryControl {
void PauseProcedureTracking(uint64_t);
private:
std::unordered_map<unsigned, std::atomic<int>> arena_tracking;
bool IsThreadTracked();
private:
struct ThreadIdToTransactionId {
std::thread::id thread_id;
uint64_t transaction_id;

View File

@ -43,23 +43,6 @@ struct CachedValue {
return cache_.get_allocator().GetMemoryResource();
}
// Func to check if cache_ contains value
bool CacheValue(TypedValue &&maybe_list) {
if (!maybe_list.IsList()) {
return false;
}
auto &list = maybe_list.ValueList();
TypedValue::Hash hash{};
for (auto &element : list) {
const auto key = hash(element);
auto &vector_values = cache_[key];
if (!IsValueInVec(vector_values, element)) {
vector_values.emplace_back(std::move(element));
}
}
return true;
}
bool CacheValue(const TypedValue &maybe_list) {
if (!maybe_list.IsList()) {
return false;

View File

@ -315,8 +315,8 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
return std::move(*preoperational_checks);
}
auto &cached_value = frame_change_collector_->GetCachedValue(*cached_id);
cached_value.CacheValue(std::move(list));
spdlog::trace("Value cached {}", *cached_id);
// Don't move here because we don't want to remove the element from the frame
cached_value.CacheValue(list);
}
const auto &cached_value = frame_change_collector_->GetCachedValue(*cached_id);
@ -338,7 +338,6 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
}
const auto &list_value = list.ValueList();
spdlog::trace("Not using cache on IN LIST operator");
auto has_null = false;
for (const auto &element : list_value) {
auto result = literal == element;

View File

@ -1535,7 +1535,6 @@ inline static void TryCaching(const AstStorage &ast_storage, FrameChangeCollecto
continue;
}
frame_change_collector->AddTrackingKey(*cached_id);
spdlog::trace("Tracking {} operator, by id: {}", InListOperator::kType.name, *cached_id);
}
}

View File

@ -3463,7 +3463,7 @@ class AggregateCursor : public Cursor {
SCOPED_PROFILE_OP_BY_REF(self_);
if (!pulled_all_input_) {
ProcessAll(&frame, &context);
if (!ProcessAll(&frame, &context) && self_.AreAllAggregationsForCollecting()) return false;
pulled_all_input_ = true;
aggregation_it_ = aggregation_.begin();
@ -3487,7 +3487,6 @@ class AggregateCursor : public Cursor {
return true;
}
}
if (aggregation_it_ == aggregation_.end()) return false;
// place aggregation values on the frame
@ -3567,12 +3566,16 @@ class AggregateCursor : public Cursor {
* cache cardinality depends on number of
* aggregation results, and not on the number of inputs.
*/
void ProcessAll(Frame *frame, ExecutionContext *context) {
bool ProcessAll(Frame *frame, ExecutionContext *context) {
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor,
storage::View::NEW);
bool pulled = false;
while (input_cursor_->Pull(*frame, *context)) {
ProcessOne(*frame, &evaluator);
pulled = true;
}
if (!pulled) return false;
// post processing
for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) {
@ -3606,6 +3609,7 @@ class AggregateCursor : public Cursor {
break;
}
}
return true;
}
/**
@ -3819,6 +3823,12 @@ UniqueCursorPtr Aggregate::MakeCursor(utils::MemoryResource *mem) const {
return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, mem);
}
auto Aggregate::AreAllAggregationsForCollecting() const -> bool {
return std::all_of(aggregations_.begin(), aggregations_.end(), [](const auto &agg) {
return agg.op == Aggregation::Op::COLLECT_LIST || agg.op == Aggregation::Op::COLLECT_MAP;
});
}
Skip::Skip(const std::shared_ptr<LogicalOperator> &input, Expression *expression)
: input_(input), expression_(expression) {}
@ -5404,7 +5414,7 @@ class HashJoinCursor : public Cursor {
// Check if the join value from the pulled frame is shared with any left frames
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::View::OLD);
auto right_value = self_.hash_join_condition_->expression1_->Accept(evaluator);
auto right_value = self_.hash_join_condition_->expression2_->Accept(evaluator);
if (hashtable_.contains(right_value)) {
// If so, finish pulling for now and proceed to joining the pulled frame
right_op_frame_.assign(frame.elems().begin(), frame.elems().end());
@ -5452,7 +5462,7 @@ class HashJoinCursor : public Cursor {
while (left_op_cursor_->Pull(frame, context)) {
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::View::OLD);
auto left_value = self_.hash_join_condition_->expression2_->Accept(evaluator);
auto left_value = self_.hash_join_condition_->expression1_->Accept(evaluator);
if (left_value.type() != TypedValue::Type::Null) {
hashtable_[left_value].emplace_back(frame.elems().begin(), frame.elems().end());
}

View File

@ -1758,6 +1758,9 @@ class Aggregate : public memgraph::query::plan::LogicalOperator {
Aggregate() = default;
Aggregate(const std::shared_ptr<LogicalOperator> &input, const std::vector<Element> &aggregations,
const std::vector<Expression *> &group_by, const std::vector<Symbol> &remember);
auto AreAllAggregationsForCollecting() const -> bool;
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;

View File

@ -27,6 +27,7 @@
#include "query/plan/operator.hpp"
#include "query/plan/preprocess.hpp"
#include "utils/algorithm.hpp"
namespace memgraph::query::plan {
@ -145,8 +146,11 @@ class JoinRewriter final : public HierarchicalLogicalOperatorVisitor {
bool PreVisit(IndexedJoin &op) override {
prev_ops_.push_back(&op);
return true;
RewriteBranch(&op.main_branch_);
RewriteBranch(&op.sub_branch_);
return false;
}
bool PostVisit(IndexedJoin &) override {
prev_ops_.pop_back();
return true;
@ -523,6 +527,13 @@ class JoinRewriter final : public HierarchicalLogicalOperatorVisitor {
auto rhs_property = rhs_lookup->property_;
filter_exprs_for_removal_.insert(filter.expression);
filters_.EraseFilter(filter);
if (utils::Contains(right_symbols, lhs_symbol) && utils::Contains(left_symbols, rhs_symbol)) {
// We need to duplicate this because expressions are shared between plans
join_condition = join_condition->Clone(ast_storage_);
std::swap(join_condition->expression1_, join_condition->expression2_);
}
return std::make_unique<HashJoin>(left_op, left_symbols, right_op, right_symbols, join_condition);
}

View File

@ -397,7 +397,12 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
spdlog::info("All necessary WAL files are loaded successfully.");
}
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices);
const auto par_exec_info =
config.durability.allow_parallel_index_creation && !recovery_info.vertex_batches.empty()
? std::make_optional(std::make_pair(recovery_info.vertex_batches, config.durability.recovery_thread_count))
: std::nullopt;
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, par_exec_info);
memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());

View File

@ -107,9 +107,6 @@ class MemoryTracker final {
};
private:
enum class TrackingMode { DEFAULT, ADDITIONAL_PROC };
TrackingMode tracking_mode_{TrackingMode::DEFAULT};
std::atomic<int64_t> amount_{0};
std::atomic<int64_t> peak_{0};
std::atomic<int64_t> hard_limit_{0};

View File

@ -70,6 +70,7 @@ add_subdirectory(index_hints)
add_subdirectory(query_modules)
add_subdirectory(constraints)
add_subdirectory(inspect_query)
add_subdirectory(queries)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
copy_e2e_python_files(x x.sh "")

View File

@ -0,0 +1,6 @@
# Forward FILE_NAME to the shared copy_e2e_python_files helper for the
# `queries` e2e target.
function(copy_queries_e2e_python_files FILE_NAME)
  copy_e2e_python_files(queries ${FILE_NAME})
endfunction()

# Python sources that make up the `queries` e2e suite.
foreach(queries_e2e_source common.py queries.py)
  copy_queries_e2e_python_files(${queries_e2e_source})
endforeach()

View File

@ -0,0 +1,24 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from gqlalchemy import Memgraph
@pytest.fixture
def memgraph(**kwargs) -> Memgraph:
    """Yield a default-constructed Memgraph client and clean up afterwards.

    Once the requesting test has finished, indexes are dropped, constraints
    are reset via ``ensure_constraints([])`` and ``drop_database()`` is called,
    in that order.
    """
    client = Memgraph()
    yield client

    # Teardown: executed after the test that requested the fixture.
    client.drop_indexes()
    client.ensure_constraints([])
    client.drop_database()

View File

@ -0,0 +1,39 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import memgraph
def test_indexed_join_with_indices(memgraph):
memgraph.execute(
"CREATE (c:A {prop: 1})-[b:TYPE]->(p:A {prop: 1}) CREATE (cf:B:A {prop : 1}) CREATE (pf:B:A {prop : 1});"
)
memgraph.execute("CREATE INDEX ON :A;")
memgraph.execute("CREATE INDEX ON :B;")
memgraph.execute("CREATE INDEX ON :A(prop);")
memgraph.execute("CREATE INDEX ON :B(prop);")
results = list(
memgraph.execute_and_fetch(
"match (c:A)-[b:TYPE]->(p:A) match (cf:B:A {prop : c.prop}) match (pf:B:A {prop : p.prop}) return c;"
)
)
assert len(results) == 4
for res in results:
assert res["c"].prop == 1
if __name__ == "__main__":
    # Run this file through pytest with the "-rA" report flag and use its
    # return value as the process exit status.
    exit_status = pytest.main([__file__, "-rA"])
    sys.exit(exit_status)

View File

@ -0,0 +1,14 @@
# Cluster definition with a single `main` instance; anchored so the workload
# below can pull it in through a merge key.
queries_cluster: &queries_cluster
  cluster:
    main:
      args:
        - "--bolt-port"
        - "7687"
        - "--log-level=TRACE"
      log_file: "queries.log"
      setup_queries: []
      validation_queries: []

workloads:
  - name: "Queries validation"
    binary: "tests/e2e/pytest_runner.sh"
    args:
      - "queries/queries.py"
    <<: *queries_cluster

View File

@ -697,7 +697,7 @@ def test_sync_replication_when_main_is_killed():
)
# 2/
QUERY_TO_CHECK = "MATCH (n) RETURN COLLECT(n.name);"
QUERY_TO_CHECK = "MATCH (n) RETURN COUNT(n.name);"
last_result_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK)[0][0]
for index in range(50):
interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})")

View File

@ -401,4 +401,30 @@ Feature: Aggregations
MATCH p=()-[:Z]->() WITH project(p) as graph WITH graph.edges as edges UNWIND edges as e RETURN e.prop as y ORDER BY y DESC
"""
Then the result should be:
| y |
| y |
Scenario: Empty collect aggregation:
Given an empty graph
And having executed
"""
CREATE (s:Subnet {ip: "192.168.0.1"})
"""
When executing query:
"""
MATCH (subnet:Subnet) WHERE FALSE WITH subnet, collect(subnet.ip) as ips RETURN id(subnet) as id
"""
Then the result should be empty
Scenario: Empty count aggregation:
Given an empty graph
And having executed
"""
CREATE (s:Subnet {ip: "192.168.0.1"})
"""
When executing query:
"""
MATCH (subnet:Subnet) WHERE FALSE WITH subnet, count(subnet.ip) as ips RETURN id(subnet) as id
"""
Then the result should be:
| id |
| null |

View File

@ -186,6 +186,20 @@ Feature: Cartesian
| a | b |
| (:A {id: 1}) | (:B {id: 1}) |
Scenario: Multiple match with WHERE x = y 01 reversed
Given an empty graph
And having executed
"""
CREATE (:A {id: 1}), (:A {id: 2}), (:B {id: 1})
"""
When executing query:
"""
MATCH (a:A) MATCH (b:B) WHERE b.id = a.id RETURN a, b
"""
Then the result should be:
| a | b |
| (:A {id: 1}) | (:B {id: 1}) |
Scenario: Multiple match with WHERE x = y 02
Given an empty graph
And having executed

View File

@ -263,3 +263,19 @@ Feature: List operators
| id |
| 1 |
| 2 |
Scenario: InList 01
Given an empty graph
And having executed
"""
CREATE (o:Node) SET o.Status = 'This is the status';
"""
When executing query:
"""
match (o:Node)
where o.Status IN ['This is not the status', 'This is the status']
return o;
"""
Then the result should be:
| o |
| (:Node {Status: 'This is the status'}) |

View File

@ -401,4 +401,30 @@ Feature: Aggregations
MATCH p=()-[:Z]->() WITH project(p) as graph WITH graph.edges as edges UNWIND edges as e RETURN e.prop as y ORDER BY y DESC
"""
Then the result should be:
| y |
| y |
Scenario: Empty collect aggregation:
Given an empty graph
And having executed
"""
CREATE (s:Subnet {ip: "192.168.0.1"})
"""
When executing query:
"""
MATCH (subnet:Subnet) WHERE FALSE WITH subnet, collect(subnet.ip) as ips RETURN id(subnet) as id
"""
Then the result should be empty
Scenario: Empty count aggregation:
Given an empty graph
And having executed
"""
CREATE (s:Subnet {ip: "192.168.0.1"})
"""
When executing query:
"""
MATCH (subnet:Subnet) WHERE FALSE WITH subnet, count(subnet.ip) as ips RETURN id(subnet) as id
"""
Then the result should be:
| id |
| null |

View File

@ -277,13 +277,11 @@ TYPED_TEST(QueryPlanAggregateOps, WithoutDataWithGroupBy) {
}
{
auto results = this->AggregationResults(true, false, {Aggregation::Op::COLLECT_LIST});
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(results[0][0].type(), TypedValue::Type::List);
EXPECT_EQ(results.size(), 0);
}
{
auto results = this->AggregationResults(true, false, {Aggregation::Op::COLLECT_MAP});
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(results[0][0].type(), TypedValue::Type::Map);
EXPECT_EQ(results.size(), 0);
}
}
@ -695,13 +693,11 @@ TYPED_TEST(QueryPlanAggregateOps, WithoutDataWithDistinctAndWithGroupBy) {
}
{
auto results = this->AggregationResults(true, true, {Aggregation::Op::COLLECT_LIST});
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(results[0][0].type(), TypedValue::Type::List);
EXPECT_EQ(results.size(), 0);
}
{
auto results = this->AggregationResults(true, true, {Aggregation::Op::COLLECT_MAP});
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(results[0][0].type(), TypedValue::Type::Map);
EXPECT_EQ(results.size(), 0);
}
}