Add retry logic possible when conflicting transactions (#1361)

This commit is contained in:
Josipmrden 2023-10-24 19:43:23 +02:00 committed by GitHub
parent 1f118e7521
commit 4e8148f7d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 619 additions and 8 deletions

View File

@ -177,6 +177,11 @@ std::map<std::string, memgraph::communication::bolt::Value> SessionHL::Pull(Sess
// Wrap QueryException into ClientError, because we want to allow the
// client to fix their query.
throw memgraph::communication::bolt::ClientError(e.what());
} catch (const utils::BasicException &) {
// Exceptions inheriting from BasicException will result in a TransientError
// i. e. client will be encouraged to retry execution because it
// could succeed if executed again.
throw;
}
}

View File

@ -29,6 +29,16 @@ class QueryException : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(QueryException)
};
/**
* @brief Base class of all query language related exceptions which can be retried.
* All exceptions derived from this one will be interpreted as TransientError-s,
* i.e. client will be encouraged to retry the queries.
*/
class RetryBasicException : public utils::BasicException {
using utils::BasicException::BasicException;
SPECIALIZE_GET_EXCEPTION_NAME(RetryBasicException)
};
class LexingException : public QueryException {
public:
using QueryException::QueryException;
@ -139,13 +149,12 @@ enum class AbortReason : uint8_t {
TIMEOUT = 3,
};
// This one is inherited from BasicException and will be treated as
// This one is inherited from RetryBasicException and will be treated as
// TransientError, i. e. client will be encouraged to retry execution because it
// could succeed if executed again.
class HintedAbortError : public utils::BasicException {
class HintedAbortError : public RetryBasicException {
public:
using utils::BasicException::BasicException;
explicit HintedAbortError(AbortReason reason) : utils::BasicException(AsMsg(reason)), reason_{reason} {}
explicit HintedAbortError(AbortReason reason) : RetryBasicException(AsMsg(reason)), reason_{reason} {}
SPECIALIZE_GET_EXCEPTION_NAME(HintedAbortError)
auto Reason() const -> AbortReason { return reason_; }
@ -187,11 +196,13 @@ class WriteVertexOperationInEdgeImportModeException : public QueryException {
SPECIALIZE_GET_EXCEPTION_NAME(WriteVertexOperationInEdgeImportModeException)
};
class TransactionSerializationException : public QueryException {
// This one is inherited from BasicException and will be treated as
// TransientError, i. e. client will be encouraged to retry execution because it
// could succeed if executed again.
class TransactionSerializationException : public RetryBasicException {
public:
using QueryException::QueryException;
TransactionSerializationException()
: QueryException(
: RetryBasicException(
"Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction "
"is finished") {}
SPECIALIZE_GET_EXCEPTION_NAME(TransactionSerializationException)

View File

@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Neo4j.Driver.Simple" Version="5.8.0" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Neo4j.Driver;
public class ParallelEdgeImport {
public static async Task Main(string[] args) {
var nodeCount = 100;
using (var driver = GraphDatabase.Driver(
"bolt://localhost:7687", AuthTokens.None,
(builder) => builder.WithEncryptionLevel(EncryptionLevel.None)
.WithMaxTransactionRetryTime(TimeSpan.FromSeconds(15)))) {
ClearDatabase(driver);
// Create root
using (var session = driver.Session()) {
session.ExecuteWrite(tx => {
var result = tx.Run("MERGE (root:Root) RETURN ID(root);");
return result.Single()[0].As<int>();
});
}
Trace.Assert(CountNodes(driver) == 1, "Node count after root creation is not correct!.");
// Importing edges asynchronously
var tasks = new List<Task>();
for (int i = 0; i < nodeCount - 1; i++) {
tasks.Add(ImportEdgeAsync(driver));
}
await Task.WhenAll(tasks);
Trace.Assert(CountNodes(driver) == nodeCount);
}
Console.WriteLine("All ok!");
}
private static void ClearDatabase(IDriver driver) {
using (var session = driver.Session()) session.Run("MATCH (n) DETACH DELETE n").Consume();
}
private static async Task ImportEdgeAsync(IDriver driver) {
using (var session = driver.AsyncSession()) {
await session.WriteTransactionAsync(async tx => {
var reader = await tx.RunAsync(
"MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN ID(root)");
await reader.FetchAsync();
Console.WriteLine("Transaction got through!");
return reader.Current[0];
});
}
}
private static int CountNodes(IDriver driver) {
using (var session = driver.Session()) {
var result = session.Run("MATCH (n) RETURN COUNT(*) AS cnt");
return Convert.ToInt32(result.First()["cnt"]);
}
}
}

View File

@ -3,6 +3,6 @@ module bolt-test
go 1.18
require (
github.com/neo4j/neo4j-go-driver/v5 v5.9.0 // indirect
github.com/neo4j/neo4j-go-driver/v5 v5.13.0 // indirect
golang.org/dl v0.0.0-20230502172222-5216546bad51 // indirect
)

View File

@ -6,5 +6,7 @@ github.com/neo4j/neo4j-go-driver/v5 v5.8.1 h1:IysKg6KJIUgyItmnHRRrt2N8srbd6znMsl
github.com/neo4j/neo4j-go-driver/v5 v5.8.1/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/neo4j/neo4j-go-driver/v5 v5.9.0 h1:TYxT0RSiwnvVFia90V7TLnRXv8HkdQQ6rTUaPVoyZ+w=
github.com/neo4j/neo4j-go-driver/v5 v5.9.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/neo4j/neo4j-go-driver/v5 v5.13.0 h1:NmyUxh4LYTdcJdI6EnazHyUKu1f0/BPiHCYUZUZIGQw=
github.com/neo4j/neo4j-go-driver/v5 v5.13.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
golang.org/dl v0.0.0-20230502172222-5216546bad51 h1:Bmo/kmR2hzyhGt3jjtl1ghkCqa5LINbB9D3QTkiLJIY=
golang.org/dl v0.0.0-20230502172222-5216546bad51/go.mod h1:IUMfjQLJQd4UTqG1Z90tenwKoCX93Gn3MAQJMOSBsDQ=

View File

@ -0,0 +1,126 @@
package main
import (
"log"
"fmt"
"sync"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
var nodeCount int = 100
func handle_if_error(err error) {
if err != nil {
log.Fatal("Error occured: %s", err)
}
}
func main() {
dbUri := "bolt://localhost:7687"
driver, err := neo4j.NewDriver(dbUri, neo4j.BasicAuth("", "", ""))
if err != nil {
log.Fatal("An error occurred opening conn: %s", err)
}
defer driver.Close()
session := driver.NewSession(neo4j.SessionConfig{})
defer session.Close()
_, err = session.WriteTransaction(clearDatabase)
handle_if_error(err)
fmt.Println("Database cleared.")
_, err = session.WriteTransaction(createRoot)
handle_if_error(err)
fmt.Println("Record created.")
parallelEdgeImport(driver)
_, err = session.WriteTransaction(testNodeCount)
handle_if_error(err)
fmt.Println("All ok!")
}
func clearDatabase(tx neo4j.Transaction) (interface{}, error) {
result, err := tx.Run(
"MATCH (n) DETACH DELETE n;",
map[string]interface{}{})
handle_if_error(err)
fmt.Printf("Database cleared!\n")
return result.Consume()
}
func createRoot(tx neo4j.Transaction) (interface{}, error) {
result, err := tx.Run(
`MERGE (root:Root) RETURN root;`,
map[string]interface{}{})
handle_if_error(err)
fmt.Printf("Root created!\n")
return result.Consume()
}
func worker(driver neo4j.Driver, wg *sync.WaitGroup) {
defer wg.Done() // Decrement the WaitGroup counter when the goroutine is done
session := driver.NewSession(neo4j.SessionConfig{})
defer session.Close()
_, err := session.WriteTransaction(edgeImport)
handle_if_error(err)
fmt.Printf("Transaction got through!\n")
}
func parallelEdgeImport(driver neo4j.Driver) {
var wg sync.WaitGroup
for i := 1; i <= nodeCount - 1; i++ {
wg.Add(1)
go worker(driver, &wg)
}
wg.Wait()
fmt.Println("All workers have finished.")
}
func edgeImport(tx neo4j.Transaction) (interface{}, error) {
result, err := tx.Run(
`MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root);`,
map[string]interface{}{})
handle_if_error(err)
return result.Consume()
}
func testNodeCount(tx neo4j.Transaction) (interface{}, error) {
result, err := tx.Run(
"MATCH (n) RETURN COUNT(n) AS cnt;",
map[string]interface{}{})
handle_if_error(err)
if !result.Next() {
log.Fatal("Missing result.")
}
count, found := result.Record().Get("cnt")
if !found {
return nil, fmt.Errorf("Wrong result returned.")
}
var countInt int
switch v := count.(type) {
case int:
countInt = v
case int64:
countInt = int(v)
case float64:
countInt = int(v)
default:
return nil, fmt.Errorf("Unexpected data type for count: %T", count)
}
if countInt != nodeCount {
log.Fatal("Count does not match! (", count, ")")
}
return result.Consume()
}

View File

@ -18,3 +18,4 @@ done
go get github.com/neo4j/neo4j-go-driver/v5
go run docs_quick_start.go
# go run parallel_edge_import.go

View File

@ -85,6 +85,25 @@
<goal>single</goal>
</goals>
</execution>
<execution>
<id>build-d</id>
<configuration>
<archive>
<manifest>
<mainClass>memgraph.ParallelEdgeImport</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<appendAssemblyId>false</appendAssemblyId>
<finalName>ParallelEdgeImport</finalName>
</configuration>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

View File

@ -36,3 +36,4 @@ mvn clean package
java -jar target/DocsHowToQuery.jar
java -jar target/MaxQueryLength.jar
java -jar target/Transactions.jar
# java -jar target/ParallelEdgeImport.jar

View File

@ -0,0 +1,122 @@
package memgraph;
import static org.neo4j.driver.Values.parameters;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionWork;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.AsyncTransactionWork;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransientException;
public class ParallelEdgeImport {
public static int dropDatabase(Driver driver) {
try (Session session = driver.session()) {
session.writeTransaction(new TransactionWork<Integer>() {
@Override
public Integer execute(Transaction tx) {
tx.run("MATCH (n) DETACH DELETE n;");
return 0;
}
});
} catch (ClientException e) {
System.out.println(e);
}
return 0;
}
public static int createRoot(Driver driver) {
try (var session = driver.session()) {
session.writeTransaction(new TransactionWork<Integer>() {
@Override
public Integer execute(Transaction tx) {
tx.run("MERGE (n:Root) RETURN ID(n);");
return 0;
}
});
} catch (ClientException e) {
System.out.println(e);
}
return 0;
}
public static void parallelEdgeImport(Driver d, int nodeCount) {
try (var session = driver.session()) {
session.writeTransaction(new TransactionWork<Integer>() {
@Override
public Integer execute(Transaction tx) {
tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root);");
System.out.println("Transaction got through!");
return 0;
}
});
} catch (ClientException e) {
System.out.println(e);
}
}
private static Config config = Config.builder()
.withoutEncryption()
.withMaxTransactionRetryTime(15, TimeUnit.SECONDS)
.build();
private static Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.none(), config);
private static int nodeCount = 100;
static class MyRunnable implements Runnable {
@Override
public void run() {
parallelEdgeImport(driver, nodeCount);
}
}
public static void main(String[] args) {
dropDatabase(driver);
System.out.println("Database cleared!");
createRoot(driver);
System.out.println("Root created!");
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < nodeCount - 1; i++) {
Runnable myRunnable = new MyRunnable();
Thread thread = new Thread(myRunnable);
threadList.add(thread);
thread.start();
}
// Wait for all threads to finish
for (Thread thread : threadList) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try (var session = driver.session()) {
var actualNodeCount = session.run("MATCH (n) RETURN COUNT(n) AS cnt;").list().get(0).get("cnt").asInt();
if (actualNodeCount != nodeCount) {
System.out.println("Node count doesn't match (" + actualNodeCount + ")");
System.exit(1);
}
}
System.out.println("All ok!");
driver.close();
}
}

View File

@ -0,0 +1,69 @@
var neo4j = require('neo4j-driver');
var driver = neo4j.driver("bolt://localhost:7687",
neo4j.auth.basic("", ""),
{ encrypted: 'ENCRYPTION_OFF', maxTransactionRetryTime: 30000 });
var session = driver.session();
var node_count = 50;
function die() {
session.close();
driver.close();
process.exit(1);
}
async function run_query(query, callback) {
var run = session.run(query, {});
run.then(callback).catch(function (error) {
console.log(error);
die();
});
}
async function run_query_with_sess(sess, query, callback) {
const result = await sess.executeWrite(async tx => {
return await tx.run(query);
});
console.log("Transaction got through!");
sess.close();
return result;
}
async function run_conflicting_queries(query, number, callback) {
const promises = [];
for (let i = 0; i < number; i++) {
var sess = driver.session();
promises.push(run_query_with_sess(sess, query));
}
await Promise.all(promises).then(callback);
}
async function assert_node_count(expectedCount) {
try {
const run = await session.run("MATCH (n) RETURN count(n) AS cnt", {});
const result = run.records[0].get("cnt").toNumber();
if (result !== expectedCount) {
console.log("Count result is not correct! (" + result + ")");
die();
} else {
console.log("Count result is correct! (" + result + ")");
console.log("All ok!");
}
} catch (error) {
console.log(error);
die();
} finally {
session.close();
driver.close();
}
}
run_query("MATCH (n) DETACH DELETE n;", function (result) {
console.log("Database cleared.");
run_query("MERGE (root:Root);", function (result) {
console.log("Root created.");
run_conflicting_queries("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root)", node_count - 1, function (result) {
assert_node_count(node_count);
});
});
});

View File

@ -0,0 +1,69 @@
var neo4j = require('neo4j-driver');
var driver = neo4j.driver("bolt://localhost:7687",
neo4j.auth.basic("", ""),
{ encrypted: 'ENCRYPTION_OFF' });
var session = driver.session();
var node_count = 50;
function die() {
session.close();
driver.close();
process.exit(1);
}
async function run_query(query, callback) {
var run = session.run(query, {});
run.then(callback).catch(function (error) {
console.log(error);
die();
});
}
async function run_query_with_sess(sess, query, callback) {
const result = await sess.executeWrite(async tx => {
return await tx.run(query);
});
console.log("Transaction got through!");
sess.close();
return result;
}
async function run_conflicting_queries(query, number, callback) {
const promises = [];
for (let i = 0; i < number; i++) {
var sess = driver.session();
promises.push(run_query_with_sess(sess, query));
}
await Promise.all(promises).then(callback);
}
async function assert_node_count(expectedCount) {
try {
const run = await session.run("MATCH (n) RETURN count(n) AS cnt", {});
const result = run.records[0].get("cnt").toNumber();
if (result !== expectedCount) {
console.log("Count result is not correct! (" + result + ")");
die();
} else {
console.log("Count result is correct! (" + result + ")");
console.log("All ok!");
}
} catch (error) {
console.log(error);
die();
} finally {
session.close();
driver.close();
}
}
run_query("MATCH (n) DETACH DELETE n;", function (result) {
console.log("Database cleared.");
run_query("MERGE (root:Root);", function (result) {
console.log("Root created.");
run_conflicting_queries("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root)", node_count - 1, function (result) {
assert_node_count(node_count);
});
});
});

View File

@ -15,3 +15,4 @@ fi
node docs_how_to_query.js
node max_query_length.js
# node parallel_edge_import.js

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import threading
from neo4j import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""), encrypted=False)
session = driver.session()
node_count = 100
def add_edge_tx(tx):
result = tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN root").single()
pass
def add_edge():
sess = driver.session()
sess.execute_write(add_edge_tx)
print("Transaction got through!")
sess.close()
session.run("MATCH (n) DETACH DELETE n").consume()
print("Database cleared.")
session.run("MERGE (root:Root);").consume()
print("Root created.")
threads = [threading.Thread(target=add_edge) for x in range(node_count - 1)]
[x.start() for x in threads]
[x.join() for x in threads]
count = session.run("MATCH (n) RETURN count(n) AS cnt").single()["cnt"]
assert count == node_count
session.close()
driver.close()
print("All ok!")

View File

@ -32,3 +32,4 @@ python3 docs_how_to_query.py || exit 1
python3 max_query_length.py || exit 1
python3 transactions.py || exit 1
python3 metadata.py || exit 1
# python3 parallel_edge_import.py || exit 1

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import threading
from neo4j import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""), encrypted=False)
session = driver.session()
node_count = 100
def add_edge_tx(tx):
result = tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN root").single()
pass
def add_edge():
sess = driver.session()
sess.execute_write(add_edge_tx)
print("Transaction got through!")
sess.close()
session.run("MATCH (n) DETACH DELETE n").consume()
print("Database cleared.")
session.run("MERGE (root:Root);").consume()
print("Root created.")
threads = [threading.Thread(target=add_edge) for x in range(node_count - 1)]
[x.start() for x in threads]
[x.join() for x in threads]
count = session.run("MATCH (n) RETURN count(n) AS cnt").single()["cnt"]
assert count == node_count
session.close()
driver.close()
print("All ok!")

View File

@ -26,3 +26,4 @@ python3 max_query_length.py || exit 1
python3 transactions.py || exit 1
python3 path.py || exit 1
python3 server_name.py || exit 1
# python3 parallel_edge_import.py || exit 1