Separate distributed from single node storage

Summary:
This diff splits single node and distributed storage from each other.
Currently all of the storage code is duplicated into two directories (one for
single node, one for distributed). The logic in the storage implementation isn't
touched here; it will be refactored in subsequent diffs.

To clean the working directory after applying this diff, execute:
```
rm database/state_delta.capnp
rm database/state_delta.hpp
rm storage/concurrent_id_mapper_rpc_messages.capnp
rm storage/concurrent_id_mapper_rpc_messages.hpp
```

Reviewers: teon.banek, buda, msantl

Reviewed By: teon.banek, msantl

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1625
This commit is contained in:
Matej Ferencevic 2018-10-04 15:23:07 +02:00
parent 96537eb181
commit 75950664a7
211 changed files with 6469 additions and 1117 deletions

11
.gitignore vendored
View File

@ -42,8 +42,8 @@ TAGS
*.lcp.cpp
src/database/counters_rpc_messages.capnp
src/database/counters_rpc_messages.hpp
src/database/state_delta.capnp
src/database/state_delta.hpp
src/database/serialization.capnp
src/database/serialization.hpp
src/distributed/bfs_rpc_messages.capnp
src/distributed/bfs_rpc_messages.hpp
src/distributed/coordination_rpc_messages.capnp
@ -69,12 +69,15 @@ src/distributed/updates_rpc_messages.hpp
src/query/frontend/ast/ast.hpp
src/query/frontend/ast/ast_serialization.capnp
src/query/frontend/ast/ast_serialization.hpp
src/durability/distributed/state_delta.capnp
src/durability/distributed/state_delta.hpp
src/durability/single_node/state_delta.hpp
src/query/plan/distributed_ops.capnp
src/query/plan/distributed_ops.hpp
src/query/plan/operator.hpp
src/stats/stats_rpc_messages.capnp
src/stats/stats_rpc_messages.hpp
src/storage/concurrent_id_mapper_rpc_messages.capnp
src/storage/concurrent_id_mapper_rpc_messages.hpp
src/storage/distributed/concurrent_id_mapper_rpc_messages.capnp
src/storage/distributed/concurrent_id_mapper_rpc_messages.hpp
src/transactions/distributed/engine_rpc_messages.capnp
src/transactions/distributed/engine_rpc_messages.hpp

View File

@ -1,7 +1,9 @@
- name: Binaries
archive:
- build_debug/memgraph
- build_debug/memgraph_distributed
- build_release/memgraph
- build_release/memgraph_distributed
- build_release/tools/src/mg_import_csv
- build_release/tools/src/mg_statsd
- config

View File

@ -31,7 +31,7 @@
mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=release ..
TIMEOUT=1200 make -j$THREADS memgraph tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark
TIMEOUT=1200 make -j$THREADS memgraph memgraph_distributed tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark
# Generate distributed card fraud dataset.
cd ../tests/distributed/card_fraud

View File

@ -128,7 +128,7 @@ set(lcp_src_files ${CMAKE_SOURCE_DIR}/src/lisp/lcp.lisp ${lcp_exe})
# file through `add_capnp` function. To generate the <id> use `capnp id`
# invocation, and specify it here. This preserves correct id information across
# multiple schema generations. If this wasn't the case, wrong typeId
# information will break serialization between different compilations of memgraph.
# information will break serialization between different compilations.
macro(define_add_lcp name main_src_files generated_lcp_files)
function(${name} lcp_file)
set(one_value_kwargs CAPNP_SCHEMA)

View File

@ -10,15 +10,85 @@ add_subdirectory(communication)
add_subdirectory(stats)
add_subdirectory(auth)
# all memgraph src files
set(memgraph_src_files
# ----------------------------------------------------------------------------
# Memgraph Single Node
# ----------------------------------------------------------------------------
set(mg_single_node_sources
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/distributed_counters.cpp
database/distributed_graph_db.cpp
database/graph_db.cpp
database/graph_db_accessor.cpp
database/state_delta.cpp
durability/single_node/state_delta.cpp
durability/paths.cpp
durability/single_node/recovery.cpp
durability/single_node/snapshooter.cpp
durability/single_node/wal.cpp
glue/auth.cpp
glue/communication.cpp
query/common.cpp
query/frontend/ast/ast.cpp
query/frontend/ast/cypher_main_visitor.cpp
query/frontend/semantic/required_privileges.cpp
query/frontend/semantic/symbol_generator.cpp
query/frontend/stripped.cpp
query/interpret/awesome_memgraph_functions.cpp
query/interpreter.cpp
query/plan/operator.cpp
query/plan/preprocess.cpp
query/plan/pretty_print.cpp
query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp
query/repl.cpp
query/typed_value.cpp
storage/single_node/edge_accessor.cpp
storage/locking/record_lock.cpp
storage/common/property_value.cpp
storage/common/property_value_store.cpp
storage/single_node/record_accessor.cpp
storage/single_node/vertex_accessor.cpp
transactions/single_node/engine_single_node.cpp
memgraph_init.cpp
)
define_add_lcp(add_lcp_single_node mg_single_node_sources generated_lcp_single_node_files)
add_lcp_single_node(durability/single_node/state_delta.lcp)
add_lcp_single_node(query/frontend/ast/ast.lcp)
add_lcp_single_node(query/plan/operator.lcp)
add_custom_target(generate_lcp_single_node DEPENDS ${generated_lcp_single_node_files})
set(MG_SINGLE_NODE_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags capnp kj
mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-auth mg-stats)
if (USE_LTALLOC)
list(APPEND MG_SINGLE_NODE_LIBS ltalloc)
# TODO(mferencevic): Enable this when clang is updated on apollo.
# set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -flto")
endif()
if (READLINE_FOUND)
list(APPEND MG_SINGLE_NODE_LIBS readline)
endif()
add_library(mg-single-node STATIC ${mg_single_node_sources})
target_link_libraries(mg-single-node ${MG_SINGLE_NODE_LIBS})
add_dependencies(mg-single-node generate_opencypher_parser)
add_dependencies(mg-single-node generate_lcp_single_node)
target_compile_definitions(mg-single-node PUBLIC MG_SINGLE_NODE)
# ----------------------------------------------------------------------------
# END Memgraph Single Node
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Memgraph Distributed
# ----------------------------------------------------------------------------
set(mg_distributed_sources
database/distributed_counters.cpp
database/distributed_graph_db.cpp
distributed/bfs_rpc_clients.cpp
distributed/bfs_subcursor.cpp
distributed/cluster_discovery_master.cpp
@ -41,14 +111,25 @@ set(memgraph_src_files
distributed/pull_rpc_clients.cpp
distributed/updates_rpc_clients.cpp
distributed/updates_rpc_server.cpp
query/distributed_interpreter.cpp
query/plan/distributed.cpp
query/plan/distributed_ops.cpp
query/plan/distributed_pretty_print.cpp
storage/distributed/concurrent_id_mapper_master.cpp
storage/distributed/concurrent_id_mapper_worker.cpp
transactions/distributed/engine_master.cpp
transactions/distributed/engine_worker.cpp
data_structures/concurrent/skiplist_gc.cpp
database/config.cpp
database/graph_db_accessor.cpp
durability/distributed/state_delta.cpp
durability/paths.cpp
durability/recovery.cpp
durability/snapshooter.cpp
durability/wal.cpp
durability/distributed/recovery.cpp
durability/distributed/snapshooter.cpp
durability/distributed/wal.cpp
glue/auth.cpp
glue/communication.cpp
query/common.cpp
query/distributed_interpreter.cpp
query/frontend/ast/ast.cpp
query/frontend/ast/cypher_main_visitor.cpp
query/frontend/semantic/required_privileges.cpp
@ -56,9 +137,6 @@ set(memgraph_src_files
query/frontend/stripped.cpp
query/interpret/awesome_memgraph_functions.cpp
query/interpreter.cpp
query/plan/distributed.cpp
query/plan/distributed_ops.cpp
query/plan/distributed_pretty_print.cpp
query/plan/operator.cpp
query/plan/preprocess.cpp
query/plan/pretty_print.cpp
@ -67,115 +145,106 @@ set(memgraph_src_files
query/repl.cpp
query/serialization.cpp
query/typed_value.cpp
storage/concurrent_id_mapper_master.cpp
storage/concurrent_id_mapper_worker.cpp
storage/edge_accessor.cpp
storage/common/property_value.cpp
storage/common/property_value_store.cpp
storage/distributed/edge_accessor.cpp
storage/distributed/record_accessor.cpp
storage/distributed/serialization.cpp
storage/distributed/vertex_accessor.cpp
storage/locking/record_lock.cpp
storage/property_value.cpp
storage/property_value_store.cpp
storage/record_accessor.cpp
storage/serialization.cpp
storage/vertex_accessor.cpp
transactions/distributed/engine_master.cpp
transactions/distributed/engine_worker.cpp
transactions/single_node/engine_single_node.cpp
memgraph_init.cpp
transactions/single_node/engine_single_node.cpp
)
# -----------------------------------------------------------------------------
define_add_capnp(memgraph_src_files generated_capnp_files)
define_add_capnp(mg_distributed_sources generated_capnp_files)
define_add_lcp(add_lcp memgraph_src_files generated_lcp_files)
define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files)
add_lcp(database/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945)
add_lcp_distributed(durability/distributed/state_delta.lcp)
add_lcp_distributed(database/counters_rpc_messages.lcp CAPNP_SCHEMA @0x95a2c3ea3871e945)
add_capnp(database/counters_rpc_messages.capnp)
add_lcp(database/state_delta.lcp CAPNP_SCHEMA @0xdea01657b3563887)
add_capnp(database/state_delta.capnp)
add_lcp(distributed/bfs_rpc_messages.lcp CAPNP_SCHEMA @0x8e508640b09b6d2a)
add_lcp_distributed(database/serialization.lcp CAPNP_SCHEMA @0xdea01657b3563887
DEPENDS durability/distributed/state_delta.lcp)
add_capnp(database/serialization.capnp)
add_lcp_distributed(distributed/bfs_rpc_messages.lcp CAPNP_SCHEMA @0x8e508640b09b6d2a)
add_capnp(distributed/bfs_rpc_messages.capnp)
add_lcp(distributed/coordination_rpc_messages.lcp CAPNP_SCHEMA @0x93df0c4703cf98fb)
add_lcp_distributed(distributed/coordination_rpc_messages.lcp CAPNP_SCHEMA @0x93df0c4703cf98fb)
add_capnp(distributed/coordination_rpc_messages.capnp)
add_lcp(distributed/data_rpc_messages.lcp CAPNP_SCHEMA @0xc1c8a341ba37aaf5)
add_lcp_distributed(distributed/data_rpc_messages.lcp CAPNP_SCHEMA @0xc1c8a341ba37aaf5)
add_capnp(distributed/data_rpc_messages.capnp)
add_lcp(distributed/durability_rpc_messages.lcp CAPNP_SCHEMA @0xf5e53bc271e2163d)
add_lcp_distributed(distributed/durability_rpc_messages.lcp CAPNP_SCHEMA @0xf5e53bc271e2163d)
add_capnp(distributed/durability_rpc_messages.capnp)
add_lcp(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6)
add_lcp_distributed(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6)
add_capnp(distributed/index_rpc_messages.capnp)
add_lcp(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28)
add_lcp_distributed(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28)
add_capnp(distributed/plan_rpc_messages.capnp)
add_lcp(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd
DEPENDS transactions/distributed/serialization.lcp)
add_lcp_distributed(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(distributed/pull_produce_rpc_messages.capnp)
add_lcp(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81)
add_lcp_distributed(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81)
add_capnp(distributed/storage_gc_rpc_messages.capnp)
add_lcp(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec)
add_lcp_distributed(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec)
add_capnp(distributed/token_sharing_rpc_messages.capnp)
add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
add_lcp_distributed(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
add_capnp(distributed/updates_rpc_messages.capnp)
add_lcp(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05)
add_lcp_distributed(distributed/dynamic_worker_rpc_messages.lcp CAPNP_SCHEMA @0x8c53f6c9a0c71b05)
add_capnp(distributed/dynamic_worker_rpc_messages.capnp)
# distributed_ops.lcp is leading the capnp code generation, so we don't need
# to generate any capnp for operator.lcp
add_lcp(query/frontend/ast/ast.lcp)
add_lcp(query/frontend/ast/ast_serialization.lcp CAPNP_SCHEMA @0xb107d3d6b4b1600b
add_lcp_distributed(query/frontend/ast/ast.lcp)
add_lcp_distributed(query/frontend/ast/ast_serialization.lcp CAPNP_SCHEMA @0xb107d3d6b4b1600b
DEPENDS query/frontend/ast/ast.lcp)
add_capnp(query/frontend/ast/ast_serialization.capnp)
add_lcp(query/plan/operator.lcp)
add_lcp(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42
add_lcp_distributed(query/plan/operator.lcp)
add_lcp_distributed(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42
DEPENDS query/plan/operator.lcp)
add_capnp(query/plan/distributed_ops.capnp)
add_lcp(storage/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
add_capnp(storage/concurrent_id_mapper_rpc_messages.capnp)
add_lcp(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5
add_lcp_distributed(storage/distributed/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
add_capnp(storage/distributed/concurrent_id_mapper_rpc_messages.capnp)
add_lcp_distributed(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(transactions/distributed/engine_rpc_messages.capnp)
add_custom_target(generate_lcp DEPENDS ${generated_lcp_files})
add_custom_target(generate_lcp_distributed DEPENDS ${generated_lcp_distributed_files})
# Registering capnp must come after registering lcp files.
add_capnp(communication/rpc/messages.capnp)
add_capnp(durability/recovery.capnp)
add_capnp(durability/distributed/serialization.capnp)
add_capnp(query/frontend/semantic/symbol.capnp)
add_capnp(query/serialization.capnp)
add_capnp(storage/serialization.capnp)
add_capnp(storage/distributed/serialization.capnp)
add_custom_target(generate_capnp DEPENDS generate_lcp ${generated_capnp_files})
add_custom_target(generate_capnp DEPENDS generate_lcp_distributed ${generated_capnp_files})
# -----------------------------------------------------------------------------
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
# memgraph_lib depend on these libraries
set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
set(MG_DISTRIBUTED_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags capnp kj
mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-auth mg-stats)
if (USE_LTALLOC)
list(APPEND MEMGRAPH_ALL_LIBS ltalloc)
# TODO(mferencevic): Enable this when clang is updated on apollo.
# set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -flto")
endif()
if (READLINE_FOUND)
list(APPEND MEMGRAPH_ALL_LIBS readline)
endif()
# STATIC library used by memgraph executables
add_library(memgraph_lib STATIC ${memgraph_src_files})
target_link_libraries(memgraph_lib ${MEMGRAPH_ALL_LIBS})
add_dependencies(memgraph_lib generate_opencypher_parser)
add_dependencies(memgraph_lib generate_lcp)
add_dependencies(memgraph_lib generate_capnp)
add_library(mg-distributed STATIC ${mg_distributed_sources})
target_link_libraries(mg-distributed ${MG_DISTRIBUTED_LIBS})
add_dependencies(mg-distributed generate_opencypher_parser)
add_dependencies(mg-distributed generate_lcp_distributed)
add_dependencies(mg-distributed generate_capnp)
target_compile_definitions(mg-distributed PUBLIC MG_DISTRIBUTED)
# ----------------------------------------------------------------------------
# END Memgraph Distributed
# ----------------------------------------------------------------------------
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
# STATIC library used to store key-value pairs
add_library(kvstore_lib STATIC storage/kvstore.cpp)
add_library(kvstore_lib STATIC storage/kvstore/kvstore.cpp)
target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib glog gflags)
# STATIC library for dummy key-value storage
add_library(kvstore_dummy_lib STATIC storage/kvstore_dummy.cpp)
add_library(kvstore_dummy_lib STATIC storage/kvstore/kvstore_dummy.cpp)
target_link_libraries(kvstore_dummy_lib mg-utils)
# Generate a version.hpp file
@ -184,8 +253,8 @@ configure_file(version.hpp.in version.hpp @ONLY)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# memgraph main executable
add_executable(memgraph memgraph_bolt.cpp)
target_link_libraries(memgraph memgraph_lib kvstore_lib telemetry_lib)
add_executable(memgraph memgraph.cpp)
target_link_libraries(memgraph mg-single-node kvstore_lib telemetry_lib)
set_target_properties(memgraph PROPERTIES
# Set the executable output name to include version information.
OUTPUT_NAME "memgraph-${memgraph_VERSION}-${COMMIT_HASH}_${CMAKE_BUILD_TYPE}"
@ -241,3 +310,18 @@ install(
${CMAKE_BINARY_DIR}/tests/manual/bolt_client
WORKING_DIRECTORY ${examples})")
install(DIRECTORY ${examples}/build/ DESTINATION share/memgraph/examples)
# memgraph distributed main executable
add_executable(memgraph_distributed memgraph_distributed.cpp)
target_link_libraries(memgraph_distributed mg-distributed kvstore_lib telemetry_lib)
set_target_properties(memgraph_distributed PROPERTIES
# Set the executable output name to include version information.
OUTPUT_NAME "memgraph_distributed-${memgraph_VERSION}-${COMMIT_HASH}_${CMAKE_BUILD_TYPE}"
# Output the executable in main binary dir.
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
# Create symlink to the built executable.
add_custom_command(TARGET memgraph_distributed POST_BUILD
COMMAND ${CMAKE_COMMAND} -E create_symlink $<TARGET_FILE:memgraph_distributed> ${CMAKE_BINARY_DIR}/memgraph_distributed
BYPRODUCTS ${CMAKE_BINARY_DIR}/memgraph_distributed
COMMENT Creating symlink to memgraph distributed executable)

View File

@ -6,7 +6,7 @@
#include "auth/exceptions.hpp"
#include "auth/models.hpp"
#include "storage/kvstore.hpp"
#include "storage/kvstore/kvstore.hpp"
namespace auth {

View File

@ -1,7 +1,13 @@
#include <limits>
#include "database/graph_db.hpp"
#include "storage/gid.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "storage/single_node/gid.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "storage/distributed/gid.hpp"
#endif
#include "utils/flag_validation.hpp"
#include "utils/string.hpp"

View File

@ -1,8 +1,6 @@
#include "database/distributed_graph_db.hpp"
#include "database/distributed_counters.hpp"
#include "database/storage_gc_master.hpp"
#include "database/storage_gc_worker.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/bfs_rpc_server.hpp"
#include "distributed/bfs_subcursor.hpp"
@ -24,12 +22,14 @@
#include "distributed/token_sharing_rpc_server.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "durability/snapshooter.hpp"
#include "durability/distributed/snapshooter.hpp"
// TODO: Why do we depend on query here?
#include "query/exceptions.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_worker.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "storage/distributed/concurrent_id_mapper_master.hpp"
#include "storage/distributed/concurrent_id_mapper_worker.hpp"
#include "storage/distributed/storage_gc_master.hpp"
#include "storage/distributed/storage_gc_worker.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "transactions/distributed/engine_worker.hpp"
#include "utils/file.hpp"

View File

@ -3,7 +3,7 @@
#pragma once
#include "database/graph_db.hpp"
#include "durability/version.hpp"
#include "durability/distributed/version.hpp"
namespace distributed {
class BfsRpcServer;

View File

@ -6,11 +6,11 @@
#include "database/graph_db_accessor.hpp"
#include "database/single_node_counters.hpp"
#include "database/storage_gc_single_node.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "storage/concurrent_id_mapper_single_node.hpp"
#include "durability/single_node/recovery.hpp"
#include "durability/single_node/snapshooter.hpp"
#include "storage/single_node/concurrent_id_mapper_single_node.hpp"
#include "storage/single_node/storage_gc_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "utils/file.hpp"

View File

@ -6,16 +6,29 @@
#include <vector>
#include "database/counters.hpp"
#include "database/storage.hpp"
#include "database/storage_gc.hpp"
#include "durability/recovery.hpp"
#include "durability/wal.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/types.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/recovery.hpp"
#include "durability/single_node/wal.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/storage.hpp"
#include "storage/single_node/storage_gc.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/wal.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/storage.hpp"
#include "storage/distributed/storage_gc.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#endif
namespace database {
/// Database configuration. Initialized from flags, but modifiable.

View File

@ -5,12 +5,24 @@
#include <glog/logging.h>
#include "database/state_delta.hpp"
#include "storage/address_types.hpp"
#include "storage/edge.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex.hpp"
#include "storage/vertex_accessor.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "durability/single_node/state_delta.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/vertex.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#endif
#include "utils/cast.hpp"
#include "utils/on_scope_exit.hpp"

View File

@ -11,15 +11,25 @@
#include <cppitertools/imap.hpp>
#include "database/graph_db.hpp"
#include "storage/address_types.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/types.hpp"
#include "storage/vertex_accessor.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
// TODO: THIS IS A HACK!
#ifdef MG_SINGLE_NODE
#include "storage/common/types.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/edge_accessor.hpp"
#include "storage/single_node/vertex_accessor.hpp"
#endif
#ifdef MG_DISTRIBUTED
#include "storage/common/types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#endif
namespace database {
/** Thrown when creating an index which already exists. */

View File

@ -0,0 +1,10 @@
#>cpp
#pragma once
#include "database/serialization.capnp.h"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/serialization.hpp"
cpp<#
;; Generate serialization of state-delta
(load "durability/distributed/state_delta.lcp")

View File

@ -9,7 +9,7 @@
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/distributed_ops.hpp"
#include "query/serialization.hpp"
#include "storage/serialization.hpp"
#include "storage/distributed/serialization.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
@ -21,7 +21,7 @@ cpp<#
(lcp:capnp-import 'ast "/query/frontend/ast/ast_serialization.capnp")
(lcp:capnp-import 'dist-ops "/query/plan/distributed_ops.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'symbol "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")

View File

@ -6,7 +6,7 @@
#include "distributed/bfs_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"
#include "storage/address_types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/vertex_accessor.hpp"
namespace distributed {

View File

@ -6,7 +6,7 @@
#include <unordered_map>
#include "distributed/data_rpc_clients.hpp"
#include "storage/gid.hpp"
#include "storage/distributed/gid.hpp"
namespace database {
class Storage;

View File

@ -5,7 +5,7 @@
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "distributed/coordination_worker.hpp"
#include "durability/recovery.hpp"
#include "durability/distributed/recovery.hpp"
namespace distributed {
using Server = communication::rpc::Server;

View File

@ -8,7 +8,7 @@
#include <unordered_map>
#include "distributed/coordination.hpp"
#include "durability/recovery.hpp"
#include "durability/distributed/recovery.hpp"
#include "io/network/endpoint.hpp"
#include "utils/scheduler.hpp"

View File

@ -6,7 +6,8 @@
#include "communication/rpc/messages.hpp"
#include "distributed/coordination_rpc_messages.capnp.h"
#include "durability/recovery.hpp"
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/serialization.hpp"
#include "io/network/endpoint.hpp"
cpp<#
@ -14,7 +15,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'dur "/durability/recovery.capnp")
(lcp:capnp-import 'dur "/durability/distributed/serialization.capnp")
(lcp:capnp-import 'io "/io/network/endpoint.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")

View File

@ -1,5 +1,6 @@
#include "distributed/data_manager.hpp"
#include "database/storage.hpp"
#include "storage/distributed/storage.hpp"
namespace {

View File

@ -1,9 +1,10 @@
#include "distributed/data_rpc_clients.hpp"
#include <unordered_map>
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/vertex.hpp"
namespace distributed {

View File

@ -8,7 +8,7 @@
#include <utility>
#include "distributed/coordination.hpp"
#include "storage/gid.hpp"
#include "storage/distributed/gid.hpp"
#include "transactions/type.hpp"
namespace distributed {

View File

@ -6,10 +6,10 @@
#include "communication/rpc/messages.hpp"
#include "distributed/data_rpc_messages.capnp.h"
#include "storage/edge.hpp"
#include "storage/gid.hpp"
#include "storage/vertex.hpp"
#include "storage/serialization.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/serialization.hpp"
#include "storage/distributed/vertex.hpp"
#include "transactions/type.hpp"
cpp<#
@ -18,7 +18,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:define-struct tx-gid-pair ()
((tx-id "tx::TransactionId" :capnp-type "UInt64")

View File

@ -5,8 +5,8 @@
#include <thread>
#include <unordered_map>
#include "storage/gid.hpp"
#include "storage/vertex_accessor.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/vertex_accessor.hpp"
namespace database {
class GraphDbAccessor;

View File

@ -5,8 +5,8 @@
#include <utility>
#include "distributed/coordination.hpp"
#include "durability/recovery.hpp"
#include "storage/gid.hpp"
#include "durability/distributed/recovery.hpp"
#include "storage/distributed/gid.hpp"
#include "transactions/type.hpp"
namespace distributed {

View File

@ -3,13 +3,14 @@
#include "communication/rpc/messages.hpp"
#include "distributed/durability_rpc_messages.capnp.h"
#include "durability/recovery.hpp"
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/serialization.hpp"
#include "transactions/transaction.hpp"
cpp<#
(lcp:namespace distributed)
(lcp:capnp-import 'dur "/durability/recovery.capnp")
(lcp:capnp-import 'dur "/durability/distributed/serialization.capnp")
(lcp:capnp-namespace "distributed")

View File

@ -6,7 +6,8 @@
#include "communication/rpc/messages.hpp"
#include "distributed/index_rpc_messages.capnp.h"
#include "storage/types.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/serialization.hpp"
#include "transactions/transaction.hpp"
cpp<#
@ -14,7 +15,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:define-rpc populate-index
(:request

View File

@ -11,7 +11,7 @@
#include "query/frontend/semantic/symbol.hpp"
#include "query/parameters.hpp"
#include "query/serialization.hpp"
#include "storage/address_types.hpp"
#include "storage/distributed/address_types.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
@ -29,7 +29,7 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")

View File

@ -1,8 +1,9 @@
#include "distributed/pull_rpc_clients.hpp"
#include <functional>
#include "distributed/pull_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "storage/distributed/edge.hpp"
#include "storage/distributed/vertex.hpp"
namespace distributed {

View File

@ -3,13 +3,13 @@
#include <unordered_map>
#include <vector>
#include "database/state_delta.hpp"
#include "distributed/coordination.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "durability/distributed/state_delta.hpp"
#include "query/typed_value.hpp"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
#include "storage/types.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/gid.hpp"
#include "transactions/type.hpp"
#include "utils/future.hpp"

View File

@ -4,10 +4,12 @@
#include <unordered_map>
#include "communication/rpc/messages.hpp"
#include "database/state_delta.hpp"
#include "database/serialization.hpp"
#include "distributed/updates_rpc_messages.capnp.h"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/serialization.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
cpp<#
@ -16,8 +18,8 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'db "/database/state_delta.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'db "/database/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")

View File

@ -11,13 +11,13 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "durability/distributed/state_delta.hpp"
#include "query/typed_value.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/gid.hpp"
#include "storage/types.hpp"
#include "storage/vertex_accessor.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/edge_accessor.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/vertex_accessor.hpp"
#include "transactions/type.hpp"
#include "utils/thread/sync.hpp"

View File

@ -1,20 +1,21 @@
#include "durability/recovery.hpp"
#include "durability/distributed/recovery.hpp"
#include <experimental/filesystem>
#include <limits>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "database/indexes/label_property_index.hpp"
#include "durability/distributed/snapshot_decoder.hpp"
#include "durability/distributed/snapshot_value.hpp"
#include "durability/distributed/version.hpp"
#include "durability/distributed/wal.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/snapshot_decoder.hpp"
#include "durability/snapshot_value.hpp"
#include "durability/version.hpp"
#include "durability/wal.hpp"
#include "glue/communication.hpp"
// TODO: WTF is typed value doing here?!
#include "query/typed_value.hpp"
#include "storage/address_types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/indexes/label_property_index.hpp"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
#include "utils/file.hpp"

View File

@ -1,13 +1,13 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include <unordered_map>
#include <vector>
#include "durability/distributed/state_delta.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/recovery.capnp.h"
#include "storage/vertex_accessor.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
namespace database {
class GraphDb;
@ -36,22 +36,6 @@ struct RecoveryInfo {
bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }
};
inline void Save(const RecoveryInfo &info,
capnp::RecoveryInfo::Builder *builder) {
builder->setDurabilityVersion(info.durability_version);
builder->setSnapshotTxId(info.snapshot_tx_id);
auto list_builder = builder->initWalRecovered(info.wal_recovered.size());
utils::SaveVector(info.wal_recovered, &list_builder);
}
inline void Load(RecoveryInfo *info,
const capnp::RecoveryInfo::Reader &reader) {
info->durability_version = reader.getDurabilityVersion();
info->snapshot_tx_id = reader.getSnapshotTxId();
auto list_reader = reader.getWalRecovered();
utils::LoadVector(&info->wal_recovered, list_reader);
}
// A data structure for exchanging info between main recovery function and
// snapshot and WAL recovery functions.
struct RecoveryData {
@ -69,51 +53,6 @@ struct RecoveryData {
}
};
inline void Save(const RecoveryData &data,
capnp::RecoveryData::Builder *builder) {
builder->setSnapshooterTxId(data.snapshooter_tx_id);
{
auto list_builder =
builder->initWalTxToRecover(data.wal_tx_to_recover.size());
utils::SaveVector(data.wal_tx_to_recover, &list_builder);
}
{
auto list_builder =
builder->initSnapshooterTxSnapshot(data.snapshooter_tx_snapshot.size());
utils::SaveVector(data.snapshooter_tx_snapshot, &list_builder);
}
{
auto list_builder = builder->initIndexes(data.indexes.size());
utils::SaveVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
data.indexes, &list_builder, [](auto *builder, const auto value) {
builder->setFirst(value.first);
builder->setSecond(value.second);
});
}
}
inline void Load(RecoveryData *data,
const capnp::RecoveryData::Reader &reader) {
data->snapshooter_tx_id = reader.getSnapshooterTxId();
{
auto list_reader = reader.getWalTxToRecover();
utils::LoadVector(&data->wal_tx_to_recover, list_reader);
}
{
auto list_reader = reader.getSnapshooterTxSnapshot();
utils::LoadVector(&data->snapshooter_tx_snapshot, list_reader);
}
{
auto list_reader = reader.getIndexes();
utils::LoadVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
&data->indexes, list_reader, [](const auto &reader) {
return std::make_pair(reader.getFirst(), reader.getSecond());
});
}
}
/** Reads snapshot metadata from the end of the file without messing up the
* hash. */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,

View File

@ -0,0 +1,70 @@
#pragma once
#include "durability/distributed/recovery.hpp"
#include "durability/distributed/serialization.capnp.h"
#include "utils/serialization.hpp"
namespace durability {
inline void Save(const RecoveryInfo &info,
capnp::RecoveryInfo::Builder *builder) {
builder->setDurabilityVersion(info.durability_version);
builder->setSnapshotTxId(info.snapshot_tx_id);
auto list_builder = builder->initWalRecovered(info.wal_recovered.size());
utils::SaveVector(info.wal_recovered, &list_builder);
}
inline void Load(RecoveryInfo *info,
const capnp::RecoveryInfo::Reader &reader) {
info->durability_version = reader.getDurabilityVersion();
info->snapshot_tx_id = reader.getSnapshotTxId();
auto list_reader = reader.getWalRecovered();
utils::LoadVector(&info->wal_recovered, list_reader);
}
inline void Save(const RecoveryData &data,
capnp::RecoveryData::Builder *builder) {
builder->setSnapshooterTxId(data.snapshooter_tx_id);
{
auto list_builder =
builder->initWalTxToRecover(data.wal_tx_to_recover.size());
utils::SaveVector(data.wal_tx_to_recover, &list_builder);
}
{
auto list_builder =
builder->initSnapshooterTxSnapshot(data.snapshooter_tx_snapshot.size());
utils::SaveVector(data.snapshooter_tx_snapshot, &list_builder);
}
{
auto list_builder = builder->initIndexes(data.indexes.size());
utils::SaveVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
data.indexes, &list_builder, [](auto *builder, const auto value) {
builder->setFirst(value.first);
builder->setSecond(value.second);
});
}
}
inline void Load(RecoveryData *data,
const capnp::RecoveryData::Reader &reader) {
data->snapshooter_tx_id = reader.getSnapshooterTxId();
{
auto list_reader = reader.getWalTxToRecover();
utils::LoadVector(&data->wal_tx_to_recover, list_reader);
}
{
auto list_reader = reader.getSnapshooterTxSnapshot();
utils::LoadVector(&data->snapshooter_tx_snapshot, list_reader);
}
{
auto list_reader = reader.getIndexes();
utils::LoadVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
std::pair<std::string, std::string>>(
&data->indexes, list_reader, [](const auto &reader) {
return std::make_pair(reader.getFirst(), reader.getSecond());
});
}
}
} // namespace durability

View File

@ -1,14 +1,14 @@
#include "durability/snapshooter.hpp"
#include "durability/distributed/snapshooter.hpp"
#include <algorithm>
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "durability/distributed/snapshot_encoder.hpp"
#include "durability/distributed/version.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/snapshot_encoder.hpp"
#include "durability/version.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;

View File

@ -3,7 +3,7 @@
#include <experimental/optional>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/snapshot_value.hpp"
#include "durability/distributed/snapshot_value.hpp"
namespace durability {

View File

@ -5,9 +5,10 @@
#include <vector>
#include "communication/bolt/v1/value.hpp"
// TODO: WTF is this doing here?
#include "query/typed_value.hpp"
#include "storage/address_types.hpp"
#include "storage/property_value.hpp"
#include "storage/common/property_value.hpp"
#include "storage/distributed/address_types.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"

View File

@ -1,4 +1,4 @@
#include "database/state_delta.hpp"
#include "durability/distributed/state_delta.hpp"
#include <string>

View File

@ -3,13 +3,12 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/state_delta.capnp.h"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
#include "storage/property_value.hpp"
#include "storage/serialization.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/gid.hpp"
cpp<#
(lcp:namespace database)
@ -20,7 +19,7 @@ cpp<#
(lcp:capnp-namespace "database")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
(lcp:capnp-type-conversion "gid::Gid" "UInt64")

View File

@ -0,0 +1,161 @@
#include "durability/distributed/wal.hpp"
#include "durability/distributed/version.hpp"
#include "durability/paths.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"
DEFINE_HIDDEN_int32(
wal_flush_interval_millis, 2,
"Interval between two write-ahead log flushes, in milliseconds.");
DEFINE_HIDDEN_int32(
wal_rotate_deltas_count, 10000,
"How many write-ahead deltas should be stored in a single WAL file "
"before rotating it.");
DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
"Write-ahead log buffer size.",
FLAG_IN_RANGE(1, 1 << 30));
namespace durability {
WriteAheadLog::WriteAheadLog(
int worker_id, const std::experimental::filesystem::path &durability_dir,
bool durability_enabled, bool synchronous_commit)
: deltas_{FLAGS_wal_buffer_size},
wal_file_{worker_id, durability_dir},
durability_enabled_(durability_enabled),
synchronous_commit_(synchronous_commit) {
if (durability_enabled_) {
utils::CheckDir(durability_dir);
}
}
WriteAheadLog::~WriteAheadLog() {
if (durability_enabled_) {
if (!synchronous_commit_) scheduler_.Stop();
wal_file_.Flush(deltas_);
}
}
WriteAheadLog::WalFile::WalFile(
int worker_id, const std::experimental::filesystem::path &durability_dir)
: worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {}
WriteAheadLog::WalFile::~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
}
void WriteAheadLog::WalFile::Init() {
if (!utils::EnsureDir(wal_dir_)) {
LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
current_wal_file_ = std::experimental::filesystem::path();
} else {
current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
// TODO: Fix error handling, the encoder_ returns `true` or `false`.
try {
writer_.Open(current_wal_file_);
encoder_.WriteRAW(durability::kWalMagic.data(),
durability::kWalMagic.size());
encoder_.WriteInt(durability::kVersion);
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to open write-ahead log file: "
<< current_wal_file_;
current_wal_file_ = std::experimental::filesystem::path();
}
}
latest_tx_ = 0;
current_wal_file_delta_count_ = 0;
}
void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
std::lock_guard<std::mutex> flush_lock(flush_mutex_);
if (current_wal_file_.empty()) {
LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
buffer.clear();
return;
}
try {
while (true) {
auto delta = buffer.pop();
if (!delta) break;
latest_tx_ = std::max(latest_tx_, delta->transaction_id);
delta->Encode(writer_, encoder_);
writer_.Flush();
if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count)
RotateFile();
}
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to write to write-ahead log, discarding data.";
buffer.clear();
return;
} catch (std::experimental::filesystem::filesystem_error &) {
LOG(ERROR) << "Failed to rotate write-ahead log.";
buffer.clear();
return;
}
}
void WriteAheadLog::WalFile::RotateFile() {
writer_.Flush();
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,
WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_));
Init();
}
void WriteAheadLog::Init() {
if (durability_enabled_) {
enabled_ = true;
wal_file_.Init();
if (!synchronous_commit_) {
scheduler_.Run("WAL",
std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(deltas_); });
}
}
}
void WriteAheadLog::Emplace(const database::StateDelta &delta) {
if (durability_enabled_ && enabled_) {
deltas_.emplace(delta);
if (synchronous_commit_ && IsStateDeltaTransactionEnd(delta)) {
wal_file_.Flush(deltas_);
}
}
}
bool WriteAheadLog::IsStateDeltaTransactionEnd(
const database::StateDelta &delta) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
return true;
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::ADD_OUT_EDGE:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::ADD_IN_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE:
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
case database::StateDelta::Type::ADD_LABEL:
case database::StateDelta::Type::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
return false;
}
}
void WriteAheadLog::Flush() {
if (enabled_) {
wal_file_.Flush(deltas_);
}
}
} // namespace durability

View File

@ -9,10 +9,10 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "data_structures/ring_buffer.hpp"
#include "database/state_delta.hpp"
#include "storage/gid.hpp"
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "durability/distributed/state_delta.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/gid.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"

View File

@ -0,0 +1,503 @@
#include "durability/single_node/recovery.hpp"
#include <experimental/filesystem>
#include <limits>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/snapshot_decoder.hpp"
#include "durability/single_node/snapshot_value.hpp"
#include "durability/single_node/version.hpp"
#include "durability/single_node/wal.hpp"
#include "glue/communication.hpp"
// TODO: WTF is typed value doing here?!
#include "query/typed_value.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/indexes/label_property_index.hpp"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
using communication::bolt::Value;
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash) {
auto pos = buffer.Tellg();
auto offset = sizeof(vertex_count) + sizeof(edge_count) + sizeof(hash);
buffer.Seek(-offset, std::ios_base::end);
bool r_val = buffer.ReadType(vertex_count, false) &&
buffer.ReadType(edge_count, false) &&
buffer.ReadType(hash, false);
buffer.Seek(pos);
return r_val;
}
bool VersionConsistency(const fs::path &durability_dir) {
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue;
for (const auto &file : fs::directory_iterator(recovery_dir)) {
HashedFileReader reader;
SnapshotDecoder<HashedFileReader> decoder(reader);
// The following checks are ok because we are only trying to detect
// version inconsistencies.
if (!reader.Open(fs::path(file))) continue;
std::array<uint8_t, 4> target_magic_number =
(durability_type == kSnapshotDir) ? durability::kSnapshotMagic
: durability::kWalMagic;
std::array<uint8_t, 4> magic_number;
if (!reader.Read(magic_number.data(), magic_number.size())) continue;
if (magic_number != target_magic_number) continue;
if (reader.EndOfFile()) continue;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
}
}
return true;
}
bool DistributedVersionConsistency(const int64_t master_version) {
return durability::kVersion == master_version;
}
bool ContainsDurabilityFiles(const fs::path &durability_dir) {
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (fs::exists(recovery_dir) && fs::is_directory(recovery_dir) &&
!fs::is_empty(recovery_dir))
return true;
}
return false;
}
void MoveToBackup(const fs::path &durability_dir) {
auto backup_dir = durability_dir / kBackupDir;
utils::CheckDir(backup_dir);
utils::CheckDir(backup_dir / kSnapshotDir);
utils::CheckDir(backup_dir / kWalDir);
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue;
for (const auto &file : fs::directory_iterator(recovery_dir)) {
auto filename = fs::path(file).filename();
fs::rename(file, backup_dir / durability_type / filename);
}
}
}
namespace {
using communication::bolt::Value;
#define RETURN_IF_NOT(condition) \
if (!(condition)) { \
reader.Close(); \
return false; \
}
bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
RecoveryData *recovery_data, int worker_id) {
HashedFileReader reader;
SnapshotDecoder<HashedFileReader> decoder(reader);
RETURN_IF_NOT(reader.Open(snapshot_file));
auto magic_number = durability::kSnapshotMagic;
reader.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kSnapshotMagic);
// Read the vertex and edge count, and the hash, from the end of the snapshot.
int64_t vertex_count;
int64_t edge_count;
uint64_t hash;
RETURN_IF_NOT(
durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash));
Value dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == durability::kVersion);
// Checks worker id was set correctly
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == worker_id);
// Vertex and edge generator ids
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
uint64_t vertex_generator_cnt = dv.ValueInt();
db->storage().VertexGenerator().SetId(std::max(
db->storage().VertexGenerator().LocalCount(), vertex_generator_cnt));
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
uint64_t edge_generator_cnt = dv.ValueInt();
db->storage().EdgeGenerator().SetId(
std::max(db->storage().EdgeGenerator().LocalCount(), edge_generator_cnt));
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
recovery_data->snapshooter_tx_id = dv.ValueInt();
// Transaction snapshot of the transaction that created the snapshot.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
for (const auto &value : dv.ValueList()) {
RETURN_IF_NOT(value.IsInt());
recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt());
}
// A list of label+property indexes.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
auto index_value = dv.ValueList();
for (auto it = index_value.begin(); it != index_value.end();) {
auto label = *it++;
RETURN_IF_NOT(it != index_value.end());
auto property = *it++;
RETURN_IF_NOT(label.IsString() && property.IsString());
recovery_data->indexes.emplace_back(label.ValueString(),
property.ValueString());
}
auto dba = db->Access();
std::unordered_map<gid::Gid,
std::pair<storage::VertexAddress, storage::VertexAddress>>
edge_gid_endpoints_mapping;
for (int64_t i = 0; i < vertex_count; ++i) {
auto vertex = decoder.ReadSnapshotVertex();
RETURN_IF_NOT(vertex);
auto vertex_accessor = dba->InsertVertex(vertex->gid, vertex->cypher_id);
for (const auto &label : vertex->labels) {
vertex_accessor.add_label(dba->Label(label));
}
for (const auto &property_pair : vertex->properties) {
vertex_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
auto vertex_record = vertex_accessor.GetNew();
for (const auto &edge : vertex->in) {
vertex_record->in_.emplace(edge.vertex, edge.address,
dba->EdgeType(edge.type));
edge_gid_endpoints_mapping[edge.address.gid()] = {
edge.vertex, vertex_accessor.GlobalAddress()};
}
for (const auto &edge : vertex->out) {
vertex_record->out_.emplace(edge.vertex, edge.address,
dba->EdgeType(edge.type));
edge_gid_endpoints_mapping[edge.address.gid()] = {
vertex_accessor.GlobalAddress(), edge.vertex};
}
}
auto vertex_transform_to_local_if_possible =
[&dba, worker_id](storage::VertexAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local apperance
if (address.worker_id() == worker_id) {
address = storage::VertexAddress(
dba->db().storage().LocalAddress<Vertex>(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
auto edge_transform_to_local_if_possible =
[&dba, worker_id](storage::EdgeAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local apperance
if (address.worker_id() == worker_id) {
address = storage::EdgeAddress(
dba->db().storage().LocalAddress<Edge>(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
Value dv_cypher_id;
for (int64_t i = 0; i < edge_count; ++i) {
RETURN_IF_NOT(
decoder.ReadValue(&dv, communication::bolt::Value::Type::Edge));
auto &edge = dv.ValueEdge();
// Read cypher_id
RETURN_IF_NOT(decoder.ReadValue(&dv_cypher_id,
communication::bolt::Value::Type::Int));
auto cypher_id = dv_cypher_id.ValueInt();
// We have to take full edge endpoints from vertices since the endpoints
// found here don't containt worker_id, and this can't be changed since this
// edges must be bolt-compliant
auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id.AsUint()];
storage::VertexAddress from;
storage::VertexAddress to;
std::tie(from, to) = edge_endpoints;
// From and to are written in the global_address format and we should
// convert them back to local format for speedup - if possible
vertex_transform_to_local_if_possible(from);
vertex_transform_to_local_if_possible(to);
auto edge_accessor = dba->InsertOnlyEdge(from, to, dba->EdgeType(edge.type),
edge.id.AsUint(), cypher_id);
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
reader.ReadType(vertex_count);
reader.ReadType(edge_count);
if (!reader.Close() || reader.hash() != hash) {
dba->Abort();
return false;
}
// We have to replace global_ids with local ids where possible for all edges
// in every vertex and this can only be done after we inserted the edges; this
// is to speedup execution
for (auto &vertex_accessor : dba->Vertices(true)) {
auto vertex = vertex_accessor.GetNew();
auto iterate_and_transform =
[vertex_transform_to_local_if_possible,
edge_transform_to_local_if_possible](Edges &edges) {
Edges transformed;
for (auto &element : edges) {
auto vertex = element.vertex;
vertex_transform_to_local_if_possible(vertex);
auto edge = element.edge;
edge_transform_to_local_if_possible(edge);
transformed.emplace(vertex, edge, element.edge_type);
}
return transformed;
};
vertex->in_ = iterate_and_transform(vertex->in_);
vertex->out_ = iterate_and_transform(vertex->out_);
}
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::TransactionId max_id = recovery_data->snapshooter_tx_id;
auto &snap = recovery_data->snapshooter_tx_snapshot;
if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
dba->db().tx_engine().EnsureNextIdGreater(max_id);
dba->Commit();
return true;
}
#undef RETURN_IF_NOT
std::vector<fs::path> GetWalFiles(const fs::path &wal_dir) {
// Get paths to all the WAL files and sort them (on date).
std::vector<fs::path> wal_files;
if (!fs::exists(wal_dir)) return {};
for (auto &wal_file : fs::directory_iterator(wal_dir))
wal_files.emplace_back(wal_file);
std::sort(wal_files.begin(), wal_files.end());
return wal_files;
}
bool ApplyOverDeltas(
const std::vector<fs::path> &wal_files, tx::TransactionId first_to_recover,
const std::function<void(const database::StateDelta &)> &f) {
for (auto &wal_file : wal_files) {
auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename());
if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue;
HashedFileReader wal_reader;
if (!wal_reader.Open(wal_file)) return false;
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
auto magic_number = durability::kWalMagic;
wal_reader.Read(magic_number.data(), magic_number.size());
if (magic_number != durability::kWalMagic) return false;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
while (true) {
auto delta = database::StateDelta::Decode(wal_reader, decoder);
if (!delta) break;
f(*delta);
}
}
return true;
}
auto FirstWalTxToRecover(const RecoveryData &recovery_data) {
auto &tx_sn = recovery_data.snapshooter_tx_snapshot;
auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1
: *std::min(tx_sn.begin(), tx_sn.end());
return first_to_recover;
}
std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
const fs::path &wal_dir, database::GraphDb *db,
const RecoveryData &recovery_data) {
auto wal_files = GetWalFiles(wal_dir);
std::unordered_set<tx::TransactionId> committed_set;
auto first_to_recover = FirstWalTxToRecover(recovery_data);
ApplyOverDeltas(
wal_files, first_to_recover, [&](const database::StateDelta &delta) {
if (delta.transaction_id >= first_to_recover &&
delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
committed_set.insert(delta.transaction_id);
}
});
std::vector<tx::TransactionId> committed_tx_ids(committed_set.size());
for (auto id : committed_set) committed_tx_ids.push_back(id);
return committed_tx_ids;
}
} // anonymous namespace
RecoveryInfo RecoverOnlySnapshot(
const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
if (required_snapshot_tx_id) {
auto snapshot_file_tx_id =
TransactionIdFromSnapshotFilename(snapshot_file);
if (!snapshot_file_tx_id ||
snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
LOG(INFO) << "Skipping snapshot file '" << snapshot_file
<< "' because it does not match the required snapshot tx id: "
<< *required_snapshot_tx_id;
continue;
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data, worker_id)) {
db->ReinitializeStorage();
recovery_data->Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;
} else {
LOG(INFO) << "Snapshot recovery successful.";
break;
}
}
// If snapshot recovery is required, and we failed, don't even deal with
// the WAL recovery.
if (required_snapshot_tx_id &&
recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
return {durability::kVersion, recovery_data->snapshooter_tx_id,
ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
*recovery_data)};
}
// TODO - finer-grained recovery feedback could be useful here.
void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
auto wal_dir = durability_dir / kWalDir;
auto wal_files = GetWalFiles(wal_dir);
// Track which transaction should be recovered first, and define logic for
// which transactions should be skipped in recovery.
auto &tx_sn = recovery_data->snapshooter_tx_snapshot;
auto first_to_recover = FirstWalTxToRecover(*recovery_data);
// Set of transactions which can be recovered, since not every transaction in
// wal can be recovered because it might not be present on some workers (there
// wasn't enough time for it to flush to disk or similar)
std::unordered_set<tx::TransactionId> common_wal_tx;
for (auto tx_id : recovery_data->wal_tx_to_recover)
common_wal_tx.insert(tx_id);
auto should_skip = [&tx_sn, recovery_data, &common_wal_tx,
first_to_recover](tx::TransactionId tx_id) {
return tx_id < first_to_recover ||
(tx_id < recovery_data->snapshooter_tx_id &&
!utils::Contains(tx_sn, tx_id)) ||
!utils::Contains(common_wal_tx, tx_id);
};
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::TransactionId max_observed_tx_id{0};
// Read all the WAL files whose max_tx_id is not smaller than
// min_tx_to_recover.
ApplyOverDeltas(
wal_files, first_to_recover, [&](const database::StateDelta &delta) {
max_observed_tx_id = std::max(max_observed_tx_id, delta.transaction_id);
if (should_skip(delta.transaction_id)) return;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
transactions->Begin(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_ABORT:
transactions->Abort(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_COMMIT:
transactions->Commit(delta.transaction_id);
break;
case database::StateDelta::Type::BUILD_INDEX:
// TODO index building might still be problematic in HA
recovery_data->indexes.emplace_back(delta.label_name,
delta.property_name);
break;
default:
transactions->Apply(delta);
}
});
// TODO when implementing proper error handling return one of the following:
// - WAL fully recovered
// - WAL partially recovered
// - WAL recovery error
db->tx_engine().EnsureNextIdGreater(max_observed_tx_id);
}
void RecoverIndexes(
database::GraphDb *db,
const std::vector<std::pair<std::string, std::string>> &indexes) {
auto db_accessor_indices = db->Access();
for (const auto &label_prop : indexes) {
const database::LabelPropertyIndex::Key key{
db_accessor_indices->Label(label_prop.first),
db_accessor_indices->Property(label_prop.second)};
db_accessor_indices->db().storage().label_property_index().CreateIndex(key);
db_accessor_indices->PopulateIndex(key);
db_accessor_indices->EnableIndex(key);
}
db_accessor_indices->Commit();
}
} // namespace durability

View File

@ -0,0 +1,135 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include <unordered_map>
#include <vector>
#include "durability/hashed_file_reader.hpp"
#include "durability/single_node/state_delta.hpp"
#include "transactions/type.hpp"
namespace database {
class GraphDb;
};
namespace durability {
/// Stores info on what was (or needs to be) recovered from durability.
struct RecoveryInfo {
  RecoveryInfo() {}
  RecoveryInfo(const int64_t durability_version,
               tx::TransactionId snapshot_tx_id,
               const std::vector<tx::TransactionId> &wal_recovered)
      : durability_version(durability_version),
        snapshot_tx_id(snapshot_tx_id),
        wal_recovered(wal_recovered) {}

  // In-class initializers so that a default-constructed RecoveryInfo has
  // defined state; previously operator== on such an object read
  // indeterminate values (undefined behavior).
  int64_t durability_version{0};
  tx::TransactionId snapshot_tx_id{0};
  std::vector<tx::TransactionId> wal_recovered;

  bool operator==(const RecoveryInfo &other) const {
    return durability_version == other.durability_version &&
           snapshot_tx_id == other.snapshot_tx_id &&
           wal_recovered == other.wal_recovered;
  }
  bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }
};
// A data structure for exchanging info between main recovery function and
// snapshot and WAL recovery functions.
struct RecoveryData {
  // Id of the transaction that made the snapshot (0 when none recovered).
  tx::TransactionId snapshooter_tx_id{0};
  // Transactions that still need to be applied from the WAL.
  std::vector<tx::TransactionId> wal_tx_to_recover{};
  // Transactional snapshot (set of live transactions) of the snapshooter.
  std::vector<tx::TransactionId> snapshooter_tx_snapshot;
  // A collection into which the indexes should be added so they
  // can be rebuilt at the end of the recovery transaction.
  std::vector<std::pair<std::string, std::string>> indexes;

  void Clear() {
    // NOTE(review): wal_tx_to_recover is deliberately(?) not cleared here --
    // confirm before reusing a RecoveryData instance across recoveries.
    snapshooter_tx_id = 0;
    snapshooter_tx_snapshot.clear();
    indexes.clear();
  }
};
/** Reads snapshot metadata from the end of the file without messing up the
* hash. */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Checks version consistency within the durability directory.
*
* @param durability_dir - Path to durability directory.
* @return - True if snapshot and WAL versions are compatible with
 *           current memgraph binary.
*/
bool VersionConsistency(
const std::experimental::filesystem::path &durability_dir);
/**
* Checks whether the current memgraph binary (on a worker) is
* version consistent with the cluster master.
*
* @param master_version - Version of the master.
* @return - True if versions match.
*/
bool DistributedVersionConsistency(const int64_t master_version);
/**
* Checks whether the durability directory contains snapshot
* or write-ahead log file.
*
* @param durability_dir - Path to durability directory.
* @return - True if durability directory contains either a snapshot
* or WAL file.
*/
bool ContainsDurabilityFiles(
const std::experimental::filesystem::path &durabilty_dir);
/**
* Backup snapshots and WAL files to a backup folder.
*
* @param durability_dir - Path to durability directory.
*/
void MoveToBackup(const std::experimental::filesystem::path &durability_dir);
/**
* Recovers database from the latest possible snapshot. If recovering fails,
* false is returned and db_accessor aborts transaction, else true is returned
* and transaction is commited.
*
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @param required_snapshot_tx_id - Only used on distributed worker. Indicates
* what the master recovered. The same snapshot must be recovered on the
* worker.
* @return - recovery info
*/
RecoveryInfo RecoverOnlySnapshot(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, durability::RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id,
int worker_id);
/** Interface for accessing transactions during WAL recovery. */
class RecoveryTransactions {
 public:
  virtual ~RecoveryTransactions() {}

  // Begins / aborts / commits the transaction with the given id.
  virtual void Begin(const tx::TransactionId &) = 0;
  virtual void Abort(const tx::TransactionId &) = 0;
  virtual void Commit(const tx::TransactionId &) = 0;
  // Applies a single WAL StateDelta inside its transaction.
  virtual void Apply(const database::StateDelta &) = 0;
};
void RecoverWal(const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, RecoveryData *recovery_data,
RecoveryTransactions *transactions);
void RecoverIndexes(
database::GraphDb *db,
const std::vector<std::pair<std::string, std::string>> &indexes);
} // namespace durability

View File

@ -0,0 +1,142 @@
#include "durability/single_node/snapshooter.hpp"
#include <algorithm>
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/single_node/snapshot_encoder.hpp"
#include "durability/single_node/version.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
// Snapshot layout is described in durability/version.hpp
static_assert(durability::kVersion == 6,
"Wrong snapshot version, please update!");
namespace {
// Writes a full snapshot of `db` (as seen through `dba`) to `snapshot_file`.
// Returns true on success. On an I/O failure the partially written file is
// removed and false is returned. The write order here defines the snapshot
// layout (see durability/single_node/version.hpp) and feeds the running hash,
// so it must not be reordered.
bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
            database::GraphDbAccessor &dba, int worker_id) {
  try {
    HashedFileWriter buffer(snapshot_file);
    SnapshotEncoder<HashedFileWriter> encoder(buffer);
    int64_t vertex_num = 0, edge_num = 0;

    encoder.WriteRAW(durability::kSnapshotMagic.data(),
                     durability::kSnapshotMagic.size());
    encoder.WriteInt(durability::kVersion);

    // Writes the worker id to snapshot, used to guarantee consistent cluster
    // state after recovery
    encoder.WriteInt(worker_id);
    // Write the number of generated vertex and edges, used to recover
    // generators internal states
    encoder.WriteInt(db.storage().VertexGenerator().LocalCount());
    encoder.WriteInt(db.storage().EdgeGenerator().LocalCount());

    // Write the ID of the transaction doing the snapshot.
    encoder.WriteInt(dba.transaction_id());

    // Write the transaction snapshot into the snapshot. It's used when
    // recovering from the combination of snapshot and write-ahead-log.
    {
      std::vector<communication::bolt::Value> tx_snapshot;
      for (int64_t tx : dba.transaction().snapshot())
        tx_snapshot.emplace_back(tx);
      encoder.WriteList(tx_snapshot);
    }

    // Write label+property indexes as list ["label", "property", ...]
    {
      std::vector<communication::bolt::Value> index_vec;
      for (const auto &key : dba.GetIndicesKeys()) {
        index_vec.emplace_back(dba.LabelName(key.label_));
        index_vec.emplace_back(dba.PropertyName(key.property_));
      }
      encoder.WriteList(index_vec);
    }

    // Vertices are written with their edges inlined (see SnapshotEncoder),
    // then all edges with full data.
    for (const auto &vertex : dba.Vertices(false)) {
      encoder.WriteSnapshotVertex(vertex);
      vertex_num++;
    }
    for (const auto &edge : dba.Edges(false)) {
      encoder.WriteEdge(glue::ToBoltEdge(edge));
      encoder.WriteInt(edge.CypherId());
      edge_num++;
    }
    // Snapshot summary: counts and the accumulated hash, read back by
    // ReadSnapshotSummary during recovery.
    buffer.WriteValue(vertex_num);
    buffer.WriteValue(edge_num);
    buffer.WriteValue(buffer.hash());
    buffer.Close();
  } catch (const std::ifstream::failure &) {
    // NOTE(review): assumes HashedFileWriter reports errors via
    // std::ios_base::failure -- confirm against its implementation.
    if (fs::exists(snapshot_file) && !fs::remove(snapshot_file)) {
      LOG(ERROR) << "Error while removing corrupted snapshot file: "
                 << snapshot_file;
    }
    return false;
  }
  return true;
}
// Removes snapshot files so that only `max_retained` latest ones are kept. If
// `max_retained == -1`, all the snapshots are retained.
void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) {
  if (max_retained == -1) return;
  std::vector<fs::path> files;
  for (const auto &file : fs::directory_iterator(snapshot_dir))
    files.push_back(file.path());
  if (static_cast<int>(files.size()) <= max_retained) return;
  // Snapshot filenames sort chronologically, so after sorting the newest
  // `max_retained` entries are at the back. Use std::sort explicitly
  // instead of relying on ADL to find it.
  std::sort(files.begin(), files.end());
  for (int i = 0; i < static_cast<int>(files.size()) - max_retained; ++i) {
    if (!fs::remove(files[i])) {
      LOG(ERROR) << "Error while removing file: " << files[i];
    }
  }
}
// Removes write-ahead log files that are no longer necessary (they don't get
// used when recovering from the latest snapshot).
void RemoveOldWals(const fs::path &wal_dir,
                   const tx::Transaction &snapshot_transaction) {
  if (!fs::exists(wal_dir)) return;
  // We can remove all the WAL files that will not be used when restoring from
  // the snapshot created in the given transaction.
  // If the snapshot transaction saw no live transactions, everything up to and
  // including its own id is obsolete; otherwise the oldest live transaction
  // bounds what must be kept.
  auto min_trans_id = snapshot_transaction.snapshot().empty()
                          ? snapshot_transaction.id_ + 1
                          : snapshot_transaction.snapshot().front();
  for (auto &wal_file : fs::directory_iterator(wal_dir)) {
    // Filename encodes the max transaction id contained in the file.
    auto tx_id = TransactionIdFromWalFilename(wal_file.path().filename());
    if (tx_id && tx_id.value() < min_trans_id) {
      bool result = fs::remove(wal_file);
      DCHECK(result) << "Unable to delete old wal file: " << wal_file;
    }
  }
}
} // namespace
// Creates a snapshot of `db` in the durability directory and prunes old
// snapshots/WALs on success. Returns false if the snapshot directory can't be
// created, the target file already exists, or encoding fails.
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
                  int worker_id, const fs::path &durability_dir,
                  int snapshot_max_retained) {
  const auto snapshot_dir = durability_dir / kSnapshotDir;
  if (!utils::EnsureDir(snapshot_dir)) return false;
  const auto snapshot_file =
      MakeSnapshotPath(durability_dir, worker_id, dba.transaction_id());
  if (fs::exists(snapshot_file)) return false;
  if (!Encode(snapshot_file, db, dba, worker_id)) {
    // Encoding failed; best-effort cleanup of the partial file.
    std::error_code ignored;  // Just for exception suppression.
    fs::remove(snapshot_file, ignored);
    return false;
  }
  RemoveOldSnapshots(snapshot_dir, snapshot_max_retained);
  RemoveOldWals(durability_dir / kWalDir, dba.transaction());
  return true;
}
} // namespace durability

View File

@ -0,0 +1,21 @@
#pragma once
#include <experimental/filesystem>
#include "database/graph_db.hpp"
namespace durability {
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db - database for which we are creating a snapshot
 * @param dba - db accessor with which we are creating a snapshot (reading data)
 * @param worker_id - id of the worker making the snapshot; it is written into
 *        the snapshot header
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
int worker_id,
const std::experimental::filesystem::path &durability_dir,
int snapshot_max_retained);
} // namespace durability

View File

@ -0,0 +1,105 @@
#pragma once
#include <experimental/optional>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/single_node/snapshot_value.hpp"
namespace durability {
// Bolt decoder specialization that reads vertices (with inlined edges) in the
// format produced by SnapshotEncoder. Read order must mirror the encoder's
// write order exactly.
template <typename Buffer>
class SnapshotDecoder : public communication::bolt::Decoder<Buffer> {
 public:
  explicit SnapshotDecoder(Buffer &buffer)
      : communication::bolt::Decoder<Buffer>(buffer) {}

  // Reads one snapshot vertex (id, labels, properties, cypher_id and its
  // inlined in/out edges). Returns nullopt on any decoding failure.
  std::experimental::optional<SnapshotVertex> ReadSnapshotVertex() {
    communication::bolt::Value dv;
    SnapshotVertex vertex;

    // Read global id, labels and properties of the vertex
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Vertex)) {
      DLOG(WARNING) << "Unable to read snapshot vertex";
      return std::experimental::nullopt;
    }
    auto &read_vertex = dv.ValueVertex();
    vertex.gid = read_vertex.id.AsUint();
    vertex.labels = read_vertex.labels;
    vertex.properties = read_vertex.properties;

    // Read cypher_id
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Int)) {
      DLOG(WARNING) << "Unable to read vertex cypher_id";
      return std::experimental::nullopt;
    }
    vertex.cypher_id = dv.ValueInt();

    // Read in edges: a count followed by that many inlined edges.
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Int)) {
      DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in "
                       "edges in vertex!";
      return std::experimental::nullopt;
    }
    for (int i = 0; i < dv.ValueInt(); ++i) {
      auto edge = ReadSnapshotEdge();
      if (!edge) return std::experimental::nullopt;
      vertex.in.emplace_back(*edge);
    }

    // Read out edges: same count-then-edges layout.
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Int)) {
      DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out "
                       "edges in vertex!";
      return std::experimental::nullopt;
    }
    for (int i = 0; i < dv.ValueInt(); ++i) {
      auto edge = ReadSnapshotEdge();
      if (!edge) return std::experimental::nullopt;
      vertex.out.emplace_back(*edge);
    }

    VLOG(20) << "[ReadSnapshotVertex] Success";
    return vertex;
  }

 private:
  // Reads one inlined edge (edge address, other-endpoint address, type
  // string) without properties. Returns nullopt on failure.
  std::experimental::optional<InlinedVertexEdge> ReadSnapshotEdge() {
    communication::bolt::Value dv;
    InlinedVertexEdge edge;

    VLOG(20) << "[ReadSnapshotEdge] Start";

    // Read global id of this edge
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Int)) {
      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read Global ID!";
      return std::experimental::nullopt;
    }
    edge.address = storage::EdgeAddress(static_cast<uint64_t>(dv.ValueInt()));

    // Read global vertex id of the other side of the edge
    // (global id of from/to vertexes).
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::Int)) {
      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from/to address!";
      return std::experimental::nullopt;
    }
    edge.vertex = storage::VertexAddress(static_cast<uint64_t>(dv.ValueInt()));

    // Read edge type
    if (!communication::bolt::Decoder<Buffer>::ReadValue(
            &dv, communication::bolt::Value::Type::String)) {
      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!";
      return std::experimental::nullopt;
    }
    edge.type = dv.ValueString();

    VLOG(20) << "[ReadSnapshotEdge] Success";
    return edge;
  }
};
}; // namespace durability

View File

@ -0,0 +1,58 @@
#pragma once
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "glue/communication.hpp"
#include "utils/cast.hpp"
namespace durability {
// Bolt encoder specialization that writes vertices with their edges inlined
// (ids/addresses and type only, no properties). SnapshotDecoder must read in
// the same order.
template <typename Buffer>
class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> {
 public:
  explicit SnapshotEncoder(Buffer &buffer)
      : communication::bolt::BaseEncoder<Buffer>(buffer) {}

  // Writes the vertex (as a Bolt vertex), its cypher_id, then its in-edges
  // and out-edges, each prefixed by a count.
  void WriteSnapshotVertex(const VertexAccessor &vertex) {
    communication::bolt::BaseEncoder<Buffer>::WriteVertex(
        glue::ToBoltVertex(vertex));

    // Write cypher_id
    this->WriteInt(vertex.CypherId());

    // Write in edges without properties
    this->WriteUInt(vertex.in_degree());
    auto edges_in = vertex.in();
    for (const auto &edge : edges_in) {
      this->WriteSnapshotEdge(edge, true);
    }

    // Write out edges without properties
    this->WriteUInt(vertex.out_degree());
    auto edges_out = vertex.out();
    for (const auto &edge : edges_out) {
      this->WriteSnapshotEdge(edge, false);
    }
  }

 private:
  // Writes an unsigned value through the int writer by reinterpreting the
  // bits (no range check).
  void WriteUInt(const uint64_t &value) {
    this->WriteInt(utils::MemcpyCast<int64_t>(value));
  }

  // Writes edge without properties: its global address, the other endpoint's
  // address (from-vertex for in-edges, to-vertex for out-edges) and its type.
  void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) {
    // Write global id of the edge
    WriteUInt(edge.GlobalAddress().raw());

    // Write to/from global id
    if (write_from)
      WriteUInt(edge.from().GlobalAddress().raw());
    else
      WriteUInt(edge.to().GlobalAddress().raw());

    // Write type
    this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
  }
};
} // namespace durability

View File

@ -0,0 +1,46 @@
#pragma once
#include <map>
#include <string>
#include <vector>
#include "communication/bolt/v1/value.hpp"
// TODO: WTF is this doing here?
#include "query/typed_value.hpp"
#include "storage/common/property_value.hpp"
#include "storage/single_node/address_types.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
namespace durability {
/** Forward declaration of SnapshotEdge. */
struct InlinedVertexEdge;
/**
 * Structure used when reading a Vertex with the decoder.
 * The decoder writes data into this structure.
 */
struct SnapshotVertex {
  gid::Gid gid;
  int64_t cypher_id;
  std::vector<std::string> labels;
  std::map<std::string, communication::bolt::Value> properties;
  // Vector of edges without properties
  std::vector<InlinedVertexEdge> in;
  std::vector<InlinedVertexEdge> out;
};
/**
 * Structure used when reading an Edge with the snapshot decoder.
 * The decoder writes data into this structure.
 */
struct InlinedVertexEdge {
  // Addresses down below must always be global_address and never direct
  // pointers to a record.
  storage::EdgeAddress address;
  // The other endpoint of the edge (from-vertex for in-edges, to-vertex for
  // out-edges).
  storage::VertexAddress vertex;
  // Edge type name.
  std::string type;
};
} // namespace durability

View File

@ -0,0 +1,411 @@
#include "durability/single_node/state_delta.hpp"
#include <string>
#include "communication/bolt/v1/value.hpp"
#include "database/graph_db_accessor.hpp"
#include "glue/communication.hpp"
namespace database {
// Creates a TRANSACTION_BEGIN delta for the given transaction.
StateDelta StateDelta::TxBegin(tx::TransactionId tx_id) {
  return {StateDelta::Type::TRANSACTION_BEGIN, tx_id};
}
// Creates a TRANSACTION_COMMIT delta for the given transaction.
StateDelta StateDelta::TxCommit(tx::TransactionId tx_id) {
  return {StateDelta::Type::TRANSACTION_COMMIT, tx_id};
}
// Creates a TRANSACTION_ABORT delta for the given transaction.
StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) {
  return {StateDelta::Type::TRANSACTION_ABORT, tx_id};
}
// Creates a CREATE_VERTEX delta carrying the new vertex's gid and cypher id.
StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
                                    int64_t cypher_id) {
  StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id);
  op.vertex_id = vertex_id;
  op.cypher_id = cypher_id;
  return op;
}
// Creates a CREATE_EDGE delta with endpoint gids and the edge type (stored
// both as id and as name for recovery).
StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
                                  int64_t cypher_id, gid::Gid vertex_from_id,
                                  gid::Gid vertex_to_id,
                                  storage::EdgeType edge_type,
                                  const std::string &edge_type_name) {
  StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id);
  op.edge_id = edge_id;
  op.cypher_id = cypher_id;
  op.vertex_from_id = vertex_from_id;
  op.vertex_to_id = vertex_to_id;
  op.edge_type = edge_type;
  op.edge_type_name = edge_type_name;
  return op;
}
// Creates an ADD_OUT_EDGE delta; both addresses must be global (remote) since
// the WAL cannot store process-local pointers.
StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                                  storage::VertexAddress vertex_to_address,
                                  storage::EdgeAddress edge_address,
                                  storage::EdgeType edge_type) {
  CHECK(vertex_to_address.is_remote() && edge_address.is_remote())
      << "WAL can only contain global addresses.";
  StateDelta op(StateDelta::Type::ADD_OUT_EDGE, tx_id);
  op.vertex_id = vertex_id;
  op.vertex_to_address = vertex_to_address;
  op.edge_address = edge_address;
  op.edge_type = edge_type;
  return op;
}
// Creates a REMOVE_OUT_EDGE delta; the edge address must be global.
StateDelta StateDelta::RemoveOutEdge(tx::TransactionId tx_id,
                                     gid::Gid vertex_id,
                                     storage::EdgeAddress edge_address) {
  CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";
  StateDelta op(StateDelta::Type::REMOVE_OUT_EDGE, tx_id);
  op.vertex_id = vertex_id;
  op.edge_address = edge_address;
  return op;
}
// Creates an ADD_IN_EDGE delta; both addresses must be global (remote).
StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                                 storage::VertexAddress vertex_from_address,
                                 storage::EdgeAddress edge_address,
                                 storage::EdgeType edge_type) {
  CHECK(vertex_from_address.is_remote() && edge_address.is_remote())
      << "WAL can only contain global addresses.";
  StateDelta op(StateDelta::Type::ADD_IN_EDGE, tx_id);
  op.vertex_id = vertex_id;
  op.vertex_from_address = vertex_from_address;
  op.edge_address = edge_address;
  op.edge_type = edge_type;
  return op;
}
// Creates a REMOVE_IN_EDGE delta; the edge address must be global.
StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                                    storage::EdgeAddress edge_address) {
  CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";
  StateDelta op(StateDelta::Type::REMOVE_IN_EDGE, tx_id);
  op.vertex_id = vertex_id;
  op.edge_address = edge_address;
  return op;
}
// Creates a SET_PROPERTY_VERTEX delta (property removal is expressed by
// setting PropertyValue::Null).
StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id,
                                      gid::Gid vertex_id,
                                      storage::Property property,
                                      const std::string &property_name,
                                      const PropertyValue &value) {
  StateDelta op(StateDelta::Type::SET_PROPERTY_VERTEX, tx_id);
  op.vertex_id = vertex_id;
  op.property = property;
  op.property_name = property_name;
  op.value = value;
  return op;
}
// Creates a SET_PROPERTY_EDGE delta (property removal is expressed by
// setting PropertyValue::Null).
StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id,
                                    storage::Property property,
                                    const std::string &property_name,
                                    const PropertyValue &value) {
  StateDelta op(StateDelta::Type::SET_PROPERTY_EDGE, tx_id);
  op.edge_id = edge_id;
  op.property = property;
  op.property_name = property_name;
  op.value = value;
  return op;
}
// Creates an ADD_LABEL delta (label stored both as id and name).
StateDelta StateDelta::AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
                                storage::Label label,
                                const std::string &label_name) {
  StateDelta op(StateDelta::Type::ADD_LABEL, tx_id);
  op.vertex_id = vertex_id;
  op.label = label;
  op.label_name = label_name;
  return op;
}
// Creates a REMOVE_LABEL delta (label stored both as id and name).
StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
                                   storage::Label label,
                                   const std::string &label_name) {
  StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id);
  op.vertex_id = vertex_id;
  op.label = label;
  op.label_name = label_name;
  return op;
}
// Creates a REMOVE_VERTEX delta; `check_empty` indicates whether removal
// should verify the vertex has no connections.
StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
                                    bool check_empty) {
  StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id);
  op.vertex_id = vertex_id;
  op.check_empty = check_empty;
  return op;
}
// Creates a REMOVE_EDGE delta for the given edge gid.
StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id) {
  StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id);
  op.edge_id = edge_id;
  return op;
}
// Creates a BUILD_INDEX delta for a label+property index (ids and names).
StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label,
                                  const std::string &label_name,
                                  storage::Property property,
                                  const std::string &property_name) {
  StateDelta op(StateDelta::Type::BUILD_INDEX, tx_id);
  op.label = label;
  op.label_name = label_name;
  op.property = property;
  op.property_name = property_name;
  return op;
}
// Serializes this delta (followed by the writer's running hash) using the
// given Bolt encoder. Only the members relevant for `type` are written; the
// per-type layout must stay in sync with StateDelta::Decode.
void StateDelta::Encode(
    HashedFileWriter &writer,
    communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const {
  encoder.WriteInt(static_cast<int64_t>(type));
  encoder.WriteInt(static_cast<int64_t>(transaction_id));

  switch (type) {
    case Type::TRANSACTION_BEGIN:
    case Type::TRANSACTION_COMMIT:
    case Type::TRANSACTION_ABORT:
      // Transaction-only deltas carry no payload beyond type + tx id.
      break;
    case Type::CREATE_VERTEX:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(cypher_id);
      break;
    case Type::CREATE_EDGE:
      encoder.WriteInt(edge_id);
      encoder.WriteInt(cypher_id);
      encoder.WriteInt(vertex_from_id);
      encoder.WriteInt(vertex_to_id);
      encoder.WriteInt(edge_type.Id());
      encoder.WriteString(edge_type_name);
      break;
    case Type::ADD_OUT_EDGE:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(vertex_to_address.raw());
      encoder.WriteInt(edge_address.raw());
      encoder.WriteInt(edge_type.Id());
      break;
    case Type::REMOVE_OUT_EDGE:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(edge_address.raw());
      break;
    case Type::ADD_IN_EDGE:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(vertex_from_address.raw());
      encoder.WriteInt(edge_address.raw());
      encoder.WriteInt(edge_type.Id());
      break;
    case Type::REMOVE_IN_EDGE:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(edge_address.raw());
      break;
    case Type::SET_PROPERTY_VERTEX:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(property.Id());
      encoder.WriteString(property_name);
      encoder.WriteValue(glue::ToBoltValue(value));
      break;
    case Type::SET_PROPERTY_EDGE:
      encoder.WriteInt(edge_id);
      encoder.WriteInt(property.Id());
      encoder.WriteString(property_name);
      encoder.WriteValue(glue::ToBoltValue(value));
      break;
    case Type::ADD_LABEL:
    case Type::REMOVE_LABEL:
      encoder.WriteInt(vertex_id);
      encoder.WriteInt(label.Id());
      encoder.WriteString(label_name);
      break;
    case Type::REMOVE_VERTEX:
      encoder.WriteInt(vertex_id);
      break;
    case Type::REMOVE_EDGE:
      encoder.WriteInt(edge_id);
      break;
    case Type::BUILD_INDEX:
      encoder.WriteInt(label.Id());
      encoder.WriteString(label_name);
      encoder.WriteInt(property.Id());
      encoder.WriteString(property_name);
      break;
  }

  // Append the accumulated hash so Decode can verify integrity.
  writer.WriteValue(writer.hash());
}
// Reads one member from the decoder into `r_val`, bailing out of Decode with
// nullopt on failure.
#define DECODE_MEMBER(member, value_f)         \
  if (!decoder.ReadValue(&dv)) return nullopt; \
  r_val.member = dv.value_f();

// Same as DECODE_MEMBER, but static_casts the decoded value to `type`.
#define DECODE_MEMBER_CAST(member, value_f, type) \
  if (!decoder.ReadValue(&dv)) return nullopt;    \
  r_val.member = static_cast<type>(dv.value_f());

// Attempts to decode a StateDelta, verifying the trailing hash. Returns
// nullopt on any decoding, value-type or I/O failure. The per-type layout
// must stay in sync with StateDelta::Encode.
std::experimental::optional<StateDelta> StateDelta::Decode(
    HashedFileReader &reader,
    communication::bolt::Decoder<HashedFileReader> &decoder) {
  using std::experimental::nullopt;

  StateDelta r_val;
  // The decoded value used as a temporary while decoding.
  communication::bolt::Value dv;

  try {
    if (!decoder.ReadValue(&dv)) return nullopt;
    r_val.type = static_cast<enum StateDelta::Type>(dv.ValueInt());
    DECODE_MEMBER(transaction_id, ValueInt)

    switch (r_val.type) {
      case Type::TRANSACTION_BEGIN:
      case Type::TRANSACTION_COMMIT:
      case Type::TRANSACTION_ABORT:
        break;
      case Type::CREATE_VERTEX:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER(cypher_id, ValueInt)
        break;
      case Type::CREATE_EDGE:
        DECODE_MEMBER(edge_id, ValueInt)
        DECODE_MEMBER(cypher_id, ValueInt)
        DECODE_MEMBER(vertex_from_id, ValueInt)
        DECODE_MEMBER(vertex_to_id, ValueInt)
        DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
        DECODE_MEMBER(edge_type_name, ValueString)
        break;
      case Type::ADD_OUT_EDGE:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(vertex_to_address, ValueInt, storage::VertexAddress)
        DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
        DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
        break;
      case Type::REMOVE_OUT_EDGE:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
        break;
      case Type::ADD_IN_EDGE:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(vertex_from_address, ValueInt,
                           storage::VertexAddress)
        DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
        DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
        break;
      case Type::REMOVE_IN_EDGE:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
        break;
      case Type::SET_PROPERTY_VERTEX:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
        DECODE_MEMBER(property_name, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value = glue::ToPropertyValue(dv);
        break;
      case Type::SET_PROPERTY_EDGE:
        DECODE_MEMBER(edge_id, ValueInt)
        DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
        DECODE_MEMBER(property_name, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value = glue::ToPropertyValue(dv);
        break;
      case Type::ADD_LABEL:
      case Type::REMOVE_LABEL:
        DECODE_MEMBER(vertex_id, ValueInt)
        DECODE_MEMBER_CAST(label, ValueInt, storage::Label)
        DECODE_MEMBER(label_name, ValueString)
        break;
      case Type::REMOVE_VERTEX:
        DECODE_MEMBER(vertex_id, ValueInt)
        break;
      case Type::REMOVE_EDGE:
        DECODE_MEMBER(edge_id, ValueInt)
        break;
      case Type::BUILD_INDEX:
        DECODE_MEMBER_CAST(label, ValueInt, storage::Label)
        DECODE_MEMBER(label_name, ValueString)
        DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
        DECODE_MEMBER(property_name, ValueString)
        break;
    }

    // Verify the reader's running hash against the hash stored right after
    // the delta.
    auto decoder_hash = reader.hash();
    uint64_t encoded_hash;
    if (!reader.ReadType(encoded_hash, true)) return nullopt;
    if (decoder_hash != encoded_hash) return nullopt;

    return r_val;
  } catch (communication::bolt::ValueException &) {
    return nullopt;
  } catch (std::ifstream::failure &) {
    return nullopt;
  }
}

// Both helper macros are local to Decode; previously only DECODE_MEMBER was
// undefined, leaking DECODE_MEMBER_CAST into the rest of the translation unit.
#undef DECODE_MEMBER
#undef DECODE_MEMBER_CAST
// Applies a CRUD delta to the given database accessor. Transaction, partial
// edge and index deltas must be handled by the caller -- they hit LOG(FATAL)
// here.
void StateDelta::Apply(GraphDbAccessor &dba) const {
  switch (type) {
    // Transactional state is not recovered.
    case Type::TRANSACTION_BEGIN:
    case Type::TRANSACTION_COMMIT:
    case Type::TRANSACTION_ABORT:
      LOG(FATAL) << "Transaction handling not handled in Apply";
      break;
    case Type::CREATE_VERTEX:
      dba.InsertVertex(vertex_id, cypher_id);
      break;
    case Type::CREATE_EDGE: {
      // Endpoints must already exist (`true` -> expect the vertex is found).
      auto from = dba.FindVertex(vertex_from_id, true);
      auto to = dba.FindVertex(vertex_to_id, true);
      dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id,
                     cypher_id);
      break;
    }
    case Type::ADD_OUT_EDGE:
    case Type::REMOVE_OUT_EDGE:
    case Type::ADD_IN_EDGE:
    case Type::REMOVE_IN_EDGE:
      // No `break`: LOG(FATAL) aborts the process, so the syntactic
      // fallthrough into the next case is unreachable.
      LOG(FATAL) << "Partial edge creation/deletion not yet supported in Apply";
    case Type::SET_PROPERTY_VERTEX: {
      auto vertex = dba.FindVertex(vertex_id, true);
      vertex.PropsSet(dba.Property(property_name), value);
      break;
    }
    case Type::SET_PROPERTY_EDGE: {
      auto edge = dba.FindEdge(edge_id, true);
      edge.PropsSet(dba.Property(property_name), value);
      break;
    }
    case Type::ADD_LABEL: {
      auto vertex = dba.FindVertex(vertex_id, true);
      vertex.add_label(dba.Label(label_name));
      break;
    }
    case Type::REMOVE_LABEL: {
      auto vertex = dba.FindVertex(vertex_id, true);
      vertex.remove_label(dba.Label(label_name));
      break;
    }
    case Type::REMOVE_VERTEX: {
      auto vertex = dba.FindVertex(vertex_id, true);
      dba.DetachRemoveVertex(vertex);
      break;
    }
    case Type::REMOVE_EDGE: {
      auto edge = dba.FindEdge(edge_id, true);
      dba.RemoveEdge(edge);
      break;
    }
    case Type::BUILD_INDEX: {
      LOG(FATAL) << "Index handling not handled in Apply";
      break;
    }
  }
}
}; // namespace database

View File

@ -0,0 +1,149 @@
;; LCP definition of the single-node database::StateDelta struct. LCP emits
;; the C++ struct, enum and (de)serialization boilerplate from this file.
#>cpp
#pragma once

#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/address_types.hpp"
#include "storage/single_node/gid.hpp"
cpp<#

(lcp:namespace database)

#>cpp
class GraphDbAccessor;
cpp<#

(lcp:define-struct state-delta ()
  (
   ;; Members valid for every delta.
   (type "Type")
   (transaction-id "tx::TransactionId")
   ;; Members valid only for some deltas, see StateDelta::Type comments above.
   ;; TODO: when preparing the WAL for distributed, most likely remove Gids and
   ;; only keep addresses.
   (vertex-id "gid::Gid")
   (edge-id "gid::Gid")
   (cypher-id :int64_t)
   (edge-address "storage::EdgeAddress")
   (vertex-from-id "gid::Gid")
   (vertex-from-address "storage::VertexAddress")
   (vertex-to-id "gid::Gid")
   (vertex-to-address "storage::VertexAddress")
   (edge-type "storage::EdgeType")
   (edge-type-name "std::string")
   (property "storage::Property")
   (property-name "std::string")
   (value "PropertyValue" :initval "PropertyValue::Null")
   (label "storage::Label")
   (label-name "std::string")
   (check-empty :bool))
  (:documentation
   "Describes single change to the database state. Used for durability (WAL) and
state communication over network in HA and for distributed remote storage
changes.
Labels, Properties and EdgeTypes are stored both as values (integers) and
strings (their names). The values are used when applying deltas in a running
database. Names are used when recovering the database as it's not guaranteed
that after recovery the old name<->value mapping will be preserved.
TODO: ensure the mapping is preserved after recovery and don't save strings
in StateDeltas.")
  (:public
   ;; Per-type comments list the members that carry data for that delta type.
   (lcp:define-enum type
       (transaction-begin
        transaction-commit
        transaction-abort
        create-vertex ;; vertex_id
        create-edge ;; edge_id, from_vertex_id, to_vertex_id, edge_type, edge_type_name
        add-out-edge ;; vertex_id, edge_address, vertex_to_address, edge_type
        remove-out-edge ;; vertex_id, edge_address
        add-in-edge ;; vertex_id, edge_address, vertex_from_address, edge_type
        remove-in-edge ;; vertex_id, edge_address
        set-property-vertex ;; vertex_id, property, property_name, property_value
        set-property-edge ;; edge_id, property, property_name, property_value
        ;; remove property is done by setting a PropertyValue::Null
        add-label ;; vertex_id, label, label_name
        remove-label ;; vertex_id, label, label_name
        remove-vertex ;; vertex_id, check_empty
        remove-edge ;; edge_id
        build-index ;; label, label_name, property, property_name
        )
     (:documentation
      "Defines StateDelta type. For each type the comment indicates which values
need to be stored. All deltas have the transaction_id member, so that's
omitted in the comment."))
   #>cpp
  StateDelta() = default;
  StateDelta(const enum Type &type, tx::TransactionId tx_id)
      : type(type), transaction_id(tx_id) {}

  /** Attempts to decode a StateDelta from the given decoder. Returns the
   * decoded value if successful, otherwise returns nullopt. */
  static std::experimental::optional<StateDelta> Decode(
      HashedFileReader &reader,
      communication::bolt::Decoder<HashedFileReader> &decoder);

  /** Encodes the delta using primitive encoder, and writes out the new hash
   * with delta to the writer */
  void Encode(
      HashedFileWriter &writer,
      communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const;

  // Factory functions -- one per delta type.
  static StateDelta TxBegin(tx::TransactionId tx_id);
  static StateDelta TxCommit(tx::TransactionId tx_id);
  static StateDelta TxAbort(tx::TransactionId tx_id);
  static StateDelta CreateVertex(tx::TransactionId tx_id,
                                 gid::Gid vertex_id,
                                 int64_t cypher_id);
  static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
                               int64_t cypher_id,
                               gid::Gid vertex_from_id,
                               gid::Gid vertex_to_id,
                               storage::EdgeType edge_type,
                               const std::string &edge_type_name);
  static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                               storage::VertexAddress vertex_to_address,
                               storage::EdgeAddress edge_address,
                               storage::EdgeType edge_type);
  static StateDelta RemoveOutEdge(tx::TransactionId tx_id,
                                  gid::Gid vertex_id,
                                  storage::EdgeAddress edge_address);
  static StateDelta AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                              storage::VertexAddress vertex_from_address,
                              storage::EdgeAddress edge_address,
                              storage::EdgeType edge_type);
  static StateDelta RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
                                 storage::EdgeAddress edge_address);
  static StateDelta PropsSetVertex(tx::TransactionId tx_id,
                                   gid::Gid vertex_id,
                                   storage::Property property,
                                   const std::string &property_name,
                                   const PropertyValue &value);
  static StateDelta PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id,
                                 storage::Property property,
                                 const std::string &property_name,
                                 const PropertyValue &value);
  static StateDelta AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
                             storage::Label label,
                             const std::string &label_name);
  static StateDelta RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
                                storage::Label label,
                                const std::string &label_name);
  static StateDelta RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
                                 bool check_empty);
  static StateDelta RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id);
  static StateDelta BuildIndex(tx::TransactionId tx_id, storage::Label label,
                               const std::string &label_name,
                               storage::Property property,
                               const std::string &property_name);

  /// Applies CRUD delta to database accessor. Fails on other types of deltas
  void Apply(GraphDbAccessor &dba) const;
cpp<#))

(lcp:pop-namespace) ;; database

View File

@ -0,0 +1,52 @@
#pragma once
///
///
/// IMPORTANT: Please update this file for every snapshot format change!!!
/// TODO (buda): This is not rock solid.
///
#include <array>
#include <cstdint>
namespace durability {

// File-type markers written as the first bytes of snapshot / WAL files
// (see item 1 of the format description below).
constexpr std::array<uint8_t, 4> kSnapshotMagic{{'M', 'G', 's', 'n'}};
constexpr std::array<uint8_t, 4> kWalMagic{{'M', 'G', 'w', 'l'}};

// The current default version of snapshot and WAL encoding / decoding.
constexpr int64_t kVersion{6};

// Snapshot format (version 6):
// 1) Magic number + snapshot version
//
// 2) Distributed worker ID
//
// The following two entries indicate the starting points for generating new
// vertex/edge IDs in the DB. They are important when there are vertices/edges
// that were moved to another worker (in distributed Memgraph).
// 3) Vertex generator ID
// 4) Edge generator ID
//
// The following two entries are required when recovering from snapshot combined
// with WAL to determine record visibility.
// 5) Transactional ID of the snapshooter
// 6) Transactional snapshot of the snapshooter
//
// 7) A list of label+property indices.
//
// We must inline edges with nodes because some edges might be stored on other
// worker (edges are always stored only on the worker of the edge source).
// 8) Bolt encoded nodes. Each node is written in the following format:
//    * gid, labels, properties
//    * cypher_id
//    * inlined edges (edge address, other endpoint address and edge type)
// 9) Bolt encoded edges. Each edge is written in the following format:
//    * gid
//    * from, to
//    * edge_type
//    * properties
//    * cypher_id
//
// 10) Snapshot summary (number of nodes, number of edges, hash)
}  // namespace durability

View File

@ -1,7 +1,7 @@
#include "wal.hpp"
#include "durability/paths.hpp"
#include "durability/version.hpp"
#include "durability/single_node/version.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"

View File

@ -0,0 +1,100 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <experimental/filesystem>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "data_structures/ring_buffer.hpp"
#include "durability/single_node/state_delta.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "storage/single_node/gid.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
namespace durability {
/// A database StateDelta log for durability. Buffers and periodically
/// serializes small-granulation database deltas (StateDelta).
///
/// The order is not deterministic in a multithreaded scenario (multiple DB
/// transactions). This is fine, the recovery process should be immune to this
/// indeterminism.
class WriteAheadLog {
 public:
  /// @param worker_id - id of this worker, forwarded to the WAL file handler
  ///        (used in file naming).
  /// @param durability_dir - directory in which WAL data is kept.
  /// @param durability_enabled - if false the WAL feature is disabled.
  /// @param synchronous_commit - if true, transaction-end deltas cause an
  ///        immediate flush (see Emplace).
  WriteAheadLog(int worker_id,
                const std::experimental::filesystem::path &durability_dir,
                bool durability_enabled, bool synchronous_commit);
  ~WriteAheadLog();

  /// Initializes the WAL. Called at the end of GraphDb construction, after
  /// (optional) recovery. Also responsible for initializing the wal_file.
  void Init();

  /// Emplaces the given StateDelta onto the buffer, if the WAL is enabled.
  /// If the WAL is configured to work in synchronous commit mode, emplace will
  /// flush the buffers if a delta represents a transaction end.
  void Emplace(const database::StateDelta &delta);

  /// Flushes every delta currently in the ring buffer.
  /// This method should only be called from tests.
  void Flush();

 private:
  /// Groups the logic of WAL file handling (flushing, naming, rotating).
  class WalFile {
   public:
    WalFile(int worker_id, const std::experimental::filesystem::path &wal_dir);
    ~WalFile();

    /// Initializes the WAL file. Must be called before first flush. Can be
    /// called after Flush() to re-initialize stuff.
    void Init();

    /// Flushes all the deltas in the buffer to the WAL file. If necessary
    /// rotates the file.
    void Flush(RingBuffer<database::StateDelta> &buffer);

   private:
    /// Mutex used for flushing wal data.
    std::mutex flush_mutex_;
    int worker_id_;
    const std::experimental::filesystem::path wal_dir_;
    HashedFileWriter writer_;
    communication::bolt::BaseEncoder<HashedFileWriter> encoder_{writer_};

    /// The file to which the WAL flushes data. The path is fixed, the file
    /// gets moved when the WAL gets rotated.
    std::experimental::filesystem::path current_wal_file_;

    /// Number of deltas in the current wal file.
    int current_wal_file_delta_count_{0};

    /// The latest transaction whose delta is recorded in the current WAL file.
    /// Zero indicates that no deltas have so far been written to the current
    /// WAL file.
    tx::TransactionId latest_tx_{0};

    void RotateFile();
  };

  RingBuffer<database::StateDelta> deltas_;
  utils::Scheduler scheduler_;
  WalFile wal_file_;

  /// Used for disabling the durability feature of the DB.
  bool durability_enabled_{false};
  /// Used for disabling the WAL during DB recovery.
  bool enabled_{false};
  /// Should every WAL write be synced with the underlying storage.
  bool synchronous_commit_{false};

  /// Checks whether the given state delta represents a transaction end,
  /// TRANSACTION_COMMIT and TRANSACTION_ABORT.
  bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
};
} // namespace durability

View File

@ -3,7 +3,7 @@
#include "communication/bolt/v1/value.hpp"
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
#include "storage/common/property_value.hpp"
namespace glue {

View File

@ -1,13 +1,12 @@
/// @file
#pragma once
#include "integrations/kafka/consumer.hpp"
#include <experimental/optional>
#include <mutex>
#include <unordered_map>
#include "storage/kvstore.hpp"
#include "integrations/kafka/consumer.hpp"
#include "storage/kvstore/kvstore.hpp"
namespace integrations::kafka {

108
src/memgraph.cpp Normal file
View File

@ -0,0 +1,108 @@
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <limits>
#include <thread>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/server.hpp"
#include "database/graph_db.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "query/exceptions.hpp"
#include "telemetry/telemetry.hpp"
#include "utils/flag_validation.hpp"
// General purpose flags.
DEFINE_string(interface, "0.0.0.0",
              "Communication interface on which to listen.");
// Port must fit in a uint16_t (0-65535).
DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
                       FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// Default the Bolt worker pool to the hardware concurrency, with a minimum
// of one worker.
DEFINE_VALIDATED_int32(num_workers,
                       std::max(std::thread::hardware_concurrency(), 1U),
                       "Number of workers (Bolt)", FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
                       "Time in seconds after which inactive sessions will be "
                       "closed.",
                       FLAG_IN_RANGE(1, INT32_MAX));
// TLS configuration; both must be provided to enable encrypted connections
// (see SingleNodeMain).
DEFINE_string(cert_file, "", "Certificate file to use.");
DEFINE_string(key_file, "", "Key file to use.");
DEFINE_bool(telemetry_enabled, false,
            "Set to true to enable telemetry. We collect information about the "
            "running system (CPU and memory information) and information about "
            "the database runtime (vertex and edge counts and resource usage) "
            "to allow for easier improvement of the product.");
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
/// Runs the single-node Memgraph server: constructs the database and
/// interpreter, recovers Kafka streams, starts the Bolt server (optionally
/// over TLS), optionally enables telemetry and blocks until shutdown.
void SingleNodeMain() {
  google::SetUsageMessage("Memgraph single-node database server");

  // Database and interpreter are shared with all Bolt sessions through
  // SessionData.
  database::SingleNode db;
  query::Interpreter interpreter;
  SessionData session_data{&db, &interpreter};

  // Kafka stream metadata lives under <durability_directory>/streams; the
  // callback forwards stream-produced queries to KafkaStreamWriter.
  integrations::kafka::Streams kafka_streams{
      std::experimental::filesystem::path(FLAGS_durability_directory) /
          "streams",
      [&session_data](
          const std::string &query,
          const std::map<std::string, communication::bolt::Value> &params) {
        KafkaStreamWriter(session_data, query, params);
      }};

  try {
    // Recover possible streams.
    kafka_streams.Recover();
  } catch (const integrations::kafka::KafkaStreamException &e) {
    // Best effort: a failed stream recovery is logged but does not abort
    // server startup.
    LOG(ERROR) << e.what();
  }

  session_data.interpreter->auth_ = &session_data.auth;
  session_data.interpreter->kafka_streams_ = &kafka_streams;

  // Enable TLS ("BoltS") only when both the key and certificate are supplied.
  ServerContext context;
  std::string service_name = "Bolt";
  if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
    context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
    service_name = "BoltS";
  }

  ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
                 &session_data, &context, FLAGS_session_inactivity_timeout,
                 service_name, FLAGS_num_workers);

  // Setup telemetry
  std::experimental::optional<telemetry::Telemetry> telemetry;
  if (FLAGS_telemetry_enabled) {
    telemetry.emplace(
        "https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
        std::experimental::filesystem::path(FLAGS_durability_directory) /
            "telemetry",
        std::chrono::minutes(10));
    // Periodically reports the current vertex and edge counts.
    telemetry->AddCollector("db", [&db]() -> nlohmann::json {
      auto dba = db.Access();
      return {{"vertices", dba->VerticesCount()}, {"edges", dba->EdgesCount()}};
    });
  }

  // Handler for regular termination signals
  auto shutdown = [&server] {
    // Server needs to be shutdown first and then the database. This prevents a
    // race condition when a transaction is accepted during server shutdown.
    server.Shutdown();
  };
  InitSignalHandlers(shutdown);
  server.AwaitShutdown();
}
// Entry point: hand all common initialization/teardown to WithInit and run
// the single-node server loop. The stats prefix identifies this binary.
int main(int argc, char **argv) {
  const auto stats_prefix = []() { return "memgraph"; };
  return WithInit(argc, argv, stats_prefix, SingleNodeMain);
}

View File

@ -20,8 +20,6 @@
#include "telemetry/telemetry.hpp"
#include "utils/flag_validation.hpp"
// Common stuff for enterprise and community editions
// General purpose flags.
DEFINE_string(interface, "0.0.0.0",
"Communication interface on which to listen.");
@ -46,77 +44,6 @@ DEFINE_bool(telemetry_enabled, false,
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
database::SingleNode db;
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter};
integrations::kafka::Streams kafka_streams{
std::experimental::filesystem::path(FLAGS_durability_directory) /
"streams",
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
KafkaStreamWriter(session_data, query, params);
}};
try {
// Recover possible streams.
kafka_streams.Recover();
} catch (const integrations::kafka::KafkaStreamException &e) {
LOG(ERROR) << e.what();
}
session_data.interpreter->auth_ = &session_data.auth;
session_data.interpreter->kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
service_name = "BoltS";
}
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
&session_data, &context, FLAGS_session_inactivity_timeout,
service_name, FLAGS_num_workers);
// Setup telemetry
std::experimental::optional<telemetry::Telemetry> telemetry;
if (FLAGS_telemetry_enabled) {
telemetry.emplace(
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
std::experimental::filesystem::path(FLAGS_durability_directory) /
"telemetry",
std::chrono::minutes(10));
telemetry->AddCollector("db", [&db]() -> nlohmann::json {
auto dba = db.Access();
return {{"vertices", dba->VerticesCount()}, {"edges", dba->EdgesCount()}};
});
}
// Handler for regular termination signals
auto shutdown = [&server] {
// Server needs to be shutdown first and then the database. This prevents a
// race condition when a transaction is accepted during server shutdown.
server.Shutdown();
};
InitSignalHandlers(shutdown);
server.AwaitShutdown();
}
// End common stuff for enterprise and community editions
#ifdef MG_COMMUNITY
int main(int argc, char **argv) {
return WithInit(argc, argv, []() { return "memgraph"; }, SingleNodeMain);
}
#else // enterprise edition
// Distributed flags.
DEFINE_HIDDEN_bool(
master, false,
@ -212,16 +139,14 @@ int main(int argc, char **argv) {
auto memgraph_main = [&]() {
CHECK(!(FLAGS_master && FLAGS_worker))
<< "Can't run Memgraph as worker and master at the same time";
<< "Can't run Memgraph as worker and master at the same time!";
CHECK(FLAGS_master || FLAGS_worker)
<< "You must specify that Memgraph should be either a master or a worker!";
if (FLAGS_master)
MasterMain();
else if (FLAGS_worker)
WorkerMain();
else
SingleNodeMain();
WorkerMain();
};
return WithInit(argc, argv, get_stats_prefix, memgraph_main);
}
#endif // enterprise edition

View File

@ -11,8 +11,8 @@
#include "auth/auth.hpp"
#include "communication/bolt/v1/session.hpp"
#include "communication/init.hpp"
#include "communication/session.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "query/interpreter.hpp"
#include "query/transaction_engine.hpp"

View File

@ -8,7 +8,7 @@
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "mvcc/version.hpp"
#include "mvcc/common/version.hpp"
#include "storage/locking/record_lock.hpp"
// the mvcc implementation used here is very much like postgresql's

View File

@ -1,6 +1,6 @@
#pragma once
#include "storage/gid.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/locking/record_lock.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"

View File

@ -0,0 +1,335 @@
#pragma once
#include <atomic>
#include <experimental/optional>
#include <iostream>
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "mvcc/common/version.hpp"
#include "storage/locking/record_lock.hpp"
// the mvcc implementation used here is very much like postgresql's
// more info: https://momjian.us/main/writings/pgsql/mvcc.pdf
namespace mvcc {
/// An MVCC record: one version of a stored object, tagged with the
/// transaction/command ids that created and expired it plus cached
/// commit/abort "hint" bits. All visibility decisions (for transactions and
/// for the garbage collector) are made from this metadata.
template <class T>
class Record : public Version<T> {
 public:
  Record() = default;

  // Records are referenced through raw pointers in version chains, so they
  // must never be copied or moved.
  Record(const Record &) = delete;
  Record &operator=(const Record &) = delete;
  Record(Record &&) = delete;
  Record &operator=(Record &&) = delete;

  // check if this record is visible to the transaction t
  bool visible(const tx::Transaction &t) {
    // Mike Olson says 17 march 1993: the tests in this routine are correct;
    // if you think they're not, you're wrong, and you should think about it
    // again. i know, it happened to me.

    // fetch expiration info in a safe way (see fetch_exp for details)
    tx::TransactionId tx_exp;
    tx::CommandId cmd_exp;
    std::tie(tx_exp, cmd_exp) = fetch_exp();

    return ((tx_.cre == t.id_ &&    // inserted by the current transaction
             cmd_.cre < t.cid() &&  // before this command, and
             (tx_exp == 0 ||        // the row has not been deleted, or
              (tx_exp == t.id_ &&   // it was deleted by the current
                                    // transaction
               cmd_exp >= t.cid())))  // but not before this command,
            ||                        // or
            (visible_from(Hints::kCre, tx_.cre,
                          t) &&  // the record was inserted by a
                                 // committed transaction, and
             (tx_exp == 0 ||     // the record has not been deleted, or
              (tx_exp == t.id_ &&  // the row is being deleted by this
                                   // transaction
               cmd_exp >= t.cid()) ||  // but it's not deleted "yet", or
              (tx_exp != t.id_ &&      // the row was deleted by another
                                       // transaction
               !visible_from(Hints::kExp, tx_exp,
                             t)  // that has not been committed
               ))));
  }

  // Marks this record as created by transaction `t` at its current command.
  // May only be called once per record.
  void mark_created(const tx::Transaction &t) {
    DCHECK(tx_.cre == 0) << "Marking node as created twice.";
    tx_.cre = t.id_;
    cmd_.cre = t.cid();
  }

  // Marks this record as expired (deleted) by transaction `t` at its current
  // command.
  void mark_expired(const tx::Transaction &t) {
    tx_.exp = t.id_;
    cmd_.exp = t.cid();
  }

  // True if the transaction that expired this record has committed.
  bool exp_committed(tx::Engine &engine) {
    return committed(Hints::kExp, engine);
  }

  /**
   * Check if this record is visible w.r.t. to the given garbage collection
   * snapshot. See source comments for exact logic.
   *
   * @param snapshot - the GC snapshot. Consists of the oldest active
   * transaction's snapshot, with that transaction's id appended as last.
   */
  bool is_not_visible_from(const tx::Snapshot &snapshot,
                           const tx::Engine &engine) const {
    // first get tx.exp so that all the subsequent checks operate on
    // the same id. otherwise there could be a race condition
    auto exp_id = tx_.exp.load();

    // a record is NOT visible if:
    // 1. its creating transaction aborted (last check), and is also older than
    //    the current oldest active transaction (optimization) OR
    // 2. a) its expiration is not 0 (some transaction expired it)
    //    AND
    //    b) the expiring transaction is older than latest active
    //    AND
    //    c) that transaction committed (as opposed to aborted)
    //    AND
    //    d) that transaction is not in oldest active transaction's
    //       snapshot (consequently also not in the snapshots of
    //       newer transactions)
    return (exp_id != 0 && exp_id < snapshot.back() &&
            committed(Hints::kExp, engine) && !snapshot.contains(exp_id)) ||
           (tx_.cre.load() < snapshot.back() && cre_aborted(engine));
  }

  // TODO: Test this
  // True if this record is visible for write.
  // Note that this logic is different from the one above
  // in the sense that a record is visible if created before
  // OR DURING this command. this is done to support cypher's
  // queries which can match, update and return in the same query
  bool is_visible_write(const tx::Transaction &t) {
    // fetch expiration info in a safe way (see fetch_exp for details)
    tx::TransactionId tx_exp;
    tx::CommandId cmd_exp;
    std::tie(tx_exp, cmd_exp) = fetch_exp();

    return (tx_.cre == t.id_ &&     // inserted by the current transaction
            cmd_.cre <= t.cid() &&  // before OR DURING this command, and
            (tx_exp == 0 ||         // the row has not been deleted, or
             (tx_exp == t.id_ &&    // it was deleted by the current
                                    // transaction
              cmd_exp >= t.cid())));  // but not before this command,
  }

  /**
   * True if this record is created in the current command
   * of the given transaction.
   */
  bool is_created_by(const tx::Transaction &t) {
    return tx_.cre == t.id_ && cmd_.cre == t.cid();
  }

  /**
   * True if this record is expired in the current command
   * of the given transaction.
   */
  bool is_expired_by(const tx::Transaction &t) const {
    return std::make_pair(t.id_, t.cid()) == fetch_exp();
  }

  // Read-only accessors for the raw creation/expiration ids.
  const auto &tx() const { return tx_; }
  const auto &cmd() const { return cmd_; }

  /**
   * Makes sure that create and expiry are in sync with hints if they are
   * committed or aborted and are before the `tx_cutoff`.
   * `tx_cutoff` exists as a performance optimization to avoid setting hint bits
   * on records for which we don't need to have a guarantee that they are set as
   * part of GC hints setting procedure
   */
  void populate_hints(const tx::Engine &engine, tx::TransactionId tx_cutoff) {
    populate_hint_if_possible(engine, Hints::kCre, tx_cutoff);
    if (!populate_hint_if_possible(engine, Hints::kExp, tx_cutoff)) {
      // Exp is aborted and we can't set the hint, this way we don't have to set
      // the hint because an aborted transaction which expires a record is the
      // same thing as a non-expired record
      tx::TransactionId expected;
      do {
        expected = tx_.exp;
        // If the transaction expiry is no longer aborted we don't need to
        // update it anymore, and hints can't be set since it's obviously an
        // active transaction - there might be a case where this transaction
        // gets finished and committed in the meantime and hints could be set,
        // but since we are not going to delete info for this transaction from
        // the commit log since it wasn't older than the oldest active
        // transaction at the time, or before the invocation of this method;
        // we are in the clear
        if (!engine.Info(expected).is_aborted()) break;
      } while (!tx_.exp.compare_exchange_weak(expected, 0));
      // Ideally we should set the command id as well, but by setting it we
      // can't guarantee that some new update won't change the transaction id
      // and command id before we had a chance to set it, and just leaving it
      // unchanged and relying on all methods to operate on [tx_id: 0, cmd_id:
      // some cmd] as a non-transaction doesn't seem too crazy
    }
  }

 private:
  /**
   * Fast indicators if a transaction has committed or aborted. It is possible
   * the hints do not have that information, in which case the commit log needs
   * to be consulted (a slower operation).
   */
  class Hints {
   public:
    /// Masks for the creation/expiration and commit/abort positions.
    static constexpr uint8_t kCre = 0b0011;
    static constexpr uint8_t kExp = 0b1100;
    static constexpr uint8_t kCmt = 0b0101;
    static constexpr uint8_t kAbt = 0b1010;

    /** Returns true if any bit under the given mask is set. */
    bool Get(uint8_t mask) const { return bits_ & mask; }

    /** Sets all the bits under the given mask. */
    void Set(uint8_t mask) { bits_.fetch_or(mask); }

    /** Clears all the bits under the given mask. */
    void Clear(uint8_t mask) { bits_.fetch_and(~mask); }

   private:
    std::atomic<uint8_t> bits_{0};
  };

  // Pair of atomic ids: one for the creating side, one for the expiring side.
  template <typename TId>
  struct CreExp {
    std::atomic<TId> cre{0};
    std::atomic<TId> exp{0};
  };

  // tx.cre is the id of the transaction that created the record
  // and tx.exp is the id of the transaction that deleted the record
  // These values are used to determine the visibility of the record
  // to the current transaction.
  CreExp<tx::TransactionId> tx_;

  // cmd.cre is the id of the command in this transaction that created the
  // record and cmd.exp is the id of the command in this transaction that
  // deleted the record. These values are used to determine the visibility
  // of the record to the current command in the running transaction.
  CreExp<tx::CommandId> cmd_;

  // Lazily populated commit/abort status; mutable because it is filled in
  // from the commit log inside const methods.
  mutable Hints hints_;

  /** Fetch the (transaction, command) expiration before the check
   * because they can be concurrently modified by multiple transactions.
   * Do it in a loop to ensure that command is consistent with transaction.
   */
  auto fetch_exp() const {
    tx::TransactionId tx_exp;
    tx::CommandId cmd_exp;
    do {
      tx_exp = tx_.exp;
      cmd_exp = cmd_.exp;
    } while (tx_exp != tx_.exp);
    return std::make_pair(tx_exp, cmd_exp);
  }

  /**
   * Populates hint if it is not set for the given create/expiry mask and is
   * before the `tx_cutoff` if specified. Note that it doesn't set hint bits for
   * expiry transactions which abort because it's too expensive to maintain
   * correctness of those hints with regards to race conditions
   * @returns - true if hints are now equal to transaction status
   * (committed/aborted), will only be false if we are trying to set hint for
   * aborted transaction which is this records expiry
   */
  bool populate_hint_if_possible(
      const tx::Engine &engine, const uint8_t mask,
      const std::experimental::optional<tx::TransactionId> tx_cutoff =
          std::experimental::nullopt) const {
    DCHECK(mask == Hints::kCre || mask == Hints::kExp)
        << "Mask should be either for creation or expiration";
    if (hints_.Get(mask)) return true;
    auto id = mask == Hints::kCre ? tx_.cre.load() : tx_.exp.load();
    // Nothing to do here if there is no id or id is larger than tx_cutoff
    if (!id || (tx_cutoff && id >= *tx_cutoff)) return true;
    auto info = engine.Info(id);
    if (info.is_committed()) {
      hints_.Set(mask & Hints::kCmt);
    } else if (info.is_aborted()) {
      // Abort hints can only be updated for creation hints because only one
      // transaction can be creating a single record, so there is no races
      if (mask == Hints::kCre)
        hints_.Set(mask & Hints::kAbt);
      else
        return false;
    }
    return true;
  }

  /**
   * @brief - Check if the transaction `id` has committed before `t` started
   * (that means that edits done by transaction `id` are visible in `t`)
   *
   * Evaluates to true if that transaction has committed,
   * it started before `t` and it's not in `t`'s snapshot.
   *
   * @param mask - Hint bits mask (either Hints::kCre or Hints::kExp).
   * @param id - id to check if it's committed and visible
   * @return true if the id is committed and visible for the transaction t.
   */
  bool visible_from(uint8_t mask, tx::TransactionId id,
                    const tx::Transaction &t) {
    DCHECK(mask == Hints::kCre || mask == Hints::kExp)
        << "Mask must be either kCre or kExp";
    // Dominik Gleich says 4 april 2017: the tests in this routine are correct;
    // if you think they're not, you're wrong, and you should think about it
    // again. I know, it happened to me (and also to Matej Gradicek).

    // You certainly can't see the transaction with id greater than yours as
    // that means it started after this transaction and if it commited, it
    // commited after this transaction has started.
    if (id >= t.id_) return false;

    // The creating transaction is still in progress (examine snapshot)
    if (t.snapshot().contains(id)) return false;

    return committed(mask, t.engine_);
  }

  /**
   * @brief - Check if the transaction with the given `id` is committed.
   *
   * @param mask - Hint bits mask (either Hints::kCre or Hints::kExp).
   * @param engine - engine instance with information about transaction
   * statuses
   * @return true if it's committed, false otherwise
   */
  bool committed(uint8_t mask, const tx::Engine &engine) const {
    DCHECK(mask == Hints::kCre || mask == Hints::kExp)
        << "Mask must be either kCre or kExp";
    populate_hint_if_possible(engine, mask);
    return hints_.Get(Hints::kCmt & mask);
  }

  /**
   * @brief - Check if tx_.cre is aborted. If you need to check for exp
   * transaction do it manually by looking at commit log. This function can't do
   * that for you since hints can't be used for exp transaction (reason is
   * described in function above).
   *
   * @param engine - engine instance with information about transaction
   * statuses
   * @return true if it's aborted, false otherwise
   */
  bool cre_aborted(const tx::Engine &engine) const {
    // Populate hints if not set and return result from hints
    DCHECK(populate_hint_if_possible(engine, Hints::kCre))
        << "Hints not populated";
    return hints_.Get(Hints::kAbt & Hints::kCre);
  }
};
} // namespace mvcc

View File

@ -0,0 +1,283 @@
#pragma once
#include "storage/single_node/gid.hpp"
#include "storage/locking/record_lock.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"
namespace mvcc {
class SerializationError : public utils::BasicException {
static constexpr const char *default_message =
"Can't serialize due to concurrent operations.";
public:
using utils::BasicException::BasicException;
SerializationError() : BasicException(default_message) {}
};
template <class T>
class VersionList {
public:
/**
* @brief Constructor that is used to insert one item into VersionList.
*
* @param t - transaction
* @param gid - Version list identifier. Uniqueness guaranteed by the code
* creating this version list.
* @param cypher_id - Number returned from the id function.
* @param args - args forwarded to constructor of item T (for
* creating the first Record (Version) in this VersionList.
*/
template <typename... Args>
VersionList(const tx::Transaction &t, gid::Gid gid, int64_t cypher_id,
Args &&... args)
: gid_(gid), cypher_id_(cypher_id) {
// TODO replace 'new' with something better
auto *v1 = new T(std::forward<Args>(args)...);
v1->mark_created(t);
head_ = v1;
}
VersionList() = delete;
VersionList(const VersionList &) = delete;
VersionList &operator=(const VersionList &) = delete;
// We do a lot of raw-pointer ops with VLists, and these ops assume that a
// VList's address identifies a vertex/edge absolutely and during it's whole
// lifteme. We also assume that the VList owner is the database and that
// ownership is also handled via raw pointers so this shouldn't be moved or
// move assigned.
VersionList(VersionList &&other) = delete;
VersionList &operator=(VersionList &&other) = delete;
~VersionList() { delete head_.load(); }
friend std::ostream &operator<<(std::ostream &stream,
const VersionList<T> &vlist) {
stream << "VersionList" << std::endl;
T *record = vlist.head_;
while (record != nullptr) {
stream << "-- " << *record << std::endl;
record = record->next();
}
return stream;
}
/**
* Garbage collects records that are not reachable/visible anymore.
*
* Relinks this version-list so that garbage collected records are no
* longer reachable through this version list.
* Visibility is defined in mvcc::Record::is_not_visible_from,
* to which the given `snapshot` is passed.
*
* This method is NOT thread-safe.
*
* @param snapshot - the GC snapshot. Consists of the oldest active
* transaction's snapshot, with that transaction's id appened as last.
* @param engine - transaction engine to use - we need it to check which
* records were commited and which weren't
* @return pair<status, to_delete>; status is true - If version list is empty
* after garbage collection. to_delete points to the newest record that is not
* visible anymore. If none exists to_delete will point to nullptr.
*/
std::pair<bool, T *> GcDeleted(const tx::Snapshot &snapshot,
const tx::Engine &engine) {
// nullptr
// |
// [v1] ... all of this gets deleted!
// |
// [v2] <------+ head_of_deletable_records
// | |
// [v3] <------+ oldest_visible_record
// | | Jump backwards until you find the oldest visible
// [VerList] ----+ record, or you reach the end of the list
//
T *head = head_;
T *current = head;
T *oldest_visible_record = nullptr;
while (current) {
// Populate hints only when needed to avoid excessive rpc calls on
// workers.
// snapshot.back() corresponds to the oldest active transaction,
// and this makes it set only hint bits when the creating or expiring
// transaction of a record is older than that)
current->populate_hints(engine, snapshot.back());
if (!current->is_not_visible_from(snapshot, engine))
oldest_visible_record = current;
current = current->next();
}
if (oldest_visible_record) {
T *head_of_deletable_records = oldest_visible_record->next();
// oldest_visible_record might be visible to some transaction but
// head_of_deletable_records is not and will never be visted by the find
// function and as such doesn't represent pointer invalidation
// race-condition risk.
oldest_visible_record->next(nullptr); // No transaction will look
// further than this record and
// that's why it's safe to set
// next to nullptr.
// Calling destructor of head_of_deletable_records will clean everything
// older than this record since they are called recursively.
return std::make_pair(false, head_of_deletable_records);
}
// This can happen only if the head points to a expired record. Since there
// is no visible records in this version_list we can remove it.
head_ = nullptr;
// This is safe to return as ready for deletion since we unlinked head
// above and this will only be deleted after the last active transaction
// ends.
return std::make_pair(true, head);
}
/**
* @brief - returns oldest record
* @return nullptr if none exist
*/
T *Oldest() {
T *r = head_;
while (r && r->next(std::memory_order_seq_cst))
r = r->next(std::memory_order_seq_cst);
return r;
}
T *find(const tx::Transaction &t) {
T *r = head_;
// nullptr
// |
// [v1] ...
// |
// [v2] <------+
// | |
// [v3] <------+
// | | Jump backwards until you find a first visible
// [VerList] ----+ version, or you reach the end of the list
//
while (r != nullptr && !r->visible(t))
r = r->next(std::memory_order_seq_cst);
return r;
}
/**
* Looks for and sets two versions. The 'old' version is the
* newest version that is visible by the current transaction+command,
* but has not been created by it. The 'new' version is the version
* that has been created by current transaction+command.
*
* It is possible that both, either or neither are found:
* - both are found when an existing record has been modified
* - only old is found when an existing record has not been modified
* - only new is found when the whole vlist was created
* - neither is found when for example the record has been deleted but not
* garbage collected yet
*
* @param t The transaction
*/
void find_set_old_new(const tx::Transaction &t, T **old_ref, T **new_ref) {
// assume that the sought old record is further down the list
// from new record, so that if we found old we can stop looking
*new_ref = nullptr;
*old_ref = head_;
while (*old_ref != nullptr && !(*old_ref)->visible(t)) {
if (!*new_ref && (*old_ref)->is_created_by(t)) *new_ref = *old_ref;
*old_ref = (*old_ref)->next(std::memory_order_seq_cst);
}
}
/**
 * Looks for the first visible record seen by this transaction. If the current
 * transaction has already created new record in the current command then that
 * record is returned, else first older visible record is updated. New record
 * becomes head of the version list and it is returned. There should always be
 * older visible record when this update is called.
 *
 * @param t The transaction
 * @return The version created by `t` (either pre-existing or newly cloned).
 */
T *update(const tx::Transaction &t) {
DCHECK(head_ != nullptr) << "Head is nullptr on update.";
T *old_record = nullptr;
T *new_record = nullptr;
find_set_old_new(t, &old_record, &new_record);
// check if current transaction in current cmd has
// already updated version list
if (new_record) return new_record;
// check if we found any visible records
CHECK(old_record != nullptr) << "Updating nullptr record";
// clone the old version into a new head version owned by `t`
return update(old_record, t);
}
/**
 * Marks the given record as expired by the given transaction.
 * Takes the version list lock; throws SerializationError (via
 * lock_and_validate) if a committed transaction already expired it.
 */
void remove(T *record, const tx::Transaction &t) {
DCHECK(record != nullptr) << "Record is nullptr on removal.";
lock_and_validate(record, t);
record->mark_expired(t);
}
const gid::Gid gid_;
// Returns the id exposed through the Cypher `id()` function (see the
// note on cypher_id_ below).
auto cypher_id() { return cypher_id_; }
private:
/**
 * Acquires this version list's lock for transaction `t` and checks that
 * `record` may still be modified.
 *
 * @throws SerializationError if `record` was expired by a transaction
 * that has committed (a write-write conflict).
 */
void lock_and_validate(T *record, const tx::Transaction &t) {
DCHECK(record != nullptr) << "Record is nullptr on lock and validation.";
// take a lock on this node
t.TakeLock(lock_);
// if the record hasn't been deleted yet or the deleting transaction
// has aborted, it's ok to modify it
if (!record->tx().exp || !record->exp_committed(t.engine_)) return;
// if it committed, then we have a serialization conflict
throw SerializationError();
}
/**
 * Clones `record` into a new version owned by transaction `t`, expires the
 * old version and pushes the clone to the head of the version list.
 *
 * @throws SerializationError (via lock_and_validate) on write-write conflict.
 * @return The newly created version.
 */
T *update(T *record, const tx::Transaction &t) {
DCHECK(record != nullptr) << "Record is nullptr on update.";
lock_and_validate(record, t);
// It could be done with unique_ptr but while this could mean memory
// leak on exception, unique_ptr could mean use after free. Memory
// leak is less dangerous.
auto *updated = record->CloneData();
updated->mark_created(t);
record->mark_expired(t);
// Updated version should point to the latest available version. Older
// versions that can be deleted will be removed during the GC phase.
updated->next(head_.load(), std::memory_order_seq_cst);
// Store the updated version as the first version pointed to by head.
head_.store(updated, std::memory_order_seq_cst);
return updated;
}
/**
* The following member is here because Memgraph supports ID function from
* the Cypher query language. If you have plans to change this you have to
* consider the following:
* * If the id has to be durable. -> Snapshot and WAL have to be updated.
* * Impact on query execution. |
* * Impact on the communication stack. |-> The id has to be returned
* to the client.
* Import tools because of their dependencies on the durability stack.
* * Implications on the distributed system.
*/
int64_t cypher_id_{0};
std::atomic<T *> head_{nullptr};
RecordLock lock_;
};
} // namespace mvcc

View File

@ -284,28 +284,6 @@ bool TypedValueVectorCompare::operator()(
return (c1_it == c1.end()) && (c2_it != c2.end());
}
// Serializes the comparator's per-column sort orderings into the capnp
// builder. The contract is identical to the matching Load below.
void Save(const TypedValueVectorCompare &comparator,
          capnp::TypedValueVectorCompare::Builder *builder) {
  const auto &orderings = comparator.ordering();
  auto list = builder->initOrdering(orderings.size());
  size_t pos = 0;
  for (const auto &ord : orderings) {
    list.set(pos, ord == Ordering::ASC ? capnp::Ordering::ASC
                                       : capnp::Ordering::DESC);
    ++pos;
  }
}
// Restores the comparator's per-column sort orderings from the capnp
// reader; the inverse of the matching Save above.
void Load(TypedValueVectorCompare *comparator,
          const capnp::TypedValueVectorCompare::Reader &reader) {
  const auto list = reader.getOrdering();
  std::vector<Ordering> ordering;
  ordering.reserve(list.size());
  for (const auto &ord : list) {
    ordering.push_back(ord == capnp::Ordering::ASC ? Ordering::ASC
                                                   : Ordering::DESC);
  }
  comparator->ordering_ = ordering;
}
template <typename TAccessor>
void SwitchAccessor(TAccessor &accessor, GraphView graph_view) {
switch (graph_view) {

View File

@ -7,9 +7,8 @@
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/types.hpp"
#include "storage/common/types.hpp"
namespace query {
@ -49,12 +48,6 @@ class TypedValueVectorCompare final {
std::vector<Ordering> ordering_;
};
void Save(const TypedValueVectorCompare &comparator,
capnp::TypedValueVectorCompare::Builder *builder);
void Load(TypedValueVectorCompare *comparator,
const capnp::TypedValueVectorCompare::Reader &reader);
/// Switch the given [Vertex/Edge]Accessor to the desired state.
template <class TAccessor>
void SwitchAccessor(TAccessor &accessor, GraphView graph_view);

View File

@ -9,8 +9,8 @@
#include "query/frontend/semantic/symbol.hpp"
#include "query/interpret/awesome_memgraph_functions.hpp"
#include "query/typed_value.hpp"
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
// Hash function for the key in pattern atom property maps.
namespace std {
@ -36,7 +36,7 @@ cpp<#
(lcp:namespace query)
(lcp:capnp-namespace "query")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'symbol "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")

View File

@ -3,7 +3,8 @@
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/ast_serialization.capnp.h"
#include "storage/serialization.hpp"
#include "query/serialization.hpp"
#include "storage/distributed/serialization.hpp"
cpp<#
(load "query/frontend/ast/ast.lcp")

View File

@ -2,8 +2,6 @@
#include <string>
#include "symbol.capnp.h"
namespace query {
class Symbol {
@ -47,60 +45,6 @@ class Symbol {
int token_position_ = -1;
};
inline void Save(const Symbol &symbol, capnp::Symbol::Builder *builder) {
builder->setName(symbol.name());
builder->setPosition(symbol.position());
builder->setUserDeclared(symbol.user_declared());
builder->setTokenPosition(symbol.token_position());
switch (symbol.type()) {
case Symbol::Type::Any:
builder->setType(capnp::Symbol::Type::ANY);
break;
case Symbol::Type::Edge:
builder->setType(capnp::Symbol::Type::EDGE);
break;
case Symbol::Type::EdgeList:
builder->setType(capnp::Symbol::Type::EDGE_LIST);
break;
case Symbol::Type::Number:
builder->setType(capnp::Symbol::Type::NUMBER);
break;
case Symbol::Type::Path:
builder->setType(capnp::Symbol::Type::PATH);
break;
case Symbol::Type::Vertex:
builder->setType(capnp::Symbol::Type::VERTEX);
break;
}
}
// Reads every field of a serialized Symbol out of `reader` into `symbol`;
// the inverse of the matching Save above.
inline void Load(Symbol *symbol, const capnp::Symbol::Reader &reader) {
  Symbol::Type type = Symbol::Type::Any;
  switch (reader.getType()) {
    case capnp::Symbol::Type::ANY:
      type = Symbol::Type::Any;
      break;
    case capnp::Symbol::Type::EDGE:
      type = Symbol::Type::Edge;
      break;
    case capnp::Symbol::Type::EDGE_LIST:
      type = Symbol::Type::EdgeList;
      break;
    case capnp::Symbol::Type::NUMBER:
      type = Symbol::Type::Number;
      break;
    case capnp::Symbol::Type::PATH:
      type = Symbol::Type::Path;
      break;
    case capnp::Symbol::Type::VERTEX:
      type = Symbol::Type::Vertex;
      break;
  }
  symbol->type_ = type;
  symbol->name_ = reader.getName();
  symbol->position_ = reader.getPosition();
  symbol->user_declared_ = reader.getUserDeclared();
  symbol->token_position_ = reader.getTokenPosition();
}
} // namespace query
namespace std {

View File

@ -4,7 +4,6 @@
#include <string>
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.capnp.h"
#include "query/frontend/semantic/symbol.hpp"
namespace query {
@ -33,29 +32,5 @@ class SymbolTable final {
std::map<int, Symbol> table_;
};
// Serializes the symbol table: its max position plus every
// (ast position -> Symbol) entry, delegating per-symbol work to
// Save(const Symbol &, ...).
inline void Save(const SymbolTable &symbol_table,
                 capnp::SymbolTable::Builder *builder) {
  builder->setPosition(symbol_table.max_position());
  const auto &table = symbol_table.table();
  auto entries = builder->initTable(table.size());
  size_t idx = 0;
  for (const auto &kv : table) {
    auto entry = entries[idx];
    ++idx;
    entry.setKey(kv.first);
    auto sym = entry.initVal();
    Save(kv.second, &sym);
  }
}
// Restores the symbol table from its serialized form, clearing any
// existing contents first; the inverse of the matching Save above.
inline void Load(SymbolTable *symbol_table,
                 const capnp::SymbolTable::Reader &reader) {
  symbol_table->position_ = reader.getPosition();
  symbol_table->table_.clear();
  for (const auto &entry : reader.getTable()) {
    Symbol sym;
    Load(&sym, entry.getVal());
    symbol_table->table_[entry.getKey()] = sym;
  }
}
} // namespace query

View File

@ -6,7 +6,7 @@
#include <utility>
#include <vector>
#include "storage/property_value.hpp"
#include "storage/common/property_value.hpp"
/**
* Encapsulates user provided parameters (and stripped literals)

View File

@ -1223,14 +1223,9 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
Frame &frame, Context &context) {
auto &dba = context.db_accessor_;
int current_worker_id = 0;
// TODO: Figure out a better solution.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
current_worker_id = distributed_db->WorkerId();
} else {
CHECK(dynamic_cast<database::SingleNode *>(&dba.db()));
}
auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba.db());
int current_worker_id = distributed_db->WorkerId();
if (worker_id == current_worker_id)
return CreateLocalVertex(node_atom, frame, context);

View File

@ -6,6 +6,8 @@
#include "query/frontend/ast/ast_serialization.hpp"
#include "query/plan/distributed_ops.capnp.h"
#include "query/plan/operator.hpp"
#include "query/serialization.hpp"
#include "storage/distributed/serialization.hpp"
cpp<#
(load "query/plan/operator.lcp")

View File

@ -14,7 +14,7 @@
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/typed_value.hpp"
#include "storage/types.hpp"
#include "storage/common/types.hpp"
#include "utils/bound.hpp"
#include "utils/future.hpp"
#include "utils/hashing/fnv.hpp"
@ -138,7 +138,7 @@ cpp<#
(lcp:capnp-namespace "query::plan")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'storage "/storage/distributed/serialization.capnp")
(lcp:capnp-import 'ast "/query/frontend/ast/ast_serialization.capnp")
(lcp:capnp-import 'semantic "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")

View File

@ -3,8 +3,8 @@
#include <experimental/optional>
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "utils/bound.hpp"
#include "utils/hashing/fnv.hpp"

View File

@ -2,7 +2,7 @@
using Ast = import "/query/frontend/ast/ast_serialization.capnp";
using Cxx = import "/capnp/c++.capnp";
using Storage = import "/storage/serialization.capnp";
using Storage = import "/storage/distributed/serialization.capnp";
using Utils = import "/utils/serialization.capnp";
$Cxx.namespace("query::capnp");

View File

@ -165,4 +165,26 @@ void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader,
}
}
// Serializes the comparator's per-column sort orderings into the capnp
// message, mapping each Ordering to its wire enum value.
void Save(const TypedValueVectorCompare &comparator,
capnp::TypedValueVectorCompare::Builder *builder) {
auto ordering_builder = builder->initOrdering(comparator.ordering().size());
for (size_t i = 0; i < comparator.ordering().size(); ++i) {
ordering_builder.set(i, comparator.ordering()[i] == Ordering::ASC
? capnp::Ordering::ASC
: capnp::Ordering::DESC);
}
}
// Restores the comparator's per-column sort orderings from the capnp
// message; the inverse of the Save above.
void Load(TypedValueVectorCompare *comparator,
const capnp::TypedValueVectorCompare::Reader &reader) {
std::vector<Ordering> ordering;
ordering.reserve(reader.getOrdering().size());
for (auto ordering_reader : reader.getOrdering()) {
ordering.push_back(ordering_reader == capnp::Ordering::ASC
? Ordering::ASC
: Ordering::DESC);
}
comparator->ordering_ = ordering;
}
} // namespace query

View File

@ -1,9 +1,13 @@
#pragma once
#include "query/common.hpp"
#include "query/context.hpp"
#include "query/frontend/semantic/symbol.capnp.h"
#include "query/frontend/semantic/symbol.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/serialization.hpp"
#include "storage/distributed/serialization.hpp"
namespace distributed {
class DataManager;
@ -26,4 +30,89 @@ void SaveEvaluationContext(const EvaluationContext &ctx,
void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader,
EvaluationContext *ctx);
void Save(const TypedValueVectorCompare &comparator,
capnp::TypedValueVectorCompare::Builder *builder);
void Load(TypedValueVectorCompare *comparator,
const capnp::TypedValueVectorCompare::Reader &reader);
// Writes every field of `symbol` into the capnp builder, translating the
// Symbol::Type enum into its wire representation.
inline void Save(const Symbol &symbol, capnp::Symbol::Builder *builder) {
builder->setName(symbol.name());
builder->setPosition(symbol.position());
builder->setUserDeclared(symbol.user_declared());
builder->setTokenPosition(symbol.token_position());
switch (symbol.type()) {
case Symbol::Type::Any:
builder->setType(capnp::Symbol::Type::ANY);
break;
case Symbol::Type::Edge:
builder->setType(capnp::Symbol::Type::EDGE);
break;
case Symbol::Type::EdgeList:
builder->setType(capnp::Symbol::Type::EDGE_LIST);
break;
case Symbol::Type::Number:
builder->setType(capnp::Symbol::Type::NUMBER);
break;
case Symbol::Type::Path:
builder->setType(capnp::Symbol::Type::PATH);
break;
case Symbol::Type::Vertex:
builder->setType(capnp::Symbol::Type::VERTEX);
break;
}
}
// Reads every field of a serialized Symbol out of `reader` into `symbol`;
// the inverse of the Save above.
inline void Load(Symbol *symbol, const capnp::Symbol::Reader &reader) {
symbol->name_ = reader.getName();
symbol->position_ = reader.getPosition();
symbol->user_declared_ = reader.getUserDeclared();
symbol->token_position_ = reader.getTokenPosition();
switch (reader.getType()) {
case capnp::Symbol::Type::ANY:
symbol->type_ = Symbol::Type::Any;
break;
case capnp::Symbol::Type::EDGE:
symbol->type_ = Symbol::Type::Edge;
break;
case capnp::Symbol::Type::EDGE_LIST:
symbol->type_ = Symbol::Type::EdgeList;
break;
case capnp::Symbol::Type::NUMBER:
symbol->type_ = Symbol::Type::Number;
break;
case capnp::Symbol::Type::PATH:
symbol->type_ = Symbol::Type::Path;
break;
case capnp::Symbol::Type::VERTEX:
symbol->type_ = Symbol::Type::Vertex;
break;
}
}
// Serializes the symbol table: its max position plus every
// (ast position -> Symbol) entry, delegating to Save(const Symbol &, ...).
inline void Save(const SymbolTable &symbol_table,
capnp::SymbolTable::Builder *builder) {
builder->setPosition(symbol_table.max_position());
auto list_builder = builder->initTable(symbol_table.table().size());
size_t i = 0;
for (const auto &entry : symbol_table.table()) {
auto entry_builder = list_builder[i++];
entry_builder.setKey(entry.first);
auto sym_builder = entry_builder.initVal();
Save(entry.second, &sym_builder);
}
}
// Restores the symbol table from its serialized form, clearing any
// existing contents first; the inverse of the Save above.
inline void Load(SymbolTable *symbol_table,
const capnp::SymbolTable::Reader &reader) {
symbol_table->position_ = reader.getPosition();
symbol_table->table_.clear();
for (const auto &entry_reader : reader.getTable()) {
int key = entry_reader.getKey();
Symbol val;
Load(&val, entry_reader.getVal());
symbol_table->table_[key] = val;
}
}
} // namespace query

View File

@ -1,6 +1,5 @@
#pragma once
#include "database/distributed_graph_db.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/exceptions.hpp"

View File

@ -9,8 +9,8 @@
#include <vector>
#include "query/path.hpp"
#include "storage/common/property_value.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/property_value.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/exceptions.hpp"
#include "utils/total_ordering.hpp"

View File

@ -7,7 +7,6 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "data_structures/concurrent/skiplist.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/transaction.hpp"
namespace database::index {

View File

@ -1,3 +1,5 @@
#include "storage/common/property_value.hpp"
#include <cmath>
#include <iostream>
#include <memory>
@ -6,8 +8,6 @@
#include "fmt/format.h"
#include "glog/logging.h"
#include "storage/property_value.hpp"
// Value extraction template instantiations
template <>
bool PropertyValue::Value<bool>() const {

View File

@ -1,13 +1,14 @@
#include "storage/common/property_value_store.hpp"
#include <experimental/filesystem>
#include "gflags/gflags.h"
#include "glog/logging.h"
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "glue/communication.hpp"
#include "storage/pod_buffer.hpp"
#include "storage/property_value_store.hpp"
#include "storage/common/pod_buffer.hpp"
namespace fs = std::experimental::filesystem;

View File

@ -5,9 +5,9 @@
#include <string>
#include <vector>
#include "storage/kvstore.hpp"
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "storage/common/property_value.hpp"
#include "storage/common/types.hpp"
#include "storage/kvstore/kvstore.hpp"
/**
* A collection of properties accessed in a map-like way using a key of type

View File

@ -4,9 +4,8 @@
#include <functional>
#include <limits>
#include "glog/logging.h"
#include <glog/logging.h>
#include "storage/serialization.capnp.h"
#include "utils/total_ordering.hpp"
namespace storage {
@ -63,16 +62,6 @@ class Common : public utils::TotalOrdering<TSpecificType> {
static constexpr IdT NotMask = ~Mask;
};
template <class Type>
void Save(const Common<Type> &common, capnp::Common::Builder *builder) {
builder->setStorage(common.id_);
}
template <class Type>
void Load(Common<Type> *common, const capnp::Common::Reader &reader) {
common->id_ = reader.getStorage();
}
class Label final : public Common<Label> {
using Common::Common;
};

View File

@ -2,10 +2,9 @@
#include <cstdint>
#include "glog/logging.h"
#include <glog/logging.h>
#include "storage/gid.hpp"
#include "storage/serialization.capnp.h"
#include "storage/distributed/gid.hpp"
namespace storage {
@ -92,14 +91,4 @@ class Address {
StorageT storage_{0};
};
template <typename TLocalObj>
void Save(const Address<TLocalObj> &address, capnp::Address::Builder *builder) {
builder->setStorage(address.raw());
}
template <typename TLocalObj>
void Load(Address<TLocalObj> *address, const capnp::Address::Reader &reader) {
address->storage_ = reader.getStorage();
}
} // namespace storage

View File

@ -1,7 +1,7 @@
#pragma once
#include "mvcc/version_list.hpp"
#include "storage/address.hpp"
#include "mvcc/distributed/version_list.hpp"
#include "storage/distributed/address.hpp"
class Edge;
class Vertex;

View File

@ -1,8 +1,9 @@
#include "glog/logging.h"
#include "storage/distributed/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "storage/types.hpp"
#include <glog/logging.h>
#include "storage/common/types.hpp"
#include "storage/distributed/concurrent_id_mapper_rpc_messages.hpp"
namespace storage {

View File

@ -4,7 +4,7 @@
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper_single_node.hpp"
#include "storage/distributed/concurrent_id_mapper_single_node.hpp"
namespace storage {

View File

@ -4,8 +4,9 @@
#include <chrono>
#include "communication/rpc/messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.capnp.h"
#include "storage/types.hpp"
#include "storage/common/types.hpp"
#include "storage/distributed/concurrent_id_mapper_rpc_messages.capnp.h"
#include "storage/distributed/serialization.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"
@ -15,7 +16,7 @@ cpp<#
(lcp:capnp-namespace "storage")
(lcp:capnp-import 's "/storage/serialization.capnp")
(lcp:capnp-import 's "/storage/distributed/serialization.capnp")
(lcp:define-rpc label-id
(:request ((member "std::string")))

View File

@ -3,8 +3,8 @@
#include <mutex>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/types.hpp"
#include "storage/common/concurrent_id_mapper.hpp"
#include "storage/common/types.hpp"
#include "utils/algorithm.hpp"
namespace storage {

View File

@ -1,8 +1,9 @@
#include "glog/logging.h"
#include "storage/distributed/concurrent_id_mapper_worker.hpp"
#include "concurrent_id_mapper_worker.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "storage/types.hpp"
#include <glog/logging.h>
#include "storage/common/types.hpp"
#include "storage/distributed/concurrent_id_mapper_rpc_messages.hpp"
namespace storage {

Some files were not shown because too many files have changed in this diff Show More