Commit d7a9cdc6 authored by Oleksandr Byelkin's avatar Oleksandr Byelkin Committed by Monty

Fixed hang in Aria page cache with concurrent SELECT

MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
parent b3179b7e
...@@ -20,3 +20,30 @@ connection con1; ...@@ -20,3 +20,30 @@ connection con1;
disconnect con1; disconnect con1;
connection default; connection default;
DROP TABLE t1; DROP TABLE t1;
#
# MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
# table
#
CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
c CHAR(12),
PRIMARY KEY(pk),
KEY(c)
) ENGINE=Aria
PARTITION BY KEY(pk) PARTITIONS 2;
CREATE VIEW v1 AS SELECT * FROM t1;
INSERT INTO t1 VALUES (NULL,'ill'),(NULL,'loop');
ALTER TABLE t1 ENGINE=S3;
connect con1,localhost,root,,test;
SELECT * FROM t1 WHERE c BETWEEN 'bar' AND 'foo';
connection default;
SELECT pk FROM v1;
pk
1
2
connection con1;
pk c
disconnect con1;
connection default;
DROP VIEW v1;
DROP TABLE t1;
--source include/have_s3.inc --source include/have_s3.inc
--source include/have_partition.inc
--source create_database.inc --source create_database.inc
--echo # --echo #
...@@ -26,6 +27,38 @@ SELECT * FROM t1; ...@@ -26,6 +27,38 @@ SELECT * FROM t1;
--connection default --connection default
DROP TABLE t1; DROP TABLE t1;
--echo #
--echo # MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
--echo # table
--echo #
CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
c CHAR(12),
PRIMARY KEY(pk),
KEY(c)
) ENGINE=Aria
PARTITION BY KEY(pk) PARTITIONS 2;
CREATE VIEW v1 AS SELECT * FROM t1;
INSERT INTO t1 VALUES (NULL,'ill'),(NULL,'loop');
ALTER TABLE t1 ENGINE=S3;
--connect (con1,localhost,root,,test)
--send
SELECT * FROM t1 WHERE c BETWEEN 'bar' AND 'foo';
--connection default
SELECT pk FROM v1;
--connection con1
--reap
--disconnect con1
--connection default
DROP VIEW v1;
DROP TABLE t1;
# #
# clean up # clean up
# #
......
...@@ -130,7 +130,7 @@ my_bool my_disable_flush_pagecache_blocks= 0; ...@@ -130,7 +130,7 @@ my_bool my_disable_flush_pagecache_blocks= 0;
#define COND_FOR_REQUESTED 0 /* queue of thread waiting for read operation */ #define COND_FOR_REQUESTED 0 /* queue of thread waiting for read operation */
#define COND_FOR_SAVED 1 /* queue of thread waiting for flush */ #define COND_FOR_SAVED 1 /* queue of thread waiting for flush */
#define COND_FOR_WRLOCK 2 /* queue of write lock */ #define COND_FOR_WRLOCK 2 /* queue of write lock */
#define COND_FOR_BIG_BLOCK 3 /* queue of waiting fo big block read */ #define COND_FOR_BIG_BLOCK 3 /* queue of waiting for big block read */
#define COND_SIZE 4 /* number of COND_* queues */ #define COND_SIZE 4 /* number of COND_* queues */
typedef mysql_cond_t KEYCACHE_CONDVAR; typedef mysql_cond_t KEYCACHE_CONDVAR;
...@@ -178,7 +178,9 @@ struct st_pagecache_hash_link ...@@ -178,7 +178,9 @@ struct st_pagecache_hash_link
#define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */ #define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */
#define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */ #define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */
#define PCBLOCK_DEL_WRITE 128 /* should be written on delete */ #define PCBLOCK_DEL_WRITE 128 /* should be written on delete */
#define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress */ #define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress
or not first block which other thread wait
to be read in big read operation */
/* page status, returned by find_block */ /* page status, returned by find_block */
#define PAGE_READ 0 #define PAGE_READ 0
...@@ -2770,7 +2772,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, ...@@ -2770,7 +2772,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
*/ */
#ifdef WITH_S3_STORAGE_ENGINE #ifdef WITH_S3_STORAGE_ENGINE
static my_bool read_big_block(PAGECACHE *pagecache, static void read_big_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block) PAGECACHE_BLOCK_LINK *block)
{ {
int page_st; int page_st;
...@@ -2809,25 +2811,26 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2809,25 +2811,26 @@ static my_bool read_big_block(PAGECACHE *pagecache,
if (block_to_read->status & PCBLOCK_ERROR) if (block_to_read->status & PCBLOCK_ERROR)
{ {
/* We get first block with an error so all operation failed */ /* We get first block with an error so all operation failed */
block->status|= PCBLOCK_ERROR; goto error;
block->error= block_to_read->error;
DBUG_RETURN(FALSE); // no retry
} }
// only primary request here, PAGE_WAIT_TO_BE_READ is impossible
DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ);
if (block_to_read->status & PCBLOCK_BIG_READ) if (block_to_read->status & PCBLOCK_BIG_READ)
{ {
/*
Other thread is reading the big block so we will wait when it will
have read our block for us
*/
struct st_my_thread_var *thread; struct st_my_thread_var *thread;
DBUG_ASSERT(page_st == PAGE_WAIT_TO_BE_READ);
DBUG_ASSERT(page_st != PAGE_TO_BE_READ); DBUG_ASSERT(page_st != PAGE_TO_BE_READ);
block->status|= PCBLOCK_BIG_READ; // will be read by other thread
/* /*
Block read failed because somebody else is reading the first block Block read failed because somebody else is reading the first block
(and all other blocks part of this one). (and all other blocks part of this one).
Wait until block is available. Wait until block is available.
*/ */
unreg_request(pagecache, block, 1);
thread= my_thread_var; thread= my_thread_var;
/* Put the request into a queue and wait until it can be processed */ /* Put the request into a queue and wait until it can be processed */
wqueue_add_to_queue(&block->wqueue[COND_FOR_BIG_BLOCK], thread); wqueue_add_to_queue(&block_to_read->wqueue[COND_FOR_BIG_BLOCK], thread);
do do
{ {
DBUG_PRINT("wait", DBUG_PRINT("wait",
...@@ -2837,7 +2840,21 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2837,7 +2840,21 @@ static my_bool read_big_block(PAGECACHE *pagecache,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
while (thread->next); while (thread->next);
DBUG_RETURN(TRUE); // page shoud be read by other thread
DBUG_ASSERT(block->status & PCBLOCK_READ ||
block->status & PCBLOCK_ERROR);
DBUG_ASSERT(block->status & PCBLOCK_BIG_READ);
block->status&= ~PCBLOCK_BIG_READ;
// all is read => lets finish nice
DBUG_ASSERT(block_to_read != block);
remove_reader(block_to_read);
unreg_request(pagecache, block_to_read, 1);
DBUG_VOID_RETURN;
}
else
{
// only primary request here, PAGE_WAIT_TO_BE_READ is impossible
DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ);
} }
} }
else else
...@@ -2863,18 +2880,9 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2863,18 +2880,9 @@ static my_bool read_big_block(PAGECACHE *pagecache,
{ {
pagecache_pthread_mutex_lock(&pagecache->cache_lock); pagecache_pthread_mutex_lock(&pagecache->cache_lock);
block_to_read->status|= PCBLOCK_ERROR; block_to_read->status|= PCBLOCK_ERROR;
block->status|= PCBLOCK_ERROR; block_to_read->error= (int16) my_errno;
block_to_read->error= block->error= (int16) my_errno;
pagecache->big_block_free(&data); pagecache->big_block_free(&data);
if (block_to_read != block) goto error;
{
remove_reader(block_to_read);
unreg_request(pagecache, block_to_read, 1);
}
/* Signal that all pending requests for this page now can be processed */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
DBUG_RETURN(FALSE); // no retry
} }
/* /*
...@@ -2914,12 +2922,18 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2914,12 +2922,18 @@ static my_bool read_big_block(PAGECACHE *pagecache,
TRUE /*register*/, TRUE /*fast*/, &page_st); TRUE /*register*/, TRUE /*fast*/, &page_st);
if (!bl) if (!bl)
{ {
// we run out of easy avaliable pages in the cache /*
break; We can not get this page easy.
Maybe we will be lucky with other pages,
also among other pages can be page which wated by other thread
*/
continue;
} }
DBUG_ASSERT(bl == bl->hash_link->block); DBUG_ASSERT(bl == bl->hash_link->block);
if ((bl->status & PCBLOCK_ERROR) == 0 && if ((bl->status & PCBLOCK_ERROR) == 0 &&
page_st == PAGE_TO_BE_READ) (page_st == PAGE_TO_BE_READ || // page shoud be read
(page_st == PAGE_WAIT_TO_BE_READ &&
(bl->status & PCBLOCK_BIG_READ)))) // or page waited by other thread
{ {
memcpy(bl->buffer, data.str + offset, pagecache->block_size); memcpy(bl->buffer, data.str + offset, pagecache->block_size);
bl->status|= PCBLOCK_READ; bl->status|= PCBLOCK_READ;
...@@ -2940,6 +2954,8 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2940,6 +2954,8 @@ static my_bool read_big_block(PAGECACHE *pagecache,
pagecache->big_block_free(&data); pagecache->big_block_free(&data);
block_to_read->status&= ~PCBLOCK_BIG_READ; block_to_read->status&= ~PCBLOCK_BIG_READ;
end:
if (block_to_read != block) if (block_to_read != block)
{ {
remove_reader(block_to_read); remove_reader(block_to_read);
...@@ -2947,10 +2963,56 @@ static my_bool read_big_block(PAGECACHE *pagecache, ...@@ -2947,10 +2963,56 @@ static my_bool read_big_block(PAGECACHE *pagecache,
} }
if (block->wqueue[COND_FOR_BIG_BLOCK].last_thread) if (block->wqueue[COND_FOR_BIG_BLOCK].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_BIG_BLOCK]); wqueue_release_queue(&block->wqueue[COND_FOR_BIG_BLOCK]);
/* Signal that all pending requests for this page now can be processed */
if (block->wqueue[COND_FOR_REQUESTED].last_thread) if (block->wqueue[COND_FOR_REQUESTED].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]); wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
DBUG_VOID_RETURN;
DBUG_RETURN(FALSE); error:
/*
Read failed. Mark all readers waiting for the a block covered by the
big block that the read failed
*/
for (offset= pagecache->block_size, page= page_to_read + 1;
offset < data.length;
offset+= pagecache->block_size, page++)
{
DBUG_ASSERT(offset + pagecache->block_size <= data.length);
if (page == our_page)
{
DBUG_ASSERT(!(block->status & PCBLOCK_READ));
block->status|= PCBLOCK_ERROR;
block->error= (int16) my_errno;
}
else
{
PAGECACHE_BLOCK_LINK *bl;
bl= find_block(pagecache, &block->hash_link->file, page, 1,
FALSE, TRUE /* copy under protection (?)*/,
TRUE /*register*/, TRUE /*fast*/, &page_st);
if (!bl)
{
/*
We can not get this page easy.
Maybe we will be lucky with other pages,
also among other pages can be page which wated by other thread
*/
continue;
}
DBUG_ASSERT(bl == bl->hash_link->block);
if ((bl->status & PCBLOCK_ERROR) == 0 &&
(page_st == PAGE_TO_BE_READ || // page shoud be read
(page_st == PAGE_WAIT_TO_BE_READ &&
(bl->status & PCBLOCK_BIG_READ)))) // or page waited by other thread
{
bl->status|= PCBLOCK_ERROR;
bl->error= (int16) my_errno;
}
remove_reader(bl);
unreg_request(pagecache, bl, 1);
}
}
goto end;
} }
#endif /* WITH_S3_STORAGE_ENGINE */ #endif /* WITH_S3_STORAGE_ENGINE */
...@@ -3706,14 +3768,8 @@ uchar *pagecache_read(PAGECACHE *pagecache, ...@@ -3706,14 +3768,8 @@ uchar *pagecache_read(PAGECACHE *pagecache,
/* It is big read and this thread should read */ /* It is big read and this thread should read */
DBUG_ASSERT(page_st == PAGE_TO_BE_READ); DBUG_ASSERT(page_st == PAGE_TO_BE_READ);
if (read_big_block(pagecache, block)) read_big_block(pagecache, block);
{
/* block is unregistered in read_big_block */
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
dec_counter_for_resize_op(pagecache);
DBUG_PRINT("restart", ("big block fail, restarting..."));
goto restart;
}
if (!((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) || if (!((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(new_pin == PAGECACHE_PIN))) (new_pin == PAGECACHE_PIN)))
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment