Commit 24be34b5 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: eliminate upcall function pointers between port and socket

Due to the original one-to-many relation between port and user API
layers, upcalls to the API have been performed via function pointers,
installed in struct tipc_port at creation. Since this relation now
always is one-to-one, we can instead use ordinary function calls.

We remove the function pointers 'dispatcher' and ´wakeup' from
struct tipc_port, and replace them with calls to the renamed
functions tipc_sk_rcv() and tipc_sk_wakeup().

At the same time we change the name and signature of the functions
tipc_createport() and tipc_deleteport() to reflect their new role
as mere initialization/destruction functions.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 8826cde6
...@@ -327,8 +327,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) ...@@ -327,8 +327,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
spin_lock_bh(&tipc_port_list_lock); spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport); p_ptr = tipc_port_lock(origport);
if (p_ptr) { if (p_ptr) {
if (!p_ptr->wakeup)
goto exit;
if (!list_empty(&p_ptr->wait_list)) if (!list_empty(&p_ptr->wait_list))
goto exit; goto exit;
p_ptr->congested = 1; p_ptr->congested = 1;
...@@ -363,7 +361,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) ...@@ -363,7 +361,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
list_del_init(&p_ptr->wait_list); list_del_init(&p_ptr->wait_list);
spin_lock_bh(p_ptr->lock); spin_lock_bh(p_ptr->lock);
p_ptr->congested = 0; p_ptr->congested = 0;
p_ptr->wakeup(p_ptr); tipc_port_wakeup(p_ptr);
win -= p_ptr->waiting_pkts; win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock); spin_unlock_bh(p_ptr->lock);
} }
......
...@@ -190,33 +190,32 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) ...@@ -190,33 +190,32 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
tipc_port_list_free(dp); tipc_port_list_free(dp);
} }
/**
* tipc_createport - create a generic TIPC port void tipc_port_wakeup(struct tipc_port *port)
{
tipc_sk_wakeup(tipc_port_to_sk(port));
}
/* tipc_port_init - intiate TIPC port and lock it
* *
* Returns pointer to (locked) TIPC port, or NULL if unable to create it * Returns obtained reference if initialization is successful, zero otherwise
*/ */
struct tipc_port *tipc_createport(struct sock *sk, u32 tipc_port_init(struct tipc_port *p_ptr,
u32 (*dispatcher)(struct tipc_port *, const unsigned int importance)
struct sk_buff *),
void (*wakeup)(struct tipc_port *),
const u32 importance)
{ {
struct tipc_port *p_ptr = tipc_sk_port(sk);
struct tipc_msg *msg; struct tipc_msg *msg;
u32 ref; u32 ref;
ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
if (!ref) { if (!ref) {
pr_warn("Port registration failed, ref. table exhausted\n"); pr_warn("Port registration failed, ref. table exhausted\n");
return NULL; return 0;
} }
p_ptr->max_pkt = MAX_PKT_DEFAULT; p_ptr->max_pkt = MAX_PKT_DEFAULT;
p_ptr->ref = ref; p_ptr->ref = ref;
INIT_LIST_HEAD(&p_ptr->wait_list); INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
p_ptr->dispatcher = dispatcher;
p_ptr->wakeup = wakeup;
k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
INIT_LIST_HEAD(&p_ptr->publications); INIT_LIST_HEAD(&p_ptr->publications);
INIT_LIST_HEAD(&p_ptr->port_list); INIT_LIST_HEAD(&p_ptr->port_list);
...@@ -232,10 +231,10 @@ struct tipc_port *tipc_createport(struct sock *sk, ...@@ -232,10 +231,10 @@ struct tipc_port *tipc_createport(struct sock *sk,
msg_set_origport(msg, ref); msg_set_origport(msg, ref);
list_add_tail(&p_ptr->port_list, &ports); list_add_tail(&p_ptr->port_list, &ports);
spin_unlock_bh(&tipc_port_list_lock); spin_unlock_bh(&tipc_port_list_lock);
return p_ptr; return ref;
} }
int tipc_deleteport(struct tipc_port *p_ptr) void tipc_port_destroy(struct tipc_port *p_ptr)
{ {
struct sk_buff *buf = NULL; struct sk_buff *buf = NULL;
...@@ -257,7 +256,6 @@ int tipc_deleteport(struct tipc_port *p_ptr) ...@@ -257,7 +256,6 @@ int tipc_deleteport(struct tipc_port *p_ptr)
spin_unlock_bh(&tipc_port_list_lock); spin_unlock_bh(&tipc_port_list_lock);
k_term_timer(&p_ptr->timer); k_term_timer(&p_ptr->timer);
tipc_net_route_msg(buf); tipc_net_route_msg(buf);
return 0;
} }
static int port_unreliable(struct tipc_port *p_ptr) static int port_unreliable(struct tipc_port *p_ptr)
...@@ -530,13 +528,12 @@ void tipc_port_proto_rcv(struct sk_buff *buf) ...@@ -530,13 +528,12 @@ void tipc_port_proto_rcv(struct sk_buff *buf)
/* Process protocol message sent by peer */ /* Process protocol message sent by peer */
switch (msg_type(msg)) { switch (msg_type(msg)) {
case CONN_ACK: case CONN_ACK:
wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;
p_ptr->wakeup;
p_ptr->acked += msg_msgcnt(msg); p_ptr->acked += msg_msgcnt(msg);
if (!tipc_port_congested(p_ptr)) { if (!tipc_port_congested(p_ptr)) {
p_ptr->congested = 0; p_ptr->congested = 0;
if (wakeable) if (wakeable)
p_ptr->wakeup(p_ptr); tipc_port_wakeup(p_ptr);
} }
break; break;
case CONN_PROBE: case CONN_PROBE:
...@@ -865,7 +862,7 @@ int tipc_port_rcv(struct sk_buff *buf) ...@@ -865,7 +862,7 @@ int tipc_port_rcv(struct sk_buff *buf)
/* validate destination & pass to port, otherwise reject message */ /* validate destination & pass to port, otherwise reject message */
p_ptr = tipc_port_lock(destport); p_ptr = tipc_port_lock(destport);
if (likely(p_ptr)) { if (likely(p_ptr)) {
err = p_ptr->dispatcher(p_ptr, buf); err = tipc_sk_rcv(tipc_port_to_sk(p_ptr), buf);
tipc_port_unlock(p_ptr); tipc_port_unlock(p_ptr);
if (likely(!err)) if (likely(!err))
return dsz; return dsz;
......
...@@ -59,8 +59,6 @@ ...@@ -59,8 +59,6 @@
* @ref: unique reference to port in TIPC object registry * @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages * @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports * @port_list: adjacent ports in TIPC's global list of ports
* @dispatcher: ptr to routine which handles received messages
* @wakeup: ptr to routine to call when port is no longer congested
* @wait_list: adjacent ports in list of ports waiting on link congestion * @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts: * @waiting_pkts:
* @sent: # of non-empty messages sent by port * @sent: # of non-empty messages sent by port
...@@ -84,8 +82,6 @@ struct tipc_port { ...@@ -84,8 +82,6 @@ struct tipc_port {
u32 ref; u32 ref;
struct tipc_msg phdr; struct tipc_msg phdr;
struct list_head port_list; struct list_head port_list;
u32 (*dispatcher)(struct tipc_port *, struct sk_buff *);
void (*wakeup)(struct tipc_port *);
struct list_head wait_list; struct list_head wait_list;
u32 waiting_pkts; u32 waiting_pkts;
u32 sent; u32 sent;
...@@ -104,17 +100,14 @@ struct tipc_port_list; ...@@ -104,17 +100,14 @@ struct tipc_port_list;
/* /*
* TIPC port manipulation routines * TIPC port manipulation routines
*/ */
struct tipc_port *tipc_createport(struct sock *sk, u32 tipc_port_init(struct tipc_port *p_ptr,
u32 (*dispatcher)(struct tipc_port *, const unsigned int importance);
struct sk_buff *),
void (*wakeup)(struct tipc_port *),
const u32 importance);
int tipc_reject_msg(struct sk_buff *buf, u32 err); int tipc_reject_msg(struct sk_buff *buf, u32 err);
void tipc_acknowledge(u32 port_ref, u32 ack); void tipc_acknowledge(u32 port_ref, u32 ack);
int tipc_deleteport(struct tipc_port *p_ptr); void tipc_port_destroy(struct tipc_port *p_ptr);
int tipc_portimportance(u32 portref, unsigned int *importance); int tipc_portimportance(u32 portref, unsigned int *importance);
int tipc_set_portimportance(u32 portref, unsigned int importance); int tipc_set_portimportance(u32 portref, unsigned int importance);
...@@ -136,6 +129,7 @@ int tipc_port_disconnect(u32 portref); ...@@ -136,6 +129,7 @@ int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref); int tipc_port_shutdown(u32 ref);
void tipc_port_wakeup(struct tipc_port *port);
/* /*
* The following routines require that the port be locked on entry * The following routines require that the port be locked on entry
......
...@@ -44,10 +44,7 @@ ...@@ -44,10 +44,7 @@
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
static int backlog_rcv(struct sock *sk, struct sk_buff *skb); static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len); static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk); static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock); static int tipc_release(struct socket *sock);
...@@ -181,10 +178,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, ...@@ -181,10 +178,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
if (sk == NULL) if (sk == NULL)
return -ENOMEM; return -ENOMEM;
/* Allocate TIPC port for socket to use */ tp_ptr = tipc_sk_port(sk);
tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, if (!tipc_port_init(tp_ptr, TIPC_LOW_IMPORTANCE)) {
TIPC_LOW_IMPORTANCE);
if (unlikely(!tp_ptr)) {
sk_free(sk); sk_free(sk);
return -ENOMEM; return -ENOMEM;
} }
...@@ -199,7 +194,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, ...@@ -199,7 +194,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
sk->sk_data_ready = tipc_data_ready; sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
spin_unlock_bh(tp_ptr->lock); spin_unlock_bh(tp_ptr->lock);
if (sock->state == SS_READY) { if (sock->state == SS_READY) {
...@@ -207,7 +201,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, ...@@ -207,7 +201,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
if (sock->type == SOCK_DGRAM) if (sock->type == SOCK_DGRAM)
tipc_set_portunreliable(tp_ptr->ref, 1); tipc_set_portunreliable(tp_ptr->ref, 1);
} }
return 0; return 0;
} }
...@@ -337,7 +330,7 @@ static int tipc_release(struct socket *sock) ...@@ -337,7 +330,7 @@ static int tipc_release(struct socket *sock)
* Delete TIPC port; this ensures no more messages are queued * Delete TIPC port; this ensures no more messages are queued
* (also disconnects an active connection & sends a 'FIN-' to peer) * (also disconnects an active connection & sends a 'FIN-' to peer)
*/ */
res = tipc_deleteport(tport); tipc_port_destroy(tport);
/* Discard any remaining (connection-based) messages in receive queue */ /* Discard any remaining (connection-based) messages in receive queue */
__skb_queue_purge(&sk->sk_receive_queue); __skb_queue_purge(&sk->sk_receive_queue);
...@@ -1430,17 +1423,16 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1430,17 +1423,16 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
} }
/** /**
* dispatch - handle incoming message * tipc_sk_rcv - handle incoming message
* @tport: TIPC port that received message * @sk: socket receiving message
* @buf: message * @buf: message
* *
* Called with port lock already taken. * Called with port lock already taken.
* *
* Returns TIPC error status code (TIPC_OK if message is not to be rejected) * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
*/ */
static u32 dispatch(struct tipc_port *port, struct sk_buff *buf) u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)
{ {
struct sock *sk = tipc_port_to_sk(port);
u32 res; u32 res;
/* /*
...@@ -1463,18 +1455,6 @@ static u32 dispatch(struct tipc_port *port, struct sk_buff *buf) ...@@ -1463,18 +1455,6 @@ static u32 dispatch(struct tipc_port *port, struct sk_buff *buf)
return res; return res;
} }
/**
* wakeupdispatch - wake up port after congestion
* @tport: port to wakeup
*
* Called with port lock already taken.
*/
static void wakeupdispatch(struct tipc_port *port)
{
struct sock *sk = tipc_port_to_sk(port);
sk->sk_write_space(sk);
}
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
......
...@@ -67,4 +67,11 @@ static inline struct sock *tipc_port_to_sk(const struct tipc_port *port) ...@@ -67,4 +67,11 @@ static inline struct sock *tipc_port_to_sk(const struct tipc_port *port)
return &(container_of(port, struct tipc_sock, port))->sk; return &(container_of(port, struct tipc_sock, port))->sk;
} }
static inline void tipc_sk_wakeup(struct sock *sk)
{
sk->sk_write_space(sk);
}
u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment