1 | /* |
2 | This file is part of Mtproto-proxy Library. |
3 | |
4 | Mtproto-proxy Library is free software: you can redistribute it and/or modify |
5 | it under the terms of the GNU Lesser General Public License as published by |
6 | the Free Software Foundation, either version 2 of the License, or |
7 | (at your option) any later version. |
8 | |
9 | Mtproto-proxy Library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public License |
15 | along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>. |
16 | |
17 | Copyright 2014-2018 Telegram Messenger Inc |
18 | 2014-2015 Andrey Lopatin |
19 | 2014-2018 Nikolai Durov |
20 | */ |
21 | |
22 | #define _FILE_OFFSET_BITS 64 |
23 | |
24 | #include <assert.h> |
25 | #include <errno.h> |
26 | #include <pthread.h> |
27 | #include <signal.h> |
28 | #include <stddef.h> |
29 | #include <stdio.h> |
30 | #include <stdlib.h> |
31 | #include <string.h> |
32 | #include <time.h> |
33 | #include <unistd.h> |
34 | |
35 | #include <linux/futex.h> |
36 | #include <sys/syscall.h> |
37 | |
38 | #include "server-functions.h" |
39 | #include "kprintf.h" |
40 | #include "precise-time.h" |
41 | #include "mp-queue.h" |
42 | #include "jobs/jobs.h" |
43 | #include "common/common-stats.h" |
44 | |
45 | volatile int mpq_blocks_allocated, mpq_blocks_allocated_max, mpq_blocks_allocations, mpq_blocks_true_allocations, mpq_blocks_wasted, mpq_blocks_prepared; |
46 | volatile int mpq_small_blocks_allocated, mpq_small_blocks_allocated_max; |
47 | |
48 | __thread int mpq_this_thread_id; |
49 | __thread void **thread_hazard_pointers; |
50 | volatile int mpq_threads; |
51 | |
52 | struct mp_queue MqGarbageBlocks, MqPreparedBlocks; |
53 | struct mp_queue MqGarbageSmallBlocks, MqPreparedSmallBlocks; |
54 | |
55 | #define MODULE mp_queue |
56 | |
57 | MODULE_STAT_TYPE { |
58 | int mpq_active; |
59 | int mpq_allocated; |
60 | }; |
61 | |
62 | MODULE_INIT |
63 | |
64 | MODULE_STAT_FUNCTION |
65 | SBP_PRINT_I32 (mpq_blocks_allocated); |
66 | SBP_PRINT_I32 (mpq_blocks_allocated_max); |
67 | SBP_PRINT_I32 (mpq_blocks_allocations); |
68 | SBP_PRINT_I32 (mpq_blocks_true_allocations); |
69 | SBP_PRINT_I32 (mpq_blocks_wasted); |
70 | SBP_PRINT_I32 (mpq_blocks_prepared); |
71 | SBP_PRINT_I32 (mpq_small_blocks_allocated); |
72 | SBP_PRINT_I32 (mpq_small_blocks_allocated_max); |
73 | SB_SUM_ONE_I (mpq_active); |
74 | SB_SUM_ONE_I (mpq_allocated); |
75 | MODULE_STAT_FUNCTION_END |
76 | |
77 | /* hazard pointers, one per thread */ |
78 | |
79 | void *mqb_hazard_ptr[MAX_MPQ_THREADS][THREAD_HPTRS] __attribute__ ((aligned(64))); |
80 | |
81 | int is_hazard_ptr (void *ptr, int a, int b) { |
82 | barrier (); |
83 | int k = mpq_threads, q = mpq_this_thread_id; |
84 | barrier (); |
85 | int i, j, r = 0; |
86 | for (j = a; j <= b; j++) { |
87 | if (mqb_hazard_ptr[q][j] == ptr) { |
88 | r = 1; |
89 | break; |
90 | } |
91 | } |
92 | for (i = 1; i <= k; i++) { |
93 | if (i == q) { |
94 | continue; |
95 | } |
96 | for (j = a; j <= b; j++) { |
97 | if (mqb_hazard_ptr[i][j] == ptr) { |
98 | barrier (); |
99 | return r + 2; |
100 | } |
101 | } |
102 | } |
103 | barrier (); |
104 | return r; |
105 | } |
106 | |
107 | /* initialize this thread id and return it */ |
108 | int get_this_thread_id (void) { |
109 | int i = mpq_this_thread_id; |
110 | if (i) { |
111 | return i; |
112 | } |
113 | i = __sync_fetch_and_add (&mpq_threads, 1) + 1; |
114 | assert (i > 0 && i < MAX_MPQ_THREADS); |
115 | thread_hazard_pointers = mqb_hazard_ptr[i]; |
116 | return mpq_this_thread_id = i; |
117 | } |
118 | |
119 | /* custom semaphore implementation using futexes */ |
120 | |
121 | int mp_sem_post (mp_sem_t *sem) { |
122 | __sync_fetch_and_add (&sem->value, 1); |
123 | if (sem->waiting > 0) { |
124 | syscall (__NR_futex, &sem->value, FUTEX_WAKE, 1, NULL, 0, 0); |
125 | } |
126 | return 0; |
127 | } |
128 | |
129 | int mp_sem_wait (mp_sem_t *sem) { |
130 | int v = sem->value, q = 0; |
131 | while (1) { |
132 | if (v > 0) { |
133 | v = __sync_fetch_and_add (&sem->value, -1); |
134 | if (v > 0) { |
135 | return 0; |
136 | } |
137 | v = __sync_add_and_fetch (&sem->value, 1); |
138 | } else { |
139 | if (v < 0 && q++ < 10) { |
140 | barrier (); |
141 | v = sem->value; |
142 | continue; |
143 | } |
144 | __sync_fetch_and_add (&sem->waiting, 1); |
145 | syscall (__NR_futex, &sem->value, FUTEX_WAIT, v, NULL, 0, 0); |
146 | __sync_fetch_and_add (&sem->waiting, -1); |
147 | v = sem->value; |
148 | q = 0; |
149 | } |
150 | } |
151 | } |
152 | |
153 | int mp_sem_trywait (mp_sem_t *sem) { |
154 | int v = sem->value; |
155 | if (v > 0) { |
156 | v = __sync_fetch_and_add (&sem->value, -1); |
157 | if (v > 0) { |
158 | return 0; |
159 | } |
160 | __sync_fetch_and_add (&sem->value, 1); |
161 | } |
162 | return -1; |
163 | } |
164 | |
165 | /* functions for one mp_queue_block */ |
166 | |
167 | // may invoke mpq_pop()/mpq_push() if allow_recursion=1 |
168 | struct mp_queue_block *alloc_mpq_block (mqn_value_t first_val, int allow_recursion, int is_small) { |
169 | struct mp_queue_block *QB = 0; |
170 | int prepared = 0, align_bytes = 0; |
171 | long size = (is_small ? MPQ_SMALL_BLOCK_SIZE : MPQ_BLOCK_SIZE); |
172 | if (allow_recursion) { |
173 | QB = mpq_pop (is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, MPQF_RECURSIVE); |
174 | if (QB) { |
175 | if (!is_hazard_ptr (QB, 0, 2)) { |
176 | // reclaiming garbage |
177 | assert (QB->mqb_magic == MQ_BLOCK_GARBAGE_MAGIC); |
178 | __sync_fetch_and_add (&mpq_blocks_wasted, -1); |
179 | align_bytes = QB->mqb_align_bytes; |
180 | } else { |
181 | mpq_push (is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, MPQF_RECURSIVE); |
182 | QB = 0; |
183 | } |
184 | } |
185 | if (!QB) { |
186 | QB = mpq_pop (is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, MPQF_RECURSIVE); |
187 | if (QB) { |
188 | assert (QB->mqb_magic == MQ_BLOCK_PREPARED_MAGIC); |
189 | prepared = 1; |
190 | __sync_fetch_and_add (&mpq_blocks_prepared, -1); |
191 | align_bytes = QB->mqb_align_bytes; |
192 | } |
193 | } |
194 | } |
195 | if (!QB) { |
196 | char *new_block = malloc (offsetof (struct mp_queue_block, mqb_nodes) + size * (2 * sizeof (void *)) + MPQ_BLOCK_ALIGNMENT - sizeof (void *)); |
197 | assert (new_block); |
198 | assert (!((long) new_block & (sizeof (void *) - 1))); |
199 | align_bytes = -(int)(long) new_block & (MPQ_BLOCK_ALIGNMENT - 1); |
200 | QB = (struct mp_queue_block *) (new_block + align_bytes); |
201 | |
202 | __sync_fetch_and_add (&mpq_blocks_true_allocations, 1); |
203 | if (is_small) { |
204 | int t = __sync_fetch_and_add (&mpq_small_blocks_allocated, 1); |
205 | if (t >= mpq_small_blocks_allocated_max) { |
206 | __sync_bool_compare_and_swap (&mpq_small_blocks_allocated_max, mpq_small_blocks_allocated_max, t + 1); |
207 | } |
208 | } else { |
209 | int t = __sync_fetch_and_add (&mpq_blocks_allocated, 1); |
210 | if (t >= mpq_blocks_allocated_max) { |
211 | __sync_bool_compare_and_swap (&mpq_blocks_allocated_max, mpq_blocks_allocated_max, t + 1); |
212 | } |
213 | } |
214 | } else { |
215 | assert (QB->mqb_size == size); |
216 | } |
217 | __sync_fetch_and_add (&mpq_blocks_allocations, 1); |
218 | |
219 | memset (QB, 0, offsetof (struct mp_queue_block, mqb_nodes)); |
220 | QB->mqb_align_bytes = align_bytes; |
221 | QB->mqb_size = size; |
222 | |
223 | QB->mqb_nodes[0].idx = MQN_SAFE; |
224 | QB->mqb_nodes[0].val = first_val; |
225 | |
226 | if (!prepared) { |
227 | long i; |
228 | for (i = 1; i < size; i++) { |
229 | QB->mqb_nodes[i].idx = MQN_SAFE + i; |
230 | QB->mqb_nodes[i].val = 0; |
231 | } |
232 | } |
233 | |
234 | if (first_val) { |
235 | QB->mqb_tail = 1; |
236 | } |
237 | |
238 | QB->mqb_magic = MQ_BLOCK_USED_MAGIC; |
239 | return QB; |
240 | } |
241 | |
242 | void free_mpq_block (struct mp_queue_block *QB) { |
243 | assert (QB->mqb_magic == MQ_BLOCK_USED_MAGIC); |
244 | assert ((unsigned) QB->mqb_align_bytes < MPQ_BLOCK_ALIGNMENT && !(QB->mqb_align_bytes & (sizeof (void *) - 1))); |
245 | QB->mqb_magic = MQ_BLOCK_FREE_MAGIC; |
246 | if (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE) { |
247 | __sync_fetch_and_add (&mpq_small_blocks_allocated, -1); |
248 | } else { |
249 | assert (QB->mqb_size == MPQ_BLOCK_SIZE); |
250 | __sync_fetch_and_add (&mpq_blocks_allocated, -1); |
251 | } |
252 | free ((char *) QB - QB->mqb_align_bytes); |
253 | } |
254 | |
255 | static inline void mpq_fix_state (struct mp_queue_block *QB) { |
256 | long h, t; |
257 | while (1) { |
258 | barrier(); |
259 | t = QB->mqb_tail; |
260 | barrier(); |
261 | h = QB->mqb_head; |
262 | barrier(); |
263 | if ((unsigned long) h <= (unsigned long) t) { |
264 | break; |
265 | } |
266 | if (QB->mqb_tail != t) { |
267 | continue; |
268 | } |
269 | // here tail < head ; try to advance tail to head |
270 | // (or to some value h such that tail < h <= head) |
271 | if (__sync_bool_compare_and_swap (&QB->mqb_tail, t, h)) { |
272 | break; |
273 | } |
274 | } |
275 | } |
276 | |
277 | mqn_value_t mpq_block_pop (struct mp_queue_block *QB) { |
278 | // fprintf (stderr, "%d:mpq_block_pop(%p)\n", mpq_this_thread_id, QB); |
279 | long size = QB->mqb_size; |
280 | while (1) { |
281 | long h = __sync_fetch_and_add (&QB->mqb_head, 1); |
282 | // fprintf (stderr, "%d: mpq_block_pop(%ld)\n", mpq_this_thread_id, h); |
283 | mpq_node_t *node = &QB->mqb_nodes[h & (size - 1)]; |
284 | while (1) { |
285 | mpq_node_t d, e; |
286 | barrier(); |
287 | mqn_value_t val = node->val; |
288 | barrier(); |
289 | long safe_idx = node->idx; |
290 | barrier(); |
291 | long idx = safe_idx & MQN_IDX_MASK; |
292 | if (idx > h) { |
293 | break; |
294 | } |
295 | d.val = val; |
296 | d.idx = safe_idx; |
297 | if (val) { |
298 | if (idx == h) { |
299 | e.idx = safe_idx + size; |
300 | e.val = 0; |
301 | if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) { |
302 | // fprintf (stderr, "%d: mpq_block_pop(%ld) -> %lx\n", mpq_this_thread_id, h, (long) val); |
303 | return val; |
304 | } |
305 | } else { |
306 | e.val = val; |
307 | e.idx = idx; // clear 'safe' flag |
308 | if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) { |
309 | break; |
310 | } |
311 | } |
312 | } else { |
313 | e.idx = (safe_idx & MQN_SAFE) + h + size; |
314 | e.val = 0; |
315 | if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) { |
316 | break; |
317 | } |
318 | } |
319 | /* somebody changed this element while we were inspecting it, make another loop iteration */ |
320 | } |
321 | barrier(); |
322 | long t = QB->mqb_tail & MQN_IDX_MASK; |
323 | barrier(); |
324 | if (t <= h + 1) { |
325 | mpq_fix_state (QB); |
326 | return 0; |
327 | } |
328 | /* now try again with a new value of h */ |
329 | } |
330 | } |
331 | |
332 | long mpq_block_push (struct mp_queue_block *QB, mqn_value_t val) { |
333 | int iterations = 0; |
334 | long size = QB->mqb_size; |
335 | // fprintf (stderr, "%d:mpq_block_push(%p)\n", mpq_this_thread_id, QB); |
336 | while (1) { |
337 | long t = __sync_fetch_and_add (&QB->mqb_tail, 1); |
338 | // fprintf (stderr, "%d: mpq_block_push(%ld)\n", mpq_this_thread_id, t); |
339 | if (t & MQN_SAFE) { |
340 | return -1L; // bad luck |
341 | } |
342 | mpq_node_t *node = &QB->mqb_nodes[t & (size - 1)]; |
343 | barrier(); |
344 | mqn_value_t old_val = node->val; |
345 | barrier(); |
346 | long safe_idx = node->idx; |
347 | barrier(); |
348 | long idx = safe_idx & MQN_IDX_MASK; |
349 | if (!old_val && idx <= t && ((safe_idx & MQN_SAFE) || QB->mqb_head <= t)) { |
350 | mpq_node_t d, e; |
351 | d.idx = safe_idx; |
352 | d.val = 0; |
353 | e.idx = MQN_SAFE + t; |
354 | e.val = val; |
355 | if (__sync_bool_compare_and_swap (&node->pair, d.pair, e.pair)) { |
356 | // fprintf (stderr, "%d: mpq_block_push(%ld) <- %lx\n", mpq_this_thread_id, t, (long) val); |
357 | return t; // pushed OK |
358 | } |
359 | } |
360 | barrier (); |
361 | long h = QB->mqb_head; |
362 | barrier (); |
363 | if (t - h >= size || ++iterations > 10) { |
364 | __sync_fetch_and_or (&QB->mqb_tail, MQN_SAFE); // closing queue |
365 | return -1L; // bad luck |
366 | } |
367 | } |
368 | } |
369 | |
370 | /* functions for mp_queue = list of mp_queue_block's */ |
371 | void init_mp_queue (struct mp_queue *MQ) { |
372 | assert (MQ->mq_magic != MQ_MAGIC && MQ->mq_magic != MQ_MAGIC_SEM); |
373 | memset (MQ, 0, sizeof (struct mp_queue)); |
374 | MQ->mq_head = MQ->mq_tail = alloc_mpq_block (0, 0, 1); |
375 | MQ->mq_magic = MQ_MAGIC; |
376 | |
377 | if (!MqGarbageBlocks.mq_magic) { |
378 | init_mp_queue (&MqGarbageBlocks); |
379 | init_mp_queue (&MqGarbageSmallBlocks); |
380 | } else if (!MqPreparedBlocks.mq_magic) { |
381 | init_mp_queue (&MqPreparedBlocks); |
382 | init_mp_queue (&MqPreparedSmallBlocks); |
383 | } |
384 | } |
385 | |
386 | void init_mp_queue_w (struct mp_queue *MQ) { |
387 | init_mp_queue (MQ); |
388 | MODULE_STAT->mpq_active ++; |
389 | #if MPQ_USE_POSIX_SEMAPHORES |
390 | sem_init (&MQ->mq_sem, 0, 0); |
391 | #endif |
392 | MQ->mq_magic = MQ_MAGIC_SEM; |
393 | } |
394 | |
395 | struct mp_queue *alloc_mp_queue (void) { |
396 | struct mp_queue *MQ = NULL; |
397 | assert (!posix_memalign ((void **)&MQ, 64, sizeof (*MQ))); |
398 | memset (MQ, 0, sizeof (*MQ)); |
399 | init_mp_queue (MQ); |
400 | return MQ; |
401 | } |
402 | |
403 | struct mp_queue *alloc_mp_queue_w (void) { |
404 | struct mp_queue *MQ = NULL; |
405 | assert (!posix_memalign ((void **)&MQ, 64, sizeof (*MQ))); |
406 | memset (MQ, 0, sizeof (*MQ)); |
407 | MODULE_STAT->mpq_allocated ++; |
408 | init_mp_queue_w (MQ); |
409 | return MQ; |
410 | } |
411 | |
412 | /* invoke only if sure that nobody else may be using this mp_queue in parallel */ |
413 | void clear_mp_queue (struct mp_queue *MQ) { |
414 | MODULE_STAT->mpq_active --; |
415 | assert (MQ->mq_magic == MQ_MAGIC || MQ->mq_magic == MQ_MAGIC_SEM); |
416 | assert (MQ->mq_head && MQ->mq_tail); |
417 | struct mp_queue_block *QB = MQ->mq_head, *QBN; |
418 | for (QB = MQ->mq_head; QB; QB = QBN) { |
419 | QBN = QB->mqb_next; |
420 | assert (QB->mqb_next || QB == MQ->mq_tail); |
421 | QB->mqb_next = 0; |
422 | free_mpq_block (QB); |
423 | } |
424 | MQ->mq_head = MQ->mq_tail = 0; |
425 | MQ->mq_magic = 0; |
426 | } |
427 | |
428 | void free_mp_queue (struct mp_queue *MQ) { |
429 | MODULE_STAT->mpq_allocated --; |
430 | clear_mp_queue (MQ); |
431 | free (MQ); |
432 | } |
433 | |
434 | // may invoke mpq_push() to discard new empty block |
435 | mqn_value_t mpq_pop (struct mp_queue *MQ, int flags) { |
436 | void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0]; |
437 | long r = ((flags & MPQF_RECURSIVE) != 0); |
438 | struct mp_queue_block *QB; |
439 | mqn_value_t v; |
440 | while (1) { |
441 | QB = MQ->mq_head; |
442 | barrier (); |
443 | hptr[r] = QB; |
444 | barrier (); |
445 | __sync_synchronize (); |
446 | if (MQ->mq_head != QB) { |
447 | continue; |
448 | } |
449 | |
450 | v = mpq_block_pop (QB); |
451 | if (v) { |
452 | break; |
453 | } |
454 | barrier (); |
455 | if (!QB->mqb_next) { |
456 | QB = 0; |
457 | break; |
458 | } |
459 | v = mpq_block_pop (QB); |
460 | if (v) { |
461 | break; |
462 | } |
463 | if (__sync_bool_compare_and_swap (&MQ->mq_head, QB, QB->mqb_next)) { |
464 | // want to free QB here, but this is complicated if somebody else holds a pointer |
465 | if (is_hazard_ptr (QB, 0, 2) <= 1) { |
466 | free_mpq_block (QB); |
467 | } else { |
468 | __sync_fetch_and_add (&mpq_blocks_wasted, 1); |
469 | // ... put QB into some GC queue? ... |
470 | QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC; |
471 | mpq_push (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, flags & MPQF_RECURSIVE); |
472 | } |
473 | } |
474 | } |
475 | if (flags & MPQF_STORE_PTR) { |
476 | hptr[2] = QB; |
477 | } |
478 | hptr[r] = 0; |
479 | return v; |
480 | } |
481 | |
482 | /* 1 = definitely empty (for some serialization), 0 = possibly non-empty; |
483 | may invoke mpq_push() to discard empty block */ |
484 | int mpq_is_empty (struct mp_queue *MQ) { |
485 | void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0]; |
486 | struct mp_queue_block *QB; |
487 | while (1) { |
488 | QB = MQ->mq_head; |
489 | barrier (); |
490 | *hptr = QB; |
491 | barrier (); |
492 | __sync_synchronize (); |
493 | if (MQ->mq_head != QB) { |
494 | continue; |
495 | } |
496 | barrier(); |
497 | long h = QB->mqb_head; |
498 | barrier(); |
499 | long t = QB->mqb_tail; |
500 | barrier(); |
501 | if (!(t & MQN_SAFE)) { |
502 | *hptr = 0; |
503 | return t <= h; |
504 | } |
505 | t &= MQN_IDX_MASK; |
506 | if (t > h) { |
507 | *hptr = 0; |
508 | return 0; |
509 | } |
510 | barrier (); |
511 | if (!QB->mqb_next) { |
512 | *hptr = 0; |
513 | return 1; |
514 | } |
515 | if (__sync_bool_compare_and_swap (&MQ->mq_head, QB, QB->mqb_next)) { |
516 | // want to free QB here, but this is complicated if somebody else holds a pointer |
517 | if (is_hazard_ptr (QB, 0, 2) <= 1) { |
518 | free_mpq_block (QB); |
519 | } else { |
520 | __sync_fetch_and_add (&mpq_blocks_wasted, 1); |
521 | // ... put QB into some GC queue? ... |
522 | QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC; |
523 | mpq_push (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, 0); |
524 | } |
525 | } |
526 | } |
527 | *hptr = 0; |
528 | return 0; |
529 | } |
530 | |
531 | /* may invoke mpq_alloc_block (which recursively invokes mpq_pop) |
532 | or mpq_push() (without needing to hold hazard pointer) to deal with blocks */ |
533 | long mpq_push (struct mp_queue *MQ, mqn_value_t val, int flags) { |
534 | void **hptr = mqb_hazard_ptr[get_this_thread_id()]; |
535 | long r = ((flags & MPQF_RECURSIVE) != 0); |
536 | while (1) { |
537 | struct mp_queue_block *QB = MQ->mq_tail; |
538 | barrier (); |
539 | hptr[r] = QB; |
540 | barrier (); |
541 | __sync_synchronize (); |
542 | if (MQ->mq_tail != QB) { |
543 | continue; |
544 | } |
545 | |
546 | if (QB->mqb_next) { |
547 | __sync_bool_compare_and_swap (&MQ->mq_tail, QB, QB->mqb_next); |
548 | continue; |
549 | } |
550 | long pos = mpq_block_push (QB, val); |
551 | if (pos >= 0) { |
552 | if (flags & MPQF_STORE_PTR) { |
553 | hptr[2] = QB; |
554 | } |
555 | hptr[r] = 0; |
556 | return pos; |
557 | } |
558 | #define DBG(c) // fprintf (stderr, "[%d] pushing %lx to %p,%p: %c\n", mpq_this_thread_id, (long) val, MQ, QB, c); |
559 | DBG('A'); |
560 | /* |
561 | if (__sync_fetch_and_add (&QB->mqb_next_allocators, 1)) { |
562 | // somebody else will allocate next block; busy wait instead of spuruous alloc/free |
563 | DBG('B') |
564 | while (!QB->mqb_next) { |
565 | barrier (); |
566 | } |
567 | DBG('C') |
568 | continue; |
569 | } |
570 | */ |
571 | int is_small = (QB == MQ->mq_head); |
572 | struct mp_queue_block *NQB; |
573 | if (!r) { |
574 | assert (!hptr[1]); |
575 | NQB = alloc_mpq_block (val, 1, is_small); |
576 | assert (!hptr[1]); |
577 | } else { |
578 | NQB = alloc_mpq_block (val, 0, is_small); |
579 | } |
580 | assert (hptr[r] == QB); |
581 | DBG('D') |
582 | if (__sync_bool_compare_and_swap (&QB->mqb_next, 0, NQB)) { |
583 | __sync_bool_compare_and_swap (&MQ->mq_tail, QB, NQB); |
584 | DBG('E') |
585 | if (flags & MPQF_STORE_PTR) { |
586 | hptr[2] = NQB; |
587 | } |
588 | hptr[r] = 0; |
589 | return 0; |
590 | } else { |
591 | DBG('F'); |
592 | NQB->mqb_magic = MQ_BLOCK_PREPARED_MAGIC; |
593 | mpq_push (is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, NQB, 0); |
594 | __sync_fetch_and_add (&mpq_blocks_prepared, 1); |
595 | } |
596 | } |
597 | #undef DBG |
598 | } |
599 | |
600 | mqn_value_t mpq_pop_w (struct mp_queue *MQ, int flags) { |
601 | assert (MQ->mq_magic == MQ_MAGIC_SEM); |
602 | int s = -1, iterations = flags & MPQF_MAX_ITERATIONS; |
603 | while (iterations --> 0) { |
604 | #if MPQ_USE_POSIX_SEMAPHORES |
605 | s = sem_trywait (&MQ->mq_sem); |
606 | #else |
607 | s = mp_sem_trywait (&MQ->mq_sem); |
608 | #endif |
609 | if (!s) { |
610 | break; |
611 | } |
612 | #if MPQ_USE_POSIX_SEMAPHORES |
613 | assert (errno == EAGAIN || errno == EINTR); |
614 | #endif |
615 | } |
616 | while (s < 0) { |
617 | #if MPQ_USE_POSIX_SEMAPHORES |
618 | s = sem_wait (&MQ->mq_sem); |
619 | #else |
620 | s = mp_sem_wait (&MQ->mq_sem); |
621 | #endif |
622 | if (!s) { |
623 | break; |
624 | } |
625 | #if MPQ_USE_POSIX_SEMAPHORES |
626 | assert (errno == EAGAIN); |
627 | #endif |
628 | } |
629 | mqn_value_t *v = mpq_pop (MQ, flags); |
630 | assert (v); |
631 | return v; |
632 | } |
633 | |
634 | mqn_value_t mpq_pop_nw (struct mp_queue *MQ, int flags) { |
635 | assert (MQ->mq_magic == MQ_MAGIC_SEM); |
636 | int s = -1, iterations = flags & MPQF_MAX_ITERATIONS; |
637 | while (iterations --> 0) { |
638 | #if MPQ_USE_POSIX_SEMAPHORES |
639 | s = sem_trywait (&MQ->mq_sem); |
640 | #else |
641 | s = mp_sem_trywait (&MQ->mq_sem); |
642 | #endif |
643 | if (s >= 0) { |
644 | break; |
645 | } |
646 | #if MPQ_USE_POSIX_SEMAPHORES |
647 | assert (errno == EAGAIN || errno == EINTR); |
648 | #endif |
649 | } |
650 | if (s < 0) { |
651 | return 0; |
652 | } |
653 | mqn_value_t *v = mpq_pop (MQ, flags); |
654 | assert (v); |
655 | return v; |
656 | } |
657 | |
658 | long mpq_push_w (struct mp_queue *MQ, mqn_value_t v, int flags) { |
659 | assert (MQ->mq_magic == MQ_MAGIC_SEM); |
660 | long res = mpq_push (MQ, v, flags); |
661 | #if MPQ_USE_POSIX_SEMAPHORES |
662 | assert (sem_post (&MQ->mq_sem) >= 0); |
663 | #else |
664 | assert (mp_sem_post (&MQ->mq_sem) >= 0); |
665 | #endif |
666 | return res; |
667 | } |
668 | |
669 | void *get_ptr_multithread_copy (void **ptr, void (*incref)(void *ptr)) { |
670 | void **hptr = &mqb_hazard_ptr[get_this_thread_id()][COMMON_HAZARD_PTR_NUM]; |
671 | assert (*hptr == NULL); |
672 | |
673 | void *R; |
674 | while (1) { |
675 | R = *ptr; |
676 | barrier (); |
677 | *hptr = R; |
678 | barrier (); |
679 | mfence (); |
680 | |
681 | if (R != *ptr) { |
682 | continue; |
683 | } |
684 | |
685 | incref (R); |
686 | |
687 | barrier (); |
688 | *hptr = NULL; |
689 | |
690 | break; |
691 | } |
692 | return R; |
693 | } |
694 | |