Commit c02f63e6 authored by Rusty Russell's avatar Rusty Russell

tdb2: rework hash.c functions to return enum TDB_ERROR.

This time we have to use our tri-value "tdb_bool_err" type to indicate
true, false, or error, which now allows us to correctly handle errors
in key matching (rather than treating it as a non-match).
parent 323a9473
...@@ -82,12 +82,12 @@ static uint32_t use_bits(struct hash_info *h, unsigned num) ...@@ -82,12 +82,12 @@ static uint32_t use_bits(struct hash_info *h, unsigned num)
return bits_from(h->h, 64 - h->hash_used, num); return bits_from(h->h, 64 - h->hash_used, num);
} }
static bool key_matches(struct tdb_context *tdb, static tdb_bool_err key_matches(struct tdb_context *tdb,
const struct tdb_used_record *rec, const struct tdb_used_record *rec,
tdb_off_t off, tdb_off_t off,
const struct tdb_data *key) const struct tdb_data *key)
{ {
bool ret = false; tdb_bool_err ret = false;
const char *rkey; const char *rkey;
if (rec_key_length(rec) != key->dsize) { if (rec_key_length(rec) != key->dsize) {
...@@ -97,8 +97,7 @@ static bool key_matches(struct tdb_context *tdb, ...@@ -97,8 +97,7 @@ static bool key_matches(struct tdb_context *tdb,
rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false); rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
if (TDB_PTR_IS_ERR(rkey)) { if (TDB_PTR_IS_ERR(rkey)) {
tdb->ecode = TDB_PTR_ERR(rkey); return TDB_PTR_ERR(rkey);
return ret;
} }
if (memcmp(rkey, key->dptr, key->dsize) == 0) if (memcmp(rkey, key->dptr, key->dsize) == 0)
ret = true; ret = true;
...@@ -109,11 +108,11 @@ static bool key_matches(struct tdb_context *tdb, ...@@ -109,11 +108,11 @@ static bool key_matches(struct tdb_context *tdb,
} }
/* Does entry match? */ /* Does entry match? */
static bool match(struct tdb_context *tdb, static tdb_bool_err match(struct tdb_context *tdb,
struct hash_info *h, struct hash_info *h,
const struct tdb_data *key, const struct tdb_data *key,
tdb_off_t val, tdb_off_t val,
struct tdb_used_record *rec) struct tdb_used_record *rec)
{ {
tdb_off_t off; tdb_off_t off;
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
...@@ -136,8 +135,7 @@ static bool match(struct tdb_context *tdb, ...@@ -136,8 +135,7 @@ static bool match(struct tdb_context *tdb,
off = val & TDB_OFF_MASK; off = val & TDB_OFF_MASK;
ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec)); ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return false;
} }
if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) { if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
...@@ -185,8 +183,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb, ...@@ -185,8 +183,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
h->group_start = off; h->group_start = off;
ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group)); ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return TDB_OFF_ERR;
} }
for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) { for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
...@@ -203,11 +200,14 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb, ...@@ -203,11 +200,14 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
ecode = tdb_read_convert(tdb, recoff, rec, ecode = tdb_read_convert(tdb, recoff, rec,
sizeof(*rec)); sizeof(*rec));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return TDB_OFF_ERR;
} }
if (key_matches(tdb, rec, recoff, &key)) { ecode = key_matches(tdb, rec, recoff, &key);
if (ecode < 0) {
return ecode;
}
if (ecode == 1) {
h->home_bucket = h->found_bucket = i; h->home_bucket = h->found_bucket = i;
if (tinfo) { if (tinfo) {
...@@ -226,8 +226,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb, ...@@ -226,8 +226,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
next = tdb_read_off(tdb, off next = tdb_read_off(tdb, off
+ offsetof(struct tdb_chain, next)); + offsetof(struct tdb_chain, next));
if (TDB_OFF_IS_ERR(next)) { if (TDB_OFF_IS_ERR(next)) {
tdb->ecode = next; return next;
return TDB_OFF_ERR;
} }
if (next) if (next)
next += sizeof(struct tdb_used_record); next += sizeof(struct tdb_used_record);
...@@ -236,7 +235,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb, ...@@ -236,7 +235,7 @@ static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
} }
/* This is the core routine which searches the hashtable for an entry. /* This is the core routine which searches the hashtable for an entry.
* On error, no locks are held and TDB_OFF_ERR is returned. * On error, no locks are held and -ve is returned.
* Otherwise, hinfo is filled in (and the optional tinfo). * Otherwise, hinfo is filled in (and the optional tinfo).
* If not found, the return value is 0. * If not found, the return value is 0.
* If found, the return value is the offset, and *rec is the record. */ * If found, the return value is the offset, and *rec is the record. */
...@@ -260,8 +259,7 @@ tdb_off_t find_and_lock(struct tdb_context *tdb, ...@@ -260,8 +259,7 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype, ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
TDB_LOCK_WAIT); TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return TDB_OFF_ERR;
} }
hashtable = offsetof(struct tdb_header, hashtable); hashtable = offsetof(struct tdb_header, hashtable);
...@@ -282,7 +280,6 @@ tdb_off_t find_and_lock(struct tdb_context *tdb, ...@@ -282,7 +280,6 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
ecode = tdb_read_convert(tdb, h->group_start, &h->group, ecode = tdb_read_convert(tdb, h->group_start, &h->group,
sizeof(h->group)); sizeof(h->group));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
goto fail; goto fail;
} }
...@@ -315,14 +312,20 @@ tdb_off_t find_and_lock(struct tdb_context *tdb, ...@@ -315,14 +312,20 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
i < (1 << TDB_HASH_GROUP_BITS); i < (1 << TDB_HASH_GROUP_BITS);
i++, h->found_bucket = ((h->found_bucket+1) i++, h->found_bucket = ((h->found_bucket+1)
% (1 << TDB_HASH_GROUP_BITS))) { % (1 << TDB_HASH_GROUP_BITS))) {
tdb_bool_err berr;
if (is_subhash(h->group[h->found_bucket])) if (is_subhash(h->group[h->found_bucket]))
continue; continue;
if (!h->group[h->found_bucket]) if (!h->group[h->found_bucket])
break; break;
if (match(tdb, h, &key, h->group[h->found_bucket], berr = match(tdb, h, &key, h->group[h->found_bucket],
rec)) { rec);
if (berr < 0) {
ecode = berr;
goto fail;
}
if (berr) {
if (tinfo) { if (tinfo) {
tinfo->levels[tinfo->num_levels-1].entry tinfo->levels[tinfo->num_levels-1].entry
+= h->found_bucket; += h->found_bucket;
...@@ -338,7 +341,7 @@ tdb_off_t find_and_lock(struct tdb_context *tdb, ...@@ -338,7 +341,7 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
fail: fail:
tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype); tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
return TDB_OFF_ERR; return ecode;
} }
/* I wrote a simple test, expanding a hash to 2GB, for the following /* I wrote a simple test, expanding a hash to 2GB, for the following
...@@ -412,33 +415,25 @@ static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h) ...@@ -412,33 +415,25 @@ static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
} }
/* Simply overwrite the hash entry we found before. */ /* Simply overwrite the hash entry we found before. */
int replace_in_hash(struct tdb_context *tdb, enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
struct hash_info *h, struct hash_info *h,
tdb_off_t new_off) tdb_off_t new_off)
{ {
enum TDB_ERROR ecode; return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
encode_offset(new_off, h));
ecode = tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
encode_offset(new_off, h));
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
/* We slot in anywhere that's empty in the chain. */ /* We slot in anywhere that's empty in the chain. */
static int COLD add_to_chain(struct tdb_context *tdb, static enum TDB_ERROR COLD add_to_chain(struct tdb_context *tdb,
tdb_off_t subhash, tdb_off_t subhash,
tdb_off_t new_off) tdb_off_t new_off)
{ {
tdb_off_t entry; tdb_off_t entry;
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS); entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
if (TDB_OFF_IS_ERR(entry)) { if (TDB_OFF_IS_ERR(entry)) {
tdb->ecode = entry; return entry;
return -1;
} }
if (entry == 1 << TDB_HASH_GROUP_BITS) { if (entry == 1 << TDB_HASH_GROUP_BITS) {
...@@ -447,51 +442,42 @@ static int COLD add_to_chain(struct tdb_context *tdb, ...@@ -447,51 +442,42 @@ static int COLD add_to_chain(struct tdb_context *tdb,
next = tdb_read_off(tdb, subhash next = tdb_read_off(tdb, subhash
+ offsetof(struct tdb_chain, next)); + offsetof(struct tdb_chain, next));
if (TDB_OFF_IS_ERR(next)) { if (TDB_OFF_IS_ERR(next)) {
tdb->ecode = next; return next;
return -1;
} }
if (!next) { if (!next) {
next = alloc(tdb, 0, sizeof(struct tdb_chain), 0, next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
TDB_CHAIN_MAGIC, false); TDB_CHAIN_MAGIC, false);
if (next == TDB_OFF_ERR) if (next == TDB_OFF_ERR)
return -1; return tdb->ecode;
ecode = zero_out(tdb, ecode = zero_out(tdb,
next+sizeof(struct tdb_used_record), next+sizeof(struct tdb_used_record),
sizeof(struct tdb_chain)); sizeof(struct tdb_chain));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
ecode = tdb_write_off(tdb, subhash ecode = tdb_write_off(tdb, subhash
+ offsetof(struct tdb_chain, + offsetof(struct tdb_chain,
next), next),
next); next);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
} }
return add_to_chain(tdb, next, new_off); return add_to_chain(tdb, next, new_off);
} }
ecode = tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t), return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
new_off); new_off);
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
/* Add into a newly created subhash. */ /* Add into a newly created subhash. */
static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash, static enum TDB_ERROR add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
unsigned hash_used, tdb_off_t val) unsigned hash_used, tdb_off_t val)
{ {
tdb_off_t off = (val & TDB_OFF_MASK), *group; tdb_off_t off = (val & TDB_OFF_MASK), *group;
struct hash_info h; struct hash_info h;
unsigned int gnum; unsigned int gnum;
enum TDB_ERROR ecode;
h.hash_used = hash_used; h.hash_used = hash_used;
...@@ -507,19 +493,13 @@ static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash, ...@@ -507,19 +493,13 @@ static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
group = tdb_access_write(tdb, h.group_start, group = tdb_access_write(tdb, h.group_start,
sizeof(*group) << TDB_HASH_GROUP_BITS, true); sizeof(*group) << TDB_HASH_GROUP_BITS, true);
if (TDB_PTR_IS_ERR(group)) { if (TDB_PTR_IS_ERR(group)) {
tdb->ecode = TDB_PTR_ERR(group); return TDB_PTR_ERR(group);
return -1;
} }
force_into_group(group, h.home_bucket, encode_offset(off, &h)); force_into_group(group, h.home_bucket, encode_offset(off, &h));
ecode = tdb_access_commit(tdb, group); return tdb_access_commit(tdb, group);
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
static int expand_group(struct tdb_context *tdb, struct hash_info *h) static enum TDB_ERROR expand_group(struct tdb_context *tdb, struct hash_info *h)
{ {
unsigned bucket, num_vals, i, magic; unsigned bucket, num_vals, i, magic;
size_t subsize; size_t subsize;
...@@ -541,14 +521,14 @@ static int expand_group(struct tdb_context *tdb, struct hash_info *h) ...@@ -541,14 +521,14 @@ static int expand_group(struct tdb_context *tdb, struct hash_info *h)
} }
subhash = alloc(tdb, 0, subsize, 0, magic, false); subhash = alloc(tdb, 0, subsize, 0, magic, false);
if (subhash == TDB_OFF_ERR) if (subhash == TDB_OFF_ERR) {
return -1; return tdb->ecode;
}
ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record), ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
subsize); subsize);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
/* Remove any which are destined for bucket or are in wrong place. */ /* Remove any which are destined for bucket or are in wrong place. */
...@@ -576,21 +556,22 @@ static int expand_group(struct tdb_context *tdb, struct hash_info *h) ...@@ -576,21 +556,22 @@ static int expand_group(struct tdb_context *tdb, struct hash_info *h)
unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK; unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
if (this_bucket == bucket) { if (this_bucket == bucket) {
if (add_to_subhash(tdb, subhash, h->hash_used, vals[i])) ecode = add_to_subhash(tdb, subhash, h->hash_used,
return -1; vals[i]);
if (ecode != TDB_SUCCESS)
return ecode;
} else { } else {
/* There should be room to put this back. */ /* There should be room to put this back. */
force_into_group(h->group, this_bucket, vals[i]); force_into_group(h->group, this_bucket, vals[i]);
} }
} }
return 0; return TDB_SUCCESS;
} }
int delete_from_hash(struct tdb_context *tdb, struct hash_info *h) enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
{ {
unsigned int i, num_movers = 0; unsigned int i, num_movers = 0;
tdb_off_t movers[1 << TDB_HASH_GROUP_BITS]; tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
enum TDB_ERROR ecode;
h->group[h->found_bucket] = 0; h->group[h->found_bucket] = 0;
for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) { for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
...@@ -620,16 +601,12 @@ int delete_from_hash(struct tdb_context *tdb, struct hash_info *h) ...@@ -620,16 +601,12 @@ int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
} }
/* Now we write back the hash group */ /* Now we write back the hash group */
ecode = tdb_write_convert(tdb, h->group_start, return tdb_write_convert(tdb, h->group_start,
h->group, sizeof(h->group)); h->group, sizeof(h->group));
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off) enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
tdb_off_t new_off)
{ {
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
...@@ -637,21 +614,18 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off) ...@@ -637,21 +614,18 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
if (!h->group[h->found_bucket]) { if (!h->group[h->found_bucket]) {
h->group[h->found_bucket] = encode_offset(new_off, h); h->group[h->found_bucket] = encode_offset(new_off, h);
/* Write back the modified group. */ /* Write back the modified group. */
ecode = tdb_write_convert(tdb, h->group_start, return tdb_write_convert(tdb, h->group_start,
h->group, sizeof(h->group)); h->group, sizeof(h->group));
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
if (h->hash_used > 64) if (h->hash_used > 64)
return add_to_chain(tdb, h->group_start, new_off); return add_to_chain(tdb, h->group_start, new_off);
/* We're full. Expand. */ /* We're full. Expand. */
if (expand_group(tdb, h) == -1) ecode = expand_group(tdb, h);
return -1; if (ecode != TDB_SUCCESS) {
return ecode;
}
if (is_subhash(h->group[h->home_bucket])) { if (is_subhash(h->group[h->home_bucket])) {
/* We were expanded! */ /* We were expanded! */
...@@ -662,8 +636,7 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off) ...@@ -662,8 +636,7 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
ecode = tdb_write_convert(tdb, h->group_start, h->group, ecode = tdb_write_convert(tdb, h->group_start, h->group,
sizeof(h->group)); sizeof(h->group));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
/* Move hashinfo down a level. */ /* Move hashinfo down a level. */
...@@ -676,21 +649,15 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off) ...@@ -676,21 +649,15 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
ecode = tdb_read_convert(tdb, h->group_start, &h->group, ecode = tdb_read_convert(tdb, h->group_start, &h->group,
sizeof(h->group)); sizeof(h->group));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
} }
/* Expanding the group must have made room if it didn't choose this /* Expanding the group must have made room if it didn't choose this
* bucket. */ * bucket. */
if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){ if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
ecode = tdb_write_convert(tdb, h->group_start, return tdb_write_convert(tdb, h->group_start,
h->group, sizeof(h->group)); h->group, sizeof(h->group));
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
}
return 0;
} }
/* This can happen if all hashes in group (and us) dropped into same /* This can happen if all hashes in group (and us) dropped into same
...@@ -698,7 +665,7 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off) ...@@ -698,7 +665,7 @@ int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
return add_to_hash(tdb, h, new_off); return add_to_hash(tdb, h, new_off);
} }
/* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR. */ /* Traverse support: returns offset of record, or 0 or -ve error. */
static tdb_off_t iterate_hash(struct tdb_context *tdb, static tdb_off_t iterate_hash(struct tdb_context *tdb,
struct traverse_info *tinfo) struct traverse_info *tinfo)
{ {
...@@ -714,14 +681,12 @@ again: ...@@ -714,14 +681,12 @@ again:
i = tdb_find_nonzero_off(tdb, tlevel->hashtable, i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
i+1, tlevel->total_buckets)) { i+1, tlevel->total_buckets)) {
if (TDB_OFF_IS_ERR(i)) { if (TDB_OFF_IS_ERR(i)) {
tdb->ecode = i; return i;
return TDB_OFF_ERR;
} }
val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i); val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
if (TDB_OFF_IS_ERR(val)) { if (TDB_OFF_IS_ERR(val)) {
tdb->ecode = val; return val;
return TDB_OFF_ERR;
} }
off = val & TDB_OFF_MASK; off = val & TDB_OFF_MASK;
...@@ -763,8 +728,7 @@ again: ...@@ -763,8 +728,7 @@ again:
+ offsetof(struct tdb_chain, + offsetof(struct tdb_chain,
next)); next));
if (TDB_OFF_IS_ERR(tlevel->hashtable)) { if (TDB_OFF_IS_ERR(tlevel->hashtable)) {
tdb->ecode = tlevel->hashtable; return tlevel->hashtable;
return TDB_OFF_ERR;
} }
if (tlevel->hashtable) { if (tlevel->hashtable) {
tlevel->hashtable += sizeof(struct tdb_used_record); tlevel->hashtable += sizeof(struct tdb_used_record);
...@@ -779,10 +743,10 @@ again: ...@@ -779,10 +743,10 @@ again:
goto again; goto again;
} }
/* Return 1 if we find something, 0 if not, -1 on error. */ /* Return success if we find something, TDB_ERR_NOEXIST if none. */
int next_in_hash(struct tdb_context *tdb, enum TDB_ERROR next_in_hash(struct tdb_context *tdb,
struct traverse_info *tinfo, struct traverse_info *tinfo,
TDB_DATA *kbuf, size_t *dlen) TDB_DATA *kbuf, size_t *dlen)
{ {
const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS; const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
tdb_off_t hl_start, hl_range, off; tdb_off_t hl_start, hl_range, off;
...@@ -795,28 +759,29 @@ int next_in_hash(struct tdb_context *tdb, ...@@ -795,28 +759,29 @@ int next_in_hash(struct tdb_context *tdb,
ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK, ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
TDB_LOCK_WAIT); TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; return ecode;
return -1;
} }
off = iterate_hash(tdb, tinfo); off = iterate_hash(tdb, tinfo);
if (off) { if (off) {
struct tdb_used_record rec; struct tdb_used_record rec;
if (TDB_OFF_IS_ERR(off)) {
ecode = off;
goto fail;
}
ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec)); ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode; goto fail;
tdb_unlock_hashes(tdb,
hl_start, hl_range, F_RDLCK);
return -1;
} }
if (rec_magic(&rec) != TDB_USED_MAGIC) { if (rec_magic(&rec) != TDB_USED_MAGIC) {
tdb_logerr(tdb, TDB_ERR_CORRUPT, ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT,
TDB_LOG_ERROR, TDB_LOG_ERROR,
"next_in_hash:" "next_in_hash:"
" corrupt record at %llu", " corrupt record at %llu",
(long long)off); (long long)off);
return -1; goto fail;
} }
kbuf->dsize = rec_key_length(&rec); kbuf->dsize = rec_key_length(&rec);
...@@ -835,10 +800,9 @@ int next_in_hash(struct tdb_context *tdb, ...@@ -835,10 +800,9 @@ int next_in_hash(struct tdb_context *tdb,
} }
tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK); tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
if (TDB_PTR_IS_ERR(kbuf->dptr)) { if (TDB_PTR_IS_ERR(kbuf->dptr)) {
tdb->ecode = TDB_PTR_ERR(kbuf->dptr); return TDB_PTR_ERR(kbuf->dptr);
return -1;
} }
return 1; return TDB_SUCCESS;
} }
tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK); tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
...@@ -848,13 +812,17 @@ int next_in_hash(struct tdb_context *tdb, ...@@ -848,13 +812,17 @@ int next_in_hash(struct tdb_context *tdb,
+= (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS); += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
tinfo->levels[0].entry = 0; tinfo->levels[0].entry = 0;
} }
return 0; return TDB_ERR_NOEXIST;
fail:
tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
return ecode;
} }
/* Return 1 if we find something, 0 if not, -1 on error. */ enum TDB_ERROR first_in_hash(struct tdb_context *tdb,
int first_in_hash(struct tdb_context *tdb, struct traverse_info *tinfo,
struct traverse_info *tinfo, TDB_DATA *kbuf, size_t *dlen)
TDB_DATA *kbuf, size_t *dlen)
{ {
tinfo->prev = 0; tinfo->prev = 0;
tinfo->toplevel_group = 0; tinfo->toplevel_group = 0;
...@@ -868,9 +836,9 @@ int first_in_hash(struct tdb_context *tdb, ...@@ -868,9 +836,9 @@ int first_in_hash(struct tdb_context *tdb,
/* Even if the entry isn't in this hash bucket, you'd have to lock this /* Even if the entry isn't in this hash bucket, you'd have to lock this
* bucket to find it. */ * bucket to find it. */
static int chainlock(struct tdb_context *tdb, const TDB_DATA *key, static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
int ltype, enum tdb_lock_flags waitflag, int ltype, enum tdb_lock_flags waitflag,
const char *func) const char *func)
{ {
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
uint64_t h = tdb_hash(tdb, key->dptr, key->dsize); uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
...@@ -884,18 +852,19 @@ static int chainlock(struct tdb_context *tdb, const TDB_DATA *key, ...@@ -884,18 +852,19 @@ static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag); ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
tdb_trace_1rec(tdb, func, *key); tdb_trace_1rec(tdb, func, *key);
if (ecode != TDB_SUCCESS) { return ecode;
tdb->ecode = ecode;
return -1;
}
return 0;
} }
/* lock/unlock one hash chain. This is meant to be used to reduce /* lock/unlock one hash chain. This is meant to be used to reduce
contention - it cannot guarantee how many records will be locked */ contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{ {
return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock"); tdb->ecode = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
"tdb_chainlock");
if (tdb->ecode == TDB_SUCCESS)
return 0;
return -1;
} }
int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
......
...@@ -394,13 +394,13 @@ struct tdb_methods { ...@@ -394,13 +394,13 @@ struct tdb_methods {
/* hash.c: */ /* hash.c: */
void tdb_hash_init(struct tdb_context *tdb); void tdb_hash_init(struct tdb_context *tdb);
int first_in_hash(struct tdb_context *tdb, tdb_bool_err first_in_hash(struct tdb_context *tdb,
struct traverse_info *tinfo, struct traverse_info *tinfo,
TDB_DATA *kbuf, size_t *dlen); TDB_DATA *kbuf, size_t *dlen);
int next_in_hash(struct tdb_context *tdb, tdb_bool_err next_in_hash(struct tdb_context *tdb,
struct traverse_info *tinfo, struct traverse_info *tinfo,
TDB_DATA *kbuf, size_t *dlen); TDB_DATA *kbuf, size_t *dlen);
/* Hash random memory. */ /* Hash random memory. */
uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len); uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len);
...@@ -416,14 +416,14 @@ tdb_off_t find_and_lock(struct tdb_context *tdb, ...@@ -416,14 +416,14 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
struct tdb_used_record *rec, struct tdb_used_record *rec,
struct traverse_info *tinfo); struct traverse_info *tinfo);
int replace_in_hash(struct tdb_context *tdb, enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
struct hash_info *h, struct hash_info *h,
tdb_off_t new_off); tdb_off_t new_off);
int add_to_hash(struct tdb_context *tdb, struct hash_info *h, enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
tdb_off_t new_off); tdb_off_t new_off);
int delete_from_hash(struct tdb_context *tdb, struct hash_info *h); enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h);
/* For tdb_check */ /* For tdb_check */
bool is_subhash(tdb_off_t val); bool is_subhash(tdb_off_t val);
......
...@@ -449,11 +449,13 @@ static int replace_data(struct tdb_context *tdb, ...@@ -449,11 +449,13 @@ static int replace_data(struct tdb_context *tdb,
add_free_record(tdb, old_off, add_free_record(tdb, old_off,
sizeof(struct tdb_used_record) sizeof(struct tdb_used_record)
+ key.dsize + old_room); + key.dsize + old_room);
if (replace_in_hash(tdb, h, new_off) == -1) ecode = replace_in_hash(tdb, h, new_off);
return -1;
} else { } else {
if (add_to_hash(tdb, h, new_off) == -1) ecode = add_to_hash(tdb, h, new_off);
return -1; }
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
return -1;
} }
new_off += sizeof(struct tdb_used_record); new_off += sizeof(struct tdb_used_record);
...@@ -485,8 +487,10 @@ int tdb_store(struct tdb_context *tdb, ...@@ -485,8 +487,10 @@ int tdb_store(struct tdb_context *tdb,
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL); off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
if (unlikely(off == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(off)) {
tdb->ecode = off;
return -1; return -1;
}
/* Now we have lock on this hash bucket. */ /* Now we have lock on this hash bucket. */
if (flag == TDB_INSERT) { if (flag == TDB_INSERT) {
...@@ -551,8 +555,10 @@ int tdb_append(struct tdb_context *tdb, ...@@ -551,8 +555,10 @@ int tdb_append(struct tdb_context *tdb,
int ret; int ret;
off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL); off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
if (unlikely(off == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(off)) {
tdb->ecode = off;
return -1; return -1;
}
if (off) { if (off) {
old_dlen = rec_data_length(&rec); old_dlen = rec_data_length(&rec);
...@@ -621,8 +627,10 @@ struct tdb_data tdb_fetch(struct tdb_context *tdb, struct tdb_data key) ...@@ -621,8 +627,10 @@ struct tdb_data tdb_fetch(struct tdb_context *tdb, struct tdb_data key)
struct tdb_data ret; struct tdb_data ret;
off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL); off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL);
if (unlikely(off == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(off)) {
tdb->ecode = off;
return tdb_null; return tdb_null;
}
if (!off) { if (!off) {
tdb->ecode = TDB_ERR_NOEXIST; tdb->ecode = TDB_ERR_NOEXIST;
...@@ -646,10 +654,13 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key) ...@@ -646,10 +654,13 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
tdb_off_t off; tdb_off_t off;
struct tdb_used_record rec; struct tdb_used_record rec;
struct hash_info h; struct hash_info h;
enum TDB_ERROR ecode;
off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL); off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
if (unlikely(off == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(off)) {
tdb->ecode = off;
return -1; return -1;
}
if (!off) { if (!off) {
tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK); tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
...@@ -657,8 +668,11 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key) ...@@ -657,8 +668,11 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
return -1; return -1;
} }
if (delete_from_hash(tdb, &h) == -1) ecode = delete_from_hash(tdb, &h);
if (ecode != TDB_SUCCESS) {
tdb->ecode = ecode;
goto unlock_err; goto unlock_err;
}
/* Free the deleted entry. */ /* Free the deleted entry. */
add_stat(tdb, frees, 1); add_stat(tdb, frees, 1);
......
...@@ -19,7 +19,7 @@ static tdb_off_t tdb_offset(struct tdb_context *tdb, struct tdb_data key) ...@@ -19,7 +19,7 @@ static tdb_off_t tdb_offset(struct tdb_context *tdb, struct tdb_data key)
struct hash_info h; struct hash_info h;
off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL); off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL);
if (unlikely(off == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(off))
return 0; return 0;
tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK); tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
return off; return off;
......
...@@ -20,15 +20,15 @@ ...@@ -20,15 +20,15 @@
int64_t tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *p) int64_t tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *p)
{ {
int ret; enum TDB_ERROR ecode;
struct traverse_info tinfo; struct traverse_info tinfo;
struct tdb_data k, d; struct tdb_data k, d;
int64_t count = 0; int64_t count = 0;
k.dptr = NULL; k.dptr = NULL;
for (ret = first_in_hash(tdb, &tinfo, &k, &d.dsize); for (ecode = first_in_hash(tdb, &tinfo, &k, &d.dsize);
ret == 1; ecode == TDB_SUCCESS;
ret = next_in_hash(tdb, &tinfo, &k, &d.dsize)) { ecode = next_in_hash(tdb, &tinfo, &k, &d.dsize)) {
d.dptr = k.dptr + k.dsize; d.dptr = k.dptr + k.dsize;
count++; count++;
...@@ -39,8 +39,10 @@ int64_t tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *p) ...@@ -39,8 +39,10 @@ int64_t tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *p)
free(k.dptr); free(k.dptr);
} }
if (ret < 0) if (ecode != TDB_ERR_NOEXIST) {
tdb->ecode = ecode;
return -1; return -1;
}
return count; return count;
} }
...@@ -48,15 +50,16 @@ TDB_DATA tdb_firstkey(struct tdb_context *tdb) ...@@ -48,15 +50,16 @@ TDB_DATA tdb_firstkey(struct tdb_context *tdb)
{ {
struct traverse_info tinfo; struct traverse_info tinfo;
struct tdb_data k; struct tdb_data k;
switch (first_in_hash(tdb, &tinfo, &k, NULL)) { enum TDB_ERROR ecode;
case 1:
ecode = first_in_hash(tdb, &tinfo, &k, NULL);
if (ecode == TDB_SUCCESS) {
return k; return k;
case 0:
tdb->ecode = TDB_SUCCESS;
/* Fall thru... */
default:
return tdb_null;
} }
if (ecode == TDB_ERR_NOEXIST)
ecode = TDB_SUCCESS;
tdb->ecode = ecode;
return tdb_null;
} }
/* We lock twice, not very efficient. We could keep last key & tinfo cached. */ /* We lock twice, not very efficient. We could keep last key & tinfo cached. */
...@@ -65,19 +68,21 @@ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA key) ...@@ -65,19 +68,21 @@ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA key)
struct traverse_info tinfo; struct traverse_info tinfo;
struct hash_info h; struct hash_info h;
struct tdb_used_record rec; struct tdb_used_record rec;
enum TDB_ERROR ecode;
tinfo.prev = find_and_lock(tdb, key, F_RDLCK, &h, &rec, &tinfo); tinfo.prev = find_and_lock(tdb, key, F_RDLCK, &h, &rec, &tinfo);
if (unlikely(tinfo.prev == TDB_OFF_ERR)) if (TDB_OFF_IS_ERR(tinfo.prev)) {
tdb->ecode = tinfo.prev;
return tdb_null; return tdb_null;
}
tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK); tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
switch (next_in_hash(tdb, &tinfo, &key, NULL)) { ecode = next_in_hash(tdb, &tinfo, &key, NULL);
case 1: if (ecode == TDB_SUCCESS) {
return key; return key;
case 0:
tdb->ecode = TDB_SUCCESS;
/* Fall thru... */
default:
return tdb_null;
} }
if (ecode == TDB_ERR_NOEXIST)
ecode = TDB_SUCCESS;
tdb->ecode = ecode;
return tdb_null;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment