Commit 00bc00a9 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller

tipc: involve reference counter for subscriber

At present subscriber's lock is used to protect the subscription list
of subscriber as well as subscriptions linked into the list. While one
or all subscriptions are deleted through iterating the list, the
subscriber's lock must be held. Meanwhile, as deletion of subscription
may happen in subscription timer's handler, the lock must be grabbed
in the function as well. When subscription's timer is terminated with
del_timer_sync() during above iteration, subscriber's lock has to be
temporarily released, otherwise, deadlock may occur. However, the
temporary release may cause the double free of a subscription as the
subscription is not disconnected from the subscription list.

Now if a reference counter is introduced to subscriber, subscription's
timer can be asynchronously stopped with del_timer(). As a result, the
issue is not only able to be fixed, but also relevant code is pretty
readable and understandable.
Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarJon Maloy <jon.maloy@ericson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 1b764828
......@@ -40,16 +40,21 @@
/**
* struct tipc_subscriber - TIPC network topology subscriber
* @kref: reference counter to tipc_subscription object
* @conid: connection identifier to server connecting to subscriber
* @lock: control access to subscriber
* @subscrp_list: list of subscription objects for this subscriber
*/
struct tipc_subscriber {
struct kref kref;
int conid;
spinlock_t lock;
struct list_head subscrp_list;
};
static void tipc_subscrp_delete(struct tipc_subscription *sub);
static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
/**
* htohl - convert value to endianness used by destination
* @in: value to convert
......@@ -116,42 +121,34 @@ static void tipc_subscrp_timeout(unsigned long data)
{
struct tipc_subscription *sub = (struct tipc_subscription *)data;
struct tipc_subscriber *subscriber = sub->subscriber;
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
/* The spin lock per subscriber is used to protect its members */
spin_lock_bh(&subscriber->lock);
/* Notify subscriber of timeout */
tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
/* Validate timeout (in case subscription is being cancelled) */
if (sub->timeout == TIPC_WAIT_FOREVER) {
spin_lock_bh(&subscriber->lock);
tipc_subscrp_delete(sub);
spin_unlock_bh(&subscriber->lock);
return;
}
/* Unlink subscription from name table */
tipc_nametbl_unsubscribe(sub);
/* Unlink subscription from subscriber */
list_del(&sub->subscrp_list);
spin_unlock_bh(&subscriber->lock);
tipc_subscrb_put(subscriber);
}
/* Notify subscriber of timeout */
tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
static void tipc_subscrb_kref_release(struct kref *kref)
{
struct tipc_subscriber *subcriber = container_of(kref,
struct tipc_subscriber, kref);
/* Now destroy subscription */
kfree(sub);
atomic_dec(&tn->subscription_count);
kfree(subcriber);
}
static void tipc_subscrp_delete(struct tipc_subscription *sub)
static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
kref_put(&subscriber->kref, tipc_subscrb_kref_release);
}
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscrp_list);
kfree(sub);
atomic_dec(&tn->subscription_count);
static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
{
kref_get(&subscriber->kref);
}
static struct tipc_subscriber *tipc_subscrb_create(int conid)
......@@ -163,6 +160,7 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid)
pr_warn("Subscriber rejected, no memory\n");
return NULL;
}
kref_init(&subscriber->kref);
INIT_LIST_HEAD(&subscriber->subscrp_list);
subscriber->conid = conid;
spin_lock_init(&subscriber->lock);
......@@ -172,62 +170,48 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid)
static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
struct tipc_subscription *sub, *temp;
spin_lock_bh(&subscriber->lock);
/* Destroy any existing subscriptions for subscriber */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list,
list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
subscrp_list) {
if (sub->timeout != TIPC_WAIT_FOREVER) {
spin_unlock_bh(&subscriber->lock);
del_timer_sync(&sub->timer);
spin_lock_bh(&subscriber->lock);
}
if (del_timer(&sub->timer)) {
tipc_subscrp_delete(sub);
tipc_subscrb_put(subscriber);
}
}
spin_unlock_bh(&subscriber->lock);
/* Now destroy subscriber */
kfree(subscriber);
tipc_subscrb_put(subscriber);
}
static void tipc_subscrp_delete(struct tipc_subscription *sub)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscrp_list);
kfree(sub);
atomic_dec(&tn->subscription_count);
}
/**
* tipc_subscrp_cancel - handle subscription cancellation request
*
* Called with subscriber lock held. Routine must temporarily release lock
* to enable the subscription timeout routine to finish without deadlocking;
* the lock is then reclaimed to allow caller to release it upon return.
*
* Note that fields of 's' use subscriber's endianness!
*/
static void tipc_subscrp_cancel(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
int found = 0;
struct tipc_subscription *sub, *temp;
/* Find first matching subscription, exit if not found */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list,
list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
subscrp_list) {
if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
found = 1;
break;
if (del_timer(&sub->timer)) {
tipc_subscrp_delete(sub);
tipc_subscrb_put(subscriber);
}
break;
}
if (!found)
return;
/* Cancel subscription timer (if used), then delete subscription */
if (sub->timeout != TIPC_WAIT_FOREVER) {
sub->timeout = TIPC_WAIT_FOREVER;
spin_unlock_bh(&subscriber->lock);
del_timer_sync(&sub->timer);
spin_lock_bh(&subscriber->lock);
}
tipc_subscrp_delete(sub);
}
static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s,
......@@ -281,11 +265,11 @@ static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s,
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(*s));
atomic_inc(&tn->subscription_count);
if (sub->timeout != TIPC_WAIT_FOREVER) {
setup_timer(&sub->timer, tipc_subscrp_timeout,
(unsigned long)sub);
mod_timer(&sub->timer, jiffies + sub->timeout);
}
setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
if (sub->timeout != TIPC_WAIT_FOREVER)
sub->timeout += jiffies;
if (!mod_timer(&sub->timer, sub->timeout))
tipc_subscrb_get(subscriber);
*sub_p = sub;
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment