Commit 9426bbc6, authored by Sowmini Varadhan, committed by David S. Miller

rds: use list structure to track information for zerocopy completion notification

Commit 401910db ("rds: deliver zerocopy completion notification
with data") removes support for zerocopy completion notification
on the sk_error_queue, thus we no longer need to track the cookie
information in sk_buff structures.

This commit replaces the struct sk_buff_head rs_zcookie_queue with
a simpler list that results in a smaller memory footprint as well
as more efficient memory allocation time.
Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Willem de Bruijn <willemb@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent d40a126b
...@@ -77,7 +77,7 @@ static int rds_release(struct socket *sock) ...@@ -77,7 +77,7 @@ static int rds_release(struct socket *sock)
rds_send_drop_to(rs, NULL); rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs); rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL); rds_notify_queue_get(rs, NULL);
__skb_queue_purge(&rs->rs_zcookie_queue); rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue);
spin_lock_bh(&rds_sock_lock); spin_lock_bh(&rds_sock_lock);
list_del_init(&rs->rs_item); list_del_init(&rs->rs_item);
...@@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, ...@@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
} }
if (!list_empty(&rs->rs_recv_queue) || if (!list_empty(&rs->rs_recv_queue) ||
!list_empty(&rs->rs_notify_queue) || !list_empty(&rs->rs_notify_queue) ||
!skb_queue_empty(&rs->rs_zcookie_queue)) !list_empty(&rs->rs_zcookie_queue.zcookie_head))
mask |= (EPOLLIN | EPOLLRDNORM); mask |= (EPOLLIN | EPOLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (EPOLLOUT | EPOLLWRNORM); mask |= (EPOLLOUT | EPOLLWRNORM);
...@@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) ...@@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
INIT_LIST_HEAD(&rs->rs_recv_queue); INIT_LIST_HEAD(&rs->rs_recv_queue);
INIT_LIST_HEAD(&rs->rs_notify_queue); INIT_LIST_HEAD(&rs->rs_notify_queue);
INIT_LIST_HEAD(&rs->rs_cong_list); INIT_LIST_HEAD(&rs->rs_cong_list);
skb_queue_head_init(&rs->rs_zcookie_queue); rds_message_zcopy_queue_init(&rs->rs_zcookie_queue);
spin_lock_init(&rs->rs_rdma_lock); spin_lock_init(&rs->rs_rdma_lock);
rs->rs_rdma_keys = RB_ROOT; rs->rs_rdma_keys = RB_ROOT;
rs->rs_rx_traces = 0; rs->rs_rx_traces = 0;
......
...@@ -48,7 +48,6 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = { ...@@ -48,7 +48,6 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_GEN_NUM] = sizeof(u32), [RDS_EXTHDR_GEN_NUM] = sizeof(u32),
}; };
void rds_message_addref(struct rds_message *rm) void rds_message_addref(struct rds_message *rm)
{ {
rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount)); rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
...@@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm) ...@@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm)
} }
EXPORT_SYMBOL_GPL(rds_message_addref); EXPORT_SYMBOL_GPL(rds_message_addref);
static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie)
{ {
struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; struct rds_zcopy_cookies *ck = &info->zcookies;
int ncookies = ck->num; int ncookies = ck->num;
if (ncookies == RDS_MAX_ZCOOKIES) if (ncookies == RDS_MAX_ZCOOKIES)
...@@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) ...@@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
return true; return true;
} }
struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif)
{
return container_of(znotif, struct rds_msg_zcopy_info, znotif);
}
void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q)
{
unsigned long flags;
LIST_HEAD(copy);
struct rds_msg_zcopy_info *info, *tmp;
spin_lock_irqsave(&q->lock, flags);
list_splice(&q->zcookie_head, &copy);
INIT_LIST_HEAD(&q->zcookie_head);
spin_unlock_irqrestore(&q->lock, flags);
list_for_each_entry_safe(info, tmp, &copy, rs_zcookie_next) {
list_del(&info->rs_zcookie_next);
kfree(info);
}
}
static void rds_rm_zerocopy_callback(struct rds_sock *rs, static void rds_rm_zerocopy_callback(struct rds_sock *rs,
struct rds_znotifier *znotif) struct rds_znotifier *znotif)
{ {
struct sk_buff *skb, *tail; struct rds_msg_zcopy_info *info;
unsigned long flags; struct rds_msg_zcopy_queue *q;
struct sk_buff_head *q;
u32 cookie = znotif->z_cookie; u32 cookie = znotif->z_cookie;
struct rds_zcopy_cookies *ck; struct rds_zcopy_cookies *ck;
struct list_head *head;
unsigned long flags;
mm_unaccount_pinned_pages(&znotif->z_mmp);
q = &rs->rs_zcookie_queue; q = &rs->rs_zcookie_queue;
spin_lock_irqsave(&q->lock, flags); spin_lock_irqsave(&q->lock, flags);
tail = skb_peek_tail(q); head = &q->zcookie_head;
if (!list_empty(head)) {
if (tail && skb_zcookie_add(tail, cookie)) { info = list_entry(head, struct rds_msg_zcopy_info,
spin_unlock_irqrestore(&q->lock, flags); rs_zcookie_next);
mm_unaccount_pinned_pages(&znotif->z_mmp); if (info && rds_zcookie_add(info, cookie)) {
consume_skb(rds_skb_from_znotifier(znotif)); spin_unlock_irqrestore(&q->lock, flags);
/* caller invokes rds_wake_sk_sleep() */ kfree(rds_info_from_znotifier(znotif));
return; /* caller invokes rds_wake_sk_sleep() */
return;
}
} }
skb = rds_skb_from_znotifier(znotif); info = rds_info_from_znotifier(znotif);
ck = (struct rds_zcopy_cookies *)skb->cb; ck = &info->zcookies;
memset(ck, 0, sizeof(*ck)); memset(ck, 0, sizeof(*ck));
WARN_ON(!skb_zcookie_add(skb, cookie)); WARN_ON(!rds_zcookie_add(info, cookie));
list_add_tail(&q->zcookie_head, &info->rs_zcookie_next);
__skb_queue_tail(q, skb);
spin_unlock_irqrestore(&q->lock, flags); spin_unlock_irqrestore(&q->lock, flags);
/* caller invokes rds_wake_sk_sleep() */ /* caller invokes rds_wake_sk_sleep() */
mm_unaccount_pinned_pages(&znotif->z_mmp);
} }
/* /*
...@@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) ...@@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
int ret = 0; int ret = 0;
int length = iov_iter_count(from); int length = iov_iter_count(from);
int total_copied = 0; int total_copied = 0;
struct sk_buff *skb; struct rds_msg_zcopy_info *info;
rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
...@@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) ...@@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
sg = rm->data.op_sg; sg = rm->data.op_sg;
sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
skb = alloc_skb(0, GFP_KERNEL); info = kzalloc(sizeof(*info), GFP_KERNEL);
if (!skb) if (!info)
return -ENOMEM; return -ENOMEM;
BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier), INIT_LIST_HEAD(&info->rs_zcookie_next);
sizeof(struct rds_zcopy_cookies))); rm->data.op_mmp_znotifier = &info->znotif;
rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
length)) { length)) {
ret = -ENOMEM; ret = -ENOMEM;
...@@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) ...@@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
WARN_ON_ONCE(length != 0); WARN_ON_ONCE(length != 0);
return ret; return ret;
err: err:
consume_skb(skb); kfree(info);
rm->data.op_mmp_znotifier = NULL; rm->data.op_mmp_znotifier = NULL;
return ret; return ret;
} }
......
...@@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) ...@@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
#define RDS_MSG_FLUSH 8 #define RDS_MSG_FLUSH 8
struct rds_znotifier { struct rds_znotifier {
struct list_head z_list;
struct mmpin z_mmp; struct mmpin z_mmp;
u32 z_cookie; u32 z_cookie;
}; };
#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0])) struct rds_msg_zcopy_info {
struct list_head rs_zcookie_next;
union {
struct rds_znotifier znotif;
struct rds_zcopy_cookies zcookies;
};
};
static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z) struct rds_msg_zcopy_queue {
struct list_head zcookie_head;
spinlock_t lock; /* protects zcookie_head queue */
};
static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q)
{ {
return container_of((void *)z, struct sk_buff, cb); spin_lock_init(&q->lock);
INIT_LIST_HEAD(&q->zcookie_head);
} }
struct rds_message { struct rds_message {
...@@ -603,8 +614,7 @@ struct rds_sock { ...@@ -603,8 +614,7 @@ struct rds_sock {
/* Socket receive path trace points*/ /* Socket receive path trace points*/
u8 rs_rx_traces; u8 rs_rx_traces;
u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
struct rds_msg_zcopy_queue rs_zcookie_queue;
struct sk_buff_head rs_zcookie_queue;
}; };
static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
...@@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm); ...@@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm);
void rds_message_put(struct rds_message *rm); void rds_message_put(struct rds_message *rm);
void rds_message_wait(struct rds_message *rm); void rds_message_wait(struct rds_message *rm);
void rds_message_unmapped(struct rds_message *rm); void rds_message_unmapped(struct rds_message *rm);
void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info);
static inline void rds_message_make_checksum(struct rds_header *hdr) static inline void rds_message_make_checksum(struct rds_header *hdr)
{ {
......
...@@ -579,9 +579,10 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg, ...@@ -579,9 +579,10 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{ {
struct sk_buff *skb; struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
struct sk_buff_head *q = &rs->rs_zcookie_queue; struct rds_msg_zcopy_info *info = NULL;
struct rds_zcopy_cookies *done; struct rds_zcopy_cookies *done;
unsigned long flags;
if (!msg->msg_control) if (!msg->msg_control)
return false; return false;
...@@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) ...@@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
msg->msg_controllen < CMSG_SPACE(sizeof(*done))) msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
return false; return false;
skb = skb_dequeue(q); spin_lock_irqsave(&q->lock, flags);
if (!skb) if (!list_empty(&q->zcookie_head)) {
info = list_entry(q->zcookie_head.next,
struct rds_msg_zcopy_info, rs_zcookie_next);
list_del(&info->rs_zcookie_next);
}
spin_unlock_irqrestore(&q->lock, flags);
if (!info)
return false; return false;
done = (struct rds_zcopy_cookies *)skb->cb; done = &info->zcookies;
if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
done)) { done)) {
skb_queue_head(q, skb); spin_lock_irqsave(&q->lock, flags);
list_add(&info->rs_zcookie_next, &q->zcookie_head);
spin_unlock_irqrestore(&q->lock, flags);
return false; return false;
} }
consume_skb(skb); kfree(info);
return true; return true;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment