Commit cc3baecb authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-rewrite-20160706' of...

Merge tag 'rxrpc-rewrite-20160706' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Improve conn/call lookup and fix call number generation [ver #3]

I've fixed a couple of patch descriptions and excised the patch that
duplicated the connections list for reconsideration at a later date.

For reference, the excised patch is sitting on the rxrpc-experimental
branch of my git tree, based on top of the rxrpc-rewrite branch.  Diffing
it against yesterday's tag shows no differences.

Would you prefer the patch set to be emailed afresh instead of a git-pull
request?

David
---
Here's the next part of the AF_RXRPC rewrite.  The two main purposes of
this set are to fix the call number handling and to make use of RCU when
looking up the connection or call to pass a received packet to.

Important changes in this set include:

 (1) Avoidance of placing stack data into SG lists in rxkad so that kernel
     stacks can become vmalloc'd (Herbert Xu).

 (2) Calls cease pinning the connection they used as soon as possible,
     which allows the connection to be discarded sooner and allows the call
     channel on that connection to be reused earlier.

 (3) Make each call channel on a connection have a separate and independent
     call number space rather than having a shared number space for the
     connection.  Call numbers should increment monotonically per channel
     on the client, and the server should ignore a call with a lower call
     number for that channel than the latest it has seen.  The RESPONSE
     packet sets the minimum values of each call ID counter on a
     connection.

 (4) Look up calls by indexing the channel array on a connection rather
     than by keeping calls in an rbtree on that connection.  Also look up
     calls using the channel array rather than using a hashtable.

     The call hashtable can then be removed.

 (5) Call terminal statuses are cached in the channel array for the last
     call.  It is assumed that if we the server have seen call N, then the
     client no longer cares about call N-1 on the same channel.

     This will allow retransmission of the terminal status in future
     without the need to keep the rxrpc_call struct around.

 (6) Peer lookups are moved out of common connection handling code and into
     service connection handling code as client connections (a) must point
     to a peer before they can be used and (b) are looked up by a
     machine-unique connection ID directly, so we only need to look up the
     peer first if we're going to deal with a service call.

 (7) The reference count on a connection is held elevated by 1 whilst it is
     alive (ie. idle unused connections have a refcount of 1).  The reaper
     will attempt to change the refcount from 1->0 and skip if this cannot
     be done, whilst look ups only increment the refcount if it's non-zero.

     This makes the implementation of RCU lookups easier as we don't have
     to get a ref on the connection or a lock on the connection list to
     prevent a connection being reaped whilst we're contemplating queueing
     a packet that initiates a new service call upon it.

     If we need to get a connection, but there's a dead connection in the
     tree, we use rb_replace_node() to replace the dead one with a new one.

 (8) Use a seqlock to validate the walk over the service connection rbtree
     attached to a peer when it's being walked in RCU mode.

 (9) Make the incoming call/connection packet handling code use RCU mode
     and locks and make it only take a reference if the call/connection
     gets queued on a workqueue.

The intention is that the next set will introduce the connection lifetime
management and capacity limits to prevent clients from overloading the
server.

There are some fixes too:

 (1) Verifying that a packet coming in to a client connection came from the
     expected source.

 (2) Fix handling of connection failure in client call creation where we
     don't reinitialise the list linkage block and a second attempt to
     unlink the failed connection oopses and also we don't set the state
     correctly, which causes an assertion failure.

 (3) New service calls were being added to the socket's accept queue under
     the wrong lock.

Changes:

 (V2) In rxrpc_find_service_conn_rcu() initialised the sequence number to 0.

      Fixed the RCU handling in conn_service.c by introducing and using
      rb_replace_node_rcu() as an RCU-safe alternative in
      rxrpc_publish_service_conn().

      Modified and used rcu_dereference_raw() to avoid RCU sparse warnings
      in rxrpc_find_service_conn_rcu().

      Added in some missing RCU dereference wrappers.  It seems to be
      necessary to turn on CONFIG_PROVE_RCU_REPEATEDLY as well as
      CONFIG_SPARSE_RCU_POINTER to get the static __rcu annotation checking
      to happen.

      Fixed some other sparse warnings, including a missing ntohs() in
      jumbo packet processing.

 (V3) Fixed some commit descriptions.

      Excised the patch that duplicated the connection list to separate out
      the procfs list for reconsideration at a later date.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 99a50bb1 d440a1ce
......@@ -76,6 +76,8 @@ extern struct rb_node *rb_next_postorder(const struct rb_node *);
/* Fast replacement of a single node without remove/rebalance/add/rebalance */
extern void rb_replace_node(struct rb_node *victim, struct rb_node *new,
struct rb_root *root);
extern void rb_replace_node_rcu(struct rb_node *victim, struct rb_node *new,
struct rb_root *root);
static inline void rb_link_node(struct rb_node *node, struct rb_node *parent,
struct rb_node **rb_link)
......
......@@ -130,6 +130,19 @@ __rb_change_child(struct rb_node *old, struct rb_node *new,
WRITE_ONCE(root->rb_node, new);
}
static inline void
__rb_change_child_rcu(struct rb_node *old, struct rb_node *new,
struct rb_node *parent, struct rb_root *root)
{
if (parent) {
if (parent->rb_left == old)
rcu_assign_pointer(parent->rb_left, new);
else
rcu_assign_pointer(parent->rb_right, new);
} else
rcu_assign_pointer(root->rb_node, new);
}
extern void __rb_erase_color(struct rb_node *parent, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new));
......
......@@ -611,6 +611,12 @@ static inline void rcu_preempt_sleep_check(void)
rcu_dereference_sparse(p, space); \
((typeof(*p) __force __kernel *)(p)); \
})
#define rcu_dereference_raw(p) \
({ \
/* Dependency order vs. p above. */ \
typeof(p) ________p1 = lockless_dereference(p); \
((typeof(*p) __force __kernel *)(________p1)); \
})
/**
* RCU_INITIALIZER() - statically initialize an RCU-protected global variable
......@@ -729,8 +735,6 @@ static inline void rcu_preempt_sleep_check(void)
__rcu_dereference_check((p), (c) || rcu_read_lock_sched_held(), \
__rcu)
#define rcu_dereference_raw(p) rcu_dereference_check(p, 1) /*@@@ needed? @@@*/
/*
* The tracing infrastructure traces RCU (we want that), but unfortunately
* some of the RCU checks causes tracing to lock up the system.
......
......@@ -539,17 +539,39 @@ void rb_replace_node(struct rb_node *victim, struct rb_node *new,
{
struct rb_node *parent = rb_parent(victim);
/* Copy the pointers/colour from the victim to the replacement */
*new = *victim;
/* Set the surrounding nodes to point to the replacement */
__rb_change_child(victim, new, parent, root);
if (victim->rb_left)
rb_set_parent(victim->rb_left, new);
if (victim->rb_right)
rb_set_parent(victim->rb_right, new);
__rb_change_child(victim, new, parent, root);
}
EXPORT_SYMBOL(rb_replace_node);
void rb_replace_node_rcu(struct rb_node *victim, struct rb_node *new,
struct rb_root *root)
{
struct rb_node *parent = rb_parent(victim);
/* Copy the pointers/colour from the victim to the replacement */
*new = *victim;
/* Set the surrounding nodes to point to the replacement */
if (victim->rb_left)
rb_set_parent(victim->rb_left, new);
if (victim->rb_right)
rb_set_parent(victim->rb_right, new);
/* Set the parent's pointer to the new node last after an RCU barrier
* so that the pointers onwards are seen to be set correctly when doing
* an RCU walk over the tree.
*/
__rb_change_child_rcu(victim, new, parent, root);
}
EXPORT_SYMBOL(rb_replace_node);
EXPORT_SYMBOL(rb_replace_node_rcu);
static struct rb_node *rb_left_deepest_node(const struct rb_node *node)
{
......
......@@ -10,6 +10,7 @@ af-rxrpc-y := \
conn_client.o \
conn_event.o \
conn_object.o \
conn_service.o \
input.o \
insecure.o \
key.o \
......
......@@ -788,27 +788,7 @@ static void __exit af_rxrpc_exit(void)
proto_unregister(&rxrpc_proto);
rxrpc_destroy_all_calls();
rxrpc_destroy_all_connections();
ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
/* We need to flush the scheduled work twice because the local endpoint
* records involve a work item in their destruction as they can only be
* destroyed from process context. However, a connection may have a
* work item outstanding - and this will pin the local endpoint record
* until the connection goes away.
*
* Peers don't pin locals and calls pin sockets - which prevents the
* module from being unloaded - so we should only need two flushes.
*/
_debug("flush scheduled work");
flush_workqueue(rxrpc_workqueue);
_debug("flush scheduled work 2");
flush_workqueue(rxrpc_workqueue);
_debug("synchronise RCU");
rcu_barrier();
_debug("destroy locals");
ASSERT(idr_is_empty(&rxrpc_client_conn_ids));
idr_destroy(&rxrpc_client_conn_ids);
rxrpc_destroy_all_locals();
remove_proc_entry("rxrpc_conns", init_net.proc_net);
......
This diff is collapsed.
......@@ -75,7 +75,6 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
{
struct rxrpc_connection *conn;
struct rxrpc_skb_priv *sp, *nsp;
struct rxrpc_peer *peer;
struct rxrpc_call *call;
struct sk_buff *notification;
int ret;
......@@ -94,15 +93,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
rxrpc_new_skb(notification);
notification->mark = RXRPC_SKB_MARK_NEW_CALL;
peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
if (!peer) {
_debug("no peer");
ret = -EBUSY;
goto error;
}
conn = rxrpc_incoming_connection(local, peer, skb);
rxrpc_put_peer(peer);
conn = rxrpc_incoming_connection(local, srx, skb);
if (IS_ERR(conn)) {
_debug("no conn");
ret = PTR_ERR(conn);
......@@ -128,12 +119,11 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
spin_lock(&call->conn->state_lock);
if (sp->hdr.securityIndex > 0 &&
call->conn->state == RXRPC_CONN_SERVER_UNSECURED) {
call->conn->state == RXRPC_CONN_SERVICE_UNSECURED) {
_debug("await conn sec");
list_add_tail(&call->accept_link, &rx->secureq);
call->conn->state = RXRPC_CONN_SERVER_CHALLENGING;
rxrpc_get_connection(call->conn);
set_bit(RXRPC_CONN_CHALLENGE, &call->conn->events);
call->conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
rxrpc_queue_conn(call->conn);
} else {
_debug("conn ready");
......@@ -227,20 +217,8 @@ void rxrpc_accept_incoming_calls(struct rxrpc_local *local)
whdr._rsvd = 0;
whdr.serviceId = htons(sp->hdr.serviceId);
/* determine the remote address */
memset(&srx, 0, sizeof(srx));
srx.srx_family = AF_RXRPC;
srx.transport.family = local->srx.transport.family;
srx.transport_type = local->srx.transport_type;
switch (srx.transport.family) {
case AF_INET:
srx.transport_len = sizeof(struct sockaddr_in);
srx.transport.sin.sin_port = udp_hdr(skb)->source;
srx.transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
break;
default:
goto busy;
}
if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
goto drop;
/* get the socket providing the service */
read_lock_bh(&local->services_lock);
......@@ -286,6 +264,10 @@ void rxrpc_accept_incoming_calls(struct rxrpc_local *local)
rxrpc_free_skb(skb);
return;
drop:
rxrpc_free_skb(skb);
return;
invalid_service:
skb->priority = RX_INVALID_OPERATION;
rxrpc_reject_packet(local, skb);
......
......@@ -858,11 +858,6 @@ void rxrpc_process_call(struct work_struct *work)
iov[0].iov_len = sizeof(whdr);
/* deal with events of a final nature */
if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
rxrpc_release_call(call);
clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
}
if (test_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events)) {
enum rxrpc_skb_mark mark;
int error;
......@@ -1094,7 +1089,7 @@ void rxrpc_process_call(struct work_struct *work)
if (call->state == RXRPC_CALL_SERVER_SECURING) {
_debug("securing");
write_lock(&call->conn->lock);
write_lock(&call->socket->call_lock);
if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
!test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
_debug("not released");
......@@ -1102,7 +1097,7 @@ void rxrpc_process_call(struct work_struct *work)
list_move_tail(&call->accept_link,
&call->socket->acceptq);
}
write_unlock(&call->conn->lock);
write_unlock(&call->socket->call_lock);
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE)
set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
......@@ -1144,6 +1139,11 @@ void rxrpc_process_call(struct work_struct *work)
goto maybe_reschedule;
}
if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
rxrpc_release_call(call);
clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
}
/* other events may have been raised since we started checking */
goto maybe_reschedule;
......
This diff is collapsed.
......@@ -33,7 +33,8 @@ static DEFINE_SPINLOCK(rxrpc_conn_id_lock);
* client conns away from the current allocation point to try and keep the IDs
* concentrated. We will also need to retire connections from an old epoch.
*/
int rxrpc_get_client_connection_id(struct rxrpc_connection *conn, gfp_t gfp)
static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
gfp_t gfp)
{
u32 epoch;
int id;
......@@ -83,7 +84,7 @@ int rxrpc_get_client_connection_id(struct rxrpc_connection *conn, gfp_t gfp)
/*
* Release a connection ID for a client connection from the global pool.
*/
void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
{
if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
spin_lock(&rxrpc_conn_id_lock);
......@@ -92,3 +93,280 @@ void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
spin_unlock(&rxrpc_conn_id_lock);
}
}
/*
* Destroy the client connection ID tree.
*/
void rxrpc_destroy_client_conn_ids(void)
{
struct rxrpc_connection *conn;
int id;
if (!idr_is_empty(&rxrpc_client_conn_ids)) {
idr_for_each_entry(&rxrpc_client_conn_ids, conn, id) {
pr_err("AF_RXRPC: Leaked client conn %p {%d}\n",
conn, atomic_read(&conn->usage));
}
BUG();
}
idr_destroy(&rxrpc_client_conn_ids);
}
/*
* Allocate a client connection. The caller must take care to clear any
* padding bytes in *cp.
*/
static struct rxrpc_connection *
rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
{
struct rxrpc_connection *conn;
int ret;
_enter("");
conn = rxrpc_alloc_connection(gfp);
if (!conn) {
_leave(" = -ENOMEM");
return ERR_PTR(-ENOMEM);
}
conn->params = *cp;
conn->out_clientflag = RXRPC_CLIENT_INITIATED;
conn->state = RXRPC_CONN_CLIENT;
ret = rxrpc_get_client_connection_id(conn, gfp);
if (ret < 0)
goto error_0;
ret = rxrpc_init_client_conn_security(conn);
if (ret < 0)
goto error_1;
ret = conn->security->prime_packet_security(conn);
if (ret < 0)
goto error_2;
write_lock(&rxrpc_connection_lock);
list_add_tail(&conn->link, &rxrpc_connections);
write_unlock(&rxrpc_connection_lock);
/* We steal the caller's peer ref. */
cp->peer = NULL;
rxrpc_get_local(conn->params.local);
key_get(conn->params.key);
_leave(" = %p", conn);
return conn;
error_2:
conn->security->clear(conn);
error_1:
rxrpc_put_client_connection_id(conn);
error_0:
kfree(conn);
_leave(" = %d", ret);
return ERR_PTR(ret);
}
/*
* find a connection for a call
* - called in process context with IRQs enabled
*/
int rxrpc_connect_call(struct rxrpc_call *call,
struct rxrpc_conn_parameters *cp,
struct sockaddr_rxrpc *srx,
gfp_t gfp)
{
struct rxrpc_connection *conn, *candidate = NULL;
struct rxrpc_local *local = cp->local;
struct rb_node *p, **pp, *parent;
long diff;
int chan;
DECLARE_WAITQUEUE(myself, current);
_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
cp->peer = rxrpc_lookup_peer(cp->local, srx, gfp);
if (!cp->peer)
return -ENOMEM;
if (!cp->exclusive) {
/* Search for a existing client connection unless this is going
* to be a connection that's used exclusively for a single call.
*/
_debug("search 1");
spin_lock(&local->client_conns_lock);
p = local->client_conns.rb_node;
while (p) {
conn = rb_entry(p, struct rxrpc_connection, client_node);
#define cmp(X) ((long)conn->params.X - (long)cp->X)
diff = (cmp(peer) ?:
cmp(key) ?:
cmp(security_level));
if (diff < 0)
p = p->rb_left;
else if (diff > 0)
p = p->rb_right;
else
goto found_extant_conn;
}
spin_unlock(&local->client_conns_lock);
}
/* We didn't find a connection or we want an exclusive one. */
_debug("get new conn");
candidate = rxrpc_alloc_client_connection(cp, gfp);
if (!candidate) {
_leave(" = -ENOMEM");
return -ENOMEM;
}
if (cp->exclusive) {
/* Assign the call on an exclusive connection to channel 0 and
* don't add the connection to the endpoint's shareable conn
* lookup tree.
*/
_debug("exclusive chan 0");
conn = candidate;
atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
spin_lock(&conn->channel_lock);
chan = 0;
goto found_channel;
}
/* We need to redo the search before attempting to add a new connection
* lest we race with someone else adding a conflicting instance.
*/
_debug("search 2");
spin_lock(&local->client_conns_lock);
pp = &local->client_conns.rb_node;
parent = NULL;
while (*pp) {
parent = *pp;
conn = rb_entry(parent, struct rxrpc_connection, client_node);
diff = (cmp(peer) ?:
cmp(key) ?:
cmp(security_level));
if (diff < 0)
pp = &(*pp)->rb_left;
else if (diff > 0)
pp = &(*pp)->rb_right;
else
goto found_extant_conn;
}
/* The second search also failed; simply add the new connection with
* the new call in channel 0. Note that we need to take the channel
* lock before dropping the client conn lock.
*/
_debug("new conn");
set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
rb_link_node(&candidate->client_node, parent, pp);
rb_insert_color(&candidate->client_node, &local->client_conns);
attached:
conn = candidate;
candidate = NULL;
atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
spin_lock(&conn->channel_lock);
spin_unlock(&local->client_conns_lock);
chan = 0;
found_channel:
_debug("found chan");
call->conn = conn;
call->channel = chan;
call->epoch = conn->proto.epoch;
call->cid = conn->proto.cid | chan;
call->call_id = ++conn->channels[chan].call_counter;
conn->channels[chan].call_id = call->call_id;
rcu_assign_pointer(conn->channels[chan].call, call);
_net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id);
spin_unlock(&conn->channel_lock);
rxrpc_put_peer(cp->peer);
cp->peer = NULL;
_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
return 0;
/* We found a potentially suitable connection already in existence. If
* we can reuse it (ie. its usage count hasn't been reduced to 0 by the
* reaper), discard any candidate we may have allocated, and try to get
* a channel on this one, otherwise we have to replace it.
*/
found_extant_conn:
_debug("found conn");
if (!rxrpc_get_connection_maybe(conn)) {
set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
rb_replace_node(&conn->client_node,
&candidate->client_node,
&local->client_conns);
clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
goto attached;
}
spin_unlock(&local->client_conns_lock);
rxrpc_put_connection(candidate);
if (!atomic_add_unless(&conn->avail_chans, -1, 0)) {
if (!gfpflags_allow_blocking(gfp)) {
rxrpc_put_connection(conn);
_leave(" = -EAGAIN");
return -EAGAIN;
}
add_wait_queue(&conn->channel_wq, &myself);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (atomic_add_unless(&conn->avail_chans, -1, 0))
break;
if (signal_pending(current))
goto interrupted;
schedule();
}
remove_wait_queue(&conn->channel_wq, &myself);
__set_current_state(TASK_RUNNING);
}
/* The connection allegedly now has a free channel and we can now
* attach the call to it.
*/
spin_lock(&conn->channel_lock);
for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
if (!conn->channels[chan].call)
goto found_channel;
BUG();
interrupted:
remove_wait_queue(&conn->channel_wq, &myself);
__set_current_state(TASK_RUNNING);
rxrpc_put_connection(conn);
rxrpc_put_peer(cp->peer);
cp->peer = NULL;
_leave(" = -ERESTARTSYS");
return -ERESTARTSYS;
}
/*
* Remove a client connection from the local endpoint's tree, thereby removing
* it as a target for reuse for new client calls.
*/
void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
{
struct rxrpc_local *local = conn->params.local;
spin_lock(&local->client_conns_lock);
if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
rb_erase(&conn->client_node, &local->client_conns);
spin_unlock(&local->client_conns_lock);
rxrpc_put_client_connection_id(conn);
}
......@@ -31,15 +31,17 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
u32 abort_code)
{
struct rxrpc_call *call;
struct rb_node *p;
int i;
_enter("{%d},%x", conn->debug_id, abort_code);
read_lock_bh(&conn->lock);
spin_lock(&conn->channel_lock);
for (p = rb_first(&conn->calls); p; p = rb_next(p)) {
call = rb_entry(p, struct rxrpc_call, conn_node);
write_lock(&call->state_lock);
for (i = 0; i < RXRPC_MAXCALLS; i++) {
call = rcu_dereference_protected(
conn->channels[i].call,
lockdep_is_held(&conn->channel_lock));
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = state;
if (state == RXRPC_CALL_LOCALLY_ABORTED) {
......@@ -51,10 +53,10 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
}
rxrpc_queue_call(call);
}
write_unlock(&call->state_lock);
write_unlock_bh(&call->state_lock);
}
read_unlock_bh(&conn->lock);
spin_unlock(&conn->channel_lock);
_leave("");
}
......@@ -188,18 +190,24 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
if (ret < 0)
return ret;
conn->security->prime_packet_security(conn);
read_lock_bh(&conn->lock);
ret = conn->security->prime_packet_security(conn);
if (ret < 0)
return ret;
spin_lock(&conn->channel_lock);
spin_lock(&conn->state_lock);
if (conn->state == RXRPC_CONN_SERVER_CHALLENGING) {
conn->state = RXRPC_CONN_SERVER;
if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
conn->state = RXRPC_CONN_SERVICE;
for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
rxrpc_call_is_secure(conn->channels[loop]);
rxrpc_call_is_secure(
rcu_dereference_protected(
conn->channels[loop].call,
lockdep_is_held(&conn->channel_lock)));
}
spin_unlock(&conn->state_lock);
read_unlock_bh(&conn->lock);
spin_unlock(&conn->channel_lock);
return 0;
default:
......@@ -263,12 +271,8 @@ void rxrpc_process_connection(struct work_struct *work)
_enter("{%d}", conn->debug_id);
rxrpc_get_connection(conn);
if (test_and_clear_bit(RXRPC_CONN_CHALLENGE, &conn->events)) {
if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
rxrpc_secure_connection(conn);
rxrpc_put_connection(conn);
}
/* go through the conn-level event packets, releasing the ref on this
* connection that each one has when we've finished with it */
......@@ -283,7 +287,6 @@ void rxrpc_process_connection(struct work_struct *work)
goto requeue_and_leave;
case -ECONNABORTED:
default:
rxrpc_put_connection(conn);
rxrpc_free_skb(skb);
break;
}
......@@ -301,7 +304,6 @@ void rxrpc_process_connection(struct work_struct *work)
protocol_error:
if (rxrpc_abort_connection(conn, -ret, abort_code) < 0)
goto requeue_and_leave;
rxrpc_put_connection(conn);
rxrpc_free_skb(skb);
_leave(" [EPROTO]");
goto out;
......@@ -315,7 +317,7 @@ void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
CHECK_SLAB_OKAY(&local->usage);
skb_queue_tail(&local->reject_queue, skb);
rxrpc_queue_work(&local->processor);
rxrpc_queue_local(local);
}
/*
......
This diff is collapsed.
/* Service connection management
*
* Copyright (C) 2016 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@redhat.com)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public Licence
* as published by the Free Software Foundation; either version
* 2 of the Licence, or (at your option) any later version.
*/
#include <linux/slab.h>
#include "ar-internal.h"
/*
* Find a service connection under RCU conditions.
*
* We could use a hash table, but that is subject to bucket stuffing by an
* attacker as the client gets to pick the epoch and cid values and would know
* the hash function. So, instead, we use a hash table for the peer and from
* that an rbtree to find the service connection. Under ordinary circumstances
* it might be slower than a large hash table, but it is at least limited in
* depth.
*/
struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *peer,
struct sk_buff *skb)
{
struct rxrpc_connection *conn = NULL;
struct rxrpc_conn_proto k;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rb_node *p;
unsigned int seq = 0;
k.epoch = sp->hdr.epoch;
k.cid = sp->hdr.cid & RXRPC_CIDMASK;
do {
/* Unfortunately, rbtree walking doesn't give reliable results
* under just the RCU read lock, so we have to check for
* changes.
*/
read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
p = rcu_dereference_raw(peer->service_conns.rb_node);
while (p) {
conn = rb_entry(p, struct rxrpc_connection, service_node);
if (conn->proto.index_key < k.index_key)
p = rcu_dereference_raw(p->rb_left);
else if (conn->proto.index_key > k.index_key)
p = rcu_dereference_raw(p->rb_right);
else
goto done;
conn = NULL;
}
} while (need_seqretry(&peer->service_conn_lock, seq));
done:
done_seqretry(&peer->service_conn_lock, seq);
_leave(" = %d", conn ? conn->debug_id : -1);
return conn;
}
/*
* Insert a service connection into a peer's tree, thereby making it a target
* for incoming packets.
*/
static struct rxrpc_connection *
rxrpc_publish_service_conn(struct rxrpc_peer *peer,
struct rxrpc_connection *conn)
{
struct rxrpc_connection *cursor = NULL;
struct rxrpc_conn_proto k = conn->proto;
struct rb_node **pp, *parent;
write_seqlock_bh(&peer->service_conn_lock);
pp = &peer->service_conns.rb_node;
parent = NULL;
while (*pp) {
parent = *pp;
cursor = rb_entry(parent,
struct rxrpc_connection, service_node);
if (cursor->proto.index_key < k.index_key)
pp = &(*pp)->rb_left;
else if (cursor->proto.index_key > k.index_key)
pp = &(*pp)->rb_right;
else
goto found_extant_conn;
}
rb_link_node_rcu(&conn->service_node, parent, pp);
rb_insert_color(&conn->service_node, &peer->service_conns);
conn_published:
set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
write_sequnlock_bh(&peer->service_conn_lock);
_leave(" = %d [new]", conn->debug_id);
return conn;
found_extant_conn:
if (atomic_read(&cursor->usage) == 0)
goto replace_old_connection;
write_sequnlock_bh(&peer->service_conn_lock);
/* We should not be able to get here. rxrpc_incoming_connection() is
* called in a non-reentrant context, so there can't be a race to
* insert a new connection.
*/
BUG();
replace_old_connection:
/* The old connection is from an outdated epoch. */
_debug("replace conn");
rb_replace_node_rcu(&cursor->service_node,
&conn->service_node,
&peer->service_conns);
clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &cursor->flags);
goto conn_published;
}
/*
* get a record of an incoming connection
*/
struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
struct sockaddr_rxrpc *srx,
struct sk_buff *skb)
{
struct rxrpc_connection *conn;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_peer *peer;
const char *new = "old";
_enter("");
peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
if (!peer) {
_debug("no peer");
return ERR_PTR(-EBUSY);
}
ASSERT(sp->hdr.flags & RXRPC_CLIENT_INITIATED);
rcu_read_lock();
peer = rxrpc_lookup_peer_rcu(local, srx);
if (peer) {
conn = rxrpc_find_service_conn_rcu(peer, skb);
if (conn) {
if (sp->hdr.securityIndex != conn->security_ix)
goto security_mismatch_rcu;
if (rxrpc_get_connection_maybe(conn))
goto found_extant_connection_rcu;
/* The conn has expired but we can't remove it without
* the appropriate lock, so we attempt to replace it
* when we have a new candidate.
*/
}
if (!rxrpc_get_peer_maybe(peer))
peer = NULL;
}
rcu_read_unlock();
if (!peer) {
peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
if (IS_ERR(peer))
goto enomem;
}
/* We don't have a matching record yet. */
conn = rxrpc_alloc_connection(GFP_NOIO);
if (!conn)
goto enomem_peer;
conn->proto.epoch = sp->hdr.epoch;
conn->proto.cid = sp->hdr.cid & RXRPC_CIDMASK;
conn->params.local = local;
conn->params.peer = peer;
conn->params.service_id = sp->hdr.serviceId;
conn->security_ix = sp->hdr.securityIndex;
conn->out_clientflag = 0;
conn->state = RXRPC_CONN_SERVICE;
if (conn->params.service_id)
conn->state = RXRPC_CONN_SERVICE_UNSECURED;
rxrpc_get_local(local);
write_lock(&rxrpc_connection_lock);
list_add_tail(&conn->link, &rxrpc_connections);
write_unlock(&rxrpc_connection_lock);
/* Make the connection a target for incoming packets. */
rxrpc_publish_service_conn(peer, conn);
new = "new";
success:
_net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->proto.cid);
_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
return conn;
found_extant_connection_rcu:
rcu_read_unlock();
goto success;
security_mismatch_rcu:
rcu_read_unlock();
_leave(" = -EKEYREJECTED");
return ERR_PTR(-EKEYREJECTED);
enomem_peer:
rxrpc_put_peer(peer);
enomem:
_leave(" = -ENOMEM");
return ERR_PTR(-ENOMEM);
}
/*
* Remove the service connection from the peer's tree, thereby removing it as a
* target for incoming packets.
*/
void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
{
struct rxrpc_peer *peer = conn->params.peer;
write_seqlock_bh(&peer->service_conn_lock);
if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
rb_erase(&conn->service_node, &peer->service_conns);
write_sequnlock_bh(&peer->service_conn_lock);
}
......@@ -476,7 +476,7 @@ static void rxrpc_process_jumbo_packet(struct rxrpc_call *call,
sp->hdr.seq += 1;
sp->hdr.serial += 1;
sp->hdr.flags = jhdr.flags;
sp->hdr._rsvd = jhdr._rsvd;
sp->hdr._rsvd = ntohs(jhdr._rsvd);
_proto("Rx DATA Jumbo %%%u", sp->hdr.serial - 1);
......@@ -575,14 +575,13 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
* post connection-level events to the connection
* - this includes challenges, responses and some aborts
*/
static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
static bool rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
struct sk_buff *skb)
{
_enter("%p,%p", conn, skb);
rxrpc_get_connection(conn);
skb_queue_tail(&conn->rx_queue, skb);
rxrpc_queue_conn(conn);
return rxrpc_queue_conn(conn);
}
/*
......@@ -595,7 +594,7 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
_enter("%p,%p", local, skb);
skb_queue_tail(&local->event_queue, skb);
rxrpc_queue_work(&local->processor);
rxrpc_queue_local(local);
}
/*
......@@ -627,32 +626,6 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
return 0;
}
static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
struct sk_buff *skb)
{
struct rxrpc_peer *peer;
struct rxrpc_connection *conn;
struct sockaddr_rxrpc srx;
rxrpc_get_addr_from_skb(local, skb, &srx);
rcu_read_lock();
peer = rxrpc_lookup_peer_rcu(local, &srx);
if (!peer)
goto cant_find_peer;
conn = rxrpc_find_connection(local, peer, skb);
rcu_read_unlock();
if (!conn)
goto cant_find_conn;
return conn;
cant_find_peer:
rcu_read_unlock();
cant_find_conn:
return NULL;
}
/*
* handle data received on the local endpoint
* - may be called in interrupt context
......@@ -663,6 +636,7 @@ static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
*/
void rxrpc_data_ready(struct sock *sk)
{
struct rxrpc_connection *conn;
struct rxrpc_skb_priv *sp;
struct rxrpc_local *local = sk->sk_user_data;
struct sk_buff *skb;
......@@ -726,34 +700,37 @@ void rxrpc_data_ready(struct sock *sk)
(sp->hdr.callNumber == 0 || sp->hdr.seq == 0))
goto bad_message;
if (sp->hdr.callNumber == 0) {
/* This is a connection-level packet. These should be
* fairly rare, so the extra overhead of looking them up the
* old-fashioned way doesn't really hurt */
struct rxrpc_connection *conn;
rcu_read_lock();
conn = rxrpc_conn_from_local(local, skb);
if (!conn)
goto cant_route_call;
retry_find_conn:
conn = rxrpc_find_connection_rcu(local, skb);
if (!conn)
goto cant_route_call;
if (sp->hdr.callNumber == 0) {
/* Connection-level packet */
_debug("CONN %p {%d}", conn, conn->debug_id);
rxrpc_post_packet_to_conn(conn, skb);
rxrpc_put_connection(conn);
if (!rxrpc_post_packet_to_conn(conn, skb))
goto retry_find_conn;
} else {
struct rxrpc_call *call;
/* Call-bound packets are routed by connection channel. */
unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK;
struct rxrpc_channel *chan = &conn->channels[channel];
struct rxrpc_call *call = rcu_dereference(chan->call);
call = rxrpc_find_call_hash(&sp->hdr, local,
AF_INET, &ip_hdr(skb)->saddr);
if (call)
rxrpc_post_packet_to_call(call, skb);
else
if (!call || atomic_read(&call->usage) == 0)
goto cant_route_call;
rxrpc_post_packet_to_call(call, skb);
}
rcu_read_unlock();
out:
return;
cant_route_call:
rcu_read_unlock();
_debug("can't route call");
if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {
......
......@@ -17,11 +17,12 @@ static int none_init_connection_security(struct rxrpc_connection *conn)
return 0;
}
static void none_prime_packet_security(struct rxrpc_connection *conn)
static int none_prime_packet_security(struct rxrpc_connection *conn)
{
return 0;
}
static int none_secure_packet(const struct rxrpc_call *call,
static int none_secure_packet(struct rxrpc_call *call,
struct sk_buff *skb,
size_t data_size,
void *sechdr)
......@@ -29,7 +30,7 @@ static int none_secure_packet(const struct rxrpc_call *call,
return 0;
}
static int none_verify_packet(const struct rxrpc_call *call,
static int none_verify_packet(struct rxrpc_call *call,
struct sk_buff *skb,
u32 *_abort_code)
{
......
......@@ -374,14 +374,17 @@ void __exit rxrpc_destroy_all_locals(void)
_enter("");
if (list_empty(&rxrpc_local_endpoints))
return;
flush_workqueue(rxrpc_workqueue);
mutex_lock(&rxrpc_local_mutex);
list_for_each_entry(local, &rxrpc_local_endpoints, link) {
pr_err("AF_RXRPC: Leaked local %p {%d}\n",
local, atomic_read(&local->usage));
if (!list_empty(&rxrpc_local_endpoints)) {
mutex_lock(&rxrpc_local_mutex);
list_for_each_entry(local, &rxrpc_local_endpoints, link) {
pr_err("AF_RXRPC: Leaked local %p {%d}\n",
local, atomic_read(&local->usage));
}
mutex_unlock(&rxrpc_local_mutex);
BUG();
}
mutex_unlock(&rxrpc_local_mutex);
BUG();
rcu_barrier();
}
......@@ -189,7 +189,7 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
INIT_WORK(&peer->error_distributor,
&rxrpc_peer_error_distributor);
peer->service_conns = RB_ROOT;
rwlock_init(&peer->conn_lock);
seqlock_init(&peer->service_conn_lock);
spin_lock_init(&peer->lock);
peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
}
......
......@@ -14,15 +14,15 @@
#include <net/af_rxrpc.h>
#include "ar-internal.h"
static const char *const rxrpc_conn_states[] = {
[RXRPC_CONN_UNUSED] = "Unused ",
[RXRPC_CONN_CLIENT] = "Client ",
[RXRPC_CONN_SERVER_UNSECURED] = "SvUnsec ",
[RXRPC_CONN_SERVER_CHALLENGING] = "SvChall ",
[RXRPC_CONN_SERVER] = "SvSecure",
[RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort",
[RXRPC_CONN_NETWORK_ERROR] = "NetError",
static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
[RXRPC_CONN_UNUSED] = "Unused ",
[RXRPC_CONN_CLIENT] = "Client ",
[RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ",
[RXRPC_CONN_SERVICE_CHALLENGING] = "SvChall ",
[RXRPC_CONN_SERVICE] = "SvSecure",
[RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort",
[RXRPC_CONN_NETWORK_ERROR] = "NetError",
};
/*
......@@ -137,7 +137,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
if (v == &rxrpc_connections) {
seq_puts(seq,
"Proto Local Remote "
" SvID ConnID Calls End Use State Key "
" SvID ConnID End Use State Key "
" Serial ISerial\n"
);
return 0;
......@@ -154,13 +154,12 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
ntohs(conn->params.peer->srx.transport.sin.sin_port));
seq_printf(seq,
"UDP %-22.22s %-22.22s %4x %08x %08x %s %3u"
"UDP %-22.22s %-22.22s %4x %08x %s %3u"
" %s %08x %08x %08x\n",
lbuff,
rbuff,
conn->params.service_id,
conn->proto.cid,
conn->call_counter,
rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
atomic_read(&conn->usage),
rxrpc_conn_states[conn->state],
......
This diff is collapsed.
......@@ -10,32 +10,37 @@
*/
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/udp.h>
#include "ar-internal.h"
/*
* Set up an RxRPC address from a socket buffer.
* Fill out a peer address from a socket buffer containing a packet.
*/
void rxrpc_get_addr_from_skb(struct rxrpc_local *local,
const struct sk_buff *skb,
struct sockaddr_rxrpc *srx)
int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *srx, struct sk_buff *skb)
{
memset(srx, 0, sizeof(*srx));
srx->transport_type = local->srx.transport_type;
srx->transport.family = local->srx.transport.family;
/* Can we see an ipv4 UDP packet on an ipv6 UDP socket? and vice
* versa?
*/
switch (srx->transport.family) {
case AF_INET:
switch (ntohs(skb->protocol)) {
case ETH_P_IP:
srx->transport_type = SOCK_DGRAM;
srx->transport_len = sizeof(srx->transport.sin);
srx->transport.sin.sin_family = AF_INET;
srx->transport.sin.sin_port = udp_hdr(skb)->source;
srx->transport_len = sizeof(struct sockaddr_in);
memcpy(&srx->transport.sin.sin_addr, &ip_hdr(skb)->saddr,
sizeof(struct in_addr));
break;
srx->transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
return 0;
case ETH_P_IPV6:
srx->transport_type = SOCK_DGRAM;
srx->transport_len = sizeof(srx->transport.sin6);
srx->transport.sin6.sin6_family = AF_INET6;
srx->transport.sin6.sin6_port = udp_hdr(skb)->source;
srx->transport.sin6.sin6_addr = ipv6_hdr(skb)->saddr;
return 0;
default:
BUG();
pr_warn_ratelimited("AF_RXRPC: Unknown eth protocol %u\n",
ntohs(skb->protocol));
return -EAFNOSUPPORT;
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment