Commit 68f199f1 authored by John Esmet's avatar John Esmet

fixes #192 Remove legacy OMT API. Some of it survives in omt-test.cc to

simplify things. New OMTs will use the new templated API, or, when
performance is not critical, use something simple like a std::map
parent 7a76d84d
......@@ -53,7 +53,6 @@ set(FT_SOURCES
logger
log_upgrade
minicron
omt
pqueue
queue
quicklz
......
......@@ -743,8 +743,6 @@ ftleaf_get_split_loc(
return;
}
// TODO: (Zardosht) possibly get rid of this function and use toku_omt_split_at in
// ftleaf_split
static void
move_leafentries(
BASEMENTNODE dest_bn,
......
......@@ -4438,7 +4438,7 @@ int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struc
* found. The pivot_bounds are the lower bound exclusive and upper bound
* inclusive, because they come from pivot keys in the tree. We want OMT
* indices, which must have the lower bound be inclusive and the upper
* bound exclusive. We will get these by telling toku_omt_find to look
* bound exclusive. We will get these by telling omt::find to look
* for something strictly bigger than each of our pivot bounds.
*
* Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
......@@ -4511,7 +4511,7 @@ find_bounds_within_message_tree(
// Again, we use an msn of MAX_MSN and a direction of +1 to get
// the first thing bigger than the upper_bound_inclusive key.
// This is therefore the smallest thing we don't want to apply,
// and toku_omt_iterate_on_range will not examine it.
// and omt::iterate_on_range will not examine it.
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra;
ZERO_STRUCT(ube_extra);
ube_extra.desc = desc;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <memory.h>
#include <string.h>
#include <db.h>
#include "omt.h"
int
toku_omt_create_steal_sorted_array(OMT *omtp, OMTVALUE **valuesp, uint32_t numvalues, uint32_t capacity) {
OMT XMALLOC(omt);
omt->create_steal_sorted_array(valuesp, numvalues, capacity);
*omtp = omt;
return 0;
}
//TODO: Put all omt API functions here.
int toku_omt_create (OMT *omtp) {
OMT XMALLOC(omt);
omt->create();
*omtp = omt;
return 0;
}
void toku_omt_destroy(OMT *omtp) {
OMT omt=*omtp;
omt->destroy();
toku_free(omt);
*omtp=NULL;
}
uint32_t toku_omt_size(OMT V) {
return V->size();
}
int toku_omt_create_from_sorted_array(OMT *omtp, OMTVALUE *values, uint32_t numvalues) {
OMT XMALLOC(omt);
omt->create_from_sorted_array(values, numvalues);
*omtp=omt;
return 0;
}
int toku_omt_insert_at(OMT omt, OMTVALUE value, uint32_t index) {
return omt->insert_at(value, index);
}
int toku_omt_set_at (OMT omt, OMTVALUE value, uint32_t index) {
return omt->set_at(value, index);
}
int toku_omt_delete_at(OMT omt, uint32_t index) {
return omt->delete_at(index);
}
int toku_omt_fetch(OMT omt, uint32_t i, OMTVALUE *v) {
return omt->fetch(i, v);
}
struct functor {
int (*f)(OMTVALUE, uint32_t, void *);
void *v;
};
static_assert(std::is_pod<functor>::value, "not POD");
int call_functor(const OMTVALUE &v, uint32_t idx, functor *const ftor);
int call_functor(const OMTVALUE &v, uint32_t idx, functor *const ftor) {
return ftor->f(const_cast<OMTVALUE>(v), idx, ftor->v);
}
int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, uint32_t, void*), void*v) {
struct functor ftor = { .f = f, .v = v };
return omt->iterate<functor, call_functor>(&ftor);
}
int toku_omt_iterate_on_range(OMT omt, uint32_t left, uint32_t right, int (*f)(OMTVALUE, uint32_t, void*), void*v) {
struct functor ftor = { .f = f, .v = v };
return omt->iterate_on_range<functor, call_functor>(left, right, &ftor);
}
struct heftor {
int (*h)(OMTVALUE, void *v);
void *v;
};
static_assert(std::is_pod<heftor>::value, "not POD");
int call_heftor(const OMTVALUE &v, const heftor &htor);
int call_heftor(const OMTVALUE &v, const heftor &htor) {
return htor.h(const_cast<OMTVALUE>(v), htor.v);
}
int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v, uint32_t *index) {
struct heftor htor = { .h = h, .v = v };
return omt->insert<heftor, call_heftor>(value, htor, index);
}
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index) {
struct heftor htor = { .h = h, .v = extra };
return V->find_zero<heftor, call_heftor>(htor, value, index);
}
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, uint32_t *index) {
struct heftor htor = { .h = h, .v = extra };
return V->find<heftor, call_heftor>(htor, direction, value, index);
}
int toku_omt_split_at(OMT omt, OMT *newomtp, uint32_t index) {
OMT XMALLOC(newomt);
int r = omt->split_at(newomt, index);
if (r != 0) {
toku_free(newomt);
} else {
*newomtp = newomt;
}
return r;
}
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
OMT XMALLOC(newomt);
newomt->merge(leftomt, rightomt);
toku_free(leftomt);
toku_free(rightomt);
*newomtp = newomt;
return 0;
}
int toku_omt_clone_noptr(OMT *dest, OMT src) {
OMT XMALLOC(omt);
omt->clone(*src);
*dest = omt;
return 0;
}
void toku_omt_clear(OMT omt) {
omt->clear();
}
size_t toku_omt_memory_size (OMT omt) {
return omt->memory_size();
}
This diff is collapsed.
......@@ -97,7 +97,8 @@ PATENT RIGHTS GRANT:
#include "cachetable.h"
#include "checkpoint.h"
#include "txn_manager.h"
#include "omt.h"
#include <util/omt.h>
int tokudb_recovery_trace = 0; // turn on recovery tracing, default off.
......@@ -174,7 +175,7 @@ static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
// Map filenum to ft_handle
struct file_map {
OMT filenums;
toku::omt<struct file_map_tuple *> *filenums;
};
// The recovery environment
......@@ -200,31 +201,33 @@ typedef struct recover_env *RECOVER_ENV;
static void file_map_init(struct file_map *fmap) {
int r = toku_omt_create(&fmap->filenums);
assert(r == 0);
XMALLOC(fmap->filenums);
fmap->filenums->create();
}
static void file_map_destroy(struct file_map *fmap) {
toku_omt_destroy(&fmap->filenums);
fmap->filenums->destroy();
toku_free(fmap->filenums);
fmap->filenums = nullptr;
}
static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) {
return toku_omt_size(fmap->filenums);
return fmap->filenums->size();
}
static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) {
int r;
while (1) {
uint32_t n = toku_omt_size(fmap->filenums);
if (n == 0)
uint32_t n = fmap->filenums->size();
if (n == 0) {
break;
OMTVALUE v;
r = toku_omt_fetch(fmap->filenums, n-1, &v);
}
struct file_map_tuple *tuple;
r = fmap->filenums->fetch(n - 1, &tuple);
assert(r == 0);
r = toku_omt_delete_at(fmap->filenums, n-1);
r = fmap->filenums->delete_at(n - 1);
assert(r == 0);
struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
assert(tuple->ft_handle);
// Logging is on again, but we must pass the right LSN into close.
if (tuple->ft_handle) { // it's a DB, not a rollback file
......@@ -235,27 +238,29 @@ static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) {
}
}
static int file_map_h(OMTVALUE omtv, void *v) {
struct file_map_tuple *CAST_FROM_VOIDP(a, omtv);
FILENUM *CAST_FROM_VOIDP(b, v);
if (a->filenum.fileid < b->fileid) return -1;
if (a->filenum.fileid > b->fileid) return +1;
return 0;
static int file_map_h(struct file_map_tuple *const &a, const FILENUM &b) {
if (a->filenum.fileid < b.fileid) {
return -1;
} else if (a->filenum.fileid > b.fileid) {
return 1;
} else {
return 0;
}
}
static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE ft_handle, char *iname) {
struct file_map_tuple *XMALLOC(tuple);
file_map_tuple_init(tuple, fnum, ft_handle, iname);
int r = toku_omt_insert(fmap->filenums, tuple, file_map_h, &fnum, NULL);
int r = fmap->filenums->insert<FILENUM, file_map_h>(tuple, fnum, nullptr);
return r;
}
static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
OMTVALUE v; uint32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
uint32_t idx;
struct file_map_tuple *tuple;
int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
if (r == 0) {
struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
r = toku_omt_delete_at(fmap->filenums, idx);
r = fmap->filenums->delete_at(idx);
file_map_tuple_destroy(tuple);
toku_free(tuple);
}
......@@ -263,14 +268,15 @@ static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
OMTVALUE v; uint32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
uint32_t idx;
struct file_map_tuple *tuple;
int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
if (r == 0) {
struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
assert(tuple->filenum.fileid == fnum.fileid);
*file_map_tuple = tuple;
} else {
assert(r == DB_NOTFOUND);
}
else assert(r==DB_NOTFOUND);
return r;
}
......@@ -320,7 +326,7 @@ static int recover_env_init (RECOVER_ENV renv,
static void recover_env_cleanup (RECOVER_ENV renv) {
int r;
assert(toku_omt_size(renv->fmap.filenums)==0);
invariant_zero(renv->fmap.filenums->size());
file_map_destroy(&renv->fmap);
if (renv->destroy_logger_at_end) {
......
......@@ -350,27 +350,6 @@ find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind) {
return 0;
}
#if 0
static void
omt_insert_at_end_unless_recovery(OMT omt, int (*h)(OMTVALUE, void*extra), TOKUTXN txn, OMTVALUE v, bool for_recovery)
// Effect: insert v into omt that is sorted by xid gotten from txn.
// Rationale:
// During recovery, we get txns in the order that they did their first
// write operation, which is not necessarily monotonically increasing.
// During normal operation, txns are created with strictly increasing
// txnids, so we can always insert at the end.
{
int r;
uint32_t idx = toku_omt_size(omt);
if (for_recovery) {
r = toku_omt_find_zero(omt, h, (void *) txn->txnid64, NULL, &idx);
invariant(r==DB_NOTFOUND);
}
r = toku_omt_insert_at(omt, v, idx);
lazy_assert_zero(r);
}
#endif
static TXNID
max_xid(TXNID a, TXNID b) {
return a < b ? b : a;
......
......@@ -726,16 +726,18 @@ if __name__ == '__main__':
parser.add_option_group(test_group)
default_testnames = ['test_stress0.tdb',
'test_stress1.tdb',
'test_stress2.tdb',
'test_stress3.tdb',
'test_stress4.tdb',
'test_stress5.tdb',
'test_stress6.tdb',
'test_stress7.tdb',
'test_stress_hot_indexing.tdb',
'test_stress_openclose.tdb']
default_testnames = ['recover-test_stress2.tdb',
#'test_stress0.tdb',
#'test_stress1.tdb',
#'test_stress2.tdb',
#'test_stress3.tdb',
#'test_stress4.tdb',
#'test_stress5.tdb',
#'test_stress6.tdb',
#'test_stress7.tdb',
#'test_stress_hot_indexing.tdb',
#'test_stress_openclose.tdb']
]
default_recover_testnames = ['recover-test_stress1.tdb',
'recover-test_stress2.tdb',
'recover-test_stress3.tdb',
......
......@@ -97,14 +97,14 @@ PATENT RIGHTS GRANT:
#include <ft/tokuconst.h>
#include <ft/fttypes.h>
#include <ft/omt.h>
#include <ft/leafentry.h>
#include <ft/ule.h>
#include <ft/ule-internal.h>
#include <ft/le-cursor.h>
#include "indexer-internal.h"
#include <ft/xids-internal.h>
#include "indexer-internal.h"
struct txn {
TXNID xid;
TOKUTXN_STATE state;
......
......@@ -97,8 +97,6 @@ PATENT RIGHTS GRANT:
#include <ft/fttypes.h>
#include <ft/ft-ops.h>
#include <ft/minicron.h>
// TODO: remove vanilla omt in favor of templated one
#include <ft/omt.h>
#include <util/growable_array.h>
#include <util/omt.h>
......@@ -157,9 +155,8 @@ struct __toku_db_env_internal {
DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade
// TODO: toku::omt<DB *>
OMT open_dbs_by_dname; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location)
OMT open_dbs_by_dict_id; // Stores open db handles, sorted by dictionary id and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku::omt<DB *> *open_dbs_by_dname; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku::omt<DB *> *open_dbs_by_dict_id; // Stores open db handles, sorted by dictionary id and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku_pthread_rwlock_t open_dbs_rwlock; // rwlock that protects the OMT of open dbs.
char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /)
......
......@@ -1137,7 +1137,7 @@ env_close(DB_ENV * env, uint32_t flags) {
goto panic_and_quit_early;
}
if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
if (toku_omt_size(env->i->open_dbs_by_dname) > 0) {
if (env->i->open_dbs_by_dname->size() > 0) {
err_msg = "Cannot close environment due to open DBs\n";
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
goto panic_and_quit_early;
......@@ -1213,10 +1213,14 @@ env_close(DB_ENV * env, uint32_t flags) {
toku_free(env->i->real_log_dir);
if (env->i->real_tmp_dir)
toku_free(env->i->real_tmp_dir);
if (env->i->open_dbs_by_dname)
toku_omt_destroy(&env->i->open_dbs_by_dname);
if (env->i->open_dbs_by_dict_id)
toku_omt_destroy(&env->i->open_dbs_by_dict_id);
if (env->i->open_dbs_by_dname) {
env->i->open_dbs_by_dname->destroy();
toku_free(env->i->open_dbs_by_dname);
}
if (env->i->open_dbs_by_dict_id) {
env->i->open_dbs_by_dict_id->destroy();
toku_free(env->i->open_dbs_by_dict_id);
}
if (env->i->dir)
toku_free(env->i->dir);
toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);
......@@ -2298,10 +2302,8 @@ struct ltm_iterate_requests_callback_extra {
};
static int
find_db_by_dict_id(OMTVALUE v, void *dict_id_v) {
DB *db = (DB *) v;
find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
DICTIONARY_ID dict_id = db->i->dict_id;
DICTIONARY_ID dict_id_find = *(DICTIONARY_ID *) dict_id_v;
if (dict_id.dictid < dict_id_find.dictid) {
return -1;
} else if (dict_id.dictid > dict_id_find.dictid) {
......@@ -2313,10 +2315,9 @@ find_db_by_dict_id(OMTVALUE v, void *dict_id_v) {
static DB *
locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
OMTVALUE dbv;
int r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_dict_id,
(void *) &dict_id, &dbv, nullptr);
return r == 0 ? (DB *) dbv : nullptr;
DB *db;
int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
return r == 0 ? db : nullptr;
}
static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
......@@ -2578,10 +2579,10 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
// The escalate callback will need it to translate txnids to DB_TXNs
result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
r = toku_omt_create(&result->i->open_dbs_by_dname);
assert_zero(r);
r = toku_omt_create(&result->i->open_dbs_by_dict_id);
assert_zero(r);
XMALLOC(result->i->open_dbs_by_dname);
result->i->open_dbs_by_dname->create();
XMALLOC(result->i->open_dbs_by_dict_id);
result->i->open_dbs_by_dict_id->create();
toku_pthread_rwlock_init(&result->i->open_dbs_rwlock, NULL);
*envp = result;
......@@ -2607,9 +2608,7 @@ DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
// return <0 if v is earlier in omt than dbv
// return >0 if v is later in omt than dbv
static int
find_db_by_db_dname(OMTVALUE v, void *dbv) {
DB *db = (DB *) v; // DB* that is stored in the omt
DB *dbfind = (DB *) dbv; // extra, to be compared to v
find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
int cmp;
const char *dname = db->i->dname;
const char *dnamefind = dbfind->i->dname;
......@@ -2621,9 +2620,7 @@ find_db_by_db_dname(OMTVALUE v, void *dbv) {
}
static int
find_db_by_db_dict_id(OMTVALUE v, void *dbv) {
DB *db = (DB *) v;
DB *dbfind = (DB *) dbv;
find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
DICTIONARY_ID dict_id = db->i->dict_id;
DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
if (dict_id.dictid < dict_id_find.dictid) {
......@@ -2646,20 +2643,18 @@ env_note_db_opened(DB_ENV *env, DB *db) {
assert(db->i->dname); // internal (non-user) dictionary has no dname
int r;
OMTVALUE v;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
db, &v, &idx);
r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(env->i->open_dbs_by_dname, db, idx);
r = env->i->open_dbs_by_dname->insert_at(db, idx);
assert_zero(r);
r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
db, &v, &idx);
r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(env->i->open_dbs_by_dict_id, db, idx);
r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
assert_zero(r);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
......@@ -2672,58 +2667,44 @@ void
env_note_db_closed(DB_ENV *env, DB *db) {
toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
assert(db->i->dname); // internal (non-user) dictionary has no dname
assert(toku_omt_size(env->i->open_dbs_by_dname) > 0);
assert(toku_omt_size(env->i->open_dbs_by_dict_id) > 0);
assert(env->i->open_dbs_by_dname->size() > 0);
assert(env->i->open_dbs_by_dict_id->size() > 0);
int r;
OMTVALUE v;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname,
db, &v, &idx);
r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
assert_zero(r);
r = toku_omt_delete_at(env->i->open_dbs_by_dname, idx);
r = env->i->open_dbs_by_dname->delete_at(idx);
assert_zero(r);
r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id,
db, &v, &idx);
r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
assert_zero(r);
r = toku_omt_delete_at(env->i->open_dbs_by_dict_id, idx);
r = env->i->open_dbs_by_dict_id->delete_at(idx);
assert_zero(r);
STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
}
static int
find_open_db_by_dname (OMTVALUE v, void *dnamev) {
DB *db = (DB *) v; // DB* that is stored in the omt
int cmp;
const char *dname = db->i->dname;
const char *dnamefind = (char *) dnamev;
cmp = strcmp(dname, dnamefind);
return cmp;
find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
return strcmp(db->i->dname, dnamefind);
}
// return true if there is any db open with the given dname
static bool
env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
int r;
bool rval;
OMTVALUE dbv;
uint32_t idx;
DB *db;
toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) {
DB *db = (DB *) dbv;
assert(strcmp(dname, db->i->dname) == 0);
rval = true;
}
else {
assert(r==DB_NOTFOUND);
rval = false;
int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
if (r == 0) {
invariant(strcmp(dname, db->i->dname) == 0);
} else {
invariant(r == DB_NOTFOUND);
}
toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
return rval;
return r == 0 ? true : false;
}
//We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
......
#ifndef FMUTEX_H
#define FMUTEX_H
// fair mutex
struct fmutex {
pthread_mutex_t mutex;
int mutex_held;
int num_want_mutex;
struct queue_item *wait_head;
struct queue_item *wait_tail;
};
// item on the queue
struct queue_item {
pthread_cond_t *cond;
struct queue_item *next;
};
static void enq_item(struct fmutex *fm, struct queue_item *const item) {
assert(item->next == NULL);
if (fm->wait_tail != NULL) {
fm->wait_tail->next = item;
} else {
assert(fm->wait_head == NULL);
fm->wait_head = item;
}
fm->wait_tail = item;
}
static pthread_cond_t *deq_item(struct fmutex *fm) {
assert(fm->wait_head != NULL);
assert(fm->wait_tail != NULL);
struct queue_item *item = fm->wait_head;
fm->wait_head = fm->wait_head->next;
if (fm->wait_tail == item) {
fm->wait_tail = NULL;
}
return item->cond;
}
void fmutex_create(struct fmutex *fm) {
pthread_mutex_init(&fm->mutex, NULL);
fm->mutex_held = 0;
fm->num_want_mutex = 0;
fm->wait_head = NULL;
fm->wait_tail = NULL;
}
void fmutex_destroy(struct fmutex *fm) {
pthread_mutex_destroy(&fm->mutex);
}
// Prerequisite: Holds m_mutex.
void fmutex_lock(struct fmutex *fm) {
pthread_mutex_lock(&fm->mutex);
if (fm->mutex_held == 0 || fm->num_want_mutex == 0) {
// No one holds the lock. Grant the write lock.
fm->mutex_held = 1;
return;
}
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);
struct queue_item item = { .cond = &cond, .next = NULL };
enq_item(fm, &item);
// Wait for our turn.
++fm->num_want_mutex;
pthread_cond_wait(&cond, &fm->mutex);
pthread_cond_destroy(&cond);
// Now it's our turn.
assert(fm->num_want_mutex > 0);
assert(fm->mutex_held == 0);
// Not waiting anymore; grab the lock.
--fm->num_want_mutex;
fm->mutex_held = 1;
pthread_mutex_unlock();
}
void fmutex_mutex_unlock(struct fmutex *fm) {
pthread_mutex_lock();
fm->mutex_held = 0;
if (fm->wait_head == NULL) {
assert(fm->num_want_mutex == 0);
return;
}
assert(fm->num_want_mutex > 0);
// Grant lock to the next waiter
pthread_cond_t *cond = deq_item(fm);
pthread_cond_signal(cond);
pthread_mutex_unlock();
}
int fmutex_users(struct fmutex *fm) const {
return fm->mutex_held + fm->num_want_mutex;
}
int fmutex_blocked_users(struct fmutex *fm) const {
return fm->num_want_mutex;
}
#endif // FMUTEX_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment