1 | /* |
2 | This file is part of Mtproto-proxy Library. |
3 | |
4 | Mtproto-proxy Library is free software: you can redistribute it and/or modify |
5 | it under the terms of the GNU Lesser General Public License as published by |
6 | the Free Software Foundation, either version 2 of the License, or |
7 | (at your option) any later version. |
8 | |
9 | Mtproto-proxy Library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public License |
15 | along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>. |
16 | |
17 | Copyright 2009-2013 Vkontakte Ltd |
18 | 2008-2013 Nikolai Durov |
19 | 2008-2013 Andrey Lopatin |
20 | |
21 | Copyright 2014 Telegram Messenger Inc |
22 | 2014 Nikolai Durov |
23 | 2014 Andrey Lopatin |
24 | |
25 | Copyright 2015-2016 Telegram Messenger Inc |
26 | 2015-2016 Vitaly Valtman |
27 | */ |
28 | |
29 | #pragma once |
30 | |
31 | //#include "net/net-buffers.h" |
32 | #include "net/net-events.h" |
33 | #include "net/net-msg.h" |
34 | #include "jobs/jobs.h" |
35 | #include "common/mp-queue.h" |
36 | #include "common/pid.h" |
37 | |
/* Compile-time capacity limits for connections, targets and receive buffers. */
#define MAX_CONNECTIONS 65536
#define MAX_TARGETS 65536
/* NOTE(review): presumably the size of a hash table of targets (struct conn_target_info has an hnext link) -- confirm against the implementation */
#define PRIME_TARGETS 99961
#define MAX_SPECIAL_LISTEN_SOCKETS 64

#define MAX_TCP_RECV_BUFFERS 128
#define TCP_RECV_BUFFER_SIZE 1024

/* The L suffix makes this constant a long, not an int. */
#define MAX_NET_RES (1L << 16)

//#define BUFF_SIZE 2048

/* Length in bytes of the custom_data[] array embedded in struct connection_info. */
#define CONN_CUSTOM_DATA_BYTES 256
51 | |
/*
 * Sentinel byte counts: the largest and the smallest value of a 32-bit int.
 * They are spelled as plain constants because the previous form, (-1 << 31),
 * left-shifts a negative value, which is undefined behavior in C.
 * NOTE(review): presumably returned by parsers to ask for more input or to
 * skip all remaining input -- confirm against the callers.
 */
#define NEED_MORE_BYTES 0x7fffffff
#define SKIP_ALL_BYTES (-0x7fffffff - 1)
54 | |
55 | |
/* Bit flags stored in the `flags` field of a connection (one bit per flag). */
#define C_WANTRD        0x1
#define C_WANTWR        0x2
#define C_WANTRW        (C_WANTRD | C_WANTWR)
#define C_INCONN        0x4
#define C_ERROR         0x8
#define C_NORD          0x10
#define C_NOWR          0x20
#define C_NORW          (C_NORD | C_NOWR)
#define C_INQUERY       0x40
#define C_FAILED        0x80
#define C_ALARM         0x100
#define C_AIO           0x200
#define C_INTIMEOUT     0x400
#define C_STOPREAD      0x800
#define C_REPARSE       0x1000
#define C_DFLUSH        0x2000
#define C_IPV6          0x4000
#define C_EXTERNAL      0x8000
#define C_SPECIAL       0x10000
#define C_NOQACK        0x20000
#define C_RAWMSG        0x40000
#define C_NET_FAILED    0x80000
#define C_CRYPTOIN      0x100000
#define C_CRYPTOOUT     0x200000
#define C_STOPPARSE     0x400000
#define C_ISDH          0x800000
#define C_READY_PENDING 0x1000000
#define C_CONNECTED     0x2000000
#define C_STOPWRITE     0x4000000

/* Mask made of the C_IPV6 and C_RAWMSG bits. */
#define C_PERMANENT     (C_IPV6 | C_RAWMSG)
/* Values of the connection `status` field. */
enum {
  conn_none = 0,          /* closed/uninitialized */
  conn_connecting = 1,
  conn_working = 2,
  conn_error = 3,         /* connection in bad state (it will be probably closed) */
  conn_listen = 4,        /* listening for inbound connections */
  conn_write_close = 5,   /* write all output buffer, then close; don't read input */
  conn_total_states       /* total number of connection states */
};
98 | |
/* Values of the connection `basic_type` field. */
enum {
  ct_none = 0,      /* no connection (closed) */
  ct_listen = 1,    /* listening socket */
  ct_inbound = 2,   /* inbound connection */
  ct_outbound = 3,  /* outbound connection */
  ct_pipe = 4,      /* used for pipe reading */
  ct_job = 5        /* used for async jobs ( net-jobs.h ) */
};
108 | |
/* Values of connection->ready for outbound connections. */
enum {
  cr_notyet = 0,   /* not ready yet (e.g. logging in) */
  cr_ok = 1,       /* working */
  cr_stopped = 2,  /* stopped (don't send more queries) */
  cr_busy = 3,     /* busy (sending queries not allowed by protocol) */
  cr_failed = 4    /* failed (possibly timed out) */
};
117 | |
118 | |
119 | typedef job_t connection_job_t; |
120 | typedef job_t socket_connection_job_t; |
121 | typedef job_t listening_connection_job_t; |
122 | typedef job_t conn_target_job_t; |
123 | typedef job_t query_job_t; |
124 | |
125 | |
126 | /* connection function table */ |
127 | |
128 | #define CONN_FUNC_MAGIC 0x11ef55aa |
129 | |
130 | typedef struct conn_functions { |
131 | int magic; |
132 | int flags; /* may contain for example C_RAWMSG; (partially) inherited by inbound/outbound connections */ |
133 | char *title; |
134 | int (*accept)(connection_job_t c); /* invoked for listen/accept connections of this type */ |
135 | int (*init_accepted)(connection_job_t c); /* initialize a new accept()'ed connection */ |
136 | int (*reader)(connection_job_t c); /* invoked from run() for reading network data */ |
137 | int (*writer)(connection_job_t c); /* invoked from run() for writing data */ |
138 | int (*close)(connection_job_t c, int who); /* invoked from run() whenever we need to close connection */ |
139 | int (*parse_execute)(connection_job_t c); /* invoked from reader() for parsing and executing one query */ |
140 | int (*init_outbound)(connection_job_t c); /* initializes newly created outbound connection */ |
141 | int (*connected)(connection_job_t c); /* invoked from run() when outbound connection is established */ |
142 | int (*check_ready)(connection_job_t c); /* updates conn->ready if necessary and returns it */ |
143 | int (*wakeup_aio)(connection_job_t c, int r);/* invoked from net_aio.c::check_aio_completion when aio read operation is complete */ |
144 | int (*write_packet)(connection_job_t c, struct raw_message *raw); /* adds necessary headers to packet */ |
145 | int (*flush)(connection_job_t c); /* generates necessary padding and writes as much bytes as possible */ |
146 | |
147 | // CPU-NET METHODS |
148 | int (*free)(connection_job_t c); |
149 | int (*free_buffers)(connection_job_t c); /* invoked from close() to free all buffers */ |
150 | int (*read_write)(connection_job_t c); /* invoked when an event related to connection of this type occurs */ |
151 | int (*wakeup)(connection_job_t c); /* invoked from run() when pending_queries == 0 */ |
152 | int (*alarm)(connection_job_t c); /* invoked when timer is out */ |
153 | |
154 | // NET-NET METHODS |
155 | int (*socket_read_write)(connection_job_t c); /* invoked when an event related to connection of this type occurs */ |
156 | int (*socket_reader)(connection_job_t c); /* invoked from run() for reading network data */ |
157 | int (*socket_writer)(connection_job_t c); /* invoked from run() for writing data */ |
158 | int (*socket_connected)(connection_job_t c); /* invoked from run() when outbound connection is established */ |
159 | int (*socket_free)(connection_job_t c); |
160 | int (*socket_close)(connection_job_t c); |
161 | |
162 | // INLINE FUNCTIONS |
163 | int (*data_received)(connection_job_t c, int r); /* invoked after r>0 bytes are read from socket */ |
164 | int (*data_sent)(connection_job_t c, int w); /* invoked after w>0 bytes are written into socket */ |
165 | int (*ready_to_write)(connection_job_t c); /* invoked from server_writer when Out.total_bytes crosses write_low_watermark ("greater or equal" -> "less") */ |
166 | |
167 | // INLINE METHODS |
168 | int (*crypto_init)(connection_job_t c, void *key_data, int key_data_len); /* < 0 = error */ |
169 | int (*crypto_free)(connection_job_t c); |
170 | int (*crypto_encrypt_output)(connection_job_t c); /* 0 = all ok, >0 = so much more bytes needed to encrypt last block */ |
171 | int (*crypto_decrypt_input)(connection_job_t c); /* 0 = all ok, >0 = so much more bytes needed to decrypt last block */ |
172 | int (*crypto_needed_output_bytes)(connection_job_t c); /* returns # of bytes needed to complete last output block */ |
173 | } conn_type_t; |
174 | |
175 | struct conn_target_info { |
176 | struct event_timer timer; |
177 | int min_connections; |
178 | int max_connections; |
179 | |
180 | struct tree_connection *conn_tree; |
181 | //connection_job_t first_conn, last_conn; |
182 | conn_type_t *type; |
183 | void *; |
184 | struct in_addr target; |
185 | unsigned char target_ipv6[16]; |
186 | int port; |
187 | int active_outbound_connections, outbound_connections; |
188 | int ready_outbound_connections; |
189 | double next_reconnect, reconnect_timeout, next_reconnect_timeout; |
190 | int custom_field; |
191 | conn_target_job_t next_target, prev_target; |
192 | conn_target_job_t hnext; |
193 | |
194 | int global_refcnt; |
195 | }; |
196 | |
197 | struct connection_info { |
198 | struct event_timer timer; |
199 | int fd; |
200 | int generation; |
201 | int flags; |
202 | // connection_job_t next, prev; |
203 | conn_type_t *type; |
204 | void *; |
205 | conn_target_job_t target; |
206 | connection_job_t io_conn; |
207 | int basic_type; |
208 | int status; |
209 | int error; |
210 | int unread_res_bytes; |
211 | int skip_bytes; |
212 | int pending_queries; |
213 | int queries_ok; |
214 | char custom_data[CONN_CUSTOM_DATA_BYTES]; |
215 | unsigned our_ip, remote_ip; |
216 | unsigned our_port, remote_port; |
217 | unsigned char our_ipv6[16], remote_ipv6[16]; |
218 | double query_start_time; |
219 | double last_query_time; |
220 | double last_query_sent_time; |
221 | double last_response_time; |
222 | double last_query_timeout; |
223 | //event_timer_t timer; |
224 | //event_timer_t write_timer; |
225 | int limit_per_write, limit_per_sec; |
226 | int last_write_time, written_per_sec; |
227 | int unreliability; |
228 | int ready; |
229 | //int parse_state; |
230 | int write_low_watermark; |
231 | void *crypto; |
232 | void *crypto_temp; |
233 | int listening, listening_generation; |
234 | int window_clamp; |
235 | |
236 | struct raw_message in_u, in, out, out_p; |
237 | |
238 | struct mp_queue *in_queue; |
239 | struct mp_queue *out_queue; |
240 | |
241 | //netbuffer_t *Tmp, In, Out; |
242 | //char in_buff[BUFF_SIZE]; |
243 | //char out_buff[BUFF_SIZE]; |
244 | }; |
245 | |
246 | struct socket_connection_info { |
247 | struct event_timer timer; |
248 | int fd; |
249 | int pad; |
250 | int flags; |
251 | int current_epoll_status; |
252 | conn_type_t *type; |
253 | event_t *ev; |
254 | connection_job_t conn; |
255 | struct mp_queue *out_packet_queue; |
256 | struct raw_message out; |
257 | unsigned our_ip, remote_ip; |
258 | unsigned our_port, remote_port; |
259 | unsigned char our_ipv6[16], remote_ipv6[16]; |
260 | int write_low_watermark; |
261 | int eagain_count; |
262 | }; |
263 | |
264 | struct listening_connection_info { |
265 | struct event_timer timer; |
266 | int fd; |
267 | int generation; |
268 | int flags; |
269 | int current_epoll_status; |
270 | conn_type_t *type; |
271 | event_t *ev; |
272 | void *; |
273 | int window_clamp; |
274 | }; |
275 | |
/* Snapshot of connection counters; filled in by fetch_connections_stat(). */
struct connections_stat {
  /* connection counters */
  int active_connections, active_dh_connections;
  int outbound_connections, active_outbound_connections, ready_outbound_connections;
  int active_special_connections, max_special_connections;

  /* allocation counters */
  int allocated_connections;
  int allocated_outbound_connections, allocated_inbound_connections;
  int allocated_socket_connections;
  int allocated_targets;

  /* target counters */
  int ready_targets, active_targets, inactive_targets;

  /* readv()/writev() counters */
  long long tcp_readv_calls, tcp_readv_intr, tcp_readv_bytes;
  long long tcp_writev_calls, tcp_writev_intr, tcp_writev_bytes;

  /* accept() failure counters */
  long long accept_calls_failed;
  long long accept_nonblock_set_failed;
  long long accept_rate_limit_failed;
  long long accept_init_accepted_failed;
  long long accept_connection_limit_failed;
};
304 | |
/*
 * Typed views of a job's custom data area (j_custom).  Each macro only casts;
 * it is up to the caller to pass a job of the matching kind.
 */
#define QUERY_INFO(_c) ((struct query_info *)(_c)->j_custom)

#define CONN_INFO(job_) ((struct connection_info *) ((job_)->j_custom))
#define LISTEN_CONN_INFO(job_) ((struct listening_connection_info *) ((job_)->j_custom))
#define SOCKET_CONN_INFO(job_) ((struct socket_connection_info *) ((job_)->j_custom))
#define CONN_TARGET_INFO(job_) ((struct conn_target_info *) ((job_)->j_custom))
311 | |
312 | static inline const char *show_ip46 (unsigned ip, const unsigned char ipv6[16]) { return ip ? show_ip (ip) : show_ipv6 (ipv6); } |
313 | static inline const char *show_our_ip (connection_job_t c) { return show_ip46 (CONN_INFO(c)->our_ip, CONN_INFO(c)->our_ipv6); } |
314 | static inline const char *show_remote_ip (connection_job_t c) { return show_ip46 (CONN_INFO(c)->remote_ip, CONN_INFO(c)->remote_ipv6); } |
315 | static inline const char *show_our_socket_ip (socket_connection_job_t c) { return show_ip46 (SOCKET_CONN_INFO(c)->our_ip, SOCKET_CONN_INFO(c)->our_ipv6); } |
316 | static inline const char *show_remote_socket_ip (socket_connection_job_t c) { return show_ip46 (SOCKET_CONN_INFO(c)->remote_ip, SOCKET_CONN_INFO(c)->remote_ipv6); } |
317 | |
318 | void fetch_connections_stat (struct connections_stat *st); |
319 | |
320 | void compute_next_reconnect (conn_target_job_t CT); |
321 | int create_all_outbound_connections (void); |
322 | int clean_unused_target (conn_target_job_t S); |
323 | int create_new_connections (conn_target_job_t S); |
324 | |
325 | int set_connection_timeout (connection_job_t C, double timeout); |
326 | int clear_connection_timeout (connection_job_t C); |
327 | |
328 | int prepare_stats (char *buf, int size); |
329 | void fail_connection (connection_job_t C, int who); |
330 | void fail_socket_connection (socket_connection_job_t C, int who); |
331 | |
332 | |
333 | int destroy_target (JOB_REF_ARG (CTJ)); |
334 | conn_target_job_t create_target (struct conn_target_info *source, int *was_created); |
335 | void compute_next_reconnect (conn_target_job_t CT); |
336 | |
337 | |
338 | static inline connection_job_t connection_incref (connection_job_t C) { return job_incref (C); } |
339 | static inline void connection_decref (connection_job_t C) { job_decref (JOB_REF_PASS (C)); } |
340 | |
341 | connection_job_t connection_get_by_fd (int fd); |
342 | connection_job_t connection_get_by_fd_generation (int fd, int generation); |
343 | |
344 | int cpu_server_reader (connection_job_t C); |
345 | int cpu_server_writer (connection_job_t C); |
346 | int cpu_server_read_write (connection_job_t C); |
347 | //int cpu_free_tmp_buffers (connection_job_t C); |
348 | int cpu_server_free_connection (connection_job_t C); |
349 | int cpu_free_connection_buffers (connection_job_t C); |
350 | int cpu_server_close_connection (connection_job_t C, int who); |
351 | |
352 | |
353 | int net_server_socket_reader (connection_job_t C); |
354 | int net_server_socket_writer (connection_job_t C); |
355 | int net_server_socket_read_write (connection_job_t C); |
356 | |
357 | int net_accept_new_connections (connection_job_t C); |
358 | |
359 | int set_connection_timeout (connection_job_t C, double timeout); |
360 | int clear_connection_timeout (connection_job_t C); |
361 | |
362 | int server_check_ready (connection_job_t C); |
363 | int server_noop (connection_job_t C); |
364 | int server_failed (connection_job_t C); |
365 | |
366 | void connection_write_close (connection_job_t C); |
/*
 * Legacy buffer helpers.
 * NOTE(review): they reference CONN_INFO(c)->In / ->Out, but struct
 * connection_info as declared in this header has no such members (they are
 * commented out there), so these macros cannot be expanded as-is -- confirm
 * whether they can be removed.
 *
 * write_out_chk performs the write unconditionally and only asserts on the
 * result; the write used to live inside assert() and therefore vanished when
 * NDEBUG was defined.  It is a single statement that needs a trailing `;`
 * at the call site.  The result of write_out() is assumed to be an int.
 */
#define write_out_chk(c,data,len) do { \
    int write_out_chk_res_ = write_out (&CONN_INFO(c)->Out, (data), (len)); \
    assert (write_out_chk_res_ == (len)); \
    (void) write_out_chk_res_; \
  } while (0)
#define write_out_old(c,data,len) write_out (&CONN_INFO(c)->Out, (data), (len))
#define read_in_old(c,data,len) read_in (&CONN_INFO(c)->In, (data), (len))
370 | |
/*
 * Returns 1 if `ipv6` holds the IPv6 loopback address ::1 (fifteen zero
 * bytes followed by 0x01), 0 otherwise.
 *
 * The address is inspected byte by byte: reading it through a `long long *`
 * violates the aliasing rules, may be misaligned, and the comparison with
 * 1LL << 56 was only correct on little-endian machines.
 */
static inline int is_ipv6_localhost (unsigned char ipv6[16]) {
  for (int i = 0; i < 15; i++) {
    if (ipv6[i]) {
      return 0;
    }
  }
  return ipv6[15] == 1;
}
374 | |
375 | void assert_net_cpu_thread (void); |
376 | void assert_net_net_thread (void); |
377 | void assert_engine_thread (void); |
378 | |
379 | connection_job_t conn_target_get_connection (conn_target_job_t CT, int allow_stopped); |
380 | |
381 | void insert_connection_into_target (conn_target_job_t SS, connection_job_t C); |
382 | struct tree_connection *get_connection_tree (conn_target_job_t SS); |
383 | //void wakeup_main_thread (void); |
384 | |
385 | void delete_connection_tree_ptr (struct tree_connection *T); |
386 | |
387 | int init_listening_connection_ext (int fd, conn_type_t *type, void *, int mode, int prio); |
388 | int init_listening_connection (int fd, conn_type_t *type, void *); |
389 | int init_listening_tcpv6_connection (int fd, conn_type_t *type, void *, int mode); |
390 | |
391 | //struct tree_connection *get_connection_tree_ptr (struct tree_connection **); |
392 | //void free_connection_tree_ptr (struct tree_connection *); |
393 | |
/*
 * A pointer paired with the function that is to release it.
 * Consumed by insert_free_later_struct().
 */
struct free_later {
  void *ptr;                  /* object to release */
  void (*free) (void *ptr);   /* release function for `ptr` */
};
398 | |
399 | |
400 | struct query_info { |
401 | struct event_timer ev; |
402 | struct raw_message raw; |
403 | int src_type; |
404 | struct process_id src_pid; |
405 | void *conn; |
406 | }; |
407 | |
408 | void free_later_act (void); |
409 | |
410 | void incr_active_dh_connections (void); |
411 | int check_conn_functions (conn_type_t *type, int listening); |
412 | |
413 | #define QUERY_INFO(_c) ((struct query_info *)(_c)->j_custom) |
414 | void insert_free_later_struct (struct free_later *F); |
415 | int new_conn_generation (void); |
416 | int get_cur_conn_generation (void); |
417 | |
418 | void tcp_set_max_accept_rate (int rate); |
419 | void tcp_set_max_connections (int maxconn); |
420 | |
/* Special-connection counters; defined in the implementation file. */
extern int max_special_connections;
extern int active_special_connections;

/*
 * NAT rules: nat_info holds up to MAX_NAT_INFO_RULES pairs of unsigned values.
 * NOTE(review): presumably nat_info_rules is the number of pairs in use and
 * each pair is (local ip, translated ip) -- confirm against
 * net_add_nat_info() / nat_translate_ip().
 */
#define MAX_NAT_INFO_RULES 16
extern int nat_info_rules;
extern unsigned nat_info[MAX_NAT_INFO_RULES][2];

int net_add_nat_info (char *str);
unsigned nat_translate_ip (unsigned local_ip);
429 | |
430 | connection_job_t alloc_new_connection (int cfd, conn_target_job_t SS, connection_job_t LL, unsigned peer, unsigned char peer_ipv6[16], int peer_port); |
431 | |