1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2010-2013 Vkontakte Ltd
18 2010-2013 Nikolai Durov
19 2010-2013 Andrey Lopatin
20 2013 Vitaliy Valtman
21
22 Copyright 2014-2016 Telegram Messenger Inc
23 2015-2016 Vitaly Valtman
24*/
25
26#include <assert.h>
27#include <stdio.h>
28#include <sys/uio.h>
29
30#include "common/precise-time.h"
31#include "common/rpc-const.h"
32#include "common/mp-queue.h"
33#include "net/net-msg.h"
34#include "net/net-tcp-connections.h"
35#include "net/net-tcp-rpc-common.h"
36#include "kprintf.h"
37#include "vv/vv-io.h"
38
39// Flags:
40// Flag 1 - can not edit this message. Need to make copy.
41
42void tcp_rpc_conn_send_init (connection_job_t C, struct raw_message *raw, int flags) {
43 struct connection_info *c = CONN_INFO (C);
44 vkprintf (3, "%s: sending message of size %d to conn fd=%d\n", __func__, raw->total_bytes, c->fd);
45 assert (!(raw->total_bytes & 3));
46 int Q[2];
47 Q[0] = raw->total_bytes + 12;
48 Q[1] = TCP_RPC_DATA(C)->out_packet_num ++;
49 struct raw_message *r = malloc (sizeof (*r));
50 if (flags & 1) {
51 rwm_clone (r, raw);
52 } else {
53 *r = *raw;
54 }
55 rwm_push_data_front (r, Q, 8);
56 unsigned crc32 = rwm_custom_crc32 (r, r->total_bytes, TCP_RPC_DATA(C)->custom_crc_partial);
57 rwm_push_data (r, &crc32, 4);
58
59 socket_connection_job_t S = c->io_conn;
60
61 if (S) {
62 mpq_push_w (SOCKET_CONN_INFO (S)->out_packet_queue, r, 0);
63 job_signal (JOB_REF_CREATE_PASS (S), JS_RUN);
64 }
65}
66
67void tcp_rpc_conn_send_im (JOB_REF_ARG (C), struct raw_message *raw, int flags) {
68 struct connection_info *c = CONN_INFO (C);
69 vkprintf (3, "%s: sending message of size %d to conn fd=%d\n", __func__, raw->total_bytes, c->fd);
70 assert (!(raw->total_bytes & 3));
71 int Q[2];
72 Q[0] = raw->total_bytes + 12;
73 Q[1] = TCP_RPC_DATA(C)->out_packet_num ++;
74 struct raw_message *r = malloc (sizeof (*r));
75 if (flags & 1) {
76 rwm_clone (r, raw);
77 } else {
78 *r = *raw;
79 }
80 rwm_push_data_front (r, Q, 8);
81 unsigned crc32 = rwm_custom_crc32 (r, r->total_bytes, TCP_RPC_DATA(C)->custom_crc_partial);
82 rwm_push_data (r, &crc32, 4);
83
84 rwm_union (&c->out, r);
85 free (r);
86
87 job_signal (JOB_REF_PASS (C), JS_RUN);
88}
89
90void tcp_rpc_conn_send (JOB_REF_ARG (C), struct raw_message *raw, int flags) {
91 struct connection_info *c = CONN_INFO (C);
92 vkprintf (3, "%s: sending message of size %d to conn fd=%d\n", __func__, raw->total_bytes, c->fd);
93 if (!(flags & 8)) {
94 assert (!(raw->total_bytes & 3));
95 }
96 struct raw_message *r;
97 if (flags & 4) {
98 r = raw;
99 assert (!(flags & 1));
100 } else {
101 r = malloc (sizeof (*r));
102 if (flags & 1) {
103 rwm_clone (r, raw);
104 } else {
105 *r = *raw;
106 }
107 }
108
109 mpq_push_w (c->out_queue, r, 0);
110 job_signal (JOB_REF_PASS (C), JS_RUN);
111}
112
113void tcp_rpc_conn_send_data (JOB_REF_ARG (C), int len, void *Q) {
114 assert (!(len & 3));
115 struct raw_message r;
116 assert (rwm_create (&r, Q, len) == len);
117 tcp_rpc_conn_send (JOB_REF_PASS (C), &r, 0);
118}
119
120void tcp_rpc_conn_send_data_init (connection_job_t c, int len, void *Q) {
121 assert (!(len & 3));
122 struct raw_message r;
123 assert (rwm_create (&r, Q, len) == len);
124 tcp_rpc_conn_send_init (c, &r, 0);
125}
126
127void tcp_rpc_conn_send_data_im (JOB_REF_ARG (C), int len, void *Q) {
128 assert (!(len & 3));
129 struct raw_message r;
130 assert (rwm_create (&r, Q, len) == len);
131 tcp_rpc_conn_send_im (JOB_REF_PASS (C), &r, 0);
132}
133
134int tcp_rpc_default_execute (connection_job_t C, int op, struct raw_message *raw) /* {{{ */ {
135 struct connection_info *c = CONN_INFO (C);
136
137 vkprintf (1, "rpcc_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes);
138 if (op == RPC_PING && raw->total_bytes == 12) {
139 c->last_response_time = precise_now;
140 int P[3];
141 assert (rwm_fetch_data (raw, P, 12) == 12);
142 P[0] = RPC_PONG;
143 //P[1] = Q[1];
144 //P[2] = Q[2];
145
146 vkprintf (2, "received ping from " IP_PRINT_STR ":%d (val = %lld)\n", IP_TO_PRINT (c->remote_ip), (int)c->remote_port, *(long long *)(P + 1));
147 tcp_rpc_conn_send_data (JOB_REF_CREATE_PASS (C), 12, P);
148 return 0;
149 }
150 c->last_response_time = precise_now;
151 return 0;
152}
153/* }}} */
154
155int tcp_rpc_flush_packet (connection_job_t C) {
156 return CONN_INFO(C)->type->flush (C);
157}
158
159int tcp_rpc_write_packet (connection_job_t C, struct raw_message *raw) {
160 int Q[2];
161 if (!(TCP_RPC_DATA(C)->flags & (RPC_F_COMPACT | RPC_F_MEDIUM))) {
162 Q[0] = raw->total_bytes + 12;
163 Q[1] = TCP_RPC_DATA(C)->out_packet_num ++;
164
165 rwm_push_data_front (raw, Q, 8);
166 unsigned crc32 = rwm_custom_crc32 (raw, raw->total_bytes, TCP_RPC_DATA(C)->custom_crc_partial);
167 rwm_push_data (raw, &crc32, 4);
168
169 rwm_union (&CONN_INFO(C)->out, raw);
170 }
171
172 return 0;
173}
174
175int tcp_rpc_write_packet_compact (connection_job_t C, struct raw_message *raw) {
176 if (raw->total_bytes == 5) {
177 int flag = 0;
178 assert (rwm_fetch_data (raw, &flag, 1) == 1);
179 assert (flag == 0xdd);
180 rwm_union (&CONN_INFO(C)->out, raw);
181 return 0;
182 }
183
184 if (!(TCP_RPC_DATA(C)->flags & (RPC_F_COMPACT | RPC_F_MEDIUM))) {
185 return tcp_rpc_write_packet (C, raw);
186 }
187
188 int len = raw->total_bytes;
189 assert (!(len & 0xfc000003));
190 if (TCP_RPC_DATA(C)->flags & RPC_F_MEDIUM) {
191 rwm_push_data_front (raw, &len, 4);
192 } else if (len <= 0x7e * 4) {
193 len >>= 2;
194 rwm_push_data_front (raw, &len, 1);
195 } else {
196 len = (len << 6) | 0x7f;
197 rwm_push_data_front (raw, &len, 4);
198 }
199 rwm_union (&CONN_INFO(C)->out, raw);
200
201 return 0;
202}
203
204int tcp_rpc_flush (connection_job_t C) {
205 struct connection_info *c = CONN_INFO (C);
206
207 if (c->crypto) {
208 int pad_bytes = c->type->crypto_needed_output_bytes (C);
209 vkprintf (2, "tcp_rpcs_flush_packet: padding with %d bytes\n", pad_bytes);
210 if (pad_bytes > 0) {
211 assert (!(pad_bytes & 3));
212 static const int pad_str[3] = {4, 4, 4};
213 assert (pad_bytes <= 12);
214 assert (rwm_push_data (&c->out, pad_str, pad_bytes) == pad_bytes);
215 }
216 }
217
218 return 0;
219}
220
221void tcp_rpc_send_ping (connection_job_t C, long long ping_id) {
222 int P[3];
223 P[0] = RPC_PING;
224 *(long long *)(P + 1) = ping_id;
225 tcp_rpc_conn_send_data (JOB_REF_CREATE_PASS (C), 12, P);
226}
227
228static unsigned default_rpc_flags = 0;
229
230unsigned tcp_set_default_rpc_flags (unsigned and_flags, unsigned or_flags) {
231 return (default_rpc_flags = (default_rpc_flags & and_flags) | or_flags);
232}
233
234unsigned tcp_get_default_rpc_flags (void) {
235 return default_rpc_flags;
236}
237
238static __thread double cur_dh_accept_rate_remaining;
239static __thread double cur_dh_accept_rate_time;
240static double max_dh_accept_rate;
241
242void tcp_set_max_dh_accept_rate (int rate) {
243 max_dh_accept_rate = rate;
244}
245
246int tcp_add_dh_accept (void) {
247 if (max_dh_accept_rate) {
248 cur_dh_accept_rate_remaining += (precise_now - cur_dh_accept_rate_time) * max_dh_accept_rate;
249 cur_dh_accept_rate_time = precise_now;
250 if (cur_dh_accept_rate_remaining > max_dh_accept_rate) {
251 cur_dh_accept_rate_remaining = max_dh_accept_rate;
252 }
253 if (cur_dh_accept_rate_remaining < 1) {
254 return -1;
255 }
256 cur_dh_accept_rate_remaining -= 1;
257 }
258 return 0;
259}
260
261