diff --git a/src/glue/SessionHL.cpp b/src/glue/SessionHL.cpp index 4b996af57..a84f44974 100644 --- a/src/glue/SessionHL.cpp +++ b/src/glue/SessionHL.cpp @@ -177,6 +177,11 @@ std::map SessionHL::Pull(Sess // Wrap QueryException into ClientError, because we want to allow the // client to fix their query. throw memgraph::communication::bolt::ClientError(e.what()); + } catch (const utils::BasicException &) { + // Exceptions inheriting from BasicException will result in a TransientError + // i. e. client will be encouraged to retry execution because it + // could succeed if executed again. + throw; } } diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index a0998d035..1b2e712f9 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -29,6 +29,16 @@ class QueryException : public utils::BasicException { SPECIALIZE_GET_EXCEPTION_NAME(QueryException) }; +/** + * @brief Base class of all query language related exceptions which can be retried. + * All exceptions derived from this one will be interpreted as TransientError-s, + * i.e. client will be encouraged to retry the queries. + */ +class RetryBasicException : public utils::BasicException { + using utils::BasicException::BasicException; + SPECIALIZE_GET_EXCEPTION_NAME(RetryBasicException) +}; + class LexingException : public QueryException { public: using QueryException::QueryException; @@ -139,13 +149,12 @@ enum class AbortReason : uint8_t { TIMEOUT = 3, }; -// This one is inherited from BasicException and will be treated as +// This one is inherited from RetryBasicException and will be treated as // TransientError, i. e. client will be encouraged to retry execution because it // could succeed if executed again. -class HintedAbortError : public utils::BasicException { +class HintedAbortError : public RetryBasicException { public: - using utils::BasicException::BasicException; - explicit HintedAbortError(AbortReason reason) : utils::BasicException(AsMsg(reason)), reason_{reason} {} + explicit HintedAbortError(AbortReason reason) : RetryBasicException(AsMsg(reason)), reason_{reason} {} SPECIALIZE_GET_EXCEPTION_NAME(HintedAbortError) auto Reason() const -> AbortReason { return reason_; } @@ -187,11 +196,13 @@ class WriteVertexOperationInEdgeImportModeException : public QueryException { SPECIALIZE_GET_EXCEPTION_NAME(WriteVertexOperationInEdgeImportModeException) }; -class TransactionSerializationException : public QueryException { +// This one is inherited from BasicException and will be treated as +// TransientError, i. e. client will be encouraged to retry execution because it +// could succeed if executed again. +class TransactionSerializationException : public RetryBasicException { public: - using QueryException::QueryException; TransactionSerializationException() - : QueryException( + : RetryBasicException( "Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction " "is finished") {} SPECIALIZE_GET_EXCEPTION_NAME(TransactionSerializationException) diff --git a/tests/drivers/csharp/v4_1/ParallelEdgeImport/ParallelEdgeImport.csproj b/tests/drivers/csharp/v4_1/ParallelEdgeImport/ParallelEdgeImport.csproj new file mode 100644 index 000000000..f5bdfa763 --- /dev/null +++ b/tests/drivers/csharp/v4_1/ParallelEdgeImport/ParallelEdgeImport.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp2.1 + + + + + + + diff --git a/tests/drivers/csharp/v4_1/ParallelEdgeImport/Program.cs b/tests/drivers/csharp/v4_1/ParallelEdgeImport/Program.cs new file mode 100644 index 000000000..6d8ae5cb0 --- /dev/null +++ b/tests/drivers/csharp/v4_1/ParallelEdgeImport/Program.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Neo4j.Driver; + +public class ParallelEdgeImport { + public static async Task Main(string[] args) { + var nodeCount = 100; + using (var driver = GraphDatabase.Driver( + "bolt://localhost:7687", AuthTokens.None, + (builder) => builder.WithEncryptionLevel(EncryptionLevel.None) + .WithMaxTransactionRetryTime(TimeSpan.FromSeconds(15)))) { + ClearDatabase(driver); + + // Create root + using (var session = driver.Session()) { + session.ExecuteWrite(tx => { + var result = tx.Run("MERGE (root:Root) RETURN ID(root);"); + return result.Single()[0].As(); + }); + } + + Trace.Assert(CountNodes(driver) == 1, "Node count after root creation is not correct!."); + + // Importing edges asynchronously + var tasks = new List(); + for (int i = 0; i < nodeCount - 1; i++) { + tasks.Add(ImportEdgeAsync(driver)); + } + + await Task.WhenAll(tasks); + + Trace.Assert(CountNodes(driver) == nodeCount); + } + + Console.WriteLine("All ok!"); + } + + private static void ClearDatabase(IDriver driver) { + using (var session = driver.Session()) session.Run("MATCH (n) DETACH DELETE n").Consume(); + } + + private static async Task ImportEdgeAsync(IDriver driver) { + using (var session = driver.AsyncSession()) { + await session.WriteTransactionAsync(async tx => { + var reader = await tx.RunAsync( + "MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN ID(root)"); + await reader.FetchAsync(); + Console.WriteLine("Transaction got through!"); + return reader.Current[0]; + }); + } + } + + private static int CountNodes(IDriver driver) { + using (var session = driver.Session()) { + var result = session.Run("MATCH (n) RETURN COUNT(*) AS cnt"); + return Convert.ToInt32(result.First()["cnt"]); + } + } +} diff --git a/tests/drivers/go/v5/go.mod b/tests/drivers/go/v5/go.mod index ed02d3b35..a44baf405 100644 --- a/tests/drivers/go/v5/go.mod +++ b/tests/drivers/go/v5/go.mod @@ -3,6 +3,6 @@ module bolt-test go 1.18 require ( - github.com/neo4j/neo4j-go-driver/v5 v5.9.0 // indirect + github.com/neo4j/neo4j-go-driver/v5 v5.13.0 // indirect golang.org/dl v0.0.0-20230502172222-5216546bad51 // indirect ) diff --git a/tests/drivers/go/v5/go.sum b/tests/drivers/go/v5/go.sum index 29a1ab00e..dc85aef95 100644 --- a/tests/drivers/go/v5/go.sum +++ b/tests/drivers/go/v5/go.sum @@ -6,5 +6,7 @@ github.com/neo4j/neo4j-go-driver/v5 v5.8.1 h1:IysKg6KJIUgyItmnHRRrt2N8srbd6znMsl github.com/neo4j/neo4j-go-driver/v5 v5.8.1/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/neo4j/neo4j-go-driver/v5 v5.9.0 h1:TYxT0RSiwnvVFia90V7TLnRXv8HkdQQ6rTUaPVoyZ+w= github.com/neo4j/neo4j-go-driver/v5 v5.9.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= +github.com/neo4j/neo4j-go-driver/v5 v5.13.0 h1:NmyUxh4LYTdcJdI6EnazHyUKu1f0/BPiHCYUZUZIGQw= +github.com/neo4j/neo4j-go-driver/v5 v5.13.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= golang.org/dl v0.0.0-20230502172222-5216546bad51 h1:Bmo/kmR2hzyhGt3jjtl1ghkCqa5LINbB9D3QTkiLJIY= golang.org/dl v0.0.0-20230502172222-5216546bad51/go.mod h1:IUMfjQLJQd4UTqG1Z90tenwKoCX93Gn3MAQJMOSBsDQ= diff --git a/tests/drivers/go/v5/parallel_edge_import.go b/tests/drivers/go/v5/parallel_edge_import.go new file mode 100644 index 000000000..b13fccbfc --- /dev/null +++ b/tests/drivers/go/v5/parallel_edge_import.go @@ -0,0 +1,126 @@ +package main + +import ( + "log" + "fmt" + "sync" + "github.com/neo4j/neo4j-go-driver/v5/neo4j" +) + +var nodeCount int = 100 + +func handle_if_error(err error) { + if err != nil { + log.Fatal("Error occured: %s", err) + } +} + +func main() { + dbUri := "bolt://localhost:7687" + driver, err := neo4j.NewDriver(dbUri, neo4j.BasicAuth("", "", "")) + if err != nil { + log.Fatal("An error occurred opening conn: %s", err) + } + defer driver.Close() + + session := driver.NewSession(neo4j.SessionConfig{}) + defer session.Close() + + _, err = session.WriteTransaction(clearDatabase) + handle_if_error(err) + fmt.Println("Database cleared.") + + _, err = session.WriteTransaction(createRoot) + handle_if_error(err) + fmt.Println("Record created.") + + parallelEdgeImport(driver) + + _, err = session.WriteTransaction(testNodeCount) + handle_if_error(err) + fmt.Println("All ok!") +} + +func clearDatabase(tx neo4j.Transaction) (interface{}, error) { + result, err := tx.Run( + "MATCH (n) DETACH DELETE n;", + map[string]interface{}{}) + handle_if_error(err) + fmt.Printf("Database cleared!\n") + return result.Consume() +} + +func createRoot(tx neo4j.Transaction) (interface{}, error) { + result, err := tx.Run( + `MERGE (root:Root) RETURN root;`, + map[string]interface{}{}) + handle_if_error(err) + fmt.Printf("Root created!\n") + return result.Consume() +} + +func worker(driver neo4j.Driver, wg *sync.WaitGroup) { + defer wg.Done() // Decrement the WaitGroup counter when the goroutine is done + + session := driver.NewSession(neo4j.SessionConfig{}) + defer session.Close() + + _, err := session.WriteTransaction(edgeImport) + handle_if_error(err) + fmt.Printf("Transaction got through!\n") +} + +func parallelEdgeImport(driver neo4j.Driver) { + var wg sync.WaitGroup + + for i := 1; i <= nodeCount - 1; i++ { + wg.Add(1) + go worker(driver, &wg) + } + + wg.Wait() + + fmt.Println("All workers have finished.") +} + +func edgeImport(tx neo4j.Transaction) (interface{}, error) { + result, err := tx.Run( + `MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root);`, + map[string]interface{}{}) + handle_if_error(err) + return result.Consume() +} + +func testNodeCount(tx neo4j.Transaction) (interface{}, error) { + result, err := tx.Run( + "MATCH (n) RETURN COUNT(n) AS cnt;", + map[string]interface{}{}) + handle_if_error(err) + + if !result.Next() { + log.Fatal("Missing result.") + } + + count, found := result.Record().Get("cnt") + if !found { + return nil, fmt.Errorf("Wrong result returned.") + } + + var countInt int + switch v := count.(type) { + case int: + countInt = v + case int64: + countInt = int(v) + case float64: + countInt = int(v) + default: + return nil, fmt.Errorf("Unexpected data type for count: %T", count) + } + + if countInt != nodeCount { + log.Fatal("Count does not match! (", count, ")") + } + + return result.Consume() +} diff --git a/tests/drivers/go/v5/run.sh b/tests/drivers/go/v5/run.sh index 344495f15..cbe31bd26 100755 --- a/tests/drivers/go/v5/run.sh +++ b/tests/drivers/go/v5/run.sh @@ -18,3 +18,4 @@ done go get github.com/neo4j/neo4j-go-driver/v5 go run docs_quick_start.go +# go run parallel_edge_import.go diff --git a/tests/drivers/java/v5_8/pom.xml b/tests/drivers/java/v5_8/pom.xml index c0a64e8c0..6db6a6ded 100644 --- a/tests/drivers/java/v5_8/pom.xml +++ b/tests/drivers/java/v5_8/pom.xml @@ -85,6 +85,25 @@ single + + build-d + + + + memgraph.ParallelEdgeImport + + + + jar-with-dependencies + + false + ParallelEdgeImport + + package + + single + + diff --git a/tests/drivers/java/v5_8/run.sh b/tests/drivers/java/v5_8/run.sh index 6ae33ec4d..0e85f68df 100755 --- a/tests/drivers/java/v5_8/run.sh +++ b/tests/drivers/java/v5_8/run.sh @@ -36,3 +36,4 @@ mvn clean package java -jar target/DocsHowToQuery.jar java -jar target/MaxQueryLength.jar java -jar target/Transactions.jar +# java -jar target/ParallelEdgeImport.jar diff --git a/tests/drivers/java/v5_8/src/main/java/memgraph/ParallelEdgeImport.java b/tests/drivers/java/v5_8/src/main/java/memgraph/ParallelEdgeImport.java new file mode 100644 index 000000000..343898c15 --- /dev/null +++ b/tests/drivers/java/v5_8/src/main/java/memgraph/ParallelEdgeImport.java @@ -0,0 +1,122 @@ +package memgraph; + +import static org.neo4j.driver.Values.parameters; + +import java.util.*; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Config; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; +import org.neo4j.driver.Result; +import org.neo4j.driver.Session; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.TransientException; + +public class ParallelEdgeImport { + public static int dropDatabase(Driver driver) { + try (Session session = driver.session()) { + session.writeTransaction(new TransactionWork() { + @Override + public Integer execute(Transaction tx) { + tx.run("MATCH (n) DETACH DELETE n;"); + return 0; + } + }); + } catch (ClientException e) { + System.out.println(e); + } + + return 0; + } + + public static int createRoot(Driver driver) { + try (var session = driver.session()) { + session.writeTransaction(new TransactionWork() { + @Override + public Integer execute(Transaction tx) { + tx.run("MERGE (n:Root) RETURN ID(n);"); + return 0; + } + }); + } catch (ClientException e) { + System.out.println(e); + } + + return 0; + } + + public static void parallelEdgeImport(Driver d, int nodeCount) { + try (var session = driver.session()) { + session.writeTransaction(new TransactionWork() { + @Override + public Integer execute(Transaction tx) { + tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root);"); + System.out.println("Transaction got through!"); + return 0; + } + }); + } catch (ClientException e) { + System.out.println(e); + } + } + + private static Config config = Config.builder() + .withoutEncryption() + .withMaxTransactionRetryTime(15, TimeUnit.SECONDS) + .build(); + + private static Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.none(), config); + private static int nodeCount = 100; + + static class MyRunnable implements Runnable { + @Override + public void run() { + parallelEdgeImport(driver, nodeCount); + } + } + + public static void main(String[] args) { + dropDatabase(driver); + System.out.println("Database cleared!"); + + createRoot(driver); + System.out.println("Root created!"); + + List threadList = new ArrayList<>(); + for (int i = 0; i < nodeCount - 1; i++) { + Runnable myRunnable = new MyRunnable(); + Thread thread = new Thread(myRunnable); + threadList.add(thread); + thread.start(); + } + + // Wait for all threads to finish + for (Thread thread : threadList) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + try (var session = driver.session()) { + var actualNodeCount = session.run("MATCH (n) RETURN COUNT(n) AS cnt;").list().get(0).get("cnt").asInt(); + if (actualNodeCount != nodeCount) { + System.out.println("Node count doesn't match (" + actualNodeCount + ")"); + System.exit(1); + } + } + + System.out.println("All ok!"); + driver.close(); + } +} diff --git a/tests/drivers/node/v4_1/parallel_edge_import.js b/tests/drivers/node/v4_1/parallel_edge_import.js new file mode 100644 index 000000000..8dcf9c875 --- /dev/null +++ b/tests/drivers/node/v4_1/parallel_edge_import.js @@ -0,0 +1,69 @@ +var neo4j = require('neo4j-driver'); +var driver = neo4j.driver("bolt://localhost:7687", + neo4j.auth.basic("", ""), + { encrypted: 'ENCRYPTION_OFF', maxTransactionRetryTime: 30000 }); +var session = driver.session(); + +var node_count = 50; + +function die() { + session.close(); + driver.close(); + process.exit(1); +} + +async function run_query(query, callback) { + var run = session.run(query, {}); + run.then(callback).catch(function (error) { + console.log(error); + die(); + }); +} + +async function run_query_with_sess(sess, query, callback) { + const result = await sess.executeWrite(async tx => { + return await tx.run(query); + }); + console.log("Transaction got through!"); + sess.close(); + return result; +} + +async function run_conflicting_queries(query, number, callback) { + const promises = []; + for (let i = 0; i < number; i++) { + var sess = driver.session(); + promises.push(run_query_with_sess(sess, query)); + } + await Promise.all(promises).then(callback); +} + +async function assert_node_count(expectedCount) { + try { + const run = await session.run("MATCH (n) RETURN count(n) AS cnt", {}); + const result = run.records[0].get("cnt").toNumber(); + if (result !== expectedCount) { + console.log("Count result is not correct! (" + result + ")"); + die(); + } else { + console.log("Count result is correct! (" + result + ")"); + console.log("All ok!"); + } + } catch (error) { + console.log(error); + die(); + } finally { + session.close(); + driver.close(); + } +} + +run_query("MATCH (n) DETACH DELETE n;", function (result) { + console.log("Database cleared."); + run_query("MERGE (root:Root);", function (result) { + console.log("Root created."); + run_conflicting_queries("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root)", node_count - 1, function (result) { + assert_node_count(node_count); + }); + }); +}); diff --git a/tests/drivers/node/v5_8/parallel_edge_import.js b/tests/drivers/node/v5_8/parallel_edge_import.js new file mode 100644 index 000000000..4503c2b77 --- /dev/null +++ b/tests/drivers/node/v5_8/parallel_edge_import.js @@ -0,0 +1,69 @@ +var neo4j = require('neo4j-driver'); +var driver = neo4j.driver("bolt://localhost:7687", + neo4j.auth.basic("", ""), + { encrypted: 'ENCRYPTION_OFF' }); +var session = driver.session(); + +var node_count = 50; + +function die() { + session.close(); + driver.close(); + process.exit(1); +} + +async function run_query(query, callback) { + var run = session.run(query, {}); + run.then(callback).catch(function (error) { + console.log(error); + die(); + }); +} + +async function run_query_with_sess(sess, query, callback) { + const result = await sess.executeWrite(async tx => { + return await tx.run(query); + }); + console.log("Transaction got through!"); + sess.close(); + return result; +} + +async function run_conflicting_queries(query, number, callback) { + const promises = []; + for (let i = 0; i < number; i++) { + var sess = driver.session(); + promises.push(run_query_with_sess(sess, query)); + } + await Promise.all(promises).then(callback); +} + +async function assert_node_count(expectedCount) { + try { + const run = await session.run("MATCH (n) RETURN count(n) AS cnt", {}); + const result = run.records[0].get("cnt").toNumber(); + if (result !== expectedCount) { + console.log("Count result is not correct! (" + result + ")"); + die(); + } else { + console.log("Count result is correct! (" + result + ")"); + console.log("All ok!"); + } + } catch (error) { + console.log(error); + die(); + } finally { + session.close(); + driver.close(); + } +} + +run_query("MATCH (n) DETACH DELETE n;", function (result) { + console.log("Database cleared."); + run_query("MERGE (root:Root);", function (result) { + console.log("Root created."); + run_conflicting_queries("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root)", node_count - 1, function (result) { + assert_node_count(node_count); + }); + }); +}); diff --git a/tests/drivers/node/v5_8/run.sh b/tests/drivers/node/v5_8/run.sh index a24c5110c..276fdbb2b 100755 --- a/tests/drivers/node/v5_8/run.sh +++ b/tests/drivers/node/v5_8/run.sh @@ -15,3 +15,4 @@ fi node docs_how_to_query.js node max_query_length.js +# node parallel_edge_import.js diff --git a/tests/drivers/python/v4_1/parallel_edge_import.py b/tests/drivers/python/v4_1/parallel_edge_import.py new file mode 100644 index 000000000..1e54042c3 --- /dev/null +++ b/tests/drivers/python/v4_1/parallel_edge_import.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright 2021 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys +import threading + +from neo4j import GraphDatabase, basic_auth + +driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""), encrypted=False) +session = driver.session() + +node_count = 100 + + +def add_edge_tx(tx): + result = tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN root").single() + pass + + +def add_edge(): + sess = driver.session() + sess.execute_write(add_edge_tx) + print("Transaction got through!") + sess.close() + + +session.run("MATCH (n) DETACH DELETE n").consume() +print("Database cleared.") + +session.run("MERGE (root:Root);").consume() +print("Root created.") + +threads = [threading.Thread(target=add_edge) for x in range(node_count - 1)] +[x.start() for x in threads] +[x.join() for x in threads] + +count = session.run("MATCH (n) RETURN count(n) AS cnt").single()["cnt"] +assert count == node_count + +session.close() +driver.close() + +print("All ok!") diff --git a/tests/drivers/python/v4_1/run.sh b/tests/drivers/python/v4_1/run.sh index 146f3f57e..487685ade 100755 --- a/tests/drivers/python/v4_1/run.sh +++ b/tests/drivers/python/v4_1/run.sh @@ -32,3 +32,4 @@ python3 docs_how_to_query.py || exit 1 python3 max_query_length.py || exit 1 python3 transactions.py || exit 1 python3 metadata.py || exit 1 +# python3 parallel_edge_import.py || exit 1 diff --git a/tests/drivers/python/v5_8/parallel_edge_import.py b/tests/drivers/python/v5_8/parallel_edge_import.py new file mode 100644 index 000000000..1e54042c3 --- /dev/null +++ b/tests/drivers/python/v5_8/parallel_edge_import.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright 2021 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys +import threading + +from neo4j import GraphDatabase, basic_auth + +driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""), encrypted=False) +session = driver.session() + +node_count = 100 + + +def add_edge_tx(tx): + result = tx.run("MATCH (root:Root) CREATE (n:Node) CREATE (n)-[:TROUBLING_EDGE]->(root) RETURN root").single() + pass + + +def add_edge(): + sess = driver.session() + sess.execute_write(add_edge_tx) + print("Transaction got through!") + sess.close() + + +session.run("MATCH (n) DETACH DELETE n").consume() +print("Database cleared.") + +session.run("MERGE (root:Root);").consume() +print("Root created.") + +threads = [threading.Thread(target=add_edge) for x in range(node_count - 1)] +[x.start() for x in threads] +[x.join() for x in threads] + +count = session.run("MATCH (n) RETURN count(n) AS cnt").single()["cnt"] +assert count == node_count + +session.close() +driver.close() + +print("All ok!") diff --git a/tests/drivers/python/v5_8/run.sh b/tests/drivers/python/v5_8/run.sh index 0231f4e0d..2e36da6bc 100755 --- a/tests/drivers/python/v5_8/run.sh +++ b/tests/drivers/python/v5_8/run.sh @@ -26,3 +26,4 @@ python3 max_query_length.py || exit 1 python3 transactions.py || exit 1 python3 path.py || exit 1 python3 server_name.py || exit 1 +# python3 parallel_edge_import.py || exit 1