Commit 61f561a8 authored by Sergei Golubchik's avatar Sergei Golubchik

mhnsw: return an error if lazy neighbor read failed

parent fe0f7d20
...@@ -74,6 +74,7 @@ class MHNSW_Context ...@@ -74,6 +74,7 @@ class MHNSW_Context
size_t vec_len= 0; size_t vec_len= 0;
size_t byte_len= 0; size_t byte_len= 0;
FVector *target= 0; FVector *target= 0;
uint err= 0;
Hash_set<FVectorNode> node_cache{PSI_INSTRUMENT_MEM, FVectorNode::get_key}; Hash_set<FVectorNode> node_cache{PSI_INSTRUMENT_MEM, FVectorNode::get_key};
...@@ -142,8 +143,8 @@ float FVectorNode::distance_to(const FVector &other) const ...@@ -142,8 +143,8 @@ float FVectorNode::distance_to(const FVector &other) const
int FVectorNode::instantiate_vector() int FVectorNode::instantiate_vector()
{ {
DBUG_ASSERT(vec == nullptr); DBUG_ASSERT(vec == nullptr);
if (int err= ctx->table->file->ha_rnd_pos(ctx->table->record[0], ref)) if ((ctx->err= ctx->table->file->ha_rnd_pos(ctx->table->record[0], ref)))
return err; return ctx->err;
String buf, *v= ctx->vec_field->val_str(&buf); String buf, *v= ctx->vec_field->val_str(&buf);
if (unlikely(ctx->byte_len == 0)) if (unlikely(ctx->byte_len == 0))
{ {
...@@ -173,13 +174,13 @@ int FVectorNode::instantiate_neighbors(size_t layer) ...@@ -173,13 +174,13 @@ int FVectorNode::instantiate_neighbors(size_t layer)
graph->field[0]->store(layer, false); graph->field[0]->store(layer, false);
graph->field[1]->store_binary(ref, ref_len); graph->field[1]->store_binary(ref, ref_len);
key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length); key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length);
if (int err= graph->file->ha_index_read_map(graph->record[0], key, if ((ctx->err= graph->file->ha_index_read_map(graph->record[0], key,
HA_WHOLE_KEY, HA_READ_KEY_EXACT)) HA_WHOLE_KEY, HA_READ_KEY_EXACT)))
return err; return ctx->err;
String strbuf, *str= graph->field[2]->val_str(&strbuf); String strbuf, *str= graph->field[2]->val_str(&strbuf);
if (str->length() % ref_len) if (str->length() % ref_len)
return HA_ERR_CRASHED; // should not happen, corrupted HNSW index return ctx->err= HA_ERR_CRASHED; // corrupted HNSW index
for (const char *pos= str->ptr(); pos < str->end(); pos+= ref_len) for (const char *pos= str->ptr(); pos < str->end(); pos+= ref_len)
neighbors[layer].push_back(ctx->get_node(pos), &ctx->root); neighbors[layer].push_back(ctx->get_node(pos), &ctx->root);
...@@ -249,11 +250,15 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer, ...@@ -249,11 +250,15 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer,
*/ */
List<FVectorNode> candidates= candidates_unsafe; List<FVectorNode> candidates= candidates_unsafe;
List<FVectorNode> &neighbors= target.get_neighbors(layer); List<FVectorNode> &neighbors= target.get_neighbors(layer);
if (ctx->err)
return ctx->err;
neighbors.empty(); neighbors.empty();
if (pq.init(10000, 0, cmp_vec, &target) || if (pq.init(10000, 0, cmp_vec, &target) ||
pq_discard.init(10000, 0, cmp_vec, &target)) pq_discard.init(10000, 0, cmp_vec, &target))
return HA_ERR_OUT_OF_MEM; return ctx->err= HA_ERR_OUT_OF_MEM;
for (const FVectorNode &candidate : candidates) for (const FVectorNode &candidate : candidates)
{ {
...@@ -293,10 +298,12 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer, ...@@ -293,10 +298,12 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer,
static int write_neighbors(MHNSW_Context *ctx, size_t layer, static int write_neighbors(MHNSW_Context *ctx, size_t layer,
const FVectorNode &source_node) const FVectorNode &source_node)
{ {
int err;
TABLE *graph= ctx->table->hlindex; TABLE *graph= ctx->table->hlindex;
const List<FVectorNode> &new_neighbors= source_node.get_neighbors(layer); const List<FVectorNode> &new_neighbors= source_node.get_neighbors(layer);
if (ctx->err)
return ctx->err;
size_t total_size= new_neighbors.elements * source_node.get_ref_len(); size_t total_size= new_neighbors.elements * source_node.get_ref_len();
// Allocate memory for the struct and the flexible array member // Allocate memory for the struct and the flexible array member
...@@ -315,23 +322,23 @@ static int write_neighbors(MHNSW_Context *ctx, size_t layer, ...@@ -315,23 +322,23 @@ static int write_neighbors(MHNSW_Context *ctx, size_t layer,
graph->field[2]->store_binary(neighbor_array_bytes, total_size); graph->field[2]->store_binary(neighbor_array_bytes, total_size);
if (source_node.is_new()) if (source_node.is_new())
err= graph->file->ha_write_row(graph->record[0]); ctx->err= graph->file->ha_write_row(graph->record[0]);
else else
{ {
uchar *key= static_cast<uchar*>(alloca(graph->key_info->key_length)); uchar *key= static_cast<uchar*>(alloca(graph->key_info->key_length));
key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length); key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length);
err= graph->file->ha_index_read_map(graph->record[1], key, ctx->err= graph->file->ha_index_read_map(graph->record[1], key,
HA_WHOLE_KEY, HA_READ_KEY_EXACT); HA_WHOLE_KEY, HA_READ_KEY_EXACT);
if (!err) if (!ctx->err)
{ {
err= graph->file->ha_update_row(graph->record[1], graph->record[0]); ctx->err= graph->file->ha_update_row(graph->record[1], graph->record[0]);
if (err == HA_ERR_RECORD_IS_THE_SAME) if (ctx->err == HA_ERR_RECORD_IS_THE_SAME)
err= 0; ctx->err= 0;
} }
} }
my_safe_afree(neighbor_array_bytes, total_size); my_safe_afree(neighbor_array_bytes, total_size);
return err; return ctx->err;
} }
...@@ -341,18 +348,19 @@ static int update_second_degree_neighbors(MHNSW_Context *ctx, size_t layer, ...@@ -341,18 +348,19 @@ static int update_second_degree_neighbors(MHNSW_Context *ctx, size_t layer,
{ {
for (const FVectorNode &neigh: node.get_neighbors(layer)) for (const FVectorNode &neigh: node.get_neighbors(layer))
{ {
neigh.get_neighbors(layer).push_back(&node, &ctx->root); List<FVectorNode> &neighneighbors= neigh.get_neighbors(layer);
if (neigh.get_neighbors(layer).elements > max_neighbors) if (ctx->err)
return ctx->err;
neighneighbors.push_back(&node, &ctx->root);
if (neighneighbors.elements > max_neighbors)
{ {
if (int err= select_neighbors(ctx, layer, neigh, if (select_neighbors(ctx, layer, neigh, neighneighbors, max_neighbors))
neigh.get_neighbors(layer), max_neighbors)) return ctx->err;
return err;
} }
if (int err= write_neighbors(ctx, layer, neigh)) if (write_neighbors(ctx, layer, neigh))
return err; return ctx->err;
} }
return ctx->err;
return 0;
} }
...@@ -360,8 +368,8 @@ static int update_neighbors(MHNSW_Context *ctx, size_t layer, ...@@ -360,8 +368,8 @@ static int update_neighbors(MHNSW_Context *ctx, size_t layer,
uint max_neighbors, const FVectorNode &node) uint max_neighbors, const FVectorNode &node)
{ {
// 1. update node's neighbors // 1. update node's neighbors
if (int err= write_neighbors(ctx, layer, node)) if (write_neighbors(ctx, layer, node))
return err; return ctx->err;
// 2. update node's neighbors' neighbors (shrink before update) // 2. update node's neighbors' neighbors (shrink before update)
return update_second_degree_neighbors(ctx, layer, max_neighbors, node); return update_second_degree_neighbors(ctx, layer, max_neighbors, node);
} }
...@@ -428,7 +436,7 @@ static int search_layer(MHNSW_Context *ctx, ...@@ -428,7 +436,7 @@ static int search_layer(MHNSW_Context *ctx,
while (best.elements()) while (best.elements())
result->push_front(best.pop(), &ctx->root); result->push_front(best.pop(), &ctx->root);
return 0; return ctx->err;
} }
...@@ -482,10 +490,11 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo) ...@@ -482,10 +490,11 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
SCOPE_EXIT([graph](){ graph->file->ha_index_end(); }); SCOPE_EXIT([graph](){ graph->file->ha_index_end(); });
if (int err= graph->file->ha_index_last(graph->record[0])) if ((ctx.err= graph->file->ha_index_last(graph->record[0])))
{ {
if (err != HA_ERR_END_OF_FILE) if (ctx.err != HA_ERR_END_OF_FILE)
return err; return ctx.err;
ctx.err= 0;
// First insert! // First insert!
FVectorNode target(&ctx, table->file->ref); FVectorNode target(&ctx, table->file->ref);
...@@ -519,16 +528,16 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo) ...@@ -519,16 +528,16 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
if (new_node_layer > max_layer) if (new_node_layer > max_layer)
{ {
if (int err= write_neighbors(&ctx, max_layer + 1, target)) if (write_neighbors(&ctx, max_layer + 1, target))
return err; return ctx.err;
new_node_layer= max_layer; new_node_layer= max_layer;
} }
else else
{ {
for (longlong cur_layer= max_layer; cur_layer > new_node_layer; cur_layer--) for (longlong cur_layer= max_layer; cur_layer > new_node_layer; cur_layer--)
{ {
if (int err= search_layer(&ctx, start_nodes, 1, cur_layer, &candidates)) if (search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return err; return ctx.err;
start_nodes= candidates; start_nodes= candidates;
candidates.empty(); candidates.empty();
} }
...@@ -539,15 +548,14 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo) ...@@ -539,15 +548,14 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
uint max_neighbors= (cur_layer == 0) // heuristics from the paper uint max_neighbors= (cur_layer == 0) // heuristics from the paper
? thd->variables.mhnsw_max_edges_per_node * 2 ? thd->variables.mhnsw_max_edges_per_node * 2
: thd->variables.mhnsw_max_edges_per_node; : thd->variables.mhnsw_max_edges_per_node;
if (int err= search_layer(&ctx, start_nodes, ef_construction, cur_layer, if (search_layer(&ctx, start_nodes, ef_construction, cur_layer,
&candidates)) &candidates))
return err; return ctx.err;
if (int err= select_neighbors(&ctx, cur_layer, target, candidates, if (select_neighbors(&ctx, cur_layer, target, candidates, max_neighbors))
max_neighbors)) return ctx.err;
return err; if (update_neighbors(&ctx, cur_layer, max_neighbors, target))
if (int err= update_neighbors(&ctx, cur_layer, max_neighbors, target)) return ctx.err;
return err;
start_nodes= candidates; start_nodes= candidates;
candidates.empty(); candidates.empty();
} }
...@@ -576,8 +584,8 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit) ...@@ -576,8 +584,8 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
SCOPE_EXIT([graph](){ graph->file->ha_index_end(); }); SCOPE_EXIT([graph](){ graph->file->ha_index_end(); });
if (int err= graph->file->ha_index_last(graph->record[0])) if ((ctx.err= graph->file->ha_index_last(graph->record[0])))
return err; return ctx.err;
longlong max_layer= graph->field[0]->val_int(); longlong max_layer= graph->field[0]->val_int();
...@@ -610,14 +618,14 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit) ...@@ -610,14 +618,14 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
for (size_t cur_layer= max_layer; cur_layer > 0; cur_layer--) for (size_t cur_layer= max_layer; cur_layer > 0; cur_layer--)
{ {
if (int err= search_layer(&ctx, start_nodes, 1, cur_layer, &candidates)) if (search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return err; return ctx.err;
start_nodes= candidates; start_nodes= candidates;
candidates.empty(); candidates.empty();
} }
if (int err= search_layer(&ctx, start_nodes, ef_search, 0, &candidates)) if (search_layer(&ctx, start_nodes, ef_search, 0, &candidates))
return err; return ctx.err;
size_t context_size=limit * h->ref_length + sizeof(ulonglong); size_t context_size=limit * h->ref_length + sizeof(ulonglong);
char *context= thd->alloc(context_size); char *context= thd->alloc(context_size);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment