Commit 5f1a8c1b authored by Steffen Klassert's avatar Steffen Klassert Committed by Herbert Xu

padata: simplify serialization mechanism

We count the number of processed objects on a percpu basis,
so we need to go through all the percpu reorder queues to calculate
the sequence number of the next object that needs serialization.
This patch changes this to count the number of processed objects
global. So we can calculate the sequence number and the percpu
reorder queue of the next object that needs serialization without
searching through the percpu reorder queues. This avoids some
accesses to memory of foreign cpus.
Signed-off-by: default avatarSteffen Klassert <steffen.klassert@secunet.com>
Signed-off-by: default avatarHerbert Xu <herbert@gondor.apana.org.au>
parent 83f619f3
...@@ -67,7 +67,6 @@ struct padata_list { ...@@ -67,7 +67,6 @@ struct padata_list {
* @pwork: work struct for parallelization. * @pwork: work struct for parallelization.
* @swork: work struct for serialization. * @swork: work struct for serialization.
* @pd: Backpointer to the internal control structure. * @pd: Backpointer to the internal control structure.
* @num_obj: Number of objects that are processed by this cpu.
* @cpu_index: Index of the cpu. * @cpu_index: Index of the cpu.
*/ */
struct padata_queue { struct padata_queue {
...@@ -77,7 +76,6 @@ struct padata_queue { ...@@ -77,7 +76,6 @@ struct padata_queue {
struct work_struct pwork; struct work_struct pwork;
struct work_struct swork; struct work_struct swork;
struct parallel_data *pd; struct parallel_data *pd;
atomic_t num_obj;
int cpu_index; int cpu_index;
}; };
...@@ -93,6 +91,7 @@ struct padata_queue { ...@@ -93,6 +91,7 @@ struct padata_queue {
* @max_seq_nr: Maximal used sequence number. * @max_seq_nr: Maximal used sequence number.
* @cpumask: cpumask in use. * @cpumask: cpumask in use.
* @lock: Reorder lock. * @lock: Reorder lock.
* @processed: Number of already processed objects.
* @timer: Reorder timer. * @timer: Reorder timer.
*/ */
struct parallel_data { struct parallel_data {
...@@ -103,7 +102,8 @@ struct parallel_data { ...@@ -103,7 +102,8 @@ struct parallel_data {
atomic_t refcnt; atomic_t refcnt;
unsigned int max_seq_nr; unsigned int max_seq_nr;
cpumask_var_t cpumask; cpumask_var_t cpumask;
spinlock_t lock; spinlock_t lock ____cacheline_aligned;
unsigned int processed;
struct timer_list timer; struct timer_list timer;
}; };
......
...@@ -170,79 +170,47 @@ EXPORT_SYMBOL(padata_do_parallel); ...@@ -170,79 +170,47 @@ EXPORT_SYMBOL(padata_do_parallel);
*/ */
static struct padata_priv *padata_get_next(struct parallel_data *pd) static struct padata_priv *padata_get_next(struct parallel_data *pd)
{ {
int cpu, num_cpus, empty, calc_seq_nr; int cpu, num_cpus;
int seq_nr, next_nr, overrun, next_overrun; int next_nr, next_index;
struct padata_queue *queue, *next_queue; struct padata_queue *queue, *next_queue;
struct padata_priv *padata; struct padata_priv *padata;
struct padata_list *reorder; struct padata_list *reorder;
empty = 0;
next_nr = -1;
next_overrun = 0;
next_queue = NULL;
num_cpus = cpumask_weight(pd->cpumask); num_cpus = cpumask_weight(pd->cpumask);
for_each_cpu(cpu, pd->cpumask) { /*
queue = per_cpu_ptr(pd->queue, cpu); * Calculate the percpu reorder queue and the sequence
reorder = &queue->reorder; * number of the next object.
*/
/* next_nr = pd->processed;
* Calculate the seq_nr of the object that should be next_index = next_nr % num_cpus;
* next in this reorder queue. cpu = padata_index_to_cpu(pd, next_index);
*/ next_queue = per_cpu_ptr(pd->queue, cpu);
overrun = 0;
calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus) if (unlikely(next_nr > pd->max_seq_nr)) {
+ queue->cpu_index; next_nr = next_nr - pd->max_seq_nr - 1;
next_index = next_nr % num_cpus;
if (unlikely(calc_seq_nr > pd->max_seq_nr)) { cpu = padata_index_to_cpu(pd, next_index);
calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1; next_queue = per_cpu_ptr(pd->queue, cpu);
overrun = 1; pd->processed = 0;
}
if (!list_empty(&reorder->list)) {
padata = list_entry(reorder->list.next,
struct padata_priv, list);
seq_nr = padata->seq_nr;
BUG_ON(calc_seq_nr != seq_nr);
} else {
seq_nr = calc_seq_nr;
empty++;
}
if (next_nr < 0 || seq_nr < next_nr
|| (next_overrun && !overrun)) {
next_nr = seq_nr;
next_overrun = overrun;
next_queue = queue;
}
} }
padata = NULL; padata = NULL;
if (empty == num_cpus)
goto out;
reorder = &next_queue->reorder; reorder = &next_queue->reorder;
if (!list_empty(&reorder->list)) { if (!list_empty(&reorder->list)) {
padata = list_entry(reorder->list.next, padata = list_entry(reorder->list.next,
struct padata_priv, list); struct padata_priv, list);
if (unlikely(next_overrun)) { BUG_ON(next_nr != padata->seq_nr);
for_each_cpu(cpu, pd->cpumask) {
queue = per_cpu_ptr(pd->queue, cpu);
atomic_set(&queue->num_obj, 0);
}
}
spin_lock(&reorder->lock); spin_lock(&reorder->lock);
list_del_init(&padata->list); list_del_init(&padata->list);
atomic_dec(&pd->reorder_objects); atomic_dec(&pd->reorder_objects);
spin_unlock(&reorder->lock); spin_unlock(&reorder->lock);
atomic_inc(&next_queue->num_obj); pd->processed++;
goto out; goto out;
} }
...@@ -430,7 +398,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, ...@@ -430,7 +398,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
INIT_WORK(&queue->pwork, padata_parallel_worker); INIT_WORK(&queue->pwork, padata_parallel_worker);
INIT_WORK(&queue->swork, padata_serial_worker); INIT_WORK(&queue->swork, padata_serial_worker);
atomic_set(&queue->num_obj, 0);
} }
num_cpus = cpumask_weight(pd->cpumask); num_cpus = cpumask_weight(pd->cpumask);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment