Commit b2347cb5 authored by David Howells

rxrpc: Implement local endpoint cache

Implement the local RxRPC endpoint cache.  Only the primary cache is used.
This is indexed on the following details:

  - Local network transport family - currently only AF_INET.
  - Local network transport type - currently only UDP.
  - Local network transport address.

The hash isn't very big since we don't expect to have many local endpoints
hanging around - RxRPC sockets opened with a 0 service ID (ie. client-only
sockets) share local endpoints if they have matching local network
addresses (typically all zeros).
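
As a rough illustration of what that indexing amounts to, the hash boils
down to folding those details together (a sketch only, with a made-up
function name; the real calculation lives in local-object.c and isn't
shown in full here):

	static unsigned long rxrpc_local_hash_key(const struct sockaddr_rxrpc *srx)
	{
		unsigned long hash;

		/* Fold together the details listed above; only AF_INET over
		 * UDP is handled at this point.
		 */
		hash = srx->transport.family;
		hash += srx->transport_type;
		hash += be32_to_cpu(srx->transport.sin.sin_addr.s_addr);
		hash += ntohs(srx->transport.sin.sin_port);
		return hash;
	}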

We use a mutex to handle lookups and don't provide RCU-only lookups since
we only expect write access to this cache to be done from process context
when opening a socket.  The local endpoint object is pointed to by the
transport socket's sk_user_data for the life of the transport socket so
that it can be accessed quickly by the transport socket's sk_data_ready
and sk_error_report callbacks.

Further, the transport socket is shut down before we clear the sk_user_data
pointer, so that we can be sure that the transport socket's callbacks won't
be invoked once the RCU destruction is scheduled.
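
To illustrate that ordering (again a sketch with a made-up function name,
not the actual destruction code in local-object.c):

	static void rxrpc_local_destroy_sketch(struct rxrpc_local *local)
	{
		struct socket *sock = local->socket;

		/* Stop the transport socket generating callbacks before
		 * breaking its link back to the local endpoint.
		 */
		kernel_sock_shutdown(sock, SHUT_RDWR);
		sock->sk->sk_user_data = NULL;
		sock_release(sock);

		/* The rxrpc_local itself is then freed by the object cache
		 * after an RCU grace period.
		 */
	}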

The local endpoint retains the transport socket that we use to send and
receive packets and to capture network error messages (ICMP).  The socket
is opened when an endpoint is looked up, if the endpoint doesn't already
exist.
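
In outline, opening that socket involves something like the following
(a sketch: the helper name and the error-report handler name are
placeholders and error handling is abbreviated):

	static int rxrpc_open_local_socket_sketch(struct rxrpc_local *local)
	{
		struct sock *sk;
		int ret;

		/* Open a kernel UDP socket and bind it to the local address
		 * held in the endpoint.
		 */
		ret = sock_create_kern(&init_net, PF_INET, SOCK_DGRAM,
				       IPPROTO_UDP, &local->socket);
		if (ret < 0)
			return ret;

		ret = kernel_bind(local->socket,
				  (struct sockaddr *)&local->srx.transport,
				  local->srx.transport_len);
		if (ret < 0)
			return ret;

		/* Point the socket back at the endpoint and hook up the
		 * callbacks that feed packets and ICMP errors into rxrpc.
		 */
		sk = local->socket->sk;
		sk->sk_user_data    = local;
		sk->sk_data_ready   = rxrpc_data_ready;
		sk->sk_error_report = rxrpc_error_report; /* placeholder name */
		return 0;
	}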

Note that to make this work, we have to get rid of rxrpc_local_lock as
that causes a potential deadlock between a softirq looking in an object
cache whilst holding that lock and objcache_clear() taking the cache lock
and then being interrupted.

However, since the socket is locked by the caller of the rxrpc_data_ready()
function and given that we don't clear sk_user_data until after we've shut
down the socket, we are guaranteed that the local endpoint struct is pinned
until rxrpc_data_ready() returns - so we don't need to lock the local
endpoint struct there.

The other places where we took the lock to read the usage count and then
increment it if it wasn't zero can instead use atomic_inc_not_zero()
(hidden inside rxrpc_get_local_maybe()).
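
In other words, the guarded get boils down to something like this (an
assumption about the objcache internals, which come from the parent patch
and aren't shown here; the structure field name is illustrative):

	static inline bool objcache_get_maybe_sketch(struct obj_node *obj)
	{
		/* Only take a reference if the object is still live, ie. its
		 * usage count hasn't already hit zero.
		 */
		return atomic_inc_not_zero(&obj->usage);
	}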

Signed-off-by: David Howells <dhowells@redhat.com>
parent f9aa3a15
@@ -791,6 +791,8 @@ static int __init af_rxrpc_init(void)
 	rxrpc_epoch = get_seconds();
+	objcache_init(&rxrpc_local_cache);
 	ret = -ENOMEM;
 	rxrpc_call_jar = kmem_cache_create(
 		"rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
@@ -856,6 +858,7 @@ static int __init af_rxrpc_init(void)
 error_work_queue:
 	kmem_cache_destroy(rxrpc_call_jar);
 error_call_jar:
+	objcache_clear(&rxrpc_local_cache);
 	return ret;
 }
@@ -874,7 +877,7 @@ static void __exit af_rxrpc_exit(void)
 	rxrpc_destroy_all_connections();
 	rxrpc_destroy_all_transports();
 	rxrpc_destroy_all_peers();
-	rxrpc_destroy_all_locals();
+	objcache_clear(&rxrpc_local_cache);
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
@@ -213,12 +213,7 @@ void rxrpc_accept_incoming_calls(struct work_struct *work)
 	_enter("%d", local->debug_id);
-	read_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
+	local = rxrpc_get_local_maybe(local);
 	if (!local) {
 		_leave(" [local dead]");
 		return;
@@ -317,7 +317,7 @@ void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
 	CHECK_SLAB_OKAY(&local->usage);
-	if (!atomic_inc_not_zero(&local->usage)) {
+	if (!rxrpc_get_local_maybe(local)) {
 		printk("resurrected on reject\n");
 		BUG();
 	}
@@ -598,9 +598,9 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
 {
 	_enter("%p,%p", local, skb);
-	atomic_inc(&local->usage);
+	rxrpc_get_local(local);
 	skb_queue_tail(&local->event_queue, skb);
-	rxrpc_queue_work(&local->event_processor);
+	rxrpc_queue_work(&local->processor);
 }
/*
@@ -675,13 +675,13 @@ void rxrpc_data_ready(struct sock *sk)
 	ASSERT(!irqs_disabled());
-	read_lock_bh(&rxrpc_local_lock);
-	local = sk->sk_user_data;
-	if (local && atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
+	/* The socket is locked by the caller and this prevents the socket from
+	 * being shut down, thus preventing sk_user_data from being cleared
+	 * until this function returns. The local endpoint may, however, be in
+	 * the process of being discarded from the cache, so we still need to
+	 * validate it.
+	 */
+	local = rxrpc_get_local_maybe(sk->sk_user_data);
 	if (!local) {
 		_leave(" [local dead]");
 		return;
@@ -167,24 +167,24 @@ struct rxrpc_security {
};
/*
* RxRPC local transport endpoint definition
* - matched by local port, address and protocol type
* RxRPC local transport endpoint description
* - owned by a single AF_RXRPC socket
* - pointed to by transport socket struct sk_user_data
*/
struct rxrpc_local {
struct obj_node obj;
struct socket *socket; /* my UDP socket */
struct work_struct destroyer; /* endpoint destroyer */
struct work_struct acceptor; /* incoming call processor */
struct work_struct rejecter; /* packet reject writer */
struct work_struct event_processor; /* endpoint event processor */
struct work_struct processor; /* endpoint packet processor */
struct list_head services; /* services listening on this endpoint */
struct list_head link; /* link in endpoint list */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head accept_queue; /* incoming calls awaiting acceptance */
struct sk_buff_head reject_queue; /* packets awaiting rejection */
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
struct mutex conn_lock; /* Client connection creation lock */
spinlock_t lock; /* access lock */
rwlock_t services_lock; /* lock for services list */
atomic_t usage;
int debug_id; /* debug ID for printks */
volatile char error_rcvd; /* T if received ICMP error outstanding */
struct sockaddr_rxrpc srx; /* local address */
@@ -674,11 +674,25 @@ struct rxrpc_transport *rxrpc_find_transport(struct rxrpc_local *,
 /*
  * local-object.c
  */
-extern rwlock_t rxrpc_local_lock;
+extern struct objcache rxrpc_local_cache;
 struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *);
-void rxrpc_put_local(struct rxrpc_local *);
-void __exit rxrpc_destroy_all_locals(void);
+static inline void rxrpc_get_local(struct rxrpc_local *local)
+{
+	objcache_get(&local->obj);
+}
+static inline
+struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
+{
+	return objcache_get_maybe(&local->obj) ? local : NULL;
+}
+static inline void rxrpc_put_local(struct rxrpc_local *local)
+{
+	objcache_put(&rxrpc_local_cache, &local->obj);
+}
 /*
  * sysctl.c
@@ -866,15 +880,6 @@ static inline void rxrpc_purge_queue(struct sk_buff_head *list)
 		rxrpc_free_skb(skb);
 }
-static inline void __rxrpc_get_local(struct rxrpc_local *local, const char *f)
-{
-	CHECK_SLAB_OKAY(&local->usage);
-	if (atomic_inc_return(&local->usage) == 1)
-		printk("resurrected (%s)\n", f);
-}
-#define rxrpc_get_local(LOCAL) __rxrpc_get_local((LOCAL), __func__)
 #define rxrpc_get_call(CALL)				\
 do {							\
 	CHECK_SLAB_OKAY(&(CALL)->usage);		\
@@ -82,13 +82,14 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
  */
 void rxrpc_process_local_events(struct work_struct *work)
 {
-	struct rxrpc_local *local = container_of(work, struct rxrpc_local, event_processor);
+	struct rxrpc_local *local =
+		container_of(work, struct rxrpc_local, processor);
 	struct sk_buff *skb;
 	char v;
 	_enter("");
-	atomic_inc(&local->usage);
+	rxrpc_get_local(local);
 	while ((skb = skb_dequeue(&local->event_queue))) {
 		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);