Send Bolt success messages only after DB operations run successfully (#1556)
This commit is contained in:
parent
953a8f5340
commit
64e5428d94
@ -367,14 +367,16 @@ State HandleReset(TSession &session, const Marker marker) {
|
|||||||
return State::Close;
|
return State::Close;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!session.encoder_.MessageSuccess()) {
|
try {
|
||||||
spdlog::trace("Couldn't send success message!");
|
session.Abort();
|
||||||
return State::Close;
|
if (!session.encoder_.MessageSuccess({})) {
|
||||||
|
spdlog::trace("Couldn't send success message!");
|
||||||
|
return State::Close;
|
||||||
|
}
|
||||||
|
return State::Idle;
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
return HandleFailure(session, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
session.Abort();
|
|
||||||
|
|
||||||
return State::Idle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename TSession>
|
template <typename TSession>
|
||||||
@ -397,19 +399,17 @@ State HandleBegin(TSession &session, const State state, const Marker marker) {
|
|||||||
|
|
||||||
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
||||||
|
|
||||||
if (!session.encoder_.MessageSuccess({})) {
|
|
||||||
spdlog::trace("Couldn't send success message!");
|
|
||||||
return State::Close;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
session.Configure(extra.ValueMap());
|
session.Configure(extra.ValueMap());
|
||||||
session.BeginTransaction(extra.ValueMap());
|
session.BeginTransaction(extra.ValueMap());
|
||||||
|
if (!session.encoder_.MessageSuccess({})) {
|
||||||
|
spdlog::trace("Couldn't send success message!");
|
||||||
|
return State::Close;
|
||||||
|
}
|
||||||
|
return State::Idle;
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
return HandleFailure(session, e);
|
return HandleFailure(session, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return State::Idle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename TSession>
|
template <typename TSession>
|
||||||
@ -427,11 +427,11 @@ State HandleCommit(TSession &session, const State state, const Marker marker) {
|
|||||||
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
session.CommitTransaction();
|
||||||
if (!session.encoder_.MessageSuccess({})) {
|
if (!session.encoder_.MessageSuccess({})) {
|
||||||
spdlog::trace("Couldn't send success message!");
|
spdlog::trace("Couldn't send success message!");
|
||||||
return State::Close;
|
return State::Close;
|
||||||
}
|
}
|
||||||
session.CommitTransaction();
|
|
||||||
return State::Idle;
|
return State::Idle;
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
return HandleFailure(session, e);
|
return HandleFailure(session, e);
|
||||||
@ -453,11 +453,11 @@ State HandleRollback(TSession &session, const State state, const Marker marker)
|
|||||||
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
DMG_ASSERT(!session.encoder_buffer_.HasData(), "There should be no data to write in this state");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
session.RollbackTransaction();
|
||||||
if (!session.encoder_.MessageSuccess({})) {
|
if (!session.encoder_.MessageSuccess({})) {
|
||||||
spdlog::trace("Couldn't send success message!");
|
spdlog::trace("Couldn't send success message!");
|
||||||
return State::Close;
|
return State::Close;
|
||||||
}
|
}
|
||||||
session.RollbackTransaction();
|
|
||||||
return State::Idle;
|
return State::Idle;
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
return HandleFailure(session, e);
|
return HandleFailure(session, e);
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from neo4j import GraphDatabase, basic_auth
|
from neo4j import GraphDatabase
|
||||||
from neo4j.exceptions import ClientError, TransientError
|
from neo4j.exceptions import ClientError, TransientError
|
||||||
|
|
||||||
|
|
||||||
@ -75,11 +75,58 @@ def test_timeout(driver, set_timeout):
|
|||||||
raise Exception("The query should have timed out, but it didn't!")
|
raise Exception("The query should have timed out, but it didn't!")
|
||||||
|
|
||||||
|
|
||||||
|
def violate_constraint(tx):
|
||||||
|
tx.run("CREATE (n:Employee:Person {id: '123', alt_id: '100'});").consume()
|
||||||
|
|
||||||
|
|
||||||
|
def violate_constraint_on_intermediate_result(tx):
|
||||||
|
tx.run("CREATE (n:Employee:Person {id: '124', alt_id: '200'});").consume()
|
||||||
|
tx.run("MATCH (n {alt_id: '200'}) SET n.id = '123';").consume() # two (:Person {id: '123'})
|
||||||
|
tx.run("MATCH (n {alt_id: '100'}) SET n.id = '122';").consume() # above violation fixed
|
||||||
|
|
||||||
|
|
||||||
|
def clear_db(session):
|
||||||
|
session.run("DROP CONSTRAINT ON (n:Person) ASSERT n.id IS UNIQUE;")
|
||||||
|
session.run("DROP CONSTRAINT ON (n:Employee) ASSERT n.id IS UNIQUE;")
|
||||||
|
session.run("DROP CONSTRAINT ON (n:Employee) ASSERT EXISTS (n.id);")
|
||||||
|
|
||||||
|
session.run("MATCH (n) DETACH DELETE n;")
|
||||||
|
|
||||||
|
|
||||||
with GraphDatabase.driver("bolt://localhost:7687", auth=None, encrypted=False) as driver:
|
with GraphDatabase.driver("bolt://localhost:7687", auth=None, encrypted=False) as driver:
|
||||||
|
with driver.session() as session:
|
||||||
|
# Clear the DB
|
||||||
|
session.run("MATCH (n) DETACH DELETE n;")
|
||||||
|
|
||||||
|
# Add constraints
|
||||||
|
session.run("CREATE CONSTRAINT ON (n:Person) ASSERT n.id IS UNIQUE;")
|
||||||
|
session.run("CREATE CONSTRAINT ON (n:Employee) ASSERT n.id IS UNIQUE;")
|
||||||
|
session.run("CREATE CONSTRAINT ON (n:Employee) ASSERT EXISTS (n.id);")
|
||||||
|
|
||||||
|
# Set the initial graph state
|
||||||
|
session.execute_write(lambda tx: tx.run("CREATE (n:Employee:Person {id: '123', alt_id: '100'}) RETURN n;"))
|
||||||
|
|
||||||
|
# Run a transaction that violates a constraint
|
||||||
|
try:
|
||||||
|
session.execute_write(violate_constraint)
|
||||||
|
except TransientError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
clear_db(session)
|
||||||
|
raise Exception("neo4j.exceptions.TransientError should have been thrown!")
|
||||||
|
|
||||||
|
# Run a transaction that violates no constraints even though an intermediate result does
|
||||||
|
try:
|
||||||
|
session.execute_write(violate_constraint_on_intermediate_result)
|
||||||
|
except TransientError:
|
||||||
|
clear_db(session)
|
||||||
|
raise Exception("neo4j.exceptions.TransientError should not have been thrown!")
|
||||||
|
|
||||||
|
clear_db(session)
|
||||||
|
|
||||||
def add_person(f, name, name2):
|
def add_person(f, name, name2):
|
||||||
with driver.session() as session:
|
with driver.session() as session:
|
||||||
session.write_transaction(f, name, name2)
|
session.execute_write(f, name, name2)
|
||||||
|
|
||||||
# Wrong query.
|
# Wrong query.
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user