Add new feature specs (#43)

Marko Budiselić 2021-01-13 11:20:05 +01:00 committed by GitHub
parent 7bff678cd9
commit 57806544cd
28 changed files with 618 additions and 314 deletions

View File

@ -0,0 +1,198 @@
# Tensorflow Op - Technicalities
The final result should be a shared object (".so") file that can be dynamically
loaded by the Tensorflow runtime in order to directly access the bolt client.
## About Tensorflow
Tensorflow is usually used with Python: the Python code defines a directed
acyclic computation graph, and basically no computation is done in Python
itself. Instead, values from Python are copied into the graph structure as
constants to be used by other Ops. The directed acyclic graph naturally ends up
with two sets of border nodes, one for inputs and one for outputs. These are
sometimes called "feeds".
Following the Python definition of the graph, during training, the entire data
processing graph/pipeline is called from Python as a single expression. This
amounts to lazy evaluation: by the time the call happens, the whole computation
has already been defined.
Tensorflow internally works with tensors, i.e. n-dimensional arrays. That means
all of its inputs, as well as its outputs, need to be tensors. While it is
possible to feed data directly from Python's numpy matrices straight into
Tensorflow, this is less desirable than using the Tensorflow data API (which
defines data input and processing as a Tensorflow graph) because:
1. The data API is written in C++ and entirely avoids Python, and as such is
   faster.
2. The data API, unlike Python, is available in "Tensorflow Serving", the
   default way to serve Tensorflow models in production.
Once the entire input pipeline is defined via the tf.data API, its input is
basically a list of node IDs the model is supposed to work with. The model,
through the data API, knows how to connect to Memgraph and execute openCypher
queries in order to get the remaining data it needs (for example, features of
neighbouring nodes).
## The Interface
I think it's best you read the official guide on adding an op
<https://www.tensorflow.org/extend/adding_an_op>, and especially the addition
that specifies how data ops are special
<https://www.tensorflow.org/extend/new_data_formats>.
## Compiling the TF Op
There are two options for compiling a custom op. One of them involves pulling
the TF source, adding your code to it, and compiling via Bazel. This is
probably awkward for us and would significantly slow down compilation.
The other method involves installing Tensorflow as a Python package and pulling
the required headers from, for example,
`/usr/local/lib/python3.6/site-packages/tensorflow/include`. We can then
compile our Op with our regular build system.
This is practical since we can copy the required headers into our repo. If
necessary, we can keep several versions of the headers to build our Op for
every TF version we want to support (but this is unlikely to be required as the
API should be stable).
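The header location doesn't have to be guessed: the installed Tensorflow
package can report its own include directory and the compile/link flags a
custom op build needs via the public `tf.sysconfig` helpers. A small sketch:
```python3
import tensorflow as tf

# Directory with the C++ headers shipped inside the installed Python package.
print(tf.sysconfig.get_include())
# e.g. /usr/local/lib/python3.6/site-packages/tensorflow/include

# Flags our regular build system would pass when compiling/linking the .so.
print(tf.sysconfig.get_compile_flags())
print(tf.sysconfig.get_link_flags())
```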
## Example for Using the Bolt Client Tensorflow Op
### Dynamic Loading
``` python3
import tensorflow as tf
mg_ops = tf.load_op_library('/usr/bin/memgraph/tensorflow_ops.so')
```
### Basic Usage
``` python3
dataset = mg_ops.OpenCypherDataset(
    # This is probably unfortunate as the username and password
    # get hardcoded into the graph, but for the simple case it's fine
    "hostname:7687", auth=("user", "pass"),

    # Our query
    query='''
    MATCH (n:Train) RETURN n.id, n.features
    ''',

    # Cast return values to these types
    output_types=(tf.string, tf.float32))

# Some Tensorflow data api boilerplate
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

# Up to now we have only defined our computation graph which basically
# just connects to Memgraph.

# `next_element` is not really data but a handle to a node in the Tensorflow
# graph, which we can and do evaluate.
# It is a Tensorflow tensor with shape=(None, 2) and dtype=(tf.string, tf.float32).
# Shape `None` means the shape of the tensor is unknown at definition time,
# i.e. it is dynamic and will only be known once the tensor has been evaluated.

with tf.Session() as sess:
    node_ids = sess.run(next_element)
    # `node_ids` contains IDs and features of all the nodes
    # in the graph with the label "Train".
    # It is a numpy.ndarray with shape ($n_matching_nodes, 2).
```
### Memgraph Client as a Generic Tensorflow Op
Other than the Tensorflow Data Op, we'll want to support a generic Tensorflow
Op which can be put anywhere in the Tensorflow computation graph. It takes in
an arbitrary tensor and produces a tensor. This would be used in the GraphSage
algorithm to fetch the lowest-level features into Tensorflow.
```python3
import numpy as np

requested_ids = np.array([1, 2, 3])
ids_placeholder = tf.placeholder(tf.int32)

model = mg_ops.OpenCypher(
    "hostname:7687", auth=("user", "pass"),
    query="""
    UNWIND $node_ids AS nid
    MATCH (n:Train {id: nid})
    RETURN n.features
    """,

    # What to call the input tensor as an openCypher parameter
    parameter_name="node_ids",

    # Type of our resulting tensor
    dtype=(tf.float32))

features = model(ids_placeholder)

with tf.Session() as sess:
    result = sess.run(features,
                      feed_dict={ids_placeholder: requested_ids})
```
This is probably easier to implement than the Data Op, so it might be a good
idea to start with.
### Production Usage
During training, in the GraphSage algorithm at least, Memgraph is at the
beginning and at the end of the Tensorflow computation graph. At the
beginning, the Data Op provides the node IDs which are fed into the generic
Tensorflow Op to find their neighbours and their neighbours and their features.
Production usage differs in that we don't use the Data Op. The Data Op is
effectively cut off and the initial input is fed by Tensorflow serving, with
the data found in the request.
For example a JSON request to classify a node might look like:
`POST http://host:port/v1/models/GraphSage/versions/v1:classify`
With the contents:
```json
{
  "examples": [
    {"node_id": 1},
    {"node_id": 2}
  ]
}
```
Every element of the "examples" list is an example to be computed. Each is
represented by a dict with keys matching names of feeds in the Tensorflow graph
and values being the values we want fed in for each example.
The REST API then replies in kind with the classification result in JSON.
A note about adding our custom Op to Tensorflow Serving: our Op's `.so` can
either be added into the Bazel build to link with Tensorflow Serving, or it can
be dynamically loaded by starting Tensorflow Serving with the flag
`--custom_op_paths`.
### Considerations
There might be an issue here: the URL used to connect to Memgraph is hardcoded
into the op and would thus be wrong when moved to production, requiring some
kind of hack to make it work. We probably want to solve this by having the
client op take in another tf.Variable as an input which would contain a
connection URL and username/password. We have to research whether this makes
it easy enough to move to production, as the connection string variable is
still a part of the graph, but maybe easier to replace.
It is probably the best idea to utilize openCypher parameters to make our
queries flexible. The exact API as to how to declare the parameters in Python
is open to discussion.
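A minimal sketch of both ideas together, reusing the hypothetical
`mg_ops.OpenCypher` op from above and assuming it would accept the Bolt address
as a fed-in tensor (the spec above suggests a `tf.Variable`; a placeholder is
used here only for brevity):
```python3
import tensorflow as tf

# Hypothetical: connection info and node IDs are both fed in at run time,
# so nothing environment-specific is baked into the graph definition.
bolt_address = tf.placeholder(tf.string, name="bolt_address")
ids_placeholder = tf.placeholder(tf.int32)

model = mg_ops.OpenCypher(
    bolt_address, auth=("user", "pass"),
    query="""
    UNWIND $node_ids AS nid
    MATCH (n:Train {id: nid})
    RETURN n.features
    """,
    parameter_name="node_ids",
    dtype=(tf.float32))

features = model(ids_placeholder)

with tf.Session() as sess:
    result = sess.run(features, feed_dict={
        # Production serving would feed a different address here.
        bolt_address: "hostname:7687",
        ids_placeholder: [1, 2, 3],
    })
```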
The Data Op might not even be necessary to implement, as it is not key for
production use. It can be replaced in training mode with feed dicts and either:
1. Getting the initial list of nodes via a Python Bolt client (a minimal sketch
   of this option follows below), or
2. Creating a separate Tensorflow computation graph that gets all the relevant
   node IDs into Python.
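For option 1, the fetch could look roughly like this (a sketch; the `neo4j`
Python driver speaks Bolt and therefore also works against Memgraph, but the
driver choice and the query are assumptions):
```python3
import numpy as np
from neo4j import GraphDatabase

# Fetch the initial set of training node IDs over Bolt, outside of Tensorflow.
driver = GraphDatabase.driver("bolt://hostname:7687", auth=("user", "pass"))
with driver.session() as session:
    node_ids = np.array(
        [record["id"] for record in session.run("MATCH (n:Train) RETURN n.id AS id")])
driver.close()

# `node_ids` can now be fed into the training graph,
# e.g. feed_dict={ids_placeholder: node_ids}.
```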

View File

@ -1,3 +1,33 @@
# Feature Specifications
* [Python Query Modules](python-query-modules.md)
## Active
* [Python Query Modules](active/python-query-modules.md)
* [Tensorflow Op](active/tensorflow-op.md)
## Draft
* [A-star Variable-length Expand](draft/a-star-variable-length-expand.md)
* [Cloud-native Graph Store](draft/cloud-native-graph-store.md)
* [Compile Filter Expressions](draft/compile-filter-expressions.md)
* [Database Triggers](draft/database-triggers.md)
* [Date and Time Data Types](draft/date-and-time-data-types.md)
* [Distributed Query Execution](draft/distributed-query-execution.md)
* [Edge Create or Update Queries](draft/edge-create-or-update-queries.md)
* [Extend Variable-length Filter Expressions](draft/extend-variable-length-filter-expression.md)
* [Geospatial Data Types](draft/geospatial-data-types.md)
* [Hybrid Storage Engine](draft/hybrid-storage-engine.md)
* [Load Data Queries](draft/load-data-queries.md)
* [Multitenancy](draft/multitenancy.md)
* [Query Compilation](draft/query-compilation.md)
* [Release Log Levels](draft/release-log-levels.md)
* [Rust Query Modules](draft/rust-query-modules.md)
* [Sharded Graph Store](draft/sharded-graph-store.md)
* [Storage Memory Management](draft/storage-memory-management.md)
* [Vectorized Query Execution](draft/vectorized-query-execution.md)
## Obsolete
* [Distributed](obsolete/distributed.md)
* [High-availability](obsolete/high-availability.md)
* [Kafka Integration](obsolete/kafka-integration.md)

View File

@ -0,0 +1,15 @@
# A-star Variable-length Expand
Like DFS, BFS, and WeightedShortestPath, it should be possible to support the
A-star algorithm in the form of a variable-length expansion.
Syntactically, the query should look like the following one:
```
MATCH (start)-[
    *aStar{{hops}} {{heuristic_expression}} {{weight_expression}} {{aggregated_weight_variable}} {{filtering_expression}}
]-(end)
RETURN {{aggregated_weight_variable}};
```
It would be convenient to add geospatial data support beforehand because A-star
works well with geospatial data (a natural heuristic function might exist).
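For reference, a minimal sketch of what such an expansion computes, with the
heuristic, weight, and aggregated weight made explicit (names and data
structures are illustrative only, not the proposed implementation):
```python
import heapq
import itertools

def a_star(adjacency, start, goal, weight, heuristic):
    """A-star over `adjacency`, a dict mapping node -> iterable of neighbours."""
    counter = itertools.count()  # tie-breaker so the heap never compares nodes
    queue = [(heuristic(start, goal), next(counter), 0.0, start, [start])]
    best = {start: 0.0}
    while queue:
        _, _, so_far, node, path = heapq.heappop(queue)
        if node == goal:
            return path, so_far                                    # aggregated_weight_variable
        for neighbour in adjacency[node]:
            candidate = so_far + weight(node, neighbour)           # weight_expression
            if candidate < best.get(neighbour, float("inf")):
                best[neighbour] = candidate
                estimate = candidate + heuristic(neighbour, goal)  # heuristic_expression
                heapq.heappush(queue, (estimate, next(counter), candidate,
                                       neighbour, path + [neighbour]))
    return None, float("inf")
```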

View File

@ -0,0 +1,7 @@
# Cloud-native Graph Store
The biggest problem with the current in-memory storage is the total cost of
ownership for large, infrequently updated datasets. An idea to solve that is to
decouple storage and compute inside a cloud environment. E.g., on AWS, a
database instance could use EC2 machines to run the query execution against
data stored inside S3.

View File

@ -0,0 +1,40 @@
# Compile Filter Expressions
Memgraph evaluates filter expressions by traversing the abstract syntax tree of
the given filter. Filtering is a general operation in query execution.
Some simple examples are:
```
MATCH (n:Person {name: "John"}) WHERE n.age > 20 AND n.age < 40 RETURN n;
MATCH (a {id: 723})-[*bfs..10 (e, n | e.x > 12 AND n.y < 3)]-() RETURN *;
```
A more real-world example (Ethereum network analysis) looks like this:
```
MATCH (a: Address {addr: ''})-[]->(t: Transaction)-[]->(b: Address)
RETURN DISTINCT b.addr
UNION
MATCH (a: Address {addr: ''})-[]->(t: Transaction)-[]->(b1: Address)-[]->(t2: Transaction)-[]->(b: Address)
WHERE t2.timestamp > t.timestamp
RETURN DISTINCT b.addr
UNION
MATCH (a: Address {addr: ''})-[]->(t: Transaction)-[]->(b1: Address)-[]->(t2: Transaction)-[]->(b2: Address)-[]->(t3: Transaction)-[]->(b: Address)
WHERE t2.timestamp > t.timestamp AND t3.timestamp > t2.timestamp
RETURN DISTINCT b.addr
UNION
MATCH (a: Address {addr: ''})-[]->(t: Transaction)-[]->(b1: Address)-[]->(t2: Transaction)-[]->(b2: Address)-[]->(t3: Transaction)-[]->(b3: Address)-[]->(t4: Transaction)-[]->(b: Address)
WHERE t2.timestamp > t.timestamp AND t3.timestamp > t2.timestamp AND t4.timestamp > t3.timestamp
RETURN DISTINCT b.addr
UNION
MATCH (a: Address {addr: ''})-[]->(t: Transaction)-[]->(b1: Address)-[]->(t2: Transaction)-[]->(b2: Address)-[]->(t3: Transaction)-[]->(b3: Address)-[]->(t4: Transaction)-[]->(b4: Address)-[]->(t5: Transaction)-[]->(b: Address)
WHERE t2.timestamp > t.timestamp AND t3.timestamp > t2.timestamp AND t4.timestamp > t3.timestamp AND t5.timestamp > t4.timestamp
RETURN DISTINCT b.addr;
```
Filtering may take a significant portion of query execution, which means it has
to be fast.
The first step towards improvement might be to expose an API under which a
developer can implement their own filtering logic (it's OK to support only C++
in the beginning). Later on, we can introduce automatic compilation of
filtering expressions.
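A toy illustration of the difference in Python; the real engine would emit C++
or machine code, this only contrasts per-record AST traversal with building a
callable once up front (the AST shape and names are illustrative):
```python
import operator

# Filter: n.age > 20 AND n.age < 40, as a tiny AST.
ast = ("and",
       ("gt", ("prop", "age"), ("const", 20)),
       ("lt", ("prop", "age"), ("const", 40)))

OPS = {"gt": operator.gt, "lt": operator.lt}

def interpret(node, record):
    """Walk the AST for every record (roughly what an interpreter does)."""
    kind = node[0]
    if kind == "const":
        return node[1]
    if kind == "prop":
        return record[node[1]]
    if kind == "and":
        return interpret(node[1], record) and interpret(node[2], record)
    return OPS[kind](interpret(node[1], record), interpret(node[2], record))

def compile_filter(node):
    """Build a callable once; the per-record cost is only the nested calls."""
    kind = node[0]
    if kind == "const":
        value = node[1]
        return lambda record: value
    if kind == "prop":
        key = node[1]
        return lambda record: record[key]
    if kind == "and":
        left, right = compile_filter(node[1]), compile_filter(node[2])
        return lambda record: left(record) and right(record)
    op, left, right = OPS[kind], compile_filter(node[1]), compile_filter(node[2])
    return lambda record: op(left(record), right(record))

record = {"age": 30}
assert interpret(ast, record) == compile_filter(ast)(record) == True
```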

View File

@ -0,0 +1,14 @@
# Database Triggers
Memgraph doesn't have any built-in notification mechanism yet. If a user wants
to get notified about anything happening inside Memgraph, the only option is
some pull mechanism from the client code. In many cases, that might be
suboptimal.
A natural place to start would be to put some notification code on each update
action inside Memgraph. It's probably too early to send a notification
immediately after a WAL delta gets created, but some point after the
transaction commits, or after WAL deltas are written to disk, might be a pretty
good place.
Furthermore, Memgraph has the query module infrastructure. The first
implementation might call a user-defined query module procedure and pass
whatever gets created or updated during the query execution.
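A hypothetical sketch of what such a procedure could look like, written against
the existing Python query module API; the procedure name, its signature, and
the assumption that Memgraph would call it with the affected vertices are all
inventions of this spec, not an existing interface:
```python
import mgp


@mgp.read_proc
def on_graph_update(context: mgp.ProcCtx,
                    updated_vertices: mgp.List[mgp.Vertex]
                    ) -> mgp.Record(notified=int):
    # Hypothetical trigger body: Memgraph would pass whatever was created or
    # updated, and the user reacts, e.g. by pushing it to an external system.
    for vertex in updated_vertices:
        _ = vertex.properties  # inspect or forward the affected data
    return mgp.Record(notified=len(updated_vertices))
```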

View File

@ -0,0 +1,13 @@
# Date and Time Data Types
Neo4j offers the following functionality:
* https://neo4j.com/docs/cypher-manual/current/syntax/temporal/
* https://neo4j.com/docs/cypher-manual/current/functions/temporal/
The question is, how are we going to support equivalent capabilities? We need
something very similar because these are, in general, very well defined types.
A note about the storage is that Memgraph has a limit on the total number of
different data types, 16 at this point. We have to be mindful of that during
the design phase.

View File

@ -0,0 +1,10 @@
# Distributed Query Execution
Add the ability to execute graph algorithms on a cluster of machines. The scope
of this is ONLY the query execution without changing the underlying storage
because that's much more complex. The first significant decision here is to
figure out whether we implement our own distributed execution engine or deploy
something already available, like [Giraph](https://giraph.apache.org). An
important point is that Giraph by itself isn't enough because people want to
update data on the fly. The final solution needs to provide some updating
capabilities.

View File

@ -0,0 +1,14 @@
# Edge Create or Update Queries
The old semantics of the `MERGE` clause are quite tricky. The new semantics of
`MERGE` are explained
[here](https://blog.acolyer.org/2019/09/18/updating-graph-databases-with-cypher/).
Similar to `MERGE`, but maybe simpler, would be to define clauses and semantics
that apply only to a single edge. If an edge between two nodes doesn't exist,
it should be created. On the other hand, if it exists, it should be updated.
The syntax should look similar to the following:
```
MERGE EDGE (a)-[e:Type {props}]->(b) [ON CREATE SET expression ON UPDATE SET expression] ...
```

View File

@ -0,0 +1,12 @@
# Extend Variable-length Filter Expressions
Variable-length filtering (DFS/BFS/WeightedShortestPath) can be arbitrarily
complex. At this point, the filtering expression only gets the currently
visited node and edge:
```
MATCH (a {id: 723})-[*bfs..10 (e, n | e.x > 12 AND n.y < 3)]-() RETURN *;
```
If the whole path were available to the filter, users could write more complex
filtering logic.

View File

@ -0,0 +1,28 @@
# Geospatial Data Types
Neo4j offers the following functionality:
* https://neo4j.com/docs/cypher-manual/current/syntax/spatial/
* https://neo4j.com/docs/cypher-manual/current/functions/spatial/
The question is, how are we going to support equivalent capabilities? We need
something very similar because these are, in general, very well defined types.
The main reasons for implementing this feature are:
1. Ease of use. At this point, users have to encode/decode these data types
   manually.
2. Memory efficiency in some cases (a user-defined encoding could still be more
   efficient in others).
The number of functionalities that could be built on top of geospatial types is
huge, so some C/C++ libraries should probably be used:
* https://github.com/OSGeo/gdal
* http://geostarslib.sourceforge.net/
* https://www.cgal.org
Furthermore, the query engine could use these data types during query execution
(specific to query execution). Also, the storage engine could have specialized
indices for these types of data.
A note about the storage is that Memgraph has a limit on the total number of
different data types, 16 at this point. We have to be mindful of that during
the design phase.

View File

@ -0,0 +1,20 @@
# Hybrid Storage Engine
The goal here is to massively improve Memgraph storage! Please take a look
[here](http://cidrdb.org/cidr2020/papers/p29-neumann-cidr20.pdf) for the
reasons.
The general idea is to store edges on disk using an LSM-like data structure.
Storing edge properties will be tricky because a strict schema also has to be
introduced; otherwise, it's impossible to store data on disk optimally (Neo4j
already has a pretty optimized implementation of that). Furthermore, we have to
introduce the paging concept.
This is a complex feature because various aspects of the core engine have to be
considered and probably updated (memory management, garbage collection,
indexing).
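A toy sketch of the LSM idea only (writes buffered in a memtable and flushed as
immutable sorted runs, reads checking the newest data first); this illustrates
the data structure, not the actual on-disk format or the schema questions
above:
```python
import bisect

class ToyLSM:
    """Buffer writes in a memtable; flush it as an immutable sorted run."""

    def __init__(self, memtable_limit=4):
        self.memtable = {}   # key -> value, newest data
        self.runs = []       # list of sorted [(key, value), ...] "on-disk" runs
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.runs.append(sorted(self.memtable.items()))  # "flush to disk"
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.runs):                      # newest run first
            i = bisect.bisect_left(run, (key,))
            if i < len(run) and run[i][0] == key:
                return run[i][1]
        return None

store = ToyLSM()
store.put(("node-1", "node-2"), {"weight": 0.5})
assert store.get(("node-1", "node-2")) == {"weight": 0.5}
```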
## References
* [On Disk IO, Part 3: LSM Trees](https://medium.com/databasss/on-disk-io-part-3-lsm-trees-8b2da218496f)
* [2020-04-13 On-disk Edge Store Research](https://docs.google.com/document/d/1avoR2g9dNWa4FSFt9NVn4JrT6uOAH_ReNeUoNVsJ7J4)

View File

@ -0,0 +1,17 @@
# Load Data Queries
Loading data into Memgraph is a challenging task. We have to implement
something equivalent to the [Neo4j LOAD
CSV](https://neo4j.com/developer/guide-import-csv/#import-load-csv). This
feature seems relatively straightforward to implement because `LoadCSV` could
be another operator that would yield row by row. By having the operator, the
operation would be composable with the rest of the `CREATE`|`MERGE` queries.
The composability is the key because users would be able to combine various
clauses to import data.
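A sketch of why composability is cheap with a row-by-row operator, modeled here
with Python generators (the operator names and the chaining API are
illustrative, not the actual execution plan interface):
```python
import csv
import io

def load_csv(source):
    """Hypothetical `LoadCSV` operator: yield one row (as a dict) at a time."""
    for row in csv.DictReader(source):
        yield row

def create_nodes(rows):
    """Downstream `CREATE`-like operator consuming whatever `load_csv` yields."""
    for row in rows:
        # In the real engine this would create a vertex from the row's values.
        yield ("CREATE (:Person {name: $name, age: $age})",
               {"name": row["name"], "age": int(row["age"])})

data = io.StringIO("name,age\nAlice,34\nBob,27\n")
for query, parameters in create_nodes(load_csv(data)):
    print(query, parameters)
```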
A more general concept is [SingleStore
Pipelines](https://docs.singlestore.com/v7.1/reference/sql-reference/pipelines-commands/create-pipeline).
We already tried with [Graph Streams](../obsolete/kafka-integration.md). An
option is to migrate that code into a standalone product
[here](https://github.com/memgraph/mgtools).

View File

@ -0,0 +1,15 @@
# Multitenancy
[Multitenancy](https://en.wikipedia.org/wiki/Multitenancy) is a feature mainly
in the domain of ease of use. Neo4j made a great move by introducing
[Fabric](https://neo4j.com/developer/multi-tenancy-worked-example).
Memgraph's first step in a similar direction would be to add an abstraction
layer containing multiple `Storage` instances + the ability to specify a
database instance per client session or database transaction.
## Replication Context
Each transaction has to encode which database it is being executed against.
Once a replica gets delta objects containing the database info, the replica
engine can apply the changes locally.
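A rough sketch of the abstraction layer (all names are placeholders; the real
`Storage` type is C++, this only illustrates routing a session to one of
several database instances):
```python
class StorageRouter:
    """Maps logical database names to independent storage instances."""

    def __init__(self):
        self.databases = {}        # name -> storage instance (placeholder: dict)

    def create_database(self, name):
        self.databases[name] = {}  # stand-in for a real `Storage` instance

    def storage_for(self, session):
        # Each client session (or transaction) carries the database it targets.
        return self.databases[session["database"]]

router = StorageRouter()
router.create_database("tenant_a")
router.create_database("tenant_b")

session = {"database": "tenant_a"}
router.storage_for(session)["node:1"] = {"label": "Person"}
```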

View File

@ -0,0 +1,14 @@
# Query Compilation
Memgraph interprets queries in a pull-based way. An advantage of interpreting
queries is a short time until execution starts, which is convenient when a user
wants to test a bunch of queries in a short time. The downside is a slower
runtime. The runtime could be improved by compiling query plans.
## Research Area 1
The easiest route to the query compilation might be generating [virtual
constexpr](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1064r0.html)
pull functions, making a dynamic library out of the entire compiled query plan,
and swapping query plans during the database runtime.

View File

@ -0,0 +1,17 @@
# Release Log Levels
It's impossible to control the log level in Memgraph Community. That means it's
tough to debug issues when interacting with Memgraph. At least three log levels
should be available to the user:
* Log nothing (as it is now).
* Log each executed query.
* Log Bolt server states.
Memgraph Enterprise has the audit log feature. The audit log provides
additional info about each query (user, source, etc.), but it's only available
in the Enterprise edition. Furthermore, the intention of audit logs isn't
debugging.
An important note is that the logged queries should be stripped of literal
values because, in the Memgraph cloud context, we shouldn't log sensitive data.

View File

@ -0,0 +1,15 @@
# Rust Query Modules
Memgraph provides the query modules infrastructure. It's possible to write
query modules in
[C/C++](https://docs.memgraph.com/memgraph/reference-overview/query-modules/c-api)
and
[Python](https://docs.memgraph.com/memgraph/reference-overview/query-modules/python-api).
The problem with C/C++ is that it's very error-prone and time-consuming.
Python's problem is that it's slow and has a bunch of other limitations listed
in the [feature spec](../active/python-query-modules.md).
On the other hand, Rust is fast and much less error-prone compared to C. It
should be possible to use [bindgen](https://github.com/rust-lang/rust-bindgen)
to generate bindings out of the current C API and write wrapper code for Rust
developers to enjoy.

View File

@ -0,0 +1,8 @@
# Sharded Graph Store
Add the ability to shard graph data across machines in a cluster. The scope of
this is ONLY changes to the storage engine.
## References
* [Spinner: Scalable Graph Partitioning in the Cloud](https://arxiv.org/pdf/1404.3861.pdf)

View File

@ -0,0 +1,13 @@
# Storage Memory Management
If Memgraph uses too much memory, the OS will kill it. There has to be an
internal mechanism to control memory usage.
Since C++17, polymorphic allocators are an excellent way to inject custom
memory management while keeping the code modular. Memgraph already uses PMR in
query execution. Also, refer to [1] on how to start with PMR in the storage
context.
## Resources
[1] [PMR: Mistakes Were Made](https://www.youtube.com/watch?v=6BLlIj2QoT8)

View File

@ -0,0 +1,9 @@
# Vectorized Query Execution
The Memgraph query engine pulls records one by one during query execution. A
more efficient way would be to pull multiple records at once, in an array.
Adding that shouldn't be complicated, but it wouldn't be advantageous without
also vectorizing the fetching of records from the storage.
On the query engine level, the array could be part of the frame. In other
words, the frame and the code dealing with the frame have to be changed.
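A sketch of the difference between the current one-record pull and a batched
pull (Python stand-ins; the real operators are C++ and the names are
illustrative):
```python
RECORDS = list(range(10))

def pull_one(cursor):
    """Current model: one record per Pull call."""
    for record in cursor:
        yield record

def pull_batch(cursor, batch_size=4):
    """Vectorized model: the frame holds an array of records per Pull call."""
    batch = []
    for record in cursor:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

assert list(pull_batch(iter(RECORDS))) == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```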

View File

@ -1,75 +0,0 @@
# Dynamic Graph Partitioning (abbr. DGP)
## Implementation
Take a look under `dev/memgraph/distributed/dynamic_graph_partitioning.md`.
### Implemented parameters
--dynamic-graph-partitioner-enabled (If the dynamic graph partitioner should be
enabled.) type: bool default: false (start time)
--dgp-improvement-threshold (How much better should specific node score be
to consider a migration to another worker. This represents the minimal
difference between new score that the vertex will have when migrated
and the old one such that it's migrated.) type: int32 default: 10
(start time)
--dgp-max-batch-size (Maximal amount of vertices which should be migrated
in one dynamic graph partitioner step.) type: int32 default: 2000
(start time)
## Planning
### Design decisions
* Each partitioning session has to be a new transaction.
* When and how does an instance perform the moves?
* Periodically.
* Token sharing (round robin, exactly one instance at a time has an
opportunity to perform the moves).
* On server-side serialization error (when DGP receives an error).
-> Quit partitioning and wait for the next turn.
* On client-side serialization error (when end client receives an error).
-> The client should never receive an error because of any
internal operation.
-> For the first implementation, it's good enough to wait until data becomes
available again.
-> It would be nice to achieve that DGP has lower priority than end client
operations.
### End-user parameters
* --dynamic-graph-partitioner-enabled (execution time)
* --dgp-improvement-threshold (execution time)
* --dgp-max-batch-size (execution time)
* --dgp-min-batch-size (execution time)
-> Minimum number of nodes that will be moved in each step.
* --dgp-fitness-threshold (execution time)
-> Do not perform moves if partitioning is good enough.
* --dgp-delta-turn-time (execution time)
-> Time between each turn.
* --dgp-delta-step-time (execution time)
-> Time between each step.
* --dgp-step-time (execution time)
-> Time limit per each step.
### Testing
The implementation has to provide good enough results in terms of:
* How good the partitioning is (numeric value), aka goodness.
* Workload execution time.
* Stress test correctness.
Test cases:
* N not connected subgraphs
-> shuffle nodes to N instances
-> run partitioning
-> test perfect partitioning.
* N connected subgraph
-> shuffle nodes to N instance
-> run partitioning
-> test partitioning.
* Take realistic workload (Long Running, LDBC1, LDBC2, Card Fraud, BFS, WSP)
-> measure exec time
-> run partitioning
-> test partitioning
-> measure exec time (during and after partitioning).

View File

@ -1,34 +0,0 @@
# Kafka - data transform
The transform script is a user defined script written in Python. The script
should be aware of the data format in the Kafka message.
Each Kafka message is byte length encoded, which means that the first eight
bytes of each message contain the length of the message.
A sample code for a streaming transform script could look like this:
```python
def create_vertex(vertex_id):
return ("CREATE (:Node {id: $id})", {"id": vertex_id})
def create_edge(from_id, to_id):
return ("MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "\
"CREATE (n)-[:Edge]->(m)", {"from_id": from_id, "to_id": to_id})
def stream(batch):
result = []
for item in batch:
message = item.decode('utf-8').strip().split()
if len(message) == 1:
result.append(create_vertex(message[0])))
else:
result.append(create_edge(message[0], message[1]))
return result
```
The script should output openCypher query strings based on the type of the
records.

View File

@ -76,3 +76,73 @@ The recovery happens in following steps:
* Master broadcasts a recovery request with the common recovery point.
* Master waits for the cluster to recover.
* After a successful cluster recovery, master can enter Working state.
## Dynamic Graph Partitioning (abbr. DGP)
### Implemented parameters
--dynamic-graph-partitioner-enabled (If the dynamic graph partitioner should be
enabled.) type: bool default: false (start time)
--dgp-improvement-threshold (How much better should specific node score be
to consider a migration to another worker. This represents the minimal
difference between new score that the vertex will have when migrated
and the old one such that it's migrated.) type: int32 default: 10
(start time)
--dgp-max-batch-size (Maximal amount of vertices which should be migrated
in one dynamic graph partitioner step.) type: int32 default: 2000
(start time)
### Design decisions
* Each partitioning session has to be a new transaction.
* When and how does an instance perform the moves?
* Periodically.
* Token sharing (round robin, exactly one instance at a time has an
opportunity to perform the moves).
* On server-side serialization error (when DGP receives an error).
-> Quit partitioning and wait for the next turn.
* On client-side serialization error (when end client receives an error).
-> The client should never receive an error because of any
internal operation.
-> For the first implementation, it's good enough to wait until data becomes
available again.
-> It would be nice to achieve that DGP has lower priority than end client
operations.
### End-user parameters
* --dynamic-graph-partitioner-enabled (execution time)
* --dgp-improvement-threshold (execution time)
* --dgp-max-batch-size (execution time)
* --dgp-min-batch-size (execution time)
-> Minimum number of nodes that will be moved in each step.
* --dgp-fitness-threshold (execution time)
-> Do not perform moves if partitioning is good enough.
* --dgp-delta-turn-time (execution time)
-> Time between each turn.
* --dgp-delta-step-time (execution time)
-> Time between each step.
* --dgp-step-time (execution time)
-> Time limit per each step.
### Testing
The implementation has to provide good enough results in terms of:
* How good the partitioning is (numeric value), aka goodness.
* Workload execution time.
* Stress test correctness.
Test cases:
* N not connected subgraphs
-> shuffle nodes to N instances
-> run partitioning
-> test perfect partitioning.
* N connected subgraphs
-> shuffle nodes to N instances
-> run partitioning
-> test partitioning.
* Take realistic workload (Long Running, LDBC1, LDBC2, Card Fraud, BFS, WSP)
-> measure exec time
-> run partitioning
-> test partitioning
-> measure exec time (during and after partitioning).

View File

@ -1,4 +1,6 @@
# Kafka - openCypher clause
# Kafka Integration
## openCypher clause
One must be able to specify the following when importing data from Kafka:
@ -78,3 +80,38 @@ output is inserted in the graph.
A stream needs to be stopped in order to test it. When the batch limit is
omitted, `TEST STREAM` will run for only one batch by default.
## Data Transform
The transform script is a user defined script written in Python. The script
should be aware of the data format in the Kafka message.
Each Kafka message is byte length encoded, which means that the first eight
bytes of each message contain the length of the message.
A sample code for a streaming transform script could look like this:
```python
def create_vertex(vertex_id):
    return ("CREATE (:Node {id: $id})", {"id": vertex_id})


def create_edge(from_id, to_id):
    return ("MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "
            "CREATE (n)-[:Edge]->(m)", {"from_id": from_id, "to_id": to_id})


def stream(batch):
    result = []
    for item in batch:
        message = item.decode('utf-8').strip().split()
        if len(message) == 1:
            result.append(create_vertex(message[0]))
        else:
            result.append(create_edge(message[0], message[1]))
    return result
```
The script should output openCypher query strings based on the type of the
records.

View File

@ -1,61 +0,0 @@
# Tensorflow Op - Technicalities
The final result should be a shared object (".so") file that can be
dynamically loaded by the Tensorflow runtime in order to directly
access the bolt client.
## About Tensorflow
Tensorflow is usually used with Python such that the Python code is used
to define a directed acyclic computation graph. Basically no computation
is done in Python. Instead, values from Python are copied into the graph
structure as constants to be used by other Ops. The directed acyclic graph
naturally ends up with two sets of border nodes, one for inputs, one for
outputs. These are sometimes called "feeds".
Following the Python definition of the graph, during training, the entire
data processing graph/pipeline is called from Python as a single expression.
This leads to lazy evaluation since the called result has already been
defined for a while.
Tensorflow internally works with tensors, i.e. n-dimensional arrays. That
means all of its inputs need to be matrices as well as its outputs. While
it is possible to feed data directly from Python's numpy matrices straight
into Tensorflow, this is less desirable than using the Tensorflow data API
(which defines data input and processing as a Tensorflow graph) because:
1. The data API is written in C++ and entirely avoids Python and as such
is faster
2. The data API, unlike Python is available in "Tensorflow serving". The
default way to serve Tensorflow models in production.
Once the entire input pipeline is defined via the tf.data API, its input
is basically a list of node IDs the model is supposed to work with. The
model, through the data API knows how to connect to Memgraph and execute
openCypher queries in order to get the remaining data it needs.
(For example features of neighbouring nodes.)
## The Interface
I think it's best you read the official guide...
<https://www.tensorflow.org/extend/adding_an_op>
And especially the addition that specifies how data ops are special
<https://www.tensorflow.org/extend/new_data_formats>
## Compiling the TF Op
There are two options for compiling a custom op.
One of them involves pulling the TF source, adding your code to it and
compiling via bazel.
This is probably awkward to do for us and would
significantly slow down compilation.
The other method involves installing Tensorflow as a Python package and
pulling the required headers from for example:
`/usr/local/lib/python3.6/site-packages/tensorflow/include`
We can then compile our Op with our regular build system.
This is practical since we can copy the required headers to our repo.
If necessary, we can have several versions of the headers to build several
versions of our Op for every TF version which we want to support.
(But this is unlikely to be required as the API should be stable).

View File

@ -1,142 +0,0 @@
# Example for Using the Bolt Client Tensorflow Op
## Dynamic Loading
``` python3
import tensorflow as tf
mg_ops = tf.load_op_library('/usr/bin/memgraph/tensorflow_ops.so')
```
## Basic Usage
``` python3
dataset = mg_ops.OpenCypherDataset(
# This is probably unfortunate as the username and password
# get hardcoded into the graph, but for the simple case it's fine
"hostname:7687", auth=("user", "pass"),
# Our query
'''
MATCH (n:Train) RETURN n.id, n.features
''',
# Cast return values to these types
(tf.string, tf.float32))
# Some Tensorflow data api boilerplate
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
# Up to now we have only defined our computation graph which basically
# just connects to Memgraph
# `next_element` is not really data but a handle to a node in the Tensorflow
# graph, which we can and do evaluate
# It is a Tensorflow tensor with shape=(None, 2)
# and dtype=(tf.string, tf.float)
# shape `None` means the shape of the tensor is unknown at definition time
# and is dynamic and will only be known once the tensor has been evaluated
with tf.Session() as sess:
node_ids = sess.run(next_element)
# `node_ids` contains IDs and features of all the nodes
# in the graph with the label "Train"
# It is a numpy.ndarray with a shape ($n_matching_nodes, 2)
```
## Memgraph Client as a Generic Tensorflow Op
Other than the Tensorflow Data Op, we'll want to support a generic Tensorflow
Op which can be put anywhere in the Tensorflow computation Graph. It takes in
an arbitrary tensor and produces a tensor. This would be used in the GraphSage
algorithm to fetch the lowest level features into Tensorflow
```python3
requested_ids = np.array([1, 2, 3])
ids_placeholder = tf.placeholder(tf.int32)
model = mg_ops.OpenCypher()
"hostname:7687", auth=("user", "pass"),
"""
UNWIND $node_ids as nid
MATCH (n:Train {id: nid})
RETURN n.features
""",
# What to call the input tensor as an openCypher parameter
parameter_name="node_ids",
# Type of our resulting tensor
dtype=(tf.float32)
)
features = model(ids_placeholder)
with tf.Session() as sess:
result = sess.run(features,
feed_dict={ids_placeholder: requested_ids})
```
This is probably easier to implement than the Data Op, so it might be a good
idea to start with.
## Production Usage
During training, in the GraphSage algorithm at least, Memgraph is at the
beginning and at the end of the Tensorflow computation graph.
At the beginning, the Data Op provides the node IDs which are fed into the
generic Tensorflow Op to find their neighbours and their neighbours and
their features.
Production usage differs in that we don't use the Data Op. The Data Op is
effectively cut off and the initial input is fed by Tensorflow serving,
with the data found in the request.
For example a JSON request to classify a node might look like:
`POST http://host:port/v1/models/GraphSage/versions/v1:classify`
With the contents:
```json
{
"examples": [
{"node_id": 1},
{"node_id": 2}
],
}
```
Every element of the "examples" list is an example to be computed. Each is
represented by a dict with keys matching names of feeds in the Tensorflow
graph and values being the values we want fed in for each example
The REST API then replies in kind with the classification result in JSON
Note about adding our custom Op to Tensorflow serving.
Our Ops .so can be added into the Bazel build to link with Tensorflow serving
or it can be dynamically loaded by starting Tensorflow serving with a flag
`--custom_op_paths`
## Considerations
There might be issues here that the url to connect to Memgraph is
hardcoded into the op and would thus be wrong when moved to production,
requiring some type of a hack to make work. We probably want to solve
this by having the client op take in another tf.Variable as an input
which would contain a connection url and username/password.
We have to research whether this makes it easy enough to move to
production, as the connection string variable is still a part of the
graph, but maybe easier to replace.
It is probably the best idea to utilize openCypher parameters to make
our queries flexible. The exact API as to how to declare the parameters
in Python is open to discussion.
The Data Op might not even be necessary to implement as it is not
key for production use. It can be replaced in training mode with
feed dicts and either
1. Getting the initial list of nodes via a Python Bolt client
2. Creating a separate Tensorflow computation graph that gets all the
relevant node IDs into Python