1/*
2 This file is part of Mtproto-proxy Library.
3
4 Mtproto-proxy Library is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation, either version 2 of the License, or
7 (at your option) any later version.
8
9 Mtproto-proxy Library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with Mtproto-proxy Library. If not, see <http://www.gnu.org/licenses/>.
16
17 Copyright 2012-2013 Vkontakte Ltd
18 2012-2013 Nikolai Durov
19 2012-2013 Andrey Lopatin
20
21 Copyright 2014-2016 Telegram Messenger Inc
22 2015-2016 Vitaly Valtman
23*/
24
25#define _FILE_OFFSET_BITS 64
26
27#include <assert.h>
28#include <stddef.h>
29#include <stdio.h>
30#include <stdlib.h>
31#include <string.h>
32#include <unistd.h>
33
34#include "kprintf.h"
35#include "jobs/jobs.h"
36#include "common/common-stats.h"
37#include "common/server-functions.h"
38
39#define MODULE raw_msg_buffer
40
/* Module-wide chunk accounting: current number of live chunks, its
   high-water mark, and the configured limits (set by init_msg_buffers). */
int allocated_buffer_chunks, max_allocated_buffer_chunks, max_buffer_chunks;
long long max_allocated_buffer_bytes;   // byte budget for all buffer chunks
43
/* Per-thread statistics for this module; summed across threads via the
   SB_SUM_* macros in MODULE_STAT_FUNCTION below. */
MODULE_STAT_TYPE {
  long long total_used_buffers_size;   // bytes currently held by in-use buffers
  int total_used_buffers;              // number of buffers currently in use
  long long allocated_buffer_bytes;    // bytes malloc'ed for buffer chunks
  long long buffer_chunk_alloc_ops;    // lifetime count of chunk allocations
};

/* Instantiates the per-thread stat storage declared above. */
MODULE_INIT
52
/* Emits this module's statistics into the stats buffer: per-thread sums
   first, then the module-wide chunk counters and configured limits. */
MODULE_STAT_FUNCTION
  SB_SUM_ONE_LL (total_used_buffers_size);
  SB_SUM_ONE_I (total_used_buffers);
  SB_SUM_ONE_LL (allocated_buffer_bytes);
  SB_SUM_ONE_LL (buffer_chunk_alloc_ops);
  sb_printf (sb,
      "allocated_buffer_chunks\t%d\n"
      "max_allocated_buffer_chunks\t%d\n"
      "max_buffer_chunks\t%d\n"
      "max_allocated_buffer_bytes\t%lld\n",
      allocated_buffer_chunks,
      max_allocated_buffer_chunks,
      max_buffer_chunks,
      max_allocated_buffer_bytes
      );
MODULE_STAT_FUNCTION_END
69
70void fetch_buffers_stat (struct buffers_stat *bs) {
71 bs->total_used_buffers_size = SB_SUM_LL (total_used_buffers_size);
72 bs->allocated_buffer_bytes = SB_SUM_LL (allocated_buffer_bytes);
73 bs->buffer_chunk_alloc_ops = SB_SUM_LL (buffer_chunk_alloc_ops);
74 bs->total_used_buffers = SB_SUM_I (total_used_buffers);
75 bs->allocated_buffer_chunks = allocated_buffer_chunks;
76 bs->max_allocated_buffer_chunks = max_allocated_buffer_chunks;
77 bs->max_allocated_buffer_bytes = max_allocated_buffer_bytes;
78 bs->max_buffer_chunks = max_buffer_chunks;
79}
80
int buffer_size_values;   // number of configured size classes (0 until init)
int rwm_peak_recovery;    // peak-recovery flag (only read in commented-out code here)
/* One circular-list head per size class; chunks of that class link into it. */
struct msg_buffers_chunk ChunkHeaders[MAX_BUFFER_SIZE_VALUES];
/* Per-thread allocation hint: last chunk used, indexed by size class. */
__thread struct msg_buffers_chunk *ChunkSave[MAX_BUFFER_SIZE_VALUES];
85
/* Default buffer sizes (bytes) for each size class, strictly increasing. */
int default_buffer_sizes[] = { 48, 512, 2048, 16384, 262144 };
/* Element count; sizeof of the element, not a hard-coded 4 (= sizeof(int)). */
int default_buffer_sizes_cnt = sizeof (default_buffer_sizes) / sizeof (default_buffer_sizes[0]);
88
89int free_std_msg_buffer (struct msg_buffers_chunk *C, struct msg_buffer *X);
90
91void init_buffer_chunk_headers (void) {
92 int i;
93 struct msg_buffers_chunk *CH;
94 assert (!buffer_size_values);
95 for (i = 0, CH = ChunkHeaders; i < sizeof (default_buffer_sizes) / sizeof (int); i++, CH++) {
96 CH->magic = MSG_CHUNK_HEAD_MAGIC;
97 CH->buffer_size = default_buffer_sizes[i];
98 CH->ch_next = CH->ch_prev = CH;
99 CH->free_buffer = 0;
100 assert (!i || default_buffer_sizes[i] > default_buffer_sizes[i-1]);
101 }
102 assert (i);
103 buffer_size_values = i;
104}
105
/* Precomputes constants for fast division by the buffer stride
   (buffer_size + 16): writes stride = odd * 2^bs_shift and stores
   bs_inverse, the multiplicative inverse of the odd part modulo 2^32,
   so get_buffer_no() can divide with a shift and a multiply. */
static inline void prepare_bs_inv (struct msg_buffers_chunk *C) {
  int x = C->buffer_size + 16;   // full stride of one buffer incl. header
  int i = __builtin_ctz (x);     // power-of-two factor of the stride
  x >>= i;                       // x is now the odd part of the stride
  x = 1 - x;                     // x is even, so x^(2^k) vanishes mod 2^32
  int y = 1;
  /* y = (1+x)(1+x^2)(1+x^4)... telescopes to 1/(1-x) mod 2^32 */
  while (x) {
    y *= 1 + x;
    x *= x;
  }
  C->bs_inverse = y;
  C->bs_shift = i;
}
119
120static void lock_chunk_head (struct msg_buffers_chunk *CH) {
121 while (1) {
122 if (__sync_bool_compare_and_swap (&CH->magic, MSG_CHUNK_HEAD_MAGIC, MSG_CHUNK_HEAD_LOCKED_MAGIC)) {
123 break;
124 }
125 usleep (1000);
126 }
127}
128
/* Releases the list-head lock by restoring the unlocked magic.  A plain
   store suffices: only the lock holder writes this field while locked. */
static void unlock_chunk_head (struct msg_buffers_chunk *CH) {
  CH->magic = MSG_CHUNK_HEAD_MAGIC;
}
132
/* Tries to take exclusive ownership of chunk C by CAS-ing its magic from
   USED to USED_LOCKED.  On success also drains the chunk's free-block
   queue (buffers pushed by other threads while the chunk was unlocked),
   returning each to the chunk via its free handler.
   Returns 1 if the lock was acquired, 0 otherwise. */
static int try_lock_chunk (struct msg_buffers_chunk *C) {
  if (C->magic != MSG_CHUNK_USED_MAGIC || !__sync_bool_compare_and_swap (&C->magic, MSG_CHUNK_USED_MAGIC, MSG_CHUNK_USED_LOCKED_MAGIC)) {
    return 0;
  }
  /* now holding the lock: return queued buffers to the free tree */
  while (1) {
    struct msg_buffer *X = mpq_pop_nw (C->free_block_queue, 4);
    if (!X) { break; }
    assert (X->chunk == C);
    C->free_buffer (C, X);
  }
  return 1;
}
145
/* Releases chunk C.  Drains the free-block queue while still holding the
   lock, then unlocks; if a concurrent free_msg_buffer() pushed onto the
   queue between the drain and the unlock, re-acquires the lock (which
   drains again) and repeats, so no queued buffer is left stranded. */
static void unlock_chunk (struct msg_buffers_chunk *C) {
  while (1) {
    /* drain the queue while we still hold the lock */
    while (1) {
      struct msg_buffer *X = mpq_pop_nw (C->free_block_queue, 4);
      if (!X) { break; }
      assert (X->chunk == C);
      C->free_buffer (C, X);
    }
    C->magic = MSG_CHUNK_USED_MAGIC;   // unlock

    /* recheck: a racing free may have pushed just before the unlock */
    if (mpq_is_empty (C->free_block_queue) || !try_lock_chunk (C)) {
      break;
    }
  }
}
161
162// returns locked chunk
163struct msg_buffers_chunk *alloc_new_msg_buffers_chunk (struct msg_buffers_chunk *CH) {
164 unsigned magic = CH->magic;
165 assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC);
166 if (allocated_buffer_chunks >= max_buffer_chunks) {
167 // ML
168 return 0;
169 }
170 struct msg_buffers_chunk *C = malloc (MSG_BUFFERS_CHUNK_SIZE);
171 if (!C) {
172 return 0;
173 }
174
175 int buffer_size = CH->buffer_size, two_power, chunk_buffers;
176 int buffer_hd_size = buffer_size + BUFF_HD_BYTES;
177 int align = buffer_hd_size & -buffer_hd_size;
178 if (align < 8) {
179 align = 8;
180 }
181 if (align > 64) {
182 align = 64;
183 }
184
185 int t = (MSG_BUFFERS_CHUNK_SIZE - offsetof (struct msg_buffers_chunk, free_cnt)) / (buffer_hd_size + 4);
186 two_power = 1;
187 while (two_power <= t) {
188 two_power <<= 1;
189 }
190
191 chunk_buffers = (MSG_BUFFERS_CHUNK_SIZE - offsetof (struct msg_buffers_chunk, free_cnt) - two_power * 4 - align) / buffer_hd_size;
192 assert (chunk_buffers > 0 && chunk_buffers < 65536 && chunk_buffers <= two_power);
193
194 C->magic = MSG_CHUNK_USED_LOCKED_MAGIC;
195 C->buffer_size = buffer_size;
196 C->free_buffer = free_std_msg_buffer;
197 C->ch_head = CH;
198
199
200 C->first_buffer = (struct msg_buffer *) (((long) C + offsetof (struct msg_buffers_chunk, free_cnt) + two_power * 4 + align - 1) & -align);
201 assert ((char *) (C->first_buffer) + chunk_buffers * buffer_hd_size <= (char *) C + MSG_BUFFERS_CHUNK_SIZE);
202
203 C->two_power = two_power;
204 C->tot_buffers = chunk_buffers;
205
206 C->refcnt = 1;
207
208 lock_chunk_head (CH);
209
210 CH->tot_buffers += chunk_buffers;
211 CH->free_buffers += chunk_buffers;
212 CH->tot_chunks++;
213
214 C->ch_next = CH->ch_next;
215 C->ch_prev = CH;
216 CH->ch_next = C;
217 C->ch_next->ch_prev = C;
218
219 unlock_chunk_head (CH);
220
221 MODULE_STAT->allocated_buffer_bytes += MSG_BUFFERS_CHUNK_SIZE;
222 __sync_fetch_and_add (&allocated_buffer_chunks, 1);
223
224 MODULE_STAT->buffer_chunk_alloc_ops ++;
225
226 while (1) {
227 barrier ();
228 int keep_max_allocated_buffer_chunks = max_allocated_buffer_chunks;
229 barrier ();
230 int keep_allocated_buffer_chunks = allocated_buffer_chunks;
231 barrier ();
232 if (keep_max_allocated_buffer_chunks >= keep_allocated_buffer_chunks) {
233 break;
234 }
235 __sync_bool_compare_and_swap (&max_allocated_buffer_chunks, keep_max_allocated_buffer_chunks, keep_allocated_buffer_chunks);
236 if (allocated_buffer_chunks >= max_buffer_chunks - 8 && max_buffer_chunks >= 32 && verbosity < 3) {
237 // verbosity = 3;
238 // vkprintf (1, "Setting verbosity to 3 (NOTICE) because of high buffer chunk usage (used %d, max %d)\n", allocated_buffer_chunks, max_buffer_chunks);
239 }
240 }
241 /*if (rwm_peak_recovery) {
242 if (allocated_buffer_chunks > (max_buffer_chunks >> 2)) {
243 do_udp_wait (1, 1.0);
244 }
245 if (allocated_buffer_chunks > (max_buffer_chunks >> 1)) {
246 do_udp_wait (2, 1.0);
247 }
248 }*/
249
250 prepare_bs_inv (C);
251
252 int i;
253 for (i = 0; i < chunk_buffers; i++) {
254 C->free_cnt[two_power+i] = 1;
255 }
256 memset (&C->free_cnt[two_power + chunk_buffers], 0, (two_power - chunk_buffers) * 2);
257
258 for (i = two_power - 1; i > 0; i--) {
259 C->free_cnt[i] = C->free_cnt[2*i] + C->free_cnt[2*i+1];
260 }
261
262 C->free_block_queue = alloc_mp_queue_w ();
263
264 //vkprintf (0, "allocated chunk %p\n", C);
265 return C;
266};
267
268void free_msg_buffers_chunk_internal (struct msg_buffers_chunk *C, struct msg_buffers_chunk *CH) {
269 assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC);
270 unsigned magic = CH->magic;
271 assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC);
272 assert (C->buffer_size == CH->buffer_size);
273 assert (C->tot_buffers == C->free_cnt[1]);
274 assert (CH == C->ch_head);
275
276 C->magic = 0;
277 C->ch_head = 0;
278
279 lock_chunk_head (CH);
280 C->ch_next->ch_prev = C->ch_prev;
281 C->ch_prev->ch_next = C->ch_next;
282
283 CH->tot_buffers -= C->tot_buffers;
284 CH->free_buffers -= C->tot_buffers;
285 CH->tot_chunks--;
286 unlock_chunk_head (CH);
287
288 assert (CH->tot_chunks >= 0);
289
290 __sync_fetch_and_add (&allocated_buffer_chunks, -1);
291 MODULE_STAT->allocated_buffer_bytes -= MSG_BUFFERS_CHUNK_SIZE;
292
293 memset (C, 0, sizeof (struct msg_buffers_chunk));
294 free (C);
295
296 int si = buffer_size_values - 1;
297 while (si > 0 && &ChunkHeaders[si-1] != CH) {
298 si--;
299 }
300 assert (si >= 0);
301
302 if (ChunkSave[si] == C) {
303 ChunkSave[si] = NULL;
304 }
305
306 free_mp_queue (C->free_block_queue);
307 C->free_block_queue = NULL;
308}
309
310
311void free_msg_buffers_chunk (struct msg_buffers_chunk *C) {
312 assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC);
313 assert (C->free_cnt[1] == C->tot_buffers);
314
315 free_msg_buffers_chunk_internal (C, C->ch_head);
316}
317
318int init_msg_buffers (long max_buffer_bytes) {
319 if (!max_buffer_bytes) {
320 max_buffer_bytes = max_allocated_buffer_bytes ?: MSG_DEFAULT_MAX_ALLOCATED_BYTES;
321 }
322
323 assert (max_buffer_bytes >= 0 && max_buffer_bytes <= MSG_MAX_ALLOCATED_BYTES);
324 assert (max_buffer_bytes >= allocated_buffer_chunks * MSG_BUFFERS_CHUNK_SIZE);
325
326 max_allocated_buffer_bytes = max_buffer_bytes;
327 max_buffer_chunks = (unsigned long) max_buffer_bytes / MSG_BUFFERS_CHUNK_SIZE;
328
329 if (!buffer_size_values) {
330 init_buffer_chunk_headers ();
331 }
332
333 return 1;
334}
335
/* Maps a buffer pointer back to its index within chunk C using the
   shift + modular inverse precomputed by prepare_bs_inv(), avoiding a
   hardware division by the stride (buffer_size + 16). */
static inline int get_buffer_no (struct msg_buffers_chunk *C, struct msg_buffer *X) {
  unsigned x = ((char *) X - (char *) C->first_buffer);   // byte offset in chunk
  x >>= C->bs_shift;     // divide out the power-of-two factor of the stride
  x *= C->bs_inverse;    // multiply by the inverse of the odd factor (mod 2^32)
  /* sanity: reconstructed index must map exactly back to X.
     NOTE(review): `<=` permits x == tot_buffers (one-past-the-end) —
     presumably intentional for neighbor arithmetic; confirm */
  assert (x <= (unsigned) C->tot_buffers && (char *) X == (char *) C->first_buffer + (C->buffer_size + 16) * x);
  return x;
}
343
/* Allocates one buffer from size class CH, preferring chunk C_hint (the
   caller's thread-local hint) and — when `neighbor` lies in the chosen
   chunk — a slot adjacent to it, for locality.  si is the size-class
   index used to refresh ChunkSave.  Returns the buffer with refcnt 1 and
   USED magic, or NULL if no chunk could be obtained.
   free_cnt[] is a binary segment tree: free_cnt[1] is the total number
   of free buffers in the chunk; leaf free_cnt[two_power + i] is 1 iff
   buffer i is free.  Allocation decrements counts root-to-leaf. */
struct msg_buffer *alloc_msg_buffer_internal (struct msg_buffer *neighbor, struct msg_buffers_chunk *CH, struct msg_buffers_chunk *C_hint, int si) {
  unsigned magic = CH->magic;
  assert (magic == MSG_CHUNK_HEAD_MAGIC || magic == MSG_CHUNK_HEAD_LOCKED_MAGIC);
  struct msg_buffers_chunk *C;
  if (!C_hint) {
    /* no hint: allocate a fresh chunk (returned already locked) */
    C = alloc_new_msg_buffers_chunk (CH);
    if (!C) {
      return 0;
    }
  } else {
    int found = 0;
    /* fast path: hint chunk looks non-empty and can be locked */
    if (C_hint && C_hint->free_cnt[1] && try_lock_chunk (C_hint)) {
      assert (C_hint->ch_head == CH);
      C = C_hint;
      if (C_hint->free_cnt[1]) {   // re-check under the lock
        found = 1;
      } else {
        unlock_chunk (C_hint);
      }
    }
    if (!found) {
      /* slow path: scan the circular chunk list for a lockable chunk
         with at least one free buffer */
      lock_chunk_head (CH);
      struct msg_buffers_chunk *CF = C_hint ? C_hint : CH->ch_next;
      C = CF;
      do {
        if (C == CH) {             // skip the list head itself
          C = C->ch_next;
          continue;
        }
        if (!C->free_cnt[1]) {     // chunk completely in use
          C = C->ch_next;
          continue;
        }
        if (!try_lock_chunk (C)) { // somebody else holds it
          C = C->ch_next;
          continue;
        }
        if (!C->free_cnt[1]) {     // emptied while we were locking
          unlock_chunk (C);
          C = C->ch_next;
          continue;
        }
        found = 1;
        break;
      } while (C != CF);
      unlock_chunk_head (CH);
      if (!found) {
        /* nothing usable: grow the pool */
        C = alloc_new_msg_buffers_chunk (CH);
        if (!C) {
          return 0;
        }
      }
      if (C_hint) {
        /* drop the reference the hint held */
        __sync_fetch_and_add (&C_hint->refcnt, -1);
      }
    }
  }

  assert (C != CH);
  assert (C->free_cnt[1]);
  assert (C->magic == MSG_CHUNK_USED_LOCKED_MAGIC);
  ChunkSave[si] = C;   // remember this chunk as the thread-local hint

  int two_power = C->two_power, i = 1;

  if (neighbor && neighbor->chunk == C) {
    /* locality path: try to place the new buffer right after `neighbor` */
    int x = get_buffer_no (C, neighbor);
    vkprintf (3, "alloc_msg_buffer: allocating neighbor buffer for %d\n", x);

    int k = 0;
    if (x < two_power - 1 && C->free_cnt[two_power + x + 1]) {
      i = two_power + x + 1;   // slot x+1 itself is free: take it directly
    } else {
      /* descend the tree over interval [l, r) toward position x, tracking
         in j the most recent right sibling known to contain a free leaf,
         to fall back on if nothing at/after x is free */
      int j = 1, l = 0, r = two_power;
      while (i < two_power) {
        i <<= 1;
        int m = (l + r) >> 1;
        if (x < m) {
          if (C->free_cnt[i] > 0) {
            r = m;
            if (C->free_cnt[i+1] > 0) {
              j = i + 1;       // right subtree also has a free leaf
            }
          } else {
            l = m;
            i++;               // left subtree empty: forced right
          }
        } else if (C->free_cnt[i+1] > 0) {
          l = m;
          i++;                 // x is in the right half and it has space
        } else {
          /* nothing free at or after x: restart from fallback j and take
             its leftmost free leaf, decrementing counts on the way down */
          k = i = j;
          while (i < two_power) {
            i <<= 1;
            if (!C->free_cnt[i]) {
              i++;
            }
            assert (-- C->free_cnt[i] >= 0);
          }
          break;
        }
      }
    }
    if (!k) {
      k = i;
    }
    /* decrement free counts from the claimed node up to the root */
    while (k > 0) {
      assert (-- C->free_cnt[k] >= 0);
      k >>= 1;
    }
  } else {
    /* no usable neighbor: pick a pseudo-random free buffer among the
       first up-to-16 free ones to spread allocations */
    int j = C->free_cnt[1] < 16 ? C->free_cnt[1] : 16;
    j = ((long long) lrand48_j() * j) >> 31;
    assert (j >= 0 && j < C->free_cnt[1]);
    /* walk down to the (j+1)-th free leaf, decrementing counts en route */
    while (i < two_power) {
      assert (-- C->free_cnt[i] >= 0);
      i <<= 1;
      if (C->free_cnt[i] <= j) {
        j -= C->free_cnt[i];
        i++;
      }
    }
    assert (-- C->free_cnt[i] == 0);   // claim the leaf
  }

  assert (C != CH);
  unlock_chunk (C);
  //-- CH->free_buffers;

  i -= two_power;   // leaf index -> buffer index
  vkprintf (3, "alloc_msg_buffer(%d) [chunk %p, size %d]: tot_buffers = %d, free_buffers = %d\n", i, C, C->buffer_size, CH->tot_buffers, CH->free_buffers);
  assert (i >= 0 && i < C->tot_buffers);

  struct msg_buffer *X = (struct msg_buffer *) ((char *) C->first_buffer + i * (C->buffer_size + 16));

  X->chunk = C;
  X->refcnt = 1;
  X->magic = MSG_BUFFER_USED_MAGIC;

  //__sync_fetch_and_add (&total_used_buffers, 1);
  MODULE_STAT->total_used_buffers_size += C->buffer_size;
  MODULE_STAT->total_used_buffers ++;

  return X;
}
489
490/* allocates buffer of at least given size, -1 = maximal */
491struct msg_buffer *alloc_msg_buffer (struct msg_buffer *neighbor, int size_hint) {
492 if (!buffer_size_values) {
493 init_buffer_chunk_headers ();
494 }
495 int si = buffer_size_values - 1;
496 if (size_hint >= 0) {
497 while (si > 0 && ChunkHeaders[si-1].buffer_size >= size_hint) {
498 si--;
499 }
500 }
501 return alloc_msg_buffer_internal (neighbor, &ChunkHeaders[si], ChunkSave[si], si);
502}
503
/* Default free handler: marks buffer X free in chunk C's segment tree.
   Caller must hold the chunk lock (C->magic == USED_LOCKED).
   Always returns 1. */
int free_std_msg_buffer (struct msg_buffers_chunk *C, struct msg_buffer *X) {
  assert (!X->refcnt && X->magic == MSG_BUFFER_USED_MAGIC && C->magic == MSG_CHUNK_USED_LOCKED_MAGIC && X->chunk == C);
  int x = get_buffer_no (C, X);
  int two_power = C->two_power;
  vkprintf (3, "free_msg_buffer(%d)\n", x);
  x += two_power;             // leaf index in the segment tree
  assert (!C->free_cnt[x]);   // must currently be marked in use
  /* increment free counts from the leaf up to the root */
  do {
    assert (++C->free_cnt[x] > 0);
  } while (x >>= 1);

  X->magic = MSG_BUFFER_FREE_MAGIC;
  X->refcnt = -0x40000000;    // poison refcnt to catch use-after-free
  //++ C->ch_head->free_buffers;

  MODULE_STAT->total_used_buffers --;
  MODULE_STAT->total_used_buffers_size -= C->buffer_size;

  //if (C->free_cnt[1] == C->tot_buffers && C->ch_head->free_buffers * 4 >= C->tot_buffers * 5) {
  //  free_msg_buffers_chunk (C);
  //}

  return 1;
}
528
529static int free_msg_buffer_job (job_t job, int op, struct job_thread *JT) {
530 switch (op) {
531 case JS_RUN: {
532 struct msg_buffer *X = *(void **)job->j_custom;
533 struct msg_buffers_chunk *C = X->chunk;
534 unsigned magic = C->magic;
535 assert (magic == MSG_CHUNK_USED_MAGIC || magic == MSG_CHUNK_USED_LOCKED_MAGIC);
536 C->free_buffer (C, X);
537 return JOB_COMPLETED;
538 }
539 case JS_FINISH:
540 assert (job->j_refcnt == 1);
541 return job_free (JOB_REF_PASS (job));
542 default:
543 assert (0);
544 }
545}
546
/* Frees buffer X (refcnt must already be 0), routing to the owning
   chunk's free handler.  Standard chunks: free directly if the chunk
   lock can be taken, otherwise push X onto the chunk's lock-free queue
   for the lock holder to drain.  Custom handlers: run in the chunk's
   thread class, via an async job if we are on the wrong thread.
   Returns 1 (or the custom handler's return value). */
int free_msg_buffer (struct msg_buffer *X) {
  if (X->magic != MSG_BUFFER_USED_MAGIC) {
    vkprintf (0, "magic = 0x%08x\n", X->magic);
  }
  assert (X->magic == MSG_BUFFER_USED_MAGIC);
  assert (!X->refcnt);
  struct msg_buffers_chunk *C = X->chunk;
  unsigned magic = C->magic;
  assert (magic == MSG_CHUNK_USED_MAGIC || magic == MSG_CHUNK_USED_LOCKED_MAGIC);

  if (C->free_buffer == free_std_msg_buffer) {
    if (try_lock_chunk (C)) {
      C->free_buffer (C, X);
      unlock_chunk (C);
      return 1;
    } else {
      /* chunk busy: hand X to the lock holder via the queue */
      mpq_push_w (C->free_block_queue, X, 0);

      /* the holder may have unlocked between our failed try and the
         push; re-try so the queued buffer is not stranded (unlock_chunk
         drains the queue) */
      if (try_lock_chunk (C)) {
        unlock_chunk (C);
      }
      return 1;
    }
  } else {
    /* custom handler must run in the chunk's thread class */
    if (!this_job_thread || this_job_thread->thread_class == C->thread_class) {
      return C->free_buffer (C, X);
    } else {
      job_t job = create_async_job (free_msg_buffer_job, JSC_ALLOW (C->thread_class, JS_RUN) | JSIG_FAST (JS_FINISH), C->thread_subclass, sizeof (void *), 0, JOB_REF_NULL);
      *(void **)job->j_custom = X;
      schedule_job (JOB_REF_PASS (job));
      return 1;
    }
  }
}
581
582int msg_buffer_reach_limit (double ratio) {
583 return SB_SUM_LL(total_used_buffers_size) >= ratio * max_allocated_buffer_bytes;
584}
585
586double msg_buffer_usage (void) {
587 return (double) SB_SUM_LL(total_used_buffers_size) / (double) max_allocated_buffer_bytes;
588}
589