From 58453794a6213bdb94cddbf1285072e03149a5d2 Mon Sep 17 00:00:00 2001
From: Goran Zuzic <goran.zuzic@memgraph.io>
Date: Thu, 17 Aug 2017 15:36:58 +0200
Subject: [PATCH] [Rename diff] Change the terminology to match the rest.

Reviewers: sasa.stanko

Reviewed By: sasa.stanko

Subscribers: buda, lion

Differential Revision: https://phabricator.memgraph.io/D672
---
 experimental/distributed/README.md            |  11 ++
 experimental/distributed/src/graph.hpp        | 164 +++++++++---------
 experimental/distributed/src/spinner.hpp      |  92 +++++-----
 experimental/distributed/src/uid.hpp          |  72 ++++----
 experimental/distributed/tests/graph_test.cpp |  76 ++++----
 .../distributed/tests/spinner_test.cpp        |  28 +--
 src/utils/hashing/fnv.hpp                     |  13 ++
 7 files changed, 243 insertions(+), 213 deletions(-)

diff --git a/experimental/distributed/README.md b/experimental/distributed/README.md
index 300c7e8c3..31d38956b 100644
--- a/experimental/distributed/README.md
+++ b/experimental/distributed/README.md
@@ -1,7 +1,18 @@
+
 # distributed memgraph
 
 This subdirectory structure implements the distributed infrastructure of Memgraph.
 
+## terminology
+
+* Memgraph Node Id (mnid): the id of a Memgraph node (mnode), i.e. a machine (process) that runs a (distributed) Memgraph program.
+* Node: a computer that performs (distributed) work.
+* Vertex: an abstract graph concept.
+* Reactor: a unit of concurrent execution that lives on its own thread.
+* Connector: a communication abstraction between Reactors. The Reactors can live in the same process or in different processes (see the sketch below).
+* EventStream: the read-end of a Connector; it is owned by exactly one Reactor/thread.
+* Channel: the write-end of a Connector; it can be owned (written into) by multiple threads.
+
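+A Connector, schematically (illustrative only, names are not the actual API):
+
+```
+Reactor A --Channel-->\
+                       [Connector] --EventStream--> Reactor B
+Reactor C --Channel-->/
+```
+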
 ## conventions
 
 1. Locked: A method having a Locked... prefix indicates that you
diff --git a/experimental/distributed/src/graph.hpp b/experimental/distributed/src/graph.hpp
index f81874a0d..76d9bfd4c 100644
--- a/experimental/distributed/src/graph.hpp
+++ b/experimental/distributed/src/graph.hpp
@@ -8,155 +8,157 @@
 
 enum class EdgeType { OUTGOING, INCOMING };
 
-/** A node in the graph. Has incoming and outgoing edges which
- * are defined as global addresses of other nodes */
-class Node {
+/** A vertex in the graph. Has incoming and outgoing edges which
+ * are defined as global addresses of other vertices */
+class Vertex {
  public:
-  Node(const GlobalId &id) : id_(id) {}
+  Vertex(const UniqueVid &id) : id_(id) {}
 
   const auto &id() const { return id_; };
   const auto &edges_out() const { return edges_out_; }
   const auto &edges_in() const { return edges_in_; }
 
-  void AddConnection(EdgeType edge_type, const GlobalAddress &gad) {
+  void AddConnection(EdgeType edge_type, const GlobalVertAddress &gad) {
     (edge_type == EdgeType::INCOMING ? edges_in_ : edges_out_)
         .emplace_back(gad);
   }
 
-  /** Changes all old_address edges to have the new_worker */
-  void RedirectEdges(const GlobalAddress old_address, size_t new_worker) {
+  /** Changes all old_address edges to have the new Memgraph node id */
+  void RedirectEdges(const GlobalVertAddress &old_address, int64_t new_mnid) {
     for (auto &address : edges_in_)
-      if (address == old_address) address.worker_id_ = new_worker;
+      if (address == old_address) address.cur_mnid_ = new_mnid;
     for (auto &address : edges_out_)
-      if (address == old_address) address.worker_id_ = new_worker;
+      if (address == old_address) address.cur_mnid_ = new_mnid;
   }
 
  private:
-  // TODO remove id_ from Node if not necessary
-  GlobalId id_;
+  UniqueVid id_;
 
-  // global addresses of nodes this node is connected to
-  std::vector<GlobalAddress> edges_out_;
-  std::vector<GlobalAddress> edges_in_;
+  // global addresses of vertices this vertex is connected to
+  std::vector<GlobalVertAddress> edges_out_;
+  std::vector<GlobalVertAddress> edges_in_;
 };
 
-/** A worker / shard in the distributed system */
-class Worker {
+/**
+ * A storage that doesn't assume everything is in memory: it holds the
+ * vertices residing on a single Memgraph node (mnode).
+ */
+class ShardedStorage {
  public:
-  // unique worker ID. uniqueness is ensured by the worker
-  // owner (the Distributed class)
-  const int64_t id_;
+  // Unique Memgraph node ID. Uniqueness is ensured by the (distributed) system.
+  const int64_t mnid_;
 
-  Worker(int64_t id) : id_(id) {}
+  ShardedStorage(int64_t mnid) : mnid_(mnid) {}
 
-  int64_t NodeCount() const { return nodes_.size(); }
+  int64_t VertexCount() const { return vertices_.size(); }
 
-  /** Gets a node. */
-  Node &GetNode(const GlobalId &gid) {
-    auto found = nodes_.find(gid);
-    assert(found != nodes_.end());
+  /** Gets a vertex. */
+  Vertex &GetVertex(const UniqueVid &gid) {
+    auto found = vertices_.find(gid);
+    assert(found != vertices_.end());
     return found->second;
   }
 
-  /** Returns the number of edges that cross from this
-   * graph / worker into another one */
+  /**
+   * Returns the number of edges that cross from this
+   * node into another one
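+   * (each such edge is counted from this mnode's side; summing over all
+   * mnodes therefore counts every cross-mnode edge twice)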
+   */
   int64_t BoundaryEdgeCount() const {
     int64_t count = 0;
     auto count_f = [this, &count](const auto &edges) {
-      for (const GlobalAddress &address : edges)
-        if (address.worker_id_ != id_) count++;
+      for (const GlobalVertAddress &address : edges)
+        if (address.cur_mnid_ != mnid_) count++;
     };
-    for (const auto &node : nodes_) {
-      count_f(node.second.edges_out());
-      count_f(node.second.edges_in());
+    for (const auto &vertex : vertices_) {
+      count_f(vertex.second.edges_out());
+      count_f(vertex.second.edges_in());
     }
 
     return count;
   }
 
-  /** Creates a new node on this worker. Returns it's global id */
-  const GlobalId &MakeNode() {
-    GlobalId new_id(id_, next_node_sequence_++);
-    auto new_node = nodes_.emplace(std::make_pair(new_id, Node(new_id)));
-    return new_node.first->first;
+  /** Creates a new vertex on this node. Returns its global id */
+  const UniqueVid &MakeVertex() {
+    UniqueVid new_id(mnid_, next_vertex_sequence_++);
+    auto new_vertex = vertices_.emplace(std::make_pair(new_id, Vertex(new_id)));
+    return new_vertex.first->first;
   };
 
-  /** Places the existing node on this worker */
-  void PlaceNode(const GlobalId &gid, const Node &node) {
-    nodes_.emplace(gid, node);
+  /** Places the existing vertex on this node */
+  void PlaceVertex(const UniqueVid &gid, const Vertex &vertex) {
+    vertices_.emplace(gid, vertex);
   }
 
-  /** Removes the node with the given ID from this worker */
-  void RemoveNode(const GlobalId &gid) { nodes_.erase(gid); }
+  /** Removes the vertex with the given ID from this node */
+  void RemoveVertex(const UniqueVid &gid) { vertices_.erase(gid); }
 
-  auto begin() const { return nodes_.begin(); }
+  auto begin() const { return vertices_.begin(); }
 
-  auto end() const { return nodes_.end(); }
+  auto end() const { return vertices_.end(); }
 
  private:
-  // counter of sequences numbers of nodes created on this worker
-  int64_t next_node_sequence_{0};
+  // counter of sequence numbers of vertices created on this node
+  int64_t next_vertex_sequence_{0};
 
-  // node storage of this worker
-  std::unordered_map<GlobalId, Node> nodes_;
+  // vertex storage of this node
+  std::unordered_map<UniqueVid, Vertex> vertices_;
 };
 
 /**
- * A distributed system consisting of mulitple workers.
+ * A distributed system consisting of multiple nodes.
  * For the time being it's not modelling a distributed
  * system correctly in terms of message passing (as opposed
- * to operating on workers and their data directly).
+ * to operating on nodes and their data directly).
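+ *
+ * Typical usage (mirrors tests/graph_test.cpp): AddMnode() creates an
+ * mnode, MakeVertex(mnid) creates a vertex on it, MakeEdge(from, to)
+ * connects two vertices, and MoveVertex(gad, destination) migrates one.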
  */
 class Distributed {
  public:
-  /** Creates a distributed with the given number of workers */
-  Distributed(int initial_worker_count = 0) {
-    for (int worker_id = 0; worker_id < initial_worker_count; worker_id++)
-      AddWorker();
+  /** Creates a distributed system with the given number of mnodes */
+  Distributed(int initial_mnode_count = 0) {
+    for (int mnode_id = 0; mnode_id < initial_mnode_count; mnode_id++)
+      AddMnode();
   }
 
-  int64_t AddWorker() {
-    int64_t new_worker_id = workers_.size();
-    workers_.emplace_back(new_worker_id);
-    return new_worker_id;
+  int64_t AddMnode() {
+    int64_t new_mnode_id = mnodes_.size();
+    mnodes_.emplace_back(new_mnode_id);
+    return new_mnode_id;
   }
 
-  int WorkerCount() const { return workers_.size(); }
+  int MnodeCount() const { return mnodes_.size(); }
 
-  auto &GetWorker(int64_t worker_id) { return workers_[worker_id]; }
+  auto &GetMnode(int64_t mnode_id) { return mnodes_[mnode_id]; }
 
-  GlobalAddress MakeNode(int64_t worker_id) {
-    return {worker_id, workers_[worker_id].MakeNode()};
+  GlobalVertAddress MakeVertex(int64_t mnid) {
+    return {mnid, mnodes_[mnid].MakeVertex()};
   }
 
-  Node &GetNode(const GlobalAddress &address) {
-    return workers_[address.worker_id_].GetNode(address.id_);
+  Vertex &GetVertex(const GlobalVertAddress &address) {
+    return mnodes_[address.cur_mnid_].GetVertex(address.uvid_);
   }
 
-  /** Moves a node with the given global id to the given worker */
-  void MoveNode(const GlobalAddress &gid, int64_t destination) {
-    const Node &node = GetNode(gid);
+  /** Moves a vertex with the given global address to the given mnode */
+  void MoveVertex(const GlobalVertAddress &gad, int64_t destination) {
+    const Vertex &vertex = GetVertex(gad);
 
-    // make sure that all edges to and from the node are updated
-    for (auto &edge : node.edges_in())
-      GetNode(edge).RedirectEdges(gid, destination);
-    for (auto &edge : node.edges_out())
-      GetNode(edge).RedirectEdges(gid, destination);
+    // make sure that all edges to and from the vertex are updated
+    for (auto &edge : vertex.edges_in())
+      GetVertex(edge).RedirectEdges(gad, destination);
+    for (auto &edge : vertex.edges_out())
+      GetVertex(edge).RedirectEdges(gad, destination);
 
-    // change node destination
-    workers_[destination].PlaceNode(gid.id_, node);
-    workers_[gid.worker_id_].RemoveNode(gid.id_);
+    // change vertex destination
+    mnodes_[destination].PlaceVertex(gad.uvid_, vertex);
+    mnodes_[gad.cur_mnid_].RemoveVertex(gad.uvid_);
   }
 
-  void MakeEdge(const GlobalAddress &from, const GlobalAddress &to) {
-    GetNode(from).AddConnection(EdgeType::OUTGOING, to);
-    GetNode(to).AddConnection(EdgeType::INCOMING, from);
+  void MakeEdge(const GlobalVertAddress &from, const GlobalVertAddress &to) {
+    GetVertex(from).AddConnection(EdgeType::OUTGOING, to);
+    GetVertex(to).AddConnection(EdgeType::INCOMING, from);
   }
 
-  auto begin() const { return workers_.begin(); }
+  auto begin() const { return mnodes_.begin(); }
 
-  auto end() const { return workers_.end(); }
+  auto end() const { return mnodes_.end(); }
 
  private:
-  std::vector<Worker> workers_;
+  std::vector<ShardedStorage> mnodes_;
 };
diff --git a/experimental/distributed/src/spinner.hpp b/experimental/distributed/src/spinner.hpp
index 34db7a925..eb49364e5 100644
--- a/experimental/distributed/src/spinner.hpp
+++ b/experimental/distributed/src/spinner.hpp
@@ -36,76 +36,76 @@ auto MaxRandom(const std::vector<double> &scores) {
 }
 
 /**
- * Returns the index of the best (highest scored) worker
- * for the given node. If there are multiple workers with
- * the best score, node prefers to remain on the same worker
+ * Returns the index of the best (highest scored) mnode
+ * for the given vertex. If there are multiple mnodes with
+ * the best score, the vertex prefers to remain on the same mnode
  * (if among the best), or one is chosen at random.
  *
  * @param distributed - the distributed system.
- * @param node - the node which is being evaluated.
- * @param penalties - a vector of penalties (per worker).
- * @param current_worker - the worker on which the given
- *  node is currently residing.
+ * @param vertex - the vertex which is being evaluated.
+ * @param penalties - a vector of penalties (per mnode).
+ * @param current_mnode - the mnode on which the given
+ *  vertex is currently residing.
  * @return - std::pair<int, std::vector<double>> which is a
- * pair of (best worker, score_per_worker).
+ * pair of (best mnode, score_per_mnode).
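+ *
+ * For example, a vertex with 3 of its 4 edges leading to mnode 1 gets a
+ * raw score of 3/4 = 0.75 for mnode 1, before the balancing penalty is
+ * subtracted.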
  */
-auto BestWorker(const Distributed &distributed, const Node &node,
-                const std::vector<double> &penalties, int current_worker) {
-  // scores per worker
-  std::vector<double> scores(distributed.WorkerCount(), 0.0);
+auto BestMnode(const Distributed &distributed, const Vertex &vertex,
+               const std::vector<double> &penalties, int current_mnode) {
+  // scores per mnode
+  std::vector<double> scores(distributed.MnodeCount(), 0.0);
 
-  for (auto &edge : node.edges_in()) scores[edge.worker_id_] += 1.0;
-  for (auto &edge : node.edges_out()) scores[edge.worker_id_] += 1.0;
+  for (auto &edge : vertex.edges_in()) scores[edge.cur_mnid_] += 1.0;
+  for (auto &edge : vertex.edges_out()) scores[edge.cur_mnid_] += 1.0;
 
-  for (int worker = 0; worker < distributed.WorkerCount(); ++worker) {
-    // normalize contribution of worker over neighbourhood size
-    scores[worker] /= node.edges_out().size() + node.edges_in().size();
+  for (int mnode = 0; mnode < distributed.MnodeCount(); ++mnode) {
+    // normalize contribution of mnode over neighbourhood size
+    scores[mnode] /= vertex.edges_out().size() + vertex.edges_in().size();
     // add balancing penalty
-    scores[worker] -= penalties[worker];
+    scores[mnode] -= penalties[mnode];
   }
 
   // pick the best destination, but prefer to stay if you can
   size_t destination = MaxRandom(scores);
-  if (scores[current_worker] == scores[destination])
-    destination = current_worker;
+  if (scores[current_mnode] == scores[destination])
+    destination = current_mnode;
 
   return std::make_pair(destination, scores);
 }
 
-/** Indication if Spinner worker penality is calculated based on
- * vertex or edge worker cardinalities */
+/** Indicates whether the Spinner mnode penalty is calculated based on
+ * vertex or edge cardinalities per mnode */
 enum class PenaltyType { Vertex, Edge };
 
-/** Calcualtes Spinner penalties for workers in the given
+/** Calculates Spinner penalties for mnodes in the given
  * distributed system. */
 auto Penalties(const Distributed &distributed,
                PenaltyType penalty_type = PenaltyType::Edge) {
   std::vector<double> penalties;
   int64_t total_count{0};
 
-  for (const auto &worker : distributed) {
-    int64_t worker_count{0};
+  for (const auto &mnode : distributed) {
+    int64_t mnode_count{0};
     switch (penalty_type) {
       case PenaltyType::Vertex:
-        worker_count += worker.NodeCount();
+        mnode_count += mnode.VertexCount();
         break;
       case PenaltyType::Edge:
-        for (const auto &node_kv : worker) {
-          // Spinner counts the edges on a worker as the sum
-          // of degrees of nodes on that worker. In that sense
+        for (const auto &vertex_kv : mnode) {
+          // Spinner counts the edges on an mnode as the sum
+          // of degrees of vertices on that mnode. In that sense
           // both incoming and outgoing edges are individually
           // added...
-          worker_count += node_kv.second.edges_out().size();
-          worker_count += node_kv.second.edges_in().size();
+          mnode_count += vertex_kv.second.edges_out().size();
+          mnode_count += vertex_kv.second.edges_in().size();
         }
         break;
     }
-    total_count += worker_count;
-    penalties.emplace_back(worker_count);
+    total_count += mnode_count;
+    penalties.emplace_back(mnode_count);
   }
 
   for (auto &penalty : penalties)
-    penalty /= c * total_count / distributed.WorkerCount();
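+    // i.e. each mnode's load relative to c times the average load per mnode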
+    penalty /= c * total_count / distributed.MnodeCount();
 
   return penalties;
 }
@@ -117,31 +117,31 @@ void PerformSpinnerStep(Distributed &distributed) {
   // here a strategy can be injected for limiting
   // the number of movements performed in one step.
   // limiting could be based on (for example):
-  //  - limiting the number of movements per worker
+  //  - limiting the number of movements per mnode
   //  - limiting only to movements that are above
  //    a threshold (score improvement or something)
-  //  - not executing on all the workers (also prevents
+  //  - not executing on all the mnodes (also prevents
  //    oscillations)
   //
   // in the first implementation just accumulate all
   // the movements and execute together.
 
-  // relocation info: contains the address of the Node
-  // that needs to relocate and it's destination worker
-  std::vector<std::pair<GlobalAddress, int>> movements;
+  // relocation info: contains the address of the Vertex
+  // that needs to relocate and its destination mnode
+  std::vector<std::pair<GlobalVertAddress, int>> movements;
 
-  for (const Worker &worker : distributed)
-    for (const auto &gid_node_pair : worker) {
-      // (best destination, scores) pair for node
+  for (const ShardedStorage &mnode : distributed)
+    for (const auto &gid_vertex_pair : mnode) {
+      // (best destination, scores) pair for vertex
       std::pair<int, std::vector<double>> destination_scores =
-          BestWorker(distributed, gid_node_pair.second, penalties, worker.id_);
-      if (destination_scores.first != worker.id_)
-        movements.emplace_back(GlobalAddress(worker.id_, gid_node_pair.first),
+          BestMnode(distributed, gid_vertex_pair.second, penalties, mnode.mnid_);
+      if (destination_scores.first != mnode.mnid_)
+        movements.emplace_back(GlobalVertAddress(mnode.mnid_, gid_vertex_pair.first),
                                destination_scores.first);
     }
 
   // execute movements. it is likely that in the real system
   // this will need to happen as a single db transaction
-  for (const auto &m : movements) distributed.MoveNode(m.first, m.second);
+  for (const auto &m : movements) distributed.MoveVertex(m.first, m.second);
 }
 }  // namespace spinner
diff --git a/experimental/distributed/src/uid.hpp b/experimental/distributed/src/uid.hpp
index 467c17bfa..60ce31fc5 100644
--- a/experimental/distributed/src/uid.hpp
+++ b/experimental/distributed/src/uid.hpp
@@ -1,62 +1,66 @@
 #pragma once
 
+#include "utils/hashing/fnv.hpp"
+
 #include <cstdint>
 #include <vector>
 
-/** A globally defined identifier. Defines a worker
- * and the sequence number on that worker */
-class GlobalId {
+/**
+ * Globally unique id (in the entire distributed system) of a vertex.
+ *
+ * It is identified by the pair (original Memgraph node, local vertex id).
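+ * For example, UniqueVid(2, 5) identifies the vertex created with
+ * sequence number 5 on the Memgraph node whose mnid is 2.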
+ */
+class UniqueVid {
  public:
-  GlobalId(int64_t worker_id, int64_t sequence_number)
-      : worker_id_(worker_id), sequence_number_(sequence_number) {}
-  // TODO perhaps make members const and replace instead of changing
-  // when migrating nodes
-  int64_t worker_id_;
-  int64_t sequence_number_;
+  UniqueVid(int64_t orig_mnid, int64_t vid)
+      : orig_mnid_(orig_mnid), vid_(vid) {}
+  /** Original Memgraph node the vertex was created on. */
+  int64_t orig_mnid_;
 
-  bool operator==(const GlobalId &other) const {
-    return worker_id_ == other.worker_id_ &&
-           sequence_number_ == other.sequence_number_;
+  /** Vertex id assigned to it on creation. */
+  int64_t vid_;
+
+  bool operator==(const UniqueVid &other) const {
+    return orig_mnid_ == other.orig_mnid_ &&
+           vid_ == other.vid_;
   }
 
-  bool operator!=(const GlobalId &other) const { return !(*this == other); }
+  bool operator!=(const UniqueVid &other) const { return !(*this == other); }
 };
 
-/** Defines a location in the system where something can be found.
- * Something can be found on some worker, for some Id */
-class GlobalAddress {
+/**
+ * Specifies where a vertex is in the distributed system.
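+ * It pairs the Memgraph node currently holding the vertex (cur_mnid_,
+ * which is updated when the vertex migrates) with the vertex's
+ * immutable UniqueVid.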
+ */
+class GlobalVertAddress {
  public:
-  GlobalAddress(int64_t worker_id, GlobalId id)
-      : worker_id_(worker_id), id_(id) {}
-  // TODO perhaps make members const and replace instead of changing
-  // when migrating nodes
-  int64_t worker_id_;
-  GlobalId id_;
+  GlobalVertAddress(int64_t cur_mnid, const UniqueVid &uvid)
+      : cur_mnid_(cur_mnid), uvid_(uvid) {}
 
-  bool operator==(const GlobalAddress &other) const {
-    return worker_id_ == other.worker_id_ && id_ == other.id_;
+  /** The current Memgraph node where the vertex resides. */
+  int64_t cur_mnid_;
+  UniqueVid uvid_;
+
+  bool operator==(const GlobalVertAddress &other) const {
+    return cur_mnid_ == other.cur_mnid_ && uvid_ == other.uvid_;
   }
 
-  bool operator!=(const GlobalAddress &other) const {
+  bool operator!=(const GlobalVertAddress &other) const {
     return !(*this == other);
   }
 };
 
 namespace std {
 template <>
-struct hash<GlobalId> {
-  size_t operator()(const GlobalId &id) const {
-    return id.sequence_number_ << 4 ^ id.worker_id_;
+struct hash<UniqueVid> {
+  size_t operator()(const UniqueVid &uid) const {
+    return HashCombine<decltype(uid.orig_mnid_), decltype(uid.vid_)>()(
+        uid.orig_mnid_, uid.vid_);
   }
 };
 
 template <>
-struct hash<GlobalAddress> {
-  size_t operator()(const GlobalAddress &ga) const {
-    return gid_hash(ga.id_) << 4 ^ ga.worker_id_;
+struct hash<GlobalVertAddress> {
+  size_t operator()(const GlobalVertAddress &ga) const {
+    return HashCombine<decltype(ga.cur_mnid_), decltype(ga.uvid_)>()(
+        ga.cur_mnid_, ga.uvid_);
   }
-
- private:
-  std::hash<GlobalId> gid_hash{};
 };
 }
diff --git a/experimental/distributed/tests/graph_test.cpp b/experimental/distributed/tests/graph_test.cpp
index 09911b211..8aae83987 100644
--- a/experimental/distributed/tests/graph_test.cpp
+++ b/experimental/distributed/tests/graph_test.cpp
@@ -5,63 +5,63 @@
 #include "graph.hpp"
 
 void test_global_id() {
-  GlobalId a(1, 1);
-  assert(a == GlobalId(1, 1));
-  assert(a != GlobalId(1, 2));
-  assert(a != GlobalId(2, 1));
+  UniqueVid a(1, 1);
+  assert(a == UniqueVid(1, 1));
+  assert(a != UniqueVid(1, 2));
+  assert(a != UniqueVid(2, 1));
 }
 
 void test_global_address() {
-  GlobalAddress a(1, {1, 1});
-  assert(a == GlobalAddress(1, {1, 1}));
-  assert(a != GlobalAddress(2, {1, 1}));
-  assert(a != GlobalAddress(1, {2, 1}));
+  GlobalVertAddress a(1, {1, 1});
+  assert(a == GlobalVertAddress(1, {1, 1}));
+  assert(a != GlobalVertAddress(2, {1, 1}));
+  assert(a != GlobalVertAddress(1, {2, 1}));
 }
 
-void test_worker() {
-  Worker worker0{0};
-  assert(worker0.NodeCount() == 0);
-  GlobalId n0 = worker0.MakeNode();
-  assert(worker0.NodeCount() == 1);
+void test_mnode() {
+  ShardedStorage mnode0{0};
+  assert(mnode0.VertexCount() == 0);
+  UniqueVid n0 = mnode0.MakeVertex();
+  assert(mnode0.VertexCount() == 1);
 
-  Worker worker1{1};
-  worker1.PlaceNode(n0, worker0.GetNode(n0));
-  worker0.RemoveNode(n0);
-  assert(worker0.NodeCount() == 0);
-  assert(worker1.NodeCount() == 1);
+  ShardedStorage mnode1{1};
+  mnode1.PlaceVertex(n0, mnode0.GetVertex(n0));
+  mnode0.RemoveVertex(n0);
+  assert(mnode0.VertexCount() == 0);
+  assert(mnode1.VertexCount() == 1);
 
-  worker1.MakeNode();
-  assert(worker1.NodeCount() == 2);
-  assert(std::distance(worker1.begin(), worker1.end()) == 2);
+  mnode1.MakeVertex();
+  assert(mnode1.VertexCount() == 2);
+  assert(std::distance(mnode1.begin(), mnode1.end()) == 2);
 }
 
 void test_distributed() {
   Distributed d;
-  assert(d.WorkerCount() == 0);
-  auto w0 = d.AddWorker();
-  assert(d.WorkerCount() == 1);
-  auto w1 = d.AddWorker();
-  assert(d.WorkerCount() == 2);
+  assert(d.MnodeCount() == 0);
+  auto w0 = d.AddMnode();
+  assert(d.MnodeCount() == 1);
+  auto w1 = d.AddMnode();
+  assert(d.MnodeCount() == 2);
 
-  GlobalAddress n0 = d.MakeNode(w0);
-  assert(d.GetWorker(w0).NodeCount() == 1);
-  GlobalAddress n1 = d.MakeNode(w1);
+  GlobalVertAddress n0 = d.MakeVertex(w0);
+  assert(d.GetMnode(w0).VertexCount() == 1);
+  GlobalVertAddress n1 = d.MakeVertex(w1);
 
-  assert(d.GetNode(n0).edges_out().size() == 0);
-  assert(d.GetNode(n0).edges_in().size() == 0);
-  assert(d.GetNode(n1).edges_out().size() == 0);
-  assert(d.GetNode(n1).edges_in().size() == 0);
+  assert(d.GetVertex(n0).edges_out().size() == 0);
+  assert(d.GetVertex(n0).edges_in().size() == 0);
+  assert(d.GetVertex(n1).edges_out().size() == 0);
+  assert(d.GetVertex(n1).edges_in().size() == 0);
   d.MakeEdge(n0, n1);
-  assert(d.GetNode(n0).edges_out().size() == 1);
-  assert(d.GetNode(n0).edges_in().size() == 0);
-  assert(d.GetNode(n1).edges_out().size() == 0);
-  assert(d.GetNode(n1).edges_in().size() == 1);
+  assert(d.GetVertex(n0).edges_out().size() == 1);
+  assert(d.GetVertex(n0).edges_in().size() == 0);
+  assert(d.GetVertex(n1).edges_out().size() == 0);
+  assert(d.GetVertex(n1).edges_in().size() == 1);
 }
 
 int main() {
   test_global_id();
   test_global_address();
-  test_worker();
+  test_mnode();
   test_distributed();
   std::cout << "All tests  passed" << std::endl;
 }
diff --git a/experimental/distributed/tests/spinner_test.cpp b/experimental/distributed/tests/spinner_test.cpp
index 2be7834df..d02d34016 100644
--- a/experimental/distributed/tests/spinner_test.cpp
+++ b/experimental/distributed/tests/spinner_test.cpp
@@ -12,16 +12,16 @@
 void PrintStatistics(const Distributed &distributed) {
   using std::cout;
   using std::endl;
-  for (const Worker &worker : distributed) {
-    cout << "  Worker " << worker.id_ << ":";
-    cout << " #nodes = " << worker.NodeCount();
+  for (const ShardedStorage &mnode : distributed) {
+    cout << "  ShardedStorage " << mnode.mnid_ << ":";
+    cout << " #vertices = " << mnode.VertexCount();
     int64_t edge_count{0};
-    for (const auto &gid_node_pair : worker) {
-      edge_count += gid_node_pair.second.edges_in().size();
-      edge_count += gid_node_pair.second.edges_out().size();
+    for (const auto &gid_vertex_pair : mnode) {
+      edge_count += gid_vertex_pair.second.edges_in().size();
+      edge_count += gid_vertex_pair.second.edges_out().size();
     }
     cout << ", #edges = " << edge_count;
-    cout << ", #cuts = " << worker.BoundaryEdgeCount() << endl;
+    cout << ", #cuts = " << mnode.BoundaryEdgeCount() << endl;
   }
 }
 
@@ -33,8 +33,8 @@ void PrintStatistics(const Distributed &distributed) {
  *   https://snap.stanford.edu/data/facebook_combined.txt.gz
  *   add number of vertices and edges in the first line of that file
  */
-Distributed ReadGraph(std::string filename, int worker_count) {
-  Distributed distributed(worker_count);
+Distributed ReadGraph(std::string filename, int mnode_count) {
+  Distributed distributed(mnode_count);
 
   std::fstream fs;
   fs.open(filename, std::fstream::in);
@@ -43,17 +43,17 @@ Distributed ReadGraph(std::string filename, int worker_count) {
   int vertex_count, edge_count;
   fs >> vertex_count >> edge_count;
 
-  // assign vertices to random workers
-  std::vector<GlobalAddress> nodes;
+  // assign vertices to random mnodes
+  std::vector<GlobalVertAddress> vertices;
   for (int i = 0; i < vertex_count; ++i)
-    nodes.emplace_back(distributed.MakeNode(rand() % worker_count));
+    vertices.emplace_back(distributed.MakeVertex(rand() % mnode_count));
 
   // add edges
   for (int i = 0; i < edge_count; ++i) {
     size_t u, v;
     fs >> u >> v;
-    assert(u < nodes.size() && v < nodes.size());
-    distributed.MakeEdge(nodes[u], nodes[v]);
+    assert(u < vertices.size() && v < vertices.size());
+    distributed.MakeEdge(vertices[u], vertices[v]);
   }
   fs.close();
   return distributed;
diff --git a/src/utils/hashing/fnv.hpp b/src/utils/hashing/fnv.hpp
index ed586ebeb..d2df451e5 100644
--- a/src/utils/hashing/fnv.hpp
+++ b/src/utils/hashing/fnv.hpp
@@ -65,3 +65,16 @@ struct FnvCollection {
  private:
   static const uint64_t fnv_prime = 1099511628211u;
 };
+
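+// Combines the hashes of two values into a single hash, FNV-style.
+// Example usage (see std::hash<UniqueVid> in
+// experimental/distributed/src/uid.hpp):
+//   size_t h = HashCombine<int64_t, int64_t>()(uid.orig_mnid_, uid.vid_);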
+template <typename TA, typename TB>
+struct HashCombine {
+  size_t operator()(const TA& a, const TB& b) const {
+    constexpr size_t fnv_prime = 1099511628211UL;
+    constexpr size_t fnv_offset = 14695981039346656037UL;
+    size_t ret = fnv_offset;
+    ret ^= std::hash<TA>()(a);
+    ret *= fnv_prime;
+    ret ^= std::hash<TB>()(b);
+    return ret;
+  }
+};