Commit d04e4599 authored by unknown's avatar unknown

ndb - wl#1497 LinearPool seize_index


storage/ndb/src/kernel/vm/LinearPool.hpp:
  wl#1497 LinearPool seize_index
storage/ndb/src/kernel/vm/testSuperPool.cpp:
  wl#1497 LinearPool seize_index
parent b2b1ec99
......@@ -17,7 +17,8 @@
#ifndef LINEAR_POOL_HPP
#define LINEAR_POOL_HPP
#include <SuperPool.hpp>
#include <Bitmask.hpp>
#include "SuperPool.hpp"
/*
* LinearPool - indexed record pool
......@@ -38,22 +39,26 @@
*
* This works exactly like numbers in a given base. Each map has base
* size entries. For implementation convenience the base must be power
* of 2 and is given as its log2 value.
* of 2 between 2^1 and 2^15. It is given by its log2 value (1-15).
*
* A position in a map is also called a "digit".
*
* There is a doubly linked list of available maps (some free entries)
* on each level. There is a singly linked free list within each map.
* on each level. There is a doubly linked freelist within each map.
* There is also a bitmask of used entries in each map.
*
* Level 0 free entry has space for one record. Level N free entry
* implies space for base^N records. The implied levels are created and
* removed on demand. Completely empty maps are removed.
* removed on demand. Empty maps are usually removed.
*
* Default base is 256 (log2 = 8) which requires maximum 4 levels or
* "digits" (similar to ip address).
* digits (similar to ip address).
*
* TODO
*
* - move most of the inline code to LinearPool.cpp
* - add methods to check / seize user-specified index
* - optimize for common case
* - add optimized 2-level implementation (?)
*/
#include "SuperPool.hpp"
......@@ -71,6 +76,9 @@ class LinearPool {
// Max possible levels (0 to max root level).
STATIC_CONST( MaxLevels = (32 + LogBase - 1) / LogBase );
// Number of words in map used bit mask.
STATIC_CONST( BitmaskSize = (Base + 31) / 32 );
// Map.
struct Map {
Uint32 m_level;
......@@ -80,6 +88,7 @@ class LinearPool {
Uint32 m_index; // from root to here
PtrI m_nextavail;
PtrI m_prevavail;
Uint32 m_bitmask[BitmaskSize];
PtrI m_entry[Base];
};
......@@ -97,9 +106,15 @@ public:
// Allocate record from the pool. Reuses free index if possible.
bool seize(Ptr<T>& ptr);
// Allocate given index. Like seize but returns -1 if in use.
int seize_index(Ptr<T>& ptr, Uint32 index);
// Return record to the pool.
void release(Ptr<T>& ptr);
// Return number of used records (may require 1 page scan).
Uint32 count();
// Verify (debugging).
void verify();
......@@ -114,6 +129,12 @@ private:
// Add new non-root map.
bool add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 digit);
// Subroutine to initialize map free lists.
void init_free(Ptr<Map> map_ptr);
// Add entry at given free position.
void add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i);
// Remove entry and map if it becomes empty.
void remove_entry(Ptr<Map> map_ptr, Uint32 digit);
......@@ -126,8 +147,11 @@ private:
// Remove map from available list.
void remove_avail(Ptr<Map> map_ptr);
// Verify available lists
void verify_avail();
// Verify map (recursive).
void verify(Ptr<Map> map_ptr, Uint32 level);
void verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count);
RecordPool<T> m_records;
RecordPool<Map> m_maps;
......@@ -166,6 +190,7 @@ LinearPool<T, LogBase>::getPtr(Ptr<T>& ptr)
// get record
Ptr<T> rec_ptr;
Uint32 digit = index & DigitMask;
assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
rec_ptr.i = map_ptr.p->m_entry[digit];
m_records.getPtr(rec_ptr);
ptr.p = rec_ptr.p;
......@@ -193,26 +218,20 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
}
m_maps.getPtr(map_ptr);
// walk down creating missing levels and using an entry on each
Uint32 firstfree;
Uint32 digit;
Ptr<Map> new_ptr;
new_ptr.i = RNIL;
while (true) {
assert(map_ptr.p->m_occup < Base);
map_ptr.p->m_occup++;
firstfree = map_ptr.p->m_firstfree;
assert(firstfree < Base);
map_ptr.p->m_firstfree = map_ptr.p->m_entry[firstfree];
if (map_ptr.p->m_occup == Base) {
assert(map_ptr.p->m_firstfree == Base);
// remove from available list
remove_avail(map_ptr);
}
digit = map_ptr.p->m_firstfree;
if (n == 0)
break;
Ptr<Map> child_ptr;
if (! add_map(child_ptr, map_ptr, firstfree)) {
remove_entry(map_ptr, firstfree);
if (! add_map(child_ptr, map_ptr, digit)) {
if (new_ptr.i != RNIL)
remove_map(new_ptr);
return false;
}
map_ptr.p->m_entry[firstfree] = child_ptr.i;
new_ptr = child_ptr;
map_ptr = child_ptr;
n--;
}
......@@ -220,11 +239,71 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
assert(map_ptr.p->m_level == 0);
Ptr<T> rec_ptr;
if (! m_records.seize(rec_ptr)) {
remove_entry(map_ptr, firstfree);
if (new_ptr.i != RNIL)
remove_map(new_ptr);
return false;
}
map_ptr.p->m_entry[firstfree] = rec_ptr.i;
ptr.i = firstfree + (map_ptr.p->m_index << LogBase);
add_entry(map_ptr, digit, rec_ptr.i);
ptr.i = digit + (map_ptr.p->m_index << LogBase);
ptr.p = rec_ptr.p;
return true;
}
template <class T, Uint32 LogBase>
inline int
LinearPool<T, LogBase>::seize_index(Ptr<T>& ptr, Uint32 index)
{
// extract all digits at least up to current root level
Uint32 digits[MaxLevels];
Uint32 n = 0;
Uint32 tmp = index;
do {
digits[n] = tmp & DigitMask;
tmp >>= LogBase;
} while (++n < m_levels || tmp != 0);
// add any new root levels
while (n > m_levels) {
if (! add_root())
return false;
}
// start from root
Ptr<Map> map_ptr;
map_ptr.i = m_root;
m_maps.getPtr(map_ptr);
// walk down creating or re-using existing levels
Uint32 digit;
bool used;
Ptr<Map> new_ptr;
new_ptr.i = RNIL;
while (true) {
digit = digits[--n];
used = BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit);
if (n == 0)
break;
if (used) {
map_ptr.i = map_ptr.p->m_entry[digit];
m_maps.getPtr(map_ptr);
} else {
Ptr<Map> child_ptr;
if (! add_map(child_ptr, map_ptr, digit)) {
if (new_ptr.i != RNIL)
remove_map(new_ptr);
}
new_ptr = child_ptr;
map_ptr = child_ptr;
}
}
// now at level 0
assert(map_ptr.p->m_level == 0);
Ptr<T> rec_ptr;
if (used || ! m_records.seize(rec_ptr)) {
if (new_ptr.i != RNIL)
remove_map(new_ptr);
return used ? -1 : false;
}
add_entry(map_ptr, digit, rec_ptr.i);
assert(index == digit + (map_ptr.p->m_index << LogBase));
ptr.i = index;
ptr.p = rec_ptr.p;
return true;
}
......@@ -249,17 +328,32 @@ LinearPool<T, LogBase>::release(Ptr<T>& ptr)
ptr.p = 0;
}
template <class T, Uint32 LogBase>
inline Uint32
LinearPool<T, LogBase>::count()
{
SuperPool& sp = m_records.m_superPool;
Uint32 count1 = sp.getRecUseCount(m_records.m_recInfo);
return count1;
}
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify()
{
if (m_root == RNIL)
verify_avail();
if (m_root == RNIL) {
assert(m_levels == 0);
return;
}
assert(m_levels != 0);
Ptr<Map> map_ptr;
map_ptr.i = m_root;
m_maps.getPtr(map_ptr);
verify(map_ptr, m_levels - 1);
Uint32 count1 = count();
Uint32 count2 = 0;
verify_map(map_ptr, m_levels - 1, &count2);
assert(count1 == count2);
}
// private methods
......@@ -303,26 +397,18 @@ LinearPool<T, LogBase>::add_root()
assert(n < MaxLevels);
// set up
map_ptr.p->m_level = n;
if (n == 0) {
map_ptr.p->m_occup = 0;
map_ptr.p->m_firstfree = 0;
} else {
// on level > 0 digit 0 points to old root
map_ptr.p->m_occup = 1;
map_ptr.p->m_firstfree = 1;
map_ptr.p->m_parent = RNIL;
map_ptr.p->m_index = 0;
init_free(map_ptr);
// on level > 0 digit 0 points to old root
if (n > 0) {
Ptr<Map> old_ptr;
old_ptr.i = m_root;
m_maps.getPtr(old_ptr);
assert(old_ptr.p->m_parent == RNIL);
old_ptr.p->m_parent = map_ptr.i;
map_ptr.p->m_entry[0] = old_ptr.i;
add_entry(map_ptr, 0, old_ptr.i);
}
// set up free list with Base as terminator
for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
map_ptr.p->m_entry[j] = j + 1;
map_ptr.p->m_parent = RNIL;
map_ptr.p->m_index = 0;
add_avail(map_ptr);
// set new root
m_root = map_ptr.i;
return true;
......@@ -337,25 +423,81 @@ LinearPool<T, LogBase>::add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 d
assert(parent_ptr.p->m_level != 0);
// set up
map_ptr.p->m_level = parent_ptr.p->m_level - 1;
map_ptr.p->m_occup = 0;
map_ptr.p->m_firstfree = 0;
// set up free list with Base as terminator
for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
map_ptr.p->m_entry[j] = j + 1;
map_ptr.p->m_parent = parent_ptr.i;
map_ptr.p->m_index = digit + (parent_ptr.p->m_index << LogBase);
add_avail(map_ptr);
init_free(map_ptr);
add_entry(parent_ptr, digit, map_ptr.i);
return true;
}
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::init_free(Ptr<Map> map_ptr)
{
map_ptr.p->m_occup = 0;
map_ptr.p->m_firstfree = 0;
// freelist
Uint32 j;
Uint16 back = ZNIL;
for (j = 0; j < Base - 1; j++) {
map_ptr.p->m_entry[j] = back | ((j + 1) << 16);
back = j;
}
map_ptr.p->m_entry[j] = back | (ZNIL << 16);
// bitmask
BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask);
// add to available
add_avail(map_ptr);
}
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i)
{
assert(map_ptr.p->m_occup < Base && digit < Base);
assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
// unlink from freelist
Uint32 val = map_ptr.p->m_entry[digit];
Uint16 back = val & ZNIL;
Uint16 forw = val >> 16;
if (back != ZNIL) {
assert(back < Base);
map_ptr.p->m_entry[back] &= ZNIL;
map_ptr.p->m_entry[back] |= (forw << 16);
}
if (forw != ZNIL) {
assert(forw < Base);
map_ptr.p->m_entry[forw] &= (ZNIL << 16);
map_ptr.p->m_entry[forw] |= back;
}
if (back == ZNIL) {
map_ptr.p->m_firstfree = forw;
}
// set new value
map_ptr.p->m_entry[digit] = ptr_i;
map_ptr.p->m_occup++;
BitmaskImpl::set(BitmaskSize, map_ptr.p->m_bitmask, digit);
if (map_ptr.p->m_occup == Base)
remove_avail(map_ptr);
}
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::remove_entry(Ptr<Map> map_ptr, Uint32 digit)
{
assert(map_ptr.p->m_occup != 0 && digit < Base);
map_ptr.p->m_occup--;
map_ptr.p->m_entry[digit] = map_ptr.p->m_firstfree;
assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
// add to freelist
Uint32 firstfree = map_ptr.p->m_firstfree;
map_ptr.p->m_entry[digit] = ZNIL | (firstfree << 16);
if (firstfree != ZNIL) {
assert(firstfree < Base);
map_ptr.p->m_entry[firstfree] &= (ZNIL << 16);
map_ptr.p->m_entry[firstfree] |= digit;
}
map_ptr.p->m_firstfree = digit;
map_ptr.p->m_occup--;
BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask, digit);
if (map_ptr.p->m_occup + 1 == Base)
add_avail(map_ptr);
else if (map_ptr.p->m_occup == 0)
......@@ -375,7 +517,7 @@ LinearPool<T, LogBase>::remove_map(Ptr<Map> map_ptr)
m_maps.release(map_ptr);
if (m_root == map_ptr_i) {
assert(parent_ptr.i == RNIL);
Uint32 used = m_maps.m_superPool.getRecUseCount(m_maps.m_recInfo);
Uint32 used = count();
assert(used == 0);
m_root = RNIL;
m_levels = 0;
......@@ -431,49 +573,83 @@ LinearPool<T, LogBase>::remove_avail(Ptr<Map> map_ptr)
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify(Ptr<Map> map_ptr, Uint32 level)
LinearPool<T, LogBase>::verify_avail()
{
// check available lists
for (Uint32 n = 0; n < MaxLevels; n++) {
Ptr<Map> map_ptr;
map_ptr.i = m_avail[n];
Uint32 back = RNIL;
while (map_ptr.i != RNIL) {
m_maps.getPtr(map_ptr);
assert(map_ptr.p->m_occup < Base);
assert(back == map_ptr.p->m_prevavail);
back = map_ptr.i;
map_ptr.i = map_ptr.p->m_nextavail;
}
}
}
template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count)
{
assert(level < MaxLevels);
assert(map_ptr.p->m_level == level);
Uint32 j = 0;
while (j < Base) {
bool free = false;
Uint32 j2 = map_ptr.p->m_firstfree;
while (j2 != Base) {
if (j2 == j) {
free = true;
break;
}
assert(j2 < Base);
j2 = map_ptr.p->m_entry[j2];
// check freelist
{
Uint32 nused = BitmaskImpl::count(BitmaskSize, map_ptr.p->m_bitmask);
assert(nused <= Base);
assert(map_ptr.p->m_occup == nused);
Uint32 nfree = 0;
Uint32 j = map_ptr.p->m_firstfree;
Uint16 back = ZNIL;
while (j != ZNIL) {
assert(j < Base);
assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j));
Uint32 val = map_ptr.p->m_entry[j];
assert(back == (val & ZNIL));
back = j;
j = (val >> 16);
nfree++;
}
if (! free) {
assert(nused + nfree == Base);
}
// check entries
{
for (Uint32 j = 0; j < Base; j++) {
bool free = ! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j);
if (free)
continue;
if (level != 0) {
Ptr<Map> child_ptr;
child_ptr.i = map_ptr.p->m_entry[j];
m_maps.getPtr(child_ptr);
assert(child_ptr.p->m_parent == map_ptr.i);
assert(child_ptr.p->m_index == j + (map_ptr.p->m_index << LogBase));
verify(child_ptr, level - 1);
verify_map(child_ptr, level - 1, count);
} else {
Ptr<T> rec_ptr;
rec_ptr.i = map_ptr.p->m_entry[j];
m_records.getPtr(rec_ptr);
(*count)++;
}
Ptr<Map> avail_ptr;
avail_ptr.i = m_avail[map_ptr.p->m_level];
bool found = false;
while (avail_ptr.i != RNIL) {
if (avail_ptr.i == map_ptr.i) {
found = true;
break;
}
m_maps.getPtr(avail_ptr);
avail_ptr.i = avail_ptr.p->m_nextavail;
}
}
// check membership on available list
{
Ptr<Map> avail_ptr;
avail_ptr.i = m_avail[map_ptr.p->m_level];
bool found = false;
while (avail_ptr.i != RNIL) {
if (avail_ptr.i == map_ptr.i) {
found = true;
break;
}
assert(found == (map_ptr.p->m_occup < Base));
m_maps.getPtr(avail_ptr);
avail_ptr.i = avail_ptr.p->m_nextavail;
}
j++;
assert(found == (map_ptr.p->m_occup < Base));
}
}
......
#if 0
make -f Makefile -f - testSuperPool <<'_eof_'
testSuperPool: testSuperPool.cpp libkernel.a
testSuperPool: testSuperPool.cpp libkernel.a LinearPool.hpp
$(CXXCOMPILE) -o $@ $@.cpp libkernel.a -L../../common/util/.libs -lgeneral
_eof_
exit $?
......@@ -57,6 +57,7 @@ random_coprime(Uint32 n)
{
Uint32 prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
Uint32 count = sizeof(prime) / sizeof(prime[0]);
assert(n != 0);
while (1) {
Uint32 i = urandom(count);
if (n % prime[i] != 0)
......@@ -208,7 +209,7 @@ lp_test(GroupPool& gp)
ndbout << "linear pool test" << endl;
Ptr<T> ptr;
Uint32 loop;
for (loop = 0; loop < 3 * loopcount; loop++) {
for (loop = 0; loop < loopcount; loop++) {
int count = 0;
while (1) {
bool ret = lp.seize(ptr);
......@@ -223,6 +224,7 @@ lp_test(GroupPool& gp)
assert(ptr.p == ptr2.p);
count++;
}
assert(count != 0);
ndbout << "seized " << count << endl;
switch (loop % 3) {
case 0:
......@@ -264,6 +266,47 @@ lp_test(GroupPool& gp)
}
break;
}
{ Uint32 cnt = lp.count(); assert(cnt == 0); }
// seize_index test
char *used = new char [10 * count];
memset(used, false, sizeof(used));
Uint32 i, ns = 0, nr = 0;
for (i = 0; i < count; i++) {
Uint32 index = urandom(10 * count);
if (used[index]) {
ptr.i = index;
lp.release(ptr);
lp.verify();
nr++;
} else {
int i = lp.seize_index(ptr, index);
assert(i >= 0);
lp.verify();
if (i == 0) // no space
continue;
assert(ptr.i == index);
Ptr<T> ptr2;
ptr2.i = ptr.i;
ptr2.p = 0;
lp.getPtr(ptr2);
assert(ptr.p == ptr2.p);
ns++;
}
used[index] = ! used[index];
}
ndbout << "random sparse seize " << ns << " release " << nr << endl;
nr = 0;
for (i = 0; i < 10 * count; i++) {
if (used[i]) {
ptr.i = i;
lp.release(ptr);
lp.verify();
used[i] = false;
nr++;
}
}
ndbout << "released " << nr << endl;
{ Uint32 cnt = lp.count(); assert(cnt == 0); }
}
}
......@@ -291,8 +334,10 @@ template static void sp_test<T5>(GroupPool& sp);
template static void lp_test<T3>(GroupPool& sp);
int
main()
main(int argc, char** argv)
{
if (argc > 1 && strncmp(argv[1], "-l", 2) == 0)
loopcount = atoi(argv[1] + 2);
HeapPool sp(pageSize, pageBits);
sp.setInitPages(7);
sp.setMaxPages(7);
......@@ -304,7 +349,7 @@ main()
ndbout << "rand " << s << endl;
int count;
count = 0;
while (++count <= 0) {
while (++count <= 0) { // change to 1 to find new bug
sp_test<T1>(gp);
sp_test<T2>(gp);
sp_test<T3>(gp);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment