1 | /* |
2 | This file is part of Mtproto-proxy Library. |
3 | |
4 | Mtproto-proxy Library is free software: you can redistribute it and/or modify |
5 | it under the terms of the GNU Lesser General Public License as published by |
6 | the Free Software Foundation, either version 2 of the License, or |
7 | (at your option) any later version. |
8 | |
9 | Mtproto-proxy Library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public License |
15 | along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>. |
16 | |
17 | Copyright 2009-2013 Vkontakte Ltd |
18 | 2008-2013 Nikolai Durov |
19 | 2008-2013 Andrey Lopatin |
20 | |
21 | Copyright 2014 Telegram Messenger Inc |
22 | 2014 Nikolai Durov |
23 | 2014 Andrey Lopatin |
24 | |
25 | Copyright 2015-2016 Telegram Messenger Inc |
26 | 2015-2016 Vitaly Valtman |
27 | */ |
28 | |
29 | #define _FILE_OFFSET_BITS 64 |
30 | |
31 | #include <arpa/inet.h> |
32 | #include <assert.h> |
33 | #include <errno.h> |
34 | #include <fcntl.h> |
35 | #include <math.h> |
36 | #include <netinet/in.h> |
37 | #include <netinet/tcp.h> |
38 | #include <pthread.h> |
39 | #include <stddef.h> |
40 | #include <stdio.h> |
41 | #include <stdlib.h> |
42 | #include <string.h> |
43 | #include <sys/epoll.h> |
44 | #include <sys/socket.h> |
45 | #include <sys/uio.h> |
46 | #include <time.h> |
47 | #include <unistd.h> |
48 | |
49 | #include "crc32.h" |
50 | #include "jobs/jobs.h" |
51 | #include "net/net-events.h" |
52 | //#include "net/net-buffers.h" |
53 | #include "kprintf.h" |
54 | #include "precise-time.h" |
55 | #include "server-functions.h" |
56 | #include "net/net-connections.h" |
57 | #include "net/net-config.h" |
58 | #include "vv/vv-io.h" |
59 | #include "vv/vv-tree.h" |
60 | #include "pid.h" |
61 | #include "common/mp-queue.h" |
62 | |
63 | #include "net/net-msg-buffers.h" |
64 | #include "net/net-tcp-connections.h" |
65 | |
66 | #include "common/common-stats.h" |
67 | |
68 | //struct process_id PID; |
69 | |
/* Selects the edge-triggered variant of compute_conn_events () below. */
#define USE_EPOLLET 1
/* NOTE(review): not referenced in the visible part of this file. */
#define MAX_RECONNECT_INTERVAL 20

/* Statistics module name; presumably consumed by the MODULE_* / SB_* macros from common-stats.h. */
#define MODULE connections

/* Accept rate limit, set by tcp_set_max_accept_rate (); the remaining/time
   pair is presumably the rate limiter state (not updated in the visible code). */
static int max_accept_rate;
static double cur_accept_rate_remaining;
static double cur_accept_rate_time;
/* Highest connection fd seen so far (updated in alloc_new_connection). */
static int max_connection;
static int conn_generation;
/* Sockets with fd >= max_connection_fd are rejected by alloc_new_connection. */
static int max_connection_fd = MAX_CONNECTIONS;

/* Number of open C_SPECIAL connections and its limit: when the limit is
   reached the special listening fd is removed from epoll, and listeners are
   signalled with JS_AUX once the count drops below it again. */
int active_special_connections, max_special_connections = MAX_CONNECTIONS;

/* Number of valid entries in special_socket[]. */
int special_listen_sockets;

/* Special listening sockets, identified by fd and generation. */
static struct {
  int fd, generation;
} special_socket[MAX_SPECIAL_LISTEN_SOCKETS];

static struct mp_queue *free_later_queue;
91 | |
92 | |
/*
  Statistics counters of the connections module.
  NOTE(review): MODULE_STAT_TYPE comes from common-stats.h; the counters are
  presumably kept per thread and summed by the SB_SUM_* macros used below.
*/
MODULE_STAT_TYPE {
  /* current numbers of connections by kind and state */
  int active_connections, active_dh_connections;
  int outbound_connections, active_outbound_connections, ready_outbound_connections, listening_connections;
  int allocated_outbound_connections, allocated_inbound_connections;
  int inbound_connections, active_inbound_connections;

  long long outbound_connections_created, inbound_connections_accepted;
  int ready_targets;

  long long netw_queries, netw_update_queries, total_failed_connections, total_connect_failures, unused_connections_closed;

  int allocated_targets, active_targets, inactive_targets, free_targets;
  int allocated_connections, allocated_socket_connections;
  /* failures while accepting / setting up connections (see alloc_new_connection) */
  long long accept_calls_failed, accept_nonblock_set_failed, accept_connection_limit_failed,
    accept_rate_limit_failed, accept_init_accepted_failed;

  /* readv () / writev () call, EINTR and byte counters of the socket I/O code */
  long long tcp_readv_calls, tcp_writev_calls, tcp_readv_intr, tcp_writev_intr;
  long long tcp_readv_bytes, tcp_writev_bytes;

  int free_later_size;
  long long free_later_total;
};
115 | |
/* Module registration for the statistics type above (macro from common-stats.h). */
MODULE_INIT

/*
  Statistics printer: emits the summed module counters and several file-level
  globals as tab-separated "name\tvalue" lines into the stats buffer `sb`.
*/
MODULE_STAT_FUNCTION
  SB_SUM_ONE_I (active_connections);
  SB_SUM_ONE_I (active_dh_connections);

  SB_SUM_ONE_I (outbound_connections);
  SB_SUM_ONE_I (ready_outbound_connections);
  SB_SUM_ONE_I (active_outbound_connections);
  SB_SUM_ONE_LL (outbound_connections_created);
  SB_SUM_ONE_LL (total_connect_failures);

  SB_SUM_ONE_I (inbound_connections);
  //SB_SUM_ONE_I (ready_inbound_connections);
  SB_SUM_ONE_I (active_inbound_connections);
  SB_SUM_ONE_LL (inbound_connections_accepted);

  SB_SUM_ONE_I (listening_connections);
  SB_SUM_ONE_LL (unused_connections_closed);
  SB_SUM_ONE_I (ready_targets);
  SB_SUM_ONE_I (allocated_targets);
  SB_SUM_ONE_I (active_targets);
  SB_SUM_ONE_I (inactive_targets);
  SB_SUM_ONE_I (free_targets);
  /* file-level globals, not module counters */
  sb_printf (sb,
    "max_connections\t%d\n"
    "active_special_connections\t%d\n"
    "max_special_connections\t%d\n"
    ,
    max_connection_fd,
    active_special_connections,
    max_special_connections
  );
  SBP_PRINT_I32(max_accept_rate);
  SBP_PRINT_DOUBLE(cur_accept_rate_remaining);
  SBP_PRINT_I32(max_connection);
  SBP_PRINT_I32(conn_generation);

  SB_SUM_ONE_I (allocated_connections);
  SB_SUM_ONE_I (allocated_outbound_connections);
  SB_SUM_ONE_I (allocated_inbound_connections);
  SB_SUM_ONE_I (allocated_socket_connections);
  SB_SUM_ONE_LL (tcp_readv_calls);
  SB_SUM_ONE_LL (tcp_readv_intr);
  SB_SUM_ONE_LL (tcp_readv_bytes);
  SB_SUM_ONE_LL (tcp_writev_calls);
  SB_SUM_ONE_LL (tcp_writev_intr);
  SB_SUM_ONE_LL (tcp_writev_bytes);
  SB_SUM_ONE_I (free_later_size);
  SB_SUM_ONE_LL (free_later_total);

  SB_SUM_ONE_LL (accept_calls_failed);
  SB_SUM_ONE_LL (accept_nonblock_set_failed);
  SB_SUM_ONE_LL (accept_connection_limit_failed);
  SB_SUM_ONE_LL (accept_rate_limit_failed);
  SB_SUM_ONE_LL (accept_init_accepted_failed);
MODULE_STAT_FUNCTION_END
173 | |
174 | void fetch_connections_stat (struct connections_stat *st) { |
175 | #define COLLECT_I(__x) st->__x = SB_SUM_I (__x); |
176 | #define COLLECT_LL(__x) st->__x = SB_SUM_LL (__x); |
177 | COLLECT_I (active_connections); |
178 | COLLECT_I (active_dh_connections); |
179 | COLLECT_I (outbound_connections); |
180 | COLLECT_I (active_outbound_connections); |
181 | COLLECT_I (ready_outbound_connections); |
182 | st->max_special_connections = max_special_connections; |
183 | st->active_special_connections = active_special_connections; |
184 | COLLECT_I (allocated_connections); |
185 | COLLECT_I (allocated_outbound_connections); |
186 | COLLECT_I (allocated_inbound_connections); |
187 | COLLECT_I (allocated_socket_connections); |
188 | COLLECT_I (allocated_targets); |
189 | COLLECT_I (ready_targets); |
190 | COLLECT_I (active_targets); |
191 | COLLECT_I (inactive_targets); |
192 | COLLECT_LL (tcp_readv_calls); |
193 | COLLECT_LL (tcp_readv_intr); |
194 | COLLECT_LL (tcp_readv_bytes); |
195 | COLLECT_LL (tcp_writev_calls); |
196 | COLLECT_LL (tcp_writev_intr); |
197 | COLLECT_LL (tcp_writev_bytes); |
198 | COLLECT_LL (accept_calls_failed); |
199 | COLLECT_LL (accept_nonblock_set_failed); |
200 | COLLECT_LL (accept_rate_limit_failed); |
201 | COLLECT_LL (accept_init_accepted_failed); |
202 | COLLECT_LL (accept_connection_limit_failed); |
203 | #undef COLLECT_I |
204 | #undef COLLECT_LL |
205 | } |
206 | |
207 | void connection_event_incref (int fd, long long val); |
208 | |
209 | void tcp_set_max_accept_rate (int rate) { |
210 | max_accept_rate = rate; |
211 | } |
212 | |
/* Forward declarations. */
int set_write_timer (connection_job_t C);

int prealloc_tcp_buffers (void);
int clear_connection_write_timeout (connection_job_t c);

/*
  Receive buffers filled by readv () in net_server_socket_reader ().
  tcp_recv_iovec[i + 1] describes tcp_recv_buffers[i]; entry 0 is not filled
  by prealloc_tcp_buffers (), and readv () is called on tcp_recv_iovec + 1.
*/
static int tcp_recv_buffers_num;
static int tcp_recv_buffers_total_size;
static struct iovec tcp_recv_iovec[MAX_TCP_RECV_BUFFERS + 1];
static struct msg_buffer *tcp_recv_buffers[MAX_TCP_RECV_BUFFERS];
222 | |
223 | int prealloc_tcp_buffers (void) /* {{{ */ { |
224 | assert (!tcp_recv_buffers_num); |
225 | |
226 | int i; |
227 | for (i = MAX_TCP_RECV_BUFFERS - 1; i >= 0; i--) { |
228 | struct msg_buffer *X = alloc_msg_buffer ((tcp_recv_buffers_num) ? tcp_recv_buffers[i + 1] : 0, TCP_RECV_BUFFER_SIZE); |
229 | if (!X) { |
230 | vkprintf (0, "**FATAL**: cannot allocate tcp receive buffer\n" ); |
231 | exit (2); |
232 | } |
233 | vkprintf (3, "allocated %d byte tcp receive buffer #%d at %p\n" , X->chunk->buffer_size, i, X); |
234 | tcp_recv_buffers[i] = X; |
235 | tcp_recv_iovec[i + 1].iov_base = X->data; |
236 | tcp_recv_iovec[i + 1].iov_len = X->chunk->buffer_size; |
237 | ++ tcp_recv_buffers_num; |
238 | tcp_recv_buffers_total_size += X->chunk->buffer_size; |
239 | } |
240 | return tcp_recv_buffers_num; |
241 | } |
242 | /* }}} */ |
243 | |
244 | int tcp_prepare_iovec (struct iovec *iov, int *iovcnt, int maxcnt, struct raw_message *raw) /* {{{ */ { |
245 | int t = rwm_prepare_iovec (raw, iov, maxcnt, raw->total_bytes); |
246 | if (t < 0) { |
247 | *iovcnt = maxcnt; |
248 | int i; |
249 | t = 0; |
250 | for (i = 0; i < maxcnt; i++) { |
251 | t += iov[i].iov_len; |
252 | } |
253 | assert (t < raw->total_bytes); |
254 | return t; |
255 | } else { |
256 | *iovcnt = t; |
257 | return raw->total_bytes; |
258 | } |
259 | } |
260 | /* }}} */ |
261 | |
/* Thread-affinity checks. The first three are empty (no-op) here; only
   assert_engine_thread () performs a check. */
void assert_main_thread (void) {}
void assert_net_cpu_thread (void) {}
void assert_net_net_thread (void) {}
/* Asserts that the caller runs in a job thread of class JC_ENGINE or JC_MAIN. */
void assert_engine_thread (void) {
  assert (this_job_thread && (this_job_thread->thread_class == JC_ENGINE || this_job_thread->thread_class == JC_MAIN));
}
268 | |
/* Forward declaration; called by alloc_new_connection (). */
socket_connection_job_t alloc_new_socket_connection (connection_job_t C);

/*
  Instantiates the generic tree from vv/vv-tree.c for connection_job_t
  elements ordered by pointer value (X_CMP). TREE_MALLOC / TREE_PTHREAD select
  build options of vv-tree.c (presumably malloc-based nodes and a
  pthread-aware variant); job_incref / job_decref_f are used as the tree's
  reference-counting hooks for stored jobs.
*/
#define X_TYPE connection_job_t
#define X_CMP(a,b) (((a) < (b)) ? -1 : ((a) > (b)) ? 1 : 0)
#define TREE_NAME connection
#define TREE_MALLOC
#define TREE_PTHREAD
#define TREE_INCREF job_incref
#define TREE_DECREF job_decref_f
#include "vv/vv-tree.c"
279 | |
280 | static inline int connection_is_active (int flags) { |
281 | return (flags & C_CONNECTED) && !(flags & C_READY_PENDING); |
282 | } |
283 | |
/* {{{ compute_conn_events */
/* Returns the EVT_* event mask to register for a connection's socket. */
#if USE_EPOLLET
/* Edge-triggered variant: watch read, write and special events, or nothing
   once the socket connection has C_ERROR set. */
static inline int compute_conn_events (socket_connection_job_t c) {
  unsigned flags = SOCKET_CONN_INFO(c)->flags;
  if (flags & C_ERROR) {
    return 0;
  } else {
    return EVT_READ | EVT_WRITE | EVT_SPEC;
  }
}
#else
/* Level-triggered variant (compiled only when USE_EPOLLET is 0): watch reads
   if C_WANTRD is set and C_STOPREAD is not, writes if C_WANTWR is set, always
   EVT_SPEC, and add EVT_LEVEL when a wanted direction also has C_NORD/C_NOWR.
   NOTE(review): this variant takes connection_job_t and uses CONN_INFO,
   unlike the EPOLLET variant; check callers before enabling it. */
static inline int compute_conn_events (connection_job_t c) {
  unsigned flags = CONN_INFO(c)->flags;
  if (flags & (C_ERROR | C_FAILED | C_NET_FAILED)) {
    return 0;
  }
  return (((flags & (C_WANTRD | C_STOPREAD)) == C_WANTRD) ? EVT_READ : 0) | (flags & C_WANTWR ? EVT_WRITE : 0) | EVT_SPEC
    | (((flags & (C_WANTRD | C_NORD)) == (C_WANTRD | C_NORD))
      || ((flags & (C_WANTWR | C_NOWR)) == (C_WANTWR | C_NOWR)) ? EVT_LEVEL : 0);
}
#endif
/* }}} */
306 | |
307 | void connection_write_close (connection_job_t C) /* {{{ */ { |
308 | struct connection_info *c = CONN_INFO (C); |
309 | if (c->status == conn_working) { |
310 | socket_connection_job_t S = c->io_conn; |
311 | if (S) { |
312 | __sync_fetch_and_or (&SOCKET_CONN_INFO(S)->flags, C_STOPREAD); |
313 | } |
314 | __sync_fetch_and_or (&c->flags, C_STOPREAD); |
315 | c->status = conn_write_close; |
316 | |
317 | job_signal (JOB_REF_CREATE_PASS (C), JS_RUN); |
318 | } |
319 | } |
320 | /* }}} */ |
321 | |
322 | /* qack {{{ */ |
323 | static inline void disable_qack (int fd) { |
324 | vkprintf (2, "disable TCP_QUICKACK for %d\n" , fd); |
325 | assert (setsockopt (fd, IPPROTO_TCP, TCP_QUICKACK, (int[]){0}, sizeof (int)) >= 0); |
326 | } |
327 | |
328 | static inline void cond_disable_qack (socket_connection_job_t C) { |
329 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
330 | if (c->flags & C_NOQACK) { |
331 | disable_qack (c->fd); |
332 | } |
333 | } |
334 | /* }}} */ |
335 | |
336 | /* {{{ cork |
337 | static inline void cond_reset_cork (connection_job_t c) { |
338 | if (c->flags & C_NOQACK) { |
339 | vkprintf (2, "disable TCP_CORK for %d\n", c->fd); |
340 | assert (setsockopt (c->fd, IPPROTO_TCP, TCP_CORK, (int[]){0}, sizeof (int)) >= 0); |
341 | vkprintf (2, "enable TCP_CORK for %d\n", c->fd); |
342 | assert (setsockopt (c->fd, IPPROTO_TCP, TCP_CORK, (int[]){1}, sizeof (int)) >= 0); |
343 | } |
344 | } |
345 | }}} */ |
346 | |
347 | |
348 | |
349 | /* {{{ CPU PART OF CONNECTION */ |
350 | |
351 | /* {{{ TIMEOUT */ |
352 | int set_connection_timeout (connection_job_t C, double timeout) /* {{{ */ { |
353 | struct connection_info *c = CONN_INFO (C); |
354 | |
355 | if (c->flags & C_ERROR) { return 0; } |
356 | |
357 | __sync_fetch_and_and (&c->flags, ~C_ALARM); |
358 | |
359 | if (timeout > 0) { |
360 | job_timer_insert (C, precise_now + timeout); |
361 | return 0; |
362 | } else { |
363 | job_timer_remove (C); |
364 | return 0; |
365 | } |
366 | } |
367 | /* }}} */ |
368 | |
369 | int clear_connection_timeout (connection_job_t C) /* {{{ */ { |
370 | set_connection_timeout (C, 0); |
371 | return 0; |
372 | } |
373 | /* }}} */ |
374 | |
375 | /* }}} */ |
376 | |
377 | |
378 | /* |
379 | can be called from any thread and without lock |
380 | just sets error code and sends JS_ABORT to connection job |
381 | */ |
382 | void fail_connection (connection_job_t C, int err) /* {{{ */ { |
383 | struct connection_info *c = CONN_INFO (C); |
384 | |
385 | if (!(__sync_fetch_and_or (&c->flags, C_ERROR) & C_ERROR)) { |
386 | c->status = conn_error; |
387 | if (c->error >= 0) { |
388 | c->error = err; |
389 | } |
390 | |
391 | job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT); |
392 | } |
393 | } |
394 | /* }}} */ |
395 | |
396 | /* |
397 | just runs ->reader and ->writer virtual methods |
398 | */ |
399 | int cpu_server_read_write (connection_job_t C) /* {{{ */ { |
400 | struct connection_info *c = CONN_INFO (C); |
401 | |
402 | c->type->reader (C); |
403 | c->type->writer (C); |
404 | return 0; |
405 | } |
406 | /* }}} */ |
407 | |
408 | /* |
409 | frees connection structure, including mpq and buffers |
410 | */ |
411 | int cpu_server_free_connection (connection_job_t C) /* {{{ */ { |
412 | assert_net_cpu_thread (); |
413 | assert (C->j_refcnt == 1); |
414 | |
415 | struct connection_info *c = CONN_INFO (C); |
416 | if (!(c->flags & C_ERROR)) { |
417 | vkprintf (0, "target = %p, basic=%d\n" , c->target, c->basic_type); |
418 | } |
419 | assert (c->flags & C_ERROR); |
420 | assert (c->flags & C_FAILED); |
421 | assert (!c->target); |
422 | assert (!c->io_conn); |
423 | |
424 | vkprintf (1, "Closing connection socket #%d\n" , c->fd); |
425 | |
426 | while (1) { |
427 | struct raw_message *raw = mpq_pop_nw (c->out_queue, 4); |
428 | if (!raw) { break; } |
429 | rwm_free (raw); |
430 | free (raw); |
431 | } |
432 | |
433 | free_mp_queue (c->out_queue); |
434 | c->out_queue = NULL; |
435 | |
436 | while (1) { |
437 | struct raw_message *raw = mpq_pop_nw (c->in_queue, 4); |
438 | if (!raw) { break; } |
439 | rwm_free (raw); |
440 | free (raw); |
441 | } |
442 | |
443 | free_mp_queue (c->in_queue); |
444 | c->in_queue = NULL; |
445 | |
446 | if (c->type->crypto_free) { |
447 | c->type->crypto_free (C); |
448 | } |
449 | |
450 | close (c->fd); |
451 | c->fd = -1; |
452 | |
453 | MODULE_STAT->allocated_connections --; |
454 | if (c->basic_type == ct_outbound) { |
455 | MODULE_STAT->allocated_outbound_connections --; |
456 | } |
457 | if (c->basic_type == ct_inbound) { |
458 | MODULE_STAT->allocated_inbound_connections --; |
459 | } |
460 | |
461 | return c->type->free_buffers (C); |
462 | } |
463 | /* }}} */ |
464 | |
465 | /* |
466 | deletes link to io_conn |
467 | deletes link to target |
468 | aborts pending queries |
469 | updates stats |
470 | */ |
471 | int cpu_server_close_connection (connection_job_t C, int who) /* {{{ */ { |
472 | assert_net_cpu_thread (); |
473 | struct connection_info *c = CONN_INFO(C); |
474 | |
475 | assert (c->flags & C_ERROR); |
476 | assert (c->status == conn_error); |
477 | assert (c->flags & C_FAILED); |
478 | |
479 | if (c->error != -17) { |
480 | MODULE_STAT->total_failed_connections ++; |
481 | if (!connection_is_active (c->flags)) { |
482 | MODULE_STAT->total_connect_failures ++; |
483 | } |
484 | } else { |
485 | MODULE_STAT->unused_connections_closed ++; |
486 | } |
487 | |
488 | if (c->flags & C_ISDH) { |
489 | MODULE_STAT->active_dh_connections --; |
490 | __sync_fetch_and_and (&c->flags, ~C_ISDH); |
491 | } |
492 | |
493 | assert (c->io_conn); |
494 | job_signal (JOB_REF_PASS (c->io_conn), JS_ABORT); |
495 | |
496 | if (c->target) { |
497 | MODULE_STAT->outbound_connections --; |
498 | |
499 | if (connection_is_active (c->flags)) { |
500 | MODULE_STAT->active_outbound_connections --; |
501 | } |
502 | |
503 | job_signal (JOB_REF_PASS (c->target), JS_RUN); |
504 | } else { |
505 | MODULE_STAT->inbound_connections --; |
506 | |
507 | if (connection_is_active (c->flags)) { |
508 | MODULE_STAT->active_inbound_connections --; |
509 | } |
510 | } |
511 | |
512 | if (connection_is_active (c->flags)) { |
513 | MODULE_STAT->active_connections --; |
514 | } |
515 | |
516 | if (c->flags & C_SPECIAL) { |
517 | c->flags &= ~C_SPECIAL; |
518 | int orig_special_connections = __sync_fetch_and_add (&active_special_connections, -1); |
519 | if (orig_special_connections == max_special_connections) { |
520 | int i; |
521 | for (i = 0; i < special_listen_sockets; i++) { |
522 | connection_job_t LC = connection_get_by_fd_generation (special_socket[i].fd, special_socket[i].generation); |
523 | assert (LC); |
524 | job_signal (JOB_REF_PASS (LC), JS_AUX); |
525 | } |
526 | } |
527 | } |
528 | |
529 | job_timer_remove (C); |
530 | return 0; |
531 | } |
532 | /* }}} */ |
533 | |
534 | int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { |
535 | connection_job_t C = job; |
536 | |
537 | struct connection_info *c = CONN_INFO (C); |
538 | |
539 | if (op == JS_RUN) { // RUN IN NET-CPU THREAD |
540 | assert_net_cpu_thread (); |
541 | if (!(c->flags & C_ERROR)) { |
542 | if (c->flags & C_READY_PENDING) { |
543 | assert (c->flags & C_CONNECTED); |
544 | __sync_fetch_and_and (&c->flags, ~C_READY_PENDING); |
545 | MODULE_STAT->active_outbound_connections ++; |
546 | MODULE_STAT->active_connections ++; |
547 | __sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1); |
548 | if (c->status == conn_connecting) { |
549 | if (!__sync_bool_compare_and_swap (&c->status, conn_connecting, conn_working)) { |
550 | assert (c->status == conn_error); |
551 | } |
552 | } |
553 | c->type->connected (C); |
554 | } |
555 | c->type->read_write (C); |
556 | } |
557 | return 0; |
558 | } |
559 | if (op == JS_ALARM) { // RUN IN NET-CPU THREAD |
560 | if (!job_timer_check (job)) { |
561 | return 0; |
562 | } |
563 | if (!(c->flags & C_ERROR)) { |
564 | c->type->alarm (C); |
565 | } |
566 | return 0; |
567 | } |
568 | if (op == JS_ABORT) { // RUN IN NET-CPU THREAD |
569 | assert (c->flags & C_ERROR); |
570 | if (!(__sync_fetch_and_or (&c->flags, C_FAILED) & C_FAILED)) { |
571 | c->type->close (C, 0); |
572 | } |
573 | return JOB_COMPLETED; |
574 | } |
575 | if (op == JS_FINISH) { // RUN IN NET-CPU THREAD |
576 | assert (C->j_refcnt == 1); |
577 | c->type->free (C); |
578 | return job_free (JOB_REF_PASS (C)); |
579 | } |
580 | return JOB_ERROR; |
581 | } |
582 | /* }}} */ |
583 | |
584 | /* |
585 | allocates inbound or outbound connection |
586 | runs init_accepted or init_outbound |
587 | updates stats |
588 | creates socket_connection |
589 | */ |
590 | connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ { |
591 | if (cfd < 0) { |
592 | return NULL; |
593 | } |
594 | assert_main_thread (); |
595 | |
596 | struct conn_target_info *CT = CTJ ? CONN_TARGET_INFO (CTJ) : NULL; |
597 | struct listening_connection_info *LC = LCJ ? LISTEN_CONN_INFO (LCJ) : NULL; |
598 | |
599 | unsigned flags; |
600 | if ((flags = fcntl (cfd, F_GETFL, 0) < 0) || fcntl (cfd, F_SETFL, flags | O_NONBLOCK) < 0) { |
601 | kprintf ("cannot set O_NONBLOCK on accepted socket %d: %m\n" , cfd); |
602 | MODULE_STAT->accept_nonblock_set_failed ++; |
603 | close (cfd); |
604 | return NULL; |
605 | } |
606 | |
607 | flags = 1; |
608 | setsockopt (cfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof (flags)); |
609 | if (tcp_maximize_buffers) { |
610 | maximize_sndbuf (cfd, 0); |
611 | maximize_rcvbuf (cfd, 0); |
612 | } |
613 | |
614 | if (cfd >= max_connection_fd) { |
615 | vkprintf (2, "cfd = %d, max_connection_fd = %d\n" , cfd, max_connection_fd); |
616 | MODULE_STAT->accept_connection_limit_failed ++; |
617 | close (cfd); |
618 | return NULL; |
619 | } |
620 | |
621 | if (cfd > max_connection) { |
622 | max_connection = cfd; |
623 | } |
624 | |
625 | connection_job_t C = create_async_job (do_connection_job, JSC_ALLOW (JC_CONNECTION, JS_RUN) | JSC_ALLOW (JC_CONNECTION, JS_ALARM) | JSC_ALLOW (JC_CONNECTION, JS_ABORT) | JSC_ALLOW (JC_CONNECTION, JS_FINISH), -2, sizeof (struct connection_info), JT_HAVE_TIMER, JOB_REF_NULL); |
626 | |
627 | struct connection_info *c = CONN_INFO (C); |
628 | //memset (c, 0, sizeof (*c)); /* no need, create_async_job memsets itself */ |
629 | |
630 | c->fd = cfd; |
631 | c->target = CTJ; |
632 | c->generation = new_conn_generation (); |
633 | |
634 | c->flags = 0;//SS ? C_WANTWR : C_WANTRD; |
635 | if (LC) { |
636 | c->flags = C_CONNECTED; |
637 | } |
638 | |
639 | int raw = C_RAWMSG; |
640 | |
641 | if (raw) { |
642 | c->flags |= C_RAWMSG; |
643 | rwm_init (&c->in, 0); |
644 | rwm_init (&c->out, 0); |
645 | rwm_init (&c->in_u, 0); |
646 | rwm_init (&c->out_p, 0); |
647 | } else { |
648 | assert (0); |
649 | } |
650 | |
651 | c->type = CT ? CT->type : LC->type; |
652 | c->extra = CT ? CT->extra : LC->extra; |
653 | assert (c->type); |
654 | |
655 | c->basic_type = CT ? ct_outbound : ct_inbound; |
656 | c->status = CT ? conn_connecting : conn_working; |
657 | |
658 | c->flags |= c->type->flags & C_EXTERNAL; |
659 | if (LC) { |
660 | c->flags |= LC->flags & C_EXTERNAL; |
661 | } |
662 | |
663 | union sockaddr_in46 self; |
664 | unsigned self_addrlen = sizeof (self); |
665 | memset (&self, 0, sizeof (self)); |
666 | getsockname (cfd, (struct sockaddr *) &self, &self_addrlen); |
667 | |
668 | if (self.a4.sin_family == AF_INET) { |
669 | assert (self_addrlen == sizeof (struct sockaddr_in)); |
670 | c->our_ip = ntohl (self.a4.sin_addr.s_addr); |
671 | c->our_port = ntohs (self.a4.sin_port); |
672 | assert (peer); |
673 | c->remote_ip = peer; |
674 | } else { |
675 | assert (self.a6.sin6_family == AF_INET6); |
676 | assert (!peer); |
677 | if (is_4in6 (peer_ipv6)) { |
678 | assert (is_4in6 (self.a6.sin6_addr.s6_addr)); |
679 | c->our_ip = ntohl (extract_4in6 (self.a6.sin6_addr.s6_addr)); |
680 | c->our_port = ntohs (self.a6.sin6_port); |
681 | c->remote_ip = ntohl (extract_4in6 (peer_ipv6)); |
682 | } else { |
683 | memcpy (c->our_ipv6, self.a6.sin6_addr.s6_addr, 16); |
684 | c->our_port = ntohs (self.a6.sin6_port); |
685 | c->flags |= C_IPV6; |
686 | memcpy (c->remote_ipv6, peer_ipv6, 16); |
687 | } |
688 | } |
689 | c->remote_port = peer_port; |
690 | |
691 | c->in_queue = alloc_mp_queue_w (); |
692 | c->out_queue = alloc_mp_queue_w (); |
693 | //c->out_packet_queue = alloc_mp_queue_w (); |
694 | |
695 | if (CT) { |
696 | vkprintf (1, "New connection %s:%d -> %s:%d\n" , show_our_ip (C), c->our_port, show_remote_ip (C), c->remote_port); |
697 | } else { |
698 | vkprintf (1, "New connection %s:%d -> %s:%d\n" , show_remote_ip (C), c->remote_port, show_our_ip (C), c->our_port); |
699 | } |
700 | |
701 | |
702 | int (*func)(connection_job_t) = CT ? CT->type->init_outbound : LC->type->init_accepted; |
703 | |
704 | vkprintf (3, "func = %p\n" , func); |
705 | |
706 | |
707 | if (func (C) >= 0) { |
708 | if (CT) { |
709 | job_incref (CTJ); |
710 | |
711 | MODULE_STAT->outbound_connections ++; |
712 | MODULE_STAT->allocated_outbound_connections ++; |
713 | MODULE_STAT->outbound_connections_created ++; |
714 | |
715 | CT->outbound_connections ++; |
716 | } else { |
717 | MODULE_STAT->inbound_connections_accepted ++; |
718 | MODULE_STAT->allocated_inbound_connections ++; |
719 | MODULE_STAT->inbound_connections ++; |
720 | MODULE_STAT->active_inbound_connections ++; |
721 | MODULE_STAT->active_connections ++; |
722 | |
723 | c->listening = LC->fd; |
724 | c->listening_generation = LC->generation; |
725 | if (LC->flags & C_NOQACK) { |
726 | c->flags |= C_NOQACK; |
727 | } |
728 | |
729 | c->window_clamp = LC->window_clamp; |
730 | if (c->window_clamp) { |
731 | if (setsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &c->window_clamp, 4) < 0) { |
732 | vkprintf (0, "error while setting window size for socket %d to %d: %m\n" , cfd, c->window_clamp); |
733 | } else { |
734 | int t1 = -1, t2 = -1; |
735 | socklen_t s1 = 4, s2 = 4; |
736 | getsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &t1, &s1); |
737 | getsockopt (cfd, SOL_SOCKET, SO_RCVBUF, &t2, &s2); |
738 | vkprintf (2, "window clamp for socket %d is %d, receive buffer is %d\n" , cfd, t1, t2); |
739 | } |
740 | } |
741 | |
742 | if (LC->flags & C_SPECIAL) { |
743 | c->flags |= C_SPECIAL; |
744 | __sync_fetch_and_add (&active_special_connections, 1); |
745 | |
746 | if (active_special_connections > max_special_connections) { |
747 | vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n" , active_special_connections, max_special_connections); |
748 | } |
749 | if (active_special_connections >= max_special_connections) { |
750 | vkprintf (2, "**Invoking epoll_remove(%d)\n" , LC->fd); |
751 | epoll_remove (LC->fd); |
752 | } |
753 | } |
754 | } |
755 | |
756 | alloc_new_socket_connection (C); |
757 | |
758 | MODULE_STAT->allocated_connections ++; |
759 | |
760 | return C; |
761 | } else { |
762 | MODULE_STAT->accept_init_accepted_failed ++; |
763 | if (c->flags & C_RAWMSG) { |
764 | rwm_free (&c->in); |
765 | rwm_free (&c->out); |
766 | rwm_free (&c->in_u); |
767 | rwm_free (&c->out_p); |
768 | } |
769 | c->basic_type = ct_none; |
770 | close (cfd); |
771 | |
772 | free_mp_queue (c->in_queue); |
773 | free_mp_queue (c->out_queue); |
774 | |
775 | job_free (JOB_REF_PASS (C)); |
776 | this_job_thread->jobs_active --; |
777 | |
778 | return NULL; |
779 | } |
780 | } |
781 | /* }}} */ |
782 | |
783 | /* }}} */ |
784 | |
785 | /* {{{ IO PART OF CONNECTION */ |
786 | |
787 | /* |
788 | Have to have lock on socket_connection to run this method |
789 | |
790 | removes event from evemt heap and epoll |
791 | */ |
792 | void fail_socket_connection (socket_connection_job_t C, int who) /* {{{ */ { |
793 | assert_main_thread (); |
794 | |
795 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
796 | assert (C->j_flags & JF_LOCKED); |
797 | |
798 | if (!(__sync_fetch_and_or (&c->flags, C_ERROR) & C_ERROR)) { |
799 | job_timer_remove (C); |
800 | |
801 | remove_event_from_heap (c->ev, 0); |
802 | connection_event_incref (c->fd, -1); |
803 | epoll_insert (c->fd, 0); |
804 | c->ev = NULL; |
805 | |
806 | c->type->socket_close (C); |
807 | |
808 | fail_connection (c->conn, who); |
809 | } |
810 | } |
811 | /* }}} */ |
812 | |
813 | /* |
814 | Frees socket_connection structure |
815 | Removes link to cpu_connection |
816 | */ |
817 | int net_server_socket_free (socket_connection_job_t C) /* {{{ */ { |
818 | assert_net_net_thread (); |
819 | |
820 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
821 | |
822 | assert (!c->ev); |
823 | assert (c->flags & C_ERROR); |
824 | |
825 | if (c->conn) { |
826 | fail_connection (c->conn, -201); |
827 | job_decref (JOB_REF_PASS (c->conn)); |
828 | } |
829 | |
830 | while (1) { |
831 | struct raw_message *raw = mpq_pop_nw (c->out_packet_queue, 4); |
832 | if (!raw) { break; } |
833 | rwm_free (raw); |
834 | free (raw); |
835 | } |
836 | |
837 | free_mp_queue (c->out_packet_queue); |
838 | |
839 | rwm_free (&c->out); |
840 | |
841 | MODULE_STAT->allocated_socket_connections --; |
842 | return 0; |
843 | } |
844 | /* }}} */ |
845 | |
846 | /* |
847 | Reads data from socket until all data is read |
848 | Then puts it to conn->in_queue and send JS_RUN signal |
849 | */ |
850 | int net_server_socket_reader (socket_connection_job_t C) /* {{{ */ { |
851 | assert_net_net_thread (); |
852 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
853 | |
854 | while ((c->flags & (C_WANTRD | C_NORD | C_STOPREAD | C_ERROR | C_NET_FAILED)) == C_WANTRD) { |
855 | if (!tcp_recv_buffers_num) { |
856 | prealloc_tcp_buffers (); |
857 | } |
858 | |
859 | struct raw_message *in = malloc (sizeof (*in)); |
860 | rwm_init (in, 0); |
861 | |
862 | int s = tcp_recv_buffers_total_size; |
863 | assert (s > 0); |
864 | |
865 | int p = 1; |
866 | |
867 | __sync_fetch_and_or (&c->flags, C_NORD); |
868 | int r = readv (c->fd, tcp_recv_iovec + p, MAX_TCP_RECV_BUFFERS + 1 - p); |
869 | MODULE_STAT->tcp_readv_calls ++; |
870 | |
871 | if (r <= 0) { |
872 | if (r < 0 && errno == EAGAIN) { |
873 | } else if (r < 0 && errno == EINTR) { |
874 | __sync_fetch_and_and (&c->flags, ~C_NORD); |
875 | MODULE_STAT->tcp_readv_intr ++; |
876 | continue; |
877 | } else { |
878 | vkprintf (1, "Connection %d: Fatal error %m\n" , c->fd); |
879 | job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT); |
880 | __sync_fetch_and_or (&c->flags, C_NET_FAILED); |
881 | return 0; |
882 | } |
883 | } else { |
884 | __sync_fetch_and_and (&c->flags, ~C_NORD); |
885 | } |
886 | |
887 | if (verbosity > 0 && r < 0 && errno != EAGAIN) { |
888 | perror ("recv()" ); |
889 | } |
890 | vkprintf (2, "readv from %d: %d read out of %d\n" , c->fd, r, s); |
891 | |
892 | if (r <= 0) { |
893 | rwm_free (in); |
894 | free (in); |
895 | break; |
896 | } |
897 | |
898 | MODULE_STAT->tcp_readv_bytes += r; |
899 | struct msg_part *mp = 0; |
900 | assert (p == 1); |
901 | mp = new_msg_part (0, tcp_recv_buffers[p - 1]); |
902 | assert (tcp_recv_buffers[p - 1]->data == tcp_recv_iovec[p].iov_base); |
903 | mp->offset = 0; |
904 | mp->data_end = r > tcp_recv_iovec[p].iov_len ? tcp_recv_iovec[p].iov_len : r; |
905 | r -= mp->data_end; |
906 | in->first = in->last = mp; |
907 | in->total_bytes = mp->data_end; |
908 | in->first_offset = 0; |
909 | in->last_offset = mp->data_end; |
910 | p ++; |
911 | |
912 | int rs = r; |
913 | while (rs > 0) { |
914 | mp = new_msg_part (0, tcp_recv_buffers[p - 1]); |
915 | mp->offset = 0; |
916 | mp->data_end = rs > tcp_recv_iovec[p].iov_len ? tcp_recv_iovec[p].iov_len : rs; |
917 | rs -= mp->data_end; |
918 | in->last->next = mp; |
919 | in->last = mp; |
920 | in->last_offset = mp->data_end; |
921 | in->total_bytes += mp->data_end; |
922 | p ++; |
923 | } |
924 | assert (!rs); |
925 | |
926 | int i; |
927 | for (i = 0; i < p - 1; i++) { |
928 | struct msg_buffer *X = alloc_msg_buffer (tcp_recv_buffers[i], TCP_RECV_BUFFER_SIZE); |
929 | if (!X) { |
930 | vkprintf (0, "**FATAL**: cannot allocate tcp receive buffer\n" ); |
931 | assert (0); |
932 | } |
933 | tcp_recv_buffers[i] = X; |
934 | tcp_recv_iovec[i + 1].iov_base = X->data; |
935 | tcp_recv_iovec[i + 1].iov_len = X->chunk->buffer_size; |
936 | } |
937 | |
938 | assert (c->conn); |
939 | mpq_push_w (CONN_INFO(c->conn)->in_queue, in, 0); |
940 | job_signal (JOB_REF_CREATE_PASS (c->conn), JS_RUN); |
941 | } |
942 | return 0; |
943 | } |
944 | /* }}} */ |
945 | |
946 | /* |
947 | Get data from out raw message and writes it to socket |
948 | */ |
int net_server_socket_writer (socket_connection_job_t C) /* {{{ */{
  /*
    Default socket_writer: runs on the net thread and flushes c->out to the
    socket with writev () until everything is written, the socket stops
    accepting data, or the connection fails.
    Returns the number of bytes still pending in c->out.
  */
  assert_net_net_thread ();
  struct socket_connection_info *c = SOCKET_CONN_INFO (C);

  struct raw_message *out = &c->out;

  // ready_to_write is reported only if this call drains c->out from
  // at/above the low watermark to below it
  int check_watermark = out->total_bytes >= c->write_low_watermark;
  int t = 0; // bytes written by this call (used for logging only)

  // C_STOPWRITE is sampled once, on entry; see the close check at the end
  int stop = c->flags & C_STOPWRITE;

  // write while C_WANTWR is set and none of C_NOWR / C_ERROR / C_NET_FAILED is
  while ((c->flags & (C_WANTWR | C_NOWR | C_ERROR | C_NET_FAILED)) == C_WANTWR) {
    if (!out->total_bytes) {
      // everything flushed: withdraw the write request
      __sync_fetch_and_and (&c->flags, ~C_WANTWR);
      break;
    }

    struct iovec iov[384];
    int iovcnt = -1;

    // s = number of bytes offered to writev () (logged as "out of %d" below)
    int s = tcp_prepare_iovec (iov, &iovcnt, sizeof (iov) / sizeof (iov[0]), out);
    assert (iovcnt > 0 && s > 0);

    // C_NOWR is raised before the write and cleared again on success or EINTR.
    // On EAGAIN it stays set, which ends this loop; the epoll gateway clears
    // it when epoll reports write readiness.
    __sync_fetch_and_or (&c->flags, C_NOWR);
    int r = writev (c->fd, iov, iovcnt);
    MODULE_STAT->tcp_writev_calls ++;

    if (r <= 0) {
      if (r < 0 && errno == EAGAIN) {
        // eagain_count is reset on every successful write, so this counts
        // consecutive EAGAINs; too many of them drop the connection
        if (++c->eagain_count > 100) {
          kprintf ("Too much EAGAINs for connection %d (%s), dropping\n", c->fd, show_remote_socket_ip (C));
          job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
          __sync_fetch_and_or (&c->flags, C_NET_FAILED);
          return 0;
        }
      } else if (r < 0 && errno == EINTR) {
        // interrupted: unblock and retry immediately
        __sync_fetch_and_and (&c->flags, ~C_NOWR);
        MODULE_STAT->tcp_writev_intr ++;
        continue;
      } else {
        // any other error (or a zero-byte write) is fatal for the connection
        vkprintf (1, "Connection %d: Fatal error %m\n", c->fd);
        job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
        __sync_fetch_and_or (&c->flags, C_NET_FAILED);
        return 0;
      }
    } else {
      __sync_fetch_and_and (&c->flags, ~C_NOWR);
      MODULE_STAT->tcp_writev_bytes += r;
      c->eagain_count = 0;
      t += r;
    }

    // NOTE(review): appears unreachable -- every r < 0 case other than EAGAIN
    // either continues (EINTR) or returns above
    if (verbosity && r < 0 && errno != EAGAIN) {
      perror ("writev()");
    }
    vkprintf (2, "send/writev() to %d: %d written out of %d in %d chunks\n", c->fd, r, s, iovcnt);

    if (r > 0) {
      // drop the written prefix and let the connection type observe it
      rwm_skip_data (out, r);
      if (c->type->data_sent) {
        c->type->data_sent (C, r);
      }
    }
  }

  if (check_watermark && out->total_bytes < c->write_low_watermark) {
    if (c->type->ready_to_write) {
      c->type->ready_to_write (C);
    }
  }

  // a write-closed socket is torn down once it no longer wants to write
  if (stop && !(c->flags & C_WANTWR)) {
    vkprintf (1, "Closing write_close socket\n");
    job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT);
    __sync_fetch_and_or (&c->flags, C_NET_FAILED);
  }

  vkprintf (2, "socket_server_writer: written %d bytes to %d, flags=0x%08x\n", t, c->fd, c->flags);
  return out->total_bytes;
}
1029 | /* }}} */ |
1030 | |
1031 | /* |
  checks whether an outbound connection has become connected
  merges the contents of the out_packet_queue mpq into the out raw message
  runs socket_reader and socket_writer
1035 | */ |
int net_server_socket_read_write (socket_connection_job_t C) /* {{{ */ {
  /*
    Default socket_read_write handler, called from do_socket_connection_job ()
    on JS_RUN. Returns the epoll event mask from compute_conn_events (), or 0
    if the socket is already in error.
  */
  assert_net_net_thread ();

  struct socket_connection_info *c = SOCKET_CONN_INFO (C);

  if (c->flags & C_ERROR) {
    return 0;
  }

  if (!(c->flags & C_CONNECTED)) {
    // Not yet connected. Once C_NOWR is clear (the epoll gateway clears it on
    // write readiness) the socket is treated as connected.
    if (!(c->flags & C_NOWR)) {
      // keep only the C_PERMANENT bits, then mark the socket connected
      __sync_fetch_and_and (&c->flags, C_PERMANENT);
      __sync_fetch_and_or (&c->flags, C_WANTRD | C_CONNECTED);
      // propagate the state change to the owning connection job
      __sync_fetch_and_or (&CONN_INFO(c->conn)->flags, C_READY_PENDING | C_CONNECTED);

      c->type->socket_connected (C);
      job_signal (JOB_REF_CREATE_PASS (c->conn), JS_RUN);
    } else {
      // still waiting for the connect to complete
      return compute_conn_events (C);
    }
  }

  // NOTE(review): despite its text, this message is logged before processing
  vkprintf (2, "END processing connection %d, flags=%d\n", c->fd, c->flags);

  // read while C_WANTRD is set and none of the blocking/failure flags is
  while ((c->flags & (C_WANTRD | C_NORD | C_ERROR | C_STOPREAD | C_NET_FAILED)) == C_WANTRD) {
    c->type->socket_reader (C);
  }

  struct raw_message *out = &c->out;

  // move every queued outbound packet into c->out
  while (1) {
    struct raw_message *raw = mpq_pop_nw (c->out_packet_queue, 4);
    if (!raw) { break; }
    rwm_union (out, raw);
    free (raw);
  }

  if (out->total_bytes) {
    __sync_fetch_and_or (&c->flags, C_WANTWR);
  }

  // write while C_WANTWR is set and writes are neither blocked nor failed
  while ((c->flags & (C_NOWR | C_ERROR | C_WANTWR | C_NET_FAILED)) == C_WANTWR) {
    c->type->socket_writer (C);
  }

  return compute_conn_events (C);
}
1083 | /* }}} */ |
1084 | |
1085 | /* |
1086 | removes C_NOWR and C_NORD flags if necessary |
1087 | reads errors from socket |
  sends JS_RUN signal to socket_connection (or JS_ABORT on socket errors / hangups)
1089 | */ |
1090 | int net_server_socket_read_write_gateway (int fd, void *data, event_t *ev) /* {{{ */ { |
1091 | assert_main_thread (); |
1092 | if (!data) { return EVA_REMOVE; } |
1093 | |
1094 | assert ((int)ev->refcnt); |
1095 | |
1096 | socket_connection_job_t C = (socket_connection_job_t) data; |
1097 | assert (C); |
1098 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
1099 | assert (c->type); |
1100 | |
1101 | if (ev->ready & EVT_FROM_EPOLL) { |
1102 | // update C_NORD / C_NOWR only if we arrived from epoll() |
1103 | vkprintf (2, "fd=%d state=%d ready=%d epoll_ready=%d\n" , ev->fd, ev->state, ev->ready, ev->epoll_ready); |
1104 | ev->ready &= ~EVT_FROM_EPOLL; |
1105 | |
1106 | int clear_flags = 0; |
1107 | if ((ev->state & EVT_READ) && (ev->ready & EVT_READ)) { |
1108 | clear_flags |= C_NORD; |
1109 | } |
1110 | if ((ev->state & EVT_WRITE) && (ev->ready & EVT_WRITE)) { |
1111 | clear_flags |= C_NOWR; |
1112 | } |
1113 | __sync_fetch_and_and (&c->flags, ~clear_flags); |
1114 | |
1115 | if (ev->epoll_ready & EPOLLERR) { |
1116 | int error = 0; |
1117 | socklen_t errlen = sizeof (error); |
1118 | if (getsockopt (c->fd, SOL_SOCKET, SO_ERROR, (void *) &error, &errlen) == 0) { |
1119 | vkprintf (1, "got error for tcp socket #%d, [%s]:%d : %s\n" , c->fd, show_remote_socket_ip (C), c->remote_port, strerror (error)); |
1120 | } |
1121 | |
1122 | job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT); |
1123 | return EVA_REMOVE; |
1124 | } |
1125 | if (ev->epoll_ready & (EPOLLHUP | EPOLLERR | EPOLLRDHUP | EPOLLPRI)) { |
1126 | vkprintf (!(ev->epoll_ready & EPOLLPRI), "socket %d: disconnected (epoll_ready=%02x), cleaning\n" , c->fd, ev->epoll_ready); |
1127 | |
1128 | job_signal (JOB_REF_CREATE_PASS (C), JS_ABORT); |
1129 | return EVA_REMOVE; |
1130 | } |
1131 | } |
1132 | |
1133 | job_signal (JOB_REF_CREATE_PASS (C), JS_RUN); |
1134 | return EVA_CONTINUE; |
1135 | } |
1136 | /* }}} */ |
1137 | |
1138 | int do_socket_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { |
1139 | socket_connection_job_t C = job; |
1140 | |
1141 | struct socket_connection_info *c = SOCKET_CONN_INFO (C); |
1142 | |
1143 | if (op == JS_ABORT) { // MAIN THREAD |
1144 | fail_socket_connection (C, -200); |
1145 | return JOB_COMPLETED; |
1146 | } |
1147 | if (op == JS_RUN) { // IO THREAD |
1148 | if (!(c->flags & C_ERROR)) { |
1149 | int res = c->type->socket_read_write (job); |
1150 | if (res != c->current_epoll_status) { |
1151 | c->current_epoll_status = res; |
1152 | return JOB_SENDSIG (JS_AUX); |
1153 | } |
1154 | } |
1155 | return 0; |
1156 | } |
1157 | if (op == JS_AUX) { // MAIN THREAD |
1158 | if (!(c->flags & C_ERROR)) { |
1159 | epoll_insert (c->fd, compute_conn_events (job)); |
1160 | } |
1161 | return 0; |
1162 | } |
1163 | |
1164 | if (op == JS_FINISH) { // ANY THREAD |
1165 | assert (C->j_refcnt == 1); |
1166 | c->type->socket_free (C); |
1167 | return job_free (JOB_REF_PASS (C)); |
1168 | } |
1169 | |
1170 | return JOB_ERROR; |
1171 | } |
1172 | /* }}} */ |
1173 | |
1174 | /* |
  creates the socket_connection job for a connection
  and registers its fd with epoll
1177 | */ |
socket_connection_job_t alloc_new_socket_connection (connection_job_t C) /* {{{ */ {
  /*
    Creates the socket-level job for connection C: copies fd, type and
    addresses from C, installs net_server_socket_read_write_gateway () as the
    epoll handler of the fd, and links the new job back via c->io_conn.
    Must run on the main thread. Returns the new socket job.
  */
  assert_main_thread ();
  struct connection_info *c = CONN_INFO (C);

  socket_connection_job_t S = create_async_job (do_socket_connection_job, JSC_ALLOW (JC_CONNECTION_IO, JS_RUN) | JSC_ALLOW (JC_CONNECTION_IO, JS_ALARM) | JSC_ALLOW (JC_EPOLL, JS_ABORT) | JSC_ALLOW (JC_CONNECTION_IO, JS_FINISH) | JSC_ALLOW (JC_EPOLL, JS_AUX), -2, sizeof (struct socket_connection_info), JT_HAVE_TIMER, JOB_REF_NULL);
  // NOTE(review): two references -- presumably one owned via the epoll event
  // data (released by connection_event_incref ()) and one stored in c->io_conn
  S->j_refcnt = 2;
  struct socket_connection_info *s = SOCKET_CONN_INFO (S);
  //memset (s, 0, sizeof (*s)); /* no need, create_async_job memsets itself */

  s->fd = c->fd;
  s->type = c->type;
  s->conn = job_incref (C); // the socket job keeps its connection alive
  // start out wanting both read and write; C_CONNECTED is inherited from C
  s->flags = C_WANTWR | C_WANTRD | (c->flags & C_CONNECTED);

  s->our_ip = c->our_ip;
  s->our_port = c->our_port;
  memcpy (s->our_ipv6, c->our_ipv6, 16);

  s->remote_ip = c->remote_ip;
  s->remote_port = c->remote_port;
  memcpy (s->remote_ipv6, c->remote_ipv6, 16);

  s->out_packet_queue = alloc_mp_queue_w ();

  // the fd must not already have an event handler attached
  struct event_descr *ev = Events + s->fd;
  assert (!ev->data);
  assert (!ev->refcnt);

  s->ev = ev;

  epoll_sethandler (s->fd, 0, net_server_socket_read_write_gateway, S);

  // remember the registered mask so JS_RUN can detect changes
  s->current_epoll_status = compute_conn_events (S);
  epoll_insert (s->fd, s->current_epoll_status);

  c->io_conn = S;

  rwm_init (&s->out, 0);
  unlock_job (JOB_REF_CREATE_PASS (S));

  MODULE_STAT->allocated_socket_connections ++;
  return S;
}
1221 | /* }}} */ |
1222 | /* }}} */ |
1223 | |
1224 | /* {{{ LISTENING CONNECTION */ |
1225 | |
1226 | /* |
1227 | accepts new connections |
  calls alloc_new_connection () for each accepted socket
1229 | */ |
1230 | int net_accept_new_connections (listening_connection_job_t LCJ) /* {{{ */ { |
1231 | struct listening_connection_info *LC = LISTEN_CONN_INFO (LCJ); |
1232 | |
1233 | union sockaddr_in46 peer; |
1234 | unsigned peer_addrlen; |
1235 | int cfd, acc = 0; |
1236 | |
1237 | while (Events[LC->fd].state & EVT_IN_EPOLL) { |
1238 | peer_addrlen = sizeof (peer); |
1239 | memset (&peer, 0, sizeof (peer)); |
1240 | cfd = accept (LC->fd, (struct sockaddr *) &peer, &peer_addrlen); |
1241 | |
1242 | vkprintf (2, "%s: cfd = %d\n" , __func__, cfd); |
1243 | if (cfd < 0) { |
1244 | if (errno != EAGAIN) { |
1245 | MODULE_STAT->accept_calls_failed ++; |
1246 | } |
1247 | if (!acc) { |
1248 | vkprintf ((errno == EAGAIN) * 2, "accept(%d) unexpectedly returns %d: %m\n" , LC->fd, cfd); |
1249 | } |
1250 | break; |
1251 | } |
1252 | |
1253 | acc ++; |
1254 | MODULE_STAT->inbound_connections_accepted ++; |
1255 | |
1256 | if (max_accept_rate) { |
1257 | cur_accept_rate_remaining += (precise_now - cur_accept_rate_time) * max_accept_rate; |
1258 | cur_accept_rate_time = precise_now; |
1259 | if (cur_accept_rate_remaining > max_accept_rate) { |
1260 | cur_accept_rate_remaining = max_accept_rate; |
1261 | } |
1262 | |
1263 | if (cur_accept_rate_remaining < 1) { |
1264 | MODULE_STAT->accept_rate_limit_failed ++; |
1265 | close (cfd); |
1266 | continue; |
1267 | } |
1268 | |
1269 | cur_accept_rate_remaining -= 1; |
1270 | } |
1271 | |
1272 | if (LC->flags & C_IPV6) { |
1273 | assert (peer_addrlen == sizeof (struct sockaddr_in6)); |
1274 | assert (peer.a6.sin6_family == AF_INET6); |
1275 | } else { |
1276 | assert (peer_addrlen == sizeof (struct sockaddr_in)); |
1277 | assert (peer.a4.sin_family == AF_INET); |
1278 | } |
1279 | |
1280 | connection_job_t C; |
1281 | if (peer.a4.sin_family == AF_INET) { |
1282 | C = alloc_new_connection (cfd, NULL, LCJ, |
1283 | ntohl (peer.a4.sin_addr.s_addr), NULL, ntohs (peer.a4.sin_port)); |
1284 | } else { |
1285 | C = alloc_new_connection (cfd, NULL, LCJ, |
1286 | 0, peer.a6.sin6_addr.s6_addr, ntohs (peer.a6.sin6_port)); |
1287 | } |
1288 | if (C) { |
1289 | assert (CONN_INFO(C)->io_conn); |
1290 | unlock_job (JOB_REF_PASS (C)); |
1291 | } |
1292 | } |
1293 | return 0; |
1294 | } |
1295 | /* }}} */ |
1296 | |
1297 | int do_listening_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { |
1298 | listening_connection_job_t LCJ = job; |
1299 | |
1300 | if (op == JS_RUN) { |
1301 | net_accept_new_connections (LCJ); |
1302 | return 0; |
1303 | } else if (op == JS_AUX) { |
1304 | vkprintf (2, "**Invoking epoll_insert(%d,%d)\n" , LISTEN_CONN_INFO(LCJ)->fd, EVT_RWX); |
1305 | epoll_insert (LISTEN_CONN_INFO(LCJ)->fd, EVT_RWX); |
1306 | return 0; |
1307 | } |
1308 | return JOB_ERROR; |
1309 | } |
1310 | /* }}} */ |
1311 | |
1312 | int init_listening_connection_ext (int fd, conn_type_t *type, void *, int mode, int prio) /* {{{ */ { |
1313 | if (check_conn_functions (type, 1) < 0) { |
1314 | return -1; |
1315 | } |
1316 | if (fd >= max_connection_fd) { |
1317 | vkprintf (0, "TOO big fd for listening connection %d (max %d)\n" , fd, max_connection_fd); |
1318 | return -1; |
1319 | } |
1320 | if (fd > max_connection) { |
1321 | max_connection = fd; |
1322 | } |
1323 | |
1324 | listening_connection_job_t LCJ = create_async_job (do_listening_connection_job, JSC_ALLOW (JC_EPOLL, JS_RUN) | JSC_ALLOW (JC_EPOLL, JS_AUX) | JSC_ALLOW (JC_EPOLL, JS_FINISH), -2, sizeof (struct listening_connection_info), JT_HAVE_TIMER, JOB_REF_NULL); |
1325 | LCJ->j_refcnt = 2; |
1326 | |
1327 | struct listening_connection_info *LC = LISTEN_CONN_INFO (LCJ); |
1328 | memset (LC, 0, sizeof (*LC)); |
1329 | |
1330 | LC->fd = fd; |
1331 | LC->type = type; |
1332 | LC->extra = extra; |
1333 | |
1334 | struct event_descr *ev = Events + fd; |
1335 | assert (!ev->data); |
1336 | assert (!ev->refcnt); |
1337 | LC->ev = ev; |
1338 | |
1339 | LC->generation = new_conn_generation (); |
1340 | |
1341 | if (mode & SM_LOWPRIO) { |
1342 | prio = 10; |
1343 | } |
1344 | |
1345 | if (mode & SM_SPECIAL) { |
1346 | LC->flags |= C_SPECIAL; |
1347 | int idx = __sync_fetch_and_add (&special_listen_sockets, 1); |
1348 | assert (idx < MAX_SPECIAL_LISTEN_SOCKETS); |
1349 | special_socket[idx].fd = LC->fd; |
1350 | special_socket[idx].generation = LC->generation; |
1351 | } |
1352 | |
1353 | if (mode & SM_NOQACK) { |
1354 | LC->flags |= C_NOQACK; |
1355 | disable_qack (LC->fd); |
1356 | } |
1357 | |
1358 | if (mode & SM_IPV6) { |
1359 | LC->flags |= C_IPV6; |
1360 | } |
1361 | |
1362 | if (mode & SM_RAWMSG) { |
1363 | LC->flags |= C_RAWMSG; |
1364 | } |
1365 | |
1366 | epoll_sethandler (fd, prio, net_server_socket_read_write_gateway, LCJ); |
1367 | epoll_insert (fd, EVT_RWX); |
1368 | |
1369 | MODULE_STAT->listening_connections ++; |
1370 | |
1371 | unlock_job (JOB_REF_PASS (LCJ)); |
1372 | |
1373 | return 0; |
1374 | } |
1375 | |
1376 | int init_listening_connection (int fd, conn_type_t *type, void *) { |
1377 | return init_listening_connection_ext (fd, type, extra, 0, -10); |
1378 | } |
1379 | |
1380 | int init_listening_tcpv6_connection (int fd, conn_type_t *type, void *, int mode) { |
1381 | return init_listening_connection_ext (fd, type, extra, mode, -10); |
1382 | } |
1383 | /* }}} */ |
1384 | |
1385 | /* }}} */ |
1386 | |
1387 | /* {{{ connection refcnt */ |
1388 | void connection_event_incref (int fd, long long val) { |
1389 | struct event_descr *ev = &Events[fd]; |
1390 | |
1391 | if (!__sync_add_and_fetch (&ev->refcnt, val) && ev->data) { |
1392 | socket_connection_job_t C = ev->data; |
1393 | ev->data = NULL; |
1394 | job_decref (JOB_REF_PASS (C)); |
1395 | } |
1396 | } |
1397 | |
connection_job_t connection_get_by_fd (int fd) {
  /*
    Returns the job registered for fd with a fresh job reference (release it
    with job_decref), or NULL if there is none or the socket is in error.
    A listening fd yields the listening job itself; a socket fd yields the
    owning connection (c->conn).

    ev->refcnt is used as two 32-bit halves: the low half is the event's
    reference count, the high half counts lookups in progress.
    connection_event_incref () releases ev->data only when the whole value
    reaches 0, so a pending high-half increment keeps ev->data alive.
  */
  struct event_descr *ev = &Events[fd];
  if (!(int)(ev->refcnt) || !ev->data) { return NULL; }

  while (1) {
    // announce a pending lookup; succeed if the low half was nonzero
    long long v = __sync_fetch_and_add (&ev->refcnt, (1ll << 32));
    if (((int)v) != 0) { break; }
    // low half was 0: withdraw, and retry only if it became nonzero meanwhile
    v = __sync_fetch_and_add (&ev->refcnt, -(1ll << 32));
    if (((int)v) != 0) { continue; }
    return NULL;
  }
  // turn the pending lookup into a real (low-half) event reference
  __sync_fetch_and_add (&ev->refcnt, 1 - (1ll << 32));
  socket_connection_job_t C = job_incref (ev->data);

  // drop the event reference again; C is now kept alive by its job reference
  connection_event_incref (fd, -1);

  if (C->j_execute == &do_listening_connection_job) {
    return C;
  }

  assert (C->j_execute == &do_socket_connection_job);

  struct socket_connection_info *c = SOCKET_CONN_INFO (C);
  if (c->flags & C_ERROR) {
    job_decref (JOB_REF_PASS (C));
    return NULL;
  } else {
    // swap the socket job reference for one on the owning connection
    assert (c->conn);
    connection_job_t C2 = job_incref (c->conn);
    job_decref (JOB_REF_PASS (C));
    return C2;
  }
}
1431 | |
1432 | connection_job_t connection_get_by_fd_generation (int fd, int generation) { |
1433 | connection_job_t C = connection_get_by_fd (fd); |
1434 | if (C && CONN_INFO(C)->generation != generation) { |
1435 | job_decref (JOB_REF_PASS (C)); |
1436 | return NULL; |
1437 | } else { |
1438 | return C; |
1439 | } |
1440 | } |
1441 | /* }}} */ |
1442 | |
1443 | |
1444 | /* {{{ Sample server functions */ |
1445 | |
1446 | int server_check_ready (connection_job_t C) /* {{{ */ { |
1447 | struct connection_info *c = CONN_INFO (C); |
1448 | if (c->status == conn_none || c->status == conn_connecting) { |
1449 | return c->ready = cr_notyet; |
1450 | } |
1451 | if (c->status == conn_error || c->ready == cr_failed) { |
1452 | return c->ready = cr_failed; |
1453 | } |
1454 | return c->ready = cr_ok; |
1455 | } |
1456 | /* }}} */ |
1457 | |
1458 | int server_noop (connection_job_t C) /* {{{ */ { |
1459 | return 0; |
1460 | } |
1461 | /* }}} */ |
1462 | |
1463 | int server_failed (connection_job_t C) /* {{{ */ { |
1464 | kprintf ("connection %d: call to pure virtual method\n" , CONN_INFO(C)->fd); |
1465 | assert (0); |
1466 | return -1; |
1467 | } |
1468 | /* }}} */ |
1469 | |
1470 | int server_flush (connection_job_t C) /* {{{ */ { |
1471 | //job_signal (job_incref (C), JS_RUN); |
1472 | return 0; |
1473 | } |
1474 | /* }}} */ |
1475 | |
1476 | int check_conn_functions (conn_type_t *type, int listening) /* {{{ */ { |
1477 | if (type->magic != CONN_FUNC_MAGIC) { |
1478 | return -1; |
1479 | } |
1480 | if (!type->title) { |
1481 | type->title = "(unknown)" ; |
1482 | } |
1483 | if (!type->socket_read_write) { |
1484 | type->socket_read_write = net_server_socket_read_write; |
1485 | } |
1486 | if (!type->socket_reader) { |
1487 | type->socket_reader = net_server_socket_reader; |
1488 | } |
1489 | if (!type->socket_writer) { |
1490 | type->socket_writer = net_server_socket_writer; |
1491 | } |
1492 | if (!type->socket_close) { |
1493 | type->socket_close = server_noop; |
1494 | } |
1495 | |
1496 | if (!type->accept) { |
1497 | if (listening) { |
1498 | type->accept = net_accept_new_connections; |
1499 | } else { |
1500 | type->accept = server_failed; |
1501 | } |
1502 | } |
1503 | if (!type->init_accepted) { |
1504 | if (listening) { |
1505 | type->init_accepted = server_noop; |
1506 | } else { |
1507 | type->init_accepted = server_failed; |
1508 | } |
1509 | } |
1510 | |
1511 | if (!type->close) { |
1512 | type->close = cpu_server_close_connection; |
1513 | } |
1514 | if (!type->init_outbound) { |
1515 | type->init_outbound = server_noop; |
1516 | } |
1517 | if (!type->wakeup) { |
1518 | type->wakeup = server_noop; |
1519 | } |
1520 | if (!type->alarm) { |
1521 | type->alarm = server_noop; |
1522 | } |
1523 | if (!type->connected) { |
1524 | type->connected = server_noop; |
1525 | } |
1526 | if (!type->flush) { |
1527 | type->flush = server_flush; |
1528 | } |
1529 | if (!type->check_ready) { |
1530 | type->check_ready = server_check_ready; |
1531 | } |
1532 | if (!type->read_write) { |
1533 | type->read_write = cpu_server_read_write; |
1534 | } |
1535 | if (!type->free) { |
1536 | type->free = cpu_server_free_connection; |
1537 | } |
1538 | if (!type->socket_connected) { |
1539 | type->socket_connected = server_noop; |
1540 | } |
1541 | if (!type->socket_free) { |
1542 | type->socket_free = net_server_socket_free; |
1543 | } |
1544 | if (type->flags & C_RAWMSG) { |
1545 | if (!type->free_buffers) { |
1546 | type->free_buffers = cpu_tcp_free_connection_buffers; |
1547 | } |
1548 | if (!type->reader) { |
1549 | type->reader = cpu_tcp_server_reader; |
1550 | if (!type->parse_execute) { |
1551 | return -1; |
1552 | } |
1553 | } |
1554 | if (!type->writer) { |
1555 | type->writer = cpu_tcp_server_writer; |
1556 | } |
1557 | } else { |
1558 | if (!type->free_buffers) { |
1559 | assert (0); |
1560 | } |
1561 | if (!type->reader) { |
1562 | assert (0); |
1563 | } |
1564 | if (!type->writer) { |
1565 | assert (0); |
1566 | } |
1567 | } |
1568 | return 0; |
1569 | } |
1570 | /* }}} */ |
1571 | |
1572 | /* }}} */ |
1573 | |
1574 | |
1575 | |
1576 | /* CONN TARGETS {{{ */ |
1577 | |
1578 | void compute_next_reconnect (conn_target_job_t CT) /* {{{ */{ |
1579 | struct conn_target_info *S = CONN_TARGET_INFO (CT); |
1580 | if (S->next_reconnect_timeout < S->reconnect_timeout || S->active_outbound_connections) { |
1581 | S->next_reconnect_timeout = S->reconnect_timeout; |
1582 | } |
1583 | S->next_reconnect = precise_now + S->next_reconnect_timeout; |
1584 | if (!S->active_outbound_connections && S->next_reconnect_timeout < MAX_RECONNECT_INTERVAL) { |
1585 | S->next_reconnect_timeout = S->next_reconnect_timeout * 1.5 + drand48_j () * 0.2; |
1586 | } |
1587 | } |
1588 | /* }}} */ |
1589 | |
1590 | static void count_connection_num (connection_job_t C, void *good_c, void *stopped_c, void *bad_c) /* {{{ */ { |
1591 | int cr = CONN_INFO(C)->type->check_ready (C); |
1592 | switch (cr) { |
1593 | case cr_notyet: |
1594 | case cr_busy: |
1595 | break; |
1596 | case cr_ok: |
1597 | (*(int *)good_c)++; |
1598 | break; |
1599 | case cr_stopped: |
1600 | (*(int *)stopped_c)++; |
1601 | break; |
1602 | case cr_failed: |
1603 | (*(int *)bad_c)++; |
1604 | break; |
1605 | default: |
1606 | assert (0); |
1607 | } |
1608 | } |
1609 | /* }}} */ |
1610 | |
1611 | static void find_bad_connection (connection_job_t C, void *x) /* {{{ */ { |
1612 | connection_job_t *T = x; |
1613 | if (*T) { return; } |
1614 | if (CONN_INFO(C)->flags & C_ERROR) { |
1615 | *T = C; |
1616 | } |
1617 | } |
1618 | /* }}} */ |
1619 | |
1620 | /* |
1621 | Deletes failed connections (with flag C_ERROR) from target's tree |
1622 | */ |
void destroy_dead_target_connections (conn_target_job_t CTJ) /* {{{ */ {
  /*
    Removes C_ERROR connections from the target's connection tree, updates
    outbound/active counters, recomputes ready_outbound_connections together
    with the MODULE_STAT ready counters, and publishes the resulting tree.
  */
  struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);

  // take an extra reference on the published tree while working on it;
  // NOTE(review): presumably this makes tree_delete_connection () produce a
  // new tree instead of mutating the shared one -- confirm in vv-tree
  struct tree_connection *T = CT->conn_tree;
  if (T) {
    __sync_fetch_and_add (&T->refcnt, 1);
  }

  // remove failed connections one at a time
  while (1) {
    connection_job_t CJ = NULL;
    tree_act_ex_connection (T, find_bad_connection, &CJ);
    if (!CJ) { break; }

    if (connection_is_active (CONN_INFO (CJ)->flags)) {
      __sync_fetch_and_add (&CT->active_outbound_connections, -1);
    }
    __sync_fetch_and_add (&CT->outbound_connections, -1);

    T = tree_delete_connection (T, CJ);
  }

  int good_c = 0, bad_c = 0, stopped_c = 0;

  tree_act_ex3_connection (T, count_connection_num, &good_c, &stopped_c, &bad_c);

  int was_ready = CT->ready_outbound_connections;
  CT->ready_outbound_connections = good_c;

  if (was_ready != CT->ready_outbound_connections) {
    MODULE_STAT->ready_outbound_connections += CT->ready_outbound_connections - was_ready;
  }

  // a target counts as ready while it has at least one ready connection
  if (was_ready && !CT->ready_outbound_connections) {
    MODULE_STAT->ready_targets --;
  }
  if (!was_ready && CT->ready_outbound_connections) {
    MODULE_STAT->ready_targets ++;
  }

  if (T == CT->conn_tree) {
    // tree unchanged: release the extra reference taken above
    tree_free_connection (T);
  } else {
    // publish the new tree behind a full barrier, then release the old pointer
    struct tree_connection *old = CT->conn_tree;
    CT->conn_tree = T;
    barrier ();
    __sync_synchronize ();
    free_tree_ptr_connection (old);
  }
}
1672 | /* }}} */ |
1673 | |
1674 | /* |
1675 | creates new connections for target |
  must be called from the main thread, because new connections can only be allocated there
1677 | */ |
int create_new_connections (conn_target_job_t CTJ) /* {{{ */ {
  /*
    Drops dead connections, refreshes ready counters, then opens new outbound
    connections until outbound_connections reaches the wanted count
    (min_connections + failed + half of stopped, rounded up, capped at
    max_connections). New connections are only attempted once next_reconnect
    has passed or while the target has active connections.
    Returns the number of connections created.
  */
  assert_main_thread ();

  destroy_dead_target_connections (CTJ);
  struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);

  int count = 0, good_c = 0, bad_c = 0, stopped_c = 0, need_c;

  tree_act_ex3_connection (CT->conn_tree, count_connection_num, &good_c, &stopped_c, &bad_c);

  int was_ready = CT->ready_outbound_connections;
  CT->ready_outbound_connections = good_c;

  if (was_ready != CT->ready_outbound_connections) {
    MODULE_STAT->ready_outbound_connections += CT->ready_outbound_connections - was_ready;
  }

  if (was_ready && !CT->ready_outbound_connections) {
    MODULE_STAT->ready_targets --;
  }
  if (!was_ready && CT->ready_outbound_connections) {
    MODULE_STAT->ready_targets ++;
  }

  // replace failed connections and half (rounded up) of the stopped ones
  need_c = CT->min_connections + bad_c + ((stopped_c + 1) >> 1);
  if (need_c > CT->max_connections) {
    need_c = CT->max_connections;
  }

  if (precise_now >= CT->next_reconnect || CT->active_outbound_connections) {
    // extra reference on the published tree while new nodes are inserted
    struct tree_connection *T = CT->conn_tree;
    if (T) {
      __sync_fetch_and_add (&T->refcnt, 1);
    }

    while (CT->outbound_connections < need_c) {
      if (CT->target.s_addr) {
        vkprintf (1, "Creating NEW connection to %s:%d\n", inet_ntoa (CT->target), CT->port);
      } else {
        vkprintf (1, "Creating NEW ipv6 connection to [%s]:%d\n", show_ipv6 (CT->target_ipv6), CT->port);
      }
      // a zero IPv4 address means the target is IPv6
      int cfd = CT->target.s_addr ? client_socket (CT->target.s_addr, CT->port, 0) : client_socket_ipv6 (CT->target_ipv6, CT->port, SM_IPV6);
      if (cfd < 0) {
        if (CT->target.s_addr) {
          vkprintf (1, "error connecting to %s:%d: %m\n", inet_ntoa (CT->target), CT->port);
        } else {
          vkprintf (1, "error connecting to [%s]:%d\n", show_ipv6 (CT->target_ipv6), CT->port);
        }
        break;
      }

      connection_job_t C = alloc_new_connection (cfd, CTJ, NULL,
        ntohl (CT->target.s_addr), CT->target_ipv6, CT->port);

      if (C) {
        assert (CONN_INFO(C)->io_conn);
        count ++;
        unlock_job (JOB_REF_CREATE_PASS (C));
        T = tree_insert_connection (T, C, lrand48_j ());
      } else {
        break;
      }
    }

    if (T == CT->conn_tree) {
      // tree unchanged: release the extra reference
      tree_free_connection (T);
    } else {
      // publish the new tree behind a full barrier, then release the old pointer
      struct tree_connection *old = CT->conn_tree;
      CT->conn_tree = T;
      __sync_synchronize ();
      free_tree_ptr_connection (old);
    }

    compute_next_reconnect (CTJ);
  }


  return count;
}
1757 | /* }}} */ |
1758 | |
/* Hash table of connection targets, chained through conn_target_info.hnext;
   bucket indices are computed by find_target () / find_target_ipv6 (). */
conn_target_job_t HTarget[PRIME_TARGETS];
/* Guards HTarget; find_target () and find_target_ipv6 () expect it to be held. */
pthread_mutex_t TargetsLock = PTHREAD_MUTEX_INITIALIZER;
1761 | |
1762 | /* must be called with mutex held */ |
1763 | /* mode = 0 -- lookup, mode = 1 -- insert, mode = -1 -- delete */ |
1764 | static conn_target_job_t find_target (struct in_addr ad, int port, conn_type_t *type, void *, int mode, conn_target_job_t new_target) /* {{{ */ { |
1765 | assert (ad.s_addr); |
1766 | unsigned h1 = ((unsigned long) type * 0xabacaba + ad.s_addr) % PRIME_TARGETS; |
1767 | h1 = (h1 * 239 + port) % PRIME_TARGETS; |
1768 | conn_target_job_t *prev = HTarget + h1, cur; |
1769 | while ((cur = *prev) != 0) { |
1770 | struct conn_target_info *S = CONN_TARGET_INFO (cur); |
1771 | if (S->target.s_addr == ad.s_addr && S->port == port && S->type == type && S->extra == extra) { |
1772 | if (mode < 0) { |
1773 | *prev = S->hnext; |
1774 | S->hnext = 0; |
1775 | return cur; |
1776 | } |
1777 | assert (!mode); |
1778 | return cur; |
1779 | } |
1780 | prev = &S->hnext; |
1781 | } |
1782 | assert (mode >= 0); |
1783 | if (mode > 0) { |
1784 | CONN_TARGET_INFO (new_target)->hnext = HTarget[h1]; |
1785 | HTarget[h1] = new_target; |
1786 | return new_target; |
1787 | } |
1788 | return 0; |
1789 | } |
1790 | /* }}} */ |
1791 | |
1792 | /* must be called with mutex held */ |
1793 | /* mode = 0 -- lookup, mode = 1 -- insert, mode = -1 -- delete */ |
1794 | static conn_target_job_t find_target_ipv6 (unsigned char ad_ipv6[16], int port, conn_type_t *type, void *, int mode, conn_target_job_t new_target) /* {{{ */ { |
1795 | assert (*(long long *)ad_ipv6 || ((long long *) ad_ipv6)[1]); |
1796 | unsigned h1 = ((unsigned long) type * 0xabacaba) % PRIME_TARGETS; |
1797 | int i; |
1798 | for (i = 0; i < 4; i++) { |
1799 | h1 = ((unsigned long long) h1 * 17239 + ((unsigned *) ad_ipv6)[i]) % PRIME_TARGETS; |
1800 | } |
1801 | h1 = (h1 * 239 + port) % PRIME_TARGETS; |
1802 | conn_target_job_t *prev = HTarget + h1, cur; |
1803 | while ((cur = *prev) != 0) { |
1804 | struct conn_target_info *S = CONN_TARGET_INFO (cur); |
1805 | if ( |
1806 | ((long long *)S->target_ipv6)[1] == ((long long *)ad_ipv6)[1] && |
1807 | *(long long *)S->target_ipv6 == *(long long *)ad_ipv6 && |
1808 | S->port == port && S->type == type && !S->target.s_addr && S->extra == extra) { |
1809 | if (mode < 0) { |
1810 | *prev = S->hnext; |
1811 | S->hnext = 0; |
1812 | return cur; |
1813 | } |
1814 | assert (!mode); |
1815 | return cur; |
1816 | } |
1817 | prev = &S->hnext; |
1818 | } |
1819 | assert (mode >= 0); |
1820 | if (mode > 0) { |
1821 | CONN_TARGET_INFO (new_target)->hnext = HTarget[h1]; |
1822 | HTarget[h1] = new_target; |
1823 | return new_target; |
1824 | } |
1825 | return 0; |
1826 | } |
1827 | /* }}} */ |
1828 | |
1829 | static int free_target (conn_target_job_t CTJ) /* {{{ */ { |
1830 | pthread_mutex_lock (&TargetsLock); |
1831 | struct conn_target_info *CT = CONN_TARGET_INFO (CTJ); |
1832 | if (CT->global_refcnt > 0 || CT->conn_tree) { |
1833 | pthread_mutex_unlock (&TargetsLock); |
1834 | return -1; |
1835 | } |
1836 | |
1837 | assert (CT && CT->type && !CT->global_refcnt); |
1838 | assert (!CT->conn_tree); |
1839 | if (CT->target.s_addr) { |
1840 | vkprintf (1, "Freeing unused target to %s:%d\n" , inet_ntoa (CT->target), CT->port); |
1841 | assert (CTJ == find_target (CT->target, CT->port, CT->type, CT->extra, -1, 0)); |
1842 | } else { |
1843 | vkprintf (1, "Freeing unused ipv6 target to [%s]:%d\n" , show_ipv6 (CT->target_ipv6), CT->port); |
1844 | assert (CTJ == find_target_ipv6 (CT->target_ipv6, CT->port, CT->type, CT->extra, -1, 0)); |
1845 | } |
1846 | |
1847 | pthread_mutex_unlock (&TargetsLock); |
1848 | |
1849 | MODULE_STAT->inactive_targets --; |
1850 | MODULE_STAT->free_targets ++; |
1851 | |
1852 | job_decref (JOB_REF_PASS (CTJ)); |
1853 | |
1854 | return 1; |
1855 | } |
1856 | /* }}} */ |
1857 | |
1858 | static void fail_connection_gw (connection_job_t C) { |
1859 | fail_connection (C, -17); |
1860 | } |
1861 | |
1862 | int clean_unused_target (conn_target_job_t CTJ) /* {{{ */ { |
1863 | assert (CTJ); |
1864 | struct conn_target_info *CT = CONN_TARGET_INFO (CTJ); |
1865 | assert (CT->type); |
1866 | if (CT->global_refcnt) { |
1867 | return 0; |
1868 | } |
1869 | if (CT->conn_tree) { |
1870 | tree_act_connection (CT->conn_tree, fail_connection_gw); |
1871 | return 0; |
1872 | } |
1873 | job_timer_remove (CTJ); |
1874 | return 0; |
1875 | } |
1876 | /* }}} */ |
1877 | |
1878 | int destroy_target (JOB_REF_ARG (CTJ)) /* {{{ */ { |
1879 | struct conn_target_info *CT = CONN_TARGET_INFO (CTJ); |
1880 | assert (CT); |
1881 | assert (CT->type); |
1882 | assert (CT->global_refcnt > 0); |
1883 | |
1884 | int r; |
1885 | if (!((r = __sync_add_and_fetch (&CT->global_refcnt, -1)))) { |
1886 | MODULE_STAT->active_targets--; |
1887 | MODULE_STAT->inactive_targets++; |
1888 | |
1889 | job_signal (JOB_REF_PASS (CTJ), JS_RUN); |
1890 | } else { |
1891 | job_decref (JOB_REF_PASS (CTJ)); |
1892 | } |
1893 | return r; |
1894 | } |
1895 | /*}}} */ |
1896 | |
/* Job handler for a connection target (create_target registers it for
   JS_RUN, JS_ABORT, JS_ALARM and JS_FINISH in the JC_EPOLL class).
   - While epoll_fd <= 0, whatever OP is, the timer is re-armed at
     precise_now + 0.01 and 0 is returned.
     NOTE(review): this guard also precedes JS_FINISH handling.
   - JS_RUN / JS_ALARM: an alarm that job_timer_check rejects is ignored.
     A target with no global references has its dead connections destroyed,
     its leftovers cleaned and its next reconnect computed.  A referenced
     target gets new connections created.  Afterwards, if the target is
     still referenced or still owns connections, the timer is re-armed at
     precise_now + 0.1.  Otherwise free_target is tried: success (>= 0)
     returns JOB_COMPLETED, failure re-arms the timer.
   - JS_FINISH: the job must be completed; allocated_targets is decremented
     and the job is freed.
   - Any other op (JS_ABORT included) yields JOB_ERROR. */
int do_conn_target_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
  if (epoll_fd <= 0) {
    /* event loop not ready yet: retry shortly */
    job_timer_insert (job, precise_now + 0.01);
    return 0;
  }
  conn_target_job_t CTJ = job;
  struct conn_target_info *CT = CONN_TARGET_INFO (CTJ);

  if (op == JS_ALARM || op == JS_RUN) {
    /* stale / superseded alarm */
    if (op == JS_ALARM && !job_timer_check (job)) {
      return 0;
    }
    if (!CT->global_refcnt) {
      /* unreferenced target: shrink towards being freeable */
      destroy_dead_target_connections (CTJ);
      clean_unused_target (CTJ);
      compute_next_reconnect (CTJ);
    } else {
      /* referenced target: keep its connection pool populated */
      create_new_connections (CTJ);
    }

    /* NOTE(review): the job may already be completed at this point; which of
       the calls above can cause that is not visible here */
    if (CTJ->j_flags & JF_COMPLETED) { return 0; }

    if (CT->global_refcnt || CT->conn_tree) {
      /* still in use or still owns connections: poll again later */
      job_timer_insert (CTJ, precise_now + 0.1);
      return 0;
    } else {
      /* free_target re-checks under TargetsLock and returns -1 if the
         target became referenced again in the meantime */
      if (free_target (CTJ) >= 0) {
        return JOB_COMPLETED;
      } else {
        job_timer_insert (CTJ, precise_now + 0.1);
        return 0;
      }
    }
  }
  if (op == JS_FINISH) {
    assert (CTJ->j_flags & JF_COMPLETED);
    MODULE_STAT->allocated_targets --;
    return job_free (JOB_REF_PASS (job));
  }

  return JOB_ERROR;
}
1939 | /* }}} */ |
1940 | |
1941 | conn_target_job_t create_target (struct conn_target_info *source, int *was_created) /* {{{ */ { |
1942 | if (check_conn_functions (source->type, 0) < 0) { |
1943 | return NULL; |
1944 | } |
1945 | pthread_mutex_lock (&TargetsLock); |
1946 | |
1947 | conn_target_job_t T = |
1948 | source->target.s_addr ? |
1949 | find_target (source->target, source->port, source->type, source->extra, 0, 0) : |
1950 | find_target_ipv6 (source->target_ipv6, source->port, source->type, source->extra, 0, 0); |
1951 | |
1952 | if (T) { |
1953 | struct conn_target_info *t = CONN_TARGET_INFO (T); |
1954 | |
1955 | t->min_connections = source->min_connections; |
1956 | t->max_connections = source->max_connections; |
1957 | t->reconnect_timeout = source->reconnect_timeout; |
1958 | |
1959 | if (!__sync_fetch_and_add (&t->global_refcnt, 1)) { |
1960 | MODULE_STAT->active_targets++; |
1961 | MODULE_STAT->inactive_targets--; |
1962 | |
1963 | if (was_created) { |
1964 | *was_created = 2; |
1965 | } |
1966 | } else { |
1967 | if (was_created) { |
1968 | *was_created = 0; |
1969 | } |
1970 | } |
1971 | |
1972 | job_incref (T); |
1973 | } else { |
1974 | //assert (MODULE_STAT->allocated_targets < MAX_TARGETS); |
1975 | T = create_async_job (do_conn_target_job, JSC_ALLOW (JC_EPOLL, JS_RUN) | JSC_ALLOW (JC_EPOLL, JS_ABORT) | JSC_ALLOW (JC_EPOLL, JS_ALARM) | JSC_ALLOW (JC_EPOLL, JS_FINISH), -2, sizeof (struct conn_target_info), JT_HAVE_TIMER, JOB_REF_NULL); |
1976 | T->j_refcnt = 2; |
1977 | |
1978 | struct conn_target_info *t = CONN_TARGET_INFO (T); |
1979 | memcpy (t, source, sizeof (*source)); |
1980 | job_timer_init (T); |
1981 | |
1982 | //t->generation = 1; |
1983 | MODULE_STAT->active_targets ++; |
1984 | MODULE_STAT->allocated_targets ++; |
1985 | |
1986 | if (source->target.s_addr) { |
1987 | find_target (source->target, source->port, source->type, source->extra, 1, T); |
1988 | } else { |
1989 | find_target_ipv6 (source->target_ipv6, source->port, source->type, source->extra, 1, T); |
1990 | } |
1991 | |
1992 | if (was_created) { |
1993 | *was_created = 1; |
1994 | } |
1995 | t->global_refcnt = 1; |
1996 | schedule_job (JOB_REF_CREATE_PASS (T)); |
1997 | } |
1998 | |
1999 | pthread_mutex_unlock (&TargetsLock); |
2000 | |
2001 | return T; |
2002 | } |
2003 | /* }}} */ |
2004 | |
2005 | |
2006 | /* }}} */ |
2007 | |
2008 | |
2009 | |
2010 | |
2011 | void tcp_set_max_connections (int maxconn) /* {{{ */ { |
2012 | max_connection_fd = maxconn; |
2013 | if (!max_special_connections || max_special_connections > maxconn) { |
2014 | max_special_connections = maxconn; |
2015 | } |
2016 | } |
2017 | /* }}} */ |
2018 | |
/* Retired stub kept for API compatibility.  Outbound connections are now
   maintained per target by the target jobs (do_conn_target_job calls
   create_new_connections for referenced targets), so this function no
   longer walks any global target list.  LIMIT is ignored.
   Always returns 0, i.e. "no connections created". */
int create_all_outbound_connections_limited (int limit) /* {{{ */ {
  (void) limit;
  return 0;
}
2046 | /* }}} */ |
2047 | |
/* Unlimited variant: forwards to create_all_outbound_connections_limited ()
   with the largest int limit and returns its result. */
int create_all_outbound_connections (void) /* {{{ */ {
  const int no_limit = 0x7fffffff;
  int created = create_all_outbound_connections_limited (no_limit);
  return created;
}
2051 | /* }}} */ |
2052 | |
2053 | /* {{{ conn_target_get_connection */ |
2054 | static void check_connection (connection_job_t C, void *x) { |
2055 | connection_job_t *P = x; |
2056 | if (*P) { return; } |
2057 | |
2058 | int r = CONN_INFO (C)->type->check_ready (C); |
2059 | |
2060 | if (r == cr_ok) { |
2061 | *P = C; |
2062 | return; |
2063 | } |
2064 | } |
2065 | |
2066 | static void check_connection_stopped (connection_job_t C, void *x) { |
2067 | connection_job_t *P = x; |
2068 | |
2069 | if (*P && CONN_INFO (*P)->ready == cr_ok) { return; } |
2070 | |
2071 | int r = CONN_INFO (C)->type->check_ready (C); |
2072 | |
2073 | if (r == cr_ok) { |
2074 | *P = C; |
2075 | return; |
2076 | } |
2077 | |
2078 | if (r == cr_stopped && (!*P || CONN_INFO (*P)->unreliability > CONN_INFO (C)->unreliability)) { |
2079 | *P = C; |
2080 | return; |
2081 | } |
2082 | } |
2083 | |
2084 | connection_job_t conn_target_get_connection (conn_target_job_t CT, int allow_stopped) { |
2085 | assert (CT); |
2086 | |
2087 | struct conn_target_info *t = CONN_TARGET_INFO (CT); |
2088 | |
2089 | struct tree_connection *T = get_tree_ptr_connection (&t->conn_tree); |
2090 | |
2091 | connection_job_t S = NULL; |
2092 | tree_act_ex_connection (T, allow_stopped ? check_connection_stopped : check_connection, &S); |
2093 | |
2094 | if (S) { job_incref (S); } |
2095 | tree_free_connection (T); |
2096 | |
2097 | return S; |
2098 | } |
2099 | /* }}} */ |
2100 | |
2101 | void insert_free_later_struct (struct free_later *F) { |
2102 | if (!free_later_queue) { |
2103 | free_later_queue = alloc_mp_queue_w (); |
2104 | } |
2105 | mpq_push_w (free_later_queue, F, 0); |
2106 | MODULE_STAT->free_later_size ++; |
2107 | MODULE_STAT->free_later_total ++; |
2108 | } |
2109 | |
2110 | void free_later_act (void) { |
2111 | if (!free_later_queue) { return; } |
2112 | while (1) { |
2113 | struct free_later *F = mpq_pop_nw (free_later_queue, 4); |
2114 | if (!F) { return; } |
2115 | MODULE_STAT->free_later_size --; |
2116 | F->free (F->ptr); |
2117 | free (F); |
2118 | } |
2119 | } |
2120 | |
/* Public wrapper that releases connection-tree pointer T through
   free_tree_ptr_connection (). */
void free_connection_tree_ptr (struct tree_connection *T) /* {{{ */ {
  struct tree_connection *tree = T;
  free_tree_ptr_connection (tree);
}
2124 | /* }}} */ |
2125 | |
2126 | |
2127 | void incr_active_dh_connections (void) { |
2128 | MODULE_STAT->active_dh_connections ++; |
2129 | } |
2130 | |
2131 | int new_conn_generation (void) { |
2132 | return __sync_fetch_and_add (&conn_generation, 1); |
2133 | } |
2134 | |
2135 | int get_cur_conn_generation (void) { |
2136 | return conn_generation; |
2137 | } |
2138 | |
2139 | // ----- |
2140 | |
/* NAT translation table, filled by net_add_nat_info () (--nat-info) and
   consulted by nat_translate_ip ().
   nat_info[i][0] is a local IPv4 address and nat_info[i][1] the address it
   maps to; both are stored in host byte order (converted with ntohl).
   nat_info_rules is the number of entries in use. */
int nat_info_rules;
unsigned nat_info[MAX_NAT_INFO_RULES][2];
2143 | |
2144 | int net_add_nat_info (char *str) { |
2145 | char *str2 = strrchr (str, ':'); |
2146 | if (!str2) { |
2147 | fprintf (stderr, "expected <local-addr>:<global-addr> in --nat-info\n" ); |
2148 | return -1; |
2149 | } |
2150 | *str2++ = 0; |
2151 | struct in_addr l_addr, g_addr; |
2152 | if (inet_pton (AF_INET, str, &l_addr) <= 0) { |
2153 | fprintf (stderr, "cannot translate host '%s' in --nat-info\n" , str); |
2154 | return -1; |
2155 | } |
2156 | if (inet_pton (AF_INET, str2, &g_addr) <= 0) { |
2157 | fprintf (stderr, "cannot translate host '%s' in --nat-info\n" , str2); |
2158 | return -1; |
2159 | } |
2160 | if (nat_info_rules >= MAX_NAT_INFO_RULES) { |
2161 | fprintf (stderr, "too many rules in --nat-info\n" ); |
2162 | return -1; |
2163 | } |
2164 | nat_info[nat_info_rules][0] = ntohl (l_addr.s_addr); |
2165 | nat_info[nat_info_rules][1] = ntohl (g_addr.s_addr); |
2166 | return nat_info_rules++; |
2167 | } |
2168 | |
2169 | unsigned nat_translate_ip (unsigned local_ip) { |
2170 | int i; |
2171 | vkprintf (6, "nat_info: %d rules\n" , nat_info_rules); |
2172 | for (i = 0; i < nat_info_rules; i++) { |
2173 | vkprintf (6, "nat_info rule #%d: %s to %s\n" , i, show_ip (nat_info[i][0]), show_ip (nat_info[i][1])); |
2174 | if (nat_info[i][0] == local_ip) { |
2175 | vkprintf (4, "translating ip by nat_info rules: %s to %s\n" , show_ip (local_ip), show_ip (nat_info[i][1])); |
2176 | return nat_info[i][1]; |
2177 | } |
2178 | } |
2179 | return local_ip; |
2180 | } |
2181 | |