Minor refactorings:

Importes now use logger.

Refactored order of constrution of objects in Db.

Moved index creation/removing from Db to Indexes.

Completed Garbage class.
Cleaner now calls garbage.clean() for databases.

Renamed List to ConcurrentList which better names it.
This commit is contained in:
Kruno Tomola Fabro 2016-09-12 20:13:04 +01:00
parent efbae39f41
commit 2a59ed8906
28 changed files with 529 additions and 233 deletions

View File

@ -458,6 +458,7 @@ set(memgraph_src_files
${src_dir}/storage/model/properties/properties.cpp
${src_dir}/storage/model/properties/stored_property.cpp
${src_dir}/storage/model/properties/property_family.cpp
${src_dir}/storage/indexes/indexes.cpp
${src_dir}/storage/indexes/index_base.cpp
${src_dir}/storage/indexes/index_record.cpp
${src_dir}/storage/indexes/index_update.cpp
@ -467,6 +468,7 @@ set(memgraph_src_files
${src_dir}/storage/locking/record_lock.cpp
${src_dir}/storage/garbage/garbage.cpp
${src_dir}/storage/vertex_accessor.cpp
${src_dir}/transactions/snapshot.cpp
${src_dir}/transactions/transaction.cpp
${src_dir}/transactions/transaction_read.cpp
${src_dir}/template_engine/engine.cpp

View File

@ -7,7 +7,7 @@
// TODO: reimplement this
template <class T>
class List
class ConcurrentList
{
private:
template <class V>
@ -50,12 +50,12 @@ private:
template <class It>
class IteratorBase : public Crtp<It>
{
friend class List;
friend class ConcurrentList;
protected:
IteratorBase() : list(nullptr), curr(nullptr) {}
IteratorBase(List *list) : list(list)
IteratorBase(ConcurrentList *list) : list(list)
{
assert(list != nullptr);
list->count++;
@ -157,17 +157,16 @@ private:
}
// True only if this call removed the element. Only reason for fail is
// if
// the element is already removed.
// if the element is already removed.
// Remove has deadlock if another thread dies between marking node for
// removal
// and the disconnection.
// removal and the disconnection.
// This can be improved with combinig the removed flag with prev.next or
// curr.next
bool remove()
{
assert(valid());
if (cas(curr->removed, false, true)) {
// I removed it!!!
if (!disconnect()) {
find_and_disconnect();
}
@ -222,7 +221,7 @@ private:
return true;
}
List *list;
ConcurrentList *list;
Node *prev{nullptr};
Node *curr;
};
@ -230,7 +229,7 @@ private:
public:
class ConstIterator : public IteratorBase<ConstIterator>
{
friend class List;
friend class ConcurrentList;
public:
using IteratorBase<ConstIterator>::IteratorBase;
@ -253,19 +252,19 @@ public:
class Iterator : public IteratorBase<Iterator>
{
friend class List;
friend class ConcurrentList;
public:
using IteratorBase<Iterator>::IteratorBase;
};
public:
List() = default;
ConcurrentList() = default;
List(List &) = delete;
List(List &&) = delete;
ConcurrentList(ConcurrentList &) = delete;
ConcurrentList(ConcurrentList &&) = delete;
~List()
~ConcurrentList()
{
auto now = head.load();
while (now != nullptr) {
@ -275,7 +274,7 @@ public:
}
}
void operator=(List &) = delete;
void operator=(ConcurrentList &) = delete;
Iterator begin() { return Iterator(this); }

View File

@ -9,8 +9,12 @@
#include "transactions/engine.hpp"
class Indexes;
class Snapshoter;
// Main class which represents Database concept in code.
// TODO: Maybe split this in another layer between Db and Dbms. Where the new
// layer would hold SnapshotEngine and his kind of concept objects. Some
// guidelines would be: retain objects which are necessary to implement querys
// in Db, the rest can be moved to the new layer.
class Db
{
public:
@ -20,32 +24,21 @@ public:
Db(const std::string &name);
Db(const Db &db) = delete;
Graph graph;
private:
const std::string name_;
public:
tx::Engine tx_engine;
Garbage garbage;
SnapshotEngine snap_engine;
Graph graph;
Garbage garbage = {tx_engine};
// This must be initialized after name.
SnapshotEngine snap_engine = {*this};
// Creates Indexes for this db.
// TODO: Indexes should be created only once somwhere Like Db or layer
// between Db and Dbms.
Indexes indexes();
std::string const &name() const;
Indexes indexes(); // TODO join into Db
// INDEXES
// TG - type group
// I - type of function I:const tx::Transaction& ->
// std::unique_ptr<IndexBase<TypeGroupVertex,std::nullptr_t>>
// G - type of collection (verrtex/edge)
// TODO: Currently only one index at a time can be created.
// TODO: Move to indexes
template <class TG, class I, class G>
bool create_index_on_vertex_property_family(const char *name, G &coll,
I &create_index);
// Removes index IndexHolder. True if there was index to remove.
// TODO: Move to indexes
template <class TG, class K>
bool remove_index(IndexHolder<TG, K> &ih);
private:
std::string name_;
};

View File

@ -14,6 +14,7 @@
#include "import/element_skeleton.hpp"
#include "import/fillings/filler.hpp"
#include "logging/default.hpp"
#include "storage/model/properties/flags.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/option.hpp"
@ -27,44 +28,11 @@ class BaseImporter
{
public:
BaseImporter(DbAccessor &db, ostream &err_stream)
: db(db), err_stream(err_stream)
BaseImporter(DbAccessor &db, Logger &&logger)
: db(db), logger(std::move(logger))
{
}
template <class... Args>
void err(Args &... args)
{
if (error) {
err_stream << " Error: ";
out_err(args...);
err_stream << endl;
}
}
template <class... Args>
void warn(Args &... args)
{
if (warning) {
err_stream << " Warning: ";
out_err(args...);
err_stream << endl;
}
}
template <class T, class... Args>
void out_err(T &first, Args &... args)
{
err_stream << first;
out_err(args...);
}
template <class T>
void out_err(T &first)
{
err_stream << first;
}
char *cstr(string &str) { return &str[0]; }
bool split(string &str, char mark, vector<char *> &sub_str)
@ -104,7 +72,7 @@ public:
// IN ARRAY check
if (c == open_bracket) {
if (in_array) {
err("Nested arrays aren't supported.");
logger.error("Nested arrays aren't supported.");
return false;
}
in_array = true;
@ -198,6 +166,7 @@ public:
public:
DbAccessor &db;
Logger logger;
char parts_mark = ',';
char parts_array_mark = ',';
@ -206,13 +175,7 @@ public:
char open_bracket = '[';
char closed_bracket = ']';
bool warning = true;
bool error = true;
protected:
// All errors are writen to this stream.
ostream &err_stream;
// All created vertices which have import local id
vector<Option<VertexAccessor>> vertices;
};

View File

@ -51,7 +51,10 @@ class CSVImporter : public BaseImporter
{
public:
using BaseImporter::BaseImporter;
CSVImporter(DbAccessor &db)
: BaseImporter(db, logging::log->logger("CSV_import"))
{
}
// Loads data from stream and returns number of loaded vertexes.
size_t import_vertices(std::fstream &file)
@ -77,12 +80,12 @@ private:
// HEADERS
if (!getline(file, line)) {
err("No lines");
logger.error("No lines");
return 0;
}
if (!split(line, parts_mark, sub_str)) {
err("Illegal headers");
logger.error("Illegal headers");
return 0;
}
@ -115,7 +118,7 @@ private:
for (int i = 0; i < n; i++) {
auto er = fillers[i]->fill(es, sub_str[i]);
if (er.is_present()) {
err(er.get(), " on line: ", line_no);
logger.error("{} on line: {}", er.get(), line_no);
}
}
@ -143,17 +146,18 @@ private:
id.get() - im->vertices.size() + 1, empty);
}
if (im->vertices[id.get()].is_present()) {
im->err("Vertex on line: ", line_no,
" has same id with another previously loaded vertex");
im->logger.error("Vertex on line: {} has same id with another "
"previously loaded vertex",
line_no);
return false;
} else {
im->vertices[id.get()] = make_option(std::move(va));
return true;
}
} else {
im->warn("Missing import local vertex id for vertex on "
"line: ",
line_no);
im->logger.warn("Missing import local vertex id for vertex on "
"line: {}",
line_no);
}
return true;
@ -166,7 +170,7 @@ private:
if (!o.is_present()) {
return true;
} else {
im->err(o.get(), " on line: ", line_no);
im->logger.error("{} on line: {}", o.get(), line_no);
return false;
}
}
@ -190,17 +194,17 @@ private:
const char *type = tmp_vec[1];
if (tmp_vec.size() > 2) {
err("To much sub parts in header part");
logger.error("To much sub parts in header part");
return make_option<unique_ptr<Filler>>();
} else if (tmp_vec.size() < 2) {
if (tmp_vec.size() == 1) {
warn(
"Column ", tmp_vec[0],
" doesn't have specified type so string type will be used");
logger.warn("Column: {} doesn't have specified type so string "
"type will be used",
tmp_vec[0]);
name = tmp_vec[0];
type = _string;
} else {
warn("Empty colum definition, skiping column.");
logger.warn("Empty colum definition, skiping column.");
std::unique_ptr<Filler> f(new SkipFiller());
return make_option(std::move(f));
}
@ -235,7 +239,7 @@ private:
return make_option(std::move(f));
} else if (name[0] == '\0') { // OTHER FILLERS REQUIRE NAME
warn("Unnamed column of type: ", type, " will be skipped.");
logger.warn("Unnamed column of type: {} will be skipped.", type);
std::unique_ptr<Filler> f(new SkipFiller());
return make_option(std::move(f));
@ -315,7 +319,7 @@ private:
return make_option(std::move(f));
} else {
err("Unknown type: ", type);
logger.error("Unknown type: {}", type);
return make_option<unique_ptr<Filler>>();
}
}
@ -324,13 +328,13 @@ private:
{
if (diff != 0) {
if (diff < 0) {
// warn("Line no: ", line_no, " has less parts then "
// "specified in header. Missing ",
// diff, " parts");
logger.warn("Line no: {} has less parts then specified in "
"header. Missing: {} parts",
line_no, diff);
} else {
warn("Line no: ", line_no,
" has more parts then specified in header. Extra ", diff,
" parts");
logger.warn("Line no: {} has more parts then specified in "
"header. Extra: {} parts",
line_no, diff);
}
}
}
@ -351,25 +355,18 @@ CSVImporter::property_key<TypeGroupEdge>(const char *name, Flags type)
}
// Imports all -v "vertex_file_path.csv" vertices and -e "edge_file_path.csv"
// edges from specified files. Also defines arguments -d, -ad, -w, -err, -info.
// edges from specified files. Also defines arguments -d, -ad.
// -d delimiter => sets delimiter for parsing .csv files. Default is ,
// -ad delimiter => sets delimiter for parsing arrays in .csv. Default is ,
// -w bool => turns on/off output of warnings. Default on.
// -err bool => turns on/off output of errors. Default on.
// -info bool => turns on/off output of info. Default on.
// -ad delimiter => sets delimiter for parsing arrays in .csv. Default is
// Returns (no loaded vertices,no loaded edges)
std::pair<size_t, size_t>
import_csv_from_arguments(Db &db, std::vector<std::string> &para)
{
DbAccessor t(db);
CSVImporter imp(t, cerr);
CSVImporter imp(t);
imp.parts_mark = get_argument(para, "-d", ",")[0];
imp.parts_array_mark = get_argument(para, "-ad", ",")[0];
imp.warning = strcmp(get_argument(para, "-w", "true").c_str(), "true") == 0;
imp.error = strcmp(get_argument(para, "-err", "true").c_str(), "true") == 0;
bool info =
strcmp(get_argument(para, "-info", "true").c_str(), "true") == 0;
// IMPORT VERTICES
size_t l_v = 0;
@ -377,16 +374,12 @@ import_csv_from_arguments(Db &db, std::vector<std::string> &para)
while (o.is_present()) {
std::fstream file(o.get());
if (info)
std::cout << "Importing vertices from file: " << o.get()
<< std::endl;
imp.logger.info("Importing vertices from file: {}", o.get());
auto n = imp.import_vertices(file);
l_v = +n;
if (info)
std::cout << "Loaded " << n << " vertices from " << o.get()
<< std::endl;
imp.logger.info("Loaded: {} vertices from {}", n, o.get());
o = take_argument(para, "-v");
}
@ -397,15 +390,12 @@ import_csv_from_arguments(Db &db, std::vector<std::string> &para)
while (o.is_present()) {
std::fstream file(o.get());
if (info)
std::cout << "Importing edges from file: " << o.get() << std::endl;
imp.logger.info("Importing edges from file: {}", o.get());
auto n = imp.import_edges(file);
l_e = +n;
if (info)
std::cout << "Loaded " << n << " edges from " << o.get()
<< std::endl;
imp.logger.info("Loaded: {} edges from {}", n, o.get());
o = take_argument(para, "-e");
}

View File

@ -16,3 +16,13 @@ class DecoderException : public BasicException
{
using BasicException::BasicException;
};
class NotYetImplemented : public BasicException
{
using BasicException::BasicException;
};
class NonExhaustiveSwitch : public BasicException
{
using BasicException::BasicException;
};

View File

@ -17,7 +17,7 @@ class SnapshotEngine
{
public:
SnapshotEngine(Db &db, std::string const &name);
SnapshotEngine(Db &db);
~SnapshotEngine() = default;

View File

@ -16,13 +16,15 @@ class Engine;
class Garbage
{
public:
Garbage(tx::Engine &e) : engine(e) {}
void dispose(tx::Snapshot<Id> &&snapshot, DeleteSensitive *data);
// Cleaner thread shoul call this method every some time. Removes data which
// is
// safe to be deleted.
void clean(tx::Engine &engine);
// Cleaner thread should call this method every some time. Removes data
// which is safe to be deleted.
void clean();
private:
List<std::pair<tx::Snapshot<Id>, DeleteSensitive *>> gar;
ConcurrentList<std::pair<tx::Snapshot<Id>, DeleteSensitive *>> gar;
tx::Engine &engine;
};

View File

@ -9,7 +9,7 @@ template <class TG, class K>
class NonUniqueUnorderedIndex : public IndexBase<TG, K>
{
public:
using store_t = List<IndexRecord<TG, K>>;
using store_t = ConcurrentList<IndexRecord<TG, K>>;
// typedef T value_type;
// typedef K key_type;

View File

@ -19,21 +19,37 @@ public:
const Order order;
};
// Defines location of index in a sense of what is necessary to be present in
// Edge/Vertex for it to be in index.
struct IndexLocation
{
public:
// Returns code for location.
size_t location_code() const
{
return (property_name.is_present() ? 1 : 0) |
(label_name.is_present() ? 2 : 0) |
(edge_type_name.is_present() ? 4 : 0);
}
IndexLocation clone() const
{
return IndexLocation{side, property_name, label_name, edge_type_name};
}
const DbSide side;
const Option<std::string> property_name;
const Option<std::string> label_name;
const Option<std::string> edge_type_name;
};
// Fully answers:
// On what index?
// Index on what?
// What kind of index?
struct IndexDefinition
{
public:
// Serializes self which can be deserialized
// Serializes self which can be deserialized.
template <class E>
void serialize(E &encoder) const
{
@ -41,6 +57,7 @@ public:
encoder.write_integer(underlying_cast(loc.side));
encoder.write_string(loc.property_name.get_or(empty));
encoder.write_string(loc.label_name.get_or(empty));
encoder.write_string(loc.edge_type_name.get_or(empty));
encoder.write_bool(type.unique);
encoder.write_integer(underlying_cast(type.order));
}
@ -64,14 +81,22 @@ public:
? Option<std::string>()
: Option<std::string>(std::move(label_name_s));
std::string edge_type_name_s;
decoder.string(edge_type_name_s);
auto edge_type_name =
edge_type_name_s.empty()
? Option<std::string>()
: Option<std::string>(std::move(edge_type_name_s));
bool unique = decoder.read_bool();
auto order_v = decoder.integer();
auto order =
order_v == 0 ? None : (order_v == 1 ? Ascending : Descending);
return IndexDefinition{IndexLocation{side, property_name, label_name},
IndexType{unique, order}};
return IndexDefinition{
IndexLocation{side, property_name, label_name, edge_type_name},
IndexType{unique, order}};
}
const IndexLocation loc;

View File

@ -11,7 +11,7 @@ namespace tx
class Transaction;
}
// Holds onde index which can be changed.
// Holds one index which can be changed. Convinient class.
// TG - type group
// K - key of index_records
template <class TG, class K>
@ -25,7 +25,7 @@ public:
IndexHolder(IndexHolder &&) = default;
// Sets index for this property family. Treturns false if index is already
// Sets index for this property family. returns false if index is already
// present.
bool set_index(std::unique_ptr<IndexBase<TG, K>> inx);

View File

@ -1,18 +1,77 @@
#pragma once
#include "database/db.hpp"
#include "query_engine/exceptions/exceptions.hpp"
#include "storage/garbage/garbage.hpp"
#include "storage/graph.hpp"
#include "storage/indexes/impl/nonunique_unordered_index.hpp"
#include "storage/indexes/impl/unique_ordered_index.hpp"
#include "storage/indexes/index_definition.hpp"
#include "transactions/engine.hpp"
// Operation on indexes in the Db
// Operation on indexes in the This should be the only place which knows how
// to get all indexes in database.
// TODO: This class should be updated accordingly when adding new place for
// index.
class Indexes
{
public:
Indexes(Db &d) : db(d) {}
bool add_index(IndexDefinition id)
// Adds index defined in given definition. Returns true if successfull.
bool add_index(IndexDefinition id);
// Removes index from given location. Returns true if successfull or if no
// index was present. False if index location is illegal.
bool remove_index(IndexLocation loc)
{
// TODO: Not yet implemented
// assert(false);
return true;
size_t code = loc.location_code();
switch (code) {
case 0: // Illegal location
return false;
case 1:
switch (loc.side) {
case EdgeSide: {
return remove_index(
db.graph.edges
.property_family_find_or_create(loc.property_name.get())
.index);
}
case VertexSide: {
return remove_index(
db.graph.vertices
.property_family_find_or_create(loc.property_name.get())
.index);
}
default:
throw new NonExhaustiveSwitch("Unkown side: " +
std::to_string(loc.side));
};
case 2: // Can't be removed
return false;
case 3: // Not yet implemented
throw new NotYetImplemented("Remove of index over label and "
"property isn't yet implemented");
case 4: // Can't be removed
return false;
case 5: // Not yet implemented
throw new NotYetImplemented("Remove of index over edge_type and "
"property isn't yet implemented");
case 6: // Not yet implemented
throw new NotYetImplemented("Remove of index over edge_type and "
"label isn't yet implemented");
case 7: // Not yet implemented
throw new NotYetImplemented("Remove of index over label, edge_type "
"and property isn't yet implemented");
default:
throw new NonExhaustiveSwitch("Unkown index location code: " +
std::to_string(code));
}
}
// Calls F over all vertex indexes in the database which are readable.
@ -54,7 +113,7 @@ public:
}
}
// TODO: other properti indexes
// TODO: other property indexes
}
return true;
@ -76,5 +135,103 @@ private:
// coded into the database.
}
// Creates type of index specified ind index definition.
// TG - type group vertex/edge
// K - key of index
template <class TG, class K>
std::unique_ptr<IndexBase<TG, K>> create_index(IndexDefinition id,
tx::Transaction const &t)
{
// Determine which index is needed
if (id.type.unique) {
switch (id.type.order) {
case None: {
// TODO: Implement this version of index.
throw NotYetImplemented(
"Missing implementation for Unique Unordered Index");
}
case Ascending:
case Descending: {
return std::make_unique<UniqueOrderedIndex<TG, K>>(
id.loc.clone(), id.type.order, t);
}
default:
throw new NonExhaustiveSwitch("Unknown order: " +
std::to_string(id.type.order));
};
} else {
switch (id.type.order) {
case None: {
return std::make_unique<NonUniqueUnorderedIndex<TG, K>>(
id.loc.clone(), t);
}
case Ascending:
case Descending: {
// TODO: Implement this version of index.
throw NotYetImplemented(
"Missing implementation for Nonunique Ordered Index");
}
default:
throw new NonExhaustiveSwitch("Unknown order: " +
std::to_string(id.type.order));
};
}
}
// Fills index with returned elements. Return true if successfully filled.
// Iterator must return std::pair<TG::accessor_t, K>
template <class TG, class K, class I>
bool fill_index(DbTransaction &t, IndexHolder<TG, K> &holder, I &&iter)
{
// Wait for all other active transactions to finish so that this
// transaction can see there changes so that they can be added into
// index.
t.trans.wait_for_active();
auto oindex = holder.get_write(t.trans);
if (oindex.is_present()) {
auto index = oindex.get();
// Iterate over all elements and add them into index. Fail if
// some insert failed.
bool res = iter.all([&](auto elem) {
if (!index->insert(elem.first.create_index_record(
std::move(elem.second)))) {
// Index is probably unique.
auto owned_maybe = holder.remove_index(index);
if (owned_maybe.is_present()) {
db.garbage.dispose(db.tx_engine.snapshot(),
owned_maybe.get().release());
}
return false;
}
return true;
});
if (res) {
index->activate();
return true;
}
}
return false;
}
// Removes index from index holder
template <class TG, class K>
bool remove_index(IndexHolder<TG, K> &ih)
{
auto owned_maybe = ih.remove_index();
if (owned_maybe.is_present()) {
db.garbage.dispose(db.tx_engine.snapshot(),
owned_maybe.get().release());
return true;
}
return false;
}
Db &db;
};

View File

@ -235,6 +235,9 @@ public:
return !(*this == other);
}
// True if NULL
bool is_empty() const { return is<Null>(); }
// True if contains T property
template <class D>
bool is() const

View File

@ -58,6 +58,13 @@ public:
return record->visible(id);
}
// Returns new IndexRecord with given key.
template <class K>
IndexRecord<TG, K> create_index_record(K &&key)
{
return IndexRecord<TG, K>(std::move(key), record, vlist);
}
// TODO: Test this
Derived update() const
{

View File

@ -3,11 +3,14 @@
#include <algorithm>
#include <vector>
#include "mvcc/id.hpp"
#include "utils/option.hpp"
namespace tx
{
class Engine;
template <class id_t>
class Snapshot
{
@ -20,6 +23,9 @@ public:
Snapshot(Snapshot &&other) { active = std::move(other.active); }
// True if all transaction from snapshot have finished.
bool all_finished(Engine &engine);
bool is_active(id_t xid) const
{
return std::binary_search(active.begin(), active.end(), xid);

View File

@ -28,6 +28,10 @@ public:
TransactionRead(const Id &id, const Snapshot<Id> &snapshot, Engine &engine);
// True if this transaction and every transaction from snapshot have
// finished.
bool all_finished();
// Return id of oldest transaction from snapshot.
Id oldest_active();

View File

@ -98,6 +98,13 @@ public:
});
}
// Filters with property under given key
template <class KEY>
auto has_property(KEY &key)
{
return filter([&](auto &va) { return !va.at(key).is_empty(); });
}
// Filters with property under given key
template <class KEY, class PROP>
auto has_property(KEY &key, PROP const &prop)
@ -151,5 +158,21 @@ public:
{
iter::for_all(move(), std::move(op));
}
// All items must satisfy given predicate for this function to return true.
// Otherwise stops calling predicate on firts false and returns fasle.
template <class OP>
bool all(OP &&op)
{
auto iter = move();
auto e = iter.next();
while (e.is_present()) {
if (!op(e.take())) {
return false;
}
e = iter.next();
}
return true;
}
};
}

View File

@ -4,73 +4,10 @@
#include "storage/indexes/indexes.hpp"
#include "storage/model/properties/property_family.hpp"
Db::Db() : name_("default"), snap_engine(*this, "default")
{
snap_engine.import();
}
Db::Db() : Db("default") {}
Db::Db(const std::string &name) : name_(name), snap_engine(*this, name)
{
snap_engine.import();
}
std::string const &Db::name() const { return name_; }
Db::Db(const std::string &name) : name_(name) { snap_engine.import(); }
Indexes Db::indexes() { return Indexes(*this); }
template <class TG, class I, class G>
bool Db::create_index_on_vertex_property_family(const char *name, G &coll,
I &create_index)
{
auto &family = coll.property_family_find_or_create(name);
bool added_index = false;
DbTransaction t(*this, tx_engine.begin([&](auto t) {
added_index = family.index.set_index(create_index(t));
}));
if (added_index) {
t.trans.wait_for_active();
auto oindex = family.index.get_write(t.trans);
if (oindex.is_present()) {
auto index = oindex.get();
for (auto &vr : coll.access()) {
auto v = vr.second.find(t.trans);
if (v != nullptr) {
if (!index->insert(IndexRecord<TG, std::nullptr_t>(
std::nullptr_t(), v, &vr.second))) {
// Index is probably unique.
auto owned_maybe = family.index.remove_index(index);
if (owned_maybe.is_present()) {
garbage.dispose(tx_engine.snapshot(),
owned_maybe.get().release());
}
t.trans.abort();
return false;
}
}
}
index->activate();
t.trans.commit();
return true;
}
}
t.trans.abort();
return false;
}
template <class TG, class K>
bool Db::remove_index(IndexHolder<TG, K> &ih)
{
auto owned_maybe = ih.remove_index();
if (owned_maybe.is_present()) {
garbage.dispose(tx_engine.snapshot(), owned_maybe.get().release());
return true;
}
return false;
}
std::string const &Db::name() const { return name_; }

View File

@ -48,7 +48,8 @@ void DbTransaction::clean_vertex_section()
Id oldest_active = trans.oldest_active();
// Clean indexes
db.indexes().vertex_indexes([&](auto &in) { in.clean(oldest_active); });
db.indexes().vertex_indexes(
[&](auto &in) { in.clean(oldest_active); });
// Clean vertex list
clean_version_lists(db.graph.vertices.access(), oldest_active);
@ -67,7 +68,8 @@ bool DbTransaction::update_indexes()
TRY(e.record->data.edge_type->index().insert(
EdgeTypeIndexRecord(std::nullptr_t(), e.record, e.vlist)));
TRY(db.indexes().update_property_indexes<TypeGroupEdge>(e, trans));
TRY(db.indexes().update_property_indexes<TypeGroupEdge>(
e, trans));
} else {
auto v = iu.v;
@ -80,8 +82,8 @@ bool DbTransaction::update_indexes()
LabelIndexRecord(std::nullptr_t(), v.record, v.vlist)));
}
TRY(db.indexes().update_property_indexes<TypeGroupVertex>(v,
trans));
TRY(db.indexes().update_property_indexes<TypeGroupVertex>(
v, trans));
}
index_updates.pop_back();

View File

@ -30,8 +30,13 @@ Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs, size_t cleaning_cycle)
try {
logger.info("Cleaning edges");
t.clean_edge_section();
logger.info("Cleaning vertices");
t.clean_vertex_section();
logger.info("Cleaning garbage");
db.second.garbage.clean();
} catch (const std::exception &e) {
logger.error(
"Error occured while cleaning database \"{}\"",

View File

@ -9,10 +9,10 @@
#include "threading/thread.hpp"
#include "utils/sys.hpp"
SnapshotEngine::SnapshotEngine(Db &db, std::string const &name)
SnapshotEngine::SnapshotEngine(Db &db)
: snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)), db(db),
max_retained_snapshots(CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS)),
logger(logging::log->logger("SnapshotEngine db[" + name + "]"))
logger(logging::log->logger("SnapshotEngine db[" + db.name() + "]"))
{
}
@ -268,11 +268,14 @@ bool SnapshotEngine::snapshot_load(DbAccessor &t, SnapshotDecoder &snap)
// Load indexes
snap.start_indexes();
auto indexs = db.indexes();
while (!snap.end()) {
// This will add index. It is alright for now to ignore if add_index
// return false.
indexs.add_index(snap.load_index());
// This will add index.
// TODO: It is alright for now to ignore if add_index return false. I am
// not even sure if false should stop snapshot loading.
if (!db.indexes().add_index(snap.load_index())) {
logger.warn("Failed to add index, but still continuing with "
"loading snapshot");
}
}
return true;

View File

@ -2,10 +2,15 @@
void Garbage::dispose(tx::Snapshot<Id> &&snapshot, DeleteSensitive *data)
{
// TODO: add to list
// If this fails it's better to leak memory than to cause read after free.
gar.begin().push(std::make_pair(snapshot, data));
}
void Garbage::clean(tx::Engine &engine)
void Garbage::clean()
{
// TODO: iterator throug list and check snapshot
for (auto it = gar.begin(); it != gar.end(); it++) {
if (it->first.all_finished(engine) && it.remove()) {
it->second->~DeleteSensitive();
}
}
}

View File

@ -0,0 +1,136 @@
#include "storage/indexes/indexes.hpp"
#include "database/db_accessor.hpp"
bool Indexes::add_index(IndexDefinition id)
{
auto logger = logging::log->logger("Add index");
logger.info("Starting");
// Closure which needs to be runned to finish creating index.
std::function<bool(DbTransaction &)> finish = [](auto &t) { return false; };
// Creates transaction and during it's creation adds index into it's
// place. Also created finish closure which will add necessary elements
// into index.
DbTransaction t(db, db.tx_engine.begin([&, id](auto &t) mutable {
size_t code = id.loc.location_code();
switch (code) {
// Index over nothing
case 0: // Illegal location
break;
// Index over property
case 1: {
switch (id.loc.side) {
case EdgeSide: {
auto &holder = db.graph.edges
.property_family_find_or_create(
id.loc.property_name.get())
.index;
if (holder.set_index(
create_index<TypeGroupEdge, std::nullptr_t>(id, t))) {
// Set closure which will fill index.
finish = [&](auto &t) mutable {
DbAccessor acc(t.db, t.trans);
auto &key = acc.edge_property_family_get(
id.loc.property_name.get());
return fill_index(
t, holder,
acc.edge_access().fill().has_property(key).map(
[&](auto ra) {
return std::make_pair(std::move(ra),
std::nullptr_t());
}));
};
}
break;
}
case VertexSide: { // TODO: Duplicate code as above, only difference
// is vertices/vertex
auto &holder = db.graph.vertices
.property_family_find_or_create(
id.loc.property_name.get())
.index;
if (holder.set_index(
create_index<TypeGroupVertex, std::nullptr_t>(id, t))) {
// Set closure which will fill index.
finish = [&](auto &t) mutable {
DbAccessor acc(t.db, t.trans);
auto &key = acc.vertex_property_family_get(
id.loc.property_name.get());
return fill_index(
t, holder,
acc.vertex_access().fill().has_property(key).map(
[&](auto ra) {
return std::make_pair(std::move(ra),
std::nullptr_t());
}));
};
}
break;
}
default:
logger.error("Unkown side: " + std::to_string(id.loc.side));
};
break;
}
// Index over label
case 2: { // Always added
finish = [](auto &t) { return true; };
return;
}
// Index over property and label
case 3: { // Not yet implemented
logger.error("Remove of index over label and "
"property isn't yet implemented");
break;
}
// Index over edge_type
case 4: { // Always added
finish = [](auto &t) { return true; };
break;
}
// Index over property and edge_type
case 5: { // Not yet implemented
logger.error("Remove of index over edge_type and "
"property isn't yet implemented");
break;
}
// Index over edge_type and label
case 6: { // Not yet implemented
logger.error("Remove of index over edge_type and "
"label isn't yet implemented");
break;
}
// Index over property, edge_type and label
case 7: { // Not yet implemented
logger.error("Remove of index over label, edge_type "
"and property isn't yet implemented");
break;
}
default:
logger.error("Unkown index location code: " + std::to_string(code));
}
}));
if (finish(t)) {
t.trans.commit();
return true;
} else {
t.trans.abort();
return false;
}
}

View File

@ -0,0 +1,17 @@
#include "transactions/snapshot.hpp"
#include "transactions/engine.hpp"
template <class id_t>
bool tx::Snapshot<id_t>::all_finished(Engine &engine)
{
for (auto &sid : active) {
if (engine.clog.is_active(sid)) {
return false;
}
}
return true;
}
template class tx::Snapshot<Id>;

View File

@ -25,11 +25,11 @@ TransactionRead Transaction::transaction_read()
void Transaction::wait_for_active()
{
while (snapshot.size() > 0) {
auto id = snapshot.back();
while (engine.clog.fetch_info(id).is_active()) {
auto sid = snapshot.back();
while (engine.clog.fetch_info(sid).is_active()) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
snapshot.remove(id);
snapshot.remove(sid);
}
}

View File

@ -1,5 +1,7 @@
#include "transactions/transaction_read.hpp"
#include "transactions/engine.hpp"
namespace tx
{
@ -20,6 +22,11 @@ TransactionRead::TransactionRead(const Id &id, const Snapshot<Id> &snapshot,
{
}
bool TransactionRead::all_finished()
{
return !engine.clog.is_active(id) && snapshot.all_finished(engine);
}
bool TransactionRead::in_snapshot(const Id &id) const
{
return snapshot.is_active(id);

View File

@ -180,7 +180,7 @@ int main(void)
clear_database(db, stripper, query_functions);
db.snap_engine.import();
{
Db db2;
Db db2("snapshot");
assert(equal(db, db2));
}

View File

@ -5,7 +5,7 @@
TEST_CASE("Conncurent List insert")
{
List<int> list;
ConcurrentList<int> list;
auto it = list.begin();
it.push(32);
it.reset();
@ -14,7 +14,7 @@ TEST_CASE("Conncurent List insert")
TEST_CASE("Conncurent List iterate")
{
List<int> list;
ConcurrentList<int> list;
auto it = list.begin();
it.push(32);
it.push(7);
@ -35,7 +35,7 @@ TEST_CASE("Conncurent List iterate")
TEST_CASE("Conncurent List head remove")
{
List<int> list;
ConcurrentList<int> list;
auto it = list.begin();
it.push(32);
it.reset();
@ -50,7 +50,7 @@ TEST_CASE("Conncurent List head remove")
TEST_CASE("Conncurent List remove")
{
List<int> list;
ConcurrentList<int> list;
auto it = list.begin();
it.push(32);
it.push(7);