This commit is contained in:
János Benjamin Antal 2022-07-18 08:21:04 +02:00
commit c0bee760bf
71 changed files with 2296 additions and 543 deletions

View File

@ -3,7 +3,9 @@
- [ ] Update [changelog](https://docs.memgraph.com/memgraph/changelog)
- [ ] Write E2E tests
- [ ] Compare the [benchmarking results](https://bench-graph.memgraph.com/) between the master branch and the Epic branch
- [ ] Provide the full content or a guide for the final git message
[master < Task] PR
- [ ] Check, and update documentation if necessary
- [ ] Update [changelog](https://docs.memgraph.com/memgraph/changelog)
- [ ] Provide the full content or a guide for the final git message

View File

@ -1,4 +1,7 @@
name: Diff
concurrency:
group: ${{ github.head_ref || github.sha }}
cancel-in-progress: true
on:
push:

View File

@ -174,5 +174,5 @@ jobs:
- name: "Upload package"
uses: actions/upload-artifact@v2
with:
name: debian-11
path: build/output/debian-11/memgraph*.deb
name: debian-11-arm
path: build/output/debian-11-arm/memgraph*.deb

View File

@ -4,8 +4,12 @@ on:
workflow_dispatch:
inputs:
version:
description: "Memgraph binary version to publish on Dockerhub."
description: "Memgraph binary version to publish on DockerHub."
required: true
force_release:
type: boolean
required: false
default: false
jobs:
docker_publish:
@ -36,6 +40,22 @@ jobs:
curl -L https://download.memgraph.com/memgraph/v${{ github.event.inputs.version }}/debian-11/memgraph_${{ github.event.inputs.version }}-1_amd64.deb > memgraph-amd64.deb
curl -L https://download.memgraph.com/memgraph/v${{ github.event.inputs.version }}/debian-11-aarch64/memgraph_${{ github.event.inputs.version }}-1_arm64.deb > memgraph-arm64.deb
- name: Check if specified version is already pushed
run: |
EXISTS=$(docker manifest inspect $DOCKER_ORGANIZATION_NAME/$DOCKER_REPOSITORY_NAME:${{ github.event.inputs.version }} > /dev/null; echo $?)
echo $EXISTS
if [[ ${EXISTS} -eq 0 ]]; then
echo 'The specified version has been already released to DockerHub.'
if [[ ${{ github.event.inputs.force_release }} = true ]]; then
echo 'Forcing the release!'
else
echo 'Stopping the release!'
exit 1
fi
else
echo 'All good the specified version has not been release to DockerHub.'
fi
- name: Build & push docker images
run: |
cd release/docker

View File

@ -1 +0,0 @@
* @antaljanosbenjamin @kostasrim

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "centos-7"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc gcc-c++ make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "centos-9"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils-common gcc gcc-c++ make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "debian-10"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "debian-11"
check_architecture "arm64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "debian-11"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,8 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "todo-os-name"
TOOLCHAIN_BUILD_DEPS=(
pkg
)

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "ubuntu-18.04"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "ubuntu-20.04"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # used for archive download

View File

@ -5,6 +5,9 @@ set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
source "$DIR/../util.sh"
check_operating_system "ubuntu-22.04"
check_architecture "x86_64"
TOOLCHAIN_BUILD_DEPS=(
coreutils gcc g++ build-essential make # generic build tools
wget # used for archive download

View File

@ -5,10 +5,28 @@ operating_system() {
sort | cut -d '=' -f 2- | sed 's/"//g' | paste -s -d '-'
}
check_operating_system() {
if [ "$(operating_system)" != "$1" ]; then
echo "Not the right operating system!"
exit 1
else
echo "The right operating system."
fi
}
architecture() {
uname -m
}
check_architecture() {
if [ "$(architecture)" != "$1" ]; then
echo "Not the right architecture!"
exit 1
else
echo "The right architecture."
fi
}
check_all_yum() {
local missing=""
for pkg in $1; do

File diff suppressed because it is too large Load Diff

3
init
View File

@ -139,3 +139,6 @@ done;
# Install precommit hook
python3 -m pip install pre-commit
python3 -m pre_commit install
# Link `include/mgp.py` with `release/mgp/mgp.py`
ln -v -f include/mgp.py release/mgp/mgp.py

3
release/mgp/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.venv
dist
mgp.py

201
release/mgp/LICENSE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

4
release/mgp/README.md Normal file
View File

@ -0,0 +1,4 @@
# mgp
PyPI package used for type hinting when creating MAGE modules. To get started
with MAGE, check out the repository here: https://github.com/memgraph/mage.

255
release/mgp/_mgp.py Normal file
View File

@ -0,0 +1,255 @@
from typing import Any
class MgpIterable:
def get() -> Any:
pass
def next() -> Any:
pass
class Vertex:
def is_valid() -> bool: # type: ignore
pass
def underlying_graph_is_mutable() -> bool: # type: ignore
pass
def iter_properties() -> MgpIterable: # type: ignore
pass
def get_property(self, property_name: str) -> "Property": # type: ignore
pass
def set_property(self, property_name: str, value: Any) -> "Property": # type: ignore
pass
def get_id() -> "VertexId": # type: ignore
pass
def label_at(self, index: int) -> "Label": # type: ignore
pass
def labels_count() -> int: # type: ignore
pass
def add_label(self, label: Any):
pass
def remove_label(self, label: Any):
pass
def iter_in_edges() -> MgpIterable: # type: ignore
pass
def iter_out_edges() -> MgpIterable: # type: ignore
pass
class Edge:
def is_valid() -> bool: # type: ignore
pass
def underlying_graph_is_mutable() -> bool: # type: ignore
pass
def iter_properties() -> MgpIterable: # type: ignore
pass
def get_property(self, property_name: str) -> "Property": # type: ignore
pass
def set_property(self, property_name: str, valuse: Any) -> "Property": # type: ignore
pass
def get_type_name() -> str: # type: ignore
pass
def get_id() -> "EdgeId": # type: ignore
pass
def from_vertex() -> Vertex: # type: ignore
pass
def to_vertex() -> Vertex: # type: ignore
pass
class Path:
def is_valid() -> bool: # type: ignore
pass
@staticmethod
def make_with_start(vertex: Vertex) -> "Path": # type: ignore
pass
class Graph:
def is_valid() -> bool: # type: ignore
pass
class CypherType:
pass
class Message:
def is_valid() -> bool: # type: ignore
pass
def source_type() -> str: # type: ignore
pass
def topic_name() -> str: # type: ignore
pass
def key() -> bytes: # type: ignore
pass
def timestamp() -> int: # type: ignore
pass
def offset() -> int: # type: ignore
pass
def payload() -> bytes: # type: ignore
pass
class Messages:
def is_valid() -> bool: # type: ignore
pass
def message_at(self, id: int) -> Message: # type: ignore
pass
def total_messages() -> int: # type: ignore
pass
class UnknownError(Exception):
pass
class UnableToAllocateError(Exception):
pass
class InsufficientBufferError(Exception):
pass
class OutOfRangeError(Exception):
pass
class LogicErrorError(Exception):
pass
class DeletedObjectError(Exception):
pass
class InvalidArgumentError(Exception):
pass
class KeyAlreadyExistsError(Exception):
pass
class ImmutableObjectError(Exception):
pass
class ValueConversionError(Exception):
pass
class SerializationError(Exception):
pass
def type_nullable(elem: Any):
pass
def type_list(elem: Any):
pass
def type_bool():
pass
def type_string():
pass
def type_int():
pass
def type_float():
pass
def type_number():
pass
def type_map():
pass
def type_node():
pass
def type_relationship():
pass
def type_path():
pass
def type_date():
pass
def type_local_time():
pass
def type_local_date_time():
pass
def type_duration():
pass
def type_any():
pass
class _MODULE:
@staticmethod
def add_read_procedure(wrapper):
pass
@staticmethod
def add_write_procedure(wrapper):
pass
@staticmethod
def add_transformation(wrapper):
pass
@staticmethod
def add_function(wrapper):
pass

View File

@ -0,0 +1,22 @@
# How to publish new versions
## Prerequisites
1. Installed poetry
```
pip install poetry
```
2. Set up [API tokens](https://pypi.org/help/#apitoken)
3. Be a collaborator on [pypi](https://pypi.org/project/mgp/)
## Making changes
1. Make changes to the package
2. Bump version in `pyproject.toml`
3. `poetry build`
4. `poetry publish`
## Why is this not automated?
Because someone always has to manually bump up the version in `pyproject.toml`
## Why does `_mgp.py` exist?
Because it mocks all the types that are created by Memgraph at runtime,
in order to fix typing errors in `mgp.py`.

View File

@ -0,0 +1,23 @@
[tool.poetry]
name = "mgp"
version = "1.0.0"
description = "Memgraph's module for developing MAGE modules. Used only for type hinting!"
authors = [
"MasterMedo <mislav.vuletic@gmail.com>",
"jbajic <jure.bajic@memgraph.io>",
"katarinasupe <katarina.supe@memgraph.io>",
"antejavor <ante.javor@memgraph.io>",
"antaljanosbenjamin <benjamin.antal@memgraph.io>",
]
license = "Apache-2.0"
readme = "README.md"
include = ["mgp.py", "_mgp.py"]
[tool.poetry.dependencies]
python = "^3.7"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

View File

@ -216,6 +216,11 @@ DEFINE_bool(telemetry_enabled, false,
"the database runtime (vertex and edge counts and resource usage) "
"to allow for easier improvement of the product.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_restore_replicas_on_startup, true,
"Controls replicas should be restored automatically."); // TODO(42jeremy) this must be removed once T0835
// is implemented.
// Streams flags
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(
@ -1195,7 +1200,8 @@ int main(int argc, char **argv) {
.snapshot_retention_count = FLAGS_storage_snapshot_retention_count,
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit},
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit,
.restore_replicas_on_startup = FLAGS_storage_restore_replicas_on_startup},
.transaction = {.isolation_level = ParseIsolationLevel()}};
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {

View File

@ -2375,10 +2375,7 @@ cpp<#
(port "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(sync_mode "SyncMode" :scope :public)
(timeout "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))
(sync_mode "SyncMode" :scope :public))
(:public
(lcp:define-enum action

View File

@ -275,18 +275,7 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe
replication_query->replica_name_ = ctx->replicaName()->symbolicName()->accept(this).as<std::string>();
if (ctx->SYNC()) {
replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::SYNC;
if (ctx->WITH() && ctx->TIMEOUT()) {
if (ctx->timeout->numberLiteral()) {
// we accept both double and integer literals
replication_query->timeout_ = ctx->timeout->accept(this);
} else {
throw SemanticException("Timeout should be a integer or double literal!");
}
}
} else if (ctx->ASYNC()) {
if (ctx->WITH() && ctx->TIMEOUT()) {
throw SyntaxException("Timeout can be set only for the SYNC replication mode!");
}
replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::ASYNC;
}

View File

@ -276,7 +276,6 @@ replicaName : symbolicName ;
socketAddress : literal ;
registerReplica : REGISTER REPLICA replicaName ( SYNC | ASYNC )
( WITH TIMEOUT timeout=literal ) ?
TO socketAddress ;
dropReplica : DROP REPLICA replicaName ;

View File

@ -882,21 +882,36 @@ TypedValue Id(const TypedValue *args, int64_t nargs, const FunctionContext &ctx)
}
TypedValue ToString(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {
FType<Or<Null, String, Number, Bool>>("toString", args, nargs);
FType<Or<Null, String, Number, Date, LocalTime, LocalDateTime, Duration, Bool>>("toString", args, nargs);
const auto &arg = args[0];
if (arg.IsNull()) {
return TypedValue(ctx.memory);
} else if (arg.IsString()) {
}
if (arg.IsString()) {
return TypedValue(arg, ctx.memory);
} else if (arg.IsInt()) {
}
if (arg.IsInt()) {
// TODO: This is making a pointless copy of std::string, we may want to
// use a different conversion to string
return TypedValue(std::to_string(arg.ValueInt()), ctx.memory);
} else if (arg.IsDouble()) {
return TypedValue(std::to_string(arg.ValueDouble()), ctx.memory);
} else {
return TypedValue(arg.ValueBool() ? "true" : "false", ctx.memory);
}
if (arg.IsDouble()) {
return TypedValue(std::to_string(arg.ValueDouble()), ctx.memory);
}
if (arg.IsDate()) {
return TypedValue(arg.ValueDate().ToString(), ctx.memory);
}
if (arg.IsLocalTime()) {
return TypedValue(arg.ValueLocalTime().ToString(), ctx.memory);
}
if (arg.IsLocalDateTime()) {
return TypedValue(arg.ValueLocalDateTime().ToString(), ctx.memory);
}
if (arg.IsDuration()) {
return TypedValue(arg.ValueDuration().ToString(), ctx.memory);
}
return TypedValue(arg.ValueBool() ? "true" : "false", ctx.memory);
}
TypedValue Timestamp(const TypedValue *args, int64_t nargs, const FunctionContext &ctx) {

View File

@ -111,7 +111,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
TypedValue Visit(IfOperator &if_operator) override {
auto condition = if_operator.condition_->Accept(*this);
if (condition.IsNull()) {
return if_operator.then_expression_->Accept(*this);
return if_operator.else_expression_->Accept(*this);
}
if (condition.type() != TypedValue::Type::Bool) {
// At the moment IfOperator is used only in CASE construct.

View File

@ -44,6 +44,7 @@
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/enums.hpp"
#include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
@ -160,7 +161,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
const ReplicationQuery::SyncMode sync_mode,
const std::chrono::seconds replica_check_frequency) override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
// replica can't register another replica
@ -183,9 +184,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret = db_->RegisterReplica(
name, {std::move(ip), port}, repl_mode,
{.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
{.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
}
@ -228,9 +229,6 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
break;
}
if (repl_info.timeout) {
replica.timeout = *repl_info.timeout;
}
replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica;
replica.current_number_of_timestamp_behind_master =
@ -487,18 +485,11 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
const auto &name = repl_query->replica_name_;
const auto &sync_mode = repl_query->sync_mode_;
auto socket_address = repl_query->socket_address_->Accept(evaluator);
auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency;
std::optional<double> maybe_timeout;
if (timeout.IsDouble()) {
maybe_timeout = timeout.ValueDouble();
} else if (timeout.IsInt()) {
maybe_timeout = static_cast<double>(timeout.ValueInt());
}
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
maybe_timeout, replica_check_frequency]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
replica_check_frequency);
replica_check_frequency]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, replica_check_frequency);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
@ -518,13 +509,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
callback.header = {"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state"};
callback.header = {
"name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master",
"state"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
const auto &replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
@ -545,12 +532,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
break;
}
if (replica.timeout) {
typed_replica.emplace_back(TypedValue(*replica.timeout));
} else {
typed_replica.emplace_back(TypedValue());
}
typed_replica.emplace_back(TypedValue(static_cast<int64_t>(replica.current_timestamp_of_replica)));
typed_replica.emplace_back(
TypedValue(static_cast<int64_t>(replica.current_number_of_timestamp_behind_master)));

View File

@ -140,7 +140,7 @@ class ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
ReplicationQuery::SyncMode sync_mode,
const std::chrono::seconds replica_check_frequency) = 0;
/// @throw QueryRuntimeException if an error ocurred.

View File

@ -619,10 +619,10 @@ void Streams::Drop(const std::string &stream_name) {
// no running Test function for this consumer, therefore it can be erased.
std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second);
locked_streams->erase(it);
if (!storage_.Delete(stream_name)) {
throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name);
}
locked_streams->erase(it);
// TODO(antaljanosbenjamin) Release the transformation
}

View File

@ -188,7 +188,7 @@ class Streams final {
void Persist(StreamStatus<TStream> &&status) {
const std::string stream_name = status.name;
if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) {
throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name};
throw StreamsException{"Couldn't persist stream data for stream '{}'", stream_name};
}
}

View File

@ -13,7 +13,6 @@ set(storage_v2_src_files
storage.cpp)
##### Replication #####
define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files)
add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE)
@ -26,10 +25,10 @@ set(storage_v2_src_files
replication/replication_server.cpp
replication/serialization.cpp
replication/slk.cpp
replication/replication_persistence_helper.cpp
${lcp_storage_cpp_files})
#######################
find_package(gflags REQUIRED)
find_package(Threads REQUIRED)

View File

@ -49,7 +49,7 @@ struct Config {
uint64_t wal_file_flush_every_n_tx{100000};
bool snapshot_on_exit{false};
bool restore_replicas_on_startup{false};
} durability;
struct Transaction {

View File

@ -22,6 +22,7 @@ static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
static const std::string kLockFile{".lock"};
static const std::string kReplicationDirectory{"replication"};
// This is the prefix used for Snapshot and WAL filenames. It is a timestamp
// format that equals to: YYYYmmddHHMMSSffffff

View File

@ -10,12 +10,13 @@
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <optional>
#include <string>
namespace memgraph::storage::replication {
struct ReplicationClientConfig {
std::optional<double> timeout;
// The default delay between main checking/pinging replicas is 1s because
// that seems like a reasonable timeframe in which main should notice a
// replica is down.
@ -24,6 +25,8 @@ struct ReplicationClientConfig {
struct SSL {
std::string key_file = "";
std::string cert_file = "";
friend bool operator==(const SSL &, const SSL &) = default;
};
std::optional<SSL> ssl;

View File

@ -16,4 +16,6 @@ namespace memgraph::storage::replication {
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID };
enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, CAN_BE_INVALID };
} // namespace memgraph::storage::replication

View File

@ -43,11 +43,6 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
rpc_client_.emplace(endpoint, &*rpc_context_);
TryInitializeClientSync();
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
// Help the user to get the most accurate replica state possible.
if (config.replica_check_frequency > std::chrono::seconds(0)) {
replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); });
@ -238,41 +233,6 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
if (mode_ == replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask([this] { this->FinalizeTransactionReplicationInternal(); });
} else if (timeout_) {
MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout.");
MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing");
timeout_dispatcher_->WaitForTaskToFinish();
timeout_dispatcher_->active = true;
thread_pool_.AddTask([&, this] {
this->FinalizeTransactionReplicationInternal();
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
// TimerThread can finish waiting for timeout
timeout_dispatcher_->active = false;
// Notify the main thread
timeout_dispatcher_->main_cv.notify_one();
});
timeout_dispatcher_->StartTimeoutTask(*timeout_);
// Wait until one of the threads notifies us that they finished executing
// Both threads should first set the active flag to false
{
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
timeout_dispatcher_->main_cv.wait(main_guard, [&] { return !timeout_dispatcher_->active.load(); });
}
// TODO (antonio2368): Document and/or polish SEMI-SYNC to ASYNC fallback.
if (replica_state_ == replication::ReplicaState::REPLICATING) {
mode_ = replication::ReplicationMode::ASYNC;
timeout_.reset();
// This can only happen if we timeouted so we are sure that
// Timeout task finished
// We need to delete timeout dispatcher AFTER the replication
// finished because it tries to acquire the timeout lock
// and acces the `active` variable`
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
}
} else {
FinalizeTransactionReplicationInternal();
}
@ -558,36 +518,13 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't block
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't
// block
}
return info;
}
////// TimeoutDispatcher //////
void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
// Wait for the previous timeout task to finish
std::unique_lock main_guard(main_lock);
main_cv.wait(main_guard, [&] { return finished; });
}
void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const double timeout) {
timeout_pool.AddTask([timeout, this] {
finished = false;
using std::chrono::steady_clock;
const auto timeout_duration =
std::chrono::duration_cast<steady_clock::duration>(std::chrono::duration<double>(timeout));
const auto end_time = steady_clock::now() + timeout_duration;
while (active && (steady_clock::now() < end_time)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::unique_lock main_guard(main_lock);
finished = true;
active = false;
main_cv.notify_one();
});
}
////// ReplicaStream //////
Storage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
const uint64_t previous_commit_timestamp,

View File

@ -120,8 +120,6 @@ class Storage::ReplicationClient {
auto Mode() const { return mode_; }
auto Timeout() const { return timeout_; }
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
Storage::TimestampInfo GetTimestampInfo();
@ -158,30 +156,6 @@ class Storage::ReplicationClient {
std::optional<ReplicaStream> replica_stream_;
replication::ReplicationMode mode_{replication::ReplicationMode::SYNC};
// Dispatcher class for timeout tasks
struct TimeoutDispatcher {
explicit TimeoutDispatcher(){};
void WaitForTaskToFinish();
void StartTimeoutTask(double timeout);
// If the Timeout task should continue waiting
std::atomic<bool> active{false};
std::mutex main_lock;
std::condition_variable main_cv;
private:
// if the Timeout task finished executing
bool finished{true};
utils::ThreadPool timeout_pool{1};
};
std::optional<double> timeout_;
std::optional<TimeoutDispatcher> timeout_dispatcher_;
utils::SpinLock client_lock_;
// This thread pool is used for background tasks so we don't
// block the main storage thread

View File

@ -0,0 +1,83 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "utils/logging.hpp"
namespace {
// JSON field names under which a replica's configuration is persisted in the
// key-value store (written by ReplicaStatusToJSON, read by JSONToReplicaStatus).
const std::string kReplicaName = "replica_name";
const std::string kIpAddress = "replica_ip_address";
const std::string kPort = "replica_port";
const std::string kSyncMode = "replica_sync_mode";
const std::string kCheckFrequency = "replica_check_frequency";
const std::string kSSLKeyFile = "replica_ssl_key_file";
const std::string kSSLCertFile = "replica_ssl_cert_file";
}  // namespace
namespace memgraph::storage::replication {
// Serializes a ReplicaStatus into a JSON object keyed by the field names
// declared above. The SSL entries are written as JSON null when no SSL
// configuration is present, so the resulting object always has the same keys.
// String members are moved out of `status`, which is consumed by this call.
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) {
  nlohmann::json result = nlohmann::json::object();
  result[kReplicaName] = std::move(status.name);
  result[kIpAddress] = std::move(status.ip_address);
  result[kPort] = status.port;
  result[kSyncMode] = status.sync_mode;
  result[kCheckFrequency] = status.replica_check_frequency.count();
  const bool has_ssl = status.ssl.has_value();
  result[kSSLKeyFile] = has_ssl ? nlohmann::json(std::move(status.ssl->key_file)) : nlohmann::json(nullptr);
  result[kSSLCertFile] = has_ssl ? nlohmann::json(std::move(status.ssl->cert_file)) : nlohmann::json(nullptr);
  return result;
}
// Deserializes a ReplicaStatus previously produced by ReplicaStatusToJSON.
// Returns std::nullopt (after logging an error) when a field is missing or has
// the wrong JSON type; only the nlohmann exceptions handled below can occur.
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data) {
  ReplicaStatus status;
  const auto make_error = [](const std::string_view message, const std::string_view nested_message) {
    return fmt::format("Failed to deserialize replica's configuration: {} : {}", message, nested_message);
  };
  try {
    data.at(kReplicaName).get_to(status.name);
    data.at(kIpAddress).get_to(status.ip_address);
    data.at(kPort).get_to(status.port);
    data.at(kSyncMode).get_to(status.sync_mode);
    status.replica_check_frequency = std::chrono::seconds(data.at(kCheckFrequency));
    const auto &key_file = data.at(kSSLKeyFile);
    const auto &cert_file = data.at(kSSLCertFile);
    // The serializer writes both SSL fields together, so they must be null
    // together; anything else means the stored data was tampered with.
    MG_ASSERT(key_file.is_null() == cert_file.is_null());
    if (!key_file.is_null()) {
      status.ssl = replication::ReplicationClientConfig::SSL{};
      key_file.get_to(status.ssl->key_file);
      cert_file.get_to(status.ssl->cert_file);
    }
  } catch (const nlohmann::json::type_error &exception) {
    spdlog::error(make_error("Invalid type conversion", exception.what()));
    return std::nullopt;
  } catch (const nlohmann::json::out_of_range &exception) {
    spdlog::error(make_error("Non existing field", exception.what()));
    return std::nullopt;
  }
  return status;
}
} // namespace memgraph::storage::replication

View File

@ -0,0 +1,40 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <compare>
#include <optional>
#include <string>
#include <json/json.hpp>
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
namespace memgraph::storage::replication {
// In-memory form of a replica's persisted configuration. Converted to/from
// JSON by ReplicaStatusToJSON / JSONToReplicaStatus so registered replicas
// can be restored after a server restart.
struct ReplicaStatus {
  std::string name;
  std::string ip_address;
  uint16_t port;
  ReplicationMode sync_mode;
  // How often the main instance checks/pings this replica.
  std::chrono::seconds replica_check_frequency;
  // Engaged only when the replication client communicates over SSL.
  std::optional<ReplicationClientConfig::SSL> ssl;
  friend bool operator==(const ReplicaStatus &, const ReplicaStatus &) = default;
};
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status);
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data);
} // namespace memgraph::storage::replication

View File

@ -17,6 +17,7 @@
#include <variant>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include "io/network/endpoint.hpp"
#include "storage/v2/durability/durability.hpp"
@ -28,6 +29,8 @@
#include "storage/v2/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp"
@ -50,6 +53,19 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace {
inline constexpr uint16_t kEpochHistoryRetention = 1000;
// Maps a RegisterReplicaError enumerator to its name, for use in log/error
// messages (see Storage::RestoreReplicas).
std::string RegisterReplicaErrorToString(Storage::RegisterReplicaError error) {
  switch (error) {
    case Storage::RegisterReplicaError::NAME_EXISTS:
      return "NAME_EXISTS";
    case Storage::RegisterReplicaError::END_POINT_EXISTS:
      return "END_POINT_EXISTS";
    case Storage::RegisterReplicaError::CONNECTION_FAILED:
      return "CONNECTION_FAILED";
    case Storage::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
      return "COULD_NOT_BE_PERSISTED";
  }
  // Every enumerator is handled above; this is reachable only if an invalid
  // value was cast to the enum. Returning a sentinel avoids undefined
  // behavior from falling off the end of a value-returning function
  // (-Wreturn-type).
  return "UNKNOWN";
}
} // namespace
auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipList<Vertex>::Iterator end,
@ -400,6 +416,16 @@ Storage::Storage(Config config)
} else {
commit_log_.emplace(timestamp_);
}
if (config_.durability.restore_replicas_on_startup) {
spdlog::info("Replica's configuration will be stored and will be automatically restored in case of a crash.");
utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory);
storage_ =
std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory);
RestoreReplicas();
} else {
spdlog::warn("Replicas' configuration will NOT be stored. When the server restarts, replicas will be forgotten.");
}
}
Storage::~Storage() {
@ -1882,7 +1908,7 @@ bool Storage::SetMainReplicationRole() {
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config) {
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
@ -1902,12 +1928,28 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
return RegisterReplicaError::END_POINT_EXISTS;
}
MG_ASSERT(replication_mode == replication::ReplicationMode::SYNC || !config.timeout,
"Only SYNC mode can have a timeout set");
if (ShouldStoreAndRestoreReplicas()) {
auto data = replication::ReplicaStatusToJSON(
replication::ReplicaStatus{.name = name,
.ip_address = endpoint.address,
.port = endpoint.port,
.sync_mode = replication_mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl});
if (!storage_->Put(name, data.dump())) {
spdlog::error("Error when saving replica {} in settings.", name);
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
}
}
auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config);
if (client->State() == replication::ReplicaState::INVALID) {
return RegisterReplicaError::CONNECTION_FAILED;
if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) {
return RegisterReplicaError::CONNECTION_FAILED;
}
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
}
return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<Storage::RegisterReplicaError> {
@ -1928,8 +1970,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
});
}
bool Storage::UnregisterReplica(const std::string_view name) {
bool Storage::UnregisterReplica(const std::string &name) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicas()) {
if (!storage_->Delete(name)) {
spdlog::error("Error when removing replica {} from settings.", name);
return false;
}
}
return replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; });
});
@ -1952,11 +2001,10 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) {
std::vector<Storage::ReplicaInfo> replica_info;
replica_info.reserve(clients.size());
std::transform(clients.begin(), clients.end(), std::back_inserter(replica_info),
[](const auto &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Timeout(),
client->Endpoint(), client->State(), client->GetTimestampInfo()};
});
std::transform(
clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()};
});
return replica_info;
});
}
@ -1966,4 +2014,41 @@ void Storage::SetIsolationLevel(IsolationLevel isolation_level) {
isolation_level_ = isolation_level;
}
// Re-registers every replica whose configuration was persisted in `storage_`.
// Runs during startup (only on a MAIN instance); an unparsable entry or any
// registration failure other than a connection failure aborts the process.
void Storage::RestoreReplicas() {
  MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
  if (!ShouldStoreAndRestoreReplicas()) {
    return;
  }
  spdlog::info("Restoring replicas.");
  for (const auto &[replica_name, replica_data] : *storage_) {
    spdlog::info("Restoring replica {}.", replica_name);
    const auto maybe_replica_status = replication::JSONToReplicaStatus(nlohmann::json::parse(replica_data));
    if (!maybe_replica_status.has_value()) {
      LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
    }
    auto replica_status = *maybe_replica_status;
    // The KV-store key and the serialized name must agree; a mismatch means
    // the persisted data is corrupted.
    MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name,
              replica_name);
    // CAN_BE_INVALID: a replica that is down at startup is still registered so
    // it can reconnect later, instead of failing the whole restore.
    auto ret =
        RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port},
                        replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID,
                        {
                            .replica_check_frequency = replica_status.replica_check_frequency,
                            .ssl = replica_status.ssl,
                        });
    if (ret.HasError()) {
      // CONNECTION_FAILED is tolerated by CAN_BE_INVALID above, so any error
      // surviving to this point is unrecoverable.
      MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError());
      LOG_FATAL("Failure when restoring replica {}: {}.", replica_name, RegisterReplicaErrorToString(ret.GetError()));
    }
    spdlog::info("Replica {} restored.", replica_name);
  }
}
bool Storage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; }
} // namespace memgraph::storage

View File

@ -18,6 +18,7 @@
#include <variant>
#include "io/network/endpoint.hpp"
#include "kvstore/kvstore.hpp"
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
@ -411,15 +412,20 @@ class Storage final {
bool SetMainReplicationRole();
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED };
enum class RegisterReplicaError : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
CONNECTION_FAILED,
COULD_NOT_BE_PERSISTED
};
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(
std::string name, io::network::Endpoint endpoint, replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config = {});
replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config = {});
/// @pre The instance should have a MAIN role
bool UnregisterReplica(std::string_view name);
bool UnregisterReplica(const std::string &name);
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
@ -433,7 +439,6 @@ class Storage final {
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
std::optional<double> timeout;
io::network::Endpoint endpoint;
replication::ReplicaState state;
TimestampInfo timestamp_info;
@ -475,6 +480,10 @@ class Storage final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void RestoreReplicas();
bool ShouldStoreAndRestoreReplicas() const;
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
@ -535,6 +544,7 @@ class Storage final {
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
std::unique_ptr<kvstore::KVStore> storage_;
utils::Scheduler snapshot_runner_;
utils::SpinLock snapshot_lock_;

View File

@ -15,6 +15,7 @@
#include <chrono>
#include <ctime>
#include <limits>
#include <string>
#include <string_view>
#include "utils/exceptions.hpp"
@ -175,6 +176,10 @@ int64_t Date::MicrosecondsSinceEpoch() const {
int64_t Date::DaysSinceEpoch() const { return utils::DaysSinceEpoch(year, month, day).count(); }
// Formats the date as ISO-8601 "YYYY-MM-DD" with zero-padded fields.
std::string Date::ToString() const {
  return fmt::format("{:0>4}-{:0>2}-{:0>2}", year, static_cast<int>(month), static_cast<int>(day));
}
size_t DateHash::operator()(const Date &date) const {
utils::HashCombine<uint64_t, uint64_t> hasher;
size_t result = hasher(0, date.year);
@ -377,6 +382,15 @@ int64_t LocalTime::NanosecondsSinceEpoch() const {
return chrono::duration_cast<chrono::nanoseconds>(SumLocalTimeParts()).count();
}
// Formats the time as "HH:MM:SS.uuuuuu"; the fractional part merges the
// millisecond and microsecond fields into one zero-padded microsecond count.
std::string LocalTime::ToString() const {
  using milli = std::chrono::milliseconds;
  using micro = std::chrono::microseconds;
  const auto subseconds = milli(millisecond) + micro(microsecond);
  return fmt::format("{:0>2}:{:0>2}:{:0>2}.{:0>6}", static_cast<int>(hour), static_cast<int>(minute),
                     static_cast<int>(second), subseconds.count());
}
size_t LocalTimeHash::operator()(const LocalTime &local_time) const {
utils::HashCombine<uint64_t, uint64_t> hasher;
size_t result = hasher(0, local_time.hour);
@ -486,6 +500,8 @@ int64_t LocalDateTime::SubSecondsAsNanoseconds() const {
return (milli_as_nanos + micros_as_nanos).count();
}
std::string LocalDateTime::ToString() const { return date.ToString() + 'T' + local_time.ToString(); }
LocalDateTime::LocalDateTime(const DateParameters &date_parameters, const LocalTimeParameters &local_time_parameters)
: date(date_parameters), local_time(local_time_parameters) {}
@ -699,6 +715,23 @@ int64_t Duration::SubSecondsAsNanoseconds() const {
return chrono::duration_cast<chrono::nanoseconds>(micros - secs).count();
}
// Formats the duration in an ISO-8601-like form "PnDTnHnMn.nnnnnnS".
std::string Duration::ToString() const {
  // Format [nD]T[nH]:[nM]:[nS].
  namespace chrono = std::chrono;
  auto micros = chrono::microseconds(microseconds);
  const auto dd = GetAndSubtractDuration<chrono::days>(micros);
  const auto h = GetAndSubtractDuration<chrono::hours>(micros);
  const auto m = GetAndSubtractDuration<chrono::minutes>(micros);
  const auto s = GetAndSubtractDuration<chrono::seconds>(micros);
  auto first_half = fmt::format("P{}DT{}H{}M", dd, h, m);
  auto second_half = fmt::format("{}.{:0>6}S", s, std::abs(micros.count()));
  // When the whole-seconds part is 0 but a negative sub-second remainder is
  // left, "{}" prints s as plain "0" and the sign would be lost, so the minus
  // is inserted explicitly between the two halves.
  if (s == 0 && micros.count() < 0) {
    return first_half + '-' + second_half;
  }
  return first_half + second_half;
}
Duration Duration::operator-() const {
if (microseconds == std::numeric_limits<decltype(microseconds)>::min()) [[unlikely]] {
throw temporal::InvalidArgumentException("Duration arithmetic overflows");

View File

@ -87,20 +87,9 @@ struct Duration {
int64_t SubDaysAsNanoseconds() const;
int64_t SubSecondsAsNanoseconds() const;
friend std::ostream &operator<<(std::ostream &os, const Duration &dur) {
// Format [nD]T[nH]:[nM]:[nS].
namespace chrono = std::chrono;
auto micros = chrono::microseconds(dur.microseconds);
const auto dd = GetAndSubtractDuration<chrono::days>(micros);
const auto h = GetAndSubtractDuration<chrono::hours>(micros);
const auto m = GetAndSubtractDuration<chrono::minutes>(micros);
const auto s = GetAndSubtractDuration<chrono::seconds>(micros);
os << fmt::format("P{}DT{}H{}M", dd, h, m);
if (s == 0 && micros.count() < 0) {
os << '-';
}
return os << fmt::format("{}.{:0>6}S", s, std::abs(micros.count()));
}
std::string ToString() const;
friend std::ostream &operator<<(std::ostream &os, const Duration &dur) { return os << dur.ToString(); }
Duration operator-() const;
@ -155,13 +144,11 @@ struct Date {
explicit Date(int64_t microseconds);
explicit Date(const DateParameters &date_parameters);
friend std::ostream &operator<<(std::ostream &os, const Date &date) {
return os << fmt::format("{:0>2}-{:0>2}-{:0>2}", date.year, static_cast<int>(date.month),
static_cast<int>(date.day));
}
friend std::ostream &operator<<(std::ostream &os, const Date &date) { return os << date.ToString(); }
int64_t MicrosecondsSinceEpoch() const;
int64_t DaysSinceEpoch() const;
std::string ToString() const;
friend Date operator+(const Date &date, const Duration &dur) {
namespace chrono = std::chrono;
@ -217,17 +204,11 @@ struct LocalTime {
// Epoch means the start of the day, i,e, midnight
int64_t MicrosecondsSinceEpoch() const;
int64_t NanosecondsSinceEpoch() const;
std::string ToString() const;
auto operator<=>(const LocalTime &) const = default;
friend std::ostream &operator<<(std::ostream &os, const LocalTime &lt) {
namespace chrono = std::chrono;
using milli = chrono::milliseconds;
using micro = chrono::microseconds;
const auto subseconds = milli(lt.millisecond) + micro(lt.microsecond);
return os << fmt::format("{:0>2}:{:0>2}:{:0>2}.{:0>6}", static_cast<int>(lt.hour), static_cast<int>(lt.minute),
static_cast<int>(lt.second), subseconds.count());
}
friend std::ostream &operator<<(std::ostream &os, const LocalTime &lt) { return os << lt.ToString(); }
friend LocalTime operator+(const LocalTime &local_time, const Duration &dur) {
namespace chrono = std::chrono;
@ -279,13 +260,11 @@ struct LocalDateTime {
int64_t MicrosecondsSinceEpoch() const;
int64_t SecondsSinceEpoch() const; // seconds since epoch
int64_t SubSecondsAsNanoseconds() const;
std::string ToString() const;
auto operator<=>(const LocalDateTime &) const = default;
friend std::ostream &operator<<(std::ostream &os, const LocalDateTime &ldt) {
os << ldt.date << 'T' << ldt.local_time;
return os;
}
friend std::ostream &operator<<(std::ostream &os, const LocalDateTime &ldt) { return os << ldt.ToString(); }
friend LocalDateTime operator+(const LocalDateTime &dt, const Duration &dur) {
const auto local_date_time_as_duration = Duration(dt.MicrosecondsSinceEpoch());

View File

@ -36,12 +36,14 @@ import os
import subprocess
from argparse import ArgumentParser
from pathlib import Path
import tempfile
import time
import sys
from inspect import signature
import yaml
from memgraph import MemgraphInstanceRunner
from memgraph import extract_bolt_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
@ -66,7 +68,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica2 SYNC TO '127.0.0.1:10002'",
],
},
}
@ -95,11 +97,25 @@ def load_args():
return parser.parse_args()
def _start_instance(name, args, log_file, queries, use_ssl, procdir):
def is_port_in_use(port: int) -> bool:
    """Return True if something is accepting TCP connections on localhost:port."""
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        connected = probe.connect_ex(("localhost", port)) == 0
    return connected
def _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory):
assert (
name not in MEMGRAPH_INSTANCES.keys()
), "If this raises, you are trying to start an instance with the same name than one already running."
assert not is_port_in_use(
extract_bolt_port(args)
), "If this raises, you are trying to start an instance on a port already used by one already running instance."
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
MEMGRAPH_INSTANCES[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
binary_args = args + ["--log-file", log_file_path]
data_directory_path = os.path.join(BUILD_DIR, data_directory)
binary_args = args + ["--log-file", log_file_path] + ["--data-directory", data_directory_path]
if len(procdir) != 0:
binary_args.append("--query-modules-directory=" + procdir)
@ -108,12 +124,13 @@ def _start_instance(name, args, log_file, queries, use_ssl, procdir):
for query in queries:
mg_instance.query(query)
return mg_instance
assert mg_instance.is_running(), "An error occured after starting Memgraph instance: application stopped running."
def stop_all():
    """Stop every tracked Memgraph instance, then forget all of them."""
    for instance in MEMGRAPH_INSTANCES.values():
        instance.stop()
    MEMGRAPH_INSTANCES.clear()
def stop_instance(context, name):
@ -121,6 +138,7 @@ def stop_instance(context, name):
if key != name:
continue
MEMGRAPH_INSTANCES[name].stop()
MEMGRAPH_INSTANCES.pop(name)
def stop(context, name):
@ -131,6 +149,14 @@ def stop(context, name):
stop_all()
def kill(context, name):
    """Forcefully kill the instance called `name` (a no-op if it is not in `context`).

    Removes the killed instance from the global registry so subsequent
    stop_all()/start calls do not touch a dead process.
    """
    # Dict membership instead of a manual scan with `continue`; keys are
    # unique, so at most one instance can match.
    if name in context:
        MEMGRAPH_INSTANCES[name].kill()
        MEMGRAPH_INSTANCES.pop(name)
@atexit.register
def cleanup():
    # Ensure no Memgraph instance outlives the test process, even if the test
    # fails partway through.
    stop_all()
@ -151,28 +177,30 @@ def start_instance(context, name, procdir):
if "ssl" in value:
use_ssl = bool(value["ssl"])
value.pop("ssl")
data_directory = ""
if "data_directory" in value:
data_directory = value["data_directory"]
else:
data_directory = tempfile.TemporaryDirectory().name
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir)
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory)
mg_instances[name] = instance
assert len(mg_instances) == 1
return mg_instances
def start_all(context, procdir=""):
mg_instances = {}
stop_all()
for key, _ in context.items():
mg_instances.update(start_instance(context, key, procdir))
return mg_instances
start_instance(context, key, procdir)
def start(context, name, procdir=""):
if name != "all":
return start_instance(context, name, procdir)
start_instance(context, name, procdir)
return
return start_all(context)
start_all(context)
def info(context):

View File

@ -76,11 +76,8 @@ class MemgraphInstanceRunner:
self.stop()
self.args = copy.deepcopy(args)
self.args = [replace_paths(arg) for arg in self.args]
self.data_directory = tempfile.TemporaryDirectory()
args_mg = [
self.binary_path,
"--data-directory",
self.data_directory.name,
"--storage-wal-enabled",
"--storage-snapshot-interval-sec",
"300",

16
tests/e2e/mg_utils.py Normal file
View File

@ -0,0 +1,16 @@
import time
def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.05):
    """Poll `function_to_retrieve_data` until it returns `expected_value`.

    Retries every `time_between_attempt` seconds and raises AssertionError if
    the expected value is not observed within `max_duration` seconds.
    Returns the (matching) last result.
    """
    # time.monotonic is immune to wall-clock adjustments (NTP, DST), unlike
    # time.time, so the timeout cannot be skewed mid-test.
    start_time = time.monotonic()
    result = function_to_retrieve_data()
    while result != expected_value:
        if time.monotonic() - start_time > max_duration:
            # Include expected/actual so a CI failure is diagnosable from the log.
            assert False, (
                f"mg_sleep_and_assert timed out after {max_duration}s: "
                f"expected {expected_value!r}, last result was {result!r}"
            )
        time.sleep(time_between_attempt)
        result = function_to_retrieve_data()
    return result

View File

@ -5,7 +5,7 @@ monitoring_port: &monitoring_port "7444"
template_cluster: &template_cluster
cluster:
monitoring:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
args: ["--bolt-port=7687", "--log-level=TRACE"]
log_file: "monitoring-websocket-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
@ -21,7 +21,6 @@ template_cluster_ssl: &template_cluster_ssl
*cert_file,
"--bolt-key-file",
*key_file,
"--",
]
log_file: "monitoring-websocket-ssl-e2e.log"
ssl: true

View File

@ -12,3 +12,4 @@ copy_e2e_python_files(replication_show show.py)
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py)

View File

@ -15,6 +15,7 @@ import pytest
import time
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
@pytest.mark.parametrize(
@ -36,20 +37,19 @@ def test_show_replicas(connection):
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
def test_show_replicas_while_inserting_data(connection):
@ -68,43 +68,43 @@ def test_show_replicas_while_inserting_data(connection):
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
time.sleep(1)
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 4, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 4, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 4, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
print("actual_data=" + str(actual_data))
print("expected_data=" + str(expected_data))
assert expected_data == actual_data
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 3/
res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
assert 1 == len(res)
assert len(res) == 1
# 4/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert expected_data == actual_data
assert actual_data == expected_data
if __name__ == "__main__":

View File

@ -11,13 +11,14 @@
import sys
import atexit
import os
import pytest
import time
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
import interactive_mg_runner
import mgclient
import tempfile
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -51,8 +52,8 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
"REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
],
@ -68,11 +69,7 @@ def test_show_replicas(connection):
# 3/ We kill another replica. It should become invalid in the SHOW REPLICAS command.
# 0/
atexit.register(
interactive_mg_runner.stop_all
) # Needed in case the test fails due to an assert. One still want the instances to be stoped.
mg_instances = interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7687, "main").cursor()
@ -82,47 +79,328 @@ def test_show_replicas(connection):
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert EXPECTED_COLUMN_NAMES == actual_column_names
assert actual_column_names == EXPECTED_COLUMN_NAMES
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 2/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 3/
mg_instances["replica_1"].kill()
mg_instances["replica_3"].kill()
mg_instances["replica_4"].stop()
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3")
interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4")
# We leave some time for the main to realise the replicas are down.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"),
}
assert expected_data == actual_data
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
def test_basic_recovery(connection):
# Goal of this test is to check the recovery of main.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We kill main.
# 3/ We re-start main.
# 4/ We check that all replicas have the correct state: they should all be ready.
# 5/ Drop one replica.
# 6/ We add some data to main, then kill it and restart.
# 7/ We check that all replicas but one have the expected data.
# 8/ We kill another replica.
# 9/ We add some data to main.
# 10/ We re-add the two replicas droped/killed and check the data.
# 11/ We kill another replica.
# 12/ Add some more data to main.
# 13/ Check the states of replicas.
# 0/
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"replica_3": {
"args": ["--bolt-port", "7690", "--log-level=TRACE"],
"log_file": "replica3.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"],
},
"replica_4": {
"args": ["--bolt-port", "7691", "--log-level=TRACE"],
"log_file": "replica4.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration.
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';")
# 1/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
def check_roles():
assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0]
for index in range(1, 4):
assert (
"replica"
== interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0]
)
check_roles()
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
# 4/
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 5/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2;")
# 6/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic', value:42})")
interactive_mg_runner.kill(CONFIGURATION, "main")
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
# 7/
QUERY_TO_CHECK = "MATCH (node) return node;"
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 1
for index in (1, 3, 4):
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 2, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 2, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
# Replica_2 was dropped, we check it does not have the data from main.
assert len(interactive_mg_runner.MEMGRAPH_INSTANCES["replica_2"].query(QUERY_TO_CHECK)) == 0
# 8/
interactive_mg_runner.kill(CONFIGURATION, "replica_3")
# 9/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again', value:43})")
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 2
# 10/
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
interactive_mg_runner.start(CONFIGURATION, "replica_3")
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 6, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
def retrieve_data2():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data2)
assert actual_data == expected_data
for index in (1, 2, 3, 4):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
# 11/
interactive_mg_runner.kill(CONFIGURATION, "replica_1")
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
def retrieve_data3():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data3)
assert actual_data == expected_data
# 12/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again_again', value:44})")
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 3
for index in (2, 3, 4):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
# 13/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_2", "127.0.0.1:10002", "sync", 9, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 9, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 9, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
def test_conflict_at_startup(connection):
# Goal of this test is to check starting up several instance with different replicas' configuration directory works as expected.
# main_1 and main_2 have different directory.
data_directory1 = tempfile.TemporaryDirectory()
data_directory2 = tempfile.TemporaryDirectory()
CONFIGURATION = {
"main_1": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main1.log",
"setup_queries": [],
"data_directory": f"{data_directory1.name}",
},
"main_2": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "main2.log",
"setup_queries": [],
"data_directory": f"{data_directory2.name}",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
cursor_1 = connection(7687, "main_1").cursor()
cursor_2 = connection(7688, "main_2").cursor()
assert execute_and_fetch_all(cursor_1, "SHOW REPLICATION ROLE;")[0][0] == "main"
assert execute_and_fetch_all(cursor_2, "SHOW REPLICATION ROLE;")[0][0] == "main"
def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection):
# Goal of this test is to check the recovery of main.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We kill main then kill a replica.
# 3/ We re-start main: it should be able to restart.
# 4/ Check status of replica: replica_2 is invalid.
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration.
interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';")
interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
# 1/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
}
actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;"))
assert actual_data == expected_data
def check_roles():
assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0]
for index in range(1, 2):
assert (
"replica"
== interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0]
)
check_roles()
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
interactive_mg_runner.kill(CONFIGURATION, "replica_2")
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")
# 4/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
}
actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;"))
assert actual_data == expected_data
if __name__ == "__main__":

View File

@ -29,8 +29,8 @@ template_cluster: &template_cluster
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
<<: *template_validation_queries
@ -69,8 +69,8 @@ workloads:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
validation_queries: []

View File

@ -50,18 +50,15 @@ def run(args):
continue
log.info("%s STARTED.", workload_name)
# Setup.
mg_instances = {}
@atexit.register
def cleanup():
for mg_instance in mg_instances.values():
mg_instance.stop()
interactive_mg_runner.stop_all()
if "cluster" in workload:
procdir = ""
if "proc" in workload:
procdir = os.path.join(BUILD_DIR, workload["proc"])
mg_instances = interactive_mg_runner.start_all(workload["cluster"], procdir)
interactive_mg_runner.start_all(workload["cluster"], procdir)
# Test.
mg_test_binary = os.path.join(BUILD_DIR, workload["binary"])
@ -70,7 +67,7 @@ def run(args):
if "cluster" in workload:
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
mg_instance = interactive_mg_runner.MEMGRAPH_INSTANCES[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
cleanup()

View File

@ -4,7 +4,7 @@ bolt_port: &bolt_port "7687"
template_cluster: &template_cluster
cluster:
server:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
args: ["--bolt-port=7687", "--log-level=TRACE"]
log_file: "server-connection-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
@ -18,7 +18,6 @@ template_cluster_ssl: &template_cluster_ssl
*cert_file,
"--bolt-key-file",
*key_file,
"--",
]
log_file: "server-connection-ssl-e2e.log"
ssl: true

View File

@ -9,3 +9,5 @@ copy_streams_e2e_python_files(streams_owner_tests.py)
copy_streams_e2e_python_files(pulsar_streams_tests.py)
add_subdirectory(transformations)
copy_e2e_python_files_from_parent_folder(streams ".." mg_utils.py)

View File

@ -13,6 +13,7 @@ import mgclient
import pytest
import time
from mg_utils import mg_sleep_and_assert
from multiprocessing import Manager, Process, Value
# These are the indices of the different values in the result of SHOW STREAM
@ -427,8 +428,10 @@ def test_start_stream_with_batch_limit(connection, stream_creator, messages_send
thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
messages_sender(BATCH_LIMIT - 1)
@ -438,10 +441,8 @@ def test_start_stream_with_batch_limit(connection, stream_creator, messages_send
# We send a last message to reach the batch_limit
messages_sender(1)
time.sleep(2)
# We check that the stream has correctly stoped.
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_start_stream_with_batch_limit_timeout(connection, stream_creator):
@ -504,8 +505,11 @@ def test_start_stream_with_batch_limit_while_check_running(
# 1/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, BATCH_LIMIT, timeout=TIMEOUT)
@ -521,13 +525,12 @@ def test_start_stream_with_batch_limit_while_check_running(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT + 1, TIMEOUT)
) # Sending BATCH_LIMIT + 1 messages as BATCH_LIMIT messages have already been sent during the CHECK STREAM (and not consumed)
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender):
@ -557,8 +560,11 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
)
start_time = time.time()
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
@ -567,23 +573,17 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
assert (end_time - start_time) < 0.8 * TIMEOUT, "The CHECK STREAM has probably thrown due to timeout!"
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
# 2/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
start_time = time.time()
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
message_sender(SIMPLE_MSG)
time.sleep(2)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):

View File

@ -15,6 +15,7 @@ import sys
import pytest
import mgclient
import time
from mg_utils import mg_sleep_and_assert
from multiprocessing import Process, Value
import common
@ -465,8 +466,11 @@ def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka
def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
thread_stream_check.start()
time.sleep(2)
assert common.get_is_running(cursor, stream_name)
def is_running():
return common.get_is_running(cursor, stream_name)
assert mg_sleep_and_assert(True, is_running)
message_sender(common.SIMPLE_MSG)
thread_stream_check.join()

View File

@ -37,7 +37,6 @@ def test_simple(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS '{','.join(pulsar_topics)}' TRANSFORM {transformation}",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(
@ -66,8 +65,6 @@ def test_separate_consumers(pulsar_client, pulsar_topics, connection, transforma
for stream_name in stream_names:
common.start_stream(cursor, stream_name)
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(topic, send_timeout_millis=60000)
producer.send(common.SIMPLE_MSG)
@ -89,7 +86,6 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
)
common.start_stream(cursor, "test")
time.sleep(1)
def assert_message_not_consumed(message):
vertices_with_msg = common.execute_and_fetch_all(
@ -154,7 +150,6 @@ def test_check_stream(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
)
common.start_stream(cursor, "test")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
@ -308,7 +303,6 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
)
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
@ -317,7 +311,6 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer.send(b"CREATE (n:VERTEX { id : 42 })")
assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@ -332,7 +325,6 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(

View File

@ -57,3 +57,43 @@ Feature: Case
Then the result should be:
| z |
| ['nottwo', 'two', 'nottwo'] |
Scenario: Simple CASE nullcheck does not have match:
Given an empty graph
When executing query:
"""
WITH 2 AS name RETURN CASE name WHEN 3 THEN 'something went wrong' WHEN null THEN "doesn't work" ELSE 'works' END
"""
Then the result should be:
| CASE name WHEN 3 THEN 'something went wrong' WHEN null THEN "doesn't work" ELSE 'works' END |
| 'works' |
Scenario: Simple CASE nullcheck does have match:
Given an empty graph
When executing query:
"""
WITH 2 AS name RETURN CASE name WHEN 2 THEN 'works' WHEN null THEN "doesn't work" ELSE 'something went wrong' END
"""
Then the result should be:
| CASE name WHEN 2 THEN 'works' WHEN null THEN "doesn't work" ELSE 'something went wrong' END |
| 'works' |
Scenario: Generic CASE nullcheck does have match:
Given an empty graph
When executing query:
"""
WITH 2 AS name RETURN CASE WHEN name is NULL THEN "doesn't work" WHEN name = 2 THEN "works" ELSE "something went wrong" END
"""
Then the result should be:
| CASE WHEN name is NULL THEN "doesn't work" WHEN name = 2 THEN "works" ELSE "something went wrong" END |
| 'works' |
Scenario: Generic CASE expression is null:
Given an empty graph
When executing query:
"""
WITH null AS name RETURN CASE name WHEN null THEN "doesn't work" WHEN 2 THEN "doesn't work" ELSE 'works' END
"""
Then the result should be:
| CASE name WHEN null THEN "doesn't work" WHEN 2 THEN "doesn't work" ELSE 'works' END |
| 'works' |

View File

@ -3,4 +3,4 @@
"n3" {:replication-role :replica :replication-mode :async :port 10000}}
{"n1" {:replication-role :main}
"n2" {:replication-role :replica :replication-mode :async :port 10000}
"n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 3}}]
"n3" {:replication-role :replica :replication-mode :sync :port 10000}}]

View File

@ -31,7 +31,7 @@
[node-config]
(case (:replication-mode node-config)
:async "ASYNC"
:sync (str "SYNC" (when-let [timeout (:timeout node-config)] (str " WITH TIMEOUT " timeout)))))
:sync "SYNC" ))
(defn create-register-replica-query
[name node-config]

View File

@ -129,13 +129,7 @@
:replication-mode
(str "Invalid node configuration. "
"Every replication node requires "
":replication-mode to be defined."))
(throw-if-key-missing-in-any
(filter #(= (:replication-mode %) :sync) replica-nodes-configs)
:timeout
(str "Invalid node confiruation. "
"Every SYNC replication node requires "
":timeout to be defined."))))
":replication-mode to be defined."))))
(map (fn [node-config] (resolve-all-node-hostnames
(merge

View File

@ -25,7 +25,8 @@
:--storage-recover-on-startup
:--storage-wal-enabled
:--storage-snapshot-interval-sec 300
:--storage-properties-on-edges))
:--storage-properties-on-edges
:--storage-restore-replicas-on-startup false))
(defn stop-node!
[test node]

View File

@ -318,6 +318,9 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
add_unit_test(storage_v2_isolation_level.cpp)
target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2)
add_unit_test(replication_persistence_helper.cpp)
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)
# Test mg-auth
if(MG_ENTERPRISE)
add_unit_test(auth.cpp)

View File

@ -2322,17 +2322,13 @@ TEST_P(CypherMainVisitorTest, ShowUsersForRole) {
void check_replication_query(Base *ast_generator, const ReplicationQuery *query, const std::string name,
const std::optional<TypedValue> socket_address, const ReplicationQuery::SyncMode sync_mode,
const std::optional<TypedValue> timeout = {}, const std::optional<TypedValue> port = {}) {
const std::optional<TypedValue> port = {}) {
EXPECT_EQ(query->replica_name_, name);
EXPECT_EQ(query->sync_mode_, sync_mode);
ASSERT_EQ(static_cast<bool>(query->socket_address_), static_cast<bool>(socket_address));
if (socket_address) {
ast_generator->CheckLiteral(query->socket_address_, *socket_address);
}
ASSERT_EQ(static_cast<bool>(query->timeout_), static_cast<bool>(timeout));
if (timeout) {
ast_generator->CheckLiteral(query->timeout_, *timeout);
}
ASSERT_EQ(static_cast<bool>(query->port_), static_cast<bool>(port));
if (port) {
ast_generator->CheckLiteral(query->port_, *port);
@ -2390,20 +2386,22 @@ TEST_P(CypherMainVisitorTest, TestSetReplicationMode) {
TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
auto &ast_generator = *GetParam();
const std::string faulty_query = "REGISTER REPLICA WITH TIMEOUT TO";
const std::string faulty_query = "REGISTER REPLICA TO";
ASSERT_THROW(ast_generator.ParseQuery(faulty_query), SyntaxException);
const std::string no_timeout_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")";
auto *no_timeout_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(no_timeout_query));
ASSERT_TRUE(no_timeout_query_parsed);
check_replication_query(&ast_generator, no_timeout_query_parsed, "replica1", TypedValue("127.0.0.1"),
const std::string faulty_query_with_timeout = R"(REGISTER REPLICA replica1 SYNC WITH TIMEOUT 1.0 TO "127.0.0.1")";
ASSERT_THROW(ast_generator.ParseQuery(faulty_query_with_timeout), SyntaxException);
const std::string correct_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")";
auto *correct_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(correct_query));
check_replication_query(&ast_generator, correct_query_parsed, "replica1", TypedValue("127.0.0.1"),
ReplicationQuery::SyncMode::SYNC);
std::string full_query = R"(REGISTER REPLICA replica2 SYNC WITH TIMEOUT 0.5 TO "1.1.1.1:10000")";
std::string full_query = R"(REGISTER REPLICA replica2 SYNC TO "1.1.1.1:10000")";
auto *full_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(full_query));
ASSERT_TRUE(full_query_parsed);
check_replication_query(&ast_generator, full_query_parsed, "replica2", TypedValue("1.1.1.1:10000"),
ReplicationQuery::SyncMode::SYNC, TypedValue(0.5));
ReplicationQuery::SyncMode::SYNC);
}
TEST_P(CypherMainVisitorTest, TestDeleteReplica) {

View File

@ -1747,6 +1747,26 @@ TEST_F(FunctionTest, ToStringBool) {
EXPECT_EQ(EvaluateFunction("TOSTRING", false).ValueString(), "false");
}
TEST_F(FunctionTest, ToStringDate) {
const auto date = memgraph::utils::Date({1970, 1, 2});
EXPECT_EQ(EvaluateFunction("TOSTRING", date).ValueString(), "1970-01-02");
}
TEST_F(FunctionTest, ToStringLocalTime) {
const auto lt = memgraph::utils::LocalTime({13, 2, 40, 100, 50});
EXPECT_EQ(EvaluateFunction("TOSTRING", lt).ValueString(), "13:02:40.100050");
}
TEST_F(FunctionTest, ToStringLocalDateTime) {
const auto ldt = memgraph::utils::LocalDateTime({1970, 1, 2}, {23, 02, 59});
EXPECT_EQ(EvaluateFunction("TOSTRING", ldt).ValueString(), "1970-01-02T23:02:59.000000");
}
TEST_F(FunctionTest, ToStringDuration) {
memgraph::utils::Duration duration{{.minute = 2, .second = 2, .microsecond = 33}};
EXPECT_EQ(EvaluateFunction("TOSTRING", duration).ValueString(), "P0DT0H2M2.000033S");
}
TEST_F(FunctionTest, ToStringExceptions) { EXPECT_THROW(EvaluateFunction("TOSTRING", 1, 2, 3), QueryRuntimeException); }
TEST_F(FunctionTest, TimestampVoid) {

View File

@ -0,0 +1,91 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "formatters.hpp"
#include "utils/logging.hpp"
#include <gtest/gtest.h>
#include <fstream>
#include <iostream>
#include <optional>
#include <string>
// Test fixture for the replication persistence (de)serialization helpers.
// Provides a factory for ReplicaStatus instances so the round-trip tests
// below stay terse. (Note: "Persistance" in the name is a pre-existing typo,
// kept to avoid touching every TEST_F that references the fixture.)
class ReplicationPersistanceHelperTest : public ::testing::Test {
 protected:
  // Builds a ReplicaStatus from the given attributes. Strings and the optional
  // SSL config are taken by value and moved into the aggregate to avoid
  // redundant copies. (No-op SetUp/TearDown overrides removed: the base class
  // versions are already no-ops.)
  memgraph::storage::replication::ReplicaStatus CreateReplicaStatus(
      std::string name, std::string ip_address, uint16_t port,
      memgraph::storage::replication::ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency,
      std::optional<memgraph::storage::replication::ReplicationClientConfig::SSL> ssl) const {
    return memgraph::storage::replication::ReplicaStatus{.name = std::move(name),
                                                         .ip_address = std::move(ip_address),
                                                         .port = port,
                                                         .sync_mode = sync_mode,
                                                         .replica_check_frequency = replica_check_frequency,
                                                         .ssl = std::move(ssl)};
  }

  // Guards against ReplicaStatus gaining/losing fields without this fixture
  // being updated. NOTE(review): the size is layout/platform-dependent -- if
  // this fires on a new target without a field change, adjust the constant.
  static_assert(
      sizeof(memgraph::storage::replication::ReplicaStatus) == 152,
      "Most likely you modified ReplicaStatus without updating the tests. Please modify CreateReplicaStatus. ");
};
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesInitialized) {
  // Round trip with every attribute populated, SSL included:
  // ReplicaStatus -> JSON -> ReplicaStatus must compare equal.
  const auto original_status = CreateReplicaStatus(
      "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
      memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});
  auto serialized = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(original_status));
  const auto deserialized = memgraph::storage::replication::JSONToReplicaStatus(std::move(serialized));
  ASSERT_EQ(original_status, *deserialized);
}
TEST_F(ReplicationPersistanceHelperTest, BasicTestOnlyMandatoryAttributesInitialized) {
  // Round trip with only the mandatory attributes set (SSL left empty).
  const auto original_status =
      CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
                          std::chrono::seconds(1), std::nullopt);
  auto serialized = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(original_status));
  const auto deserialized = memgraph::storage::replication::JSONToReplicaStatus(std::move(serialized));
  ASSERT_EQ(original_status, *deserialized);
}
// NOTE(review): despite the name ("all attributes but SSL"), this body is
// byte-identical to BasicTestOnlyMandatoryAttributesInitialized above -- it
// also passes std::nullopt for SSL and sets nothing extra. Presumably a
// copy-paste artifact; confirm the intended difference or de-duplicate.
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButSSLInitialized) {
// Build a status without an SSL config.
auto replicas_status =
CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
std::chrono::seconds(1), std::nullopt);
// Serialize a copy to JSON, then parse it back.
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
// The round-tripped status must compare equal to the source.
ASSERT_EQ(replicas_status, *replicas_status_converted);
}
// NOTE(review): this body is byte-identical to BasicTestAllAttributesInitialized
// above, and CreateReplicaStatus exposes no "timeout" parameter at all -- the
// test name suggests a timeout attribute that the fixture cannot set. Confirm
// whether ReplicaStatus still has a timeout field; if not, drop this duplicate.
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButTimeoutInitialized) {
// Build a status with every settable attribute, SSL included.
auto replicas_status = CreateReplicaStatus(
"name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});
// Serialize a copy to JSON, then parse it back.
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
// The round-tripped status must compare equal to the source.
ASSERT_EQ(replicas_status, *replicas_status_converted);
}

View File

@ -39,6 +39,10 @@ class ReplicationTest : public ::testing::Test {
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}};
const std::string local_host = ("127.0.0.1");
const std::array<uint16_t, 2> ports{10000, 20000};
const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"};
private:
void Clear() {
if (!std::filesystem::exists(storage_directory)) return;
@ -50,11 +54,12 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store(configuration);
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
// vertex create
@ -268,22 +273,24 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(
{.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 20000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const auto *vertex_label = "label";
@ -314,7 +321,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
check_replica(&replica_store1);
check_replica(&replica_store2);
main_store.UnregisterReplica("REPLICA2");
main_store.UnregisterReplica(replicas[1]);
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
@ -415,16 +422,17 @@ TEST_F(ReplicationTest, RecoveryProcess) {
.storage_directory = replica_storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_EQ(main_store.GetReplicaState("REPLICA1"), memgraph::storage::replication::ReplicaState::RECOVERY);
ASSERT_EQ(main_store.GetReplicaState(replicas[0]), memgraph::storage::replication::ReplicaState::RECOVERY);
while (main_store.GetReplicaState("REPLICA1") != memgraph::storage::replication::ReplicaState::READY) {
while (main_store.GetReplicaState(replicas[0]) != memgraph::storage::replication::ReplicaState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
@ -484,11 +492,12 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
memgraph::storage::Storage replica_store_async(configuration);
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{"127.0.0.1", 20000},
memgraph::storage::replication::ReplicationMode::ASYNC)
.RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
static constexpr size_t vertices_create_num = 10;
@ -524,20 +533,22 @@ TEST_F(ReplicationTest, EpochTest) {
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10001});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, 10001});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
std::optional<memgraph::storage::Gid> vertex_gid;
@ -560,13 +571,14 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
main_store.UnregisterReplica("REPLICA1");
main_store.UnregisterReplica("REPLICA2");
main_store.UnregisterReplica(replicas[0]);
main_store.UnregisterReplica(replicas[1]);
replica_store1.SetMainReplicationRole();
ASSERT_FALSE(replica_store1
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
{
@ -588,10 +600,11 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_TRUE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
{
@ -615,25 +628,27 @@ TEST_F(ReplicationTest, ReplicationInformation) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.HasError());
const std::string replica2_name{replicas[1]};
ASSERT_FALSE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::ReplicationRole::MAIN);
ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA);
@ -645,15 +660,12 @@ TEST_F(ReplicationTest, ReplicationInformation) {
const auto &first_info = replicas_info[0];
ASSERT_EQ(first_info.name, replica1_name);
ASSERT_EQ(first_info.mode, memgraph::storage::replication::ReplicationMode::SYNC);
ASSERT_TRUE(first_info.timeout);
ASSERT_EQ(*first_info.timeout, 2.0);
ASSERT_EQ(first_info.endpoint, replica1_endpoint);
ASSERT_EQ(first_info.state, memgraph::storage::replication::ReplicaState::READY);
const auto &second_info = replicas_info[1];
ASSERT_EQ(second_info.name, replica2_name);
ASSERT_EQ(second_info.mode, memgraph::storage::replication::ReplicationMode::ASYNC);
ASSERT_FALSE(second_info.timeout);
ASSERT_EQ(second_info.endpoint, replica2_endpoint);
ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY);
}
@ -663,25 +675,27 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA1"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
const std::string replica2_name{replicas[0]};
ASSERT_TRUE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
}
TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
@ -689,23 +703,126 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10001};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
const std::string replica2_name{replicas[1]};
ASSERT_TRUE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
}
// With restore_replicas_on_startup enabled, a restarted MAIN store must come
// back with the same registered replica set.
// NOTE(review): two issues with the name -- (1) typo "Aftger"; (2) this body
// never calls UnregisterReplica, while the next test (plain
// "RestoringReplicationAtStartup") does. The two names look swapped; renaming
// requires touching both tests at once, so only flagging here.
TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) {
// MAIN store with replica persistence/restore enabled.
auto main_config = configuration;
main_config.durability.restore_replicas_on_startup = true;
auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);
// Two replica stores listening on distinct ports.
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
// Register both replicas on MAIN; registration must succeed instantly.
auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
ASSERT_FALSE(res.HasError());
res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
ASSERT_FALSE(res.HasError());
// Sanity-check the registered replica set before the restart.
auto replica_infos = main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 2);
ASSERT_EQ(replica_infos[0].name, replicas[0]);
ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
ASSERT_EQ(replica_infos[1].name, replicas[1]);
ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);
// Destroy MAIN and recreate it from the same config: the replica
// registrations must be restored from durability data.
main_store.reset();
auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config);
replica_infos = other_main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 2);
ASSERT_EQ(replica_infos[0].name, replicas[0]);
ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
ASSERT_EQ(replica_infos[1].name, replicas[1]);
ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);
}
// With restore_replicas_on_startup enabled, unregistering a replica must also
// be persisted: after a MAIN restart only the remaining replica is restored.
// NOTE(review): this is the test that actually drops a replica, though its
// name doesn't say so -- it appears name-swapped with the
// "...AftgerDroppingReplica" test above; confirm and rename both together.
TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
// MAIN store with replica persistence/restore enabled.
auto main_config = configuration;
main_config.durability.restore_replicas_on_startup = true;
auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);
// Two replica stores listening on distinct ports.
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
// Register both replicas on MAIN; registration must succeed instantly.
auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
ASSERT_FALSE(res.HasError());
res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
ASSERT_FALSE(res.HasError());
// Both replicas should be visible before the drop.
auto replica_infos = main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 2);
ASSERT_EQ(replica_infos[0].name, replicas[0]);
ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
ASSERT_EQ(replica_infos[1].name, replicas[1]);
ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);
// Drop the first replica; only the second should remain registered.
const auto unregister_res = main_store->UnregisterReplica(replicas[0]);
ASSERT_TRUE(unregister_res);
replica_infos = main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 1);
ASSERT_EQ(replica_infos[0].name, replicas[1]);
ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]);
// Destroy MAIN and recreate it: only the still-registered replica must be
// restored from durability data.
main_store.reset();
auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config);
replica_infos = other_main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 1);
ASSERT_EQ(replica_infos[0].name, replicas[1]);
ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]);
}
// Registering a replica that is not reachable must fail with CONNECTION_FAILED
// when MUST_BE_INSTANTLY_VALID registration is requested (no replica store is
// listening on ports[0] here).
TEST_F(ReplicationTest, AddingInvalidReplica) {
  memgraph::storage::Storage main_store(configuration);
  const auto res =
      main_store.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]},
                                 memgraph::storage::replication::ReplicationMode::SYNC,
                                 memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
  // Assert HasError() before touching GetError(): the original called
  // GetError() unconditionally, which is invalid on a success result, and the
  // ASSERT_TRUE(x == y) form hid the actual error value on failure.
  ASSERT_TRUE(res.HasError());
  ASSERT_EQ(res.GetError(), memgraph::storage::Storage::RegisterReplicaError::CONNECTION_FAILED);
}

View File

@ -609,3 +609,73 @@ TEST(TemporalTest, LocalDateTimeDelta) {
ASSERT_EQ(two_years_after_unix_epoch - unix_epoch,
memgraph::utils::Duration({.day = 761, .millisecond = 20, .microsecond = 34}));
}
TEST(TemporalTest, DateConvertsToString) {
  // Date::ToString() must render the zero-padded ISO-8601 form "YYYY-MM-DD".
  ASSERT_EQ("1970-01-02", memgraph::utils::Date({1970, 1, 2}).ToString());
  ASSERT_EQ("0000-01-01", memgraph::utils::Date({0000, 1, 1}).ToString());
  ASSERT_EQ("2022-07-04", memgraph::utils::Date({2022, 7, 4}).ToString());
}
TEST(TemporalTest, LocalTimeConvertsToString) {
  // LocalTime::ToString() must render "HH:MM:SS.ffffff", zero-padding every
  // component and always printing six fractional digits.
  ASSERT_EQ("13:02:40.100050", memgraph::utils::LocalTime({13, 2, 40, 100, 50}).ToString());
  ASSERT_EQ("13:02:40.000000", memgraph::utils::LocalTime({13, 2, 40}).ToString());
  ASSERT_EQ("00:00:00.000000", memgraph::utils::LocalTime({0, 0, 0}).ToString());
  ASSERT_EQ("03:02:04.006007", memgraph::utils::LocalTime({3, 2, 4, 6, 7}).ToString());
}
TEST(TemporalTest, LocalDateTimeConvertsToString) {
  // LocalDateTime::ToString() must join the date and time parts with 'T'
  // (ISO-8601 style), keeping six fractional digits.
  ASSERT_EQ("1970-01-02T23:02:59.000000", memgraph::utils::LocalDateTime({1970, 1, 2}, {23, 2, 59}).ToString());
  ASSERT_EQ("1970-01-02T23:02:59.456123", memgraph::utils::LocalDateTime({1970, 1, 2}, {23, 2, 59, 456, 123}).ToString());
  ASSERT_EQ("1997-08-24T16:32:09.000000", memgraph::utils::LocalDateTime({1997, 8, 24}, {16, 32, 9}).ToString());
}
TEST(TemporalTest, DurationConvertsToString) {
  // Duration::ToString() must render the ISO-8601 duration form
  // "PnDTnHnMn.nnnnnnS". The cases below cover: fractional hour inputs,
  // negative components, component overflow (minutes/seconds past 60), and
  // sign propagation across the whole duration.
  using memgraph::utils::Duration;
  ASSERT_EQ("P0DT0H2M2.000033S", Duration({.minute = 2, .second = 2, .microsecond = 33}).ToString());
  ASSERT_EQ("P0DT2H32M2.000033S", Duration({.hour = 2.5, .minute = 2, .second = 2, .microsecond = 33}).ToString());
  ASSERT_EQ("P0DT1H17M2.000000S", Duration({.hour = 1.25, .minute = 2, .second = 2}).ToString());
  ASSERT_EQ("P20DT1H17M2.000000S", Duration({.day = 20, .hour = 1.25, .minute = 2, .second = 2}).ToString());
  ASSERT_EQ("P0DT-2H-57M-58.000033S", Duration({.hour = -3, .minute = 2, .second = 2, .microsecond = -33}).ToString());
  ASSERT_EQ("P-1DT-20H-57M-57.999967S",
            Duration({.day = -2, .hour = 3, .minute = 2, .second = 2, .microsecond = 33}).ToString());
  ASSERT_EQ("P23DT2H39M12.000000S", Duration({.day = 20, .hour = 72, .minute = 154, .second = 312}).ToString());
  ASSERT_EQ("P2DT0H0M0.000000S", Duration({.day = 1, .hour = 23, .minute = 59, .second = 60}).ToString());
}