From dab95af366ca3d6b2e190ac2f0c283dee823f51d Mon Sep 17 00:00:00 2001
From: Marko Culinovic <marko.culinovic@memgraph.io>
Date: Wed, 22 Aug 2018 15:26:51 +0200
Subject: [PATCH] Extract stats to static lib

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek, mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1546
---
 src/CMakeLists.txt                            |   8 +-
 src/database/state_delta.lcp                  |   6 +-
 src/distributed/pull_produce_rpc_messages.lcp |   8 +-
 src/distributed/serialization.cpp             |   6 +-
 src/distributed/serialization.hpp             | 109 ++++++++++++++++
 src/distributed/updates_rpc_messages.lcp      |   4 +-
 src/memgraph_bolt.cpp                         |  72 +++++------
 src/query/frontend/ast/ast.cpp                |   5 +-
 src/stats/CMakeLists.txt                      |  73 +++++++++++
 src/stats/stats.cpp                           |   4 +-
 src/stats/stats.hpp                           |   2 +-
 src/utils/CMakeLists.txt                      |  22 ++++
 src/utils/serialization.hpp                   | 121 +-----------------
 .../clients/card_fraud_client.cpp             |   5 +-
 tools/tests/statsd/mg_statsd_client.cpp       |   1 +
 15 files changed, 268 insertions(+), 178 deletions(-)
 create mode 100644 src/stats/CMakeLists.txt

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 9af0b63c2..1356cb1e1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -7,6 +7,7 @@ add_subdirectory(integrations)
 add_subdirectory(io)
 add_subdirectory(telemetry)
 add_subdirectory(communication)
+add_subdirectory(stats)
 add_subdirectory(auth)
 
 # all memgraph src files
@@ -59,8 +60,6 @@ set(memgraph_src_files
     query/plan/variable_start_planner.cpp
     query/repl.cpp
     query/typed_value.cpp
-    stats/metrics.cpp
-    stats/stats.cpp
     storage/concurrent_id_mapper_master.cpp
     storage/concurrent_id_mapper_worker.cpp
     storage/dynamic_graph_partitioner/dgp.cpp
@@ -161,8 +160,6 @@ add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
 add_capnp(distributed/updates_rpc_messages.capnp)
 add_lcp(query/plan/operator.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42)
 add_capnp(query/plan/operator.capnp)
-add_lcp(stats/stats_rpc_messages.lcp CAPNP_SCHEMA @0xc19a87c81b9b4512)
-add_capnp(stats/stats_rpc_messages.capnp)
 add_lcp(storage/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
 add_capnp(storage/concurrent_id_mapper_rpc_messages.capnp)
 add_lcp(transactions/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5)
@@ -180,7 +177,6 @@ add_capnp(query/frontend/ast/ast.capnp)
 add_capnp(query/frontend/semantic/symbol.capnp)
 add_capnp(storage/serialization.capnp)
 add_capnp(transactions/common.capnp)
-add_capnp(utils/serialization.capnp)
 
 add_custom_target(generate_capnp DEPENDS generate_lcp ${generated_capnp_files})
 
@@ -191,7 +187,7 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
 # memgraph_lib depend on these libraries
 set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
     antlr_opencypher_parser_lib dl glog gflags capnp kj
-    mg-utils mg-io mg-integrations mg-requests mg-communication mg-auth)
+    mg-utils mg-io mg-integrations mg-requests mg-communication mg-auth mg-stats)
 
 if (USE_LTALLOC)
     list(APPEND MEMGRAPH_ALL_LIBS ltalloc)
diff --git a/src/database/state_delta.lcp b/src/database/state_delta.lcp
index cb4b43617..8e211d03a 100644
--- a/src/database/state_delta.lcp
+++ b/src/database/state_delta.lcp
@@ -4,12 +4,12 @@
 #include "communication/bolt/v1/decoder/decoder.hpp"
 #include "communication/bolt/v1/encoder/base_encoder.hpp"
 #include "database/state_delta.capnp.h"
+#include "distributed/serialization.hpp"
 #include "durability/hashed_file_reader.hpp"
 #include "durability/hashed_file_writer.hpp"
 #include "storage/address_types.hpp"
 #include "storage/gid.hpp"
 #include "storage/property_value.hpp"
-#include "utils/serialization.hpp"
 cpp<#
 
 (lcp:namespace database)
@@ -52,13 +52,13 @@ cpp<#
           :capnp-save
           (lambda (builder member)
             #>cpp
-            utils::SaveCapnpTypedValue(${member}, &${builder});
+            distributed::SaveCapnpTypedValue(${member}, &${builder});
             cpp<#)
           :capnp-load
           (lambda (reader member)
             #>cpp
             query::TypedValue tv;
-            utils::LoadCapnpTypedValue(${reader}, &tv);
+            distributed::LoadCapnpTypedValue(${reader}, &tv);
             ${member} = tv;
             cpp<#))
    (label "storage::Label")
diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp
index 32d756048..06ca0d410 100644
--- a/src/distributed/pull_produce_rpc_messages.lcp
+++ b/src/distributed/pull_produce_rpc_messages.lcp
@@ -78,7 +78,7 @@ the relevant parts of the response, ready for use."))
                for (size_t val_i = 0; val_i < frame.size(); ++val_i) {
                  const auto &value = frame[val_i];
                  auto value_builder = frame_builder[val_i];
-                 utils::SaveCapnpTypedValue(
+                 distributed::SaveCapnpTypedValue(
                      value, &value_builder,
                      [this](const auto &value, auto *builder) {
                        this->SaveGraphElement(value, builder);
@@ -95,7 +95,7 @@ the relevant parts of the response, ready for use."))
                current_frame.reserve(frame_reader.size());
                for (const auto &value_reader : frame_reader) {
                  query::TypedValue value;
-                 utils::LoadCapnpTypedValue(
+                 distributed::LoadCapnpTypedValue(
                      value_reader, &value,
                      [this, dba, data_manager](const auto &reader, auto *value) {
                        this->LoadGraphElement(dba, reader, value, data_manager);
@@ -323,7 +323,7 @@ cpp<#)
                   auto key_builder = builder.initKey();
                   key_builder.setValue(entry.first);
                   auto value_builder = builder.initValue();
-                  utils::SaveCapnpTypedValue(entry.second, &value_builder);
+                  distributed::SaveCapnpTypedValue(entry.second, &value_builder);
                   ++i;
                 }
                 cpp<#)
@@ -332,7 +332,7 @@ cpp<#)
                 #>cpp
                 for (const auto &entry_reader : ${reader}.getEntries()) {
                   query::TypedValue value;
-                  utils::LoadCapnpTypedValue(entry_reader.getValue(), &value);
+                  distributed::LoadCapnpTypedValue(entry_reader.getValue(), &value);
                   ${member}.Add(entry_reader.getKey().getValue(), value);
                 }
                 cpp<#))
diff --git a/src/distributed/serialization.cpp b/src/distributed/serialization.cpp
index 539daf0e0..019ff2ca5 100644
--- a/src/distributed/serialization.cpp
+++ b/src/distributed/serialization.cpp
@@ -1,7 +1,5 @@
 #include "distributed/serialization.hpp"
 
-#include "utils/serialization.hpp"
-
 namespace {
 
 template <class TAddress>
@@ -30,7 +28,7 @@ void SaveProperties(
     auto prop_builder = (*builder)[i];
     prop_builder.setId(kv.first.Id());
     auto value_builder = prop_builder.initValue();
-    utils::SaveCapnpTypedValue(kv.second, &value_builder);
+    distributed::SaveCapnpTypedValue(kv.second, &value_builder);
     ++i;
   }
 }
@@ -40,7 +38,7 @@ PropertyValueStore LoadProperties(
   PropertyValueStore props;
   for (const auto &prop_reader : reader) {
     query::TypedValue value;
-    utils::LoadCapnpTypedValue(prop_reader.getValue(), &value);
+    distributed::LoadCapnpTypedValue(prop_reader.getValue(), &value);
     props.set(storage::Property(prop_reader.getId()), value);
   }
   return props;
diff --git a/src/distributed/serialization.hpp b/src/distributed/serialization.hpp
index 654d2f1cb..6e90f04f8 100644
--- a/src/distributed/serialization.hpp
+++ b/src/distributed/serialization.hpp
@@ -4,8 +4,10 @@
 #include <memory>
 
 #include "distributed/serialization.capnp.h"
+#include "query/typed_value.hpp"
 #include "storage/edge.hpp"
 #include "storage/vertex.hpp"
+#include "utils/exceptions.hpp"
 
 namespace distributed {
 
@@ -31,4 +33,111 @@ std::unique_ptr<Vertex> LoadVertex(const capnp::Vertex::Reader &reader);
 
 std::unique_ptr<Edge> LoadEdge(const capnp::Edge::Reader &reader);
 
+inline void SaveCapnpTypedValue(
+    const query::TypedValue &value, capnp::TypedValue::Builder *builder,
+    std::function<void(const query::TypedValue &, capnp::TypedValue::Builder *)>
+        save_graph_element = nullptr) {
+  switch (value.type()) {
+    case query::TypedValue::Type::Null:
+      builder->setNullType();
+      return;
+    case query::TypedValue::Type::Bool:
+      builder->setBool(value.Value<bool>());
+      return;
+    case query::TypedValue::Type::Int:
+      builder->setInteger(value.Value<int64_t>());
+      return;
+    case query::TypedValue::Type::Double:
+      builder->setDouble(value.Value<double>());
+      return;
+    case query::TypedValue::Type::String:
+      builder->setString(value.Value<std::string>());
+      return;
+    case query::TypedValue::Type::List: {
+      const auto &values = value.Value<std::vector<query::TypedValue>>();
+      auto list_builder = builder->initList(values.size());
+      for (size_t i = 0; i < values.size(); ++i) {
+        auto value_builder = list_builder[i];
+        SaveCapnpTypedValue(values[i], &value_builder, save_graph_element);
+      }
+      return;
+    }
+    case query::TypedValue::Type::Map: {
+      const auto &map = value.Value<std::map<std::string, query::TypedValue>>();
+      auto map_builder = builder->initMap(map.size());
+      size_t i = 0;
+      for (const auto &kv : map) {
+        auto kv_builder = map_builder[i];
+        kv_builder.setKey(kv.first);
+        auto value_builder = kv_builder.initValue();
+        SaveCapnpTypedValue(kv.second, &value_builder, save_graph_element);
+        ++i;
+      }
+      return;
+    }
+    case query::TypedValue::Type::Vertex:
+    case query::TypedValue::Type::Edge:
+    case query::TypedValue::Type::Path:
+      if (save_graph_element) {
+        save_graph_element(value, builder);
+      } else {
+        throw utils::BasicException(
+            "Unable to serialize TypedValue of type: {}", value.type());
+      }
+  }
+}
+
+inline void LoadCapnpTypedValue(
+    const capnp::TypedValue::Reader &reader, query::TypedValue *value,
+    std::function<void(const capnp::TypedValue::Reader &, query::TypedValue *)>
+        load_graph_element = nullptr) {
+  switch (reader.which()) {
+    case distributed::capnp::TypedValue::NULL_TYPE:
+      *value = query::TypedValue::Null;
+      return;
+    case distributed::capnp::TypedValue::BOOL:
+      *value = reader.getBool();
+      return;
+    case distributed::capnp::TypedValue::INTEGER:
+      *value = reader.getInteger();
+      return;
+    case distributed::capnp::TypedValue::DOUBLE:
+      *value = reader.getDouble();
+      return;
+    case distributed::capnp::TypedValue::STRING:
+      *value = reader.getString().cStr();
+      return;
+    case distributed::capnp::TypedValue::LIST: {
+      std::vector<query::TypedValue> list;
+      list.reserve(reader.getList().size());
+      for (const auto &value_reader : reader.getList()) {
+        list.emplace_back();
+        LoadCapnpTypedValue(value_reader, &list.back(), load_graph_element);
+      }
+      *value = list;
+      return;
+    }
+    case distributed::capnp::TypedValue::MAP: {
+      std::map<std::string, query::TypedValue> map;
+      for (const auto &kv_reader : reader.getMap()) {
+        auto key = kv_reader.getKey().cStr();
+        LoadCapnpTypedValue(kv_reader.getValue(), &map[key],
+                            load_graph_element);
+      }
+      *value = map;
+      return;
+    }
+    case distributed::capnp::TypedValue::VERTEX:
+    case distributed::capnp::TypedValue::EDGE:
+    case distributed::capnp::TypedValue::PATH:
+      if (load_graph_element) {
+        load_graph_element(reader, value);
+      } else {
+        throw utils::BasicException(
+            "Unexpected TypedValue type '{}' when loading from archive",
+            reader.which());
+      }
+  }
+}
+
 }  // namespace distributed
diff --git a/src/distributed/updates_rpc_messages.lcp b/src/distributed/updates_rpc_messages.lcp
index 82ffbc36d..da9cd8f01 100644
--- a/src/distributed/updates_rpc_messages.lcp
+++ b/src/distributed/updates_rpc_messages.lcp
@@ -67,7 +67,7 @@ cpp<#
                        auto key_builder = builder->initKey();
                        entry.first.Save(&key_builder);
                        auto value_builder = builder->initValue();
-                       utils::SaveCapnpTypedValue(entry.second, &value_builder);
+                       distributed::SaveCapnpTypedValue(entry.second, &value_builder);
                      });
                  cpp<#)
                :capnp-load
@@ -79,7 +79,7 @@ cpp<#
                        storage::Property prop;
                        prop.Load(reader.getKey());
                        query::TypedValue value;
-                       utils::LoadCapnpTypedValue(reader.getValue(), &value);
+                       distributed::LoadCapnpTypedValue(reader.getValue(), &value);
                        return std::make_pair(prop, value);
                      });
                  cpp<#)))
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index a68a5b96f..f819fb67e 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -301,24 +301,24 @@ void SingleNodeMain() {
   SessionData session_data{db};
 
   auto stream_writer =
-      [&session_data](
-          const std::string &query,
-          const std::map<std::string, communication::bolt::Value> &params) {
-        auto dba = session_data.db.Access();
-        KafkaResultStream stream;
-        std::map<std::string, query::TypedValue> params_tv;
-        for (const auto &kv : params)
-          params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
-        try {
-          session_data.interpreter(query, *dba, params_tv, false)
-              .PullAll(stream);
-          dba->Commit();
-        } catch (const query::QueryException &e) {
-          LOG(WARNING) << "[Kafka] query execution failed with an exception: "
-                       << e.what();
-          dba->Abort();
-        }
-      };
+    [&session_data](
+      const std::string &query,
+      const std::map<std::string, communication::bolt::Value> &params) {
+    auto dba = session_data.db.Access();
+    KafkaResultStream stream;
+    std::map<std::string, query::TypedValue> params_tv;
+    for (const auto &kv : params)
+      params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
+    try {
+      session_data.interpreter(query, *dba, params_tv, false)
+        .PullAll(stream);
+      dba->Commit();
+    } catch (const query::QueryException &e) {
+      LOG(WARNING) << "[Kafka] query execution failed with an exception: "
+                   << e.what();
+      dba->Abort();
+    }
+  };
 
   integrations::kafka::Streams kafka_streams{
       std::experimental::filesystem::path(FLAGS_durability_directory) /
@@ -397,24 +397,24 @@ void MasterMain() {
   SessionData session_data{db};
 
   auto stream_writer =
-      [&session_data](
-          const std::string &query,
-          const std::map<std::string, communication::bolt::Value> &params) {
-        auto dba = session_data.db.Access();
-        KafkaResultStream stream;
-        std::map<std::string, query::TypedValue> params_tv;
-        for (const auto &kv : params)
-          params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
-        try {
-          session_data.interpreter(query, *dba, params_tv, false)
-              .PullAll(stream);
-          dba->Commit();
-        } catch (const query::QueryException &e) {
-          LOG(WARNING) << "[Kafka] query execution failed with an exception: "
-                       << e.what();
-          dba->Abort();
-        }
-      };
+    [&session_data](
+      const std::string &query,
+      const std::map<std::string, communication::bolt::Value> &params) {
+    auto dba = session_data.db.Access();
+    KafkaResultStream stream;
+    std::map<std::string, query::TypedValue> params_tv;
+    for (const auto &kv : params)
+      params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
+    try {
+      session_data.interpreter(query, *dba, params_tv, false)
+        .PullAll(stream);
+      dba->Commit();
+    } catch (const query::QueryException &e) {
+      LOG(WARNING) << "[Kafka] query execution failed with an exception: "
+                   << e.what();
+      dba->Abort();
+    }
+  };
 
   integrations::kafka::Streams kafka_streams{
       std::experimental::filesystem::path(FLAGS_durability_directory) /
diff --git a/src/query/frontend/ast/ast.cpp b/src/query/frontend/ast/ast.cpp
index f417bb9b4..b1a6c0d7b 100644
--- a/src/query/frontend/ast/ast.cpp
+++ b/src/query/frontend/ast/ast.cpp
@@ -2,6 +2,7 @@
 
 #include <algorithm>
 
+#include "distributed/serialization.hpp"
 #include "utils/serialization.capnp.h"
 
 namespace query {
@@ -235,7 +236,7 @@ void PrimitiveLiteral::Save(capnp::BaseLiteral::Builder *base_literal_builder,
   auto primitive_literal_builder = base_literal_builder->initPrimitiveLiteral();
   primitive_literal_builder.setTokenPosition(token_position_);
   auto typed_value_builder = primitive_literal_builder.getValue();
-  utils::SaveCapnpTypedValue(value_, &typed_value_builder);
+  distributed::SaveCapnpTypedValue(value_, &typed_value_builder);
 }
 
 void PrimitiveLiteral::Load(const capnp::Tree::Reader &reader,
@@ -245,7 +246,7 @@ void PrimitiveLiteral::Load(const capnp::Tree::Reader &reader,
   auto pl_reader =
       reader.getExpression().getBaseLiteral().getPrimitiveLiteral();
   auto typed_value_reader = pl_reader.getValue();
-  utils::LoadCapnpTypedValue(typed_value_reader, &value_);
+  distributed::LoadCapnpTypedValue(typed_value_reader, &value_);
   token_position_ = pl_reader.getTokenPosition();
 }
 
diff --git a/src/stats/CMakeLists.txt b/src/stats/CMakeLists.txt
new file mode 100644
index 000000000..d03a74d33
--- /dev/null
+++ b/src/stats/CMakeLists.txt
@@ -0,0 +1,73 @@
+set(stats_src_files
+    metrics.cpp
+    stats.cpp)
+
+set(lcp_exe ${CMAKE_SOURCE_DIR}/tools/lcp)
+set(lcp_src_files ../lisp/lcp.lisp ${lcp_exe})
+
+# Use this function to add each lcp file to generation. This way each file is
+# standalone and we avoid recompiling everything.
+#
+# You may pass a CAPNP_SCHEMA <id> keyword argument to generate the Cap'n Proto
+# serialization code from .lcp file. You still need to add the generated capnp
+# file through `add_capnp` function. To generate the <id> use `capnp id`
+# invocation, and specify it here. This preserves correct id information across
+# multiple schema generations. If this wasn't the case, wrong typeId
+# information will break RPC between different compilations of memgraph.
+#
+# NOTE: stats_src_files and stats_lcp_files are globally updated.
+# TODO: This is duplicated from src/CMakeLists.txt,
+# find a good way to generalize this on per
+# subdirectory basis.
+function(add_lcp lcp_file)
+  set(one_value_kwargs CAPNP_SCHEMA)
+  cmake_parse_arguments(KW "" "${one_value_kwargs}" "" ${ARGN})
+  string(REGEX REPLACE "\.lcp$" ".hpp" h_file
+         "${CMAKE_CURRENT_SOURCE_DIR}/${lcp_file}")
+  if (KW_CAPNP_SCHEMA)
+    string(REGEX REPLACE "\.lcp$" ".capnp" capnp_file
+           "${CMAKE_CURRENT_SOURCE_DIR}/${lcp_file}")
+    set(capnp_id ${KW_CAPNP_SCHEMA})
+    set(capnp_depend capnproto-proj)
+    set(cpp_file ${CMAKE_CURRENT_SOURCE_DIR}/${lcp_file}.cpp)
+    # Update *global* stats_src_files
+    set(stats_src_files ${stats_src_files} ${cpp_file} PARENT_SCOPE)
+  endif()
+  add_custom_command(OUTPUT ${h_file} ${cpp_file} ${capnp_file}
+    COMMAND ${lcp_exe} ${lcp_file} ${capnp_id}
+    VERBATIM
+    DEPENDS ${lcp_file} ${lcp_src_files} ${capnp_depend}
+    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+  # Update *global* stats_lcp_files
+  set(stats_lcp_files ${stats_lcp_files} ${h_file} ${cpp_file} ${capnp_file} PARENT_SCOPE)
+endfunction(add_lcp)
+
+# Use this function to add each capnp file to generation. This way each file is
+# standalone and we avoid recompiling everything.
+# NOTE: stats_src_files and stats_capnp_files are globally updated.
+# TODO: This is duplicated from src/CMakeLists.txt and
+# src/utils/CMakeLists.txt, find a good way to generalize this on per
+# subdirectory basis.
+function(add_capnp capnp_src_file)
+  set(cpp_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.c++)
+  set(h_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.h)
+  add_custom_command(OUTPUT ${cpp_file} ${h_file}
+    COMMAND ${CAPNP_EXE} compile -o${CAPNP_CXX_EXE} ${capnp_src_file} -I ${CMAKE_CURRENT_SOURCE_DIR}/..
+    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file} capnproto-proj
+    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+  # Update *global* stats_capnp_files
+  set(stats_capnp_files ${stats_capnp_files} ${cpp_file} ${h_file} PARENT_SCOPE)
+  # Update *global* stats_src_files
+  set(stats_src_files ${stats_src_files} ${cpp_file} PARENT_SCOPE)
+endfunction(add_capnp)
+
+add_lcp(stats_rpc_messages.lcp CAPNP_SCHEMA @0xc19a87c81b9b4512)
+add_capnp(stats_rpc_messages.capnp)
+
+add_custom_target(generate_stats_lcp DEPENDS ${stats_lcp_files})
+add_custom_target(generate_stats_capnp DEPENDS generate_stats_lcp ${stats_capnp_files})
+
+add_library(mg-stats STATIC ${stats_src_files})
+target_link_libraries(mg-stats Threads::Threads mg-utils mg-io mg-communication fmt glog gflags)
+target_link_libraries(mg-stats capnp kj)
+add_dependencies(mg-stats generate_stats_capnp)
diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp
index 2abc28c63..f27e4d969 100644
--- a/src/stats/stats.cpp
+++ b/src/stats/stats.cpp
@@ -4,9 +4,9 @@
 
 #include "communication/rpc/client.hpp"
 #include "data_structures/concurrent/push_queue.hpp"
-#include "utils/thread.hpp"
-
+#include "stats/metrics.hpp"
 #include "stats/stats_rpc_messages.hpp"
+#include "utils/thread.hpp"
 
 DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address");
 DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port");
diff --git a/src/stats/stats.hpp b/src/stats/stats.hpp
index b3dd2f703..ce9ff5c20 100644
--- a/src/stats/stats.hpp
+++ b/src/stats/stats.hpp
@@ -7,7 +7,7 @@
 
 #include "gflags/gflags.h"
 
-#include "stats/metrics.hpp"
+#include "io/network/endpoint.hpp"
 
 namespace stats {
 
diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt
index 3f93c1419..0222f24d8 100644
--- a/src/utils/CMakeLists.txt
+++ b/src/utils/CMakeLists.txt
@@ -7,5 +7,27 @@ set(utils_src_files
     uuid.cpp
     watchdog.cpp)
 
+# Use this function to add each capnp file to generation. This way each file is
+# standalone and we avoid recompiling everything.
+# NOTE: utils_src_files and utils_capnp_files are globally updated.
+# TODO: This is duplicated from src/CMakeLists.txt,
+# find a good way to generalize this on per
+# subdirectory basis.
+function(add_capnp capnp_src_file)
+  set(cpp_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.c++)
+  set(h_file ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file}.h)
+  add_custom_command(OUTPUT ${cpp_file} ${h_file}
+    COMMAND ${CAPNP_EXE} compile -o${CAPNP_CXX_EXE} ${capnp_src_file} -I ${CMAKE_CURRENT_SOURCE_DIR}/..
+    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${capnp_src_file} capnproto-proj
+    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+  # Update *global* stats_capnp_files
+  set(utils_capnp_files ${utils_capnp_files} ${cpp_file} ${h_file} PARENT_SCOPE)
+  # Update *global* stats_src_files
+  set(utils_src_files ${utils_src_files} ${cpp_file} PARENT_SCOPE)
+endfunction(add_capnp)
+
+add_capnp(serialization.capnp)
+
 add_library(mg-utils STATIC ${utils_src_files})
 target_link_libraries(mg-utils stdc++fs Threads::Threads fmt glog gflags uuid)
+target_link_libraries(mg-utils capnp kj)
diff --git a/src/utils/serialization.hpp b/src/utils/serialization.hpp
index 0c76cd582..e66daadd8 100644
--- a/src/utils/serialization.hpp
+++ b/src/utils/serialization.hpp
@@ -1,127 +1,16 @@
 #pragma once
 
 #include <experimental/optional>
+#include <limits>
+#include <vector>
 
-#include "distributed/serialization.capnp.h"
-#include "query/typed_value.hpp"
-#include "storage/edge.hpp"
-#include "storage/vertex.hpp"
-#include "utils/exceptions.hpp"
+#include <glog/logging.h>
+
+#include "utils/algorithm.hpp"
 #include "utils/serialization.capnp.h"
 
 namespace utils {
 
-inline void SaveCapnpTypedValue(
-    const query::TypedValue &value,
-    distributed::capnp::TypedValue::Builder *builder,
-    std::function<void(const query::TypedValue &,
-                       distributed::capnp::TypedValue::Builder *)>
-        save_graph_element = nullptr) {
-  switch (value.type()) {
-    case query::TypedValue::Type::Null:
-      builder->setNullType();
-      return;
-    case query::TypedValue::Type::Bool:
-      builder->setBool(value.Value<bool>());
-      return;
-    case query::TypedValue::Type::Int:
-      builder->setInteger(value.Value<int64_t>());
-      return;
-    case query::TypedValue::Type::Double:
-      builder->setDouble(value.Value<double>());
-      return;
-    case query::TypedValue::Type::String:
-      builder->setString(value.Value<std::string>());
-      return;
-    case query::TypedValue::Type::List: {
-      const auto &values = value.Value<std::vector<query::TypedValue>>();
-      auto list_builder = builder->initList(values.size());
-      for (size_t i = 0; i < values.size(); ++i) {
-        auto value_builder = list_builder[i];
-        SaveCapnpTypedValue(values[i], &value_builder, save_graph_element);
-      }
-      return;
-    }
-    case query::TypedValue::Type::Map: {
-      const auto &map = value.Value<std::map<std::string, query::TypedValue>>();
-      auto map_builder = builder->initMap(map.size());
-      size_t i = 0;
-      for (const auto &kv : map) {
-        auto kv_builder = map_builder[i];
-        kv_builder.setKey(kv.first);
-        auto value_builder = kv_builder.initValue();
-        SaveCapnpTypedValue(kv.second, &value_builder, save_graph_element);
-        ++i;
-      }
-      return;
-    }
-    case query::TypedValue::Type::Vertex:
-    case query::TypedValue::Type::Edge:
-    case query::TypedValue::Type::Path:
-      if (save_graph_element) {
-        save_graph_element(value, builder);
-      } else {
-        throw utils::BasicException(
-            "Unable to serialize TypedValue of type: {}", value.type());
-      }
-  }
-}
-
-inline void LoadCapnpTypedValue(
-    const distributed::capnp::TypedValue::Reader &reader,
-    query::TypedValue *value,
-    std::function<void(const distributed::capnp::TypedValue::Reader &,
-                       query::TypedValue *)>
-        load_graph_element = nullptr) {
-  switch (reader.which()) {
-    case distributed::capnp::TypedValue::NULL_TYPE:
-      *value = query::TypedValue::Null;
-      return;
-    case distributed::capnp::TypedValue::BOOL:
-      *value = reader.getBool();
-      return;
-    case distributed::capnp::TypedValue::INTEGER:
-      *value = reader.getInteger();
-      return;
-    case distributed::capnp::TypedValue::DOUBLE:
-      *value = reader.getDouble();
-      return;
-    case distributed::capnp::TypedValue::STRING:
-      *value = reader.getString().cStr();
-      return;
-    case distributed::capnp::TypedValue::LIST: {
-      std::vector<query::TypedValue> list;
-      list.reserve(reader.getList().size());
-      for (const auto &value_reader : reader.getList()) {
-        list.emplace_back();
-        LoadCapnpTypedValue(value_reader, &list.back(), load_graph_element);
-      }
-      *value = list;
-      return;
-    }
-    case distributed::capnp::TypedValue::MAP: {
-      std::map<std::string, query::TypedValue> map;
-      for (const auto &kv_reader : reader.getMap()) {
-        auto key = kv_reader.getKey().cStr();
-        LoadCapnpTypedValue(kv_reader.getValue(), &map[key],
-                            load_graph_element);
-      }
-      *value = map;
-      return;
-    }
-    case distributed::capnp::TypedValue::VERTEX:
-    case distributed::capnp::TypedValue::EDGE:
-    case distributed::capnp::TypedValue::PATH:
-      if (load_graph_element) {
-        load_graph_element(reader, value);
-      } else {
-        throw utils::BasicException(
-            "Unexpected TypedValue type '{}' when loading from archive",
-            reader.which());
-      }
-  }
-}
-
 template <typename T>
 inline void SaveVector(const std::vector<T> &data,
                        typename ::capnp::List<T>::Builder *list_builder) {
diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp
index fdba02efb..1de65a906 100644
--- a/tests/macro_benchmark/clients/card_fraud_client.cpp
+++ b/tests/macro_benchmark/clients/card_fraud_client.cpp
@@ -5,6 +5,7 @@
 
 #include "gflags/gflags.h"
 
+#include "communication/rpc/client.hpp"
 #include "stats/stats.hpp"
 #include "stats/stats_rpc_messages.hpp"
 #include "utils/thread/sync.hpp"
@@ -363,10 +364,10 @@ int main(int argc, char **argv) {
     CHECK(FLAGS_num_workers >= 2)
         << "There should be at least 2 client workers (analytic and cleanup)";
     CHECK(num_pos == config["num_workers"].get<int>() *
-                         config["pos_per_worker"].get<int>())
+              config["pos_per_worker"].get<int>())
         << "Wrong number of POS per worker";
     CHECK(num_cards == config["num_workers"].get<int>() *
-                           config["cards_per_worker"].get<int>())
+              config["cards_per_worker"].get<int>())
         << "Wrong number of cards per worker";
     for (int i = 0; i < FLAGS_num_workers - 2; ++i) {
       clients.emplace_back(std::make_unique<CardFraudClient>(i, config));
diff --git a/tools/tests/statsd/mg_statsd_client.cpp b/tools/tests/statsd/mg_statsd_client.cpp
index f96343236..c9664486c 100644
--- a/tools/tests/statsd/mg_statsd_client.cpp
+++ b/tools/tests/statsd/mg_statsd_client.cpp
@@ -1,6 +1,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "communication/rpc/client.hpp"
 #include "stats/stats.hpp"
 #include "stats/stats_rpc_messages.hpp"
 #include "utils/string.hpp"