Commit a39970cf authored by Sergei Golubchik's avatar Sergei Golubchik

MDEV-35032 streaming mode for mhnsw search

support SQL semantics for SELECT ... WHERE ... ORDER BY ... LIMIT

* switch from returning k nearest neighbors to returning
  as many as needed, in k-neighbor chunks, with increasing distance
* make search_layer() skip nodes that are closer than a threshold
* read_next keeps a search context - list of k found nodes,
  threshold, ctx, etc.
* when the list of found nodes is exhausted, it repeats the search
  starting from last found nodes and a threshold
* search context keeps ctx->refcount incremented, so ctx won't go away
* but commit_lock is unlocked between calls, so InnoDB can modify the table
* use ctx version to detect that, switch to MHNSW_Trx when it happens

bugfix:
* use the correct lock in ha_external_lock() for the graph table
* InnoDB didn't reset locks on ha_external_lock(F_UNLCK) and previous
  LOCK_X leaked into the next statement
parent 4a70df19
......@@ -228,6 +228,24 @@ id1 id2 vec_distance_euclidean(t1.v, t2.v)
9 8 1.25752
7 8 1.28823
8 7 1.28823
select id,vec_distance_euclidean(v, x'b047263C9F87233fcfd27e3eae493e3f0329f43e') d
from t1 order by d limit 9;
id d
9 0.4719976290006591
10 0.5069011044450041
3 0.5865673124650332
7 0.7344464697214867
5 0.7671033529042712
1 0.8625150112991553
2 0.8750391593753565
4 1.1588173344514852
6 1.228441526895331
select id,vec_distance_euclidean(v, x'b047263C9F87233fcfd27e3eae493e3f0329f43e') d
from t1 where id % 3 = 0 order by d limit 3;
id d
9 0.4719976290006591
3 0.5865673124650332
6 1.228441526895331
select * from (
select id,vec_distance_euclidean(v, x'b047263C9F87233fcfd27e3eae493e3f0329f43e') d
from t1 where id < 10
......@@ -235,6 +253,7 @@ from t1 where id < 10
id d
9 0.47199
3 0.58656
7 0.73444
flush session status;
select id,vec_distance_euclidean(v, x'B047263c9f87233fcfd27e3eae493e3f0329f43e') d from t1 order by d limit 3;
id d
......
......@@ -55,6 +55,12 @@ select id>0,vec_distance_euclidean(v, x'123456') d from t1 order by d limit 3;
--replace_regex /(\.\d{5})\d+/\1/
select t1.id as id1, t2.id as id2, vec_distance_euclidean(t1.v, t2.v) from t1, t1 as t2 order by 3,1,2;
# where clause
select id,vec_distance_euclidean(v, x'b047263C9F87233fcfd27e3eae493e3f0329f43e') d
from t1 order by d limit 9;
select id,vec_distance_euclidean(v, x'b047263C9F87233fcfd27e3eae493e3f0329f43e') d
from t1 where id % 3 = 0 order by d limit 3;
# subquery
--replace_regex /(\.\d{5})\d+/\1/
select * from (
......
......@@ -85,6 +85,23 @@ id d
6 0.69420
2 0.79716
9 0.82985
connect con2, localhost, root;
select get_lock('a:5', 10);
get_lock('a:5', 10)
1
connection default;
select id,vec_distance_euclidean(v, x'1f4d053f7056493f937da03dd8c97a3f220cbb3c926c1c3facca213ec0618a3e') d from t1
where get_lock(concat('a:', id), 10)+id < 11 order by d limit 3;
connection con2;
delete from t1 where id=2;
select release_all_locks();
release_all_locks()
1
connection default;
id d
6 0.93093
5 0.97063
2 1.09071
drop table t1, t2;
#
# MDEV-34989 After selecting from empty table with vector key the next insert hangs
......
......@@ -72,6 +72,28 @@ select id,vec_distance_euclidean(v, x'1f4d053f7056493f937da03dd8c97a3f220cbb3c92
--replace_regex /(\.\d{5})\d+/\1/
select id,vec_distance_euclidean(v, x'f618663f256be73e62cd453f8bcdbf3e16ae503c3858313f') d from t2 order by d limit 5;
### streaming
connect con2, localhost, root;
select get_lock('a:5', 10);
connection default;
#set innodb_snapshot_isolation=ON;
send select id,vec_distance_euclidean(v, x'1f4d053f7056493f937da03dd8c97a3f220cbb3c926c1c3facca213ec0618a3e') d from t1
where get_lock(concat('a:', id), 10)+id < 11 order by d limit 3;
connection con2;
let $wait_condition= select count(*) = 1 from information_schema.processlist where state = 'user lock';
source include/wait_condition.inc;
delete from t1 where id=2;
select release_all_locks();
connection default;
--replace_regex /(\.\d{5})\d+/\1/
reap;
drop table t1, t2;
--echo #
......
......@@ -9889,7 +9889,8 @@ int TABLE::hlindex_open(uint nr)
return 0;
hlindex->in_use= in_use; // mark in use for this query
hlindex->use_all_columns();
return hlindex->file->ha_external_lock(in_use, F_WRLCK);
return hlindex->file->ha_external_lock(in_use,
reginfo.lock_type < TL_FIRST_WRITE ? F_RDLCK : F_WRLCK);
}
int TABLE::open_hlindexes_for_write()
......@@ -9971,3 +9972,8 @@ int TABLE::hlindex_read_next()
{
return mhnsw_read_next(this);
}
// Finish a high-level index scan on this table: delegates to
// mhnsw_read_end() and returns its result.
int TABLE::hlindex_read_end()
{
return mhnsw_read_end(this);
}
......@@ -16008,6 +16008,8 @@ void JOIN_TAB::cleanup()
table->file->ha_end_keyread();
if (type == JT_FT)
table->file->ha_ft_end();
else if (table->hlindex && table->hlindex->context)
table->hlindex_read_end();
else
table->file->ha_index_or_rnd_end();
preread_init_done= FALSE;
......@@ -1796,6 +1796,7 @@ struct TABLE
int hlindex_open(uint nr);
int hlindex_read_first(uint nr, Item *item, ulonglong limit);
int hlindex_read_next();
int hlindex_read_end();
int open_hlindexes_for_write();
int hlindexes_on_insert();
......
......@@ -24,6 +24,9 @@
#include <my_atomic_wrapper.h>
#include "bloom_filters.h"
// distance can be a little bit < 0 because of fast math
static constexpr float NEAREST = -1.0f;
// Algorithm parameters
static constexpr float alpha = 1.1f;
static constexpr uint ef_construction= 10;
......@@ -329,6 +332,7 @@ class MHNSW_Share : public Sql_alloc
Hash_set<FVectorNode> node_cache{PSI_INSTRUMENT_MEM, FVectorNode::get_key};
public:
ulonglong version= 0; // protected by commit_lock
mysql_rwlock_t commit_lock;
size_t vec_len= 0;
size_t byte_len= 0;
......@@ -416,6 +420,14 @@ class MHNSW_Share : public Sql_alloc
this->~MHNSW_Share(); // XXX reuse
}
// Take one more reference to this share and return it.
// Increments refcnt; when can_commit is true it additionally acquires
// commit_lock for reading before returning.
virtual MHNSW_Share *dup(bool can_commit)
{
refcnt++;
if (can_commit)
mysql_rwlock_rdlock(&commit_lock);
return this;
}
FVectorNode *get_node(const void *gref)
{
mysql_mutex_lock(&cache_lock);
......@@ -496,6 +508,8 @@ class MHNSW_Trx : public MHNSW_Share
reset(nullptr);
}
// Override for the transaction-local share: returns this object as is,
// without changing refcnt or taking commit_lock; the argument is ignored.
virtual MHNSW_Share *dup(bool) override { return this; }
static MHNSW_Trx *get_from_thd(TABLE *table, bool for_update);
// it's okay in a transaction-local cache, there's no concurrent access
......@@ -556,6 +570,7 @@ int MHNSW_Trx::do_commit(THD *thd, bool)
if (ctx)
{
mysql_rwlock_wrlock(&ctx->commit_lock);
ctx->version++;
if (trx->list_of_nodes_is_lost)
ctx->reset(trx->table_share);
else
......@@ -972,7 +987,7 @@ static inline float generous_furthest(const Queue<Visited> &q, float maxd, float
@param[in/out] inout in: start nodes, out: result nodes
*/
static int search_layer(MHNSW_Share *ctx, TABLE *graph, const FVector *target,
uint result_size,
float threshold, uint result_size,
size_t layer, Neighborhood *inout, bool construction)
{
DBUG_ASSERT(inout->num > 0);
......@@ -1009,9 +1024,10 @@ static int search_layer(MHNSW_Share *ctx, TABLE *graph, const FVector *target,
for (size_t i=0; i < inout->num; i++)
{
Visited *v= visited.create(inout->links[i]);
DBUG_ASSERT(v->distance_to_target >= threshold);
max_distance= std::max(max_distance, v->distance_to_target);
candidates.push(v);
if (skip_deleted && v->node->deleted)
if ((skip_deleted && v->node->deleted) || threshold > NEAREST)
continue;
best.push(v);
}
......@@ -1041,6 +1057,8 @@ static int search_layer(MHNSW_Share *ctx, TABLE *graph, const FVector *target,
if (int err= links[i]->load(graph))
return err;
Visited *v= visited.create(links[i]);
if (v->distance_to_target <= threshold)
continue;
if (!best.is_full())
{
max_distance= std::max(max_distance, v->distance_to_target);
......@@ -1161,7 +1179,7 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
for (cur_layer= max_layer; cur_layer > target_layer; cur_layer--)
{
if (int err= search_layer(ctx, graph, target->vec,
if (int err= search_layer(ctx, graph, target->vec, NEAREST,
1, cur_layer, &candidates, false))
return err;
}
......@@ -1169,7 +1187,7 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
for (; cur_layer >= 0; cur_layer--)
{
uint max_neighbors= ctx->max_neighbors(cur_layer);
if (int err= search_layer(ctx, graph, target->vec,
if (int err= search_layer(ctx, graph, target->vec, NEAREST,
max_neighbors, cur_layer, &candidates, true))
return err;
......@@ -1196,6 +1214,19 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
}
// State of one streaming mhnsw search, created by mhnsw_read_first() and
// stored in the graph table's context pointer so that mhnsw_read_next()
// can continue the search and mhnsw_read_end() can release it.
struct Search_context: public Sql_alloc
{
// copy of the neighborhood passed to the constructor; mhnsw_read_next()
// returns its links one by one and refills it with search_layer()
Neighborhood found;
// share obtained with dup(false) in the constructor; released with
// release(false, ...) in mhnsw_read_end()
MHNSW_Share *ctx;
// vector that the search measures distances to
const FVector *target;
// value of ctx->version at construction time; mhnsw_read_next() compares
// it with the current ctx->version
ulonglong ctx_version;
// index in found.links of the next node to return
size_t pos= 0;
// threshold passed to the next search_layer() call
// NOTE(review): NEAREST/2 is presumably chosen to be greater than NEAREST
// (so search_layer() takes its "threshold > NEAREST" path) while still
// being below any real distance - confirm
float threshold= NEAREST/2;
// members are initialized in declaration order, so ctx is already set
// when ctx_version reads ctx->version
Search_context(Neighborhood *n, MHNSW_Share *s, const FVector *v)
: found(*n), ctx(s->dup(false)), target(v), ctx_version(ctx->version) {}
};
int mhnsw_read_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
{
THD *thd= table->in_use;
......@@ -1240,47 +1271,82 @@ int mhnsw_read_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
if (int err= graph->file->ha_rnd_init(0))
return err;
SCOPE_EXIT([graph](){ graph->file->ha_rnd_end(); });
for (size_t cur_layer= max_layer; cur_layer > 0; cur_layer--)
{
if (int err= search_layer(ctx, graph, target,
if (int err= search_layer(ctx, graph, target, NEAREST,
1, cur_layer, &candidates, false))
return err;
}
if (int err= search_layer(ctx, graph, target,
if (int err= search_layer(ctx, graph, target, NEAREST,
static_cast<uint>(limit), 0, &candidates, false))
return err;
if (limit > candidates.num)
limit= candidates.num;
size_t context_size= limit * ctx->tref_len + sizeof(ulonglong);
char *context= thd->alloc(context_size);
graph->context= context;
*(ulonglong*)context= limit;
context+= context_size;
for (size_t i=0; limit--; i++)
{
context-= ctx->tref_len;
memcpy(context, candidates.links[i]->tref(), ctx->tref_len);
}
DBUG_ASSERT(context - sizeof(ulonglong) == graph->context);
auto result= new (thd->mem_root) Search_context(&candidates, ctx, target);
graph->context= result;
return mhnsw_read_next(table);
}
/*
  Return the next row of a streaming mhnsw search.

  Uses the Search_context stored in table->hlindex->context. While unread
  entries remain in result->found, the next one is fetched into
  table->record[0] with ha_rnd_pos(). When the list is exhausted,
  search_layer() is run again on layer 0, starting from the found nodes
  with result->threshold, and the function calls itself to return the
  first row of the new chunk. Returns HA_ERR_END_OF_FILE when the previous
  search produced no nodes; otherwise returns what ha_rnd_pos(),
  search_layer() or the re-acquire path return.

  NOTE(review): this hunk is rendered without +/- markers. The statements
  that use `limit` and `ref_length`, and the unconditional
  HA_ERR_END_OF_FILE return, look like the removed pre-patch body
  interleaved with the new one - confirm against the repository.
  NOTE(review): trx is declared uninitialized and its release is scheduled
  before err is checked; verify that MHNSW_Share::acquire() always sets it.
*/
int mhnsw_read_next(TABLE *table)
{
uchar *ref= (uchar*)(table->hlindex->context);
if (ulonglong *limit= (ulonglong*)ref)
auto result= static_cast<Search_context*>(table->hlindex->context);
// unread nodes are left from the last search: return the next one
if (result->pos < result->found.num)
{
ref+= sizeof(ulonglong) + (--*limit) * table->file->ref_length;
uchar *ref= result->found.links[result->pos++]->tref();
return table->file->ha_rnd_pos(table->record[0], ref);
}
return my_errno= HA_ERR_END_OF_FILE;
// the last search found nothing, so there is nothing to continue from
if (!result->found.num)
return my_errno= HA_ERR_END_OF_FILE;
TABLE *graph= table->hlindex;
// take the share again for this call; it is released on scope exit
MHNSW_Share *ctx= result->ctx->dup(table->file->has_transactions());
SCOPE_EXIT([&ctx, table](){ ctx->release(table); });
// a version different from the one remembered in the search context
// means the share was changed since the previous call
if (ctx->version != result->ctx_version)
{
// oops, shared ctx was modified, need to switch to MHNSW_Trx
MHNSW_Share *trx;
graph->file->ha_rnd_end();
int err= MHNSW_Share::acquire(&trx, table, true);
SCOPE_EXIT([&trx, table](){ trx->release(table); });
// the rnd scan is restarted even if acquire() failed; the first error wins
if (int err2= graph->file->ha_rnd_init(0))
err= err ? err : err2;
if (err)
return err;
// look up every found node in trx by its gref, load it, and store the
// trx node in place of the old one
for (size_t i=0; i < result->found.num; i++)
{
FVectorNode *node= trx->get_node(result->found.links[i]->gref());
if (!node)
return my_errno= HA_ERR_OUT_OF_MEM;
if ((err= node->load(graph)))
return err;
result->found.links[i]= node;
}
ctx->release(false, table->s); // release shared ctx
result->ctx= trx; // replace it with trx
result->ctx_version= trx->version;
std::swap(trx, ctx); // free shared ctx in this scope, keep trx
}
// distance of the last node of the current chunk, computed before
// search_layer() overwrites result->found; it is stored as the threshold
// for the search after this one
float new_threshold= result->found.links[result->found.num-1]->distance_to(result->target);
// result->pos is passed as the requested result size
if (int err= search_layer(ctx, graph, result->target, result->threshold,
result->pos, 0, &result->found, false))
return err;
result->pos= 0;
result->threshold= new_threshold;
return mhnsw_read_next(table);
}
/*
  Finish a streaming mhnsw search.

  Releases the share reference held by the Search_context that is stored
  in the graph table's context pointer, clears that pointer and ends the
  rnd scan on the graph table.

  @param table  base table whose hlindex holds the search context

  @return always 0
*/
int mhnsw_read_end(TABLE *table)
{
  TABLE *graph= table->hlindex;
  auto *search= static_cast<Search_context*>(graph->context);

  search->ctx->release(false, table->s);
  graph->context= nullptr;
  graph->file->ha_rnd_end();
  return 0;
}
void mhnsw_free(TABLE_SHARE *share)
......
......@@ -29,6 +29,7 @@ const LEX_CSTRING mhnsw_hlindex_table_def(THD *thd, uint ref_length);
int mhnsw_insert(TABLE *table, KEY *keyinfo);
int mhnsw_read_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit);
int mhnsw_read_next(TABLE *table);
int mhnsw_read_end(TABLE *table);
int mhnsw_invalidate(TABLE *table, const uchar *rec, KEY *keyinfo);
int mhnsw_delete_all(TABLE *table, KEY *keyinfo, bool truncate);
void mhnsw_free(TABLE_SHARE *share);
......
......@@ -16150,6 +16150,8 @@ ha_innobase::external_lock(
}
/* MySQL is releasing a table lock */
m_prebuilt->select_lock_type = LOCK_NONE;
m_prebuilt->stored_select_lock_type = LOCK_NONE;
trx->n_mysql_tables_in_use--;
m_mysql_has_locked = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment