2018-04-20 20:58:49 +08:00
|
|
|
#include <algorithm>
|
|
|
|
#include <chrono>
|
2017-10-31 16:49:33 +08:00
|
|
|
#include <csignal>
|
2018-04-20 20:58:49 +08:00
|
|
|
#include <cstdint>
|
|
|
|
#include <exception>
|
|
|
|
#include <functional>
|
|
|
|
#include <limits>
|
|
|
|
#include <string>
|
|
|
|
#include <thread>
|
2016-08-10 16:39:02 +08:00
|
|
|
|
2017-07-06 19:53:39 +08:00
|
|
|
#include <gflags/gflags.h>
|
|
|
|
#include <glog/logging.h>
|
2017-05-22 18:31:04 +08:00
|
|
|
|
2018-07-27 16:54:20 +08:00
|
|
|
#include "auth/auth.hpp"
|
2017-03-06 20:37:51 +08:00
|
|
|
#include "communication/bolt/v1/session.hpp"
|
2017-10-25 21:28:10 +08:00
|
|
|
#include "config.hpp"
|
Split GraphDb to distributed and single node files
Summary:
This change, hopefully, simplifies the implementation of different kinds
of GraphDb. The pimpl idiom is now simplified by removing all of the
crazy inheritance. Implementations classes are just plain data stores,
without any methods. The interface classes now have a more flat
hierarchy:
```
GraphDb (pure interface)
|
+----+---------- DistributedGraphDb (pure interface)
| |
Single Node +-----+------+
| |
Master Worker
```
DistributedGraphDb is used as an intermediate interface for all the
things that should work only in distributed. Therefore, virtual calls
for distributed stuff have been removed from GraphDb. Some are exposed
via DistributedGraphDb, other's are only in concrete Master and Worker
classes. The code which relied on those virtual calls has been
refactored to either use DistributedGraphDb, take a pointer to what is
actually needed or use dynamic_cast. Obviously, dynamic_cast is a
temporary solution and should be replaced with another mechanism (e.g.
virtual call, or some other function pointer style).
The cost of the above change is some code duplication in constructors
and destructors of classes. This duplication has a lot of little tweaks
that make it hard to generalize, not to mention that virtual calls do
not work in constructor and destructor. If we really care about
generalizing this, we should think about abandoning RAII in favor of
constructor + Init method.
The next steps for splitting the dependencies that seem logical are:
1) Split GraphDbAccessor implementation, either via inheritance or
passing in an implementation pointer. GraphDbAccessor should then
only be created by a virtual call on GraphDb.
2) Split Interpreter implementation. Besides allowing single node
interpreter to exist without depending on distributed, this will
enable the planner and operators to be correctly separated.
Reviewers: msantl, mferencevic, ipaljak
Reviewed By: msantl
Subscribers: dgleich, pullbot
Differential Revision: https://phabricator.memgraph.io/D1493
2018-07-19 23:00:50 +08:00
|
|
|
#include "database/distributed_graph_db.hpp"
|
2018-01-12 22:17:04 +08:00
|
|
|
#include "database/graph_db.hpp"
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
#include "distributed/pull_rpc_clients.hpp"
|
2018-08-22 16:59:46 +08:00
|
|
|
#include "glue/auth.hpp"
|
|
|
|
#include "glue/communication.hpp"
|
2018-07-06 15:28:05 +08:00
|
|
|
#include "integrations/kafka/exceptions.hpp"
|
|
|
|
#include "integrations/kafka/streams.hpp"
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
#include "query/exceptions.hpp"
|
|
|
|
#include "query/interpreter.hpp"
|
2018-07-16 15:51:02 +08:00
|
|
|
#include "query/transaction_engine.hpp"
|
2018-07-06 15:28:05 +08:00
|
|
|
#include "requests/requests.hpp"
|
2018-02-02 18:11:06 +08:00
|
|
|
#include "stats/stats.hpp"
|
2018-06-20 19:46:54 +08:00
|
|
|
#include "telemetry/telemetry.hpp"
|
2017-06-09 21:48:40 +08:00
|
|
|
#include "utils/flag_validation.hpp"
|
2018-03-30 17:07:37 +08:00
|
|
|
#include "utils/signals.hpp"
|
2017-09-22 20:24:53 +08:00
|
|
|
#include "utils/sysinfo/memory.hpp"
|
2017-01-13 17:47:17 +08:00
|
|
|
#include "utils/terminate_handler.hpp"
|
2017-09-26 15:43:43 +08:00
|
|
|
#include "version.hpp"
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
// Common stuff for enterprise and community editions
|
|
|
|
|
2017-12-19 19:40:30 +08:00
|
|
|
// General purpose flags.
|
2017-09-25 21:56:14 +08:00
|
|
|
DEFINE_string(interface, "0.0.0.0",
|
|
|
|
"Communication interface on which to listen.");
|
2018-01-15 21:03:07 +08:00
|
|
|
DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
|
|
|
|
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
2017-06-09 21:48:40 +08:00
|
|
|
DEFINE_VALIDATED_int32(num_workers,
|
|
|
|
std::max(std::thread::hardware_concurrency(), 1U),
|
2018-02-23 17:56:56 +08:00
|
|
|
"Number of workers (Bolt)", FLAG_IN_RANGE(1, INT32_MAX));
|
2018-03-23 23:32:17 +08:00
|
|
|
DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
|
|
|
|
"Time in seconds after which inactive sessions will be "
|
|
|
|
"closed.",
|
|
|
|
FLAG_IN_RANGE(1, INT32_MAX));
|
2018-06-20 23:44:47 +08:00
|
|
|
DEFINE_string(cert_file, "", "Certificate file to use.");
|
|
|
|
DEFINE_string(key_file, "", "Key file to use.");
|
2017-10-17 20:05:08 +08:00
|
|
|
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
2017-11-22 23:40:39 +08:00
|
|
|
DEFINE_HIDDEN_string(
|
|
|
|
log_link_basename, "",
|
|
|
|
"Basename used for symlink creation to the last log file.");
|
2017-09-25 21:56:14 +08:00
|
|
|
DEFINE_uint64(memory_warning_threshold, 1024,
|
2018-02-02 18:11:06 +08:00
|
|
|
"Memory warning threshold, in MB. If Memgraph detects there is "
|
|
|
|
"less available RAM it will log a warning. Set to 0 to "
|
2017-09-25 21:56:14 +08:00
|
|
|
"disable.");
|
2018-06-20 19:46:54 +08:00
|
|
|
DEFINE_bool(telemetry_enabled, false,
|
|
|
|
"Set to true to enable telemetry. We collect information about the "
|
|
|
|
"running system (CPU and memory information) and information about "
|
|
|
|
"the database runtime (vertex and edge counts and resource usage) "
|
|
|
|
"to allow for easier improvement of the product.");
|
|
|
|
DECLARE_string(durability_directory);
|
2016-08-01 01:58:12 +08:00
|
|
|
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
/** Encapsulates Dbms and Interpreter that are passed through the network server
|
|
|
|
* and worker to the session. */
|
|
|
|
struct SessionData {
|
Split GraphDb to distributed and single node files
Summary:
This change, hopefully, simplifies the implementation of different kinds
of GraphDb. The pimpl idiom is now simplified by removing all of the
crazy inheritance. Implementations classes are just plain data stores,
without any methods. The interface classes now have a more flat
hierarchy:
```
GraphDb (pure interface)
|
+----+---------- DistributedGraphDb (pure interface)
| |
Single Node +-----+------+
| |
Master Worker
```
DistributedGraphDb is used as an intermediate interface for all the
things that should work only in distributed. Therefore, virtual calls
for distributed stuff have been removed from GraphDb. Some are exposed
via DistributedGraphDb, other's are only in concrete Master and Worker
classes. The code which relied on those virtual calls has been
refactored to either use DistributedGraphDb, take a pointer to what is
actually needed or use dynamic_cast. Obviously, dynamic_cast is a
temporary solution and should be replaced with another mechanism (e.g.
virtual call, or some other function pointer style).
The cost of the above change is some code duplication in constructors
and destructors of classes. This duplication has a lot of little tweaks
that make it hard to generalize, not to mention that virtual calls do
not work in constructor and destructor. If we really care about
generalizing this, we should think about abandoning RAII in favor of
constructor + Init method.
The next steps for splitting the dependencies that seem logical are:
1) Split GraphDbAccessor implementation, either via inheritance or
passing in an implementation pointer. GraphDbAccessor should then
only be created by a virtual call on GraphDb.
2) Split Interpreter implementation. Besides allowing single node
interpreter to exist without depending on distributed, this will
enable the planner and operators to be correctly separated.
Reviewers: msantl, mferencevic, ipaljak
Reviewed By: msantl
Subscribers: dgleich, pullbot
Differential Revision: https://phabricator.memgraph.io/D1493
2018-07-19 23:00:50 +08:00
|
|
|
database::GraphDb &db;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
query::Interpreter interpreter{db};
|
2018-07-27 16:54:20 +08:00
|
|
|
auth::Auth auth{
|
|
|
|
std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"};
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
class BoltSession final
|
|
|
|
: public communication::bolt::Session<communication::InputStream,
|
|
|
|
communication::OutputStream> {
|
|
|
|
public:
|
|
|
|
BoltSession(SessionData &data, communication::InputStream &input_stream,
|
|
|
|
communication::OutputStream &output_stream)
|
|
|
|
: communication::bolt::Session<communication::InputStream,
|
|
|
|
communication::OutputStream>(
|
|
|
|
input_stream, output_stream),
|
2018-07-27 16:54:20 +08:00
|
|
|
transaction_engine_(data.db, data.interpreter),
|
|
|
|
auth_(&data.auth) {}
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
|
2018-07-18 16:40:06 +08:00
|
|
|
using communication::bolt::Session<communication::InputStream,
|
|
|
|
communication::OutputStream>::TEncoder;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
|
2018-07-18 16:40:06 +08:00
|
|
|
std::vector<std::string> Interpret(
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
const std::string &query,
|
2018-07-24 21:11:18 +08:00
|
|
|
const std::map<std::string, communication::bolt::Value> ¶ms)
|
2018-07-18 16:40:06 +08:00
|
|
|
override {
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
std::map<std::string, query::TypedValue> params_tv;
|
|
|
|
for (const auto &kv : params)
|
|
|
|
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
|
|
|
try {
|
2018-08-22 16:59:46 +08:00
|
|
|
auto result = transaction_engine_.Interpret(query, params_tv);
|
|
|
|
if (user_) {
|
|
|
|
const auto &permissions = user_->GetPermissions();
|
|
|
|
for (const auto &privilege : result.second) {
|
|
|
|
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
|
|
|
|
auth::PermissionLevel::GRANT) {
|
|
|
|
transaction_engine_.Abort();
|
|
|
|
throw communication::bolt::ClientError(
|
|
|
|
"You are not authorized to execute this query! Please contact "
|
|
|
|
"your database administrator.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return result.first;
|
2018-07-18 16:40:06 +08:00
|
|
|
} catch (const query::QueryException &e) {
|
|
|
|
// Wrap QueryException into ClientError, because we want to allow the
|
|
|
|
// client to fix their query.
|
|
|
|
throw communication::bolt::ClientError(e.what());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-24 21:11:18 +08:00
|
|
|
std::map<std::string, communication::bolt::Value> PullAll(
|
2018-07-18 16:40:06 +08:00
|
|
|
TEncoder *encoder) override {
|
|
|
|
try {
|
|
|
|
TypedValueResultStream stream(encoder);
|
|
|
|
const auto &summary = transaction_engine_.PullAll(&stream);
|
2018-07-24 21:11:18 +08:00
|
|
|
std::map<std::string, communication::bolt::Value> decoded_summary;
|
2018-07-18 16:40:06 +08:00
|
|
|
for (const auto &kv : summary) {
|
2018-07-24 21:11:18 +08:00
|
|
|
decoded_summary.emplace(kv.first, glue::ToBoltValue(kv.second));
|
2018-07-18 16:40:06 +08:00
|
|
|
}
|
|
|
|
return decoded_summary;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
} catch (const query::QueryException &e) {
|
|
|
|
// Wrap QueryException into ClientError, because we want to allow the
|
|
|
|
// client to fix their query.
|
|
|
|
throw communication::bolt::ClientError(e.what());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-16 15:51:02 +08:00
|
|
|
void Abort() override { transaction_engine_.Abort(); }
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
|
2018-07-27 16:54:20 +08:00
|
|
|
bool Authenticate(const std::string &username,
|
|
|
|
const std::string &password) override {
|
|
|
|
if (!auth_->HasUsers()) return true;
|
|
|
|
user_ = auth_->Authenticate(username, password);
|
|
|
|
return !!user_;
|
|
|
|
}
|
|
|
|
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
private:
|
2018-07-24 21:11:18 +08:00
|
|
|
// Wrapper around TEncoder which converts TypedValue to Value
|
2018-07-18 16:40:06 +08:00
|
|
|
// before forwarding the calls to original TEncoder.
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
class TypedValueResultStream {
|
|
|
|
public:
|
2018-07-18 16:40:06 +08:00
|
|
|
TypedValueResultStream(TEncoder *encoder) : encoder_(encoder) {}
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
|
|
|
|
void Result(const std::vector<query::TypedValue> &values) {
|
2018-07-24 21:11:18 +08:00
|
|
|
std::vector<communication::bolt::Value> decoded_values;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
decoded_values.reserve(values.size());
|
|
|
|
for (const auto &v : values) {
|
2018-07-24 21:11:18 +08:00
|
|
|
decoded_values.push_back(glue::ToBoltValue(v));
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
}
|
2018-07-18 16:40:06 +08:00
|
|
|
encoder_->MessageRecord(decoded_values);
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
2018-07-18 16:40:06 +08:00
|
|
|
TEncoder *encoder_;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
};
|
|
|
|
|
2018-07-16 15:51:02 +08:00
|
|
|
query::TransactionEngine transaction_engine_;
|
2018-07-27 16:54:20 +08:00
|
|
|
auth::Auth *auth_;
|
|
|
|
std::experimental::optional<auth::User> user_;
|
Extract communication to static library
Summary:
Session specifics have been move out of the Bolt `executing` state, and
are accessed via pure virtual Session type. Our server is templated on
the session and we are setting the concrete type, so there should be no
virtual call overhead. Abstract Session is used to indicate the
interface, this could have also been templated, but the explicit
interface definition makes it clearer.
Specific session implementation for running Memgraph is now implemented
in memgraph_bolt, which instantiates the concrete session type. This may
not be 100% appropriate place, but Memgraph specific session isn't
needed anywhere else.
Bolt/communication tests now use a dummy session and depend only on
communication, which significantly improves test run times.
All these changes make the communication a library which doesn't depend
on storage nor the database. Only shared connection points, which aren't
part of the base communication library are:
* glue/conversion -- which converts between storage and bolt types, and
* communication/result_stream_faker -- templated, but used in tests and query/repl
Depends on D1453
Reviewers: mferencevic, buda, mtomic, msantl
Reviewed By: mferencevic, mtomic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D1456
2018-07-10 22:18:19 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
using ServerT = communication::Server<BoltSession, SessionData>;
|
|
|
|
using communication::ServerContext;
|
|
|
|
|
2018-07-06 15:28:05 +08:00
|
|
|
/**
|
|
|
|
* Class that implements ResultStream API for Kafka.
|
|
|
|
*
|
|
|
|
* Kafka doesn't need to stream the import results back to the client so we
|
|
|
|
* don't need any functionality here.
|
|
|
|
*/
|
|
|
|
class KafkaResultStream {
|
|
|
|
public:
|
|
|
|
void Result(const std::vector<query::TypedValue> &) {}
|
|
|
|
};
|
|
|
|
|
2017-10-31 16:49:33 +08:00
|
|
|
// Needed to correctly handle memgraph destruction from a signal handler.
|
|
|
|
// Without having some sort of a flag, it is possible that a signal is handled
|
2018-01-12 22:17:04 +08:00
|
|
|
// when we are exiting main, inside destructors of database::GraphDb and
|
|
|
|
// similar. The signal handler may then initiate another shutdown on memgraph
|
|
|
|
// which is in half destructed state, causing invalid memory access and crash.
|
2017-10-31 16:49:33 +08:00
|
|
|
volatile sig_atomic_t is_shutting_down = 0;
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
|
|
|
|
/// In most cases you don't have to call this. If you are using a custom server
|
|
|
|
/// startup function for `WithInit`, then you probably need to use this to
|
|
|
|
/// shutdown your server.
|
|
|
|
void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
|
2017-10-31 16:49:33 +08:00
|
|
|
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
|
|
|
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
|
|
|
// double shutdown.
|
|
|
|
sigset_t block_shutdown_signals;
|
|
|
|
sigemptyset(&block_shutdown_signals);
|
|
|
|
sigaddset(&block_shutdown_signals, SIGTERM);
|
|
|
|
sigaddset(&block_shutdown_signals, SIGINT);
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
// Wrap the shutdown function in a safe way to prevent recursive shutdown.
|
|
|
|
auto shutdown = [shutdown_fun]() {
|
|
|
|
if (is_shutting_down) return;
|
|
|
|
is_shutting_down = 1;
|
|
|
|
shutdown_fun();
|
|
|
|
};
|
|
|
|
|
2018-03-30 17:07:37 +08:00
|
|
|
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
|
|
|
shutdown, block_shutdown_signals))
|
2017-10-31 16:49:33 +08:00
|
|
|
<< "Unable to register SIGTERM handler!";
|
2018-03-30 17:07:37 +08:00
|
|
|
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
|
|
|
block_shutdown_signals))
|
2017-10-31 16:49:33 +08:00
|
|
|
<< "Unable to register SIGINT handler!";
|
2017-06-08 19:30:59 +08:00
|
|
|
|
2017-11-22 23:40:39 +08:00
|
|
|
// Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate
|
|
|
|
// rotates our logs.
|
2018-03-30 17:07:37 +08:00
|
|
|
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() {
|
2017-11-22 23:40:39 +08:00
|
|
|
google::CloseLogDestination(google::INFO);
|
|
|
|
})) << "Unable to register SIGUSR1 handler!";
|
2017-12-19 19:40:30 +08:00
|
|
|
}
|
2017-11-22 23:40:39 +08:00
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
/// Run the Memgraph server.
|
|
|
|
///
|
|
|
|
/// Sets up all the required state before running `memgraph_main` and does any
|
|
|
|
/// required cleanup afterwards. `get_stats_prefix` is used to obtain the
|
|
|
|
/// prefix when logging Memgraph's statistics.
|
|
|
|
///
|
|
|
|
/// Command line arguments and configuration files are read before calling any
|
|
|
|
/// of the supplied functions. Therefore, you should use flags only from those
|
|
|
|
/// functions, and *not before* invoking `WithInit`.
|
|
|
|
///
|
|
|
|
/// This should be the first and last thing a OS specific main function does.
|
|
|
|
///
|
|
|
|
/// A common example of usage is:
|
|
|
|
///
|
|
|
|
/// @code
|
|
|
|
/// int main(int argc, char *argv[]) {
|
|
|
|
/// auto get_stats_prefix = []() -> std::string { return "memgraph"; };
|
|
|
|
/// return WithInit(argc, argv, get_stats_prefix, SingleNodeMain);
|
|
|
|
/// }
|
|
|
|
/// @endcode
|
|
|
|
///
|
|
|
|
/// If you wish to start Memgraph server in another way, you can pass a
|
|
|
|
/// `memgraph_main` functions which does that. You should take care to call
|
|
|
|
/// `InitSignalHandlers` with appropriate function to shutdown the server you
|
|
|
|
/// started.
|
|
|
|
int WithInit(int argc, char **argv,
|
|
|
|
const std::function<std::string()> &get_stats_prefix,
|
|
|
|
const std::function<void()> &memgraph_main) {
|
|
|
|
gflags::SetVersionString(version_string);
|
2017-12-19 19:40:30 +08:00
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
// Load config before parsing arguments, so that flags from the command line
|
|
|
|
// overwrite the config.
|
|
|
|
LoadConfig();
|
|
|
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
|
|
|
|
|
|
|
google::InitGoogleLogging(argv[0]);
|
|
|
|
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
|
|
|
google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str());
|
|
|
|
|
|
|
|
// Unhandled exception handler init.
|
|
|
|
std::set_terminate(&utils::TerminateHandler);
|
|
|
|
|
|
|
|
stats::InitStatsLogging(get_stats_prefix());
|
|
|
|
utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); });
|
|
|
|
|
2018-06-20 23:44:47 +08:00
|
|
|
// Initialize the communication library.
|
|
|
|
communication::Init();
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
// Start memory warning logger.
|
|
|
|
utils::Scheduler mem_log_scheduler;
|
|
|
|
if (FLAGS_memory_warning_threshold > 0) {
|
2018-08-17 04:20:44 +08:00
|
|
|
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
|
|
|
if (free_ram) {
|
|
|
|
mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
|
|
|
|
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
|
|
|
if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold)
|
|
|
|
LOG(WARNING) << "Running out of available RAM, only "
|
|
|
|
<< *free_ram / 1024 << " MB left.";
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
// Kernel version for the `MemAvailable` value is from: man procfs
|
|
|
|
LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc "
|
|
|
|
"filesystem isn't available so remaining memory warnings "
|
|
|
|
"won't be available.";
|
|
|
|
}
|
2018-04-20 20:58:49 +08:00
|
|
|
}
|
2018-07-06 15:28:05 +08:00
|
|
|
requests::Init();
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
memgraph_main();
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
void SingleNodeMain() {
|
|
|
|
google::SetUsageMessage("Memgraph single-node database server");
|
|
|
|
database::SingleNode db;
|
2018-01-12 22:17:04 +08:00
|
|
|
SessionData session_data{db};
|
2018-06-20 23:44:47 +08:00
|
|
|
|
2018-07-06 15:28:05 +08:00
|
|
|
auto stream_writer =
|
2018-08-06 19:05:42 +08:00
|
|
|
[&session_data](
|
|
|
|
const std::string &query,
|
|
|
|
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
|
|
|
auto dba = session_data.db.Access();
|
|
|
|
KafkaResultStream stream;
|
|
|
|
std::map<std::string, query::TypedValue> params_tv;
|
|
|
|
for (const auto &kv : params)
|
|
|
|
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
|
|
|
try {
|
|
|
|
session_data.interpreter(query, *dba, params_tv, false)
|
|
|
|
.PullAll(stream);
|
|
|
|
dba->Commit();
|
|
|
|
} catch (const query::QueryException &e) {
|
|
|
|
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
|
|
|
|
<< e.what();
|
|
|
|
dba->Abort();
|
2018-07-06 15:28:05 +08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
integrations::kafka::Streams kafka_streams{
|
|
|
|
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
|
|
|
"streams",
|
|
|
|
stream_writer};
|
|
|
|
|
|
|
|
try {
|
|
|
|
// Recover possible streams.
|
|
|
|
kafka_streams.Recover();
|
|
|
|
} catch (const integrations::kafka::KafkaStreamException &e) {
|
|
|
|
LOG(ERROR) << e.what();
|
|
|
|
}
|
|
|
|
|
2018-07-27 16:54:20 +08:00
|
|
|
session_data.interpreter.auth_ = &session_data.auth;
|
2018-07-06 15:28:05 +08:00
|
|
|
session_data.interpreter.kafka_streams_ = &kafka_streams;
|
|
|
|
|
2018-06-20 23:44:47 +08:00
|
|
|
ServerContext context;
|
|
|
|
std::string service_name = "Bolt";
|
|
|
|
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
|
|
|
|
context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
|
|
|
|
service_name = "BoltS";
|
|
|
|
}
|
|
|
|
|
2018-01-15 21:03:07 +08:00
|
|
|
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
|
2018-06-20 23:44:47 +08:00
|
|
|
session_data, &context, FLAGS_session_inactivity_timeout,
|
|
|
|
service_name, FLAGS_num_workers);
|
2017-12-19 19:40:30 +08:00
|
|
|
|
2018-06-20 19:46:54 +08:00
|
|
|
// Setup telemetry
|
|
|
|
std::experimental::optional<telemetry::Telemetry> telemetry;
|
|
|
|
if (FLAGS_telemetry_enabled) {
|
|
|
|
telemetry.emplace(
|
|
|
|
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
|
|
|
|
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
|
|
|
"telemetry",
|
|
|
|
std::chrono::minutes(10));
|
|
|
|
telemetry->AddCollector("db", [&db]() -> nlohmann::json {
|
2018-07-26 15:08:21 +08:00
|
|
|
auto dba = db.Access();
|
|
|
|
return {{"vertices", dba->VerticesCount()}, {"edges", dba->EdgesCount()}};
|
2018-06-20 19:46:54 +08:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2017-12-19 19:40:30 +08:00
|
|
|
// Handler for regular termination signals
|
2018-01-12 22:17:04 +08:00
|
|
|
auto shutdown = [&server] {
|
2017-12-19 19:40:30 +08:00
|
|
|
// Server needs to be shutdown first and then the database. This prevents a
|
|
|
|
// race condition when a transaction is accepted during server shutdown.
|
|
|
|
server.Shutdown();
|
|
|
|
};
|
|
|
|
InitSignalHandlers(shutdown);
|
2018-04-20 20:58:49 +08:00
|
|
|
|
2018-01-10 20:56:12 +08:00
|
|
|
server.AwaitShutdown();
|
2017-12-19 19:40:30 +08:00
|
|
|
}
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
// End common stuff for enterprise and community editions
|
|
|
|
|
|
|
|
#ifdef MG_COMMUNITY
|
|
|
|
|
|
|
|
int main(int argc, char **argv) {
|
|
|
|
return WithInit(argc, argv, []() { return "memgraph"; }, SingleNodeMain);
|
2017-12-19 19:40:30 +08:00
|
|
|
}
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
#else // enterprise edition
|
|
|
|
|
|
|
|
// Distributed flags.
|
|
|
|
DEFINE_HIDDEN_bool(
|
|
|
|
master, false,
|
|
|
|
"If this Memgraph server is the master in a distributed deployment.");
|
|
|
|
DEFINE_HIDDEN_bool(
|
|
|
|
worker, false,
|
|
|
|
"If this Memgraph server is a worker in a distributed deployment.");
|
|
|
|
DECLARE_int32(worker_id);
|
|
|
|
|
|
|
|
void MasterMain() {
|
|
|
|
google::SetUsageMessage("Memgraph distributed master");
|
|
|
|
|
|
|
|
database::Master db;
|
2018-01-12 22:17:04 +08:00
|
|
|
SessionData session_data{db};
|
2018-06-20 23:44:47 +08:00
|
|
|
|
2018-07-06 15:28:05 +08:00
|
|
|
auto stream_writer =
|
2018-08-06 19:05:42 +08:00
|
|
|
[&session_data](
|
|
|
|
const std::string &query,
|
|
|
|
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
|
|
|
auto dba = session_data.db.Access();
|
|
|
|
KafkaResultStream stream;
|
|
|
|
std::map<std::string, query::TypedValue> params_tv;
|
|
|
|
for (const auto &kv : params)
|
|
|
|
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
|
|
|
try {
|
|
|
|
session_data.interpreter(query, *dba, params_tv, false)
|
|
|
|
.PullAll(stream);
|
|
|
|
dba->Commit();
|
|
|
|
} catch (const query::QueryException &e) {
|
|
|
|
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
|
|
|
|
<< e.what();
|
|
|
|
dba->Abort();
|
2018-07-06 15:28:05 +08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
integrations::kafka::Streams kafka_streams{
|
|
|
|
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
|
|
|
"streams",
|
|
|
|
stream_writer};
|
|
|
|
|
|
|
|
try {
|
|
|
|
// Recover possible streams.
|
|
|
|
kafka_streams.Recover();
|
|
|
|
} catch (const integrations::kafka::KafkaStreamException &e) {
|
|
|
|
LOG(ERROR) << e.what();
|
|
|
|
}
|
|
|
|
|
|
|
|
session_data.interpreter.kafka_streams_ = &kafka_streams;
|
|
|
|
|
2018-06-20 23:44:47 +08:00
|
|
|
ServerContext context;
|
|
|
|
std::string service_name = "Bolt";
|
|
|
|
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
|
|
|
|
context = ServerContext(FLAGS_key_file, FLAGS_cert_file);
|
|
|
|
service_name = "BoltS";
|
|
|
|
}
|
|
|
|
|
2018-01-15 21:03:07 +08:00
|
|
|
ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
|
2018-06-20 23:44:47 +08:00
|
|
|
session_data, &context, FLAGS_session_inactivity_timeout,
|
|
|
|
service_name, FLAGS_num_workers);
|
2017-12-19 19:40:30 +08:00
|
|
|
|
|
|
|
// Handler for regular termination signals
|
2018-01-12 22:17:04 +08:00
|
|
|
auto shutdown = [&server] {
|
2017-12-19 19:40:30 +08:00
|
|
|
// Server needs to be shutdown first and then the database. This prevents a
|
|
|
|
// race condition when a transaction is accepted during server shutdown.
|
|
|
|
server.Shutdown();
|
|
|
|
};
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
InitSignalHandlers(shutdown);
|
2018-01-10 20:56:12 +08:00
|
|
|
server.AwaitShutdown();
|
2017-12-19 19:40:30 +08:00
|
|
|
}
|
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
void WorkerMain() {
|
|
|
|
google::SetUsageMessage("Memgraph distributed worker");
|
|
|
|
database::Worker db;
|
|
|
|
db.WaitForShutdown();
|
|
|
|
}
|
2017-01-23 19:02:11 +08:00
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
int main(int argc, char **argv) {
|
|
|
|
auto get_stats_prefix = [&]() -> std::string {
|
|
|
|
if (FLAGS_master) {
|
|
|
|
return "master";
|
|
|
|
} else if (FLAGS_worker) {
|
|
|
|
return fmt::format("worker-{}", FLAGS_worker_id);
|
|
|
|
}
|
|
|
|
return "memgraph";
|
|
|
|
};
|
2018-02-02 18:11:06 +08:00
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
auto memgraph_main = [&]() {
|
|
|
|
CHECK(!(FLAGS_master && FLAGS_worker))
|
|
|
|
<< "Can't run Memgraph as worker and master at the same time";
|
|
|
|
if (FLAGS_master)
|
|
|
|
MasterMain();
|
|
|
|
else if (FLAGS_worker)
|
|
|
|
WorkerMain();
|
|
|
|
else
|
|
|
|
SingleNodeMain();
|
|
|
|
};
|
2018-02-23 21:35:16 +08:00
|
|
|
|
2018-04-20 20:58:49 +08:00
|
|
|
return WithInit(argc, argv, get_stats_prefix, memgraph_main);
|
2016-08-01 01:58:12 +08:00
|
|
|
}
|
2018-04-20 20:58:49 +08:00
|
|
|
|
|
|
|
#endif // enterprise edition
|