Commit 41205539 authored by Chuck Lever's avatar Chuck Lever

nfsd: Fix NFSv4 READ on RDMA when using readv

svcrdma expects that the payload falls precisely into the xdr_buf
page vector. This does not seem to be the case for
nfsd4_encode_readv().

This code is called only when fops->splice_read is missing or when
RQ_SPLICE_OK is clear, so it's not a noticeable problem in many
common cases.

Add new transport method: ->xpo_read_payload so that when a READ
payload does not fit exactly in rq_res's page vector, the XDR
encoder can inform the RPC transport exactly where that payload is,
without the payload's XDR pad.

That way, when a Write chunk is present, the transport knows what
byte range in the Reply message is supposed to be matched with the
chunk.

Note that the Linux NFS server implementation of NFS/RDMA can
currently handle only one Write chunk per RPC-over-RDMA message.
This simplifies the implementation of this fix.

Fixes: b0420980 ("nfsd4: allow exotic read compounds")
Buglink: https://bugzilla.kernel.org/show_bug.cgi?id=198053Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 057a2274
...@@ -3594,17 +3594,17 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp, ...@@ -3594,17 +3594,17 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
u32 zzz = 0; u32 zzz = 0;
int pad; int pad;
/*
* svcrdma requires every READ payload to start somewhere
* in xdr->pages.
*/
if (xdr->iov == xdr->buf->head) {
xdr->iov = NULL;
xdr->end = xdr->p;
}
len = maxcount; len = maxcount;
v = 0; v = 0;
thislen = min_t(long, len, ((void *)xdr->end - (void *)xdr->p));
p = xdr_reserve_space(xdr, (thislen+3)&~3);
WARN_ON_ONCE(!p);
resp->rqstp->rq_vec[v].iov_base = p;
resp->rqstp->rq_vec[v].iov_len = thislen;
v++;
len -= thislen;
while (len) { while (len) {
thislen = min_t(long, len, PAGE_SIZE); thislen = min_t(long, len, PAGE_SIZE);
p = xdr_reserve_space(xdr, (thislen+3)&~3); p = xdr_reserve_space(xdr, (thislen+3)&~3);
...@@ -3623,6 +3623,8 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp, ...@@ -3623,6 +3623,8 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
read->rd_length = maxcount; read->rd_length = maxcount;
if (nfserr) if (nfserr)
return nfserr; return nfserr;
if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
return nfserr_io;
xdr_truncate_encode(xdr, starting_len + 8 + ((maxcount+3)&~3)); xdr_truncate_encode(xdr, starting_len + 8 + ((maxcount+3)&~3));
tmp = htonl(eof); tmp = htonl(eof);
......
...@@ -517,6 +517,9 @@ void svc_wake_up(struct svc_serv *); ...@@ -517,6 +517,9 @@ void svc_wake_up(struct svc_serv *);
void svc_reserve(struct svc_rqst *rqstp, int space); void svc_reserve(struct svc_rqst *rqstp, int space);
struct svc_pool * svc_pool_for_cpu(struct svc_serv *serv, int cpu); struct svc_pool * svc_pool_for_cpu(struct svc_serv *serv, int cpu);
char * svc_print_addr(struct svc_rqst *, char *, size_t); char * svc_print_addr(struct svc_rqst *, char *, size_t);
int svc_encode_read_payload(struct svc_rqst *rqstp,
unsigned int offset,
unsigned int length);
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
struct page **pages, struct page **pages,
struct kvec *first, size_t total); struct kvec *first, size_t total);
......
...@@ -137,6 +137,8 @@ struct svc_rdma_recv_ctxt { ...@@ -137,6 +137,8 @@ struct svc_rdma_recv_ctxt {
unsigned int rc_page_count; unsigned int rc_page_count;
unsigned int rc_hdr_count; unsigned int rc_hdr_count;
u32 rc_inv_rkey; u32 rc_inv_rkey;
unsigned int rc_read_payload_offset;
unsigned int rc_read_payload_length;
struct page *rc_pages[RPCSVC_MAXPAGES]; struct page *rc_pages[RPCSVC_MAXPAGES];
}; };
...@@ -170,7 +172,9 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, ...@@ -170,7 +172,9 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp, struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head, __be32 *p); struct svc_rdma_recv_ctxt *head, __be32 *p);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
__be32 *wr_ch, struct xdr_buf *xdr); __be32 *wr_ch, struct xdr_buf *xdr,
unsigned int offset,
unsigned long length);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
__be32 *rp_ch, bool writelist, __be32 *rp_ch, bool writelist,
struct xdr_buf *xdr); struct xdr_buf *xdr);
...@@ -189,6 +193,8 @@ extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, ...@@ -189,6 +193,8 @@ extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt, struct svc_rdma_send_ctxt *ctxt,
struct xdr_buf *xdr, __be32 *wr_lst); struct xdr_buf *xdr, __be32 *wr_lst);
extern int svc_rdma_sendto(struct svc_rqst *); extern int svc_rdma_sendto(struct svc_rqst *);
extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length);
/* svc_rdma_transport.c */ /* svc_rdma_transport.c */
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *); extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
......
...@@ -21,6 +21,8 @@ struct svc_xprt_ops { ...@@ -21,6 +21,8 @@ struct svc_xprt_ops {
int (*xpo_has_wspace)(struct svc_xprt *); int (*xpo_has_wspace)(struct svc_xprt *);
int (*xpo_recvfrom)(struct svc_rqst *); int (*xpo_recvfrom)(struct svc_rqst *);
int (*xpo_sendto)(struct svc_rqst *); int (*xpo_sendto)(struct svc_rqst *);
int (*xpo_read_payload)(struct svc_rqst *, unsigned int,
unsigned int);
void (*xpo_release_rqst)(struct svc_rqst *); void (*xpo_release_rqst)(struct svc_rqst *);
void (*xpo_detach)(struct svc_xprt *); void (*xpo_detach)(struct svc_xprt *);
void (*xpo_free)(struct svc_xprt *); void (*xpo_free)(struct svc_xprt *);
......
...@@ -1636,6 +1636,22 @@ u32 svc_max_payload(const struct svc_rqst *rqstp) ...@@ -1636,6 +1636,22 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
} }
EXPORT_SYMBOL_GPL(svc_max_payload); EXPORT_SYMBOL_GPL(svc_max_payload);
/**
* svc_encode_read_payload - mark a range of bytes as a READ payload
* @rqstp: svc_rqst to operate on
* @offset: payload's byte offset in rqstp->rq_res
* @length: size of payload, in bytes
*
* Returns zero on success, or a negative errno if a permanent
* error occurred.
*/
int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
}
EXPORT_SYMBOL_GPL(svc_encode_read_payload);
/** /**
* svc_fill_write_vector - Construct data argument for VFS write call * svc_fill_write_vector - Construct data argument for VFS write call
* @rqstp: svc_rqst to operate on * @rqstp: svc_rqst to operate on
......
...@@ -279,6 +279,12 @@ static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) ...@@ -279,6 +279,12 @@ static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
return len; return len;
} }
static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
return 0;
}
/* /*
* Report socket names for nfsdfs * Report socket names for nfsdfs
*/ */
...@@ -653,6 +659,7 @@ static const struct svc_xprt_ops svc_udp_ops = { ...@@ -653,6 +659,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
.xpo_create = svc_udp_create, .xpo_create = svc_udp_create,
.xpo_recvfrom = svc_udp_recvfrom, .xpo_recvfrom = svc_udp_recvfrom,
.xpo_sendto = svc_udp_sendto, .xpo_sendto = svc_udp_sendto,
.xpo_read_payload = svc_sock_read_payload,
.xpo_release_rqst = svc_release_udp_skb, .xpo_release_rqst = svc_release_udp_skb,
.xpo_detach = svc_sock_detach, .xpo_detach = svc_sock_detach,
.xpo_free = svc_sock_free, .xpo_free = svc_sock_free,
...@@ -1171,6 +1178,7 @@ static const struct svc_xprt_ops svc_tcp_ops = { ...@@ -1171,6 +1178,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
.xpo_create = svc_tcp_create, .xpo_create = svc_tcp_create,
.xpo_recvfrom = svc_tcp_recvfrom, .xpo_recvfrom = svc_tcp_recvfrom,
.xpo_sendto = svc_tcp_sendto, .xpo_sendto = svc_tcp_sendto,
.xpo_read_payload = svc_sock_read_payload,
.xpo_release_rqst = svc_release_skb, .xpo_release_rqst = svc_release_skb,
.xpo_detach = svc_tcp_sock_detach, .xpo_detach = svc_tcp_sock_detach,
.xpo_free = svc_sock_free, .xpo_free = svc_sock_free,
......
...@@ -193,6 +193,7 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) ...@@ -193,6 +193,7 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
out: out:
ctxt->rc_page_count = 0; ctxt->rc_page_count = 0;
ctxt->rc_read_payload_length = 0;
return ctxt; return ctxt;
out_empty: out_empty:
......
...@@ -482,18 +482,19 @@ static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info, ...@@ -482,18 +482,19 @@ static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
vec->iov_len); vec->iov_len);
} }
/* Send an xdr_buf's page list by itself. A Write chunk is /* Send an xdr_buf's page list by itself. A Write chunk is just
* just the page list. a Reply chunk is the head, page list, * the page list. A Reply chunk is @xdr's head, page list, and
* and tail. This function is shared between the two types * tail. This function is shared between the two types of chunk.
* of chunk.
*/ */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
struct xdr_buf *xdr) struct xdr_buf *xdr,
unsigned int offset,
unsigned long length)
{ {
info->wi_xdr = xdr; info->wi_xdr = xdr;
info->wi_next_off = 0; info->wi_next_off = offset - xdr->head[0].iov_len;
return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg, return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
xdr->page_len); length);
} }
/** /**
...@@ -501,6 +502,8 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, ...@@ -501,6 +502,8 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
* @rdma: controlling RDMA transport * @rdma: controlling RDMA transport
* @wr_ch: Write chunk provided by client * @wr_ch: Write chunk provided by client
* @xdr: xdr_buf containing the data payload * @xdr: xdr_buf containing the data payload
* @offset: payload's byte offset in @xdr
* @length: size of payload, in bytes
* *
* Returns a non-negative number of bytes the chunk consumed, or * Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk, * %-E2BIG if the payload was larger than the Write chunk,
...@@ -510,19 +513,20 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, ...@@ -510,19 +513,20 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
* %-EIO if rdma_rw initialization failed (DMA mapping, etc). * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/ */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
struct xdr_buf *xdr) struct xdr_buf *xdr,
unsigned int offset, unsigned long length)
{ {
struct svc_rdma_write_info *info; struct svc_rdma_write_info *info;
int ret; int ret;
if (!xdr->page_len) if (!length)
return 0; return 0;
info = svc_rdma_write_info_alloc(rdma, wr_ch); info = svc_rdma_write_info_alloc(rdma, wr_ch);
if (!info) if (!info)
return -ENOMEM; return -ENOMEM;
ret = svc_rdma_send_xdr_pagelist(info, xdr); ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
...@@ -531,7 +535,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, ...@@ -531,7 +535,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
goto out_err; goto out_err;
trace_svcrdma_encode_write(xdr->page_len); trace_svcrdma_encode_write(xdr->page_len);
return xdr->page_len; return length;
out_err: out_err:
svc_rdma_write_info_free(info); svc_rdma_write_info_free(info);
...@@ -571,7 +575,9 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch, ...@@ -571,7 +575,9 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
* client did not provide Write chunks. * client did not provide Write chunks.
*/ */
if (!writelist && xdr->page_len) { if (!writelist && xdr->page_len) {
ret = svc_rdma_send_xdr_pagelist(info, xdr); ret = svc_rdma_send_xdr_pagelist(info, xdr,
xdr->head[0].iov_len,
xdr->page_len);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
consumed += xdr->page_len; consumed += xdr->page_len;
......
...@@ -858,7 +858,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -858,7 +858,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (wr_lst) { if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */ /* XXX: Presume the client sent only one Write chunk */
ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); unsigned long offset;
unsigned int length;
if (rctxt->rc_read_payload_length) {
offset = rctxt->rc_read_payload_offset;
length = rctxt->rc_read_payload_length;
} else {
offset = xdr->head[0].iov_len;
length = xdr->page_len;
}
ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
length);
if (ret < 0) if (ret < 0)
goto err2; goto err2;
svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
...@@ -900,3 +911,30 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -900,3 +911,30 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
ret = -ENOTCONN; ret = -ENOTCONN;
goto out; goto out;
} }
/**
* svc_rdma_read_payload - special processing for a READ payload
* @rqstp: svc_rqst to operate on
* @offset: payload's byte offset in @xdr
* @length: size of payload, in bytes
*
* Returns zero on success.
*
* For the moment, just record the xdr_buf location of the READ
* payload. svc_rdma_sendto will use that location later when
* we actually send the payload.
*/
int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
/* XXX: Just one READ payload slot for now, since our
* transport implementation currently supports only one
* Write chunk.
*/
rctxt->rc_read_payload_offset = offset;
rctxt->rc_read_payload_length = length;
return 0;
}
...@@ -82,6 +82,7 @@ static const struct svc_xprt_ops svc_rdma_ops = { ...@@ -82,6 +82,7 @@ static const struct svc_xprt_ops svc_rdma_ops = {
.xpo_create = svc_rdma_create, .xpo_create = svc_rdma_create,
.xpo_recvfrom = svc_rdma_recvfrom, .xpo_recvfrom = svc_rdma_recvfrom,
.xpo_sendto = svc_rdma_sendto, .xpo_sendto = svc_rdma_sendto,
.xpo_read_payload = svc_rdma_read_payload,
.xpo_release_rqst = svc_rdma_release_rqst, .xpo_release_rqst = svc_rdma_release_rqst,
.xpo_detach = svc_rdma_detach, .xpo_detach = svc_rdma_detach,
.xpo_free = svc_rdma_free, .xpo_free = svc_rdma_free,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment