## Graph Streams

### Kafka

Memgraph's custom openCypher clause for creating a stream is:
```opencypher
CREATE STREAM stream_name AS
  LOAD DATA KAFKA 'URI'
  WITH TOPIC 'topic'
  WITH TRANSFORM 'URI'
  [BATCH_INTERVAL milliseconds]
  [BATCH_SIZE count]
```
The `CREATE STREAM` clause is executed within a transaction.

The `WITH TOPIC` parameter specifies the Kafka topic from which data is
streamed.

The `WITH TRANSFORM` parameter should contain the URI of the transform script.
The transform script is covered in more detail in the [transform](#transform)
section.

The `BATCH_INTERVAL` parameter defines the time, in milliseconds, between two
successive stream import operations.

The `BATCH_SIZE` parameter defines the number of Kafka messages that are
batched together before import.

If both `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the condition
that is satisfied first triggers the batched import.

The default value for `BATCH_INTERVAL` is 100 milliseconds, and the default
value for `BATCH_SIZE` is 10.

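The interplay of the two batching parameters can be illustrated with a small Python model. This is only a sketch of the "whichever condition is satisfied first" rule, not Memgraph's implementation: the function name `split_into_batches` is hypothetical, and it assumes the interval is measured from the arrival of the first message in a batch.

```python
def split_into_batches(arrivals_ms, batch_interval_ms=100, batch_size=10):
    """Group message arrival times (in milliseconds) into batches.

    A batch is closed when it holds `batch_size` messages or when
    `batch_interval_ms` has elapsed since the batch was opened,
    whichever happens first. (Assumption: the interval starts with the
    first message of the batch.)
    """
    batches = []
    current = []
    window_start = None
    for t in arrivals_ms:
        # Close the open batch if its interval expired before this message.
        if current and t - window_start >= batch_interval_ms:
            batches.append(current)
            current = []
        if not current:
            window_start = t
        current.append(t)
        # Close the batch as soon as it is full.
        if len(current) == batch_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches
```

With `batch_size=3`, three messages arriving within a few milliseconds form a batch immediately, while with the default size two messages that arrive 50 ms apart are flushed only once the 100 ms interval has passed.
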
The `DROP` clause deletes a stream:
```opencypher
DROP STREAM stream_name;
```

The `SHOW` clause enables you to see all configured streams:
```opencypher
SHOW STREAMS;
```

You can also start and stop streams with the `START` and `STOP` clauses:
```opencypher
START STREAM stream_name [LIMIT count BATCHES];
STOP STREAM stream_name;
```
A stream must be stopped before it can be started, and started before it can
be stopped. Starting an already started stream, or stopping an already stopped
one, has no effect on that stream.

There are also convenience clauses to start and stop all streams:
```opencypher
START ALL STREAMS;
STOP ALL STREAMS;
```

Before the actual import, you can also test the stream with the
`TEST STREAM` clause:
```opencypher
TEST STREAM stream_name [LIMIT count BATCHES];
```
When a stream is tested, data extraction and transformation occur, but nothing
is inserted into the graph.

A stream must be stopped before it can be tested. When the batch limit is
omitted, `TEST STREAM` runs for only one batch.

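The start, stop, and test rules above can be summarized as a tiny state model. The class below is a hypothetical illustration written for this section, not part of Memgraph; it only encodes the rules as stated: repeated `START` or `STOP` has no effect, and testing requires a stopped stream.

```python
class StreamState:
    """Toy model of the start/stop/test rules for a single stream."""

    def __init__(self):
        # Assumption for this sketch: a new stream begins in the stopped state.
        self.running = False

    def start(self):
        """Start the stream; return True only if the state changed."""
        changed = not self.running
        self.running = True
        return changed

    def stop(self):
        """Stop the stream; return True only if the state changed."""
        changed = self.running
        self.running = False
        return changed

    def can_test(self):
        """A stream can be tested only while it is stopped."""
        return not self.running
```
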
#### Transform

The transform script lets Memgraph users define custom Kafka message formats
and still import the data into Memgraph, because the logic that decodes the
messages lives in the transform script.

Memgraph calls the transform script through its `stream` function. The input
of the `stream` function is a list of `bytes` objects, each of which is a
byte-encoded Kafka message. The output must be a list of tuples, each
containing an openCypher query string and the corresponding parameters stored
in a dictionary.

More precisely, the signature of the `stream` function is:

```plaintext
stream : [bytes] -> [(str, {str : type})]
type : none | bool | int | float | str | list | dict
```

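The same signature can be written with Python type hints, which may make it easier to check a transform script with a type checker. This is a minimal sketch: the `Query` alias, the `Message` label, and the `payload` property are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, List, Tuple

# One query: an openCypher string plus its parameter dictionary.
Query = Tuple[str, Dict[str, Any]]


def stream(batch: List[bytes]) -> List[Query]:
    """Return one parameterized query per Kafka message in the batch."""
    return [
        # Hypothetical query: store each decoded message as a vertex property.
        ("CREATE (:Message {payload: $payload})",
         {"payload": raw.decode("utf-8")})
        for raw in batch
    ]
```
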
The following simple transform script creates a vertex if the message contains
one number (the vertex id), or an edge if the message contains two numbers (the
origin vertex id and the destination vertex id):

```python
def create_vertex(vertex_id):
    # Query that creates one vertex with the given id.
    return ("CREATE (:Node {id: $id})", {"id": vertex_id})


def create_edge(from_id, to_id):
    # Query that connects two existing vertices with an edge.
    return ("MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "
            "CREATE (n)-[:Edge]->(m)", {"from_id": from_id, "to_id": to_id})


def stream(batch):
    result = []
    for item in batch:
        # Each message is UTF-8 text with whitespace-separated ids.
        message = item.decode('utf-8').split()
        if len(message) == 1:
            result.append(create_vertex(message[0]))
        elif len(message) == 2:
            result.append(create_edge(message[0], message[1]))
        # Messages of any other length are skipped.
    return result
```
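
Because the decoding logic lives entirely in the transform script, other message formats can be handled the same way. The variant below is a sketch that assumes each Kafka message is a JSON object; the keys `id`, `from`, and `to` are hypothetical and chosen only for this illustration.

```python
import json


def stream(batch):
    """Transform JSON-encoded Kafka messages into openCypher queries."""
    result = []
    for item in batch:
        # Assumption: every message is a UTF-8 encoded JSON object.
        message = json.loads(item.decode("utf-8"))
        if "id" in message:
            # {"id": 1} creates a vertex.
            result.append(("CREATE (:Node {id: $id})", {"id": message["id"]}))
        elif "from" in message and "to" in message:
            # {"from": 1, "to": 2} creates an edge between existing vertices.
            result.append((
                "MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "
                "CREATE (n)-[:Edge]->(m)",
                {"from_id": message["from"], "to_id": message["to"]},
            ))
        # Objects with neither shape are skipped.
    return result
```

Unlike the whitespace-separated format, JSON preserves value types, so a numeric id reaches the query as an `int` rather than a `str`.
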