Add flag for after commit trigger pool size

János Benjamin Antal 2022-02-04 14:52:06 +01:00
parent 6a2e566492
commit f57843088b
4 changed files with 25 additions and 11 deletions

View File

@@ -1,4 +1,4 @@
-// Copyright 2021 Memgraph Ltd.
+// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -219,11 +219,19 @@ DEFINE_double(query_execution_timeout_sec, 600,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of 0 means no limit.");
+DEFINE_VALIDATED_uint64(after_commit_trigger_pool_size, 1, "Number of threads to process after commit triggers.", {
+  if (value == 0) {
+    std::cout << "At least one thread is required for processing after commit triggers!" << std::endl;
+    return false;
+  }
+  return true;
+});
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
-DEFINE_uint64(
-    memory_limit, 0,
-    "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap "
-    "is enabled and 90\% of the physical memory otherwise.");
+DEFINE_uint64(memory_limit, 0,
+              "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical "
+              "memory if the swap "
+              "is enabled and 90\% of the physical memory otherwise.");
namespace {
using namespace std::literals;
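The flag is declared through Memgraph's DEFINE_VALIDATED_uint64 macro, which rejects a pool size of 0 during startup. A minimal standalone sketch of the same validation idea, assuming stock gflags (DEFINE_uint64 plus DEFINE_validator) rather than Memgraph's wrapper macro:

#include <iostream>

#include <gflags/gflags.h>

// Plain-gflags sketch; Memgraph's DEFINE_VALIDATED_uint64 macro bundles the
// flag definition and the validator registration into one statement.
DEFINE_uint64(after_commit_trigger_pool_size, 1,
              "Number of threads to process after commit triggers.");

static bool ValidatePoolSize(const char * /*flag_name*/, gflags::uint64 value) {
  if (value == 0) {
    std::cout << "At least one thread is required for processing after commit triggers!" << std::endl;
    return false;
  }
  return true;
}
DEFINE_validator(after_commit_trigger_pool_size, &ValidatePoolSize);

int main(int argc, char **argv) {
  // gflags runs the validator while parsing, so e.g.
  // --after_commit_trigger_pool_size=0 is rejected before the server starts.
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  std::cout << "pool size: " << FLAGS_after_commit_trigger_pool_size << std::endl;
  return 0;
}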
@@ -1135,6 +1143,7 @@ int main(int argc, char **argv) {
&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
+.after_commit_trigger_pool_size = FLAGS_after_commit_trigger_pool_size,
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url,
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,
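Further down, main() forwards the parsed flag into the interpreter configuration through the designated initializer shown above. A compressed sketch of that hand-off, using a hypothetical trimmed-down stand-in for InterpreterConfig that keeps only the two fields relevant here:

#include <cstddef>
#include <cstdint>

// Hypothetical trimmed-down stand-in for InterpreterConfig.
struct InterpreterConfigSketch {
  double execution_timeout_sec{600.0};
  std::size_t after_commit_trigger_pool_size;
};

// Mirrors the .after_commit_trigger_pool_size = FLAGS_after_commit_trigger_pool_size
// line above; the cast just keeps this sketch free of narrowing concerns.
InterpreterConfigSketch MakeConfig(std::uint64_t pool_size_flag, double timeout_sec) {
  return InterpreterConfigSketch{
      .execution_timeout_sec = timeout_sec,
      .after_commit_trigger_pool_size = static_cast<std::size_t>(pool_size_flag)};
}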

View File

@@ -1,4 +1,4 @@
-// Copyright 2021 Memgraph Ltd.
+// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -22,6 +22,8 @@ struct InterpreterConfig {
// The default execution timeout is 10 minutes.
double execution_timeout_sec{600.0};
+size_t after_commit_trigger_pool_size;
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;
uint32_t stream_transaction_conflict_retries;
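Unlike execution_timeout_sec{600.0} right above it, the new member carries no in-class default, so every construction site has to supply a value (main() does, from the flag). A tiny illustration of the difference, with hypothetical struct names:

#include <cstddef>

// Hypothetical structs, only to contrast the two declaration styles.
struct WithDefault {
  std::size_t after_commit_trigger_pool_size{1};  // default-init and {} both give 1
};

struct WithoutDefault {
  std::size_t after_commit_trigger_pool_size;  // {} gives 0, plain default-init leaves it indeterminate
};

int main() {
  WithDefault a{};     // a.after_commit_trigger_pool_size == 1
  WithoutDefault b{};  // b.after_commit_trigger_pool_size == 0
  return a.after_commit_trigger_pool_size == 1 && b.after_commit_trigger_pool_size == 0 ? 0 : 1;
}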

View File

@@ -987,7 +987,11 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::Storage *db, const InterpreterConfig config,
const std::filesystem::path &data_directory)
-    : db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
+    : db(db),
+      config(config),
+      trigger_store(data_directory / "triggers"),
+      after_commit_trigger_pool{config.after_commit_trigger_pool_size},
+      streams{this, data_directory / "streams"} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
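The constructor now sizes after_commit_trigger_pool from the configuration. Because class members are initialized in declaration order, config must be declared before the pool so that config.after_commit_trigger_pool_size is read from an already initialized object, which is presumably why the header below moves the config declaration up. A small sketch of that ordering rule, with hypothetical names:

#include <cstddef>

// Stand-in for a pool whose worker count is fixed at construction.
struct PoolSketch {
  explicit PoolSketch(std::size_t n) : workers(n) {}
  std::size_t workers;
};

struct ContextSketch {
  explicit ContextSketch(std::size_t pool_size)
      // Members are initialized in declaration order: configured_size first,
      // then the pool, which can therefore safely read it.
      : configured_size(pool_size), after_commit_trigger_pool(configured_size) {}

  const std::size_t configured_size;  // declared before the pool on purpose
  PoolSketch after_commit_trigger_pool;
};

int main() {
  ContextSketch context{4};
  return context.after_commit_trigger_pool.workers == 4 ? 0 : 1;
}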

View File

@@ -1,4 +1,4 @@
-// Copyright 2021 Memgraph Ltd.
+// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -168,6 +168,7 @@ struct InterpreterContext {
const std::filesystem::path &data_directory);
storage::Storage *db;
+const InterpreterConfig config;
// ANTLR has singleton instance that is shared between threads. It is
// protected by locks inside of ANTLR. Unfortunately, they are not protected
@@ -186,9 +187,7 @@
utils::SkipList<PlanCacheEntry> plan_cache;
TriggerStore trigger_store;
-utils::ThreadPool after_commit_trigger_pool{1};
-const InterpreterConfig config;
+utils::ThreadPool after_commit_trigger_pool;
query::stream::Streams streams;
};
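utils::ThreadPool is Memgraph's own utility; the only property this change depends on is that the number of worker threads is fixed at construction, now config.after_commit_trigger_pool_size instead of the previously hard-coded 1. A generic sketch of such a fixed-size pool, not Memgraph's implementation, just to illustrate what the configured size controls:

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Generic fixed-size worker pool; utils::ThreadPool is Memgraph's own class,
// this sketch only shows the "size chosen at construction" behaviour.
class SimpleThreadPool {
 public:
  explicit SimpleThreadPool(std::size_t pool_size) {
    for (std::size_t i = 0; i < pool_size; ++i) {
      workers_.emplace_back([this] {
        while (true) {
          std::function<void()> task;
          {
            std::unique_lock lock(mutex_);
            cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // run the queued job outside the lock
        }
      });
    }
  }

  void AddTask(std::function<void()> task) {
    {
      std::lock_guard lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  ~SimpleThreadPool() {
    {
      std::lock_guard lock(mutex_);
      stop_ = true;
    }
    cv_.notify_all();
    for (auto &worker : workers_) worker.join();
  }

 private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex mutex_;
  std::condition_variable cv_;
  bool stop_ = false;
};

A pool constructed as SimpleThreadPool pool{4}; keeps four workers waiting for queued work, which is the knob the new flag exposes for after-commit triggers.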