1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2012-2013 Vkontakte Ltd
18 2012-2013 Vitaliy Valtman
19
20 Copyright 2014 Telegram Messenger Inc
21 2014 Vitaly Valtman
22*/
23
24#include "common/tl-parse.h"
25
26#include <assert.h>
27#include <stdarg.h>
28#include <stdio.h>
29#include <stdlib.h>
30#include <string.h>
31#include <sys/time.h>
32#include <errno.h>
33
34#include "net/net-events.h"
35#include "net/net-msg.h"
36#include "net/net-msg-buffers.h"
37#include "net/net-rpc-targets.h"
38#include "net/net-tcp-connections.h"
39#include "net/net-tcp-rpc-common.h"
40#include "net/net-tcp-rpc-server.h"
41
42
43#include "common/cpuid.h"
44#include "common/kprintf.h"
45#include "common/server-functions.h"
46
47#include "vv/vv-io.h"
48#include "vv/vv-tree.h"
49
50//#include "auto/TL/common.h"
51//#include "auto/TL/tl-names.h"
52
53#include "engine/engine.h"
54#include "jobs/jobs.h"
55#include "common/common-stats.h"
56
57#define MODULE tl_parse
58
59MODULE_STAT_TYPE {
60 long long rpc_queries_received, rpc_answers_error, rpc_answers_received;
61 long long rpc_sent_errors, rpc_sent_answers, rpc_sent_queries;
62 int tl_in_allocated, tl_out_allocated;
63/* #ifdef TIME_DEBUG
64 long long tl_udp_flush_rdtsc;
65 long long tl_udp_flush_cnt;
66 #endif*/
67};
68
69MODULE_INIT
70
71MODULE_STAT_FUNCTION
72 double uptime = time (0) - start_time;
73 SB_SUM_ONE_LL (rpc_queries_received);
74 SB_SUM_ONE_LL (rpc_answers_error);
75 SB_SUM_ONE_LL (rpc_answers_received);
76 SB_SUM_ONE_LL (rpc_sent_errors);
77 SB_SUM_ONE_LL (rpc_sent_answers);
78 SB_SUM_ONE_LL (rpc_sent_queries);
79 SB_SUM_ONE_I (tl_in_allocated);
80 SB_SUM_ONE_I (tl_out_allocated);
81 /*#ifdef TIME_DEBUG
82 SB_SUM_ONE_LL (tl_udp_flush_rdtsc);
83 SB_SUM_ONE_LL (tl_udp_flush_cnt);
84 #endif*/
85 sb_printf (sb,
86 "rpc_qps\t%lf\n"
87 "default_rpc_flags\t%u\n",
88 safe_div (SB_SUM_LL (rpc_queries_received), uptime), tcp_get_default_rpc_flags ()
89 );
90MODULE_STAT_FUNCTION_END
91
92
93
94void tl_query_header_delete (struct tl_query_header *h) {
95 if (__sync_fetch_and_add (&h->ref_cnt, -1) > 1) { return; }
96 assert (!h->ref_cnt);
97 free (h);
98}
99
100struct tl_query_header *tl_query_header_dup (struct tl_query_header *h) {
101 __sync_fetch_and_add (&h->ref_cnt, 1);
102 return h;
103}
104
105struct tl_query_header *tl_query_header_clone (struct tl_query_header *h_old) {
106 struct tl_query_header *h = malloc (sizeof (*h));
107 memcpy (h, h_old, sizeof (*h));
108 h->ref_cnt = 1;
109 return h;
110}
111
112int tlf_set_error_format (struct tl_in_state *tlio_in, int errnum, const char *format, ...) {
113 if (TL_ERROR) {
114 return 0;
115 }
116 assert (format);
117 char s[1000];
118 va_list l;
119 va_start (l, format);
120 vsnprintf (s, sizeof (s), format, l);
121 va_end (l);
122 vkprintf (2, "Error %s\n", s);
123 TL_ERRNUM = errnum;
124 TL_ERROR = strdup (s);
125 return 0;
126}
127
128int tls_set_error_format (struct tl_out_state *tlio_out, int errnum, const char *format, ...) {
129 if (tlio_out->error) {
130 return 0;
131 }
132 assert (format);
133 char s[1000];
134 va_list l;
135 va_start (l, format);
136 vsnprintf (s, sizeof (s), format, l);
137 va_end (l);
138 vkprintf (2, "Error %s\n", s);
139 tlio_out->errnum = errnum;
140 tlio_out->error = strdup (s);
141 return 0;
142}
143
144/* {{{ Raw msg methods */
145static inline void __tl_raw_msg_fetch_raw_data (struct tl_in_state *tlio_in, void *buf, int len) {
146 assert (rwm_fetch_data (TL_IN_RAW_MSG, buf, len) == len);
147}
148
149static inline void __tl_raw_msg_fetch_move (struct tl_in_state *tlio_in, int len) {
150 assert (len >= 0);
151 assert (rwm_skip_data (TL_IN_RAW_MSG, len) == len);
152}
153
154static inline void __tl_raw_msg_fetch_lookup (struct tl_in_state *tlio_in, void *buf, int len) {
155 assert (rwm_fetch_lookup (TL_IN_RAW_MSG, buf, len) == len);
156}
157
158static inline void __tl_raw_msg_fetch_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) {
159 rwm_split_head (raw, TL_IN_RAW_MSG, len);
160}
161
162static inline void __tl_raw_msg_fetch_lookup_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) {
163 rwm_clone (raw, TL_IN_RAW_MSG);
164 rwm_trunc (raw, len);
165}
166
167static inline void __tl_raw_msg_fetch_mark (struct tl_in_state *tlio_in) {
168 assert (!TL_IN_MARK);
169 struct raw_message *T = malloc (sizeof (*T));
170 rwm_clone (T, TL_IN_RAW_MSG);
171 TL_IN_MARK = T;
172 TL_IN_MARK_POS = TL_IN_POS;
173}
174
175static inline void __tl_raw_msg_fetch_mark_restore (struct tl_in_state *tlio_in) {
176 assert (TL_IN_MARK);
177 rwm_free (TL_IN_RAW_MSG);
178 *TL_IN_RAW_MSG = *(struct raw_message *)TL_IN_MARK;
179 free (TL_IN_MARK);
180 TL_IN_MARK = 0;
181 int x = TL_IN_POS - TL_IN_MARK_POS;
182 TL_IN_POS -= x;
183 TL_IN_REMAINING += x;
184}
185
186static inline void __tl_raw_msg_fetch_mark_delete (struct tl_in_state *tlio_in) {
187 assert (TL_IN_MARK);
188 rwm_free (TL_IN_MARK);
189 free (TL_IN_MARK);
190 TL_IN_MARK = 0;
191}
192
193static inline void *__tl_raw_msg_store_get_ptr (struct tl_out_state *tlio_out, int len) {
194 return rwm_postpone_alloc (TL_OUT_RAW_MSG, len);
195}
196
197static inline void *__tl_raw_msg_store_get_prepend_ptr (struct tl_out_state *tlio_out, int len) {
198 return rwm_prepend_alloc (TL_OUT_RAW_MSG, len);
199}
200
201static inline void __tl_raw_msg_store_raw_data (struct tl_out_state *tlio_out, const void *buf, int len) {
202 assert (rwm_push_data (TL_OUT_RAW_MSG, buf, len) == len);
203}
204
205static inline void __tl_raw_msg_store_raw_msg (struct tl_out_state *tlio_out, struct raw_message *raw) {
206 rwm_union (TL_OUT_RAW_MSG, raw);
207}
208
209static inline void __tl_raw_msg_store_read_back (struct tl_out_state *tlio_out, int len) {
210 assert (rwm_fetch_data_back (TL_OUT_RAW_MSG, 0, len) == len);
211}
212
213static inline void __tl_raw_msg_store_read_back_nondestruct (struct tl_out_state *tlio_out, void *buf, int len) {
214 struct raw_message r;
215 rwm_clone (&r, TL_OUT_RAW_MSG);
216 assert (rwm_fetch_data_back (&r, buf, len) == len);
217 rwm_free (&r);
218}
219
220static inline void __tl_raw_msg_raw_msg_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) {
221 if (!advance) {
222 struct raw_message r;
223 rwm_clone (&r, TL_IN_RAW_MSG);
224 rwm_trunc (&r, len);
225 rwm_union (TL_OUT_RAW_MSG, &r);
226 } else {
227 struct raw_message r;
228 rwm_split_head (&r, TL_IN_RAW_MSG, len);
229 rwm_union (TL_OUT_RAW_MSG, &r);
230 assert (TL_IN_RAW_MSG->magic == RM_INIT_MAGIC);
231 }
232}
233
234static inline void __tl_raw_msg_str_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) {
235 if (advance) {
236 assert (rwm_fetch_data (TL_IN_RAW_MSG, TL_OUT_STR, len) == len);
237 TL_OUT += len;
238 } else {
239 assert (rwm_fetch_lookup (TL_IN_RAW_MSG, TL_OUT_STR, len) == len);
240 TL_OUT += len;
241 }
242}
243
244static inline void __tl_raw_msg_fetch_clear (struct tl_in_state *tlio_in) {
245 if (TL_IN_RAW_MSG) {
246 rwm_free (TL_IN_RAW_MSG);
247 free (TL_IN_RAW_MSG);
248 TL_IN = 0;
249 }
250}
251
252static inline void __tl_raw_msg_store_clear (struct tl_out_state *tlio_out) {
253 if (TL_OUT_RAW_MSG) {
254 rwm_free (TL_OUT_RAW_MSG);
255 free (TL_OUT_RAW_MSG);
256 TL_OUT = 0;
257 }
258}
259
260static inline void __tl_raw_msg_store_flush (struct tl_out_state *tlio_out) {
261// struct udp_target *S = (struct udp_target *)TL_OUT_EXTRA;
262 assert (TL_OUT_RAW_MSG);
263 /*#ifdef TIME_DEBUG
264 long long r = rdtsc ();
265 #endif*/
266 assert (0);
267 /*#ifdef TIME_DEBUG
268 MODULE_STAT->tl_udp_flush_rdtsc += (rdtsc () - r);
269 MODULE_STAT->tl_udp_flush_cnt ++;
270 #endif*/
271 free (TL_OUT_RAW_MSG);
272 TL_OUT = 0;
273 //udp_target_flush ((struct udp_target *)TL_OUT_EXTRA);
274}
275
276
277/* }}} */
278
279/* {{{ Tcp raw msg methods */
280
281static inline void __tl_tcp_raw_msg_store_clear (struct tl_out_state *tlio_out) {
282 if (TL_OUT_RAW_MSG) {
283 rwm_free (TL_OUT_RAW_MSG);
284 free (TL_OUT_RAW_MSG);
285 job_decref (JOB_REF_PASS (TL_OUT_EXTRA));
286 TL_OUT = NULL;
287 TL_OUT_EXTRA = NULL;
288 }
289}
290
291
292static inline void __tl_tcp_raw_msg_store_flush (struct tl_out_state *tlio_out) {
293 assert (TL_OUT_RAW_MSG);
294 assert (TL_OUT_EXTRA);
295 tcp_rpc_conn_send (JOB_REF_PASS (TL_OUT_EXTRA), TL_OUT_RAW_MSG, 4);
296 TL_OUT = NULL;
297}
298
299static inline void __tl_tcp_raw_msg_store_flush_unaligned (struct tl_out_state *tlio_out) {
300 assert (TL_OUT_RAW_MSG);
301 assert (TL_OUT_EXTRA);
302 tcp_rpc_conn_send (JOB_REF_PASS (TL_OUT_EXTRA), TL_OUT_RAW_MSG, 12);
303 TL_OUT = NULL;
304}
305/* }}} */
306
307/* {{{ Str methods */
308static inline void __tl_str_fetch_raw_data (struct tl_in_state *tlio_in, void *buf, int len) {
309 memcpy (buf, TL_IN_STR, len);
310 TL_IN += len;
311}
312
313static inline void __tl_str_fetch_move (struct tl_in_state *tlio_in, int len) {
314 TL_IN += len;
315}
316
317static inline void __tl_str_fetch_lookup (struct tl_in_state *tlio_in, void *buf, int len) {
318 memcpy (buf, TL_IN_STR, len);
319}
320
321static inline void __tl_str_fetch_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) {
322 rwm_init (raw, 0);
323 rwm_push_data (raw, TL_IN, len);
324 TL_IN += len;
325}
326
327static inline void __tl_str_fetch_lookup_raw_message (struct tl_in_state *tlio_in, struct raw_message *raw, int len) {
328 rwm_init (raw, 0);
329 rwm_push_data (raw, TL_IN, len);
330}
331
332static inline void *__tl_str_store_get_ptr (struct tl_out_state *tlio_out, int len) {
333 void *r = TL_OUT_STR;
334 TL_OUT += len;
335 return r;
336}
337
338static inline void *__tl_str_store_get_prepend_ptr (struct tl_out_state *tlio_out, int len) {
339 return TL_OUT_STR - TL_OUT_POS - len;
340}
341
342
343static inline void __tl_str_store_raw_data (struct tl_out_state *tlio_out, const void *buf, int len) {
344 memcpy (TL_OUT_STR, buf, len);
345 TL_OUT += len;
346}
347
348static inline void __tl_str_store_raw_msg (struct tl_out_state *tlio_out, struct raw_message *raw) {
349 int len = raw->total_bytes;
350 rwm_fetch_data (raw, TL_OUT_STR, raw->total_bytes);
351 TL_OUT += len;
352}
353
354
355static inline void __tl_str_store_read_back (struct tl_out_state *tlio_out, int len) {
356 TL_OUT -= len;
357}
358
359static inline void __tl_str_store_read_back_nondestruct (struct tl_out_state *tlio_out, void *buf, int len) {
360 memcpy (TL_OUT_STR - len, buf, len);
361}
362
363static inline void __tl_str_raw_msg_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) {
364 assert (rwm_push_data (TL_OUT_RAW_MSG, TL_IN_STR, len) == len);
365 if (advance) {
366 TL_IN += advance;
367 }
368}
369
370static inline void __tl_str_str_copy_through (struct tl_in_state *tlio_in, struct tl_out_state *tlio_out, int len, int advance) {
371 memcpy (TL_OUT_STR, TL_IN_STR, len);
372 TL_OUT += len;
373 if (advance) {
374 TL_IN += advance;
375 }
376}
377
378static inline void __tl_str_fetch_mark (struct tl_in_state *tlio_in) {
379 assert (!TL_IN_MARK);
380 TL_IN_MARK = TL_IN_STR;
381 TL_IN_MARK_POS = TL_IN_POS;
382}
383
384static inline void __tl_str_fetch_mark_restore (struct tl_in_state *tlio_in) {
385 TL_IN = TL_IN_MARK;
386 TL_IN_MARK = 0;
387 int x = TL_IN_POS - TL_IN_MARK_POS;
388 TL_IN_POS -= x;
389 TL_IN_REMAINING += x;
390}
391
392static inline void __tl_str_fetch_mark_delete (struct tl_in_state *tlio_in) {
393 TL_IN_MARK = 0;
394}
395
396
397static inline void __tl_str_store_clear (struct tl_out_state *tlio_out) {
398 TL_OUT = 0;
399}
400
401static inline void __tl_str_store_flush (struct tl_out_state *tlio_out) {
402 TL_OUT = 0;
403}
404/* }}} */
405
406const struct tl_in_methods tl_in_raw_msg_methods = {
407 .fetch_raw_data = __tl_raw_msg_fetch_raw_data,
408 .fetch_move = __tl_raw_msg_fetch_move,
409 .fetch_lookup = __tl_raw_msg_fetch_lookup,
410 .fetch_raw_message = __tl_raw_msg_fetch_raw_message,
411 .fetch_lookup_raw_message = __tl_raw_msg_fetch_lookup_raw_message,
412 .fetch_clear = __tl_raw_msg_fetch_clear,
413 .fetch_mark = __tl_raw_msg_fetch_mark,
414 .fetch_mark_restore = __tl_raw_msg_fetch_mark_restore,
415 .fetch_mark_delete = __tl_raw_msg_fetch_mark_delete,
416 .flags = 0,
417};
418
419const struct tl_in_methods tl_in_str_methods = {
420 .fetch_raw_data = __tl_str_fetch_raw_data,
421 .fetch_move = __tl_str_fetch_move,
422 .fetch_lookup = __tl_str_fetch_lookup,
423 .fetch_raw_message = __tl_str_fetch_raw_message,
424 .fetch_lookup_raw_message = __tl_str_fetch_lookup_raw_message,
425// .fetch_clear = __tl_str_fetch_clear,
426 .fetch_mark = __tl_str_fetch_mark,
427 .fetch_mark_restore = __tl_str_fetch_mark_restore,
428 .fetch_mark_delete = __tl_str_fetch_mark_delete,
429 .flags = 0,
430 .prepend_bytes = 0,
431};
432/*
433const struct tl_out_methods tl_out_conn_simple_methods = {
434 .store_get_ptr = __tl_conn_store_get_ptr,
435 .store_raw_data = __tl_conn_store_raw_data,
436 .store_raw_msg = __tl_conn_store_raw_msg,
437 .store_read_back = __tl_conn_store_read_back,
438 .store_read_back_nondestruct = __tl_conn_store_read_back_nondestruct,
439 // .store_flush = __tl_conn_store_flush,
440 .store_clear = __tl_conn_store_clear,
441 .copy_through =
442 {
443 0, // none
444 __tl_str_conn_copy_through, // str
445 __tl_raw_msg_conn_copy_through, // raw_msg
446 __tl_raw_msg_conn_copy_through, // tcp raw msg
447 __tl_raw_msg_conn_copy_through, // gms msg
448 __tl_raw_msg_conn_copy_through // gms bcast
449 },
450 .flags = TLF_PERMANENT | TLF_DISABLE_PREPEND | TLF_NO_AUTOFLUSH | TLF_NOALIGN,
451 .prepend_bytes = 0
452};*/
453
454const struct tl_out_methods tl_out_raw_msg_methods = {
455 .store_get_ptr = __tl_raw_msg_store_get_ptr,
456 .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr,
457 .store_raw_msg = __tl_raw_msg_store_raw_msg,
458 .store_raw_data = __tl_raw_msg_store_raw_data,
459 .store_read_back = __tl_raw_msg_store_read_back,
460 .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct,
461 .store_clear = __tl_raw_msg_store_clear,
462 .store_flush = __tl_raw_msg_store_flush,
463 .copy_through =
464 {
465 0, // none
466 __tl_str_raw_msg_copy_through, // str
467 __tl_raw_msg_raw_msg_copy_through, // raw_msg
468 __tl_raw_msg_raw_msg_copy_through, // tcp conn
469 },
470 .flags = TLF_ALLOW_PREPEND
471};
472
473const struct tl_out_methods tl_out_raw_msg_methods_nosend = {
474 .store_get_ptr = __tl_raw_msg_store_get_ptr,
475 .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr,
476 .store_raw_msg = __tl_raw_msg_store_raw_msg,
477 .store_raw_data = __tl_raw_msg_store_raw_data,
478 .store_read_back = __tl_raw_msg_store_read_back,
479 .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct,
480 .store_clear = __tl_raw_msg_store_clear,
481 .copy_through =
482 {
483 0, // none
484 __tl_str_raw_msg_copy_through, // str
485 __tl_raw_msg_raw_msg_copy_through, // tcp conn
486 },
487 .flags = TLF_ALLOW_PREPEND
488};
489
490const struct tl_out_methods tl_out_tcp_raw_msg_methods = {
491 .store_get_ptr = __tl_raw_msg_store_get_ptr,
492 .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr,
493 .store_raw_data = __tl_raw_msg_store_raw_data,
494 .store_raw_msg = __tl_raw_msg_store_raw_msg,
495 .store_read_back = __tl_raw_msg_store_read_back,
496 .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct,
497 .store_clear = __tl_tcp_raw_msg_store_clear,
498 .store_flush = __tl_tcp_raw_msg_store_flush,
499 .copy_through =
500 {
501 0, // none
502 __tl_str_raw_msg_copy_through, // str
503 __tl_raw_msg_raw_msg_copy_through, // raw_msg
504 __tl_raw_msg_raw_msg_copy_through, // tcp conn
505 },
506 .flags = TLF_ALLOW_PREPEND
507};
508
509const struct tl_out_methods tl_out_tcp_raw_msg_unaligned_methods = {
510 .store_get_ptr = __tl_raw_msg_store_get_ptr,
511 .store_get_prepend_ptr = __tl_raw_msg_store_get_prepend_ptr,
512 .store_raw_data = __tl_raw_msg_store_raw_data,
513 .store_raw_msg = __tl_raw_msg_store_raw_msg,
514 .store_read_back = __tl_raw_msg_store_read_back,
515 .store_read_back_nondestruct = __tl_raw_msg_store_read_back_nondestruct,
516 .store_clear = __tl_tcp_raw_msg_store_clear,
517 .store_flush = __tl_tcp_raw_msg_store_flush_unaligned,
518 .copy_through =
519 {
520 0, // none
521 __tl_str_raw_msg_copy_through, // str
522 __tl_raw_msg_raw_msg_copy_through, // raw_msg
523 __tl_raw_msg_raw_msg_copy_through, // tcp conn
524 },
525 .flags = TLF_ALLOW_PREPEND | TLF_NOALIGN
526};
527
528const struct tl_out_methods tl_out_str_methods = {
529 .store_get_ptr = __tl_str_store_get_ptr,
530 .store_get_prepend_ptr = __tl_str_store_get_prepend_ptr,
531 .store_raw_data = __tl_str_store_raw_data,
532 .store_raw_msg = __tl_str_store_raw_msg,
533 .store_read_back = __tl_str_store_read_back,
534 .store_read_back_nondestruct = __tl_str_store_read_back_nondestruct,
535 .store_clear = __tl_str_store_clear,
536 .store_flush = __tl_str_store_flush,
537 .copy_through =
538 {
539 0, // none
540 __tl_str_str_copy_through, // str
541 __tl_raw_msg_str_copy_through, // raw_msg
542 __tl_raw_msg_str_copy_through, // tcp raw_msg
543 },
544 .flags = TLF_PERMANENT | TLF_ALLOW_PREPEND,
545 .prepend_bytes = 0
546};
547
548int tlf_set_error (struct tl_in_state *tlio_in, int errnum, const char *s) {
549 assert (s);
550 if (TL_ERROR) {
551 return 0;
552 }
553 vkprintf (2, "Error %s\n", s);
554 TL_ERROR = strdup (s);
555 TL_ERRNUM = errnum;
556 return 0;
557}
558
559int __tl_fetch_init (struct tl_in_state *tlio_in, void *in, void *in_extra, enum tl_type type, const struct tl_in_methods *methods, int size) {
560 assert (TL_IN_TYPE == tl_type_none);
561 assert (in);
562 TL_IN_TYPE = type;
563 TL_IN = in;
564 TL_IN_REMAINING = size;
565 TL_IN_POS = 0;
566 TL_IN_CUR_FLAGS = 0;
567
568 TL_IN_METHODS = methods;
569 if (TL_ERROR) {
570 free (TL_ERROR);
571 TL_ERROR = 0;
572 }
573 TL_ERRNUM = 0;
574 return 0;
575}
576
577int tlf_init_raw_message (struct tl_in_state *tlio_in, struct raw_message *msg, int size, int dup) {
578 struct raw_message *r = (struct raw_message *)malloc (sizeof (*r));
579 if (dup == 0) {
580 rwm_move (r, msg);
581 } else if (dup == 1) {
582 rwm_move (r, msg);
583 rwm_init (msg, 0);
584 } else {
585 rwm_clone (r, msg);
586 }
587 return __tl_fetch_init (tlio_in, r, 0, tl_type_raw_msg, &tl_in_raw_msg_methods, size);
588}
589
590int tlf_init_str (struct tl_in_state *tlio_in, const char *s, int size) {
591 return __tl_fetch_init (tlio_in, (void *)s, 0, tl_type_str, &tl_in_str_methods, size);
592}
593
594int tlf_query_flags (struct tl_in_state *tlio_in, struct tl_query_header *header) {
595 int flags = tl_fetch_int ();
596 if (tl_fetch_error ()) {
597 return -1;
598 }
599 if (header->flags & flags) {
600 tl_fetch_set_error_format (TL_ERROR_HEADER, "Duplicate flags in header 0x%08x", header->flags & flags);
601 return -1;
602 }
603 if (flags) {
604 tl_fetch_set_error_format (TL_ERROR_HEADER, "Unsupported flags in header 0x%08x", flags);
605 return -1;
606 }
607 header->flags |= flags;
608
609 return 0;
610}
611
612int tlf_query_header (struct tl_in_state *tlio_in, struct tl_query_header *header) {
613 assert (header);
614 memset (header, 0, sizeof (*header));
615 int t = tl_fetch_unread ();
616 if (TL_IN_METHODS->prepend_bytes) {
617 tl_fetch_skip (TL_IN_METHODS->prepend_bytes);
618 }
619 header->op = tl_fetch_int ();
620 header->real_op = header->op;
621 header->ref_cnt = 1;
622 if (header->op != (int)RPC_INVOKE_REQ && header->op != (int)RPC_INVOKE_KPHP_REQ) {
623 tl_fetch_set_error (TL_ERROR_HEADER, "Expected RPC_INVOKE_REQ or RPC_INVOKE_KPHP_REQ");
624 return -1;
625 }
626 header->qid = tl_fetch_long ();
627 if (header->op == (int)RPC_INVOKE_KPHP_REQ) {
628 //tl_fetch_raw_data (header->invoke_kphp_req_extra, 24);
629 if (tl_fetch_error ()) {
630 return -1;
631 }
632 MODULE_STAT->rpc_queries_received ++;
633 return t - tl_fetch_unread ();
634 }
635 while (1) {
636 int op = tl_fetch_lookup_int ();
637 int ok = 1;
638 switch (op) {
639 case RPC_DEST_ACTOR:
640 assert (tl_fetch_int () == (int)RPC_DEST_ACTOR);
641 header->actor_id = tl_fetch_long ();
642 break;
643 case RPC_DEST_ACTOR_FLAGS:
644 assert (tl_fetch_int () == (int)RPC_DEST_ACTOR_FLAGS);
645 header->actor_id = tl_fetch_long ();
646 tlf_query_flags (tlio_in, header);
647 break;
648 case RPC_DEST_FLAGS:
649 assert (tl_fetch_int () == (int)RPC_DEST_FLAGS);
650 tlf_query_flags (tlio_in, header);
651 break;
652 default:
653 ok = 0;
654 break;
655 }
656 if (tl_fetch_error ()) {
657 return -1;
658 }
659 if (!ok) {
660 MODULE_STAT->rpc_queries_received ++;
661 return t - tl_fetch_unread ();
662 }
663 }
664}
665
666int tlf_query_answer_flags (struct tl_in_state *tlio_in, struct tl_query_header *header) {
667 int flags = tl_fetch_int ();
668 if (tl_fetch_error ()) {
669 return -1;
670 }
671 if (header->flags & flags) {
672 tl_fetch_set_error_format (TL_ERROR_HEADER, "Duplicate flags in header 0x%08x", header->flags & flags);
673 return -1;
674 }
675 if (flags) {
676 tl_fetch_set_error_format (TL_ERROR_HEADER, "Unsupported flags in header 0x%08x", flags);
677 return -1;
678 }
679 header->flags |= flags;
680 return 0;
681}
682
683int tlf_query_answer_header (struct tl_in_state *tlio_in, struct tl_query_header *header) {
684 assert (header);
685 memset (header, 0, sizeof (*header));
686 int t = tl_fetch_unread ();
687 if (TL_IN_METHODS->prepend_bytes) {
688 tl_fetch_skip (TL_IN_METHODS->prepend_bytes);
689 }
690 header->op = tl_fetch_int ();
691 header->real_op = header->op;
692 header->ref_cnt = 1;
693 if (header->op != RPC_REQ_ERROR && header->op != RPC_REQ_RESULT ) {
694 tl_fetch_set_error (TL_ERROR_HEADER, "Expected RPC_REQ_ERROR or RPC_REQ_RESULT");
695 return -1;
696 }
697 header->qid = tl_fetch_long ();
698 while (1) {
699 int ok = 1;
700 if (header->op != RPC_REQ_ERROR) {
701 int op = tl_fetch_lookup_int ();
702 switch (op) {
703 case RPC_REQ_ERROR:
704 assert (tl_fetch_int () == RPC_REQ_ERROR);
705 header->op = RPC_REQ_ERROR_WRAPPED;
706 tl_fetch_long ();
707 break;
708 case RPC_REQ_ERROR_WRAPPED:
709 header->op = RPC_REQ_ERROR_WRAPPED;
710 break;
711 case RPC_REQ_RESULT_FLAGS:
712 assert (tl_fetch_int () == (int)RPC_REQ_RESULT_FLAGS);
713 tlf_query_answer_flags (tlio_in, header);
714 break;
715 default:
716 ok = 0;
717 break;
718 }
719 } else {
720 ok = 0;
721 }
722 if (tl_fetch_error ()) {
723 return -1;
724 }
725 if (!ok) {
726 if (header->op == RPC_REQ_ERROR || header->op == RPC_REQ_ERROR_WRAPPED) {
727 MODULE_STAT->rpc_answers_error ++;
728 } else {
729 MODULE_STAT->rpc_answers_received ++;
730 }
731 return t - tl_fetch_unread ();
732 }
733 }
734}
735
736static inline int __tl_store_init (struct tl_out_state *tlio_out, void *out, void *out_extra, enum tl_type type, const struct tl_out_methods *methods, int size, long long qid) {
737 assert (tlio_out);
738 assert (!TL_OUT_METHODS);
739
740 TL_OUT = out;
741 TL_OUT_EXTRA = out_extra;
742 if (out) {
743 TL_OUT_METHODS = methods;
744 TL_OUT_TYPE = type;
745 if (type != tl_type_none && !(methods->flags & (TLF_ALLOW_PREPEND | TLF_DISABLE_PREPEND))) {
746 TL_OUT_SIZE = (int *) methods->store_get_ptr (tlio_out, methods->prepend_bytes + (qid ? 12 : 0));
747 }
748 } else {
749 TL_OUT_TYPE = tl_type_none;
750 }
751
752 TL_OUT_POS = 0;
753 TL_OUT_QID = qid;
754 TL_OUT_REMAINING = size;
755
756 tlio_out->errnum = 0;
757 tlio_out->error = NULL;
758
759 return 0;
760}
761
762/*int tls_init_simple (struct tl_out_state *tlio_out, connection_job_t c) {
763 if (c) {
764 TL_OUT_PID = &(RPCS_DATA(c)->remote_pid);
765 } else {
766 TL_OUT_PID = 0;
767 }
768 return __tl_store_init (tlio_out, job_incref (c), 0, tl_type_conn, &tl_out_conn_simple_methods, (1 << 27), 0);
769}*/
770
771int tls_init_raw_msg (struct tl_out_state *tlio_out, struct process_id *pid, long long qid) {
772 if (pid) {
773 memcpy (&tlio_out->out_pid_buf, pid, 12);
774 TL_OUT_PID = &tlio_out->out_pid_buf;
775 } else {
776 TL_OUT_PID = 0;
777 }
778 struct raw_message *d = 0;
779 if (pid) {
780 d = (struct raw_message *)malloc (sizeof (*d));
781 rwm_init (d, 0);
782 }
783 return __tl_store_init (tlio_out, d, NULL, tl_type_raw_msg, &tl_out_raw_msg_methods, (1 << 27), qid);
784}
785
786int tls_init_tcp_raw_msg (struct tl_out_state *tlio_out, JOB_REF_ARG(c), long long qid) {
787 if (c) {
788 TL_OUT_PID = &(TCP_RPC_DATA(c)->remote_pid);
789 } else {
790 TL_OUT_PID = 0;
791 }
792 struct raw_message *d = 0;
793 if (c) {
794 d = (struct raw_message *)malloc (sizeof (*d));
795 rwm_init (d, 0);
796 }
797 return __tl_store_init (tlio_out, d, c, tl_type_tcp_raw_msg, &tl_out_tcp_raw_msg_methods, (1 << 27), qid);
798}
799
800int tls_init_tcp_raw_msg_unaligned (struct tl_out_state *tlio_out, JOB_REF_ARG(c), long long qid) {
801 if (c) {
802 TL_OUT_PID = &(TCP_RPC_DATA(c)->remote_pid);
803 } else {
804 TL_OUT_PID = 0;
805 }
806 struct raw_message *d = 0;
807 if (c) {
808 d = (struct raw_message *)malloc (sizeof (*d));
809 rwm_init (d, 0);
810 }
811 return __tl_store_init (tlio_out, d, c, tl_type_tcp_raw_msg, &tl_out_tcp_raw_msg_unaligned_methods, (1 << 27), qid);
812}
813
814int tls_init_str (struct tl_out_state *tlio_out, char *s, long long qid, int size) {
815 TL_OUT_PID = 0;
816 return __tl_store_init (tlio_out, s, s, tl_type_str, &tl_out_str_methods, size, qid);
817}
818
819int tls_init_raw_msg_nosend (struct tl_out_state *tlio_out) {
820 struct raw_message *d = (struct raw_message *)malloc (sizeof (*d));
821 rwm_init (d, 0);
822 return __tl_store_init (tlio_out, d, d, tl_type_raw_msg, &tl_out_raw_msg_methods_nosend, (1 << 27), 0);
823}
824/*
825int tls_init_any (struct tl_out_state *tlio_out, enum tl_type type, void *out, long long qid) {
826 switch (type) {
827 case tl_type_conn:
828 return tls_init_connection (tlio_out, (connection_job_t )out, qid);
829 case tl_type_tcp_raw_msg:
830 return tls_init_tcp_raw_msg (tlio_out, out, qid);
831 default:
832 assert (0);
833 }
834}*/
835
836int tls_header (struct tl_out_state *tlio_out, struct tl_query_header *header) {
837 assert (tls_check (tlio_out, 0) >= 0);
838 assert (header->op == (int)RPC_REQ_ERROR || header->op == (int)RPC_REQ_RESULT || header->op == (int)RPC_INVOKE_REQ || header->op == (int)RPC_REQ_ERROR_WRAPPED);
839 if (header->op == (int)RPC_INVOKE_REQ) {
840 if (header->flags) {
841 tl_store_int (RPC_DEST_ACTOR_FLAGS);
842 tl_store_long (header->actor_id);
843 tl_store_int (header->flags);
844 } else if (header->actor_id) {
845 tl_store_int (RPC_DEST_ACTOR);
846 tl_store_long (header->actor_id);
847 }
848 } else if (header->op == RPC_REQ_ERROR_WRAPPED) {
849 tl_store_int (RPC_REQ_ERROR);
850 tl_store_long (TL_OUT_QID);
851 } else if (header->op == RPC_REQ_RESULT) {
852 if (header->flags) {
853 tl_store_int (RPC_REQ_RESULT_FLAGS);
854 tl_store_int (header->flags);
855 }
856 }
857 return 0;
858}
859
860int tls_end_ext (struct tl_out_state *tlio_out, int op) {
861 if (TL_OUT_TYPE == tl_type_none) {
862 return 0;
863 }
864 assert (TL_OUT);
865 assert (TL_OUT_TYPE);
866 if (tlio_out->error) {
867// tl_store_clear ();
868 tl_store_clean ();
869 vkprintf (1, "tl_store_end: "PID_PRINT_STR" writing error %s, errnum %d, tl.out_pos = %d\n", PID_TO_PRINT(TL_OUT_PID), tlio_out->error, tlio_out->errnum, TL_OUT_POS);
870 //tl_store_clear ();
871 tl_store_int (RPC_REQ_ERROR);
872 tl_store_long (TL_OUT_QID);
873 tl_store_int (tlio_out->errnum);
874 tl_store_string0 (tlio_out->error);
875
876 MODULE_STAT->rpc_sent_errors ++;
877 } else {
878 if (op == RPC_REQ_RESULT) {
879 MODULE_STAT->rpc_sent_answers ++;
880 } else {
881 MODULE_STAT->rpc_sent_queries ++;
882 }
883 }
884 if (!(TL_OUT_FLAGS & TLF_NOALIGN)) {
885 assert (!(TL_OUT_POS & 3));
886 }
887
888 {
889 int *p;
890 if (TL_OUT_FLAGS & TLF_ALLOW_PREPEND) {
891 p = TL_OUT_SIZE = tl_store_get_prepend_ptr (TL_OUT_METHODS->prepend_bytes + (TL_OUT_QID ? 12 : 0));
892 } else {
893 p = TL_OUT_SIZE;
894 }
895
896 if (TL_OUT_QID) {
897 assert (op);
898 p += (TL_OUT_METHODS->prepend_bytes) / 4;
899 *p = op;
900 *(long long *)(p + 1) = TL_OUT_QID;
901 }
902 }
903
904 if (TL_OUT_METHODS->store_prefix) {
905 TL_OUT_METHODS->store_prefix (tlio_out);
906 }
907
908 if (!(TL_OUT_FLAGS & TLF_NO_AUTOFLUSH)) {
909 TL_OUT_METHODS->store_flush (tlio_out);
910 }
911 vkprintf (2, "tl_store_end: written %d bytes, qid = %lld, PID = " PID_PRINT_STR "\n", TL_OUT_POS, TL_OUT_QID, PID_TO_PRINT (TL_OUT_PID));
912 TL_OUT = 0;
913 TL_OUT_TYPE = tl_type_none;
914 TL_OUT_METHODS = 0;
915 TL_OUT_EXTRA = 0;
916 return 0;
917}
918
919int tls_init (struct tl_out_state *tlio_out, enum tl_type type, struct process_id *pid, long long qid) {
920 switch (type) {
921 case tl_type_raw_msg:
922 {
923 tls_init_raw_msg (tlio_out, pid, qid);
924 return 1;
925 }
926 case tl_type_tcp_raw_msg:
927 {
928 connection_job_t d = rpc_target_choose_connection (rpc_target_lookup (pid), pid);
929 if (d) {
930 vkprintf (2, "%s: Good connection " PID_PRINT_STR "\n", __func__, PID_TO_PRINT (pid));
931 tls_init_tcp_raw_msg (tlio_out, JOB_REF_PASS (d), qid);
932 return 1;
933 } else {
934 vkprintf (2, "%s: Bad connection " PID_PRINT_STR "\n", __func__, PID_TO_PRINT (pid));
935 return -1;
936 }
937 }
938 case tl_type_none:
939 vkprintf (2, "Trying to tl_init_store() with type tl_type_none, qid=%lld\n" , qid);
940 return -1;
941 default:
942 fprintf (stderr, "type = %d\n", type);
943 assert (0);
944 return 0;
945 }
946}
947
948
949struct tl_in_state *tl_in_state_alloc (void) {
950 MODULE_STAT->tl_in_allocated ++;
951 return calloc (sizeof (struct tl_in_state), 1);
952}
953
954void tl_in_state_free (struct tl_in_state *tlio_in) {
955 MODULE_STAT->tl_in_allocated --;
956 if (tlio_in->in_methods && tlio_in->in_methods->fetch_clear) {
957 tlio_in->in_methods->fetch_clear (tlio_in);
958 }
959 if (tlio_in->error) {
960 free (PTR_MOVE (tlio_in->error));
961 }
962 free (tlio_in);
963}
964
965struct tl_out_state *tl_out_state_alloc (void) {
966 MODULE_STAT->tl_out_allocated ++;
967 return calloc (sizeof (struct tl_out_state), 1);
968}
969
970void tl_out_state_free (struct tl_out_state *tlio_out) {
971 MODULE_STAT->tl_out_allocated --;
972 if (tlio_out->out_methods && tlio_out->out_methods->store_clear) {
973 tlio_out->out_methods->store_clear (tlio_out);
974 }
975 if (tlio_out->error) {
976 free (PTR_MOVE (tlio_out->error));
977 }
978 free (tlio_out);
979}
980