Compare commits
11 Commits
master
...
fix-commit
Author | SHA1 | Date | |
---|---|---|---|
|
2d109a13d6 | ||
|
ed71d01aa3 | ||
|
15afb4b5c2 | ||
|
05f3e7d243 | ||
|
7ac7b7c483 | ||
|
8734d8d1a9 | ||
|
5df607f1e1 | ||
|
e32adaa18c | ||
|
28eef7edf4 | ||
|
e662206fd1 | ||
|
d47cc290ef |
@ -324,7 +324,7 @@ class DbAccessor final {
|
|||||||
|
|
||||||
void AdvanceCommand() { accessor_->AdvanceCommand(); }
|
void AdvanceCommand() { accessor_->AdvanceCommand(); }
|
||||||
|
|
||||||
utils::BasicResult<storage::ConstraintViolation, void> Commit() { return accessor_->Commit(); }
|
utils::BasicResult<storage::CommitError, void> Commit() { return accessor_->Commit(); }
|
||||||
|
|
||||||
void Abort() { accessor_->Abort(); }
|
void Abort() { accessor_->Abort(); }
|
||||||
|
|
||||||
|
@ -2209,25 +2209,39 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto maybe_constraint_violation = db_accessor.Commit();
|
auto maybe_commit_error = db_accessor.Commit();
|
||||||
if (maybe_constraint_violation.HasError()) {
|
if (maybe_commit_error.HasError()) {
|
||||||
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
const auto &commit_error = maybe_commit_error.GetError();
|
||||||
switch (constraint_violation.type) {
|
switch (commit_error.type) {
|
||||||
case storage::ConstraintViolation::Type::EXISTENCE: {
|
case storage::CommitError::Type::UNABLE_TO_SYNC_REPLICATE: {
|
||||||
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
// TODO(gitbuda): This is tricky because this is an internal
|
||||||
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
// operation. Consider stopping main Memgraph instance here.
|
||||||
const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin());
|
spdlog::warn("Trigger '{}' failed to commit due to inability to replicate data to SYNC replica",
|
||||||
spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on :{}({})", trigger.Name(),
|
trigger.Name());
|
||||||
label_name, property_name);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case storage::ConstraintViolation::Type::UNIQUE: {
|
case storage::CommitError::Type::CONSTRAINT_VIOLATION: {
|
||||||
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
MG_ASSERT(commit_error.maybe_constraint_violation.has_value());
|
||||||
std::stringstream property_names_stream;
|
const auto &constraint_violation = *commit_error.maybe_constraint_violation;
|
||||||
utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ",
|
switch (constraint_violation.type) {
|
||||||
[&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop); });
|
case storage::ConstraintViolation::Type::EXISTENCE: {
|
||||||
spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})", trigger.Name(),
|
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||||
label_name, property_names_stream.str());
|
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
||||||
|
const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin());
|
||||||
|
spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on :{}({})",
|
||||||
|
trigger.Name(), label_name, property_name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case storage::ConstraintViolation::Type::UNIQUE: {
|
||||||
|
const auto &label_name = db_accessor.LabelToName(constraint_violation.label);
|
||||||
|
std::stringstream property_names_stream;
|
||||||
|
utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ",
|
||||||
|
[&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop); });
|
||||||
|
spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})",
|
||||||
|
trigger.Name(), label_name, property_names_stream.str());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2272,28 +2286,40 @@ void Interpreter::Commit() {
|
|||||||
trigger_context_collector_.reset();
|
trigger_context_collector_.reset();
|
||||||
};
|
};
|
||||||
|
|
||||||
auto maybe_constraint_violation = db_accessor_->Commit();
|
auto maybe_commit_error = db_accessor_->Commit();
|
||||||
if (maybe_constraint_violation.HasError()) {
|
if (maybe_commit_error.HasError()) {
|
||||||
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
const auto &commit_error = maybe_commit_error.GetError();
|
||||||
switch (constraint_violation.type) {
|
switch (commit_error.type) {
|
||||||
case storage::ConstraintViolation::Type::EXISTENCE: {
|
case storage::CommitError::Type::UNABLE_TO_SYNC_REPLICATE: {
|
||||||
auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label);
|
|
||||||
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
|
||||||
auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin());
|
|
||||||
reset_necessary_members();
|
reset_necessary_members();
|
||||||
throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name,
|
throw QueryException("Unable to commit due to inability to replicate to SYNC replica");
|
||||||
property_name);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case storage::ConstraintViolation::Type::UNIQUE: {
|
case storage::CommitError::Type::CONSTRAINT_VIOLATION: {
|
||||||
auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label);
|
MG_ASSERT(commit_error.maybe_constraint_violation.has_value());
|
||||||
std::stringstream property_names_stream;
|
const auto &constraint_violation = *commit_error.maybe_constraint_violation;
|
||||||
utils::PrintIterable(
|
switch (constraint_violation.type) {
|
||||||
property_names_stream, constraint_violation.properties, ", ",
|
case storage::ConstraintViolation::Type::EXISTENCE: {
|
||||||
[this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); });
|
auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label);
|
||||||
reset_necessary_members();
|
MG_ASSERT(constraint_violation.properties.size() == 1U);
|
||||||
throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name,
|
auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin());
|
||||||
property_names_stream.str());
|
reset_necessary_members();
|
||||||
|
throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name,
|
||||||
|
property_name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case storage::ConstraintViolation::Type::UNIQUE: {
|
||||||
|
auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label);
|
||||||
|
std::stringstream property_names_stream;
|
||||||
|
utils::PrintIterable(
|
||||||
|
property_names_stream, constraint_violation.properties, ", ",
|
||||||
|
[this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); });
|
||||||
|
reset_necessary_members();
|
||||||
|
throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name,
|
||||||
|
property_names_stream.str());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
set(storage_v2_src_files
|
set(storage_v2_src_files
|
||||||
commit_log.cpp
|
commit_log.cpp
|
||||||
|
commit_error.cpp
|
||||||
constraints.cpp
|
constraints.cpp
|
||||||
temporal.cpp
|
temporal.cpp
|
||||||
durability/durability.cpp
|
durability/durability.cpp
|
||||||
|
20
src/storage/v2/commit_error.cpp
Normal file
20
src/storage/v2/commit_error.cpp
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#include "storage/v2/commit_error.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::storage {
|
||||||
|
|
||||||
|
bool operator==(const CommitError &lhs, const CommitError &rhs) {
|
||||||
|
return lhs.type == rhs.type && lhs.maybe_constraint_violation == rhs.maybe_constraint_violation;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace memgraph::storage
|
32
src/storage/v2/commit_error.hpp
Normal file
32
src/storage/v2/commit_error.hpp
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
|
#include "storage/v2/constraints.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::storage {
|
||||||
|
|
||||||
|
struct CommitError {
|
||||||
|
enum class Type {
|
||||||
|
CONSTRAINT_VIOLATION,
|
||||||
|
UNABLE_TO_SYNC_REPLICATE,
|
||||||
|
};
|
||||||
|
Type type;
|
||||||
|
|
||||||
|
std::optional<ConstraintViolation> maybe_constraint_violation;
|
||||||
|
};
|
||||||
|
|
||||||
|
bool operator==(const CommitError &lhs, const CommitError &rhs);
|
||||||
|
|
||||||
|
} // namespace memgraph::storage
|
@ -227,17 +227,18 @@ void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::ReplicationClient::FinalizeTransactionReplication() {
|
std::optional<bool> Storage::ReplicationClient::FinalizeTransactionReplication() {
|
||||||
// We can only check the state because it guarantees to be only
|
// We can only check the state because it guarantees to be only
|
||||||
// valid during a single transaction replication (if the assumption
|
// valid during a single transaction replication (if the assumption
|
||||||
// that this and other transaction replication functions can only be
|
// that this and other transaction replication functions can only be
|
||||||
// called from a one thread stands)
|
// called from a one thread stands)
|
||||||
if (replica_state_ != replication::ReplicaState::REPLICATING) {
|
if (replica_state_ != replication::ReplicaState::REPLICATING) {
|
||||||
return;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mode_ == replication::ReplicationMode::ASYNC) {
|
if (mode_ == replication::ReplicationMode::ASYNC) {
|
||||||
thread_pool_.AddTask([this] { this->FinalizeTransactionReplicationInternal(); });
|
thread_pool_.AddTask([this] { [[maybe_unused]] auto finalized = this->FinalizeTransactionReplicationInternal(); });
|
||||||
|
return true;
|
||||||
} else if (timeout_) {
|
} else if (timeout_) {
|
||||||
MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout.");
|
MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout.");
|
||||||
MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing");
|
MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing");
|
||||||
@ -245,7 +246,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
|
|||||||
|
|
||||||
timeout_dispatcher_->active = true;
|
timeout_dispatcher_->active = true;
|
||||||
thread_pool_.AddTask([&, this] {
|
thread_pool_.AddTask([&, this] {
|
||||||
this->FinalizeTransactionReplicationInternal();
|
[[maybe_unused]] auto finalized = this->FinalizeTransactionReplicationInternal();
|
||||||
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
|
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
|
||||||
// TimerThread can finish waiting for timeout
|
// TimerThread can finish waiting for timeout
|
||||||
timeout_dispatcher_->active = false;
|
timeout_dispatcher_->active = false;
|
||||||
@ -273,12 +274,13 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
|
|||||||
// and acces the `active` variable`
|
// and acces the `active` variable`
|
||||||
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
|
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
FinalizeTransactionReplicationInternal();
|
return FinalizeTransactionReplicationInternal();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
bool Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
||||||
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
|
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
|
||||||
try {
|
try {
|
||||||
auto response = replica_stream_->Finalize();
|
auto response = replica_stream_->Finalize();
|
||||||
@ -289,6 +291,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
|||||||
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
|
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
|
||||||
} else {
|
} else {
|
||||||
replica_state_.store(replication::ReplicaState::READY);
|
replica_state_.store(replication::ReplicaState::READY);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
} catch (const rpc::RpcFailedException &) {
|
} catch (const rpc::RpcFailedException &) {
|
||||||
replica_stream_.reset();
|
replica_stream_.reset();
|
||||||
@ -298,6 +301,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
|||||||
}
|
}
|
||||||
HandleRpcFailure();
|
HandleRpcFailure();
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
|
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
|
||||||
|
@ -103,7 +103,10 @@ class Storage::ReplicationClient {
|
|||||||
// StartTransactionReplication, stream is created.
|
// StartTransactionReplication, stream is created.
|
||||||
void IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback);
|
void IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback);
|
||||||
|
|
||||||
void FinalizeTransactionReplication();
|
// Return none -> OK
|
||||||
|
// Return true -> OK
|
||||||
|
// Return false -> FAIL
|
||||||
|
std::optional<bool> FinalizeTransactionReplication();
|
||||||
|
|
||||||
// Transfer the snapshot file.
|
// Transfer the snapshot file.
|
||||||
// @param path Path of the snapshot file.
|
// @param path Path of the snapshot file.
|
||||||
@ -125,7 +128,7 @@ class Storage::ReplicationClient {
|
|||||||
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
|
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void FinalizeTransactionReplicationInternal();
|
[[nodiscard]] bool FinalizeTransactionReplicationInternal();
|
||||||
|
|
||||||
void RecoverReplica(uint64_t replica_commit);
|
void RecoverReplica(uint64_t replica_commit);
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
#include "utils/uuid.hpp"
|
#include "utils/uuid.hpp"
|
||||||
|
|
||||||
/// REPLICATION ///
|
/// REPLICATION ///
|
||||||
|
#include "storage/v2/commit_error.hpp"
|
||||||
#include "storage/v2/replication/replication_client.hpp"
|
#include "storage/v2/replication/replication_client.hpp"
|
||||||
#include "storage/v2/replication/replication_server.hpp"
|
#include "storage/v2/replication/replication_server.hpp"
|
||||||
#include "storage/v2/replication/rpc.hpp"
|
#include "storage/v2/replication/rpc.hpp"
|
||||||
@ -813,7 +814,7 @@ EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view &name) { ret
|
|||||||
|
|
||||||
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
|
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
|
||||||
|
|
||||||
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
utils::BasicResult<CommitError, void> Storage::Accessor::Commit(
|
||||||
const std::optional<uint64_t> desired_commit_timestamp) {
|
const std::optional<uint64_t> desired_commit_timestamp) {
|
||||||
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
|
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
|
||||||
MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!");
|
MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!");
|
||||||
@ -836,7 +837,8 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
|||||||
auto validation_result = ValidateExistenceConstraints(*prev.vertex, storage_->constraints_);
|
auto validation_result = ValidateExistenceConstraints(*prev.vertex, storage_->constraints_);
|
||||||
if (validation_result) {
|
if (validation_result) {
|
||||||
Abort();
|
Abort();
|
||||||
return *validation_result;
|
return CommitError{.type = CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
.maybe_constraint_violation = *validation_result};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -848,6 +850,29 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
|||||||
// Save these so we can mark them used in the commit log.
|
// Save these so we can mark them used in the commit log.
|
||||||
uint64_t start_timestamp = transaction_.start_timestamp;
|
uint64_t start_timestamp = transaction_.start_timestamp;
|
||||||
|
|
||||||
|
// Aboring here and after the locked block obviously has an issue if
|
||||||
|
// replica goes down and back up during the execution of the locked block of
|
||||||
|
// code.
|
||||||
|
// Not enough, before the actual commit check all SYNC replicas for availability.
|
||||||
|
bool unable_to_sync_replicate = false;
|
||||||
|
const auto check_replicas = [&]() {
|
||||||
|
storage_->replication_clients_.WithLock([&](auto &clients) {
|
||||||
|
for (auto &client : clients) {
|
||||||
|
// Exclusively SYNC replicas have to be available to commit the transaction.
|
||||||
|
if (client->Mode() == replication::ReplicationMode::SYNC && !client->Timeout().has_value() &&
|
||||||
|
client->State() == replication::ReplicaState::INVALID) {
|
||||||
|
unable_to_sync_replicate = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
check_replicas();
|
||||||
|
if (unable_to_sync_replicate) {
|
||||||
|
Abort();
|
||||||
|
return CommitError{.type = CommitError::Type::UNABLE_TO_SYNC_REPLICATE};
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
|
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
|
||||||
commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp));
|
commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp));
|
||||||
@ -893,6 +918,8 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
|||||||
// Replica can log only the write transaction received from Main
|
// Replica can log only the write transaction received from Main
|
||||||
// so the Wal files are consistent
|
// so the Wal files are consistent
|
||||||
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
|
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
|
||||||
|
// TODO(gitbuda): Possible to abort data operation because in this context there is an abort operation.
|
||||||
|
// TODO(gitbuda): If AppendToWal returns false, we can exit this block and Abort.
|
||||||
storage_->AppendToWal(transaction_, *commit_timestamp_);
|
storage_->AppendToWal(transaction_, *commit_timestamp_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -915,13 +942,27 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
|||||||
engine_guard.unlock();
|
engine_guard.unlock();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// NOTE: This will finish/commit the transaction.
|
||||||
storage_->commit_log_->MarkFinished(start_timestamp);
|
storage_->commit_log_->MarkFinished(start_timestamp);
|
||||||
|
// TODO(gitbuda): Maybe the solution here is to 1. mark 2. check all
|
||||||
|
// the replicas 3. unmark if required... it's not possible to unmark
|
||||||
|
// easily because mark contains mark + update_latest_active which is
|
||||||
|
// not reversable operation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(gitbuda): This doesn't have any effect because (it will only if the
|
||||||
|
// constraints are also violated).
|
||||||
|
check_replicas();
|
||||||
|
if (unable_to_sync_replicate) {
|
||||||
|
Abort();
|
||||||
|
return CommitError{.type = CommitError::Type::UNABLE_TO_SYNC_REPLICATE};
|
||||||
|
}
|
||||||
|
|
||||||
if (unique_constraint_violation) {
|
if (unique_constraint_violation) {
|
||||||
Abort();
|
Abort();
|
||||||
return *unique_constraint_violation;
|
return CommitError{.type = CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
.maybe_constraint_violation = *unique_constraint_violation};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
is_transaction_active_ = false;
|
is_transaction_active_ = false;
|
||||||
@ -1124,6 +1165,18 @@ EdgeTypeId Storage::NameToEdgeType(const std::string_view &name) {
|
|||||||
return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
|
return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(gitbuda): Hard to abort global operations in SYNC replication mode
|
||||||
|
// because there is no an abort op for that yet, one idea is to just apply
|
||||||
|
// reverse operation, e.g., CreateIndex <-> DropIndex.
|
||||||
|
//
|
||||||
|
// Another idea is to double check that all replicas have relevant data prior
|
||||||
|
// to calling MarkFinished. That approach would work in both data replication
|
||||||
|
// and global operation cases.
|
||||||
|
//
|
||||||
|
// EDGE CASE 1: What if the first SYNC replica is alive, receives the delta
|
||||||
|
// object, while the second SYNC replica is dead? (replication clients are
|
||||||
|
// stored in a vector and accessed one by one)
|
||||||
|
|
||||||
bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
|
bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
|
||||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||||
if (!indices_.label_index.CreateIndex(label, vertices_.access())) return false;
|
if (!indices_.label_index.CreateIndex(label, vertices_.access())) return false;
|
||||||
@ -1572,6 +1625,9 @@ void Storage::FinalizeWalFile() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(gitbuda): Hard to abort data operation in SYNC replication mode because:
|
||||||
|
// * Just calling Abort inside AppendToWal for some reason causes infinite loop.
|
||||||
|
|
||||||
void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp) {
|
void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp) {
|
||||||
if (!InitializeWalFile()) return;
|
if (!InitializeWalFile()) return;
|
||||||
// Traverse deltas and append them to the WAL file.
|
// Traverse deltas and append them to the WAL file.
|
||||||
@ -1743,10 +1799,15 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
|
|||||||
FinalizeWalFile();
|
FinalizeWalFile();
|
||||||
|
|
||||||
replication_clients_.WithLock([&](auto &clients) {
|
replication_clients_.WithLock([&](auto &clients) {
|
||||||
|
bool all_sync_replicas_ok = true;
|
||||||
for (auto &client : clients) {
|
for (auto &client : clients) {
|
||||||
|
// TODO(gitbuda): SEMI-SYNC should be exculded from here.
|
||||||
|
if (client->Mode() == replication::ReplicationMode::SYNC)
|
||||||
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
|
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
|
||||||
|
// TODO(gitbuda): FinalizeTransactionReplication should also indicate that eveything went well for SYNC replicas.
|
||||||
client->FinalizeTransactionReplication();
|
client->FinalizeTransactionReplication();
|
||||||
}
|
}
|
||||||
|
return all_sync_replicas_ok;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
|
|
||||||
/// REPLICATION ///
|
/// REPLICATION ///
|
||||||
#include "rpc/server.hpp"
|
#include "rpc/server.hpp"
|
||||||
|
#include "storage/v2/commit_error.hpp"
|
||||||
#include "storage/v2/replication/config.hpp"
|
#include "storage/v2/replication/config.hpp"
|
||||||
#include "storage/v2/replication/enums.hpp"
|
#include "storage/v2/replication/enums.hpp"
|
||||||
#include "storage/v2/replication/rpc.hpp"
|
#include "storage/v2/replication/rpc.hpp"
|
||||||
@ -308,11 +309,12 @@ class Storage final {
|
|||||||
|
|
||||||
void AdvanceCommand();
|
void AdvanceCommand();
|
||||||
|
|
||||||
/// Commit returns `ConstraintViolation` if the changes made by this
|
/// Commit returns `CommitError` if the changes made by this transaction
|
||||||
/// transaction violate an existence or unique constraint. In that case the
|
/// violate an existence, unique constraint or data could NOT be replicated
|
||||||
/// transaction is automatically aborted. Otherwise, void is returned.
|
/// to SYNC replica. In that case the transaction is automatically aborted.
|
||||||
|
/// Otherwise, void is returned.
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
utils::BasicResult<ConstraintViolation, void> Commit(std::optional<uint64_t> desired_commit_timestamp = {});
|
utils::BasicResult<CommitError, void> Commit(std::optional<uint64_t> desired_commit_timestamp = {});
|
||||||
|
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
void Abort();
|
void Abort();
|
||||||
|
@ -137,8 +137,10 @@ TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) {
|
|||||||
|
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -157,8 +159,10 @@ TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) {
|
|||||||
|
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -458,8 +462,10 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit1) {
|
|||||||
ASSERT_NO_ERROR(vertex2.SetProperty(prop1, PropertyValue(1)));
|
ASSERT_NO_ERROR(vertex2.SetProperty(prop1, PropertyValue(1)));
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,8 +506,10 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit2) {
|
|||||||
ASSERT_NO_ERROR(acc2.Commit());
|
ASSERT_NO_ERROR(acc2.Commit());
|
||||||
auto res = acc3.Commit();
|
auto res = acc3.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -545,12 +553,16 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit3) {
|
|||||||
|
|
||||||
auto res = acc2.Commit();
|
auto res = acc2.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
res = acc3.Commit();
|
res = acc3.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(
|
||||||
(ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}));
|
res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1}}}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -620,7 +632,9 @@ TEST_F(ConstraintsTest, UniqueConstraintsLabelAlteration) {
|
|||||||
|
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}));
|
EXPECT_EQ(res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}}));
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -654,7 +668,9 @@ TEST_F(ConstraintsTest, UniqueConstraintsLabelAlteration) {
|
|||||||
|
|
||||||
auto res = acc1.Commit();
|
auto res = acc1.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}));
|
EXPECT_EQ(res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -749,8 +765,9 @@ TEST_F(ConstraintsTest, UniqueConstraintsMultipleProperties) {
|
|||||||
ASSERT_NO_ERROR(vertex2->SetProperty(prop2, PropertyValue(2)));
|
ASSERT_NO_ERROR(vertex2->SetProperty(prop2, PropertyValue(2)));
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(),
|
EXPECT_EQ(res.GetError(), (CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
(ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set<PropertyId>{prop1, prop2}}));
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1,
|
||||||
|
std::set<PropertyId>{prop1, prop2}}}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then change the second property of both vertex to null. Property values of
|
// Then change the second property of both vertex to null. Property values of
|
||||||
@ -861,7 +878,9 @@ TEST_F(ConstraintsTest, UniqueConstraintsInsertRemoveAbortInsert) {
|
|||||||
|
|
||||||
auto res = acc.Commit();
|
auto res = acc.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1, prop2}}));
|
EXPECT_EQ(res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1, prop2}}}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -900,7 +919,9 @@ TEST_F(ConstraintsTest, UniqueConstraintsDeleteVertexSetProperty) {
|
|||||||
|
|
||||||
auto res = acc1.Commit();
|
auto res = acc1.Commit();
|
||||||
ASSERT_TRUE(res.HasError());
|
ASSERT_TRUE(res.HasError());
|
||||||
EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}));
|
EXPECT_EQ(res.GetError(),
|
||||||
|
(CommitError{CommitError::Type::CONSTRAINT_VIOLATION,
|
||||||
|
ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}}}));
|
||||||
|
|
||||||
ASSERT_NO_ERROR(acc2.Commit());
|
ASSERT_NO_ERROR(acc2.Commit());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user