1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2009-2013 Vkontakte Ltd
18 2008-2013 Nikolai Durov
19 2008-2013 Andrey Lopatin
20
21 Copyright 2014 Telegram Messenger Inc
22 2014 Nikolai Durov
23 2014 Andrey Lopatin
24
25 Copyright 2015-2016 Telegram Messenger Inc
26 2015-2016 Vitaly Valtman
27*/
28
29#define _FILE_OFFSET_BITS 64
30
31#include <arpa/inet.h>
32#include <assert.h>
33#include <errno.h>
34#include <fcntl.h>
35#include <math.h>
36#include <netinet/in.h>
37#include <netinet/tcp.h>
38#include <pthread.h>
39#include <stddef.h>
40#include <stdio.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/epoll.h>
44#include <sys/socket.h>
45#include <sys/uio.h>
46#include <time.h>
47#include <unistd.h>
48
49#include "crc32.h"
50#include "jobs/jobs.h"
51#include "net/net-events.h"
52//#include "net/net-buffers.h"
53#include "kprintf.h"
54#include "precise-time.h"
55#include "server-functions.h"
56#include "net/net-connections.h"
57#include "net/net-config.h"
58#include "vv/vv-io.h"
59#include "vv/vv-tree.h"
60#include "pid.h"
61#include "common/mp-queue.h"
62
63#include "net/net-msg-buffers.h"
64#include "net/net-tcp-connections.h"
65
66#include "common/common-stats.h"
67
68//struct process_id PID;
69
/* Selects the edge-triggered variant of compute_conn_events () below. */
#define USE_EPOLLET 1
#define MAX_RECONNECT_INTERVAL 20

/* Module name, presumably consumed by the MODULE_STAT_* / MODULE_INIT
   macros from common-stats.h — TODO confirm. */
#define MODULE connections

/* Accept-rate limit (set by tcp_set_max_accept_rate ()) and its state. */
static int max_accept_rate;
static double cur_accept_rate_remaining;
static double cur_accept_rate_time;
/* Highest fd accepted by alloc_new_connection () so far. */
static int max_connection;
static int conn_generation;
/* alloc_new_connection () rejects any fd >= this value. */
static int max_connection_fd = MAX_CONNECTIONS;

/* Live C_SPECIAL connections and the limit at which special listening
   sockets are removed from epoll (see alloc_new_connection ()). */
int active_special_connections, max_special_connections = MAX_CONNECTIONS;

/* Number of valid entries in special_socket[]. */
int special_listen_sockets;

/* (fd, generation) of every special listening socket; looked up by
   cpu_server_close_connection () when a special slot frees up. */
static struct {
  int fd, generation;
} special_socket[MAX_SPECIAL_LISTEN_SOCKETS];

static struct mp_queue *free_later_queue;
91
92
/* Statistics counters of this module. Presumably MODULE_STAT_TYPE /
   MODULE_INIT (common/common-stats.h) declare one instance per thread,
   reached through MODULE_STAT and summed by SB_SUM_* — TODO confirm. */
MODULE_STAT_TYPE {
  /* current connection counts */
  int active_connections, active_dh_connections;
  int outbound_connections, active_outbound_connections, ready_outbound_connections, listening_connections;
  int allocated_outbound_connections, allocated_inbound_connections;
  int inbound_connections, active_inbound_connections;

  /* cumulative creation counters */
  long long outbound_connections_created, inbound_connections_accepted;
  int ready_targets;

  /* cumulative query / failure / close counters */
  long long netw_queries, netw_update_queries, total_failed_connections, total_connect_failures, unused_connections_closed;

  int allocated_targets, active_targets, inactive_targets, free_targets;
  int allocated_connections, allocated_socket_connections;
  /* connection setup / accept failure counters */
  long long accept_calls_failed, accept_nonblock_set_failed, accept_connection_limit_failed,
    accept_rate_limit_failed, accept_init_accepted_failed;

  /* readv / writev calls, EINTR retries and transferred bytes */
  long long tcp_readv_calls, tcp_writev_calls, tcp_readv_intr, tcp_writev_intr;
  long long tcp_readv_bytes, tcp_writev_bytes;

  int free_later_size;
  long long free_later_total;
};

MODULE_INIT
117
/* Writes this module's statistics into sb: per-thread counters through
   SB_SUM_ONE_*, plus a few globals printed directly. Note that the
   "max_connections" line reports the fd limit max_connection_fd, while
   max_connection (the highest fd seen) is printed separately.
   NOTE(review): total_failed_connections, netw_queries and
   netw_update_queries exist in the stats struct but are not reported here. */
MODULE_STAT_FUNCTION
  SB_SUM_ONE_I (active_connections);
  SB_SUM_ONE_I (active_dh_connections);

  SB_SUM_ONE_I (outbound_connections);
  SB_SUM_ONE_I (ready_outbound_connections);
  SB_SUM_ONE_I (active_outbound_connections);
  SB_SUM_ONE_LL (outbound_connections_created);
  SB_SUM_ONE_LL (total_connect_failures);

  SB_SUM_ONE_I (inbound_connections);
  //SB_SUM_ONE_I (ready_inbound_connections);
  SB_SUM_ONE_I (active_inbound_connections);
  SB_SUM_ONE_LL (inbound_connections_accepted);

  SB_SUM_ONE_I (listening_connections);
  SB_SUM_ONE_LL (unused_connections_closed);
  SB_SUM_ONE_I (ready_targets);
  SB_SUM_ONE_I (allocated_targets);
  SB_SUM_ONE_I (active_targets);
  SB_SUM_ONE_I (inactive_targets);
  SB_SUM_ONE_I (free_targets);
  /* plain globals (not per-thread) */
  sb_printf (sb,
    "max_connections\t%d\n"
    "active_special_connections\t%d\n"
    "max_special_connections\t%d\n"
    ,
    max_connection_fd,
    active_special_connections,
    max_special_connections
  );
  SBP_PRINT_I32(max_accept_rate);
  SBP_PRINT_DOUBLE(cur_accept_rate_remaining);
  SBP_PRINT_I32(max_connection);
  SBP_PRINT_I32(conn_generation);

  SB_SUM_ONE_I (allocated_connections);
  SB_SUM_ONE_I (allocated_outbound_connections);
  SB_SUM_ONE_I (allocated_inbound_connections);
  SB_SUM_ONE_I (allocated_socket_connections);
  SB_SUM_ONE_LL (tcp_readv_calls);
  SB_SUM_ONE_LL (tcp_readv_intr);
  SB_SUM_ONE_LL (tcp_readv_bytes);
  SB_SUM_ONE_LL (tcp_writev_calls);
  SB_SUM_ONE_LL (tcp_writev_intr);
  SB_SUM_ONE_LL (tcp_writev_bytes);
  SB_SUM_ONE_I (free_later_size);
  SB_SUM_ONE_LL (free_later_total);

  SB_SUM_ONE_LL (accept_calls_failed);
  SB_SUM_ONE_LL (accept_nonblock_set_failed);
  SB_SUM_ONE_LL (accept_connection_limit_failed);
  SB_SUM_ONE_LL (accept_rate_limit_failed);
  SB_SUM_ONE_LL (accept_init_accepted_failed);
MODULE_STAT_FUNCTION_END
173
174void fetch_connections_stat (struct connections_stat *st) {
175#define COLLECT_I(__x) st->__x = SB_SUM_I (__x);
176#define COLLECT_LL(__x) st->__x = SB_SUM_LL (__x);
177 COLLECT_I (active_connections);
178 COLLECT_I (active_dh_connections);
179 COLLECT_I (outbound_connections);
180 COLLECT_I (active_outbound_connections);
181 COLLECT_I (ready_outbound_connections);
182 st->max_special_connections = max_special_connections;
183 st->active_special_connections = active_special_connections;
184 COLLECT_I (allocated_connections);
185 COLLECT_I (allocated_outbound_connections);
186 COLLECT_I (allocated_inbound_connections);
187 COLLECT_I (allocated_socket_connections);
188 COLLECT_I (allocated_targets);
189 COLLECT_I (ready_targets);
190 COLLECT_I (active_targets);
191 COLLECT_I (inactive_targets);
192 COLLECT_LL (tcp_readv_calls);
193 COLLECT_LL (tcp_readv_intr);
194 COLLECT_LL (tcp_readv_bytes);
195 COLLECT_LL (tcp_writev_calls);
196 COLLECT_LL (tcp_writev_intr);
197 COLLECT_LL (tcp_writev_bytes);
198 COLLECT_LL (accept_calls_failed);
199 COLLECT_LL (accept_nonblock_set_failed);
200 COLLECT_LL (accept_rate_limit_failed);
201 COLLECT_LL (accept_init_accepted_failed);
202 COLLECT_LL (accept_connection_limit_failed);
203#undef COLLECT_I
204#undef COLLECT_LL
205}
206
207void connection_event_incref (int fd, long long val);
208
209void tcp_set_max_accept_rate (int rate) {
210 max_accept_rate = rate;
211}
212
int set_write_timer (connection_job_t C);

int prealloc_tcp_buffers (void);
int clear_connection_write_timeout (connection_job_t c);

/* Receive buffers shared by net_server_socket_reader () and filled by
   prealloc_tcp_buffers (): tcp_recv_buffers[i] backs tcp_recv_iovec[i + 1]
   (iovec slot 0 is never used). These are unlocked statics, presumably
   touched only from the NET-NET thread — TODO confirm. */
static int tcp_recv_buffers_num;
static int tcp_recv_buffers_total_size;
static struct iovec tcp_recv_iovec[MAX_TCP_RECV_BUFFERS + 1];
static struct msg_buffer *tcp_recv_buffers[MAX_TCP_RECV_BUFFERS];
222
223int prealloc_tcp_buffers (void) /* {{{ */ {
224 assert (!tcp_recv_buffers_num);
225
226 int i;
227 for (i = MAX_TCP_RECV_BUFFERS - 1; i >= 0; i--) {
228 struct msg_buffer *X = alloc_msg_buffer ((tcp_recv_buffers_num) ? tcp_recv_buffers[i + 1] : 0, TCP_RECV_BUFFER_SIZE);
229 if (!X) {
230 vkprintf (0, "**FATAL**: cannot allocate tcp receive buffer\n");
231 exit (2);
232 }
233 vkprintf (3, "allocated %d byte tcp receive buffer #%d at %p\n", X->chunk->buffer_size, i, X);
234 tcp_recv_buffers[i] = X;
235 tcp_recv_iovec[i + 1].iov_base = X->data;
236 tcp_recv_iovec[i + 1].iov_len = X->chunk->buffer_size;
237 ++ tcp_recv_buffers_num;
238 tcp_recv_buffers_total_size += X->chunk->buffer_size;
239 }
240 return tcp_recv_buffers_num;
241}
242/* }}} */
243
244int tcp_prepare_iovec (struct iovec *iov, int *iovcnt, int maxcnt, struct raw_message *raw) /* {{{ */ {
245 int t = rwm_prepare_iovec (raw, iov, maxcnt, raw->total_bytes);
246 if (t < 0) {
247 *iovcnt = maxcnt;
248 int i;
249 t = 0;
250 for (i = 0; i < maxcnt; i++) {
251 t += iov[i].iov_len;
252 }
253 assert (t < raw->total_bytes);
254 return t;
255 } else {
256 *iovcnt = t;
257 return raw->total_bytes;
258 }
259}
260/* }}} */
261
262void assert_main_thread (void) {}
263void assert_net_cpu_thread (void) {}
264void assert_net_net_thread (void) {}
265void assert_engine_thread (void) {
266 assert (this_job_thread && (this_job_thread->thread_class == JC_ENGINE || this_job_thread->thread_class == JC_MAIN));
267}
268
socket_connection_job_t alloc_new_socket_connection (connection_job_t C);

/* Instantiates the generic tree from vv/vv-tree.c for connection_job_t
   elements, ordered by pointer value (X_CMP). TREE_INCREF / TREE_DECREF
   suggest that the tree holds job references — TODO confirm in vv-tree.c. */
#define X_TYPE connection_job_t
#define X_CMP(a,b) (((a) < (b)) ? -1 : ((a) > (b)) ? 1 : 0)
#define TREE_NAME connection
#define TREE_MALLOC
#define TREE_PTHREAD
#define TREE_INCREF job_incref
#define TREE_DECREF job_decref_f
#include "vv/vv-tree.c"
279
280static inline int connection_is_active (int flags) {
281 return (flags & C_CONNECTED) && !(flags & C_READY_PENDING);
282}
283
/* {{{ compute_conn_events */
/* Returns the EVT_* mask to register for a connection (0 once it failed). */
#if USE_EPOLLET
/* Edge-triggered build: a healthy socket always subscribes to read, write and
   EVT_SPEC; per-direction readiness is tracked separately in C_NORD / C_NOWR. */
static inline int compute_conn_events (socket_connection_job_t c) {
  unsigned flags = SOCKET_CONN_INFO(c)->flags;
  if (flags & C_ERROR) {
    return 0;
  } else {
    return EVT_READ | EVT_WRITE | EVT_SPEC;
  }
}
#else
/* Level-triggered variant, compiled only when USE_EPOLLET is 0: EVT_READ when
   reading is wanted and not stopped, EVT_WRITE when writing is wanted,
   EVT_SPEC always, and EVT_LEVEL when a wanted direction is blocked
   (C_NORD / C_NOWR set).
   NOTE(review): takes connection_job_t and uses CONN_INFO, while callers in
   this file pass a socket_connection_job_t — needs updating if re-enabled. */
static inline int compute_conn_events (connection_job_t c) {
  unsigned flags = CONN_INFO(c)->flags;
  if (flags & (C_ERROR | C_FAILED | C_NET_FAILED)) {
    return 0;
  }
  return (((flags & (C_WANTRD | C_STOPREAD)) == C_WANTRD) ? EVT_READ : 0) | (flags & C_WANTWR ? EVT_WRITE : 0) | EVT_SPEC
    | (((flags & (C_WANTRD | C_NORD)) == (C_WANTRD | C_NORD))
    || ((flags & (C_WANTWR | C_NOWR)) == (C_WANTWR | C_NOWR)) ? EVT_LEVEL : 0);
}
#endif
/* }}} */
306
307void connection_write_close (connection_job_t C) /* {{{ */ {
308 struct connection_info *c = CONN_INFO (C);
309 if (c->status == conn_working) {
310 socket_connection_job_t S = c->io_conn;
311 if (S) {
312 __sync_fetch_and_or (&SOCKET_CONN_INFO(S)->flags, C_STOPREAD);
313 }
314 __sync_fetch_and_or (&c->flags, C_STOPREAD);
315 c->status = conn_write_close;
316
317 job_signal (JOB_REF_CREATE_PASS (C), JS_RUN);
318 }
319}
320/* }}} */
321
322/* qack {{{ */
323static inline void disable_qack (int fd) {
324 vkprintf (2, "disable TCP_QUICKACK for %d\n", fd);
325 assert (setsockopt (fd, IPPROTO_TCP, TCP_QUICKACK, (int[]){0}, sizeof (int)) >= 0);
326}
327
328static inline void cond_disable_qack (socket_connection_job_t C) {
329 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
330 if (c->flags & C_NOQACK) {
331 disable_qack (c->fd);
332 }
333}
334/* }}} */
335
336/* {{{ cork
337static inline void cond_reset_cork (connection_job_t c) {
338 if (c->flags & C_NOQACK) {
339 vkprintf (2, "disable TCP_CORK for %d\n", c->fd);
340 assert (setsockopt (c->fd, IPPROTO_TCP, TCP_CORK, (int[]){0}, sizeof (int)) >= 0);
341 vkprintf (2, "enable TCP_CORK for %d\n", c->fd);
342 assert (setsockopt (c->fd, IPPROTO_TCP, TCP_CORK, (int[]){1}, sizeof (int)) >= 0);
343 }
344}
345}}} */
346
347
348
349/* {{{ CPU PART OF CONNECTION */
350
351/* {{{ TIMEOUT */
352int set_connection_timeout (connection_job_t C, double timeout) /* {{{ */ {
353 struct connection_info *c = CONN_INFO (C);
354
355 if (c->flags & C_ERROR) { return 0; }
356
357 __sync_fetch_and_and (&c->flags, ~C_ALARM);
358
359 if (timeout > 0) {
360 job_timer_insert (C, precise_now + timeout);
361 return 0;
362 } else {
363 job_timer_remove (C);
364 return 0;
365 }
366}
367/* }}} */
368
369int clear_connection_timeout (connection_job_t C) /* {{{ */ {
370 set_connection_timeout (C, 0);
371 return 0;
372}
373/* }}} */
374
375/* }}} */
376
377
378/*
379 can be called from any thread and without lock
380 just sets error code and sends JS_ABORT to connection job
381*/
382void fail_connection (connection_job_t C, int err) /* {{{ */ {
383 struct connection_info *c = CONN_INFO (C);
384
385 if (!(__sync_fetch_and_or (&c->flags, C_ERROR) & C_ERROR)) {
386 c->status = conn_error;
387 if (c->error >= 0) {
388 c->error = err;
389 }
390
391 job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
392 }
393}
394/* }}} */
395
396/*
397 just runs ->reader and ->writer virtual methods
398*/
399int cpu_server_read_write (connection_job_t C) /* {{{ */ {
400 struct connection_info *c = CONN_INFO (C);
401
402 c->type->reader (C);
403 c->type->writer (C);
404 return 0;
405}
406/* }}} */
407
408/*
409 frees connection structure, including mpq and buffers
410*/
411int cpu_server_free_connection (connection_job_t C) /* {{{ */ {
412 assert_net_cpu_thread ();
413 assert (C->j_refcnt == 1);
414
415 struct connection_info *c = CONN_INFO (C);
416 if (!(c->flags & C_ERROR)) {
417 vkprintf (0, "target = %p, basic=%d\n", c->target, c->basic_type);
418 }
419 assert (c->flags & C_ERROR);
420 assert (c->flags & C_FAILED);
421 assert (!c->target);
422 assert (!c->io_conn);
423
424 vkprintf (1, "Closing connection socket #%d\n", c->fd);
425
426 while (1) {
427 struct raw_message *raw = mpq_pop_nw (c->out_queue, 4);
428 if (!raw) { break; }
429 rwm_free (raw);
430 free (raw);
431 }
432
433 free_mp_queue (c->out_queue);
434 c->out_queue = NULL;
435
436 while (1) {
437 struct raw_message *raw = mpq_pop_nw (c->in_queue, 4);
438 if (!raw) { break; }
439 rwm_free (raw);
440 free (raw);
441 }
442
443 free_mp_queue (c->in_queue);
444 c->in_queue = NULL;
445
446 if (c->type->crypto_free) {
447 c->type->crypto_free (C);
448 }
449
450 close (c->fd);
451 c->fd = -1;
452
453 MODULE_STAT->allocated_connections --;
454 if (c->basic_type == ct_outbound) {
455 MODULE_STAT->allocated_outbound_connections --;
456 }
457 if (c->basic_type == ct_inbound) {
458 MODULE_STAT->allocated_inbound_connections --;
459 }
460
461 return c->type->free_buffers (C);
462}
463/* }}} */
464
465/*
466 deletes link to io_conn
467 deletes link to target
468 aborts pending queries
469 updates stats
470*/
471int cpu_server_close_connection (connection_job_t C, int who) /* {{{ */ {
472 assert_net_cpu_thread ();
473 struct connection_info *c = CONN_INFO(C);
474
475 assert (c->flags & C_ERROR);
476 assert (c->status == conn_error);
477 assert (c->flags & C_FAILED);
478
479 if (c->error != -17) {
480 MODULE_STAT->total_failed_connections ++;
481 if (!connection_is_active (c->flags)) {
482 MODULE_STAT->total_connect_failures ++;
483 }
484 } else {
485 MODULE_STAT->unused_connections_closed ++;
486 }
487
488 if (c->flags & C_ISDH) {
489 MODULE_STAT->active_dh_connections --;
490 __sync_fetch_and_and (&c->flags, ~C_ISDH);
491 }
492
493 assert (c->io_conn);
494 job_signal (JOB_REF_PASS (c->io_conn), JS_ABORT);
495
496 if (c->target) {
497 MODULE_STAT->outbound_connections --;
498
499 if (connection_is_active (c->flags)) {
500 MODULE_STAT->active_outbound_connections --;
501 }
502
503 job_signal (JOB_REF_PASS (c->target), JS_RUN);
504 } else {
505 MODULE_STAT->inbound_connections --;
506
507 if (connection_is_active (c->flags)) {
508 MODULE_STAT->active_inbound_connections --;
509 }
510 }
511
512 if (connection_is_active (c->flags)) {
513 MODULE_STAT->active_connections --;
514 }
515
516 if (c->flags & C_SPECIAL) {
517 c->flags &= ~C_SPECIAL;
518 int orig_special_connections = __sync_fetch_and_add (&active_special_connections, -1);
519 if (orig_special_connections == max_special_connections) {
520 int i;
521 for (i = 0; i < special_listen_sockets; i++) {
522 connection_job_t LC = connection_get_by_fd_generation (special_socket[i].fd, special_socket[i].generation);
523 assert (LC);
524 job_signal (JOB_REF_PASS (LC), JS_AUX);
525 }
526 }
527 }
528
529 job_timer_remove (C);
530 return 0;
531}
532/* }}} */
533
534int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
535 connection_job_t C = job;
536
537 struct connection_info *c = CONN_INFO (C);
538
539 if (op == JS_RUN) { // RUN IN NET-CPU THREAD
540 assert_net_cpu_thread ();
541 if (!(c->flags & C_ERROR)) {
542 if (c->flags & C_READY_PENDING) {
543 assert (c->flags & C_CONNECTED);
544 __sync_fetch_and_and (&c->flags, ~C_READY_PENDING);
545 MODULE_STAT->active_outbound_connections ++;
546 MODULE_STAT->active_connections ++;
547 __sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1);
548 if (c->status == conn_connecting) {
549 if (!__sync_bool_compare_and_swap (&c->status, conn_connecting, conn_working)) {
550 assert (c->status == conn_error);
551 }
552 }
553 c->type->connected (C);
554 }
555 c->type->read_write (C);
556 }
557 return 0;
558 }
559 if (op == JS_ALARM) { // RUN IN NET-CPU THREAD
560 if (!job_timer_check (job)) {
561 return 0;
562 }
563 if (!(c->flags & C_ERROR)) {
564 c->type->alarm (C);
565 }
566 return 0;
567 }
568 if (op == JS_ABORT) { // RUN IN NET-CPU THREAD
569 assert (c->flags & C_ERROR);
570 if (!(__sync_fetch_and_or (&c->flags, C_FAILED) & C_FAILED)) {
571 c->type->close (C, 0);
572 }
573 return JOB_COMPLETED;
574 }
575 if (op == JS_FINISH) { // RUN IN NET-CPU THREAD
576 assert (C->j_refcnt == 1);
577 c->type->free (C);
578 return job_free (JOB_REF_PASS (C));
579 }
580 return JOB_ERROR;
581}
582/* }}} */
583
584/*
585 allocates inbound or outbound connection
586 runs init_accepted or init_outbound
587 updates stats
588 creates socket_connection
589*/
590connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ {
591 if (cfd < 0) {
592 return NULL;
593 }
594 assert_main_thread ();
595
596 struct conn_target_info *CT = CTJ ? CONN_TARGET_INFO (CTJ) : NULL;
597 struct listening_connection_info *LC = LCJ ? LISTEN_CONN_INFO (LCJ) : NULL;
598
599 unsigned flags;
600 if ((flags = fcntl (cfd, F_GETFL, 0) < 0) || fcntl (cfd, F_SETFL, flags | O_NONBLOCK) < 0) {
601 kprintf ("cannot set O_NONBLOCK on accepted socket %d: %m\n", cfd);
602 MODULE_STAT->accept_nonblock_set_failed ++;
603 close (cfd);
604 return NULL;
605 }
606
607 flags = 1;
608 setsockopt (cfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof (flags));
609 if (tcp_maximize_buffers) {
610 maximize_sndbuf (cfd, 0);
611 maximize_rcvbuf (cfd, 0);
612 }
613
614 if (cfd >= max_connection_fd) {
615 vkprintf (2, "cfd = %d, max_connection_fd = %d\n", cfd, max_connection_fd);
616 MODULE_STAT->accept_connection_limit_failed ++;
617 close (cfd);
618 return NULL;
619 }
620
621 if (cfd > max_connection) {
622 max_connection = cfd;
623 }
624
625 connection_job_t C = create_async_job (do_connection_job, JSC_ALLOW (JC_CONNECTION, JS_RUN) | JSC_ALLOW (JC_CONNECTION, JS_ALARM) | JSC_ALLOW (JC_CONNECTION, JS_ABORT) | JSC_ALLOW (JC_CONNECTION, JS_FINISH), -2, sizeof (struct connection_info), JT_HAVE_TIMER, JOB_REF_NULL);
626
627 struct connection_info *c = CONN_INFO (C);
628 //memset (c, 0, sizeof (*c)); /* no need, create_async_job memsets itself */
629
630 c->fd = cfd;
631 c->target = CTJ;
632 c->generation = new_conn_generation ();
633
634 c->flags = 0;//SS ? C_WANTWR : C_WANTRD;
635 if (LC) {
636 c->flags = C_CONNECTED;
637 }
638
639 int raw = C_RAWMSG;
640
641 if (raw) {
642 c->flags |= C_RAWMSG;
643 rwm_init (&c->in, 0);
644 rwm_init (&c->out, 0);
645 rwm_init (&c->in_u, 0);
646 rwm_init (&c->out_p, 0);
647 } else {
648 assert (0);
649 }
650
651 c->type = CT ? CT->type : LC->type;
652 c->extra = CT ? CT->extra : LC->extra;
653 assert (c->type);
654
655 c->basic_type = CT ? ct_outbound : ct_inbound;
656 c->status = CT ? conn_connecting : conn_working;
657
658 c->flags |= c->type->flags & C_EXTERNAL;
659 if (LC) {
660 c->flags |= LC->flags & C_EXTERNAL;
661 }
662
663 union sockaddr_in46 self;
664 unsigned self_addrlen = sizeof (self);
665 memset (&self, 0, sizeof (self));
666 getsockname (cfd, (struct sockaddr *) &self, &self_addrlen);
667
668 if (self.a4.sin_family == AF_INET) {
669 assert (self_addrlen == sizeof (struct sockaddr_in));
670 c->our_ip = ntohl (self.a4.sin_addr.s_addr);
671 c->our_port = ntohs (self.a4.sin_port);
672 assert (peer);
673 c->remote_ip = peer;
674 } else {
675 assert (self.a6.sin6_family == AF_INET6);
676 assert (!peer);
677 if (is_4in6 (peer_ipv6)) {
678 assert (is_4in6 (self.a6.sin6_addr.s6_addr));
679 c->our_ip = ntohl (extract_4in6 (self.a6.sin6_addr.s6_addr));
680 c->our_port = ntohs (self.a6.sin6_port);
681 c->remote_ip = ntohl (extract_4in6 (peer_ipv6));
682 } else {
683 memcpy (c->our_ipv6, self.a6.sin6_addr.s6_addr, 16);
684 c->our_port = ntohs (self.a6.sin6_port);
685 c->flags |= C_IPV6;
686 memcpy (c->remote_ipv6, peer_ipv6, 16);
687 }
688 }
689 c->remote_port = peer_port;
690
691 c->in_queue = alloc_mp_queue_w ();
692 c->out_queue = alloc_mp_queue_w ();
693 //c->out_packet_queue = alloc_mp_queue_w ();
694
695 if (CT) {
696 vkprintf (1, "New connection %s:%d -> %s:%d\n", show_our_ip (C), c->our_port, show_remote_ip (C), c->remote_port);
697 } else {
698 vkprintf (1, "New connection %s:%d -> %s:%d\n", show_remote_ip (C), c->remote_port, show_our_ip (C), c->our_port);
699 }
700
701
702 int (*func)(connection_job_t) = CT ? CT->type->init_outbound : LC->type->init_accepted;
703
704 vkprintf (3, "func = %p\n", func);
705
706
707 if (func (C) >= 0) {
708 if (CT) {
709 job_incref (CTJ);
710
711 MODULE_STAT->outbound_connections ++;
712 MODULE_STAT->allocated_outbound_connections ++;
713 MODULE_STAT->outbound_connections_created ++;
714
715 CT->outbound_connections ++;
716 } else {
717 MODULE_STAT->inbound_connections_accepted ++;
718 MODULE_STAT->allocated_inbound_connections ++;
719 MODULE_STAT->inbound_connections ++;
720 MODULE_STAT->active_inbound_connections ++;
721 MODULE_STAT->active_connections ++;
722
723 c->listening = LC->fd;
724 c->listening_generation = LC->generation;
725 if (LC->flags & C_NOQACK) {
726 c->flags |= C_NOQACK;
727 }
728
729 c->window_clamp = LC->window_clamp;
730 if (c->window_clamp) {
731 if (setsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &c->window_clamp, 4) < 0) {
732 vkprintf (0, "error while setting window size for socket %d to %d: %m\n", cfd, c->window_clamp);
733 } else {
734 int t1 = -1, t2 = -1;
735 socklen_t s1 = 4, s2 = 4;
736 getsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &t1, &s1);
737 getsockopt (cfd, SOL_SOCKET, SO_RCVBUF, &t2, &s2);
738 vkprintf (2, "window clamp for socket %d is %d, receive buffer is %d\n", cfd, t1, t2);
739 }
740 }
741
742 if (LC->flags & C_SPECIAL) {
743 c->flags |= C_SPECIAL;
744 __sync_fetch_and_add (&active_special_connections, 1);
745
746 if (active_special_connections > max_special_connections) {
747 vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections);
748 }
749 if (active_special_connections >= max_special_connections) {
750 vkprintf (2, "**Invoking epoll_remove(%d)\n", LC->fd);
751 epoll_remove (LC->fd);
752 }
753 }
754 }
755
756 alloc_new_socket_connection (C);
757
758 MODULE_STAT->allocated_connections ++;
759
760 return C;
761 } else {
762 MODULE_STAT->accept_init_accepted_failed ++;
763 if (c->flags & C_RAWMSG) {
764 rwm_free (&c->in);
765 rwm_free (&c->out);
766 rwm_free (&c->in_u);
767 rwm_free (&c->out_p);
768 }
769 c->basic_type = ct_none;
770 close (cfd);
771
772 free_mp_queue (c->in_queue);
773 free_mp_queue (c->out_queue);
774
775 job_free (JOB_REF_PASS (C));
776 this_job_thread->jobs_active --;
777
778 return NULL;
779 }
780}
781/* }}} */
782
783/* }}} */
784
785/* {{{ IO PART OF CONNECTION */
786
787/*
788 Have to have lock on socket_connection to run this method
789
  removes the event from the event heap and from epoll
791*/
792void fail_socket_connection (socket_connection_job_t C, int who) /* {{{ */ {
793 assert_main_thread ();
794
795 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
796 assert (C->j_flags & JF_LOCKED);
797
798 if (!(__sync_fetch_and_or (&c->flags, C_ERROR) & C_ERROR)) {
799 job_timer_remove (C);
800
801 remove_event_from_heap (c->ev, 0);
802 connection_event_incref (c->fd, -1);
803 epoll_insert (c->fd, 0);
804 c->ev = NULL;
805
806 c->type->socket_close (C);
807
808 fail_connection (c->conn, who);
809 }
810}
811/* }}} */
812
813/*
814 Frees socket_connection structure
815 Removes link to cpu_connection
816*/
817int net_server_socket_free (socket_connection_job_t C) /* {{{ */ {
818 assert_net_net_thread ();
819
820 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
821
822 assert (!c->ev);
823 assert (c->flags & C_ERROR);
824
825 if (c->conn) {
826 fail_connection (c->conn, -201);
827 job_decref (JOB_REF_PASS (c->conn));
828 }
829
830 while (1) {
831 struct raw_message *raw = mpq_pop_nw (c->out_packet_queue, 4);
832 if (!raw) { break; }
833 rwm_free (raw);
834 free (raw);
835 }
836
837 free_mp_queue (c->out_packet_queue);
838
839 rwm_free (&c->out);
840
841 MODULE_STAT->allocated_socket_connections --;
842 return 0;
843}
844/* }}} */
845
846/*
  Reads data from the socket until no more is available (or reading must stop),
  then pushes each chunk to conn->in_queue and sends a JS_RUN signal to conn
849*/
850int net_server_socket_reader (socket_connection_job_t C) /* {{{ */ {
851 assert_net_net_thread ();
852 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
853
854 while ((c->flags & (C_WANTRD | C_NORD | C_STOPREAD | C_ERROR | C_NET_FAILED)) == C_WANTRD) {
855 if (!tcp_recv_buffers_num) {
856 prealloc_tcp_buffers ();
857 }
858
859 struct raw_message *in = malloc (sizeof (*in));
860 rwm_init (in, 0);
861
862 int s = tcp_recv_buffers_total_size;
863 assert (s > 0);
864
865 int p = 1;
866
867 __sync_fetch_and_or (&c->flags, C_NORD);
868 int r = readv (c->fd, tcp_recv_iovec + p, MAX_TCP_RECV_BUFFERS + 1 - p);
869 MODULE_STAT->tcp_readv_calls ++;
870
871 if (r <= 0) {
872 if (r < 0 && errno == EAGAIN) {
873 } else if (r < 0 && errno == EINTR) {
874 __sync_fetch_and_and (&c->flags, ~C_NORD);
875 MODULE_STAT->tcp_readv_intr ++;
876 continue;
877 } else {
878 vkprintf (1, "Connection %d: Fatal error %m\n", c->fd);
879 job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
880 __sync_fetch_and_or (&c->flags, C_NET_FAILED);
881 return 0;
882 }
883 } else {
884 __sync_fetch_and_and (&c->flags, ~C_NORD);
885 }
886
887 if (verbosity > 0 && r < 0 && errno != EAGAIN) {
888 perror ("recv()");
889 }
890 vkprintf (2, "readv from %d: %d read out of %d\n", c->fd, r, s);
891
892 if (r <= 0) {
893 rwm_free (in);
894 free (in);
895 break;
896 }
897
898 MODULE_STAT->tcp_readv_bytes += r;
899 struct msg_part *mp = 0;
900 assert (p == 1);
901 mp = new_msg_part (0, tcp_recv_buffers[p - 1]);
902 assert (tcp_recv_buffers[p - 1]->data == tcp_recv_iovec[p].iov_base);
903 mp->offset = 0;
904 mp->data_end = r > tcp_recv_iovec[p].iov_len ? tcp_recv_iovec[p].iov_len : r;
905 r -= mp->data_end;
906 in->first = in->last = mp;
907 in->total_bytes = mp->data_end;
908 in->first_offset = 0;
909 in->last_offset = mp->data_end;
910 p ++;
911
912 int rs = r;
913 while (rs > 0) {
914 mp = new_msg_part (0, tcp_recv_buffers[p - 1]);
915 mp->offset = 0;
916 mp->data_end = rs > tcp_recv_iovec[p].iov_len ? tcp_recv_iovec[p].iov_len : rs;
917 rs -= mp->data_end;
918 in->last->next = mp;
919 in->last = mp;
920 in->last_offset = mp->data_end;
921 in->total_bytes += mp->data_end;
922 p ++;
923 }
924 assert (!rs);
925
926 int i;
927 for (i = 0; i < p - 1; i++) {
928 struct msg_buffer *X = alloc_msg_buffer (tcp_recv_buffers[i], TCP_RECV_BUFFER_SIZE);
929 if (!X) {
930 vkprintf (0, "**FATAL**: cannot allocate tcp receive buffer\n");
931 assert (0);
932 }
933 tcp_recv_buffers[i] = X;
934 tcp_recv_iovec[i + 1].iov_base = X->data;
935 tcp_recv_iovec[i + 1].iov_len = X->chunk->buffer_size;
936 }
937
938 assert (c->conn);
939 mpq_push_w (CONN_INFO(c->conn)->in_queue, in, 0);
940 job_signal (JOB_REF_CREATE_PASS (c->conn), JS_RUN);
941 }
942 return 0;
943}
944/* }}} */
945
946/*
  Takes data from the `out` raw message and writes it to the socket
948*/
/*
  Flushes c->out to the socket with writev () while writing is wanted and
  possible. Returns the number of bytes still left in c->out.
  - EINTR retries; EAGAIN leaves C_NOWR set and stops, but more than 100
    consecutive EAGAINs abort the socket.
  - writev () returning 0, or any other error, aborts the socket and sets
    C_NET_FAILED.
  - Calls ->data_sent after each successful write, and ->ready_to_write
    when the buffer drops below write_low_watermark after having started at
    or above it.
  - If C_STOPWRITE was set on entry and C_WANTWR ended up cleared (the
    buffer was flushed), the socket is aborted.
*/
int net_server_socket_writer (socket_connection_job_t C) /* {{{ */{
  assert_net_net_thread ();
  struct socket_connection_info *c = SOCKET_CONN_INFO (C);

  struct raw_message *out = &c->out;

  /* were we at/above the low watermark when we started? */
  int check_watermark = out->total_bytes >= c->write_low_watermark;
  /* bytes written during this call (only used for the final log line) */
  int t = 0;

  /* C_STOPWRITE is sampled once, on entry */
  int stop = c->flags & C_STOPWRITE;

  while ((c->flags & (C_WANTWR | C_NOWR | C_ERROR | C_NET_FAILED)) == C_WANTWR) {
    if (!out->total_bytes) {
      __sync_fetch_and_and (&c->flags, ~C_WANTWR);
      break;
    }

    struct iovec iov[384];
    int iovcnt = -1;

    int s = tcp_prepare_iovec (iov, &iovcnt, sizeof (iov) / sizeof (iov[0]), out);
    assert (iovcnt > 0 && s > 0);

    /* set before writev (); cleared on success / EINTR. It stays set on
       EAGAIN until the epoll gateway reports the socket writable again. */
    __sync_fetch_and_or (&c->flags, C_NOWR);
    int r = writev (c->fd, iov, iovcnt);
    MODULE_STAT->tcp_writev_calls ++;

    if (r <= 0) {
      if (r < 0 && errno == EAGAIN) {
        /* eagain_count is reset on every successful write below */
        if (++c->eagain_count > 100) {
          kprintf ("Too much EAGAINs for connection %d (%s), dropping\n", c->fd, show_remote_socket_ip (C));
          job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
          __sync_fetch_and_or (&c->flags, C_NET_FAILED);
          return 0;
        }
      } else if (r < 0 && errno == EINTR) {
        __sync_fetch_and_and (&c->flags, ~C_NOWR);
        MODULE_STAT->tcp_writev_intr ++;
        continue;
      } else {
        /* r == 0 or a hard error */
        vkprintf (1, "Connection %d: Fatal error %m\n", c->fd);
        job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
        __sync_fetch_and_or (&c->flags, C_NET_FAILED);
        return 0;
      }
    } else {
      __sync_fetch_and_and (&c->flags, ~C_NOWR);
      MODULE_STAT->tcp_writev_bytes += r;
      c->eagain_count = 0;
      t += r;
    }

    /* NOTE(review): never true — every r < 0 case other than EAGAIN has
       already returned or continued above. */
    if (verbosity && r < 0 && errno != EAGAIN) {
      perror ("writev()");
    }
    vkprintf (2, "send/writev() to %d: %d written out of %d in %d chunks\n", c->fd, r, s, iovcnt);

    if (r > 0) {
      rwm_skip_data (out, r);
      if (c->type->data_sent) {
        c->type->data_sent (C, r);
      }
    }
  }

  if (check_watermark && out->total_bytes < c->write_low_watermark) {
    if (c->type->ready_to_write) {
      c->type->ready_to_write (C);
    }
  }

  if (stop && !(c->flags & C_WANTWR)) {
    vkprintf (1, "Closing write_close socket\n");
    job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
    __sync_fetch_and_or (&c->flags, C_NET_FAILED);
  }

  vkprintf (2, "socket_server_writer: written %d bytes to %d, flags=0x%08x\n", t, c->fd, c->flags);
  return out->total_bytes;
}
/* }}} */
1030
1031/*
1032 checks if outbound connections become connected
1033 merges contents of out_packet_queue mpq to out raw message
1034 runs socket_reader and socket_writer
1035*/
1036int net_server_socket_read_write (socket_connection_job_t C) /* {{{ */ {
1037 assert_net_net_thread ();
1038
1039 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
1040
1041 if (c->flags & C_ERROR) {
1042 return 0;
1043 }
1044
1045 if (!(c->flags & C_CONNECTED)) {
1046 if (!(c->flags & C_NOWR)) {
1047 __sync_fetch_and_and (&c->flags, C_PERMANENT);
1048 __sync_fetch_and_or (&c->flags, C_WANTRD | C_CONNECTED);
1049 __sync_fetch_and_or (&CONN_INFO(c->conn)->flags, C_READY_PENDING | C_CONNECTED);
1050
1051 c->type->socket_connected (C);
1052 job_signal (JOB_REF_CREATE_PASS (c->conn), JS_RUN);
1053 } else {
1054 return compute_conn_events (C);
1055 }
1056 }
1057
1058 vkprintf (2, "END processing connection %d, flags=%d\n", c->fd, c->flags);
1059
1060 while ((c->flags & (C_WANTRD | C_NORD | C_ERROR | C_STOPREAD | C_NET_FAILED)) == C_WANTRD) {
1061 c->type->socket_reader (C);
1062 }
1063
1064 struct raw_message *out = &c->out;
1065
1066 while (1) {
1067 struct raw_message *raw = mpq_pop_nw (c->out_packet_queue, 4);
1068 if (!raw) { break; }
1069 rwm_union (out, raw);
1070 free (raw);
1071 }
1072
1073 if (out->total_bytes) {
1074 __sync_fetch_and_or (&c->flags, C_WANTWR);
1075 }
1076
1077 while ((c->flags & (C_NOWR | C_ERROR | C_WANTWR | C_NET_FAILED)) == C_WANTWR) {
1078 c->type->socket_writer (C);
1079 }
1080
1081 return compute_conn_events (C);
1082}
1083/* }}} */
1084
1085/*
1086 removes C_NOWR and C_NORD flags if necessary
1087 reads errors from socket
1088 sends JS_RUN signal to socket_connection
1089*/
1090int net_server_socket_read_write_gateway (int fd, void *data, event_t *ev) /* {{{ */ {
1091 assert_main_thread ();
1092 if (!data) { return EVA_REMOVE; }
1093
1094 assert ((int)ev->refcnt);
1095
1096 socket_connection_job_t C = (socket_connection_job_t) data;
1097 assert (C);
1098 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
1099 assert (c->type);
1100
1101 if (ev->ready & EVT_FROM_EPOLL) {
1102 // update C_NORD / C_NOWR only if we arrived from epoll()
1103 vkprintf (2, "fd=%d state=%d ready=%d epoll_ready=%d\n", ev->fd, ev->state, ev->ready, ev->epoll_ready);
1104 ev->ready &= ~EVT_FROM_EPOLL;
1105
1106 int clear_flags = 0;
1107 if ((ev->state & EVT_READ) && (ev->ready & EVT_READ)) {
1108 clear_flags |= C_NORD;
1109 }
1110 if ((ev->state & EVT_WRITE) && (ev->ready & EVT_WRITE)) {
1111 clear_flags |= C_NOWR;
1112 }
1113 __sync_fetch_and_and (&c->flags, ~clear_flags);
1114
1115 if (ev->epoll_ready & EPOLLERR) {
1116 int error = 0;
1117 socklen_t errlen = sizeof (error);
1118 if (getsockopt (c->fd, SOL_SOCKET, SO_ERROR, (void *) &error, &errlen) == 0) {
1119 vkprintf (1, "got error for tcp socket #%d, [%s]:%d : %s\n", c->fd, show_remote_socket_ip (C), c->remote_port, strerror (error));
1120 }
1121
1122 job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
1123 return EVA_REMOVE;
1124 }
1125 if (ev->epoll_ready & (EPOLLHUP | EPOLLERR | EPOLLRDHUP | EPOLLPRI)) {
1126 vkprintf (!(ev->epoll_ready & EPOLLPRI), "socket %d: disconnected (epoll_ready=%02x), cleaning\n", c->fd, ev->epoll_ready);
1127
1128 job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
1129 return EVA_REMOVE;
1130 }
1131 }
1132
1133 job_signal (JOB_REF_CREATE_PASS (C), JS_RUN);
1134 return EVA_CONTINUE;
1135}
1136/* }}} */
1137
1138int do_socket_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
1139 socket_connection_job_t C = job;
1140
1141 struct socket_connection_info *c = SOCKET_CONN_INFO (C);
1142
1143 if (op == JS_ABORT) { // MAIN THREAD
1144 fail_socket_connection (C, -200);
1145 return JOB_COMPLETED;
1146 }
1147 if (op == JS_RUN) { // IO THREAD
1148 if (!(c->flags & C_ERROR)) {
1149 int res = c->type->socket_read_write (job);
1150 if (res != c->current_epoll_status) {
1151 c->current_epoll_status = res;
1152 return JOB_SENDSIG (JS_AUX);
1153 }
1154 }
1155 return 0;
1156 }
1157 if (op == JS_AUX) { // MAIN THREAD
1158 if (!(c->flags & C_ERROR)) {
1159 epoll_insert (c->fd, compute_conn_events (job));
1160 }
1161 return 0;
1162 }
1163
1164 if (op == JS_FINISH) { // ANY THREAD
1165 assert (C->j_refcnt == 1);
1166 c->type->socket_free (C);
1167 return job_free (JOB_REF_PASS (C));
1168 }
1169
1170 return JOB_ERROR;
1171}
1172/* }}} */
1173
1174/*
1175 creates socket_connection structure
1176 insert event to epoll
1177*/
1178socket_connection_job_t alloc_new_socket_connection (connection_job_t C) /* {{{ */ {
1179 assert_main_thread ();
1180 struct connection_info *c = CONN_INFO (C);
1181
1182 socket_connection_job_t S = create_async_job (do_socket_connection_job, JSC_ALLOW (JC_CONNECTION_IO, JS_RUN) | JSC_ALLOW (JC_CONNECTION_IO, JS_ALARM) | JSC_ALLOW (JC_EPOLL, JS_ABORT) | JSC_ALLOW (JC_CONNECTION_IO, JS_FINISH) | JSC_ALLOW (JC_EPOLL, JS_AUX), -2, sizeof (struct socket_connection_info), JT_HAVE_TIMER, JOB_REF_NULL);
1183 S->j_refcnt = 2;
1184 struct socket_connection_info *s = SOCKET_CONN_INFO (S);
1185 //memset (s, 0, sizeof (*s)); /* no need, create_async_job memsets itself */
1186
1187 s->fd = c->fd;
1188 s->type = c->type;
1189 s->conn = job_incref (C);
1190 s->flags = C_WANTWR | C_WANTRD | (c->flags & C_CONNECTED);
1191
1192 s->our_ip = c->our_ip;
1193 s->our_port = c->our_port;
1194 memcpy (s->our_ipv6, c->our_ipv6, 16);
1195
1196 s->remote_ip = c->remote_ip;
1197 s->remote_port = c->remote_port;
1198 memcpy (s->remote_ipv6, c->remote_ipv6, 16);
1199
1200 s->out_packet_queue = alloc_mp_queue_w ();
1201
1202 struct event_descr *ev = Events + s->fd;
1203 assert (!ev->data);
1204 assert (!ev->refcnt);
1205
1206 s->ev = ev;
1207
1208 epoll_sethandler (s->fd, 0, net_server_socket_read_write_gateway, S);
1209
1210 s->current_epoll_status = compute_conn_events (S);
1211 epoll_insert (s->fd, s->current_epoll_status);
1212
1213 c->io_conn = S;
1214
1215 rwm_init (&s->out, 0);
1216 unlock_job (JOB_REF_CREATE_PASS (S));
1217
1218 MODULE_STAT->allocated_socket_connections ++;
1219 return S;
1220}
1221/* }}} */
1222/* }}} */
1223
1224/* {{{ LISTENING CONNECTION */
1225
1226/*
1227 accepts new connections
1228 executes alloc_new_connection ()
1229*/
1230int net_accept_new_connections (listening_connection_job_t LCJ) /* {{{ */ {
1231 struct listening_connection_info *LC = LISTEN_CONN_INFO (LCJ);
1232
1233 union sockaddr_in46 peer;
1234 unsigned peer_addrlen;
1235 int cfd, acc = 0;
1236
1237 while (Events[LC->fd].state & EVT_IN_EPOLL) {
1238 peer_addrlen = sizeof (peer);
1239 memset (&peer, 0, sizeof (peer));
1240 cfd = accept (LC->fd, (struct sockaddr *) &peer, &peer_addrlen);
1241
1242 vkprintf (2, "%s: cfd = %d\n", __func__, cfd);
1243 if (cfd < 0) {
1244 if (errno != EAGAIN) {
1245 MODULE_STAT->accept_calls_failed ++;
1246 }
1247 if (!acc) {
1248 vkprintf ((errno == EAGAIN) * 2, "accept(%d) unexpectedly returns %d: %m\n", LC->fd, cfd);
1249 }
1250 break;
1251 }
1252
1253 acc ++;
1254 MODULE_STAT->inbound_connections_accepted ++;
1255
1256 if (max_accept_rate) {
1257 cur_accept_rate_remaining += (precise_now - cur_accept_rate_time) * max_accept_rate;
1258 cur_accept_rate_time = precise_now;
1259 if (cur_accept_rate_remaining > max_accept_rate) {
1260 cur_accept_rate_remaining = max_accept_rate;
1261 }
1262
1263 if (cur_accept_rate_remaining < 1) {
1264 MODULE_STAT->accept_rate_limit_failed ++;
1265 close (cfd);
1266 continue;
1267 }
1268
1269 cur_accept_rate_remaining -= 1;
1270 }
1271
1272 if (LC->flags & C_IPV6) {
1273 assert (peer_addrlen == sizeof (struct sockaddr_in6));
1274 assert (peer.a6.sin6_family == AF_INET6);
1275 } else {
1276 assert (peer_addrlen == sizeof (struct sockaddr_in));
1277 assert (peer.a4.sin_family == AF_INET);
1278 }
1279
1280 connection_job_t C;
1281 if (peer.a4.sin_family == AF_INET) {
1282 C = alloc_new_connection (cfd, NULL, LCJ,
1283 ntohl (peer.a4.sin_addr.s_addr), NULL, ntohs (peer.a4.sin_port));
1284 } else {
1285 C = alloc_new_connection (cfd, NULL, LCJ,
1286 0, peer.a6.sin6_addr.s6_addr, ntohs (peer.a6.sin6_port));
1287 }
1288 if (C) {
1289 assert (CONN_INFO(C)->io_conn);
1290 unlock_job (JOB_REF_PASS (C));
1291 }
1292 }
1293 return 0;
1294}
1295/* }}} */
1296
1297int do_listening_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
1298 listening_connection_job_t LCJ = job;
1299
1300 if (op == JS_RUN) {
1301 net_accept_new_connections (LCJ);
1302 return 0;
1303 } else if (op == JS_AUX) {
1304 vkprintf (2, "**Invoking epoll_insert(%d,%d)\n", LISTEN_CONN_INFO(LCJ)->fd, EVT_RWX);
1305 epoll_insert (LISTEN_CONN_INFO(LCJ)->fd, EVT_RWX);
1306 return 0;
1307 }
1308 return JOB_ERROR;
1309}
1310/* }}} */
1311
1312int init_listening_connection_ext (int fd, conn_type_t *type, void *extra, int mode, int prio) /* {{{ */ {
1313 if (check_conn_functions (type, 1) < 0) {
1314 return -1;
1315 }
1316 if (fd >= max_connection_fd) {
1317 vkprintf (0, "TOO big fd for listening connection %d (max %d)\n", fd, max_connection_fd);
1318 return -1;
1319 }
1320 if (fd > max_connection) {
1321 max_connection = fd;
1322 }
1323
1324 listening_connection_job_t LCJ = create_async_job (do_listening_connection_job, JSC_ALLOW (JC_EPOLL, JS_RUN) | JSC_ALLOW (JC_EPOLL, JS_AUX) | JSC_ALLOW (JC_EPOLL, JS_FINISH), -2, sizeof (struct listening_connection_info), JT_HAVE_TIMER, JOB_REF_NULL);
1325 LCJ->j_refcnt = 2;
1326
1327 struct listening_connection_info *LC = LISTEN_CONN_INFO (LCJ);
1328 memset (LC, 0, sizeof (*LC));
1329
1330 LC->fd = fd;
1331 LC->type = type;
1332 LC->extra = extra;
1333
1334 struct event_descr *ev = Events + fd;
1335 assert (!ev->data);
1336 assert (!ev->refcnt);
1337 LC->ev = ev;
1338
1339 LC->generation = new_conn_generation ();
1340
1341 if (mode & SM_LOWPRIO) {
1342 prio = 10;
1343 }
1344
1345 if (mode & SM_SPECIAL) {
1346 LC->flags |= C_SPECIAL;
1347 int idx = __sync_fetch_and_add (&special_listen_sockets, 1);
1348 assert (idx < MAX_SPECIAL_LISTEN_SOCKETS);
1349 special_socket[idx].fd = LC->fd;
1350 special_socket[idx].generation = LC->generation;
1351 }
1352
1353 if (mode & SM_NOQACK) {
1354 LC->flags |= C_NOQACK;
1355 disable_qack (LC->fd);
1356 }
1357
1358 if (mode & SM_IPV6) {
1359 LC->flags |= C_IPV6;
1360 }
1361
1362 if (mode & SM_RAWMSG) {
1363 LC->flags |= C_RAWMSG;
1364 }
1365
1366 epoll_sethandler (fd, prio, net_server_socket_read_write_gateway, LCJ);
1367 epoll_insert (fd, EVT_RWX);
1368
1369 MODULE_STAT->listening_connections ++;
1370
1371 unlock_job (JOB_REF_PASS (LCJ));
1372
1373 return 0;
1374}
1375
1376int init_listening_connection (int fd, conn_type_t *type, void *extra) {
1377 return init_listening_connection_ext (fd, type, extra, 0, -10);
1378}
1379
1380int init_listening_tcpv6_connection (int fd, conn_type_t *type, void *extra, int mode) {
1381 return init_listening_connection_ext (fd, type, extra, mode, -10);
1382}
1383/* }}} */
1384
1385/* }}} */
1386
1387/* {{{ connection refcnt */
1388void connection_event_incref (int fd, long long val) {
1389 struct event_descr *ev = &Events[fd];
1390
1391 if (!__sync_add_and_fetch (&ev->refcnt, val) && ev->data) {
1392 socket_connection_job_t C = ev->data;
1393 ev->data = NULL;
1394 job_decref (JOB_REF_PASS (C));
1395 }
1396}
1397
/*
  connection_get_by_fd: returns a new reference to the job currently bound
  to Events[fd], or NULL.
  - For a listening socket, the listening job itself is returned.
  - For a socket connection, the owning connection job (c->conn) is
    returned, unless the socket is already in C_ERROR.
  The caller owns the returned reference (release it with job_decref).

  Events[fd].refcnt is treated as two 32-bit halves: the low half is tested
  for "slot alive", and 1 << 32 is added to the high half as a temporary pin
  while probing.  NOTE(review): the meaning of the halves is inferred from the
  arithmetic here and in connection_event_incref — confirm against net-events.
*/
connection_job_t connection_get_by_fd (int fd) {
  struct event_descr *ev = &Events[fd];
  // fast path: nothing bound, or the low half is already zero
  if (!(int)(ev->refcnt) || !ev->data) { return NULL; }

  while (1) {
    // tentatively pin the slot through the high half
    long long v = __sync_fetch_and_add (&ev->refcnt, (1ll << 32));
    // low half was non-zero: the slot is alive, keep the pin
    if (((int)v) != 0) { break; }
    // low half was zero: undo the pin ...
    v = __sync_fetch_and_add (&ev->refcnt, -(1ll << 32));
    // ... and retry if the low half became non-zero in the meantime
    if (((int)v) != 0) { continue; }
    return NULL;
  }
  // convert the high-half pin into one low-half reference
  __sync_fetch_and_add (&ev->refcnt, 1 - (1ll << 32));
  socket_connection_job_t C = job_incref (ev->data);

  // drop the temporary slot reference; this may release ev->data if it was the last one
  connection_event_incref (fd, -1);

  if (C->j_execute == &do_listening_connection_job) {
    return C;
  }

  assert (C->j_execute == &do_socket_connection_job);

  struct socket_connection_info *c = SOCKET_CONN_INFO (C);
  if (c->flags & C_ERROR) {
    job_decref (JOB_REF_PASS (C));
    return NULL;
  } else {
    // trade the socket-job reference for one on the owning connection job
    assert (c->conn);
    connection_job_t C2 = job_incref (c->conn);
    job_decref (JOB_REF_PASS (C));
    return C2;
  }
}
1431
1432connection_job_t connection_get_by_fd_generation (int fd, int generation) {
1433 connection_job_t C = connection_get_by_fd (fd);
1434 if (C && CONN_INFO(C)->generation != generation) {
1435 job_decref (JOB_REF_PASS (C));
1436 return NULL;
1437 } else {
1438 return C;
1439 }
1440}
1441/* }}} */
1442
1443
1444/* {{{ Sample server functions */
1445
1446int server_check_ready (connection_job_t C) /* {{{ */ {
1447 struct connection_info *c = CONN_INFO (C);
1448 if (c->status == conn_none || c->status == conn_connecting) {
1449 return c->ready = cr_notyet;
1450 }
1451 if (c->status == conn_error || c->ready == cr_failed) {
1452 return c->ready = cr_failed;
1453 }
1454 return c->ready = cr_ok;
1455}
1456/* }}} */
1457
1458int server_noop (connection_job_t C) /* {{{ */ {
1459 return 0;
1460}
1461/* }}} */
1462
1463int server_failed (connection_job_t C) /* {{{ */ {
1464 kprintf ("connection %d: call to pure virtual method\n", CONN_INFO(C)->fd);
1465 assert (0);
1466 return -1;
1467}
1468/* }}} */
1469
1470int server_flush (connection_job_t C) /* {{{ */ {
1471 //job_signal (job_incref (C), JS_RUN);
1472 return 0;
1473}
1474/* }}} */
1475
1476int check_conn_functions (conn_type_t *type, int listening) /* {{{ */ {
1477 if (type->magic != CONN_FUNC_MAGIC) {
1478 return -1;
1479 }
1480 if (!type->title) {
1481 type->title = "(unknown)";
1482 }
1483 if (!type->socket_read_write) {
1484 type->socket_read_write = net_server_socket_read_write;
1485 }
1486 if (!type->socket_reader) {
1487 type->socket_reader = net_server_socket_reader;
1488 }
1489 if (!type->socket_writer) {
1490 type->socket_writer = net_server_socket_writer;
1491 }
1492 if (!type->socket_close) {
1493 type->socket_close = server_noop;
1494 }
1495
1496 if (!type->accept) {
1497 if (listening) {
1498 type->accept = net_accept_new_connections;
1499 } else {
1500 type->accept = server_failed;
1501 }
1502 }
1503 if (!type->init_accepted) {
1504 if (listening) {
1505 type->init_accepted = server_noop;
1506 } else {
1507 type->init_accepted = server_failed;
1508 }
1509 }
1510
1511 if (!type->close) {
1512 type->close = cpu_server_close_connection;
1513 }
1514 if (!type->init_outbound) {
1515 type->init_outbound = server_noop;
1516 }
1517 if (!type->wakeup) {
1518 type->wakeup = server_noop;
1519 }
1520 if (!type->alarm) {
1521 type->alarm = server_noop;
1522 }
1523 if (!type->connected) {
1524 type->connected = server_noop;
1525 }
1526 if (!type->flush) {
1527 type->flush = server_flush;
1528 }
1529 if (!type->check_ready) {
1530 type->check_ready = server_check_ready;
1531 }
1532 if (!type->read_write) {
1533 type->read_write = cpu_server_read_write;
1534 }
1535 if (!type->free) {
1536 type->free = cpu_server_free_connection;
1537 }
1538 if (!type->socket_connected) {
1539 type->socket_connected = server_noop;
1540 }
1541 if (!type->socket_free) {
1542 type->socket_free = net_server_socket_free;
1543 }
1544 if (type->flags & C_RAWMSG) {
1545 if (!type->free_buffers) {
1546 type->free_buffers = cpu_tcp_free_connection_buffers;
1547 }
1548 if (!type->reader) {
1549 type->reader = cpu_tcp_server_reader;
1550 if (!type->parse_execute) {
1551 return -1;
1552 }
1553 }
1554 if (!type->writer) {
1555 type->writer = cpu_tcp_server_writer;
1556 }
1557 } else {
1558 if (!type->free_buffers) {
1559 assert (0);
1560 }
1561 if (!type->reader) {
1562 assert (0);
1563 }
1564 if (!type->writer) {
1565 assert (0);
1566 }
1567 }
1568 return 0;
1569}
1570/* }}} */
1571
1572/* }}} */
1573
1574
1575
1576/* CONN TARGETS {{{ */
1577
1578void compute_next_reconnect (conn_target_job_t CT) /* {{{ */{
1579 struct conn_target_info *S = CONN_TARGET_INFO (CT);
1580 if (S->next_reconnect_timeout < S->reconnect_timeout || S->active_outbound_connections) {
1581 S->next_reconnect_timeout = S->reconnect_timeout;
1582 }
1583 S->next_reconnect = precise_now + S->next_reconnect_timeout;
1584 if (!S->active_outbound_connections && S->next_reconnect_timeout < MAX_RECONNECT_INTERVAL) {
1585 S->next_reconnect_timeout = S->next_reconnect_timeout * 1.5 + drand48_j () * 0.2;
1586 }
1587}
1588/* }}} */
1589
1590static void count_connection_num (connection_job_t C, void *good_c, void *stopped_c, void *bad_c) /* {{{ */ {
1591 int cr = CONN_INFO(C)->type->check_ready (C);
1592 switch (cr) {
1593 case cr_notyet:
1594 case cr_busy:
1595 break;
1596 case cr_ok:
1597 (*(int *)good_c)++;
1598 break;
1599 case cr_stopped:
1600 (*(int *)stopped_c)++;
1601 break;
1602 case cr_failed:
1603 (*(int *)bad_c)++;
1604 break;
1605 default:
1606 assert (0);
1607 }
1608}
1609/* }}} */
1610
1611static void find_bad_connection (connection_job_t C, void *x) /* {{{ */ {
1612 connection_job_t *T = x;
1613 if (*T) { return; }
1614 if (CONN_INFO(C)->flags & C_ERROR) {
1615 *T = C;
1616 }
1617}
1618/* }}} */
1619
1620/*
1621 Deletes failed connections (with flag C_ERROR) from target's tree
1622*/
1623void destroy_dead_target_connections (conn_target_job_t CTJ) /* {{{ */ {
1624 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1625
1626 struct tree_connection *T = CT->conn_tree;
1627 if (T) {
1628 __sync_fetch_and_add (&T->refcnt, 1);
1629 }
1630
1631 while (1) {
1632 connection_job_t CJ = NULL;
1633 tree_act_ex_connection (T, find_bad_connection, &CJ);
1634 if (!CJ) { break; }
1635
1636 if (connection_is_active (CONN_INFO (CJ)->flags)) {
1637 __sync_fetch_and_add (&CT->active_outbound_connections, -1);
1638 }
1639 __sync_fetch_and_add (&CT->outbound_connections, -1);
1640
1641 T = tree_delete_connection (T, CJ);
1642 }
1643
1644 int good_c = 0, bad_c = 0, stopped_c = 0;
1645
1646 tree_act_ex3_connection (T, count_connection_num, &good_c, &stopped_c, &bad_c);
1647
1648 int was_ready = CT->ready_outbound_connections;
1649 CT->ready_outbound_connections = good_c;
1650
1651 if (was_ready != CT->ready_outbound_connections) {
1652 MODULE_STAT->ready_outbound_connections += CT->ready_outbound_connections - was_ready;
1653 }
1654
1655 if (was_ready && !CT->ready_outbound_connections) {
1656 MODULE_STAT->ready_targets --;
1657 }
1658 if (!was_ready && CT->ready_outbound_connections) {
1659 MODULE_STAT->ready_targets ++;
1660 }
1661
1662 if (T == CT->conn_tree) {
1663 tree_free_connection (T);
1664 } else {
1665 struct tree_connection *old = CT->conn_tree;
1666 CT->conn_tree = T;
1667 barrier ();
1668 __sync_synchronize ();
1669 free_tree_ptr_connection (old);
1670 }
1671}
1672/* }}} */
1673
1674/*
1675 creates new connections for target
1676 must be called in main thread, because we can allocate new connections only in main thread
1677*/
1678int create_new_connections (conn_target_job_t CTJ) /* {{{ */ {
1679 assert_main_thread ();
1680
1681 destroy_dead_target_connections (CTJ);
1682 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1683
1684 int count = 0, good_c = 0, bad_c = 0, stopped_c = 0, need_c;
1685
1686 tree_act_ex3_connection (CT->conn_tree, count_connection_num, &good_c, &stopped_c, &bad_c);
1687
1688 int was_ready = CT->ready_outbound_connections;
1689 CT->ready_outbound_connections = good_c;
1690
1691 if (was_ready != CT->ready_outbound_connections) {
1692 MODULE_STAT->ready_outbound_connections += CT->ready_outbound_connections - was_ready;
1693 }
1694
1695 if (was_ready && !CT->ready_outbound_connections) {
1696 MODULE_STAT->ready_targets --;
1697 }
1698 if (!was_ready && CT->ready_outbound_connections) {
1699 MODULE_STAT->ready_targets ++;
1700 }
1701
1702 need_c = CT->min_connections + bad_c + ((stopped_c + 1) >> 1);
1703 if (need_c > CT->max_connections) {
1704 need_c = CT->max_connections;
1705 }
1706
1707 if (precise_now >= CT->next_reconnect || CT->active_outbound_connections) {
1708 struct tree_connection *T = CT->conn_tree;
1709 if (T) {
1710 __sync_fetch_and_add (&T->refcnt, 1);
1711 }
1712
1713 while (CT->outbound_connections < need_c) {
1714 if (CT->target.s_addr) {
1715 vkprintf (1, "Creating NEW connection to %s:%d\n", inet_ntoa (CT->target), CT->port);
1716 } else {
1717 vkprintf (1, "Creating NEW ipv6 connection to [%s]:%d\n", show_ipv6 (CT->target_ipv6), CT->port);
1718 }
1719 int cfd = CT->target.s_addr ? client_socket (CT->target.s_addr, CT->port, 0) : client_socket_ipv6 (CT->target_ipv6, CT->port, SM_IPV6);
1720 if (cfd < 0) {
1721 if (CT->target.s_addr) {
1722 vkprintf (1, "error connecting to %s:%d: %m\n", inet_ntoa (CT->target), CT->port);
1723 } else {
1724 vkprintf (1, "error connecting to [%s]:%d\n", show_ipv6 (CT->target_ipv6), CT->port);
1725 }
1726 break;
1727 }
1728
1729 connection_job_t C = alloc_new_connection (cfd, CTJ, NULL,
1730 ntohl (CT->target.s_addr), CT->target_ipv6, CT->port);
1731
1732 if (C) {
1733 assert (CONN_INFO(C)->io_conn);
1734 count ++;
1735 unlock_job (JOB_REF_CREATE_PASS (C));
1736 T = tree_insert_connection (T, C, lrand48_j ());
1737 } else {
1738 break;
1739 }
1740 }
1741
1742 if (T == CT->conn_tree) {
1743 tree_free_connection (T);
1744 } else {
1745 struct tree_connection *old = CT->conn_tree;
1746 CT->conn_tree = T;
1747 __sync_synchronize ();
1748 free_tree_ptr_connection (old);
1749 }
1750
1751 compute_next_reconnect (CTJ);
1752 }
1753
1754
1755 return count;
1756}
1757/* }}} */
1758
/*
  Hash table of connection targets, shared by IPv4 and IPv6 targets.
  Buckets are chained through conn_target_info.hnext and indexed by
  find_target () / find_target_ipv6 ().
*/
conn_target_job_t HTarget[PRIME_TARGETS];
/* Protects HTarget; find_target* must be called with it held. */
pthread_mutex_t TargetsLock = PTHREAD_MUTEX_INITIALIZER;
1761
1762/* must be called with mutex held */
1763/* mode = 0 -- lookup, mode = 1 -- insert, mode = -1 -- delete */
1764static conn_target_job_t find_target (struct in_addr ad, int port, conn_type_t *type, void *extra, int mode, conn_target_job_t new_target) /* {{{ */ {
1765 assert (ad.s_addr);
1766 unsigned h1 = ((unsigned long) type * 0xabacaba + ad.s_addr) % PRIME_TARGETS;
1767 h1 = (h1 * 239 + port) % PRIME_TARGETS;
1768 conn_target_job_t *prev = HTarget + h1, cur;
1769 while ((cur = *prev) != 0) {
1770 struct conn_target_info *S = CONN_TARGET_INFO (cur);
1771 if (S->target.s_addr == ad.s_addr && S->port == port && S->type == type && S->extra == extra) {
1772 if (mode < 0) {
1773 *prev = S->hnext;
1774 S->hnext = 0;
1775 return cur;
1776 }
1777 assert (!mode);
1778 return cur;
1779 }
1780 prev = &S->hnext;
1781 }
1782 assert (mode >= 0);
1783 if (mode > 0) {
1784 CONN_TARGET_INFO (new_target)->hnext = HTarget[h1];
1785 HTarget[h1] = new_target;
1786 return new_target;
1787 }
1788 return 0;
1789}
1790/* }}} */
1791
1792/* must be called with mutex held */
1793/* mode = 0 -- lookup, mode = 1 -- insert, mode = -1 -- delete */
1794static conn_target_job_t find_target_ipv6 (unsigned char ad_ipv6[16], int port, conn_type_t *type, void *extra, int mode, conn_target_job_t new_target) /* {{{ */ {
1795 assert (*(long long *)ad_ipv6 || ((long long *) ad_ipv6)[1]);
1796 unsigned h1 = ((unsigned long) type * 0xabacaba) % PRIME_TARGETS;
1797 int i;
1798 for (i = 0; i < 4; i++) {
1799 h1 = ((unsigned long long) h1 * 17239 + ((unsigned *) ad_ipv6)[i]) % PRIME_TARGETS;
1800 }
1801 h1 = (h1 * 239 + port) % PRIME_TARGETS;
1802 conn_target_job_t *prev = HTarget + h1, cur;
1803 while ((cur = *prev) != 0) {
1804 struct conn_target_info *S = CONN_TARGET_INFO (cur);
1805 if (
1806 ((long long *)S->target_ipv6)[1] == ((long long *)ad_ipv6)[1] &&
1807 *(long long *)S->target_ipv6 == *(long long *)ad_ipv6 &&
1808 S->port == port && S->type == type && !S->target.s_addr && S->extra == extra) {
1809 if (mode < 0) {
1810 *prev = S->hnext;
1811 S->hnext = 0;
1812 return cur;
1813 }
1814 assert (!mode);
1815 return cur;
1816 }
1817 prev = &S->hnext;
1818 }
1819 assert (mode >= 0);
1820 if (mode > 0) {
1821 CONN_TARGET_INFO (new_target)->hnext = HTarget[h1];
1822 HTarget[h1] = new_target;
1823 return new_target;
1824 }
1825 return 0;
1826}
1827/* }}} */
1828
1829static int free_target (conn_target_job_t CTJ) /* {{{ */ {
1830 pthread_mutex_lock (&TargetsLock);
1831 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1832 if (CT->global_refcnt > 0 || CT->conn_tree) {
1833 pthread_mutex_unlock (&TargetsLock);
1834 return -1;
1835 }
1836
1837 assert (CT && CT->type && !CT->global_refcnt);
1838 assert (!CT->conn_tree);
1839 if (CT->target.s_addr) {
1840 vkprintf (1, "Freeing unused target to %s:%d\n", inet_ntoa (CT->target), CT->port);
1841 assert (CTJ == find_target (CT->target, CT->port, CT->type, CT->extra, -1, 0));
1842 } else {
1843 vkprintf (1, "Freeing unused ipv6 target to [%s]:%d\n", show_ipv6 (CT->target_ipv6), CT->port);
1844 assert (CTJ == find_target_ipv6 (CT->target_ipv6, CT->port, CT->type, CT->extra, -1, 0));
1845 }
1846
1847 pthread_mutex_unlock (&TargetsLock);
1848
1849 MODULE_STAT->inactive_targets --;
1850 MODULE_STAT->free_targets ++;
1851
1852 job_decref (JOB_REF_PASS (CTJ));
1853
1854 return 1;
1855}
1856 /* }}} */
1857
1858static void fail_connection_gw (connection_job_t C) {
1859 fail_connection (C, -17);
1860}
1861
1862int clean_unused_target (conn_target_job_t CTJ) /* {{{ */ {
1863 assert (CTJ);
1864 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1865 assert (CT->type);
1866 if (CT->global_refcnt) {
1867 return 0;
1868 }
1869 if (CT->conn_tree) {
1870 tree_act_connection (CT->conn_tree, fail_connection_gw);
1871 return 0;
1872 }
1873 job_timer_remove (CTJ);
1874 return 0;
1875}
1876/* }}} */
1877
1878int destroy_target (JOB_REF_ARG (CTJ)) /* {{{ */ {
1879 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1880 assert (CT);
1881 assert (CT->type);
1882 assert (CT->global_refcnt > 0);
1883
1884 int r;
1885 if (!((r = __sync_add_and_fetch (&CT->global_refcnt, -1)))) {
1886 MODULE_STAT->active_targets--;
1887 MODULE_STAT->inactive_targets++;
1888
1889 job_signal (JOB_REF_PASS (CTJ), JS_RUN);
1890 } else {
1891 job_decref (JOB_REF_PASS (CTJ));
1892 }
1893 return r;
1894}
1895/*}}} */
1896
1897int do_conn_target_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
1898 if (epoll_fd <= 0) {
1899 job_timer_insert (job, precise_now + 0.01);
1900 return 0;
1901 }
1902 conn_target_job_t CTJ = job;
1903 struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);
1904
1905 if (op == JS_ALARM || op == JS_RUN) {
1906 if (op == JS_ALARM && !job_timer_check (job)) {
1907 return 0;
1908 }
1909 if (!CT->global_refcnt) {
1910 destroy_dead_target_connections (CTJ);
1911 clean_unused_target (CTJ);
1912 compute_next_reconnect (CTJ);
1913 } else {
1914 create_new_connections (CTJ);
1915 }
1916
1917 if (CTJ->j_flags & JF_COMPLETED) { return 0; }
1918
1919 if (CT->global_refcnt || CT->conn_tree) {
1920 job_timer_insert (CTJ, precise_now + 0.1);
1921 return 0;
1922 } else {
1923 if (free_target (CTJ) >= 0) {
1924 return JOB_COMPLETED;
1925 } else {
1926 job_timer_insert (CTJ, precise_now + 0.1);
1927 return 0;
1928 }
1929 }
1930 }
1931 if (op == JS_FINISH) {
1932 assert (CTJ->j_flags & JF_COMPLETED);
1933 MODULE_STAT->allocated_targets --;
1934 return job_free (JOB_REF_PASS (job));
1935 }
1936
1937 return JOB_ERROR;
1938}
1939/* }}} */
1940
/*
  create_target: returns a referenced connection target matching
  source's (address, port, type, extra), creating it if necessary.

  source->type is first completed/validated with check_conn_functions
  (type, 0); on failure NULL is returned and *was_created is left untouched.

  Under TargetsLock:
    - existing target: min/max connections and reconnect_timeout are
      overwritten from source, global_refcnt is incremented and an extra job
      reference is taken for the caller; *was_created = 2 if the target had
      been inactive (global_refcnt was 0), else 0;
    - new target: a job is created with a copy of *source, inserted into
      HTarget with global_refcnt = 1 and scheduled; *was_created = 1.
  was_created may be NULL.
*/
conn_target_job_t create_target (struct conn_target_info *source, int *was_created) /* {{{ */ {
  if (check_conn_functions (source->type, 0) < 0) {
    return NULL;
  }
  pthread_mutex_lock (&TargetsLock);

  conn_target_job_t T =
    source->target.s_addr ?
    find_target (source->target, source->port, source->type, source->extra, 0, 0) :
    find_target_ipv6 (source->target_ipv6, source->port, source->type, source->extra, 0, 0);

  if (T) {
    struct conn_target_info *t = CONN_TARGET_INFO (T);

    // the latest caller's connection limits win
    t->min_connections = source->min_connections;
    t->max_connections = source->max_connections;
    t->reconnect_timeout = source->reconnect_timeout;

    if (!__sync_fetch_and_add (&t->global_refcnt, 1)) {
      // target was inactive (being cleaned up) and is now revived
      MODULE_STAT->active_targets++;
      MODULE_STAT->inactive_targets--;

      if (was_created) {
        *was_created = 2;
      }
    } else {
      if (was_created) {
        *was_created = 0;
      }
    }

    // reference returned to the caller
    job_incref (T);
  } else {
    //assert (MODULE_STAT->allocated_targets < MAX_TARGETS);
    T = create_async_job (do_conn_target_job, JSC_ALLOW (JC_EPOLL, JS_RUN) | JSC_ALLOW (JC_EPOLL, JS_ABORT) | JSC_ALLOW (JC_EPOLL, JS_ALARM) | JSC_ALLOW (JC_EPOLL, JS_FINISH), -2, sizeof (struct conn_target_info), JT_HAVE_TIMER, JOB_REF_NULL);
    T->j_refcnt = 2;

    struct conn_target_info *t = CONN_TARGET_INFO (T);
    // copies every field of source, including hnext / conn_tree;
    // NOTE(review): assumes the caller passes those zeroed — confirm
    memcpy (t, source, sizeof (*source));
    job_timer_init (T);

    //t->generation = 1;
    MODULE_STAT->active_targets ++;
    MODULE_STAT->allocated_targets ++;

    if (source->target.s_addr) {
      find_target (source->target, source->port, source->type, source->extra, 1, T);
    } else {
      find_target_ipv6 (source->target_ipv6, source->port, source->type, source->extra, 1, T);
    }

    if (was_created) {
      *was_created = 1;
    }
    t->global_refcnt = 1;
    // first run of do_conn_target_job opens the initial connections
    schedule_job (JOB_REF_CREATE_PASS (T));
  }

  pthread_mutex_unlock (&TargetsLock);

  return T;
}
2003/* }}} */
2004
2005
2006/* }}} */
2007
2008
2009
2010
2011void tcp_set_max_connections (int maxconn) /* {{{ */ {
2012 max_connection_fd = maxconn;
2013 if (!max_special_connections || max_special_connections > maxconn) {
2014 max_special_connections = maxconn;
2015 }
2016}
2017/* }}} */
2018
/*
  Legacy entry point kept for API compatibility.  Outbound connections are
  now created per target by create_new_connections (), driven by each
  target's own job (do_conn_target_job), so this function does no work.
  `limit` is accepted and ignored.  Always returns 0 (connections created).
*/
int create_all_outbound_connections_limited (int limit) /* {{{ */ {
  (void) limit;
  return 0;
}
2046/* }}} */
2047
/* create_all_outbound_connections_limited () with an effectively unbounded limit (INT_MAX). */
int create_all_outbound_connections (void) /* {{{ */ {
  const int unlimited = 0x7fffffff;
  return create_all_outbound_connections_limited (unlimited);
}
2051/* }}} */
2052
2053/* {{{ conn_target_get_connection */
2054static void check_connection (connection_job_t C, void *x) {
2055 connection_job_t *P = x;
2056 if (*P) { return; }
2057
2058 int r = CONN_INFO (C)->type->check_ready (C);
2059
2060 if (r == cr_ok) {
2061 *P = C;
2062 return;
2063 }
2064}
2065
2066static void check_connection_stopped (connection_job_t C, void *x) {
2067 connection_job_t *P = x;
2068
2069 if (*P && CONN_INFO (*P)->ready == cr_ok) { return; }
2070
2071 int r = CONN_INFO (C)->type->check_ready (C);
2072
2073 if (r == cr_ok) {
2074 *P = C;
2075 return;
2076 }
2077
2078 if (r == cr_stopped && (!*P || CONN_INFO (*P)->unreliability > CONN_INFO (C)->unreliability)) {
2079 *P = C;
2080 return;
2081 }
2082}
2083
2084connection_job_t conn_target_get_connection (conn_target_job_t CT, int allow_stopped) {
2085 assert (CT);
2086
2087 struct conn_target_info *t = CONN_TARGET_INFO (CT);
2088
2089 struct tree_connection *T = get_tree_ptr_connection (&t->conn_tree);
2090
2091 connection_job_t S = NULL;
2092 tree_act_ex_connection (T, allow_stopped ? check_connection_stopped : check_connection, &S);
2093
2094 if (S) { job_incref (S); }
2095 tree_free_connection (T);
2096
2097 return S;
2098}
2099/* }}} */
2100
2101void insert_free_later_struct (struct free_later *F) {
2102 if (!free_later_queue) {
2103 free_later_queue = alloc_mp_queue_w ();
2104 }
2105 mpq_push_w (free_later_queue, F, 0);
2106 MODULE_STAT->free_later_size ++;
2107 MODULE_STAT->free_later_total ++;
2108}
2109
2110void free_later_act (void) {
2111 if (!free_later_queue) { return; }
2112 while (1) {
2113 struct free_later *F = mpq_pop_nw (free_later_queue, 4);
2114 if (!F) { return; }
2115 MODULE_STAT->free_later_size --;
2116 F->free (F->ptr);
2117 free (F);
2118 }
2119}
2120
/* Public wrapper: releases a connection tree through free_tree_ptr_connection (). */
void free_connection_tree_ptr (struct tree_connection *T) /* {{{ */ {
  struct tree_connection *tree = T;
  free_tree_ptr_connection (tree);
}
2124/* }}} */
2125
2126
2127void incr_active_dh_connections (void) {
2128 MODULE_STAT->active_dh_connections ++;
2129}
2130
2131int new_conn_generation (void) {
2132 return __sync_fetch_and_add (&conn_generation, 1);
2133}
2134
2135int get_cur_conn_generation (void) {
2136 return conn_generation;
2137}
2138
2139// -----
2140
/* Number of rules stored in nat_info (appended by net_add_nat_info). */
int nat_info_rules;
/*
  NAT translation rules: nat_info[i][0] is the local IPv4 address and
  nat_info[i][1] the global one, both in host byte order (ntohl in
  net_add_nat_info).
*/
unsigned nat_info[MAX_NAT_INFO_RULES][2];
2143
2144int net_add_nat_info (char *str) {
2145 char *str2 = strrchr (str, ':');
2146 if (!str2) {
2147 fprintf (stderr, "expected <local-addr>:<global-addr> in --nat-info\n");
2148 return -1;
2149 }
2150 *str2++ = 0;
2151 struct in_addr l_addr, g_addr;
2152 if (inet_pton (AF_INET, str, &l_addr) <= 0) {
2153 fprintf (stderr, "cannot translate host '%s' in --nat-info\n", str);
2154 return -1;
2155 }
2156 if (inet_pton (AF_INET, str2, &g_addr) <= 0) {
2157 fprintf (stderr, "cannot translate host '%s' in --nat-info\n", str2);
2158 return -1;
2159 }
2160 if (nat_info_rules >= MAX_NAT_INFO_RULES) {
2161 fprintf (stderr, "too many rules in --nat-info\n");
2162 return -1;
2163 }
2164 nat_info[nat_info_rules][0] = ntohl (l_addr.s_addr);
2165 nat_info[nat_info_rules][1] = ntohl (g_addr.s_addr);
2166 return nat_info_rules++;
2167}
2168
2169unsigned nat_translate_ip (unsigned local_ip) {
2170 int i;
2171 vkprintf (6, "nat_info: %d rules\n", nat_info_rules);
2172 for (i = 0; i < nat_info_rules; i++) {
2173 vkprintf (6, "nat_info rule #%d: %s to %s\n", i, show_ip (nat_info[i][0]), show_ip (nat_info[i][1]));
2174 if (nat_info[i][0] == local_ip) {
2175 vkprintf (4, "translating ip by nat_info rules: %s to %s\n", show_ip (local_ip), show_ip (nat_info[i][1]));
2176 return nat_info[i][1];
2177 }
2178 }
2179 return local_ip;
2180}
2181