Commit 57aef552 authored by Rich Prohaska's avatar Rich Prohaska

implement and test cursor->count. closes #258

git-svn-id: file:///svn/tokudb@1587 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4d76d2c4
...@@ -31,6 +31,14 @@ int Dbc::del(u_int32_t flags) { ...@@ -31,6 +31,14 @@ int Dbc::del(u_int32_t flags) {
return env->maybe_throw_error(ret); return env->maybe_throw_error(ret);
} }
int Dbc::count(db_recno_t *count, u_int32_t flags) {
DBC *dbc = this;
int ret = dbc->c_count(dbc, count, flags);
DB_ENV *dbenv_c=dbc->dbp->dbenv;
DbEnv *env = (DbEnv*)dbenv_c->api1_internal;
return env->maybe_throw_error(ret);
}
// Not callable, but some compilers require it to be defined anyway. // Not callable, but some compilers require it to be defined anyway.
Dbc::~Dbc() Dbc::~Dbc()
{ {
......
...@@ -34,3 +34,5 @@ check: $(TARGETS) ...@@ -34,3 +34,5 @@ check: $(TARGETS)
$(VGRIND) ./test_no_env $(VGRIND) ./test_no_env
$(VGRIND) ./test_db_assoc3 $(VGRIND) ./test_db_assoc3
$(VGRIND) ./test_db_assoc3 --more $(VGRIND) ./test_db_assoc3 --more
$(VGRIND) ./test_cursor_count
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <db_cxx.h> #include <db_cxx.h>
int verbose;
#ifndef DB_YESOVERWRITE #ifndef DB_YESOVERWRITE
#define BDB 1 #define BDB 1
#define DB_YESOVERWRITE 0 #define DB_YESOVERWRITE 0
#else #else
#define TDB 1 #define TDB 1
typedef u_int32_t db_recno_t;
#endif #endif
int keyeq(Dbt *a, Dbt *b) { int keyeq(Dbt *a, Dbt *b) {
...@@ -17,7 +19,7 @@ int keyeq(Dbt *a, Dbt *b) { ...@@ -17,7 +19,7 @@ int keyeq(Dbt *a, Dbt *b) {
} }
void load(Db *db, int n) { void load(Db *db, int n) {
printf("load\n"); if (verbose) printf("load\n");
int i; int i;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
if (i == n/2) continue; if (i == n/2) continue;
...@@ -37,7 +39,7 @@ void load(Db *db, int n) { ...@@ -37,7 +39,7 @@ void load(Db *db, int n) {
} }
} }
db_recno_t my_cursor_count(Db *db, Dbc *cursor) { int my_cursor_count(Dbc *cursor, db_recno_t *count, Db *db) {
int r; int r;
Dbt key; key.set_flags(DB_DBT_REALLOC); Dbt key; key.set_flags(DB_DBT_REALLOC);
Dbt val; val.set_flags(DB_DBT_REALLOC); Dbt val; val.set_flags(DB_DBT_REALLOC);
...@@ -46,30 +48,33 @@ db_recno_t my_cursor_count(Db *db, Dbc *cursor) { ...@@ -46,30 +48,33 @@ db_recno_t my_cursor_count(Db *db, Dbc *cursor) {
Dbc *count_cursor; Dbc *count_cursor;
r = db->cursor(0, &count_cursor, 0); assert(r == 0); r = db->cursor(0, &count_cursor, 0); assert(r == 0);
r = count_cursor->get(&key, &val, DB_SET); assert(r == 0); r = count_cursor->get(&key, &val, DB_SET); assert(r == 0);
int nmatch; *count = 0;
Dbt nkey, nval; Dbt nkey, nval;
for (nmatch=1; ; nmatch++) { for (;; ) {
*count += 1;
nkey.set_flags(DB_DBT_REALLOC); nkey.set_flags(DB_DBT_REALLOC);
nval.set_flags(DB_DBT_REALLOC); nval.set_flags(DB_DBT_REALLOC);
r = count_cursor->get(&nkey, &nval, DB_NEXT); r = count_cursor->get(&nkey, &nval, DB_NEXT);
if (r != 0) break; if (r != 0) break;
if (!keyeq(&key, &nkey)) break; if (!keyeq(&key, &nkey)) break;
} }
r = 0;
if (nkey.get_data()) free(nkey.get_data()); if (nkey.get_data()) free(nkey.get_data());
if (nval.get_data()) free(nval.get_data()); if (nval.get_data()) free(nval.get_data());
if (key.get_data()) free(key.get_data()); if (key.get_data()) free(key.get_data());
if (val.get_data()) free(val.get_data()); if (val.get_data()) free(val.get_data());
r = count_cursor->close(); assert(r == 0); int rr = count_cursor->close(); assert(rr == 0);
return nmatch; return r;
} }
void walk(Db *db, int n) { void walk(Db *db, int n) {
printf("walk\n"); if (verbose) printf("walk\n");
Dbc *cursor; Dbc *cursor;
int r = db->cursor(0, &cursor, 0); assert(r == 0); int r = db->cursor(0, &cursor, 0); assert(r == 0);
int i;
Dbt key, val; Dbt key, val;
for (i=0; ; i++) { int i;
for (i=0;;i++) {
key.set_flags(DB_DBT_REALLOC); key.set_flags(DB_DBT_REALLOC);
val.set_flags(DB_DBT_REALLOC); val.set_flags(DB_DBT_REALLOC);
r = cursor->get(&key, &val, DB_NEXT); r = cursor->get(&key, &val, DB_NEXT);
...@@ -81,22 +86,57 @@ void walk(Db *db, int n) { ...@@ -81,22 +86,57 @@ void walk(Db *db, int n) {
int v; int v;
assert(val.get_size() == sizeof v); assert(val.get_size() == sizeof v);
memcpy(&v, val.get_data(), val.get_size()); memcpy(&v, val.get_data(), val.get_size());
db_recno_t nmatch; db_recno_t count;
#if BDB r = cursor->count(&count, 0); assert(r == 0);
r = cursor->count(&nmatch, 0); if (verbose) printf("%d %d %d\n", k, v, count);
printf("%d %d %d\n", k, v, nmatch); db_recno_t mycount;
assert(my_cursor_count(db, cursor) == nmatch); r = my_cursor_count(cursor, &mycount, db); assert(r == 0);
#else assert(mycount == count);
nmatch = my_cursor_count(db, cursor); if (k == n/2) assert((int)count == n); else assert(count == 1);
printf("%d %d %d\n", k, v, nmatch);
if (k == n/2) assert(nmatch == (db_recno_t) n); else assert(nmatch == 1);
#endif
} }
assert(i == 2*n-1);
free(key.get_data()); free(key.get_data());
free(val.get_data()); free(val.get_data());
r = cursor->close(); assert(r == 0); r = cursor->close(); assert(r == 0);
} }
int cursor_set(Dbc *cursor, int k) {
Dbt key(&k, sizeof k);
Dbt val;
int r = cursor->get(&key, &val, DB_SET);
return r;
}
void test_zero_count(Db *db, int n) {
if (verbose) printf("test_zero_count\n");
Dbc *cursor;
int r = db->cursor(0, &cursor, 0); assert(r == 0);
r = cursor_set(cursor, htonl(n/2)); assert(r == 0);
db_recno_t count;
r = cursor->count(&count, 0); assert(r == 0);
assert((int)count == n);
Dbt key; key.set_flags(DB_DBT_REALLOC);
Dbt val; val.set_flags(DB_DBT_REALLOC);
int i;
for (i=1; count > 0; i++) {
r = cursor->del(0); assert(r == 0);
db_recno_t newcount;
r = cursor->count(&newcount, 0);
if (r != 0)
break;
assert(newcount == count - 1);
count = newcount;
r = cursor->get(&key, &val, DB_NEXT_DUP);
if (r != 0) break;
}
assert(i == n);
if (key.get_data()) free(key.get_data());
if (val.get_data()) free(val.get_data());
r = cursor->close(); assert(r == 0);
}
int my_next_nodup(Dbc *cursor, Dbt *key, Dbt *val) { int my_next_nodup(Dbc *cursor, Dbt *key, Dbt *val) {
int r; int r;
Dbt currentkey; currentkey.set_flags(DB_DBT_REALLOC); Dbt currentkey; currentkey.set_flags(DB_DBT_REALLOC);
...@@ -138,17 +178,24 @@ int my_prev_nodup(Dbc *cursor, Dbt *key, Dbt *val) { ...@@ -138,17 +178,24 @@ int my_prev_nodup(Dbc *cursor, Dbt *key, Dbt *val) {
} }
void test_next_nodup(Db *db, int n) { void test_next_nodup(Db *db, int n) {
printf("test_next_nodup\n"); if (verbose) printf("test_next_nodup\n");
int r; int r;
Dbc *cursor; Dbc *cursor;
r = db->cursor(0, &cursor, 0); assert(r == 0); r = db->cursor(0, &cursor, 0); assert(r == 0);
Dbt key; key.set_flags(DB_DBT_REALLOC); Dbt key; key.set_flags(DB_DBT_REALLOC);
Dbt val; val.set_flags(DB_DBT_REALLOC); Dbt val; val.set_flags(DB_DBT_REALLOC);
r = cursor->get(&key, &val, DB_FIRST); assert(r == 0); r = cursor->get(&key, &val, DB_FIRST); assert(r == 0);
int i = 0;
while (r == 0) { while (r == 0) {
printf("%d %d\n", htonl(*(int*)key.get_data()), *(int*)val.get_data()); int k = htonl(*(int*)key.get_data());
int v = *(int*)val.get_data();
if (verbose) printf("%d %d\n", k, v);
assert(k == i);
if (k != n/2) assert(v == i); else assert(v == 0);
i += 1;
r = my_next_nodup(cursor, &key, &val); r = my_next_nodup(cursor, &key, &val);
} }
assert(i == n);
if (key.get_data()) free(key.get_data()); if (key.get_data()) free(key.get_data());
if (val.get_data()) free(val.get_data()); if (val.get_data()) free(val.get_data());
r = cursor->close(); assert(r == 0); r = cursor->close(); assert(r == 0);
...@@ -189,7 +236,7 @@ int my_prev_dup(Dbc *cursor, Dbt *key, Dbt *val) { ...@@ -189,7 +236,7 @@ int my_prev_dup(Dbc *cursor, Dbt *key, Dbt *val) {
} }
void test_next_dup(Db *db, int n) { void test_next_dup(Db *db, int n) {
printf("test_next_dup\n"); if (verbose) printf("test_next_dup\n");
int r; int r;
Dbc *cursor; Dbc *cursor;
r = db->cursor(0, &cursor, 0); assert(r == 0); r = db->cursor(0, &cursor, 0); assert(r == 0);
...@@ -199,8 +246,13 @@ void test_next_dup(Db *db, int n) { ...@@ -199,8 +246,13 @@ void test_next_dup(Db *db, int n) {
Dbt val; val.set_flags(DB_DBT_REALLOC); Dbt val; val.set_flags(DB_DBT_REALLOC);
r = cursor->get(&setkey, &val, DB_SET); assert(r == 0); r = cursor->get(&setkey, &val, DB_SET); assert(r == 0);
r = cursor->get(&key, &val, DB_CURRENT); assert(r == 0); r = cursor->get(&key, &val, DB_CURRENT); assert(r == 0);
int i = 0;
while (r == 0) { while (r == 0) {
printf("%d %d\n", htonl(*(int*)key.get_data()), *(int*)val.get_data()); int k = htonl(*(int*)key.get_data());
int v = *(int*)val.get_data();
if (verbose) printf("%d %d\n", k, v);
assert(k == n/2); assert(v == i);
i += 1;
r = my_next_dup(cursor, &key, &val); r = my_next_dup(cursor, &key, &val);
} }
if (key.get_data()) free(key.get_data()); if (key.get_data()) free(key.get_data());
...@@ -208,17 +260,25 @@ void test_next_dup(Db *db, int n) { ...@@ -208,17 +260,25 @@ void test_next_dup(Db *db, int n) {
r = cursor->close(); assert(r == 0); r = cursor->close(); assert(r == 0);
} }
int main() { int main(int argc, char *argv[]) {
for (int i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0)
verbose = 1;
}
int r; int r;
Db db(0, DB_CXX_NO_EXCEPTIONS); Db db(0, DB_CXX_NO_EXCEPTIONS);
r = db.set_flags(DB_DUP + DB_DUPSORT); assert(r == 0); r = db.set_flags(DB_DUP + DB_DUPSORT); assert(r == 0);
unlink("test.db");
r = db.open(0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); r = db.open(0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
load(&db, 10); load(&db, 10);
walk(&db, 10); walk(&db, 10);
test_next_nodup(&db, 10); test_next_nodup(&db, 10);
test_next_dup(&db, 10); test_next_dup(&db, 10);
test_zero_count(&db, 10);
return 0; return 0;
} }
...@@ -211,6 +211,7 @@ class Dbc : protected DBC { ...@@ -211,6 +211,7 @@ class Dbc : protected DBC {
int get(Dbt *, Dbt *, u_int32_t); int get(Dbt *, Dbt *, u_int32_t);
int pget(Dbt *, Dbt *, Dbt *, u_int32_t); int pget(Dbt *, Dbt *, Dbt *, u_int32_t);
int del(u_int32_t); int del(u_int32_t);
int count(db_recno_t *, u_int32_t);
private: private:
Dbc(); // User may not call it. Dbc(); // User may not call it.
~Dbc(); // User may not delete it. ~Dbc(); // User may not delete it.
......
...@@ -690,7 +690,7 @@ static TXNID next_txn = 0; ...@@ -690,7 +690,7 @@ static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) { static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
return toku_logger_abort(txn->mgrp->i->logger); return -1; // wont compile yet return toku_logger_abort(txn->mgrp->i->logger);
} }
static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
...@@ -975,6 +975,48 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag ...@@ -975,6 +975,48 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag
return r; return r;
} }
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
int r;
DBC *count_cursor = 0;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC;
DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC;
if (flags != 0) {
r = EINVAL; goto finish;
}
r = cursor->c_get(cursor, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0) goto finish;
r = cursor->dbp->cursor(cursor->dbp, 0, &count_cursor, 0);
if (r != 0) goto finish;
*count = 0;
r = count_cursor->c_get(count_cursor, &currentkey, &currentval, DB_SET);
if (r != 0) {
r = 0; goto finish; /* success, the current key must be deleted and there are no more */
}
for (;;) {
*count += 1;
r = count_cursor->c_get(count_cursor, &key, &val, DB_NEXT);
if (r != 0) break;
if (!keyeq(count_cursor, &currentkey, &key)) break;
}
r = 0; /* success, we found at least one before the end */
finish:
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
if (count_cursor) {
int rr = count_cursor->c_close(count_cursor); assert(rr == 0);
}
return r;
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp); HANDLE_PANICKED_DB(c->dbp);
int r; int r;
...@@ -1334,6 +1376,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { ...@@ -1334,6 +1376,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
result->c_put = toku_c_put; result->c_put = toku_c_put;
result->c_close = toku_c_close; result->c_close = toku_c_close;
result->c_del = toku_c_del; result->c_del = toku_c_del;
result->c_count = toku_c_count;
MALLOC(result->i); MALLOC(result->i);
assert(result->i); assert(result->i);
result->dbp = db; result->dbp = db;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment