1/*
2 This file is part of MTProto-proxy
3
4 MTProto-proxy is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 MTProto-Server is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with MTProto-Server. If not, see <http://www.gnu.org/licenses/>.
16
17 This program is released under the GPL with the additional exemption
18 that compiling, linking, and/or using OpenSSL is allowed.
19 You are free to remove this exemption from derived works.
20
21 Copyright 2012-2018 Nikolai Durov
22 2012-2014 Andrey Lopatin
23 2014-2018 Telegram Messenger Inc
24*/
25#define _FILE_OFFSET_BITS 64
26
27#include <assert.h>
28#include <errno.h>
29#include <string.h>
30#include <stdio.h>
31#include <stdlib.h>
32#include <time.h>
33#include <unistd.h>
34#include <netinet/in.h>
35#include <netinet/tcp.h>
36#include <sys/socket.h>
37#include <sys/types.h>
38#include <sys/wait.h>
39#include <sys/mman.h>
40#include <netdb.h>
41#include <ctype.h>
42#include <openssl/rand.h>
43#include <openssl/rsa.h>
44#include <openssl/pem.h>
45
46#include "crc32.h"
47#include "md5.h"
48#include "resolver.h"
49#include "net/net-events.h"
50#include "kprintf.h"
51#include "precise-time.h"
52#include "server-functions.h"
53#include "net/net-tcp-connections.h"
54#include "net/net-rpc-targets.h"
55#include "net/net-http-server.h"
56#include "net/net-tcp-rpc-server.h"
57#include "net/net-tcp-rpc-client.h"
58#include "net/net-tcp-rpc-ext-server.h"
59#include "net/net-crypto-aes.h"
60#include "net/net-crypto-dh.h"
61#include "mtproto-common.h"
62#include "mtproto-config.h"
63#include "common/tl-parse.h"
64#include "engine/engine.h"
65
// Commit identifier embedded into the version strings; falls back to "unknown"
// when the build does not define COMMIT (presumably passed via -DCOMMIT=...).
#ifndef COMMIT
#define COMMIT "unknown"
#endif

#define VERSION_STR "mtproxy-0.01"
// Full build identification string: version, build date/time, compiler version
// string, word size (64-bit when __LP64__ is defined, 32-bit otherwise) and commit.
const char FullVersionStr[] = VERSION_STR " compiled at " __DATE__ " " __TIME__ " by gcc " __VERSION__ " "
#ifdef __LP64__
  "64-bit"
#else
  "32-bit"
#endif
" after commit " COMMIT;
78
// EXT_CONN_TABLE_SIZE is the number of slots in OutExtConnections[] (indexed by the
// low bits of out_conn_id); EXT_CONN_HASH_SIZE is the number of buckets in
// InExtConnectionHash[] (indexed by ext_conn_hash ()).
#define EXT_CONN_TABLE_SIZE (1 << 22)
#define EXT_CONN_HASH_SHIFT 20
#define EXT_CONN_HASH_SIZE (1 << EXT_CONN_HASH_SHIFT)

#define RPC_TIMEOUT_INTERVAL 5.0

#define MAX_HTTP_LISTEN_PORTS 128

#define HTTP_MAX_WAIT_TIMEOUT 960.0

// PING_INTERVAL is only the default; STOP_INTERVAL and FAIL_INTERVAL are derived
// from the run-time variable ping_interval, not from this constant.
#define PING_INTERVAL 5.0
#define STOP_INTERVAL (2 * ping_interval)
#define FAIL_INTERVAL (20 * ping_interval)
#define RESPONSE_FAIL_TIMEOUT 5
#define CONNECT_TIMEOUT 3

// Used as max_packet_len of ext_rpc_methods.
#define MAX_POST_SIZE (262144 * 4 - 4096)

#define DEFAULT_WINDOW_CLAMP 131072

// #define DEFAULT_OUTBOUND_CONNECTION_CREATION_RATE 1000000

// The "#if 0" branch holds tiny limits (apparently for debugging); the "#else"
// branch is the one that is actually compiled.
#if 0
#define MAX_CONNECTION_BUFFER_SPACE (1 << 10) //(1 << 25)
#define MAX_MTFRONT_NB 1 //((NB_max * 3) >> 2)
#else
#define MAX_CONNECTION_BUFFER_SPACE (1 << 25)
#define MAX_MTFRONT_NB ((NB_max * 3) >> 2)
#endif

// Run-time copies of the defaults above.
double ping_interval = PING_INTERVAL;
int window_clamp = DEFAULT_WINDOW_CLAMP;

#define PROXY_MODE_OUT 2
// Reported as "proxy_mode" in the statistics output.
int proxy_mode;

// Compile-time mode switches.
#define IS_PROXY_IN 0
#define IS_PROXY_OUT 1
#define IS_PROXY_INOUT 1

// TL constructor ids (NOTE(review): meaning inferred from the names only).
#define TL_HTTP_QUERY_INFO 0xd45ab381
#define TL_PROXY_TAG 0xdb1e26ae

conn_type_t ct_http_server_mtfront, ct_tcp_rpc_ext_server_mtfront, ct_tcp_rpc_server_mtfront;

// Per-process counters, exported through the statistics output.
long long connections_failed_lru, connections_failed_flood;
long long api_invoke_requests;

volatile int sigpoll_cnt;

#define STATS_BUFF_SIZE (1 << 20)

int stats_buff_len;
char stats_buff[STATS_BUFF_SIZE];


// current HTTP query headers
char cur_http_origin[1024], cur_http_referer[1024], cur_http_user_agent[1024];
int cur_http_origin_len, cur_http_referer_len, cur_http_user_agent_len;

// Defined outside the visible part of this file.
int check_conn_buffers (connection_job_t c);
void lru_insert_conn (connection_job_t c);
141
142/*
143 *
144 * CONFIGURATION PARSER SETUP
145 *
146 */
147
// Default per-target connection limits.
#define DEFAULT_CFG_MIN_CONNECTIONS 4
#define DEFAULT_CFG_MAX_CONNECTIONS 8

int default_cfg_min_connections = DEFAULT_CFG_MIN_CONNECTIONS;
int default_cfg_max_connections = DEFAULT_CFG_MAX_CONNECTIONS;

// Tentative definition; the initialized definition appears later in this file.
struct tcp_rpc_client_functions mtfront_rpc_client;

conn_type_t ct_tcp_rpc_client_mtfront;

// Template connection-target description: RPC client connection type with the
// mtfront_rpc_client method table attached as `extra`.
// NOTE(review): reconnect_timeout is presumably in seconds -- confirm.
struct conn_target_info default_cfg_ct = {
.min_connections = DEFAULT_CFG_MIN_CONNECTIONS,
.max_connections = DEFAULT_CFG_MAX_CONNECTIONS,
.type = &ct_tcp_rpc_client_mtfront,
.extra = (void *)&mtfront_rpc_client,
.reconnect_timeout = 17
};
165
166
167/*
168 *
169 * EXTERNAL CONNECTIONS TABLE
170 *
171 */
172
// One proxied external connection: links an inbound connection (in_fd, in_conn_id)
// to an outbound connection (out_fd) under a locally generated out_conn_id.
// Entries of ExtConnectionHead[] reuse this structure as list heads of the
// circular o_* / i_* lists.
struct ext_connection {
  struct ext_connection *o_prev, *o_next; // list of all with same out_fd
  struct ext_connection *i_prev, *i_next; // list of all with same in_fd
  struct ext_connection *h_next; // next in hash on (in_fd, in_conn_id)
  int in_fd, in_gen;          // inbound fd and the generation passed to connection_get_by_fd_generation ()
  int out_fd, out_gen;        // outbound fd and its generation; out_fd == 0 means "not attached"
  long long in_conn_id;       // connection id on the inbound side (part of the hash key)
  long long out_conn_id;      // non-zero id; its low bits index OutExtConnections[]
  long long auth_key_id;      // stored by create_ext_connection ()
  struct ext_connection *lru_prev, *lru_next; // LRU links, maintained by code outside this view
};
184
// Slot of OutExtConnections[]: `ref` is the live entry (or 0 when the slot is
// free); `out_conn_id` is the last id issued for this slot and is kept after the
// entry is deleted so that the next id generated for the slot differs.
struct ext_connection_ref {
  struct ext_connection *ref;
  long long out_conn_id;
};
189
// Number of currently existing ext_connection entries / total ever created.
long long ext_connections, ext_connections_created;

// Lookup by out_conn_id (direct-mapped on the low bits of the id).
struct ext_connection_ref OutExtConnections[EXT_CONN_TABLE_SIZE];
// Chained hash table keyed by (in_fd, in_conn_id).
struct ext_connection *InExtConnectionHash[EXT_CONN_HASH_SIZE];
// Per-fd list heads for the "same in_fd" and "same out_fd" circular lists.
struct ext_connection ExtConnectionHead[MAX_CONNECTIONS];

// Defined outside the visible part of this file.
void lru_delete_ext_conn (struct ext_connection *Ext);
197
198static inline void check_engine_class (void) {
199 check_thread_class (JC_ENGINE);
200}
201
202static inline int ext_conn_hash (int in_fd, long long in_conn_id) {
203 unsigned long long h = (unsigned long long) in_fd * 11400714819323198485ULL + (unsigned long long) in_conn_id * 13043817825332782213ULL;
204 return (h >> (64 - EXT_CONN_HASH_SHIFT));
205}
206
207// makes sense only for !IS_PROXY_IN
208// returns the only ext_connection with given in_fd
209struct ext_connection *get_ext_connection_by_in_fd (int in_fd) {
210 check_engine_class ();
211 assert ((unsigned) in_fd < MAX_CONNECTIONS);
212 struct ext_connection *H = &ExtConnectionHead[in_fd];
213 struct ext_connection *Ex = H->i_next;
214 assert (H->i_next == H->i_prev);
215 if (!Ex || Ex == H) {
216 return 0;
217 }
218 assert (Ex->in_fd == in_fd);
219 return Ex;
220}
221
222// mode: 0 = find, 1 = delete, 2 = create if not found, 3 = find or create
223struct ext_connection *get_ext_connection_by_in_conn_id (int in_fd, int in_gen, long long in_conn_id, int mode, int *created) {
224 check_engine_class ();
225 int h = ext_conn_hash (in_fd, in_conn_id);
226 struct ext_connection **prev = &InExtConnectionHash[h], *cur = *prev;
227 for (; cur; cur = *prev) {
228 if (cur->in_fd == in_fd && cur->in_conn_id == in_conn_id) {
229 assert (cur->out_conn_id);
230 if (mode == 0 || mode == 3) {
231 return cur;
232 }
233 if (mode != 1) {
234 return 0;
235 }
236 if (cur->i_next) {
237 cur->i_next->i_prev = cur->i_prev;
238 cur->i_prev->i_next = cur->i_next;
239 cur->i_next = cur->i_prev = 0;
240 }
241 if (cur->o_next) {
242 cur->o_next->o_prev = cur->o_prev;
243 cur->o_prev->o_next = cur->o_next;
244 cur->o_next = cur->o_prev = 0;
245 }
246 lru_delete_ext_conn (cur);
247 *prev = cur->h_next;
248 cur->h_next = 0;
249 int h = cur->out_conn_id & (EXT_CONN_TABLE_SIZE - 1);
250 assert (OutExtConnections[h].ref == cur);
251 assert (OutExtConnections[h].out_conn_id == cur->out_conn_id);
252 OutExtConnections[h].ref = 0;
253 cur->out_conn_id = 0;
254 memset (cur, 0, sizeof (struct ext_connection));
255 free (cur);
256 ext_connections--;
257 return (void *) -1L;
258 }
259 prev = &(cur->h_next);
260 }
261 if (mode != 2 && mode != 3) {
262 return 0;
263 }
264 assert (ext_connections < EXT_CONN_TABLE_SIZE / 2);
265 cur = calloc (sizeof (struct ext_connection), 1);
266 assert (cur);
267 cur->h_next = InExtConnectionHash[h];
268 InExtConnectionHash[h] = cur;
269 cur->in_fd = in_fd;
270 cur->in_gen = in_gen;
271 cur->in_conn_id = in_conn_id;
272 assert ((unsigned) in_fd < MAX_CONNECTIONS);
273 if (in_fd) {
274 struct ext_connection *H = &ExtConnectionHead[in_fd];
275 if (!H->i_next) {
276 H->i_next = H->i_prev = H;
277 }
278 assert (H->i_next == H);
279 cur->i_next = H;
280 cur->i_prev = H->i_prev;
281 H->i_prev->i_next = cur;
282 H->i_prev = cur;
283 }
284 h = in_conn_id ? lrand48() : in_fd;
285 while (OutExtConnections[h &= (EXT_CONN_TABLE_SIZE - 1)].ref) {
286 h = lrand48();
287 }
288 OutExtConnections[h].ref = cur;
289 cur->out_conn_id = OutExtConnections[h].out_conn_id = (OutExtConnections[h].out_conn_id | (EXT_CONN_TABLE_SIZE - 1)) + 1 + h;
290 assert (cur->out_conn_id);
291 if (created) {
292 ++*created;
293 }
294 ext_connections++;
295 ext_connections_created++;
296 return cur;
297}
298
299struct ext_connection *find_ext_connection_by_out_conn_id (long long out_conn_id) {
300 check_engine_class ();
301 int h = out_conn_id & (EXT_CONN_TABLE_SIZE - 1);
302 struct ext_connection *cur = OutExtConnections[h].ref;
303 if (!cur || OutExtConnections[h].out_conn_id != out_conn_id) {
304 return 0;
305 }
306 assert (cur->out_conn_id == out_conn_id);
307 return cur;
308}
309
310// MUST be new
311struct ext_connection *create_ext_connection (connection_job_t CI, long long in_conn_id, connection_job_t CO, long long auth_key_id) {
312 check_engine_class ();
313 struct ext_connection *Ex = get_ext_connection_by_in_conn_id (CONN_INFO(CI)->fd, CONN_INFO(CI)->generation, in_conn_id, 2, 0);
314 assert (Ex && "ext_connection already exists");
315 assert (!Ex->out_fd && !Ex->o_next && !Ex->auth_key_id);
316 assert (!CO || (unsigned) CONN_INFO(CO)->fd < MAX_CONNECTIONS);
317 assert (CO != CI);
318 if (CO) {
319 struct ext_connection *H = &ExtConnectionHead[CONN_INFO(CO)->fd];
320 assert (H->o_next);
321 Ex->o_next = H;
322 Ex->o_prev = H->o_prev;
323 H->o_prev->o_next = Ex;
324 H->o_prev = Ex;
325 Ex->out_fd = CONN_INFO(CO)->fd;
326 Ex->out_gen = CONN_INFO(CO)->generation;
327 }
328 Ex->auth_key_id = auth_key_id;
329 return Ex;
330}
331
332static int _notify_remote_closed (JOB_REF_ARG(C), long long out_conn_id);
333
334void remove_ext_connection (struct ext_connection *Ex, int send_notifications) {
335 assert (Ex);
336 assert (Ex->out_conn_id);
337 assert (Ex == find_ext_connection_by_out_conn_id (Ex->out_conn_id));
338 if (Ex->out_fd) {
339 assert ((unsigned) Ex->out_fd < MAX_CONNECTIONS);
340 assert (Ex->o_next);
341 if (send_notifications & 1) {
342 connection_job_t CO = connection_get_by_fd_generation (Ex->out_fd, Ex->out_gen);
343 if (CO) {
344 _notify_remote_closed (JOB_REF_PASS (CO), Ex->out_conn_id);
345 }
346 }
347 }
348 if (Ex->in_fd) {
349 assert ((unsigned) Ex->in_fd < MAX_CONNECTIONS);
350 assert (Ex->i_next);
351 if (send_notifications & 2) {
352 connection_job_t CI = connection_get_by_fd_generation (Ex->in_fd, Ex->in_gen);
353 if (Ex->in_conn_id) {
354 assert (0);
355 } else {
356 if (CI) {
357 fail_connection (CI, -33);
358 job_decref (JOB_REF_PASS (CI));
359 }
360 }
361 }
362 }
363 assert (get_ext_connection_by_in_conn_id (Ex->in_fd, Ex->in_gen, Ex->in_conn_id, 1, 0) == (void *) -1L);
364}
365
366/*
367 *
368 * MULTIPROCESS STATISTICS
369 *
370 */
371
// Upper bound for the number of worker processes (size of pids[]).
#define MAX_WORKERS 256

// Snapshot of one worker's counters, filled by update_local_stats_copy () and
// read by compute_stats_sum ().
struct worker_stats {
  int cnt;          // incremented before and after every update, so it is odd while an update is in progress
  int updated_at;   // value of `now` at the time of the last update

  struct buffers_stat bufs;
  struct connections_stat conn;
  int allocated_aes_crypto, allocated_aes_crypto_temp;
  long long tot_dh_rounds[3];

  int ev_heap_size;
  int http_connections;

  long long get_queries;
  int pending_http_queries;

  // NOTE(review): these five fields are not referenced in the visible part of
  // this file; the accept counters that are summed live in `conn`.
  long long accept_calls_failed, accept_nonblock_set_failed, accept_connection_limit_failed,
  accept_rate_limit_failed, accept_init_accepted_failed;

  // Copies of the same-named global counters.
  long long active_rpcs, active_rpcs_created;
  long long rpc_dropped_running, rpc_dropped_answers;
  long long tot_forwarded_queries, expired_forwarded_queries;
  long long tot_forwarded_responses;
  long long dropped_queries, dropped_responses;
  long long tot_forwarded_simple_acks, dropped_simple_acks;
  long long mtproto_proxy_errors;

  long long connections_failed_lru, connections_failed_flood;

  long long ext_connections, ext_connections_created;
  long long http_queries, http_bad_headers;
};
405
// WStats holds two snapshot copies per worker (indices 2*i and 2*i+1);
// SumStats is the total over all workers computed by compute_stats_sum ().
// NOTE(review): WStats is allocated outside the visible part of this file,
// presumably in memory shared between processes -- confirm.
struct worker_stats *WStats, SumStats;
int worker_id, workers, slave_mode, parent_pid;
int pids[MAX_WORKERS];

// Per-process counters; copied into worker_stats and printed by mtfront_prepare_stats ().
long long get_queries;
long long http_queries;
int pending_http_queries;

long long active_rpcs, active_rpcs_created;
long long rpc_dropped_running, rpc_dropped_answers;
long long tot_forwarded_queries, expired_forwarded_queries, dropped_queries;
long long tot_forwarded_responses, dropped_responses;
long long tot_forwarded_simple_acks, dropped_simple_acks;
long long mtproto_proxy_errors;

// proxy_tag_set is reported in the statistics output.
char proxy_tag[16];
int proxy_tag_set;
423
// Writes the current values of this process's counters into snapshot *S.
// S->cnt is incremented once before and once after the update with full memory
// barriers in between, so a reader that observes an odd or changed cnt knows
// the snapshot was being modified (see compute_stats_sum ()).
static void update_local_stats_copy (struct worker_stats *S) {
  S->cnt++;   // cnt becomes odd: update in progress
  __sync_synchronize();
  S->updated_at = now;
// UPD (x) copies the global variable x into the same-named field of *S.
#define UPD(x) S->x = x;
  fetch_tot_dh_rounds_stat (S->tot_dh_rounds);
  fetch_connections_stat (&S->conn);
  fetch_aes_crypto_stat (&S->allocated_aes_crypto, &S->allocated_aes_crypto_temp);
  fetch_buffers_stat (&S->bufs);

  UPD (ev_heap_size);

  UPD (get_queries);
  UPD (http_connections);
  UPD (pending_http_queries);
  UPD (active_rpcs);
  UPD (active_rpcs_created);
  UPD (rpc_dropped_running);
  UPD (rpc_dropped_answers);
  UPD (tot_forwarded_queries);
  UPD (expired_forwarded_queries);
  UPD (dropped_queries);
  UPD (tot_forwarded_responses);
  UPD (dropped_responses);
  UPD (tot_forwarded_simple_acks);
  UPD (dropped_simple_acks);
  UPD (mtproto_proxy_errors);
  UPD (connections_failed_lru);
  UPD (connections_failed_flood);
  UPD (ext_connections);
  UPD (ext_connections_created);
  UPD (http_queries);
  UPD (http_bad_headers);
#undef UPD
  __sync_synchronize();
  S->cnt++;   // cnt becomes even again: snapshot is consistent
  __sync_synchronize();
}
462
463static inline void add_stats (struct worker_stats *W) {
464#define UPD(x) SumStats.x += W->x;
465 UPD (tot_dh_rounds[0]);
466 UPD (tot_dh_rounds[1]);
467 UPD (tot_dh_rounds[2]);
468
469 UPD (conn.active_connections);
470 UPD (conn.active_dh_connections);
471 UPD (conn.outbound_connections);
472 UPD (conn.active_outbound_connections);
473 UPD (conn.ready_outbound_connections);
474 UPD (conn.active_special_connections);
475 UPD (conn.max_special_connections);
476 UPD (conn.allocated_connections);
477 UPD (conn.allocated_outbound_connections);
478 UPD (conn.allocated_inbound_connections);
479 UPD (conn.allocated_socket_connections);
480 UPD (conn.allocated_targets);
481 UPD (conn.ready_targets);
482 UPD (conn.active_targets);
483 UPD (conn.inactive_targets);
484 UPD (conn.tcp_readv_calls);
485 UPD (conn.tcp_readv_intr);
486 UPD (conn.tcp_readv_bytes);
487 UPD (conn.tcp_writev_calls);
488 UPD (conn.tcp_writev_intr);
489 UPD (conn.tcp_writev_bytes);
490 UPD (conn.accept_calls_failed);
491 UPD (conn.accept_nonblock_set_failed);
492 UPD (conn.accept_rate_limit_failed);
493 UPD (conn.accept_init_accepted_failed);
494
495 UPD (allocated_aes_crypto);
496 UPD (allocated_aes_crypto_temp);
497
498 UPD (bufs.total_used_buffers_size);
499 UPD (bufs.allocated_buffer_bytes);
500 UPD (bufs.total_used_buffers);
501 UPD (bufs.allocated_buffer_chunks);
502 UPD (bufs.max_allocated_buffer_chunks);
503 UPD (bufs.max_allocated_buffer_bytes);
504 UPD (bufs.max_buffer_chunks);
505 UPD (bufs.buffer_chunk_alloc_ops);
506
507 UPD (ev_heap_size);
508
509 UPD (get_queries);
510 UPD (http_connections);
511 UPD (pending_http_queries);
512 UPD (active_rpcs);
513 UPD (active_rpcs_created);
514 UPD (rpc_dropped_running);
515 UPD (rpc_dropped_answers);
516 UPD (tot_forwarded_queries);
517 UPD (expired_forwarded_queries);
518 UPD (dropped_queries);
519 UPD (tot_forwarded_responses);
520 UPD (dropped_responses);
521 UPD (tot_forwarded_simple_acks);
522 UPD (dropped_simple_acks);
523 UPD (mtproto_proxy_errors);
524 UPD (connections_failed_lru);
525 UPD (connections_failed_flood);
526 UPD (ext_connections);
527 UPD (ext_connections_created);
528 UPD (http_queries);
529 UPD (http_bad_headers);
530#undef UPD
531}
532
533void update_local_stats (void) {
534 if (!slave_mode) {
535 return;
536 }
537 update_local_stats_copy (WStats + worker_id * 2);
538 update_local_stats_copy (WStats + worker_id * 2 + 1);
539}
540
// Recomputes SumStats as the sum of one consistent snapshot per worker.
// Does nothing (SumStats is left untouched) when there are no workers.
// For each worker the loop picks a copy whose cnt is even (the second copy is
// tried first, then the first), copies it into W, and retries if cnt changed
// while the copy was being taken.
// NOTE(review): barrier () is defined elsewhere; presumably a compiler barrier.
void compute_stats_sum (void) {
  if (!workers) {
    return;
  }
  memset (&SumStats, 0, sizeof (SumStats));
  int i;
  for (i = 0; i < workers; i++) {
    static struct worker_stats W;   // static: scratch copy kept off the stack
    struct worker_stats *F;
    int s_cnt;
    do {
      F = WStats + i * 2;
      do {
        barrier ();
        s_cnt = (++F)->cnt;         // second copy of worker i
        if (!(s_cnt & 1)) {
          break;
        }
        s_cnt = (--F)->cnt;         // fall back to the first copy
      } while (s_cnt & 1);          // both copies were mid-update: try again
      barrier ();
      memcpy (&W, F, sizeof (W));
      barrier ();
    } while (s_cnt != F->cnt);      // the copy was modified during memcpy: retry
    add_stats (&W);
  }
}
568
569/*
570 *
571 * SERVER
572 *
573 */
574
575
// Writes the proxy statistics into sb as "name\tvalue\n" lines.
// Worker totals are refreshed with compute_stats_sum () and combined with this
// process's own counters through the S / SW macros below.  The local variables
// conn, bufs, tot_dh_rounds and allocated_aes_crypto deliberately carry the
// same names as worker_stats fields so that S (x) can refer to both.
// The order of the arguments must match the order of the format lines exactly.
void mtfront_prepare_stats (stats_buffer_t *sb) {
  struct connections_stat conn;
  struct buffers_stat bufs;
  long long tot_dh_rounds[3];
  int allocated_aes_crypto, allocated_aes_crypto_temp;
  int uptime = now - start_time;
  compute_stats_sum ();
  fetch_connections_stat (&conn);
  fetch_buffers_stat (&bufs);
  fetch_tot_dh_rounds_stat (tot_dh_rounds);
  fetch_aes_crypto_stat (&allocated_aes_crypto, &allocated_aes_crypto_temp);

  sb_prepare (sb);
  sb_memory (sb, AM_GET_MEMORY_USAGE_SELF);

// S (x):  local value plus the sum over workers.
// S1 (x): sum over workers only.
// SW (x): sum over workers if there are workers, otherwise S (x).
#define S(x) ((x)+(SumStats.x))
#define S1(x) (SumStats.x)
#define SW(x) (workers ? S1(x) : S(x))
  sb_printf (sb,
    "config_filename\t%s\n"
    "config_loaded_at\t%d\n"
    "config_size\t%d\n"
    "config_md5\t%s\n"
    "config_auth_clusters\t%d\n"
    "workers\t%d\n"
    "queries_get\t%lld\n"
    "qps_get\t%.3f\n"
    "tot_forwarded_queries\t%lld\n"
    "expired_forwarded_queries\t%lld\n"
    "dropped_queries\t%lld\n"
    "tot_forwarded_responses\t%lld\n"
    "dropped_responses\t%lld\n"
    "tot_forwarded_simple_acks\t%lld\n"
    "dropped_simple_acks\t%lld\n"
    "active_rpcs_created\t%lld\n"
    "active_rpcs\t%lld\n"
    "rpc_dropped_answers\t%lld\n"
    "rpc_dropped_running\t%lld\n"
    "window_clamp\t%d\n"
    "total_ready_targets\t%d\n"
    "total_allocated_targets\t%d\n"
    "total_declared_targets\t%d\n"
    "total_inactive_targets\t%d\n"
    "total_connections\t%d\n"
    "total_encrypted_connections\t%d\n"
    "total_allocated_connections\t%d\n"
    "total_allocated_outbound_connections\t%d\n"
    "total_allocated_inbound_connections\t%d\n"
    "total_allocated_socket_connections\t%d\n"
    "total_dh_connections\t%d\n"
    "total_dh_rounds\t%lld %lld %lld\n"
    "total_special_connections\t%d\n"
    "total_max_special_connections\t%d\n"
    "total_accept_connections_failed\t%lld %lld %lld %lld %lld\n"
    "ext_connections\t%lld\n"
    "ext_connections_created\t%lld\n"
    "total_active_network_events\t%d\n"
    "total_network_buffers_used_size\t%lld\n"
    "total_network_buffers_allocated_bytes\t%lld\n"
    "total_network_buffers_used\t%d\n"
    "total_network_buffer_chunks_allocated\t%d\n"
    "total_network_buffer_chunks_allocated_max\t%d\n"
    "mtproto_proxy_errors\t%lld\n"
    "connections_failed_lru\t%lld\n"
    "connections_failed_flood\t%lld\n"
    "http_connections\t%d\n"
    "pending_http_queries\t%d\n"
    "http_queries\t%lld\n"
    "http_bad_headers\t%lld\n"
    "http_qps\t%.6f\n"
    "proxy_mode\t%d\n"
    "proxy_tag_set\t%d\n"
    "version\t" VERSION_STR " compiled at " __DATE__ " " __TIME__ " by gcc " __VERSION__ " "
#ifdef __LP64__
    "64-bit"
#else
    "32-bit"
#endif
    " after commit " COMMIT "\n",
    config_filename,
    CurConf->config_loaded_at,
    CurConf->config_bytes,
    CurConf->config_md5_hex,
    CurConf->auth_stats.tot_clusters,
    workers,
    S(get_queries),
    safe_div (S(get_queries), uptime),
    S(tot_forwarded_queries),
    S(expired_forwarded_queries),
    S(dropped_queries),
    S(tot_forwarded_responses),
    S(dropped_responses),
    S(tot_forwarded_simple_acks),
    S(dropped_simple_acks),
    S(active_rpcs_created),
    S(active_rpcs),
    S(rpc_dropped_answers),
    S(rpc_dropped_running),
    window_clamp,
    SW(conn.ready_targets),
    SW(conn.allocated_targets),
    SW(conn.active_targets),          // printed as total_declared_targets
    SW(conn.inactive_targets),
    S(conn.active_connections),
    S(allocated_aes_crypto),          // printed as total_encrypted_connections
    S(conn.allocated_connections),
    S(conn.allocated_outbound_connections),
    S(conn.allocated_inbound_connections),
    S(conn.allocated_socket_connections),
    S(conn.active_dh_connections),
    S(tot_dh_rounds[0]),
    S(tot_dh_rounds[1]),
    S(tot_dh_rounds[2]),
    SW(conn.active_special_connections),
    SW(conn.max_special_connections),
    S(conn.accept_init_accepted_failed),
    S(conn.accept_calls_failed),
    S(conn.accept_connection_limit_failed),
    S(conn.accept_rate_limit_failed),
    S(conn.accept_nonblock_set_failed),
    S(ext_connections),
    S(ext_connections_created),
    S(ev_heap_size),                  // printed as total_active_network_events
    SW(bufs.total_used_buffers_size),
    SW(bufs.allocated_buffer_bytes),
    SW(bufs.total_used_buffers),
    SW(bufs.allocated_buffer_chunks),
    SW(bufs.max_allocated_buffer_chunks),
    S(mtproto_proxy_errors),
    S(connections_failed_lru),
    S(connections_failed_flood),
    S(http_connections),
    S(pending_http_queries),
    S(http_queries),
    S(http_bad_headers),
    safe_div (S(http_queries), uptime),
    proxy_mode,
    proxy_tag_set
  );
#undef S
#undef S1
#undef SW
}
719
720/*
721 *
722 * JOB UTILS
723 *
724 */
725
// Callback executed by a callback job: receives a private copy of the payload
// and its length in bytes; its return value becomes the result of the job's
// JS_RUN step.
typedef int (*job_callback_func_t)(void *data, int len);
void schedule_job_callback (int context, job_callback_func_t func, void *data, int len);

// Custom data of a callback job: the function to call followed by the payload
// bytes.  `data` is a C99 flexible array member (instead of the GNU zero-length
// array extension), so the payload starts at offsetof (struct job_callback_info, data).
struct job_callback_info {
  job_callback_func_t func;
  void *data[];
};
733
734int callback_job_run (job_t job, int op, struct job_thread *JT) {
735 struct job_callback_info *D = (struct job_callback_info *)(job->j_custom);
736 switch (op) {
737 case JS_RUN:
738 return D->func (D->data, job->j_custom_bytes - offsetof (struct job_callback_info, data));
739 // return JOB_COMPLETED;
740 case JS_FINISH:
741 return job_free (JOB_REF_PASS (job));
742 default:
743 assert (0);
744 }
745}
746
747void schedule_job_callback (int context, job_callback_func_t func, void *data, int len) {
748 job_t job = create_async_job (callback_job_run, JSP_PARENT_RWE | JSC_ALLOW (context, JS_RUN) | JSIG_FAST (JS_FINISH), -2, offsetof (struct job_callback_info, data) + len, 0, JOB_REF_NULL);
749 assert (job);
750 struct job_callback_info *D = (struct job_callback_info *)(job->j_custom);
751 D->func = func;
752 memcpy (D->data, data, len);
753 schedule_job (JOB_REF_PASS (job));
754}
755
756
757/*
758 *
759 * RPC CLIENT
760 *
761 */
762
// Defined outside the visible part of this file; called from
// process_client_packet () to forward a proxied answer.
int client_send_message (JOB_REF_ARG (C), long long in_conn_id, struct tl_in_state *tlio_in, int flags);

int mtfront_client_ready (connection_job_t C);
int mtfront_client_close (connection_job_t C, int who);
int rpcc_execute (connection_job_t C, int op, struct raw_message *msg);
// Declared here; the method table below uses tcp_rpcc_default_check_ready instead.
int tcp_rpcc_check_ready (connection_job_t C);

// Method table for outbound RPC client connections (referenced from
// default_cfg_ct.extra): library defaults except for packet execution and the
// ready/close hooks, which are implemented in this file.
struct tcp_rpc_client_functions mtfront_rpc_client = {
  .execute = rpcc_execute,
  .check_ready = tcp_rpcc_default_check_ready,
  .flush_packet = tcp_rpc_flush_packet,
  .rpc_check_perm = tcp_rpcc_default_check_perm,
  .rpc_init_crypto = tcp_rpcc_init_crypto,
  .rpc_start_crypto = tcp_rpcc_start_crypto,
  .rpc_ready = mtfront_client_ready,
  .rpc_close = mtfront_client_close
};

// Incremented by mtfront_client_ready () each time an RPC client connection becomes ready.
int rpcc_exists;
782
// Sends an RPC_CLOSE_CONN packet carrying out_conn_id over connection C.
// The connection reference is passed on to TLS_START (JOB_REF_PASS), so the
// caller must not use its reference afterwards.  Always returns 1.
// NOTE(review): TLS_START / TLS_END are defined elsewhere; presumably they
// open and flush a TL output packet bound to C -- confirm.
static int _notify_remote_closed (JOB_REF_ARG(C), long long out_conn_id) {
  TLS_START (JOB_REF_PASS(C)) {
    tl_store_int (RPC_CLOSE_CONN);
    tl_store_long (out_conn_id);
  } TLS_END;
  return 1;
}
790
791void push_rpc_confirmation (JOB_REF_ARG (C), int confirm) {
792 struct raw_message *msg = malloc (sizeof (struct raw_message));
793 rwm_create (msg, "\xdd", 1);
794 rwm_push_data (msg, &confirm, 4);
795 mpq_push_w (CONN_INFO(C)->out_queue, msg, 0);
796 job_signal (JOB_REF_PASS (C), JS_RUN);
797}
798
// Custom data of a job created by rpcc_execute () to process one packet
// received from an RPC client connection in the engine job class.
struct client_packet_info {
  struct event_timer ev;    // must stay first (NOTE(review): presumably required by JT_HAVE_TIMER -- confirm)
  struct raw_message msg;   // the packet; released in JS_FINISH if msg.magic is non-zero
  connection_job_t conn;    // referenced connection; decref'ed in JS_FINISH
  int type;                 // RPC operation code of the packet
};
805
806int process_client_packet (struct tl_in_state *tlio_in, int op, connection_job_t C) {
807 int len = tl_fetch_unread ();
808 assert (op == tl_fetch_int ());
809
810 switch (op) {
811 case RPC_PONG:
812 return 1;
813 case RPC_PROXY_ANS:
814 if (len >= 16) {
815 int flags = tl_fetch_int ();
816 long long out_conn_id = tl_fetch_long ();
817 assert (tl_fetch_unread () == len - 16);
818 vkprintf (2, "got RPC_PROXY_ANS from connection %d:%llx, data size = %d, flags = %d\n", CONN_INFO(C)->fd, out_conn_id, tl_fetch_unread (), flags);
819 struct ext_connection *Ex = find_ext_connection_by_out_conn_id (out_conn_id);
820 connection_job_t D = 0;
821 if (Ex && Ex->out_fd == CONN_INFO(C)->fd && Ex->out_gen == CONN_INFO(C)->generation) {
822 D = connection_get_by_fd_generation (Ex->in_fd, Ex->in_gen);
823 }
824 if (D) {
825 vkprintf (2, "proxying answer into connection %d:%llx\n", Ex->in_fd, Ex->in_conn_id);
826 tot_forwarded_responses++;
827 client_send_message (JOB_REF_PASS(D), Ex->in_conn_id, tlio_in, flags);
828 } else {
829 vkprintf (2, "external connection not found, dropping proxied answer\n");
830 dropped_responses++;
831 _notify_remote_closed (JOB_REF_CREATE_PASS(C), out_conn_id);
832 }
833 return 1;
834 }
835 break;
836 case RPC_SIMPLE_ACK:
837 if (len == 16) {
838 long long out_conn_id = tl_fetch_long ();
839 int confirm = tl_fetch_int ();
840 vkprintf (2, "got RPC_SIMPLE_ACK for connection = %llx, value %08x\n", out_conn_id, confirm);
841 struct ext_connection *Ex = find_ext_connection_by_out_conn_id (out_conn_id);
842 connection_job_t D = 0;
843 if (Ex && Ex->out_fd == CONN_INFO(C)->fd && Ex->out_gen == CONN_INFO(C)->generation) {
844 D = connection_get_by_fd_generation (Ex->in_fd, Ex->in_gen);
845 }
846 if (D) {
847 vkprintf (2, "proxying simple ack %08x into connection %d:%llx\n", confirm, Ex->in_fd, Ex->in_conn_id);
848 if (Ex->in_conn_id) {
849 assert (0);
850 } else {
851 if (TCP_RPC_DATA(D)->flags & RPC_F_COMPACT) {
852 confirm = __builtin_bswap32 (confirm);
853 }
854 push_rpc_confirmation (JOB_REF_PASS (D), confirm);
855 }
856 tot_forwarded_simple_acks++;
857 } else {
858 vkprintf (2, "external connection not found, dropping simple ack\n");
859 dropped_simple_acks++;
860 _notify_remote_closed (JOB_REF_CREATE_PASS (C), out_conn_id);
861 }
862 return 1;
863 }
864 break;
865 case RPC_CLOSE_EXT:
866 if (len == 12) {
867 long long out_conn_id = tl_fetch_long ();
868 vkprintf (2, "got RPC_CLOSE_EXT for connection = %llx\n", out_conn_id);
869 struct ext_connection *Ex = find_ext_connection_by_out_conn_id (out_conn_id);
870 if (Ex) {
871 remove_ext_connection (Ex, 2);
872 }
873 return 1;
874 }
875 break;
876 default:
877 vkprintf (1, "unknown RPC operation %08x, ignoring\n", op);
878 }
879
880 return 0;
881}
882
883int client_packet_job_run (job_t job, int op, struct job_thread *JT) {
884 struct client_packet_info *D = (struct client_packet_info *)(job->j_custom);
885
886 switch (op) {
887 case JS_RUN: {
888 struct tl_in_state *tlio_in = tl_in_state_alloc ();
889 tlf_init_raw_message (tlio_in, &D->msg, D->msg.total_bytes, 0);
890 process_client_packet (tlio_in, D->type, D->conn);
891 tl_in_state_free (tlio_in);
892 return JOB_COMPLETED;
893 }
894 case JS_ALARM:
895 if (!job->j_error) {
896 job->j_error = ETIMEDOUT;
897 }
898 return JOB_COMPLETED;
899 case JS_ABORT:
900 if (!job->j_error) {
901 job->j_error = ECANCELED;
902 }
903 return JOB_COMPLETED;
904 case JS_FINISH:
905 if (D->conn) {
906 job_decref (JOB_REF_PASS (D->conn));
907 }
908 if (D->msg.magic) {
909 rwm_free (&D->msg);
910 }
911 return job_free (JOB_REF_PASS (job));
912 default:
913 return JOB_ERROR;
914 }
915}
916
917int rpcc_execute (connection_job_t C, int op, struct raw_message *msg) {
918 vkprintf (2, "rpcc_execute: fd=%d, op=%08x, len=%d\n", CONN_INFO(C)->fd, op, msg->total_bytes);
919 CONN_INFO(C)->last_response_time = precise_now;
920
921 switch (op) {
922 case RPC_PONG:
923 break;
924 case RPC_PROXY_ANS:
925 case RPC_SIMPLE_ACK:
926 case RPC_CLOSE_EXT: {
927 job_t job = create_async_job (client_packet_job_run, JSP_PARENT_RWE | JSC_ALLOW (JC_ENGINE, JS_RUN) | JSC_ALLOW (JC_ENGINE, JS_ABORT) | JSC_ALLOW (JC_ENGINE, JS_ALARM) | JSC_ALLOW (JC_ENGINE, JS_FINISH), -2, sizeof (struct client_packet_info), JT_HAVE_TIMER, JOB_REF_NULL);
928 struct client_packet_info *D = (struct client_packet_info *)(job->j_custom);
929 D->msg = *msg;
930 D->type = op;
931 D->conn = job_incref (C);
932 schedule_job (JOB_REF_PASS (job));
933 return 1;
934 }
935 default:
936 vkprintf (1, "unknown RPC operation %08x, ignoring\n", op);
937 }
938 return 0;
939}
940
941static inline int get_conn_tag (connection_job_t C) {
942 return 1 + (CONN_INFO(C)->generation & 0xffffff);
943}
944
945int mtfront_client_ready (connection_job_t C) {
946 check_engine_class ();
947 struct tcp_rpc_data *D = TCP_RPC_DATA(C);
948 int fd = CONN_INFO(C)->fd;
949 assert ((unsigned) fd < MAX_CONNECTIONS);
950 assert (!D->extra_int);
951 D->extra_int = get_conn_tag (C);
952 vkprintf (1, "Connected to RPC Middle-End (fd=%d)\n", fd);
953 rpcc_exists++;
954
955 struct ext_connection *H = &ExtConnectionHead[fd];
956 assert (!H->o_prev);
957 H->o_prev = H->o_next = H;
958 H->out_fd = fd;
959
960 CONN_INFO(C)->last_response_time = precise_now;
961 return 0;
962}
963
964int mtfront_client_close (connection_job_t C, int who) {
965 check_engine_class ();
966 struct tcp_rpc_data *D = TCP_RPC_DATA(C);
967 int fd = CONN_INFO(C)->fd;
968 assert ((unsigned) fd < MAX_CONNECTIONS);
969 vkprintf (1, "Disconnected from RPC Middle-End (fd=%d)\n", fd);
970 if (D->extra_int) {
971 assert (D->extra_int == get_conn_tag (C));
972 struct ext_connection *H = &ExtConnectionHead[fd], *Ex, *Ex_next;
973 assert (H->o_next);
974 for (Ex = H->o_next; Ex != H; Ex = Ex_next) {
975 Ex_next = Ex->o_next;
976 assert (Ex->out_fd == fd);
977 remove_ext_connection (Ex, 2);
978 }
979 assert (H->o_next == H && H->o_prev == H);
980 H->o_next = H->o_prev = 0;
981 H->out_fd = 0;
982 }
983 D->extra_int = 0;
984 return 0;
985}
986
987/*
988 *
989 * HTTP INTERFACE
990 *
991 */
992
// HTTP handlers; hts_execute, mtproto_http_alarm and hts_stats_execute are
// defined outside the visible part of this file.
int hts_execute (connection_job_t C, struct raw_message *msg, int op);
int mtproto_http_alarm (connection_job_t C);
int mtproto_http_close (connection_job_t C, int who);

int hts_stats_execute (connection_job_t C, struct raw_message *msg, int op);

// Method table for client-facing HTTP connections.
struct http_server_functions http_methods = {
  .execute = hts_execute,
  .ht_alarm = mtproto_http_alarm,
  .ht_close = mtproto_http_close
};

// Method table for the statistics HTTP endpoint (execute hook only).
struct http_server_functions http_methods_stats = {
  .execute = hts_stats_execute
};

int ext_rpcs_execute (connection_job_t C, int op, struct raw_message *msg);

int mtproto_ext_rpc_ready (connection_job_t C);
int mtproto_ext_rpc_close (connection_job_t C, int who);

// Method table for external (client-facing) RPC connections; packets are
// limited to MAX_POST_SIZE bytes.
struct tcp_rpc_server_functions ext_rpc_methods = {
  .execute = ext_rpcs_execute,
  .check_ready = server_check_ready,
  .flush_packet = tcp_rpc_flush_packet,
  .rpc_ready = mtproto_ext_rpc_ready,
  .rpc_close = mtproto_ext_rpc_close,
  //.http_fallback_type = &ct_http_server_mtfront,
  //.http_fallback_extra = &http_methods,
  .max_packet_len = MAX_POST_SIZE,
};

int mtproto_proxy_rpc_ready (connection_job_t C);
int mtproto_proxy_rpc_close (connection_job_t C, int who);
1027
1028// ENGINE context
1029int do_close_in_ext_conn (void *_data, int s_len) {
1030 assert (s_len == 4);
1031 int fd = *(int *)_data;
1032 struct ext_connection *Ex = get_ext_connection_by_in_fd (fd);
1033 if (Ex) {
1034 remove_ext_connection (Ex, 1);
1035 }
1036 return JOB_COMPLETED;
1037}
1038
1039// NET_CPU context
1040int mtproto_http_close (connection_job_t C, int who) {
1041 assert ((unsigned) CONN_INFO(C)->fd < MAX_CONNECTIONS);
1042 vkprintf (3, "http connection closing (%d) by %d, %d queries pending\n", CONN_INFO(C)->fd, who, CONN_INFO(C)->pending_queries);
1043 if (CONN_INFO(C)->pending_queries) {
1044 assert (CONN_INFO(C)->pending_queries == 1);
1045 pending_http_queries--;
1046 CONN_INFO(C)->pending_queries = 0;
1047 }
1048 schedule_job_callback (JC_ENGINE, do_close_in_ext_conn, &CONN_INFO(C)->fd, 4);
1049 return 0;
1050}
1051
1052int mtproto_ext_rpc_ready (connection_job_t C) {
1053 assert ((unsigned) CONN_INFO(C)->fd < MAX_CONNECTIONS);
1054 vkprintf (3, "ext_rpc connection ready (%d)\n", CONN_INFO(C)->fd);
1055 lru_insert_conn (C);
1056 return 0;
1057}
1058
1059int mtproto_ext_rpc_close (connection_job_t C, int who) {
1060 assert ((unsigned) CONN_INFO(C)->fd < MAX_CONNECTIONS);
1061 vkprintf (3, "ext_rpc connection closing (%d) by %d\n", CONN_INFO(C)->fd, who);
1062 struct ext_connection *Ex = get_ext_connection_by_in_fd (CONN_INFO(C)->fd);
1063 if (Ex) {
1064 remove_ext_connection (Ex, 1);
1065 }
1066 return 0;
1067}
1068
1069int mtproto_proxy_rpc_ready (connection_job_t C) {
1070 check_engine_class ();
1071 struct tcp_rpc_data *D = TCP_RPC_DATA(C);
1072 int fd = CONN_INFO(C)->fd;
1073 assert ((unsigned) fd < MAX_CONNECTIONS);
1074 vkprintf (3, "proxy_rpc connection ready (%d)\n", fd);
1075 struct ext_connection *H = &ExtConnectionHead[fd];
1076 assert (!H->i_prev);
1077 H->i_prev = H->i_next = H;
1078 H->in_fd = fd;
1079 assert (!D->extra_int);
1080 D->extra_int = -get_conn_tag(C);
1081 lru_insert_conn (C);
1082 return 0;
1083}
1084
1085int mtproto_proxy_rpc_close (connection_job_t C, int who) {
1086 check_engine_class ();
1087 struct tcp_rpc_data *D = TCP_RPC_DATA(C);
1088 int fd = CONN_INFO(C)->fd;
1089 assert ((unsigned) fd < MAX_CONNECTIONS);
1090 vkprintf (3, "proxy_rpc connection closing (%d) by %d\n", fd, who);
1091 if (D->extra_int) {
1092 assert (D->extra_int == -get_conn_tag (C));
1093 struct ext_connection *H = &ExtConnectionHead[fd], *Ex, *Ex_next;
1094 assert (H->i_next);
1095 for (Ex = H->i_next; Ex != H; Ex = Ex_next) {
1096 Ex_next = Ex->i_next;
1097 assert (Ex->in_fd == fd);
1098 remove_ext_connection (Ex, 1);
1099 }
1100 assert (H->i_next == H && H->i_prev == H);
1101 H->i_next = H->i_prev = 0;
1102 H->in_fd = 0;
1103 }
1104 D->extra_int = 0;
1105 return 0;
1106}
1107
/*
 * Extra response headers enabling cross-origin (CORS) requests.
 * Each header line is terminated with CRLF so the block can be spliced
 * directly into an HTTP response header.
 */
char mtproto_cors_http_headers[] =
  "Access-Control-Allow-Origin: *" "\r\n"
  "Access-Control-Allow-Methods: POST, OPTIONS" "\r\n"
  "Access-Control-Allow-Headers: origin, content-type" "\r\n"
  "Access-Control-Max-Age: 1728000" "\r\n";
1113
1114int forward_mtproto_packet (struct tl_in_state *tlio_in, connection_job_t C, int len, int remote_ip_port[5], int rpc_flags);
1115int forward_tcp_query (struct tl_in_state *tlio_in, connection_job_t C, conn_target_job_t S, int flags, long long auth_key_id, int remote_ip_port[5], int our_ip_port[5]);
1116
/*
 * Parses a dotted-quad IPv4 address ("a.b.c.d") from the start of `str`.
 * Returns the address in host byte order (a is the most significant byte),
 * or 0 if fewer than four components were read or any component lies
 * outside 0..255. Characters after the fourth number are ignored.
 */
unsigned parse_text_ipv4 (char *str) {
  int a, b, c, d;
  if (sscanf (str, "%d.%d.%d.%d", &a, &b, &c, &d) != 4) {
    return 0;
  }
  // any bit outside the low byte (including the sign bit) means "out of range"
  if ((a | b | c | d) & -0x100) {
    return 0;
  }
  // shift in unsigned arithmetic: (int) a << 24 overflows for a >= 128
  return ((unsigned) a << 24) | ((unsigned) b << 16) | ((unsigned) c << 8) | (unsigned) d;
}
1127
/*
 * Parses a textual IPv6 address (hex groups separated by ':', with at most one
 * "::" abbreviation) into 16 bytes in network order.
 * The whole string must be consumed: anything after the address is an error.
 * Returns the number of characters parsed, or -1 on malformed input
 * (in which case `ip` may have been partially written).
 */
int parse_text_ipv6 (unsigned char ip[16], const char *str) {
  const char *cur = str;
  int gap_at = -1;   // index of the group in front of which "::" was seen, -1 if none
  int groups;        // number of 16-bit groups stored so far

  if (cur[0] == ':' && cur[1] == ':') {
    gap_at = 0;
    cur += 2;
  }

  for (groups = 0; groups < 8; groups++) {
    int ch = *cur;

    if (groups > 0) {
      if (ch != ':') {
        if (gap_at >= 0) {
          break;      // fewer than 8 groups is fine once "::" was seen
        }
        return -1;    // ':' expected
      }
      ch = *++cur;
      if (ch == ':') {
        if (gap_at >= 0) {
          return -1;  // second '::'
        }
        gap_at = groups;
        ch = *++cur;
      }
    }

    int digits = 0, value = 0;
    for (;;) {
      int nibble;
      if (ch >= '0' && ch <= '9') {
        nibble = ch - '0';
      } else if (ch >= 'a' && ch <= 'f') {
        nibble = ch - 'a' + 10;
      } else if (ch >= 'A' && ch <= 'F') {
        nibble = ch - 'A' + 10;
      } else {
        break;
      }
      value = (value << 4) + nibble;
      if (++digits > 4) {
        return -1;    // more than 4 hex digits in component
      }
      ch = *++cur;
    }

    if (digits == 0) {
      if (gap_at == groups) {
        break;        // address ends right after "::"
      }
      return -1;      // hex digit or ':' expected
    }

    ip[2 * groups] = (value >> 8);
    ip[2 * groups + 1] = (value & 0xff);
  }

  // only the terminating NUL may follow the address
  if (*cur) {
    return -1;
  }

  if (groups < 8) {
    // expand "::": shift the groups that followed it to the end, zero-fill the gap
    assert (gap_at >= 0 && gap_at <= groups);
    int gap_bytes = 2 * (8 - groups);
    unsigned char *gap_start = ip + 2 * gap_at;
    memmove (gap_start + gap_bytes, gap_start, 2 * (groups - gap_at));
    memset (gap_start, 0, gap_bytes);
  }

  return cur - str;
}
1187
1188struct http_query_info {
1189 struct event_timer ev;
1190 connection_job_t conn;
1191 struct raw_message msg;
1192 int conn_fd;
1193 int conn_generation;
1194 int flags;
1195 int query_type;
1196 int header_size;
1197 int data_size;
1198 int first_line_size;
1199 int host_offset;
1200 int host_size;
1201 int uri_offset;
1202 int uri_size;
1203 char header[0];
1204};
1205
1206int process_http_query (struct tl_in_state *tlio_in, job_t HQJ) {
1207 struct http_query_info *D = (struct http_query_info *) HQJ->j_custom;
1208 connection_job_t c = D->conn;
1209 char *qHeaders = D->header + D->first_line_size;
1210 int qHeadersLen = D->header_size - D->first_line_size;
1211
1212 assert (D->first_line_size > 0 && D->first_line_size <= D->header_size);
1213
1214 if (verbosity > 1) {
1215 fprintf (stderr, "===============\n%.*s\n==============\n", D->header_size, D->header);
1216 fprintf (stderr, "%d,%d,%d,%d\n", D->host_offset, D->host_size, D->uri_offset, D->uri_size);
1217
1218 fprintf (stderr, "hostname: '%.*s'\n", D->host_size, D->header + D->host_offset);
1219 fprintf (stderr, "URI: '%.*s'\n", D->uri_size, D->header + D->uri_offset);
1220 }
1221
1222 if (verbosity >= 2) {
1223 char PostPreview[81];
1224 int preview_len = (D->data_size < sizeof (PostPreview) ? D->data_size : sizeof(PostPreview) - 1);
1225 tl_fetch_lookup_data (PostPreview, preview_len);
1226 PostPreview[preview_len] = 0;
1227 kprintf ("have %d POST bytes: `%.80s`\n", D->data_size, PostPreview);
1228 }
1229
1230 char *qUri = D->header + D->uri_offset;
1231 int qUriLen = D->uri_size;
1232
1233 char *get_qm_ptr = memchr (qUri, '?', D->uri_size);
1234 if (get_qm_ptr) {
1235 // qGet = get_qm_ptr + 1;
1236 // qGetLen = qUri + qUriLen - qGet;
1237 qUriLen = get_qm_ptr - qUri;
1238 } else {
1239 // qGet = 0;
1240 // qGetLen = 0;
1241 }
1242
1243 if (qUriLen >= 20) {
1244 return -414;
1245 }
1246
1247 if (qUriLen >= 4 && !memcmp (qUri, "/api", 4)) {
1248 if (qUriLen >= 5 && qUri[4] == 'w') {
1249 HTS_DATA(c)->query_flags |= QF_EXTRA_HEADERS;
1250 extra_http_response_headers = mtproto_cors_http_headers;
1251 } else {
1252 HTS_DATA(c)->query_flags &= ~QF_EXTRA_HEADERS;
1253 }
1254 if (D->query_type == htqt_options) {
1255 char response_buffer[512];
1256 int len = snprintf (response_buffer, 511, "HTTP/1.1 200 OK\r\nConnection: %s\r\nContent-type: text/plain\r\nPragma: no-cache\r\nCache-control: no-store\r\n%sContent-length: 0\r\n\r\n", (HTS_DATA(c)->query_flags & QF_KEEPALIVE) ? "keep-alive" : "close", HTS_DATA(c)->query_flags & QF_EXTRA_HEADERS ? mtproto_cors_http_headers : "");
1257 assert (len < 511);
1258 struct raw_message *m = calloc (sizeof (struct raw_message), 1);
1259 rwm_create (m, response_buffer, len);
1260 http_flush (c, m);
1261 return 0;
1262 }
1263 if (D->data_size & 3) {
1264 return -404;
1265 }
1266 cur_http_origin_len = get_http_header (qHeaders, qHeadersLen, cur_http_origin, sizeof (cur_http_origin) - 1, "Origin", 6);
1267 cur_http_referer_len = get_http_header (qHeaders, qHeadersLen, cur_http_referer, sizeof (cur_http_referer) - 1, "Referer", 7);
1268 cur_http_user_agent_len = get_http_header (qHeaders, qHeadersLen, cur_http_user_agent, sizeof (cur_http_user_agent) - 1, "User-Agent", 10);
1269
1270 int tmp_ip_port[5], *remote_ip_port = 0;
1271 if ((CONN_INFO(c)->remote_ip & 0xff000000) == 0x0a000000 || (CONN_INFO(c)->remote_ip & 0xff000000) == 0x7f000000) {
1272 char x_real_ip[64], x_real_port[16];
1273 int x_real_ip_len = get_http_header (qHeaders, qHeadersLen, x_real_ip, sizeof (x_real_ip) - 1, "X-Real-IP", 9);
1274 int x_real_port_len = get_http_header (qHeaders, qHeadersLen, x_real_port, sizeof (x_real_port) - 1, "X-Real-Port", 11);
1275 if (x_real_ip_len > 0) {
1276 unsigned real_ip = parse_text_ipv4 (x_real_ip);
1277 if (real_ip >= (1 << 24) || parse_text_ipv6 ((unsigned char *)tmp_ip_port, x_real_ip) > 0) {
1278 if (real_ip >= (1 << 24)) {
1279 tmp_ip_port[0] = 0;
1280 tmp_ip_port[1] = 0;
1281 tmp_ip_port[2] = 0xffff0000;
1282 tmp_ip_port[3] = htonl (real_ip);
1283 }
1284 int port = (x_real_port_len > 0 ? atoi (x_real_port) : 0);
1285 tmp_ip_port[4] = (port > 0 && port < 65536 ? port : 0);
1286 remote_ip_port = tmp_ip_port;
1287 vkprintf (3, "set remote IPv6:port to %08x:%08x:%08x:%08x:%08x according to X-Real-Ip '%s', X-Real-Port '%s'\n", tmp_ip_port[0], tmp_ip_port[1], tmp_ip_port[2], tmp_ip_port[3], tmp_ip_port[4], x_real_ip, x_real_port_len > 0 ? x_real_port : "");
1288 }
1289 }
1290 }
1291
1292 int res = forward_mtproto_packet (tlio_in, c, D->data_size, remote_ip_port, 0);
1293 return res ? 1 : -404;
1294 }
1295
1296 return -404;
1297}
1298
1299int http_query_job_run (job_t job, int op, struct job_thread *JT) {
1300 struct http_query_info *HQ = (struct http_query_info *)(job->j_custom);
1301
1302 switch (op) {
1303 case JS_RUN: { // ENGINE context
1304 lru_insert_conn (HQ->conn);
1305 struct tl_in_state *tlio_in = tl_in_state_alloc ();
1306 tlf_init_raw_message (tlio_in, &HQ->msg, HQ->msg.total_bytes, 0);
1307 int res = process_http_query (tlio_in, job);
1308 tl_in_state_free (tlio_in);
1309 assert (!HQ->msg.magic);
1310 //rwm_free (&HQ->msg);
1311 if (res < 0) {
1312 write_http_error (HQ->conn, -res);
1313 } else if (res > 0) {
1314 assert (HQ->flags & 1);
1315 HQ->flags &= ~1;
1316 }
1317 return JOB_COMPLETED;
1318 }
1319 case JS_ALARM:
1320 if (!job->j_error) {
1321 job->j_error = ETIMEDOUT;
1322 }
1323 return JOB_COMPLETED;
1324 case JS_ABORT:
1325 if (!job->j_error) {
1326 job->j_error = ECANCELED;
1327 }
1328 return JOB_COMPLETED;
1329 case JS_FINISH: // NET-CPU
1330 if (HQ->flags & 1) {
1331 connection_job_t c = HQ->conn ? job_incref (HQ->conn): connection_get_by_fd_generation (HQ->conn_fd, HQ->conn_generation);
1332 if (c) {
1333 assert (CONN_INFO(c)->pending_queries == 1);
1334 CONN_INFO(c)->pending_queries--;
1335 if (!(HTS_DATA(c)->query_flags & QF_KEEPALIVE) && CONN_INFO(c)->status == conn_working) {
1336 connection_write_close (c);
1337 }
1338 job_decref (JOB_REF_PASS (c));
1339 }
1340 --pending_http_queries;
1341 HQ->flags &= ~1;
1342 }
1343 if (HQ->conn) {
1344 job_decref (JOB_REF_PASS (HQ->conn));
1345 }
1346 if (HQ->msg.magic) {
1347 rwm_free (&HQ->msg);
1348 }
1349 return job_free (JOB_REF_PASS (job));
1350 default:
1351 return JOB_ERROR;
1352 }
1353}
1354
1355int hts_stats_execute (connection_job_t c, struct raw_message *msg, int op) {
1356 struct hts_data *D = HTS_DATA(c);
1357
1358 // lru_insert_conn (c); // dangerous in net-cpu context
1359 if (check_conn_buffers (c) < 0) {
1360 return -429;
1361 }
1362
1363 if (op != htqt_get || D->data_size != -1) {
1364 D->query_flags &= ~QF_KEEPALIVE;
1365 return -501;
1366 }
1367 if (CONN_INFO(c)->remote_ip != 0x7f000001) {
1368 return -404;
1369 }
1370
1371 if (D->uri_size != 6) {
1372 return -404;
1373 }
1374
1375 char ReqHdr[MAX_HTTP_HEADER_SIZE];
1376 assert (rwm_fetch_data (msg, &ReqHdr, D->header_size) == D->header_size);
1377
1378 if (memcmp (ReqHdr + D->uri_offset, "/stats", 6)) {
1379 return -404;
1380 }
1381
1382 stats_buffer_t sb;
1383 sb_alloc(&sb, 1 << 20);
1384 mtfront_prepare_stats(&sb);
1385
1386 struct raw_message *raw = calloc (sizeof (*raw), 1);
1387 rwm_init (raw, 0);
1388 write_basic_http_header_raw (c, raw, 200, 0, sb.pos, 0, "text/plain");
1389 assert (rwm_push_data (raw, sb.buff, sb.pos) == sb.pos);
1390 mpq_push_w (CONN_INFO(c)->out_queue, raw, 0);
1391 job_signal (JOB_REF_CREATE_PASS (c), JS_RUN);
1392
1393 sb_release (&sb);
1394
1395 return 0;
1396}
1397
1398// NET-CPU context
1399int hts_execute (connection_job_t c, struct raw_message *msg, int op) {
1400 struct hts_data *D = HTS_DATA(c);
1401 vkprintf (2, "in hts_execute: connection #%d, op=%d, header_size=%d, data_size=%d, http_version=%d\n",
1402 CONN_INFO(c)->fd, op, D->header_size, D->data_size, D->http_ver);
1403 rwm_dump(msg);
1404
1405 fail_connection(c, -1);
1406 return 0;
1407 // lru_insert_conn (c); // dangerous in net-cpu context
1408 if (check_conn_buffers (c) < 0) {
1409 return -429;
1410 }
1411
1412 if (D->data_size >= MAX_POST_SIZE) {
1413 return -413;
1414 }
1415
1416 if (!((D->query_type == htqt_post && D->data_size > 0) || (D->query_type == htqt_options && D->data_size < 0))) {
1417 D->query_flags &= ~QF_KEEPALIVE;
1418 return -501;
1419 }
1420
1421 if (D->data_size < 0) {
1422 D->data_size = 0;
1423 }
1424
1425 if (D->uri_size > 14 || D->header_size > MAX_HTTP_HEADER_SIZE) {
1426 return -414;
1427 }
1428
1429 if (D->data_size > 0) {
1430 int need_bytes = D->data_size + D->header_size - msg->total_bytes;
1431 if (need_bytes > 0) {
1432 vkprintf (2, "-- need %d more bytes, waiting\n", need_bytes);
1433 return need_bytes;
1434 }
1435 }
1436
1437 assert (msg->total_bytes == D->header_size + D->data_size);
1438
1439 // create http query job here
1440 job_t job = create_async_job (http_query_job_run, JSP_PARENT_RWE | JSC_ALLOW (JC_ENGINE, JS_RUN) | JSC_ALLOW (JC_ENGINE, JS_ABORT) | JSC_ALLOW (JC_ENGINE, JS_ALARM) | JSC_ALLOW (JC_CONNECTION, JS_FINISH), -2, sizeof (struct http_query_info) + D->header_size + 1, JT_HAVE_TIMER, JOB_REF_NULL);
1441 assert (job);
1442 struct http_query_info *HQ = (struct http_query_info *)(job->j_custom);
1443
1444 rwm_clone (&HQ->msg, msg);
1445 HQ->conn = job_incref (c);
1446 HQ->conn_fd = CONN_INFO(c)->fd;
1447 HQ->conn_generation = CONN_INFO(c)->generation;
1448 HQ->flags = 1; // pending_queries
1449 assert (!CONN_INFO(c)->pending_queries);
1450 CONN_INFO(c)->pending_queries++;
1451 ++pending_http_queries;
1452 HQ->query_type = D->query_type;
1453 HQ->header_size = D->header_size;
1454 HQ->data_size = D->data_size;
1455 HQ->first_line_size = D->first_line_size;
1456 HQ->host_offset = D->host_offset;
1457 HQ->host_size = D->host_size;
1458 HQ->uri_offset = D->uri_offset;
1459 HQ->uri_size = D->uri_size;
1460 assert (rwm_fetch_data (&HQ->msg, HQ->header, HQ->header_size) == HQ->header_size);
1461 HQ->header[HQ->header_size] = 0;
1462 assert (HQ->msg.total_bytes == HQ->data_size);
1463
1464 schedule_job (JOB_REF_PASS (job));
1465 return 0;
1466}
1467
1468struct rpcs_exec_data {
1469 struct raw_message msg;
1470 connection_job_t conn;
1471 int op;
1472 int rpc_flags;
1473};
1474
1475int do_rpcs_execute (void *_data, int s_len) {
1476 struct rpcs_exec_data *data = _data;
1477 assert (s_len == sizeof (struct rpcs_exec_data));
1478 assert (data);
1479
1480 lru_insert_conn (data->conn);
1481
1482 int len = data->msg.total_bytes;
1483 struct tl_in_state *tlio_in = tl_in_state_alloc ();
1484 tlf_init_raw_message (tlio_in, &data->msg, len, 0);
1485
1486 int res = forward_mtproto_packet (tlio_in, data->conn, len, 0, data->rpc_flags);
1487 tl_in_state_free (tlio_in);
1488 job_decref (JOB_REF_PASS (data->conn));
1489
1490 if (!res) {
1491 vkprintf (1, "ext_rpcs_execute: cannot forward mtproto packet\n");
1492 }
1493 return JOB_COMPLETED;
1494}
1495
1496
1497int ext_rpcs_execute (connection_job_t c, int op, struct raw_message *msg) {
1498 int len = msg->total_bytes;
1499
1500 vkprintf (2, "ext_rpcs_execute: fd=%d, op=%08x, len=%d\n", CONN_INFO(c)->fd, op, len);
1501
1502 if (len > MAX_POST_SIZE) {
1503 vkprintf (1, "ext_rpcs_execute: packet too long (%d bytes), skipping\n", len);
1504 return SKIP_ALL_BYTES;
1505 }
1506
1507 // lru_insert_conn (c); // dangerous in net-cpu context
1508 if (check_conn_buffers (c) < 0) {
1509 return SKIP_ALL_BYTES;
1510 }
1511
1512 struct rpcs_exec_data data;
1513 rwm_move (&data.msg, msg);
1514 data.conn = job_incref (c);
1515 data.rpc_flags = TCP_RPC_DATA(c)->flags & (RPC_F_QUICKACK | RPC_F_DROPPED | RPC_F_COMPACT_MEDIUM | RPC_F_EXTMODE3);
1516
1517 schedule_job_callback (JC_ENGINE, do_rpcs_execute, &data, sizeof (struct rpcs_exec_data));
1518
1519 return 1;
1520}
1521
1522// NET-CPU context
1523int mtproto_http_alarm (connection_job_t C) {
1524 vkprintf (2, "http_alarm() for connection %d\n", CONN_INFO(C)->fd);
1525
1526 assert (CONN_INFO(C)->status == conn_working);
1527 HTS_DATA(C)->query_flags &= ~QF_KEEPALIVE;
1528
1529 write_http_error (C, 500);
1530
1531 if (CONN_INFO(C)->pending_queries) {
1532 assert (CONN_INFO(C)->pending_queries == 1);
1533 --pending_http_queries;
1534 CONN_INFO(C)->pending_queries = 0;
1535 }
1536
1537 HTS_DATA(C)->parse_state = -1;
1538 connection_write_close (C);
1539
1540 return 0;
1541}
1542
1543// NET-CPU context
1544int finish_postponed_http_response (void *_data, int len) {
1545 assert (len == sizeof (connection_job_t));
1546 connection_job_t C = *(connection_job_t *)_data;
1547 if (!check_job_completion (C)) {
1548 assert (CONN_INFO(C)->pending_queries >= 0);
1549 assert (CONN_INFO(C)->pending_queries > 0);
1550 assert (CONN_INFO(C)->pending_queries == 1);
1551 CONN_INFO(C)->pending_queries = 0;
1552 --pending_http_queries;
1553 // check_conn_buffers (C);
1554 http_flush (C, 0);
1555 } else {
1556 assert (!CONN_INFO(C)->pending_queries);
1557 }
1558 job_decref (JOB_REF_PASS (C));
1559 return JOB_COMPLETED;
1560}
1561
1562// ENGINE context
1563// problem: mtproto_http_alarm() may be invoked in parallel in NET-CPU context
1564int http_send_message (JOB_REF_ARG (C), struct tl_in_state *tlio_in, int flags) {
1565 clear_connection_timeout (C);
1566 struct hts_data *D = HTS_DATA(C);
1567
1568 if ((flags & 0x10) && TL_IN_REMAINING == 4) {
1569 int error_code = tl_fetch_int ();
1570 D->query_flags &= ~QF_KEEPALIVE;
1571 write_http_error (C, -error_code);
1572 } else {
1573 char response_buffer[512];
1574 TLS_START_UNALIGN (JOB_REF_CREATE_PASS (C)) {
1575 int len = TL_IN_REMAINING;
1576 tl_store_raw_data (response_buffer, snprintf (response_buffer, sizeof (response_buffer) - 1, "HTTP/1.1 200 OK\r\nConnection: %s\r\nContent-type: application/octet-stream\r\nPragma: no-cache\r\nCache-control: no-store\r\n%sContent-length: %d\r\n\r\n", (D->query_flags & QF_KEEPALIVE) ? "keep-alive" : "close", D->query_flags & QF_EXTRA_HEADERS ? mtproto_cors_http_headers : "", len));
1577 assert (tl_copy_through (tlio_in, tlio_out, len, 1) == len);
1578 } TLS_END;
1579 }
1580
1581 assert (CONN_INFO(C)->status == conn_working && CONN_INFO(C)->pending_queries == 1);
1582
1583 assert ((unsigned) CONN_INFO(C)->fd < MAX_CONNECTIONS);
1584 vkprintf (3, "detaching http connection (%d)\n", CONN_INFO(C)->fd);
1585
1586 struct ext_connection *Ex = get_ext_connection_by_in_fd (CONN_INFO(C)->fd);
1587 if (Ex) {
1588 remove_ext_connection (Ex, 1);
1589 }
1590
1591 // reference to C is passed to the new job
1592 schedule_job_callback (JC_CONNECTION, finish_postponed_http_response, &C, sizeof (connection_job_t));
1593
1594 return 1;
1595}
1596
1597int client_send_message (JOB_REF_ARG(C), long long in_conn_id, struct tl_in_state *tlio_in, int flags) {
1598 if (check_conn_buffers (C) < 0) {
1599 job_decref (JOB_REF_PASS (C));
1600 return -1;
1601 }
1602 if (in_conn_id) {
1603 assert (0);
1604 return 1;
1605 }
1606
1607 if (CONN_INFO(C)->type == &ct_http_server_mtfront) {
1608 return http_send_message (JOB_REF_PASS(C), tlio_in, flags);
1609 }
1610 TLS_START (JOB_REF_CREATE_PASS (C)) {
1611 assert (tl_copy_through (tlio_in, tlio_out, TL_IN_REMAINING, 1) >= 0);
1612 } TLS_END;
1613
1614 if (check_conn_buffers (C) < 0) {
1615 job_decref (JOB_REF_PASS (C));
1616 return -1;
1617 } else {
1618 job_decref (JOB_REF_PASS (C));
1619 return 1;
1620 }
1621}
1622
1623/* ------------- process normal (encrypted) packet ----------------- */
1624
1625// connection_job_t get_target_connection (conn_target_job_t S, int rotate);
1626
1627conn_target_job_t choose_proxy_target (int target_dc) {
1628 assert (CurConf->auth_clusters > 0);
1629 struct mf_cluster *MFC = mf_cluster_lookup (CurConf, target_dc, 1);
1630 if (!MFC) {
1631 return 0;
1632 }
1633 int attempts = 5;
1634 while (attempts --> 0) {
1635 assert (MFC->targets_num > 0);
1636 conn_target_job_t S = MFC->cluster_targets[lrand48() % MFC->targets_num];
1637 connection_job_t C = 0;
1638 rpc_target_choose_random_connections (S, 0, 1, &C);
1639 if (C && TCP_RPC_DATA(C)->extra_int == get_conn_tag (C)) {
1640 job_decref (JOB_REF_PASS (C));
1641 return S;
1642 }
1643 }
1644 return 0;
1645}
1646
1647static int forward_mtproto_enc_packet (struct tl_in_state *tlio_in, connection_job_t C, long long auth_key_id, int len, int remote_ip_port[5], int rpc_flags) {
1648 if (len < offsetof (struct encrypted_message, message) || (len & 15) != (offsetof (struct encrypted_message, server_salt) & 15)) {
1649 return 0;
1650 }
1651 vkprintf (2, "received mtproto encrypted packet of %d bytes from connection %p (#%d~%d), key=%016llx\n", len, C, CONN_INFO(C)->fd, CONN_INFO(C)->generation, auth_key_id);
1652
1653 CONN_INFO(C)->query_start_time = get_utime_monotonic ();
1654
1655 conn_target_job_t S = choose_proxy_target (TCP_RPC_DATA(C)->extra_int4);
1656
1657 assert (TL_IN_REMAINING == len);
1658 return forward_tcp_query (tlio_in, C, S, rpc_flags, auth_key_id, remote_ip_port, 0);
1659}
1660
1661int forward_mtproto_packet (struct tl_in_state *tlio_in, connection_job_t C, int len, int remote_ip_port[5], int rpc_flags) {
1662 int header[7];
1663 if (len < sizeof (header) || (len & 3)) {
1664 return 0;
1665 }
1666 assert (tl_fetch_lookup_data (header, sizeof (header)) == sizeof (header));
1667 long long auth_key_id = *(long long *)header;
1668 if (auth_key_id) {
1669 return forward_mtproto_enc_packet (tlio_in, C, auth_key_id, len, remote_ip_port, rpc_flags);
1670 }
1671 vkprintf (2, "received mtproto packet of %d bytes\n", len);
1672 int inner_len = header[4];
1673 if (inner_len + 20 != len) {
1674 vkprintf (1, "received packet with bad inner length: %d (%d expected)\n", inner_len, len - 20);
1675 return 0;
1676 }
1677 if (len < 40) {
1678 //must have at least function id and nonce
1679 return 0;
1680 }
1681 int function = header[5];
1682 if (function != CODE_req_pq && function != CODE_req_pq_multi && function != CODE_req_DH_params && function != CODE_set_client_DH_params) {
1683 return 0;
1684 }
1685 conn_target_job_t S = choose_proxy_target (TCP_RPC_DATA(C)->extra_int4);
1686
1687 assert (len == TL_IN_REMAINING);
1688 return forward_tcp_query (tlio_in, C, S, 2 | rpc_flags, 0, remote_ip_port, 0);
1689}
1690
1691/*
1692 *
1693 * QUERY FORWARDING
1694 *
1695 */
1696
1697/* ----------- query rpc forwarding ------------ */
1698
1699int forward_tcp_query (struct tl_in_state *tlio_in, connection_job_t c, conn_target_job_t S, int flags, long long auth_key_id, int remote_ip_port[5], int our_ip_port[5]) {
1700 connection_job_t d = 0;
1701 int c_fd = CONN_INFO(c)->fd;
1702 struct ext_connection *Ex = get_ext_connection_by_in_fd (c_fd);
1703
1704 if (CONN_INFO(c)->type == &ct_tcp_rpc_ext_server_mtfront) {
1705 flags |= TCP_RPC_DATA(c)->flags & RPC_F_DROPPED;
1706 flags |= 0x1000;
1707 } else if (CONN_INFO(c)->type == &ct_http_server_mtfront) {
1708 flags |= 0x3005;
1709 }
1710
1711 if (Ex && Ex->auth_key_id != auth_key_id) {
1712 Ex->auth_key_id = auth_key_id;
1713 }
1714
1715 if (Ex) {
1716 assert (Ex->out_fd > 0 && Ex->out_fd < MAX_CONNECTIONS);
1717 d = connection_get_by_fd_generation (Ex->out_fd, Ex->out_gen);
1718 if (!d || !CONN_INFO(d)->target) {
1719 if (d) {
1720 job_decref (JOB_REF_PASS (d));
1721 }
1722 remove_ext_connection (Ex, 1);
1723 Ex = 0;
1724 }
1725 }
1726
1727 if (!d) {
1728 int attempts = 5;
1729 while (S && attempts --> 0) {
1730 rpc_target_choose_random_connections (S, 0, 1, &d);
1731 if (d) {
1732 if (TCP_RPC_DATA(d)->extra_int == get_conn_tag (d)) {
1733 break;
1734 } else {
1735 job_decref (JOB_REF_PASS (d));
1736 }
1737 }
1738 }
1739 if (!d) {
1740 vkprintf (2, "nowhere to forward user query from connection %d, dropping\n", CONN_INFO(c)->fd);
1741 dropped_queries++;
1742 if (CONN_INFO(c)->type == &ct_tcp_rpc_ext_server_mtfront) {
1743 __sync_fetch_and_or (&TCP_RPC_DATA(c)->flags, RPC_F_DROPPED);
1744 }
1745 return 0;
1746 }
1747 if (flags & RPC_F_DROPPED) {
1748 // there was at least one dropped inbound packet on this connection, have to close it now instead of forwarding next queries
1749 fail_connection (c, -35);
1750 return 0;
1751 }
1752 Ex = create_ext_connection (c, 0, d, auth_key_id);
1753 }
1754
1755 tot_forwarded_queries++;
1756
1757 assert (Ex);
1758
1759 vkprintf (3, "forwarding user query from connection %d~%d (ext_conn_id %llx) into connection %d~%d (ext_conn_id %llx)\n", Ex->in_fd, Ex->in_gen, Ex->in_conn_id, Ex->out_fd, Ex->out_gen, Ex->out_conn_id);
1760
1761 if (proxy_tag_set) {
1762 flags |= 8;
1763 }
1764
1765 TLS_START (JOB_REF_PASS (d)); // open tlio_out context
1766
1767 tl_store_int (RPC_PROXY_REQ);
1768 tl_store_int (flags);
1769 tl_store_long (Ex->out_conn_id);
1770
1771 if (remote_ip_port) {
1772 tl_store_raw_data (remote_ip_port, 20);
1773 } else {
1774 if (CONN_INFO(c)->remote_ip) {
1775 tl_store_long (0);
1776 tl_store_int (-0x10000);
1777 tl_store_int (htonl (CONN_INFO(c)->remote_ip));
1778 } else {
1779 tl_store_raw_data (CONN_INFO(c)->remote_ipv6, 16);
1780 }
1781 tl_store_int (CONN_INFO(c)->remote_port);
1782 }
1783
1784 if (our_ip_port) {
1785 tl_store_raw_data (our_ip_port, 20);
1786 } else {
1787 if (CONN_INFO(c)->our_ip) {
1788 tl_store_long (0);
1789 tl_store_int (-0x10000);
1790 tl_store_int (htonl (nat_translate_ip (CONN_INFO(c)->our_ip)));
1791 } else {
1792 tl_store_raw_data (CONN_INFO(c)->our_ipv6, 16);
1793 }
1794 tl_store_int (CONN_INFO(c)->our_port);
1795 }
1796
1797 if (flags & 12) {
1798 int *extra_size_ptr = tl_store_get_ptr (4);
1799 int pos = TL_OUT_POS;
1800 if (flags & 8) {
1801 tl_store_int (TL_PROXY_TAG);
1802 tl_store_string (proxy_tag, sizeof (proxy_tag));
1803 }
1804 if (flags & 4) {
1805 tl_store_int (TL_HTTP_QUERY_INFO);
1806 tl_store_string (cur_http_origin, cur_http_origin_len >= 0 ? cur_http_origin_len : 0);
1807 tl_store_string (cur_http_referer, cur_http_referer_len >= 0 ? cur_http_referer_len : 0);
1808 tl_store_string (cur_http_user_agent, cur_http_user_agent_len >= 0 ? cur_http_user_agent_len : 0);
1809 }
1810 *extra_size_ptr = TL_OUT_POS - pos;
1811 }
1812
1813 int len = TL_IN_REMAINING;
1814 assert (tl_copy_through (tlio_in, tlio_out, len, 1) == len);
1815
1816 TLS_END; // close tlio_out context
1817
1818 if (CONN_INFO(c)->type == &ct_http_server_mtfront) {
1819 assert (CONN_INFO(c)->pending_queries >= 0);
1820 assert (CONN_INFO(c)->pending_queries > 0);
1821 assert (CONN_INFO(c)->pending_queries == 1);
1822 set_connection_timeout (c, HTTP_MAX_WAIT_TIMEOUT);
1823 }
1824
1825 return 1;
1826}
1827
1828/* -------------------------- EXTERFACE ---------------------------- */
1829
1830struct tl_act_extra *mtfront_parse_function (struct tl_in_state *tlio_in, long long actor_id) {
1831 ++api_invoke_requests;
1832 if (actor_id != 0) {
1833 tl_fetch_set_error (TL_ERROR_WRONG_ACTOR_ID, "MTProxy only supports actor_id = 0");
1834 return 0;
1835 }
1836 int op = tl_fetch_int ();
1837 if (tl_fetch_error ()) {
1838 return 0;
1839 }
1840 switch (op) {
1841 default:
1842 tl_fetch_set_error_format (TL_ERROR_UNKNOWN_FUNCTION_ID, "Unknown op %08x", op);
1843 return 0;
1844 }
1845}
1846
1847
1848/* ------------------------ FLOOD CONTROL -------------------------- */
1849
1850struct ext_connection ConnLRU = { .lru_prev = &ConnLRU, .lru_next = &ConnLRU };
1851
1852void lru_delete_ext_conn (struct ext_connection *Ext) {
1853 if (Ext->lru_next) {
1854 Ext->lru_next->lru_prev = Ext->lru_prev;
1855 Ext->lru_prev->lru_next = Ext->lru_next;
1856 }
1857 Ext->lru_next = Ext->lru_prev = 0;
1858}
1859
1860void lru_insert_ext_conn (struct ext_connection *Ext) {
1861 lru_delete_ext_conn (Ext);
1862 Ext->lru_prev = ConnLRU.lru_prev;
1863 Ext->lru_next = &ConnLRU;
1864 Ext->lru_next->lru_prev = Ext;
1865 Ext->lru_prev->lru_next = Ext;
1866}
1867
1868void lru_delete_conn (connection_job_t c) {
1869 struct ext_connection *Ext = get_ext_connection_by_in_fd (CONN_INFO(c)->fd);
1870 if (Ext && Ext->in_fd == CONN_INFO(c)->fd) {
1871 lru_delete_ext_conn (Ext);
1872 }
1873}
1874
1875void lru_insert_conn (connection_job_t c) {
1876 struct ext_connection *Ext = get_ext_connection_by_in_fd (CONN_INFO(c)->fd);
1877 if (Ext && Ext->in_fd == CONN_INFO(c)->fd && Ext->in_gen == CONN_INFO(c)->generation) {
1878 lru_insert_ext_conn (Ext);
1879 }
1880}
1881
1882void check_all_conn_buffers (void) {
1883 struct buffers_stat bufs;
1884 fetch_buffers_stat (&bufs);
1885 long long max_buffer_memory = bufs.max_buffer_chunks * (long long) MSG_BUFFERS_CHUNK_SIZE;
1886 long long to_free = bufs.total_used_buffers_size - max_buffer_memory * 3/4;
1887 while (to_free > 0 && ConnLRU.lru_next != &ConnLRU) {
1888 struct ext_connection *Ext = ConnLRU.lru_next;
1889 vkprintf (2, "check_all_conn_buffers(): closing connection %d because of %lld total used buffer vytes (%lld max, %lld bytes to free)\n", Ext->in_fd, bufs.total_used_buffers_size, max_buffer_memory, to_free);
1890 connection_job_t d = connection_get_by_fd_generation (Ext->in_fd, Ext->in_gen);
1891 if (d) {
1892 int tot_used_bytes = CONN_INFO(d)->in.total_bytes + CONN_INFO(d)->in_u.total_bytes + CONN_INFO(d)->out.total_bytes + CONN_INFO(d)->out_p.total_bytes;
1893 to_free -= tot_used_bytes * 2;
1894 fail_connection (d, -500);
1895 job_decref (JOB_REF_PASS (d));
1896 }
1897 lru_delete_ext_conn (Ext);
1898 ++connections_failed_lru;
1899 }
1900}
1901
1902int check_conn_buffers (connection_job_t c) {
1903 int tot_used_bytes = CONN_INFO(c)->in.total_bytes + CONN_INFO(c)->in_u.total_bytes + CONN_INFO(c)->out.total_bytes + CONN_INFO(c)->out_p.total_bytes;
1904 if (tot_used_bytes > MAX_CONNECTION_BUFFER_SPACE) {
1905 vkprintf (2, "check_conn_buffers(): closing connection %d because of %d buffer bytes used (%d max)\n", CONN_INFO(c)->fd, tot_used_bytes, MAX_CONNECTION_BUFFER_SPACE);
1906 fail_connection (c, -429);
1907 ++connections_failed_flood;
1908 return -1;
1909 }
1910 return 0;
1911}
1912
1913// invoked in NET-CPU context!
1914int mtfront_data_received (connection_job_t c, int bytes_received) {
1915 // check_conn_buffers (c);
1916 return 0;
1917}
1918
1919// invoked in NET-CPU context!
1920int mtfront_data_sent (connection_job_t c, int bytes_sent) {
1921 // lru_insert_conn (c);
1922 return 0;
1923}
1924
1925void init_ct_server_mtfront (void) {
1926 assert (check_conn_functions (&ct_http_server, 1) >= 0);
1927 memcpy (&ct_http_server_mtfront, &ct_http_server, sizeof (conn_type_t));
1928 memcpy (&ct_tcp_rpc_ext_server_mtfront, &ct_tcp_rpc_ext_server, sizeof (conn_type_t));
1929 memcpy (&ct_tcp_rpc_server_mtfront, &ct_tcp_rpc_server, sizeof (conn_type_t));
1930 memcpy (&ct_tcp_rpc_client_mtfront, &ct_tcp_rpc_client, sizeof (conn_type_t));
1931 ct_http_server_mtfront.data_received = &mtfront_data_received;
1932 ct_tcp_rpc_ext_server_mtfront.data_received = &mtfront_data_received;
1933 ct_tcp_rpc_server_mtfront.data_received = &mtfront_data_received;
1934 ct_http_server_mtfront.data_sent = &mtfront_data_sent;
1935 ct_tcp_rpc_ext_server_mtfront.data_sent = &mtfront_data_sent;
1936 ct_tcp_rpc_server_mtfront.data_sent = &mtfront_data_sent;
1937}
1938
1939/*
1940 *
1941 * PARSE ARGS & INITIALIZATION
1942 *
1943 */
1944
1945static void check_children_dead (void) {
1946 int i, j;
1947 for (j = 0; j < 11; j++) {
1948 for (i = 0; i < workers; i++) {
1949 if (pids[i]) {
1950 int status = 0;
1951 int res = waitpid (pids[i], &status, WNOHANG);
1952 if (res == pids[i]) {
1953 if (WIFEXITED (status) || WIFSIGNALED (status)) {
1954 pids[i] = 0;
1955 } else {
1956 break;
1957 }
1958 } else if (res == 0) {
1959 break;
1960 } else if (res != -1 || errno != EINTR) {
1961 pids[i] = 0;
1962 } else {
1963 break;
1964 }
1965 }
1966 }
1967 if (i == workers) {
1968 break;
1969 }
1970 if (j < 10) {
1971 usleep (100000);
1972 }
1973 }
1974 if (j == 11) {
1975 int cnt = 0;
1976 for (i = 0; i < workers; i++) {
1977 if (pids[i]) {
1978 ++cnt;
1979 kill (pids[i], SIGKILL);
1980 }
1981 }
1982 kprintf ("WARNING: %d children unfinished --> they are now killed\n", cnt);
1983 }
1984}
1985
1986static void kill_children (int signal) {
1987 int i;
1988 assert (workers);
1989 for (i = 0; i < workers; i++) {
1990 if (pids[i]) {
1991 kill (pids[i], signal);
1992 }
1993 }
1994}
1995
// SIGCHLD handler (installed in main ()).
// Does nothing itself: child exit status is collected by the waitpid ()
// polling in check_children_status (), which cron () calls.
void on_child_termination (void) {
  return;
}
1999
2000void check_children_status (void) {
2001 if (workers) {
2002 int i;
2003 for (i = 0; i < workers; i++) {
2004 int status = 0;
2005 int res = waitpid (pids[i], &status, WNOHANG);
2006 if (res == pids[i]) {
2007 if (WIFEXITED (status) || WIFSIGNALED (status)) {
2008 kprintf ("Child %d terminated, aborting\n", pids[i]);
2009 pids[i] = 0;
2010 kill_children (SIGTERM);
2011 check_children_dead ();
2012 exit (EXIT_FAILURE);
2013 }
2014 } else if (res == 0) {
2015 } else if (res != -1 || errno != EINTR) {
2016 kprintf ("Child %d: unknown result during wait (%d, %m), aborting\n", pids[i], res);
2017 pids[i] = 0;
2018 kill_children (SIGTERM);
2019 check_children_dead ();
2020 exit (EXIT_FAILURE);
2021 }
2022 }
2023 } else if (slave_mode) {
2024 if (getppid () != parent_pid) {
2025 kprintf ("Parent %d is changed to %d, aborting\n", parent_pid, getppid ());
2026 exit (EXIT_FAILURE);
2027 }
2028 }
2029}
2030
2031void check_special_connections_overflow (void) {
2032 if (max_special_connections && !slave_mode) {
2033 int max_user_conn = workers ? SumStats.conn.max_special_connections : max_special_connections;
2034 int cur_user_conn = workers ? SumStats.conn.active_special_connections : active_special_connections;
2035 if (cur_user_conn * 10 > max_user_conn * 9) {
2036 vkprintf (0, "CRITICAL: used %d user connections out of %d\n", cur_user_conn, max_user_conn);
2037 }
2038 }
2039}
2040
// Periodic housekeeping hook (registered as .cron in mtproto_front_functions).
// The calls run in a fixed order.
void cron (void) {
  // master: abort if a worker died; worker: abort if the parent changed
  check_children_status ();
  // runs before the overflow check, which reads SumStats when workers are used
  compute_stats_sum ();
  check_special_connections_overflow ();
  check_all_conn_buffers ();
}
2047
2048int sfd;
2049int http_ports_num;
2050int http_sfd[MAX_HTTP_LISTEN_PORTS], http_port[MAX_HTTP_LISTEN_PORTS];
2051
2052// static double next_create_outbound;
2053// int outbound_connections_per_second = DEFAULT_OUTBOUND_CONNECTION_CREATION_RATE;
2054
2055void mtfront_pre_loop (void) {
2056 int i, enable_ipv6 = engine_check_ipv6_enabled () ? SM_IPV6 : 0;
2057 tcp_maximize_buffers = 1;
2058 if (!workers) {
2059 for (i = 0; i < http_ports_num; i++) {
2060 init_listening_tcpv6_connection (http_sfd[i], &ct_tcp_rpc_ext_server_mtfront, &ext_rpc_methods, enable_ipv6 | SM_LOWPRIO | SM_NOQACK | (max_special_connections ? SM_SPECIAL : 0));
2061 // assert (setsockopt (http_sfd[i], IPPROTO_TCP, TCP_MAXSEG, (int[]){1410}, sizeof (int)) >= 0);
2062 // assert (setsockopt (http_sfd[i], IPPROTO_TCP, TCP_NODELAY, (int[]){1}, sizeof (int)) >= 0);
2063 listening_connection_job_t LC = Events[http_sfd[i]].data;
2064 assert (LC);
2065 CONN_INFO(LC)->window_clamp = window_clamp;
2066 if (setsockopt (http_sfd[i], IPPROTO_TCP, TCP_WINDOW_CLAMP, &window_clamp, 4) < 0) {
2067 vkprintf (0, "error while setting window size for socket %d to %d: %m\n", http_sfd[i], window_clamp);
2068 }
2069 }
2070 // create_all_outbound_connections ();
2071 }
2072}
2073
// .precise_cron hook of mtproto_front_functions: delegates to
// update_local_stats ().
void precise_cron (void) {
  update_local_stats ();
  return;
}
2077
2078void mtfront_sigusr1_handler (void) {
2079 reopen_logs_ext (slave_mode);
2080 if (workers) {
2081 kill_children (SIGUSR1);
2082 }
2083}
2084
2085/*
2086 *
2087 * MAIN
2088 *
2089 */
2090
2091void usage (void) {
2092 printf ("usage: %s [-v] [-6] [-p<port>] [-H<http-port>{,<http-port>}] [-M<workers>] [-u<username>] [-b<backlog>] [-c<max-conn>] [-l<log-name>] [-W<window-size>] <config-file>\n", progname);
2093 printf ("%s\n", FullVersionStr);
2094 printf ("\tSimple MT-Proto proxy\n");
2095 parse_usage ();
2096 exit (2);
2097}
2098
2099int f_parse_option (int val) {
2100 char *colon, *ptr;
2101 switch (val) {
2102 case 'C':
2103 max_special_connections = atoi (optarg);
2104 if (max_special_connections < 0) {
2105 max_special_connections = 0;
2106 }
2107 break;
2108 case 'W':
2109 window_clamp = atoi (optarg);
2110 break;
2111 case 'H':
2112 ptr = optarg;
2113 if (!*ptr) {
2114 usage ();
2115 return 2;
2116 }
2117 while (*ptr >= '1' && *ptr <= '9' && http_ports_num < MAX_HTTP_LISTEN_PORTS) {
2118 int i = http_port[http_ports_num++] = strtol (ptr, &colon, 10);
2119 assert (colon > ptr && i > 0 && i < 65536);
2120 ptr = colon;
2121 if (*ptr != ',') {
2122 break;
2123 } else {
2124 ptr++;
2125 }
2126 }
2127 if (*ptr) {
2128 usage ();
2129 return 2;
2130 }
2131 break;
2132 /*
2133 case 'o':
2134 outbound_connections_per_second = atoi (optarg);
2135 if (outbound_connections_per_second <= 0) {
2136 outbound_connections_per_second = 1;
2137 }
2138 break;
2139 */
2140 case 'M':
2141 workers = atoi (optarg);
2142 assert (workers >= 0 && workers <= MAX_WORKERS);
2143 break;
2144 case 'T':
2145 ping_interval = atof (optarg);
2146 if (ping_interval <= 0) {
2147 ping_interval = PING_INTERVAL;
2148 }
2149 break;
2150 case 'S':
2151 case 'P':
2152 {
2153 if (strlen (optarg) != 32) {
2154 kprintf ("'%c' option requires exactly 32 hex digits\n", val);
2155 usage ();
2156 }
2157
2158 unsigned char secret[16];
2159 int i;
2160 unsigned char b = 0;
2161 for (i = 0; i < 32; i++) {
2162 if (optarg[i] >= '0' && optarg[i] <= '9') {
2163 b = b * 16 + optarg[i] - '0';
2164 } else if (optarg[i] >= 'a' && optarg[i] <= 'f') {
2165 b = b * 16 + optarg[i] - 'a' + 10;
2166 } else if (optarg[i] >= 'A' && optarg[i] <= 'F') {
2167 b = b * 16 + optarg[i] - 'A' + 10;
2168 } else {
2169 kprintf ("'S' option requires exactly 32 hex digits. '%c' is not hexdigit\n", optarg[i]);
2170 usage ();
2171 }
2172 if (i & 1) {
2173 secret[i / 2] = b;
2174 b = 0;
2175 }
2176 }
2177 if (val == 'S') {
2178 tcp_rpcs_set_ext_secret (secret);
2179 } else {
2180 memcpy (proxy_tag, secret, sizeof (proxy_tag));
2181 proxy_tag_set = 1;
2182 }
2183 }
2184 break;
2185 default:
2186 return -1;
2187 }
2188 return 0;
2189}
2190
2191void mtfront_prepare_parse_options (void) {
2192 parse_option ("mtproto-secret", required_argument, 0, 'S', "16-byte secret in hex mode");
2193 parse_option ("proxy-tag", required_argument, 0, 'P', "16-byte proxy tag in hex mode to be passed along with all forwarded queries");
2194 parse_option ("max-special-connections", required_argument, 0, 'C', "sets maximal number of accepted client connections per worker");
2195 parse_option ("window-clamp", required_argument, 0, 'W', "sets window clamp for client TCP connections");
2196 parse_option ("http-ports", required_argument, 0, 'H', "comma-separated list of client (HTTP) ports to listen");
2197 // parse_option ("outbound-connections-ps", required_argument, 0, 'o', "limits creation rate of outbound connections to mtproto-servers (default %d)", DEFAULT_OUTBOUND_CONNECTION_CREATION_RATE);
2198 parse_option ("slaves", required_argument, 0, 'M', "spawn several slave workers");
2199 parse_option ("ping-interval", required_argument, 0, 'T', "sets ping interval in second for local TCP connections (default %.3lf)", PING_INTERVAL);
2200}
2201
2202void mtfront_parse_extra_args (int argc, char *argv[]) /* {{{ */ {
2203 if (argc != 1) {
2204 usage ();
2205 exit (2);
2206 }
2207 config_filename = argv[0];
2208 vkprintf (0, "config_filename = '%s'\n", config_filename);
2209}
2210
2211// executed BEFORE dropping privileges
2212void mtfront_pre_init (void) {
2213 init_ct_server_mtfront ();
2214
2215 int res = do_reload_config (0x26);
2216
2217 if (res < 0) {
2218 fprintf (stderr, "config check failed! (code %d)\n", res);
2219 exit (-res);
2220 }
2221
2222 vkprintf (1, "config loaded!\n");
2223
2224 int i, enable_ipv6 = engine_check_ipv6_enabled () ? SM_IPV6 : 0;
2225
2226 for (i = 0; i < http_ports_num; i++) {
2227 http_sfd[i] = server_socket (http_port[i], engine_state->settings_addr, engine_get_backlog (), enable_ipv6);
2228 if (http_sfd[i] < 0) {
2229 fprintf (stderr, "cannot open http/tcp server socket at port %d: %m\n", http_port[i]);
2230 exit (1);
2231 }
2232 }
2233
2234 if (workers) {
2235 if (!kdb_hosts_loaded) {
2236 kdb_load_hosts ();
2237 }
2238 WStats = mmap (0, 2 * workers * sizeof (struct worker_stats), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
2239 assert (WStats);
2240 // kprintf_multiprocessing_mode_enable ();
2241 int real_parent_pid = getpid();
2242 vkprintf (0, "creating %d workers\n", workers);
2243 for (i = 0; i < workers; i++) {
2244 int pid = fork ();
2245 assert (pid >= 0);
2246 if (!pid) {
2247 worker_id = i;
2248 workers = 0;
2249 slave_mode = 1;
2250 parent_pid = getppid ();
2251 assert (parent_pid == real_parent_pid);
2252 engine_enable_slave_mode ();
2253 engine_state->do_not_open_port = 1;
2254 break;
2255 } else {
2256 pids[i] = pid;
2257 }
2258 }
2259 }
2260}
2261
2262void mtfront_pre_start (void) {
2263 int res = do_reload_config (0x17);
2264
2265 if (res < 0) {
2266 fprintf (stderr, "config check failed! (code %d)\n", res);
2267 exit (-res);
2268 }
2269
2270 assert (CurConf->have_proxy);
2271
2272 proxy_mode |= PROXY_MODE_OUT;
2273 mtfront_rpc_client.mode_flags |= TCP_RPC_IGNORE_PID;
2274 ct_tcp_rpc_client_mtfront.flags |= C_EXTERNAL;
2275
2276 assert (proxy_mode == PROXY_MODE_OUT);
2277}
2278
2279void mtfront_on_exit (void) {
2280 if (workers) {
2281 if (signal_check_pending (SIGTERM)) {
2282 kill_children (SIGTERM);
2283 }
2284 check_children_dead ();
2285 }
2286}
2287
2288server_functions_t mtproto_front_functions = {
2289 .default_modules_disabled = 0,
2290 .cron = cron,
2291 .precise_cron = precise_cron,
2292 .pre_init = mtfront_pre_init,
2293 .pre_start = mtfront_pre_start,
2294 .pre_loop = mtfront_pre_loop,
2295 .on_exit = mtfront_on_exit,
2296 .prepare_stats = mtfront_prepare_stats,
2297 .parse_option = f_parse_option,
2298 .prepare_parse_options = mtfront_prepare_parse_options,
2299 .parse_extra_args = mtfront_parse_extra_args,
2300 .epoll_timeout = 1,
2301 .FullVersionStr = FullVersionStr,
2302 .ShortVersionStr = "mtproxy",
2303 .parse_function = mtfront_parse_function,
2304 .http_functions = &http_methods_stats
2305};
2306
2307int main (int argc, char *argv[]) {
2308 mtproto_front_functions.allowed_signals |= SIG2INT (SIGCHLD);
2309 mtproto_front_functions.signal_handlers[SIGCHLD] = on_child_termination;
2310 mtproto_front_functions.signal_handlers[SIGUSR1] = mtfront_sigusr1_handler;
2311 return default_main (&mtproto_front_functions, argc, argv);
2312}
2313