Commit 45bb912b authored by Lars Ellenberg's avatar Lars Ellenberg Committed by Philipp Reisner

drbd: Allow drbd_epoch_entries to use multiple bios.

This should allow for better performance if the lower level IO stack
of the peers differs in limits exposed either via the queue,
or via some merge_bvec_fn.
Signed-off-by: default avatarPhilipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: default avatarLars Ellenberg <lars.ellenberg@linbit.com>
parent 708d740e
......@@ -740,18 +740,6 @@ enum epoch_event {
EV_CLEANUP = 32, /* used as flag */
};
struct drbd_epoch_entry {
struct drbd_work w;
struct drbd_conf *mdev;
struct bio *private_bio;
struct hlist_node colision;
sector_t sector;
unsigned int size;
unsigned int flags;
struct drbd_epoch *epoch;
u64 block_id;
};
struct drbd_wq_barrier {
struct drbd_work w;
struct completion done;
......@@ -762,17 +750,49 @@ struct digest_info {
void *digest;
};
/* ee flag bits */
struct drbd_epoch_entry {
struct drbd_work w;
struct hlist_node colision;
struct drbd_epoch *epoch;
struct drbd_conf *mdev;
struct page *pages;
atomic_t pending_bios;
unsigned int size;
/* see comments on ee flag bits below */
unsigned long flags;
sector_t sector;
u64 block_id;
};
/* ee flag bits.
* While corresponding bios are in flight, the only modification will be
* set_bit WAS_ERROR, which has to be atomic.
* If no bios are in flight yet, or all have been completed,
* non-atomic modification to ee->flags is ok.
*/
enum {
__EE_CALL_AL_COMPLETE_IO,
__EE_CONFLICT_PENDING,
__EE_MAY_SET_IN_SYNC,
/* This epoch entry closes an epoch using a barrier.
* On sucessful completion, the epoch is released,
* and the P_BARRIER_ACK send. */
__EE_IS_BARRIER,
/* In case a barrier failed,
* we need to resubmit without the barrier flag. */
__EE_RESUBMITTED,
/* we may have several bios per epoch entry.
* if any of those fail, we set this flag atomically
* from the endio callback */
__EE_WAS_ERROR,
};
#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING)
#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC)
#define EE_IS_BARRIER (1<<__EE_IS_BARRIER)
#define EE_RESUBMITTED (1<<__EE_RESUBMITTED)
#define EE_WAS_ERROR (1<<__EE_WAS_ERROR)
/* global flag bits */
enum {
......@@ -1441,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev)
}
extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *);
/* worker callbacks */
extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int);
extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int);
......@@ -1465,6 +1486,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int);
extern void resync_timer_fn(unsigned long data);
/* drbd_receiver.c */
extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
const unsigned rw, const int fault_type);
extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list);
extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
u64 id,
......@@ -1620,6 +1643,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
* inline helper functions
*************************/
/* see also page_chain_add and friends in drbd_receiver.c */
static inline struct page *page_chain_next(struct page *page)
{
return (struct page *)page_private(page);
}
#define page_chain_for_each(page) \
for (; page && ({ prefetch(page_chain_next(page)); 1; }); \
page = page_chain_next(page))
#define page_chain_for_each_safe(page, n) \
for (; page && ({ n = page_chain_next(page); 1; }); page = n)
static inline int drbd_bio_has_active_page(struct bio *bio)
{
struct bio_vec *bvec;
int i;
__bio_for_each_segment(bvec, bio, i, 0) {
if (page_count(bvec->bv_page) > 1)
return 1;
}
return 0;
}
static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e)
{
struct page *page = e->pages;
page_chain_for_each(page) {
if (page_count(page) > 1)
return 1;
}
return 0;
}
static inline void drbd_state_lock(struct drbd_conf *mdev)
{
wait_event(mdev->misc_wait,
......
......@@ -2354,6 +2354,19 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
return 1;
}
static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
struct page *page = e->pages;
unsigned len = e->size;
page_chain_for_each(page) {
unsigned l = min_t(unsigned, len, PAGE_SIZE);
if (!_drbd_send_page(mdev, page, 0, l))
return 0;
len -= l;
}
return 1;
}
static void consider_delay_probes(struct drbd_conf *mdev)
{
if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
......@@ -2430,7 +2443,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
if (ok && dgs) {
dgb = mdev->int_dig_out;
drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
}
if (ok) {
......@@ -2483,11 +2496,11 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
sizeof(p), MSG_MORE);
if (ok && dgs) {
dgb = mdev->int_dig_out;
drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
}
if (ok)
ok = _drbd_send_zc_bio(mdev, e->private_bio);
ok = _drbd_send_zc_ee(mdev, e);
drbd_put_data_sock(mdev);
......
......@@ -2215,9 +2215,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
{
struct cn_msg *cn_reply;
struct drbd_nl_cfg_reply *reply;
struct bio_vec *bvec;
unsigned short *tl;
int i;
struct page *page;
unsigned len;
if (!e)
return;
......@@ -2255,11 +2255,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
put_unaligned(T_ee_data, tl++);
put_unaligned(e->size, tl++);
__bio_for_each_segment(bvec, e->private_bio, i, 0) {
void *d = kmap(bvec->bv_page);
memcpy(tl, d + bvec->bv_offset, bvec->bv_len);
kunmap(bvec->bv_page);
tl=(unsigned short*)((char*)tl + bvec->bv_len);
len = e->size;
page = e->pages;
page_chain_for_each(page) {
void *d = kmap_atomic(page, KM_USER0);
unsigned l = min_t(unsigned, len, PAGE_SIZE);
memcpy(tl, d, l);
kunmap_atomic(d, KM_USER0);
tl = (unsigned short*)((char*)tl + l);
len -= l;
}
put_unaligned(TT_END, tl++); /* Close the tag list */
......
......@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
/*
* some helper functions to deal with single linked page lists,
* page->private being our "next" pointer.
*/
/* If at least n pages are linked at head, get n pages off.
* Otherwise, don't modify head, and return NULL.
* Locking is the responsibility of the caller.
*/
static struct page *page_chain_del(struct page **head, int n)
{
struct page *page;
struct page *tmp;
BUG_ON(!n);
BUG_ON(!head);
page = *head;
while (page) {
tmp = page_chain_next(page);
if (--n == 0)
break; /* found sufficient pages */
if (tmp == NULL)
/* insufficient pages, don't use any of them. */
return NULL;
page = tmp;
}
/* add end of list marker for the returned list */
set_page_private(page, 0);
/* actual return value, and adjustment of head */
page = *head;
*head = tmp;
return page;
}
/* may be used outside of locks to find the tail of a (usually short)
* "private" page chain, before adding it back to a global chain head
* with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
struct page *tmp;
int i = 1;
while ((tmp = page_chain_next(page)))
++i, page = tmp;
if (len)
*len = i;
return page;
}
static int page_chain_free(struct page *page)
{
struct page *tmp;
int i = 0;
page_chain_for_each_safe(page, tmp) {
put_page(page);
++i;
}
return i;
}
static void page_chain_add(struct page **head,
struct page *chain_first, struct page *chain_last)
{
#if 1
struct page *tmp;
tmp = page_chain_tail(chain_first, NULL);
BUG_ON(tmp != chain_last);
#endif
/* add chain to head */
set_page_private(chain_last, (unsigned long)*head);
*head = chain_first;
}
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
struct page *page = NULL;
struct page *tmp = NULL;
int i = 0;
/* Yes, testing drbd_pp_vacant outside the lock is racy.
* So what. It saves a spin_lock. */
if (drbd_pp_vacant > 0) {
if (drbd_pp_vacant >= number) {
spin_lock(&drbd_pp_lock);
page = drbd_pp_pool;
if (page) {
drbd_pp_pool = (struct page *)page_private(page);
set_page_private(page, 0); /* just to be polite */
drbd_pp_vacant--;
}
page = page_chain_del(&drbd_pp_pool, number);
if (page)
drbd_pp_vacant -= number;
spin_unlock(&drbd_pp_lock);
if (page)
return page;
}
/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
* "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */
if (!page)
page = alloc_page(GFP_TRY);
if (page)
atomic_inc(&mdev->pp_in_use);
return page;
for (i = 0; i < number; i++) {
tmp = alloc_page(GFP_TRY);
if (!tmp)
break;
set_page_private(tmp, (unsigned long)page);
page = tmp;
}
if (i == number)
return page;
/* Not enough pages immediately available this time.
* No need to jump around here, drbd_pp_alloc will retry this
* function "soon". */
if (page) {
tmp = page_chain_tail(page, NULL);
spin_lock(&drbd_pp_lock);
page_chain_add(&drbd_pp_pool, page, tmp);
drbd_pp_vacant += i;
spin_unlock(&drbd_pp_lock);
}
return NULL;
}
/* kick lower level device, if we have more than (arbitrary number)
......@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
list_for_each_safe(le, tle, &mdev->net_ee) {
e = list_entry(le, struct drbd_epoch_entry, w.list);
if (drbd_bio_has_active_page(e->private_bio))
if (drbd_ee_has_active_page(e))
break;
list_move(le, to_be_freed);
}
......@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
}
/**
* drbd_pp_alloc() - Returns a page, fails only if a signal comes in
* drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
* @mdev: DRBD device.
* @retry: whether or not to retry allocation forever (or until signalled)
* @number: number of pages requested
* @retry: whether to retry, if not enough pages are available right now
*
* Tries to allocate number pages, first from our own page pool, then from
* the kernel, unless this allocation would exceed the max_buffers setting.
* Possibly retry until DRBD frees sufficient pages somewhere else.
*
* Tries to allocate a page, first from our own page pool, then from the
* kernel, unless this allocation would exceed the max_buffers setting.
* If @retry is non-zero, retry until DRBD frees a page somewhere else.
* Returns a page chain linked via page->private.
*/
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
struct page *page = NULL;
DEFINE_WAIT(wait);
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
page = drbd_pp_first_page_or_try_alloc(mdev);
if (page)
return page;
}
/* Yes, we may run up to @number over max_buffers. If we
* follow it strictly, the admin will get it wrong anyways. */
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
for (;;) {
while (page == NULL) {
prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
drbd_kick_lo_and_reclaim_net(mdev);
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
page = drbd_pp_first_page_or_try_alloc(mdev);
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
if (page)
break;
}
......@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
}
finish_wait(&drbd_pp_wait, &wait);
if (page)
atomic_add(number, &mdev->pp_in_use);
return page;
}
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
* Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
* Is also used from inside an other spin_lock_irq(&mdev->req_lock);
* Either links the page chain back to the global pool,
* or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
{
int free_it;
spin_lock(&drbd_pp_lock);
if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
free_it = 1;
} else {
set_page_private(page, (unsigned long)drbd_pp_pool);
drbd_pp_pool = page;
drbd_pp_vacant++;
free_it = 0;
}
spin_unlock(&drbd_pp_lock);
atomic_dec(&mdev->pp_in_use);
if (free_it)
__free_page(page);
wake_up(&drbd_pp_wait);
}
static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
{
struct page *p_to_be_freed = NULL;
struct page *page;
struct bio_vec *bvec;
int i;
spin_lock(&drbd_pp_lock);
__bio_for_each_segment(bvec, bio, i, 0) {
if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
p_to_be_freed = bvec->bv_page;
} else {
set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
drbd_pp_pool = bvec->bv_page;
drbd_pp_vacant++;
}
}
spin_unlock(&drbd_pp_lock);
atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
while (p_to_be_freed) {
page = p_to_be_freed;
p_to_be_freed = (struct page *)page_private(page);
set_page_private(page, 0); /* just to be polite */
put_page(page);
if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
i = page_chain_free(page);
else {
struct page *tmp;
tmp = page_chain_tail(page, &i);
spin_lock(&drbd_pp_lock);
page_chain_add(&drbd_pp_pool, page, tmp);
drbd_pp_vacant += i;
spin_unlock(&drbd_pp_lock);
}
atomic_sub(i, &mdev->pp_in_use);
i = atomic_read(&mdev->pp_in_use);
if (i < 0)
dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
wake_up(&drbd_pp_wait);
}
......@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
unsigned int data_size,
gfp_t gfp_mask) __must_hold(local)
{
struct request_queue *q;
struct drbd_epoch_entry *e;
struct page *page;
struct bio *bio;
unsigned int ds;
unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
return NULL;
......@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
return NULL;
}
bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
if (!bio) {
if (!(gfp_mask & __GFP_NOWARN))
dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
goto fail1;
}
bio->bi_bdev = mdev->ldev->backing_bdev;
bio->bi_sector = sector;
ds = data_size;
while (ds) {
page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
if (!page) {
if (!(gfp_mask & __GFP_NOWARN))
dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
goto fail2;
}
if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
drbd_pp_free(mdev, page);
dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
"data_size=%u,ds=%u) failed\n",
(unsigned long long)sector, data_size, ds);
q = bdev_get_queue(bio->bi_bdev);
if (q->merge_bvec_fn) {
struct bvec_merge_data bvm = {
.bi_bdev = bio->bi_bdev,
.bi_sector = bio->bi_sector,
.bi_size = bio->bi_size,
.bi_rw = bio->bi_rw,
};
int l = q->merge_bvec_fn(q, &bvm,
&bio->bi_io_vec[bio->bi_vcnt]);
dev_err(DEV, "merge_bvec_fn() = %d\n", l);
}
/* dump more of the bio. */
dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
goto fail2;
break;
}
ds -= min_t(int, ds, PAGE_SIZE);
}
D_ASSERT(data_size == bio->bi_size);
bio->bi_private = e;
e->mdev = mdev;
e->sector = sector;
e->size = bio->bi_size;
page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
if (!page)
goto fail;
e->private_bio = bio;
e->block_id = id;
INIT_HLIST_NODE(&e->colision);
e->epoch = NULL;
e->mdev = mdev;
e->pages = page;
atomic_set(&e->pending_bios, 0);
e->size = data_size;
e->flags = 0;
e->sector = sector;
e->sector = sector;
e->block_id = id;
return e;
fail2:
drbd_pp_free_bio_pages(mdev, bio);
bio_put(bio);
fail1:
fail:
mempool_free(e, drbd_ee_mempool);
return NULL;
}
void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
struct bio *bio = e->private_bio;
drbd_pp_free_bio_pages(mdev, bio);
bio_put(bio);
drbd_pp_free(mdev, e->pages);
D_ASSERT(atomic_read(&e->pending_bios) == 0);
D_ASSERT(hlist_unhashed(&e->colision));
mempool_free(e, drbd_ee_mempool);
}
......@@ -1120,6 +1132,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
/**
* drbd_submit_ee()
* @mdev: DRBD device.
* @e: epoch entry
* @rw: flag field, see bio->bi_rw
*/
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
const unsigned rw, const int fault_type)
{
struct bio *bios = NULL;
struct bio *bio;
struct page *page = e->pages;
sector_t sector = e->sector;
unsigned ds = e->size;
unsigned n_bios = 0;
unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
/* In most cases, we will only need one bio. But in case the lower
* level restrictions happen to be different at this offset on this
* side than those of the sending peer, we may need to submit the
* request in more than one bio. */
next_bio:
bio = bio_alloc(GFP_NOIO, nr_pages);
if (!bio) {
dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
goto fail;
}
/* > e->sector, unless this is the first bio */
bio->bi_sector = sector;
bio->bi_bdev = mdev->ldev->backing_bdev;
/* we special case some flags in the multi-bio case, see below
* (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
bio->bi_rw = rw;
bio->bi_private = e;
bio->bi_end_io = drbd_endio_sec;
bio->bi_next = bios;
bios = bio;
++n_bios;
page_chain_for_each(page) {
unsigned len = min_t(unsigned, ds, PAGE_SIZE);
if (!bio_add_page(bio, page, len, 0)) {
/* a single page must always be possible! */
BUG_ON(bio->bi_vcnt == 0);
goto next_bio;
}
ds -= len;
sector += len >> 9;
--nr_pages;
}
D_ASSERT(page == NULL);
D_ASSERT(ds == 0);
atomic_set(&e->pending_bios, n_bios);
do {
bio = bios;
bios = bios->bi_next;
bio->bi_next = NULL;
/* strip off BIO_RW_UNPLUG unless it is the last bio */
if (bios)
bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
drbd_generic_make_request(mdev, fault_type, bio);
/* strip off BIO_RW_BARRIER,
* unless it is the first or last bio */
if (bios && bios->bi_next)
bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
} while (bios);
maybe_kick_lo(mdev);
return 0;
fail:
while (bios) {
bio = bios;
bios = bios->bi_next;
bio_put(bio);
}
return -ENOMEM;
}
/**
* w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
* @mdev: DRBD device.
......@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
{
struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
struct bio *bio = e->private_bio;
/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
(and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
so that we can finish that epoch in drbd_may_finish_epoch().
......@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
if (previous_epoch(mdev, e->epoch))
dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
/* prepare bio for re-submit,
* re-init volatile members */
/* we still have a local reference,
* get_ldev was done in receive_Data. */
bio->bi_bdev = mdev->ldev->backing_bdev;
bio->bi_sector = e->sector;
bio->bi_size = e->size;
bio->bi_idx = 0;
bio->bi_flags &= ~(BIO_POOL_MASK - 1);
bio->bi_flags |= 1 << BIO_UPTODATE;
/* don't know whether this is necessary: */
bio->bi_phys_segments = 0;
bio->bi_next = NULL;
/* these should be unchanged: */
/* bio->bi_end_io = drbd_endio_write_sec; */
/* bio->bi_vcnt = whatever; */
e->w.cb = e_end_block;
/* This is no longer a barrier request. */
bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
/* drbd_submit_ee fails for one reason only:
* if was not able to allocate sufficient bios.
* requeue, try again later. */
e->w.cb = w_e_reissue;
drbd_queue_work(&mdev->data.work, &e->w);
}
return 1;
}
......@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
{
const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
struct drbd_epoch_entry *e;
struct bio_vec *bvec;
struct page *page;
struct bio *bio;
int dgs, ds, i, rr;
int dgs, ds, rr;
void *dig_in = mdev->int_dig_in;
void *dig_vv = mdev->int_dig_vv;
unsigned long *data;
......@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
if (!e)
return NULL;
bio = e->private_bio;
ds = data_size;
bio_for_each_segment(bvec, bio, i) {
page = bvec->bv_page;
page = e->pages;
page_chain_for_each(page) {
unsigned len = min_t(int, ds, PAGE_SIZE);
data = kmap(page);
rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
rr = drbd_recv(mdev, data, len);
if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
dev_err(DEV, "Fault injection: Corrupting data on receive\n");
data[0] = data[0] ^ (unsigned long)-1;
}
kunmap(page);
if (rr != min_t(int, ds, PAGE_SIZE)) {
if (rr != len) {
drbd_free_ee(mdev, e);
dev_warn(DEV, "short read receiving data: read %d expected %d\n",
rr, min_t(int, ds, PAGE_SIZE));
rr, len);
return NULL;
}
ds -= rr;
}
if (dgs) {
drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
if (memcmp(dig_in, dig_vv, dgs)) {
dev_err(DEV, "Digest integrity check FAILED.\n");
drbd_bcast_ee(mdev, "digest failed",
......@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
if (!data_size)
return TRUE;
page = drbd_pp_alloc(mdev, 1);
page = drbd_pp_alloc(mdev, 1, 1);
data = kmap(page);
while (data_size) {
......@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
}
if (dgs) {
drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
if (memcmp(dig_in, dig_vv, dgs)) {
dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
return 0;
......@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
D_ASSERT(hlist_unhashed(&e->colision));
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
drbd_set_in_sync(mdev, sector, e->size);
ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
} else {
......@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
struct drbd_epoch_entry *e;
e = read_in_block(mdev, ID_SYNCER, sector, data_size);
if (!e) {
put_ldev(mdev);
return FALSE;
}
if (!e)
goto fail;
dec_rs_pending(mdev);
e->private_bio->bi_end_io = drbd_endio_write_sec;
e->private_bio->bi_rw = WRITE;
e->w.cb = e_end_resync_block;
inc_unacked(mdev);
/* corresponding dec_unacked() in e_end_resync_block()
* respective _drbd_clear_done_ee */
e->w.cb = e_end_resync_block;
spin_lock_irq(&mdev->req_lock);
list_add(&e->w.list, &mdev->sync_ee);
spin_unlock_irq(&mdev->req_lock);
drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
/* accounting done in endio */
if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
return TRUE;
maybe_kick_lo(mdev);
return TRUE;
drbd_free_ee(mdev, e);
fail:
put_ldev(mdev);
return FALSE;
}
static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
......@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
}
if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
mdev->state.conn <= C_PAUSED_SYNC_T &&
e->flags & EE_MAY_SET_IN_SYNC) ?
......@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
return FALSE;
}
e->private_bio->bi_end_io = drbd_endio_write_sec;
e->w.cb = e_end_block;
spin_lock(&mdev->epoch_lock);
......@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
drbd_al_begin_io(mdev, e->sector);
}
e->private_bio->bi_rw = rw;
drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
/* accounting done in endio */
maybe_kick_lo(mdev);
return TRUE;
if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
return TRUE;
out_interrupted:
/* yes, the epoch_size now is imbalanced.
......@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
return FALSE;
}
e->private_bio->bi_rw = READ;
e->private_bio->bi_end_io = drbd_endio_read_sec;
switch (h->command) {
case P_DATA_REQUEST:
e->w.cb = w_e_end_data_req;
......@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
inc_unacked(mdev);
drbd_generic_make_request(mdev, fault_type, e->private_bio);
maybe_kick_lo(mdev);
return TRUE;
if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
return TRUE;
out_free_e:
kfree(di);
......@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
i = atomic_read(&mdev->pp_in_use);
if (i)
dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
D_ASSERT(list_empty(&mdev->read_ee));
D_ASSERT(list_empty(&mdev->active_ee));
......
......@@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca
/* defined here:
drbd_md_io_complete
drbd_endio_write_sec
drbd_endio_read_sec
drbd_endio_sec
drbd_endio_pri
* more endio handlers:
......@@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error)
/* reads on behalf of the partner,
* "submitted" by the receiver
*/
void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
unsigned long flags = 0;
struct drbd_epoch_entry *e = NULL;
struct drbd_conf *mdev;
int uptodate = bio_flagged(bio, BIO_UPTODATE);
e = bio->bi_private;
mdev = e->mdev;
if (error)
dev_warn(DEV, "read: error=%d s=%llus\n", error,
(unsigned long long)e->sector);
if (!error && !uptodate) {
dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
(unsigned long long)e->sector);
/* strange behavior of some lower level drivers...
* fail the request by clearing the uptodate flag,
* but do not return any error?! */
error = -EIO;
}
struct drbd_conf *mdev = e->mdev;
D_ASSERT(e->block_id != ID_VACANT);
......@@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
list_del(&e->w.list);
if (list_empty(&mdev->read_ee))
wake_up(&mdev->ee_wait);
if (test_bit(__EE_WAS_ERROR, &e->flags))
__drbd_chk_io_error(mdev, FALSE);
spin_unlock_irqrestore(&mdev->req_lock, flags);
drbd_chk_io_error(mdev, error, FALSE);
drbd_queue_work(&mdev->data.work, &e->w);
put_ldev(mdev);
}
static int is_failed_barrier(int ee_flags)
{
return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
== (EE_IS_BARRIER|EE_WAS_ERROR);
}
/* writes on behalf of the partner, or resync writes,
* "submitted" by the receiver.
*/
void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
* "submitted" by the receiver, final stage. */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
unsigned long flags = 0;
struct drbd_epoch_entry *e = NULL;
struct drbd_conf *mdev;
struct drbd_conf *mdev = e->mdev;
sector_t e_sector;
int do_wake;
int is_syncer_req;
int do_al_complete_io;
int uptodate = bio_flagged(bio, BIO_UPTODATE);
int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
e = bio->bi_private;
mdev = e->mdev;
if (error)
dev_warn(DEV, "write: error=%d s=%llus\n", error,
(unsigned long long)e->sector);
if (!error && !uptodate) {
dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
(unsigned long long)e->sector);
/* strange behavior of some lower level drivers...
* fail the request by clearing the uptodate flag,
* but do not return any error?! */
error = -EIO;
}
/* error == -ENOTSUPP would be a better test,
* alas it is not reliable */
if (error && is_barrier && e->flags & EE_IS_BARRIER) {
/* if this is a failed barrier request, disable use of barriers,
* and schedule for resubmission */
if (is_failed_barrier(e->flags)) {
drbd_bump_write_ordering(mdev, WO_bdev_flush);
spin_lock_irqsave(&mdev->req_lock, flags);
list_del(&e->w.list);
e->flags |= EE_RESUBMITTED;
e->w.cb = w_e_reissue;
/* put_ldev actually happens below, once we come here again. */
__release(local);
......@@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
D_ASSERT(e->block_id != ID_VACANT);
spin_lock_irqsave(&mdev->req_lock, flags);
mdev->writ_cnt += e->size >> 9;
is_syncer_req = is_syncer_block_id(e->block_id);
/* after we moved e to done_ee,
* we may no longer access it,
* it may be freed/reused already!
* (as soon as we release the req_lock) */
e_sector = e->sector;
do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
is_syncer_req = is_syncer_block_id(e->block_id);
spin_lock_irqsave(&mdev->req_lock, flags);
mdev->writ_cnt += e->size >> 9;
list_del(&e->w.list); /* has been on active_ee or sync_ee */
list_add_tail(&e->w.list, &mdev->done_ee);
......@@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
? list_empty(&mdev->sync_ee)
: list_empty(&mdev->active_ee);
if (error)
if (test_bit(__EE_WAS_ERROR, &e->flags))
__drbd_chk_io_error(mdev, FALSE);
spin_unlock_irqrestore(&mdev->req_lock, flags);
......@@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
wake_asender(mdev);
put_ldev(mdev);
}
/* writes on behalf of the partner, or resync writes,
* "submitted" by the receiver.
*/
void drbd_endio_sec(struct bio *bio, int error)
{
struct drbd_epoch_entry *e = bio->bi_private;
struct drbd_conf *mdev = e->mdev;
int uptodate = bio_flagged(bio, BIO_UPTODATE);
int is_write = bio_data_dir(bio) == WRITE;
if (error)
dev_warn(DEV, "%s: error=%d s=%llus\n",
is_write ? "write" : "read", error,
(unsigned long long)e->sector);
if (!error && !uptodate) {
dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
is_write ? "write" : "read",
(unsigned long long)e->sector);
/* strange behavior of some lower level drivers...
* fail the request by clearing the uptodate flag,
* but do not return any error?! */
error = -EIO;
}
if (error)
set_bit(__EE_WAS_ERROR, &e->flags);
bio_put(bio); /* no need for the bio anymore */
if (atomic_dec_and_test(&e->pending_bios)) {
if (is_write)
drbd_endio_write_sec_final(e);
else
drbd_endio_read_sec_final(e);
}
}
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
......@@ -295,7 +300,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
return 1; /* Simply ignore this! */
}
void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
struct hash_desc desc;
struct scatterlist sg;
struct page *page = e->pages;
struct page *tmp;
unsigned len;
desc.tfm = tfm;
desc.flags = 0;
sg_init_table(&sg, 1);
crypto_hash_init(&desc);
while ((tmp = page_chain_next(page))) {
/* all but the last page will be fully used */
sg_set_page(&sg, page, PAGE_SIZE, 0);
crypto_hash_update(&desc, &sg, sg.length);
page = tmp;
}
/* and now the last, possibly only partially used page */
len = e->size & (PAGE_SIZE - 1);
sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
crypto_hash_update(&desc, &sg, sg.length);
crypto_hash_final(&desc, digest);
}
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
struct hash_desc desc;
struct scatterlist sg;
......@@ -329,11 +361,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel
return 1;
}
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev,
......@@ -369,23 +401,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
/* GFP_TRY, because if there is no memory available right now, this may
* be rescheduled for later. It is "only" background resync, after all. */
e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
if (!e) {
put_ldev(mdev);
return 2;
}
if (!e)
goto fail;
spin_lock_irq(&mdev->req_lock);
list_add(&e->w.list, &mdev->read_ee);
spin_unlock_irq(&mdev->req_lock);
e->private_bio->bi_end_io = drbd_endio_read_sec;
e->private_bio->bi_rw = READ;
e->w.cb = w_e_send_csum;
if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
return 1;
mdev->read_cnt += size >> 9;
drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
return 1;
drbd_free_ee(mdev, e);
fail:
put_ldev(mdev);
return 2;
}
void resync_timer_fn(unsigned long data)
......@@ -819,7 +849,7 @@ int drbd_resync_finished(struct drbd_conf *mdev)
/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
if (drbd_bio_has_active_page(e->private_bio)) {
if (drbd_ee_has_active_page(e)) {
/* This might happen if sendpage() has not finished */
spin_lock_irq(&mdev->req_lock);
list_add_tail(&e->w.list, &mdev->net_ee);
......@@ -845,7 +875,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
return 1;
}
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
ok = drbd_send_block(mdev, P_DATA_REPLY, e);
} else {
if (__ratelimit(&drbd_ratelimit_state))
......@@ -886,7 +916,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
put_ldev(mdev);
}
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
inc_rs_pending(mdev);
ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
......@@ -934,7 +964,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
di = (struct digest_info *)(unsigned long)e->block_id;
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
/* quick hack to try to avoid a race against reconfiguration.
* a real fix would be much more involved,
* introducing more locking mechanisms */
......@@ -944,7 +974,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
digest = kmalloc(digest_size, GFP_NOIO);
}
if (digest) {
drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
eq = !memcmp(digest, di->digest, digest_size);
kfree(digest);
}
......@@ -986,14 +1016,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
if (unlikely(cancel))
goto out;
if (unlikely(!drbd_bio_uptodate(e->private_bio)))
if (unlikely((e->flags & EE_WAS_ERROR) != 0))
goto out;
digest_size = crypto_hash_digestsize(mdev->verify_tfm);
/* FIXME if this allocation fails, online verify will not terminate! */
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
digest, digest_size, P_OV_REPLY);
......@@ -1042,11 +1072,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
di = (struct digest_info *)(unsigned long)e->block_id;
if (likely(drbd_bio_uptodate(e->private_bio))) {
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->verify_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
D_ASSERT(digest_size == di->digest_size);
eq = !memcmp(digest, di->digest, digest_size);
......
......@@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev,
#define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE)
static inline int drbd_bio_has_active_page(struct bio *bio)
{
struct bio_vec *bvec;
int i;
__bio_for_each_segment(bvec, bio, i, 0) {
if (page_count(bvec->bv_page) > 1)
return 1;
}
return 0;
}
/* bi_end_io handlers */
extern void drbd_md_io_complete(struct bio *bio, int error);
extern void drbd_endio_read_sec(struct bio *bio, int error);
extern void drbd_endio_write_sec(struct bio *bio, int error);
extern void drbd_endio_sec(struct bio *bio, int error);
extern void drbd_endio_pri(struct bio *bio, int error);
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment