1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2014-2015 Telegram Messenger Inc
18 2014-2015 Nikolai Durov
19 2014 Andrey Lopatin
20*/
21
22#define _FILE_OFFSET_BITS 64
23#define _XOPEN_SOURCE 500
24#define _GNU_SOURCE 1
25
26#include <assert.h>
27#include <errno.h>
28#include <pthread.h>
29#include <signal.h>
30#include <stddef.h>
31#include <stdio.h>
32#include <stdlib.h>
33#include <string.h>
34#include <time.h>
35#include <unistd.h>
36#include <malloc.h>
37#include <sys/syscall.h>
38#include <math.h>
39#include <linux/futex.h>
40
41#include "common/proc-stat.h"
42#include "crc32.h"
43#include "net/net-events.h"
44//#include "net/net-buffers.h"
45#include "server-functions.h"
46#include "kprintf.h"
47#include "precise-time.h"
48#include "mp-queue.h"
49#include "net/net-connections.h"
50#include "jobs/jobs.h"
51#include "common/common-stats.h"
52
53//#include "auto/engine/engine.h"
54
55#define JOB_SUBCLASS_OFFSET 3
56
/* Table of job threads; create_job_thread_ex() hands out slots starting at index 1. */
struct job_thread JobThreads[MAX_JOB_THREADS] __attribute__((aligned(128)));

/* CPU time samples of one job thread, filled in by update_thread_stat(). */
struct job_thread_stat {
  unsigned long tot_sys;     /* stime value of the latest sample */
  unsigned long tot_user;    /* utime value of the latest sample */
  unsigned long recent_sys;  /* stime difference between the two latest samples */
  unsigned long recent_user; /* utime difference between the two latest samples */
};
/* Indexed by the same thread id as JobThreads. */
struct job_thread_stat JobThreadsStats[MAX_JOB_THREADS] __attribute__((aligned(128)));
66
#define MODULE jobs

/* Statistics record of the jobs module; the stat function below reads one record per
   job thread through MODULE_STAT_ARR[thread id]. */
MODULE_STAT_TYPE {
  /* total time spent blocked on the job queue / decayed recent idle time / decayed normaliser */
  double tot_idle_time, a_idle_time, a_idle_quotient;
  long long jobs_allocated_memory; /* bytes accounted by create_async_job(), released in unlock_job() */
  int jobs_ran;                    /* not updated in this part of the file */
  int job_timers_allocated;        /* not updated in this part of the file */
  double locked_since;             /* start time of the current blocking queue wait, 0 when not waiting */
  long long timer_ops;             /* not updated in this part of the file */
  long long timer_ops_scheduler;   /* not updated in this part of the file */
};

MODULE_INIT
80
81MODULE_STAT_FUNCTION
82 int uptime = time (0) - start_time;
83 double tm = get_utime_monotonic ();
84 double tot_recent_idle[16];
85 double tot_recent_q[16];
86 double tot_idle[16];
87 int tot_threads[16];
88 memset (tot_recent_idle, 0, sizeof (tot_recent_idle));
89 memset (tot_recent_q, 0, sizeof (tot_recent_q));
90 memset (tot_idle, 0, sizeof (tot_idle));
91 memset (tot_threads, 0, sizeof (tot_threads));
92
93 tot_recent_idle[JC_MAIN] = a_idle_time;
94 tot_recent_q[JC_MAIN] = a_idle_quotient;
95 tot_idle[JC_MAIN] = tot_idle_time;
96
97 int i,j;
98 for (i = 0; i < max_job_thread_id + 1; i++) {
99 if (MODULE_STAT_ARR[i]) {
100 assert (JobThreads[i].id == i);
101 int class = JobThreads[i].thread_class & JC_MASK;
102 tot_recent_idle[class] += MODULE_STAT_ARR[i]->a_idle_time;
103 tot_recent_q[class] += MODULE_STAT_ARR[i]->a_idle_quotient;
104 tot_idle[class] += MODULE_STAT_ARR[i]->tot_idle_time;
105 if (MODULE_STAT_ARR[i]->locked_since) {
106 double lt = MODULE_STAT_ARR[i]->locked_since;
107 tot_recent_idle[class] += (tm - lt);
108 tot_recent_q[class] += (tm - lt);
109 tot_idle[class] += (tm - lt);
110 }
111 tot_threads[class] ++;
112 }
113 }
114
115
116 sb_printf (sb, "thread_average_idle_percent\t");
117 for (i = 0; i < 16; i++) {
118 if (i != 0) {
119 sb_printf (sb, " ");
120 if (!(i & 3)) {
121 sb_printf (sb, " ");
122 }
123 }
124 sb_printf (sb, "%.3f", safe_div (tot_idle[i], uptime * tot_threads[i]) * 100);
125 }
126 sb_printf (sb, "\n");
127
128 sb_printf (sb, "thread_recent_idle_percent\t");
129 for (i = 0; i < 16; i++) {
130 if (i != 0) {
131 sb_printf (sb, " ");
132 if (!(i & 3)) {
133 sb_printf (sb, " ");
134 }
135 }
136 sb_printf (sb, "%.3f", safe_div (tot_recent_idle[i], tot_recent_q[i]) * 100);
137 }
138 sb_printf (sb, "\n");
139
140 sb_printf (sb, "tot_threads\t");
141 for (i = 0; i < 16; i++) {
142 if (i != 0) {
143 sb_printf (sb, " ");
144 if (!(i & 3)) {
145 sb_printf (sb, " ");
146 }
147 }
148 sb_printf (sb, "%d", tot_threads[i]);
149 }
150 sb_printf (sb, "\n");
151
152 double jb_cpu_load_u[16];
153 double jb_cpu_load_s[16];
154 double jb_cpu_load_t[16];
155 double jb_cpu_load_ru[16];
156 double jb_cpu_load_rs[16];
157 double jb_cpu_load_rt[16];
158 memset (jb_cpu_load_u, 0, sizeof (jb_cpu_load_u));
159 memset (jb_cpu_load_s, 0, sizeof (jb_cpu_load_u));
160 memset (jb_cpu_load_t, 0, sizeof (jb_cpu_load_u));
161 memset (jb_cpu_load_ru, 0, sizeof (jb_cpu_load_u));
162 memset (jb_cpu_load_rs, 0, sizeof (jb_cpu_load_u));
163 memset (jb_cpu_load_rt, 0, sizeof (jb_cpu_load_u));
164 double tot_cpu_load_u = 0;
165 double tot_cpu_load_s = 0;
166 double tot_cpu_load_t = 0;
167 double tot_cpu_load_ru = 0;
168 double tot_cpu_load_rs = 0;
169 double tot_cpu_load_rt = 0;
170 double max_cpu_load_u = 0;
171 double max_cpu_load_s = 0;
172 double max_cpu_load_t = 0;
173 double max_cpu_load_ru = 0;
174 double max_cpu_load_rs = 0;
175 double max_cpu_load_rt = 0;
176 for (i = 0; i < max_job_thread_id + 1; i++) {
177 if (MODULE_STAT_ARR[i]) {
178 assert (JobThreads[i].id == i);
179 int class = JobThreads[i].thread_class & JC_MASK;
180 jb_cpu_load_u[class] += JobThreadsStats[i].tot_user;
181 jb_cpu_load_s[class] += JobThreadsStats[i].tot_sys;
182 jb_cpu_load_t[class] += JobThreadsStats[i].tot_user + JobThreadsStats[i].tot_sys;
183
184 jb_cpu_load_ru[class] += JobThreadsStats[i].recent_user;
185 jb_cpu_load_rs[class] += JobThreadsStats[i].recent_sys;
186 jb_cpu_load_rt[class] += JobThreadsStats[i].recent_user + JobThreadsStats[i].recent_sys;
187 }
188 }
189 for (i = 0; i < 16; i++) {
190 tot_cpu_load_u += jb_cpu_load_u[i];
191 tot_cpu_load_s += jb_cpu_load_s[i];
192 tot_cpu_load_t += jb_cpu_load_t[i];
193 tot_cpu_load_ru += jb_cpu_load_ru[i];
194 tot_cpu_load_rs += jb_cpu_load_rs[i];
195 tot_cpu_load_rt += jb_cpu_load_rt[i];
196
197 #define max(a,b) (a) > (b) ? (a) : (b)
198 max_cpu_load_u = max (max_cpu_load_u, jb_cpu_load_u[i]);
199 max_cpu_load_s = max (max_cpu_load_s, jb_cpu_load_s[i]);
200 max_cpu_load_t = max (max_cpu_load_t, jb_cpu_load_t[i]);
201 max_cpu_load_ru = max (max_cpu_load_ru, jb_cpu_load_ru[i]);
202 max_cpu_load_rs = max (max_cpu_load_rs, jb_cpu_load_rs[i]);
203 max_cpu_load_rt = max (max_cpu_load_rt, jb_cpu_load_rt[i]);
204 #undef max
205 }
206
207 const double m_clk_to_hs = 100.0 / sysconf (_SC_CLK_TCK); /* hundredth of a second */
208 const double m_clk_to_ts = 0.1 * m_clk_to_hs; /* tenth of a second */
209
210 for (j = 0; j < 6; j++) {
211 double *b = NULL;
212 double d = 0;
213 switch (j) {
214 case 0:
215 sb_printf (sb, "thread_load_average_user\t");
216 b = jb_cpu_load_u;
217 d = uptime;
218 break;
219 case 1:
220 sb_printf (sb, "thread_load_average_sys\t");
221 b = jb_cpu_load_s;
222 d = uptime;
223 break;
224 case 2:
225 sb_printf (sb, "thread_load_average\t");
226 b = jb_cpu_load_t;
227 d = uptime;
228 break;
229 case 3:
230 sb_printf (sb, "thread_load_recent_user\t");
231 b = jb_cpu_load_ru;
232 d = 10;
233 break;
234 case 4:
235 sb_printf (sb, "thread_load_recent_sys\t");
236 b = jb_cpu_load_rs;
237 d = 10;
238 break;
239 case 5:
240 sb_printf (sb, "thread_load_recent\t");
241 b = jb_cpu_load_rt;
242 d = 10;
243 break;
244 default:
245 assert (0);
246 }
247 for (i = 0; i < 16; i++) {
248 if (i != 0) {
249 sb_printf (sb, " ");
250 if (!(i & 3)) {
251 sb_printf (sb, " ");
252 }
253 }
254 sb_printf (sb, "%.3f", safe_div (m_clk_to_hs * b[i], d * tot_threads[i]));
255 }
256 sb_printf (sb, "\n");
257 }
258
259 sb_printf (sb,
260 "load_average_user\t%.3f\n"
261 "load_average_sys\t%.3f\n"
262 "load_average_total\t%.3f\n"
263 "load_recent_user\t%.3f\n"
264 "load_recent_sys\t%.3f\n"
265 "load_recent_total\t%.3f\n"
266 "max_average_user\t%.3f\n"
267 "max_average_sys\t%.3f\n"
268 "max_average_total\t%.3f\n"
269 "max_recent_user\t%.3f\n"
270 "max_recent_sys\t%.3f\n"
271 "max_recent_total\t%.3f\n",
272 safe_div (m_clk_to_hs * tot_cpu_load_u, uptime),
273 safe_div (m_clk_to_hs * tot_cpu_load_s, uptime),
274 safe_div (m_clk_to_hs * tot_cpu_load_t, uptime),
275 m_clk_to_ts * tot_cpu_load_ru,
276 m_clk_to_ts * tot_cpu_load_rs,
277 m_clk_to_ts * tot_cpu_load_rt,
278 safe_div (m_clk_to_hs * max_cpu_load_u, uptime),
279 safe_div (m_clk_to_hs * max_cpu_load_s, uptime),
280 safe_div (m_clk_to_hs * max_cpu_load_t, uptime),
281 m_clk_to_ts * max_cpu_load_ru,
282 m_clk_to_ts * max_cpu_load_rs,
283 m_clk_to_ts * max_cpu_load_rt
284 );
285
286 SB_SUM_ONE_I (job_timers_allocated);
287
288 int jb_running[16], jb_active = 0;
289 long long jb_created = 0;
290 memset (jb_running, 0, sizeof (jb_running));
291 for (i = 1; i <= max_job_thread_id; i++) {
292 struct job_thread *JT = &JobThreads[i];
293 if (JT->status) {
294 jb_active += JT->jobs_active;
295 jb_created += JT->jobs_created;
296 for (j = 0; j <= JC_MAX; j++) {
297 jb_running[j] += JT->jobs_running[j];
298 }
299 }
300 }
301 sb_printf (sb,
302 "jobs_created\t%lld\n"
303 "jobs_active\t%d\n",
304 jb_created,
305 jb_active
306 );
307
308 sb_printf (sb, "jobs_running\t");
309 for (i = 0; i < 16; i++) {
310 if (i != 0) {
311 sb_printf (sb, " ");
312 if (!(i & 3)) {
313 sb_printf (sb, " ");
314 }
315 }
316 sb_printf (sb, "%d", jb_running[i]);
317 }
318 sb_printf (sb, "\n");
319
320 SB_SUM_ONE_LL (jobs_allocated_memory);
321 SB_SUM_ONE_LL (timer_ops);
322 SB_SUM_ONE_LL (timer_ops_scheduler);
323MODULE_STAT_FUNCTION_END
324
325long long jobs_get_allocated_memoty (void) {
326 return SB_SUM_LL (jobs_allocated_memory);
327}
328
329void update_thread_stat (int pid, int tid, int id) {
330 struct proc_stats s;
331 if (!tid) { tid = pid; }
332 read_proc_stats (pid, tid, &s);
333
334 struct job_thread_stat *S = &JobThreadsStats[id];
335
336 S->recent_sys = (s.stime - S->tot_sys);
337 S->recent_user = (s.utime - S->tot_user);
338 S->tot_sys = s.stime;
339 S->tot_user = s.utime;
340}
341
342void update_all_thread_stats (void) {
343 int i;
344 pid_t pid = getpid ();
345 for (i = 1; i <= max_job_thread_id; i++) {
346 update_thread_stat (pid, JobThreads[i].thread_system_id, i);
347 }
348}
349
/* Weak no-op default; unlock_job() calls it after pushing a job into MainJobQueue
   while main_thread_interrupt_status is 1.  A non-weak definition elsewhere replaces it. */
void wakeup_main_thread (void) __attribute__ ((weak));
void wakeup_main_thread (void) {}
352
/* stack size requested for every job thread started with pthread_create() */
#define JOB_THREAD_STACK_SIZE (4 << 20)

/* bits of job_thread.status */
#define JTS_CREATED 1     /* slot has been set up by create_job_thread_ex() */
#define JTS_RUNNING 2     /* thread entered its job loop / main thread is draining its queue */
#define JTS_PERFORMING 4  /* thread is inside a job's j_execute callback */

/* per-class configuration, indexed by job class */
struct job_class JobClasses[JC_MAX + 1];

int max_job_thread_id;  /* highest JobThreads index handed out so far */
int cur_job_threads;    /* number of successfully created job threads */

int main_pthread_id_initialized;   /* set once main_pthread_id is valid */
pthread_t main_pthread_id;         /* thread recorded by init_main_pthread_id() */
struct job_thread *main_job_thread; /* JobThreads slot of the JC_MAIN thread */

/* calling thread's JobThreads slot (set in create_job_thread_ex / job_thread_ex) */
__thread struct job_thread *this_job_thread;
__thread job_t this_job;
370
371long int lrand48_j (void) {
372 if (this_job_thread) {
373 long int t;
374 lrand48_r (&this_job_thread->rand_data, &t);
375 return t;
376 } else {
377 return lrand48 ();
378 }
379}
380
381long int mrand48_j (void) {
382 if (this_job_thread) {
383 long int t;
384 mrand48_r (&this_job_thread->rand_data, &t);
385 return t;
386 } else {
387 return mrand48 ();
388 }
389}
390
391double drand48_j (void) {
392 if (this_job_thread) {
393 double t;
394 drand48_r (&this_job_thread->rand_data, &t);
395 return t;
396 } else {
397 return drand48 ();
398 }
399}
400
/* Queue drained by run_pending_main_jobs(); init_async_jobs() makes it the initial
   queue of every job class. */
struct mp_queue MainJobQueue __attribute__((aligned(128)));

/* Linked list of callbacks; job_thread_ex() calls new_thread() of each entry when a
   job thread starts. */
static struct thread_callback *jobs_cb_list;
404
405void init_main_pthread_id (void) {
406 pthread_t self = pthread_self ();
407 if (main_pthread_id_initialized) {
408 assert (pthread_equal (main_pthread_id, self));
409 } else {
410 main_pthread_id = self;
411 main_pthread_id_initialized = 1;
412 }
413}
414
415void check_main_thread (void) {
416 pthread_t self = pthread_self ();
417 assert (main_pthread_id_initialized && pthread_equal (main_pthread_id, self));
418}
419
/* installs the SIGRTMAX-7 handler; defined further below */
static void set_job_interrupt_signal_handler (void);

/* thread entry points passed to pthread_create() by create_job_thread() */
void *job_thread (void *arg);
void *job_thread_sub (void *arg);
424
425int create_job_thread_ex (int thread_class, void *(*thread_work)(void *)) {
426 assert (!(thread_class & ~JC_MASK));
427 assert (thread_class);
428 assert ((thread_class != JC_MAIN) ^ !cur_job_threads);
429 if (cur_job_threads >= MAX_JOB_THREADS) {
430 return -1;
431 }
432 check_main_thread ();
433
434 struct job_class *JC = &JobClasses[thread_class];
435
436 if (thread_class != JC_MAIN && JC->job_queue == &MainJobQueue) {
437 assert (main_job_thread);
438 JC->job_queue = alloc_mp_queue_w ();
439 main_job_thread->job_class_mask &= ~(1 << thread_class);
440 /*if (max_job_class_threads[thread_class] == 1) {
441 run_pending_main_jobs ();
442 }*/
443 }
444 assert (JC->job_queue);;
445
446 int i;
447 struct job_thread *JT = 0;
448 for (i = 1; i < MAX_JOB_THREADS; i++) {
449 if (!JobThreads[i].status && !JobThreads[i].pthread_id) {
450 JT = &JobThreads[i];
451 break;
452 }
453 }
454 if (!JT) {
455 return -1;
456 }
457 memset (JT, 0, sizeof (struct job_thread));
458 JT->status = JTS_CREATED;
459 JT->thread_class = thread_class;
460 JT->job_class_mask = 1 | (thread_class == JC_MAIN ? 0xffff : (1 << thread_class));
461 JT->job_queue = JC->job_queue;
462 JT->job_class = JC;
463 JT->id = i;
464 assert (JT->job_queue);
465
466 srand48_r (rdtsc () ^ lrand48 (), &JT->rand_data);
467
468
469 if (thread_class != JC_MAIN) {
470 pthread_attr_t attr;
471 pthread_attr_init (&attr);
472 pthread_attr_setstacksize (&attr, JOB_THREAD_STACK_SIZE);
473
474 int r = pthread_create (&JT->pthread_id, &attr, thread_work, (void *) JT);
475
476 pthread_attr_destroy (&attr);
477
478 if (r) {
479 vkprintf (0, "create_job_thread: pthread_create() failed: %s\n", strerror (r));
480 JT->status = 0;
481 return -1;
482 }
483 } else {
484 assert (!main_job_thread);
485 get_this_thread_id ();
486 JT->pthread_id = main_pthread_id;
487 this_job_thread = main_job_thread = JT;
488 set_job_interrupt_signal_handler ();
489 assert (JT->id == 1);
490 }
491
492 if (i > max_job_thread_id) {
493 max_job_thread_id = i;
494 }
495
496 cur_job_threads++;
497 JC->cur_threads ++;
498
499 return i;
500}
501
502int create_job_thread (int thread_class) {
503 struct job_class *JC = &JobClasses[thread_class];
504 return create_job_thread_ex (thread_class, JC->subclasses ? job_thread_sub : job_thread);
505}
506
507int create_job_class_threads (int job_class) {
508 assert (job_class != JC_MAIN);
509 int created = 0;
510 assert (job_class >= 1 && job_class <= JC_MAX);
511
512 struct job_class *JC = &JobClasses[job_class];
513 assert (JC->min_threads <= JC->max_threads);
514 check_main_thread ();
515
516 while (JC->cur_threads < JC->min_threads && cur_job_threads < MAX_JOB_THREADS) {
517 assert (create_job_thread (job_class) >= 0);
518 created++;
519 }
520 return created;
521}
522
523int init_async_jobs (void) {
524 init_main_pthread_id ();
525
526 if (!MainJobQueue.mq_magic) {
527 init_mp_queue_w (&MainJobQueue);
528 int i;
529 for (i = 0; i < JC_MAX + 1; i++) {
530 JobClasses[i].job_queue = &MainJobQueue;
531 }
532 }
533
534 if (!cur_job_threads) {
535 assert (create_job_thread (JC_MAIN) >= 0);
536 }
537
538 /*
539 int i;
540 for (i = 1; i < 16; i++) if (i != JC_MAIN) {
541 create_job_class_threads (i);
542 }*/
543
544 return cur_job_threads;
545}
546
/* create_job_class() with excl = 1. */
int create_new_job_class (int job_class, int min_threads, int max_threads) {
  const int exclusive = 1;
  return create_job_class (job_class, min_threads, max_threads, exclusive);
}
550
/* create_job_class_sub() with excl = 1. */
int create_new_job_class_sub (int job_class, int min_threads, int max_threads, int subclass_cnt) {
  const int exclusive = 1;
  return create_job_class_sub (job_class, min_threads, max_threads, exclusive, subclass_cnt);
}
554
555int create_job_class (int job_class, int min_threads, int max_threads, int excl) {
556 assert (job_class >= 1 && job_class <= JC_MAX);
557 assert (min_threads >= 0 && max_threads >= min_threads);
558 struct job_class *JC = &JobClasses[job_class];
559 assert (!excl || !JC->min_threads);
560 if (min_threads < JC->min_threads || !JC->min_threads) {
561 JC->min_threads = min_threads;
562 }
563 if (max_threads > JC->max_threads) {
564 JC->max_threads = max_threads;
565 }
566 assert (JC->min_threads <= JC->max_threads);
567 if (MainJobQueue.mq_magic) {
568 return create_job_class_threads (job_class);
569 } else {
570 return 0;
571 }
572}
573
574int create_job_class_sub (int job_class, int min_threads, int max_threads, int excl, int subclass_cnt) {
575 assert (job_class >= 1 && job_class <= JC_MAX);
576 assert (min_threads >= 0 && max_threads >= min_threads);
577
578 struct job_subclass_list *L = calloc (sizeof (*L), 1);
579 L->subclass_cnt = subclass_cnt;
580 L->subclasses = calloc (sizeof (struct job_subclass), subclass_cnt + 2);
581 L->subclasses += 2;
582 int i;
583 for (i = -2; i < subclass_cnt; i++) {
584 L->subclasses[i].job_queue = alloc_mp_queue_w ();
585 L->subclasses[i].subclass_id = i;
586 }
587
588 for (i = 0; i < MAX_SUBCLASS_THREADS; i++) {
589 sem_post (&L->sem);
590 }
591
592 JobClasses[job_class].subclasses = L;
593
594 return create_job_class (job_class, min_threads, max_threads, excl);
595}
596
597/* ------ JOB THREAD CODE -------- */
598
599int try_lock_job (job_t job, int set_flags, int clear_flags) {
600 while (1) {
601 barrier ();
602 int flags = job->j_flags;
603 if (flags & JF_LOCKED) {
604 return 0;
605 }
606 if (__sync_bool_compare_and_swap (&job->j_flags, flags, (flags & ~clear_flags) | set_flags | JF_LOCKED)) {
607 job->j_thread = this_job_thread;
608 return 1;
609 }
610 }
611}
612
613int unlock_job (JOB_REF_ARG (job)) {
614 assert (job->j_thread == this_job_thread);
615 struct job_thread *JT = job->j_thread;
616 int thread_class = JT->thread_class;
617 int save_subclass = job->j_subclass;
618 vkprintf (JOBS_DEBUG, "UNLOCK JOB %p, type %p, flags %08x, status %08x, sigclass %08x, refcnt %d\n", job, job->j_execute, job->j_flags, job->j_status, job->j_sigclass, job->j_refcnt);
619 while (1) {
620 barrier ();
621 assert (job->j_flags & JF_LOCKED);
622 int flags = job->j_flags;
623 int todo = flags & job->j_status & (-1 << 24);
624 if (!todo) /* {{{ */ {
625 int new_flags = flags & ~JF_LOCKED;
626 if (!__sync_bool_compare_and_swap (&job->j_flags, flags, new_flags)) {
627 continue;
628 }
629 if (job->j_refcnt >= 2) {
630 if (__sync_fetch_and_add (&job->j_refcnt, -1) != 1) {
631 return 0;
632 }
633 job->j_refcnt = 1;
634 }
635 assert (job->j_refcnt == 1);
636 vkprintf (JOBS_DEBUG, "DESTROYING JOB %p, type %p, flags %08x\n", job, job->j_execute, job->j_flags);
637 if (job->j_status & JSS_ALLOW (JS_FINISH)) {
638 // send signal 7 (JS_FINISH) if it is allowed
639 job->j_flags |= JFS_SET (JS_FINISH) | JF_LOCKED;
640 continue;
641 } else {
642 assert (0 && "unhandled JS_FINISH\n");
643 MODULE_STAT->jobs_allocated_memory -= sizeof (struct async_job) + job->j_custom_bytes;
644 // complete_job (job);
645 job_free (JOB_REF_PASS (job)); // ???
646 JT->jobs_active --;
647 return -1;
648 }
649 }
650 /* }}} */
651
652 int signo = 7 - __builtin_clz (todo);
653 int req_class = (job->j_sigclass >> (signo*4)) & 15;
654 int is_fast = job->j_status & JSS_FAST (signo);
655 int cur_subclass = job->j_subclass;
656
657 /* {{{ Try to run signal signo */
658 if (((JT->job_class_mask >> req_class) & 1) && (is_fast || !JT->current_job) && (cur_subclass == save_subclass)) {
659 job_t current_job = JT->current_job;
660 __sync_fetch_and_and (&job->j_flags, ~JFS_SET (signo));
661 JT->jobs_running[req_class] ++;
662 JT->current_job = job;
663 JT->status |= JTS_PERFORMING;
664 vkprintf (JOBS_DEBUG, "BEGIN JOB %p, type %p, flags %08x, status %08x, sigclass %08x (signal %d of class %d), refcnt %d\n", job, job->j_execute, job->j_flags, job->j_status, job->j_sigclass, signo, req_class, job->j_refcnt);
665 int custom = job->j_custom_bytes;
666 int res = job->j_execute (job, signo, JT);
667 JT->current_job = current_job;
668 if (!current_job) {
669 JT->status &= ~JTS_PERFORMING;
670 }
671 JT->jobs_running[req_class] --;
672 if (res == JOB_DESTROYED) {
673 MODULE_STAT->jobs_allocated_memory -= sizeof (struct async_job) + custom;
674 vkprintf (JOBS_DEBUG, "JOB %p DESTROYED: RES = %d\n", job, res);
675 JT->jobs_active --;
676 return res;
677 }
678 vkprintf (JOBS_DEBUG, "END JOB %p, type %p, flags %08x, status %08x, sigclass %08x (signal %d of class %d), refcnt %d, %d children: RES = %d\n", job, job->j_execute, job->j_flags, job->j_status, job->j_sigclass, signo, req_class, job->j_refcnt, job->j_children, res);
679 if (res == JOB_ERROR) {
680 kprintf ("fatal: thread %p of class %d: error while invoking method %d of job %p (type %p)\n", JT, thread_class, signo, job, job->j_execute);
681 assert (0 && "unknown job signal");
682 }
683 if (!(res & ~0x1ff)) {
684 if (res & 0xff) {
685 __sync_fetch_and_or (&job->j_flags, res << 24);
686 }
687 if (res & JOB_COMPLETED) {
688 complete_job (job);
689 }
690 }
691 continue;
692 }
693 /* }}} */
694
695 /* {{{ Try to Queue */
696 if (!req_class) {
697 // have a "fast" signal with *-class, put it into MAIN queue
698 req_class = JC_MAIN;
699 }
700 // have to insert job into queue of req_class
701 int queued_flag = JF_QUEUED_CLASS (req_class);
702 int new_flags = (flags | queued_flag) & ~JF_LOCKED;
703 if (!__sync_bool_compare_and_swap (&job->j_flags, flags, new_flags)) {
704 continue;
705 }
706 if (!(flags & queued_flag)) {
707 struct job_class *JC = &JobClasses[req_class];
708 if (!JC->subclasses) {
709 struct mp_queue *JQ = JC->job_queue;
710 assert (JQ);
711 vkprintf (JOBS_DEBUG, "RESCHEDULED JOB %p, type %p, flags %08x, refcnt %d -> Queue %d\n", job, job->j_execute, job->j_flags, job->j_refcnt, req_class);
712 vkprintf (JOBS_DEBUG, "sub=%p\n", JT->job_class->subclasses);
713 mpq_push_w (JQ, PTR_MOVE (job), 0);
714 if (JQ == &MainJobQueue && main_thread_interrupt_status == 1 && __sync_fetch_and_add (&main_thread_interrupt_status, 1) == 1) {
715 //pthread_kill (main_pthread_id, SIGRTMAX - 7);
716 vkprintf (JOBS_DEBUG, "WAKING UP MAIN THREAD\n");
717 wakeup_main_thread ();
718 }
719 } else {
720 assert (job->j_subclass == cur_subclass);
721
722 assert (cur_subclass >= -2);
723 assert (cur_subclass < JC->subclasses->subclass_cnt);
724
725 struct job_subclass *JSC = &JC->subclasses->subclasses[cur_subclass];
726 __sync_fetch_and_add (&JSC->total_jobs, 1);
727
728 vkprintf (JOBS_DEBUG, "RESCHEDULED JOB %p, type %p, flags %08x, refcnt %d -> Queue %d subclass %d\n", job, job->j_execute, job->j_flags, job->j_refcnt, req_class, cur_subclass);
729 mpq_push_w (JSC->job_queue, PTR_MOVE (job), 0);
730
731 struct mp_queue *JQ = JC->job_queue;
732 assert (JQ);
733 mpq_push_w (JQ, (void *)(long)(cur_subclass + JOB_SUBCLASS_OFFSET), 0);
734 }
735 return 1;
736 } else {
737 job_decref (JOB_REF_PASS (job));
738 return 0;
739 }
740 /* }}} */
741 }
742}
743
/*
 * Destroys one reference to job; raises every signal bit of `sigset` on it.
 * `sigset` may only contain bits of the top byte (signal bits).
 *
 * If all requested bits are already set, only the reference is dropped.
 * Otherwise the bits are set and, when the job lock can be taken, the signals
 * are processed at once through unlock_job().  When another thread holds the
 * lock, the bits are left for that thread; if the job has JF_SIGINT set, the
 * holder is additionally interrupted with SIGRTMAX-7.
 */
void job_send_signals (JOB_REF_ARG (job), int sigset) {
  vkprintf (JOBS_DEBUG, "SENDING SIGNALS %08x to JOB %p, type %p, flags %08x, refcnt %d\n", sigset, job, job->j_execute, job->j_flags, job->j_refcnt);
  assert (!(sigset & 0xffffff));
  assert (job->j_refcnt > 0);
  if ((job->j_flags & sigset) == sigset) {
    assert (job->j_refcnt > 1 || !(job->j_flags & JFS_SET (JS_FINISH)));
    job_decref (JOB_REF_PASS (job));
    return;
  }
  /* fast path: lock and set the bits in a single compare-and-swap */
  if (try_lock_job (job, sigset, 0)) {
    unlock_job (JOB_REF_PASS (job));
    return;
  }
  /* job is locked by someone else: publish the bits, then retry the lock once in
     case the holder released it in the meantime */
  __sync_fetch_and_or (&job->j_flags, sigset);
  if (try_lock_job (job, 0, 0)) {
    unlock_job (JOB_REF_PASS (job));
  } else {
    if (job->j_flags & JF_SIGINT) {
      assert (job->j_thread);
      pthread_kill (job->j_thread->pthread_id, SIGRTMAX - 7);
    }
    job_decref (JOB_REF_PASS (job));
  }
}
769
770// destroys one reference to job; sends signal signo to it
771void job_signal (JOB_REF_ARG (job), int signo) {
772 assert ((unsigned) signo <= 7);
773 job_send_signals (JOB_REF_PASS (job), JFS_SET (signo));
774}
775
776// destroys one reference to job
777void job_decref (JOB_REF_ARG (job)) {
778 if (job->j_refcnt >= 2) {
779 if (__sync_fetch_and_add (&job->j_refcnt, -1) != 1) {
780 return;
781 }
782 job->j_refcnt = 1;
783 }
784 assert (job->j_refcnt == 1);
785 job_signal (JOB_REF_PASS (job), JS_FINISH);
786}
787
788// creates one reference to job
789job_t job_incref (job_t job) {
790 //if (job->j_refcnt == 1) {
791 // job->j_refcnt = 2;
792 //} else {
793 __sync_fetch_and_add (&job->j_refcnt, 1);
794 //}
795 return job;
796}
797
798void process_one_job (JOB_REF_ARG (job), int thread_class) {
799 struct job_thread *JT = this_job_thread;
800 assert (JT);
801 assert (job);
802 int queued_flag = job->j_flags & 0xffff & JT->job_class_mask;
803 if (try_lock_job (job, 0, queued_flag)) {
804 unlock_job (JOB_REF_PASS (job));
805 } else {
806 __sync_fetch_and_and (&job->j_flags, ~queued_flag);
807 if (try_lock_job (job, 0, 0)) {
808 unlock_job (JOB_REF_PASS (job));
809 } else {
810 job_decref (JOB_REF_PASS (job));
811 }
812 }
813}
814
815void complete_subjob (job_t job, JOB_REF_ARG (parent), int status) {
816 if (!parent) {
817 return;
818 }
819 if (parent->j_flags & JF_COMPLETED) {
820 job_decref (JOB_REF_PASS (parent));
821 return;
822 }
823 if (job->j_error && (status & JSP_PARENT_ERROR)) {
824 if (!parent->j_error) {
825 __sync_bool_compare_and_swap (&parent->j_error, 0, job->j_error);
826 }
827 if (status & JSP_PARENT_WAKEUP) {
828 __sync_fetch_and_add (&parent->j_children, -1);
829 }
830 vkprintf (JOBS_DEBUG, "waking up parent %p with JS_ABORT (%d children remaining)\n", parent, parent->j_children);
831 job_signal (JOB_REF_PASS (parent), JS_ABORT);
832 return;
833 }
834 if (status & JSP_PARENT_WAKEUP) {
835 if (__sync_fetch_and_add (&parent->j_children, -1) == 1 && (status & JSP_PARENT_RUN)) {
836 vkprintf (JOBS_DEBUG, "waking up parent %p with JS_RUN\n", parent);
837 job_signal (JOB_REF_PASS (parent), JS_RUN);
838 } else {
839 vkprintf (JOBS_DEBUG, "parent %p: %d children remaining\n", parent, parent->j_children);
840 job_decref (JOB_REF_PASS (parent));
841 }
842 return;
843 }
844 if (status & JSP_PARENT_RUN) {
845 job_signal (JOB_REF_PASS (parent), JS_RUN);
846 return;
847 }
848
849 job_decref (JOB_REF_PASS (parent));
850}
851
852void complete_job (job_t job) {
853 vkprintf (JOBS_DEBUG, "COMPLETE JOB %p, type %p, flags %08x, status %08x, error %d; refcnt=%d; PARENT %p\n", job, job->j_execute, job->j_flags, job->j_status, job->j_error, job->j_refcnt, job->j_parent);
854 assert (job->j_flags & JF_LOCKED);
855 if (job->j_flags & JF_COMPLETED) {
856 return;
857 }
858 __sync_fetch_and_or (&job->j_flags, JF_COMPLETED);
859 job_t parent = PTR_MOVE (job->j_parent);
860 if (!parent) {
861 return;
862 }
863 complete_subjob (job, JOB_REF_PASS (parent), job->j_status);
864}
865
866static void job_interrupt_signal_handler (const int sig) {
867 char buffer[256];
868 if (verbosity >= 2) {
869 kwrite (2, buffer, sprintf (buffer, "SIGRTMAX-7 (JOB INTERRUPT) caught in thread #%d running job %p.\n", this_job_thread ? this_job_thread->id : -1, this_job_thread ? this_job_thread->current_job : 0));
870 }
871}
872
873static void set_job_interrupt_signal_handler (void) {
874 struct sigaction act;
875 sigemptyset (&act.sa_mask);
876 act.sa_flags = 0;
877 act.sa_handler = job_interrupt_signal_handler;
878
879 if (sigaction (SIGRTMAX - 7, &act, NULL) != 0) {
880 kwrite (2, "failed sigaction\n", 17);
881 _exit (EXIT_FAILURE);
882 }
883}
884
885void *job_thread_ex (void *arg, void (*work_one)(void *, int)) {
886 struct job_thread *JT = arg;
887 this_job_thread = JT;
888 assert (JT->thread_class);
889 assert (!(JT->thread_class & ~JC_MASK));
890
891 get_this_thread_id ();
892 JT->thread_system_id = syscall (SYS_gettid);
893
894 set_job_interrupt_signal_handler ();
895
896 struct thread_callback *cb = jobs_cb_list;
897 while (cb) {
898 cb->new_thread ();
899 cb = cb->next;
900 }
901
902 JT->status |= JTS_RUNNING;
903
904 int thread_class = JT->thread_class;
905 struct mp_queue *Q = JT->job_queue;
906 // void **hptr = thread_hazard_pointers;
907
908 if (JT->job_class->max_threads == 1) {
909 JT->timer_manager = alloc_timer_manager (thread_class);
910 }
911
912 int prev_now = 0;
913 long long last_rdtsc = 0;
914 while (1) {
915 void *job = mpq_pop_nw (Q, 4);
916 if (!job) {
917 double wait_start = get_utime_monotonic ();
918 MODULE_STAT->locked_since = wait_start;
919 job = mpq_pop_w (Q, 4);
920 double wait_time = get_utime_monotonic () - wait_start;
921 MODULE_STAT->locked_since = 0;
922 MODULE_STAT->tot_idle_time += wait_time;
923 MODULE_STAT->a_idle_time += wait_time;
924 }
925 long long new_rdtsc = rdtsc ();
926 if (new_rdtsc - last_rdtsc > 1000000) {
927 get_utime_monotonic ();
928
929 now = time (0);
930 if (now > prev_now && now < prev_now + 60) {
931 while (prev_now < now) {
932 MODULE_STAT->a_idle_time *= 100.0 / 101;
933 MODULE_STAT->a_idle_quotient = a_idle_quotient * (100.0/101) + 1;
934 prev_now++;
935 }
936 } else {
937 if (now >= prev_now + 60) {
938 MODULE_STAT->a_idle_time = MODULE_STAT->a_idle_quotient;
939 }
940 prev_now = now;
941 }
942
943 last_rdtsc = new_rdtsc;
944 }
945
946 vkprintf (JOBS_DEBUG, "JOB THREAD #%d (CLASS %d): got job %p\n", JT->id, thread_class, job);
947 work_one (PTR_MOVE (job), thread_class);
948 }
949
950 pthread_exit (0);
951}
952
/*
 * Handles one subclass token popped from a class queue.  `id` is the subclass id
 * plus JOB_SUBCLASS_OFFSET, as pushed by unlock_job(); `class` is not used.
 *
 * Each token permits one more job of that subclass to run.  Only the thread
 * that wins the subclass `locked` flag drains the subclass queue; other threads
 * merely record the permission and return.  Subclass -1 acquires every
 * semaphore slot before running, all other subclasses acquire a single slot.
 */
static void process_one_sublist (unsigned long id, int class) {
  struct job_thread *JT = this_job_thread;
  assert (JT);

  struct job_class *JC = JT->job_class;
  assert (JC->subclasses);

  struct job_subclass_list *J_SCL = JC->subclasses;

  id -= JOB_SUBCLASS_OFFSET;

  int subclass_id = id;

  assert (subclass_id >= -2);
  assert (subclass_id < JC->subclasses->subclass_cnt);

  struct job_subclass *J_SC = &J_SCL->subclasses[subclass_id];

  __sync_fetch_and_add (&J_SC->allowed_to_run_jobs, 1);

  /* somebody else is already draining this subclass and will see the new count */
  if (!__sync_bool_compare_and_swap (&J_SC->locked, 0, 1)) {
    return;
  }

  if (subclass_id != -1) {
    while (sem_wait (&J_SCL->sem) < 0);
  } else {
    int i;
    for (i = 0; i < MAX_SUBCLASS_THREADS; i++) {
      while (sem_wait (&J_SCL->sem));
    }
  }

  while (1) {
    while (J_SC->processed_jobs < J_SC->allowed_to_run_jobs) {
      job_t job = mpq_pop_nw (J_SC->job_queue, 4);
      assert (job);

      process_one_job (JOB_REF_PASS (job), JT->thread_class);
      J_SC->processed_jobs ++;
    }

    J_SC->locked = 0;

    __sync_synchronize ();

    /* a token may have arrived between the last check and the unlock: take the
       flag again and continue, unless another thread took it first */
    if (J_SC->processed_jobs < J_SC->allowed_to_run_jobs &&
        __sync_bool_compare_and_swap (&J_SC->locked, 0, 1)) {
      continue;
    }
    break;
  }

  if (subclass_id != -1) {
    while (sem_post (&J_SCL->sem) < 0);
  } else {
    int i;
    for (i = 0; i < MAX_SUBCLASS_THREADS; i++) {
      while (sem_post (&J_SCL->sem));
    }
  }
}
1015
/* Adapter for job_thread_ex(): the queue item is a subclass token stored in a pointer. */
static void process_one_sublist_gw (void *x, int class) {
  unsigned long token = (long) x;
  process_one_sublist (token, class);
}
1019
1020static void process_one_job_gw (void *x, int class) {
1021 process_one_job (JOB_REF_PASS (x), class);
1022}
1023
1024void *job_thread (void *arg) {
1025 return job_thread_ex (arg, process_one_job_gw);
1026}
1027
1028void *job_thread_sub (void *arg) {
1029 return job_thread_ex (arg, process_one_sublist_gw);
1030}
1031
1032int run_pending_main_jobs (void) {
1033 if (!MainJobQueue.mq_magic) {
1034 return -1;
1035 }
1036 struct job_thread *JT = this_job_thread;
1037 assert (JT && JT->thread_class == JC_MAIN);
1038 JT->status |= JTS_RUNNING;
1039
1040 int cnt = 0;
1041 while (1) {
1042 job_t job = mpq_pop_nw (&MainJobQueue, 4);
1043 if (!job) {
1044 break;
1045 }
1046 vkprintf (JOBS_DEBUG, "MAIN THREAD: got job %p\n", job);
1047 process_one_job (JOB_REF_PASS (job), JC_MAIN);
1048 cnt++;
1049 }
1050
1051 JT->status &= ~JTS_RUNNING;
1052 return cnt;
1053}
1054
1055/* ------ JOB CREATION/QUEUEING ------ */
1056
1057void job_change_signals (job_t job, unsigned long long job_signals) {
1058 assert (job->j_flags & JF_LOCKED);
1059
1060 job->j_status = job_signals & 0xffff001f;
1061 job->j_sigclass = (job_signals >> 32);
1062}
1063
1064/* "destroys" one reference to parent_job */
1065job_t create_async_job (job_function_t run_job, unsigned long long job_signals, int job_subclass, int custom_bytes, unsigned long long job_type, JOB_REF_ARG (parent_job)) {
1066 if (parent_job) {
1067 if (job_signals & JSP_PARENT_WAKEUP) {
1068 __sync_fetch_and_add (&parent_job->j_children, 1);
1069 }
1070 }
1071
1072 MODULE_STAT->jobs_allocated_memory += sizeof (struct async_job) + custom_bytes;
1073 struct job_thread *JT = this_job_thread;
1074 assert (JT);
1075 void *p = malloc (sizeof (struct async_job) + custom_bytes + 64);
1076 assert (p);
1077 int align = -((uintptr_t) p) & 63;
1078 job_t job = p + align;
1079 assert (!(((uintptr_t) job) & 63));
1080
1081 job->j_flags = JF_LOCKED;
1082 job->j_status = job_signals & 0xffff001f;
1083 job->j_sigclass = (job_signals >> 32);
1084 job->j_refcnt = 1;
1085 job->j_error = 0;
1086 job->j_children = 0;
1087 job->j_custom_bytes = custom_bytes;
1088 job->j_thread = JT;
1089 job->j_align = align;
1090 job->j_execute = run_job;
1091 job->j_parent = PTR_MOVE (parent_job);
1092 job->j_type = job_type;
1093 job->j_subclass = job_subclass;
1094 memset (job->j_custom, 0, custom_bytes);
1095
1096 JT->jobs_created ++;
1097 JT->jobs_active ++;
1098
1099 if (job_type & JT_HAVE_TIMER) {
1100 job_timer_init (job);
1101 }
1102 if (job_type & JT_HAVE_MSG_QUEUE) {
1103 job_message_queue_init (job);
1104 }
1105
1106 vkprintf (JOBS_DEBUG, "CREATING JOB %p, type %p, flags %08x, status %08x, sigclass %08x; PARENT %p\n", job, run_job, job->j_flags, job->j_status, job->j_sigclass, job->j_parent);
1107
1108 return job;
1109}
1110
1111int schedule_job (JOB_REF_ARG (job)) {
1112 assert (job->j_flags & JF_LOCKED);
1113 job->j_flags |= JFS_SET (JS_RUN);
1114 return unlock_job (JOB_REF_PASS (job));
1115}
1116
1117int job_timer_wakeup_gateway (event_timer_t *et) {
1118 job_t job = (job_t)((char *) et - offsetof (struct async_job, j_custom));
1119 if (et->wakeup_time == et->real_wakeup_time) {
1120 vkprintf (JOBS_DEBUG, "ALARM JOB %p, type %p, flags %08x, status %08x, refcnt %d; PARENT %p\n", job, job->j_execute, job->j_flags, job->j_status, job->j_refcnt, job->j_parent);
1121 job_signal (JOB_REF_PASS (job), JS_ALARM);
1122 } else {
1123 vkprintf (JOBS_DEBUG, "ALARM JOB %p, type %p, flags %08x, status %08x, refcnt %d; PARENT %p. SKIPPED\n", job, job->j_execute, job->j_flags, job->j_status, job->j_refcnt, job->j_parent);
1124 job_decref (JOB_REF_PASS (job));
1125 }
1126 return 0;
1127}
1128
1129/* --------- JOB LIST JOBS --------
1130 (enables several connections or jobs to wait for same job completion)
1131*/
1132
/*
 * Job-list node that wraps a waiting job. Nodes of this type are cast to and
 * from struct job_list_node (see insert_job_into_job_list and
 * job_list_node_wakeup), so the leading fields presumably mirror that
 * struct's layout -- verify against its declaration before reordering.
 */
struct job_list_job_node {
  struct job_list_node *jl_next;  /* next node in the list */
  job_list_node_type_t jl_type;   /* callback, invoked as jl_type (list_job, op, node) */
  job_t jl_job;                   /* job reference handed to complete_subjob on wakeup */
  int jl_flags;                   /* mode value passed to complete_subjob */
};
1139
/*
 * Custom data (j_custom) of a job-list job. The timer comes first because
 * the job is created with JT_HAVE_TIMER and the job_timer_* helpers treat
 * the start of j_custom as a struct event_timer.
 */
struct job_list_params {
  event_timer_t timer;
  struct job_list_node *first, *last;  /* head and tail of the singly linked node list */
};
1144
1145int job_list_node_wakeup (job_t list_job, int op, struct job_list_node *w) {
1146 struct job_list_job_node *wj = (struct job_list_job_node *) w;
1147 complete_subjob (list_job, JOB_REF_PASS (wj->jl_job), wj->jl_flags);
1148 free (wj);
1149 return 0;
1150}
1151
1152int process_job_list (job_t job, int op, struct job_thread *JT) {
1153 assert (job->j_custom_bytes == sizeof (struct job_list_params));
1154 struct job_list_params *P = (struct job_list_params *) job->j_custom;
1155 struct job_list_node *w, *wn;
1156 switch (op) {
1157 case JS_FINISH:
1158 assert (job->j_refcnt == 1);
1159 assert (job->j_flags & JF_COMPLETED);
1160 job_timer_remove (job);
1161 return job_free (JOB_REF_PASS (job));
1162 case JS_ABORT:
1163 if (!job->j_error) {
1164 job->j_error = ECANCELED;
1165 }
1166 case JS_ALARM:
1167 if (!job->j_error) {
1168 job->j_error = ETIMEDOUT;
1169 }
1170 default:
1171 case JS_RUN:
1172 assert (!(job->j_flags & JF_COMPLETED));
1173 for (w = P->first; w; w = wn) {
1174 wn = w->jl_next;
1175 w->jl_next = 0;
1176 w->jl_type (job, op, w);
1177 }
1178 P->first = P->last = 0;
1179 job->j_status &= ~(JSS_ALLOW (JS_RUN) | JSS_ALLOW (JS_ABORT));
1180 return JOB_COMPLETED;
1181 }
1182}
1183
1184job_t create_job_list (void) {
1185 job_t job = create_async_job (process_job_list, JSC_ALLOW (JC_ENGINE, JS_RUN) | JSC_ALLOW (JC_ENGINE, JS_ABORT) | JSC_ALLOW (JC_ENGINE, JS_FINISH), 0, sizeof (struct job_list_params), JT_HAVE_TIMER, JOB_REF_NULL);
1186 struct job_list_params *P = (struct job_list_params *) job->j_custom;
1187 P->first = 0;
1188 P->last = 0;
1189 P->timer.wakeup = 0;
1190
1191 unlock_job (JOB_REF_CREATE_PASS (job));
1192 return job;
1193}
1194
1195int insert_node_into_job_list (job_t list_job, struct job_list_node *w) {
1196 assert (list_job->j_execute == process_job_list);
1197 assert (!(list_job->j_flags & (JF_LOCKED | JF_COMPLETED)));
1198 assert (try_lock_job (list_job, 0, 0));
1199 w->jl_next = 0;
1200 struct job_list_params *P = (struct job_list_params *) list_job->j_custom;
1201 if (!P->first) {
1202 P->first = P->last = w;
1203 } else {
1204 P->last->jl_next = w;
1205 P->last = w;
1206 }
1207 unlock_job (JOB_REF_CREATE_PASS (list_job));
1208 return 1;
1209}
1210
1211int insert_job_into_job_list (job_t list_job, JOB_REF_ARG(job), int mode) {
1212 check_thread_class (JC_ENGINE);
1213 if (mode & JSP_PARENT_WAKEUP) {
1214 __sync_fetch_and_add (&job->j_children, 1);
1215 }
1216 struct job_list_job_node *wj = malloc (sizeof (struct job_list_job_node));
1217 assert (wj);
1218 wj->jl_type = job_list_node_wakeup;
1219 wj->jl_job = PTR_MOVE (job);
1220 wj->jl_flags = mode;
1221 return insert_node_into_job_list (list_job, (struct job_list_node *) wj);
1222}
1223
1224int insert_connection_into_job_list (job_t list_job, connection_job_t c) {
1225 assert (0);
1226 return 0;
1227}
1228
/* Custom data (j_custom) of a timer-manager job. */
struct job_timer_manager_extra {
  struct mp_queue *mpq;  /* queue of jobs whose timers must be (re)inserted; drained by do_timer_manager_job */
};
1232
/* Shared timer manager for the JC_EPOLL class; set by alloc_timer_manager and
   used by job_timer_insert for timers whose owner id (flags & 255) is 1. */
job_t timer_manager_job;

/* Event-timer primitives defined elsewhere. */
int insert_event_timer (event_timer_t *et);
int remove_event_timer (event_timer_t *et);
1237
1238void do_immediate_timer_insert (job_t W) {
1239 MODULE_STAT->timer_ops ++;
1240 struct event_timer *ev = (void *)W->j_custom;
1241 int active = ev->h_idx > 0;
1242
1243 double r = ev->real_wakeup_time;
1244 if (r > 0) {
1245 ev->wakeup_time = r;
1246 insert_event_timer (ev);
1247 assert (ev->wakeup == job_timer_wakeup_gateway);
1248 if (!active) {
1249 job_incref (W);
1250 }
1251 } else {
1252 ev->wakeup_time = 0;
1253 remove_event_timer (ev);
1254 if (active) {
1255 job_decref (JOB_REF_PASS (W));
1256 }
1257 }
1258
1259 if (this_job_thread) {
1260 this_job_thread->wakeup_time = timers_get_first ();
1261 }
1262}
1263
1264int do_timer_manager_job (job_t job, int op, struct job_thread *JT) {
1265 if (op != JS_RUN && op != JS_AUX) {
1266 return JOB_ERROR;
1267 }
1268
1269 if (op == JS_AUX) {
1270 thread_run_timers ();
1271 JT->wakeup_time = timers_get_first ();
1272 return 0;
1273 }
1274
1275 struct job_timer_manager_extra *e = (void *)job->j_custom;
1276
1277 while (1) {
1278 job_t W = mpq_pop_nw (e->mpq, 4);
1279 if (!W) { break; }
1280 do_immediate_timer_insert (W);
1281 job_decref (JOB_REF_PASS (W));
1282 }
1283 return 0;
1284}
1285
1286void jobs_check_all_timers (void) {
1287 int i;
1288 for (i = 1; i <= max_job_thread_id; i++) {
1289 struct job_thread *JT = &JobThreads[i];
1290 if (JT->timer_manager && JT->wakeup_time && JT->wakeup_time <= precise_now) {
1291 job_signal (JOB_REF_CREATE_PASS (JT->timer_manager), JS_AUX);
1292 }
1293 }
1294}
1295
1296job_t alloc_timer_manager (int thread_class) {
1297 if (thread_class == JC_EPOLL && timer_manager_job) {
1298 return job_incref (timer_manager_job);
1299 }
1300 job_t timer_manager = create_async_job (do_timer_manager_job, JSC_ALLOW (thread_class, JS_RUN) | JSC_ALLOW (thread_class, JS_AUX) | JSC_ALLOW (thread_class, JS_FINISH), 0, sizeof (struct job_timer_manager_extra), 0, JOB_REF_NULL);
1301 timer_manager->j_refcnt = 1;
1302 struct job_timer_manager_extra *e = (void *)timer_manager->j_custom;
1303 e->mpq = alloc_mp_queue_w ();
1304 unlock_job (JOB_REF_CREATE_PASS (timer_manager));
1305 if (thread_class == JC_EPOLL) {
1306 timer_manager_job = job_incref (timer_manager);
1307 }
1308 return timer_manager;
1309}
1310
1311int do_timer_job (job_t job, int op, struct job_thread *JT) {
1312 if (op == JS_ALARM) {
1313 if (!job_timer_check (job)) {
1314 return 0;
1315 }
1316
1317 if (job->j_flags & JF_COMPLETED) {
1318 return 0;
1319 }
1320
1321 struct job_timer_info *e = (void *)job->j_custom;
1322 double r = e->wakeup (e->extra);
1323 if (r > 0) {
1324 job_timer_insert (job, r);
1325 } else if (r < 0) {
1326 job_decref (JOB_REF_PASS (job));
1327 }
1328 return 0;
1329 }
1330 if (op == JS_ABORT) {
1331 job_timer_remove (job);
1332 return JOB_COMPLETED;
1333 }
1334 if (op == JS_FINISH) {
1335 MODULE_STAT->job_timers_allocated --;
1336 return job_free (JOB_REF_PASS (job));
1337 }
1338 return JOB_ERROR;
1339}
1340
1341job_t job_timer_alloc (int thread_class, double (*alarm)(void *), void *extra) {
1342 assert (thread_class > 0 && thread_class <= 0xf);
1343 job_t t = create_async_job (do_timer_job, JSC_ALLOW (thread_class, JS_ABORT) | JSC_ALLOW (thread_class, JS_ALARM) | JSIG_FAST (JS_FINISH), 0, sizeof (struct job_timer_info), JT_HAVE_TIMER, JOB_REF_NULL);
1344 t->j_refcnt = 1;
1345 struct job_timer_info *e = (void *)t->j_custom;
1346 e->wakeup = alarm;
1347 e->extra = extra;
1348 unlock_job (JOB_REF_CREATE_PASS (t));
1349 MODULE_STAT->job_timers_allocated ++;
1350 return t;
1351}
1352
1353int job_timer_check (job_t job) {
1354 assert (job->j_type & JT_HAVE_TIMER);
1355 struct event_timer *ev = (void *)job->j_custom;
1356
1357 if (ev->real_wakeup_time == 0 || ev->real_wakeup_time != ev->wakeup_time) {
1358 return 0;
1359 }
1360
1361 job_timer_remove (job);
1362 //ev->real_wakeup_time = 0;
1363 return 1;
1364}
1365
1366void job_timer_insert (job_t job, double timeout) {
1367 assert (job->j_type & JT_HAVE_TIMER);
1368 struct event_timer *ev = (void *)job->j_custom;
1369 //timeout = (ceil (timeout * 1000)) * 0.001;
1370 if (ev->real_wakeup_time == timeout) { return; }
1371 ev->real_wakeup_time = timeout;
1372 if (!ev->wakeup) {
1373 ev->wakeup = job_timer_wakeup_gateway;
1374 }
1375 if (ev->flags & 255) {
1376 if ((this_job_thread && (this_job_thread->id == (ev->flags & 255))) ||
1377 (!this_job_thread && (ev->flags & 255) == 1)) {
1378 do_immediate_timer_insert (job);
1379 return;
1380 }
1381 } else {
1382 if (!this_job_thread || this_job_thread->id == 1) {
1383 ev->flags |= 1;
1384 do_immediate_timer_insert (job);
1385 return;
1386 } else if (this_job_thread->timer_manager) {
1387 ev->flags |= this_job_thread->id;
1388 do_immediate_timer_insert (job);
1389 return;
1390 } else {
1391 ev->flags |= 1;
1392 }
1393 }
1394
1395 assert (ev->flags & 255);
1396 job_t m = NULL;
1397 if ((ev->flags & 255) == 1) {
1398 m = timer_manager_job;
1399 } else {
1400 m = JobThreads[ev->flags & 255].timer_manager;
1401 }
1402 MODULE_STAT->timer_ops_scheduler ++;
1403 assert (m);
1404 struct job_timer_manager_extra *e = (void *)m->j_custom;
1405 mpq_push_w (e->mpq, job_incref (job), 0);
1406 job_signal (JOB_REF_CREATE_PASS (m), JS_RUN);
1407}
1408
1409void job_timer_remove (job_t job) {
1410 assert (job->j_type & JT_HAVE_TIMER);
1411 job_timer_insert (job, 0);
1412}
1413
1414int job_timer_active (job_t job) {
1415 assert (job->j_type & JT_HAVE_TIMER);
1416 return ((struct event_timer *)job->j_custom)->real_wakeup_time > 0;
1417}
1418
1419double job_timer_wakeup_time (job_t job) {
1420 assert (job->j_type & JT_HAVE_TIMER);
1421 return ((struct event_timer *)job->j_custom)->real_wakeup_time;
1422}
1423
1424void job_timer_init (job_t job) {
1425 assert (job->j_type & JT_HAVE_TIMER);
1426 memset ((void *)job->j_custom, 0, sizeof (struct event_timer));
1427}
1428
1429void register_thread_callback (struct thread_callback *cb) {
1430 cb->next = jobs_cb_list;
1431 jobs_cb_list = cb;
1432
1433 cb->new_thread ();
1434}
1435
1436struct job_message_queue *job_message_queue_get (job_t job) {
1437 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1438 struct job_message_queue **q = (job->j_type & JT_HAVE_TIMER) ? sizeof (struct event_timer) + (void *)job->j_custom : (void *)job->j_custom;
1439 return *q;
1440}
1441
1442void job_message_queue_set (job_t job, struct job_message_queue *queue) {
1443 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1444 struct job_message_queue **q = (job->j_type & JT_HAVE_TIMER) ? sizeof (struct event_timer) + (void *)job->j_custom : (void *)job->j_custom;
1445 assert (!*q);
1446 *q = queue;
1447}
1448
1449void job_message_queue_free (job_t job) {
1450 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1451 struct job_message_queue **q = (job->j_type & JT_HAVE_TIMER) ? sizeof (struct event_timer) + (void *)job->j_custom : (void *)job->j_custom;
1452 struct job_message_queue *Q = *q;
1453 if (Q) {
1454 struct job_message *M;
1455 while (Q->first) {
1456 M = Q->first;
1457 Q->first = M->next;
1458 if (M->src) {
1459 job_decref (JOB_REF_PASS (M->src));
1460 }
1461 if (M->message.magic) {
1462 rwm_free (&M->message);
1463 }
1464 free (M);
1465 }
1466 assert (!Q->first);
1467 Q->last = NULL;
1468
1469 while ((M = mpq_pop_nw (Q->unsorted, 4))) {
1470 if (M->src) {
1471 job_decref (JOB_REF_PASS (M->src));
1472 }
1473 if (M->message.magic) {
1474 rwm_free (&M->message);
1475 }
1476 free (M);
1477 }
1478 free_mp_queue ((*q)->unsorted);
1479 free (*q);
1480 }
1481 *q = NULL;
1482}
1483
1484void job_message_queue_init (job_t job) {
1485 struct job_message_queue *q = calloc (sizeof (*q), 1);
1486 q->unsorted = alloc_mp_queue_w ();
1487 job_message_queue_set (job, q);
1488}
1489
1490void job_message_free_default (struct job_message *M) {
1491 if (M->src) {
1492 job_decref (JOB_REF_PASS (M->src));
1493 }
1494 if (M->message.magic) {
1495 rwm_free (&M->message);
1496 }
1497 free (M);
1498}
1499
1500void job_message_send (JOB_REF_ARG (job), JOB_REF_ARG (src), unsigned int type, struct raw_message *raw, int dup, int payload_ints, const unsigned int *payload, unsigned int flags, void (*destroy)(struct job_message *)) {
1501 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1502 struct job_message *M = malloc (sizeof (*M) + payload_ints * 4);
1503 M->type = type;
1504 M->flags = 0;
1505 M->src = PTR_MOVE (src);
1506 M->payload_ints = payload_ints;
1507 M->next = NULL;
1508 M->flags = flags;
1509 M->destructor = destroy;
1510 memcpy (M->payload, payload, payload_ints * 4);
1511 (dup ? rwm_clone : rwm_move) (&M->message, raw);
1512
1513 struct job_message_queue *q = job_message_queue_get (job);
1514 mpq_push_w (q->unsorted, M, 0);
1515
1516 job_signal (JOB_REF_PASS (job), JS_MSG);
1517}
1518/*
1519void job_message_send_data (JOB_REF_ARG (job), JOB_REF_ARG (src), unsigned int type, void *ptr1, void *ptr2, int int1, long long long1, int payload_ints, const unsigned int *payload, unsigned int flags) {
1520 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1521 struct job_message *M = malloc (sizeof (*M) + payload_ints * 4);
1522 M->type = type;
1523 M->flags = 0;
1524 M->src = PTR_MOVE (src);
1525 M->payload_ints = payload_ints;
1526 M->next = NULL;
1527 M->flags = flags;
1528 memcpy (M->payload, payload, payload_ints * 4);
1529 M->message_ptr1 = ptr1;
1530 M->message_ptr2 = ptr2;
1531 M->message_int1 = int1;
1532 M->message_long1 = long1;
1533 M->message_magic = 0;
1534
1535 struct job_message_queue *q = job_message_queue_get (job);
1536 mpq_push_w (q->unsorted, M, 0);
1537
1538 job_signal (JOB_REF_PASS (job), JS_RUN);
1539}*/
1540
1541void job_message_send_fake (JOB_REF_ARG (job), int (*receive_message)(job_t job, struct job_message *M, void *extra), void *extra, JOB_REF_ARG (src), unsigned int type, struct raw_message *raw, int dup, int payload_ints, const unsigned int *payload, unsigned int flags, void (*destroy)(struct job_message *)) {
1542 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1543 struct job_message *M = malloc (sizeof (*M) + payload_ints * 4);
1544 M->type = type;
1545 M->flags = 0;
1546 M->src = PTR_MOVE (src);
1547 M->payload_ints = payload_ints;
1548 M->next = NULL;
1549 M->flags = flags;
1550 M->destructor = destroy;
1551 memcpy (M->payload, payload, payload_ints * 4);
1552 (dup ? rwm_clone : rwm_move) (&M->message, raw);
1553
1554 int r = receive_message (job, M, extra);
1555 if (r == 1) {
1556 job_message_free_default (M);
1557 } else if (r == 2) {
1558 if (M->destructor) {
1559 M->destructor (M);
1560 } else {
1561 job_message_free_default (M);
1562 }
1563 }
1564 job_decref (JOB_REF_PASS (job));
1565}
1566
1567void job_message_queue_work (job_t job, int (*receive_message)(job_t job, struct job_message *M, void *extra), void *extra, unsigned int mask) {
1568 assert (job->j_type & JT_HAVE_MSG_QUEUE);
1569 struct job_message_queue *q = job_message_queue_get (job);
1570
1571 while (1) {
1572 struct job_message *msg = mpq_pop_nw (q->unsorted, 4);
1573 if (!msg) { break; }
1574 msg->next = NULL;
1575 if (q->last) {
1576 q->last->next = msg;
1577 q->last = msg;
1578 } else {
1579 q->last = q->first = msg;
1580 }
1581 }
1582
1583 struct job_message *last = NULL;
1584 struct job_message **ptr = &q->first;
1585 int stop = 0;
1586 while (*ptr && !stop) {
1587 struct job_message *M = *ptr;
1588 unsigned int type = M->flags & JMC_TYPE_MASK;
1589 assert (type);
1590 if (mask & (1 << type)) {
1591 struct job_message *next = M->next;
1592 M->next = NULL;
1593
1594 int r;
1595 if (type & JMC_CONTINUATION) {
1596 assert (q->payload_magic);
1597 r = job_message_continuation (job, M, q->payload_magic);
1598 } else {
1599 r = receive_message (job, M, extra);
1600 }
1601
1602 if (r < 0) {
1603 stop = 1;
1604 } else if (r == 1) {
1605 job_message_free_default (M);
1606 } else if (r == 2) {
1607 if (M->destructor) {
1608 M->destructor (M);
1609 } else {
1610 job_message_free_default (M);
1611 }
1612 }
1613 *ptr = next;
1614 if (q->last == M) {
1615 q->last = last;
1616 }
1617 } else {
1618 last = M;
1619 ptr = &last->next;
1620 }
1621 }
1622}
1623
1624unsigned int *payload_continuation_create (unsigned int magic, int (*func)(job_t, struct job_message *, void *extra), void *extra) {
1625 static __thread unsigned int payload_data[5];
1626 payload_data[0] = magic;
1627 *(void **)(payload_data + 1) = func;
1628 *(void **)(payload_data + 3) = extra;
1629 return payload_data;
1630}
1631
1632int job_free (JOB_REF_ARG (job)) {
1633 if (job->j_type & JT_HAVE_MSG_QUEUE) {
1634 job_message_queue_free (job);
1635 }
1636 free (((void *)job) - job->j_align);
1637 return JOB_DESTROYED;
1638}
1639
/* Singly linked list entry for one job subscribed to a notify job. */
struct notify_job_subscriber {
  struct notify_job_subscriber *next;  /* next subscriber, NULL at the tail */
  job_t job;                           /* subscriber's job reference, passed to complete_subjob */
};
1644
/*
 * Custom data (j_custom) of a notify job. message_queue comes first: notify
 * jobs are created with JT_HAVE_MSG_QUEUE and no timer, and
 * job_message_queue_get reads the queue pointer from the start of j_custom.
 */
struct notify_job_extra {
  struct job_message_queue *message_queue;
  int result;  /* when non-zero, new subscribers are completed immediately; not set in the code visible here */
  struct notify_job_subscriber *first, *last;  /* pending subscribers, in arrival order */
};
1650
/* Message type handled by notify_job_receive_message: subscribe M->src to the notify job. */
#define TL_ENGINE_NOTIFICATION_SUBSCRIBE 0x8934a894
1652
1653static int notify_job_receive_message (job_t NJ, struct job_message *M, void *extra) {
1654 struct notify_job_extra *N = (void *)NJ->j_custom;
1655 switch (M->type) {
1656 case TL_ENGINE_NOTIFICATION_SUBSCRIBE:
1657 if (N->result) {
1658 complete_subjob (NJ, JOB_REF_PASS (M->src), JSP_PARENT_RWE);
1659 } else {
1660 struct notify_job_subscriber *S = malloc (sizeof (*S));
1661 S->job = PTR_MOVE (M->src);
1662 S->next = NULL;
1663 if (N->last) {
1664 N->last->next = S;
1665 N->last = S;
1666 } else {
1667 N->last = N->first = S;
1668 }
1669 }
1670 return 1;
1671 default:
1672 kprintf ("%s: unknown message type 0x%08x\n", __func__, M->type);
1673 assert (0);
1674 return 1;
1675 }
1676}
1677
1678static int notify_job_run (job_t NJ, int op, struct job_thread *JT) {
1679 if (op == JS_MSG) {
1680 job_message_queue_work (NJ, notify_job_receive_message, NULL, 0xffffff);
1681 return 0;
1682 }
1683 if (op == JS_RUN || op == JS_ABORT) {
1684 struct notify_job_extra *N = (void *)NJ->j_custom;
1685 while (N->first) {
1686 struct notify_job_subscriber *S = N->first;
1687 N->first = S->next;
1688 if (!N->first) {
1689 N->last = NULL;
1690 }
1691
1692 complete_subjob (NJ, JOB_REF_PASS (S->job), JSP_PARENT_RWE);
1693 free (S);
1694 }
1695 return 0;
1696 }
1697 if (op == JS_FINISH) {
1698 return job_free (JOB_REF_PASS (NJ));
1699 }
1700
1701 return JOB_ERROR;
1702}
1703
1704job_t notify_job_create (int sig_class) {
1705 return create_async_job (notify_job_run, JSC_ALLOW (sig_class, JS_RUN) | JSC_ALLOW (sig_class, JS_ABORT) | JSC_ALLOW (sig_class, JS_MSG) | JSC_ALLOW (sig_class, JS_FINISH), 0, sizeof (struct notify_job_extra), JT_HAVE_MSG_QUEUE, JOB_REF_NULL);
1706}
1707