Newer bits on head chunk in DynamicBitset

Reviewers: florijan, buda

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D813
This commit is contained in:
Mislav Bradac 2017-09-20 16:15:16 +02:00
parent 182a9241cf
commit 0eceefb2d4
3 changed files with 108 additions and 124 deletions

View File

@ -6,208 +6,185 @@
#include "threading/sync/spinlock.hpp"
#include "utils/assert.hpp"
/** A sequentially ordered non-unique
* lock-free concurrent collection of bits.
/**
* A sequentially ordered non-unique lock-free concurrent collection of bits.
*
* Grows dynamically to accomodate the maximum
* set-bit position. Does not dynamically
* decrease in size.
* Grows dynamically to accomodate the maximum set-bit position. Does not
* dynamically decrease in size.
*
* Bits can be set, retrieved and cleared
* in groups. Note that all group operations
* fail if the group spans multiple basic
* storage units. For example, if basic storage
* is in 8 bits, calling at(5, 2) is legal,
* but calling at(7, 2) is not.
* Bits can be set, retrieved and cleared in groups. Note that all group
* operations fail if the group spans multiple basic storage units. For example,
* if basic storage is in 8 bits, calling at(5, 2) is legal, but calling at(7,
* 2) is not.
*
* Organizes bits into chunks. Finding the
* right chunk has has O(n) lookup performance,
* so use large chunks for better speed in large
* bitsets. At the same time larger chunks
* mean coarser-grained memory allocation.
*
* @tparam block_t - Basic storage type. Must have
* lock-free atomic. Should probably always be uint_8.
* @tparam block_t - Basic storage type. Must have lock-free atomic. Should
* probably always be uint_8.
* @tparam chunk_size - number of bits in one chunk
*/
template <class block_t = uint8_t, size_t chunk_size = 32768>
class DynamicBitset : Lockable<SpinLock> {
// basic storage unit
class DynamicBitset {
// Basic storage unit.
struct Block {
Block() = default;
// the number of bits in one Block
static constexpr size_t size = sizeof(block_t) * 8;
Block(const Block &) = delete;
Block(Block &&) = delete;
Block &operator=(const Block &) = delete;
Block &operator=(Block &&) = delete;
// The number of bits in one Block.
static constexpr size_t kSize = sizeof(block_t) * 8;
block_t at(size_t k, size_t n) const {
debug_assert(k + n - 1 < size, "Invalid index.");
return (block.load() >> k) & bitmask(n);
debug_assert(k + n - 1 < kSize, "Invalid index.");
return (block_.load() >> k) & bitmask(n);
}
void set(size_t k, size_t n) {
debug_assert(k + n - 1 < size, "Invalid index.");
block.fetch_or(bitmask(n) << k);
debug_assert(k + n - 1 < kSize, "Invalid index.");
block_.fetch_or(bitmask(n) << k);
}
void clear(size_t k, size_t n) {
debug_assert(k + n - 1 < size, "Invalid index.");
block.fetch_and(~(bitmask(n) << k));
debug_assert(k + n - 1 < kSize, "Invalid index.");
block_.fetch_and(~(bitmask(n) << k));
}
private:
std::atomic<block_t> block{0};
std::atomic<block_t> block_{0};
constexpr block_t bitmask(size_t group_size) const {
return (block_t)(-1) >> (size - group_size);
return (block_t)(-1) >> (kSize - group_size);
}
};
struct Chunk {
Chunk() : next(nullptr) {
static_assert(chunk_size % sizeof(block_t) == 0,
Chunk(Chunk *next, int64_t chunk_id) : next_(next), chunk_id_(chunk_id) {
static_assert(chunk_size % Block::kSize == 0,
"chunk size not divisible by block size");
}
Chunk(Chunk &) = delete;
Chunk(const Chunk &) = delete;
Chunk(Chunk &&) = delete;
Chunk &operator=(const Chunk &) = delete;
Chunk &operator=(Chunk &&) = delete;
// the number of bits in one chunk
static constexpr size_t size = chunk_size * Block::size;
// the number of blocks in one chunk
static constexpr size_t n_blocks = chunk_size / sizeof(block_t);
// The number of bits in one chunk.
static constexpr size_t kSize = chunk_size;
// The number of blocks_ in one chunk.
static constexpr size_t kNumBlocks = chunk_size / Block::kSize;
block_t at(size_t k, size_t n) const {
return blocks[k / Block::size].at(k % Block::size, n);
return blocks_[k / Block::kSize].at(k % Block::kSize, n);
}
void set(size_t k, size_t n) {
blocks[k / Block::size].set(k % Block::size, n);
blocks_[k / Block::kSize].set(k % Block::kSize, n);
}
void clear(size_t k, size_t n) {
blocks[k / Block::size].clear(k % Block::size, n);
blocks_[k / Block::kSize].clear(k % Block::kSize, n);
}
Block blocks[n_blocks];
std::atomic<Chunk *> next;
// Range of the bits stored in this chunk is [low, high>.
int64_t low() const { return chunk_id_ * kSize; }
int64_t high() const { return (chunk_id_ + 1) * kSize; }
Block blocks_[kNumBlocks];
std::atomic<Chunk *> next_;
const int64_t chunk_id_;
};
public:
DynamicBitset(){};
DynamicBitset() {}
// can't move nor copy a DynamicBitset because of atomic
// head and locking
// Can't move nor copy a DynamicBitset because of atomic head and locking.
DynamicBitset(const DynamicBitset &) = delete;
DynamicBitset(DynamicBitset &&) = delete;
DynamicBitset &operator=(const DynamicBitset &) = delete;
DynamicBitset &operator=(DynamicBitset &&) = delete;
~DynamicBitset() {
auto now = head.load();
auto now = head_.load();
while (now != nullptr) {
auto next = now->next.load();
auto next = now->next_.load();
delete now;
now = next;
}
}
/** Gets the block of bit starting at bit k
* and containing the the following n bits.
* The bit index with k in this bitset is
* zeroth bit in the returned value.
/**
* Gets the block of bit starting at bit k and containing the the following n
* bits. The bit index with k in this bitset is zeroth bit in the returned
* value.
*/
block_t at(size_t k, size_t n) const {
if (k >= Chunk::size * chunk_count) return 0;
if (k >= head_.load()->high()) return 0;
auto &chunk = find_chunk(k);
const auto &chunk = FindChunk(k);
return chunk.at(k, n);
}
/** Returns k-th bit's value. */
bool at(size_t k) const { return at(k, 1); }
/** Set all the bits in the group of size `n`, starting from
* bit `k`.
/**
* Set all the bits in the group of size `n`, starting from bit `k`.
*/
void set(size_t k, size_t n = 1) {
auto &chunk = find_or_create_chunk(k);
auto &chunk = FindOrCreateChunk(k);
return chunk.set(k, n);
}
/** Clears all the bits in the group of size `n`, starting
* from bit `k`.
* */
/**
* Clears all the bits in the group of size `n`, starting from bit `k`.
*/
void clear(size_t k, size_t n = 1) {
// if desired bit is out of bounds, it's already clear
if (k >= Chunk::size * chunk_count) return;
// If desired bit is out of bounds, it's already clear.
if (k >= head_.load()->high()) return;
auto &chunk = find_or_create_chunk(k);
auto &chunk = FindOrCreateChunk(k);
return chunk.clear(k, n);
}
private:
// finds the chunk to which k-th bit belong. if k is
// out of bounds, the chunk for it is created
Chunk &find_or_create_chunk(size_t &k) {
Chunk *chunk = head.load(), *next = nullptr;
// Finds the chunk to which k-th bit belongs fails if k is out of bounds.
const Chunk &FindChunk(size_t &k) const {
debug_assert(k < head_.load()->high(), "Index out of bounds");
Chunk *chunk = head_;
// while i'm not in the right chunk
// (my index is bigger than the size of this chunk)
while (k >= Chunk::size) {
next = chunk->next.load();
// if a next chunk exists, switch to it and decrement my
// pointer by the size of the current chunk
if (next != nullptr) {
chunk = next;
k -= Chunk::size;
continue;
}
// the next chunk does not exist and we need it. take an exclusive
// lock to prevent others that also want to create a new chunk
// from creating it
auto guard = acquire_unique();
// double-check locking. if the chunk exists now, some other thread
// has just created it, continue searching for my chunk
if (chunk->next.load() != nullptr) continue;
chunk->next.store(new Chunk());
chunk_count++;
while (k < chunk->low()) {
chunk = chunk->next_;
debug_assert(chunk != nullptr, "chunk is nullptr");
}
debug_assert(chunk != nullptr, "Chunk is nullptr.");
k -= chunk->low();
return *chunk;
}
// finds the chunk to which k-th bit belongs.
// fails if k is out of bounds
const Chunk &find_chunk(size_t &k) const {
debug_assert(k < chunk_size * Chunk::size, "Index out of bounds");
Chunk *chunk = head.load(), *next = nullptr;
/**
* Finds the chunk to which k-th bit belong. If k is out of bounds, the chunk
* for it is created.
*/
Chunk &FindOrCreateChunk(size_t &k) {
Chunk *head = head_;
// while i'm not in the right chunk
// (my index is bigger than the size of this chunk)
while (k >= Chunk::size) {
next = chunk->next.load();
// if a next chunk exists, switch to it and decrement my
// pointer by the size of the current chunk
if (next != nullptr) {
chunk = next;
k -= Chunk::size;
while (k >= head->high()) {
// The next chunk does not exist and we need it, so we will try to create
// it.
Chunk *new_head = new Chunk(head, head->chunk_id_ + 1);
if (!head_.compare_exchange_strong(head, new_head)) {
// Other thread updated head_ before us, so we need to delete new_head.
head = head_;
delete new_head;
continue;
}
// the next chunk does not exist, this is illegal state
permanent_fail("Out of bounds");
}
debug_assert(chunk != nullptr, "Chunk is nullptr.");
return *chunk;
// Now we are sure chunk exists and we can call find function.
// const_cast is used to avoid code duplication.
return const_cast<Chunk &>(FindChunk(k));
}
std::atomic<Chunk *> head{new Chunk()};
std::atomic<int64_t> chunk_count{1};
std::atomic<Chunk *> head_{new Chunk(nullptr, 0)};
};

View File

@ -52,9 +52,6 @@ class CommitLog {
Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; }
// TODO: Searching the log will take more and more time the more and more
// transactoins are done. This could be awerted if DynamicBitset is changed
// to point to largest chunk instead of the smallest.
DynamicBitset<uint8_t, 32768> log;
};
}

View File

@ -1,9 +1,18 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "data_structures/bitset/dynamic_bitset.hpp"
TEST(DynamicBitset, BasicAtAndSet) {
DynamicBitset<> db;
namespace {
template <typename T>
class DynamicBitsetTest : public ::testing::Test {};
typedef ::testing::Types<DynamicBitset<>, DynamicBitset<uint8_t, 8>>
DynamicBitsetTypes;
TYPED_TEST_CASE(DynamicBitsetTest, DynamicBitsetTypes);
TYPED_TEST(DynamicBitsetTest, BasicAtAndSet) {
TypeParam db;
EXPECT_EQ(db.at(17, 1), 0);
EXPECT_EQ(db.at(17), false);
@ -12,8 +21,8 @@ TEST(DynamicBitset, BasicAtAndSet) {
EXPECT_EQ(db.at(17), true);
}
TEST(DynamicBitset, GroupAt) {
DynamicBitset<> db;
TYPED_TEST(DynamicBitsetTest, GroupAt) {
TypeParam db;
db.set(0, 1);
db.set(1, 1);
@ -27,8 +36,8 @@ TEST(DynamicBitset, GroupAt) {
EXPECT_EQ(db.at(1, 3), 1 | 4);
}
TEST(DynamicBitset, GroupSet) {
DynamicBitset<> db;
TYPED_TEST(DynamicBitsetTest, GroupSet) {
TypeParam db;
EXPECT_EQ(db.at(0, 3), 0);
db.set(1, 2);
EXPECT_FALSE(db.at(0));
@ -78,3 +87,4 @@ TEST(DynamicBitset, ConstBitset) {
dbs.set(17);
const_accepting(dbs);
}
}