// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef	CONFIG_BLOCK
#include <linux/bio.h>
#endif	/* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <trace/events/sock.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

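/*
 * A minimal usage sketch (illustrative only; the surrounding names are
 * assumptions, not part of this file): a user such as the mon or osd
 * client supplies its own ceph_connection_operations (get/put/dispatch,
 * optionally fault and friends) and drives a connection with the entry
 * points defined below:
 *
 *	ceph_con_init(&s->con, s, &my_con_ops, &client->msgr);
 *	ceph_con_open(&s->con, CEPH_ENTITY_TYPE_OSD, osd_id, &peer_addr);
 *	ceph_con_send(&s->con, msg);	(consumes a reference on msg)
 */
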
/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       | \                          \    \
 *       |  -----------                \    \
 *       |  | CLOSING |  socket event;  \    \
 *       |  -----------  await close     \    \
 *       |       ^                        \   |
 *       |       |                         \  |
 *       |       + con_sock_state_closing() \ |
 *       |      / \                         | |
 *       |     /   ---------------          | |
 *       |    /                   \         v v
 *       |   /                    --------------
 *       |  /    -----------------| CONNECTING |  socket created, TCP
 *       |  |   /                 --------------  connect initiated
 *       |  |   | con_sock_state_connected()
 *       |  |   v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */

static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CEPH_CON_F_LOSSYTX:
	case CEPH_CON_F_KEEPALIVE_PENDING:
	case CEPH_CON_F_WRITE_PENDING:
	case CEPH_CON_F_SOCK_CLOSED:
	case CEPH_CON_F_BACKOFF:
		return true;
	default:
		return false;
	}
}

void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
				  unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_set(struct ceph_connection *con,
				unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*ceph_msg_cache;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);

/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used, to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64	/* 54 is enough */

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

struct page *ceph_zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
{
	int i;
	char *s;
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;

	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss.ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
			 le32_to_cpu(addr->type), &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
			 le32_to_cpu(addr->type), &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss.ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);
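/*
 * Example of the resulting format (illustrative addresses): an IPv4 peer
 * renders as "(1)192.168.0.1:6789" and an IPv6 peer as
 * "(2)[2001:db8::1]:6789", where the leading number is the raw
 * addr->type value.
 */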

void ceph_encode_my_addr(struct ceph_messenger *msgr)
{
	if (!ceph_msgr2(from_msgr(msgr))) {
		memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
		       sizeof(msgr->my_enc_addr));
		ceph_encode_banner_addr(&msgr->my_enc_addr);
	}
}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
	if (!ceph_msg_cache)
		return -ENOMEM;

	return 0;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}

static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	BUG_ON(!ceph_zero_page);
	put_page(ceph_zero_page);
	ceph_zero_page = NULL;

	ceph_msgr_slab_exit();
}

int __init ceph_msgr_init(void)
{
	if (ceph_msgr_slab_init())
		return -ENOMEM;

	BUG_ON(ceph_zero_page);
	ceph_zero_page = ZERO_PAGE(0);
	get_page(ceph_zero_page);

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
	if (ceph_msgr_wq)
		return 0;

	pr_err("msgr_init failed to create workqueue\n");
	_ceph_msgr_exit();

	return -ENOMEM;
}

void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

static void con_sock_state_connecting(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTING);
}

static void con_sock_state_connected(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTED);
}

static void con_sock_state_closing(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
			old_state != CON_SOCK_STATE_CONNECTED &&
			old_state != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSING);
}

static void con_sock_state_closed(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING &&
		    old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	trace_sk_data_ready(sk);

	if (atomic_read(&con->msgr->stopping)) {
		return;
	}

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s %p state = %d, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
		if (sk_stream_is_writeable(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("%s %p nothing to write\n", __func__, con);
	}
}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %d sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		fallthrough;
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}


/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
	struct socket *sock;
	unsigned int noio_flag;
	int ret;

	dout("%s con %p peer_addr %s\n", __func__, con,
	     ceph_pr_addr(&con->peer_addr));
	BUG_ON(con->sock);

	/* sock_create_kern() allocates with GFP_KERNEL */
	noio_flag = memalloc_noio_save();
	ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	memalloc_noio_restore(noio_flag);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	con_sock_state_connecting(con);
	ret = kernel_connect(sock, (struct sockaddr *)&ss, sizeof(ss),
			     O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr), ret);
		sock_release(sock);
		return ret;
	}

	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
		tcp_sock_set_nodelay(sock->sk);

	con->sock = sock;
	return 0;
}
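/*
 * Note on the connect path above: kernel_connect() is issued O_NONBLOCK,
 * so -EINPROGRESS is the expected result here.  Completion is observed
 * by ceph_sock_state_change() seeing TCP_ESTABLISHED, which calls
 * con_sock_state_connected() and queues the connection for the worker.
 */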

/*
 * Shutdown/close the socket for the given connection.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("%s con %p sock %p\n", __func__, con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}

static void ceph_con_reset_protocol(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	ceph_con_close_socket(con);
	if (con->in_msg) {
		WARN_ON(con->in_msg->con != con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}
	if (con->out_msg) {
		WARN_ON(con->out_msg->con != con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	if (con->bounce_page) {
		__free_page(con->bounce_page);
		con->bounce_page = NULL;
	}

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_protocol(con);
	else
		ceph_con_v1_reset_protocol(con);
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);

	ceph_msg_put(msg);
}

static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

void ceph_con_reset_session(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	WARN_ON(con->in_msg);
	WARN_ON(con->out_msg);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);
	con->out_seq = 0;
	con->in_seq = 0;
	con->in_seq_acked = 0;

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_session(con);
	else
		ceph_con_v1_reset_session(con);
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
	con->state = CEPH_CON_S_CLOSED;

	ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);  /* so we retry next
							  connect */
	ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);

	ceph_con_reset_protocol(con);
	ceph_con_reset_session(con);
	cancel_con(con);
	mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(addr));

	WARN_ON(con->state != CEPH_CON_S_CLOSED);
	con->state = CEPH_CON_S_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	if (ceph_msgr2(from_msgr(con->msgr)))
		return ceph_con_v2_opened(con);

	return ceph_con_v1_opened(con);
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
	const struct ceph_connection_operations *ops,
	struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);

	con->state = CEPH_CON_S_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}

/*
 * Discard messages that have been acked by the server.
 */
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		WARN_ON(msg->needs_out_seq);
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > ack_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
	while (!list_empty(&con->out_queue)) {
		msg = list_first_entry(&con->out_queue, struct ceph_msg,
				       list_head);
		if (msg->needs_out_seq)
			break;
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > reconnect_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_bio_iter *it = &cursor->bio_iter;

	cursor->resid = min_t(size_t, length, data->bio_length);
	*it = data->bio_pos;
	if (cursor->resid < it->iter.bi_size)
		it->iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{
	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
					   cursor->bio_iter.iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct ceph_bio_iter *it = &cursor->bio_iter;
	struct page *page = bio_iter_page(it->bio, it->iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
	cursor->resid -= bytes;
	bio_advance_iter(it->bio, &it->iter, bytes);

	if (!cursor->resid)
		return false;   /* no more data */

	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
		       page == bio_iter_page(it->bio, it->iter)))
		return false;	/* more bytes to process in this segment */

	if (!it->iter.bi_size) {
		it->bio = it->bio->bi_next;
		it->iter = it->bio->bi_iter;
		if (cursor->resid < it->iter.bi_size)
			it->iter.bi_size = cursor->resid;
	}

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	return true;
}
#endif /* CONFIG_BLOCK */

static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio_vec *bvecs = data->bvec_pos.bvecs;

	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
	cursor->bvec_iter = data->bvec_pos.iter;
	cursor->bvec_iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
}

static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{
	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
					   cursor->bvec_iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
	struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->resid -= bytes;
	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);

	if (!cursor->resid)
		return false;   /* no more data */

	if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
		       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
		return false;	/* more bytes to process in this segment */

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	return true;
}

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	int page_count;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(!data->pages);
	BUG_ON(!data->length);

	cursor->resid = min(length, data->length);
	page_count = calc_pages_for(data->alignment, (u64)data->length);
	cursor->page_offset = data->alignment & ~PAGE_MASK;
	cursor->page_index = 0;
	BUG_ON(page_count > (int)USHRT_MAX);
	cursor->page_count = (unsigned short)page_count;
	BUG_ON(length > SIZE_MAX - cursor->page_offset);
}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_index >= cursor->page_count);
	BUG_ON(cursor->page_offset >= PAGE_SIZE);

	*page_offset = cursor->page_offset;
	*length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
	return data->pages[cursor->page_index];
}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);

	/* Advance the cursor page offset */

	cursor->resid -= bytes;
	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
	if (!bytes || cursor->page_offset)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page; offset is already at 0 */

	BUG_ON(cursor->page_index >= cursor->page_count);
	cursor->page_index++;
	return true;
}

/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;
	struct page *page;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	if (!length)
		return;		/* pagelist can be assigned but empty */

	BUG_ON(list_empty(&pagelist->head));
	page = list_first_entry(&pagelist->head, struct page, lru);

	cursor->resid = min(length, pagelist->length);
	cursor->page = page;
	cursor->offset = 0;
}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(!cursor->page);
	BUG_ON(cursor->offset + cursor->resid != pagelist->length);

	/* offset of first page in pagelist is always 0 */
	*page_offset = cursor->offset & ~PAGE_MASK;
	*length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
	return cursor->page;
}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

	/* Advance the cursor offset */

	cursor->resid -= bytes;
	cursor->offset += bytes;
	/* offset of first page in pagelist is always 0 */
	if (!bytes || cursor->offset & ~PAGE_MASK)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page */

	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
	cursor->page = list_next_entry(cursor->page, lru);
	return true;
}

static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
					   size_t length)
{
	struct ceph_msg_data *data = cursor->data;

	cursor->iov_iter = data->iter;
	cursor->lastlen = 0;
	iov_iter_truncate(&cursor->iov_iter, length);
	cursor->resid = iov_iter_count(&cursor->iov_iter);
}

static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
					    size_t *page_offset, size_t *length)
{
	struct page *page;
	ssize_t len;

	if (cursor->lastlen)
		iov_iter_revert(&cursor->iov_iter, cursor->lastlen);

	len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
				  1, page_offset);
	BUG_ON(len < 0);

	cursor->lastlen = len;

	/*
	 * FIXME: The assumption is that the pages represented by the iov_iter
	 *	  are pinned, with the references held by the upper-level
	 *	  callers, or by virtue of being under writeback. Eventually,
	 *	  we'll get an iov_iter_get_pages2 variant that doesn't take
	 *	  page refs. Until then, just put the page ref.
	 */
	VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
	put_page(page);

	*length = min_t(size_t, len, cursor->resid);
	return page;
}

static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
				       size_t bytes)
{
	BUG_ON(bytes > cursor->resid);
	cursor->resid -= bytes;

	if (bytes < cursor->lastlen) {
		cursor->lastlen -= bytes;
	} else {
		iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen);
		cursor->lastlen = 0;
	}

	return cursor->resid;
}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
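/*
 * Sketch of how a cursor is typically consumed (illustrative; the real
 * callers are the v1/v2 try_read and try_write paths):
 *
 *	ceph_msg_data_cursor_init(&cursor, msg, data_len);
 *	while (cursor.total_resid) {
 *		page = ceph_msg_data_next(&cursor, &page_offset, &length);
 *		... send or receive up to length bytes of that page ...
 *		ceph_msg_data_advance(&cursor, bytes_handled);
 *	}
 */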
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
	size_t length = cursor->total_resid;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		ceph_msg_data_pagelist_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		ceph_msg_data_pages_cursor_init(cursor, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		ceph_msg_data_bio_cursor_init(cursor, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		ceph_msg_data_bvecs_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_ITER:
		ceph_msg_data_iter_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		/* BUG(); */
		break;
	}
	cursor->need_crc = true;
}

void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
			       struct ceph_msg *msg, size_t length)
{
	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(!msg->num_data_items);

	cursor->total_resid = length;
	cursor->data = msg->data;
	cursor->sr_resid = 0;

	__ceph_msg_data_cursor_init(cursor);
}

/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 * Indicate whether this is the last piece in this data item.
 */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{
	struct page *page;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		page = ceph_msg_data_pages_next(cursor, page_offset, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		page = ceph_msg_data_bio_next(cursor, page_offset, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_ITER:
		page = ceph_msg_data_iter_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		page = NULL;
		break;
	}

	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	BUG_ON(*length > cursor->resid);

	return page;
}

/*
 * Returns true if the result moves the cursor on to the next piece
 * of the data item.
 */
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{
	bool new_piece;

	BUG_ON(bytes > cursor->resid);
	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_PAGES:
		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_ITER:
		new_piece = ceph_msg_data_iter_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		BUG();
		break;
	}
	cursor->total_resid -= bytes;

	if (!cursor->resid && cursor->total_resid) {
		cursor->data++;
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
	cursor->need_crc = new_piece;
}

u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
		     unsigned int length)
{
	char *kaddr;

	kaddr = kmap(page);
	BUG_ON(kaddr == NULL);
	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);

	return crc;
}

bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
	struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;

	switch (ss.ss_family) {
	case AF_INET:
		return addr4->s_addr == htonl(INADDR_ANY);
	case AF_INET6:
		return ipv6_addr_any(addr6);
	default:
		return true;
	}
}
EXPORT_SYMBOL(ceph_addr_is_blank);

int ceph_addr_port(const struct ceph_entity_addr *addr)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
	case AF_INET6:
		return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
	}
	return 0;
}

void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
		break;
	case AF_INET6:
		put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
		char delim, const char **ipend)
{
	memset(&addr->in_addr, 0, sizeof(addr->in_addr));

	if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
		put_unaligned(AF_INET, &addr->in_addr.ss_family);
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
		put_unaligned(AF_INET6, &addr->in_addr.ss_family);
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':') where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ? delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(current->nsproxy->net_ns,
			   NULL, name, end - name, NULL, &ip_addr, NULL, false);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
			ret, ret ? "failed" : ceph_pr_addr(addr));

	return ret;
}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	return -EINVAL;
}
#endif

/*
 * Parse a server name (IP or hostname). If a valid IP address is not found
 * then try to extract a hostname to resolve using userspace DNS upcall.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	int ret;

	ret = ceph_pton(name, namelen, addr, delim, ipend);
	if (ret)
		ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);

	return ret;
}

/*
 * Parse an ip[:port] list into an addr array.  Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count, char delim)
{
	int i, ret = -EINVAL;
	const char *p = c;

	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
	for (i = 0; i < max_count; i++) {
		char cur_delim = delim;
		const char *ipend;
		int port;

		if (*p == '[') {
			cur_delim = ']';
			p++;
		}

		ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
					     &ipend);
		if (ret)
			goto bad;
		ret = -EINVAL;

		p = ipend;

		if (cur_delim == ']') {
			if (*p != ']') {
				dout("missing matching ']'\n");
				goto bad;
			}
			p++;
		}

		/* port? */
		if (p < end && *p == ':') {
			port = 0;
			p++;
			while (p < end && *p >= '0' && *p <= '9') {
				port = (port * 10) + (*p - '0');
				p++;
			}
			if (port == 0)
				port = CEPH_MON_PORT;
			else if (port > 65535)
				goto bad;
		} else {
			port = CEPH_MON_PORT;
		}

		ceph_addr_set_port(&addr[i], port);
		/*
		 * We want the type to be set according to ms_mode
		 * option, but options are normally parsed after mon
		 * addresses.  Rather than complicating parsing, set
		 * to LEGACY and override in build_initial_monmap()
		 * for mon addresses and ceph_messenger_init() for
		 * ip option.
		 */
		addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
		addr[i].nonce = 0;

		dout("%s got %s\n", __func__, ceph_pr_addr(&addr[i]));

		if (p == end)
			break;
		if (*p != delim)
			goto bad;
		p++;
	}

	if (p != end)
		goto bad;

	if (count)
		*count = i + 1;
	return 0;

bad:
	return ret;
}
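/*
 * Example input (illustrative), with delim == ',':
 *
 *	"192.168.0.1:6789,[2001:db8::1]:3300,mon.example.com"
 *
 * Bracketed IPv6 addresses are accepted, entries without an explicit
 * port fall back to CEPH_MON_PORT, and bare hostnames are resolved only
 * if CONFIG_CEPH_LIB_USE_DNS_RESOLVER is enabled.
 */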

/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 */
void ceph_con_process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->in_msg;

	BUG_ON(con->in_msg->con != con);
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
}

/*
 * Atomically queue work on a connection after the specified delay.
 * Bump @con reference to avoid races with connection teardown.
 * Returns 0 if work was queued, or an error code otherwise.
 */
static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
	if (!con->ops->get(con)) {
		dout("%s %p ref count 0\n", __func__, con);
		return -ENOENT;
	}

	if (delay >= HZ)
		delay = round_jiffies_relative(delay);

	dout("%s %p %lu\n", __func__, con, delay);
	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
		dout("%s %p - already queued\n", __func__, con);
		con->ops->put(con);
		return -EBUSY;
	}

	return 0;
}
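/*
 * The reference taken via con->ops->get() above is dropped by
 * ceph_con_workfn() once the queued work has run, or by cancel_con()
 * if the work is canceled before it runs.
 */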

static void queue_con(struct ceph_connection *con)
{
	(void) queue_con_delay(con, 0);
}

static void cancel_con(struct ceph_connection *con)
{
	if (cancel_delayed_work(&con->work)) {
		dout("%s %p\n", __func__, con);
		con->ops->put(con);
	}
}

static bool con_sock_closed(struct ceph_connection *con)
{
	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
		return false;

#define CASE(x)								\
	case CEPH_CON_S_ ## x:						\
		con->error_msg = "socket closed (con state " #x ")";	\
		break;

	switch (con->state) {
	CASE(CLOSED);
	CASE(PREOPEN);
	CASE(V1_BANNER);
	CASE(V1_CONNECT_MSG);
	CASE(V2_BANNER_PREFIX);
	CASE(V2_BANNER_PAYLOAD);
	CASE(V2_HELLO);
	CASE(V2_AUTH);
	CASE(V2_AUTH_SIGNATURE);
	CASE(V2_SESSION_CONNECT);
	CASE(V2_SESSION_RECONNECT);
	CASE(OPEN);
	CASE(STANDBY);
	default:
		BUG();
	}
#undef CASE

	return true;
}

static bool con_backoff(struct ceph_connection *con)
{
	int ret;

	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
		return false;

	ret = queue_con_delay(con, con->delay);
	if (ret) {
		dout("%s: con %p FAILED to back off %lu\n", __func__,
			con, con->delay);
		BUG_ON(ret == -ENOENT);
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
	}

	return true;
}

/* Finish fault handling; con->mutex must *not* be held here */

static void con_fault_finish(struct ceph_connection *con)
{
	dout("%s %p\n", __func__, con);

	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->v1.auth_retry) {
		dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
		if (con->ops->invalidate_authorizer)
			con->ops->invalidate_authorizer(con);
		con->v1.auth_retry = 0;
	}

	if (con->ops->fault)
		con->ops->fault(con);
}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void ceph_con_workfn(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	bool fault;

	mutex_lock(&con->mutex);
	while (true) {
		int ret;

		if ((fault = con_sock_closed(con))) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CEPH_CON_S_PREOPEN) {
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_read(con);
		else
			ret = ceph_con_v1_try_read(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_write(con);
		else
			ret = ceph_con_v1_try_write(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}

/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff
 */
static void con_fault(struct ceph_connection *con)
{
	dout("fault %p state %d to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr));

	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
		ceph_pr_addr(&con->peer_addr), con->error_msg);
	con->error_msg = NULL;

	WARN_ON(con->state == CEPH_CON_S_STANDBY ||
		con->state == CEPH_CON_S_CLOSED);

	ceph_con_reset_protocol(con);

	if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		con->state = CEPH_CON_S_CLOSED;
		return;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
		con->state = CEPH_CON_S_STANDBY;
	} else {
		/* retry after a delay. */
		con->state = CEPH_CON_S_PREOPEN;
		if (!con->delay) {
			con->delay = BASE_DELAY_INTERVAL;
		} else if (con->delay < MAX_DELAY_INTERVAL) {
			con->delay *= 2;
			if (con->delay > MAX_DELAY_INTERVAL)
				con->delay = MAX_DELAY_INTERVAL;
		}
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
		queue_con(con);
	}
}
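/*
 * Backoff progression: con->delay starts at BASE_DELAY_INTERVAL on the
 * first fault and doubles on each subsequent fault, capped at
 * MAX_DELAY_INTERVAL.  queue_con_delay() additionally rounds delays of
 * HZ or more via round_jiffies_relative().
 */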

void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
{
	u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
	msgr->inst.addr.nonce = cpu_to_le32(nonce);
	ceph_encode_my_addr(msgr);
}

/*
 * initialize a new messenger instance
 */
void ceph_messenger_init(struct ceph_messenger *msgr,
			 struct ceph_entity_addr *myaddr)
{
	spin_lock_init(&msgr->global_seq_lock);

	if (myaddr) {
		memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
		       sizeof(msgr->inst.addr.in_addr));
		ceph_addr_set_port(&msgr->inst.addr, 0);
	}

	/*
	 * Since nautilus, clients are identified using type ANY.
	 * For msgr1, ceph_encode_banner_addr() munges it to NONE.
	 */
	msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
1685 1686 1687 1688 1689 1690

	/* generate a random non-zero nonce */
	do {
		get_random_bytes(&msgr->inst.addr.nonce,
				 sizeof(msgr->inst.addr.nonce));
	} while (!msgr->inst.addr.nonce);
1691
	ceph_encode_my_addr(msgr);
Sage Weil's avatar
Sage Weil committed
1692

1693
	atomic_set(&msgr->stopping, 0);
1694
	write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
Sage Weil's avatar
Sage Weil committed
1695

1696
	dout("%s %p\n", __func__, msgr);
Sage Weil's avatar
Sage Weil committed
1697 1698
}

1699 1700 1701 1702 1703
void ceph_messenger_fini(struct ceph_messenger *msgr)
{
	put_net(read_pnet(&msgr->net));
}

1704 1705 1706 1707 1708 1709 1710 1711 1712
static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
{
	if (msg->con)
		msg->con->ops->put(msg->con);

	msg->con = con ? con->ops->get(con) : NULL;
	BUG_ON(msg->con != con);
}

1713 1714 1715
static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
1716
	if (con->state == CEPH_CON_S_STANDBY) {
1717
		dout("clear_standby %p and ++connect_seq\n", con);
1718
		con->state = CEPH_CON_S_PREOPEN;
1719
		con->v1.connect_seq++;
1720 1721
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
1722 1723 1724
	}
}

/*
 * Queue up an outgoing message on the given connection.
 *
 * Consumes a ref on @msg.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;
	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
	msg->needs_out_seq = true;

	mutex_lock(&con->mutex);

	if (con->state == CEPH_CON_S_CLOSED) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		mutex_unlock(&con->mutex);
		return;
	}

	msg_con_set(msg, con);

	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));

	clear_standby(con);
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
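
/*
 * Usage sketch (illustrative, not a caller in this file): allocate a
 * message and hand its only reference to the messenger, e.g.
 *
 *	struct ceph_msg *m = ceph_msg_new(CEPH_MSG_PING, 0, GFP_NOFS, false);
 *
 *	if (m)
 *		ceph_con_send(con, m);
 *
 * The reference taken by ceph_msg_new() is the one ceph_con_send()
 * consumes, so the caller does not put the message afterwards.
 */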

/*
 * Revoke a message that was previously queued for send
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (list_empty(&msg->list_head)) {
		WARN_ON(con->out_msg == msg);
		dout("%s con %p msg %p not linked\n", __func__, con, msg);
		mutex_unlock(&con->mutex);
		return;
	}

	dout("%s con %p msg %p was linked\n", __func__, con, msg);
	msg->hdr.seq = 0;
	ceph_msg_remove(msg);

	if (con->out_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was sending\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke(con);
		else
			ceph_con_v1_revoke(con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
		     con, msg, con->out_msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (con->in_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was recving\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke_incoming(con);
		else
			ceph_con_v1_revoke_incoming(con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
		     con, msg, con->in_msg);
	}
	mutex_unlock(&con->mutex);
}
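
/*
 * Callers such as the OSD client typically revoke both directions when
 * a request is torn down: ceph_msg_revoke() on the outgoing request
 * message and ceph_msg_revoke_incoming() on the reply it may still be
 * receiving into.
 */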

/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	mutex_lock(&con->mutex);
	clear_standby(con);
	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
	mutex_unlock(&con->mutex);

	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);

bool ceph_con_keepalive_expired(struct ceph_connection *con,
			       unsigned long interval)
{
	if (interval > 0 &&
	    (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
		struct timespec64 now;
		struct timespec64 ts;
		ktime_get_real_ts64(&now);
		jiffies_to_timespec64(interval, &ts);
		ts = timespec64_add(con->last_keepalive_ack, ts);
		return timespec64_compare(&now, &ts) >= 0;
	}
	return false;
}
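
/*
 * Usage sketch (illustrative; the 30 second interval and
 * reopen_session() recovery step are placeholders): a periodic tick can
 * keep the session alive and detect a silent peer:
 *
 *	ceph_con_keepalive(con);
 *	if (ceph_con_keepalive_expired(con, 30 * HZ))
 *		reopen_session();
 *
 * The expiry check only applies when the peer supports KEEPALIVE2,
 * since only those peers ack keepalives with a timestamp; otherwise it
 * always returns false.
 */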

static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
{
	BUG_ON(msg->num_data_items >= msg->max_data_items);
	return &msg->data[msg->num_data_items++];
}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
		int num_pages = calc_pages_for(data->alignment, data->length);
		ceph_release_page_vector(data->pages, num_pages);
	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
		ceph_pagelist_release(data->pagelist);
	}
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
			     size_t length, size_t alignment, bool own_pages)
{
	struct ceph_msg_data *data;

	BUG_ON(!pages);
	BUG_ON(!length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGES;
	data->pages = pages;
	data->length = length;
	data->alignment = alignment & ~PAGE_MASK;
	data->own_pages = own_pages;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);
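
/*
 * Usage sketch (illustrative; num_pages and len are placeholders):
 * attach a page vector to an outgoing message before sending it.
 *
 *	struct page **pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 *
 *	if (!IS_ERR(pages))
 *		ceph_msg_data_add_pages(msg, pages, len, 0, true);
 *
 * With own_pages == true the vector is released by
 * ceph_msg_data_destroy() when the message's last ref is dropped.
 */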

void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist)
{
	struct ceph_msg_data *data;

	BUG_ON(!pagelist);
	BUG_ON(!pagelist->length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGELIST;
	refcount_inc(&pagelist->refcnt);
	data->pagelist = pagelist;

	msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef	CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BIO;
	data->bio_pos = *bio_pos;
	data->bio_length = length;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */

void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
			     struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BVECS;
	data->bvec_pos = *bvec_pos;

	msg->data_length += bvec_pos->iter.bi_size;
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);

void ceph_msg_data_add_iter(struct ceph_msg *msg,
			    struct iov_iter *iter)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_ITER;
	data->iter = *iter;

	msg->data_length += iov_iter_count(&data->iter);
}

/*
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail)
{
	struct ceph_msg *m;

	m = kmem_cache_zalloc(ceph_msg_cache, flags);
	if (m == NULL)
		goto out;

	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.front_len = cpu_to_le32(front_len);

	INIT_LIST_HEAD(&m->list_head);
	kref_init(&m->kref);

	/* front */
	if (front_len) {
		m->front.iov_base = kvmalloc(front_len, flags);
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front_alloc_len = m->front.iov_len = front_len;

	if (max_data_items) {
		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
					flags);
		if (!m->data)
			goto out2;

		m->max_data_items = max_data_items;
	}

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new2);

struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
}
EXPORT_SYMBOL(ceph_msg_new);
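
/*
 * Usage sketch (illustrative; front_len is a placeholder): an OSD-style
 * request that will carry at most one data item could be allocated as
 *
 *	struct ceph_msg *req = ceph_msg_new2(CEPH_MSG_OSD_OP, front_len, 1,
 *					     GFP_NOFS, true);
 *
 * With can_fail == true a NULL return is reported with dout() only, so
 * the caller is expected to handle allocation failure itself.
 */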

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}

/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 */
int ceph_con_in_msg_alloc(struct ceph_connection *con,
			  struct ceph_msg_header *hdr, int *skip)
{
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg;
	int ret = 0;

	BUG_ON(con->in_msg != NULL);
	BUG_ON(!con->ops->alloc_msg);

	mutex_unlock(&con->mutex);
	msg = con->ops->alloc_msg(con, hdr, skip);
	mutex_lock(&con->mutex);
	if (con->state != CEPH_CON_S_OPEN) {
		if (msg)
			ceph_msg_put(msg);
		return -EAGAIN;
	}
	if (msg) {
		BUG_ON(*skip);
		msg_con_set(msg, con);
		con->in_msg = msg;
	} else {
		/*
		 * Null message pointer means either we should skip
		 * this message or we couldn't allocate memory.  The
		 * former is not an error.
		 */
		if (*skip)
			return 0;

		con->error_msg = "error allocating memory for incoming message";
		return -ENOMEM;
	}
	memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));

	if (middle_len && !con->in_msg->middle) {
		ret = ceph_alloc_middle(con, con->in_msg);
		if (ret < 0) {
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
		}
	}

	return ret;
}
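
/*
 * Sketch of the ->alloc_msg() contract above (illustrative; my_alloc_msg
 * is not a callback defined in this file): a minimal implementation that
 * never skips could be
 *
 *	static struct ceph_msg *my_alloc_msg(struct ceph_connection *con,
 *					     struct ceph_msg_header *hdr,
 *					     int *skip)
 *	{
 *		*skip = 0;
 *		return ceph_msg_new(le16_to_cpu(hdr->type),
 *				    le32_to_cpu(hdr->front_len),
 *				    GFP_NOFS, false);
 *	}
 *
 * Returning NULL with *skip set to 1 instead makes the messenger read
 * and discard the incoming message rather than deliver it.
 */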

void ceph_con_get_out_msg(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	BUG_ON(list_empty(&con->out_queue));
	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
	WARN_ON(msg->con != con);

	/*
	 * Put the message on "sent" list using a ref from ceph_con_send().
	 * It is put when the message is acked or revoked.
	 */
	list_move_tail(&msg->list_head, &con->out_sent);

	/*
	 * Only assign outgoing seq # if we haven't sent this message
	 * yet.  If it is requeued, resend with its original seq.
	 */
	if (msg->needs_out_seq) {
		msg->hdr.seq = cpu_to_le64(++con->out_seq);
		msg->needs_out_seq = false;

		if (con->ops->reencode_message)
			con->ops->reencode_message(msg);
	}

	/*
	 * Get a ref for out_msg.  It is put when we are done sending the
	 * message or in case of a fault.
	 */
	WARN_ON(con->out_msg);
	con->out_msg = ceph_msg_get(msg);
}

/*
 * Free a generically kmalloc'd message.
 */
static void ceph_msg_free(struct ceph_msg *m)
{
	dout("%s %p\n", __func__, m);
	kvfree(m->front.iov_base);
	kfree(m->data);
	kmem_cache_free(ceph_msg_cache, m);
}

static void ceph_msg_release(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
	int i;

	dout("%s %p\n", __func__, m);
	WARN_ON(!list_empty(&m->list_head));

	msg_con_set(m, NULL);

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}

	for (i = 0; i < m->num_data_items; i++)
		ceph_msg_data_destroy(&m->data[i]);

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_free(m);
}

struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_get(&msg->kref);
	return msg;
}
EXPORT_SYMBOL(ceph_msg_get);

void ceph_msg_put(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_put(&msg->kref, ceph_msg_release);
}
EXPORT_SYMBOL(ceph_msg_put);

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
		 msg->front_alloc_len, msg->data_length);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
EXPORT_SYMBOL(ceph_msg_dump);