Commit 41c8f70f authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Harden backchannel call decoding

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 96f8778f
...@@ -239,6 +239,19 @@ extern unsigned int xdr_read_pages(struct xdr_stream *xdr, unsigned int len); ...@@ -239,6 +239,19 @@ extern unsigned int xdr_read_pages(struct xdr_stream *xdr, unsigned int len);
extern void xdr_enter_page(struct xdr_stream *xdr, unsigned int len); extern void xdr_enter_page(struct xdr_stream *xdr, unsigned int len);
extern int xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len, int (*actor)(struct scatterlist *, void *), void *data); extern int xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len, int (*actor)(struct scatterlist *, void *), void *data);
/**
* xdr_stream_remaining - Return the number of bytes remaining in the stream
* @xdr: pointer to struct xdr_stream
*
* Return value:
* Number of bytes remaining in @xdr before xdr->end
*/
static inline size_t
xdr_stream_remaining(const struct xdr_stream *xdr)
{
return xdr->nwords << 2;
}
ssize_t xdr_stream_decode_string_dup(struct xdr_stream *xdr, char **str, ssize_t xdr_stream_decode_string_dup(struct xdr_stream *xdr, char **str,
size_t maxlen, gfp_t gfp_flags); size_t maxlen, gfp_t gfp_flags);
/** /**
......
...@@ -271,9 +271,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) ...@@ -271,9 +271,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
* @xprt: transport receiving the call * @xprt: transport receiving the call
* @rep: receive buffer containing the call * @rep: receive buffer containing the call
* *
* Called in the RPC reply handler, which runs in a tasklet.
* Be quick about it.
*
* Operational assumptions: * Operational assumptions:
* o Backchannel credits are ignored, just as the NFS server * o Backchannel credits are ignored, just as the NFS server
* forechannel currently does * forechannel currently does
...@@ -284,7 +281,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, ...@@ -284,7 +281,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_rep *rep) struct rpcrdma_rep *rep)
{ {
struct rpc_xprt *xprt = &r_xprt->rx_xprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpcrdma_msg *headerp;
struct svc_serv *bc_serv; struct svc_serv *bc_serv;
struct rpcrdma_req *req; struct rpcrdma_req *req;
struct rpc_rqst *rqst; struct rpc_rqst *rqst;
...@@ -292,24 +288,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, ...@@ -292,24 +288,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
size_t size; size_t size;
__be32 *p; __be32 *p;
headerp = rdmab_to_msg(rep->rr_rdmabuf); p = xdr_inline_decode(&rep->rr_stream, 0);
size = xdr_stream_remaining(&rep->rr_stream);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG #ifdef RPCRDMA_BACKCHANNEL_DEBUG
pr_info("RPC: %s: callback XID %08x, length=%u\n", pr_info("RPC: %s: callback XID %08x, length=%u\n",
__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); __func__, be32_to_cpup(p), size);
pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif #endif
/* Sanity check:
* Need at least enough bytes for RPC/RDMA header, as code
* here references the header fields by array offset. Also,
* backward calls are always inline, so ensure there
* are some bytes beyond the RPC/RDMA header.
*/
if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
goto out_short;
p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
/* Grab a free bc rqst */ /* Grab a free bc rqst */
spin_lock(&xprt->bc_pa_lock); spin_lock(&xprt->bc_pa_lock);
if (list_empty(&xprt->bc_pa_list)) { if (list_empty(&xprt->bc_pa_list)) {
...@@ -325,7 +312,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, ...@@ -325,7 +312,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
/* Prepare rqst */ /* Prepare rqst */
rqst->rq_reply_bytes_recvd = 0; rqst->rq_reply_bytes_recvd = 0;
rqst->rq_bytes_sent = 0; rqst->rq_bytes_sent = 0;
rqst->rq_xid = headerp->rm_xid; rqst->rq_xid = *p;
rqst->rq_private_buf.len = size; rqst->rq_private_buf.len = size;
set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
...@@ -337,9 +324,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, ...@@ -337,9 +324,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
buf->len = size; buf->len = size;
/* The receive buffer has to be hooked to the rpcrdma_req /* The receive buffer has to be hooked to the rpcrdma_req
* so that it can be reposted after the server is done * so that it is not released while the req is pointing
* parsing it but just before sending the backward * to its buffer, and so that it can be reposted after
* direction reply. * the Upper Layer is done decoding it.
*/ */
req = rpcr_to_rdmar(rqst); req = rpcr_to_rdmar(rqst);
dprintk("RPC: %s: attaching rep %p to req %p\n", dprintk("RPC: %s: attaching rep %p to req %p\n",
...@@ -367,13 +354,4 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, ...@@ -367,13 +354,4 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
* when the connection is re-established. * when the connection is re-established.
*/ */
return; return;
out_short:
pr_warn("RPC/RDMA short backward direction call\n");
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
xprt_disconnect_done(xprt);
else
pr_warn("RPC: %s: reposting rep %p\n",
__func__, rep);
} }
...@@ -949,35 +949,59 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws, ...@@ -949,35 +949,59 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
} }
} }
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type /* By convention, backchannel calls arrive via rdma_msg type
* messages, and never populate the chunk lists. This makes * messages, and never populate the chunk lists. This makes
* the RPC/RDMA header small and fixed in size, so it is * the RPC/RDMA header small and fixed in size, so it is
* straightforward to check the RPC header's direction field. * straightforward to check the RPC header's direction field.
*/ */
static bool static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp) rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
__be32 xid, __be32 proc)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{ {
__be32 *p = (__be32 *)headerp; struct xdr_stream *xdr = &rep->rr_stream;
__be32 *p;
if (headerp->rm_type != rdma_msg) if (proc != rdma_msg)
return false; return false;
if (headerp->rm_body.rm_chunks[0] != xdr_zero)
/* Peek at stream contents without advancing. */
p = xdr_inline_decode(xdr, 0);
/* Chunk lists */
if (*p++ != xdr_zero)
return false; return false;
if (headerp->rm_body.rm_chunks[1] != xdr_zero) if (*p++ != xdr_zero)
return false; return false;
if (headerp->rm_body.rm_chunks[2] != xdr_zero) if (*p++ != xdr_zero)
return false; return false;
/* sanity */ /* RPC header */
if (p[7] != headerp->rm_xid) if (*p++ != xid)
return false; return false;
/* call direction */ if (*p != cpu_to_be32(RPC_CALL))
if (p[8] != cpu_to_be32(RPC_CALL))
return false; return false;
/* Now that we are sure this is a backchannel call,
* advance to the RPC header.
*/
p = xdr_inline_decode(xdr, 3 * sizeof(*p));
if (unlikely(!p))
goto out_short;
rpcrdma_bc_receive_call(r_xprt, rep);
return true;
out_short:
pr_warn("RPC/RDMA short backward direction call\n");
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
xprt_disconnect_done(&r_xprt->rx_xprt);
return true; return true;
} }
#else /* CONFIG_SUNRPC_BACKCHANNEL */
{
return false;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */ #endif /* CONFIG_SUNRPC_BACKCHANNEL */
/* Process received RPC/RDMA messages. /* Process received RPC/RDMA messages.
...@@ -1020,10 +1044,8 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1020,10 +1044,8 @@ rpcrdma_reply_handler(struct work_struct *work)
proc = *p++; proc = *p++;
headerp = rdmab_to_msg(rep->rr_rdmabuf); headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL) if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
if (rpcrdma_is_bcall(headerp)) return;
goto out_bcall;
#endif
/* Match incoming rpcrdma_rep to an rpcrdma_req to /* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks. * get context for handling any incoming chunks.
...@@ -1159,12 +1181,6 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1159,12 +1181,6 @@ rpcrdma_reply_handler(struct work_struct *work)
} }
return; return;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
rpcrdma_bc_receive_call(r_xprt, rep);
return;
#endif
/* If the incoming reply terminated a pending RPC, the next /* If the incoming reply terminated a pending RPC, the next
* RPC call will post a replacement receive buffer as it is * RPC call will post a replacement receive buffer as it is
* being marshaled. * being marshaled.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment