Commit e583525a authored by John Esmet's avatar John Esmet

FT-258 Move cursor code into ft/cursor.h and ft/cursor.cc

parent 741efcd4
...@@ -30,6 +30,7 @@ set(FT_SOURCES ...@@ -30,6 +30,7 @@ set(FT_SOURCES
cachetable cachetable
checkpoint checkpoint
compress compress
cursor
ft ft
ft-cachetable-wrappers ft-cachetable-wrappers
ft-flusher ft-flusher
......
...@@ -101,6 +101,9 @@ PATENT RIGHTS GRANT: ...@@ -101,6 +101,9 @@ PATENT RIGHTS GRANT:
#include "wbuf.h" #include "wbuf.h"
#include <util/nb_mutex.h> #include <util/nb_mutex.h>
// TODO: reorganize this dependency
#include "ft/ft-ops.h" // for toku_maybe_truncate_file
//When the translation (btt) is stored on disk: //When the translation (btt) is stored on disk:
// In Header: // In Header:
// size_on_disk // size_on_disk
......
...@@ -129,6 +129,7 @@ PATENT RIGHTS GRANT: ...@@ -129,6 +129,7 @@ PATENT RIGHTS GRANT:
#include <toku_portability.h> #include <toku_portability.h>
#include <time.h> #include <time.h>
#include "ft/ft.h"
#include "fttypes.h" #include "fttypes.h"
#include "cachetable.h" #include "cachetable.h"
#include "log-internal.h" #include "log-internal.h"
......
This diff is collapsed.
...@@ -90,7 +90,7 @@ PATENT RIGHTS GRANT: ...@@ -90,7 +90,7 @@ PATENT RIGHTS GRANT:
#include <db.h> #include <db.h>
typedef bool(*FT_CHECK_INTERRUPT_CALLBACK)(void* extra); #include "ft/ft-internal.h"
/* an ft cursor is represented as a kv pair in a tree */ /* an ft cursor is represented as a kv pair in a tree */
struct ft_cursor { struct ft_cursor {
...@@ -110,3 +110,125 @@ struct ft_cursor { ...@@ -110,3 +110,125 @@ struct ft_cursor {
void *interrupt_cb_extra; void *interrupt_cb_extra;
}; };
typedef struct ft_cursor *FT_CURSOR; typedef struct ft_cursor *FT_CURSOR;
enum ft_search_direction_e {
FT_SEARCH_LEFT = 1, /* search left -> right, finds min xy as defined by the compare function */
FT_SEARCH_RIGHT = 2, /* search right -> left, finds max xy as defined by the compare function */
};
struct ft_search;
/* the search compare function should return 0 for all xy < kv and 1 for all xy >= kv
the compare function should be a step function from 0 to 1 for a left to right search
and 1 to 0 for a right to left search */
typedef int (*ft_search_compare_func_t)(const struct ft_search &, const DBT *);
/* the search object contains the compare function, search direction, and the kv pair that
is used in the compare function. the context is the user's private data */
struct ft_search {
ft_search_compare_func_t compare;
enum ft_search_direction_e direction;
const DBT *k;
void *context;
// To fix #3522, we need to remember the pivots that we have searched unsuccessfully.
// For example, when searching right (left), we call search->compare() on the ith pivot key. If search->compare(0 returns
// nonzero, then we search the ith subtree. If that subsearch returns DB_NOTFOUND then maybe the key isn't present in the
// tree. But maybe we are doing a DB_NEXT (DB_PREV), and everything was deleted. So we remember the pivot, and later we
// will only search subtrees which contain keys that are bigger than (less than) the pivot.
// The code is a kludge (even before this fix), and interacts strangely with the TOKUDB_FOUND_BUT_REJECTED (which is there
// because a failed DB_GET we would keep searching the rest of the tree). We probably should write the various lookup
// codes (NEXT, PREV, CURRENT, etc) more directly, and we should probably use a binary search within a node to search the
// pivots so that we can support a larger fanout.
// These changes (3312+3522) also (probably) introduce an isolation error (#3529).
// We must make sure we lock the right range for proper isolation level.
// There's probably a bug in which the following could happen.
// Thread A: Searches through deleted keys A,B,D,E and finds nothing, so searches the next leaf, releasing the YDB lock.
// Thread B: Inserts key C, and acquires the write lock, then commits.
// Thread A: Resumes, searching F,G,H and return success. Thread A then read-locks the range A-H, and doesn't notice
// the value C inserted by thread B. Thus a failure of serialization.
// See #3529.
// There also remains a potential thrashing problem. When we get a TOKUDB_TRY_AGAIN, we unpin everything. There's
// no guarantee that we will get everything pinned again. We ought to keep nodes pinned when we retry, except that on the
// way out with a DB_NOTFOUND we ought to unpin those nodes. See #3528.
DBT pivot_bound;
const DBT *k_bound;
};
/* initialize the search compare object */
static inline ft_search *ft_search_init(ft_search *search, ft_search_compare_func_t compare,
enum ft_search_direction_e direction,
const DBT *k, const DBT *k_bound, void *context) {
search->compare = compare;
search->direction = direction;
search->k = k;
search->context = context;
toku_init_dbt(&search->pivot_bound);
search->k_bound = k_bound;
return search;
}
static inline void ft_search_finish(ft_search *search) {
toku_destroy_dbt(&search->pivot_bound);
}
int toku_ft_lookup (FT_HANDLE ft_h, DBT *k, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor(FT_HANDLE ft_handle, FT_CURSOR*, TOKUTXN, bool, bool) __attribute__ ((warn_unused_result));
void toku_ft_cursor_set_prefetching(FT_CURSOR cursor);
bool toku_ft_cursor_prefetching(FT_CURSOR cursor);
bool toku_ft_cursor_not_set(FT_CURSOR cursor);
void toku_ft_cursor_set_leaf_mode(FT_CURSOR cursor);
// Sets a boolean on the ft cursor that prevents uncessary copying of the cursor duing a one query.
void toku_ft_cursor_set_temporary(FT_CURSOR cursor);
void toku_ft_cursor_remove_restriction(FT_CURSOR cursor);
void toku_ft_cursor_set_check_interrupt_cb(FT_CURSOR cursor, FT_CHECK_INTERRUPT_CALLBACK cb, void *extra);
int toku_ft_cursor_is_leaf_mode(FT_CURSOR cursor);
void toku_ft_cursor_set_range_lock(FT_CURSOR, const DBT *, const DBT *, bool, bool, int);
int toku_ft_cursor_first(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_last(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_next(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_prev(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_current(FT_CURSOR cursor, int op, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, DBT *key_bound, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range_reverse(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
void toku_ft_cursor_close(FT_CURSOR cursor);
bool toku_ft_cursor_uninitialized(FT_CURSOR cursor) __attribute__ ((warn_unused_result));
void toku_ft_cursor_peek(FT_CURSOR cursor, const DBT **pkey, const DBT **pval);
int toku_ft_cursor_check_restricted_range(FT_CURSOR cursor, bytevec key, ITEMLEN keylen);
int toku_ft_cursor_shortcut(FT_CURSOR cursor, int direction, uint32_t index, bn_data *bd,
FT_GET_CALLBACK_FUNCTION getf, void *getf_v,
uint32_t *keylen, void **key, uint32_t *vallen, void **val);
// deprecated
int toku_ft_cursor_get(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags);
int toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN txn);
// used by get_key_after_bytes
int toku_ft_cursor_compare_one(const ft_search &search, const DBT *x);
int toku_ft_cursor_compare_set_range(const ft_search &search, const DBT *x);
...@@ -106,7 +106,6 @@ PATENT RIGHTS GRANT: ...@@ -106,7 +106,6 @@ PATENT RIGHTS GRANT:
#include "ft_layout_version.h" #include "ft_layout_version.h"
#include "block_allocator.h" #include "block_allocator.h"
#include "cachetable.h" #include "cachetable.h"
#include "ft-ops.h"
#include "toku_list.h" #include "toku_list.h"
#include <util/omt.h> #include <util/omt.h>
#include "leafentry.h" #include "leafentry.h"
...@@ -116,9 +115,10 @@ PATENT RIGHTS GRANT: ...@@ -116,9 +115,10 @@ PATENT RIGHTS GRANT:
#include <util/omt.h> #include <util/omt.h>
#include "ft/bndata.h" #include "ft/bndata.h"
#include "ft/rollback.h" #include "ft/rollback.h"
#include "ft/ft-search.h"
#include "ft/msg_buffer.h" #include "ft/msg_buffer.h"
struct ft_search;
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */ enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { FT_MSG_OVERHEAD = (2 + sizeof(MSN)) }; // the type plus freshness plus MSN enum { FT_MSG_OVERHEAD = (2 + sizeof(MSN)) }; // the type plus freshness plus MSN
enum { FT_DEFAULT_FANOUT = 16 }; enum { FT_DEFAULT_FANOUT = 16 };
...@@ -181,7 +181,7 @@ struct ftnode_fetch_extra { ...@@ -181,7 +181,7 @@ struct ftnode_fetch_extra {
FT h; FT h;
// used in the case where type == ftnode_fetch_subset // used in the case where type == ftnode_fetch_subset
// parameters needed to find out which child needs to be decompressed (so it can be read) // parameters needed to find out which child needs to be decompressed (so it can be read)
ft_search_t* search; ft_search *search;
DBT range_lock_left_key, range_lock_right_key; DBT range_lock_left_key, range_lock_right_key;
bool left_is_neg_infty, right_is_pos_infty; bool left_is_neg_infty, right_is_pos_infty;
// states if we should try to aggressively fetch basement nodes // states if we should try to aggressively fetch basement nodes
...@@ -858,7 +858,7 @@ static inline void fill_bfe_for_keymatch( ...@@ -858,7 +858,7 @@ static inline void fill_bfe_for_keymatch(
static inline void fill_bfe_for_subset_read( static inline void fill_bfe_for_subset_read(
struct ftnode_fetch_extra *bfe, struct ftnode_fetch_extra *bfe,
FT h, FT h,
ft_search_t* search, ft_search *search,
const DBT *left, const DBT *left,
const DBT *right, const DBT *right,
bool left_is_neg_infty, bool left_is_neg_infty,
...@@ -951,7 +951,7 @@ toku_ft_search_which_child( ...@@ -951,7 +951,7 @@ toku_ft_search_which_child(
DESCRIPTOR desc, DESCRIPTOR desc,
ft_compare_func cmp, ft_compare_func cmp,
FTNODE node, FTNODE node,
ft_search_t *search ft_search *search
); );
bool bool
...@@ -1229,3 +1229,22 @@ void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extr ...@@ -1229,3 +1229,22 @@ void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extr
int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) __attribute__((nonnull)); int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) __attribute__((nonnull));
int toku_upgrade_msn_from_root_to_header(int fd, FT h) __attribute__((nonnull)); int toku_upgrade_msn_from_root_to_header(int fd, FT h) __attribute__((nonnull));
// A callback function is invoked with the key, and the data.
// The pointers (to the bytevecs) must not be modified. The data must be copied out before the callback function returns.
// Note: In the thread-safe version, the ftnode remains locked while the callback function runs. So return soon, and don't call the ft code from the callback function.
// If the callback function returns a nonzero value (an error code), then that error code is returned from the get function itself.
// The cursor object will have been updated (so that if result==0 the current value is the value being passed)
// (If r!=0 then the cursor won't have been updated.)
// If r!=0, it's up to the callback function to return that value of r.
// A 'key' bytevec of NULL means that element is not found (effectively infinity or
// -infinity depending on direction)
// When lock_only is false, the callback does optional lock tree locking and then processes the key and val.
// When lock_only is true, the callback only does optional lock tree locking.
typedef int (*FT_GET_CALLBACK_FUNCTION)(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool lock_only);
typedef bool (*FT_CHECK_INTERRUPT_CALLBACK)(void *extra);
struct ft_search;
struct ft_cursor;
int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, struct ft_cursor *ftcursor, bool can_bulk_fetch);
This diff is collapsed.
...@@ -98,23 +98,8 @@ PATENT RIGHTS GRANT: ...@@ -98,23 +98,8 @@ PATENT RIGHTS GRANT:
#include <db.h> #include <db.h>
#include "cachetable.h" #include "cachetable.h"
#include "log.h" #include "log.h"
#include "ft-search.h"
#include "compress.h" #include "compress.h"
#include "ft_msg.h" #include "ft_msg.h"
#include "ft/cursor.h"
// A callback function is invoked with the key, and the data.
// The pointers (to the bytevecs) must not be modified. The data must be copied out before the callback function returns.
// Note: In the thread-safe version, the ftnode remains locked while the callback function runs. So return soon, and don't call the ft code from the callback function.
// If the callback function returns a nonzero value (an error code), then that error code is returned from the get function itself.
// The cursor object will have been updated (so that if result==0 the current value is the value being passed)
// (If r!=0 then the cursor won't have been updated.)
// If r!=0, it's up to the callback function to return that value of r.
// A 'key' bytevec of NULL means that element is not found (effectively infinity or
// -infinity depending on direction)
// When lock_only is false, the callback does optional lock tree locking and then processes the key and val.
// When lock_only is true, the callback only does optional lock tree locking.
typedef int(*FT_GET_CALLBACK_FUNCTION)(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool lock_only);
int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *, int nodesize, int basementnodesize, enum toku_compression_method compression_method, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*)) __attribute__ ((warn_unused_result)); int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *, int nodesize, int basementnodesize, enum toku_compression_method compression_method, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*)) __attribute__ ((warn_unused_result));
...@@ -208,8 +193,6 @@ toku_ft_handle_open_with_dict_id( ...@@ -208,8 +193,6 @@ toku_ft_handle_open_with_dict_id(
DICTIONARY_ID use_dictionary_id DICTIONARY_ID use_dictionary_id
) __attribute__ ((warn_unused_result)); ) __attribute__ ((warn_unused_result));
int toku_ft_lookup (FT_HANDLE ft_h, DBT *k, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
// Effect: Insert a key and data pair into an ft // Effect: Insert a key and data pair into an ft
void toku_ft_insert (FT_HANDLE ft_h, DBT *k, DBT *v, TOKUTXN txn); void toku_ft_insert (FT_HANDLE ft_h, DBT *k, DBT *v, TOKUTXN txn);
...@@ -262,36 +245,6 @@ extern int toku_ft_debug_mode; ...@@ -262,36 +245,6 @@ extern int toku_ft_debug_mode;
int toku_verify_ft (FT_HANDLE ft_h) __attribute__ ((warn_unused_result)); int toku_verify_ft (FT_HANDLE ft_h) __attribute__ ((warn_unused_result));
int toku_verify_ft_with_progress (FT_HANDLE ft_h, int (*progress_callback)(void *extra, float progress), void *extra, int verbose, int keep_going) __attribute__ ((warn_unused_result)); int toku_verify_ft_with_progress (FT_HANDLE ft_h, int (*progress_callback)(void *extra, float progress), void *extra, int verbose, int keep_going) __attribute__ ((warn_unused_result));
int toku_ft_cursor (FT_HANDLE, FT_CURSOR*, TOKUTXN, bool, bool) __attribute__ ((warn_unused_result));
void toku_ft_cursor_set_leaf_mode(FT_CURSOR);
// Sets a boolean on the ft cursor that prevents uncessary copying of
// the cursor duing a one query.
void toku_ft_cursor_set_temporary(FT_CURSOR);
void toku_ft_cursor_remove_restriction(FT_CURSOR);
void toku_ft_cursor_set_check_interrupt_cb(FT_CURSOR ftcursor, FT_CHECK_INTERRUPT_CALLBACK cb, void *extra);
int toku_ft_cursor_is_leaf_mode(FT_CURSOR);
void toku_ft_cursor_set_range_lock(FT_CURSOR, const DBT *, const DBT *, bool, bool, int);
// get is deprecated in favor of the individual functions below
int toku_ft_cursor_get (FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags) __attribute__ ((warn_unused_result));
int toku_ft_cursor_first(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_last(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_next(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_prev(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_current(FT_CURSOR cursor, int op, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, DBT *key_bound, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range_reverse(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_get_both_range(FT_CURSOR cursor, DBT *key, DBT *val, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_get_both_range_reverse(FT_CURSOR cursor, DBT *key, DBT *val, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN) __attribute__ ((warn_unused_result));
void toku_ft_cursor_close (FT_CURSOR curs);
bool toku_ft_cursor_uninitialized(FT_CURSOR c) __attribute__ ((warn_unused_result));
void toku_ft_cursor_peek(FT_CURSOR cursor, const DBT **pkey, const DBT **pval);
DICTIONARY_ID toku_ft_get_dictionary_id(FT_HANDLE); DICTIONARY_ID toku_ft_get_dictionary_id(FT_HANDLE);
enum ft_flags { enum ft_flags {
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#pragma once
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "ft/ybt.h"
enum ft_search_direction_e {
FT_SEARCH_LEFT = 1, /* search left -> right, finds min xy as defined by the compare function */
FT_SEARCH_RIGHT = 2, /* search right -> left, finds max xy as defined by the compare function */
};
struct ft_search;
/* the search compare function should return 0 for all xy < kv and 1 for all xy >= kv
the compare function should be a step function from 0 to 1 for a left to right search
and 1 to 0 for a right to left search */
typedef int (*ft_search_compare_func_t)(const struct ft_search &, const DBT *);
/* the search object contains the compare function, search direction, and the kv pair that
is used in the compare function. the context is the user's private data */
typedef struct ft_search {
ft_search_compare_func_t compare;
enum ft_search_direction_e direction;
const DBT *k;
void *context;
// To fix #3522, we need to remember the pivots that we have searched unsuccessfully.
// For example, when searching right (left), we call search->compare() on the ith pivot key. If search->compare(0 returns
// nonzero, then we search the ith subtree. If that subsearch returns DB_NOTFOUND then maybe the key isn't present in the
// tree. But maybe we are doing a DB_NEXT (DB_PREV), and everything was deleted. So we remember the pivot, and later we
// will only search subtrees which contain keys that are bigger than (less than) the pivot.
// The code is a kludge (even before this fix), and interacts strangely with the TOKUDB_FOUND_BUT_REJECTED (which is there
// because a failed DB_GET we would keep searching the rest of the tree). We probably should write the various lookup
// codes (NEXT, PREV, CURRENT, etc) more directly, and we should probably use a binary search within a node to search the
// pivots so that we can support a larger fanout.
// These changes (3312+3522) also (probably) introduce an isolation error (#3529).
// We must make sure we lock the right range for proper isolation level.
// There's probably a bug in which the following could happen.
// Thread A: Searches through deleted keys A,B,D,E and finds nothing, so searches the next leaf, releasing the YDB lock.
// Thread B: Inserts key C, and acquires the write lock, then commits.
// Thread A: Resumes, searching F,G,H and return success. Thread A then read-locks the range A-H, and doesn't notice
// the value C inserted by thread B. Thus a failure of serialization.
// See #3529.
// There also remains a potential thrashing problem. When we get a TOKUDB_TRY_AGAIN, we unpin everything. There's
// no guarantee that we will get everything pinned again. We ought to keep nodes pinned when we retry, except that on the
// way out with a DB_NOTFOUND we ought to unpin those nodes. See #3528.
DBT pivot_bound;
const DBT *k_bound;
} ft_search_t;
/* initialize the search compare object */
static inline ft_search_t *ft_search_init(ft_search_t *so, ft_search_compare_func_t compare, enum ft_search_direction_e direction,
const DBT *k, const DBT *k_bound, void *context) {
so->compare = compare;
so->direction = direction;
so->k = k;
so->context = context;
toku_init_dbt(&so->pivot_bound);
so->k_bound = k_bound;
return so;
}
static inline void ft_search_finish(ft_search_t *so) {
toku_destroy_dbt(&so->pivot_bound);
}
...@@ -96,7 +96,6 @@ PATENT RIGHTS GRANT: ...@@ -96,7 +96,6 @@ PATENT RIGHTS GRANT:
#include <db.h> #include <db.h>
#include "cachetable.h" #include "cachetable.h"
#include "log.h" #include "log.h"
#include "ft-search.h"
#include "ft-ops.h" #include "ft-ops.h"
#include "compress.h" #include "compress.h"
......
...@@ -211,6 +211,7 @@ void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le); ...@@ -211,6 +211,7 @@ void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
int print_klpair (FILE *outf, const void* key, uint32_t keylen, LEAFENTRY v); // Print a leafentry out in human-readable form. int print_klpair (FILE *outf, const void* key, uint32_t keylen, LEAFENTRY v); // Print a leafentry out in human-readable form.
int le_latest_is_del(LEAFENTRY le); // Return true if it is a provisional delete. int le_latest_is_del(LEAFENTRY le); // Return true if it is a provisional delete.
int le_val_is_del(LEAFENTRY le, bool is_snapshot_read, TOKUTXN txn); // Returns true if the value that is to be read is empty
bool le_is_clean(LEAFENTRY le); //Return how many xids exist (0 does not count) bool le_is_clean(LEAFENTRY le); //Return how many xids exist (0 does not count)
bool le_has_xids(LEAFENTRY le, XIDS xids); // Return true transaction represented by xids is still provisional in this leafentry (le's xid stack is a superset or equal to xids) bool le_has_xids(LEAFENTRY le, XIDS xids); // Return true transaction represented by xids is still provisional in this leafentry (le's xid stack is a superset or equal to xids)
void* le_latest_val (LEAFENTRY le); // Return the latest val (return NULL for provisional deletes) void* le_latest_val (LEAFENTRY le); // Return the latest val (return NULL for provisional deletes)
...@@ -227,10 +228,13 @@ uint64_t le_outermost_uncommitted_xid (LEAFENTRY le); ...@@ -227,10 +228,13 @@ uint64_t le_outermost_uncommitted_xid (LEAFENTRY le);
// r|r!=0&&r!=TOKUDB_ACCEPT: Quit early, return r, because something unexpected went wrong (error case) // r|r!=0&&r!=TOKUDB_ACCEPT: Quit early, return r, because something unexpected went wrong (error case)
typedef int(*LE_ITERATE_CALLBACK)(TXNID id, TOKUTXN context); typedef int(*LE_ITERATE_CALLBACK)(TXNID id, TOKUTXN context);
int le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_empty, TOKUTXN context);
int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context); int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context);
void le_extract_val(LEAFENTRY le,
// should we return the entire leafentry as the val?
bool is_leaf_mode, bool is_snapshot_read,
TOKUTXN ttxn, uint32_t *vallen, void **val);
size_t size_t
leafentry_disksize_13(LEAFENTRY_13 le); leafentry_disksize_13(LEAFENTRY_13 le);
......
...@@ -91,7 +91,8 @@ PATENT RIGHTS GRANT: ...@@ -91,7 +91,8 @@ PATENT RIGHTS GRANT:
/* Dump the log from stdin to stdout. */ /* Dump the log from stdin to stdout. */
#include <ft/log_header.h> #include "ft/log_header.h"
#include "ft/logger.h"
static void newmain (int count) { static void newmain (int count) {
int i; int i;
......
...@@ -90,7 +90,7 @@ PATENT RIGHTS GRANT: ...@@ -90,7 +90,7 @@ PATENT RIGHTS GRANT:
#include "test.h" #include "test.h"
#include "ft/cursor.h"
enum ftnode_verify_type { enum ftnode_verify_type {
read_all=1, read_all=1,
...@@ -224,13 +224,13 @@ test2(int fd, FT ft_h, FTNODE *dn) { ...@@ -224,13 +224,13 @@ test2(int fd, FT ft_h, FTNODE *dn) {
memset(&dummy_db, 0, sizeof(dummy_db)); memset(&dummy_db, 0, sizeof(dummy_db));
memset(&left, 0, sizeof(left)); memset(&left, 0, sizeof(left));
memset(&right, 0, sizeof(right)); memset(&right, 0, sizeof(right));
ft_search_t search_t; ft_search search;
ft_h->compare_fun = string_key_cmp; ft_h->compare_fun = string_key_cmp;
fill_bfe_for_subset_read( fill_bfe_for_subset_read(
&bfe_subset, &bfe_subset,
ft_h, ft_h,
ft_search_init(&search_t, search_cmp, FT_SEARCH_LEFT, nullptr, nullptr, nullptr), ft_search_init(&search, search_cmp, FT_SEARCH_LEFT, nullptr, nullptr, nullptr),
&left, &left,
&right, &right,
true, true,
......
...@@ -107,6 +107,7 @@ PATENT RIGHTS GRANT: ...@@ -107,6 +107,7 @@ PATENT RIGHTS GRANT:
#include "logger.h" #include "logger.h"
#include "fttypes.h" #include "fttypes.h"
#include "ft-ops.h" #include "ft-ops.h"
#include "cursor.h"
#include "cachetable.h" #include "cachetable.h"
#include "cachetable-internal.h" #include "cachetable-internal.h"
......
...@@ -786,6 +786,21 @@ void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) { ...@@ -786,6 +786,21 @@ void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
txn->client_id = client_id; txn->client_id = client_id;
} }
int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn) {
int r = 0;
TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(txn);
if (oldest_live_in_snapshot == TXNID_NONE && txnid < txn->snapshot_txnid64) {
r = TOKUDB_ACCEPT;
} else if (txnid < oldest_live_in_snapshot || txnid == txn->txnid.parent_id64) {
r = TOKUDB_ACCEPT;
} else if (txnid > txn->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*txn->live_root_txn_list, txnid)) {
r = 0;
} else {
r = TOKUDB_ACCEPT;
}
return r;
}
#include <toku_race_tools.h> #include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void); void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void toku_txn_status_helgrind_ignore(void) { void toku_txn_status_helgrind_ignore(void) {
......
...@@ -221,3 +221,17 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn); ...@@ -221,3 +221,17 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn);
uint64_t toku_txn_get_client_id(TOKUTXN txn); uint64_t toku_txn_get_client_id(TOKUTXN txn);
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id); void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id);
//
// This function is used by the leafentry iterators.
// returns TOKUDB_ACCEPT if live transaction context is allowed to read a value
// that is written by transaction with LSN of id
// live transaction context may read value if either id is the root ancestor of context, or if
// id was committed before context's snapshot was taken.
// For id to be committed before context's snapshot was taken, the following must be true:
// - id < context->snapshot_txnid64 AND id is not in context's live root transaction list
// For the above to NOT be true:
// - id > context->snapshot_txnid64 OR id is in context's live root transaction list
//
int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn);
...@@ -103,17 +103,17 @@ PATENT RIGHTS GRANT: ...@@ -103,17 +103,17 @@ PATENT RIGHTS GRANT:
// TokuWiki/Imp/TransactionsOverview. // TokuWiki/Imp/TransactionsOverview.
#include <toku_portability.h> #include <toku_portability.h>
#include "fttypes.h" #include "ft/fttypes.h"
#include "ft-internal.h" #include "ft/ft-internal.h"
#include "ft/ft_msg.h"
#include "ft/leafentry.h"
#include "ft/logger.h"
#include "ft/txn.h"
#include "ft/txn_manager.h"
#include "ft/ule.h"
#include "ft/ule-internal.h"
#include "ft/xids.h"
#include <util/omt.h> #include <util/omt.h>
#include "leafentry.h"
#include "xids.h"
#include "ft_msg.h"
#include "ule.h"
#include "txn_manager.h"
#include "ule-internal.h"
#include <util/status.h> #include <util/status.h>
#include <util/scoped_malloc.h> #include <util/scoped_malloc.h>
#include <util/partitioned_counter.h> #include <util/partitioned_counter.h>
...@@ -362,6 +362,9 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { ...@@ -362,6 +362,9 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
done:; done:;
} }
// TODO: Clean this up
extern bool garbage_collection_debug;
static void static void
ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) { ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
if (ule->num_cuxrs == 1) goto done; if (ule->num_cuxrs == 1) goto done;
...@@ -2079,7 +2082,7 @@ ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) { ...@@ -2079,7 +2082,7 @@ ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) {
// is_delp - output parameter that returns answer // is_delp - output parameter that returns answer
// context - parameter for f // context - parameter for f
// //
int static int
le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN context) { le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN context) {
#if ULE_DEBUG #if ULE_DEBUG
ULE_S ule; ULE_S ule;
...@@ -2147,6 +2150,27 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co ...@@ -2147,6 +2150,27 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co
return r; return r;
} }
//
// Returns true if the value that is to be read is empty.
//
int le_val_is_del(LEAFENTRY le, bool is_snapshot_read, TOKUTXN txn) {
int rval;
if (is_snapshot_read) {
bool is_del = false;
le_iterate_is_del(
le,
toku_txn_reads_txnid,
&is_del,
txn
);
rval = is_del;
}
else {
rval = le_latest_is_del(le);
}
return rval;
}
// //
// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. Set // Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. Set
// valpp and vallenp to value and length associated with accepted TXNID // valpp and vallenp to value and length associated with accepted TXNID
...@@ -2267,6 +2291,27 @@ verify_is_empty:; ...@@ -2267,6 +2291,27 @@ verify_is_empty:;
return r; return r;
} }
void le_extract_val(LEAFENTRY le,
// should we return the entire leafentry as the val?
bool is_leaf_mode, bool is_snapshot_read,
TOKUTXN ttxn, uint32_t *vallen, void **val) {
if (is_leaf_mode) {
*val = le;
*vallen = leafentry_memsize(le);
} else if (is_snapshot_read) {
int r = le_iterate_val(
le,
toku_txn_reads_txnid,
val,
vallen,
ttxn
);
lazy_assert_zero(r);
} else {
*val = le_latest_val_and_len(le, vallen);
}
}
// This is an on-disk format. static_asserts verify everything is packed and aligned correctly. // This is an on-disk format. static_asserts verify everything is packed and aligned correctly.
struct __attribute__ ((__packed__)) leafentry_13 { struct __attribute__ ((__packed__)) leafentry_13 {
struct leafentry_committed_13 { struct leafentry_committed_13 {
......
...@@ -94,8 +94,10 @@ PATENT RIGHTS GRANT: ...@@ -94,8 +94,10 @@ PATENT RIGHTS GRANT:
#include <db.h> #include <db.h>
#include <limits.h> #include <limits.h>
#include <ft/cachetable.h>
#include <ft/fttypes.h> #include <ft/fttypes.h>
#include <ft/ft-ops.h> #include <ft/logger.h>
#include <ft/txn.h>
#include <util/growable_array.h> #include <util/growable_array.h>
#include <util/minicron.h> #include <util/minicron.h>
......
...@@ -97,6 +97,7 @@ PATENT RIGHTS GRANT: ...@@ -97,6 +97,7 @@ PATENT RIGHTS GRANT:
#include "ydb-internal.h" #include "ydb-internal.h"
#include "ydb_cursor.h" #include "ydb_cursor.h"
#include "ydb_row_lock.h" #include "ydb_row_lock.h"
#include "ft/cursor.h"
static YDB_C_LAYER_STATUS_S ydb_c_layer_status; static YDB_C_LAYER_STATUS_S ydb_c_layer_status;
#ifdef STATUS_VALUE #ifdef STATUS_VALUE
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment