/*
    This file is part of Mtproto-proxy Library.

    Mtproto-proxy Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    Mtproto-proxy Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with Mtproto-proxy Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2014-2018 Telegram Messenger Inc
              2014-2015 Andrey Lopatin
              2014-2018 Nikolai Durov
*/

#define _FILE_OFFSET_BITS 64

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <linux/futex.h>
#include <sys/syscall.h>

#include "server-functions.h"
#include "kprintf.h"
#include "precise-time.h"
#include "mp-queue.h"
#include "jobs/jobs.h"
#include "common/common-stats.h"

volatile int mpq_blocks_allocated, mpq_blocks_allocated_max, mpq_blocks_allocations, mpq_blocks_true_allocations, mpq_blocks_wasted, mpq_blocks_prepared;
volatile int mpq_small_blocks_allocated, mpq_small_blocks_allocated_max;

__thread int mpq_this_thread_id;
__thread void **thread_hazard_pointers;
volatile int mpq_threads;

struct mp_queue MqGarbageBlocks, MqPreparedBlocks;
struct mp_queue MqGarbageSmallBlocks, MqPreparedSmallBlocks;

#define MODULE mp_queue

MODULE_STAT_TYPE {
  int mpq_active;
  int mpq_allocated;
};

MODULE_INIT

MODULE_STAT_FUNCTION
  SBP_PRINT_I32 (mpq_blocks_allocated);
  SBP_PRINT_I32 (mpq_blocks_allocated_max);
  SBP_PRINT_I32 (mpq_blocks_allocations);
  SBP_PRINT_I32 (mpq_blocks_true_allocations);
  SBP_PRINT_I32 (mpq_blocks_wasted);
  SBP_PRINT_I32 (mpq_blocks_prepared);
  SBP_PRINT_I32 (mpq_small_blocks_allocated);
  SBP_PRINT_I32 (mpq_small_blocks_allocated_max);
  SB_SUM_ONE_I (mpq_active);
  SB_SUM_ONE_I (mpq_allocated);
MODULE_STAT_FUNCTION_END

/* hazard pointers, one array per thread */

void *mqb_hazard_ptr[MAX_MPQ_THREADS][THREAD_HPTRS] __attribute__ ((aligned(64)));

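/* checks whether ptr occurs in hazard-pointer slots a..b of any registered thread:
   returns 0 if nobody holds it, 1 if only the current thread holds it,
   and r + 2 (i.e. 2 or 3) if at least one other thread holds it */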
int is_hazard_ptr (void *ptr, int a, int b) {
  barrier ();
  int k = mpq_threads, q = mpq_this_thread_id;
  barrier ();
  int i, j, r = 0;
  for (j = a; j <= b; j++) {
    if (mqb_hazard_ptr[q][j] == ptr) {
      r = 1;
      break;
    }
  }
  for (i = 1; i <= k; i++) {
    if (i == q) {
      continue;
    }
    for (j = a; j <= b; j++) {
      if (mqb_hazard_ptr[i][j] == ptr) {
        barrier ();
        return r + 2;
      }
    }
  }
  barrier ();
  return r;
}

/* initialize this thread id and return it */
int get_this_thread_id (void) {
  int i = mpq_this_thread_id;
  if (i) {
    return i;
  }
  i = __sync_fetch_and_add (&mpq_threads, 1) + 1;
  assert (i > 0 && i < MAX_MPQ_THREADS);
  thread_hazard_pointers = mqb_hazard_ptr[i];
  return mpq_this_thread_id = i;
}

/* custom semaphore implementation using futexes */

int mp_sem_post (mp_sem_t *sem) {
  __sync_fetch_and_add (&sem->value, 1);
  if (sem->waiting > 0) {
    syscall (__NR_futex, &sem->value, FUTEX_WAKE, 1, NULL, 0, 0);
  }
  return 0;
}

int mp_sem_wait (mp_sem_t *sem) {
  int v = sem->value, q = 0;
  while (1) {
    if (v > 0) {
      v = __sync_fetch_and_add (&sem->value, -1);
      if (v > 0) {
        return 0;
      }
      v = __sync_add_and_fetch (&sem->value, 1);
    } else {
      if (v < 0 && q++ < 10) {
        barrier ();
        v = sem->value;
        continue;
      }
      __sync_fetch_and_add (&sem->waiting, 1);
      syscall (__NR_futex, &sem->value, FUTEX_WAIT, v, NULL, 0, 0);
      __sync_fetch_and_add (&sem->waiting, -1);
      v = sem->value;
      q = 0;
    }
  }
}

int mp_sem_trywait (mp_sem_t *sem) {
  int v = sem->value;
  if (v > 0) {
    v = __sync_fetch_and_add (&sem->value, -1);
    if (v > 0) {
      return 0;
    }
    __sync_fetch_and_add (&sem->value, 1);
  }
  return -1;
}
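
/* Illustrative sketch of the semaphore above (kept under #if 0: the two thread
   functions are hypothetical). mp_sem_post() makes the FUTEX_WAKE syscall only
   when a waiter has announced itself via sem->waiting, so the uncontended path
   stays entirely in user space. */
#if 0
static mp_sem_t items_sem;               /* zero-initialized: no items, no waiters */

static void *producer_thread (void *arg) {
  /* ... enqueue an item somewhere ... */
  mp_sem_post (&items_sem);              /* wake one consumer if any is sleeping */
  return NULL;
}

static void *consumer_thread (void *arg) {
  while (1) {
    mp_sem_wait (&items_sem);            /* spins briefly, then sleeps in FUTEX_WAIT */
    /* ... dequeue and handle one item ... */
  }
  return NULL;
}
#endif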

/* functions for one mp_queue_block */

// may invoke mpq_pop()/mpq_push() if allow_recursion=1
struct mp_queue_block *alloc_mpq_block (mqn_value_t first_val, int allow_recursion, int is_small) {
  struct mp_queue_block *QB = 0;
  int prepared = 0, align_bytes = 0;
  long size = (is_small ? MPQ_SMALL_BLOCK_SIZE : MPQ_BLOCK_SIZE);
  if (allow_recursion) {
    QB = mpq_pop (is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, MPQF_RECURSIVE);
    if (QB) {
      if (!is_hazard_ptr (QB, 0, 2)) {
        // reclaiming garbage
        assert (QB->mqb_magic == MQ_BLOCK_GARBAGE_MAGIC);
        __sync_fetch_and_add (&mpq_blocks_wasted, -1);
        align_bytes = QB->mqb_align_bytes;
      } else {
        mpq_push (is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, MPQF_RECURSIVE);
        QB = 0;
      }
    }
    if (!QB) {
      QB = mpq_pop (is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, MPQF_RECURSIVE);
      if (QB) {
        assert (QB->mqb_magic == MQ_BLOCK_PREPARED_MAGIC);
        prepared = 1;
        __sync_fetch_and_add (&mpq_blocks_prepared, -1);
        align_bytes = QB->mqb_align_bytes;
      }
    }
  }
  if (!QB) {
    char *new_block = malloc (offsetof (struct mp_queue_block, mqb_nodes) + size * (2 * sizeof (void *)) + MPQ_BLOCK_ALIGNMENT - sizeof (void *));
    assert (new_block);
    assert (!((long) new_block & (sizeof (void *) - 1)));
    align_bytes = -(int)(long) new_block & (MPQ_BLOCK_ALIGNMENT - 1);
    QB = (struct mp_queue_block *) (new_block + align_bytes);

    __sync_fetch_and_add (&mpq_blocks_true_allocations, 1);
    if (is_small) {
      int t = __sync_fetch_and_add (&mpq_small_blocks_allocated, 1);
      if (t >= mpq_small_blocks_allocated_max) {
        __sync_bool_compare_and_swap (&mpq_small_blocks_allocated_max, mpq_small_blocks_allocated_max, t + 1);
      }
    } else {
      int t = __sync_fetch_and_add (&mpq_blocks_allocated, 1);
      if (t >= mpq_blocks_allocated_max) {
        __sync_bool_compare_and_swap (&mpq_blocks_allocated_max, mpq_blocks_allocated_max, t + 1);
      }
    }
  } else {
    assert (QB->mqb_size == size);
  }
  __sync_fetch_and_add (&mpq_blocks_allocations, 1);

  memset (QB, 0, offsetof (struct mp_queue_block, mqb_nodes));
  QB->mqb_align_bytes = align_bytes;
  QB->mqb_size = size;

  QB->mqb_nodes[0].idx = MQN_SAFE;
  QB->mqb_nodes[0].val = first_val;

  if (!prepared) {
    long i;
    for (i = 1; i < size; i++) {
      QB->mqb_nodes[i].idx = MQN_SAFE + i;
      QB->mqb_nodes[i].val = 0;
    }
  }

  if (first_val) {
    QB->mqb_tail = 1;
  }

  QB->mqb_magic = MQ_BLOCK_USED_MAGIC;
  return QB;
}
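
/* block life cycle: a block is MQ_BLOCK_USED_MAGIC while linked into a queue;
   when unlinked it is freed at once if no hazard pointer references it, otherwise
   it is marked MQ_BLOCK_GARBAGE_MAGIC and parked in MqGarbageBlocks /
   MqGarbageSmallBlocks until reclamation is safe; a block that lost the race to
   become mqb_next is marked MQ_BLOCK_PREPARED_MAGIC and parked in MqPreparedBlocks /
   MqPreparedSmallBlocks with its nodes still initialized, so reuse skips the init loop */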

void free_mpq_block (struct mp_queue_block *QB) {
  assert (QB->mqb_magic == MQ_BLOCK_USED_MAGIC);
  assert ((unsigned) QB->mqb_align_bytes < MPQ_BLOCK_ALIGNMENT && !(QB->mqb_align_bytes & (sizeof (void *) - 1)));
  QB->mqb_magic = MQ_BLOCK_FREE_MAGIC;
  if (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE) {
    __sync_fetch_and_add (&mpq_small_blocks_allocated, -1);
  } else {
    assert (QB->mqb_size == MPQ_BLOCK_SIZE);
    __sync_fetch_and_add (&mpq_blocks_allocated, -1);
  }
  free ((char *) QB - QB->mqb_align_bytes);
}

static inline void mpq_fix_state (struct mp_queue_block *QB) {
  long h, t;
  while (1) {
    barrier ();
    t = QB->mqb_tail;
    barrier ();
    h = QB->mqb_head;
    barrier ();
    if ((unsigned long) h <= (unsigned long) t) {
      break;
    }
    if (QB->mqb_tail != t) {
      continue;
    }
    // here tail < head; try to advance tail to head
    // (or to some value h such that tail < h <= head)
    if (__sync_bool_compare_and_swap (&QB->mqb_tail, t, h)) {
      break;
    }
  }
}

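/* each mpq_node_t packs a value and an index word that are updated together by a
   double-word compare-and-swap on node->pair; the low bits of the index
   (MQN_IDX_MASK) record the last head/tail position that touched the node, and
   the MQN_SAFE bit marks it as safe for reuse in a later round; pop and push
   first claim a position with fetch-and-add on mqb_head/mqb_tail, then commit */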
mqn_value_t mpq_block_pop (struct mp_queue_block *QB) {
  // fprintf (stderr, "%d:mpq_block_pop(%p)\n", mpq_this_thread_id, QB);
  long size = QB->mqb_size;
  while (1) {
    long h = __sync_fetch_and_add (&QB->mqb_head, 1);
    // fprintf (stderr, "%d: mpq_block_pop(%ld)\n", mpq_this_thread_id, h);
    mpq_node_t *node = &QB->mqb_nodes[h & (size - 1)];
    while (1) {
      mpq_node_t d, e;
      barrier ();
      mqn_value_t val = node->val;
      barrier ();
      long safe_idx = node->idx;
      barrier ();
      long idx = safe_idx & MQN_IDX_MASK;
      if (idx > h) {
        break;
      }
      d.val = val;
      d.idx = safe_idx;
      if (val) {
        if (idx == h) {
          e.idx = safe_idx + size;
          e.val = 0;
          if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) {
            // fprintf (stderr, "%d: mpq_block_pop(%ld) -> %lx\n", mpq_this_thread_id, h, (long) val);
            return val;
          }
        } else {
          e.val = val;
          e.idx = idx; // clear 'safe' flag
          if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) {
            break;
          }
        }
      } else {
        e.idx = (safe_idx & MQN_SAFE) + h + size;
        e.val = 0;
        if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) {
          break;
        }
      }
      /* somebody changed this element while we were inspecting it, make another loop iteration */
    }
    barrier ();
    long t = QB->mqb_tail & MQN_IDX_MASK;
    barrier ();
    if (t <= h + 1) {
      mpq_fix_state (QB);
      return 0;
    }
    /* now try again with a new value of h */
  }
}

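/* tries to push val into this block; returns the claimed position (>= 0) on
   success, or -1 if the block is already closed (MQN_SAFE set in mqb_tail) or
   overflows; on overflow or excessive contention the block is closed, and the
   caller is expected to chain in a fresh block */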
long mpq_block_push (struct mp_queue_block *QB, mqn_value_t val) {
  int iterations = 0;
  long size = QB->mqb_size;
  // fprintf (stderr, "%d:mpq_block_push(%p)\n", mpq_this_thread_id, QB);
  while (1) {
    long t = __sync_fetch_and_add (&QB->mqb_tail, 1);
    // fprintf (stderr, "%d: mpq_block_push(%ld)\n", mpq_this_thread_id, t);
    if (t & MQN_SAFE) {
      return -1L; // bad luck
    }
    mpq_node_t *node = &QB->mqb_nodes[t & (size - 1)];
    barrier ();
    mqn_value_t old_val = node->val;
    barrier ();
    long safe_idx = node->idx;
    barrier ();
    long idx = safe_idx & MQN_IDX_MASK;
    if (!old_val && idx <= t && ((safe_idx & MQN_SAFE) || QB->mqb_head <= t)) {
      mpq_node_t d, e;
      d.idx = safe_idx;
      d.val = 0;
      e.idx = MQN_SAFE + t;
      e.val = val;
      if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) {
        // fprintf (stderr, "%d: mpq_block_push(%ld) <- %lx\n", mpq_this_thread_id, t, (long) val);
        return t; // pushed OK
      }
    }
    barrier ();
    long h = QB->mqb_head;
    barrier ();
    if (t - h >= size || ++iterations > 10) {
      __sync_fetch_and_or (&QB->mqb_tail, MQN_SAFE); // closing queue
      return -1L; // bad luck
    }
  }
}

/* functions for mp_queue = list of mp_queue_block's */

void init_mp_queue (struct mp_queue *MQ) {
  assert (MQ->mq_magic != MQ_MAGIC && MQ->mq_magic != MQ_MAGIC_SEM);
  memset (MQ, 0, sizeof (struct mp_queue));
  MQ->mq_head = MQ->mq_tail = alloc_mpq_block (0, 0, 1);
  MQ->mq_magic = MQ_MAGIC;

  if (!MqGarbageBlocks.mq_magic) {
    // the recursive call re-enters this function with MqGarbageBlocks.mq_magic
    // already set, falls into the else branch and initializes the prepared-block
    // queues as well
    init_mp_queue (&MqGarbageBlocks);
    init_mp_queue (&MqGarbageSmallBlocks);
  } else if (!MqPreparedBlocks.mq_magic) {
    init_mp_queue (&MqPreparedBlocks);
    init_mp_queue (&MqPreparedSmallBlocks);
  }
}

void init_mp_queue_w (struct mp_queue *MQ) {
  init_mp_queue (MQ);
  MODULE_STAT->mpq_active ++;
#if MPQ_USE_POSIX_SEMAPHORES
  sem_init (&MQ->mq_sem, 0, 0);
#endif
  MQ->mq_magic = MQ_MAGIC_SEM;
}

struct mp_queue *alloc_mp_queue (void) {
  struct mp_queue *MQ = NULL;
  assert (!posix_memalign ((void **)&MQ, 64, sizeof (*MQ)));
  memset (MQ, 0, sizeof (*MQ));
  init_mp_queue (MQ);
  return MQ;
}

struct mp_queue *alloc_mp_queue_w (void) {
  struct mp_queue *MQ = NULL;
  assert (!posix_memalign ((void **)&MQ, 64, sizeof (*MQ)));
  memset (MQ, 0, sizeof (*MQ));
  MODULE_STAT->mpq_allocated ++;
  init_mp_queue_w (MQ);
  return MQ;
}

/* invoke only if sure that nobody else may be using this mp_queue in parallel */
void clear_mp_queue (struct mp_queue *MQ) {
  MODULE_STAT->mpq_active --;
  assert (MQ->mq_magic == MQ_MAGIC || MQ->mq_magic == MQ_MAGIC_SEM);
  assert (MQ->mq_head && MQ->mq_tail);
  struct mp_queue_block *QB, *QBN;
  for (QB = MQ->mq_head; QB; QB = QBN) {
    QBN = QB->mqb_next;
    assert (QB->mqb_next || QB == MQ->mq_tail);
    QB->mqb_next = 0;
    free_mpq_block (QB);
  }
  MQ->mq_head = MQ->mq_tail = 0;
  MQ->mq_magic = 0;
}

void free_mp_queue (struct mp_queue *MQ) {
  MODULE_STAT->mpq_allocated --;
  clear_mp_queue (MQ);
  free (MQ);
}

// may invoke mpq_push() to discard new empty block
mqn_value_t mpq_pop (struct mp_queue *MQ, int flags) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0];
  long r = ((flags & MPQF_RECURSIVE) != 0);
  struct mp_queue_block *QB;
  mqn_value_t v;
  while (1) {
    QB = MQ->mq_head;
    barrier ();
    hptr[r] = QB;
    barrier ();
    __sync_synchronize ();
    if (MQ->mq_head != QB) {
      continue;
    }

    v = mpq_block_pop (QB);
    if (v) {
      break;
    }
    barrier ();
    if (!QB->mqb_next) {
      QB = 0;
      break;
    }
    v = mpq_block_pop (QB);
    if (v) {
      break;
    }
    if (__sync_bool_compare_and_swap (&MQ->mq_head, QB, QB->mqb_next)) {
      // want to free QB here, but this is complicated if somebody else holds a pointer
      if (is_hazard_ptr (QB, 0, 2) <= 1) {
        free_mpq_block (QB);
      } else {
        __sync_fetch_and_add (&mpq_blocks_wasted, 1);
        // ... put QB into some GC queue? ...
        QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC;
        mpq_push (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, flags & MPQF_RECURSIVE);
      }
    }
  }
  if (flags & MPQF_STORE_PTR) {
    hptr[2] = QB;
  }
  hptr[r] = 0;
  return v;
}

/* 1 = definitely empty (for some serialization), 0 = possibly non-empty;
   may invoke mpq_push() to discard empty block */
int mpq_is_empty (struct mp_queue *MQ) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0];
  struct mp_queue_block *QB;
  while (1) {
    QB = MQ->mq_head;
    barrier ();
    *hptr = QB;
    barrier ();
    __sync_synchronize ();
    if (MQ->mq_head != QB) {
      continue;
    }
    barrier ();
    long h = QB->mqb_head;
    barrier ();
    long t = QB->mqb_tail;
    barrier ();
    if (!(t & MQN_SAFE)) {
      *hptr = 0;
      return t <= h;
    }
    t &= MQN_IDX_MASK;
    if (t > h) {
      *hptr = 0;
      return 0;
    }
    barrier ();
    if (!QB->mqb_next) {
      *hptr = 0;
      return 1;
    }
    if (__sync_bool_compare_and_swap (&MQ->mq_head, QB, QB->mqb_next)) {
      // want to free QB here, but this is complicated if somebody else holds a pointer
      if (is_hazard_ptr (QB, 0, 2) <= 1) {
        free_mpq_block (QB);
      } else {
        __sync_fetch_and_add (&mpq_blocks_wasted, 1);
        // ... put QB into some GC queue? ...
        QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC;
        mpq_push (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, 0);
      }
    }
  }
  *hptr = 0;
  return 0;
}

/* may invoke mpq_alloc_block (which recursively invokes mpq_pop)
   or mpq_push() (without needing to hold hazard pointer) to deal with blocks */
long mpq_push (struct mp_queue *MQ, mqn_value_t val, int flags) {
  void **hptr = mqb_hazard_ptr[get_this_thread_id()];
  long r = ((flags & MPQF_RECURSIVE) != 0);
  while (1) {
    struct mp_queue_block *QB = MQ->mq_tail;
    barrier ();
    hptr[r] = QB;
    barrier ();
    __sync_synchronize ();
    if (MQ->mq_tail != QB) {
      continue;
    }

    if (QB->mqb_next) {
      __sync_bool_compare_and_swap (&MQ->mq_tail, QB, QB->mqb_next);
      continue;
    }
    long pos = mpq_block_push (QB, val);
    if (pos >= 0) {
      if (flags & MPQF_STORE_PTR) {
        hptr[2] = QB;
      }
      hptr[r] = 0;
      return pos;
    }
#define DBG(c) // fprintf (stderr, "[%d] pushing %lx to %p,%p: %c\n", mpq_this_thread_id, (long) val, MQ, QB, c);
    DBG('A');
    /*
    if (__sync_fetch_and_add (&QB->mqb_next_allocators, 1)) {
      // somebody else will allocate next block; busy wait instead of spurious alloc/free
      DBG('B')
      while (!QB->mqb_next) {
        barrier ();
      }
      DBG('C')
      continue;
    }
    */
    int is_small = (QB == MQ->mq_head);
    struct mp_queue_block *NQB;
    if (!r) {
      assert (!hptr[1]);
      NQB = alloc_mpq_block (val, 1, is_small);
      assert (!hptr[1]);
    } else {
      NQB = alloc_mpq_block (val, 0, is_small);
    }
    assert (hptr[r] == QB);
    DBG('D')
    if (__sync_bool_compare_and_swap (&QB->mqb_next, 0, NQB)) {
      __sync_bool_compare_and_swap (&MQ->mq_tail, QB, NQB);
      DBG('E')
      if (flags & MPQF_STORE_PTR) {
        hptr[2] = NQB;
      }
      hptr[r] = 0;
      return 0;
    } else {
      DBG('F');
      NQB->mqb_magic = MQ_BLOCK_PREPARED_MAGIC;
      mpq_push (is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, NQB, 0);
      __sync_fetch_and_add (&mpq_blocks_prepared, 1);
    }
  }
#undef DBG
}

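/* blocking pop for queues created with init_mp_queue_w()/alloc_mp_queue_w():
   the semaphore counts queued elements, so once the wait succeeds an element is
   guaranteed to be there; spins on trywait for up to (flags & MPQF_MAX_ITERATIONS)
   rounds before sleeping */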
mqn_value_t mpq_pop_w (struct mp_queue *MQ, int flags) {
  assert (MQ->mq_magic == MQ_MAGIC_SEM);
  int s = -1, iterations = flags & MPQF_MAX_ITERATIONS;
  while (iterations-- > 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_trywait (&MQ->mq_sem);
#else
    s = mp_sem_trywait (&MQ->mq_sem);
#endif
    if (!s) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert (errno == EAGAIN || errno == EINTR);
#endif
  }
  while (s < 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_wait (&MQ->mq_sem);
#else
    s = mp_sem_wait (&MQ->mq_sem);
#endif
    if (!s) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert (errno == EINTR);
#endif
  }
  mqn_value_t v = mpq_pop (MQ, flags);
  assert (v);
  return v;
}

mqn_value_t mpq_pop_nw (struct mp_queue *MQ, int flags) {
  assert (MQ->mq_magic == MQ_MAGIC_SEM);
  int s = -1, iterations = flags & MPQF_MAX_ITERATIONS;
  while (iterations-- > 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_trywait (&MQ->mq_sem);
#else
    s = mp_sem_trywait (&MQ->mq_sem);
#endif
    if (s >= 0) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert (errno == EAGAIN || errno == EINTR);
#endif
  }
  if (s < 0) {
    return 0;
  }
  mqn_value_t v = mpq_pop (MQ, flags);
  assert (v);
  return v;
}

long mpq_push_w (struct mp_queue *MQ, mqn_value_t v, int flags) {
  assert (MQ->mq_magic == MQ_MAGIC_SEM);
  long res = mpq_push (MQ, v, flags);
#if MPQ_USE_POSIX_SEMAPHORES
  assert (sem_post (&MQ->mq_sem) >= 0);
#else
  assert (mp_sem_post (&MQ->mq_sem) >= 0);
#endif
  return res;
}
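
/* Illustrative producer/consumer sketch (kept under #if 0 on purpose: the
   worker/submit functions and the job payload are hypothetical; only the
   mp_queue calls themselves are this file's real API). Note that a queued value
   must be non-NULL, since the pop functions return 0 to mean "empty". */
#if 0
static struct mp_queue *JobQueue;

static void *worker_thread (void *arg) {
  while (1) {
    void *job = mpq_pop_w (JobQueue, 0);  /* sleeps on the semaphore until a job arrives */
    process_job (job);                    /* hypothetical job handler */
  }
  return NULL;
}

static void start_queue (void) {
  JobQueue = alloc_mp_queue_w ();         /* semaphore-backed queue for blocking pops */
  pthread_t th;
  pthread_create (&th, NULL, worker_thread, NULL);
}

static void submit_job (void *job) {
  assert (job);                           /* NULL cannot be distinguished from 'empty' */
  mpq_push_w (JobQueue, job, 0);          /* the post wakes one sleeping worker */
}
#endif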

/* reads *ptr and takes a reference to the object it points at, publishing the
   pointer in a hazard-pointer slot for the window between the read and the
   incref, so that deallocating code can detect (via is_hazard_ptr) that the
   object is still in use */
void *get_ptr_multithread_copy (void **ptr, void (*incref)(void *ptr)) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][COMMON_HAZARD_PTR_NUM];
  assert (*hptr == NULL);

  void *R;
  while (1) {
    R = *ptr;
    barrier ();
    *hptr = R;
    barrier ();
    mfence ();

    if (R != *ptr) {
      continue;
    }

    incref (R);

    barrier ();
    *hptr = NULL;

    break;
  }
  return R;
}
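/* Usage sketch for get_ptr_multithread_copy() (illustrative; struct config and
   its reference counting are hypothetical). A reader obtains a counted reference
   to an object whose pointer another thread may replace concurrently, without locks. */
#if 0
struct config {
  int refcnt;
  /* ... payload ... */
};
static struct config *CurConf;

static void config_incref (void *ptr) {
  __sync_fetch_and_add (&((struct config *) ptr)->refcnt, 1);
}

static struct config *get_config_ref (void) {
  return get_ptr_multithread_copy ((void **) &CurConf, config_incref);
}
#endif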