1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2009-2013 Vkontakte Ltd
18 2008-2013 Nikolai Durov
19 2008-2013 Andrey Lopatin
20
21 Copyright 2014 Telegram Messenger Inc
22 2014 Nikolai Durov
23 2014 Andrey Lopatin
24
25 Copyright 2015-2016 Telegram Messenger Inc
26 2015-2016 Vitaly Valtman
27*/
28
29#pragma once
30
31//#include "net/net-buffers.h"
32#include "net/net-events.h"
33#include "net/net-msg.h"
34#include "jobs/jobs.h"
35#include "common/mp-queue.h"
36#include "common/pid.h"
37
38#define MAX_CONNECTIONS 65536
39#define MAX_TARGETS 65536
40#define PRIME_TARGETS 99961
41#define MAX_SPECIAL_LISTEN_SOCKETS 64
42
43#define MAX_TCP_RECV_BUFFERS 128
44#define TCP_RECV_BUFFER_SIZE 1024
45
46#define MAX_NET_RES (1L << 16)
47
48//#define BUFF_SIZE 2048
49
50#define CONN_CUSTOM_DATA_BYTES 256
51
52#define NEED_MORE_BYTES (~(-1 << 31))
53#define SKIP_ALL_BYTES (-1 << 31)
54
55
56/* for connection flags */
57#define C_WANTRD 1
58#define C_WANTWR 2
59#define C_WANTRW (C_WANTRD | C_WANTWR)
60#define C_INCONN 4
61#define C_ERROR 8
62#define C_NORD 0x10
63#define C_NOWR 0x20
64#define C_NORW (C_NORD | C_NOWR)
65#define C_INQUERY 0x40
66#define C_FAILED 0x80
67#define C_ALARM 0x100
68#define C_AIO 0x200
69#define C_INTIMEOUT 0x400
70#define C_STOPREAD 0x800
71#define C_REPARSE 0x1000
72#define C_DFLUSH 0x2000
73#define C_IPV6 0x4000
74#define C_EXTERNAL 0x8000
75#define C_SPECIAL 0x10000
76#define C_NOQACK 0x20000
77#define C_RAWMSG 0x40000
78#define C_NET_FAILED 0x80000
79#define C_CRYPTOIN 0x100000
80#define C_CRYPTOOUT 0x200000
81#define C_STOPPARSE 0x400000
82#define C_ISDH 0x800000
83#define C_READY_PENDING 0x1000000
84#define C_CONNECTED 0x2000000
85#define C_STOPWRITE 0x4000000
86
87#define C_PERMANENT (C_IPV6 | C_RAWMSG)
88/* for connection status */
89enum {
90 conn_none, // closed/uninitialized
91 conn_connecting,
92 conn_working,
93 conn_error, // connection in bad state (it will be probably closed)
94 conn_listen, // listening for inbound connections
95 conn_write_close, // write all output buffer, then close; don't read input
96 conn_total_states // total number of connection states
97};
98
99/* for connection basic_type */
100enum {
101 ct_none, // no connection (closed)
102 ct_listen, // listening socket
103 ct_inbound, // inbound connection
104 ct_outbound, // outbound connection
105 ct_pipe, // used for pipe reading
106 ct_job // used for async jobs ( net-jobs.h )
107};
108
109/* for connection->ready of outbound connections */
110enum {
111 cr_notyet, // not ready yet (e.g. logging in)
112 cr_ok, // working
113 cr_stopped, // stopped (don't send more queries)
114 cr_busy, // busy (sending queries not allowed by protocol)
115 cr_failed // failed (possibly timed out)
116};
117
118
119typedef job_t connection_job_t;
120typedef job_t socket_connection_job_t;
121typedef job_t listening_connection_job_t;
122typedef job_t conn_target_job_t;
123typedef job_t query_job_t;
124
125
126/* connection function table */
127
128#define CONN_FUNC_MAGIC 0x11ef55aa
129
130typedef struct conn_functions {
131 int magic;
132 int flags; /* may contain for example C_RAWMSG; (partially) inherited by inbound/outbound connections */
133 char *title;
134 int (*accept)(connection_job_t c); /* invoked for listen/accept connections of this type */
135 int (*init_accepted)(connection_job_t c); /* initialize a new accept()'ed connection */
136 int (*reader)(connection_job_t c); /* invoked from run() for reading network data */
137 int (*writer)(connection_job_t c); /* invoked from run() for writing data */
138 int (*close)(connection_job_t c, int who); /* invoked from run() whenever we need to close connection */
139 int (*parse_execute)(connection_job_t c); /* invoked from reader() for parsing and executing one query */
140 int (*init_outbound)(connection_job_t c); /* initializes newly created outbound connection */
141 int (*connected)(connection_job_t c); /* invoked from run() when outbound connection is established */
142 int (*check_ready)(connection_job_t c); /* updates conn->ready if necessary and returns it */
143 int (*wakeup_aio)(connection_job_t c, int r);/* invoked from net_aio.c::check_aio_completion when aio read operation is complete */
144 int (*write_packet)(connection_job_t c, struct raw_message *raw); /* adds necessary headers to packet */
145 int (*flush)(connection_job_t c); /* generates necessary padding and writes as much bytes as possible */
146
147 // CPU-NET METHODS
148 int (*free)(connection_job_t c);
149 int (*free_buffers)(connection_job_t c); /* invoked from close() to free all buffers */
150 int (*read_write)(connection_job_t c); /* invoked when an event related to connection of this type occurs */
151 int (*wakeup)(connection_job_t c); /* invoked from run() when pending_queries == 0 */
152 int (*alarm)(connection_job_t c); /* invoked when timer is out */
153
154 // NET-NET METHODS
155 int (*socket_read_write)(connection_job_t c); /* invoked when an event related to connection of this type occurs */
156 int (*socket_reader)(connection_job_t c); /* invoked from run() for reading network data */
157 int (*socket_writer)(connection_job_t c); /* invoked from run() for writing data */
158 int (*socket_connected)(connection_job_t c); /* invoked from run() when outbound connection is established */
159 int (*socket_free)(connection_job_t c);
160 int (*socket_close)(connection_job_t c);
161
162 // INLINE FUNCTIONS
163 int (*data_received)(connection_job_t c, int r); /* invoked after r>0 bytes are read from socket */
164 int (*data_sent)(connection_job_t c, int w); /* invoked after w>0 bytes are written into socket */
165 int (*ready_to_write)(connection_job_t c); /* invoked from server_writer when Out.total_bytes crosses write_low_watermark ("greater or equal" -> "less") */
166
167 // INLINE METHODS
168 int (*crypto_init)(connection_job_t c, void *key_data, int key_data_len); /* < 0 = error */
169 int (*crypto_free)(connection_job_t c);
170 int (*crypto_encrypt_output)(connection_job_t c); /* 0 = all ok, >0 = so much more bytes needed to encrypt last block */
171 int (*crypto_decrypt_input)(connection_job_t c); /* 0 = all ok, >0 = so much more bytes needed to decrypt last block */
172 int (*crypto_needed_output_bytes)(connection_job_t c); /* returns # of bytes needed to complete last output block */
173} conn_type_t;
174
175struct conn_target_info {
176 struct event_timer timer;
177 int min_connections;
178 int max_connections;
179
180 struct tree_connection *conn_tree;
181 //connection_job_t first_conn, last_conn;
182 conn_type_t *type;
183 void *extra;
184 struct in_addr target;
185 unsigned char target_ipv6[16];
186 int port;
187 int active_outbound_connections, outbound_connections;
188 int ready_outbound_connections;
189 double next_reconnect, reconnect_timeout, next_reconnect_timeout;
190 int custom_field;
191 conn_target_job_t next_target, prev_target;
192 conn_target_job_t hnext;
193
194 int global_refcnt;
195};
196
197struct connection_info {
198 struct event_timer timer;
199 int fd;
200 int generation;
201 int flags;
202 // connection_job_t next, prev;
203 conn_type_t *type;
204 void *extra;
205 conn_target_job_t target;
206 connection_job_t io_conn;
207 int basic_type;
208 int status;
209 int error;
210 int unread_res_bytes;
211 int skip_bytes;
212 int pending_queries;
213 int queries_ok;
214 char custom_data[CONN_CUSTOM_DATA_BYTES];
215 unsigned our_ip, remote_ip;
216 unsigned our_port, remote_port;
217 unsigned char our_ipv6[16], remote_ipv6[16];
218 double query_start_time;
219 double last_query_time;
220 double last_query_sent_time;
221 double last_response_time;
222 double last_query_timeout;
223 //event_timer_t timer;
224 //event_timer_t write_timer;
225 int limit_per_write, limit_per_sec;
226 int last_write_time, written_per_sec;
227 int unreliability;
228 int ready;
229 //int parse_state;
230 int write_low_watermark;
231 void *crypto;
232 void *crypto_temp;
233 int listening, listening_generation;
234 int window_clamp;
235
236 struct raw_message in_u, in, out, out_p;
237
238 struct mp_queue *in_queue;
239 struct mp_queue *out_queue;
240
241 //netbuffer_t *Tmp, In, Out;
242 //char in_buff[BUFF_SIZE];
243 //char out_buff[BUFF_SIZE];
244};
245
246struct socket_connection_info {
247 struct event_timer timer;
248 int fd;
249 int pad;
250 int flags;
251 int current_epoll_status;
252 conn_type_t *type;
253 event_t *ev;
254 connection_job_t conn;
255 struct mp_queue *out_packet_queue;
256 struct raw_message out;
257 unsigned our_ip, remote_ip;
258 unsigned our_port, remote_port;
259 unsigned char our_ipv6[16], remote_ipv6[16];
260 int write_low_watermark;
261 int eagain_count;
262};
263
264struct listening_connection_info {
265 struct event_timer timer;
266 int fd;
267 int generation;
268 int flags;
269 int current_epoll_status;
270 conn_type_t *type;
271 event_t *ev;
272 void *extra;
273 int window_clamp;
274};
275
276struct connections_stat {
277 int active_connections;
278 int active_dh_connections;
279 int outbound_connections;
280 int active_outbound_connections;
281 int ready_outbound_connections;
282 int active_special_connections;
283 int max_special_connections;
284 int allocated_connections;
285 int allocated_outbound_connections;
286 int allocated_inbound_connections;
287 int allocated_socket_connections;
288 int allocated_targets;
289 int ready_targets;
290 int active_targets;
291 int inactive_targets;
292 long long tcp_readv_calls;
293 long long tcp_readv_intr;
294 long long tcp_readv_bytes;
295 long long tcp_writev_calls;
296 long long tcp_writev_intr;
297 long long tcp_writev_bytes;
298 long long accept_calls_failed;
299 long long accept_nonblock_set_failed;
300 long long accept_rate_limit_failed;
301 long long accept_init_accepted_failed;
302 long long accept_connection_limit_failed;
303};
304
305#define QUERY_INFO(_c) ((struct query_info *)(_c)->j_custom)
306
307#define CONN_INFO(_conn) ((struct connection_info *)((_conn)->j_custom))
308#define LISTEN_CONN_INFO(_conn) ((struct listening_connection_info *)((_conn)->j_custom))
309#define SOCKET_CONN_INFO(_conn) ((struct socket_connection_info *)((_conn)->j_custom))
310#define CONN_TARGET_INFO(_conn_target) ((struct conn_target_info *)((_conn_target)->j_custom))
311
312static inline const char *show_ip46 (unsigned ip, const unsigned char ipv6[16]) { return ip ? show_ip (ip) : show_ipv6 (ipv6); }
313static inline const char *show_our_ip (connection_job_t c) { return show_ip46 (CONN_INFO(c)->our_ip, CONN_INFO(c)->our_ipv6); }
314static inline const char *show_remote_ip (connection_job_t c) { return show_ip46 (CONN_INFO(c)->remote_ip, CONN_INFO(c)->remote_ipv6); }
315static inline const char *show_our_socket_ip (socket_connection_job_t c) { return show_ip46 (SOCKET_CONN_INFO(c)->our_ip, SOCKET_CONN_INFO(c)->our_ipv6); }
316static inline const char *show_remote_socket_ip (socket_connection_job_t c) { return show_ip46 (SOCKET_CONN_INFO(c)->remote_ip, SOCKET_CONN_INFO(c)->remote_ipv6); }
317
318void fetch_connections_stat (struct connections_stat *st);
319
320void compute_next_reconnect (conn_target_job_t CT);
321int create_all_outbound_connections (void);
322int clean_unused_target (conn_target_job_t S);
323int create_new_connections (conn_target_job_t S);
324
325int set_connection_timeout (connection_job_t C, double timeout);
326int clear_connection_timeout (connection_job_t C);
327
328int prepare_stats (char *buf, int size);
329void fail_connection (connection_job_t C, int who);
330void fail_socket_connection (socket_connection_job_t C, int who);
331
332
333int destroy_target (JOB_REF_ARG (CTJ));
334conn_target_job_t create_target (struct conn_target_info *source, int *was_created);
335void compute_next_reconnect (conn_target_job_t CT);
336
337
338static inline connection_job_t connection_incref (connection_job_t C) { return job_incref (C); }
339static inline void connection_decref (connection_job_t C) { job_decref (JOB_REF_PASS (C)); }
340
341connection_job_t connection_get_by_fd (int fd);
342connection_job_t connection_get_by_fd_generation (int fd, int generation);
343
344int cpu_server_reader (connection_job_t C);
345int cpu_server_writer (connection_job_t C);
346int cpu_server_read_write (connection_job_t C);
347//int cpu_free_tmp_buffers (connection_job_t C);
348int cpu_server_free_connection (connection_job_t C);
349int cpu_free_connection_buffers (connection_job_t C);
350int cpu_server_close_connection (connection_job_t C, int who);
351
352
353int net_server_socket_reader (connection_job_t C);
354int net_server_socket_writer (connection_job_t C);
355int net_server_socket_read_write (connection_job_t C);
356
357int net_accept_new_connections (connection_job_t C);
358
359int set_connection_timeout (connection_job_t C, double timeout);
360int clear_connection_timeout (connection_job_t C);
361
362int server_check_ready (connection_job_t C);
363int server_noop (connection_job_t C);
364int server_failed (connection_job_t C);
365
366void connection_write_close (connection_job_t C);
367#define write_out_chk(c,data,len) assert(write_out (&CONN_INFO(c)->Out, data, len) == len);
368#define write_out_old(c,data,len) write_out(&CONN_INFO(c)->Out, data, len)
369#define read_in_old(c,data,len) read_in(&CONN_INFO(c)->In, data, len)
370
371static inline int is_ipv6_localhost (unsigned char ipv6[16]) {
372 return !*(long long *)ipv6 && ((long long *)ipv6)[1] == 1LL << 56;
373}
374
375void assert_net_cpu_thread (void);
376void assert_net_net_thread (void);
377void assert_engine_thread (void);
378
379connection_job_t conn_target_get_connection (conn_target_job_t CT, int allow_stopped);
380
381void insert_connection_into_target (conn_target_job_t SS, connection_job_t C);
382struct tree_connection *get_connection_tree (conn_target_job_t SS);
383//void wakeup_main_thread (void);
384
385void delete_connection_tree_ptr (struct tree_connection *T);
386
387int init_listening_connection_ext (int fd, conn_type_t *type, void *extra, int mode, int prio);
388int init_listening_connection (int fd, conn_type_t *type, void *extra);
389int init_listening_tcpv6_connection (int fd, conn_type_t *type, void *extra, int mode);
390
391//struct tree_connection *get_connection_tree_ptr (struct tree_connection **);
392//void free_connection_tree_ptr (struct tree_connection *);
393
394struct free_later {
395 void *ptr;
396 void (*free)(void *);
397};
398
399
400struct query_info {
401 struct event_timer ev;
402 struct raw_message raw;
403 int src_type;
404 struct process_id src_pid;
405 void *conn;
406};
407
408void free_later_act (void);
409
410void incr_active_dh_connections (void);
411int check_conn_functions (conn_type_t *type, int listening);
412
413#define QUERY_INFO(_c) ((struct query_info *)(_c)->j_custom)
414void insert_free_later_struct (struct free_later *F);
415int new_conn_generation (void);
416int get_cur_conn_generation (void);
417
418void tcp_set_max_accept_rate (int rate);
419void tcp_set_max_connections (int maxconn);
420
421extern int max_special_connections, active_special_connections;
422
423#define MAX_NAT_INFO_RULES 16
424extern int nat_info_rules;
425extern unsigned nat_info[MAX_NAT_INFO_RULES][2];
426
427int net_add_nat_info (char *str);
428unsigned nat_translate_ip (unsigned local_ip);
429
430connection_job_t alloc_new_connection (int cfd, conn_target_job_t SS, connection_job_t LL, unsigned peer, unsigned char peer_ipv6[16], int peer_port);
431