Commit 79b54d9b authored by Christoph Hellwig, committed by Darrick J. Wong

xfs: use bios directly to write log buffers

Currently the XFS logging code uses the xfs_buf structure and
associated APIs to write the log buffers to disk.  This requires
various special cases in the log code and is generally not very
optimal.

Instead of using a buffer, just allocate a kmem_alloc_large region for
each log buffer, and use a bio and bio_vec array embedded in the iclog
structure to write the buffer to disk.  This also allows for using
bio splitting and chaining to deal with the case of a log
buffer wrapping around the end of the log.
Signed-off-by: Christoph Hellwig <hch@lst.de>
Reviewed-by: Darrick J. Wong <darrick.wong@oracle.com>
[darrick: don't split if/else with an #endif]
Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
parent 2d15d2c0
...@@ -1239,32 +1239,29 @@ xlog_space_left( ...@@ -1239,32 +1239,29 @@ xlog_space_left(
} }
/*
* Log function which is called when an io completes.
*
* The log manager needs its own routine, in order to control what
* happens with the buffer after the write completes.
*/
static void static void
xlog_iodone(xfs_buf_t *bp) xlog_ioend_work(
struct work_struct *work)
{ {
struct xlog_in_core *iclog = bp->b_log_item; struct xlog_in_core *iclog =
struct xlog *l = iclog->ic_log; container_of(work, struct xlog_in_core, ic_end_io_work);
struct xlog *log = iclog->ic_log;
int aborted = 0; int aborted = 0;
int error;
error = blk_status_to_errno(iclog->ic_bio.bi_status);
#ifdef DEBUG #ifdef DEBUG
/* treat writes with injected CRC errors as failed */ /* treat writes with injected CRC errors as failed */
if (iclog->ic_fail_crc) if (iclog->ic_fail_crc)
bp->b_error = -EIO; error = -EIO;
#endif #endif
/* /*
* Race to shutdown the filesystem if we see an error. * Race to shutdown the filesystem if we see an error.
*/ */
if (XFS_TEST_ERROR(bp->b_error, l->l_mp, XFS_ERRTAG_IODONE_IOERR)) { if (XFS_TEST_ERROR(error, log->l_mp, XFS_ERRTAG_IODONE_IOERR)) {
xfs_buf_ioerror_alert(bp, __func__); xfs_alert(log->l_mp, "log I/O error %d", error);
xfs_buf_stale(bp); xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
xfs_force_shutdown(l->l_mp, SHUTDOWN_LOG_IO_ERROR);
/* /*
* This flag will be propagated to the trans-committed * This flag will be propagated to the trans-committed
* callback routines to let them know that the log-commit * callback routines to let them know that the log-commit
...@@ -1275,17 +1272,16 @@ xlog_iodone(xfs_buf_t *bp) ...@@ -1275,17 +1272,16 @@ xlog_iodone(xfs_buf_t *bp)
aborted = XFS_LI_ABORTED; aborted = XFS_LI_ABORTED;
} }
/* log I/O is always issued ASYNC */
ASSERT(bp->b_flags & XBF_ASYNC);
xlog_state_done_syncing(iclog, aborted); xlog_state_done_syncing(iclog, aborted);
bio_uninit(&iclog->ic_bio);
/* /*
* drop the buffer lock now that we are done. Nothing references * Drop the lock to signal that we are done. Nothing references the
* the buffer after this, so an unmount waiting on this lock can now * iclog after this, so an unmount waiting on this lock can now tear it
* tear it down safely. As such, it is unsafe to reference the buffer * down safely. As such, it is unsafe to reference the iclog after the
* (bp) after the unlock as we could race with it being freed. * unlock as we could race with it being freed.
*/ */
xfs_buf_unlock(bp); up(&iclog->ic_sema);
} }
/* /*
...@@ -1378,7 +1374,6 @@ xlog_alloc_log( ...@@ -1378,7 +1374,6 @@ xlog_alloc_log(
xlog_rec_header_t *head; xlog_rec_header_t *head;
xlog_in_core_t **iclogp; xlog_in_core_t **iclogp;
xlog_in_core_t *iclog, *prev_iclog=NULL; xlog_in_core_t *iclog, *prev_iclog=NULL;
xfs_buf_t *bp;
int i; int i;
int error = -ENOMEM; int error = -ENOMEM;
uint log2_size = 0; uint log2_size = 0;
...@@ -1436,30 +1431,6 @@ xlog_alloc_log( ...@@ -1436,30 +1431,6 @@ xlog_alloc_log(
xlog_get_iclog_buffer_size(mp, log); xlog_get_iclog_buffer_size(mp, log);
/*
* Use a NULL block for the extra log buffer used during splits so that
* it will trigger errors if we ever try to do IO on it without first
* having set it up properly.
*/
error = -ENOMEM;
bp = xfs_buf_alloc(log->l_targ, XFS_BUF_DADDR_NULL,
BTOBB(log->l_iclog_size), XBF_NO_IOACCT);
if (!bp)
goto out_free_log;
/*
* The iclogbuf buffer locks are held over IO but we are not going to do
* IO yet. Hence unlock the buffer so that the log IO path can grab it
* when appropriately.
*/
ASSERT(xfs_buf_islocked(bp));
xfs_buf_unlock(bp);
/* use high priority wq for log I/O completion */
bp->b_ioend_wq = mp->m_log_workqueue;
bp->b_iodone = xlog_iodone;
log->l_xbuf = bp;
spin_lock_init(&log->l_icloglock); spin_lock_init(&log->l_icloglock);
init_waitqueue_head(&log->l_flush_wait); init_waitqueue_head(&log->l_flush_wait);
...@@ -1472,29 +1443,21 @@ xlog_alloc_log( ...@@ -1472,29 +1443,21 @@ xlog_alloc_log(
* xlog_in_core_t in xfs_log_priv.h for details. * xlog_in_core_t in xfs_log_priv.h for details.
*/ */
ASSERT(log->l_iclog_size >= 4096); ASSERT(log->l_iclog_size >= 4096);
for (i=0; i < log->l_iclog_bufs; i++) { for (i = 0; i < log->l_iclog_bufs; i++) {
*iclogp = kmem_zalloc(sizeof(xlog_in_core_t), KM_MAYFAIL); size_t bvec_size = howmany(log->l_iclog_size, PAGE_SIZE);
if (!*iclogp)
iclog = kmem_zalloc(sizeof(*iclog) + bvec_size, KM_MAYFAIL);
if (!iclog)
goto out_free_iclog; goto out_free_iclog;
iclog = *iclogp; *iclogp = iclog;
iclog->ic_prev = prev_iclog; iclog->ic_prev = prev_iclog;
prev_iclog = iclog; prev_iclog = iclog;
bp = xfs_buf_get_uncached(mp->m_logdev_targp, iclog->ic_data = kmem_alloc_large(log->l_iclog_size,
BTOBB(log->l_iclog_size), KM_MAYFAIL);
XBF_NO_IOACCT); if (!iclog->ic_data)
if (!bp)
goto out_free_iclog; goto out_free_iclog;
ASSERT(xfs_buf_islocked(bp));
xfs_buf_unlock(bp);
/* use high priority wq for log I/O completion */
bp->b_ioend_wq = mp->m_log_workqueue;
bp->b_iodone = xlog_iodone;
iclog->ic_bp = bp;
iclog->ic_data = bp->b_addr;
#ifdef DEBUG #ifdef DEBUG
log->l_iclog_bak[i] = &iclog->ic_header; log->l_iclog_bak[i] = &iclog->ic_header;
#endif #endif
...@@ -1508,7 +1471,7 @@ xlog_alloc_log( ...@@ -1508,7 +1471,7 @@ xlog_alloc_log(
head->h_fmt = cpu_to_be32(XLOG_FMT); head->h_fmt = cpu_to_be32(XLOG_FMT);
memcpy(&head->h_fs_uuid, &mp->m_sb.sb_uuid, sizeof(uuid_t)); memcpy(&head->h_fs_uuid, &mp->m_sb.sb_uuid, sizeof(uuid_t));
iclog->ic_size = BBTOB(bp->b_length) - log->l_iclog_hsize; iclog->ic_size = log->l_iclog_size - log->l_iclog_hsize;
iclog->ic_state = XLOG_STATE_ACTIVE; iclog->ic_state = XLOG_STATE_ACTIVE;
iclog->ic_log = log; iclog->ic_log = log;
atomic_set(&iclog->ic_refcnt, 0); atomic_set(&iclog->ic_refcnt, 0);
...@@ -1518,6 +1481,8 @@ xlog_alloc_log( ...@@ -1518,6 +1481,8 @@ xlog_alloc_log(
init_waitqueue_head(&iclog->ic_force_wait); init_waitqueue_head(&iclog->ic_force_wait);
init_waitqueue_head(&iclog->ic_write_wait); init_waitqueue_head(&iclog->ic_write_wait);
INIT_WORK(&iclog->ic_end_io_work, xlog_ioend_work);
sema_init(&iclog->ic_sema, 1);
iclogp = &iclog->ic_next; iclogp = &iclog->ic_next;
} }
...@@ -1532,11 +1497,9 @@ xlog_alloc_log( ...@@ -1532,11 +1497,9 @@ xlog_alloc_log(
out_free_iclog: out_free_iclog:
for (iclog = log->l_iclog; iclog; iclog = prev_iclog) { for (iclog = log->l_iclog; iclog; iclog = prev_iclog) {
prev_iclog = iclog->ic_next; prev_iclog = iclog->ic_next;
if (iclog->ic_bp) kmem_free(iclog->ic_data);
xfs_buf_free(iclog->ic_bp);
kmem_free(iclog); kmem_free(iclog);
} }
xfs_buf_free(log->l_xbuf);
out_free_log: out_free_log:
kmem_free(log); kmem_free(log);
out: out:
...@@ -1721,23 +1684,43 @@ xlog_cksum( ...@@ -1721,23 +1684,43 @@ xlog_cksum(
return xfs_end_cksum(crc); return xfs_end_cksum(crc);
} }
static void
xlog_bio_end_io(
struct bio *bio)
{
struct xlog_in_core *iclog = bio->bi_private;
queue_work(iclog->ic_log->l_mp->m_log_workqueue,
&iclog->ic_end_io_work);
}
static void
xlog_map_iclog_data(
struct bio *bio,
void *data,
size_t count)
{
do {
struct page *page = kmem_to_page(data);
unsigned int off = offset_in_page(data);
size_t len = min_t(size_t, count, PAGE_SIZE - off);
WARN_ON_ONCE(bio_add_page(bio, page, len, off) != len);
data += len;
count -= len;
} while (count);
}
STATIC void STATIC void
xlog_write_iclog( xlog_write_iclog(
struct xlog *log, struct xlog *log,
struct xlog_in_core *iclog, struct xlog_in_core *iclog,
struct xfs_buf *bp,
uint64_t bno, uint64_t bno,
unsigned int count,
bool need_flush) bool need_flush)
{ {
ASSERT(bno < log->l_logBBsize); ASSERT(bno < log->l_logBBsize);
ASSERT(bno + bp->b_io_length <= log->l_logBBsize);
bp->b_maps[0].bm_bn = log->l_logBBstart + bno;
bp->b_log_item = iclog;
bp->b_flags &= ~XBF_FLUSH;
bp->b_flags |= (XBF_ASYNC | XBF_SYNCIO | XBF_WRITE | XBF_FUA);
if (need_flush)
bp->b_flags |= XBF_FLUSH;
/* /*
* We lock the iclogbufs here so that we can serialise against I/O * We lock the iclogbufs here so that we can serialise against I/O
...@@ -1747,21 +1730,52 @@ xlog_write_iclog( ...@@ -1747,21 +1730,52 @@ xlog_write_iclog(
* tearing down the iclogbufs. Hence we need to hold the buffer lock * tearing down the iclogbufs. Hence we need to hold the buffer lock
* across the log IO to archieve that. * across the log IO to archieve that.
*/ */
xfs_buf_lock(bp); down(&iclog->ic_sema);
if (unlikely(iclog->ic_state & XLOG_STATE_IOERROR)) { if (unlikely(iclog->ic_state & XLOG_STATE_IOERROR)) {
xfs_buf_ioerror(bp, -EIO);
xfs_buf_stale(bp);
xfs_buf_ioend(bp);
/* /*
* It would seem logical to return EIO here, but we rely on * It would seem logical to return EIO here, but we rely on
* the log state machine to propagate I/O errors instead of * the log state machine to propagate I/O errors instead of
* doing it here. Similarly, IO completion will unlock the * doing it here. We kick of the state machine and unlock
* buffer, so we don't do it here. * the buffer manually, the code needs to be kept in sync
* with the I/O completion path.
*/ */
xlog_state_done_syncing(iclog, XFS_LI_ABORTED);
up(&iclog->ic_sema);
return; return;
} }
xfs_buf_submit(bp); iclog->ic_io_size = count;
bio_init(&iclog->ic_bio, iclog->ic_bvec, howmany(count, PAGE_SIZE));
bio_set_dev(&iclog->ic_bio, log->l_targ->bt_bdev);
iclog->ic_bio.bi_iter.bi_sector = log->l_logBBstart + bno;
iclog->ic_bio.bi_end_io = xlog_bio_end_io;
iclog->ic_bio.bi_private = iclog;
iclog->ic_bio.bi_opf = REQ_OP_WRITE | REQ_META | REQ_SYNC | REQ_FUA;
if (need_flush)
iclog->ic_bio.bi_opf |= REQ_PREFLUSH;
xlog_map_iclog_data(&iclog->ic_bio, iclog->ic_data, iclog->ic_io_size);
if (is_vmalloc_addr(iclog->ic_data))
flush_kernel_vmap_range(iclog->ic_data, iclog->ic_io_size);
/*
* If this log buffer would straddle the end of the log we will have
* to split it up into two bios, so that we can continue at the start.
*/
if (bno + BTOBB(count) > log->l_logBBsize) {
struct bio *split;
split = bio_split(&iclog->ic_bio, log->l_logBBsize - bno,
GFP_NOIO, &fs_bio_set);
bio_chain(split, &iclog->ic_bio);
submit_bio(split);
/* restart at logical offset zero for the remainder */
iclog->ic_bio.bi_iter.bi_sector = log->l_logBBstart;
}
submit_bio(&iclog->ic_bio);
} }
/* /*
...@@ -1769,7 +1783,7 @@ xlog_write_iclog( ...@@ -1769,7 +1783,7 @@ xlog_write_iclog(
* written to the start of the log. Watch out for the header magic * written to the start of the log. Watch out for the header magic
* number case, though. * number case, though.
*/ */
static unsigned int static void
xlog_split_iclog( xlog_split_iclog(
struct xlog *log, struct xlog *log,
void *data, void *data,
...@@ -1786,8 +1800,6 @@ xlog_split_iclog( ...@@ -1786,8 +1800,6 @@ xlog_split_iclog(
cycle++; cycle++;
put_unaligned_be32(cycle, data + i); put_unaligned_be32(cycle, data + i);
} }
return split_offset;
} }
static int static int
...@@ -1854,9 +1866,8 @@ xlog_sync( ...@@ -1854,9 +1866,8 @@ xlog_sync(
unsigned int count; /* byte count of bwrite */ unsigned int count; /* byte count of bwrite */
unsigned int roundoff; /* roundoff to BB or stripe */ unsigned int roundoff; /* roundoff to BB or stripe */
uint64_t bno; uint64_t bno;
unsigned int split = 0;
unsigned int size; unsigned int size;
bool need_flush = true; bool need_flush = true, split = false;
ASSERT(atomic_read(&iclog->ic_refcnt) == 0); ASSERT(atomic_read(&iclog->ic_refcnt) == 0);
...@@ -1881,8 +1892,10 @@ xlog_sync( ...@@ -1881,8 +1892,10 @@ xlog_sync(
bno = BLOCK_LSN(be64_to_cpu(iclog->ic_header.h_lsn)); bno = BLOCK_LSN(be64_to_cpu(iclog->ic_header.h_lsn));
/* Do we need to split this write into 2 parts? */ /* Do we need to split this write into 2 parts? */
if (bno + BTOBB(count) > log->l_logBBsize) if (bno + BTOBB(count) > log->l_logBBsize) {
split = xlog_split_iclog(log, &iclog->ic_header, bno, count); xlog_split_iclog(log, &iclog->ic_header, bno, count);
split = true;
}
/* calculcate the checksum */ /* calculcate the checksum */
iclog->ic_header.h_crc = xlog_cksum(log, &iclog->ic_header, iclog->ic_header.h_crc = xlog_cksum(log, &iclog->ic_header,
...@@ -1917,18 +1930,8 @@ xlog_sync( ...@@ -1917,18 +1930,8 @@ xlog_sync(
need_flush = false; need_flush = false;
} }
iclog->ic_bp->b_io_length = BTOBB(split ? split : count);
iclog->ic_bwritecnt = split ? 2 : 1;
xlog_verify_iclog(log, iclog, count); xlog_verify_iclog(log, iclog, count);
xlog_write_iclog(log, iclog, iclog->ic_bp, bno, need_flush); xlog_write_iclog(log, iclog, bno, count, need_flush);
if (split) {
xfs_buf_associate_memory(iclog->ic_log->l_xbuf,
(char *)&iclog->ic_header + split,
count - split);
xlog_write_iclog(log, iclog, iclog->ic_log->l_xbuf, 0, false);
}
} }
/* /*
...@@ -1949,25 +1952,15 @@ xlog_dealloc_log( ...@@ -1949,25 +1952,15 @@ xlog_dealloc_log(
*/ */
iclog = log->l_iclog; iclog = log->l_iclog;
for (i = 0; i < log->l_iclog_bufs; i++) { for (i = 0; i < log->l_iclog_bufs; i++) {
xfs_buf_lock(iclog->ic_bp); down(&iclog->ic_sema);
xfs_buf_unlock(iclog->ic_bp); up(&iclog->ic_sema);
iclog = iclog->ic_next; iclog = iclog->ic_next;
} }
/*
* Always need to ensure that the extra buffer does not point to memory
* owned by another log buffer before we free it. Also, cycle the lock
* first to ensure we've completed IO on it.
*/
xfs_buf_lock(log->l_xbuf);
xfs_buf_unlock(log->l_xbuf);
xfs_buf_set_empty(log->l_xbuf, BTOBB(log->l_iclog_size));
xfs_buf_free(log->l_xbuf);
iclog = log->l_iclog; iclog = log->l_iclog;
for (i = 0; i < log->l_iclog_bufs; i++) { for (i = 0; i < log->l_iclog_bufs; i++) {
xfs_buf_free(iclog->ic_bp);
next_iclog = iclog->ic_next; next_iclog = iclog->ic_next;
kmem_free(iclog->ic_data);
kmem_free(iclog); kmem_free(iclog);
iclog = next_iclog; iclog = next_iclog;
} }
...@@ -2892,8 +2885,6 @@ xlog_state_done_syncing( ...@@ -2892,8 +2885,6 @@ xlog_state_done_syncing(
ASSERT(iclog->ic_state == XLOG_STATE_SYNCING || ASSERT(iclog->ic_state == XLOG_STATE_SYNCING ||
iclog->ic_state == XLOG_STATE_IOERROR); iclog->ic_state == XLOG_STATE_IOERROR);
ASSERT(atomic_read(&iclog->ic_refcnt) == 0); ASSERT(atomic_read(&iclog->ic_refcnt) == 0);
ASSERT(iclog->ic_bwritecnt == 1 || iclog->ic_bwritecnt == 2);
/* /*
* If we got an error, either on the first buffer, or in the case of * If we got an error, either on the first buffer, or in the case of
...@@ -2901,13 +2892,8 @@ xlog_state_done_syncing( ...@@ -2901,13 +2892,8 @@ xlog_state_done_syncing(
* and none should ever be attempted to be written to disk * and none should ever be attempted to be written to disk
* again. * again.
*/ */
if (iclog->ic_state != XLOG_STATE_IOERROR) { if (iclog->ic_state != XLOG_STATE_IOERROR)
if (--iclog->ic_bwritecnt == 1) {
spin_unlock(&log->l_icloglock);
return;
}
iclog->ic_state = XLOG_STATE_DONE_SYNC; iclog->ic_state = XLOG_STATE_DONE_SYNC;
}
/* /*
* Someone could be sleeping prior to writing out the next * Someone could be sleeping prior to writing out the next
......
...@@ -178,11 +178,12 @@ typedef struct xlog_ticket { ...@@ -178,11 +178,12 @@ typedef struct xlog_ticket {
* the iclog. * the iclog.
* - ic_forcewait is used to implement synchronous forcing of the iclog to disk. * - ic_forcewait is used to implement synchronous forcing of the iclog to disk.
* - ic_next is the pointer to the next iclog in the ring. * - ic_next is the pointer to the next iclog in the ring.
* - ic_bp is a pointer to the buffer used to write this incore log to disk.
* - ic_log is a pointer back to the global log structure. * - ic_log is a pointer back to the global log structure.
* - ic_callback is a linked list of callback function/argument pairs to be * - ic_callback is a linked list of callback function/argument pairs to be
* called after an iclog finishes writing. * called after an iclog finishes writing.
* - ic_size is the full size of the header plus data. * - ic_size is the full size of the log buffer, minus the cycle headers.
* - ic_io_size is the size of the currently pending log buffer write, which
* might be smaller than ic_size
* - ic_offset is the current number of bytes written to in this iclog. * - ic_offset is the current number of bytes written to in this iclog.
* - ic_refcnt is bumped when someone is writing to the log. * - ic_refcnt is bumped when someone is writing to the log.
* - ic_state is the state of the iclog. * - ic_state is the state of the iclog.
...@@ -205,11 +206,10 @@ typedef struct xlog_in_core { ...@@ -205,11 +206,10 @@ typedef struct xlog_in_core {
wait_queue_head_t ic_write_wait; wait_queue_head_t ic_write_wait;
struct xlog_in_core *ic_next; struct xlog_in_core *ic_next;
struct xlog_in_core *ic_prev; struct xlog_in_core *ic_prev;
struct xfs_buf *ic_bp;
struct xlog *ic_log; struct xlog *ic_log;
int ic_size; u32 ic_size;
int ic_offset; u32 ic_io_size;
int ic_bwritecnt; u32 ic_offset;
unsigned short ic_state; unsigned short ic_state;
char *ic_datap; /* pointer to iclog data */ char *ic_datap; /* pointer to iclog data */
...@@ -225,6 +225,10 @@ typedef struct xlog_in_core { ...@@ -225,6 +225,10 @@ typedef struct xlog_in_core {
#ifdef DEBUG #ifdef DEBUG
bool ic_fail_crc : 1; bool ic_fail_crc : 1;
#endif #endif
struct semaphore ic_sema;
struct work_struct ic_end_io_work;
struct bio ic_bio;
struct bio_vec ic_bvec[];
} xlog_in_core_t; } xlog_in_core_t;
/* /*
...@@ -352,8 +356,6 @@ struct xlog { ...@@ -352,8 +356,6 @@ struct xlog {
struct xfs_mount *l_mp; /* mount point */ struct xfs_mount *l_mp; /* mount point */
struct xfs_ail *l_ailp; /* AIL log is working with */ struct xfs_ail *l_ailp; /* AIL log is working with */
struct xfs_cil *l_cilp; /* CIL log is working with */ struct xfs_cil *l_cilp; /* CIL log is working with */
struct xfs_buf *l_xbuf; /* extra buffer for log
* wrapping */
struct xfs_buftarg *l_targ; /* buftarg of log */ struct xfs_buftarg *l_targ; /* buftarg of log */
struct delayed_work l_work; /* background flush work */ struct delayed_work l_work; /* background flush work */
uint l_flags; uint l_flags;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment