Fix concurrent query module race condition (#1158)

Concurrent access to the same query module had a race condition on the
global pointer used for the custom memory management. With this commit,
a mapping is added that records which thread is using which
memory-resource pointer. This is safe because each query execution runs
on its own dedicated thread, and access to the mapping itself is
thread-safe. A simple RAII wrapper around the mapping container has
also been added for easier client-side use.
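
For orientation, the mechanism boils down to a registry keyed by the
current thread id plus an RAII guard that registers the per-query
mgp_memory handle on entry and unregisters it on exit. The sketch below
only illustrates that idea: mgp::MemoryDispatcherGuard, mgp::mrd,
Register and UnRegister appear in this diff, but the MemoryDispatcher
class name, the Get accessor and the shared_mutex locking are
assumptions and may differ from the actual mgp.hpp implementation.

// Illustrative sketch only: everything except MemoryDispatcherGuard,
// mrd, Register and UnRegister is assumed rather than taken from mgp.hpp.
#include <map>
#include <mutex>
#include <shared_mutex>
#include <thread>

struct mgp_memory;  // opaque allocator handle handed to every module callback

class MemoryDispatcher {
 public:
  // Associate the calling thread with its mgp_memory handle.
  void Register(mgp_memory *memory) {
    std::unique_lock lock{mutex_};
    memory_by_thread_[std::this_thread::get_id()] = memory;
  }

  // Drop the calling thread's entry once the callback is done.
  void UnRegister() {
    std::unique_lock lock{mutex_};
    memory_by_thread_.erase(std::this_thread::get_id());
  }

  // Look up the handle for the calling thread. Each query execution runs
  // on its own dedicated thread, so no other thread touches this entry.
  mgp_memory *Get() {
    std::shared_lock lock{mutex_};
    return memory_by_thread_.at(std::this_thread::get_id());
  }

 private:
  std::shared_mutex mutex_;
  std::map<std::thread::id, mgp_memory *> memory_by_thread_;
};

inline MemoryDispatcher mrd;  // the global registry referenced as mgp::mrd

// RAII wrapper: registers on construction and unregisters on destruction,
// so the mapping entry is removed on every exit path, including exceptions.
class MemoryDispatcherGuard {
 public:
  explicit MemoryDispatcherGuard(mgp_memory *memory) { mrd.Register(memory); }
  ~MemoryDispatcherGuard() { mrd.UnRegister(); }

  MemoryDispatcherGuard(const MemoryDispatcherGuard &) = delete;
  MemoryDispatcherGuard &operator=(const MemoryDispatcherGuard &) = delete;
};

Under this scheme a procedure simply constructs the guard at the top of
its body, as the updated examples below do, instead of assigning the
deprecated mgp::memory global.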
gvolfing 2023-08-21 16:45:36 +02:00 committed by GitHub
parent 97183fb9da
commit 476968e2c8
11 changed files with 427 additions and 159 deletions

File diff suppressed because it is too large.


@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -18,7 +18,11 @@ void ProcImpl(std::vector<mgp::Value> arguments, mgp::Graph graph, mgp::RecordFa
void SampleReadProc(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
std::vector<mgp::Value> arguments;
for (size_t i = 0; i < mgp::list_size(args); i++) {
@@ -34,7 +38,11 @@ void SampleReadProc(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *resul
}
void AddXNodes(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
auto graph = mgp::Graph(memgraph_graph);
std::vector<mgp::Value> arguments;
@@ -49,7 +57,11 @@ void AddXNodes(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mg
}
void Multiply(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
std::vector<mgp::Value> arguments;
for (size_t i = 0; i < mgp::list_size(args); i++) {
@@ -67,7 +79,11 @@ void Multiply(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_m
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
AddProcedure(SampleReadProc, "return_true", mgp::ProcedureType::Read,
{mgp::Parameter("param_1", mgp::Type::Int), mgp::Parameter("param_2", mgp::Type::Double, 2.3)},
@@ -77,7 +93,11 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
}
try {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
mgp::AddProcedure(AddXNodes, "add_x_nodes", mgp::ProcedureType::Write, {mgp::Parameter("param_1", mgp::Type::Int)},
{}, module, memory);
@@ -87,7 +107,11 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
}
try {
-mgp::memory = memory;
+// The commented-out way of assigning the memory pointer still
+// works, but it is deprecated because of concurrency issues.
+// Please use the guard instead.
+// mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
mgp::AddFunction(Multiply, "multiply",
{mgp::Parameter("int", mgp::Type::Int), mgp::Parameter("int", mgp::Type::Int, (int64_t)3)}, module,


@@ -61,6 +61,7 @@ add_subdirectory(load_csv)
add_subdirectory(init_file_flags)
add_subdirectory(analytical_mode)
add_subdirectory(batched_procedures)
+add_subdirectory(concurrent_query_modules)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR})


@@ -30,7 +30,7 @@ static int returned_strings{0};
const char *kReturnOutput = "output";
void NumsBatchInit(struct mgp_list *args, mgp_graph *graph, struct mgp_memory *memory) {
-mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
const auto arguments = mgp::List(args);
if (arguments.Empty()) {
throw std::runtime_error("Expected to receive argument");
@@ -43,7 +43,7 @@ void NumsBatchInit(struct mgp_list *args, mgp_graph *graph, struct mgp_memory *m
}
void NumsBatch(struct mgp_list *args, mgp_graph *graph, mgp_result *result, struct mgp_memory *memory) {
-mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
if (returned_ints < num_ints) {
@@ -58,7 +58,7 @@ void NumsBatchCleanup() {
}
void StringsBatchInit(struct mgp_list *args, mgp_graph *graph, struct mgp_memory *memory) {
-mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
const auto arguments = mgp::List(args);
if (arguments.Empty()) {
throw std::runtime_error("Expected to receive argument");
@@ -71,7 +71,7 @@ void StringsBatchInit(struct mgp_list *args, mgp_graph *graph, struct mgp_memory
}
void StringsBatch(struct mgp_list *args, mgp_graph *graph, mgp_result *result, struct mgp_memory *memory) {
-mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
@@ -117,7 +117,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
try {
-mgp::memory = memory;
+mgp::MemoryDispatcherGuard guard(memory);
mgp::AddBatchProcedure(StringsBatch, StringsBatchInit, StringsBatchCleanup, "batch_strings",
mgp::ProcedureType::Read, {mgp::Parameter("num_strings", mgp::Type::Int)},
{mgp::Return("output", mgp::Type::String)}, module, memory);


@@ -0,0 +1,8 @@
function(copy_concurrent_query_modules_e2e_python_files FILE_NAME)
  copy_e2e_python_files(concurrent_query_modules ${FILE_NAME})
endfunction()

copy_concurrent_query_modules_e2e_python_files(client.py)
copy_concurrent_query_modules_e2e_python_files(con_query_modules.py)

add_subdirectory(test_query_modules)


@@ -0,0 +1,33 @@
import multiprocessing

import mgclient
import pytest


def inner(query, number_of_executions):
    connection = mgclient.connect(host="127.0.0.1", port=7687)
    connection.autocommit = True
    cursor = connection.cursor()
    for _ in range(number_of_executions):
        cursor.execute(query)
        cursor.fetchall()


class MemgraphClient:
    def __init__(self):
        self.query_list = []

    def initialize_to_execute(self, query: str, number_of_executions):
        self.query_list.append((query, number_of_executions))

    def execute_queries(self):
        num_processes = len(self.query_list)
        with multiprocessing.Pool(processes=num_processes) as pool:
            pool.starmap(inner, self.query_list)

        return True


@pytest.fixture
def client() -> MemgraphClient:
    return MemgraphClient()


@@ -0,0 +1,19 @@
import sys

import pytest
from client import *

query = "MATCH (my_person:Person)-[:FOLLOW]->(follow_person:Person) MATCH (follow_person)-[: LIKE]->(post:Post) WHERE post.indexedAt IS NOT NULL AND NOT exists((post)-[:ROOT]->(:Post)) WITH localDateTime() - post.indexedAt as duration, post, follow_person WHERE duration.day < 5 WITH (duration.day * 24) + duration.hour as hour_age, post, follow_person ORDER BY post.indexedAt DESC LIMIT 500 MATCH(: Person) - [l: LIKE] -> (post) WITH count(l) as likes, hour_age, post, follow_person CALL libmodule_test.hacker_news(likes, 123, 4.1) YIELD score RETURN ID(post), post.uri, hour_age, likes, score, follow_person ORDER BY score DESC, hour_age ASC, post.indexedAt DESC LIMIT 100;"


def test_concurrent_module_access(client):
    client.initialize_to_execute(query, 200)
    client.initialize_to_execute(query, 200)
    client.initialize_to_execute(query, 200)

    success = client.execute_queries()
    assert success


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))


@@ -0,0 +1,3 @@
project(TestSharedObjects)
add_library(module_test SHARED module_test.cpp)
target_include_directories(module_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../../../include)


@@ -0,0 +1,59 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#include <mgp.hpp>

#include <chrono>
#include <cmath>
#include <list>
#include <thread>

constexpr const char *kProcedureHackerNews = "hacker_news";
constexpr const char *kArgumentHackerNewsVotes = "votes";
constexpr const char *kArgumentHackerNewsItemHourAge = "item_hour_age";
constexpr const char *kArgumentHackerNewsGravity = "gravity";
constexpr const char *kReturnHackerNewsScore = "score";

void HackerNews(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
  mgp::MemoryDispatcherGuard guard(memory);
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  const auto &arguments = mgp::List(args);
  const auto record_factory = mgp::RecordFactory(result);
  try {
    const auto votes = arguments[0].ValueInt();
    const auto item_hour_age = arguments[1].ValueInt();
    const auto gravity = arguments[2].ValueDouble();
    const auto score = 1000000.0 * (votes / pow((item_hour_age + 2), gravity));
    auto record = record_factory.NewRecord();
    record.Insert(kReturnHackerNewsScore, score);
  } catch (const std::exception &e) {
    record_factory.SetErrorMessage(e.what());
    return;
  }
}

extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
  try {
    mgp::MemoryDispatcherGuard guard(memory);
    std::vector<mgp::Parameter> params = {
        mgp::Parameter(kArgumentHackerNewsVotes, mgp::Type::Int),
        mgp::Parameter(kArgumentHackerNewsItemHourAge, mgp::Type::Int),
        mgp::Parameter(kArgumentHackerNewsGravity, mgp::Type::Double),
    };
    std::vector<mgp::Return> returns = {mgp::Return(kReturnHackerNewsScore, mgp::Type::Double)};
    AddProcedure(HackerNews, kProcedureHackerNews, mgp::ProcedureType::Read, params, returns, module, memory);
  } catch (const std::exception &e) {
    return 1;
  }
  return 0;
}

extern "C" int mgp_shutdown_module() { return 0; }


@@ -0,0 +1,33 @@
args: &args
  - "--bolt-port"
  - "7687"
  - "--log-level"
  - "TRACE"

in_memory_cluster: &in_memory_cluster
  cluster:
    main:
      args: *args
      log_file: "concurrent-query-modules-e2e.log"
      setup_queries: []
      validation_queries: []

disk_cluster: &disk_cluster
  cluster:
    main:
      args: *args
      log_file: "concurrent-query-modules-e2e.log"
      setup_queries: ["STORAGE MODE ON_DISK_TRANSACTIONAL"]
      validation_queries: []

workloads:
  - name: "Concurrent query modules"
    binary: "tests/e2e/pytest_runner.sh"
    proc: "tests/e2e/concurrent_query_modules/test_query_modules/"
    args: ["concurrent_query_modules/con_query_modules.py"]
    <<: *in_memory_cluster
  - name: "Disk concurrent query modules"
    binary: "tests/e2e/pytest_runner.sh"
    proc: "tests/e2e/concurrent_query_modules/test_query_modules/"
    args: ["concurrent_query_modules/con_query_modules.py"]
    <<: *disk_cluster


@@ -27,12 +27,13 @@
template <typename StorageType>
struct CppApiTestFixture : public ::testing::Test {
protected:
-virtual void SetUp() override { mgp::memory = &memory; }
+virtual void SetUp() override { mgp::mrd.Register(&memory); }
void TearDown() override {
if (std::is_same<StorageType, memgraph::storage::DiskStorage>::value) {
disk_test_utils::RemoveRocksDbDirs(testSuite);
}
+mgp::mrd.UnRegister();
}
mgp_graph CreateGraph(const memgraph::storage::View view = memgraph::storage::View::NEW) {