Reimplement counter
openCypher function
Reviewers: teon.banek, msantl Reviewed By: teon.banek, msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2047
This commit is contained in:
parent
98a853a95c
commit
d4b2d76a35
1
.gitignore
vendored
1
.gitignore
vendored
@ -37,7 +37,6 @@ TAGS
|
||||
# LCP generated C++ files
|
||||
*.lcp.cpp
|
||||
|
||||
src/database/distributed/counters_rpc_messages.hpp
|
||||
src/database/distributed/serialization.hpp
|
||||
src/database/single_node_ha/serialization.hpp
|
||||
src/distributed/bfs_rpc_messages.hpp
|
||||
|
@ -115,7 +115,6 @@ target_compile_definitions(mg-single-node PUBLIC MG_SINGLE_NODE)
|
||||
set(mg_distributed_sources
|
||||
${lcp_common_cpp_files}
|
||||
audit/log.cpp
|
||||
database/distributed/distributed_counters.cpp
|
||||
database/distributed/distributed_graph_db.cpp
|
||||
distributed/bfs_rpc_clients.cpp
|
||||
distributed/bfs_subcursor.cpp
|
||||
@ -193,7 +192,6 @@ set(mg_distributed_sources
|
||||
define_add_lcp(add_lcp_distributed mg_distributed_sources generated_lcp_distributed_files)
|
||||
|
||||
add_lcp_distributed(durability/distributed/state_delta.lcp)
|
||||
add_lcp_distributed(database/distributed/counters_rpc_messages.lcp SLK_SERIALIZE)
|
||||
add_lcp_distributed(database/distributed/serialization.lcp SLK_SERIALIZE
|
||||
DEPENDS durability/distributed/state_delta.lcp)
|
||||
add_lcp_distributed(distributed/bfs_rpc_messages.lcp SLK_SERIALIZE)
|
||||
|
@ -1,30 +0,0 @@
|
||||
/** @file */
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
namespace database {
|
||||
|
||||
/** A set of counter that are guaranteed to produce unique, consecutive values
|
||||
* on each call. */
|
||||
class Counters {
|
||||
public:
|
||||
virtual ~Counters() {}
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
* increments that counter. If the counter with the given name does not exist,
|
||||
* a new counter is created and this function returns 0.
|
||||
*/
|
||||
virtual int64_t Get(const std::string &name) = 0;
|
||||
|
||||
/**
|
||||
* Sets the counter with the given name to the given value. Returns nothing.
|
||||
* If the counter with the given name does not exist, a new counter is created
|
||||
* and set to the given value.
|
||||
*/
|
||||
virtual void Set(const std::string &name, int64_t values) = 0;
|
||||
};
|
||||
|
||||
} // namespace database
|
@ -1,21 +0,0 @@
|
||||
#>cpp
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "slk/serialization.hpp"
|
||||
cpp<#
|
||||
|
||||
(lcp:namespace database)
|
||||
|
||||
(lcp:define-rpc counters-get
|
||||
(:request ((name "std::string")))
|
||||
(:response ((value :int64_t))))
|
||||
|
||||
(lcp:define-rpc counters-set
|
||||
(:request ((name "std::string")
|
||||
(value :int64_t)))
|
||||
(:response ()))
|
||||
|
||||
(lcp:pop-namespace) ;; database
|
@ -1,49 +0,0 @@
|
||||
#include "database/distributed/distributed_counters.hpp"
|
||||
|
||||
#include "database/distributed/counters_rpc_messages.hpp"
|
||||
|
||||
namespace database {
|
||||
|
||||
MasterCounters::MasterCounters(distributed::Coordination *coordination) {
|
||||
coordination->Register<CountersGetRpc>(
|
||||
[this](auto *req_reader, auto *res_builder) {
|
||||
CountersGetReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
CountersGetRes res(Get(req.name));
|
||||
slk::Save(res, res_builder);
|
||||
});
|
||||
coordination->Register<CountersSetRpc>(
|
||||
[this](auto *req_reader, auto *res_builder) {
|
||||
CountersSetReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
Set(req.name, req.value);
|
||||
CountersSetRes res;
|
||||
slk::Save(res, res_builder);
|
||||
});
|
||||
}
|
||||
|
||||
int64_t MasterCounters::Get(const std::string &name) {
|
||||
return counters_.access()
|
||||
.emplace(name, std::make_tuple(name), std::make_tuple(0))
|
||||
.first->second.fetch_add(1);
|
||||
}
|
||||
|
||||
void MasterCounters::Set(const std::string &name, int64_t value) {
|
||||
auto name_counter_pair = counters_.access().emplace(
|
||||
name, std::make_tuple(name), std::make_tuple(value));
|
||||
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
|
||||
}
|
||||
|
||||
WorkerCounters::WorkerCounters(
|
||||
communication::rpc::ClientPool *master_client_pool)
|
||||
: master_client_pool_(master_client_pool) {}
|
||||
|
||||
int64_t WorkerCounters::Get(const std::string &name) {
|
||||
return master_client_pool_->Call<CountersGetRpc>(name).value;
|
||||
}
|
||||
|
||||
void WorkerCounters::Set(const std::string &name, int64_t value) {
|
||||
master_client_pool_->Call<CountersSetRpc>(name, value);
|
||||
}
|
||||
|
||||
} // namespace database
|
@ -1,38 +0,0 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "database/distributed/counters.hpp"
|
||||
#include "distributed/coordination.hpp"
|
||||
|
||||
namespace database {
|
||||
|
||||
/// Implementation for distributed master
|
||||
class MasterCounters : public Counters {
|
||||
public:
|
||||
explicit MasterCounters(distributed::Coordination *coordination);
|
||||
|
||||
int64_t Get(const std::string &name) override;
|
||||
void Set(const std::string &name, int64_t value) override;
|
||||
|
||||
private:
|
||||
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
|
||||
};
|
||||
|
||||
/// Implementation for distributed worker
|
||||
class WorkerCounters : public Counters {
|
||||
public:
|
||||
explicit WorkerCounters(communication::rpc::ClientPool *master_client_pool);
|
||||
|
||||
int64_t Get(const std::string &name) override;
|
||||
void Set(const std::string &name, int64_t value) override;
|
||||
|
||||
private:
|
||||
communication::rpc::ClientPool *master_client_pool_;
|
||||
};
|
||||
|
||||
} // namespace database
|
@ -1,6 +1,5 @@
|
||||
#include "database/distributed/distributed_graph_db.hpp"
|
||||
|
||||
#include "database/distributed/distributed_counters.hpp"
|
||||
#include "distributed/bfs_rpc_clients.hpp"
|
||||
#include "distributed/bfs_rpc_server.hpp"
|
||||
#include "distributed/bfs_subcursor.hpp"
|
||||
@ -282,7 +281,6 @@ class Master {
|
||||
std::make_unique<StorageGcMaster>(storage_.get(), &tx_engine_,
|
||||
config_.gc_cycle_sec, &coordination_);
|
||||
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{&coordination_};
|
||||
database::MasterCounters counters_{&coordination_};
|
||||
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
|
||||
distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_,
|
||||
&subcursor_storage_};
|
||||
@ -349,8 +347,6 @@ storage::ConcurrentIdMapper<storage::Property> &Master::property_mapper() {
|
||||
return impl_->typemap_pack_.property;
|
||||
}
|
||||
|
||||
database::Counters &Master::counters() { return impl_->counters_; }
|
||||
|
||||
void Master::CollectGarbage() { impl_->storage_gc_->CollectGarbage(); }
|
||||
|
||||
int Master::WorkerId() const { return impl_->config_.worker_id; }
|
||||
@ -649,7 +645,6 @@ class Worker {
|
||||
coordination_.GetClientPool(0), config_.worker_id);
|
||||
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
|
||||
coordination_.GetClientPool(0)};
|
||||
database::WorkerCounters counters_{coordination_.GetClientPool(0)};
|
||||
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
|
||||
distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_,
|
||||
&subcursor_storage_};
|
||||
@ -719,8 +714,6 @@ storage::ConcurrentIdMapper<storage::Property> &Worker::property_mapper() {
|
||||
return impl_->typemap_pack_.property;
|
||||
}
|
||||
|
||||
database::Counters &Worker::counters() { return impl_->counters_; }
|
||||
|
||||
void Worker::CollectGarbage() { return impl_->storage_gc_->CollectGarbage(); }
|
||||
|
||||
int Worker::WorkerId() const { return impl_->config_.worker_id; }
|
||||
|
@ -20,7 +20,6 @@ class Master final : public GraphDb {
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
|
||||
database::Counters &counters() override;
|
||||
void CollectGarbage() override;
|
||||
int WorkerId() const override;
|
||||
std::vector<int> GetWorkerIds() const override;
|
||||
@ -68,7 +67,6 @@ class Worker final : public GraphDb {
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper() override;
|
||||
database::Counters &counters() override;
|
||||
void CollectGarbage() override;
|
||||
int WorkerId() const override;
|
||||
std::vector<int> GetWorkerIds() const override;
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "database/distributed/counters.hpp"
|
||||
#include "durability/distributed/recovery.hpp"
|
||||
#include "durability/distributed/wal.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
@ -118,7 +117,6 @@ class GraphDb {
|
||||
virtual storage::ConcurrentIdMapper<storage::EdgeType>
|
||||
&edge_type_mapper() = 0;
|
||||
virtual storage::ConcurrentIdMapper<storage::Property> &property_mapper() = 0;
|
||||
virtual database::Counters &counters() = 0;
|
||||
virtual void CollectGarbage() = 0;
|
||||
|
||||
/// Makes a snapshot from the visibility of the given accessor
|
||||
|
@ -547,14 +547,6 @@ const std::string &GraphDbAccessor::PropertyName(
|
||||
return db_.property_mapper().id_to_value(property);
|
||||
}
|
||||
|
||||
int64_t GraphDbAccessor::Counter(const std::string &name) {
|
||||
return db_.counters().Get(name);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) {
|
||||
db_.counters().Set(name, value);
|
||||
}
|
||||
|
||||
std::vector<std::string> GraphDbAccessor::IndexInfo() const {
|
||||
std::vector<std::string> info;
|
||||
for (storage::Label label : db_.storage().labels_index_.Keys()) {
|
||||
|
@ -619,20 +619,6 @@ class GraphDbAccessor {
|
||||
auto &db() { return db_; }
|
||||
const auto &db() const { return db_; }
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
* increments that counter. If the counter with the given name does not exist,
|
||||
* a new counter is created and this function returns 0.
|
||||
*/
|
||||
int64_t Counter(const std::string &name);
|
||||
|
||||
/**
|
||||
* Sets the counter with the given name to the given value. Returns nothing.
|
||||
* If the counter with the given name does not exist, a new counter is created
|
||||
* and set to the given value.
|
||||
*/
|
||||
void CounterSet(const std::string &name, int64_t value);
|
||||
|
||||
/* Returns a list of index names present in the database. */
|
||||
std::vector<std::string> IndexInfo() const;
|
||||
|
||||
|
@ -1,31 +0,0 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
|
||||
namespace database {
|
||||
|
||||
/// Implementation for the single-node memgraph
|
||||
class Counters {
|
||||
public:
|
||||
int64_t Get(const std::string &name) {
|
||||
return counters_.access()
|
||||
.emplace(name, std::make_tuple(name), std::make_tuple(0))
|
||||
.first->second.fetch_add(1);
|
||||
}
|
||||
|
||||
void Set(const std::string &name, int64_t value) {
|
||||
auto name_counter_pair = counters_.access().emplace(
|
||||
name, std::make_tuple(name), std::make_tuple(value));
|
||||
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
|
||||
}
|
||||
|
||||
private:
|
||||
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
|
||||
};
|
||||
|
||||
} // namespace database
|
@ -4,7 +4,6 @@
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "database/single_node/counters.hpp"
|
||||
#include "database/single_node/graph_db_accessor.hpp"
|
||||
#include "durability/single_node/paths.hpp"
|
||||
#include "durability/single_node/recovery.hpp"
|
||||
@ -127,8 +126,6 @@ storage::ConcurrentIdMapper<storage::Property> &GraphDb::property_mapper() {
|
||||
return property_mapper_;
|
||||
}
|
||||
|
||||
database::Counters &GraphDb::counters() { return counters_; }
|
||||
|
||||
void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); }
|
||||
|
||||
bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) {
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
#include "database/single_node/counters.hpp"
|
||||
#include "durability/single_node/recovery.hpp"
|
||||
#include "durability/single_node/wal.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
@ -104,7 +103,6 @@ class GraphDb {
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper();
|
||||
database::Counters &counters();
|
||||
void CollectGarbage();
|
||||
|
||||
/// Makes a snapshot from the visibility of the given accessor
|
||||
@ -166,7 +164,6 @@ class GraphDb {
|
||||
storage_->PropertiesOnDisk()};
|
||||
storage::ConcurrentIdMapper<storage::Property> property_mapper_{
|
||||
storage_->PropertiesOnDisk()};
|
||||
database::Counters counters_;
|
||||
};
|
||||
|
||||
} // namespace database
|
||||
|
@ -513,14 +513,6 @@ const std::string &GraphDbAccessor::PropertyName(
|
||||
return db_->property_mapper().id_to_value(property);
|
||||
}
|
||||
|
||||
int64_t GraphDbAccessor::Counter(const std::string &name) {
|
||||
return db_->counters().Get(name);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) {
|
||||
db_->counters().Set(name, value);
|
||||
}
|
||||
|
||||
std::vector<std::string> GraphDbAccessor::IndexInfo() const {
|
||||
std::vector<std::string> info;
|
||||
for (storage::Label label : db_->storage().labels_index_.Keys()) {
|
||||
|
@ -590,20 +590,6 @@ class GraphDbAccessor {
|
||||
GraphDb &db() { return *db_; }
|
||||
const GraphDb &db() const { return *db_; }
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
* increments that counter. If the counter with the given name does not exist,
|
||||
* a new counter is created and this function returns 0.
|
||||
*/
|
||||
int64_t Counter(const std::string &name);
|
||||
|
||||
/**
|
||||
* Sets the counter with the given name to the given value. Returns nothing.
|
||||
* If the counter with the given name does not exist, a new counter is created
|
||||
* and set to the given value.
|
||||
*/
|
||||
void CounterSet(const std::string &name, int64_t value);
|
||||
|
||||
/* Returns a list of index names present in the database. */
|
||||
std::vector<std::string> IndexInfo() const;
|
||||
|
||||
|
@ -1,31 +0,0 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
|
||||
namespace database {
|
||||
|
||||
/// Implementation for the single-node memgraph
|
||||
class Counters {
|
||||
public:
|
||||
int64_t Get(const std::string &name) {
|
||||
return counters_.access()
|
||||
.emplace(name, std::make_tuple(name), std::make_tuple(0))
|
||||
.first->second.fetch_add(1);
|
||||
}
|
||||
|
||||
void Set(const std::string &name, int64_t value) {
|
||||
auto name_counter_pair = counters_.access().emplace(
|
||||
name, std::make_tuple(name), std::make_tuple(value));
|
||||
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
|
||||
}
|
||||
|
||||
private:
|
||||
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
|
||||
};
|
||||
|
||||
} // namespace database
|
@ -4,7 +4,6 @@
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "database/single_node_ha/counters.hpp"
|
||||
#include "database/single_node_ha/graph_db_accessor.hpp"
|
||||
#include "storage/single_node_ha/concurrent_id_mapper.hpp"
|
||||
#include "storage/single_node_ha/storage_gc.hpp"
|
||||
@ -84,8 +83,6 @@ storage::ConcurrentIdMapper<storage::Property> &GraphDb::property_mapper() {
|
||||
return property_mapper_;
|
||||
}
|
||||
|
||||
database::Counters &GraphDb::counters() { return counters_; }
|
||||
|
||||
void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); }
|
||||
|
||||
void GraphDb::Reset() {
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <vector>
|
||||
|
||||
#include "database/single_node_ha/config.hpp"
|
||||
#include "database/single_node_ha/counters.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "raft/coordination.hpp"
|
||||
#include "raft/raft_server.hpp"
|
||||
@ -88,7 +87,6 @@ class GraphDb {
|
||||
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
|
||||
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
|
||||
storage::ConcurrentIdMapper<storage::Property> &property_mapper();
|
||||
database::Counters &counters();
|
||||
void CollectGarbage();
|
||||
|
||||
/// Releases the storage object safely and creates a new object, resets the tx
|
||||
@ -150,7 +148,6 @@ class GraphDb {
|
||||
storage_->PropertiesOnDisk()};
|
||||
storage::ConcurrentIdMapper<storage::Property> property_mapper_{
|
||||
storage_->PropertiesOnDisk()};
|
||||
database::Counters counters_;
|
||||
};
|
||||
|
||||
} // namespace database
|
||||
|
@ -420,14 +420,6 @@ const std::string &GraphDbAccessor::PropertyName(
|
||||
return db_->property_mapper().id_to_value(property);
|
||||
}
|
||||
|
||||
int64_t GraphDbAccessor::Counter(const std::string &name) {
|
||||
return db_->counters().Get(name);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::CounterSet(const std::string &name, int64_t value) {
|
||||
db_->counters().Set(name, value);
|
||||
}
|
||||
|
||||
std::vector<std::string> GraphDbAccessor::IndexInfo() const {
|
||||
std::vector<std::string> info;
|
||||
for (storage::Label label : db_->storage().labels_index_.Keys()) {
|
||||
|
@ -575,20 +575,6 @@ class GraphDbAccessor {
|
||||
auto &db() { return db_; }
|
||||
const auto &db() const { return db_; }
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
* increments that counter. If the counter with the given name does not exist,
|
||||
* a new counter is created and this function returns 0.
|
||||
*/
|
||||
int64_t Counter(const std::string &name);
|
||||
|
||||
/**
|
||||
* Sets the counter with the given name to the given value. Returns nothing.
|
||||
* If the counter with the given name does not exist, a new counter is created
|
||||
* and set to the given value.
|
||||
*/
|
||||
void CounterSet(const std::string &name, int64_t value);
|
||||
|
||||
/* Returns a list of index names present in the database. */
|
||||
std::vector<std::string> IndexInfo() const;
|
||||
|
||||
|
@ -16,6 +16,9 @@ struct EvaluationContext {
|
||||
std::vector<storage::Property> properties;
|
||||
/// All labels indexable via LabelIx
|
||||
std::vector<storage::Label> labels;
|
||||
/// All counters generated by `counter` function, mutable because the function
|
||||
/// modifies the values
|
||||
mutable std::unordered_map<std::string, int64_t> counters;
|
||||
};
|
||||
|
||||
inline std::vector<storage::Property> NamesToProperties(
|
||||
|
@ -649,32 +649,34 @@ TypedValue Assert(TypedValue *args, int64_t nargs, const EvaluationContext &,
|
||||
return args[0];
|
||||
}
|
||||
|
||||
TypedValue Counter(TypedValue *args, int64_t nargs, const EvaluationContext &,
|
||||
database::GraphDbAccessor *dba) {
|
||||
if (nargs != 1) {
|
||||
throw QueryRuntimeException("'counter' requires exactly one argument.");
|
||||
}
|
||||
if (!args[0].IsString())
|
||||
throw QueryRuntimeException("'counter' argument must be a string.");
|
||||
|
||||
return dba->Counter(args[0].ValueString());
|
||||
}
|
||||
|
||||
TypedValue CounterSet(TypedValue *args, int64_t nargs,
|
||||
const EvaluationContext &,
|
||||
database::GraphDbAccessor *dba) {
|
||||
if (nargs != 2) {
|
||||
throw QueryRuntimeException("'counterSet' requires two arguments.");
|
||||
#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA)
|
||||
TypedValue Counter(TypedValue *args, int64_t nargs,
|
||||
const EvaluationContext &context,
|
||||
database::GraphDbAccessor *) {
|
||||
if (nargs < 2 || nargs > 3) {
|
||||
throw QueryRuntimeException("'counter' requires two or three arguments.");
|
||||
}
|
||||
if (!args[0].IsString())
|
||||
throw QueryRuntimeException(
|
||||
"First argument of 'counterSet' must be a string.");
|
||||
"First argument of 'counter' must be a string.");
|
||||
if (!args[1].IsInt())
|
||||
throw QueryRuntimeException(
|
||||
"Second argument of 'counterSet' must be an integer.");
|
||||
dba->CounterSet(args[0].ValueString(), args[1].ValueInt());
|
||||
return TypedValue::Null;
|
||||
"Second argument of 'counter' must be an integer.");
|
||||
if (nargs == 3 && !args[2].IsInt())
|
||||
throw QueryRuntimeException(
|
||||
"Third argument of 'counter' must be an integer.");
|
||||
|
||||
int64_t step = 1;
|
||||
if (nargs == 3) step = args[2].ValueInt();
|
||||
|
||||
auto [it, inserted] =
|
||||
context.counters.emplace(args[0].ValueString(), args[1].ValueInt());
|
||||
auto value = it->second;
|
||||
it->second += step;
|
||||
|
||||
return value;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef MG_DISTRIBUTED
|
||||
TypedValue WorkerId(TypedValue *args, int64_t nargs, const EvaluationContext &,
|
||||
@ -983,8 +985,9 @@ NameToFunction(const std::string &function_name) {
|
||||
|
||||
// Memgraph specific functions
|
||||
if (function_name == "ASSERT") return Assert;
|
||||
#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA)
|
||||
if (function_name == "COUNTER") return Counter;
|
||||
if (function_name == "COUNTERSET") return CounterSet;
|
||||
#endif
|
||||
#ifdef MG_SINGLE_NODE
|
||||
if (function_name == "DUMP") return Dump;
|
||||
#endif
|
||||
|
@ -785,28 +785,14 @@ Feature: Functions
|
||||
"""
|
||||
When executing query:
|
||||
"""
|
||||
MATCH (n) SET n.id = counter("n.id") WITH n SKIP 1
|
||||
RETURN n.id, counter("other") AS c2
|
||||
MATCH (n) SET n.id = counter("n.id", 0) WITH n SKIP 1
|
||||
RETURN n.id, counter("other", 0) AS c2
|
||||
"""
|
||||
Then the result should be:
|
||||
| n.id | c2 |
|
||||
| 1 | 0 |
|
||||
| 2 | 1 |
|
||||
|
||||
|
||||
Scenario: CounterSet test:
|
||||
Given an empty graph
|
||||
When executing query:
|
||||
"""
|
||||
WITH counter("n") AS zero
|
||||
WITH counter("n") AS one, zero
|
||||
WITH counterSet("n", 42) AS nothing, zero, one
|
||||
RETURN counter("n") AS n, zero, one, counter("n2") AS n2
|
||||
"""
|
||||
Then the result should be:
|
||||
| n | zero | one | n2 |
|
||||
| 42 | 0 | 1 | 0 |
|
||||
|
||||
Scenario: Vertex Id test:
|
||||
Given an empty graph
|
||||
And having executed:
|
||||
|
@ -67,6 +67,9 @@ class GraphSession {
|
||||
ClientContextT context_{FLAGS_use_ssl};
|
||||
std::unique_ptr<ClientT> client_;
|
||||
|
||||
uint64_t vertex_id_{0};
|
||||
uint64_t edge_id_{0};
|
||||
|
||||
std::set<uint64_t> vertices_;
|
||||
std::set<uint64_t> edges_;
|
||||
|
||||
@ -123,15 +126,17 @@ class GraphSession {
|
||||
|
||||
void CreateVertices(uint64_t vertices_count) {
|
||||
if (vertices_count == 0) return;
|
||||
auto ret =
|
||||
Execute(fmt::format("UNWIND RANGE(1, {}) AS r CREATE (n:{} {{id: "
|
||||
"counter(\"vertex{}\")}}) RETURN min(n.id)",
|
||||
vertices_count, indexed_label_, id_));
|
||||
auto ret = Execute(fmt::format(
|
||||
"UNWIND RANGE({}, {}) AS r CREATE (n:{} {{id: r}}) RETURN count(n)",
|
||||
vertex_id_, vertex_id_ + vertices_count - 1, indexed_label_));
|
||||
CHECK(ret.records.size() == 1) << "Vertices creation failed!";
|
||||
uint64_t min_id = ret.records[0][0].ValueInt();
|
||||
CHECK(ret.records[0][0].ValueInt() == vertices_count)
|
||||
<< "Created " << ret.records[0][0].ValueInt() << " vertices instead of "
|
||||
<< vertices_count << "!";
|
||||
for (uint64_t i = 0; i < vertices_count; ++i) {
|
||||
vertices_.insert(min_id + i);
|
||||
vertices_.insert(vertex_id_ + i);
|
||||
}
|
||||
vertex_id_ += vertices_count;
|
||||
}
|
||||
|
||||
void RemoveVertex() {
|
||||
@ -173,27 +178,29 @@ class GraphSession {
|
||||
"MATCH (a:{0}) WITH a "
|
||||
"UNWIND range(0, {1}) AS i WITH a, tointeger(rand() * {2}) AS id "
|
||||
"MATCH (b:{0} {{id: id}}) WITH a, b "
|
||||
"CREATE (a)-[e:EdgeType {{id: counter(\"edge{3}\")}}]->(b) RETURN "
|
||||
"min(e.id), count(e)",
|
||||
indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(), id_));
|
||||
"CREATE (a)-[e:EdgeType {{id: counter(\"edge\", {3})}}]->(b) "
|
||||
"RETURN count(e)",
|
||||
indexed_label_, (int64_t)edges_per_node - 1, vertices_.size(),
|
||||
edge_id_));
|
||||
|
||||
CHECK(ret.records.size() == 1) << "Failed to create edges";
|
||||
uint64_t min_id = ret.records[0][0].ValueInt();
|
||||
uint64_t count = ret.records[0][1].ValueInt();
|
||||
uint64_t count = ret.records[0][0].ValueInt();
|
||||
for (uint64_t i = 0; i < count; ++i) {
|
||||
edges_.insert(min_id + i);
|
||||
edges_.insert(edge_id_ + i);
|
||||
}
|
||||
edge_id_ += count;
|
||||
}
|
||||
|
||||
void CreateEdge() {
|
||||
auto ret =
|
||||
Execute(fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) "
|
||||
"CREATE (from)-[e:EdgeType {{id: "
|
||||
"counter(\"edge{}\")}}]->(to) RETURN e.id",
|
||||
indexed_label_, RandomElement(vertices_),
|
||||
indexed_label_, RandomElement(vertices_), id_));
|
||||
auto ret = Execute(
|
||||
fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) "
|
||||
"CREATE (from)-[e:EdgeType {{id: "
|
||||
"counter(\"edge\", {})}}]->(to) RETURN e.id",
|
||||
indexed_label_, RandomElement(vertices_), indexed_label_,
|
||||
RandomElement(vertices_), edge_id_));
|
||||
if (ret.records.size() > 0) {
|
||||
edges_.insert(ret.records[0][0].ValueInt());
|
||||
edge_id_ += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@ -384,8 +391,6 @@ int main(int argc, char **argv) {
|
||||
client.Execute("MATCH (n) DETACH DELETE n", {});
|
||||
for (int i = 0; i < FLAGS_worker_count; ++i) {
|
||||
client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {});
|
||||
client.Execute(fmt::format("RETURN counterSet(\"vertex{}\", 0)", i), {});
|
||||
client.Execute(fmt::format("RETURN counterSet(\"edge{}\", 0)", i), {});
|
||||
}
|
||||
|
||||
// close client
|
||||
|
@ -39,9 +39,6 @@ target_link_libraries(${test_prefix}concurrent_map_access mg-single-node kvstore
|
||||
add_unit_test(concurrent_map.cpp)
|
||||
target_link_libraries(${test_prefix}concurrent_map mg-single-node kvstore_dummy_lib)
|
||||
|
||||
add_unit_test(counters.cpp)
|
||||
target_link_libraries(${test_prefix}counters mg-distributed kvstore_dummy_lib)
|
||||
|
||||
add_unit_test(cypher_main_visitor.cpp)
|
||||
target_link_libraries(${test_prefix}cypher_main_visitor mg-single-node kvstore_dummy_lib)
|
||||
|
||||
|
@ -1,32 +0,0 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "database/distributed/distributed_counters.hpp"
|
||||
|
||||
#include "test_coordination.hpp"
|
||||
|
||||
TEST(CountersDistributed, All) {
|
||||
TestMasterCoordination coordination;
|
||||
database::MasterCounters master(&coordination);
|
||||
coordination.Start();
|
||||
|
||||
communication::rpc::ClientPool master_client_pool(
|
||||
coordination.GetServerEndpoint());
|
||||
|
||||
database::WorkerCounters w1(&master_client_pool);
|
||||
database::WorkerCounters w2(&master_client_pool);
|
||||
|
||||
EXPECT_EQ(w1.Get("a"), 0);
|
||||
EXPECT_EQ(w1.Get("a"), 1);
|
||||
EXPECT_EQ(w2.Get("a"), 2);
|
||||
EXPECT_EQ(w1.Get("a"), 3);
|
||||
EXPECT_EQ(master.Get("a"), 4);
|
||||
|
||||
EXPECT_EQ(master.Get("b"), 0);
|
||||
EXPECT_EQ(w2.Get("b"), 1);
|
||||
w1.Set("b", 42);
|
||||
EXPECT_EQ(w2.Get("b"), 42);
|
||||
|
||||
coordination.Stop();
|
||||
}
|
@ -94,16 +94,6 @@ TEST_F(DistributedGraphDb, StorageTypes) {
|
||||
std::vector<storage::Property>{});
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDb, Counters) {
|
||||
EXPECT_EQ(master().counters().Get("a"), 0);
|
||||
EXPECT_EQ(worker(1).counters().Get("a"), 1);
|
||||
EXPECT_EQ(worker(2).counters().Get("a"), 2);
|
||||
|
||||
EXPECT_EQ(worker(1).counters().Get("b"), 0);
|
||||
EXPECT_EQ(worker(2).counters().Get("b"), 1);
|
||||
EXPECT_EQ(master().counters().Get("b"), 2);
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDb, DispatchPlan) {
|
||||
auto kRPCWaitTime = 600ms;
|
||||
int64_t plan_id = 5;
|
||||
|
@ -1457,28 +1457,27 @@ TEST_F(FunctionTest, Assert) {
|
||||
|
||||
TEST_F(FunctionTest, Counter) {
|
||||
EXPECT_THROW(EvaluateFunction("COUNTER", {}), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("COUNTER", {"a"}), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("COUNTER", {"a", "b"}), QueryRuntimeException);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 1);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 2);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 1);
|
||||
}
|
||||
EXPECT_THROW(EvaluateFunction("COUNTER", {"a", "b", "c"}), QueryRuntimeException);
|
||||
|
||||
TEST_F(FunctionTest, CounterSet) {
|
||||
EXPECT_THROW(EvaluateFunction("COUNTERSET", {}), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a"}), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a", "b"}),
|
||||
QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("COUNTERSET", {"a", 11, 12}),
|
||||
QueryRuntimeException);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 0);
|
||||
EvaluateFunction("COUNTERSET", {"c1", 12});
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 12);
|
||||
EvaluateFunction("COUNTERSET", {"c2", 42});
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 42);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1"}).ValueInt(), 13);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2"}).ValueInt(), 43);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 1);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2", 0}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c1", 0}).ValueInt(), 2);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c2", 0}).ValueInt(), 1);
|
||||
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), -1);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c3", -1}).ValueInt(), 1);
|
||||
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 5);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c4", 0, 5}).ValueInt(), 10);
|
||||
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), -5);
|
||||
EXPECT_EQ(EvaluateFunction("COUNTER", {"c5", 0, -5}).ValueInt(), -10);
|
||||
}
|
||||
|
||||
TEST_F(FunctionTest, Id) {
|
||||
|
Loading…
Reference in New Issue
Block a user