Commit 5c45ab24 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: make struct tipc_server private for server.c

In order to narrow the interface and dependencies between the topology
server and the subscription/binding table functionality we move struct
tipc_server inside the file server.c. This requires some code
adaptations in other files, but those are mostly minor.

The most important change is that we have to move the start/stop
functions for the topology server to server.c, where they logically
belong anyway.
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent da0a75e8
...@@ -813,8 +813,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, ...@@ -813,8 +813,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
*/ */
void tipc_nametbl_subscribe(struct tipc_subscription *sub) void tipc_nametbl_subscribe(struct tipc_subscription *sub)
{ {
struct tipc_server *srv = sub->server; struct tipc_net *tn = tipc_net(sub->net);
struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscr *s = &sub->evt.s; struct tipc_subscr *s = &sub->evt.s;
u32 type = tipc_sub_read(s, seq.type); u32 type = tipc_sub_read(s, seq.type);
int index = hash(type); int index = hash(type);
...@@ -822,7 +821,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) ...@@ -822,7 +821,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub)
struct tipc_name_seq ns; struct tipc_name_seq ns;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(srv->net, type); seq = nametbl_find_seq(sub->net, type);
if (!seq) if (!seq)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) { if (seq) {
...@@ -844,14 +843,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) ...@@ -844,14 +843,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub)
*/ */
void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
{ {
struct tipc_server *srv = sub->server;
struct tipc_subscr *s = &sub->evt.s; struct tipc_subscr *s = &sub->evt.s;
struct tipc_net *tn = tipc_net(srv->net); struct tipc_net *tn = tipc_net(sub->net);
struct name_seq *seq; struct name_seq *seq;
u32 type = tipc_sub_read(s, seq.type); u32 type = tipc_sub_read(s, seq.type);
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(srv->net, type); seq = nametbl_find_seq(sub->net, type);
if (seq != NULL) { if (seq != NULL) {
spin_lock_bh(&seq->lock); spin_lock_bh(&seq->lock);
list_del_init(&sub->nameseq_list); list_del_init(&sub->nameseq_list);
......
...@@ -49,7 +49,37 @@ ...@@ -49,7 +49,37 @@
#define CF_CONNECTED 1 #define CF_CONNECTED 1
#define CF_SERVER 2 #define CF_SERVER 2
#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) #define TIPC_SERVER_NAME_LEN 32
/**
* struct tipc_server - TIPC server structure
* @conn_idr: identifier set of connection
* @idr_lock: protect the connection identifier set
* @idr_in_use: amount of allocated identifier entry
* @net: network namspace instance
* @rcvbuf_cache: memory cache of server receive buffer
* @rcv_wq: receive workqueue
* @send_wq: send workqueue
* @max_rcvbuf_size: maximum permitted receive message length
* @tipc_conn_new: callback will be called when new connection is incoming
* @tipc_conn_release: callback will be called before releasing the connection
* @tipc_conn_recvmsg: callback will be called when message arrives
* @saddr: TIPC server address
* @name: server name
* @imp: message importance
* @type: socket type
*/
struct tipc_server {
struct idr conn_idr;
spinlock_t idr_lock; /* for idr list */
int idr_in_use;
struct net *net;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
struct sockaddr_tipc *saddr;
char name[TIPC_SERVER_NAME_LEN];
};
/** /**
* struct tipc_conn - TIPC connection structure * struct tipc_conn - TIPC connection structure
...@@ -93,6 +123,11 @@ static void tipc_recv_work(struct work_struct *work); ...@@ -93,6 +123,11 @@ static void tipc_recv_work(struct work_struct *work);
static void tipc_send_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con); static void tipc_clean_outqueues(struct tipc_conn *con);
static struct tipc_conn *sock2con(struct sock *sk)
{
return sk->sk_user_data;
}
static bool connected(struct tipc_conn *con) static bool connected(struct tipc_conn *con)
{ {
return con && test_bit(CF_CONNECTED, &con->flags); return con && test_bit(CF_CONNECTED, &con->flags);
...@@ -198,14 +233,17 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) ...@@ -198,14 +233,17 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{ {
struct list_head *sub_list = &con->sub_list; struct list_head *sub_list = &con->sub_list;
struct tipc_net *tn = tipc_net(con->server->net);
struct tipc_subscription *sub, *tmp; struct tipc_subscription *sub, *tmp;
spin_lock_bh(&con->sub_lock); spin_lock_bh(&con->sub_lock);
list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
tipc_sub_unsubscribe(sub); tipc_sub_unsubscribe(sub);
else if (s) atomic_dec(&tn->subscription_count);
} else if (s) {
break; break;
}
} }
spin_unlock_bh(&con->sub_lock); spin_unlock_bh(&con->sub_lock);
} }
...@@ -220,8 +258,7 @@ static void tipc_close_conn(struct tipc_conn *con) ...@@ -220,8 +258,7 @@ static void tipc_close_conn(struct tipc_conn *con)
if (disconnect) { if (disconnect) {
sk->sk_user_data = NULL; sk->sk_user_data = NULL;
if (con->conid) tipc_con_delete_sub(con, NULL);
tipc_con_delete_sub(con, NULL);
} }
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
...@@ -272,15 +309,21 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, ...@@ -272,15 +309,21 @@ static int tipc_con_rcv_sub(struct tipc_server *srv,
struct tipc_conn *con, struct tipc_conn *con,
struct tipc_subscr *s) struct tipc_subscr *s)
{ {
struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub; struct tipc_subscription *sub;
if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
tipc_con_delete_sub(con, s); tipc_con_delete_sub(con, s);
return 0; return 0;
} }
sub = tipc_sub_subscribe(srv, s, con->conid); if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
return -1;
}
sub = tipc_sub_subscribe(srv->net, s, con->conid);
if (!sub) if (!sub)
return -1; return -1;
atomic_inc(&tn->subscription_count);
spin_lock_bh(&con->sub_lock); spin_lock_bh(&con->sub_lock);
list_add(&sub->sub_list, &con->sub_list); list_add(&sub->sub_list, &con->sub_list);
spin_unlock_bh(&con->sub_lock); spin_unlock_bh(&con->sub_lock);
...@@ -426,13 +469,14 @@ static void tipc_clean_outqueues(struct tipc_conn *con) ...@@ -426,13 +469,14 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
/* tipc_conn_queue_evt - interrupt level call from a subscription instance /* tipc_conn_queue_evt - interrupt level call from a subscription instance
* The queued job is launched in tipc_send_to_sock() * The queued job is launched in tipc_send_to_sock()
*/ */
void tipc_conn_queue_evt(struct tipc_server *s, int conid, void tipc_conn_queue_evt(struct net *net, int conid,
u32 event, struct tipc_event *evt) u32 event, struct tipc_event *evt)
{ {
struct tipc_server *srv = tipc_topsrv(net);
struct outqueue_entry *e; struct outqueue_entry *e;
struct tipc_conn *con; struct tipc_conn *con;
con = tipc_conn_lookup(s, conid); con = tipc_conn_lookup(srv, conid);
if (!con) if (!con)
return; return;
...@@ -448,7 +492,7 @@ void tipc_conn_queue_evt(struct tipc_server *s, int conid, ...@@ -448,7 +492,7 @@ void tipc_conn_queue_evt(struct tipc_server *s, int conid,
list_add_tail(&e->list, &con->outqueue); list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock); spin_unlock_bh(&con->outqueue_lock);
if (queue_work(s->send_wq, &con->swork)) if (queue_work(srv->send_wq, &con->swork))
return; return;
err: err:
conn_put(con); conn_put(con);
...@@ -620,41 +664,71 @@ static int tipc_work_start(struct tipc_server *s) ...@@ -620,41 +664,71 @@ static int tipc_work_start(struct tipc_server *s)
return 0; return 0;
} }
int tipc_server_start(struct tipc_server *s) int tipc_topsrv_start(struct net *net)
{ {
struct tipc_net *tn = tipc_net(net);
const char name[] = "topology_server";
struct sockaddr_tipc *saddr;
struct tipc_server *srv;
int ret; int ret;
spin_lock_init(&s->idr_lock); saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
idr_init(&s->conn_idr); if (!saddr)
s->idr_in_use = 0; return -ENOMEM;
saddr->family = AF_TIPC;
saddr->addrtype = TIPC_ADDR_NAMESEQ;
saddr->addr.nameseq.type = TIPC_TOP_SRV;
saddr->addr.nameseq.lower = TIPC_TOP_SRV;
saddr->addr.nameseq.upper = TIPC_TOP_SRV;
saddr->scope = TIPC_NODE_SCOPE;
srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
if (!srv) {
kfree(saddr);
return -ENOMEM;
}
srv->net = net;
srv->saddr = saddr;
srv->max_rcvbuf_size = sizeof(struct tipc_subscr);
strncpy(srv->name, name, strlen(name) + 1);
tn->topsrv = srv;
atomic_set(&tn->subscription_count, 0);
spin_lock_init(&srv->idr_lock);
idr_init(&srv->conn_idr);
srv->idr_in_use = 0;
ret = tipc_work_start(s); ret = tipc_work_start(srv);
if (ret < 0) if (ret < 0)
return ret; return ret;
ret = tipc_open_listening_sock(s); ret = tipc_open_listening_sock(srv);
if (ret < 0) if (ret < 0)
tipc_work_stop(s); tipc_work_stop(srv);
return ret; return ret;
} }
void tipc_server_stop(struct tipc_server *s) void tipc_topsrv_stop(struct net *net)
{ {
struct tipc_server *srv = tipc_topsrv(net);
struct tipc_conn *con; struct tipc_conn *con;
int id; int id;
spin_lock_bh(&s->idr_lock); spin_lock_bh(&srv->idr_lock);
for (id = 0; s->idr_in_use; id++) { for (id = 0; srv->idr_in_use; id++) {
con = idr_find(&s->conn_idr, id); con = idr_find(&srv->conn_idr, id);
if (con) { if (con) {
spin_unlock_bh(&s->idr_lock); spin_unlock_bh(&srv->idr_lock);
tipc_close_conn(con); tipc_close_conn(con);
spin_lock_bh(&s->idr_lock); spin_lock_bh(&srv->idr_lock);
} }
} }
spin_unlock_bh(&s->idr_lock); spin_unlock_bh(&srv->idr_lock);
tipc_work_stop(s); tipc_work_stop(srv);
idr_destroy(&s->conn_idr); idr_destroy(&srv->conn_idr);
kfree(srv->saddr);
kfree(srv);
} }
...@@ -47,45 +47,11 @@ ...@@ -47,45 +47,11 @@
#define TIPC_SUB_NODE_SCOPE 0x40 #define TIPC_SUB_NODE_SCOPE 0x40
#define TIPC_SUB_NO_STATUS 0x80 #define TIPC_SUB_NO_STATUS 0x80
/** void tipc_conn_queue_evt(struct net *net, int conid,
* struct tipc_server - TIPC server structure
* @conn_idr: identifier set of connection
* @idr_lock: protect the connection identifier set
* @idr_in_use: amount of allocated identifier entry
* @net: network namspace instance
* @rcvbuf_cache: memory cache of server receive buffer
* @rcv_wq: receive workqueue
* @send_wq: send workqueue
* @max_rcvbuf_size: maximum permitted receive message length
* @tipc_conn_new: callback will be called when new connection is incoming
* @tipc_conn_release: callback will be called before releasing the connection
* @tipc_conn_recvmsg: callback will be called when message arrives
* @saddr: TIPC server address
* @name: server name
* @imp: message importance
* @type: socket type
*/
struct tipc_server {
struct idr conn_idr;
spinlock_t idr_lock;
int idr_in_use;
struct net *net;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
struct sockaddr_tipc *saddr;
char name[TIPC_SERVER_NAME_LEN];
};
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
u32 event, struct tipc_event *evt); u32 event, struct tipc_event *evt);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid); u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
int tipc_server_start(struct tipc_server *s);
void tipc_server_stop(struct tipc_server *s);
#endif #endif
...@@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub, ...@@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub,
tipc_evt_write(evt, found_upper, found_upper); tipc_evt_write(evt, found_upper, found_upper);
tipc_evt_write(evt, port.ref, port); tipc_evt_write(evt, port.ref, port);
tipc_evt_write(evt, port.node, node); tipc_evt_write(evt, port.node, node);
tipc_conn_queue_evt(sub->server, sub->conid, event, evt); tipc_conn_queue_evt(sub->net, sub->conid, event, evt);
} }
/** /**
...@@ -114,14 +114,7 @@ static void tipc_sub_timeout(struct timer_list *t) ...@@ -114,14 +114,7 @@ static void tipc_sub_timeout(struct timer_list *t)
static void tipc_sub_kref_release(struct kref *kref) static void tipc_sub_kref_release(struct kref *kref)
{ {
struct tipc_subscription *sub; kfree(container_of(kref, struct tipc_subscription, kref));
struct tipc_net *tn;
sub = container_of(kref, struct tipc_subscription, kref);
tn = tipc_net(sub->server->net);
atomic_dec(&tn->subscription_count);
kfree(sub);
} }
void tipc_sub_put(struct tipc_subscription *subscription) void tipc_sub_put(struct tipc_subscription *subscription)
...@@ -134,19 +127,14 @@ void tipc_sub_get(struct tipc_subscription *subscription) ...@@ -134,19 +127,14 @@ void tipc_sub_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref); kref_get(&subscription->kref);
} }
struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, struct tipc_subscription *tipc_sub_subscribe(struct net *net,
struct tipc_subscr *s, struct tipc_subscr *s,
int conid) int conid)
{ {
struct tipc_net *tn = tipc_net(srv->net);
u32 filter = tipc_sub_read(s, filter); u32 filter = tipc_sub_read(s, filter);
struct tipc_subscription *sub; struct tipc_subscription *sub;
u32 timeout; u32 timeout;
if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
return NULL;
}
if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) || if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) ||
(tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) { (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) {
pr_warn("Subscription rejected, illegal request\n"); pr_warn("Subscription rejected, illegal request\n");
...@@ -157,12 +145,11 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, ...@@ -157,12 +145,11 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv,
pr_warn("Subscription rejected, no memory\n"); pr_warn("Subscription rejected, no memory\n");
return NULL; return NULL;
} }
sub->server = srv; sub->net = net;
sub->conid = conid; sub->conid = conid;
sub->inactive = false; sub->inactive = false;
memcpy(&sub->evt.s, s, sizeof(*s)); memcpy(&sub->evt.s, s, sizeof(*s));
spin_lock_init(&sub->lock); spin_lock_init(&sub->lock);
atomic_inc(&tn->subscription_count);
kref_init(&sub->kref); kref_init(&sub->kref);
tipc_nametbl_subscribe(sub); tipc_nametbl_subscribe(sub);
timer_setup(&sub->timer, tipc_sub_timeout, 0); timer_setup(&sub->timer, tipc_sub_timeout, 0);
...@@ -180,46 +167,3 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub) ...@@ -180,46 +167,3 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub)
list_del(&sub->sub_list); list_del(&sub->sub_list);
tipc_sub_put(sub); tipc_sub_put(sub);
} }
int tipc_topsrv_start(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
const char name[] = "topology_server";
struct sockaddr_tipc *saddr;
struct tipc_server *topsrv;
saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
if (!saddr)
return -ENOMEM;
saddr->family = AF_TIPC;
saddr->addrtype = TIPC_ADDR_NAMESEQ;
saddr->addr.nameseq.type = TIPC_TOP_SRV;
saddr->addr.nameseq.lower = TIPC_TOP_SRV;
saddr->addr.nameseq.upper = TIPC_TOP_SRV;
saddr->scope = TIPC_NODE_SCOPE;
topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC);
if (!topsrv) {
kfree(saddr);
return -ENOMEM;
}
topsrv->net = net;
topsrv->saddr = saddr;
topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
strncpy(topsrv->name, name, strlen(name) + 1);
tn->topsrv = topsrv;
atomic_set(&tn->subscription_count, 0);
return tipc_server_start(topsrv);
}
void tipc_topsrv_stop(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_server *topsrv = tn->topsrv;
tipc_server_stop(topsrv);
kfree(topsrv->saddr);
kfree(topsrv);
}
...@@ -56,7 +56,7 @@ struct tipc_conn; ...@@ -56,7 +56,7 @@ struct tipc_conn;
*/ */
struct tipc_subscription { struct tipc_subscription {
struct kref kref; struct kref kref;
struct tipc_server *server; struct net *net;
struct timer_list timer; struct timer_list timer;
struct list_head nameseq_list; struct list_head nameseq_list;
struct list_head sub_list; struct list_head sub_list;
...@@ -66,7 +66,7 @@ struct tipc_subscription { ...@@ -66,7 +66,7 @@ struct tipc_subscription {
spinlock_t lock; /* serialize up/down and timer events */ spinlock_t lock; /* serialize up/down and timer events */
}; };
struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, struct tipc_subscription *tipc_sub_subscribe(struct net *net,
struct tipc_subscr *s, struct tipc_subscr *s,
int conid); int conid);
void tipc_sub_unsubscribe(struct tipc_subscription *sub); void tipc_sub_unsubscribe(struct tipc_subscription *sub);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment