Merge branch 'master' into T0804-MG-base-thrift-project

This commit is contained in:
János Benjamin Antal 2022-06-08 11:21:53 +02:00
commit c651a8d15a
59 changed files with 1124 additions and 515 deletions

View File

@ -88,4 +88,3 @@ CheckOptions:
- key: modernize-use-nullptr.NullMacros
value: 'NULL'
...

View File

@ -24,14 +24,6 @@ for file in $modified_files; do
git checkout-index --prefix="$tmpdir/" -- $file
echo "Running clang-format..."
$project_folder/tools/git-clang-format $tmpdir/$file
CODE=$?
if [ $CODE -ne 0 ]; then
break
fi
# Do not break header checker
echo "Running header checker..."
$project_folder/tools/header-checker.py $tmpdir/$file $file --amend-year
@ -39,7 +31,6 @@ for file in $modified_files; do
if [ $CODE -ne 0 ]; then
FAIL=1
fi
done;
return ${FAIL}

24
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,24 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
args: # arguments to configure black
- --line-length=120
- --include='\.pyi?$'
# these folders wont be formatted by black
- --exclude="""\.git |
\.__pycache__|
build|
libs|
.cache"""
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v13.0.0
hooks:
- id: clang-format

View File

@ -47,6 +47,14 @@ modifications:
value: ""
override: false
- name: "bolt_cert_file"
value: "/etc/memgraph/ssl/cert.pem"
override: false
- name: "bolt_key_file"
value: "/etc/memgraph/ssl/key.pem"
override: false
- name: "storage_properties_on_edges"
value: "true"
override: true

File diff suppressed because it is too large Load Diff

View File

@ -1179,16 +1179,15 @@ def read_proc(func: typing.Callable[..., Record]):
"""
Register `func` as a read-only procedure of the current module.
`read_proc` is meant to be used as a decorator function to register module
procedures. The registered `func` needs to be a callable which optionally
takes `ProcCtx` as the first argument. Other arguments of `func` will be
bound to values passed in the cypherQuery. The full signature of `func`
needs to be annotated with types. The return type must be
`Record(field_name=type, ...)` and the procedure must produce either a
complete Record or None. To mark a field as deprecated, use
`Record(field_name=Deprecated(type), ...)`. Multiple records can be
produced by returning an iterable of them. Registering generator functions
is currently not supported.
The decorator `read_proc` is meant to be used to register module procedures.
The registered `func` needs to be a callable which optionally takes
`ProcCtx` as its first argument. Other arguments of `func` will be bound to
values passed in the cypherQuery. The full signature of `func` needs to be
annotated with types. The return type must be `Record(field_name=type, ...)`
and the procedure must produce either a complete Record or None. To mark a
field as deprecated, use `Record(field_name=Deprecated(type), ...)`.
Multiple records can be produced by returning an iterable of them.
Registering generator functions is currently not supported.
Example usage.
@ -1222,16 +1221,16 @@ def write_proc(func: typing.Callable[..., Record]):
"""
Register `func` as a writeable procedure of the current module.
`write_proc` is meant to be used as a decorator function to register module
The decorator `write_proc` is meant to be used to register module
procedures. The registered `func` needs to be a callable which optionally
takes `ProcCtx` as the first argument. Other arguments of `func` will be
bound to values passed in the cypherQuery. The full signature of `func`
needs to be annotated with types. The return type must be
`Record(field_name=type, ...)` and the procedure must produce either a
complete Record or None. To mark a field as deprecated, use
`Record(field_name=Deprecated(type), ...)`. Multiple records can be
produced by returning an iterable of them. Registering generator functions
is currently not supported.
`Record(field_name=Deprecated(type), ...)`. Multiple records can be produced
by returning an iterable of them. Registering generator functions is
currently not supported.
Example usage.
@ -1459,8 +1458,9 @@ def transformation(func: typing.Callable[..., Record]):
class FuncCtx:
"""Context of a function being executed.
Access to a FuncCtx is only valid during a single execution of a transformation.
You should not globally store a FuncCtx instance.
Access to a FuncCtx is only valid during a single execution of a function in
a query. You should not globally store a FuncCtx instance. The graph object
within the FuncCtx is not mutable.
"""
__slots__ = "_graph"
@ -1475,6 +1475,45 @@ class FuncCtx:
def function(func: typing.Callable):
"""
Register `func` as a user-defined function in the current module.
The decorator `function` is meant to be used to register module functions.
The registered `func` needs to be a callable which optionally takes
`FuncCtx` as its first argument. Other arguments of `func` will be bound to
values passed in the Cypher query. Only the funcion arguments need to be
annotated with types. The return type doesn't need to be specified, but it
has to be supported by `mgp.Any`. Registering generator functions is
currently not supported.
Example usage.
```
import mgp
@mgp.function
def func_example(context: mgp.FuncCtx,
required_arg: str,
optional_arg: mgp.Nullable[str] = None
):
return_args = [required_arg]
if optional_arg is not None:
return_args.append(optional_arg)
# Return any kind of result supported by mgp.Any
return return_args
```
The example function above returns a list of provided arguments:
* `required_arg` is always present and its value is the first argument of
the function.
* `optional_arg` is present if the second argument of the function is not
`null`.
Any errors can be reported by raising an Exception.
The function can be invoked in Cypher using the following calls:
RETURN example.func_example("first argument", "second_argument");
RETURN example.func_example("first argument");
Naturally, you may pass in different arguments.
"""
raise_if_does_not_meet_requirements(func)
register_func = _mgp.Module.add_function
sig = inspect.signature(func)

4
init
View File

@ -135,3 +135,7 @@ for hook in $(find $DIR/.githooks -type f -printf "%f\n"); do
ln -s -f "$DIR/.githooks/$hook" "$DIR/.git/hooks/$hook"
echo "Added $hook hook"
done;
# Install precommit hook
python3 -m pip install pre-commit
python3 -m pre_commit install

View File

@ -41,7 +41,7 @@ set(CPACK_DEBIAN_PACKAGE_DESCRIPTION "${CPACK_PACKAGE_DESCRIPTION_SUMMARY}
applications driver by real-time connected data.")
# Add `openssl` package to dependencies list. Used to generate SSL certificates.
# We also depend on `python3` because we embed it in Memgraph.
set(CPACK_DEBIAN_PACKAGE_DEPENDS "openssl (>= 1.1.0), python3 (>= 3.5.0)")
set(CPACK_DEBIAN_PACKAGE_DEPENDS "openssl (>= 1.1.0), python3 (>= 3.5.0), libstdc++6")
# Setting arhitecture extension for rpm packages
set(MG_ARCH_EXTENSION_RPM "noarch")
@ -67,7 +67,7 @@ It aims to deliver developers the speed, simplicity and scale required to build
the next generation of applications driver by real-time connected data.")
# Add `openssl` package to dependencies list. Used to generate SSL certificates.
# We also depend on `python3` because we embed it in Memgraph.
set(CPACK_RPM_PACKAGE_REQUIRES "openssl >= 1.0.0, curl >= 7.29.0, python3 >= 3.5.0")
set(CPACK_RPM_PACKAGE_REQUIRES "openssl >= 1.0.0, curl >= 7.29.0, python3 >= 3.5.0, libstdc >= 6")
# All variables must be set before including.
include(CPack)

View File

@ -246,11 +246,7 @@ class Session final : public std::enable_shared_from_this<Session<TSession, TSes
Session(Session &&) = delete;
Session &operator=(const Session &) = delete;
Session &operator=(Session &&) = delete;
~Session() {
if (IsConnected()) {
spdlog::error("Session: Destructor called while execution is active");
}
}
~Session() = default;
bool Start() {
if (execution_active_) {
@ -400,7 +396,6 @@ class Session final : public std::enable_shared_from_this<Session<TSession, TSes
if (ec == boost::asio::error::operation_aborted) {
return;
}
execution_active_ = false;
if (ec == boost::asio::error::eof) {
spdlog::info("Session closed by peer");

View File

@ -258,6 +258,11 @@ DEFINE_double(query_execution_timeout_sec, 600,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of 0 means no limit.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(replication_replica_check_frequency_sec, 1,
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: "
"The MAIN instance allocates a new thread for each REPLICA.");
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(
memory_limit, 0,
@ -1076,6 +1081,22 @@ int main(int argc, char **argv) {
if (maybe_exc) {
spdlog::error(memgraph::utils::MessageWithLink("Unable to load support for embedded Python: {}.", *maybe_exc,
"https://memgr.ph/python"));
} else {
// Change how we load dynamic libraries on Python by using RTLD_NOW and
// RTLD_DEEPBIND flags. This solves an issue with using the wrong version of
// libstd.
auto gil = memgraph::py::EnsureGIL();
// NOLINTNEXTLINE(hicpp-signed-bitwise)
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
auto *setdl = PySys_GetObject("setdlopenflags");
MG_ASSERT(setdl);
auto *arg = PyTuple_New(1);
MG_ASSERT(arg);
MG_ASSERT(PyTuple_SetItem(arg, 0, flag) == 0);
PyObject_CallObject(setdl, arg);
Py_DECREF(flag);
Py_DECREF(setdl);
Py_DECREF(arg);
}
} else {
spdlog::error(
@ -1206,6 +1227,7 @@ int main(int argc, char **argv) {
&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec),
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url,
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,

View File

@ -21,6 +21,8 @@ struct InterpreterConfig {
// The default execution timeout is 10 minutes.
double execution_timeout_sec{600.0};
// The same as \ref memgraph::storage::replication::ReplicationClientConfig
std::chrono::seconds replication_replica_check_frequency{1};
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -160,7 +160,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) override {
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
const std::chrono::seconds replica_check_frequency) override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
// replica can't register another replica
throw QueryRuntimeException("Replica can't register another replica!");
@ -182,8 +183,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret =
db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, {.timeout = timeout, .ssl = std::nullopt});
auto ret = db_->RegisterReplica(
name, {std::move(ip), port}, repl_mode,
{.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
}
@ -448,7 +450,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: {
callback.header = {"replication mode"};
callback.header = {"replication role"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}] {
auto mode = handler.ShowReplicationRole();
switch (mode) {
@ -467,6 +469,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
const auto &sync_mode = repl_query->sync_mode_;
auto socket_address = repl_query->socket_address_->Accept(evaluator);
auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency;
std::optional<double> maybe_timeout;
if (timeout.IsDouble()) {
maybe_timeout = timeout.ValueDouble();
@ -474,8 +477,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
maybe_timeout = static_cast<double>(timeout.ValueInt());
}
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
maybe_timeout]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
maybe_timeout, replica_check_frequency]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
replica_check_frequency);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
@ -512,7 +516,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue("async"));
break;
}
typed_replica.emplace_back(TypedValue(static_cast<int64_t>(replica.sync_mode)));
if (replica.timeout) {
typed_replica.emplace_back(TypedValue(*replica.timeout));
} else {

View File

@ -137,7 +137,8 @@ class ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) = 0;
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
const std::chrono::seconds replica_check_frequency) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void DropReplica(const std::string &replica_name) = 0;

View File

@ -24,7 +24,7 @@ MgpUniquePtr<mgp_value> GetStringValueOrSetError(const char *string, mgp_memory
}
bool InsertResultOrSetError(mgp_result *result, mgp_result_record *record, const char *result_name, mgp_value *value) {
if (const auto err = mgp_result_record_insert(record, result_name, value); err != MGP_ERROR_NO_ERROR) {
if (const auto err = mgp_result_record_insert(record, result_name, value); err != mgp_error::MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unable to set the result for {}, error = {}", result_name, err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;

View File

@ -25,7 +25,7 @@ TResult Call(TFunc func, TArgs... args) {
static_assert(std::is_trivially_copyable_v<TFunc>);
static_assert((std::is_trivially_copyable_v<std::remove_reference_t<TArgs>> && ...));
TResult result{};
MG_ASSERT(func(args..., &result) == MGP_ERROR_NO_ERROR);
MG_ASSERT(func(args..., &result) == mgp_error::MGP_ERROR_NO_ERROR);
return result;
}
@ -50,10 +50,10 @@ mgp_error CreateMgpObject(MgpUniquePtr<TObj> &obj, TFunc func, TArgs &&...args)
template <typename Fun>
[[nodiscard]] bool TryOrSetError(Fun &&func, mgp_result *result) {
if (const auto err = func(); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = func(); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return false;
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unexpected error ({})!", err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;

View File

@ -154,48 +154,48 @@ template <typename TFunc, typename... Args>
WrapExceptionsHelper(std::forward<TFunc>(func), std::forward<Args>(args)...);
} catch (const DeletedObjectException &neoe) {
spdlog::error("Deleted object error during mg API call: {}", neoe.what());
return MGP_ERROR_DELETED_OBJECT;
return mgp_error::MGP_ERROR_DELETED_OBJECT;
} catch (const KeyAlreadyExistsException &kaee) {
spdlog::error("Key already exists error during mg API call: {}", kaee.what());
return MGP_ERROR_KEY_ALREADY_EXISTS;
return mgp_error::MGP_ERROR_KEY_ALREADY_EXISTS;
} catch (const InsufficientBufferException &ibe) {
spdlog::error("Insufficient buffer error during mg API call: {}", ibe.what());
return MGP_ERROR_INSUFFICIENT_BUFFER;
return mgp_error::MGP_ERROR_INSUFFICIENT_BUFFER;
} catch (const ImmutableObjectException &ioe) {
spdlog::error("Immutable object error during mg API call: {}", ioe.what());
return MGP_ERROR_IMMUTABLE_OBJECT;
return mgp_error::MGP_ERROR_IMMUTABLE_OBJECT;
} catch (const ValueConversionException &vce) {
spdlog::error("Value converion error during mg API call: {}", vce.what());
return MGP_ERROR_VALUE_CONVERSION;
return mgp_error::MGP_ERROR_VALUE_CONVERSION;
} catch (const SerializationException &se) {
spdlog::error("Serialization error during mg API call: {}", se.what());
return MGP_ERROR_SERIALIZATION_ERROR;
return mgp_error::MGP_ERROR_SERIALIZATION_ERROR;
} catch (const std::bad_alloc &bae) {
spdlog::error("Memory allocation error during mg API call: {}", bae.what());
return MGP_ERROR_UNABLE_TO_ALLOCATE;
return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE;
} catch (const memgraph::utils::OutOfMemoryException &oome) {
spdlog::error("Memory limit exceeded during mg API call: {}", oome.what());
return MGP_ERROR_UNABLE_TO_ALLOCATE;
return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE;
} catch (const std::out_of_range &oore) {
spdlog::error("Out of range error during mg API call: {}", oore.what());
return MGP_ERROR_OUT_OF_RANGE;
return mgp_error::MGP_ERROR_OUT_OF_RANGE;
} catch (const std::invalid_argument &iae) {
spdlog::error("Invalid argument error during mg API call: {}", iae.what());
return MGP_ERROR_INVALID_ARGUMENT;
return mgp_error::MGP_ERROR_INVALID_ARGUMENT;
} catch (const std::logic_error &lee) {
spdlog::error("Logic error during mg API call: {}", lee.what());
return MGP_ERROR_LOGIC_ERROR;
return mgp_error::MGP_ERROR_LOGIC_ERROR;
} catch (const std::exception &e) {
spdlog::error("Unexpected error during mg API call: {}", e.what());
return MGP_ERROR_UNKNOWN_ERROR;
return mgp_error::MGP_ERROR_UNKNOWN_ERROR;
} catch (const memgraph::utils::temporal::InvalidArgumentException &e) {
spdlog::error("Invalid argument was sent to an mg API call for temporal types: {}", e.what());
return MGP_ERROR_INVALID_ARGUMENT;
return mgp_error::MGP_ERROR_INVALID_ARGUMENT;
} catch (...) {
spdlog::error("Unexpected error during mg API call");
return MGP_ERROR_UNKNOWN_ERROR;
return mgp_error::MGP_ERROR_UNKNOWN_ERROR;
}
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
// Graph mutations
@ -856,7 +856,7 @@ mgp_value_type MgpValueGetType(const mgp_value &val) noexcept { return val.type;
mgp_error mgp_value_get_type(mgp_value *val, mgp_value_type *result) {
static_assert(noexcept(MgpValueGetType(*val)));
*result = MgpValueGetType(*val);
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
@ -864,7 +864,7 @@ mgp_error mgp_value_get_type(mgp_value *val, mgp_value_type *result) {
mgp_error mgp_value_is_##type_lowercase(mgp_value *val, int *result) { \
static_assert(noexcept(MgpValueGetType(*val))); \
*result = MgpValueGetType(*val) == MGP_VALUE_TYPE_##type_uppercase; \
return MGP_ERROR_NO_ERROR; \
return mgp_error::MGP_ERROR_NO_ERROR; \
}
DEFINE_MGP_VALUE_IS(null, NULL)
@ -884,27 +884,27 @@ DEFINE_MGP_VALUE_IS(duration, DURATION)
mgp_error mgp_value_get_bool(mgp_value *val, int *result) {
*result = val->bool_v ? 1 : 0;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_value_get_int(mgp_value *val, int64_t *result) {
*result = val->int_v;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_value_get_double(mgp_value *val, double *result) {
*result = val->double_v;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_value_get_string(mgp_value *val, const char **result) {
static_assert(noexcept(val->string_v.c_str()));
*result = val->string_v.c_str();
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define DEFINE_MGP_VALUE_GET(type) \
mgp_error mgp_value_get_##type(mgp_value *val, mgp_##type **result) { \
*result = val->type##_v; \
return MGP_ERROR_NO_ERROR; \
return mgp_error::MGP_ERROR_NO_ERROR; \
}
DEFINE_MGP_VALUE_GET(list)
@ -950,13 +950,13 @@ mgp_error mgp_list_append_extend(mgp_list *list, mgp_value *val) {
mgp_error mgp_list_size(mgp_list *list, size_t *result) {
static_assert(noexcept(list->elems.size()));
*result = list->elems.size();
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_list_capacity(mgp_list *list, size_t *result) {
static_assert(noexcept(list->elems.capacity()));
*result = list->elems.capacity();
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_list_at(mgp_list *list, size_t i, mgp_value **result) {
@ -988,7 +988,7 @@ mgp_error mgp_map_insert(mgp_map *map, const char *key, mgp_value *value) {
mgp_error mgp_map_size(mgp_map *map, size_t *result) {
static_assert(noexcept(map->items.size()));
*result = map->items.size();
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_map_at(mgp_map *map, const char *key, mgp_value **result) {
@ -1099,7 +1099,7 @@ size_t MgpPathSize(const mgp_path &path) noexcept { return path.edges.size(); }
mgp_error mgp_path_size(mgp_path *path, size_t *result) {
*result = MgpPathSize(*path);
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_path_vertex_at(mgp_path *path, size_t i, mgp_vertex **result) {
@ -1698,7 +1698,7 @@ void mgp_vertex_destroy(mgp_vertex *v) { DeleteRawMgpObject(v); }
mgp_error mgp_vertex_equal(mgp_vertex *v1, mgp_vertex *v2, int *result) {
MG_EXECUTE_NOEXCEPT(*result = *v1 == *v2 ? 1 : 0);
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_vertex_labels_count(mgp_vertex *v, size_t *result) {
@ -1956,7 +1956,7 @@ void mgp_edge_destroy(mgp_edge *e) { DeleteRawMgpObject(e); }
mgp_error mgp_edge_equal(mgp_edge *e1, mgp_edge *e2, int *result) {
MG_EXECUTE_NOEXCEPT(*result = *e1 == *e2 ? 1 : 0);
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_edge_get_type(mgp_edge *e, mgp_edge_type *result) {
@ -1973,12 +1973,12 @@ mgp_error mgp_edge_get_type(mgp_edge *e, mgp_edge_type *result) {
mgp_error mgp_edge_get_from(mgp_edge *e, mgp_vertex **result) {
*result = &e->from;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_edge_get_to(mgp_edge *e, mgp_vertex **result) {
*result = &e->to;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_edge_get_property(mgp_edge *e, const char *name, mgp_memory *memory, mgp_value **result) {
@ -2088,7 +2088,7 @@ mgp_error mgp_graph_get_vertex_by_id(mgp_graph *graph, mgp_vertex_id id, mgp_mem
mgp_error mgp_graph_is_mutable(mgp_graph *graph, int *result) {
*result = MgpGraphIsMutable(*graph) ? 1 : 0;
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
};
mgp_error mgp_graph_create_vertex(struct mgp_graph *graph, mgp_memory *memory, mgp_vertex **result) {
@ -2513,7 +2513,7 @@ mgp_error mgp_proc_add_result(mgp_proc *proc, const char *name, mgp_type *type)
mgp_error MgpTransAddFixedResult(mgp_trans *trans) noexcept {
if (const auto err = AddResultToProp(trans, "query", Call<mgp_type *>(mgp_type_string), false);
err != MGP_ERROR_NO_ERROR) {
err != mgp_error::MGP_ERROR_NO_ERROR) {
return err;
}
return AddResultToProp(trans, "parameters", Call<mgp_type *>(mgp_type_nullable, Call<mgp_type *>(mgp_type_map)),
@ -2760,7 +2760,7 @@ mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result) {
mgp_error mgp_messages_size(mgp_messages *messages, size_t *result) {
static_assert(noexcept(messages->messages.size()));
*result = messages->messages.size();
return MGP_ERROR_NO_ERROR;
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_messages_at(mgp_messages *messages, size_t index, mgp_message **result) {

View File

@ -121,18 +121,18 @@ void RegisterMgLoad(ModuleRegistry *module_registry, utils::RWLock *lock, Builti
bool succ = false;
WithUpgradedLock(lock, [&]() {
const char *arg_as_string{nullptr};
if (const auto err = mgp_value_get_string(arg, &arg_as_string); err != MGP_ERROR_NO_ERROR) {
if (const auto err = mgp_value_get_string(arg, &arg_as_string); err != mgp_error::MGP_ERROR_NO_ERROR) {
succ = false;
} else {
succ = module_registry->LoadOrReloadModuleFromName(arg_as_string);
}
});
if (!succ) {
MG_ASSERT(mgp_result_set_error_msg(result, "Failed to (re)load the module.") == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_result_set_error_msg(result, "Failed to (re)load the module.") == mgp_error::MGP_ERROR_NO_ERROR);
}
};
mgp_proc load("load", load_cb, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_arg(&load, "module_name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&load, "module_name", Call<mgp_type *>(mgp_type_string)) == mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("load", std::move(load));
}
@ -235,11 +235,16 @@ void RegisterMgProcedures(
}
};
mgp_proc procedures("procedures", procedures_cb, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "signature", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_write", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "signature", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_write", Call<mgp_type *>(mgp_type_bool)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("procedures", std::move(procedures));
}
@ -298,9 +303,12 @@ void RegisterMgTransformations(const std::map<std::string, std::unique_ptr<Modul
}
};
mgp_proc procedures("transformations", transformations_cb, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("transformations", std::move(procedures));
}
@ -374,10 +382,14 @@ void RegisterMgFunctions(
}
};
mgp_proc functions("functions", functions_cb, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_result(&functions, "name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "signature", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "is_editable", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "name", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "signature", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&functions, "is_editable", Call<mgp_type *>(mgp_type_bool)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("functions", std::move(functions));
}
namespace {
@ -469,9 +481,10 @@ void RegisterMgGetModuleFiles(ModuleRegistry *module_registry, BuiltinModule *mo
mgp_proc get_module_files("get_module_files", get_module_files_cb, utils::NewDeleteResource(),
{.required_privilege = AuthQuery::Privilege::MODULE_READ});
MG_ASSERT(mgp_proc_add_result(&get_module_files, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&get_module_files, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&get_module_files, "is_editable", Call<mgp_type *>(mgp_type_bool)) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("get_module_files", std::move(get_module_files));
}
@ -530,8 +543,10 @@ void RegisterMgGetModuleFile(ModuleRegistry *module_registry, BuiltinModule *mod
};
mgp_proc get_module_file("get_module_file", std::move(get_module_file_cb), utils::NewDeleteResource(),
{.required_privilege = AuthQuery::Privilege::MODULE_READ});
MG_ASSERT(mgp_proc_add_arg(&get_module_file, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&get_module_file, "content", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&get_module_file, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&get_module_file, "content", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("get_module_file", std::move(get_module_file));
}
@ -609,9 +624,12 @@ void RegisterMgCreateModuleFile(ModuleRegistry *module_registry, utils::RWLock *
};
mgp_proc create_module_file("create_module_file", std::move(create_module_file_cb), utils::NewDeleteResource(),
{.required_privilege = AuthQuery::Privilege::MODULE_WRITE});
MG_ASSERT(mgp_proc_add_arg(&create_module_file, "filename", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&create_module_file, "content", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&create_module_file, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&create_module_file, "filename", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&create_module_file, "content", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&create_module_file, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("create_module_file", std::move(create_module_file));
}
@ -664,8 +682,10 @@ void RegisterMgUpdateModuleFile(ModuleRegistry *module_registry, utils::RWLock *
};
mgp_proc update_module_file("update_module_file", std::move(update_module_file_cb), utils::NewDeleteResource(),
{.required_privilege = AuthQuery::Privilege::MODULE_WRITE});
MG_ASSERT(mgp_proc_add_arg(&update_module_file, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&update_module_file, "content", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&update_module_file, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&update_module_file, "content", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("update_module_file", std::move(update_module_file));
}
@ -721,7 +741,8 @@ void RegisterMgDeleteModuleFile(ModuleRegistry *module_registry, utils::RWLock *
};
mgp_proc delete_module_file("delete_module_file", std::move(delete_module_file_cb), utils::NewDeleteResource(),
{.required_privilege = AuthQuery::Privilege::MODULE_WRITE});
MG_ASSERT(mgp_proc_add_arg(&delete_module_file, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&delete_module_file, "path", Call<mgp_type *>(mgp_type_string)) ==
mgp_error::MGP_ERROR_NO_ERROR);
module->AddProcedure("delete_module_file", std::move(delete_module_file));
}
@ -801,7 +822,8 @@ bool SharedLibraryModule::Load(const std::filesystem::path &file_path) {
spdlog::info("Loading module {}...", file_path);
file_path_ = file_path;
dlerror(); // Clear any existing error.
handle_ = dlopen(file_path.c_str(), RTLD_NOW | RTLD_LOCAL);
// NOLINTNEXTLINE(hicpp-signed-bitwise)
handle_ = dlopen(file_path.c_str(), RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND);
if (!handle_) {
spdlog::error(
utils::MessageWithLink("Unable to load module {}; {}.", file_path, dlerror(), "https://memgr.ph/modules"));
@ -832,7 +854,7 @@ bool SharedLibraryModule::Load(const std::filesystem::path &file_path) {
return with_error(error);
}
for (auto &trans : module_def->transformations) {
const bool success = MGP_ERROR_NO_ERROR == MgpTransAddFixedResult(&trans.second);
const bool success = mgp_error::MGP_ERROR_NO_ERROR == MgpTransAddFixedResult(&trans.second);
if (!success) {
const auto error =
fmt::format("Unable to add result to transformation in module {}; add result failed", file_path);
@ -941,7 +963,7 @@ bool PythonModule::Load(const std::filesystem::path &file_path) {
auto module_cb = [&](auto *module_def, auto * /*memory*/) {
auto result = ImportPyModule(file_path.stem().c_str(), module_def);
for (auto &trans : module_def->transformations) {
succ = MgpTransAddFixedResult(&trans.second) == MGP_ERROR_NO_ERROR;
succ = MgpTransAddFixedResult(&trans.second) == mgp_error::MGP_ERROR_NO_ERROR;
if (!succ) {
return result;
}

View File

@ -13,6 +13,7 @@
/// API for loading and registering modules providing custom oC procedures
#pragma once
#include <dlfcn.h>
#include <filesystem>
#include <functional>
#include <optional>
@ -128,6 +129,40 @@ class ModuleRegistry final {
const std::filesystem::path &InternalModuleDir() const noexcept;
private:
class SharedLibraryHandle {
public:
SharedLibraryHandle(const std::string &shared_library, int mode) : handle_{dlopen(shared_library.c_str(), mode)} {}
SharedLibraryHandle(const SharedLibraryHandle &) = delete;
SharedLibraryHandle(SharedLibraryHandle &&) = delete;
SharedLibraryHandle operator=(const SharedLibraryHandle &) = delete;
SharedLibraryHandle operator=(SharedLibraryHandle &&) = delete;
~SharedLibraryHandle() {
if (handle_) {
dlclose(handle_);
}
}
private:
void *handle_;
};
#if __has_feature(address_sanitizer)
// This is why we need RTLD_NODELETE and we must not use RTLD_DEEPBIND with
// ASAN: https://github.com/google/sanitizers/issues/89
SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL | RTLD_NODELETE};
#else
// The reason behind opening share library during runtime is to avoid issues
// with loading symbols from stdlib. We have encounter issues with locale
// that cause std::cout not being printed and issues when python libraries
// would call stdlib (e.g. pytorch).
// The way that those issues were solved was
// by using RTLD_DEEPBIND. RTLD_DEEPBIND ensures that the lookup for the
// mentioned library will be first performed in the already existing binded
// libraries and then the global namespace.
// RTLD_DEEPBIND => https://linux.die.net/man/3/dlopen
SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND};
#endif
std::vector<std::filesystem::path> modules_dirs_;
std::filesystem::path internal_module_dir_;
};

View File

@ -55,49 +55,49 @@ PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avo
// Returns true if an exception is raised
bool RaiseExceptionFromErrorCode(const mgp_error error) {
switch (error) {
case MGP_ERROR_NO_ERROR:
case mgp_error::MGP_ERROR_NO_ERROR:
return false;
case MGP_ERROR_UNKNOWN_ERROR: {
case mgp_error::MGP_ERROR_UNKNOWN_ERROR: {
PyErr_SetString(gMgpUnknownError, "Unknown error happened.");
return true;
}
case MGP_ERROR_UNABLE_TO_ALLOCATE: {
case mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE: {
PyErr_SetString(gMgpUnableToAllocateError, "Unable to allocate memory.");
return true;
}
case MGP_ERROR_INSUFFICIENT_BUFFER: {
case mgp_error::MGP_ERROR_INSUFFICIENT_BUFFER: {
PyErr_SetString(gMgpInsufficientBufferError, "Insufficient buffer.");
return true;
}
case MGP_ERROR_OUT_OF_RANGE: {
case mgp_error::MGP_ERROR_OUT_OF_RANGE: {
PyErr_SetString(gMgpOutOfRangeError, "Out of range.");
return true;
}
case MGP_ERROR_LOGIC_ERROR: {
case mgp_error::MGP_ERROR_LOGIC_ERROR: {
PyErr_SetString(gMgpLogicErrorError, "Logic error.");
return true;
}
case MGP_ERROR_DELETED_OBJECT: {
case mgp_error::MGP_ERROR_DELETED_OBJECT: {
PyErr_SetString(gMgpDeletedObjectError, "Accessing deleted object.");
return true;
}
case MGP_ERROR_INVALID_ARGUMENT: {
case mgp_error::MGP_ERROR_INVALID_ARGUMENT: {
PyErr_SetString(gMgpInvalidArgumentError, "Invalid argument.");
return true;
}
case MGP_ERROR_KEY_ALREADY_EXISTS: {
case mgp_error::MGP_ERROR_KEY_ALREADY_EXISTS: {
PyErr_SetString(gMgpKeyAlreadyExistsError, "Key already exists.");
return true;
}
case MGP_ERROR_IMMUTABLE_OBJECT: {
case mgp_error::MGP_ERROR_IMMUTABLE_OBJECT: {
PyErr_SetString(gMgpImmutableObjectError, "Cannot modify immutable object.");
return true;
}
case MGP_ERROR_VALUE_CONVERSION: {
case mgp_error::MGP_ERROR_VALUE_CONVERSION: {
PyErr_SetString(gMgpValueConversionError, "Value conversion failed.");
return true;
}
case MGP_ERROR_SERIALIZATION_ERROR: {
case mgp_error::MGP_ERROR_SERIALIZATION_ERROR: {
PyErr_SetString(gMgpSerializationError, "Operation cannot be serialized.");
return true;
}
@ -902,7 +902,7 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
if (field_val == nullptr) {
return py::FetchError();
}
if (mgp_result_record_insert(record, field_name, field_val) != MGP_ERROR_NO_ERROR) {
if (mgp_result_record_insert(record, field_name, field_val) != mgp_error::MGP_ERROR_NO_ERROR) {
std::stringstream ss;
ss << "Unable to insert field '" << py::Object::FromBorrow(key) << "' with value: '"
<< py::Object::FromBorrow(val) << "'; did you set the correct field type?";
@ -2281,9 +2281,10 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
auto py_seq_to_list = [memory](PyObject *seq, Py_ssize_t len, const auto &py_seq_get_item) {
static_assert(std::numeric_limits<Py_ssize_t>::max() <= std::numeric_limits<size_t>::max());
MgpUniquePtr<mgp_list> list{nullptr, &mgp_list_destroy};
if (const auto err = CreateMgpObject(list, mgp_list_make_empty, len, memory); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = CreateMgpObject(list, mgp_list_make_empty, len, memory);
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during making mgp_list"};
}
for (Py_ssize_t i = 0; i < len; ++i) {
@ -2292,17 +2293,17 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
v = PyObjectToMgpValue(e, memory);
const auto err = mgp_list_append(list.get(), v);
mgp_value_destroy(v);
if (err != MGP_ERROR_NO_ERROR) {
if (err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (err != mgp_error::MGP_ERROR_NO_ERROR) {
if (err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
}
throw std::runtime_error{"Unexpected error during appending to mgp_list"};
}
}
mgp_value *v{nullptr};
if (const auto err = mgp_value_make_list(list.get(), &v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_list(list.get(), &v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during making mgp_value"};
}
static_cast<void>(list.release());
@ -2334,7 +2335,7 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
};
mgp_value *mgp_v{nullptr};
mgp_error last_error{MGP_ERROR_NO_ERROR};
mgp_error last_error{mgp_error::MGP_ERROR_NO_ERROR};
if (o == Py_None) {
last_error = mgp_value_make_null(memory, &mgp_v);
@ -2360,10 +2361,10 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_map> map{nullptr, mgp_map_destroy};
const auto map_err = CreateMgpObject(map, mgp_map_make_empty, memory);
if (map_err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (map_err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
}
if (map_err != MGP_ERROR_NO_ERROR) {
if (map_err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during creating mgp_map"};
}
@ -2384,16 +2385,16 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_value> v{PyObjectToMgpValue(value, memory), mgp_value_destroy};
if (const auto err = mgp_map_insert(map.get(), k, v.get()); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_map_insert(map.get(), k, v.get()); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during inserting an item to mgp_map"};
}
}
if (const auto err = mgp_value_make_map(map.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_map(map.get(), &mgp_v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during creating mgp_value"};
}
static_cast<void>(map.release());
@ -2402,14 +2403,14 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
// Copy the edge and pass the ownership to the created mgp_value.
if (const auto err = CreateMgpObject(e, mgp_edge_copy, reinterpret_cast<PyEdge *>(o)->edge, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_edge"};
}
if (const auto err = mgp_value_make_edge(e.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_edge(e.get(), &mgp_v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_edge"};
}
static_cast<void>(e.release());
@ -2418,14 +2419,14 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
// Copy the edge and pass the ownership to the created mgp_value.
if (const auto err = CreateMgpObject(p, mgp_path_copy, reinterpret_cast<PyPath *>(o)->path, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_path"};
}
if (const auto err = mgp_value_make_path(p.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_path(p.get(), &mgp_v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_path"};
}
static_cast<void>(p.release());
@ -2434,14 +2435,14 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
// Copy the edge and pass the ownership to the created mgp_value.
if (const auto err = CreateMgpObject(v, mgp_vertex_copy, reinterpret_cast<PyVertex *>(o)->vertex, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_vertex"};
}
if (const auto err = mgp_value_make_vertex(v.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_vertex(v.get(), &mgp_v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error during copying mgp_vertex"};
}
static_cast<void>(v.release());
@ -2474,14 +2475,14 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_date> date{nullptr, mgp_date_destroy};
if (const auto err = CreateMgpObject(date, mgp_date_from_parameters, &parameters, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_date"};
}
if (const auto err = mgp_value_make_date(date.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_date(date.get(), &mgp_v); err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}
static_cast<void>(date.release());
@ -2499,14 +2500,15 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_local_time> local_time{nullptr, mgp_local_time_destroy};
if (const auto err = CreateMgpObject(local_time, mgp_local_time_from_parameters, &parameters, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_local_time"};
}
if (const auto err = mgp_value_make_local_time(local_time.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_local_time(local_time.get(), &mgp_v);
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}
static_cast<void>(local_time.release());
@ -2531,15 +2533,15 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_local_date_time> local_date_time{nullptr, mgp_local_date_time_destroy};
if (const auto err = CreateMgpObject(local_date_time, mgp_local_date_time_from_parameters, &parameters, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_local_date_time"};
}
if (const auto err = mgp_value_make_local_date_time(local_date_time.get(), &mgp_v);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}
static_cast<void>(local_date_time.release());
@ -2558,14 +2560,15 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
MgpUniquePtr<mgp_duration> duration{nullptr, mgp_duration_destroy};
if (const auto err = CreateMgpObject(duration, mgp_duration_from_microseconds, microseconds, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_duration"};
}
if (const auto err = mgp_value_make_duration(duration.get(), &mgp_v); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (const auto err = mgp_value_make_duration(duration.get(), &mgp_v);
err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
} else if (err != MGP_ERROR_NO_ERROR) {
} else if (err != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}
static_cast<void>(duration.release());
@ -2573,10 +2576,10 @@ mgp_value *PyObjectToMgpValue(PyObject *o, mgp_memory *memory) {
throw std::invalid_argument("Unsupported PyObject conversion");
}
if (last_error == MGP_ERROR_UNABLE_TO_ALLOCATE) {
if (last_error == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
throw std::bad_alloc{};
}
if (last_error != MGP_ERROR_NO_ERROR) {
if (last_error != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error{"Unexpected error while creating mgp_value"};
}

View File

@ -181,25 +181,27 @@ void Streams::RegisterKafkaProcedures() {
const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(utils::Overloaded{
[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto error = stream_source_ptr->SetStreamOffset(offset);
if (error.HasError()) {
MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
"Unable to set procedure error message of procedure: {}", proc_name);
}
},
[](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
}},
std::visit(utils::Overloaded{[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto error = stream_source_ptr->SetStreamOffset(offset);
if (error.HasError()) {
MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) ==
mgp_error::MGP_ERROR_NO_ERROR,
"Unable to set procedure error message of procedure: {}", proc_name);
}
},
[](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources",
proc_name);
}},
it->second);
};
mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) == MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) ==
mgp_error::MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
@ -345,19 +347,19 @@ void Streams::RegisterKafkaProcedures() {
mgp_proc proc(proc_name, get_stream_info, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, consumer_group_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
procedure::Call<mgp_type *>(mgp_type_string)) == mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(
mgp_proc_add_result(&proc, topics_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_list, procedure::Call<mgp_type *>(mgp_type_string))) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
procedure::Call<mgp_type *>(mgp_type_string)) == mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, configs_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, credentials_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
@ -432,14 +434,14 @@ void Streams::RegisterPulsarProcedures() {
mgp_proc proc(proc_name, get_stream_info, utils::NewDeleteResource());
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, service_url_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
procedure::Call<mgp_type *>(mgp_type_string)) == mgp_error::MGP_ERROR_NO_ERROR);
MG_ASSERT(
mgp_proc_add_result(&proc, topics_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_list, procedure::Call<mgp_type *>(mgp_type_string))) ==
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}

View File

@ -16,6 +16,10 @@
namespace memgraph::storage::replication {
struct ReplicationClientConfig {
std::optional<double> timeout;
// The default delay between main checking/pinging replicas is 1s because
// that seems like a reasonable timeframe in which main should notice a
// replica is down.
std::chrono::seconds replica_check_frequency{1};
struct SSL {
std::string key_file = "";

View File

@ -41,12 +41,49 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
}
rpc_client_.emplace(endpoint, &*rpc_context_);
TryInitializeClient();
TryInitializeClientSync();
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
// Help the user to get the most accurate replica state possible.
if (config.replica_check_frequency > std::chrono::seconds(0)) {
replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); });
}
}
void Storage::ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClientSync();
});
}
void Storage::ReplicationClient::FrequentCheck() {
const auto is_success = std::invoke([this]() {
try {
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
const auto response = stream.AwaitResponse();
return response.success;
} catch (const rpc::RpcFailedException &) {
return false;
}
});
// States: READY, REPLICATING, RECOVERY, INVALID
// If success && ready, replicating, recovery -> stay the same because something good is going on.
// If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient.
// If fail -> [replica is not reachable at all] -> INVALID state.
// NOTE: TryInitializeClient might return nothing if there is a branching point.
// NOTE: The early return pattern simplified the code, but the behavior should be as explained.
if (!is_success) {
replica_state_.store(replication::ReplicaState::INVALID);
return;
}
if (replica_state_.load() == replication::ReplicaState::INVALID) {
TryInitializeClientAsync();
}
}
/// @throws rpc::RpcFailedException
@ -100,7 +137,7 @@ void Storage::ReplicationClient::InitializeClient() {
}
}
void Storage::ReplicationClient::TryInitializeClient() {
void Storage::ReplicationClient::TryInitializeClientSync() {
try {
InitializeClient();
} catch (const rpc::RpcFailedException &) {
@ -113,10 +150,7 @@ void Storage::ReplicationClient::TryInitializeClient() {
void Storage::ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClient();
});
TryInitializeClientAsync();
}
replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {

View File

@ -142,16 +142,14 @@ class Storage::ReplicationClient {
std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker);
void FrequentCheck();
void InitializeClient();
void TryInitializeClient();
void TryInitializeClientSync();
void TryInitializeClientAsync();
void HandleRpcFailure();
std::string name_;
Storage *storage_;
std::optional<communication::ClientContext> rpc_context_;
std::optional<rpc::Client> rpc_client_;
@ -198,6 +196,8 @@ class Storage::ReplicationClient {
// to ignore concurrency problems inside the client.
utils::ThreadPool thread_pool_{1};
std::atomic<replication::ReplicaState> replica_state_{replication::ReplicaState::INVALID};
utils::Scheduler replica_checker_;
};
} // namespace memgraph::storage

View File

@ -60,6 +60,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
spdlog::debug("Received HeartbeatRpc");
this->HeartbeatHandler(req_reader, res_builder);
});
rpc_server_->Register<replication::FrequentHeartbeatRpc>([](auto *req_reader, auto *res_builder) {
spdlog::debug("Received FrequentHeartbeatRpc");
FrequentHeartbeatHandler(req_reader, res_builder);
});
rpc_server_->Register<replication::AppendDeltasRpc>([this](auto *req_reader, auto *res_builder) {
spdlog::debug("Received AppendDeltasRpc");
this->AppendDeltasHandler(req_reader, res_builder);
@ -86,6 +90,13 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::FrequentHeartbeatReq req;
slk::Load(&req, req_reader);
replication::FrequentHeartbeatRes res{true};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::AppendDeltasReq req;
slk::Load(&req, req_reader);

View File

@ -29,6 +29,7 @@ class Storage::ReplicationServer {
private:
// RPC handlers
void HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
static void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);

View File

@ -43,6 +43,12 @@ cpp<#
(current-commit-timestamp :uint64_t)
(epoch-id "std::string"))))
;; FrequentHearthbeat is required because calling Heartbeat takes the storage lock.
;; Configured by `replication_replica_check_delay`.
(lcp:define-rpc frequent-heartbeat
(:request ())
(:response ((success :bool))))
(lcp:define-rpc snapshot
(:request ())
(:response

View File

@ -22,6 +22,7 @@ add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
endfunction()
add_subdirectory(server)
add_subdirectory(replication)
add_subdirectory(memory)
add_subdirectory(triggers)
@ -31,6 +32,8 @@ add_subdirectory(temporal_types)
add_subdirectory(write_procedures)
add_subdirectory(magic_functions)
add_subdirectory(module_file_manager)
add_subdirectory(websocket)
add_subdirectory(monitoring_server)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.key DESTINATION ${CMAKE_CURRENT_BINARY_DIR})

View File

@ -21,13 +21,13 @@ static void ReturnFunctionArgument(struct mgp_list *args, mgp_func_context *ctx,
struct mgp_memory *memory) {
mgp_value *value{nullptr};
auto err_code = mgp_list_at(args, 0, &value);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to fetch list!", memory);
return;
}
err_code = mgp_func_result_set_value(result, value, memory);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to construct return value!", memory);
return;
}
@ -37,13 +37,13 @@ static void ReturnOptionalArgument(struct mgp_list *args, mgp_func_context *ctx,
struct mgp_memory *memory) {
mgp_value *value{nullptr};
auto err_code = mgp_list_at(args, 0, &value);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to fetch list!", memory);
return;
}
err_code = mgp_func_result_set_value(result, value, memory);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to construct return value!", memory);
return;
}
@ -51,7 +51,7 @@ static void ReturnOptionalArgument(struct mgp_list *args, mgp_func_context *ctx,
double GetElementFromArg(struct mgp_list *args, int index) {
mgp_value *value{nullptr};
if (mgp_list_at(args, index, &value) != MGP_ERROR_NO_ERROR) {
if (mgp_list_at(args, index, &value) != mgp_error::MGP_ERROR_NO_ERROR) {
throw std::runtime_error("Error while argument fetching.");
}
@ -87,7 +87,7 @@ static void AddTwoNumbers(struct mgp_list *args, mgp_func_context *ctx, mgp_func
memgraph::utils::OnScopeExit delete_summation_value([&value] { mgp_value_destroy(value); });
auto err_code = mgp_func_result_set_value(result, value, memory);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to construct return value!", memory);
}
}
@ -99,7 +99,7 @@ static void ReturnNull(struct mgp_list *args, mgp_func_context *ctx, mgp_func_re
memgraph::utils::OnScopeExit delete_null([&value] { mgp_value_destroy(value); });
auto err_code = mgp_func_result_set_value(result, value, memory);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to fetch list!", memory);
}
}
@ -111,14 +111,14 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
mgp_func *func{nullptr};
auto err_code = mgp_module_add_function(module, "return_function_argument", ReturnFunctionArgument, &func);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
mgp_type *type_any{nullptr};
mgp_type_any(&type_any);
err_code = mgp_func_add_arg(func, "argument", type_any);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
}
@ -126,7 +126,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
mgp_func *func{nullptr};
auto err_code = mgp_module_add_function(module, "return_optional_argument", ReturnOptionalArgument, &func);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
@ -137,7 +137,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
mgp_type *type_int{nullptr};
mgp_type_int(&type_int);
err_code = mgp_func_add_opt_arg(func, "opt_argument", type_int, default_value);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
}
@ -145,18 +145,18 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
mgp_func *func{nullptr};
auto err_code = mgp_module_add_function(module, "add_two_numbers", AddTwoNumbers, &func);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
mgp_type *type_number{nullptr};
mgp_type_number(&type_number);
err_code = mgp_func_add_arg(func, "first", type_number);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
err_code = mgp_func_add_arg(func, "second", type_number);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
}
@ -164,7 +164,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
mgp_func *func{nullptr};
auto err_code = mgp_module_add_function(module, "return_null", ReturnNull, &func);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
}

View File

@ -26,13 +26,13 @@ static void TryToWrite(struct mgp_list *args, mgp_func_context *ctx, mgp_func_re
// Setting a property should set an error
auto err_code = mgp_vertex_set_property(vertex, name, value);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Cannot set property in the function!", memory);
return;
}
err_code = mgp_func_result_set_value(result, value, memory);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
mgp_func_result_set_error_msg(result, "Failed to construct return value!", memory);
return;
}
@ -44,21 +44,21 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
{
mgp_func *func{nullptr};
auto err_code = mgp_module_add_function(module, "try_to_write", TryToWrite, &func);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
mgp_type *type_vertex{nullptr};
mgp_type_node(&type_vertex);
err_code = mgp_func_add_arg(func, "argument", type_vertex);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
mgp_type *type_string{nullptr};
mgp_type_string(&type_string);
err_code = mgp_func_add_arg(func, "name", type_string);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
@ -67,7 +67,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
mgp_type *nullable_type{nullptr};
mgp_type_nullable(any_type, &nullable_type);
err_code = mgp_func_add_arg(func, "value", nullable_type);
if (err_code != MGP_ERROR_NO_ERROR) {
if (err_code != mgp_error::MGP_ERROR_NO_ERROR) {
return 1;
}
}

View File

@ -0,0 +1,8 @@
find_package(gflags REQUIRED)
find_package(Boost REQUIRED)
add_executable(memgraph__e2e__monitoring_server monitoring.cpp)
target_link_libraries(memgraph__e2e__monitoring_server mgclient mg-utils json gflags Boost::headers)
add_executable(memgraph__e2e__monitoring_server_ssl monitoring_ssl.cpp)
target_link_libraries(memgraph__e2e__monitoring_server_ssl mgclient mg-utils json gflags Boost::headers)

View File

@ -1,15 +1,15 @@
cert_file: &cert_file "$PROJECT_DIR/tests/e2e/websocket/memgraph-selfsigned.crt"
key_file: &key_file "$PROJECT_DIR/tests/e2e/websocket/memgraph-selfsigned.key"
cert_file: &cert_file "$PROJECT_DIR/tests/e2e/memgraph-selfsigned.crt"
key_file: &key_file "$PROJECT_DIR/tests/e2e/memgraph-selfsigned.key"
bolt_port: &bolt_port "7687"
monitoring_port: &monitoring_port "7444"
template_cluster: &template_cluster
cluster:
websocket:
monitoring:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
log_file: "websocket-e2e.log"
log_file: "monitoring-websocket-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
websocket:
monitoring:
args:
[
"--bolt-port",
@ -23,16 +23,15 @@ template_cluster_ssl: &template_cluster_ssl
*key_file,
"--",
]
log_file: "websocket-ssl-e2e.log"
log_file: "monitoring-websocket-ssl-e2e.log"
ssl: true
workloads:
- name: "Websocket"
binary: "tests/e2e/websocket/memgraph__e2e__websocket"
- name: "Monitoring server using WebSocket"
binary: "tests/e2e/monitoring_server/memgraph__e2e__monitoring_server"
args: ["--bolt-port", *bolt_port, "--monitoring-port", *monitoring_port]
<<: *template_cluster
- name: "Websocket SSL"
binary: "tests/e2e/websocket/memgraph__e2e__websocket_ssl"
- name: "Monitoring server using WebSocket SSL"
binary: "tests/e2e/monitoring_server/memgraph__e2e__monitoring_server_ssl"
args: ["--bolt-port", *bolt_port, "--monitoring-port", *monitoring_port]
<<: *template_cluster_ssl

View File

@ -5,3 +5,7 @@ target_link_libraries(memgraph__e2e__replication__constraints gflags mgclient mg
add_executable(memgraph__e2e__replication__read_write_benchmark read_write_benchmark.cpp)
target_link_libraries(memgraph__e2e__replication__read_write_benchmark gflags json mgclient mg-utils mg-io Threads::Threads)
copy_e2e_python_files(replication_show common.py)
copy_e2e_python_files(replication_show conftest.py)
copy_e2e_python_files(replication_show show.py)

View File

@ -0,0 +1,26 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import mgclient
import typing
def execute_and_fetch_all(
cursor: mgclient.Cursor, query: str, params: dict = {}
) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(**kwargs)
connection.autocommit = True
return connection

View File

@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from common import execute_and_fetch_all, connect
# The fixture here is more complex because the connection has to be
# parameterized based on the test parameters (info has to be available on both
# sides).
#
# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
# is not an elegant/feasible solution here.
#
# The solution was independently developed and then I stumbled upon the same
# approach here https://stackoverflow.com/a/68286553/4888809 which I think is
# optimal.
@pytest.fixture(scope="function")
def connection():
connection_holder = None
role_holder = None
def inner_connection(port, role):
nonlocal connection_holder, role_holder
connection_holder = connect(host="localhost", port=port)
role_holder = role
return connection_holder
yield inner_connection
# Only main instance can be cleaned up because replicas do NOT accept
# writes.
if role_holder == "main":
cursor = connection_holder.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")

46
tests/e2e/replication/show.py Executable file
View File

@ -0,0 +1,46 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import execute_and_fetch_all
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica"), (7690, "replica")],
)
def test_show_replication_role(port, role, connection):
cursor = connection(port, role).cursor()
data = execute_and_fetch_all(cursor, "SHOW REPLICATION ROLE;")
assert cursor.description[0].name == "replication role"
assert data[0][0] == role
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {"name", "socket_address", "sync_mode", "timeout"}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0),
("replica_2", "127.0.0.1:10002", "sync", 1.0),
("replica_3", "127.0.0.1:10003", "async", None),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -46,4 +46,31 @@ workloads:
args: []
<<: *template_cluster
- name: "Show"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/show.py"]
cluster:
replica_1:
args: ["--bolt-port", "7688", "--log-level=TRACE"]
log_file: "replication-e2e-replica1.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"]
validation_queries: []
replica_2:
args: ["--bolt-port", "7689", "--log-level=TRACE"]
log_file: "replication-e2e-replica2.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
validation_queries: []
replica_3:
args: ["--bolt-port", "7690", "--log-level=TRACE"]
log_file: "replication-e2e-replica3.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"]
validation_queries: []
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
validation_queries: []

View File

@ -0,0 +1,8 @@
find_package(gflags REQUIRED)
find_package(Boost REQUIRED)
add_executable(memgraph__e2e__server_connection server_connection.cpp)
target_link_libraries(memgraph__e2e__server_connection mgclient mg-utils gflags)
add_executable(memgraph__e2e__server_ssl_connection server_ssl_connection.cpp)
target_link_libraries(memgraph__e2e__server_ssl_connection mgclient mg-utils gflags)

View File

@ -0,0 +1,60 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <functional>
#include <thread>
#include <spdlog/spdlog.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/detail/error_code.hpp>
#include <mgclient.hpp>
#include "utils/logging.hpp"
inline void OnTimeoutExpiration(const boost::system::error_code &ec) {
// Timer was not cancelled, take necessary action.
MG_ASSERT(!!ec, "Connection timeout");
}
inline void EstablishConnection(const uint16_t bolt_port, const bool use_ssl) {
spdlog::info("Testing successfull connection from one client");
mg::Client::Init();
boost::asio::io_context ioc;
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
timer.async_wait(std::bind_front(&OnTimeoutExpiration));
std::jthread bg_thread([&ioc]() { ioc.run(); });
auto client = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = use_ssl});
MG_ASSERT(client, "Failed to connect!");
timer.cancel();
}
inline void EstablishMultipleConnections(const uint16_t bolt_port, const bool use_ssl) {
spdlog::info("Testing successfull connection from multiple clients");
mg::Client::Init();
boost::asio::io_context ioc;
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
timer.async_wait(std::bind_front(&OnTimeoutExpiration));
std::jthread bg_thread([&ioc]() { ioc.run(); });
auto client1 = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = use_ssl});
auto client2 = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = use_ssl});
auto client3 = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = use_ssl});
MG_ASSERT(client1, "Failed to connect!");
MG_ASSERT(client2, "Failed to connect!");
MG_ASSERT(client3, "Failed to connect!");
timer.cancel();
}

View File

@ -0,0 +1,56 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <unistd.h>
#include <chrono>
#include <cstddef>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/detail/error_code.hpp>
#include <mgclient.hpp>
#include "common.hpp"
#include "utils/logging.hpp"
DEFINE_uint64(bolt_port, 7687, "Bolt port");
void EstablishSSLConnectionToNonSSLServer(const auto bolt_port) {
spdlog::info("Testing that connection fails when connecting to non SSL server while using SSL");
mg::Client::Init();
boost::asio::io_context ioc;
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
timer.async_wait(std::bind_front(&OnTimeoutExpiration));
std::jthread bg_thread([&ioc]() { ioc.run(); });
auto client = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = true});
MG_ASSERT(client == nullptr, "Connection not refused when connecting with SSL turned on to a non SSL server!");
timer.cancel();
}
int main(int argc, char **argv) {
google::SetUsageMessage("Memgraph E2E server connection!");
gflags::ParseCommandLineFlags(&argc, &argv, true);
MG_ASSERT(FLAGS_bolt_port != 0);
memgraph::logging::RedirectToStderr();
const auto bolt_port = static_cast<uint16_t>(FLAGS_bolt_port);
EstablishConnection(bolt_port, false);
EstablishMultipleConnections(bolt_port, false);
EstablishSSLConnectionToNonSSLServer(bolt_port);
return 0;
}

View File

@ -0,0 +1,57 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <unistd.h>
#include <chrono>
#include <cstddef>
#include <thread>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/detail/error_code.hpp>
#include <mgclient.hpp>
#include "common.hpp"
#include "utils/logging.hpp"
DEFINE_uint64(bolt_port, 7687, "Bolt port");
void EstablishNonSSLConnectionToSSLServer(const auto bolt_port) {
spdlog::info("Testing that connection fails when connecting to SSL server without using SSL");
mg::Client::Init();
boost::asio::io_context ioc;
boost::asio::steady_timer timer(ioc, std::chrono::seconds(5));
timer.async_wait(std::bind_front(&OnTimeoutExpiration));
std::jthread bg_thread([&ioc]() { ioc.run(); });
auto client = mg::Client::Connect({.host = "127.0.0.1", .port = bolt_port, .use_ssl = false});
MG_ASSERT(client == nullptr, "Connection not refused when conneting without SSL turned on to a SSL server!");
timer.cancel();
}
int main(int argc, char **argv) {
google::SetUsageMessage("Memgraph E2E server SSL connection!");
gflags::ParseCommandLineFlags(&argc, &argv, true);
MG_ASSERT(FLAGS_bolt_port != 0);
memgraph::logging::RedirectToStderr();
const auto bolt_port = static_cast<uint16_t>(FLAGS_bolt_port);
EstablishConnection(bolt_port, true);
EstablishMultipleConnections(bolt_port, true);
EstablishNonSSLConnectionToSSLServer(bolt_port);
return 0;
}

View File

@ -0,0 +1,34 @@
cert_file: &cert_file "$PROJECT_DIR/tests/e2e/memgraph-selfsigned.crt"
key_file: &key_file "$PROJECT_DIR/tests/e2e/memgraph-selfsigned.key"
bolt_port: &bolt_port "7687"
template_cluster: &template_cluster
cluster:
server:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
log_file: "server-connection-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
server:
args:
[
"--bolt-port",
*bolt_port,
"--log-level=TRACE",
"--bolt-cert-file",
*cert_file,
"--bolt-key-file",
*key_file,
"--",
]
log_file: "server-connection-ssl-e2e.log"
ssl: true
workloads:
- name: "Server connection"
binary: "tests/e2e/server/memgraph__e2e__server_connection"
args: ["--bolt-port", *bolt_port]
<<: *template_cluster
- name: "Server SSL connection"
binary: "tests/e2e/server/memgraph__e2e__server_ssl_connection"
args: ["--bolt-port", *bolt_port]
<<: *template_cluster_ssl

View File

@ -0,0 +1,8 @@
There are three docker-compose files in this directory:
* [kafka.yml](kafka.yml)
* [pulsar.yml](pulsar.yml)
* [redpanda.yml](redpanda.yml)
To run one of them, use the `docker-compose -f <filename> up -V` command. Optionally you can append `-d` to detach from the started containers. You can stop the detach containers by `docker-compose -f <filename> down`.
If you experience strange errors, try to clean up the previously created containers by `docker-compose -f <filename> rm -svf`.

View File

@ -1,13 +1,13 @@
version: "3"
version: '3.7'
services:
zookeeper:
image: 'bitnami/zookeeper:3.6.3-debian-10-r33'
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2.8.0-debian-10-r49'
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
@ -18,9 +18,3 @@ services:
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
pulsar:
image: 'apachepulsar/pulsar:2.8.1'
ports:
- '6652:8080'
- '6650:6650'
entrypoint: ['bin/pulsar', 'standalone']

View File

@ -0,0 +1,8 @@
version: '3.7'
services:
pulsar:
image: 'apachepulsar/pulsar:latest'
ports:
- '6652:8080'
- '6650:6650'
entrypoint: ['bin/pulsar', 'standalone']

View File

@ -0,0 +1,23 @@
version: '3.7'
services:
redpanda:
command:
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '0'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
# NOTE: Please use the latest version here!
image: docker.vectorized.io/vectorized/redpanda:latest
container_name: redpanda-1
ports:
- 9092:9092
- 29092:29092

View File

@ -14,7 +14,7 @@
extern "C" int mgp_init_module(mgp_module *module, mgp_memory *memory) {
static const auto no_op_cb = [](mgp_messages *msg, mgp_graph *graph, mgp_result *result, mgp_memory *memory) {};
if (MGP_ERROR_NO_ERROR != mgp_module_add_transformation(module, "empty_transformation", no_op_cb)) {
if (mgp_error::MGP_ERROR_NO_ERROR != mgp_module_add_transformation(module, "empty_transformation", no_op_cb)) {
return 1;
}

View File

@ -1,10 +0,0 @@
find_package(gflags REQUIRED)
find_package(Boost REQUIRED)
add_executable(memgraph__e2e__websocket websocket.cpp)
target_link_libraries(memgraph__e2e__websocket mgclient mg-utils json gflags Boost::headers)
add_executable(memgraph__e2e__websocket_ssl websocket_ssl.cpp)
target_link_libraries(memgraph__e2e__websocket_ssl mgclient mg-utils json gflags Boost::headers)
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.key DESTINATION ${CMAKE_CURRENT_BINARY_DIR})

View File

@ -23,13 +23,13 @@ TEST(MgpTransTest, TestMgpTransApi) {
// for different string cases as these are all handled by
// IsValidIdentifier().
// Maybe add a mock instead and expect IsValidIdentifier() to be called once?
EXPECT_EQ(mgp_module_add_transformation(&module, "dash-dash", no_op_cb), MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_transformation(&module, "dash-dash", no_op_cb), mgp_error::MGP_ERROR_INVALID_ARGUMENT);
EXPECT_TRUE(module.transformations.empty());
EXPECT_EQ(mgp_module_add_transformation(&module, "transform", no_op_cb), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_transformation(&module, "transform", no_op_cb), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_NE(module.transformations.find("transform"), module.transformations.end());
// Try to register a transformation twice
EXPECT_EQ(mgp_module_add_transformation(&module, "transform", no_op_cb), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_transformation(&module, "transform", no_op_cb), mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_TRUE(module.transformations.size() == 1);
}

View File

@ -25,25 +25,26 @@ TEST(Module, InvalidFunctionRegistration) {
mgp_module module(memgraph::utils::NewDeleteResource());
mgp_func *func{nullptr};
// Other test cases are covered within the procedure API. This is only sanity check
EXPECT_EQ(mgp_module_add_function(&module, "dashes-not-supported", DummyCallback, &func), MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_function(&module, "dashes-not-supported", DummyCallback, &func),
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
}
TEST(Module, RegisterSameFunctionMultipleTimes) {
mgp_module module(memgraph::utils::NewDeleteResource());
mgp_func *func{nullptr};
EXPECT_EQ(module.functions.find("same_name"), module.functions.end());
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_NE(module.functions.find("same_name"), module.functions.end());
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "same_name", DummyCallback, &func), mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_NE(module.functions.find("same_name"), module.functions.end());
}
TEST(Module, CaseSensitiveFunctionNames) {
mgp_module module(memgraph::utils::NewDeleteResource());
mgp_func *func{nullptr};
EXPECT_EQ(mgp_module_add_function(&module, "not_same", DummyCallback, &func), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "NoT_saME", DummyCallback, &func), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "NOT_SAME", DummyCallback, &func), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "not_same", DummyCallback, &func), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "NoT_saME", DummyCallback, &func), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_function(&module, "NOT_SAME", DummyCallback, &func), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(module.functions.size(), 3U);
}

View File

@ -25,30 +25,34 @@ TEST(Module, InvalidProcedureRegistration) {
mgp_module module(memgraph::utils::NewDeleteResource());
mgp_proc *proc{nullptr};
EXPECT_EQ(mgp_module_add_read_procedure(&module, "dashes-not-supported", DummyCallback, &proc),
MGP_ERROR_INVALID_ARGUMENT);
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
// as u8string this is u8"unicode\u22c6not\u2014supported"
EXPECT_EQ(mgp_module_add_read_procedure(&module, "unicode\xE2\x8B\x86not\xE2\x80\x94supported", DummyCallback, &proc),
MGP_ERROR_INVALID_ARGUMENT);
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
// as u8string this is u8"`backticks⋆\u22c6won't-save\u2014you`"
EXPECT_EQ(
mgp_module_add_read_procedure(&module, "`backticks⋆\xE2\x8B\x86won't-save\xE2\x80\x94you`", DummyCallback, &proc),
MGP_ERROR_INVALID_ARGUMENT);
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "42_name_must_not_start_with_number", DummyCallback, &proc),
MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "div/", DummyCallback, &proc), MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "mul*", DummyCallback, &proc), MGP_ERROR_INVALID_ARGUMENT);
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "div/", DummyCallback, &proc),
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "mul*", DummyCallback, &proc),
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "question_mark_is_not_valid?", DummyCallback, &proc),
MGP_ERROR_INVALID_ARGUMENT);
mgp_error::MGP_ERROR_INVALID_ARGUMENT);
}
TEST(Module, RegisteringTheSameProcedureMultipleTimes) {
mgp_module module(memgraph::utils::NewDeleteResource());
mgp_proc *proc{nullptr};
EXPECT_EQ(module.procedures.find("same_name"), module.procedures.end());
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_NE(module.procedures.find("same_name"), module.procedures.end());
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc),
mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "same_name", DummyCallback, &proc),
mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_NE(module.procedures.find("same_name"), module.procedures.end());
}
@ -56,9 +60,9 @@ TEST(Module, CaseSensitiveProcedureNames) {
mgp_module module(memgraph::utils::NewDeleteResource());
EXPECT_TRUE(module.procedures.empty());
mgp_proc *proc{nullptr};
EXPECT_EQ(mgp_module_add_read_procedure(&module, "not_same", DummyCallback, &proc), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "NoT_saME", DummyCallback, &proc), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "NOT_SAME", DummyCallback, &proc), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "not_same", DummyCallback, &proc), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "NoT_saME", DummyCallback, &proc), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_module_add_read_procedure(&module, "NOT_SAME", DummyCallback, &proc), mgp_error::MGP_ERROR_NO_ERROR);
EXPECT_EQ(module.procedures.size(), 3U);
}
@ -73,37 +77,41 @@ TEST(Module, ProcedureSignature) {
mgp_module module(memgraph::utils::NewDeleteResource());
auto *proc = EXPECT_MGP_NO_ERROR(mgp_proc *, mgp_module_add_read_procedure, &module, "proc", &DummyCallback);
CheckSignature(proc, "proc() :: ()");
EXPECT_EQ(mgp_proc_add_arg(proc, "arg1", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_number)), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_proc_add_arg(proc, "arg1", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_number)),
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc, "proc(arg1 :: NUMBER) :: ()");
EXPECT_EQ(mgp_proc_add_opt_arg(
proc, "opt1",
EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_nullable, EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)),
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_null, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc, "proc(arg1 :: NUMBER, opt1 = Null :: ANY?) :: ()");
EXPECT_EQ(
mgp_proc_add_result(
proc, "res1", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_list, EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_int))),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc, "proc(arg1 :: NUMBER, opt1 = Null :: ANY?) :: (res1 :: LIST OF INTEGER)");
EXPECT_EQ(mgp_proc_add_arg(proc, "arg2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_number)), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_proc_add_arg(proc, "arg2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_number)),
mgp_error::MGP_ERROR_LOGIC_ERROR);
CheckSignature(proc, "proc(arg1 :: NUMBER, opt1 = Null :: ANY?) :: (res1 :: LIST OF INTEGER)");
EXPECT_EQ(mgp_proc_add_arg(proc, "arg2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_map)), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_proc_add_arg(proc, "arg2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_map)),
mgp_error::MGP_ERROR_LOGIC_ERROR);
CheckSignature(proc, "proc(arg1 :: NUMBER, opt1 = Null :: ANY?) :: (res1 :: LIST OF INTEGER)");
EXPECT_EQ(mgp_proc_add_deprecated_result(proc, "res2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_string)),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc,
"proc(arg1 :: NUMBER, opt1 = Null :: ANY?) :: "
"(res1 :: LIST OF INTEGER, DEPRECATED res2 :: STRING)");
EXPECT_EQ(mgp_proc_add_result(proc, "res2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_proc_add_result(proc, "res2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)),
mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_proc_add_deprecated_result(proc, "res1", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)),
MGP_ERROR_LOGIC_ERROR);
mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(
mgp_proc_add_opt_arg(proc, "opt2", EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_string),
test_utils::CreateValueOwningPtr(
EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_string, "string=\"value\"", &memory))
.get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc,
"proc(arg1 :: NUMBER, opt1 = Null :: ANY?, "
"opt2 = \"string=\\\"value\\\"\" :: STRING) :: "
@ -118,7 +126,7 @@ TEST(Module, ProcedureSignatureOnlyOptArg) {
proc, "opt1",
EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_nullable, EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)),
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_null, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
CheckSignature(proc, "proc(opt1 = Null :: ANY?) :: ()");
}

View File

@ -207,7 +207,7 @@ TEST(CypherType, MapSatisfiesType) {
mgp_map_insert(
map, "key",
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_int, 42, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
auto *mgp_map_v = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_map, map);
const memgraph::query::TypedValue tv_map(
std::map<std::string, memgraph::query::TypedValue>{{"key", memgraph::query::TypedValue(42)}});
@ -287,7 +287,7 @@ TEST(CypherType, PathSatisfiesType) {
ASSERT_TRUE(path);
alloc.delete_object(mgp_vertex_v);
auto mgp_edge_v = alloc.new_object<mgp_edge>(edge, &graph);
ASSERT_EQ(mgp_path_expand(path, mgp_edge_v), MGP_ERROR_NO_ERROR);
ASSERT_EQ(mgp_path_expand(path, mgp_edge_v), mgp_error::MGP_ERROR_NO_ERROR);
alloc.delete_object(mgp_edge_v);
auto *mgp_path_v = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_path, path);
const memgraph::query::TypedValue tv_path(memgraph::query::Path(v1, edge, v2));
@ -343,7 +343,7 @@ TEST(CypherType, ListOfIntSatisfiesType) {
mgp_list_append(
list,
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_int, i, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
tv_list.ValueList().emplace_back(i);
auto valid_types =
MakeListTypes({EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any), EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_int),
@ -371,14 +371,14 @@ TEST(CypherType, ListOfIntAndBoolSatisfiesType) {
mgp_list_append(
list,
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_int, 42, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
tv_list.ValueList().emplace_back(42);
// Add a boolean
ASSERT_EQ(
mgp_list_append(
list,
test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_bool, 1, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
tv_list.ValueList().emplace_back(true);
auto valid_types = MakeListTypes({EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any)});
valid_types.push_back(EXPECT_MGP_NO_ERROR(mgp_type *, mgp_type_any));
@ -402,7 +402,7 @@ TEST(CypherType, ListOfNullSatisfiesType) {
ASSERT_EQ(
mgp_list_append(
list, test_utils::CreateValueOwningPtr(EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_null, &memory)).get()),
MGP_ERROR_NO_ERROR);
mgp_error::MGP_ERROR_NO_ERROR);
tv_list.ValueList().emplace_back();
// List with Null satisfies all nullable list element types
std::vector<mgp_type *> primitive_types{

View File

@ -30,13 +30,13 @@ TEST(PyModule, MgpValueToPyObject) {
EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_double, 0.1, &memory),
EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_string, "some text", &memory)};
for (auto *val : primitive_values) {
EXPECT_EQ(mgp_list_append(list, val), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_list_append(list, val), mgp_error::MGP_ERROR_NO_ERROR);
mgp_value_destroy(val);
}
}
auto *list_val = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_list, list);
auto *map = EXPECT_MGP_NO_ERROR(mgp_map *, mgp_map_make_empty, &memory);
EXPECT_EQ(mgp_map_insert(map, "list", list_val), MGP_ERROR_NO_ERROR);
EXPECT_EQ(mgp_map_insert(map, "list", list_val), mgp_error::MGP_ERROR_NO_ERROR);
mgp_value_destroy(list_val);
auto *map_val = EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_map, map);
auto gil = memgraph::py::EnsureGIL();
@ -218,7 +218,7 @@ TEST(PyModule, PyPath) {
ASSERT_TRUE(edges_it);
for (auto *edge = EXPECT_MGP_NO_ERROR(mgp_edge *, mgp_edges_iterator_get, edges_it); edge != nullptr;
edge = EXPECT_MGP_NO_ERROR(mgp_edge *, mgp_edges_iterator_next, edges_it)) {
ASSERT_EQ(mgp_path_expand(path, edge), MGP_ERROR_NO_ERROR);
ASSERT_EQ(mgp_path_expand(path, edge), mgp_error::MGP_ERROR_NO_ERROR);
}
ASSERT_EQ(EXPECT_MGP_NO_ERROR(size_t, mgp_path_size, path), 1);
mgp_edges_iterator_destroy(edges_it);

View File

@ -31,7 +31,7 @@
#include "test_utils.hpp"
#include "utils/memory.hpp"
#define EXPECT_SUCCESS(...) EXPECT_EQ(__VA_ARGS__, MGP_ERROR_NO_ERROR)
#define EXPECT_SUCCESS(...) EXPECT_EQ(__VA_ARGS__, mgp_error::MGP_ERROR_NO_ERROR)
namespace {
struct MgpEdgeDeleter {
@ -193,7 +193,7 @@ TEST_F(MgpGraphTest, DetachDeleteVertex) {
EXPECT_EQ(CountVertices(read_uncommited_accessor, memgraph::storage::View::NEW), 2);
MgpVertexPtr vertex{EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &graph,
mgp_vertex_id{vertex_ids.front().AsInt()}, &memory)};
EXPECT_EQ(mgp_graph_delete_vertex(&graph, vertex.get()), MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(mgp_graph_delete_vertex(&graph, vertex.get()), mgp_error::MGP_ERROR_LOGIC_ERROR);
EXPECT_EQ(CountVertices(read_uncommited_accessor, memgraph::storage::View::NEW), 2);
EXPECT_SUCCESS(mgp_graph_detach_delete_vertex(&graph, vertex.get()));
EXPECT_EQ(CountVertices(read_uncommited_accessor, memgraph::storage::View::NEW), 1);
@ -212,14 +212,14 @@ TEST_F(MgpGraphTest, CreateDeleteWithImmutableGraph) {
mgp_graph immutable_graph = CreateGraph(memgraph::storage::View::OLD);
mgp_vertex *raw_vertex{nullptr};
EXPECT_EQ(mgp_graph_create_vertex(&immutable_graph, &memory, &raw_vertex), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_graph_create_vertex(&immutable_graph, &memory, &raw_vertex), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
MgpVertexPtr created_vertex{raw_vertex};
EXPECT_EQ(created_vertex, nullptr);
EXPECT_EQ(CountVertices(read_uncommited_accessor, memgraph::storage::View::NEW), 1);
MgpVertexPtr vertex_to_delete{EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &immutable_graph,
mgp_vertex_id{vertex_id.AsInt()}, &memory)};
ASSERT_NE(vertex_to_delete, nullptr);
EXPECT_EQ(mgp_graph_delete_vertex(&immutable_graph, vertex_to_delete.get()), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_graph_delete_vertex(&immutable_graph, vertex_to_delete.get()), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(CountVertices(read_uncommited_accessor, memgraph::storage::View::NEW), 1);
}
@ -398,10 +398,11 @@ TEST_F(MgpGraphTest, ModifyImmutableVertex) {
EXPECT_MGP_NO_ERROR(mgp_vertex *, mgp_graph_get_vertex_by_id, &graph, mgp_vertex_id{vertex_id.AsInt()}, &memory)};
EXPECT_EQ(EXPECT_MGP_NO_ERROR(int, mgp_vertex_underlying_graph_is_mutable, vertex.get()), 0);
EXPECT_EQ(mgp_vertex_add_label(vertex.get(), mgp_label{"label"}), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_vertex_remove_label(vertex.get(), mgp_label{label_to_remove.data()}), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_vertex_add_label(vertex.get(), mgp_label{"label"}), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_vertex_remove_label(vertex.get(), mgp_label{label_to_remove.data()}),
mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
MgpValuePtr value{EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_int, 4, &memory)};
EXPECT_EQ(mgp_vertex_set_property(vertex.get(), "property", value.get()), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_vertex_set_property(vertex.get(), "property", value.get()), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
}
TEST_F(MgpGraphTest, CreateDeleteEdge) {
@ -452,16 +453,16 @@ TEST_F(MgpGraphTest, CreateDeleteEdgeWithImmutableGraph) {
mgp_edge *edge{nullptr};
EXPECT_EQ(
mgp_graph_create_edge(&graph, from.get(), to.get(), mgp_edge_type{"NEWLY_CREATED_EDGE_TYPE"}, &memory, &edge),
MGP_ERROR_IMMUTABLE_OBJECT);
mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
CheckEdgeCountBetween(from, to, 1);
MgpEdgesIteratorPtr edges_it{
EXPECT_MGP_NO_ERROR(mgp_edges_iterator *, mgp_vertex_iter_out_edges, from.get(), &memory)};
auto *edge_from_it = EXPECT_MGP_NO_ERROR(mgp_edge *, mgp_edges_iterator_get, edges_it.get());
ASSERT_NE(edge_from_it, nullptr);
EXPECT_EQ(mgp_graph_delete_edge(&graph, edge_from_it), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_graph_delete_edge(&graph, edge_from_it), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
MgpEdgePtr edge_copy_of_immutable{EXPECT_MGP_NO_ERROR(mgp_edge *, mgp_edge_copy, edge_from_it, &memory)};
EXPECT_EQ(mgp_graph_delete_edge(&graph, edge_copy_of_immutable.get()), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_graph_delete_edge(&graph, edge_copy_of_immutable.get()), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
CheckEdgeCountBetween(from, to, 1);
}
@ -616,5 +617,5 @@ TEST_F(MgpGraphTest, EdgeSetPropertyWithImmutableGraph) {
ASSERT_NO_FATAL_FAILURE(GetFirstOutEdge(graph, from_vertex_id, edge));
MgpValuePtr value{EXPECT_MGP_NO_ERROR(mgp_value *, mgp_value_make_int, 65, &memory)};
EXPECT_EQ(EXPECT_MGP_NO_ERROR(int, mgp_edge_underlying_graph_is_mutable, edge.get()), 0);
EXPECT_EQ(mgp_edge_set_property(edge.get(), "property", value.get()), MGP_ERROR_IMMUTABLE_OBJECT);
EXPECT_EQ(mgp_edge_set_property(edge.get(), "property", value.get()), mgp_error::MGP_ERROR_IMMUTABLE_OBJECT);
}

View File

@ -29,7 +29,7 @@ TResult ExpectNoError(const char *file, int line, TFunc func, TArgs &&...args) {
static_assert(std::is_trivially_copyable_v<TFunc>);
static_assert((std::is_trivially_copyable_v<std::remove_reference_t<TArgs>> && ...));
TResult result{};
EXPECT_EQ(func(args..., &result), MGP_ERROR_NO_ERROR) << fmt::format("Source of error: {}:{}", file, line);
EXPECT_EQ(func(args..., &result), mgp_error::MGP_ERROR_NO_ERROR) << fmt::format("Source of error: {}:{}", file, line);
return result;
}
} // namespace test_utils