Commit 7426b1a5 authored by stephen hemminger's avatar stephen hemminger Committed by David S. Miller

netvsc: optimize receive completions

Optimize how receive completion ring are managed.
   * Allocate only as many slots as needed for all buffers from host
   * Allocate before setting up sub channel for better error detection
   * Don't need to keep copy of initial receive section message
   * Precompute the watermark for when receive flushing is needed
   * Replace division with conditional test
   * Replace atomic per-device variable with per-channel check.
   * Handle corner case where receive completion send
     fails if ring buffer to host is full.
Signed-off-by: default avatarStephen Hemminger <sthemmin@microsoft.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 02b6de01
......@@ -186,6 +186,7 @@ struct net_device_context;
struct netvsc_device *netvsc_device_add(struct hv_device *device,
const struct netvsc_device_info *info);
int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx);
void netvsc_device_remove(struct hv_device *device);
int netvsc_send(struct net_device_context *ndc,
struct hv_netvsc_packet *packet,
......@@ -657,11 +658,8 @@ struct recv_comp_data {
u32 status;
};
/* Netvsc Receive Slots Max */
#define NETVSC_RECVSLOT_MAX (NETVSC_RECEIVE_BUFFER_SIZE / ETH_DATA_LEN + 1)
struct multi_recv_comp {
void *buf; /* queued receive completions */
struct recv_comp_data *slots;
u32 first; /* first data entry */
u32 next; /* next entry for writing */
};
......@@ -750,7 +748,7 @@ struct netvsc_device {
u32 recv_buf_size;
u32 recv_buf_gpadl_handle;
u32 recv_section_cnt;
struct nvsp_1_receive_buffer_section *recv_section;
u32 recv_completion_cnt;
/* Send buffer allocated by us */
void *send_buf;
......@@ -778,8 +776,6 @@ struct netvsc_device {
u32 max_pkt; /* max number of pkt in one send, e.g. 8 */
u32 pkt_align; /* alignment bytes, e.g. 8 */
atomic_t num_outstanding_recvs;
atomic_t open_cnt;
struct netvsc_channel chan_table[VRSS_CHANNEL_MAX];
......
......@@ -72,9 +72,6 @@ static struct netvsc_device *alloc_net_device(void)
if (!net_device)
return NULL;
net_device->chan_table[0].mrc.buf
= vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
init_waitqueue_head(&net_device->wait_drain);
net_device->destroy = false;
atomic_set(&net_device->open_cnt, 0);
......@@ -92,7 +89,7 @@ static void free_netvsc_device(struct rcu_head *head)
int i;
for (i = 0; i < VRSS_CHANNEL_MAX; i++)
vfree(nvdev->chan_table[i].mrc.buf);
vfree(nvdev->chan_table[i].mrc.slots);
kfree(nvdev);
}
......@@ -171,12 +168,6 @@ static void netvsc_destroy_buf(struct hv_device *device)
net_device->recv_buf = NULL;
}
if (net_device->recv_section) {
net_device->recv_section_cnt = 0;
kfree(net_device->recv_section);
net_device->recv_section = NULL;
}
/* Deal with the send buffer we may have setup.
* If we got a send section size, it means we received a
* NVSP_MSG1_TYPE_SEND_SEND_BUF_COMPLETE msg (ie sent
......@@ -239,11 +230,26 @@ static void netvsc_destroy_buf(struct hv_device *device)
kfree(net_device->send_section_map);
}
int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx)
{
struct netvsc_channel *nvchan = &net_device->chan_table[q_idx];
int node = cpu_to_node(nvchan->channel->target_cpu);
size_t size;
size = net_device->recv_completion_cnt * sizeof(struct recv_comp_data);
nvchan->mrc.slots = vzalloc_node(size, node);
if (!nvchan->mrc.slots)
nvchan->mrc.slots = vzalloc(size);
return nvchan->mrc.slots ? 0 : -ENOMEM;
}
static int netvsc_init_buf(struct hv_device *device,
struct netvsc_device *net_device)
{
int ret = 0;
struct nvsp_message *init_packet;
struct nvsp_1_message_send_receive_buffer_complete *resp;
struct net_device *ndev;
size_t map_words;
int node;
......@@ -300,43 +306,41 @@ static int netvsc_init_buf(struct hv_device *device,
wait_for_completion(&net_device->channel_init_wait);
/* Check the response */
if (init_packet->msg.v1_msg.
send_recv_buf_complete.status != NVSP_STAT_SUCCESS) {
netdev_err(ndev, "Unable to complete receive buffer "
"initialization with NetVsp - status %d\n",
init_packet->msg.v1_msg.
send_recv_buf_complete.status);
resp = &init_packet->msg.v1_msg.send_recv_buf_complete;
if (resp->status != NVSP_STAT_SUCCESS) {
netdev_err(ndev,
"Unable to complete receive buffer initialization with NetVsp - status %d\n",
resp->status);
ret = -EINVAL;
goto cleanup;
}
/* Parse the response */
netdev_dbg(ndev, "Receive sections: %u sub_allocs: size %u count: %u\n",
resp->num_sections, resp->sections[0].sub_alloc_size,
resp->sections[0].num_sub_allocs);
net_device->recv_section_cnt = init_packet->msg.
v1_msg.send_recv_buf_complete.num_sections;
net_device->recv_section = kmemdup(
init_packet->msg.v1_msg.send_recv_buf_complete.sections,
net_device->recv_section_cnt *
sizeof(struct nvsp_1_receive_buffer_section),
GFP_KERNEL);
if (net_device->recv_section == NULL) {
ret = -EINVAL;
goto cleanup;
}
net_device->recv_section_cnt = resp->num_sections;
/*
* For 1st release, there should only be 1 section that represents the
* entire receive buffer
*/
if (net_device->recv_section_cnt != 1 ||
net_device->recv_section->offset != 0) {
resp->sections[0].offset != 0) {
ret = -EINVAL;
goto cleanup;
}
/* Now setup the send buffer.
*/
/* Setup receive completion ring */
net_device->recv_completion_cnt
= round_up(resp->sections[0].num_sub_allocs + 1,
PAGE_SIZE / sizeof(u64));
ret = netvsc_alloc_recv_comp_ring(net_device, 0);
if (ret)
goto cleanup;
/* Now setup the send buffer. */
net_device->send_buf = vzalloc_node(net_device->send_buf_size, node);
if (!net_device->send_buf)
net_device->send_buf = vzalloc(net_device->send_buf_size);
......@@ -951,121 +955,85 @@ int netvsc_send(struct net_device_context *ndev_ctx,
return ret;
}
static int netvsc_send_recv_completion(struct vmbus_channel *channel,
u64 transaction_id, u32 status)
/* Send pending recv completions */
static int send_recv_completions(struct netvsc_channel *nvchan)
{
struct nvsp_message recvcompMessage;
struct netvsc_device *nvdev = nvchan->net_device;
struct multi_recv_comp *mrc = &nvchan->mrc;
struct recv_comp_msg {
struct nvsp_message_header hdr;
u32 status;
} __packed;
struct recv_comp_msg msg = {
.hdr.msg_type = NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE,
};
int ret;
recvcompMessage.hdr.msg_type =
NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE;
recvcompMessage.msg.v1_msg.send_rndis_pkt_complete.status = status;
/* Send the completion */
ret = vmbus_sendpacket(channel, &recvcompMessage,
sizeof(struct nvsp_message_header) + sizeof(u32),
transaction_id, VM_PKT_COMP, 0);
while (mrc->first != mrc->next) {
const struct recv_comp_data *rcd
= mrc->slots + mrc->first;
msg.status = rcd->status;
ret = vmbus_sendpacket(nvchan->channel, &msg, sizeof(msg),
rcd->tid, VM_PKT_COMP, 0);
if (unlikely(ret))
return ret;
}
static inline void count_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx,
u32 *filled, u32 *avail)
{
struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
u32 first = mrc->first;
u32 next = mrc->next;
*filled = (first > next) ? NETVSC_RECVSLOT_MAX - first + next :
next - first;
*avail = NETVSC_RECVSLOT_MAX - *filled - 1;
}
/* Read the first filled slot, no change to index */
static inline struct recv_comp_data *read_recv_comp_slot(struct netvsc_device
*nvdev, u16 q_idx)
{
struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
u32 filled, avail;
if (unlikely(!mrc->buf))
return NULL;
if (++mrc->first == nvdev->recv_completion_cnt)
mrc->first = 0;
}
count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
if (!filled)
return NULL;
/* receive completion ring has been emptied */
if (unlikely(nvdev->destroy))
wake_up(&nvdev->wait_drain);
return mrc->buf + mrc->first * sizeof(struct recv_comp_data);
return 0;
}
/* Put the first filled slot back to available pool */
static inline void put_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx)
/* Count how many receive completions are outstanding */
static void recv_comp_slot_avail(const struct netvsc_device *nvdev,
const struct multi_recv_comp *mrc,
u32 *filled, u32 *avail)
{
struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
int num_recv;
mrc->first = (mrc->first + 1) % NETVSC_RECVSLOT_MAX;
u32 count = nvdev->recv_completion_cnt;
num_recv = atomic_dec_return(&nvdev->num_outstanding_recvs);
if (mrc->next >= mrc->first)
*filled = mrc->next - mrc->first;
else
*filled = (count - mrc->first) + mrc->next;
if (nvdev->destroy && num_recv == 0)
wake_up(&nvdev->wait_drain);
*avail = count - *filled - 1;
}
/* Check and send pending recv completions */
static void netvsc_chk_recv_comp(struct netvsc_device *nvdev,
struct vmbus_channel *channel, u16 q_idx)
/* Add receive complete to ring to send to host. */
static void enq_receive_complete(struct net_device *ndev,
struct netvsc_device *nvdev, u16 q_idx,
u64 tid, u32 status)
{
struct netvsc_channel *nvchan = &nvdev->chan_table[q_idx];
struct multi_recv_comp *mrc = &nvchan->mrc;
struct recv_comp_data *rcd;
int ret;
while (true) {
rcd = read_recv_comp_slot(nvdev, q_idx);
if (!rcd)
break;
u32 filled, avail;
ret = netvsc_send_recv_completion(channel, rcd->tid,
rcd->status);
if (ret)
break;
recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
put_recv_comp_slot(nvdev, q_idx);
if (unlikely(filled > NAPI_POLL_WEIGHT)) {
send_recv_completions(nvchan);
recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
}
}
#define NETVSC_RCD_WATERMARK 80
/* Get next available slot */
static inline struct recv_comp_data *get_recv_comp_slot(
struct netvsc_device *nvdev, struct vmbus_channel *channel, u16 q_idx)
{
struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
u32 filled, avail, next;
struct recv_comp_data *rcd;
if (unlikely(!nvdev->recv_section))
return NULL;
if (unlikely(!mrc->buf))
return NULL;
if (atomic_read(&nvdev->num_outstanding_recvs) >
nvdev->recv_section->num_sub_allocs * NETVSC_RCD_WATERMARK / 100)
netvsc_chk_recv_comp(nvdev, channel, q_idx);
count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
if (!avail)
return NULL;
next = mrc->next;
rcd = mrc->buf + next * sizeof(struct recv_comp_data);
mrc->next = (next + 1) % NETVSC_RECVSLOT_MAX;
if (unlikely(!avail)) {
netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
q_idx, tid);
return;
}
atomic_inc(&nvdev->num_outstanding_recvs);
rcd = mrc->slots + mrc->next;
rcd->tid = tid;
rcd->status = status;
return rcd;
if (++mrc->next == nvdev->recv_completion_cnt)
mrc->next = 0;
}
static int netvsc_receive(struct net_device *ndev,
......@@ -1083,7 +1051,6 @@ static int netvsc_receive(struct net_device *ndev,
u32 status = NVSP_STAT_SUCCESS;
int i;
int count = 0;
int ret;
/* Make sure this is a valid nvsp packet */
if (unlikely(nvsp->hdr.msg_type != NVSP_MSG1_TYPE_SEND_RNDIS_PKT)) {
......@@ -1114,25 +1081,9 @@ static int netvsc_receive(struct net_device *ndev,
channel, data, buflen);
}
if (net_device->chan_table[q_idx].mrc.buf) {
struct recv_comp_data *rcd;
enq_receive_complete(ndev, net_device, q_idx,
vmxferpage_packet->d.trans_id, status);
rcd = get_recv_comp_slot(net_device, channel, q_idx);
if (rcd) {
rcd->tid = vmxferpage_packet->d.trans_id;
rcd->status = status;
} else {
netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
q_idx, vmxferpage_packet->d.trans_id);
}
} else {
ret = netvsc_send_recv_completion(channel,
vmxferpage_packet->d.trans_id,
status);
if (ret)
netdev_err(ndev, "Recv_comp q:%hd, tid:%llx, err:%d\n",
q_idx, vmxferpage_packet->d.trans_id, ret);
}
return count;
}
......@@ -1231,7 +1182,6 @@ int netvsc_poll(struct napi_struct *napi, int budget)
struct netvsc_device *net_device = nvchan->net_device;
struct vmbus_channel *channel = nvchan->channel;
struct hv_device *device = netvsc_channel_to_device(channel);
u16 q_idx = channel->offermsg.offer.sub_channel_index;
struct net_device *ndev = hv_get_drvdata(device);
int work_done = 0;
......@@ -1245,17 +1195,18 @@ int netvsc_poll(struct napi_struct *napi, int budget)
nvchan->desc = hv_pkt_iter_next(channel, nvchan->desc);
}
/* If receive ring was exhausted
/* If send of pending receive completions suceeded
* and did not exhaust NAPI budget
* and not doing busy poll
* then re-enable host interrupts
* and reschedule if ring is not empty.
* then reschedule if more data has arrived from host
*/
if (work_done < budget &&
if (send_recv_completions(nvchan) == 0 &&
work_done < budget &&
napi_complete_done(napi, work_done) &&
hv_end_read(&channel->inbound) != 0)
hv_end_read(&channel->inbound)) {
hv_begin_read(&channel->inbound);
napi_reschedule(napi);
netvsc_chk_recv_comp(net_device, channel, q_idx);
}
/* Driver may overshoot since multiple packets per descriptor */
return min(work_done, budget);
......
......@@ -928,12 +928,12 @@ static bool netvsc_device_idle(const struct netvsc_device *nvdev)
{
int i;
if (atomic_read(&nvdev->num_outstanding_recvs) > 0)
return false;
for (i = 0; i < nvdev->num_chn; i++) {
const struct netvsc_channel *nvchan = &nvdev->chan_table[i];
if (nvchan->mrc.first != nvchan->mrc.next)
return false;
if (atomic_read(&nvchan->queue_sends) > 0)
return false;
}
......@@ -1031,11 +1031,6 @@ static void netvsc_sc_open(struct vmbus_channel *new_sc)
return;
nvchan = nvscdev->chan_table + chn_index;
nvchan->mrc.buf
= vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
if (!nvchan->mrc.buf)
return;
/* Because the device uses NAPI, all the interrupt batching and
* control is done via Net softirq, not the channel handling
......@@ -1225,6 +1220,15 @@ struct netvsc_device *rndis_filter_device_add(struct hv_device *dev,
if (num_rss_qs == 0)
return net_device;
for (i = 1; i < net_device->num_chn; i++) {
ret = netvsc_alloc_recv_comp_ring(net_device, i);
if (ret) {
while (--i != 0)
vfree(net_device->chan_table[i].mrc.slots);
goto out;
}
}
refcount_set(&net_device->sc_offered, num_rss_qs);
vmbus_set_sc_create_callback(dev->channel, netvsc_sc_open);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment