From d4b2d76a356bef8042a2fd4fccab8179f993809b Mon Sep 17 00:00:00 2001 From: Matej Ferencevic <matej.ferencevic@memgraph.io> Date: Tue, 14 May 2019 17:23:52 +0200 Subject: [PATCH] Reimplement `counter` openCypher function Reviewers: teon.banek, msantl Reviewed By: teon.banek, msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2047 --- .gitignore | 1 - src/CMakeLists.txt | 2 - src/database/distributed/counters.hpp | 30 ------------ .../distributed/counters_rpc_messages.lcp | 21 -------- .../distributed/distributed_counters.cpp | 49 ------------------- .../distributed/distributed_counters.hpp | 38 -------------- .../distributed/distributed_graph_db.cpp | 7 --- .../distributed/distributed_graph_db.hpp | 2 - src/database/distributed/graph_db.hpp | 2 - .../distributed/graph_db_accessor.cpp | 8 --- .../distributed/graph_db_accessor.hpp | 14 ------ src/database/single_node/counters.hpp | 31 ------------ src/database/single_node/graph_db.cpp | 3 -- src/database/single_node/graph_db.hpp | 3 -- .../single_node/graph_db_accessor.cpp | 8 --- .../single_node/graph_db_accessor.hpp | 14 ------ src/database/single_node_ha/counters.hpp | 31 ------------ src/database/single_node_ha/graph_db.cpp | 3 -- src/database/single_node_ha/graph_db.hpp | 3 -- .../single_node_ha/graph_db_accessor.cpp | 8 --- .../single_node_ha/graph_db_accessor.hpp | 14 ------ src/query/context.hpp | 3 ++ .../interpret/awesome_memgraph_functions.cpp | 45 +++++++++-------- .../memgraph_V1/features/functions.feature | 18 +------ tests/stress/long_running.cpp | 45 +++++++++-------- tests/unit/CMakeLists.txt | 3 -- tests/unit/counters.cpp | 32 ------------ tests/unit/distributed_graph_db.cpp | 10 ---- tests/unit/query_expression_evaluator.cpp | 39 +++++++-------- 29 files changed, 73 insertions(+), 414 deletions(-) delete mode 100644 src/database/distributed/counters.hpp delete mode 100644 src/database/distributed/counters_rpc_messages.lcp delete mode 100644 src/database/distributed/distributed_counters.cpp delete mode 100644 src/database/distributed/distributed_counters.hpp delete mode 100644 src/database/single_node/counters.hpp delete mode 100644 src/database/single_node_ha/counters.hpp delete mode 100644 tests/unit/counters.cpp diff --git a/.gitignore b/.gitignore index 18f338d1f..e1a4187b0 100644 --- a/.gitignore +++ b/.gitignore @@ -37,7 +37,6 @@ TAGS # LCP generated C++ files *.lcp.cpp -src/database/distributed/counters_rpc_messages.hpp src/database/distributed/serialization.hpp src/database/single_node_ha/serialization.hpp src/distributed/bfs_rpc_messages.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b3b855a27..655327bf4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -115,7 +115,6 @@ target_compile_definitions(mg-single-node PUBLIC MG_SINGLE_NODE) set(mg_distributed_sources ${lcp_common_cpp_files} audit/log.cpp - database/distributed/distributed_counters.cpp database/distributed/distributed_graph_db.cpp distributed/bfs_rpc_clients.cpp distributed/bfs_subcursor.cpp @@ -193,7 +192,6 @@ set(mg_distributed_sources define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files) add_lcp_distributed(durability/distributed/state_delta.lcp) -add_lcp_distributed(database/distributed/counters_rpc_messages.lcp SLK_SERIALIZE) add_lcp_distributed(database/distributed/serialization.lcp SLK_SERIALIZE DEPENDS durability/distributed/state_delta.lcp) add_lcp_distributed(distributed/bfs_rpc_messages.lcp SLK_SERIALIZE) diff --git a/src/database/distributed/counters.hpp b/src/database/distributed/counters.hpp deleted file mode 100644 index 5ef973ece..000000000 --- a/src/database/distributed/counters.hpp +++ /dev/null @@ -1,30 +0,0 @@ -/** @file */ -#pragma once - -#include <cstdint> -#include <string> - -namespace database { - -/** A set of counter that are guaranteed to produce unique, consecutive values - * on each call. */ -class Counters { - public: - virtual ~Counters() {} - - /** - * Returns the current value of the counter with the given name, and - * increments that counter. If the counter with the given name does not exist, - * a new counter is created and this function returns 0. - */ - virtual int64_t Get(const std::string &name) = 0; - - /** - * Sets the counter with the given name to the given value. Returns nothing. - * If the counter with the given name does not exist, a new counter is created - * and set to the given value. - */ - virtual void Set(const std::string &name, int64_t values) = 0; -}; - -} // namespace database diff --git a/src/database/distributed/counters_rpc_messages.lcp b/src/database/distributed/counters_rpc_messages.lcp deleted file mode 100644 index 9db5f9f2e..000000000 --- a/src/database/distributed/counters_rpc_messages.lcp +++ /dev/null @@ -1,21 +0,0 @@ -#>cpp -#pragma once - -#include <string> - -#include "communication/rpc/messages.hpp" -#include "slk/serialization.hpp" -cpp<# - -(lcp:namespace database) - -(lcp:define-rpc counters-get - (:request ((name "std::string"))) - (:response ((value :int64_t)))) - -(lcp:define-rpc counters-set - (:request ((name "std::string") - (value :int64_t))) - (:response ())) - -(lcp:pop-namespace) ;; database diff --git a/src/database/distributed/distributed_counters.cpp b/src/database/distributed/distributed_counters.cpp deleted file mode 100644 index 6bb8049be..000000000 --- a/src/database/distributed/distributed_counters.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include "database/distributed/distributed_counters.hpp" - -#include "database/distributed/counters_rpc_messages.hpp" - -namespace database { - -MasterCounters::MasterCounters(distributed::Coordination *coordination) { - coordination->Register<CountersGetRpc>( - [this](auto *req_reader, auto *res_builder) { - CountersGetReq req; - slk::Load(&req, req_reader); - CountersGetRes res(Get(req.name)); - slk::Save(res, res_builder); - }); - coordination->Register<CountersSetRpc>( - [this](auto *req_reader, auto *res_builder) { - CountersSetReq req; - slk::Load(&req, req_reader); - Set(req.name, req.value); - CountersSetRes res; - slk::Save(res, res_builder); - }); -} - -int64_t MasterCounters::Get(const std::string &name) { - return counters_.access() - .emplace(name, std::make_tuple(name), std::make_tuple(0)) - .first->second.fetch_add(1); -} - -void MasterCounters::Set(const std::string &name, int64_t value) { - auto name_counter_pair = counters_.access().emplace( - name, std::make_tuple(name), std::make_tuple(value)); - if (!name_counter_pair.second) name_counter_pair.first->second.store(value); -} - -WorkerCounters::WorkerCounters( - communication::rpc::ClientPool *master_client_pool) - : master_client_pool_(master_client_pool) {} - -int64_t WorkerCounters::Get(const std::string &name) { - return master_client_pool_->Call<CountersGetRpc>(name).value; -} - -void WorkerCounters::Set(const std::string &name, int64_t value) { - master_client_pool_->Call<CountersSetRpc>(name, value); -} - -} // namespace database diff --git a/src/database/distributed/distributed_counters.hpp b/src/database/distributed/distributed_counters.hpp deleted file mode 100644 index 3c926c598..000000000 --- a/src/database/distributed/distributed_counters.hpp +++ /dev/null @@ -1,38 +0,0 @@ -/// @file -#pragma once - -#include <atomic> -#include <cstdint> -#include <string> - -#include "data_structures/concurrent/concurrent_map.hpp" -#include "database/distributed/counters.hpp" -#include "distributed/coordination.hpp" - -namespace database { - -/// Implementation for distributed master -class MasterCounters : public Counters { - public: - explicit MasterCounters(distributed::Coordination *coordination); - - int64_t Get(const std::string &name) override; - void Set(const std::string &name, int64_t value) override; - - private: - ConcurrentMap<std::string, std::atomic<int64_t>> counters_; -}; - -/// Implementation for distributed worker -class WorkerCounters : public Counters { - public: - explicit WorkerCounters(communication::rpc::ClientPool *master_client_pool); - - int64_t Get(const std::string &name) override; - void Set(const std::string &name, int64_t value) override; - - private: - communication::rpc::ClientPool *master_client_pool_; -}; - -} // namespace database diff --git a/src/database/distributed/distributed_graph_db.cpp b/src/database/distributed/distributed_graph_db.cpp index 9132222d4..66df5fb6d 100644 --- a/src/database/distributed/distributed_graph_db.cpp +++ b/src/database/distributed/distributed_graph_db.cpp @@ -1,6 +1,5 @@ #include "database/distributed/distributed_graph_db.hpp" -#include "database/distributed/distributed_counters.hpp" #include "distributed/bfs_rpc_clients.hpp" #include "distributed/bfs_rpc_server.hpp" #include "distributed/bfs_subcursor.hpp" @@ -282,7 +281,6 @@ class Master { std::make_unique<StorageGcMaster>(storage_.get(), &tx_engine_, config_.gc_cycle_sec, &coordination_); TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{&coordination_}; - database::MasterCounters counters_{&coordination_}; distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_}; distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_, &subcursor_storage_}; @@ -349,8 +347,6 @@ storage::ConcurrentIdMapper<storage::Property> &Master::property_mapper() { return impl_->typemap_pack_.property; } -database::Counters &Master::counters() { return impl_->counters_; } - void Master::CollectGarbage() { impl_->storage_gc_->CollectGarbage(); } int Master::WorkerId() const { return impl_->config_.worker_id; } @@ -649,7 +645,6 @@ class Worker { coordination_.GetClientPool(0), config_.worker_id); TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{ coordination_.GetClientPool(0)}; - database::WorkerCounters counters_{coordination_.GetClientPool(0)}; distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_}; distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_, &subcursor_storage_}; @@ -719,8 +714,6 @@ storage::ConcurrentIdMapper<storage::Property> &Worker::property_mapper() { return impl_->typemap_pack_.property; } -database::Counters &Worker::counters() { return impl_->counters_; } - void Worker::CollectGarbage() { return impl_->storage_gc_->CollectGarbage(); } int Worker::WorkerId() const { return impl_->config_.worker_id; } diff --git a/src/database/distributed/distributed_graph_db.hpp b/src/database/distributed/distributed_graph_db.hpp index 92bab6994..5924daf49 100644 --- a/src/database/distributed/distributed_graph_db.hpp +++ b/src/database/distributed/distributed_graph_db.hpp @@ -20,7 +20,6 @@ class Master final : public GraphDb { storage::ConcurrentIdMapper<storage::Label> &label_mapper() override; storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override; storage::ConcurrentIdMapper<storage::Property> &property_mapper() override; - database::Counters &counters() override; void CollectGarbage() override; int WorkerId() const override; std::vector<int> GetWorkerIds() const override; @@ -68,7 +67,6 @@ class Worker final : public GraphDb { storage::ConcurrentIdMapper<storage::Label> &label_mapper() override; storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override; storage::ConcurrentIdMapper<storage::Property> &property_mapper() override; - database::Counters &counters() override; void CollectGarbage() override; int WorkerId() const override; std::vector<int> GetWorkerIds() const override; diff --git a/src/database/distributed/graph_db.hpp b/src/database/distributed/graph_db.hpp index cad7acb0f..f225b9138 100644 --- a/src/database/distributed/graph_db.hpp +++ b/src/database/distributed/graph_db.hpp @@ -5,7 +5,6 @@ #include <memory> #include <vector> -#include "database/distributed/counters.hpp" #include "durability/distributed/recovery.hpp" #include "durability/distributed/wal.hpp" #include "io/network/endpoint.hpp" @@ -118,7 +117,6 @@ class GraphDb { virtual storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() = 0; virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0; - virtual database::Counters &counters() = 0; virtual void CollectGarbage() = 0; /// Makes a snapshot from the visibility of the given accessor diff --git a/src/database/distributed/graph_db_accessor.cpp b/src/database/distributed/graph_db_accessor.cpp index a32eaecdc..8538044b8 100644 --- a/src/database/distributed/graph_db_accessor.cpp +++ b/src/database/distributed/graph_db_accessor.cpp @@ -547,14 +547,6 @@ const std::string &GraphDbAccessor::PropertyName( return db_.property_mapper().id_to_value(property); } -int64_t GraphDbAccessor::Counter(const std::string &name) { - return db_.counters().Get(name); -} - -void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) { - db_.counters().Set(name, value); -} - std::vector<std::string> GraphDbAccessor::IndexInfo() const { std::vector<std::string> info; for (storage::Label label : db_.storage().labels_index_.Keys()) { diff --git a/src/database/distributed/graph_db_accessor.hpp b/src/database/distributed/graph_db_accessor.hpp index 5edd0b8af..e0a94867b 100644 --- a/src/database/distributed/graph_db_accessor.hpp +++ b/src/database/distributed/graph_db_accessor.hpp @@ -619,20 +619,6 @@ class GraphDbAccessor { auto &db() { return db_; } const auto &db() const { return db_; } - /** - * Returns the current value of the counter with the given name, and - * increments that counter. If the counter with the given name does not exist, - * a new counter is created and this function returns 0. - */ - int64_t Counter(const std::string &name); - - /** - * Sets the counter with the given name to the given value. Returns nothing. - * If the counter with the given name does not exist, a new counter is created - * and set to the given value. - */ - void CounterSet(const std::string &name, int64_t value); - /* Returns a list of index names present in the database. */ std::vector<std::string> IndexInfo() const; diff --git a/src/database/single_node/counters.hpp b/src/database/single_node/counters.hpp deleted file mode 100644 index 5cd681f2b..000000000 --- a/src/database/single_node/counters.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/// @file -#pragma once - -#include <atomic> -#include <cstdint> -#include <string> - -#include "data_structures/concurrent/concurrent_map.hpp" - -namespace database { - -/// Implementation for the single-node memgraph -class Counters { - public: - int64_t Get(const std::string &name) { - return counters_.access() - .emplace(name, std::make_tuple(name), std::make_tuple(0)) - .first->second.fetch_add(1); - } - - void Set(const std::string &name, int64_t value) { - auto name_counter_pair = counters_.access().emplace( - name, std::make_tuple(name), std::make_tuple(value)); - if (!name_counter_pair.second) name_counter_pair.first->second.store(value); - } - - private: - ConcurrentMap<std::string, std::atomic<int64_t>> counters_; -}; - -} // namespace database diff --git a/src/database/single_node/graph_db.cpp b/src/database/single_node/graph_db.cpp index f47bbf598..36c9cf11c 100644 --- a/src/database/single_node/graph_db.cpp +++ b/src/database/single_node/graph_db.cpp @@ -4,7 +4,6 @@ #include <glog/logging.h> -#include "database/single_node/counters.hpp" #include "database/single_node/graph_db_accessor.hpp" #include "durability/single_node/paths.hpp" #include "durability/single_node/recovery.hpp" @@ -127,8 +126,6 @@ storage::ConcurrentIdMapper<storage::Property> &GraphDb::property_mapper() { return property_mapper_; } -database::Counters &GraphDb::counters() { return counters_; } - void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); } bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) { diff --git a/src/database/single_node/graph_db.hpp b/src/database/single_node/graph_db.hpp index e5bee8086..525dd6e35 100644 --- a/src/database/single_node/graph_db.hpp +++ b/src/database/single_node/graph_db.hpp @@ -6,7 +6,6 @@ #include <optional> #include <vector> -#include "database/single_node/counters.hpp" #include "durability/single_node/recovery.hpp" #include "durability/single_node/wal.hpp" #include "io/network/endpoint.hpp" @@ -104,7 +103,6 @@ class GraphDb { storage::ConcurrentIdMapper<storage::Label> &label_mapper(); storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper(); storage::ConcurrentIdMapper<storage::Property> &property_mapper(); - database::Counters &counters(); void CollectGarbage(); /// Makes a snapshot from the visibility of the given accessor @@ -166,7 +164,6 @@ class GraphDb { storage_->PropertiesOnDisk()}; storage::ConcurrentIdMapper<storage::Property> property_mapper_{ storage_->PropertiesOnDisk()}; - database::Counters counters_; }; } // namespace database diff --git a/src/database/single_node/graph_db_accessor.cpp b/src/database/single_node/graph_db_accessor.cpp index aa58c6d61..9e03998c9 100644 --- a/src/database/single_node/graph_db_accessor.cpp +++ b/src/database/single_node/graph_db_accessor.cpp @@ -513,14 +513,6 @@ const std::string &GraphDbAccessor::PropertyName( return db_->property_mapper().id_to_value(property); } -int64_t GraphDbAccessor::Counter(const std::string &name) { - return db_->counters().Get(name); -} - -void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) { - db_->counters().Set(name, value); -} - std::vector<std::string> GraphDbAccessor::IndexInfo() const { std::vector<std::string> info; for (storage::Label label : db_->storage().labels_index_.Keys()) { diff --git a/src/database/single_node/graph_db_accessor.hpp b/src/database/single_node/graph_db_accessor.hpp index 01910af67..4e84a4f1b 100644 --- a/src/database/single_node/graph_db_accessor.hpp +++ b/src/database/single_node/graph_db_accessor.hpp @@ -590,20 +590,6 @@ class GraphDbAccessor { GraphDb &db() { return *db_; } const GraphDb &db() const { return *db_; } - /** - * Returns the current value of the counter with the given name, and - * increments that counter. If the counter with the given name does not exist, - * a new counter is created and this function returns 0. - */ - int64_t Counter(const std::string &name); - - /** - * Sets the counter with the given name to the given value. Returns nothing. - * If the counter with the given name does not exist, a new counter is created - * and set to the given value. - */ - void CounterSet(const std::string &name, int64_t value); - /* Returns a list of index names present in the database. */ std::vector<std::string> IndexInfo() const; diff --git a/src/database/single_node_ha/counters.hpp b/src/database/single_node_ha/counters.hpp deleted file mode 100644 index 5cd681f2b..000000000 --- a/src/database/single_node_ha/counters.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/// @file -#pragma once - -#include <atomic> -#include <cstdint> -#include <string> - -#include "data_structures/concurrent/concurrent_map.hpp" - -namespace database { - -/// Implementation for the single-node memgraph -class Counters { - public: - int64_t Get(const std::string &name) { - return counters_.access() - .emplace(name, std::make_tuple(name), std::make_tuple(0)) - .first->second.fetch_add(1); - } - - void Set(const std::string &name, int64_t value) { - auto name_counter_pair = counters_.access().emplace( - name, std::make_tuple(name), std::make_tuple(value)); - if (!name_counter_pair.second) name_counter_pair.first->second.store(value); - } - - private: - ConcurrentMap<std::string, std::atomic<int64_t>> counters_; -}; - -} // namespace database diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index 505dab053..3bac93fd4 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -4,7 +4,6 @@ #include <glog/logging.h> -#include "database/single_node_ha/counters.hpp" #include "database/single_node_ha/graph_db_accessor.hpp" #include "storage/single_node_ha/concurrent_id_mapper.hpp" #include "storage/single_node_ha/storage_gc.hpp" @@ -84,8 +83,6 @@ storage::ConcurrentIdMapper<storage::Property> &GraphDb::property_mapper() { return property_mapper_; } -database::Counters &GraphDb::counters() { return counters_; } - void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); } void GraphDb::Reset() { diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 60bbf804f..da907fbb0 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -7,7 +7,6 @@ #include <vector> #include "database/single_node_ha/config.hpp" -#include "database/single_node_ha/counters.hpp" #include "io/network/endpoint.hpp" #include "raft/coordination.hpp" #include "raft/raft_server.hpp" @@ -88,7 +87,6 @@ class GraphDb { storage::ConcurrentIdMapper<storage::Label> &label_mapper(); storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper(); storage::ConcurrentIdMapper<storage::Property> &property_mapper(); - database::Counters &counters(); void CollectGarbage(); /// Releases the storage object safely and creates a new object, resets the tx @@ -150,7 +148,6 @@ class GraphDb { storage_->PropertiesOnDisk()}; storage::ConcurrentIdMapper<storage::Property> property_mapper_{ storage_->PropertiesOnDisk()}; - database::Counters counters_; }; } // namespace database diff --git a/src/database/single_node_ha/graph_db_accessor.cpp b/src/database/single_node_ha/graph_db_accessor.cpp index a12c4a664..ec13ab415 100644 --- a/src/database/single_node_ha/graph_db_accessor.cpp +++ b/src/database/single_node_ha/graph_db_accessor.cpp @@ -420,14 +420,6 @@ const std::string &GraphDbAccessor::PropertyName( return db_->property_mapper().id_to_value(property); } -int64_t GraphDbAccessor::Counter(const std::string &name) { - return db_->counters().Get(name); -} - -void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) { - db_->counters().Set(name, value); -} - std::vector<std::string> GraphDbAccessor::IndexInfo() const { std::vector<std::string> info; for (storage::Label label : db_->storage().labels_index_.Keys()) { diff --git a/src/database/single_node_ha/graph_db_accessor.hpp b/src/database/single_node_ha/graph_db_accessor.hpp index 4e6af03cf..87b8fa9be 100644 --- a/src/database/single_node_ha/graph_db_accessor.hpp +++ b/src/database/single_node_ha/graph_db_accessor.hpp @@ -575,20 +575,6 @@ class GraphDbAccessor { auto &db() { return db_; } const auto &db() const { return db_; } - /** - * Returns the current value of the counter with the given name, and - * increments that counter. If the counter with the given name does not exist, - * a new counter is created and this function returns 0. - */ - int64_t Counter(const std::string &name); - - /** - * Sets the counter with the given name to the given value. Returns nothing. - * If the counter with the given name does not exist, a new counter is created - * and set to the given value. - */ - void CounterSet(const std::string &name, int64_t value); - /* Returns a list of index names present in the database. */ std::vector<std::string> IndexInfo() const; diff --git a/src/query/context.hpp b/src/query/context.hpp index bb119e5cc..9e70e76a9 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -16,6 +16,9 @@ struct EvaluationContext { std::vector<storage::Property> properties; /// All labels indexable via LabelIx std::vector<storage::Label> labels; + /// All counters generated by `counter` function, mutable because the function + /// modifies the values + mutable std::unordered_map<std::string, int64_t> counters; }; inline std::vector<storage::Property> NamesToProperties( diff --git a/src/query/interpret/awesome_memgraph_functions.cpp b/src/query/interpret/awesome_memgraph_functions.cpp index dd14cc3af..c9c5fcf91 100644 --- a/src/query/interpret/awesome_memgraph_functions.cpp +++ b/src/query/interpret/awesome_memgraph_functions.cpp @@ -649,32 +649,34 @@ TypedValue Assert(TypedValue *args, int64_t nargs, const EvaluationContext &, return args[0]; } -TypedValue Counter(TypedValue *args, int64_t nargs, const EvaluationContext &, - database::GraphDbAccessor *dba) { - if (nargs != 1) { - throw QueryRuntimeException("'counter' requires exactly one argument."); - } - if (!args[0].IsString()) - throw QueryRuntimeException("'counter' argument must be a string."); - - return dba->Counter(args[0].ValueString()); -} - -TypedValue CounterSet(TypedValue *args, int64_t nargs, - const EvaluationContext &, - database::GraphDbAccessor *dba) { - if (nargs != 2) { - throw QueryRuntimeException("'counterSet' requires two arguments."); +#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA) +TypedValue Counter(TypedValue *args, int64_t nargs, + const EvaluationContext &context, + database::GraphDbAccessor *) { + if (nargs < 2 || nargs > 3) { + throw QueryRuntimeException("'counter' requires two or three arguments."); } if (!args[0].IsString()) throw QueryRuntimeException( - "First argument of 'counterSet' must be a string."); + "First argument of 'counter' must be a string."); if (!args[1].IsInt()) throw QueryRuntimeException( - "Second argument of 'counterSet' must be an integer."); - dba->CounterSet(args[0].ValueString(), args[1].ValueInt()); - return TypedValue::Null; + "Second argument of 'counter' must be an integer."); + if (nargs == 3 && !args[2].IsInt()) + throw QueryRuntimeException( + "Third argument of 'counter' must be an integer."); + + int64_t step = 1; + if (nargs == 3) step = args[2].ValueInt(); + + auto [it, inserted] = + context.counters.emplace(args[0].ValueString(), args[1].ValueInt()); + auto value = it->second; + it->second += step; + + return value; } +#endif #ifdef MG_DISTRIBUTED TypedValue WorkerId(TypedValue *args, int64_t nargs, const EvaluationContext &, @@ -983,8 +985,9 @@ NameToFunction(const std::string &function_name) { // Memgraph specific functions if (function_name == "ASSERT") return Assert; +#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA) if (function_name == "COUNTER") return Counter; - if (function_name == "COUNTERSET") return CounterSet; +#endif #ifdef MG_SINGLE_NODE if (function_name == "DUMP") return Dump; #endif diff --git a/tests/qa/tests/memgraph_V1/features/functions.feature b/tests/qa/tests/memgraph_V1/features/functions.feature index b94e4c56b..e373c4a1e 100644 --- a/tests/qa/tests/memgraph_V1/features/functions.feature +++ b/tests/qa/tests/memgraph_V1/features/functions.feature @@ -785,28 +785,14 @@ Feature: Functions """ When executing query: """ - MATCH (n) SET n.id = counter("n.id") WITH n SKIP 1 - RETURN n.id, counter("other") AS c2 + MATCH (n) SET n.id = counter("n.id", 0) WITH n SKIP 1 + RETURN n.id, counter("other", 0) AS c2 """ Then the result should be: | n.id | c2 | | 1 | 0 | | 2 | 1 | - - Scenario: CounterSet test: - Given an empty graph - When executing query: - """ - WITH counter("n") AS zero - WITH counter("n") AS one, zero - WITH counterSet("n", 42) AS nothing, zero, one - RETURN counter("n") AS n, zero, one, counter("n2") AS n2 - """ - Then the result should be: - | n | zero | one | n2 | - | 42 | 0 | 1 | 0 | - Scenario: Vertex Id test: Given an empty graph And having executed: diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index ad88cfe81..fc2745e97 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -67,6 +67,9 @@ class GraphSession { ClientContextT context_{FLAGS_use_ssl}; std::unique_ptr<ClientT> client_; + uint64_t vertex_id_{0}; + uint64_t edge_id_{0}; + std::set<uint64_t> vertices_; std::set<uint64_t> edges_; @@ -123,15 +126,17 @@ class GraphSession { void CreateVertices(uint64_t vertices_count) { if (vertices_count == 0) return; - auto ret = - Execute(fmt::format("UNWIND RANGE(1, {}) AS r CREATE (n:{} {{id: " - "counter(\"vertex{}\")}}) RETURN min(n.id)", - vertices_count, indexed_label_, id_)); + auto ret = Execute(fmt::format( + "UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)", + vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_)); CHECK(ret.records.size() == 1) << "Vertices creation failed!"; - uint64_t min_id = ret.records[0][0].ValueInt(); + CHECK(ret.records[0][0].ValueInt() == vertices_count) + << "Created " << ret.records[0][0].ValueInt() << " vertices instead of " + << vertices_count << "!"; for (uint64_t i = 0; i < vertices_count; ++i) { - vertices_.insert(min_id + i); + vertices_.insert(vertex_id_ + i); } + vertex_id_ += vertices_count; } void RemoveVertex() { @@ -173,27 +178,29 @@ class GraphSession { "MATCH (a:{0}) WITH a " "UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id " "MATCH (b:{0} {{id: id}}) WITH a, b " - "CREATE (a)-[e:EdgeType {{id: counter(\"edge{3}\")}}]->(b) RETURN " - "min(e.id), count(e)", - indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(), id_)); + "CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) " + "RETURN count(e)", + indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(), + edge_id_)); CHECK(ret.records.size() == 1) << "Failed to create edges"; - uint64_t min_id = ret.records[0][0].ValueInt(); - uint64_t count = ret.records[0][1].ValueInt(); + uint64_t count = ret.records[0][0].ValueInt(); for (uint64_t i = 0; i < count; ++i) { - edges_.insert(min_id + i); + edges_.insert(edge_id_ + i); } + edge_id_ += count; } void CreateEdge() { - auto ret = - Execute(fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) " - "CREATE (from)-[e:EdgeType {{id: " - "counter(\"edge{}\")}}]->(to) RETURN e.id", - indexed_label_, RandomElement(vertices_), - indexed_label_, RandomElement(vertices_), id_)); + auto ret = Execute( + fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) " + "CREATE (from)-[e:EdgeType {{id: " + "counter(\"edge\", {})}}]->(to) RETURN e.id", + indexed_label_, RandomElement(vertices_), indexed_label_, + RandomElement(vertices_), edge_id_)); if (ret.records.size() > 0) { edges_.insert(ret.records[0][0].ValueInt()); + edge_id_ += 1; } } @@ -384,8 +391,6 @@ int main(int argc, char **argv) { client.Execute("MATCH (n) DETACH DELETE n", {}); for (int i = 0; i < FLAGS_worker_count; ++i) { client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {}); - client.Execute(fmt::format("RETURN counterSet(\"vertex{}\", 0)", i), {}); - client.Execute(fmt::format("RETURN counterSet(\"edge{}\", 0)", i), {}); } // close client diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index bdbd36cd3..f9ed50181 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -39,9 +39,6 @@ target_link_libraries(${test_prefix}concurrent_map_access mg-single-node kvstore add_unit_test(concurrent_map.cpp) target_link_libraries(${test_prefix}concurrent_map mg-single-node kvstore_dummy_lib) -add_unit_test(counters.cpp) -target_link_libraries(${test_prefix}counters mg-distributed kvstore_dummy_lib) - add_unit_test(cypher_main_visitor.cpp) target_link_libraries(${test_prefix}cypher_main_visitor mg-single-node kvstore_dummy_lib) diff --git a/tests/unit/counters.cpp b/tests/unit/counters.cpp deleted file mode 100644 index 2ee0b73ae..000000000 --- a/tests/unit/counters.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include <gtest/gtest.h> - -#include "communication/rpc/client_pool.hpp" -#include "communication/rpc/server.hpp" -#include "database/distributed/distributed_counters.hpp" - -#include "test_coordination.hpp" - -TEST(CountersDistributed, All) { - TestMasterCoordination coordination; - database::MasterCounters master(&coordination); - coordination.Start(); - - communication::rpc::ClientPool master_client_pool( - coordination.GetServerEndpoint()); - - database::WorkerCounters w1(&master_client_pool); - database::WorkerCounters w2(&master_client_pool); - - EXPECT_EQ(w1.Get("a"), 0); - EXPECT_EQ(w1.Get("a"), 1); - EXPECT_EQ(w2.Get("a"), 2); - EXPECT_EQ(w1.Get("a"), 3); - EXPECT_EQ(master.Get("a"), 4); - - EXPECT_EQ(master.Get("b"), 0); - EXPECT_EQ(w2.Get("b"), 1); - w1.Set("b", 42); - EXPECT_EQ(w2.Get("b"), 42); - - coordination.Stop(); -} diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index eb45973ec..a800fa46f 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -94,16 +94,6 @@ TEST_F(DistributedGraphDb, StorageTypes) { std::vector<storage::Property>{}); } -TEST_F(DistributedGraphDb, Counters) { - EXPECT_EQ(master().counters().Get("a"), 0); - EXPECT_EQ(worker(1).counters().Get("a"), 1); - EXPECT_EQ(worker(2).counters().Get("a"), 2); - - EXPECT_EQ(worker(1).counters().Get("b"), 0); - EXPECT_EQ(worker(2).counters().Get("b"), 1); - EXPECT_EQ(master().counters().Get("b"), 2); -} - TEST_F(DistributedGraphDb, DispatchPlan) { auto kRPCWaitTime = 600ms; int64_t plan_id = 5; diff --git a/tests/unit/query_expression_evaluator.cpp b/tests/unit/query_expression_evaluator.cpp index 8aa484bfd..817cdfd80 100644 --- a/tests/unit/query_expression_evaluator.cpp +++ b/tests/unit/query_expression_evaluator.cpp @@ -1457,28 +1457,27 @@ TEST_F(FunctionTest, Assert) { TEST_F(FunctionTest, Counter) { EXPECT_THROW(EvaluateFunction("COUNTER", {}), QueryRuntimeException); + EXPECT_THROW(EvaluateFunction("COUNTER", {"a"}), QueryRuntimeException); EXPECT_THROW(EvaluateFunction("COUNTER", {"a", "b"}), QueryRuntimeException); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 0); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 1); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 0); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 2); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 1); -} + EXPECT_THROW(EvaluateFunction("COUNTER", {"a", "b", "c"}), QueryRuntimeException); -TEST_F(FunctionTest, CounterSet) { - EXPECT_THROW(EvaluateFunction("COUNTERSET", {}), QueryRuntimeException); - EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a"}), QueryRuntimeException); - EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a", "b"}), - QueryRuntimeException); - EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a", 11, 12}), - QueryRuntimeException); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 0); - EvaluateFunction("COUNTERSET", {"c1", 12}); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 12); - EvaluateFunction("COUNTERSET", {"c2", 42}); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 42); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 13); - EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 43); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 0); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 1); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c2", 0}).ValueInt(), 0); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 2); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c2", 0}).ValueInt(), 1); + + EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), -1); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), 0); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), 1); + + EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 0); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 5); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 10); + + EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), 0); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), -5); + EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), -10); } TEST_F(FunctionTest, Id) {