1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2014-2015 Telegram Messenger Inc
18 2014-2015 Nikolai Durov
19 2014 Andrey Lopatin
20*/
21
22#pragma once
23
24#include <stdlib.h>
25#include <string.h>
26#include <semaphore.h>
27#include "net/net-events.h"
28#include "net/net-msg.h"
29#include "net/net-timers.h"
30
31#define __joblocked
32#define __jobref
33
34#define MAX_SUBCLASS_THREADS 16
35
36//#include "net/net-connections.h"
37
38// verbosity level for jobs
39#define JOBS_DEBUG 3
40
41#define CONCAT(a,b) a ## b
42
43#define PTR_MOVE(__ptr_v) \
44 ({ typeof(__ptr_v) __ptr_v_save = __ptr_v; __ptr_v = NULL; __ptr_v_save; })
45
46#define JOB_REF_ARG(__name) int __name ## _tag_int, job_t __name
47#define JOB_REF_PASS(__ptr) 1, PTR_MOVE (__ptr)
48#define JOB_REF_NULL 1, NULL
49#define JOB_REF_CREATE_PASS(__ptr) 1, job_incref (__ptr)
50#define JOB_REF_CREATE_PASS_N(__ptr) 1, __ptr ? job_incref (__ptr) : NULL
51
52struct job_thread;
53struct async_job;
54typedef struct async_job *job_t;
55
56typedef int (*job_function_t)(job_t job, int op, struct job_thread *JT);
57
58extern __thread struct job_thread *this_job_thread;
59extern __thread job_t this_job;
60
61#define JOB_DESTROYED -0x80000000
62#define JOB_COMPLETED 0x100
63#define JOB_FINISH 0x80
64#define JOB_ERROR -1
65
66/* job signal numbers (0..7) */
67#define JS_FREE -1 /* pseudo-signal, invoked to free job structure ("destructor") */
68#define JS_RUN 0
69#define JS_AUX 1
70#define JS_MSG 2
71#define JS_ALARM 4 /* usually sent by timer */
72#define JS_ABORT 5 /* used for error propagation, especially from children */
73#define JS_KILL 6
74#define JS_FINISH 7
75#define JS_SIG0 0
76#define JS_SIG1 1
77#define JS_SIG2 2
78#define JS_SIG3 3
79#define JS_SIG4 4
80#define JS_SIG5 5
81#define JS_SIG6 6
82#define JS_SIG7 7
83
84extern int engine_multithread_mode;
85
86#define JC_EPOLL JC_MAIN
87#define JC_METAFILE_READ JC_IO
88#define JC_METAFILE_PREPARE JC_CPU
89#define JC_CONNECTION 4
90#define JC_CONNECTION_IO 5
91#define JC_UDP 6
92#define JC_UDP_IO 7
93#define JC_ENGINE 8
94#define JC_GMS JC_ENGINE
95#define JC_GMS_CPU 10
96#define JC_ENGINE_MULT 11
97
98
99#define DEFAULT_IO_JOB_THREADS 16
100#define DEFAULT_CPU_JOB_THREADS 8
101#define DEFAULT_GMS_CPU_JOB_THREADS 8
102
103// fake class
104// no signals should be allowed
105#define JC_MP_QUEUE 9
106
107
108#define JC_NONE 0 // no signal (unless used with "fast" flag; then it means "any context")
109#define JC_IO 1 // signal must be processed in I/O thread
110#define JC_CPU 2 // signal must be processed in CPU thread
111#define JC_MAIN 3 // signal must be processed in main thread (unless specified otherwise)
112#define JC_MAX 0xf
113#define JC_MASK JC_MAX
114
115#define JF_LOCKED 0x10000 // job is "locked" (usually this means that a signal is being processed)
116#define JF_SIGINT 0x20000 // signal interruption: if job is "locked" and we send a new signal to it, invoke pthread_signal() as well
117#define JF_COMPLETED 0x40000 // used to signal job "completion" to outside observers
118
119#define JF_QUEUED_CLASS(__c) (1 << (__c))
120#define JF_QUEUED_MAIN JF_QUEUED_CLASS(JC_MAIN) // job is in MAIN execution queue
121#define JF_QUEUED_IO JF_QUEUED_CLASS(JC_IO) // job is in IO execution queue
122#define JF_QUEUED_CPU JF_QUEUED_CLASS(JC_CPU) // job is in CPU execution queue
123#define JF_QUEUED 0xffff // job is in some execution queue
124
125#define JT_HAVE_TIMER 1
126#define JT_HAVE_MSG_QUEUE 2
127
128#define JFS_SET(__s) (0x1000000U << (__s)) // j_flags: signal __s is awaiting delivery
129#define JSS_ALLOW(__s) (0x1000000U << (__s)) // j_status: signal __s is allowed for delivery
130#define JSS_FAST(__s) (0x10000U << (__s)) // j_status: signal __s is "fast" -- may be processed recursively in specified or in any context, not necessarily in order
131#define JSS_ALLOW_FAST(__s) (0x1010000U << (__s))
132
133
134#define JOB_SENDSIG(__s) (1 << (__s))
135
136#define JSC_TYPE(__c,__s) (((unsigned long long)(__c) << ((__s) * 4 + 32)))
137#define JSC_ALLOW(__c,__s) (JSC_TYPE(__c,__s) | JSS_ALLOW(__s))
138#define JSC_FAST(__c,__s) (JSC_TYPE(__c,__s) | JSS_ALLOW_FAST(__s))
139#define JSIG_MAIN(__s) JSC_ALLOW(JC_MAIN,__s)
140#define JSIG_IO(__s) JSC_ALLOW(JC_IO,__s)
141#define JSIG_CPU(__s) JSC_ALLOW(JC_CPU,__s)
142#define JSIG_FAST(__s) JSC_FAST(JC_NONE,__s)
143#define JSIG_ENGINE(__s) JSC_ALLOW(JC_ENGINE,__s)
144
145#define JSP_PARENT_ERROR 1 // j_status: propagate error to j_error field in j_parent, and send ABORT to parent
146#define JSP_PARENT_RUN 2 // j_status: send RUN to j_parent after job completion
147#define JSP_PARENT_WAKEUP 4 // j_status: decrease j_parent's j_children; if it becomes 0, maybe send RUN
148#define JSP_PARENT_RESPTR 8 // j_status: (result) pointer(s) kept in j_custom actually point inside j_parent; use them only if j_parent is still valid
149#define JSP_PARENT_INCOMPLETE 0x10 // abort job if parent already completed
150#define JSP_PARENT_RWE 7
151#define JSP_PARENT_RWEP 0xf
152#define JSP_PARENT_RWEI 0x17
153#define JSP_PARENT_RWEPI 0x1f
154
155#define JMC_UPDATE 1
156#define JMC_FORCE_UPDATE 2
157#define JMC_RPC_QUERY 3
158#define JMC_TYPE_MASK 31
159
160#define JMC_CONTINUATION 8
161
162#define JMC_EXTRACT_ANSWER(__type) (((__type) >> 8) & 255)
163#define JMC_ANSWER(__type) ((__type) << 8)
164
165/* all fields here, with the exception of bits 24..31 and JF_LOCKED of j_flags, j_error, j_refcnt, j_children, may be changed only
166 by somebody who already owns a lock to this job, or has the only pointer to it. */
167struct async_job { // must be partially compatible with `struct connection`
168 int j_flags; // bits 0..15: queue flags; bits 16..23: status; bits 24..31: received signals (only bits that can be changed without having lock)
169 int j_status; // bits 24..31: allowed signals; bits 16..23: corresponding signal is "fast"; bits 0..4: relation to parent
170 int j_sigclass; // bits (4*n)..(4*n-3): queue class of signal n, n=0..7
171 int j_refcnt; // reference counter, changed by job_incref() and job_decref(); when becomes zero, j_execute is invoked with op = JS_FREE
172 int j_error; // if non-zero, error code; may be overwritten by children (unless already non-zero: remembers first error only)
173 int j_children; // number of jobs to complete before scheduling this job
174 int j_align; // align of real allocated pointer
175 int j_custom_bytes;
176
177 unsigned int j_type; // Bit 0 - have event_timer (must be first bytes of j_custom)
178 // Bit 1 - have message queue (must be after event_timer or first, if there is no event_timer)
179 int j_subclass;
180
181 struct job_thread *j_thread; // thread currently processing this job
182 // maybe: reference to queue, position in queue -- if j_flags & JF_QUEUED -- to remove from queue if necessary
183 job_function_t j_execute; // invoked in correct context to process signals
184 job_t j_parent; // parent (dependent) job or 0
185 long long j_custom[0] __attribute__((aligned(64)));
186} __attribute__((aligned(64)));
187
188struct job_subclass {
189 int subclass_id;
190
191 int total_jobs;
192 int allowed_to_run_jobs;
193 int processed_jobs;
194
195 int locked;
196
197 struct mp_queue *job_queue;
198};
199
200struct job_subclass_list {
201 int subclass_cnt;
202
203 sem_t sem;
204
205 struct job_subclass *subclasses;
206};
207
208struct job_class {
209 int thread_class;
210
211 int min_threads;
212 int max_threads;
213 int cur_threads;
214
215 struct mp_queue *job_queue;
216
217 struct job_subclass_list *subclasses;
218};
219
220struct job_thread {
221 pthread_t pthread_id;
222 int id;
223 int thread_class;
224 int job_class_mask; // job classes allowed to run in this thread
225 int status; // 0 = absent; +1 = created, +2 = running/waiting, +4 = performing job
226 long long jobs_performed;
227 struct mp_queue *job_queue;
228 struct async_job *current_job; // job currently performed or 0 (for DEBUG only)
229 double current_job_start_time, last_job_time, tot_jobs_time;
230 int jobs_running[JC_MAX+1];
231 long long jobs_created;
232 long long jobs_active;
233 int thread_system_id;
234 struct drand48_data rand_data;
235 job_t timer_manager;
236 double wakeup_time;
237 struct job_class *job_class;
238} __attribute__((aligned(128)));
239
240struct job_message {
241 unsigned int type;
242 unsigned int flags;
243 unsigned int payload_ints;
244 job_t src;
245 void (*destructor)(struct job_message *M);
246 struct raw_message message;
247 struct job_message *next;
248 unsigned int payload[0];
249};
250
251struct job_message_queue {
252 struct mp_queue *unsorted;
253 struct job_message *first, *last;
254 unsigned int payload_magic;
255};
256
257struct job_timer_info {
258 struct event_timer ev;
259 void *extra;
260 double (*wakeup)(void *);
261};
262
263#define MAX_JOB_THREADS 256
264
265long int lrand48_j (void);
266long int mrand48_j (void);
267double drand48_j (void);
268
269int init_async_jobs (void);
270int create_job_class (int job_class, int min_threads, int max_threads, int excl);
271int create_job_class_sub (int job_class, int min_threads, int max_threads, int excl, int subclass_cnt);
272job_t notify_job_create (int sig_class);
273int create_job_thread_ex (int thread_class, void *(*thread_work)(void *));
274int create_new_job_class (int job_class, int min_threads, int max_threads);
275int create_new_job_class_sub (int job_class, int min_threads, int max_threads, int subclass_cnt);
276void *job_thread_ex (void *arg, void (*work_one)(void *, int));
277
278/* creates a new async job as described */
279job_t create_async_job (job_function_t run_job, unsigned long long job_signals, int job_subclass, int custom_bytes, unsigned long long job_type, JOB_REF_ARG (parent_job));
280void job_change_signals (job_t job, unsigned long long job_signals);
281/* puts job into execution queue according to its priority class (actually, unlocks it and sends signal 0) */
282int schedule_job (JOB_REF_ARG (job));
283
284job_t job_incref (job_t job);
285static inline job_t job_incref_f (job_t job) {
286 if (job) {
287 job_incref (job);
288 }
289 return job;
290}
291void job_decref (JOB_REF_ARG (job)); // if job->j_refcnt becomes 0, invokes j_execute with op = JS_FREE
292static inline void job_decref_f (job_t job) {
293 job_decref (JOB_REF_PASS (job));
294}
295
296int unlock_job (JOB_REF_ARG (job));
297int try_lock_job (job_t job, int set_flags, int clear_flags);
298
299void complete_job (job_t job); // if JF_COMPLETED is not set, sets it and acts according to JFS_PARENT_*
300
301int change_locked_job_subclass (job_t job, int new_subclass);
302
303static inline int check_job_completion (job_t job) {
304 return job->j_flags & JF_COMPLETED;
305}
306static inline int check_job_validity (job_t job) {
307 return job && !check_job_completion (job);
308}
309static inline int check_parent_job_validity (job_t job) {
310 return check_job_validity (job->j_parent);
311}
312static inline int parent_job_aborted (job_t job) {
313 return (job->j_status & JSP_PARENT_INCOMPLETE) && job->j_parent && check_job_completion (job->j_parent);
314}
315static inline int job_parent_ptr_valid (job_t job) {
316 return (!(job->j_status & JSP_PARENT_RESPTR) || check_parent_job_validity (job));
317}
318static inline int job_fatal (job_t job, int error) {
319 if (!job->j_error) {
320 job->j_error = error;
321 }
322 return JOB_COMPLETED;
323}
324
325/* runs all pending jobs of class JF_CLASS_MAIN, then returns */
326int run_pending_main_jobs (void);
327
328
329/* ----------- JOB WAIT QUEUE ------ */
330
331struct job_list_node;
332
333typedef int (*job_list_node_type_t)(job_t list_job, int op, struct job_list_node *w);
334
335struct job_list_node {
336 struct job_list_node *jl_next;
337 job_list_node_type_t jl_type;
338 int jl_custom[0];
339};
340
341job_t create_job_list (void);
342int insert_job_into_job_list (job_t list_job, JOB_REF_ARG(job), int mode);
343void update_all_thread_stats (void);
344
345/* adds job to the list of jobs awaited by connection */
346// int conn_wait_job (job_t job, connection_job_t c, double timeout, struct conn_query_functions *cq);
347/* increases connection's generation (effectively clearing list of awaited jobs), then adds given job */
348// int conn_wait_only_job (job_t job, connection_job_t c, double timeout, struct conn_query_functions *cq);
349
350extern int max_job_thread_id;
351
352void check_main_thread (void);
353int job_timer_wakeup_gateway (event_timer_t *et);
354int job_timer_check (job_t job);
355void job_signal (JOB_REF_ARG (job), int signo);
356void complete_subjob (job_t job, JOB_REF_ARG (parent), int status);
357void job_timer_insert (job_t job, double timeout);
358void job_timer_remove (job_t job);
359int job_timer_active (job_t job);
360void job_timer_init (job_t job);
361double job_timer_wakeup_time (job_t job);
362void jobs_check_all_timers (void);
363
364static inline void check_thread_class (int class) {
365 assert (this_job_thread->job_class_mask & (1 << class));
366}
367
368void job_message_send (JOB_REF_ARG (job), JOB_REF_ARG (src), unsigned int type, struct raw_message *raw, int dup, int payload_ints, const unsigned int *payload, unsigned int flags, void (*destructor)(struct job_message *M));
369void job_message_send_fake (JOB_REF_ARG (job), int (*receive_message)(job_t job, struct job_message *M, void *extra), void *extra, JOB_REF_ARG (src), unsigned int type, struct raw_message *raw, int dup, int payload_ints, const unsigned int *payload, unsigned int flags, void (*destructor)(struct job_message *M));
370//void job_message_send_data (JOB_REF_ARG (job), JOB_REF_ARG (src), unsigned int type, void *ptr1, void *ptr2, int int1, long long long1, int payload_ints, const unsigned int *payload, unsigned int flags);
371static inline void job_message_send_empty (JOB_REF_ARG (job), JOB_REF_ARG (src), unsigned int type, unsigned int flags) {
372 job_message_send (JOB_REF_PASS (job), JOB_REF_PASS (src), type, &empty_rwm, 1, 0, NULL, flags, NULL);
373}
374
375#define TL_TRUE 0x3fedd339
376static inline int job_message_answer_true (struct job_message *M) {
377 if (M->src) {
378 job_message_send (JOB_REF_PASS (M->src), JOB_REF_NULL, TL_TRUE, &empty_rwm, 1, M->payload_ints, M->payload, JMC_EXTRACT_ANSWER (M->flags), NULL);
379 }
380 return 1;
381}
382
383static inline int job_message_continuation (job_t job, struct job_message *M, int payload_magic) {
384 if (M->payload_ints >= 1) {
385 assert (M->payload[0] == payload_magic);
386 assert (M->payload_ints == 5);
387 int (*func)(job_t, struct job_message *, void *) = *(void **)(M->payload + 1);
388 void *extra = *(void **)(M->payload + 3);
389 assert (func);
390 return func (job, M, extra);
391 }
392 return 1;
393}
394
395void job_message_queue_free (job_t job);
396void job_message_queue_init (job_t job);
397void job_message_queue_work (job_t job, int (*receive_message)(job_t job, struct job_message *M, void *extra), void *extra, unsigned int mask);
398
399int job_free (JOB_REF_ARG (job));
400job_t job_timer_alloc (int thread_class, double (*alarm)(void *), void *extra);
401
402struct thread_callback {
403 struct thread_callback *next;
404 void (*new_thread)(void);
405};
406
407void register_thread_callback (struct thread_callback *cb);
408job_t alloc_timer_manager (int thread_class);
409
410struct job_message_payload {
411 job_t job;
412 int message_class;
413 int payload_ints;
414 unsigned int payload[0];
415};
416
417static inline struct job_message_payload *job_message_payload_alloc (JOB_REF_ARG (job), int message_class, int payload_ints, unsigned int *payload) {
418 struct job_message_payload *P = malloc (sizeof (*P) + 4 * payload_ints);
419 P->message_class = message_class;
420 P->payload_ints = payload_ints;
421 P->job = PTR_MOVE (job);
422 memcpy (P->payload, payload, 4 * payload_ints);
423 return P;
424}
425
426long long jobs_get_allocated_memoty (void);
427
428unsigned int *payload_continuation_create (unsigned int magic, int (*func)(job_t, struct job_message *, void *extra), void *extra);
429#define PAYLOAD_CONTINUATION(_magic,_func,_extra) 5, payload_continuation_create (_magic, _func, _extra)
430
431extern struct job_thread JobThreads[];
432#define CNCT2(a,b) a ## b
433#define CNCT(a,b) CNCT2(a,b)
434
435#define MODULE_STAT_TYPE struct CNCT(jobs_module_stat_,MODULE)
436#define MODULE_STR(a) MODULE_STR2(a)
437#define MODULE_STR2(a) #a
438#define MODULE_STAT_PREFIX_NAME CNCT(jobs_module_state_prefix_,MODULE)
439#define MODULE_STAT_PREFIX char *MODULE_STAT_PREFIX_NAME
440
441#define MODULE_STAT CNCT(jobs_module_stat_,MODULE)
442#define MODULE_STAT_ARR CNCT(jobs_module_list_stat_,MODULE)
443
444#define MODULE_STAT_FUNCTION int CNCT(MODULE,_prepare_stat) (stats_buffer_t *sb) { \
445 sb_printf (sb, ">>>>>>%s>>>>>>\tstart\n", MODULE_STR(MODULE));
446
447
448#define MODULE_STAT_FUNCTION_END \
449 sb_printf (sb, "<<<<<<%s<<<<<<\tend\n", MODULE_STR(MODULE)); \
450 return sb->pos; }
451
452#define MODULE_INIT \
453 MODULE_STAT_TYPE *MODULE_STAT_ARR[MAX_JOB_THREADS]; \
454 __thread MODULE_STAT_TYPE *MODULE_STAT; \
455 MODULE_STAT_PREFIX; \
456 \
457 void CNCT(jobs_module_thread_init_,MODULE) (void) {\
458 int id = get_this_thread_id ();\
459 assert (id >= 0 && id < MAX_JOB_THREADS);\
460 MODULE_STAT = MODULE_STAT_ARR[id] = calloc (sizeof (MODULE_STAT_TYPE), 1);\
461 } \
462 \
463 struct thread_callback CNCT(MODULE,_thread_callback) = { \
464 .new_thread = CNCT(jobs_module_thread_init_, MODULE), \
465 .next = NULL \
466 }; \
467 void CNCT(jobs_module_register_,MODULE) (void) __attribute__ ((constructor));\
468 void CNCT(jobs_module_register_,MODULE) (void) { \
469 register_thread_callback (&CNCT(MODULE,_thread_callback)); \
470 }
471
472