Remove gqlalchemy from stress tests (#1245)

This commit is contained in:
Josipmrden 2023-09-12 17:32:16 +02:00 committed by GitHub
parent fd63944493
commit bf03b38e39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 52 deletions

View File

@ -24,7 +24,6 @@ import time
from argparse import ArgumentParser
from threading import Thread
from gqlalchemy import Memgraph
from neo4j import TRUST_ALL_CERTIFICATES, GraphDatabase
@ -101,19 +100,6 @@ def execute_till_success(session, query, max_retries=1000):
raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
def execute_till_success_gqlalchemy(memgraph: Memgraph, query: str, max_retries=1000):
"""Same method as execute_till_success, but for gqlalchemy."""
no_failures = 0
while True:
try:
result = memgraph.execute(query)
return result, no_failures
except Exception:
no_failures += 1
if no_failures >= max_retries:
raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
def batch(input, batch_size):
"""Batches the given input (must be iterable).
Supports input generators. Returns a generator.
@ -216,23 +202,6 @@ def argument_driver(args):
)
def get_memgraph(args) -> Memgraph:
host_port = args.endpoint.split(":")
connection_params = {
"host": host_port[0],
"port": int(host_port[1]),
"username": args.username,
"password": args.password,
"encrypted": False,
}
if args.use_ssl:
connection_params["encrypted"] = True
return Memgraph(**connection_params)
# This class is used to create and cache sessions. Session is cached by args
# used to create it and process' pid in which it was created. This makes it
# easy to reuse session with python multiprocessing primitives like pmap.

View File

@ -16,6 +16,7 @@
Large bipartite graph stress test.
"""
import atexit
import logging
import multiprocessing
import random
@ -25,7 +26,12 @@ from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Tuple
from common import OutputData, connection_argument_parser, get_memgraph
from common import (
OutputData,
SessionCache,
connection_argument_parser,
execute_till_success,
)
log = logging.getLogger(__name__)
output_data = OutputData()
@ -35,6 +41,9 @@ CREATE_FUNCTION = "CREATE"
DELETE_FUNCTION = "DELETE"
atexit.register(SessionCache.cleanup)
def parse_args() -> Args:
"""
Parses user arguments
@ -98,20 +107,20 @@ def timed_function(name) -> Callable:
@timed_function("cleanup_time")
def clean_database() -> None:
memgraph = get_memgraph(args)
memgraph.execute("MATCH (n) DETACH DELETE n")
session = SessionCache.argument_session(args)
execute_till_success(session, "MATCH (n) DETACH DELETE n")
def create_indices() -> None:
memgraph = get_memgraph(args)
memgraph.execute("CREATE INDEX ON :Node")
memgraph.execute("CREATE INDEX ON :Node(id)")
session = SessionCache.argument_session(args)
execute_till_success(session, "CREATE INDEX ON :Node")
execute_till_success(session, "CREATE INDEX ON :Node(id)")
def setup_database_mode() -> None:
memgraph = get_memgraph(args)
memgraph.execute(f"STORAGE MODE {args.storage_mode}")
memgraph.execute(f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
session = SessionCache.argument_session(args)
execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
def execute_function(worker: Worker) -> Worker:
@ -135,28 +144,28 @@ def run_writer(total_workers_cnt: int, repetition_count: int, sleep_sec: float,
a valid graph. A graph is valid if the number of nodes is preserved, and the chain is either
not present or present completely.
"""
memgraph = get_memgraph(args)
session = SessionCache.argument_session(args)
def create():
try:
memgraph.execute(
f"MERGE (:Node{worker_id} {{id: 1}})-[:REL]-(:Node{worker_id} {{id: 2}})-[:REL]-(:Node{worker_id} {{id: 3}})-[:REL]-(:Node{worker_id} {{id: 4}})"
execute_till_success(
session,
f"MERGE (:Node{worker_id} {{id: 1}})-[:REL]-(:Node{worker_id} {{id: 2}})-[:REL]-(:Node{worker_id} {{id: 3}})-[:REL]-(:Node{worker_id} {{id: 4}})",
)
except Exception as ex:
pass
def verify() -> Tuple[bool, int]:
# We always create X nodes and therefore the number of nodes needs to be always a fraction of X
count = list(memgraph.execute_and_fetch(f"MATCH (n) RETURN COUNT(n) AS cnt"))[0]["cnt"]
count = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"]
log.info(f"Worker {worker_id} verified graph count {count} in repetition {curr_repetition}")
assert count <= total_workers_cnt * NUMBER_NODES_IN_CHAIN and count % NUMBER_NODES_IN_CHAIN == 0
ids = list(
memgraph.execute_and_fetch(
f"MATCH (n:Node{worker_id} {{id: 1}})-->(m)-->(o)-->(p) RETURN n.id AS id1, m.id AS id2, o.id AS id3, p.id AS id4"
)
)
ids = execute_till_success(
session,
f"MATCH (n:Node{worker_id} {{id: 1}})-->(m)-->(o)-->(p) RETURN n.id AS id1, m.id AS id2, o.id AS id3, p.id AS id4",
)[0]
if len(ids):
result = ids[0]
@ -183,11 +192,11 @@ def run_deleter(total_workers_cnt: int, repetition_count: int, sleep_sec: float)
"""
Periodic deletion of an arbitrary chain in the graph
"""
memgraph = get_memgraph(args)
session = SessionCache.argument_session(args)
def delete_part_of_graph(id: int):
try:
memgraph.execute(f"MATCH (n:Node{id}) DETACH DELETE n")
execute_till_success(session, f"MATCH (n:Node{id}) DETACH DELETE n")
log.info(f"Worker deleted chain with nodes of id {id}")
except Exception as ex:
log.info(f"Worker failed to delete the chain with id {id}")

View File

@ -1,2 +1 @@
neo4j-driver==4.1.1
gqlalchemy==1.3.3