1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2013 Vkontakte Ltd
18 2013 Vitaliy Valtman
19 2013 Anton Maydell
20
21 Copyright 2014 Telegram Messenger Inc
22 2014 Vitaly Valtman
23 2014 Anton Maydell
24
25 Copyright 2015-2016 Telegram Messenger Inc
26 2015-2016 Vitaliy Valtman
27*/
28#include "engine/engine-rpc.h"
29#include "common/tl-parse.h"
30
31#include <assert.h>
32#include <stdarg.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/time.h>
37#include <errno.h>
38
39//#include "net/net-buffers.h"
40#include "net/net-events.h"
41#include "net/net-msg.h"
42#include "net/net-msg-buffers.h"
43//#include "net/net-rpc-server.h"
44#include "net/net-rpc-targets.h"
45#include "net/net-tcp-connections.h"
46#include "net/net-tcp-rpc-common.h"
47#include "net/net-tcp-rpc-server.h"
48
49#include "common/cpuid.h"
50#include "common/crc32.h"
51#include "common/kprintf.h"
52#include "common/server-functions.h"
53
54#include "engine/engine-rpc-common.h"
55
56#include "vv/vv-io.h"
57#include "vv/vv-tree.h"
58
59//#include "auto/TL/tl-names.h"
60
61#include "engine/engine.h"
62#include "common/common-stats.h"
63
/* Timeout handed to create_query_job () for every RPC_INVOKE_REQ query (it is
   added to precise_now there); assigned by engine_tl_init (). */
double tl_aio_timeout;
65
66struct tl_out_state *tl_aio_init_store (enum tl_type type, struct process_id *pid, long long qid) {
67 if (type == tl_type_raw_msg) {
68 struct tl_out_state *IO = tl_out_state_alloc ();
69 tls_init_raw_msg (IO, pid, qid);
70 return IO;
71 } else if (type == tl_type_tcp_raw_msg) {
72 connection_job_t d = rpc_target_choose_connection (rpc_target_lookup (pid), pid);
73 if (d) {
74 vkprintf (2, "%s: Good connection #%d for " PID_PRINT_STR "\n", __func__, CONN_INFO(d)->fd, PID_TO_PRINT (pid));
75 struct tl_out_state *IO = tl_out_state_alloc ();
76 tls_init_tcp_raw_msg (IO, JOB_REF_PASS (d), qid);
77 return IO;
78 } else {
79 vkprintf (2, "%s: Bad connection " PID_PRINT_STR "\n", __func__, PID_TO_PRINT (pid));
80 return NULL;
81 }
82 } else {
83 assert (0);
84 return NULL;
85 }
86}
87
88#define ENGINE_JOB_CLASS JF_CLASS_MAIN
89
/* Count of act-atom subjobs currently alive: bumped in fetch_query () and
   dropped again when such a job receives JS_FINISH. */
static long long queries_allocated;

/* Returns the current value of the live-query counter. */
long long engine_get_allocated_queries (void) {
  const long long live = queries_allocated;
  return live;
}
95
96
97#define rpc_custom_op_cmp(a,b) (a->op < b->op ? -1 : a->op > b->op ? 1 : 0)
98
99#define X_TYPE struct rpc_custom_op *
100#define X_CMP rpc_custom_op_cmp
101#define TREE_NAME rpc_custom_op
102#define TREE_MALLOC
103#include "vv/vv-tree.c"
104static struct tree_rpc_custom_op *rpc_custom_op_tree;
105
106void register_custom_op_cb (unsigned op, void (*func)(struct tl_in_state *tlio_in, struct query_work_params *params)) {
107 struct rpc_custom_op *O = malloc (sizeof (*O));
108 O->op = op;
109 O->func = func;
110 rpc_custom_op_tree = tree_insert_rpc_custom_op (rpc_custom_op_tree, O, lrand48 ());
111}
112
/* Engine-supplied callbacks, all installed by engine_tl_init (). */

/* Fallback parser tried by fetch_query () when the default parser yields nothing. */
static struct tl_act_extra *(*tl_parse_function)(struct tl_in_state *tlio_in, long long actor_id);
/* Called by fetch_query () to obtain the op stored in the parsed action. */
static int (*tl_get_op_function)(struct tl_in_state *tlio_in);
/* Optional stats writer; tl_engine_store_stats () falls back to the default one. */
static void (*tl_stat_function)(struct tl_out_state *tlio_out);
116
/* Keeps only the low 16 bits of the request flags for use in the result header. */
int tl_result_new_flags (int old_flags) {
  const int low16_mask = 0xffff;
  return old_flags & low16_mask;
}
120
121int tl_result_get_header_len (struct tl_query_header *h) {
122 if (!h->flags) { return 0; }
123 int s = 8;
124 return s;
125}
126
127int tl_result_make_header (int *ptr, struct tl_query_header *h) {
128 int *p = ptr;
129 if (!h->flags) { return 0; }
130 int new_flags = tl_result_new_flags (h->flags);
131 *p = RPC_REQ_RESULT_FLAGS;
132 p++;
133 *p = new_flags;
134 p ++;
135 return (p - ptr) * 4;
136}
137
138void tl_default_act_free (struct tl_act_extra *extra) {
139 if (extra->header) {
140 tl_query_header_delete (extra->header);
141 }
142 if (!(extra->flags & 1)) {
143 return;
144 }
145 free (extra);
146}
147
148struct tl_act_extra *tl_default_act_dup (struct tl_act_extra *extra) {
149 struct tl_act_extra *new = malloc (extra->size);
150 memcpy (new, extra, extra->size);
151 new->flags = new->flags | 3;
152 return new;
153}
154
155int need_dup (struct tl_act_extra *extra) {
156 return !(extra->flags & 1);
157}
158
159
160static tl_query_result_fun_t *tl_query_result_functions = NULL;
161
162void tl_query_result_fun_set (tl_query_result_fun_t func, int query_type_id) {
163 if (!tl_query_result_functions) {
164 tl_query_result_functions = calloc (sizeof (void *), 16);
165 }
166 tl_query_result_functions[query_type_id] = func;
167}
168
/*
 * Produces a fresh query id for the given query type (0..15).
 *
 * Layout of the 64-bit id:
 *   bits 60..63  query_type_id (engine_work_rpc_req_result () reads it back
 *                with qid >> 60)
 *   bits 32..59  28 random bits
 *   bits  0..31  a per-process counter, seeded randomly on first use
 *
 * All arithmetic is done on unsigned 64-bit values: shifting the signed int
 * id left by 28 overflows for ids >= 8.
 */
long long tl_generate_next_qid (int query_type_id) {
  assert (((unsigned) query_type_id) < 16);
  static unsigned int last_qid = 0;
  if (!last_qid) {
    last_qid = lrand48_j ();
  }
  unsigned long long high = ((unsigned long long) (unsigned) query_type_id << 28)
                          | ((unsigned long long) lrand48_j () & 0x0fffffff);
  return (long long) ((high << 32) | (++last_qid));
}
179
180void engine_work_rpc_req_result (struct tl_in_state *tlio_in, struct query_work_params *params) {
181 if (!tl_query_result_functions) {
182 return;
183 }
184 struct tl_query_header *h = malloc (sizeof (*h));
185 if (tlf_query_answer_header (tlio_in, h) < 0) {
186 tl_query_header_delete (h);
187 return;
188 }
189 h->qw_params = params;
190 int query_type_id = (((unsigned long long) h->qid) >> 60);
191 tl_query_result_fun_t fun = tl_query_result_functions[query_type_id];
192 if (likely (fun != NULL)) {
193 fun (tlio_in, h);
194 } else {
195 vkprintf (1, "Unknown query type %d (qid = 0x%016llx). Skipping query result.\n", query_type_id, h->qid);
196 }
197 tl_query_header_delete (h);
198}
199
200int __tl_query_act_custom (struct tl_in_state *tlio_in, struct query_work_params *P) {
201 unsigned op = tl_fetch_lookup_int ();
202 struct rpc_custom_op *O = tree_lookup_ptr_rpc_custom_op (rpc_custom_op_tree, (void *)&op);
203 if (O) {
204 O->func (tlio_in, P);
205 }
206
207 return 0;
208}
209
210struct colon_extra {
211 struct raw_message *left;
212 char *left_error;
213 int left_error_code;
214 struct raw_message *right;
215 char *right_error;
216 int right_error_code;
217 struct raw_message **result;
218 char **error;
219 int *error_code;
220 job_t extra_ref;
221};
222
223struct ifeq_extra {
224 struct raw_message *left;
225 char *left_error;
226 int left_error_code;
227 struct raw_message *right;
228 char *right_error;
229 int right_error_code;
230 struct raw_message *check;
231 int check_result;
232 struct raw_message **result;
233 char **error;
234 int *error_code;
235 job_t extra_ref;
236 job_t right_job;
237};
238
239static int process_act_atom_subjob (job_t job, int op, struct job_thread *JT);
240
241/* {{{ auto TL parse functions weak declaration */
242
/* Weak fallbacks for the auto-generated TL routines: each one can be replaced
   by a strong definition at link time.  The fallbacks report "nothing parsed"
   (NULL / -1) or do nothing at all. */

__attribute__ ((weak)) struct paramed_type *skip_function_any (struct tl_in_state *tlio_in) {
  return NULL;
}

__attribute__ ((weak)) struct paramed_type *fetch_function_any (struct tl_in_state *tlio_in) {
  return NULL;
}

__attribute__ ((weak)) int skip_type_any (struct tl_in_state *tlio_in, struct paramed_type *P) {
  return -1;
}

__attribute__ ((weak)) int fetch_type_any (struct tl_in_state *tlio_in, struct paramed_type *P) {
  return -1;
}

__attribute__ ((weak)) void free_vars_to_be_freed (void) {
}

__attribute__ ((weak)) void tl_printf_clear (void) {
}
257
/* Calls free_vars_to_be_freed () and then skips one function via skip_function_any (). */
static inline struct paramed_type *do_skip_function_any (struct tl_in_state *tlio_in) {
  free_vars_to_be_freed ();
  struct paramed_type *skipped = skip_function_any (tlio_in);
  return skipped;
}
262
/* Calls free_vars_to_be_freed () and tl_printf_clear (), then fetches one
   function via fetch_function_any (). */
static inline struct paramed_type *do_fetch_function_any (struct tl_in_state *tlio_in) {
  free_vars_to_be_freed ();
  tl_printf_clear ();
  struct paramed_type *fetched = fetch_function_any (tlio_in);
  return fetched;
}
268
/* Calls tl_printf_clear () and then fetches a value of type `P` via fetch_type_any (). */
static inline int do_fetch_type_any (struct tl_in_state *tlio_in, struct paramed_type *P) {
  tl_printf_clear ();
  const int status = fetch_type_any (tlio_in, P);
  return status;
}
273
/* Weak fallbacks, replaceable by strong definitions at link time: freeing is
   a no-op and duplicating yields a null pointer. */

__attribute__ ((weak)) void paramed_type_free (struct paramed_type *P) {
}

__attribute__ ((weak)) struct paramed_type *paramed_type_dup (struct paramed_type *P) {
  return 0;
}
279
280/* }}} */
281
282static job_t fetch_query (job_t parent, struct tl_in_state *IO, struct raw_message **raw, char **error, int *error_code, long long actor_id, job_t extra_ref, job_t all_list, int status, struct tl_query_header *h) /* {{{ */ {
283 int fop = tl_get_op_function (IO);
284
285 struct tl_act_extra *extra = tl_default_parse_function (IO, actor_id);
286 if (!extra && tlf_error (IO)) {
287 *error = strdup (IO->error);
288 *error_code = IO->errnum;
289 return NULL;
290 }
291 if (!extra && tl_parse_function) {
292 extra = tl_parse_function (IO, actor_id);
293 }
294 if (!extra) {
295 tlf_set_error_format (IO, TL_ERROR_UNKNOWN_FUNCTION_ID, "Unknown op 0x%08x", tlf_lookup_int (IO));
296 *error = strdup (IO->error);
297 *error_code = IO->errnum;
298 return NULL;
299 }
300 if (!extra->free) {
301 extra->free = tl_default_act_free;
302 }
303 if (!extra->dup) {
304 extra->dup = tl_default_act_dup;
305 }
306 extra->op = fop;
307 assert (extra->act);
308 assert (extra->free);
309 assert (extra->dup);
310 extra->error = error;
311 extra->error_code = error_code;
312 extra->raw = raw;
313 extra->extra_ref = extra_ref ? job_incref (extra_ref) : 0;
314
315 extra = need_dup (extra) ? extra->dup (extra) : extra;
316
317 job_t job = create_async_job (process_act_atom_subjob, status | JSC_ALLOW (JC_ENGINE, JS_RUN) | JSC_ALLOW (JC_ENGINE, JS_ABORT) | JSC_ALLOW (JC_ENGINE, JS_FINISH), extra->subclass, sizeof (void *), 0, JOB_REF_CREATE_PASS_N (parent));
318
319 *(void **)job->j_custom = extra;
320
321 if (all_list) {
322 insert_job_into_job_list (all_list, JOB_REF_CREATE_PASS (job), JSP_PARENT_ERROR);
323 }
324
325 queries_allocated ++;
326
327 return job;
328}
329/* }}} */
330
331
332static int fetch_all_queries (job_t parent, struct tl_in_state *tlio_in) /* {{{ */ {
333 struct query_work_params *P = (struct query_work_params *) parent->j_custom;
334
335
336 struct tl_query_header *h = P->h;
337
338 job_t root = fetch_query (parent, tlio_in, &P->result, &P->error, &P->error_code, h->actor_id, 0, P->all_list, JSP_PARENT_RWE, h);
339
340 if (root == (void *)-1l) {
341 return -2;
342 } else if (root) {
343 schedule_job (JOB_REF_PASS (root));
344
345 return 0;
346 } else {
347 return -1;
348 }
349}
350/* }}} */
351
352static int process_act_atom_subjob (job_t job, int op, struct job_thread *JT) /* {{{ */ {
353 if (op != JS_FINISH) {
354 if (parent_job_aborted (job)) {
355 return job_fatal (job, ECANCELED);
356 }
357 }
358
359 struct tl_act_extra *E = *(void **)job->j_custom;
360
361 switch (op) {
362 case JS_RUN: {
363 int ok = 1;
364 if (!ok && !(E->type & (QUERY_ALLOW_REPLICA_GET | QUERY_ALLOW_UNINIT))) {
365 if (E->raw) {
366 *E->error = strdup ("not coord anymore");
367 *E->error_code = TL_ERROR_BINLOG_DISABLED;
368 E->raw = 0;
369 if (E->extra_ref) {
370 job_decref (JOB_REF_PASS (E->extra_ref));
371 }
372 }
373 return job_fatal (job, EIO);
374 } else {
375 if (!E->raw) {
376 if (E->extra_ref) {
377 job_decref (JOB_REF_PASS (E->extra_ref));
378 }
379 return JOB_COMPLETED;
380 }
381
382 struct tl_out_state *IO = tl_out_state_alloc ();
383 tls_init_raw_msg_nosend (IO);
384 E->tlio_out = IO;
385
386 long long old_rdtsc = rdtsc ();
387 int res = E->act (job, E);
388 E->tlio_out = NULL;
389 long long rdtsc_delta = rdtsc () - old_rdtsc;
390 //vv_incr_stat_counter (STAT_QPS_TIME, rdtsc_delta);
391 //vv_op_stat_insert_rdtsc (E->op, rdtsc_delta);
392 //if (rdtsc_delta > (int)(0.05 * 2e9)) {
393 // long_queries_cpu_cnt ++;
394 //}
395 E->cpu_rdtsc += rdtsc_delta;
396
397 if (res >= 0 && !IO->error) {
398 //assert (TL_OUT_RAW_MSG);
399 struct raw_message *raw = malloc (sizeof (*raw));
400 rwm_clone (raw, (struct raw_message *)IO->out);
401 tl_out_state_free (IO);
402 if (E->raw) {
403 *E->raw = raw;
404 E->raw = 0;
405 if (E->extra_ref) {
406 job_decref (JOB_REF_PASS (E->extra_ref));
407 }
408 }
409
410 return JOB_COMPLETED;
411 } else if (res == -2 && E->attempt < 5 && !IO->error && job->j_children > 0) {
412 tl_out_state_free (IO);
413
414 E->attempt ++;
415
416 return 0;
417 } else {
418 if (!IO->error) {
419 if (res == -2 && E->attempt >= 5) {
420 tls_set_error_format (IO, TL_ERROR_AIO_MAX_RETRY_EXCEEDED, "Maximum number of retries exceeded");
421 } else if (res == -2) {
422 tls_set_error_format (IO, TL_ERROR_BAD_METAFILE, "Error loading metafile");
423 } else {
424 tls_set_error_format (IO, TL_ERROR_UNKNOWN, "Unknown error");
425 }
426 }
427
428 assert (IO->error);
429 if (E->raw) {
430 *E->error = strdup (IO->error);
431 *E->error_code = IO->errnum;
432 E->raw = 0;
433 if (E->extra_ref) {
434 job_decref (JOB_REF_PASS (E->extra_ref));
435 }
436 }
437 tl_out_state_free (IO);
438
439 return job_fatal (job, EIO);
440 }
441 }
442 assert (0);
443 }
444 case JS_ABORT:
445 if (!job->j_error) {
446 job->j_error = ECANCELED;
447
448 if (E->raw) {
449 *E->error = strdup ("Job cancelled");
450 *E->error_code = TL_ERROR_UNKNOWN;
451 E->raw = 0;
452 }
453 }
454 if (E->extra_ref) {
455 job_decref (JOB_REF_PASS (E->extra_ref));
456 }
457 return JOB_COMPLETED;
458 case JS_FINISH:
459 queries_allocated --;
460 if (E->extra_ref) {
461 job_decref (JOB_REF_PASS (E->extra_ref));
462 }
463 E->free (E);
464 assert (job->j_refcnt == 1);
465 return job_free (JOB_REF_PASS (job));
466 default:
467 return JOB_ERROR;
468 }
469}
470/* }}} */
471
472static int process_query_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
473 struct query_work_params *P = (struct query_work_params *) job->j_custom;
474 struct tl_out_state *IO = NULL;
475 switch (op) {
476 case JS_RUN:
477 assert (!job->j_children);
478 assert (!P->wait_pos);
479 //assert (!P->wait_time);
480
481 if (!P->result && !P->error) {
482 P->error = strdup ("Unknown error");
483 P->error_code = TL_ERROR_UNKNOWN;
484 }
485 if (!P->answer_sent) {
486 if (P->fd && P->type == tl_type_raw_msg) {
487 connection_job_t C = connection_get_by_fd (P->fd);
488 if (C && CONN_INFO(C)->generation != P->generation) {
489 job_decref (JOB_REF_PASS (C));
490 }
491 if (C) {
492 IO = tl_out_state_alloc ();
493 tls_init_tcp_raw_msg (IO, JOB_REF_PASS (C), P->h->qid);
494 }
495 }
496 if (!IO) {
497 IO = tl_aio_init_store (P->type, &P->pid, P->h->qid);
498 }
499 }
500 if (IO) {
501 assert (!P->answer_sent);
502 //long long rdtsc_delta = rdtsc () - P->start_rdtsc;
503 //if (rdtsc_delta > engine_get_long_query_thres () * 2e9) {
504 // long_queries_cnt ++;
505 //}
506 if (P->error_code) {
507 tls_set_error_format (IO, P->error_code, "%s", P->error);
508 free (P->error);
509 P->error = 0;
510 } else {
511 int z = tl_result_get_header_len (P->h);
512 int *hptr = tls_get_ptr (IO, z);
513 assert (z == tl_result_make_header (hptr, P->h));
514 tls_raw_msg (IO, P->result, 0);
515 free (P->result);
516 P->result = NULL;
517 }
518 tls_end_ext (IO, RPC_REQ_RESULT);
519 tl_out_state_free (IO);
520 IO = NULL;
521 }
522 P->answer_sent ++;
523 job_timer_remove (job);
524 if (P->all_list) {
525 job_signal (JOB_REF_PASS (P->all_list), JS_ABORT);
526 }
527 return JOB_COMPLETED;
528 case JS_ALARM:
529 if (!job_timer_check (job)) {
530 return 0;
531 }
532 if (!P->answer_sent) {
533 IO = tl_aio_init_store (P->type, &P->pid, P->h->qid);
534 }
535 if (IO) {
536 if (P->error_code) {
537 tls_set_error_format (IO, P->error_code, "%s", P->error);
538 free (P->error);
539 P->error = NULL;
540 } else {
541 if (P->wait_pos/* || P->wait_time*/) {
542 tls_set_error_format (IO, TL_ERROR_AIO_TIMEOUT, "Binlog wait error");
543 } else {
544 tls_set_error_format (IO, TL_ERROR_AIO_TIMEOUT, "Aio wait error");
545 }
546 }
547
548 tls_end_ext (IO, RPC_REQ_RESULT);
549 tl_out_state_free (IO);
550 P->answer_sent ++;
551 }
552 //P->wait_time = job_delete_wait (P->wait_time);
553 if (!job->j_error) {
554 job->j_error = ETIMEDOUT;
555 }
556 if (P->all_list) {
557 job_signal (JOB_REF_PASS (P->all_list), JS_ABORT);
558 }
559 return JOB_COMPLETED;
560 case JS_ABORT:
561 //P->wait_time = job_delete_wait (P->wait_time);
562 if (!P->answer_sent) {
563 IO = tl_aio_init_store (P->type, &P->pid, P->h->qid);
564 }
565 if (IO) {
566 if (P->error_code) {
567 tls_set_error_format (IO, P->error_code, "%s", P->error);
568 free (P->error);
569 P->error = 0;
570 } else {
571 tls_set_error_format (IO, TL_ERROR_UNKNOWN, "Cancelled");
572 }
573 tls_end_ext (IO, RPC_REQ_RESULT);
574 P->answer_sent ++;
575 tl_out_state_free (IO);
576 IO = NULL;
577 }
578 job_timer_remove (job);
579 if (P->all_list) {
580 job_signal (JOB_REF_PASS (P->all_list), JS_ABORT);
581 }
582 return JOB_COMPLETED;
583 case JS_FINISH:
584 assert (!P->wait_pos);
585 //assert (!P->wait_time);
586 assert (!P->all_list);
587 assert (job->j_refcnt == 1);
588 if (P->P) {
589 paramed_type_free (P->P);
590 P->P = 0;
591 }
592 if (P->error) { free (P->error); }
593 if (P->result) {
594 rwm_free (P->result);
595 free (P->result);
596 }
597 if (P->src.magic) {
598 rwm_free (&P->src);
599 }
600 tl_query_header_delete (P->h);
601 return job_free (JOB_REF_PASS (job));
602 default:
603 return JOB_ERROR;
604 }
605}
606/* }}} */
607
608static int process_parse_subjob (job_t job, int op, struct job_thread *JT) /* {{{ */ {
609 struct query_work_params *P = (struct query_work_params *) job->j_custom;
610
611 switch (op) {
612 case JS_RUN: {
613 job->j_execute = process_query_job;
614
615 struct raw_message raw_copy;
616 rwm_clone (&raw_copy, &P->src);
617
618 struct tl_in_state *IO = tl_in_state_alloc ();
619 tlf_init_raw_message (IO, &P->src, P->src.total_bytes, 0);
620
621 int r = fetch_all_queries (job, IO);
622 tl_in_state_free (IO);
623 IO = NULL;
624
625 rwm_free (&raw_copy);
626 if (r < 0) {
627 return JOB_SENDSIG (JS_ABORT);
628 //return JOB_COMPLETED;
629 } else {
630 return 0;
631 }
632 }
633 case JS_ABORT:
634 case JS_ALARM:
635 case JS_FINISH:
636 return process_query_job (job, op, JT);
637 default:
638 return JOB_ERROR;
639 }
640}
641/* }}} */
642
643static int process_query_custom_subjob (job_t job, int op, struct job_thread *JT) /* {{{ */ {
644 struct query_work_params *P = (struct query_work_params *) job->j_custom;
645 if (op == JS_RUN) {
646 struct tl_in_state *IO = tl_in_state_alloc ();
647 tlf_init_raw_message (IO, &P->src, P->src.total_bytes, 0);
648 __tl_query_act_custom (IO, P);
649 tl_in_state_free (IO);
650
651 job_timer_remove (job);
652 return JOB_COMPLETED;
653 }
654 switch (op) {
655 case JS_ABORT:
656 job_timer_remove (job);
657 if (!job->j_error) {
658 job->j_error = ECANCELED;
659 }
660 return JOB_COMPLETED;
661 case JS_ALARM:
662 if (!job->j_error) {
663 job->j_error = ETIMEDOUT;
664 }
665 return JOB_COMPLETED;
666 case JS_FINISH:
667 assert (job->j_refcnt == 1);
668 if (P->src.magic) {
669 rwm_free (&P->src);
670 }
671 return job_free (JOB_REF_PASS (job));
672 default:
673 return JOB_ERROR;
674 }
675}
676/* }}} */
677
678int create_query_job (job_t job, struct raw_message *raw, struct tl_query_header *h, double timeout, struct process_id *remote_pid, enum tl_type out_type, int fd, int generation) /* {{{ */ {
679 job->j_execute = process_parse_subjob;
680 struct process_id pd = *remote_pid;
681 remote_pid = &pd;
682
683 struct query_work_params *P = (struct query_work_params *) job->j_custom;
684
685 memset (P, 0, sizeof (*P));
686 P->h = tl_query_header_dup (h);
687 P->start_rdtsc = rdtsc ();
688 if (P->wait_coord) {
689 vkprintf (1, "wait coord query\n");
690 }
691 P->fd = fd;
692 P->generation = generation;
693
694 P->pid = *remote_pid;
695 P->type = out_type;
696
697 job_timer_insert (job, precise_now + timeout);
698 rwm_clone (&P->src, raw);
699
700 return JOB_SENDSIG (JS_RUN);
701}
702/* }}} */
703
704int create_query_custom_job (job_t job, struct raw_message *raw, double timeout, int fd, int generation) /* {{{ */ {
705 job->j_execute = process_query_custom_subjob;
706
707 struct query_info *q = QUERY_INFO (job);
708 struct process_id p = q->src_pid;
709 enum tl_type type = q->src_type;
710 struct query_work_params *P = (struct query_work_params *) job->j_custom;
711 memset (P, 0, sizeof (*P));
712 P->pid = p;
713 P->type = type;
714 P->fd = fd;
715 P->generation = generation;
716
717 if (timeout > 0) {
718 job_timer_insert (job, precise_now + timeout);
719 }
720
721 rwm_clone (&P->src, raw);
722
723 return JOB_SENDSIG (JS_RUN);
724}
725/* }}} */
726
727int query_job_run (job_t job, int fd, int generation) /* {{{ */ {
728 struct query_info *q = QUERY_INFO (job);
729
730 struct tl_in_state *IO = tl_in_state_alloc ();
731 tlf_init_raw_message (IO, &q->raw, q->raw.total_bytes, 0);
732
733 int op = tlf_lookup_int (IO);
734 struct tl_query_header *h = NULL;
735
736 int res;
737 if (op != RPC_INVOKE_REQ) {
738 if (rpc_custom_op_tree) {
739 struct raw_message r;
740 rwm_clone (&r, (struct raw_message *)IO->in);
741
742 res = create_query_custom_job (job, &r, 0, fd, generation);
743 rwm_free (&r);
744 } else {
745 res = JOB_COMPLETED;
746 }
747 } else {
748 //vv_incr_stat_counter (STAT_QPS_CNT, 1);
749 h = malloc (sizeof (*h));
750 tlf_query_header (IO, h);
751
752 if (tlf_error (IO)) {
753 struct tl_out_state *OUT = tl_aio_init_store (q->src_type, &q->src_pid, h ? h->qid : 0);
754 if (OUT) {
755 tls_set_error_format (OUT, IO->errnum, "%s", IO->error);
756 tls_end_ext (OUT, RPC_REQ_RESULT);
757 tl_out_state_free (OUT);
758 }
759 res = JOB_COMPLETED;
760 } else {
761 //tl_aio_init_store (q->src_type, &q->src_pid, h ? h->qid : 0);
762 struct raw_message r;
763 rwm_clone (&r, (struct raw_message *)IO->in);
764 res = create_query_job (job, &r, h, tl_aio_timeout, &q->src_pid, q->src_type, fd, generation);
765 rwm_free (&r);
766 }
767 }
768 if (h) {
769 tl_query_header_delete (h);
770 }
771 tl_in_state_free (IO);
772 return res;
773}
774/* }}} */
775
776static int do_query_job_run (job_t job, int op, struct job_thread *JT) /* {{{ */ {
777 struct query_info *q = QUERY_INFO (job);
778 int fd = 0;
779 int generation = 0;
780 if (q->conn) {
781 rpc_target_insert_conn (q->conn);
782 fd = CONN_INFO((job_t)q->conn)->fd;
783 generation = CONN_INFO((job_t)q->conn)->generation;
784 job_decref (JOB_REF_PASS (q->conn));
785 }
786 if (op == JS_RUN) {
787 return query_job_run (job, fd, generation);
788 }
789 assert (!job_timer_active (job));
790 switch (op) {
791 case JS_ALARM:
792 if (!job->j_error) {
793 job->j_error = ETIMEDOUT;
794 }
795 return JOB_COMPLETED;
796 case JS_ABORT:
797 if (!job->j_error) {
798 job->j_error = ECANCELED;
799 }
800 return JOB_COMPLETED;
801 case JS_FINISH:
802 if (q->raw.magic) {
803 rwm_free (&q->raw);
804 }
805 return job_free (JOB_REF_PASS (job));
806 default:
807 return JOB_ERROR;
808 }
809}
810/* }}} */
811
812int do_create_query_job (struct raw_message *raw, int type, struct process_id *pid, void *conn) /* {{{ */ {
813 job_t job = create_async_job (do_query_job_run, JSP_PARENT_RWE | JSC_ALLOW (JC_ENGINE, JS_RUN) | JSC_ALLOW (JC_ENGINE, JS_ABORT) | JSC_ALLOW (JC_ENGINE, JS_ALARM) | JSC_ALLOW (JC_ENGINE, JS_FINISH), -2, sizeof (struct query_work_params), JT_HAVE_TIMER, JOB_REF_NULL);
814
815 struct query_info *q = QUERY_INFO (job);
816
817 q->raw = *raw;
818 q->src_type = type;
819 q->src_pid = *pid;
820 q->conn = conn;
821
822 schedule_job (JOB_REF_PASS (job));
823 return 0;
824}
825/* }}} */
826
827
828
829/* }}} */
830
831int default_tl_close_conn (connection_job_t c, int who) {
832 rpc_target_delete_conn (c);
833 return 0;
834}
835
836int default_tl_tcp_rpcs_execute (connection_job_t c, int op, struct raw_message *raw) /* {{{ */ {
837 CONN_INFO(c)->last_response_time = precise_now;
838 //rpc_target_insert_conn (c);
839
840 if (op == RPC_PONG) {
841 do_create_query_job (raw, tl_type_tcp_raw_msg, &TCP_RPC_DATA(c)->remote_pid, NULL);
842 } else {
843 do_create_query_job (raw, tl_type_tcp_raw_msg, &TCP_RPC_DATA(c)->remote_pid, job_incref (c));
844 }
845 return 1;
846}
847/* }}} */
848
849int tl_store_stats (struct tl_out_state *tlio_out, const char *s, int raw) /* {{{ */ {
850 int i, key_start = 0, value_start = -1;
851 if (!raw) {
852 tl_store_int (TL_STAT);
853 }
854 int *cnt_ptr = tl_store_get_ptr (4);
855 *cnt_ptr = 0;
856 for (i = 0; s[i]; i++) {
857 if (s[i] == '\n') {
858 if (value_start - key_start > 1 && value_start < i) {
859 tl_store_string (s + key_start, value_start - key_start - 1); /* - 1 (trim tabular) */
860 tl_store_string (s + value_start, i - value_start);
861 ++*cnt_ptr;
862 }
863 key_start = i + 1;
864 value_start = -1;
865 } else if (s[i] == '\t') {
866 value_start = value_start == -1 ? i + 1 : -2;
867 }
868 }
869 return *cnt_ptr;
870}
871/* }}} */
872
873
/* Default stats writer: renders the stats text with prepare_stats () into a
   static 4 KiB buffer and stores it in TL form (with the TL_STAT prefix). */
static void default_stat_function (struct tl_out_state *tlio_out) {
  enum { STATS_BUF_SIZE = 1 << 12 };
  static char stats_text[STATS_BUF_SIZE];

  prepare_stats (stats_text, STATS_BUF_SIZE - 2);
  tl_store_stats (tlio_out, stats_text, 0);
}
879
880void tl_engine_store_stats (struct tl_out_state *tlio_out) {
881 if (tl_stat_function) {
882 tl_stat_function (tlio_out);
883 } else {
884 default_stat_function (tlio_out);
885 }
886}
887
888void engine_tl_init (struct tl_act_extra *(*parse)(struct tl_in_state *,long long), void (*stat)(), int (get_op)(struct tl_in_state *), double timeout, const char *name) {
889 tl_parse_function = parse;
890 tl_stat_function = stat;
891 tl_aio_timeout = timeout;
892 tl_get_op_function = get_op;
893}
894
895