Commit 3a931cdc authored by Kentoku SHIBA's avatar Kentoku SHIBA

casual search

parent 7d74d0f6
......@@ -153,6 +153,7 @@ ha_spider::ha_spider(
#ifdef HANDLER_HAS_DIRECT_AGGREGATE
result_list.direct_aggregate = FALSE;
#endif
result_list.casual_read = NULL;
DBUG_VOID_RETURN;
}
......@@ -256,6 +257,7 @@ ha_spider::ha_spider(
#ifdef HANDLER_HAS_DIRECT_AGGREGATE
result_list.direct_aggregate = FALSE;
#endif
result_list.casual_read = NULL;
ref_length = sizeof(SPIDER_POSITION);
DBUG_VOID_RETURN;
}
......@@ -1670,6 +1672,7 @@ int ha_spider::reset()
}
*/
memset(need_mons, 0, sizeof(int) * share->link_count);
memset(result_list.casual_read, 0, sizeof(int) * share->link_count);
rm_bulk_tmp_table();
for (roop_count = share->link_count - 1; roop_count >= 0; roop_count--)
{
......@@ -2132,6 +2135,9 @@ int ha_spider::index_read_map_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -2616,6 +2622,9 @@ int ha_spider::index_read_last_map_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -3074,6 +3083,9 @@ int ha_spider::index_first_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -3450,6 +3462,9 @@ int ha_spider::index_last_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -3885,6 +3900,9 @@ int ha_spider::read_range_first_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -4467,6 +4485,9 @@ int ha_spider::read_multi_range_first_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok));
if (
......@@ -5239,6 +5260,9 @@ int ha_spider::read_multi_range_first_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -5813,6 +5837,9 @@ int ha_spider::read_multi_range_next(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok));
if (
......@@ -6595,6 +6622,9 @@ int ha_spider::read_multi_range_next(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -7213,6 +7243,9 @@ int ha_spider::rnd_next_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -7796,6 +7829,9 @@ int ha_spider::ft_read_internal(
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list.bgs_phase > 0)
{
if ((error_num = spider_check_and_init_casual_read(trx->thd, this,
roop_count)))
DBUG_RETURN(error_num);
if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
TRUE, FALSE, (roop_count != link_ok))))
{
......@@ -8689,6 +8725,14 @@ int ha_spider::pre_records()
{
DBUG_RETURN(0);
}
THD *thd = trx->thd;
if (
spider_param_sync_autocommit(thd) &&
(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
) {
result_list.casual_read[search_link_idx] =
spider_param_casual_read(thd, share->casual_read);
}
if ((error_num = spider_db_show_records(this, search_link_idx, TRUE)))
{
DBUG_RETURN(check_error_mode(error_num));
......@@ -8708,6 +8752,17 @@ ha_rows ha_spider::records()
use_pre_records = FALSE;
DBUG_RETURN(0);
}
if (!use_pre_records)
{
THD *thd = trx->thd;
if (
spider_param_sync_autocommit(thd) &&
(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
) {
result_list.casual_read[search_link_idx] =
spider_param_casual_read(thd, share->casual_read);
}
}
if ((error_num = spider_db_show_records(this, search_link_idx, FALSE)))
{
use_pre_records = FALSE;
......
......@@ -233,6 +233,7 @@ void spider_free_conn_from_trx(
) {
/* conn_recycle_mode == 1 */
*conn->conn_key = '0';
conn->casual_read_base_conn = NULL;
if (
conn->quick_target &&
spider_db_free_result((ha_spider *) conn->quick_target, FALSE)
......@@ -1100,6 +1101,91 @@ int spider_free_conn(
DBUG_RETURN(0);
}
int spider_check_and_get_casual_read_conn(
THD *thd,
ha_spider *spider,
int link_idx
) {
int error_num;
DBUG_ENTER("spider_check_and_get_casual_read_conn");
if (spider->result_list.casual_read[link_idx])
{
SPIDER_CONN *conn = spider->conns[link_idx];
if (conn->casual_read_query_id != thd->query_id)
{
conn->casual_read_query_id = thd->query_id;
conn->casual_read_current_id = 2;
}
if (spider->result_list.casual_read[link_idx] == 1)
{
spider->result_list.casual_read[link_idx] = conn->casual_read_current_id;
++conn->casual_read_current_id;
if (conn->casual_read_current_id > 63)
{
conn->casual_read_current_id = 2;
}
}
char first_byte_bak = *spider->conn_keys[link_idx];
*spider->conn_keys[link_idx] =
'0' + spider->result_list.casual_read[link_idx];
if (
!(spider->conns[link_idx] =
spider_get_conn(spider->share, link_idx,
spider->conn_keys[link_idx], spider->trx,
spider, FALSE, TRUE, SPIDER_CONN_KIND_MYSQL,
&error_num))
) {
*spider->conn_keys[link_idx] = first_byte_bak;
DBUG_RETURN(error_num);
}
*spider->conn_keys[link_idx] = first_byte_bak;
spider->conns[link_idx]->casual_read_base_conn = conn;
conn = spider->conns[link_idx];
spider_check_and_set_autocommit(thd, conn, NULL);
}
DBUG_RETURN(0);
}
int spider_check_and_init_casual_read(
THD *thd,
ha_spider *spider,
int link_idx
) {
int error_num;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
SPIDER_SHARE *share = spider->share;
DBUG_ENTER("spider_check_and_init_casual_read");
if (
spider_param_sync_autocommit(thd) &&
(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(
result_list->direct_order_limit
#ifdef HANDLER_HAS_DIRECT_AGGREGATE
|| result_list->direct_aggregate
#endif
)
) {
if (!result_list->casual_read[link_idx])
{
result_list->casual_read[link_idx] =
spider_param_casual_read(thd, share->casual_read);
}
if ((error_num = spider_check_and_get_casual_read_conn(thd, spider,
link_idx)))
{
DBUG_RETURN(error_num);
}
SPIDER_CONN *conn = spider->conns[link_idx];
if (
conn->casual_read_base_conn &&
(error_num = spider_create_conn_thread(conn))
) {
DBUG_RETURN(error_num);
}
}
DBUG_RETURN(0);
}
void spider_conn_queue_connect(
SPIDER_SHARE *share,
SPIDER_CONN *conn,
......
......@@ -70,6 +70,18 @@ int spider_free_conn(
SPIDER_CONN *conn
);
int spider_check_and_get_casual_read_conn(
THD *thd,
ha_spider *spider,
int link_idx
);
int spider_check_and_init_casual_read(
THD *thd,
ha_spider *spider,
int link_idx
);
void spider_conn_queue_connect(
SPIDER_SHARE *share,
SPIDER_CONN *conn,
......
......@@ -5177,29 +5177,42 @@ int spider_db_show_records(
bool pre_call
) {
int error_num;
SPIDER_CONN *conn = spider->conns[link_idx];
THD *thd = spider->trx->thd;
SPIDER_CONN *conn;
DBUG_ENTER("spider_db_show_records");
if (pre_call)
{
if (spider_param_bgs_mode(spider->trx->thd, spider->share->bgs_mode))
if (spider_param_bgs_mode(thd, spider->share->bgs_mode))
{
if ((error_num = spider_check_and_get_casual_read_conn(thd, spider,
link_idx)))
{
DBUG_RETURN(error_num);
}
conn = spider->conns[link_idx];
if (!(error_num = spider_create_conn_thread(conn)))
{
spider_bg_conn_simple_action(conn, SPIDER_BG_SIMPLE_RECORDS, FALSE,
spider, link_idx, (int *) &spider->result_list.bgs_error);
}
} else {
conn = spider->conns[link_idx];
error_num = spider->dbton_handler[conn->dbton_id]->show_records(
link_idx
);
}
} else {
conn = spider->conns[link_idx];
if (spider->use_pre_records)
{
if (spider_param_bgs_mode(spider->trx->thd, spider->share->bgs_mode))
if (spider_param_bgs_mode(thd, spider->share->bgs_mode))
{
spider_bg_conn_wait(conn);
error_num = spider->result_list.bgs_error;
if (conn->casual_read_base_conn)
{
spider->conns[link_idx] = conn->casual_read_base_conn;
}
} else {
error_num = 0;
}
......
......@@ -1637,6 +1637,7 @@ typedef struct st_spider_result_list
longlong first_read;
longlong second_read;
int set_split_read_count;
int *casual_read;
#ifndef WITHOUT_SPIDER_BG_SEARCH
/* 0:nomal 1:store 2:store end */
volatile
......
......@@ -13,7 +13,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define SPIDER_DETAIL_VERSION "3.1.11"
#define SPIDER_DETAIL_VERSION "3.1.12"
#define SPIDER_HEX_VERSION 0x0301
#if MYSQL_VERSION_ID < 50500
......@@ -264,6 +264,9 @@ typedef struct st_spider_conn
uint opened_handlers;
ulonglong conn_id;
ulonglong connection_id;
query_id_t casual_read_query_id;
uint casual_read_current_id;
st_spider_conn *casual_read_base_conn;
pthread_mutex_t mta_conn_mutex;
volatile bool mta_conn_mutex_lock_already;
volatile bool mta_conn_mutex_unlock_later;
......@@ -738,6 +741,7 @@ typedef struct st_spider_share
#ifdef HA_CAN_FORCE_BULK_DELETE
int force_bulk_delete;
#endif
int casual_read;
int bka_mode;
char *bka_engine;
......
......@@ -2887,6 +2887,33 @@ uint spider_param_internal_xa_id_type(
DBUG_RETURN(THDVAR(thd, internal_xa_id_type));
}
/*
-1 :use table parameter
0 :OFF
1 :automatic channel
2-63 :use custom channel
*/
static MYSQL_THDVAR_INT(
casual_read, /* name */
PLUGIN_VAR_RQCMDARG, /* opt */
"Read casually if it is possible", /* comment */
NULL, /* check */
NULL, /* update */
-1, /* def */
-1, /* min */
63, /* max */
0 /* blk */
);
int spider_param_casual_read(
THD *thd,
int casual_read
) {
DBUG_ENTER("spider_param_casual_read");
DBUG_RETURN(THDVAR(thd, casual_read) == -1 ?
casual_read : THDVAR(thd, casual_read));
}
static struct st_mysql_storage_engine spider_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
......@@ -3017,6 +3044,7 @@ static struct st_mysql_sys_var* spider_system_variables[] = {
MYSQL_SYSVAR(log_result_error_with_sql),
MYSQL_SYSVAR(version),
MYSQL_SYSVAR(internal_xa_id_type),
MYSQL_SYSVAR(casual_read),
NULL
};
......
......@@ -374,3 +374,7 @@ uint spider_param_log_result_error_with_sql();
uint spider_param_internal_xa_id_type(
THD *thd
);
int spider_param_casual_read(
THD *thd,
int casual_read
);
......@@ -1792,6 +1792,7 @@ int spider_parse_connect_info(
#ifdef HA_CAN_FORCE_BULK_DELETE
share->force_bulk_delete = -1;
#endif
share->casual_read = -1;
#ifdef WITH_PARTITION_STORAGE_ENGINE
for (roop_count = 4; roop_count > 0; roop_count--)
......@@ -1921,6 +1922,7 @@ int spider_parse_connect_info(
#endif
SPIDER_PARAM_DOUBLE("civ", crd_interval, 0);
SPIDER_PARAM_INT_WITH_MAX("cmd", crd_mode, 0, 3);
SPIDER_PARAM_INT_WITH_MAX("csr", casual_read, 0, 63);
#ifdef WITH_PARTITION_STORAGE_ENGINE
SPIDER_PARAM_INT_WITH_MAX("csy", crd_sync, 0, 2);
#endif
......@@ -2111,6 +2113,7 @@ int spider_parse_connect_info(
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
SPIDER_PARAM_LONG_LIST_WITH_MAX("use_hs_read", use_hs_reads, 0, 1);
#endif
SPIDER_PARAM_INT_WITH_MAX("casual_read", casual_read, 0, 63);
error_num = ER_SPIDER_INVALID_CONNECT_INFO_NUM;
my_printf_error(error_num, ER_SPIDER_INVALID_CONNECT_INFO_STR,
MYF(0), tmp_ptr);
......@@ -3436,6 +3439,8 @@ int spider_set_connect_info_default(
if (share->force_bulk_delete == -1)
share->force_bulk_delete = 0;
#endif
if (share->casual_read == -1)
share->casual_read = 0;
if (share->bka_mode == -1)
share->bka_mode = 1;
if (!share->bka_engine)
......@@ -4331,6 +4336,7 @@ SPIDER_SHARE *spider_get_share(
#endif
#endif
&result_list->sql_kind_backup, sizeof(uint) * share->link_count,
&result_list->casual_read, sizeof(int) * share->link_count,
&spider->dbton_handler,
sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
......@@ -4772,6 +4778,7 @@ SPIDER_SHARE *spider_get_share(
#endif
#endif
&result_list->sql_kind_backup, sizeof(uint) * share->link_count,
&result_list->casual_read, sizeof(int) * share->link_count,
&spider->dbton_handler,
sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment