1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2013 Vkontakte Ltd
18 2013 Vitaliy Valtman
19
20 Copyright 2014-2016 Telegram Messenger Inc
21 2015-2016 Vitaly Valtman
22
23*/
24
25#include <assert.h>
26#include <stdlib.h>
27#include <string.h>
28#include <unistd.h>
29#include "net/net-rpc-targets.h"
30#include "vv/vv-tree.h"
31//#include "net/net-rpc-common.h"
32//#include "net/net-rpc-server.h"
33#include "net/net-tcp-rpc-client.h"
34#include "net/net-tcp-rpc-common.h"
35#include "kprintf.h"
36#include "net/net-connections.h"
37#include "vv/vv-io.h"
38
39#include "common/common-stats.h"
40#include "common/mp-queue.h"
41#include "common/server-functions.h"
42
/*
 * Three-way comparison of two RPC target jobs by the leading bytes of their PID.
 * The number of bytes compared is chosen from the port of the FIRST argument:
 * 6 bytes when that port is non-zero, 8 bytes when it is zero.
 * NOTE(review): presumably bytes 0..5 are ip + port and bytes 6..7 the pid —
 * confirm against the declaration of struct process_id.
 * Both arguments are expanded more than once; pass side-effect-free expressions.
 */
#define rpc_target_cmp(a,b) (memcmp (&RPC_TARGET_INFO(a)->PID, &RPC_TARGET_INFO(b)->PID, RPC_TARGET_INFO(a)->PID.port ? 6 : 8))
44
45//DEFINE_TREE(rpc_target, rpc_target_job_t, rpc_target_cmp, MALLOC)
46
47//DEFINE_TREE(rpc_target, struct rpc_target *, rpc_target_cmp)
48#define X_TYPE rpc_target_job_t
49#define X_CMP rpc_target_cmp
50#define TREE_NAME rpc_target
51#define TREE_PTHREAD
52#define TREE_MALLOC
53#include "vv/vv-tree.c"
54
55#define X_TYPE connection_job_t
56#define X_CMP(a,b) (((a) < (b)) ? -1 : ((a) > (b)) ? 1 : 0)
57#define TREE_NAME connection
58#define TREE_PTHREAD
59#define TREE_MALLOC
60#define TREE_INCREF job_incref
61#define TREE_DECREF job_decref_f
62#include "vv/vv-tree.c"
63
/*
 * Root of the tree holding every known RPC target. It is replaced by
 * rpc_target_alloc () and read by the lookup / insert / delete functions below.
 */
static struct tree_rpc_target *rpc_target_tree;

/*
 * NOTE(review): the code visible in this file calls get_tree_ptr_rpc_target /
 * free_tree_ptr_rpc_target instead of the two names declared here — confirm
 * whether these prototypes still have a definition and users elsewhere.
 */
struct tree_rpc_target *get_rpc_target_tree_ptr (struct tree_rpc_target **T);
void free_rpc_target_tree_ptr (struct tree_rpc_target *T);
67
68
69/* {{{ STAT */
70#define MODULE rpc_targets
71
72MODULE_STAT_TYPE {
73 long long total_rpc_targets;
74 long long total_connections_in_rpc_targets;
75};
76
77MODULE_INIT
78
79MODULE_STAT_FUNCTION
80 SB_SUM_ONE_LL (total_rpc_targets);
81 SB_SUM_ONE_LL (total_connections_in_rpc_targets);
82MODULE_STAT_FUNCTION_END
83/* }}} */
84
85static rpc_target_job_t rpc_target_alloc (struct process_id PID) {
86 assert_engine_thread ();
87 rpc_target_job_t SS = calloc (sizeof (struct async_job) + sizeof (struct rpc_target_info), 1);
88 struct rpc_target_info *S = RPC_TARGET_INFO (SS);
89
90 S->PID = PID;
91
92 struct tree_rpc_target *old = rpc_target_tree;
93
94 if (old) {
95 __sync_fetch_and_add (&old->refcnt, 1);
96 }
97
98 rpc_target_tree = tree_insert_rpc_target (rpc_target_tree, SS, lrand48_j ());
99 MODULE_STAT->total_rpc_targets ++;
100 //hexdump ((void *)rpc_target_tree, (void *)(rpc_target_tree + 1));
101 free_tree_ptr_rpc_target (old);
102
103 return SS;
104}
105
106void rpc_target_insert_conn (connection_job_t C) {
107 assert_engine_thread ();
108 struct connection_info *c = CONN_INFO (C);
109
110 if (c->flags & (C_ERROR | C_NET_FAILED | C_FAILED)) {
111 return;
112 }
113 if (TCP_RPC_DATA(C)->in_rpc_target) { return; }
114
115 assert_net_cpu_thread ();
116 //st_update_host ();
117 struct rpc_target_info t;
118 t.PID = TCP_RPC_DATA(C)->remote_pid;
119 assert (t.PID.ip);
120
121 vkprintf (2, "rpc_target_insert_conn: ip = " IP_PRINT_STR ", port = %d, fd = %d\n", IP_TO_PRINT (t.PID.ip), (int) t.PID.port, c->fd);
122 rpc_target_job_t fake_target = ((void *)&t) - offsetof (struct async_job, j_custom);
123
124
125 rpc_target_job_t SS = tree_lookup_ptr_rpc_target (rpc_target_tree, fake_target);
126
127 if (!SS) {
128 SS = rpc_target_alloc (t.PID);
129 }
130
131 struct rpc_target_info *S = RPC_TARGET_INFO (SS);
132
133 connection_job_t D = tree_lookup_ptr_connection (S->conn_tree, C);
134 assert (!D);
135
136 struct tree_connection *old = S->conn_tree;
137
138 if (old) {
139 __sync_fetch_and_add (&old->refcnt, 1);
140 }
141
142 S->conn_tree = tree_insert_connection (S->conn_tree, job_incref (C), lrand48_j ());
143 MODULE_STAT->total_connections_in_rpc_targets ++;
144
145 __sync_synchronize ();
146 free_tree_ptr_connection (old);
147
148 TCP_RPC_DATA(C)->in_rpc_target = 1;
149}
150
151void rpc_target_delete_conn (connection_job_t C) {
152 assert_engine_thread ();
153 struct connection_info *c = CONN_INFO (C);
154
155 if (!TCP_RPC_DATA(C)->in_rpc_target) { return; }
156
157 assert_net_cpu_thread ();
158 //st_update_host ();
159 struct rpc_target_info t;
160 t.PID = TCP_RPC_DATA(C)->remote_pid;
161 if (!t.PID.ip) {
162 t.PID.ip = PID.ip;
163 }
164
165 vkprintf (2, "rpc_target_insert_conn: ip = " IP_PRINT_STR ", port = %d, fd = %d\n", IP_TO_PRINT (t.PID.ip), (int) t.PID.port, c->fd);
166 rpc_target_job_t fake_target = ((void *)&t) - offsetof (struct async_job, j_custom);
167
168
169 rpc_target_job_t SS = tree_lookup_ptr_rpc_target (rpc_target_tree, fake_target);
170
171 if (!SS) {
172 SS = rpc_target_alloc (t.PID);
173 }
174
175 struct rpc_target_info *S = RPC_TARGET_INFO (SS);
176
177 connection_job_t D = tree_lookup_ptr_connection (S->conn_tree, C);
178 assert (D);
179
180 struct tree_connection *old = S->conn_tree;
181 if (old) {
182 __sync_fetch_and_add (&old->refcnt, 1);
183 }
184 S->conn_tree = tree_delete_connection (S->conn_tree, C);
185 MODULE_STAT->total_connections_in_rpc_targets --;
186
187 __sync_synchronize ();
188
189 free_tree_ptr_connection (old);
190
191 TCP_RPC_DATA(C)->in_rpc_target = 0;
192}
193
194rpc_target_job_t rpc_target_lookup (struct process_id *pid) {
195 assert (pid);
196 struct rpc_target_info t;
197 t.PID = *pid;
198 if (!t.PID.ip) { t.PID.ip = PID.ip; }
199 rpc_target_job_t fake_target = ((void *)&t) - offsetof (struct async_job, j_custom);
200 assert (RPC_TARGET_INFO(fake_target) == &t);
201
202 int fast = this_job_thread && this_job_thread->thread_class == JC_ENGINE;
203
204 struct tree_rpc_target *T = fast ? rpc_target_tree : get_tree_ptr_rpc_target (&rpc_target_tree);
205 rpc_target_job_t S = tree_lookup_ptr_rpc_target (T, fake_target);
206 if (!fast) {
207 tree_free_rpc_target (T);
208 }
209 return S;
210}
211
212rpc_target_job_t rpc_target_lookup_hp (unsigned ip, int port) {
213 struct process_id p;
214 p.ip = ip;
215 p.port = port;
216 return rpc_target_lookup (&p);
217}
218
219rpc_target_job_t rpc_target_lookup_target (conn_target_job_t SS) {
220 struct conn_target_info *S = CONN_TARGET_INFO (SS);
221 if (S->custom_field == -1) {
222 return 0;
223 }
224 return rpc_target_lookup_hp (S->custom_field, S->port);
225}
226
227void check_connection (connection_job_t C, void *ex, void *ex2, void *ex3) {
228 int *best_unr = ex2;
229 if (*best_unr < 0) { return; }
230 connection_job_t *R = ex;
231 struct process_id *PID = ex3;
232
233 struct connection_info *c = CONN_INFO (C);
234 int r = c->type->check_ready (C);
235
236 if ((c->flags & (C_ERROR | C_FAILED | C_NET_FAILED)) || c->error) {
237 return;
238 }
239
240 if (r == cr_ok) {
241 if (!PID || matches_pid (&TCP_RPC_DATA(C)->remote_pid, PID) >= 1) {
242 *best_unr = -1;
243 *R = C;
244 }
245 } else if (r == cr_stopped && c->unreliability < *best_unr) {
246 if (!PID || matches_pid (&TCP_RPC_DATA(C)->remote_pid, PID) >= 1) {
247 *best_unr = c->unreliability;
248 *R = C;
249 }
250 }
251}
252
253struct connection_choose_extra {
254 connection_job_t *Arr;
255 int limit;
256 int pos;
257 int count;
258};
259
260void check_connection_arr (connection_job_t C, void *ex, void *ex2) {
261 struct connection_choose_extra *E = ex;
262 struct process_id *PID = ex2;
263
264 struct connection_info *c = CONN_INFO (C);
265 int r = c->type->check_ready (C);
266
267 if ((c->flags & (C_ERROR | C_FAILED | C_NET_FAILED)) || c->error || r != cr_ok) {
268 return;
269 }
270 if (PID && matches_pid (&TCP_RPC_DATA(C)->remote_pid, PID) < 1) {
271 return;
272 }
273
274 if (E->pos < E->limit) {
275 E->Arr[E->pos ++] = C;
276 } else {
277 int t = lrand48_j () % (E->count + 1);
278 if (t < E->limit) {
279 E->Arr[t] = C;
280 }
281 }
282 E->count ++;
283}
284
285connection_job_t rpc_target_choose_connection (rpc_target_job_t S, struct process_id *pid) {
286 if (!S) {
287 return 0;
288 }
289
290 int fast = this_job_thread && this_job_thread->thread_class == JC_ENGINE;
291
292 struct tree_connection *T = fast ? RPC_TARGET_INFO (S)->conn_tree : get_tree_ptr_connection (&RPC_TARGET_INFO (S)->conn_tree);
293 if (!T) {
294 if (!fast) {
295 tree_free_connection (T);
296 }
297 return NULL;
298 }
299
300 connection_job_t C = NULL;
301
302 int best_unr = 10000;
303 tree_act_ex3_connection (T, check_connection, &C, &best_unr, pid);
304
305 if (C) {
306 job_incref (C);
307 }
308 if (!fast) {
309 tree_free_connection (T);
310 }
311
312 return C;
313}
314
315int rpc_target_choose_random_connections (rpc_target_job_t S, struct process_id *pid, int limit, connection_job_t buf[]) {
316 if (!S) {
317 return 0;
318 }
319
320 struct connection_choose_extra E;
321 E.Arr = buf;
322 E.count = 0;
323 E.pos = 0;
324 E.limit = limit;
325
326 int fast = this_job_thread && this_job_thread->thread_class == JC_ENGINE;
327
328 struct tree_connection *T = fast ? RPC_TARGET_INFO (S)->conn_tree : get_tree_ptr_connection (&RPC_TARGET_INFO (S)->conn_tree);
329 if (!T) {
330 if (!fast) {
331 tree_free_connection (T);
332 }
333 return 0;
334 }
335
336 tree_act_ex2_connection (T, check_connection_arr, &E, pid);
337
338 int i;
339 for (i = 0; i < E.pos; i++) {
340 job_incref (buf[i]);
341 }
342
343 if (!fast) {
344 tree_free_connection (T);
345 }
346
347 return E.pos;
348}
349
350int rpc_target_get_state (rpc_target_job_t S, struct process_id *pid) {
351 connection_job_t C = rpc_target_choose_connection (S, pid);
352 if (!C) {
353 return -1;
354 }
355
356 int r = CONN_INFO(C)->type->check_ready (C);
357 job_decref (JOB_REF_PASS (C));
358
359 if (r == cr_ok) { return 1; }
360 else { return 0; }
361}
362
363void rpc_target_delete (rpc_target_job_t RT) {}
364