svc.c 42.6 KB
Newer Older
1
// SPDX-License-Identifier: GPL-2.0-only
Linus Torvalds's avatar
Linus Torvalds committed
2 3 4 5 6 7
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
8 9 10 11
 *
 * Multiple threads pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
Linus Torvalds's avatar
Linus Torvalds committed
12 13 14
 */

#include <linux/linkage.h>
15
#include <linux/sched/signal.h>
Linus Torvalds's avatar
Linus Torvalds committed
16 17 18 19
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
20 21
#include <linux/interrupt.h>
#include <linux/module.h>
22
#include <linux/kthread.h>
23
#include <linux/slab.h>
Linus Torvalds's avatar
Linus Torvalds committed
24 25 26 27 28 29

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
30
#include <linux/sunrpc/bc_xprt.h>
Linus Torvalds's avatar
Linus Torvalds committed
31

32 33
#include <trace/events/sunrpc.h>

34 35
#include "fail.h"

Linus Torvalds's avatar
Linus Torvalds committed
36 37
#define RPCDBG_FACILITY	RPCDBG_SVCDSP

38
static void svc_unregister(const struct svc_serv *serv, struct net *net);
39

40
#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL
41

42 43 44 45 46 47 48 49 50 51 52
/*
 * Mode for mapping cpus to pools.  Stored in svc_pool_map.mode and
 * settable via the "pool_mode" module parameter.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};

53 54 55 56
/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
};

/* The single global map; arrays are allocated on first use. */
static struct svc_pool_map svc_pool_map = {
	.mode = SVC_POOL_DEFAULT
};
71

72 73 74
static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */

static int
75
param_set_pool_mode(const char *val, const struct kernel_param *kp)
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
105
param_get_pool_mode(char *buf, const struct kernel_param *kp)
106 107 108 109 110 111
{
	int *ip = (int *)kp->arg;

	switch (*ip)
	{
	case SVC_POOL_AUTO:
112
		return sysfs_emit(buf, "auto\n");
113
	case SVC_POOL_GLOBAL:
114
		return sysfs_emit(buf, "global\n");
115
	case SVC_POOL_PERCPU:
116
		return sysfs_emit(buf, "percpu\n");
117
	case SVC_POOL_PERNODE:
118
		return sysfs_emit(buf, "pernode\n");
119
	default:
120
		return sysfs_emit(buf, "%d\n", *ip);
121 122
	}
}
123

124 125
module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);
126 127 128 129 130 131 132 133 134 135

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

136
	if (nr_online_nodes > 1) {
137 138 139 140 141 142 143
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

144
	node = first_online_node;
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
177
	m->to_pool = NULL;
178 179 180 181 182 183 184 185 186 187 188
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
189
	unsigned int maxpools = nr_cpu_ids;
190 191 192 193 194 195 196 197 198
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
199
		BUG_ON(pidx >= maxpools);
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
};


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
217
	unsigned int maxpools = nr_node_ids;
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx > maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa) if pools are in use.
 * Initialise the map if we're the first user.
 * Returns the number of pools. If this is '1', no reference
 * was taken.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		/* Map already initialised by an earlier user. */
		mutex_unlock(&svc_pool_map_mutex);
		/*
		 * An unpooled map would have dropped its reference
		 * below, so a live count implies more than one pool.
		 */
		WARN_ON_ONCE(m->npools <= 1);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools <= 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	if (npools == 1)
		/* service is unpooled, so doesn't hold a reference */
		m->count--;

	mutex_unlock(&svc_pool_map_mutex);
	return npools;
}
286 287

/*
 * Drop a reference to the global map of cpus to pools, if
 * pools were in use, i.e. if npools > 1.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(int npools)
{
	struct svc_pool_map *m = &svc_pool_map;

	/* Unpooled services never took a reference (see svc_pool_map_get) */
	if (npools <= 1)
		return;
	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		/* Last user gone: free the arrays so the mode can change. */
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}

315 316 317 318 319 320 321 322 323 324 325 326
/*
 * Return the NUMA node backing pool @pidx, or NUMA_NO_NODE when the
 * map is unused or the mode maps pools neither to cpus nor to nodes.
 */
static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (!m->count)
		return NUMA_NO_NODE;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		return cpu_to_node(m->pool_to[pidx]);
	case SVC_POOL_PERNODE:
		return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}
327
/*
328
 * Set the given thread's cpus_allowed mask so that it
329 330
 * will only run on cpus in the given pool.
 */
331 332
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
333 334
{
	struct svc_pool_map *m = &svc_pool_map;
335
	unsigned int node = m->pool_to[pidx];
336 337 338

	/*
	 * The caller checks for sv_nrpools > 1, which
339
	 * implies that we've been initialized.
340
	 */
341 342 343
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;
344

345
	switch (m->mode) {
346
	case SVC_POOL_PERCPU:
347
	{
348
		set_cpus_allowed_ptr(task, cpumask_of(node));
349
		break;
350
	}
351
	case SVC_POOL_PERNODE:
352
	{
353
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
354
		break;
355
	}
356
	}
357 358
}

359 360 361 362 363 364 365 366 367 368
/**
 * svc_pool_for_cpu - Select pool to run a thread on this cpu
 * @serv: An RPC service
 *
 * Use the active CPU and the svc_pool_map's mode setting to
 * select the svc thread pool to use. Once initialized, the
 * svc_pool_map does not change.
 *
 * Return value:
 *   A pointer to an svc_pool
369
 */
370
struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv)
371 372
{
	struct svc_pool_map *m = &svc_pool_map;
373
	int cpu = raw_smp_processor_id();
374 375
	unsigned int pidx = 0;

376 377 378 379 380 381 382 383 384 385
	if (serv->sv_nrpools <= 1)
		return serv->sv_pools;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		pidx = m->to_pool[cpu];
		break;
	case SVC_POOL_PERNODE:
		pidx = m->to_pool[cpu_to_node(cpu)];
		break;
386
	}
387

388 389 390
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

391
/*
 * Create the rpcbind client for @net, then clear any registrations
 * left over from a previous incarnation of this service.
 */
int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err = rpcb_create_local(net);

	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);
404

405
/*
 * Undo svc_rpcb_setup(): drop this service's registrations, then
 * release the per-net rpcbind client.
 */
void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
411 412 413 414 415 416 417 418 419 420

/*
 * Return 1 if any implemented version of any program offered by @serv
 * is visible to rpcbind (i.e. not marked vs_hidden), else 0.
 */
static int svc_uses_rpcbind(struct svc_serv *serv)
{
	struct svc_program	*progp;
	unsigned int		i;

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		for (i = 0; i < progp->pg_nvers; i++)
			if (progp->pg_vers[i] &&
			    !progp->pg_vers[i]->vs_hidden)
				return 1;

	return 0;
}
428

429 430 431 432 433 434 435 436
/* Register @serv with rpcbind in @net, unless all its versions are hidden. */
int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (svc_uses_rpcbind(serv))
		return svc_rpcb_setup(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_bind);

437 438 439 440 441 442 443 444 445 446 447 448 449 450
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* Initialise the backchannel callback queue and its lock. */
static void
__svc_init_bc(struct svc_serv *serv)
{
	INIT_LIST_HEAD(&serv->sv_cb_list);
	spin_lock_init(&serv->sv_cb_lock);
}
#else
/* No backchannel support configured: nothing to initialise. */
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

Linus Torvalds's avatar
Linus Torvalds committed
451 452 453
/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     int (*threadfn)(void *data))
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	kref_init(&serv->sv_refcnt);
	serv->sv_stats     = prog->pg_stats;
	/*
	 * Clamp the payload limit and round the maximum message size
	 * up to whole pages, with one extra page of slack.
	 */
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_threadfn = threadfn;
	xdrsize = 0;
	/*
	 * Walk every program and version to record the lowest and
	 * highest implemented version and the largest XDR buffer
	 * any version requires.
	 */
	while (prog) {
		prog->pg_lovers = prog->pg_nvers-1;
		for (vers=0; vers<prog->pg_nvers ; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	/* The real timer callback is installed later; see timer users. */
	timer_setup(&serv->sv_temptimer, NULL, 0);
	spin_lock_init(&serv->sv_lock);

	__svc_init_bc(serv);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		lwq_init(&pool->sp_xprts);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		init_llist_head(&pool->sp_idle_threads);
		spin_lock_init(&pool->sp_lock);

		percpu_counter_init(&pool->sp_messages_arrived, 0, GFP_KERNEL);
		percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL);
		percpu_counter_init(&pool->sp_threads_woken, 0, GFP_KERNEL);
	}

	return serv;
}

524 525 526 527 528 529 530 531 532 533
/**
 * svc_create - Create an RPC service
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
			    int (*threadfn)(void *data))
{
	/* Unpooled variant: a single global thread pool. */
	return __svc_create(prog, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);
538

539 540 541 542 543 544 545 546 547 548 549
/**
 * svc_create_pooled - Create an RPC service with pooled threads
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create_pooled(struct svc_program *prog,
				   unsigned int bufsize,
				   int (*threadfn)(void *data))
550 551
{
	struct svc_serv *serv;
552
	unsigned int npools = svc_pool_map_get();
553

554
	serv = __svc_create(prog, bufsize, npools, threadfn);
555 556
	if (!serv)
		goto out_err;
557
	return serv;
558
out_err:
559
	svc_pool_map_put(npools);
560
	return NULL;
561
}
562
EXPORT_SYMBOL_GPL(svc_create_pooled);
563

Linus Torvalds's avatar
Linus Torvalds committed
564
/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct kref *ref)
{
	struct svc_serv *serv = container_of(ref, struct svc_serv, sv_refcnt);
	unsigned int i;

	dprintk("svc: svc_destroy(%s)\n", serv->sv_program->pg_name);
	timer_shutdown_sync(&serv->sv_temptimer);

	/*
	 * The last user is gone and thus all sockets have to be destroyed to
	 * the point. Check this.
	 */
	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	/* Drop the pool-map reference taken by svc_create_pooled(). */
	svc_pool_map_put(serv->sv_nrpools);

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		percpu_counter_destroy(&pool->sp_messages_arrived);
		percpu_counter_destroy(&pool->sp_sockets_queued);
		percpu_counter_destroy(&pool->sp_threads_woken);
	}
	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);
Linus Torvalds's avatar
Linus Torvalds committed
599

600
/*
 * Allocate the rq_pages[] array used to hold both the request and
 * the reply for one RPC.  Returns true on success.
 */
static bool
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned long pages, ret;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return true;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
				       * We assume one is at most one page
				       */
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;

	/* Bulk-allocate on @node; a partial result counts as failure. */
	ret = alloc_pages_bulk_array_node(GFP_KERNEL, node, pages,
					  rqstp->rq_pages);
	return ret == pages;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i = 0;

	/* Slots may be sparsely populated; skip the empty ones. */
	while (i < ARRAY_SIZE(rqstp->rq_pages)) {
		struct page *page = rqstp->rq_pages[i++];

		if (page)
			put_page(page);
	}
}

634
/*
 * Allocate one svc_rqst with its scratch page, XDR argument/result
 * buffers and page array, all on NUMA node @node.  On any failure
 * the partially-built rqstp is torn down via svc_rqst_free().
 */
struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	folio_batch_init(&rqstp->rq_fbatch);

	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
	if (!rqstp->rq_scratch_page)
		goto out_enomem;

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	return rqstp;
out_enomem:
	/* svc_rqst_free() copes with a partially-initialised rqstp */
	svc_rqst_free(rqstp);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

670
/*
 * Allocate an rqstp for @pool, take a reference on @serv, and account
 * the new thread in both the serv-wide and per-pool counters.
 */
static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = svc_rqst_alloc(serv, pool, node);
	if (!rqstp)
		return ERR_PTR(-ENOMEM);

	/* The thread holds this reference until svc_exit_thread(). */
	svc_get(serv);
	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads += 1;
	spin_unlock_bh(&serv->sv_lock);

	atomic_inc(&pool->sp_nrthreads);
	spin_lock_bh(&pool->sp_lock);
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	return rqstp;
}

691 692 693 694 695 696 697 698 699
/**
 * svc_pool_wake_idle_thread - Awaken an idle thread in @pool
 * @pool: service thread pool
 *
 * Can be called from soft IRQ or process context. Finding an idle
 * service thread and marking it BUSY is atomic with respect to
 * other calls to svc_pool_wake_idle_thread().
 *
 */
void svc_pool_wake_idle_thread(struct svc_pool *pool)
{
	struct svc_rqst	*rqstp;
	struct llist_node *ln;

	/* RCU keeps the rqstp alive while we peek at the idle list. */
	rcu_read_lock();
	ln = READ_ONCE(pool->sp_idle_threads.first);
	if (ln) {
		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
		/* Only wake a thread that is actually sleeping. */
		if (!task_is_running(rqstp->rq_task)) {
			wake_up_process(rqstp->rq_task);
			trace_svc_wake_up(rqstp->rq_task->pid);
			percpu_counter_inc(&pool->sp_threads_woken);
		}
		rcu_read_unlock();
		return;
	}
	rcu_read_unlock();

}
EXPORT_SYMBOL_GPL(svc_pool_wake_idle_thread);
722

723 724
static struct svc_pool *
svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
725
{
726
	return pool ? pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools];
727 728
}

729
/*
 * Pick a pool to shrink by one thread.  Uses @target_pool if given,
 * otherwise scans pools round-robin via *@state.  On success the
 * chosen pool is returned with SP_NEED_VICTIM and SP_VICTIM_REMAINS
 * set so one of its threads will exit; returns NULL if no candidate
 * pool has any threads left.
 */
static struct svc_pool *
svc_pool_victim(struct svc_serv *serv, struct svc_pool *target_pool,
		unsigned int *state)
{
	struct svc_pool *pool;
	unsigned int i;

retry:
	pool = target_pool;

	if (pool != NULL) {
		/* inc-not-zero guards against racing with the last exit */
		if (atomic_inc_not_zero(&pool->sp_nrthreads))
			goto found_pool;
		return NULL;
	} else {
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			if (atomic_inc_not_zero(&pool->sp_nrthreads))
				goto found_pool;
		}
		return NULL;
	}

found_pool:
	set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
	set_bit(SP_NEED_VICTIM, &pool->sp_flags);
	/*
	 * Drop our temporary reference; if it was the last one the
	 * pool is already empty and we must look elsewhere.
	 */
	if (!atomic_dec_and_test(&pool->sp_nrthreads))
		return pool;
	/* Nothing left in this pool any more */
	clear_bit(SP_NEED_VICTIM, &pool->sp_flags);
	clear_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
	goto retry;
}

763 764
/*
 * Create @nrservs new service threads, distributed across pools (or
 * confined to @pool when non-NULL).  Returns 0, or a negative errno
 * from rqstp or kthread creation.
 */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads-1;
	int node;

	do {
		nrservs--;
		chosen_pool = svc_pool_next(serv, pool, &state);
		/* Allocate the thread's memory on the pool's NUMA node. */
		node = svc_pool_map_get_node(chosen_pool->sp_id);

		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp))
			return PTR_ERR(rqstp);
		task = kthread_create_on_node(serv->sv_threadfn, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	} while (nrservs > 0);

	return 0;
}

798 799 800 801
/*
 * Retire -@nrservs service threads: repeatedly mark a victim pool,
 * wake one of its threads, and wait for that thread to exit.
 */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	unsigned int state = serv->sv_nrthreads-1;
	struct svc_pool *victim;

	do {
		victim = svc_pool_victim(serv, pool, &state);
		if (!victim)
			break;
		svc_pool_wake_idle_thread(victim);
		/* The exiting thread clears SP_VICTIM_REMAINS. */
		wait_on_bit(&victim->sp_flags, SP_VICTIM_REMAINS,
			    TASK_IDLE);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832
/**
 * svc_set_num_threads - adjust number of threads per RPC service
 * @serv: RPC service to adjust
 * @pool: Specific pool from which to choose threads, or NULL
 * @nrservs: New number of threads for @serv (0 or less means kill all threads)
 *
 * Create or destroy threads to make the number of threads for @serv the
 * given number. If @pool is non-NULL, change only threads in that pool;
 * otherwise, round-robin between all pools for @serv. @serv's
 * sv_nrthreads is adjusted for each thread created or destroyed.
 *
 * Caller must ensure mutual exclusion between this and server startup or
 * shutdown.
 *
 * Returns zero on success or a negative errno if an error occurred while
 * starting a thread.
 */
833
int
834
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
835
{
836
	if (!pool)
837
		nrservs -= serv->sv_nrthreads;
838 839
	else
		nrservs -= atomic_read(&pool->sp_nrthreads);
840 841 842 843 844 845 846

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
847
EXPORT_SYMBOL_GPL(svc_set_num_threads);
848

849 850 851 852 853 854 855
/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 *
 * Return values:
 *   %true: page replaced
 *   %false: array bounds checking failed
 */
bool svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
	struct page **begin = rqstp->rq_pages;
	struct page **end = &rqstp->rq_pages[RPCSVC_MAXPAGES];

	/* rq_next_page must point within rq_pages[] (or one past it). */
	if (unlikely(rqstp->rq_next_page < begin || rqstp->rq_next_page > end)) {
		trace_svc_replace_page_err(rqstp);
		return false;
	}

	if (*rqstp->rq_next_page) {
		/* Queue the displaced page for batched release. */
		if (!folio_batch_add(&rqstp->rq_fbatch,
				page_folio(*rqstp->rq_next_page)))
			__folio_batch_release(&rqstp->rq_fbatch);
	}

	get_page(page);
	*(rqstp->rq_next_page++) = page;
	return true;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

883 884 885 886 887 888 889 890 891
/**
 * svc_rqst_release_pages - Release Reply buffer pages
 * @rqstp: RPC transaction context
 *
 * Release response pages that might still be in flight after
 * svc_send, and any spliced filesystem-owned pages.
 */
void svc_rqst_release_pages(struct svc_rqst *rqstp)
{
892
	int i, count = rqstp->rq_next_page - rqstp->rq_respages;
893

894 895 896 897
	if (count) {
		release_pages(rqstp->rq_respages, count);
		for (i = 0; i < count; i++)
			rqstp->rq_respages[i] = NULL;
898 899 900
	}
}

901
/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
	/* Safe on a partially-initialised rqstp from svc_rqst_alloc(). */
	folio_batch_release(&rqstp->rq_fbatch);
	svc_release_buffer(rqstp);
	if (rqstp->rq_scratch_page)
		put_page(rqstp->rq_scratch_page);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	/* RCU readers may still be walking the pool's sp_all_threads list. */
	kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

/*
 * Tear down an exiting service thread: unlink it from its pool, drop
 * the per-serv accounting and reference, and signal any waiter in
 * svc_stop_kthreads() that the victim is gone.
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	spin_lock_bh(&pool->sp_lock);
	list_del_rcu(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	atomic_dec(&pool->sp_nrthreads);

	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads -= 1;
	spin_unlock_bh(&serv->sv_lock);
	svc_sock_update_bufs(serv);

	svc_rqst_free(rqstp);

	svc_put(serv);
	/* That svc_put() cannot be the last, because the thread
	 * waiting for SP_VICTIM_REMAINS to clear must hold
	 * a reference. So it is still safe to access pool.
	 */
	clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);
Linus Torvalds's avatar
Linus Torvalds committed
946 947

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	/* Wildcard address: advertise on every local interface. */
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

994
#if IS_ENABLED(CONFIG_IPV6)
995 996 997 998 999 1000 1001 1002 1003 1004
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	/* Wildcard address: advertise on every local interface. */
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
1041
#endif	/* IS_ENABLED(CONFIG_IPV6) */
1042 1043 1044 1045 1046 1047 1048

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
		/* last case: falls out of the switch */
#endif
	}

	trace_svc_register(progname, version, family, protocol, port, error);
	return error;
}
1072

1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
/* Thin wrapper: advertise one program version via __svc_register(). */
int svc_rpcbind_set_version(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	return __svc_register(net, progp->pg_name, progp->pg_prog,
			      version, family, proto, port);
}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

/*
 * Default rpcbind registration callback for an RPC program.
 *
 * Silently succeeds for unimplemented or hidden versions and for
 * UDP registrations of versions that demand congestion control.
 * When vs_rpcb_optnl is set, registration failures are ignored.
 */
int svc_generic_rpcbind_set(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	const struct svc_version *found = progp->pg_vers[version];
	int status;

	/* Version not implemented by this program: nothing to do. */
	if (!found)
		return 0;

	/* Hidden versions are deliberately kept out of rpcbind. */
	if (found->vs_hidden) {
		trace_svc_noregister(progp->pg_name, version, proto,
				     port, family, 0);
		return 0;
	}

	/*
	 * Don't register a UDP port if we need congestion
	 * control.
	 */
	if (proto == IPPROTO_UDP && found->vs_need_cong_ctrl)
		return 0;

	status = svc_rpcbind_set_version(net, progp, version,
					 family, proto, port);

	return found->vs_rpcb_optnl ? 0 : status;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

1117 1118 1119
/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 *
 * Return: zero on success, or the first negative errno reported by a
 * program's rpcbind_set callback.
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	/* A registration with neither a protocol nor a port is useless. */
	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	/* Advertise every version of every program on this service. */
	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {

			error = progp->pg_rpcbind_set(net, progp, i,
					family, proto, port);
			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				/* NOTE(review): break only exits the inner
				 * loop; remaining programs are still tried. */
				break;
			}
		}
	}

	return error;
}

1156 1157 1158 1159 1160 1161 1162
/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
1163
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
1164 1165 1166 1167
			     const char *progname)
{
	int error;

1168
	error = rpcb_v4_register(net, program, version, NULL, "");
1169

1170 1171 1172 1173 1174
	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
1175
		error = rpcb_register(net, program, version, 0, 0);
1176

1177
	trace_svc_unregister(progname, version, error);
1178 1179
}

1180
/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct sighand_struct *sighand;
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	/* Keep pending signals from interrupting the rpcbind upcalls. */
	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	/* Re-evaluate TIF_SIGPENDING now that the upcalls are done. */
	rcu_read_lock();
	sighand = rcu_dereference(current->sighand);
	spin_lock_irqsave(&sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&sighand->siglock, flags);
	rcu_read_unlock();
}

1215
/*
1216
 * dprintk the given error with the address of the client that caused it.
1217
 */
Jeff Layton's avatar
Jeff Layton committed
1218
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char 	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	/* %pV lets dprintk() expand the caller's format and arguments. */
	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
/* Debugging disabled: svc_printk() compiles down to a no-op. */
static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif
1238

1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
/**
 * svc_generic_init_request - prepare an RPC transaction for dispatch
 * @rqstp: RPC transaction context (rq_vers and rq_proc already decoded)
 * @progp: RPC program that owns this transaction
 * @ret: dispatch information, filled in on success
 *
 * Validates the version and procedure numbers in @rqstp, records the
 * procedure in @rqstp->rq_procinfo, zeroes the argument and result
 * buffers, and bumps the per-procedure statistics counter.
 *
 * Return: rpc_success, rpc_prog_mismatch, or rpc_proc_unavail.
 */
__be32
svc_generic_init_request(struct svc_rqst *rqstp,
		const struct svc_program *progp,
		struct svc_process_info *ret)
{
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;

	if (rqstp->rq_vers >= progp->pg_nvers )
		goto err_bad_vers;
	versp = progp->pg_vers[rqstp->rq_vers];
	if (!versp)
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	if (rqstp->rq_proc >= versp->vs_nproc)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
	if (!procp)
		goto err_bad_proc;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argzero);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* Bump per-procedure stats counter */
	this_cpu_inc(versp->vs_count[rqstp->rq_proc]);

	ret->dispatch = versp->vs_dispatch;
	return rpc_success;
err_bad_vers:
	/* Tell the client which versions we do support. */
	ret->mismatch.lovers = progp->pg_lovers;
	ret->mismatch.hivers = progp->pg_hivers;
	return rpc_prog_mismatch;
err_bad_proc:
	return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

Linus Torvalds's avatar
Linus Torvalds committed
1292
/*
 * Common routine for processing the RPC request.
 *
 * Decodes the RPC call header, authenticates the caller, locates the
 * target program/version/procedure, and invokes the dispatch function.
 *
 * Return: 1 if the caller should send the reply, 0 if the request is
 * to be dropped.
 */
static int
svc_process_common(struct svc_rqst *rqstp)
{
	struct xdr_stream	*xdr = &rqstp->rq_res_stream;
	struct svc_program	*progp;
	const struct svc_procedure *procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_process_info process;
	enum svc_auth_status	auth_res;
	unsigned int		aoffset;
	int			rc;
	__be32			*p;

	/* Will be turned off by GSS integrity and privacy services */
	set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	/* Construct the first words of the reply: */
	svcxdr_init_encode(rqstp);
	xdr_stream_encode_be32(xdr, rqstp->rq_xid);
	xdr_stream_encode_be32(xdr, rpc_reply);

	/* Pull RPC version, program, version and procedure numbers. */
	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 4);
	if (unlikely(!p))
		goto err_short_len;
	if (*p++ != cpu_to_be32(RPC_VERSION))
		goto err_bad_rpc;

	xdr_stream_encode_be32(xdr, rpc_msg_accepted);

	rqstp->rq_prog = be32_to_cpup(p++);
	rqstp->rq_vers = be32_to_cpup(p++);
	rqstp->rq_proc = be32_to_cpup(p);

	/* Look up the requested program; progp stays NULL if unknown. */
	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (rqstp->rq_prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp)
		auth_res = progp->pg_authenticate(rqstp);
	trace_svc_authenticate(rqstp, auth_res);
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage_args;
	case SVC_SYSERR:
		goto err_system_err;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	default:
		pr_warn_once("Unexpected svc_auth_status (%d)\n", auth_res);
		goto err_system_err;
	}

	if (progp == NULL)
		goto err_bad_prog;

	/* Let the program validate version/procedure and pick a dispatcher. */
	switch (progp->pg_init_request(rqstp, progp, &process)) {
	case rpc_success:
		break;
	case rpc_prog_unavail:
		goto err_bad_prog;
	case rpc_prog_mismatch:
		goto err_bad_vers;
	case rpc_proc_unavail:
		goto err_bad_proc;
	}

	procp = rqstp->rq_procinfo;
	/* Should this check go into the dispatcher? */
	if (!procp || !procp->pc_func)
		goto err_bad_proc;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	/* Remember where the accept status goes, for truncation below. */
	aoffset = xdr_stream_pos(xdr);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	rc = process.dispatch(rqstp);
	if (procp->pc_release)
		procp->pc_release(rqstp);
	xdr_finish_decode(xdr);

	if (!rc)
		goto dropit;
	if (rqstp->rq_auth_stat != rpc_auth_ok)
		goto err_bad_auth;

	/* On failure, discard any partial results after the accept status. */
	if (*rqstp->rq_accept_statp != rpc_success)
		xdr_truncate_encode(xdr, aoffset);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto close_xprt;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

 close:
	svc_authorise(rqstp);
close_xprt:
	/* Only temporary (per-connection) transports are closed here. */
	if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_xprt_close(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %u, dropping request\n",
		   rqstp->rq_arg.len);
	goto close_xprt;

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_MISMATCH);
	/* Only RPCv2 supported */
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	return 1;	/* don't wrap */

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n",
		be32_to_cpu(rqstp->rq_auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of reply status: */
	xdr_truncate_encode(xdr, XDR_UNIT * 2);
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_AUTH_ERROR);
	xdr_stream_encode_be32(xdr, rqstp->rq_auth_stat);
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", rqstp->rq_prog);
	serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_unavail;
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_mismatch;

	/*
	 * svc_authenticate() has already added the verifier and
	 * advanced the stream just past rq_accept_statp.
	 */
	xdr_stream_encode_u32(xdr, process.mismatch.lovers);
	xdr_stream_encode_u32(xdr, process.mismatch.hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

	serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_proc_unavail;
	goto sendit;

err_garbage_args:
	svc_printk(rqstp, "failed to decode RPC header\n");

	serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_garbage_args;
	goto sendit;

err_system_err:
	serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_system_err;
	goto sendit;
}
1496

1497 1498 1499 1500
/**
 * svc_process - Execute one RPC transaction
 * @rqstp: RPC transaction context
 *
 * Resets the response buffer, decodes the XID and call direction,
 * then hands the transaction to svc_process_common().  The reply is
 * sent via svc_send(), or the request is dropped via svc_drop().
 */
void svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*resv = &rqstp->rq_res.head[0];
	__be32 *p;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
	/* Fault injection: optionally simulate a server disconnect. */
	if (!fail_sunrpc.ignore_server_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_next_page;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	/* Pull the XID and the call-direction word. */
	svcxdr_init_decode(rqstp);
	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2);
	if (unlikely(!p))
		goto out_drop;
	rqstp->rq_xid = *p++;
	if (unlikely(*p != rpc_call))
		goto out_baddir;

	if (!svc_process_common(rqstp))
		goto out_drop;
	svc_send(rqstp);
	return;

out_baddir:
	svc_printk(rqstp, "bad direction 0x%08x, dropping request\n",
		   be32_to_cpu(*p));
	rqstp->rq_server->sv_stats->rpcbadfmt++;
out_drop:
	svc_drop(rqstp);
}

1549
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1550 1551 1552 1553 1554
/**
 * svc_process_bc - process a reverse-direction RPC request
 * @req: RPC request to be used for client-side processing
 * @rqstp: server-side execution context
 *
1555
 */
1556
void svc_process_bc(struct rpc_rqst *req, struct svc_rqst *rqstp)
1557
{
Chuck Lever's avatar
Chuck Lever committed
1558
	struct rpc_task *task;
1559
	int proc_error;
1560 1561 1562 1563

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
1564
	rqstp->rq_bc_net = req->rq_xprt->xprt_net;
1565 1566 1567 1568 1569

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1570 1571

	/* Adjust the argument buffer length */
1572
	rqstp->rq_arg.len = req->rq_private_buf.len;
1573 1574 1575 1576 1577 1578 1579 1580 1581 1582
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;
1583

1584 1585
	/* Reset the response buffer */
	rqstp->rq_res.head[0].iov_len = 0;
1586

1587
	/*
1588 1589
	 * Skip the XID and calldir fields because they've already
	 * been processed by the caller.
1590
	 */
1591
	svcxdr_init_decode(rqstp);
1592 1593
	if (!xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2))
		return;
1594

Chuck Lever's avatar
Chuck Lever committed
1595
	/* Parse and execute the bc call */
1596
	proc_error = svc_process_common(rqstp);
1597

1598
	atomic_dec(&req->rq_xprt->bc_slot_count);
1599
	if (!proc_error) {
Chuck Lever's avatar
Chuck Lever committed
1600
		/* Processing error: drop the request */
1601
		xprt_free_bc_request(req);
1602
		return;
1603
	}
Chuck Lever's avatar
Chuck Lever committed
1604 1605
	/* Finally, send the reply synchronously */
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
1606
	task = rpc_run_bc_task(req);
1607 1608
	if (IS_ERR(task))
		return;
Chuck Lever's avatar
Chuck Lever committed
1609 1610 1611

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	rpc_put_task(task);
1612
}
1613
EXPORT_SYMBOL_GPL(svc_process_bc);
1614
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1615

1616 1617 1618 1619 1620 1621
/**
 * svc_max_payload - Return transport-specific limit on the RPC payload
 * @rqstp: RPC transaction context
 *
 * Returns the maximum number of payload bytes the current transport
 * allows.
1622 1623 1624
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
1625
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1626

1627 1628
	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
1629 1630 1631
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
1632

1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647
/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string; "unknown" when @rqstp or its
 *   procedure info is absent.
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
	if (!rqstp || !rqstp->rq_procinfo)
		return "unknown";
	return rqstp->rq_procinfo->pc_name;
}


1648
/**
1649
 * svc_encode_result_payload - mark a range of bytes as a result payload
1650 1651 1652 1653 1654 1655 1656
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
1657 1658
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
			      unsigned int length)
1659
{
1660 1661
	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
							   length);
1662
}
1663
EXPORT_SYMBOL_GPL(svc_encode_result_payload);
1664

1665 1666 1667
/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
1668
 * @payload: xdr_buf containing only the write data payload
1669
 *
1670
 * Fills in rqstp::rq_vec, and returns the number of elements.
1671
 */
1672 1673
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
				   struct xdr_buf *payload)
1674
{
1675 1676
	struct page **pages = payload->pages;
	struct kvec *first = payload->head;
1677
	struct kvec *vec = rqstp->rq_vec;
1678
	size_t total = payload->len;
1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703
	unsigned int i;

	/* Some types of transport can present the write payload
	 * entirely in rq_arg.pages. In this case, @first is empty.
	 */
	i = 0;
	if (first->iov_len) {
		vec[i].iov_base = first->iov_base;
		vec[i].iov_len = min_t(size_t, total, first->iov_len);
		total -= vec[i].iov_len;
		++i;
	}

	while (total) {
		vec[i].iov_base = page_address(*pages);
		vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
		total -= vec[i].iov_len;
		++i;
		++pages;
	}

	WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
	return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);
1704 1705 1706 1707 1708

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
				void *p, size_t total)
{
	char *buf, *cursor;
	size_t chunk, left;

	buf = kmalloc(total + 1, GFP_KERNEL);
	if (!buf)
		return ERR_PTR(-ESERVERFAULT);

	cursor = buf;
	left = total;

	/* Copy whatever portion of the pathname sits in @first. */
	chunk = min_t(size_t, total, first->iov_len);
	if (chunk) {
		memcpy(cursor, first->iov_base, chunk);
		cursor += chunk;
		left -= chunk;
	}

	/* Any remainder comes from @p, capped at one page. */
	if (left) {
		chunk = min_t(size_t, left, PAGE_SIZE);
		memcpy(cursor, p, chunk);
		cursor += chunk;
	}

	*cursor = '\0';

	/* Sanity check: Linux doesn't allow the pathname argument to
	 * contain a NUL byte.
	 */
	if (strlen(buf) != total) {
		kfree(buf);
		return ERR_PTR(-EINVAL);
	}
	return buf;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);