1 | /* |
2 | This file is part of Mtproto-proxy Library. |
3 | |
4 | Mtproto-proxy Library is free software: you can redistribute it and/or modify |
5 | it under the terms of the GNU Lesser General Public License as published by |
6 | the Free Software Foundation, either version 2 of the License, or |
7 | (at your option) any later version. |
8 | |
9 | Mtproto-proxy Library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public License |
15 | along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>. |
16 | |
17 | Copyright 2012-2013 Vkontakte Ltd |
18 | 2012-2013 Nikolai Durov |
19 | 2012-2013 Andrey Lopatin |
20 | |
21 | Copyright 2014-2016 Telegram Messenger Inc |
22 | 2015-2016 Vitaly Valtman |
23 | */ |
24 | |
25 | #define _FILE_OFFSET_BITS 64 |
26 | |
27 | #include <assert.h> |
28 | #include <stddef.h> |
29 | #include <stdio.h> |
30 | #include <stdlib.h> |
31 | #include <string.h> |
32 | #include <unistd.h> |
33 | |
34 | #include "kprintf.h" |
35 | #include "jobs/jobs.h" |
36 | #include "common/common-stats.h" |
37 | #include "common/server-functions.h" |
38 | |
39 | #define MODULE raw_msg_buffer |
40 | |
/* Process-wide chunk accounting; reported by the stat function and fetch_buffers_stat (). */
int allocated_buffer_chunks;           /* chunks currently allocated (updated atomically on alloc/free) */
int max_allocated_buffer_chunks;       /* highest value allocated_buffer_chunks has been seen at */
int max_buffer_chunks;                 /* cap checked before a new chunk is allocated */
long long max_allocated_buffer_bytes;  /* byte budget stored by init_msg_buffers () */
43 | |
/*
 * Counters kept in this module's stat structure.  The code in this file
 * updates them through MODULE_STAT and reads them back with the SB_SUM_* macros.
 * NOTE(review): presumably one instance exists per thread and the SB_SUM_*
 * macros add them up -- confirm against common/common-stats.h.
 */
MODULE_STAT_TYPE {
  long long total_used_buffers_size;  /* sum of buffer_size over buffers currently handed out */
  int total_used_buffers;             /* number of buffers currently handed out */
  long long allocated_buffer_bytes;   /* grows/shrinks by MSG_BUFFERS_CHUNK_SIZE per chunk */
  long long buffer_chunk_alloc_ops;   /* incremented once for every chunk allocation */
};

MODULE_INIT
52 | |
/*
 * Stat report for this module: passes the four stat-structure counters to the
 * SB_SUM_ONE_* macros, then prints the global chunk counters and limits into
 * `sb` as "name<TAB>value" lines.
 * NOTE(review): `sb` is not declared in this file; presumably the
 * MODULE_STAT_FUNCTION macro provides it -- confirm.
 */
MODULE_STAT_FUNCTION
  SB_SUM_ONE_LL (total_used_buffers_size);
  SB_SUM_ONE_I (total_used_buffers);
  SB_SUM_ONE_LL (allocated_buffer_bytes);
  SB_SUM_ONE_LL (buffer_chunk_alloc_ops);
  sb_printf (sb,
    "allocated_buffer_chunks\t%d\n"
    "max_allocated_buffer_chunks\t%d\n"
    "max_buffer_chunks\t%d\n"
    "max_allocated_buffer_bytes\t%lld\n" ,
    allocated_buffer_chunks,
    max_allocated_buffer_chunks,
    max_buffer_chunks,
    max_allocated_buffer_bytes
  );
MODULE_STAT_FUNCTION_END
69 | |
70 | void fetch_buffers_stat (struct buffers_stat *bs) { |
71 | bs->total_used_buffers_size = SB_SUM_LL (total_used_buffers_size); |
72 | bs->allocated_buffer_bytes = SB_SUM_LL (allocated_buffer_bytes); |
73 | bs->buffer_chunk_alloc_ops = SB_SUM_LL (buffer_chunk_alloc_ops); |
74 | bs->total_used_buffers = SB_SUM_I (total_used_buffers); |
75 | bs->allocated_buffer_chunks = allocated_buffer_chunks; |
76 | bs->max_allocated_buffer_chunks = max_allocated_buffer_chunks; |
77 | bs->max_allocated_buffer_bytes = max_allocated_buffer_bytes; |
78 | bs->max_buffer_chunks = max_buffer_chunks; |
79 | } |
80 | |
81 | int buffer_size_values; |
82 | int rwm_peak_recovery; |
83 | struct msg_buffers_chunk [MAX_BUFFER_SIZE_VALUES]; |
84 | __thread struct msg_buffers_chunk *ChunkSave[MAX_BUFFER_SIZE_VALUES]; |
85 | |
86 | int default_buffer_sizes[] = { 48, 512, 2048, 16384, 262144 }; |
87 | int default_buffer_sizes_cnt = sizeof (default_buffer_sizes) / 4; |
88 | |
89 | int free_std_msg_buffer (struct msg_buffers_chunk *C, struct msg_buffer *X); |
90 | |
91 | void (void) { |
92 | int i; |
93 | struct msg_buffers_chunk *CH; |
94 | assert (!buffer_size_values); |
95 | for (i = 0, CH = ChunkHeaders; i < sizeof (default_buffer_sizes) / sizeof (int); i++, CH++) { |
96 | CH->magic = MSG_CHUNK_HEAD_MAGIC; |
97 | CH->buffer_size = default_buffer_sizes[i]; |
98 | CH->ch_next = CH->ch_prev = CH; |
99 | CH->free_buffer = 0; |
100 | assert (!i || default_buffer_sizes[i] > default_buffer_sizes[i-1]); |
101 | } |
102 | assert (i); |
103 | buffer_size_values = i; |
104 | } |
105 | |
106 | static inline void prepare_bs_inv (struct msg_buffers_chunk *C) { |
107 | int x = C->buffer_size + 16; |
108 | int i = __builtin_ctz (x); |
109 | x >>= i; |
110 | x = 1 - x; |
111 | int y = 1; |
112 | while (x) { |
113 | y *= 1 + x; |
114 | x *= x; |
115 | } |
116 | C->bs_inverse = y; |
117 | C->bs_shift = i; |
118 | } |
119 | |
120 | static void lock_chunk_head (struct msg_buffers_chunk *CH) { |
121 | while (1) { |
122 | if (__sync_bool_compare_and_swap (&CH->magic, MSG_CHUNK_HEAD_MAGIC, MSG_CHUNK_HEAD_LOCKED_MAGIC)) { |
123 | break; |
124 | } |
125 | usleep (1000); |
126 | } |
127 | } |
128 | |
/*
 * Releases a head taken with lock_chunk_head () by storing the unlocked magic back.
 * NOTE(review): this is a plain store with no explicit barrier -- confirm that
 * is sufficient on every supported target.
 */
static void unlock_chunk_head (struct msg_buffers_chunk *CH) {
  CH->magic = MSG_CHUNK_HEAD_MAGIC;
}
132 | |
133 | static int try_lock_chunk (struct msg_buffers_chunk *C) { |
134 | if (C->magic != MSG_CHUNK_USED_MAGIC || !__sync_bool_compare_and_swap (&C->magic, MSG_CHUNK_USED_MAGIC, MSG_CHUNK_USED_LOCKED_MAGIC)) { |
135 | return 0; |
136 | } |
137 | while (1) { |
138 | struct msg_buffer *X = mpq_pop_nw (C->free_block_queue, 4); |
139 | if (!X) { break; } |
140 | assert (X->chunk == C); |
141 | C->free_buffer (C, X); |
142 | } |
143 | return 1; |
144 | } |
145 | |
146 | static void unlock_chunk (struct msg_buffers_chunk *C) { |
147 | while (1) { |
148 | while (1) { |
149 | struct msg_buffer *X = mpq_pop_nw (C->free_block_queue, 4); |
150 | if (!X) { break; } |
151 | assert (X->chunk == C); |
152 | C->free_buffer (C, X); |
153 | } |
154 | C->magic = MSG_CHUNK_USED_MAGIC; |
155 | |
156 | if (mpq_is_empty (C->free_block_queue) || !try_lock_chunk (C)) { |
157 | break; |
158 | } |
159 | } |
160 | } |
161 | |
162 | // returns locked chunk |
163 | struct msg_buffers_chunk *alloc_new_msg_buffers_chunk (struct msg_buffers_chunk *CH) { |
164 | unsigned magic = CH->magic; |
165 | assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC); |
166 | if (allocated_buffer_chunks >= max_buffer_chunks) { |
167 | // ML |
168 | return 0; |
169 | } |
170 | struct msg_buffers_chunk *C = malloc (MSG_BUFFERS_CHUNK_SIZE); |
171 | if (!C) { |
172 | return 0; |
173 | } |
174 | |
175 | int buffer_size = CH->buffer_size, two_power, chunk_buffers; |
176 | int buffer_hd_size = buffer_size + BUFF_HD_BYTES; |
177 | int align = buffer_hd_size & -buffer_hd_size; |
178 | if (align < 8) { |
179 | align = 8; |
180 | } |
181 | if (align > 64) { |
182 | align = 64; |
183 | } |
184 | |
185 | int t = (MSG_BUFFERS_CHUNK_SIZE - offsetof (struct msg_buffers_chunk, free_cnt)) / (buffer_hd_size + 4); |
186 | two_power = 1; |
187 | while (two_power <= t) { |
188 | two_power <<= 1; |
189 | } |
190 | |
191 | chunk_buffers = (MSG_BUFFERS_CHUNK_SIZE - offsetof (struct msg_buffers_chunk, free_cnt) - two_power * 4 - align) / buffer_hd_size; |
192 | assert (chunk_buffers > 0 && chunk_buffers < 65536 && chunk_buffers <= two_power); |
193 | |
194 | C->magic = MSG_CHUNK_USED_LOCKED_MAGIC; |
195 | C->buffer_size = buffer_size; |
196 | C->free_buffer = free_std_msg_buffer; |
197 | C->ch_head = CH; |
198 | |
199 | |
200 | C->first_buffer = (struct msg_buffer *) (((long) C + offsetof (struct msg_buffers_chunk, free_cnt) + two_power * 4 + align - 1) & -align); |
201 | assert ((char *) (C->first_buffer) + chunk_buffers * buffer_hd_size <= (char *) C + MSG_BUFFERS_CHUNK_SIZE); |
202 | |
203 | C->two_power = two_power; |
204 | C->tot_buffers = chunk_buffers; |
205 | |
206 | C->refcnt = 1; |
207 | |
208 | lock_chunk_head (CH); |
209 | |
210 | CH->tot_buffers += chunk_buffers; |
211 | CH->free_buffers += chunk_buffers; |
212 | CH->tot_chunks++; |
213 | |
214 | C->ch_next = CH->ch_next; |
215 | C->ch_prev = CH; |
216 | CH->ch_next = C; |
217 | C->ch_next->ch_prev = C; |
218 | |
219 | unlock_chunk_head (CH); |
220 | |
221 | MODULE_STAT->allocated_buffer_bytes += MSG_BUFFERS_CHUNK_SIZE; |
222 | __sync_fetch_and_add (&allocated_buffer_chunks, 1); |
223 | |
224 | MODULE_STAT->buffer_chunk_alloc_ops ++; |
225 | |
226 | while (1) { |
227 | barrier (); |
228 | int keep_max_allocated_buffer_chunks = max_allocated_buffer_chunks; |
229 | barrier (); |
230 | int keep_allocated_buffer_chunks = allocated_buffer_chunks; |
231 | barrier (); |
232 | if (keep_max_allocated_buffer_chunks >= keep_allocated_buffer_chunks) { |
233 | break; |
234 | } |
235 | __sync_bool_compare_and_swap (&max_allocated_buffer_chunks, keep_max_allocated_buffer_chunks, keep_allocated_buffer_chunks); |
236 | if (allocated_buffer_chunks >= max_buffer_chunks - 8 && max_buffer_chunks >= 32 && verbosity < 3) { |
237 | // verbosity = 3; |
238 | // vkprintf (1, "Setting verbosity to 3 (NOTICE) because of high buffer chunk usage (used %d, max %d)\n", allocated_buffer_chunks, max_buffer_chunks); |
239 | } |
240 | } |
241 | /*if (rwm_peak_recovery) { |
242 | if (allocated_buffer_chunks > (max_buffer_chunks >> 2)) { |
243 | do_udp_wait (1, 1.0); |
244 | } |
245 | if (allocated_buffer_chunks > (max_buffer_chunks >> 1)) { |
246 | do_udp_wait (2, 1.0); |
247 | } |
248 | }*/ |
249 | |
250 | prepare_bs_inv (C); |
251 | |
252 | int i; |
253 | for (i = 0; i < chunk_buffers; i++) { |
254 | C->free_cnt[two_power+i] = 1; |
255 | } |
256 | memset (&C->free_cnt[two_power + chunk_buffers], 0, (two_power - chunk_buffers) * 2); |
257 | |
258 | for (i = two_power - 1; i > 0; i--) { |
259 | C->free_cnt[i] = C->free_cnt[2*i] + C->free_cnt[2*i+1]; |
260 | } |
261 | |
262 | C->free_block_queue = alloc_mp_queue_w (); |
263 | |
264 | //vkprintf (0, "allocated chunk %p\n", C); |
265 | return C; |
266 | }; |
267 | |
268 | void free_msg_buffers_chunk_internal (struct msg_buffers_chunk *C, struct msg_buffers_chunk *CH) { |
269 | assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC); |
270 | unsigned magic = CH->magic; |
271 | assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC); |
272 | assert (C->buffer_size == CH->buffer_size); |
273 | assert (C->tot_buffers == C->free_cnt[1]); |
274 | assert (CH == C->ch_head); |
275 | |
276 | C->magic = 0; |
277 | C->ch_head = 0; |
278 | |
279 | lock_chunk_head (CH); |
280 | C->ch_next->ch_prev = C->ch_prev; |
281 | C->ch_prev->ch_next = C->ch_next; |
282 | |
283 | CH->tot_buffers -= C->tot_buffers; |
284 | CH->free_buffers -= C->tot_buffers; |
285 | CH->tot_chunks--; |
286 | unlock_chunk_head (CH); |
287 | |
288 | assert (CH->tot_chunks >= 0); |
289 | |
290 | __sync_fetch_and_add (&allocated_buffer_chunks, -1); |
291 | MODULE_STAT->allocated_buffer_bytes -= MSG_BUFFERS_CHUNK_SIZE; |
292 | |
293 | memset (C, 0, sizeof (struct msg_buffers_chunk)); |
294 | free (C); |
295 | |
296 | int si = buffer_size_values - 1; |
297 | while (si > 0 && &ChunkHeaders[si-1] != CH) { |
298 | si--; |
299 | } |
300 | assert (si >= 0); |
301 | |
302 | if (ChunkSave[si] == C) { |
303 | ChunkSave[si] = NULL; |
304 | } |
305 | |
306 | free_mp_queue (C->free_block_queue); |
307 | C->free_block_queue = NULL; |
308 | } |
309 | |
310 | |
311 | void free_msg_buffers_chunk (struct msg_buffers_chunk *C) { |
312 | assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC); |
313 | assert (C->free_cnt[1] == C->tot_buffers); |
314 | |
315 | free_msg_buffers_chunk_internal (C, C->ch_head); |
316 | } |
317 | |
318 | int init_msg_buffers (long max_buffer_bytes) { |
319 | if (!max_buffer_bytes) { |
320 | max_buffer_bytes = max_allocated_buffer_bytes ?: MSG_DEFAULT_MAX_ALLOCATED_BYTES; |
321 | } |
322 | |
323 | assert (max_buffer_bytes >= 0 && max_buffer_bytes <= MSG_MAX_ALLOCATED_BYTES); |
324 | assert (max_buffer_bytes >= allocated_buffer_chunks * MSG_BUFFERS_CHUNK_SIZE); |
325 | |
326 | max_allocated_buffer_bytes = max_buffer_bytes; |
327 | max_buffer_chunks = (unsigned long) max_buffer_bytes / MSG_BUFFERS_CHUNK_SIZE; |
328 | |
329 | if (!buffer_size_values) { |
330 | init_buffer_chunk_headers (); |
331 | } |
332 | |
333 | return 1; |
334 | } |
335 | |
336 | static inline int get_buffer_no (struct msg_buffers_chunk *C, struct msg_buffer *X) { |
337 | unsigned x = ((char *) X - (char *) C->first_buffer); |
338 | x >>= C->bs_shift; |
339 | x *= C->bs_inverse; |
340 | assert (x <= (unsigned) C->tot_buffers && (char *) X == (char *) C->first_buffer + (C->buffer_size + 16) * x); |
341 | return x; |
342 | } |
343 | |
344 | struct msg_buffer *alloc_msg_buffer_internal (struct msg_buffer *neighbor, struct msg_buffers_chunk *CH, struct msg_buffers_chunk *C_hint, int si) { |
345 | unsigned magic = CH->magic; |
346 | assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC); |
347 | struct msg_buffers_chunk *C; |
348 | if (!C_hint) { |
349 | C = alloc_new_msg_buffers_chunk (CH); |
350 | if (!C) { |
351 | return 0; |
352 | } |
353 | } else { |
354 | int found = 0; |
355 | if (C_hint && C_hint->free_cnt[1] && try_lock_chunk (C_hint)) { |
356 | assert (C_hint->ch_head == CH); |
357 | C = C_hint; |
358 | if (C_hint->free_cnt[1]) { |
359 | found = 1; |
360 | } else { |
361 | unlock_chunk (C_hint); |
362 | } |
363 | } |
364 | if (!found) { |
365 | lock_chunk_head (CH); |
366 | struct msg_buffers_chunk *CF = C_hint ? C_hint : CH->ch_next; |
367 | C = CF; |
368 | do { |
369 | if (C == CH) { |
370 | C = C->ch_next; |
371 | continue; |
372 | } |
373 | if (!C->free_cnt[1]) { |
374 | C = C->ch_next; |
375 | continue; |
376 | } |
377 | if (!try_lock_chunk (C)) { |
378 | C = C->ch_next; |
379 | continue; |
380 | } |
381 | if (!C->free_cnt[1]) { |
382 | unlock_chunk (C); |
383 | C = C->ch_next; |
384 | continue; |
385 | } |
386 | found = 1; |
387 | break; |
388 | } while (C != CF); |
389 | unlock_chunk_head (CH); |
390 | if (!found) { |
391 | C = alloc_new_msg_buffers_chunk (CH); |
392 | if (!C) { |
393 | return 0; |
394 | } |
395 | } |
396 | if (C_hint) { |
397 | __sync_fetch_and_add (&C_hint->refcnt, -1); |
398 | } |
399 | } |
400 | } |
401 | |
402 | assert (C != CH); |
403 | assert (C->free_cnt[1]); |
404 | assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC); |
405 | ChunkSave[si] = C; |
406 | |
407 | int two_power = C->two_power, i = 1; |
408 | |
409 | if (neighbor && neighbor->chunk == C) { |
410 | int x = get_buffer_no (C, neighbor); |
411 | vkprintf (3, "alloc_msg_buffer: allocating neighbor buffer for %d\n" , x); |
412 | |
413 | int k = 0; |
414 | if (x < two_power - 1 && C->free_cnt[two_power + x + 1]) { |
415 | i = two_power + x + 1; |
416 | } else { |
417 | int j = 1, l = 0, r = two_power; |
418 | while (i < two_power) { |
419 | i <<= 1; |
420 | int m = (l + r) >> 1; |
421 | if (x < m) { |
422 | if (C->free_cnt[i] > 0) { |
423 | r = m; |
424 | if (C->free_cnt[i+1] > 0) { |
425 | j = i + 1; |
426 | } |
427 | } else { |
428 | l = m; |
429 | i++; |
430 | } |
431 | } else if (C->free_cnt[i+1] > 0) { |
432 | l = m; |
433 | i++; |
434 | } else { |
435 | k = i = j; |
436 | while (i < two_power) { |
437 | i <<= 1; |
438 | if (!C->free_cnt[i]) { |
439 | i++; |
440 | } |
441 | assert (-- C->free_cnt[i] >= 0); |
442 | } |
443 | break; |
444 | } |
445 | } |
446 | } |
447 | if (!k) { |
448 | k = i; |
449 | } |
450 | while (k > 0) { |
451 | assert (-- C->free_cnt[k] >= 0); |
452 | k >>= 1; |
453 | } |
454 | } else { |
455 | int j = C->free_cnt[1] < 16 ? C->free_cnt[1] : 16; |
456 | j = ((long long) lrand48_j() * j) >> 31; |
457 | assert (j >= 0 && j < C->free_cnt[1]); |
458 | while (i < two_power) { |
459 | assert (-- C->free_cnt[i] >= 0); |
460 | i <<= 1; |
461 | if (C->free_cnt[i] <= j) { |
462 | j -= C->free_cnt[i]; |
463 | i++; |
464 | } |
465 | } |
466 | assert (-- C->free_cnt[i] == 0); |
467 | } |
468 | |
469 | assert (C != CH); |
470 | unlock_chunk (C); |
471 | //-- CH->free_buffers; |
472 | |
473 | i -= two_power; |
474 | vkprintf (3, "alloc_msg_buffer(%d) [chunk %p, size %d]: tot_buffers = %d, free_buffers = %d\n" , i, C, C->buffer_size, CH->tot_buffers, CH->free_buffers); |
475 | assert (i >= 0 && i < C->tot_buffers); |
476 | |
477 | struct msg_buffer *X = (struct msg_buffer *) ((char *) C->first_buffer + i * (C->buffer_size + 16)); |
478 | |
479 | X->chunk = C; |
480 | X->refcnt = 1; |
481 | X->magic = MSG_BUFFER_USED_MAGIC; |
482 | |
483 | //__sync_fetch_and_add (&total_used_buffers, 1); |
484 | MODULE_STAT->total_used_buffers_size += C->buffer_size; |
485 | MODULE_STAT->total_used_buffers ++; |
486 | |
487 | return X; |
488 | } |
489 | |
490 | /* allocates buffer of at least given size, -1 = maximal */ |
491 | struct msg_buffer *alloc_msg_buffer (struct msg_buffer *neighbor, int size_hint) { |
492 | if (!buffer_size_values) { |
493 | init_buffer_chunk_headers (); |
494 | } |
495 | int si = buffer_size_values - 1; |
496 | if (size_hint >= 0) { |
497 | while (si > 0 && ChunkHeaders[si-1].buffer_size >= size_hint) { |
498 | si--; |
499 | } |
500 | } |
501 | return alloc_msg_buffer_internal (neighbor, &ChunkHeaders[si], ChunkSave[si], si); |
502 | } |
503 | |
504 | int free_std_msg_buffer (struct msg_buffers_chunk *C, struct msg_buffer *X) { |
505 | assert (!X->refcnt && X->magic == MSG_BUFFER_USED_MAGIC && C->magic == MSG_CHUNK_USED_LOCKED_MAGIC && X->chunk == C); |
506 | int x = get_buffer_no (C, X); |
507 | int two_power = C->two_power; |
508 | vkprintf (3, "free_msg_buffer(%d)\n" , x); |
509 | x += two_power; |
510 | assert (!C->free_cnt[x]); |
511 | do { |
512 | assert (++C->free_cnt[x] > 0); |
513 | } while (x >>= 1); |
514 | |
515 | X->magic = MSG_BUFFER_FREE_MAGIC; |
516 | X->refcnt = -0x40000000; |
517 | //++ C->ch_head->free_buffers; |
518 | |
519 | MODULE_STAT->total_used_buffers --; |
520 | MODULE_STAT->total_used_buffers_size -= C->buffer_size; |
521 | |
522 | //if (C->free_cnt[1] == C->tot_buffers && C->ch_head->free_buffers * 4 >= C->tot_buffers * 5) { |
523 | // free_msg_buffers_chunk (C); |
524 | //} |
525 | |
526 | return 1; |
527 | } |
528 | |
529 | static int free_msg_buffer_job (job_t job, int op, struct job_thread *JT) { |
530 | switch (op) { |
531 | case JS_RUN: { |
532 | struct msg_buffer *X = *(void **)job->j_custom; |
533 | struct msg_buffers_chunk *C = X->chunk; |
534 | unsigned magic = C->magic; |
535 | assert (magic == MSG_CHUNK_USED_MAGIC || magic == MSG_CHUNK_USED_LOCKED_MAGIC); |
536 | C->free_buffer (C, X); |
537 | return JOB_COMPLETED; |
538 | } |
539 | case JS_FINISH: |
540 | assert (job->j_refcnt == 1); |
541 | return job_free (JOB_REF_PASS (job)); |
542 | default: |
543 | assert (0); |
544 | } |
545 | } |
546 | |
547 | int free_msg_buffer (struct msg_buffer *X) { |
548 | if (X->magic != MSG_BUFFER_USED_MAGIC) { |
549 | vkprintf (0, "magic = 0x%08x\n" , X->magic); |
550 | } |
551 | assert (X->magic == MSG_BUFFER_USED_MAGIC); |
552 | assert (!X->refcnt); |
553 | struct msg_buffers_chunk *C = X->chunk; |
554 | unsigned magic = C->magic; |
555 | assert (magic == MSG_CHUNK_USED_MAGIC || magic == MSG_CHUNK_USED_LOCKED_MAGIC); |
556 | |
557 | if (C->free_buffer == free_std_msg_buffer) { |
558 | if (try_lock_chunk (C)) { |
559 | C->free_buffer (C, X); |
560 | unlock_chunk (C); |
561 | return 1; |
562 | } else { |
563 | mpq_push_w (C->free_block_queue, X, 0); |
564 | |
565 | if (try_lock_chunk (C)) { |
566 | unlock_chunk (C); |
567 | } |
568 | return 1; |
569 | } |
570 | } else { |
571 | if (!this_job_thread || this_job_thread->thread_class == C->thread_class) { |
572 | return C->free_buffer (C, X); |
573 | } else { |
574 | job_t job = create_async_job (free_msg_buffer_job, JSC_ALLOW (C->thread_class, JS_RUN) | JSIG_FAST (JS_FINISH), C->thread_subclass, sizeof (void *), 0, JOB_REF_NULL); |
575 | *(void **)job->j_custom = X; |
576 | schedule_job (JOB_REF_PASS (job)); |
577 | return 1; |
578 | } |
579 | } |
580 | } |
581 | |
582 | int msg_buffer_reach_limit (double ratio) { |
583 | return SB_SUM_LL(total_used_buffers_size) >= ratio * max_allocated_buffer_bytes; |
584 | } |
585 | |
586 | double msg_buffer_usage (void) { |
587 | return (double) SB_SUM_LL(total_used_buffers_size) / (double) max_allocated_buffer_bytes; |
588 | } |
589 | |