Commit 045757af authored by Thirunarayanan Balathandayuthapani's avatar Thirunarayanan Balathandayuthapani Committed by Marko Mäkelä

MDEV-24621 In bulk insert, pre-sort and build indexes one page at a time

When inserting a number of rows into an empty table,
InnoDB will buffer and pre-sort the records for each index, and
build the indexes one page at a time.

For each index, a buffer of innodb_sort_buffer_size will be created.

If the buffer ran out of memory then we will create temporary files
for storing the data.

At the end of the statement, we will sort and apply the buffered
records. Ideally, we would do this at the end of the transaction
or only when starting to execute a non-INSERT statement on the table.
However, it could be awkward if duplicate keys or similar errors
would be reported during the execution of a later statement.
This will be addressed in MDEV-25036.

Any column value longer than 2000 bytes will be buffered in temporary files.

innodb_prepare_commit_versioned(): Apply all buffered bulk insert
operations at the end of each statement.

ha_commit_trans(): Handle errors from innodb_prepare_commit_versioned().

row_merge_buf_write(): Accept a BLOB file handle as well, and
write any field data longer than 2000 bytes to that file.

row_merge_bulk_t: Data structure to maintain the data during
bulk insert operation.

trx_mod_table_time_t::start_bulk_insert(): Notify the start of
bulk insert operation and create new buffer for the given table

trx_mod_table_time_t::add_tuple(): Buffer a record.

trx_mod_table_time_t::write_bulk(): Do bulk insert operation
present in the transaction

trx_mod_table_time_t::bulk_buffer_exist(): Whether the buffer
storage exists for the bulk transaction

trx_mod_table_time_t::write_bulk(): Write all buffered insert
operation for the transaction and the table.

row_ins_clust_index_entry_low(): Insert the data into the
bulk buffer if it already exists.

row_ins_sec_index_entry(): Insert the secondary tuple
if the bulk buffer already exists.

row_merge_bulk_buf_add(): Insert the tuple into the bulk
insert buffer.

row_merge_buf_blob(): Write the field data whose length is
more than 2000 bytes into blob temporary file. Write the
file offset and length into the tuple field.

row_merge_copy_blob_from_file(): Copy the blob from blob file
handler based on reference of the given tuple.

row_merge_insert_index_tuples(): Handle blob for bulk insert
operation.

row_merge_bulk_t::row_merge_bulk_t(): Constructor. Initialize
the buffer and file for all the indexes except FTS indexes.

row_merge_bulk_t::create_tmp_file(): Create new temporary file
for the given index.

row_merge_bulk_t::write_to_tmp_file(): Write the content from
buffer to disk file for the given index.

row_merge_bulk_t::add_tuple(): Insert the tuple into the merge
buffer for the given index. If the memory ran out then InnoDB
should sort the buffer and write into file.

row_merge_bulk_t::write_to_index(): Do bulk insert operation
from merge file/merge buffer for the given index

row_merge_bulk_t::write_to_table(): Do bulk insert operation
for all the indexes.

dict_stats_update(): If a bulk insert transaction is in progress,
treat the table as empty. The index creation could hold latches
for extended amounts of time.
parent c8e309a6
......@@ -1746,6 +1746,13 @@ int ha_commit_trans(THD *thd, bool all)
if (ha_info->ht()->prepare_commit_versioned)
{
trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id);
if (trx_end_id == ULONGLONG_MAX)
{
my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
goto err;
}
if (trx_end_id)
break; // FIXME: use a common ID for cross-engine transactions
}
......
......@@ -3820,6 +3820,13 @@ dict_stats_update(
return(DB_SUCCESS);
}
if (trx_id_t bulk_trx_id = table->bulk_trx_id) {
if (trx_sys.find(nullptr, bulk_trx_id, false)) {
dict_stats_empty_table(table, false);
return DB_SUCCESS;
}
}
switch (stats_upd_option) {
case DICT_STATS_RECALC_PERSISTENT:
......
......@@ -3686,23 +3686,36 @@ static const char* ha_innobase_exts[] = {
@retval 0 if no system-versioned data was affected by the transaction */
static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
{
if (const trx_t* trx = thd_to_trx(thd)) {
*trx_id = trx->id;
for (const auto& t : trx->mod_tables) {
if (t.second.is_versioned()) {
DBUG_ASSERT(t.first->versioned_by_id());
DBUG_ASSERT(trx->rsegs.m_redo.rseg);
if (trx_t *trx= thd_to_trx(thd))
{
*trx_id= trx->id;
bool versioned= false;
return trx_sys.get_new_trx_id();
}
}
for (auto &t : trx->mod_tables)
{
if (t.second.is_versioned())
{
DBUG_ASSERT(t.first->versioned_by_id());
DBUG_ASSERT(trx->rsegs.m_redo.rseg);
versioned= true;
if (!trx->bulk_insert)
break;
}
if (t.second.is_bulk_insert())
{
ut_ad(trx->bulk_insert);
ut_ad(!trx->check_unique_secondary);
ut_ad(!trx->check_foreigns);
if (t.second.write_bulk(t.first, trx))
return ULONGLONG_MAX;
}
}
return 0;
}
return versioned ? trx_sys.get_new_trx_id() : 0;
}
*trx_id = 0;
return 0;
*trx_id= 0;
return 0;
}
/** Initialize and normalize innodb_buffer_pool_size. */
......@@ -15650,6 +15663,7 @@ ha_innobase::extra(
row_ins_duplicate_error_in_clust() will acquire a
shared lock instead of an exclusive lock. */
stmt_boundary:
trx->bulk_insert_apply();
trx->end_bulk_insert(*m_prebuilt->table);
trx->bulk_insert = false;
break;
......@@ -15670,6 +15684,9 @@ ha_innobase::extra(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
if (dberr_t err = trx->bulk_insert_apply()) {
return err;
}
break;
}
goto stmt_boundary;
......
......@@ -266,15 +266,16 @@ row_merge_build_indexes(
bool allow_non_null)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Write a buffer to a block. */
void
row_merge_buf_write(
/*================*/
const row_merge_buf_t* buf, /*!< in: sorted buffer */
const merge_file_t* of, /*!< in: output file */
row_merge_block_t* block) /*!< out: buffer for writing to file */
MY_ATTRIBUTE((nonnull));
/** Write a buffer to a block.
@param buf sorted buffer
@param block buffer for writing to file
@param blob_file blob file handle for doing bulk insert operation */
dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
#ifndef DBUG_OFF
const merge_file_t *of, /*!< output file */
#endif
row_merge_block_t *block,
merge_file_t *blob_file= nullptr);
/********************************************************************//**
Sort a buffer. */
......@@ -409,4 +410,77 @@ row_merge_read_rec(
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
MY_ATTRIBUTE((warn_unused_result));
/** Buffer for pre-sorting and buffering the rows of a bulk insert
operation, with one sort buffer per index of the table */
class row_merge_bulk_t
{
/** Sort buffer for each index of the table; the main-memory
staging area in which rows are collected and pre-sorted */
row_merge_buf_t *m_merge_buf;
/** I/O block used when writing a sort buffer to a temporary file */
row_merge_block_t *m_block= nullptr;
/** Temporary file per index, storing overflowed sort buffers and
used for the merge sort */
merge_file_t *m_merge_files= nullptr;
/** Temporary file to be used for merge sort */
pfs_os_file_t m_tmpfd;
/** Allocator for m_block */
ut_allocator<row_merge_block_t> m_alloc;
/** Allocation metadata of m_block (needed to free it) */
ut_new_pfx_t m_block_pfx;
/** Temporary file storing column values longer than 2000 bytes */
merge_file_t m_blob_file;
public:
/** Constructor.
Create all merge files and merge buffers for the table indexes,
except FTS indexes.
Create a merge block which is used for the file I/O
@param table table which undergoes the bulk insert operation */
row_merge_bulk_t(dict_table_t *table);
/** Destructor.
Remove all merge files and merge buffers of the table indexes. */
~row_merge_bulk_t();
/** Remove all buffers for the table indexes */
void remove_all_bulk_buffer();
/** Clear the merge buffer of the given index
@param index_no position of the index within the table */
void clean_bulk_buffer(ulint index_no);
/** Create the temporary file for the given index
@param index_no position of the index within the table
@retval true if temporary file creation went well */
bool create_tmp_file(ulint index_no);
/** Write the merge buffer to the tmp file for the given
index number.
@param index_no index whose buffer is to be written
@return DB_SUCCESS or error code */
dberr_t write_to_tmp_file(ulint index_no);
/** Add the tuple to the merge buffer for the given index.
If the buffer runs out of memory, sort it, write it into
the temporary file, and insert the tuple again.
@param row tuple to be inserted
@param ind index to be buffered
@param trx bulk transaction
@return DB_SUCCESS or error code */
dberr_t bulk_insert_buffered(const dtuple_t &row, const dict_index_t &ind,
trx_t *trx);
/** Do the bulk insert into the index tree from the merge
buffer or the merge file if it exists
@param index_no index to be inserted
@param trx bulk transaction
@return DB_SUCCESS or error code */
dberr_t write_to_index(ulint index_no, trx_t *trx);
/** Apply the buffered inserts to all indexes of the table.
@param table table which undergoes the bulk insert operation
@param trx bulk transaction
@return DB_SUCCESS or error code */
dberr_t write_to_table(dict_table_t *table, trx_t *trx);
/** Allocate the I/O block for writing the buffer to disk
@return DB_SUCCESS or error code */
dberr_t alloc_block();
/** Initialize the temporary files for each index */
void init_tmp_file();
};
#endif /* row0merge.h */
......@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
undo log record */
MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
/** status bit used for trx_undo_prev_version_build() */
......
......@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
#include "row0merge.h"
#include <vector>
......@@ -435,6 +436,9 @@ class trx_mod_table_time_t
/** First modification of a system versioned column
(NONE= no versioning, BULK= the table was dropped) */
undo_no_t first_versioned= NONE;
/** Buffer to store insert operations */
row_merge_bulk_t *bulk_store= nullptr;
public:
/** Constructor
@param rows number of modified rows so far */
......@@ -468,8 +472,14 @@ class trx_mod_table_time_t
first_versioned= BULK;
}
/** Notify the start of a bulk insert operation */
void start_bulk_insert() { first|= BULK; }
/** Notify the start of a bulk insert operation
@param table table to do bulk operation */
void start_bulk_insert(dict_table_t *table)
{
first|= BULK;
if (!table->is_temporary())
bulk_store= new row_merge_bulk_t(table);
}
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
......@@ -489,6 +499,36 @@ class trx_mod_table_time_t
first_versioned= NONE;
return false;
}
/** Add the tuple to the transaction bulk buffer for the given index.
@param entry tuple to be inserted
@param index index the tuple belongs to
@param trx transaction
@return DB_SUCCESS or error code */
dberr_t bulk_insert_buffered(const dtuple_t &entry,
                             const dict_index_t &index, trx_t *trx)
{
  /* Guard against dereferencing a missing buffer: callers must
  ensure that start_bulk_insert() allocated bulk_store
  (temporary tables never allocate one). */
  ut_ad(bulk_store);
  return bulk_store->bulk_insert_buffered(entry, index, trx);
}
/** Apply the buffered bulk insert operations of this table
and release the buffer.
@param table table being bulk-inserted into
@param trx bulk transaction
@return DB_SUCCESS or error code */
dberr_t write_bulk(dict_table_t *table, trx_t *trx)
{
  dberr_t err= DB_SUCCESS;
  if (bulk_store)
  {
    err= bulk_store->write_to_table(table, trx);
    delete bulk_store;
    bulk_store= nullptr;
  }
  return err;
}
/** @return whether the bulk insert buffer storage exists */
bool bulk_buffer_exist()
{
  /* The buffer is only meaningful while a bulk insert is in progress */
  return is_bulk_insert() && bulk_store != nullptr;
}
};
/** Collection of persistent tables and their first modification
......@@ -1065,6 +1105,36 @@ struct trx_t : ilist_node<>
return false;
}
/** @return logical modification time of a table, but only
if a bulk insert buffer exists for it in this transaction */
trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
{
  if (UNIV_LIKELY(!bulk_insert))
    return nullptr;
  ut_ad(!check_unique_secondary);
  ut_ad(!check_foreigns);
  const auto it= mod_tables.find(table);
  return (it != mod_tables.end() && it->second.bulk_buffer_exist())
    ? &it->second
    : nullptr;
}
/** Apply all buffered bulk insert operations of this transaction.
@return DB_SUCCESS or error code */
dberr_t bulk_insert_apply()
{
  if (UNIV_LIKELY(!bulk_insert))
    return DB_SUCCESS;
  ut_ad(!check_unique_secondary);
  ut_ad(!check_foreigns);
  for (auto &t : mod_tables)
  {
    if (!t.second.is_bulk_insert())
      continue;
    if (dberr_t err= t.second.write_bulk(t.first, this))
      return err;
  }
  return DB_SUCCESS;
}
private:
/** Assign a rollback segment for modifying temporary tables.
@return the assigned rollback segment */
......
......@@ -876,7 +876,9 @@ void fts_parallel_tokenization(
if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
row_merge_buf_write(buf[t_ctx.buf_used],
#ifndef DBUG_OFF
merge_file[t_ctx.buf_used],
#endif
block[t_ctx.buf_used]);
if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
......@@ -942,8 +944,11 @@ void fts_parallel_tokenization(
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
if (t_ctx.rows_added[i]) {
row_merge_buf_sort(buf[i], NULL);
row_merge_buf_write(
buf[i], merge_file[i], block[i]);
row_merge_buf_write(buf[i],
#ifndef DBUG_OFF
merge_file[i],
#endif
block[i]);
/* Write to temp file, only if records have
been flushed to temp file before (offset > 0):
......
......@@ -2641,15 +2641,20 @@ row_ins_clust_index_entry_low(
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
trx->bulk_insert = true;
if (!index->table->is_temporary()) {
err = lock_table(index->table, LOCK_X, thr);
if (err != DB_SUCCESS) {
trx->error_state = err;
trx->bulk_insert = false;
goto commit_exit;
}
if (index->table->n_rec_locks) {
avoid_bulk:
trx->bulk_insert = false;
goto skip_bulk_insert;
}
......@@ -2664,9 +2669,20 @@ row_ins_clust_index_entry_low(
#else /* BTR_CUR_HASH_ADAPT */
index->table->bulk_trx_id = trx->id;
#endif /* BTR_CUR_HASH_ADAPT */
}
trx->bulk_insert = true;
/* Write TRX_UNDO_EMPTY undo log and
start buffering the insert operation */
err = trx_undo_report_row_operation(
thr, index, entry,
nullptr, 0, nullptr, nullptr,
nullptr);
if (err != DB_SUCCESS) {
goto avoid_bulk;
}
goto commit_exit;
}
}
skip_bulk_insert:
......@@ -3269,7 +3285,7 @@ row_ins_sec_index_entry(
bool check_foreign) /*!< in: true if check
foreign table is needed, false otherwise */
{
dberr_t err;
dberr_t err = DB_SUCCESS;
mem_heap_t* offsets_heap;
mem_heap_t* heap;
trx_id_t trx_id = 0;
......@@ -3346,13 +3362,21 @@ row_ins_index_entry(
dtuple_t* entry, /*!< in/out: index entry to insert */
que_thr_t* thr) /*!< in: query thread */
{
ut_ad(thr_get_trx(thr)->id || index->table->no_rollback()
trx_t* trx = thr_get_trx(thr);
ut_ad(trx->id || index->table->no_rollback()
|| index->table->is_temporary());
DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
DBUG_SET("-d,row_ins_index_entry_timeout");
return(DB_LOCK_WAIT);});
if (auto t= trx->check_bulk_buffer(index->table)) {
/* MDEV-25036 FIXME: check also foreign key constraints */
ut_ad(!trx->check_foreigns);
return t->bulk_insert_buffered(*entry, *index, trx);
}
if (index->is_primary()) {
return row_ins_clust_index_entry(index, entry, thr, 0);
} else {
......
This diff is collapsed.
......@@ -1952,8 +1952,7 @@ TRANSACTIONAL_TARGET ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
/** @return whether the transaction holds an exclusive lock on a table */
static bool trx_has_lock_x(const trx_t &trx, dict_table_t& table)
{
if (table.is_temporary())
return true;
ut_ad(!table.is_temporary());
uint32_t n;
......@@ -2050,9 +2049,16 @@ trx_undo_report_row_operation(
ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT);
ut_ad(trx->bulk_insert);
return DB_SUCCESS;
} else if (m.second && trx->bulk_insert
&& trx_has_lock_x(*trx, *index->table)) {
m.first->second.start_bulk_insert();
} else if (!m.second || !trx->bulk_insert) {
bulk = false;
} else if (index->table->is_temporary()) {
} else if (trx_has_lock_x(*trx, *index->table)) {
m.first->second.start_bulk_insert(index->table);
if (dberr_t err = m.first->second.bulk_insert_buffered(
*clust_entry, *index, trx)) {
return err;
}
} else {
bulk = false;
}
......
......@@ -1634,6 +1634,9 @@ trx_mark_sql_stat_end(
}
if (trx->is_bulk_insert()) {
/* MDEV-25036 FIXME: we support buffered
insert only for the first insert statement */
trx->error_state = trx->bulk_insert_apply();
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment