1 | /* |
2 | This file is part of Mtproto-proxy Library. |
3 | |
4 | Mtproto-proxy Library is free software: you can redistribute it and/or modify |
5 | it under the terms of the GNU Lesser General Public License as published by |
6 | the Free Software Foundation, either version 2 of the License, or |
7 | (at your option) any later version. |
8 | |
9 | Mtproto-proxy Library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public License |
15 | along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>. |
16 | |
17 | Copyright 2012-2013 Vkontakte Ltd |
18 | 2012-2013 Vitaliy Valtman |
19 | |
20 | Copyright 2014 Telegram Messenger Inc |
21 | 2014 Vitaly Valtman |
22 | */ |
23 | |
24 | #include "common/tl-parse.h" |
25 | |
26 | #include <assert.h> |
27 | #include <stdarg.h> |
28 | #include <stdio.h> |
29 | #include <stdlib.h> |
30 | #include <string.h> |
31 | #include <sys/time.h> |
32 | #include <errno.h> |
33 | |
34 | #include "net/net-events.h" |
35 | #include "net/net-msg.h" |
36 | #include "net/net-msg-buffers.h" |
37 | #include "net/net-rpc-targets.h" |
38 | #include "net/net-tcp-connections.h" |
39 | #include "net/net-tcp-rpc-common.h" |
40 | #include "net/net-tcp-rpc-server.h" |
41 | |
42 | |
43 | #include "common/cpuid.h" |
44 | #include "common/kprintf.h" |
45 | #include "common/server-functions.h" |
46 | |
47 | #include "vv/vv-io.h" |
48 | #include "vv/vv-tree.h" |
49 | |
50 | //#include "auto/TL/common.h" |
51 | //#include "auto/TL/tl-names.h" |
52 | |
53 | #include "engine/engine.h" |
54 | #include "jobs/jobs.h" |
55 | #include "common/common-stats.h" |
56 | |
57 | #define MODULE tl_parse |
58 | |
59 | MODULE_STAT_TYPE { |
60 | long long rpc_queries_received, rpc_answers_error, rpc_answers_received; |
61 | long long rpc_sent_errors, rpc_sent_answers, rpc_sent_queries; |
62 | int tl_in_allocated, tl_out_allocated; |
63 | /* #ifdef TIME_DEBUG |
64 | long long tl_udp_flush_rdtsc; |
65 | long long tl_udp_flush_cnt; |
66 | #endif*/ |
67 | }; |
68 | |
69 | MODULE_INIT |
70 | |
71 | MODULE_STAT_FUNCTION |
72 | double uptime = time (0) - start_time; |
73 | SB_SUM_ONE_LL (rpc_queries_received); |
74 | SB_SUM_ONE_LL (rpc_answers_error); |
75 | SB_SUM_ONE_LL (rpc_answers_received); |
76 | SB_SUM_ONE_LL (rpc_sent_errors); |
77 | SB_SUM_ONE_LL (rpc_sent_answers); |
78 | SB_SUM_ONE_LL (rpc_sent_queries); |
79 | SB_SUM_ONE_I (tl_in_allocated); |
80 | SB_SUM_ONE_I (tl_out_allocated); |
81 | /*#ifdef TIME_DEBUG |
82 | SB_SUM_ONE_LL (tl_udp_flush_rdtsc); |
83 | SB_SUM_ONE_LL (tl_udp_flush_cnt); |
84 | #endif*/ |
85 | sb_printf (sb, |
86 | "rpc_qps\t%lf\n" |
87 | "default_rpc_flags\t%u\n" , |
88 | safe_div (SB_SUM_LL (rpc_queries_received), uptime), tcp_get_default_rpc_flags () |
89 | ); |
90 | MODULE_STAT_FUNCTION_END |
91 | |
92 | |
93 | |
94 | void (struct tl_query_header *h) { |
95 | if (__sync_fetch_and_add (&h->ref_cnt, -1) > 1) { return; } |
96 | assert (!h->ref_cnt); |
97 | free (h); |
98 | } |
99 | |
100 | struct tl_query_header * (struct tl_query_header *h) { |
101 | __sync_fetch_and_add (&h->ref_cnt, 1); |
102 | return h; |
103 | } |
104 | |
105 | struct tl_query_header * (struct tl_query_header *h_old) { |
106 | struct tl_query_header *h = malloc (sizeof (*h)); |
107 | memcpy (h, h_old, sizeof (*h)); |
108 | h->ref_cnt = 1; |
109 | return h; |
110 | } |
111 | |
112 | int tlf_set_error_format (struct tl_in_state *tlio_in, int errnum, const char *format, ...) { |
113 | if (TL_ERROR) { |
114 | return 0; |
115 | } |
116 | assert (format); |
117 | char s[1000]; |
118 | va_list l; |
119 | va_start (l, format); |
120 | vsnprintf (s, sizeof (s), format, l); |
121 | va_end (l); |
122 | vkprintf (2, "Error %s\n" , s); |
123 | TL_ERRNUM = errnum; |
124 | TL_ERROR = strdup (s); |
125 | return 0; |
126 | } |
127 | |
128 | int tls_set_error_format (struct tl_out_state *tlio_out, int errnum, const char *format, ...) { |
129 | if (tlio_out->error) { |
130 | return 0; |
131 | } |
132 | assert (format); |
133 | char s[1000]; |
134 | va_list l; |
135 | va_start (l, format); |
136 | vsnprintf (s, sizeof (s), format, l); |
137 | va_end (l); |
138 | vkprintf (2, "Error %s\n" , s); |
139 | tlio_out->errnum = errnum; |
140 | tlio_out->error = strdup (s); |
141 | return 0; |
142 | } |
143 | |
144 | /* {{{ Raw msg methods */ |
145 | static inline void __tl_raw_msg_fetch_raw_data (struct tl_in_state *tlio_in, void *buf, int len) { |
146 | assert (rwm_fetch_data (TL_IN_RAW_MSG, buf, len) == len); |
147 | } |
148 | |
149 | static inline void __tl_raw_msg_fetch_move (struct tl_in_state *tlio_in, int len) { |
150 | assert (len >= 0); |
151 | assert (rwm_skip_data (TL_IN_RAW_MSG, len) == len); |
152 | } |
153 | |
154 | static inline void __tl_raw_msg_fetch_lookup (struct tl_in_state *tlio_in, void *buf, int len) { |
155 | assert (rwm_fetch_lookup (TL_IN_RAW_MSG, buf, len) == len); |
156 | } |
157 | |
158 | static inline void __tl_raw_msg_fetch_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) { |
159 | rwm_split_head (raw, TL_IN_RAW_MSG, len); |
160 | } |
161 | |
162 | static inline void __tl_raw_msg_fetch_lookup_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) { |
163 | rwm_clone (raw, TL_IN_RAW_MSG); |
164 | rwm_trunc (raw, len); |
165 | } |
166 | |
167 | static inline void __tl_raw_msg_fetch_mark (struct tl_in_state *tlio_in) { |
168 | assert (!TL_IN_MARK); |
169 | struct raw_message *T = malloc (sizeof (*T)); |
170 | rwm_clone (T, TL_IN_RAW_MSG); |
171 | TL_IN_MARK = T; |
172 | TL_IN_MARK_POS = TL_IN_POS; |
173 | } |
174 | |
175 | static inline void __tl_raw_msg_fetch_mark_restore (struct tl_in_state *tlio_in) { |
176 | assert (TL_IN_MARK); |
177 | rwm_free (TL_IN_RAW_MSG); |
178 | *TL_IN_RAW_MSG = *(struct raw_message *)TL_IN_MARK; |
179 | free (TL_IN_MARK); |
180 | TL_IN_MARK = 0; |
181 | int x = TL_IN_POS - TL_IN_MARK_POS; |
182 | TL_IN_POS -= x; |
183 | TL_IN_REMAINING += x; |
184 | } |
185 | |
186 | static inline void __tl_raw_msg_fetch_mark_delete (struct tl_in_state *tlio_in) { |
187 | assert (TL_IN_MARK); |
188 | rwm_free (TL_IN_MARK); |
189 | free (TL_IN_MARK); |
190 | TL_IN_MARK = 0; |
191 | } |
192 | |
193 | static inline void *__tl_raw_msg_store_get_ptr (struct tl_out_state *tlio_out, int len) { |
194 | return rwm_postpone_alloc (TL_OUT_RAW_MSG, len); |
195 | } |
196 | |
197 | static inline void *__tl_raw_msg_store_get_prepend_ptr (struct tl_out_state *tlio_out, int len) { |
198 | return rwm_prepend_alloc (TL_OUT_RAW_MSG, len); |
199 | } |
200 | |
201 | static inline void __tl_raw_msg_store_raw_data (struct tl_out_state *tlio_out, const void *buf, int len) { |
202 | assert (rwm_push_data (TL_OUT_RAW_MSG, buf, len) == len); |
203 | } |
204 | |
205 | static inline void __tl_raw_msg_store_raw_msg (struct tl_out_state *tlio_out, struct raw_message *raw) { |
206 | rwm_union (TL_OUT_RAW_MSG, raw); |
207 | } |
208 | |
209 | static inline void __tl_raw_msg_store_read_back (struct tl_out_state *tlio_out, int len) { |
210 | assert (rwm_fetch_data_back (TL_OUT_RAW_MSG, 0, len) == len); |
211 | } |
212 | |
213 | static inline void __tl_raw_msg_store_read_back_nondestruct (struct tl_out_state *tlio_out, void *buf, int len) { |
214 | struct raw_message r; |
215 | rwm_clone (&r, TL_OUT_RAW_MSG); |
216 | assert (rwm_fetch_data_back (&r, buf, len) == len); |
217 | rwm_free (&r); |
218 | } |
219 | |
220 | static inline void __tl_raw_msg_raw_msg_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) { |
221 | if (!advance) { |
222 | struct raw_message r; |
223 | rwm_clone (&r, TL_IN_RAW_MSG); |
224 | rwm_trunc (&r, len); |
225 | rwm_union (TL_OUT_RAW_MSG, &r); |
226 | } else { |
227 | struct raw_message r; |
228 | rwm_split_head (&r, TL_IN_RAW_MSG, len); |
229 | rwm_union (TL_OUT_RAW_MSG, &r); |
230 | assert (TL_IN_RAW_MSG->magic == RM_INIT_MAGIC); |
231 | } |
232 | } |
233 | |
234 | static inline void __tl_raw_msg_str_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) { |
235 | if (advance) { |
236 | assert (rwm_fetch_data (TL_IN_RAW_MSG, TL_OUT_STR, len) == len); |
237 | TL_OUT += len; |
238 | } else { |
239 | assert (rwm_fetch_lookup (TL_IN_RAW_MSG, TL_OUT_STR, len) == len); |
240 | TL_OUT += len; |
241 | } |
242 | } |
243 | |
244 | static inline void __tl_raw_msg_fetch_clear (struct tl_in_state *tlio_in) { |
245 | if (TL_IN_RAW_MSG) { |
246 | rwm_free (TL_IN_RAW_MSG); |
247 | free (TL_IN_RAW_MSG); |
248 | TL_IN = 0; |
249 | } |
250 | } |
251 | |
252 | static inline void __tl_raw_msg_store_clear (struct tl_out_state *tlio_out) { |
253 | if (TL_OUT_RAW_MSG) { |
254 | rwm_free (TL_OUT_RAW_MSG); |
255 | free (TL_OUT_RAW_MSG); |
256 | TL_OUT = 0; |
257 | } |
258 | } |
259 | |
260 | static inline void __tl_raw_msg_store_flush (struct tl_out_state *tlio_out) { |
261 | // struct udp_target *S = (struct udp_target *)TL_OUT_EXTRA; |
262 | assert (TL_OUT_RAW_MSG); |
263 | /*#ifdef TIME_DEBUG |
264 | long long r = rdtsc (); |
265 | #endif*/ |
266 | assert (0); |
267 | /*#ifdef TIME_DEBUG |
268 | MODULE_STAT->tl_udp_flush_rdtsc += (rdtsc () - r); |
269 | MODULE_STAT->tl_udp_flush_cnt ++; |
270 | #endif*/ |
271 | free (TL_OUT_RAW_MSG); |
272 | TL_OUT = 0; |
273 | //udp_target_flush ((struct udp_target *)TL_OUT_EXTRA); |
274 | } |
275 | |
276 | |
277 | /* }}} */ |
278 | |
279 | /* {{{ Tcp raw msg methods */ |
280 | |
281 | static inline void __tl_tcp_raw_msg_store_clear (struct tl_out_state *tlio_out) { |
282 | if (TL_OUT_RAW_MSG) { |
283 | rwm_free (TL_OUT_RAW_MSG); |
284 | free (TL_OUT_RAW_MSG); |
285 | job_decref (JOB_REF_PASS (TL_OUT_EXTRA)); |
286 | TL_OUT = NULL; |
287 | TL_OUT_EXTRA = NULL; |
288 | } |
289 | } |
290 | |
291 | |
292 | static inline void __tl_tcp_raw_msg_store_flush (struct tl_out_state *tlio_out) { |
293 | assert (TL_OUT_RAW_MSG); |
294 | assert (TL_OUT_EXTRA); |
295 | tcp_rpc_conn_send (JOB_REF_PASS (TL_OUT_EXTRA), TL_OUT_RAW_MSG, 4); |
296 | TL_OUT = NULL; |
297 | } |
298 | |
299 | static inline void __tl_tcp_raw_msg_store_flush_unaligned (struct tl_out_state *tlio_out) { |
300 | assert (TL_OUT_RAW_MSG); |
301 | assert (TL_OUT_EXTRA); |
302 | tcp_rpc_conn_send (JOB_REF_PASS (TL_OUT_EXTRA), TL_OUT_RAW_MSG, 12); |
303 | TL_OUT = NULL; |
304 | } |
305 | /* }}} */ |
306 | |
307 | /* {{{ Str methods */ |
308 | static inline void __tl_str_fetch_raw_data (struct tl_in_state *tlio_in, void *buf, int len) { |
309 | memcpy (buf, TL_IN_STR, len); |
310 | TL_IN += len; |
311 | } |
312 | |
313 | static inline void __tl_str_fetch_move (struct tl_in_state *tlio_in, int len) { |
314 | TL_IN += len; |
315 | } |
316 | |
317 | static inline void __tl_str_fetch_lookup (struct tl_in_state *tlio_in, void *buf, int len) { |
318 | memcpy (buf, TL_IN_STR, len); |
319 | } |
320 | |
321 | static inline void __tl_str_fetch_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) { |
322 | rwm_init (raw, 0); |
323 | rwm_push_data (raw, TL_IN, len); |
324 | TL_IN += len; |
325 | } |
326 | |
327 | static inline void __tl_str_fetch_lookup_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) { |
328 | rwm_init (raw, 0); |
329 | rwm_push_data (raw, TL_IN, len); |
330 | } |
331 | |
332 | static inline void *__tl_str_store_get_ptr (struct tl_out_state *tlio_out, int len) { |
333 | void *r = TL_OUT_STR; |
334 | TL_OUT += len; |
335 | return r; |
336 | } |
337 | |
338 | static inline void *__tl_str_store_get_prepend_ptr (struct tl_out_state *tlio_out, int len) { |
339 | return TL_OUT_STR - TL_OUT_POS - len; |
340 | } |
341 | |
342 | |
343 | static inline void __tl_str_store_raw_data (struct tl_out_state *tlio_out, const void *buf, int len) { |
344 | memcpy (TL_OUT_STR, buf, len); |
345 | TL_OUT += len; |
346 | } |
347 | |
348 | static inline void __tl_str_store_raw_msg (struct tl_out_state *tlio_out, struct raw_message *raw) { |
349 | int len = raw->total_bytes; |
350 | rwm_fetch_data (raw, TL_OUT_STR, raw->total_bytes); |
351 | TL_OUT += len; |
352 | } |
353 | |
354 | |
355 | static inline void __tl_str_store_read_back (struct tl_out_state *tlio_out, int len) { |
356 | TL_OUT -= len; |
357 | } |
358 | |
359 | static inline void __tl_str_store_read_back_nondestruct (struct tl_out_state *tlio_out, void *buf, int len) { |
360 | memcpy (TL_OUT_STR - len, buf, len); |
361 | } |
362 | |
363 | static inline void __tl_str_raw_msg_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) { |
364 | assert (rwm_push_data (TL_OUT_RAW_MSG, TL_IN_STR, len) == len); |
365 | if (advance) { |
366 | TL_IN += advance; |
367 | } |
368 | } |
369 | |
370 | static inline void __tl_str_str_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) { |
371 | memcpy (TL_OUT_STR, TL_IN_STR, len); |
372 | TL_OUT += len; |
373 | if (advance) { |
374 | TL_IN += advance; |
375 | } |
376 | } |
377 | |
378 | static inline void __tl_str_fetch_mark (struct tl_in_state *tlio_in) { |
379 | assert (!TL_IN_MARK); |
380 | TL_IN_MARK = TL_IN_STR; |
381 | TL_IN_MARK_POS = TL_IN_POS; |
382 | } |
383 | |
384 | static inline void __tl_str_fetch_mark_restore (struct tl_in_state *tlio_in) { |
385 | TL_IN = TL_IN_MARK; |
386 | TL_IN_MARK = 0; |
387 | int x = TL_IN_POS - TL_IN_MARK_POS; |
388 | TL_IN_POS -= x; |
389 | TL_IN_REMAINING += x; |
390 | } |
391 | |
392 | static inline void __tl_str_fetch_mark_delete (struct tl_in_state *tlio_in) { |
393 | TL_IN_MARK = 0; |
394 | } |
395 | |
396 | |
397 | static inline void __tl_str_store_clear (struct tl_out_state *tlio_out) { |
398 | TL_OUT = 0; |
399 | } |
400 | |
401 | static inline void __tl_str_store_flush (struct tl_out_state *tlio_out) { |
402 | TL_OUT = 0; |
403 | } |
404 | /* }}} */ |
405 | |
406 | const struct tl_in_methods tl_in_raw_msg_methods = { |
407 | .fetch_raw_data = __tl_raw_msg_fetch_raw_data, |
408 | .fetch_move = __tl_raw_msg_fetch_move, |
409 | .fetch_lookup = __tl_raw_msg_fetch_lookup, |
410 | .fetch_raw_message = __tl_raw_msg_fetch_raw_message, |
411 | .fetch_lookup_raw_message = __tl_raw_msg_fetch_lookup_raw_message, |
412 | .fetch_clear = __tl_raw_msg_fetch_clear, |
413 | .fetch_mark = __tl_raw_msg_fetch_mark, |
414 | .fetch_mark_restore = __tl_raw_msg_fetch_mark_restore, |
415 | .fetch_mark_delete = __tl_raw_msg_fetch_mark_delete, |
416 | .flags = 0, |
417 | }; |
418 | |
419 | const struct tl_in_methods tl_in_str_methods = { |
420 | .fetch_raw_data = __tl_str_fetch_raw_data, |
421 | .fetch_move = __tl_str_fetch_move, |
422 | .fetch_lookup = __tl_str_fetch_lookup, |
423 | .fetch_raw_message = __tl_str_fetch_raw_message, |
424 | .fetch_lookup_raw_message = __tl_str_fetch_lookup_raw_message, |
425 | // .fetch_clear = __tl_str_fetch_clear, |
426 | .fetch_mark = __tl_str_fetch_mark, |
427 | .fetch_mark_restore = __tl_str_fetch_mark_restore, |
428 | .fetch_mark_delete = __tl_str_fetch_mark_delete, |
429 | .flags = 0, |
430 | .prepend_bytes = 0, |
431 | }; |
432 | /* |
433 | const struct tl_out_methods tl_out_conn_simple_methods = { |
434 | .store_get_ptr = __tl_conn_store_get_ptr, |
435 | .store_raw_data = __tl_conn_store_raw_data, |
436 | .store_raw_msg = __tl_conn_store_raw_msg, |
437 | .store_read_back = __tl_conn_store_read_back, |
438 | .store_read_back_nondestruct = __tl_conn_store_read_back_nondestruct, |
439 | // .store_flush = __tl_conn_store_flush, |
440 | .store_clear = __tl_conn_store_clear, |
441 | .copy_through = |
442 | { |
443 | 0, // none |
444 | __tl_str_conn_copy_through, // str |
445 | __tl_raw_msg_conn_copy_through, // raw_msg |
446 | __tl_raw_msg_conn_copy_through, // tcp raw msg |
447 | __tl_raw_msg_conn_copy_through, // gms msg |
448 | __tl_raw_msg_conn_copy_through // gms bcast |
449 | }, |
450 | .flags = TLF_PERMANENT | TLF_DISABLE_PREPEND | TLF_NO_AUTOFLUSH | TLF_NOALIGN, |
451 | .prepend_bytes = 0 |
452 | };*/ |
453 | |
454 | const struct tl_out_methods tl_out_raw_msg_methods = { |
455 | .store_get_ptr = __tl_raw_msg_store_get_ptr, |
456 | .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr, |
457 | .store_raw_msg = __tl_raw_msg_store_raw_msg, |
458 | .store_raw_data = __tl_raw_msg_store_raw_data, |
459 | .store_read_back = __tl_raw_msg_store_read_back, |
460 | .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct, |
461 | .store_clear = __tl_raw_msg_store_clear, |
462 | .store_flush = __tl_raw_msg_store_flush, |
463 | .copy_through = |
464 | { |
465 | 0, // none |
466 | __tl_str_raw_msg_copy_through, // str |
467 | __tl_raw_msg_raw_msg_copy_through, // raw_msg |
468 | __tl_raw_msg_raw_msg_copy_through, // tcp conn |
469 | }, |
470 | .flags = TLF_ALLOW_PREPEND |
471 | }; |
472 | |
473 | const struct tl_out_methods tl_out_raw_msg_methods_nosend = { |
474 | .store_get_ptr = __tl_raw_msg_store_get_ptr, |
475 | .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr, |
476 | .store_raw_msg = __tl_raw_msg_store_raw_msg, |
477 | .store_raw_data = __tl_raw_msg_store_raw_data, |
478 | .store_read_back = __tl_raw_msg_store_read_back, |
479 | .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct, |
480 | .store_clear = __tl_raw_msg_store_clear, |
481 | .copy_through = |
482 | { |
483 | 0, // none |
484 | __tl_str_raw_msg_copy_through, // str |
485 | __tl_raw_msg_raw_msg_copy_through, // tcp conn |
486 | }, |
487 | .flags = TLF_ALLOW_PREPEND |
488 | }; |
489 | |
490 | const struct tl_out_methods tl_out_tcp_raw_msg_methods = { |
491 | .store_get_ptr = __tl_raw_msg_store_get_ptr, |
492 | .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr, |
493 | .store_raw_data = __tl_raw_msg_store_raw_data, |
494 | .store_raw_msg = __tl_raw_msg_store_raw_msg, |
495 | .store_read_back = __tl_raw_msg_store_read_back, |
496 | .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct, |
497 | .store_clear = __tl_tcp_raw_msg_store_clear, |
498 | .store_flush = __tl_tcp_raw_msg_store_flush, |
499 | .copy_through = |
500 | { |
501 | 0, // none |
502 | __tl_str_raw_msg_copy_through, // str |
503 | __tl_raw_msg_raw_msg_copy_through, // raw_msg |
504 | __tl_raw_msg_raw_msg_copy_through, // tcp conn |
505 | }, |
506 | .flags = TLF_ALLOW_PREPEND |
507 | }; |
508 | |
509 | const struct tl_out_methods tl_out_tcp_raw_msg_unaligned_methods = { |
510 | .store_get_ptr = __tl_raw_msg_store_get_ptr, |
511 | .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr, |
512 | .store_raw_data = __tl_raw_msg_store_raw_data, |
513 | .store_raw_msg = __tl_raw_msg_store_raw_msg, |
514 | .store_read_back = __tl_raw_msg_store_read_back, |
515 | .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct, |
516 | .store_clear = __tl_tcp_raw_msg_store_clear, |
517 | .store_flush = __tl_tcp_raw_msg_store_flush_unaligned, |
518 | .copy_through = |
519 | { |
520 | 0, // none |
521 | __tl_str_raw_msg_copy_through, // str |
522 | __tl_raw_msg_raw_msg_copy_through, // raw_msg |
523 | __tl_raw_msg_raw_msg_copy_through, // tcp conn |
524 | }, |
525 | .flags = TLF_ALLOW_PREPEND | TLF_NOALIGN |
526 | }; |
527 | |
528 | const struct tl_out_methods tl_out_str_methods = { |
529 | .store_get_ptr = __tl_str_store_get_ptr, |
530 | .store_get_prepend_ptr = __tl_str_store_get_prepend_ptr, |
531 | .store_raw_data = __tl_str_store_raw_data, |
532 | .store_raw_msg = __tl_str_store_raw_msg, |
533 | .store_read_back = __tl_str_store_read_back, |
534 | .store_read_back_nondestruct = __tl_str_store_read_back_nondestruct, |
535 | .store_clear = __tl_str_store_clear, |
536 | .store_flush = __tl_str_store_flush, |
537 | .copy_through = |
538 | { |
539 | 0, // none |
540 | __tl_str_str_copy_through, // str |
541 | __tl_raw_msg_str_copy_through, // raw_msg |
542 | __tl_raw_msg_str_copy_through, // tcp raw_msg |
543 | }, |
544 | .flags = TLF_PERMANENT | TLF_ALLOW_PREPEND, |
545 | .prepend_bytes = 0 |
546 | }; |
547 | |
548 | int tlf_set_error (struct tl_in_state *tlio_in, int errnum, const char *s) { |
549 | assert (s); |
550 | if (TL_ERROR) { |
551 | return 0; |
552 | } |
553 | vkprintf (2, "Error %s\n" , s); |
554 | TL_ERROR = strdup (s); |
555 | TL_ERRNUM = errnum; |
556 | return 0; |
557 | } |
558 | |
559 | int __tl_fetch_init (struct tl_in_state *tlio_in, void *in, void *, enum tl_type type, const struct tl_in_methods *methods, int size) { |
560 | assert (TL_IN_TYPE == tl_type_none); |
561 | assert (in); |
562 | TL_IN_TYPE = type; |
563 | TL_IN = in; |
564 | TL_IN_REMAINING = size; |
565 | TL_IN_POS = 0; |
566 | TL_IN_CUR_FLAGS = 0; |
567 | |
568 | TL_IN_METHODS = methods; |
569 | if (TL_ERROR) { |
570 | free (TL_ERROR); |
571 | TL_ERROR = 0; |
572 | } |
573 | TL_ERRNUM = 0; |
574 | return 0; |
575 | } |
576 | |
577 | int tlf_init_raw_message (struct tl_in_state *tlio_in, struct raw_message *msg, int size, int dup) { |
578 | struct raw_message *r = (struct raw_message *)malloc (sizeof (*r)); |
579 | if (dup == 0) { |
580 | rwm_move (r, msg); |
581 | } else if (dup == 1) { |
582 | rwm_move (r, msg); |
583 | rwm_init (msg, 0); |
584 | } else { |
585 | rwm_clone (r, msg); |
586 | } |
587 | return __tl_fetch_init (tlio_in, r, 0, tl_type_raw_msg, &tl_in_raw_msg_methods, size); |
588 | } |
589 | |
590 | int tlf_init_str (struct tl_in_state *tlio_in, const char *s, int size) { |
591 | return __tl_fetch_init (tlio_in, (void *)s, 0, tl_type_str, &tl_in_str_methods, size); |
592 | } |
593 | |
594 | int tlf_query_flags (struct tl_in_state *tlio_in, struct tl_query_header *) { |
595 | int flags = tl_fetch_int (); |
596 | if (tl_fetch_error ()) { |
597 | return -1; |
598 | } |
599 | if (header->flags & flags) { |
600 | tl_fetch_set_error_format (TL_ERROR_HEADER, "Duplicate flags in header 0x%08x" , header->flags & flags); |
601 | return -1; |
602 | } |
603 | if (flags) { |
604 | tl_fetch_set_error_format (TL_ERROR_HEADER, "Unsupported flags in header 0x%08x" , flags); |
605 | return -1; |
606 | } |
607 | header->flags |= flags; |
608 | |
609 | return 0; |
610 | } |
611 | |
612 | int (struct tl_in_state *tlio_in, struct tl_query_header *) { |
613 | assert (header); |
614 | memset (header, 0, sizeof (*header)); |
615 | int t = tl_fetch_unread (); |
616 | if (TL_IN_METHODS->prepend_bytes) { |
617 | tl_fetch_skip (TL_IN_METHODS->prepend_bytes); |
618 | } |
619 | header->op = tl_fetch_int (); |
620 | header->real_op = header->op; |
621 | header->ref_cnt = 1; |
622 | if (header->op != (int)RPC_INVOKE_REQ && header->op != (int)RPC_INVOKE_KPHP_REQ) { |
623 | tl_fetch_set_error (TL_ERROR_HEADER, "Expected RPC_INVOKE_REQ or RPC_INVOKE_KPHP_REQ" ); |
624 | return -1; |
625 | } |
626 | header->qid = tl_fetch_long (); |
627 | if (header->op == (int)RPC_INVOKE_KPHP_REQ) { |
628 | //tl_fetch_raw_data (header->invoke_kphp_req_extra, 24); |
629 | if (tl_fetch_error ()) { |
630 | return -1; |
631 | } |
632 | MODULE_STAT->rpc_queries_received ++; |
633 | return t - tl_fetch_unread (); |
634 | } |
635 | while (1) { |
636 | int op = tl_fetch_lookup_int (); |
637 | int ok = 1; |
638 | switch (op) { |
639 | case RPC_DEST_ACTOR: |
640 | assert (tl_fetch_int () == (int)RPC_DEST_ACTOR); |
641 | header->actor_id = tl_fetch_long (); |
642 | break; |
643 | case RPC_DEST_ACTOR_FLAGS: |
644 | assert (tl_fetch_int () == (int)RPC_DEST_ACTOR_FLAGS); |
645 | header->actor_id = tl_fetch_long (); |
646 | tlf_query_flags (tlio_in, header); |
647 | break; |
648 | case RPC_DEST_FLAGS: |
649 | assert (tl_fetch_int () == (int)RPC_DEST_FLAGS); |
650 | tlf_query_flags (tlio_in, header); |
651 | break; |
652 | default: |
653 | ok = 0; |
654 | break; |
655 | } |
656 | if (tl_fetch_error ()) { |
657 | return -1; |
658 | } |
659 | if (!ok) { |
660 | MODULE_STAT->rpc_queries_received ++; |
661 | return t - tl_fetch_unread (); |
662 | } |
663 | } |
664 | } |
665 | |
666 | int tlf_query_answer_flags (struct tl_in_state *tlio_in, struct tl_query_header *) { |
667 | int flags = tl_fetch_int (); |
668 | if (tl_fetch_error ()) { |
669 | return -1; |
670 | } |
671 | if (header->flags & flags) { |
672 | tl_fetch_set_error_format (TL_ERROR_HEADER, "Duplicate flags in header 0x%08x" , header->flags & flags); |
673 | return -1; |
674 | } |
675 | if (flags) { |
676 | tl_fetch_set_error_format (TL_ERROR_HEADER, "Unsupported flags in header 0x%08x" , flags); |
677 | return -1; |
678 | } |
679 | header->flags |= flags; |
680 | return 0; |
681 | } |
682 | |
683 | int (struct tl_in_state *tlio_in, struct tl_query_header *) { |
684 | assert (header); |
685 | memset (header, 0, sizeof (*header)); |
686 | int t = tl_fetch_unread (); |
687 | if (TL_IN_METHODS->prepend_bytes) { |
688 | tl_fetch_skip (TL_IN_METHODS->prepend_bytes); |
689 | } |
690 | header->op = tl_fetch_int (); |
691 | header->real_op = header->op; |
692 | header->ref_cnt = 1; |
693 | if (header->op != RPC_REQ_ERROR && header->op != RPC_REQ_RESULT ) { |
694 | tl_fetch_set_error (TL_ERROR_HEADER, "Expected RPC_REQ_ERROR or RPC_REQ_RESULT" ); |
695 | return -1; |
696 | } |
697 | header->qid = tl_fetch_long (); |
698 | while (1) { |
699 | int ok = 1; |
700 | if (header->op != RPC_REQ_ERROR) { |
701 | int op = tl_fetch_lookup_int (); |
702 | switch (op) { |
703 | case RPC_REQ_ERROR: |
704 | assert (tl_fetch_int () == RPC_REQ_ERROR); |
705 | header->op = RPC_REQ_ERROR_WRAPPED; |
706 | tl_fetch_long (); |
707 | break; |
708 | case RPC_REQ_ERROR_WRAPPED: |
709 | header->op = RPC_REQ_ERROR_WRAPPED; |
710 | break; |
711 | case RPC_REQ_RESULT_FLAGS: |
712 | assert (tl_fetch_int () == (int)RPC_REQ_RESULT_FLAGS); |
713 | tlf_query_answer_flags (tlio_in, header); |
714 | break; |
715 | default: |
716 | ok = 0; |
717 | break; |
718 | } |
719 | } else { |
720 | ok = 0; |
721 | } |
722 | if (tl_fetch_error ()) { |
723 | return -1; |
724 | } |
725 | if (!ok) { |
726 | if (header->op == RPC_REQ_ERROR || header->op == RPC_REQ_ERROR_WRAPPED) { |
727 | MODULE_STAT->rpc_answers_error ++; |
728 | } else { |
729 | MODULE_STAT->rpc_answers_received ++; |
730 | } |
731 | return t - tl_fetch_unread (); |
732 | } |
733 | } |
734 | } |
735 | |
736 | static inline int __tl_store_init (struct tl_out_state *tlio_out, void *out, void *, enum tl_type type, const struct tl_out_methods *methods, int size, long long qid) { |
737 | assert (tlio_out); |
738 | assert (!TL_OUT_METHODS); |
739 | |
740 | TL_OUT = out; |
741 | TL_OUT_EXTRA = out_extra; |
742 | if (out) { |
743 | TL_OUT_METHODS = methods; |
744 | TL_OUT_TYPE = type; |
745 | if (type != tl_type_none && !(methods->flags & (TLF_ALLOW_PREPEND | TLF_DISABLE_PREPEND))) { |
746 | TL_OUT_SIZE = (int *) methods->store_get_ptr (tlio_out, methods->prepend_bytes + (qid ? 12 : 0)); |
747 | } |
748 | } else { |
749 | TL_OUT_TYPE = tl_type_none; |
750 | } |
751 | |
752 | TL_OUT_POS = 0; |
753 | TL_OUT_QID = qid; |
754 | TL_OUT_REMAINING = size; |
755 | |
756 | tlio_out->errnum = 0; |
757 | tlio_out->error = NULL; |
758 | |
759 | return 0; |
760 | } |
761 | |
762 | /*int tls_init_simple (struct tl_out_state *tlio_out, connection_job_t c) { |
763 | if (c) { |
764 | TL_OUT_PID = &(RPCS_DATA(c)->remote_pid); |
765 | } else { |
766 | TL_OUT_PID = 0; |
767 | } |
768 | return __tl_store_init (tlio_out, job_incref (c), 0, tl_type_conn, &tl_out_conn_simple_methods, (1 << 27), 0); |
769 | }*/ |
770 | |
771 | int tls_init_raw_msg (struct tl_out_state *tlio_out, struct process_id *pid, long long qid) { |
772 | if (pid) { |
773 | memcpy (&tlio_out->out_pid_buf, pid, 12); |
774 | TL_OUT_PID = &tlio_out->out_pid_buf; |
775 | } else { |
776 | TL_OUT_PID = 0; |
777 | } |
778 | struct raw_message *d = 0; |
779 | if (pid) { |
780 | d = (struct raw_message *)malloc (sizeof (*d)); |
781 | rwm_init (d, 0); |
782 | } |
783 | return __tl_store_init (tlio_out, d, NULL, tl_type_raw_msg, &tl_out_raw_msg_methods, (1 << 27), qid); |
784 | } |
785 | |
786 | int tls_init_tcp_raw_msg (struct tl_out_state *tlio_out, JOB_REF_ARG(c), long long qid) { |
787 | if (c) { |
788 | TL_OUT_PID = &(TCP_RPC_DATA(c)->remote_pid); |
789 | } else { |
790 | TL_OUT_PID = 0; |
791 | } |
792 | struct raw_message *d = 0; |
793 | if (c) { |
794 | d = (struct raw_message *)malloc (sizeof (*d)); |
795 | rwm_init (d, 0); |
796 | } |
797 | return __tl_store_init (tlio_out, d, c, tl_type_tcp_raw_msg, &tl_out_tcp_raw_msg_methods, (1 << 27), qid); |
798 | } |
799 | |
800 | int tls_init_tcp_raw_msg_unaligned (struct tl_out_state *tlio_out, JOB_REF_ARG(c), long long qid) { |
801 | if (c) { |
802 | TL_OUT_PID = &(TCP_RPC_DATA(c)->remote_pid); |
803 | } else { |
804 | TL_OUT_PID = 0; |
805 | } |
806 | struct raw_message *d = 0; |
807 | if (c) { |
808 | d = (struct raw_message *)malloc (sizeof (*d)); |
809 | rwm_init (d, 0); |
810 | } |
811 | return __tl_store_init (tlio_out, d, c, tl_type_tcp_raw_msg, &tl_out_tcp_raw_msg_unaligned_methods, (1 << 27), qid); |
812 | } |
813 | |
814 | int tls_init_str (struct tl_out_state *tlio_out, char *s, long long qid, int size) { |
815 | TL_OUT_PID = 0; |
816 | return __tl_store_init (tlio_out, s, s, tl_type_str, &tl_out_str_methods, size, qid); |
817 | } |
818 | |
819 | int tls_init_raw_msg_nosend (struct tl_out_state *tlio_out) { |
820 | struct raw_message *d = (struct raw_message *)malloc (sizeof (*d)); |
821 | rwm_init (d, 0); |
822 | return __tl_store_init (tlio_out, d, d, tl_type_raw_msg, &tl_out_raw_msg_methods_nosend, (1 << 27), 0); |
823 | } |
824 | /* |
825 | int tls_init_any (struct tl_out_state *tlio_out, enum tl_type type, void *out, long long qid) { |
826 | switch (type) { |
827 | case tl_type_conn: |
828 | return tls_init_connection (tlio_out, (connection_job_t )out, qid); |
829 | case tl_type_tcp_raw_msg: |
830 | return tls_init_tcp_raw_msg (tlio_out, out, qid); |
831 | default: |
832 | assert (0); |
833 | } |
834 | }*/ |
835 | |
836 | int (struct tl_out_state *tlio_out, struct tl_query_header *) { |
837 | assert (tls_check (tlio_out, 0) >= 0); |
838 | assert (header->op == (int)RPC_REQ_ERROR || header->op == (int)RPC_REQ_RESULT || header->op == (int)RPC_INVOKE_REQ || header->op == (int)RPC_REQ_ERROR_WRAPPED); |
839 | if (header->op == (int)RPC_INVOKE_REQ) { |
840 | if (header->flags) { |
841 | tl_store_int (RPC_DEST_ACTOR_FLAGS); |
842 | tl_store_long (header->actor_id); |
843 | tl_store_int (header->flags); |
844 | } else if (header->actor_id) { |
845 | tl_store_int (RPC_DEST_ACTOR); |
846 | tl_store_long (header->actor_id); |
847 | } |
848 | } else if (header->op == RPC_REQ_ERROR_WRAPPED) { |
849 | tl_store_int (RPC_REQ_ERROR); |
850 | tl_store_long (TL_OUT_QID); |
851 | } else if (header->op == RPC_REQ_RESULT) { |
852 | if (header->flags) { |
853 | tl_store_int (RPC_REQ_RESULT_FLAGS); |
854 | tl_store_int (header->flags); |
855 | } |
856 | } |
857 | return 0; |
858 | } |
859 | |
860 | int tls_end_ext (struct tl_out_state *tlio_out, int op) { |
861 | if (TL_OUT_TYPE == tl_type_none) { |
862 | return 0; |
863 | } |
864 | assert (TL_OUT); |
865 | assert (TL_OUT_TYPE); |
866 | if (tlio_out->error) { |
867 | // tl_store_clear (); |
868 | tl_store_clean (); |
869 | vkprintf (1, "tl_store_end: " PID_PRINT_STR" writing error %s, errnum %d, tl.out_pos = %d\n" , PID_TO_PRINT(TL_OUT_PID), tlio_out->error, tlio_out->errnum, TL_OUT_POS); |
870 | //tl_store_clear (); |
871 | tl_store_int (RPC_REQ_ERROR); |
872 | tl_store_long (TL_OUT_QID); |
873 | tl_store_int (tlio_out->errnum); |
874 | tl_store_string0 (tlio_out->error); |
875 | |
876 | MODULE_STAT->rpc_sent_errors ++; |
877 | } else { |
878 | if (op == RPC_REQ_RESULT) { |
879 | MODULE_STAT->rpc_sent_answers ++; |
880 | } else { |
881 | MODULE_STAT->rpc_sent_queries ++; |
882 | } |
883 | } |
884 | if (!(TL_OUT_FLAGS & TLF_NOALIGN)) { |
885 | assert (!(TL_OUT_POS & 3)); |
886 | } |
887 | |
888 | { |
889 | int *p; |
890 | if (TL_OUT_FLAGS & TLF_ALLOW_PREPEND) { |
891 | p = TL_OUT_SIZE = tl_store_get_prepend_ptr (TL_OUT_METHODS->prepend_bytes + (TL_OUT_QID ? 12 : 0)); |
892 | } else { |
893 | p = TL_OUT_SIZE; |
894 | } |
895 | |
896 | if (TL_OUT_QID) { |
897 | assert (op); |
898 | p += (TL_OUT_METHODS->prepend_bytes) / 4; |
899 | *p = op; |
900 | *(long long *)(p + 1) = TL_OUT_QID; |
901 | } |
902 | } |
903 | |
904 | if (TL_OUT_METHODS->store_prefix) { |
905 | TL_OUT_METHODS->store_prefix (tlio_out); |
906 | } |
907 | |
908 | if (!(TL_OUT_FLAGS & TLF_NO_AUTOFLUSH)) { |
909 | TL_OUT_METHODS->store_flush (tlio_out); |
910 | } |
911 | vkprintf (2, "tl_store_end: written %d bytes, qid = %lld, PID = " PID_PRINT_STR "\n" , TL_OUT_POS, TL_OUT_QID, PID_TO_PRINT (TL_OUT_PID)); |
912 | TL_OUT = 0; |
913 | TL_OUT_TYPE = tl_type_none; |
914 | TL_OUT_METHODS = 0; |
915 | TL_OUT_EXTRA = 0; |
916 | return 0; |
917 | } |
918 | |
919 | int tls_init (struct tl_out_state *tlio_out, enum tl_type type, struct process_id *pid, long long qid) { |
920 | switch (type) { |
921 | case tl_type_raw_msg: |
922 | { |
923 | tls_init_raw_msg (tlio_out, pid, qid); |
924 | return 1; |
925 | } |
926 | case tl_type_tcp_raw_msg: |
927 | { |
928 | connection_job_t d = rpc_target_choose_connection (rpc_target_lookup (pid), pid); |
929 | if (d) { |
930 | vkprintf (2, "%s: Good connection " PID_PRINT_STR "\n" , __func__, PID_TO_PRINT (pid)); |
931 | tls_init_tcp_raw_msg (tlio_out, JOB_REF_PASS (d), qid); |
932 | return 1; |
933 | } else { |
934 | vkprintf (2, "%s: Bad connection " PID_PRINT_STR "\n" , __func__, PID_TO_PRINT (pid)); |
935 | return -1; |
936 | } |
937 | } |
938 | case tl_type_none: |
939 | vkprintf (2, "Trying to tl_init_store() with type tl_type_none, qid=%lld\n" , qid); |
940 | return -1; |
941 | default: |
942 | fprintf (stderr, "type = %d\n" , type); |
943 | assert (0); |
944 | return 0; |
945 | } |
946 | } |
947 | |
948 | |
949 | struct tl_in_state *tl_in_state_alloc (void) { |
950 | MODULE_STAT->tl_in_allocated ++; |
951 | return calloc (sizeof (struct tl_in_state), 1); |
952 | } |
953 | |
954 | void tl_in_state_free (struct tl_in_state *tlio_in) { |
955 | MODULE_STAT->tl_in_allocated --; |
956 | if (tlio_in->in_methods && tlio_in->in_methods->fetch_clear) { |
957 | tlio_in->in_methods->fetch_clear (tlio_in); |
958 | } |
959 | if (tlio_in->error) { |
960 | free (PTR_MOVE (tlio_in->error)); |
961 | } |
962 | free (tlio_in); |
963 | } |
964 | |
965 | struct tl_out_state *tl_out_state_alloc (void) { |
966 | MODULE_STAT->tl_out_allocated ++; |
967 | return calloc (sizeof (struct tl_out_state), 1); |
968 | } |
969 | |
970 | void tl_out_state_free (struct tl_out_state *tlio_out) { |
971 | MODULE_STAT->tl_out_allocated --; |
972 | if (tlio_out->out_methods && tlio_out->out_methods->store_clear) { |
973 | tlio_out->out_methods->store_clear (tlio_out); |
974 | } |
975 | if (tlio_out->error) { |
976 | free (PTR_MOVE (tlio_out->error)); |
977 | } |
978 | free (tlio_out); |
979 | } |
980 | |