memgraph/tests/gql_behave/steps/query.py

315 lines
8.7 KiB
Python
Raw Normal View History

# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
# -*- coding: utf-8 -*-
2017-06-20 21:01:11 +08:00
import database
import parser
from behave import given, then, step, when
from neo4j.graph import Node, Path, Relationship
2017-06-20 21:01:11 +08:00
@given('parameters are')
def parameters_step(context):
context.test_parameters.set_parameters_from_table(context.table)
@then('parameters are')
def parameters_step(context):
context.test_parameters.set_parameters_from_table(context.table)
@step('having executed')
def having_executed_step(context):
context.results = database.query(
context.text, context, context.test_parameters.get_parameters())
@when('executing query')
def executing_query_step(context):
context.results = database.query(
context.text, context, context.test_parameters.get_parameters())
@when('executing control query')
def executing_query_step(context):
context.results = database.query(
context.text, context, context.test_parameters.get_parameters())
def parse_props(props_key_value):
2017-06-20 21:01:11 +08:00
"""
Function used to parse properties from map of properties to string.
@param prop_json:
Dictionary, map of properties of graph element.
@return:
Map of properties in string format.
"""
if not props_key_value:
2017-06-20 21:01:11 +08:00
return ""
properties = "{"
for key, value in props_key_value:
if value is None:
properties += key + ": null, "
elif isinstance(value, str):
properties += key + ": " + "'" + value + "', "
elif isinstance(value, bool):
if value:
properties += key + ": true, "
2017-06-20 21:01:11 +08:00
else:
properties += key + ": false, "
2017-06-20 21:01:11 +08:00
else:
properties += key + ": " + str(value) + ", "
2017-06-20 21:01:11 +08:00
properties = properties[:-2]
properties += "}"
return properties
def to_string(element):
"""
Function used to parse result from database to string. Format of
the string is same as format of a result given in cucumber test.
@param element:
Can be None, string, bool, number, list, dict or any of neo4j.graph
Node, Path, Relationship. Element which will be parsed.
2017-06-20 21:01:11 +08:00
@return:
String of parsed element.
"""
if element is None:
# parsing None
return "null"
if isinstance(element, Node):
# parsing Node
sol = "("
if element.labels:
sol += ':' + ': '.join(element.labels)
if element.keys():
2017-06-20 21:01:11 +08:00
if element.labels:
sol += ' '
sol += parse_props(element.items())
2017-06-20 21:01:11 +08:00
sol += ")"
return sol
elif isinstance(element, Relationship):
# parsing Relationship
sol = "[:"
if element.type:
sol += element.type
if element.keys():
2017-06-20 21:01:11 +08:00
sol += ' '
sol += parse_props(element.items())
2017-06-20 21:01:11 +08:00
sol += "]"
return sol
elif isinstance(element, Path):
# parsing Path
# TODO add longer paths
edges = []
nodes = []
for rel in element.relationships:
edges.append([rel.start_node.id, to_string(rel)])
2017-06-20 21:01:11 +08:00
for node in element.nodes:
nodes.append([node.id, to_string(node)])
sol = "<"
for i in range(0, len(edges)):
if edges[i][0] == nodes[i][0]:
sol += nodes[i][1] + "-" + edges[i][1] + "->"
else:
sol += nodes[i][1] + "<-" + edges[i][1] + "-"
sol += nodes[len(edges)][1]
sol += ">"
return sol
elif isinstance(element, str):
# parsing string
return "'" + element + "'"
elif isinstance(element, list):
# parsing list
sol = '['
el_str = []
for el in element:
el_str.append(to_string(el))
sol += ', '.join(el_str)
sol += ']'
return sol
elif isinstance(element, bool):
# parsing bool
if element:
return "true"
return "false"
elif isinstance(element, dict):
# parsing map
if len(element) == 0:
return '{}'
sol = '{'
for key, val in element.items():
sol += key + ':' + to_string(val) + ','
sol = sol[:-1] + '}'
return sol
elif isinstance(element, float):
# parsing float, scientific
if 'e' in str(element):
if str(element)[-3] == '-':
zeroes = int(str(element)[-2:]) - 1
num_str = ''
if str(element)[0] == '-':
num_str += '-'
num_str += '.' + zeroes * '0' + \
str(element)[:-4].replace("-", "").replace(".", "")
return num_str
return str(element)
def get_result_rows(context, ignore_order):
"""
Function returns results from database queries stored in context
as list.
@param context:
behave.runner.Context, behave context.
@param ignore_order:
bool, ignore order in result and expected list.
@return
Result rows.
"""
result_rows = []
for result in context.results:
keys = result.keys()
values = result.values()
for i in range(0, len(keys)):
result_rows.append(keys[i] + ":" + parser.parse(
to_string(values[i]).replace("\n", "\\n").replace(" ", ""),
ignore_order))
2017-06-20 21:01:11 +08:00
return result_rows
def get_expected_rows(context, ignore_order):
"""
Fuction returns expected results as list from context table.
@param context:
behave.runner.Context, behave context.
@param ignore_order:
bool, ignore order in result and expected list.
@return
Expected rows
"""
expected_rows = []
for row in context.table:
for col in context.table.headings:
expected_rows.append(
col + ":" + parser.parse(row[col].replace(" ", ""),
ignore_order))
2017-06-20 21:01:11 +08:00
return expected_rows
def validate(context, ignore_order):
"""
Function used to check if results from database are same
as expected results in any order.
@param context:
behave.runner.Context, behave context.
@param ignore_order:
bool, ignore order in result and expected list.
"""
result_rows = get_result_rows(context, ignore_order)
expected_rows = get_expected_rows(context, ignore_order)
context.log.info("Expected: %s", str(expected_rows))
context.log.info("Results: %s", str(result_rows))
assert(len(expected_rows) == len(result_rows))
for i in range(0, len(expected_rows)):
if expected_rows[i] in result_rows:
result_rows.remove(expected_rows[i])
else:
assert(False)
def validate_in_order(context, ignore_order):
"""
Function used to check if results from database are same
as exected results. First result from database should be
first in expected results list etc.
@param context:
behave.runner.Context, behave context.
@param ignore_order:
bool, ignore order in result and expected list.
"""
result_rows = get_result_rows(context, ignore_order)
expected_rows = get_expected_rows(context, ignore_order)
context.log.info("Expected: %s", str(expected_rows))
context.log.info("Results: %s", str(result_rows))
assert(len(expected_rows) == len(result_rows))
for i in range(0, len(expected_rows)):
if expected_rows[i] != result_rows[i]:
assert(False)
@then('the result should be')
def expected_result_step(context):
validate(context, False)
check_exception(context)
@then('the result should be, in order')
def expected_result_step(context):
validate_in_order(context, False)
check_exception(context)
@then('the result should be (ignoring element order for lists)')
def expected_result_step(context):
validate(context, True)
check_exception(context)
def check_exception(context):
if context.exception is not None:
context.log.info("Exception when executing query!")
2017-06-20 21:01:11 +08:00
assert(False)
@then('the result should be empty')
def empty_result_step(context):
assert(len(context.results) == 0)
check_exception(context)
@then('the side effects should be')
def side_effects_step(context):
return
2017-06-20 21:01:11 +08:00
@then('no side effects')
def side_effects_step(context):
return