Clean docs folder

Summary:
Removed folders:
  * dev/diagram
  * dev/distributed
  * feature_ref

Reviewers: teon.banek, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2710
Marko Budiselic 2020-03-05 15:47:37 +01:00
parent 000d6dba55
commit 32b35f6f88
8 changed files with 0 additions and 631 deletions


@@ -1,288 +0,0 @@
// dot -Tpng dependencies.dot -o /path/to/output.png
// TODO (buda): Put PropertyValueStore to storage namespace
digraph {
// At the beginning of each block there is a default style for that block
label="Memgraph Dependencies Diagram"; fontname="Roboto Bold"; fontcolor=black;
fontsize=26; labelloc=top; labeljust=right;
compound=true; // If true, allow edges between clusters
rankdir=TB; // Alternatives: LR
node [shape=record fontname="Roboto", fontsize=12, fontcolor=white];
edge [color="#B5AFB7"];
// -- Legend --
// dir=both arrowtail=diamond arrowhead=vee -> group ownership
// dir=both arrowtail=none, arrowhead=vee -> ownership; stack or uptr
subgraph cluster_tcp_end_client_communication {
label="TCP End Client Communication"; fontsize=14;
node [style=filled, color="#DD2222" fillcolor="#DD2222"];
// Owned elements
"communication::Server";
"io::network::Socket";
// Intracluster connections
"communication::Server" -> "io::network::Socket"
[label="socket_" dir=both arrowtail=none arrowhead=vee];
}
subgraph cluster_bolt_server {
label="Bolt Server"; fontsize=14;
node [style=filled, color="#62A2CA" fillcolor="#62A2CA"];
// Owned elements
"communication::bolt::SessionData";
"communication::bolt::Session";
"communication::bolt::Encoder";
"communication::bolt::Decoder";
// Intracluster connections
"communication::bolt::Session" -> "communication::bolt::Encoder"
[label="encoder_", dir=both arrowtail=none, arrowhead=vee];
"communication::bolt::Session" -> "communication::bolt::Decoder"
[label="decoder_", dir=both arrowtail=none, arrowhead=vee];
}
subgraph cluster_opencypher_engine {
label="openCypher Engine"; fontsize=14;
node [style=filled, color="#68BDF6" fillcolor="#68BDF6"];
// Owned Elements
"query::Interpreter";
"query::AstTreeStorage";
"query::TypedValue"
"query::Path";
"query::Simbol";
"query::Context";
"query::ExpressionEvaluator";
"query::Frame";
"query::SymbolTable";
"query::plan::LogicalOperator";
"query::plan::Cursor";
"query::plan::CostEstimator";
// Intracluster connections
"query::Interpreter" -> "query::AstTreeStorage"
[label="ast_cache" dir=both arrowtail=diamond arrowhead=vee];
"query::TypedValue" -> "query::Path";
"query::plan::Cursor" -> "query::Frame";
"query::plan::Cursor" -> "query::Context";
"query::plan::LogicalOperator" -> "query::Symbol";
"query::plan::LogicalOperator" -> "query::SymbolTable";
"query::plan::LogicalOperator" -> "query::plan::Cursor";
}
subgraph cluster_storage {
label="Storage" fontsize=14;
node [style=filled, color="#FB6E00" fillcolor="#FB6E00"];
// Owned Elements
"database::GraphDb";
"database::GraphDbAccessor";
"storage::Record";
"storage::Vertex";
"storage::Edge";
"storage::RecordAccessor";
"storage::VertexAccessor";
"storage::EdgeAccessor";
"storage::Common";
"storage::Label";
"storage::EdgeType";
"storage::Property";
"storage::compression";
"storage::SingleNodeConcurrentIdMapper";
"storage::Location";
"storage::StorageTypesLocation";
"PropertyValueStore";
"storage::RecordLock";
"mvcc::Version";
"mvcc::Record";
"mvcc::VersionList";
// Intracluster connections
"storage::VertexAccessor" -> "storage::RecordAccessor"
[arrowhead=onormal];
"storage::EdgeAccessor" -> "storage::RecordAccessor"
[arrowhead=onormal];
"storage::RecordAccessor" -> "database::GraphDbAccessor"
[style=dashed arrowhead=vee];
"storage::Vertex" -> "mvcc::Record"
[arrowhead=onormal];
"storage::Edge" -> "mvcc::Record"
[arrowhead=onormal];
"storage::Edge" -> "PropertyValueStore"
[arrowhead=vee];
"storage::Vertex" -> "PropertyValueStore"
[arrowhead=vee];
"storage::Edge" -> "mvcc::VersionList"
[label="from,to" arrowhead=vee style=dashed];
"storage::VertexAccessor" -> "storage::Vertex"
[arrowhead=vee];
"storage::EdgeAccessor" -> "storage::Edge"
[arrowhead=vee];
"storage::SingleNodeConcurrentIdMapper" -> "storage::StorageTypesLocation"
[arrowhead=vee];
"storage::StorageTypesLocation" -> "storage::Location"
[arrowhead=vee];
"storage::Storage" -> "storage::StorageTypesLocation"
[arrowhead=vee];
"storage::Property" -> "storage::Common"
[arrowhead=onormal];
"storage::Label" -> "storage::Common"
[arrowhead=onormal];
"storage::EdgeType" -> "storage::Common"
[arrowhead=onormal];
"storage::Property" -> "storage::Location"
[arrowhead=vee];
"PropertyValueStore" -> "storage::Property"
[arrowhead=vee];
"PropertyValueStore" -> "storage::Location"
[arrowhead=vee];
"database::GraphDbAccessor" -> "database::GraphDb"
[arrowhead=vee];
"database::GraphDbAccessor" -> "tx::TransactionId"
[arrowhead=vee];
"mvcc::VersionList" -> "storge::RecordLock"
[label="lock" arrowhead=vee];
"mvcc::VersionList" -> "mvcc::Record"
[label="head" arrowhead=vee];
"mvcc::Record" -> "mvcc::Version"
[arrowhead=onormal];
// Explicit positioning
{rank=same;
"database::GraphDbAccessor";
"storage::VertexAccessor";
"storage::EdgeAccessor";}
{rank=same;
"storage::Common";
"storage::compression";}
}
subgraph cluster_properties_on_disk {
label="Properties on Disk" fontsize=14;
node [style=filled, color="#102647" fillcolor="#102647"];
// Owned Elements
"storage::KVStore";
"rocksdb";
// Intracluster connections
"storage::KVStore" -> "rocksdb";
}
subgraph cluster_distributed {
label="Distributed" fontsize=14;
node [style=filled, color="#FFC500" fillcolor="#FFC500"];
// Owned Elements
"distributed::DataManager";
"distributed::DataRpcClients";
// Intracluster connections
"distributed::DataManager" -> "distributed::DataRpcClients"
[arrowhead=vee];
"storage::RecordAccessor" -> "distributed::DataManager"
[style=dashed arrowhead=vee];
}
subgraph cluster_dynamic_partitioning {
label="Dynamic Partitioning" fontsize=14;
node [style=filled, color="#720096" fillcolor="#720096"];
// Owned Elements
"DynamicPartitioner";
}
subgraph cluster_security {
label="Security" fontsize=14;
node [style=filled, color="#857F87" fillcolor="#857F87"];
// Owned Elements
"Communication Encryption";
"Data Encryption";
"Access Control";
"Audit Logging";
}
subgraph cluster_web_dashboard {
label="Dashaboard" fontsize=14;
node [style=filled, color="#FF0092" fillcolor="#FF0092"];
// Owned Elements
"Memgraph Ops / Memgraph Cockpit";
}
subgraph cluster_rpc {
label="RPC" fontsize=14;
node [style=filled, color="#857F87" fillcolor="#857F87"];
// Owned Elements
"communication::rpc::Server";
"communication::rpc::Client";
}
subgraph cluster_ingestion {
label="Ingestion" fontsize=14;
node [style=filled, color="#0B6D88" fillcolor="#0B6D88"];
// Owned Elements
"Extract";
"Transform";
"Load";
"Amazon S3";
"Kafka";
// Intracluster connections
"Extract" -> "Amazon S3";
"Extract" -> "Kafka";
// Explicit positioning
{rank=same;"Extract";"Transform";"Load";}
}
// -- Intercluster connections --
// cluster_tcp_end_client_communication -- cluster_bolt_server
"communication::Server" -> "communication::bolt::SessionData" [color=black];
"communication::Server" -> "communication::bolt::Session" [color=black];
// cluster_bolt_server -> cluster_storage
"communication::bolt::SessionData" -> "database::GraphDb" [color=red];
"communication::bolt::Session" -> "database::GraphDbAccessor" [color=red];
// cluster_bolt_server -> cluster_opencypher_engine
"communication::bolt::SessionData" -> "query::Interpreter" [color=red];
// cluster_opencypher_engine -- cluster_storage
"query::Interpreter" -> "database::GraphDbAccessor" [color=black];
"query::Interpreter" -> "storage::VertexAccessor" [color=black];
"query::Interpreter" -> "storage::EdgeAccessor" [color=black];
"query::TypedValue" -> "storage::VertexAccessor" [color=black];
"query::TypedValue" -> "storage::EdgeAccessor" [color=black];
"query::Path" -> "storage::VertexAccessor"
[label="vertices" dir=both arrowtail=diamond arrowhead=vee color=black];
"query::Path" -> "storage::EdgeAccessor"
[label="edges" dir=both arrowtail=diamond arrowhead=vee color=black];
"query::plan::LogicalOperator" -> "database::GraphDbAccessor"
[color=black arrowhead=vee];
// cluster_distributed -- cluster_storage
"distributed::DataManager" -> "database::GraphDb"
[arrowhead=vee style=dashed color=red];
"distributed::DataManager" -> "tx::TransactionId"
[label="ves_caches_key" dir=both arrowhead=none arrowtail=diamond
color=red];
"distributed::DataManager" -> "storage::Vertex"
[label="vertices_caches" dir=both arrowhead=none arrowtail=diamond
color=red];
"distributed::DataManager" -> "storage::Edge"
[label="edges_caches" dir=both arrowhead=none arrowtail=diamond
color=red];
// cluster_storage -- cluster_properties_on_disk
"PropertyValueStore" -> "storage::KVStore"
[label="static" arrowhead=vee color=black];
// cluster_dynamic_partitioning -- cluster_storage
"database::GraphDb" -> "DynamicPartitioner"
[arrowhead=vee color=red];
"DynamicPartitioner" -> "database::GraphDbAccessor"
[arrowhead=vee color=black];
}


@@ -1,22 +0,0 @@
digraph {
// label="Dynamig Graph Partitioning";
fontname="Roboto Bold"; fontcolor=black;
fontsize=26; labelloc=top; labeljust=center;
compound=true; // If true, allow edges between clusters
rankdir=TB; // Alternatives: LR
node [shape=record fontname="Roboto", fontsize=12, fontcolor=white
style=filled, color="#FB6E00" fillcolor="#FB6E00"];
edge [color="#B5AFB7"];
"distributed::DistributedGraphDb" -> "distributed::TokenSharingRpcServer";
"distributed::TokenSharingRpcServer" -> "communication::rpc::Server";
"distributed::TokenSharingRpcServer" -> "distributed::Coordination";
"distributed::TokenSharingRpcServer" -> "distributed::TokenSharingRpcClients";
"distributed::TokenSharingRpcServer" -> "distributed::dgp::Partitioner";
"distributed::dgp::Partitioner" -> "distributed::DistributedGraphDb" [style=dashed];
"distributed::dgp::Partitioner" -> "distributed::dgp::VertexMigrator";
"distributed::dgp::VertexMigrator" -> "database::GraphDbAccessor" [style=dashed];
}


@@ -1,43 +0,0 @@
# Distributed addressing
In distributed Memgraph a single graph element must be owned by exactly
one worker. It is possible that multiple workers have cached copies of
a single graph element (which is inevitable), but there is only one
owner.
The owner of a graph element can change. This is not yet implemented,
but graph partitioning is intended to be dynamic.
Graph elements refer to other graph elements that are possibly on some
other worker. Even though each graph element is identified with a unique
ID, that ID does not contain the information about where that element
currently resides (which worker is the owner).
Thus we introduce the concept of a global address. It indicates both
which graph element is referred to (its global ID) and where it
resides. Semantically it is a pair, but for efficiency it is stored in
64 bits.
The global address is efficient for usage in a cluster: it indicates
where something can be found. However, finding a graph element based on
its ID is still not a free operation (in the current implementation
it is a skiplist lookup). So, whenever possible, it is better to use local
addresses (pointers).
Succinctly, the requirements for addressing are:
- global addressing containing location info
- fast local addressing
- storage of both types in the same location efficiently
- translation between the two
The `storage::Address` class handles the addressing requirements
enumerated above. It stores either a local or a global address in the
size of a local pointer (typically 8 bytes).
Conversion between the two is done in multiple places. The general
approach is to use local addresses (when possible) only for local
in-memory handling. All communication and persistence use global
addresses. Also, when receiving addresses from another worker, they are
localized as soon as possible, so that as little code as possible has to
worry about the potential inefficiency of using a global address for a
local graph element.
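As an illustration, a minimal sketch of such a tagged 64-bit address follows.
The bit layout (local/global tag in the lowest bit, a 10-bit worker ID) and
the member names are assumptions made for this sketch, not the actual
`storage::Address` implementation.
```cpp
#include <cassert>
#include <cstdint>

// Sketch only: a 64-bit value that is either a local pointer or a packed
// (gid, worker_id) pair. The lowest bit is the local/global tag; local
// pointers are at least 2-byte aligned, so that bit is always free.
template <typename TRecord>
class Address {
  static constexpr uint64_t kRemoteTag = 1;
  static constexpr int kWorkerBits = 10;  // assumed width, for illustration

 public:
  // Local address: wrap the pointer as-is.
  explicit Address(TRecord *ptr) : storage_(reinterpret_cast<uint64_t>(ptr)) {}

  // Global address: pack the global ID and the owner's worker ID.
  Address(uint64_t gid, int worker_id)
      : storage_((gid << (kWorkerBits + 1)) |
                 (static_cast<uint64_t>(worker_id) << 1) | kRemoteTag) {}

  bool is_local() const { return (storage_ & kRemoteTag) == 0; }

  TRecord *local() const {
    assert(is_local());
    return reinterpret_cast<TRecord *>(storage_);
  }

  uint64_t gid() const {
    assert(!is_local());
    return storage_ >> (kWorkerBits + 1);
  }

  int worker_id() const {
    assert(!is_local());
    return static_cast<int>((storage_ >> 1) & ((1u << kWorkerBits) - 1));
  }

 private:
  uint64_t storage_;
};
```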


@@ -1,50 +0,0 @@
# Distributed durability
Durability in distributed Memgraph is slightly different from
single-node durability because the state is split across multiple
workers and the partial states are not independent.
Note that recovering from persistent storage must result in a stable
database state. This means that across the cluster the state
modification of every transaction that was running is either recovered
fully or not at all. Also, if transaction A committed before transaction B,
then if B is recovered, A must be recovered as well.
## Snapshots
It is highly desirable, though possibly avoidable, that the database can be
recovered from a snapshot alone, without relying on WAL files. For this to
be possible in distributed, it must be ensured that the same
transactions are recovered on all the workers (including master) in the
cluster. Since the snapshot does not contain information about which
state change happened in which transaction, the only way to achieve this
is to have synchronized snapshots. This means that the process of
creating a snapshot, which is in itself transactional (it happens within
a transaction and thus observes some consistent database state), must
happen in the same transaction on every worker. This is achieved by the
master starting a snapshot-generating transaction and triggering the
process on all
workers in the cluster.
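A hedged sketch of that trigger is shown below; `SnapshotWorker`, `TxEngine`
and the method names are hypothetical stand-ins, not the actual Memgraph RPC
interface.
```cpp
#include <cstdint>
#include <vector>

// Hypothetical interfaces; none of these names are the actual Memgraph API.
struct SnapshotWorker {
  virtual void MakeSnapshot(uint64_t tx_id) = 0;
  virtual ~SnapshotWorker() = default;
};

struct TxEngine {
  virtual uint64_t Begin() = 0;
  virtual void Commit(uint64_t tx_id) = 0;
  virtual ~TxEngine() = default;
};

// The master opens a single snapshot transaction and tells every worker to
// snapshot inside that same transaction, so all snapshots observe the same
// consistent state and cover the same set of committed transactions.
void MakeSynchronizedSnapshot(TxEngine &engine,
                              const std::vector<SnapshotWorker *> &workers) {
  uint64_t tx_id = engine.Begin();
  for (auto *worker : workers) worker->MakeSnapshot(tx_id);
  engine.Commit(tx_id);
}
```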
## WAL
Unlike the snapshot, write-ahead logs contain the information on which
transaction made which state change. This makes it possible to include
or exclude transactions during the recovery process. What is necessary
however is a global consensus on which of the transactions should be
recovered and which not, to ensure recovery into a consistent state.
It would be possible to achieve this with some kind of synchronized
recovery process, but it would impose constraints on cluster startup and
would not be trivial.
A simpler alternative is that the consensus is achieved beforehand,
while the database (to be recovered) is still operational. What is
necessary is to keep track of which transactions are guaranteed to
have been flushed to the WAL files on all the workers in the cluster. It
makes sense to keep this record on the master, so a mechanism is
introduced which periodically pings all the workers, telling them to
flush their WALs, and writes some sort of a log indicating that this has
been confirmed. The downside of this is that a periodic broadcast must be
done, and that potentially slightly less data can be recovered in the
case of a crash than if a post-crash consensus were used. It is, however,
much simpler to implement.
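The following sketch illustrates that mechanism under assumed names
(`WalFlushCoordinator`, `FlushWalAndReportDurableTx`); it is not the actual
Memgraph API.
```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical worker handle: flush the local WAL and report the highest
// transaction ID that is now durable on that worker.
struct WalWorkerClient {
  virtual uint64_t FlushWalAndReportDurableTx() = 0;
  virtual ~WalWorkerClient() = default;
};

// Master-side sketch: one periodic broadcast round pings every worker and
// then persists the minimum reported transaction ID. On recovery, only
// transactions up to that ID are guaranteed to be in every worker's WAL.
class WalFlushCoordinator {
 public:
  explicit WalFlushCoordinator(std::vector<WalWorkerClient *> workers)
      : workers_(std::move(workers)) {}

  void Tick() {
    uint64_t cluster_durable = UINT64_MAX;
    for (auto *worker : workers_) {
      cluster_durable =
          std::min(cluster_durable, worker->FlushWalAndReportDurableTx());
    }
    PersistDurablePoint(cluster_durable);  // the "some sort of a log" above
  }

 private:
  void PersistDurablePoint(uint64_t /* tx_id */) { /* append to master log */ }
  std::vector<WalWorkerClient *> workers_;
};
```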


@@ -1,51 +0,0 @@
## Dynamic Graph Partitioning
Memgraph supports dynamic graph partitioning similar to the Spinner algorithm,
mentioned in this paper: [https://arxiv.org/pdf/1404.3861.pdf].
DGP is useful because it tries to group `local` data on the same worker, i.e.
it tries to keep closely connected data on one worker, avoiding jumps
across workers when querying/traversing the distributed graph.
### Our implementation
The algorithm runs independently on each worker, but migration runs on
only one worker at a time. This is achieved by sharing a token between
workers; token ownership is transferred to the next worker when the
current worker finishes its migration step.
The reason we want workers to work in disjoint time slots is to avoid
serialization errors caused by creating/removing edges of vertices during
migrations, which might cause the same vertex to be updated by two or
more different transactions.
### Migrations
For each vertex and worker id (a label in the context of the DGP algorithm)
we define a score function. The score function takes into account the labels
of the endpoints of the vertex's edges (in/out) and the capacity of the
worker with said label. The score function loosely looks like this:
```
locality(v, l) =
    count endpoints of edges of vertex `v` with label `l` / degree of `v`
capacity(l) =
    number of vertices on worker `l` divided by the worker capacity
    (usually equal to the average number of vertices per worker)
score(v, l) = locality(v, l) - capacity(l)
```
We also define two flags alongside `dynamic_graph_partitioner_enabled`:
`dgp_improvement_threshold` and `dgp_max_batch_size`.
These two flags are used during the migration phase.
When deciding if we need to migrate some vertex `v` from worker `l1` to worker
`l2`, we examine the difference in scores, i.e. if
`score(v, l1) - dgp_improvement_threshold / 100 < score(v, l2)`, then we
migrate the vertex.
The max batch size flag limits the number of vertices we can transfer in one
batch (one migration step).
Setting this value too high will probably cause a lot of interference with
client queries, while setting it too low will slow down the convergence of
the algorithm.
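The following is a minimal sketch of the score computation and the migration
test described above; the container types and helper names are illustrative
assumptions.
```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// One vertex as seen by the partitioner: the worker label of each endpoint
// of its in/out edges. Types and names are illustrative, not Memgraph's.
struct VertexInfo {
  std::vector<int> endpoint_labels;
};

double Locality(const VertexInfo &v, int label) {
  if (v.endpoint_labels.empty()) return 0.0;
  int count = 0;
  for (int l : v.endpoint_labels)
    if (l == label) ++count;
  return static_cast<double>(count) / v.endpoint_labels.size();
}

double Capacity(const std::unordered_map<int, int64_t> &vertices_per_worker,
                int label, double worker_capacity) {
  auto it = vertices_per_worker.find(label);
  int64_t on_worker = (it == vertices_per_worker.end()) ? 0 : it->second;
  return static_cast<double>(on_worker) / worker_capacity;
}

double Score(const VertexInfo &v,
             const std::unordered_map<int, int64_t> &vertices_per_worker,
             int label, double worker_capacity) {
  return Locality(v, label) -
         Capacity(vertices_per_worker, label, worker_capacity);
}

// The migration test exactly as stated in the text above.
bool ShouldMigrate(double score_l1, double score_l2,
                   double dgp_improvement_threshold) {
  return score_l1 - dgp_improvement_threshold / 100.0 < score_l2;
}
```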


@@ -1,54 +0,0 @@
# Memgraph distributed
This chapter describes some of the concepts used in distributed
Memgraph. By "distributed" here we mean the sharding of a single graph
onto multiple processing units (servers).
## Conceptual organization
There is a single master and multiple workers. The master contains all
the global sources of truth (transaction engine,
[label|edge-type|property] to name mappings). Also, in the current
organization it is the only one that contains a Bolt server (for
communication with the end client) and an interpretation engine. Workers
contain the data and means of subquery interpretation (query plans
received from the master) and means of communication with the master and
other workers.
In many query plans the load on the master is much larger than the load
on the workers. For that reason it might be beneficial to make the
master contain less data (or none at all), and/or to have multiple
interpretation masters.
## Logical organization
Both the distributed and the single node Memgraph use the same codebase.
In cases where the behavior in single-node differs from that in
distributed, some kind of dynamic behavior change is implemented (either
through inheritance or conditional logic).
### GraphDb
The `database::GraphDb` is an "umbrella" object for parts of the
database such as storage, garbage collection, transaction engine etc.
There is a class hierarchy of `GraphDb` implementations, as well as a
base interface object. There are subclasses for single-node, master and
worker deployments. Which implementation is used depends on the
configuration processed in the `main` entry point of Memgraph.
The `GraphDb` interface exposes getters to base classes of
other similar hierarchies (for example to `tx::Engine`). In that way
much of the code that uses those objects (for example query plan
interpretation) is agnostic to the type of deployment.
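The rough shape of that hierarchy is sketched below; apart from
`database::GraphDb` and `tx::Engine`, the names and members are assumptions
for illustration.
```cpp
// Sketch of the hierarchy described above; apart from database::GraphDb and
// tx::Engine, the names and members are illustrative assumptions.
namespace tx { class Engine; }
namespace storage { class Storage; }

namespace database {

// Base interface: deployment-agnostic code (e.g. query plan interpretation)
// only ever sees this type.
class GraphDb {
 public:
  virtual ~GraphDb() = default;
  virtual tx::Engine &tx_engine() = 0;
  virtual storage::Storage &storage() = 0;
};

// Concrete deployments, chosen in main() based on the configuration.
class SingleNode : public GraphDb { /* local everything */ };
class Master : public GraphDb { /* global tx engine, name mappings, ... */ };
class Worker : public GraphDb { /* delegates to the master's services */ };

}  // namespace database
```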
### RecordAccessors
The functionality of `RecordAccessors` and its subclasses is already
documented. It's important to note that the same implementation of
accessors is used in all deployments, with internal changes of behavior
depending on the locality of the graph element (vertex or edge) the
accessor represents. For example, if the graph element is local, an
update operation on an accessor will make the necessary MVCC ops, update
local data, indexes, the write-ahead log etc. However, if the accessor
represents a remote graph element, an update will trigger an RPC message
to the owner about the update and a change in the local cache.
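A simplified sketch of that local/remote branching follows; every name except
`RecordAccessor` is an illustrative stand-in.
```cpp
#include <cstdint>
#include <string>

// Everything here is an illustrative stand-in; only the local/remote split
// is taken from the text above.
struct PropertyUpdate {
  std::string property;
  int64_t value;
};

class RecordAccessor {
 public:
  explicit RecordAccessor(bool is_local) : is_local_(is_local) {}

  void SetProperty(const PropertyUpdate &update) {
    if (is_local_) {
      // Local graph element: the usual single-node path.
      WriteMvccDelta(update);
      UpdateIndexes(update);
      AppendToWal(update);
    } else {
      // Remote graph element: tell the owner (which defers the update, see
      // "Distributed updates") and patch the local cached copy.
      SendUpdateRpcToOwner(update);
      ApplyToLocalCache(update);
    }
  }

 private:
  void WriteMvccDelta(const PropertyUpdate &) {}
  void UpdateIndexes(const PropertyUpdate &) {}
  void AppendToWal(const PropertyUpdate &) {}
  void SendUpdateRpcToOwner(const PropertyUpdate &) {}
  void ApplyToLocalCache(const PropertyUpdate &) {}

  bool is_local_;
};
```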


@@ -1,103 +0,0 @@
# Distributed updates
Operations that modify the graph state are somewhat more complex in the
distributed system, as opposed to a single-node Memgraph deployment. The
complexity arises from two factors.
First, the data being modified is not necessarily owned by the worker
performing the modification. This situation is completely valid: workers
execute parts of the query plan, and some parts must be executed by the
master.
Second, there are fewer guarantees regarding multi-threaded access. In
single-node Memgraph it was guaranteed that only one thread will be
performing database work in a single transaction. This implied that
per-version storage could be thread-unsafe. In distributed Memgraph it
is possible that multiple threads could be performing work in the same
transaction as a consequence of the query being executed at the same
time on multiple workers and those executions interacting with the
globally partitioned database state.
## Deferred state modification
Making the per-version data storage thread-safe would most likely have a
performance impact very undesirable in a transactional database intended
for high throughput.
An alternative is that state modification over unsafe structures is not
performed immediately when requested, but postponed until it is safe to
do so (there is a guarantee of no concurrent access).
Since local query plan execution is done the same way on local data as
it is in single-node Memgraph, it is not possible to defer that part of
the modification story. What can be deferred are modifications requested
by other workers. Since local query plan execution is still
single-threaded, this approach is safe.
At the same time, the workers requesting the remote update can update
their local copies (caches) of the non-owned data, since that cache is
only used by the single, local-execution thread.
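A rough sketch of such a per-transaction deferred-update buffer follows; the
names and the shape of an update are assumptions, not the actual
implementation.
```cpp
#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Sketch of the deferred-update idea: updates coming from other workers are
// only buffered here (behind a mutex), and applied to the thread-unsafe
// per-version storage later, when no query plan part is executing locally.
// All names and the Update layout are illustrative assumptions.
struct Update {
  uint64_t element_gid;
  // ... description of the change to apply to that element ...
};

class DeferredUpdates {
 public:
  // Called from RPC handlers, possibly from several threads at once.
  void Emplace(uint64_t tx_id, const Update &update) {
    std::lock_guard<std::mutex> guard(lock_);
    updates_[tx_id].push_back(update);
  }

  // Called at the global synchronization point, when it is guaranteed that
  // no query plan part is executing on this worker.
  void Apply(uint64_t tx_id) {
    std::vector<Update> to_apply;
    {
      std::lock_guard<std::mutex> guard(lock_);
      auto it = updates_.find(tx_id);
      if (it == updates_.end()) return;
      to_apply = std::move(it->second);
      updates_.erase(it);
    }
    for (const auto &update : to_apply) ApplyToStorage(update);
  }

 private:
  void ApplyToStorage(const Update & /* update */) { /* MVCC, indexes, WAL */ }

  std::mutex lock_;
  std::unordered_map<uint64_t, std::vector<Update>> updates_;
};
```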
### Visibility
Since updates are deferred the question arises: when do the updates
become visible? The above described process offers the following
visibility guarantees:
- updates done on the local state are visible to the owner
- updates done on the local state are NOT visible to anyone else during
the same (transaction + command)
- updates done on remote state are deferred on the owner and not
visible to the owner until applied
- updates done on the remote state are applied immediately to the local
caches and thus visible locally
This implies an inconsistent view of the database state. In a concurrent
execution of a single query this can hardly be avoided and is accepted
as such. It does not change the Cypher query execution semantics in any
of the well-defined scenarios. It possibly changes some of the behaviors
whose semantics are not well defined even in single-node execution.
### Synchronization, update application
In many queries it is mandatory to observe the latest global graph state
(typically when returning it to the client). That means that before that
happens all the deferred updates need to be applied, and all the caches
to remote data invalidated. Exactly this happens when executing queries
that modify the graph state. At some point a global synchronization
point is reached. First, all workers must finish executing the query
plan parts that perform state modifications. After that
all the workers are told to apply the deferred updates they received to
their graph state. Since there is no concurrent query plan execution,
this is safe. Once that is done all the local caches are cleared and the
requested data can be returned to the client.
### Command advancement
In complex queries where a read part follows a state modification part
the synchronization process after the state modification part is
followed by command advancement, like in single-node execution.
## Creation
Graph element creation is not deferred. This is practical because the
response to a creation is the global ID of the newly created element. At
the same time it is safe because no other worker (including the owner)
will be using the newly added graph element.
## Updating
Updating is deferred, as described. Note that this also means that
record locking conflicts are deferred and serialization errors
(including lock timeouts) are postponed until the deferred update
application phase. In certain scenarios it might be beneficial to force
these errors to happen earlier, when the deferred update request is
processed.
## Deletion
Deletion is also deferred. Deleting an edge implies a modification of
its endpoint vertices, which must be deferred as those data structures
are not thread-safe. Deleting a vertex is done either with detaching, in
which case an arbitrary number of updates are implied in the vertex's
neighborhood, or without detaching, which relies on checking the current
state of the graph, something that is generally impossible in a
distributed setting.


@@ -1,20 +0,0 @@
## Dynamic Graph Partitioner
Memgraph supports dynamic graph partitioning, which dynamically improves
performance on datasets badly partitioned across workers. To enable it,
use the following flag when starting the *master* node:
```plaintext
--dynamic_graph_partitioner_enable
```
### Parameters
| Name | Default Value | Description | Range |
|------|---------------|-------------|-------|
| --dgp_improvement_threshold | 10 | How much better the score of a node has to be for it to be migrated to another worker. This represents the minimal difference between the new score the vertex would have after migration and its old score. | Min: 1, Max: 100 |
| --dgp_max_batch_size | 2000 | Maximal number of vertices that can be migrated in one dynamic graph partitioner step. | Min: 1, Max: MaxInt32 |