Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Contents of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1988 -
(show annotations)
(download)
(as text)
Sat Apr 13 18:16:13 2019 UTC (5 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 28422 byte(s)
Sat Apr 13 18:16:13 2019 UTC (5 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 28422 byte(s)
Ensure that data pointers are correctly aligned - found by gcc's runtime sanitizer (make SANITIZE=1 test).
1 | /*************************************** |
2 | Merge sort functions. |
3 | |
4 | Part of the Routino routing software. |
5 | ******************/ /****************** |
6 | This file Copyright 2009-2015, 2017, 2019 Andrew M. Bishop |
7 | |
8 | This program is free software: you can redistribute it and/or modify |
9 | it under the terms of the GNU Affero General Public License as published by |
10 | the Free Software Foundation, either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | This program is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU Affero General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU Affero General Public License |
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | ***************************************/ |
21 | |
22 | |
23 | #include <stdio.h> |
24 | #include <stdlib.h> |
25 | #include <string.h> |
26 | |
27 | #if defined(USE_PTHREADS) && USE_PTHREADS |
28 | #include <pthread.h> |
29 | #endif |
30 | |
31 | #include "types.h" |
32 | |
33 | #include "logging.h" |
34 | #include "files.h" |
35 | #include "sorting.h" |
36 | |
37 | |
/* Global variables */

/* These are defined elsewhere in the program; this file only reads them. */

/*+ The command line '--tmpdir' option or its default value. +*/
/* Used below as the directory prefix of the "filesort.N.tmp" temporary files. */
extern char *option_tmpdirname;

/*+ The amount of RAM to use for filesorting. +*/
/* The total is divided between the option_filesort_threads buffers. */
extern size_t option_filesort_ramsize;

/*+ The number of filesorting threads allowed. +*/
/* Also the number of thread_data slots (and data buffers) that get allocated. */
extern int option_filesort_threads;
48 | |
49 | |
/* Thread data type definitions */

/*+ A data type for holding data for a thread. +*/
/* One of these exists per sorting slot; it carries the buffer for one sorted
   run together with everything a worker needs to sort it and write it out. */
typedef struct _thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS

 pthread_t thread;              /*+ The thread identifier. +*/

 int       running;             /*+ A flag indicating the current state of the thread. +*/
                                /* 0 = slot is free, 1 = worker started, 2 = worker has finished
                                   and is waiting to be joined (accessed under running_mutex). */

#endif

 char     *data;                /*+ The main data array. +*/
 void    **datap;               /*+ An array of pointers to the data objects. +*/
 size_t    n;                   /*+ The number of pointers. +*/

 char     *filename;            /*+ The name of the file to write the results to. +*/

 size_t    itemsize;            /*+ The size of each item. +*/
                                /* Only set and used by the fixed length sort. */
 int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
 thread_data;
73 | |
/* Thread variables */

#if defined(USE_PTHREADS) && USE_PTHREADS

/* running_mutex protects the 'running' flag of every thread_data slot and is
   the mutex paired with running_cond. */
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

/* files_mutex is held by a worker for the whole time that it is writing its
   temporary file, so only one worker writes at a time. */
static pthread_mutex_t files_mutex   = PTHREAD_MUTEX_INITIALIZER;

/* running_cond is signalled by a worker after it has set its flag to 2. */
static pthread_cond_t  running_cond  = PTHREAD_COND_INITIALIZER;

#endif
83 | |
/* Thread helper functions */

/* Each helper sorts the pointers of one slot and writes the sorted run to the
   slot's temporary file; they are either called directly or passed to
   pthread_create() (cast to the generic thread start routine type). */

static void *filesort_fixed_heapsort_thread(thread_data *thread);
static void *filesort_vary_heapsort_thread(thread_data *thread);
88 | |
89 | |
90 | /*++++++++++++++++++++++++++++++++++++++ |
91 | A function to sort the contents of a file of fixed length objects using a |
92 | limited amount of RAM. |
93 | |
94 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
95 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
96 | The individual sort steps and the merge step both use a "Heap sort" |
97 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
98 | if the data is already partially sorted. |
99 | |
100 | index_t filesort_fixed Returns the number of objects kept. |
101 | |
102 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
103 | |
104 | int fd_out The file descriptor of the output file (opened for writing and empty). |
105 | |
106 | size_t itemsize The size of each item in the file that needs sorting. |
107 | |
108 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
109 | each item before they have been sorted. The second parameter is the number of objects |
110 | previously read from the input file. If the function returns 1 then the object is kept |
111 | and it is sorted, otherwise it is ignored. |
112 | |
113 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
114 | to qsort if the data to be sorted is an array of things not pointers. |
115 | |
116 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
117 | each item after they have been sorted. The second parameter is the number of objects |
118 | already written to the output file. If the function returns 1 then the object is written |
119 | to the output file., otherwise it is ignored. |
120 | ++++++++++++++++++++++++++++++++++++++*/ |
121 | |
122 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
123 | int (*compare_function)(const void*,const void*), |
124 | int (*post_sort_function)(void*,index_t)) |
125 | { |
126 | int *fds=NULL,*heap=NULL; |
127 | int nfiles=0,ndata=0; |
128 | index_t count_out=0,count_in=0,total=0; |
129 | size_t nitems,item; |
130 | char *data; |
131 | void **datap; |
132 | thread_data *threads; |
133 | int i,more=1; |
134 | #if defined(USE_PTHREADS) && USE_PTHREADS |
135 | int nthreads=0; |
136 | #endif |
137 | |
138 | /* Allocate the RAM buffer and other bits */ |
139 | |
140 | nitems=(size_t)SizeFileFD(fd_in)/itemsize; |
141 | |
142 | if(nitems==0) |
143 | return(0); |
144 | |
145 | if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize) |
146 | nitems=1+nitems/option_filesort_threads; |
147 | else |
148 | nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
149 | |
150 | threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data)); |
151 | |
152 | for(i=0;i<option_filesort_threads;i++) |
153 | { |
154 | threads[i].data=malloc(nitems*itemsize); |
155 | threads[i].datap=malloc(nitems*sizeof(void*)); |
156 | |
157 | log_malloc(threads[i].data ,nitems*itemsize); |
158 | log_malloc(threads[i].datap,nitems*sizeof(void*)); |
159 | |
160 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
161 | |
162 | threads[i].itemsize=itemsize; |
163 | threads[i].compare=compare_function; |
164 | } |
165 | |
166 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
167 | |
168 | do |
169 | { |
170 | int thread=0; |
171 | |
172 | #if defined(USE_PTHREADS) && USE_PTHREADS |
173 | |
174 | if(option_filesort_threads>1) |
175 | { |
176 | /* Find a spare slot (one *must* be unused at all times) */ |
177 | |
178 | pthread_mutex_lock(&running_mutex); |
179 | |
180 | for(thread=0;thread<option_filesort_threads;thread++) |
181 | if(!threads[thread].running) |
182 | break; |
183 | |
184 | pthread_mutex_unlock(&running_mutex); |
185 | } |
186 | |
187 | #endif |
188 | |
189 | /* Read in the data and create pointers */ |
190 | |
191 | for(item=0;item<nitems;) |
192 | { |
193 | threads[thread].datap[item]=threads[thread].data+item*itemsize; |
194 | |
195 | if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize)) |
196 | { |
197 | more=0; |
198 | break; |
199 | } |
200 | |
201 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in)) |
202 | { |
203 | item++; |
204 | total++; |
205 | } |
206 | |
207 | count_in++; |
208 | } |
209 | |
210 | threads[thread].n=item; |
211 | |
212 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
213 | |
214 | if(more==0 && total==0) |
215 | goto tidy_and_exit; |
216 | |
217 | /* No new data read in this time round */ |
218 | |
219 | if(threads[thread].n==0) |
220 | break; |
221 | |
222 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
223 | |
224 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
225 | |
226 | #if defined(USE_PTHREADS) && USE_PTHREADS |
227 | |
228 | /* Shortcut if only one file, don't write to disk */ |
229 | |
230 | if(more==0 && nfiles==0) |
231 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
232 | else if(option_filesort_threads>1) |
233 | { |
234 | pthread_mutex_lock(&running_mutex); |
235 | |
236 | while(nthreads==(option_filesort_threads-1)) |
237 | { |
238 | for(i=0;i<option_filesort_threads;i++) |
239 | if(threads[i].running==2) |
240 | { |
241 | pthread_join(threads[i].thread,NULL); |
242 | threads[i].running=0; |
243 | nthreads--; |
244 | } |
245 | |
246 | if(nthreads==(option_filesort_threads-1)) |
247 | pthread_cond_wait(&running_cond,&running_mutex); |
248 | } |
249 | |
250 | threads[thread].running=1; |
251 | |
252 | pthread_mutex_unlock(&running_mutex); |
253 | |
254 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
255 | |
256 | nthreads++; |
257 | } |
258 | else |
259 | filesort_fixed_heapsort_thread(&threads[thread]); |
260 | |
261 | #else |
262 | |
263 | /* Shortcut if only one file, don't write to disk */ |
264 | |
265 | if(more==0 && nfiles==0) |
266 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
267 | else |
268 | filesort_fixed_heapsort_thread(&threads[thread]); |
269 | |
270 | #endif |
271 | |
272 | nfiles++; |
273 | } |
274 | while(more); |
275 | |
276 | /* Wait for all of the threads to finish */ |
277 | |
278 | #if defined(USE_PTHREADS) && USE_PTHREADS |
279 | |
280 | while(option_filesort_threads>1 && nthreads) |
281 | { |
282 | pthread_mutex_lock(&running_mutex); |
283 | |
284 | pthread_cond_wait(&running_cond,&running_mutex); |
285 | |
286 | for(i=0;i<option_filesort_threads;i++) |
287 | if(threads[i].running==2) |
288 | { |
289 | pthread_join(threads[i].thread,NULL); |
290 | threads[i].running=0; |
291 | nthreads--; |
292 | } |
293 | |
294 | pthread_mutex_unlock(&running_mutex); |
295 | } |
296 | |
297 | #endif |
298 | |
299 | /* Shortcut if there are no files */ |
300 | |
301 | if(nfiles==0) |
302 | goto tidy_and_exit; |
303 | |
304 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
305 | |
306 | if(nfiles==1) |
307 | { |
308 | for(item=0;item<threads[0].n;item++) |
309 | { |
310 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
311 | { |
312 | WriteFileBuffered(fd_out,threads[0].datap[item],itemsize); |
313 | count_out++; |
314 | } |
315 | } |
316 | |
317 | DeleteFile(threads[0].filename); |
318 | |
319 | goto tidy_and_exit; |
320 | } |
321 | |
322 | /* Check that number of files is less than file size */ |
323 | |
324 | logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)"); |
325 | |
326 | /* Open all of the temporary files */ |
327 | |
328 | fds=(int*)malloc(nfiles*sizeof(int)); |
329 | |
330 | for(i=0;i<nfiles;i++) |
331 | { |
332 | char *filename=threads[0].filename; |
333 | |
334 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
335 | |
336 | fds[i]=ReOpenFileBuffered(filename); |
337 | |
338 | DeleteFile(filename); |
339 | } |
340 | |
341 | /* Perform an n-way merge using a binary heap */ |
342 | |
343 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
344 | |
345 | data =threads[0].data; |
346 | datap=threads[0].datap; |
347 | |
348 | /* Fill the heap to start with */ |
349 | |
350 | for(i=0;i<nfiles;i++) |
351 | { |
352 | int index; |
353 | |
354 | datap[i]=data+i*itemsize; |
355 | |
356 | ReadFileBuffered(fds[i],datap[i],itemsize); |
357 | |
358 | index=i+1; |
359 | |
360 | heap[index]=i; |
361 | |
362 | /* Bubble up the new value */ |
363 | |
364 | while(index>1) |
365 | { |
366 | int newindex; |
367 | int temp; |
368 | |
369 | newindex=index/2; |
370 | |
371 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
372 | break; |
373 | |
374 | temp=heap[index]; |
375 | heap[index]=heap[newindex]; |
376 | heap[newindex]=temp; |
377 | |
378 | index=newindex; |
379 | } |
380 | } |
381 | |
382 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
383 | |
384 | ndata=nfiles; |
385 | |
386 | do |
387 | { |
388 | int index=1; |
389 | |
390 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
391 | { |
392 | WriteFileBuffered(fd_out,datap[heap[index]],itemsize); |
393 | count_out++; |
394 | } |
395 | |
396 | if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize)) |
397 | { |
398 | heap[index]=heap[ndata]; |
399 | ndata--; |
400 | } |
401 | |
402 | /* Bubble down the new value */ |
403 | |
404 | while((2*index)<ndata) |
405 | { |
406 | int newindex; |
407 | int temp; |
408 | |
409 | newindex=2*index; |
410 | |
411 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
412 | newindex=newindex+1; |
413 | |
414 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
415 | break; |
416 | |
417 | temp=heap[newindex]; |
418 | heap[newindex]=heap[index]; |
419 | heap[index]=temp; |
420 | |
421 | index=newindex; |
422 | } |
423 | |
424 | if((2*index)==ndata) |
425 | { |
426 | int newindex; |
427 | int temp; |
428 | |
429 | newindex=2*index; |
430 | |
431 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
432 | ; /* break */ |
433 | else |
434 | { |
435 | temp=heap[newindex]; |
436 | heap[newindex]=heap[index]; |
437 | heap[index]=temp; |
438 | } |
439 | } |
440 | } |
441 | while(ndata>0); |
442 | |
443 | /* Tidy up */ |
444 | |
445 | tidy_and_exit: |
446 | |
447 | if(fds) |
448 | { |
449 | for(i=0;i<nfiles;i++) |
450 | CloseFileBuffered(fds[i]); |
451 | free(fds); |
452 | } |
453 | |
454 | if(heap) |
455 | free(heap); |
456 | |
457 | for(i=0;i<option_filesort_threads;i++) |
458 | { |
459 | log_free(threads[i].data); |
460 | log_free(threads[i].datap); |
461 | |
462 | free(threads[i].data); |
463 | free(threads[i].datap); |
464 | |
465 | free(threads[i].filename); |
466 | } |
467 | |
468 | free(threads); |
469 | |
470 | return(count_out); |
471 | } |
472 | |
473 | |
474 | /*++++++++++++++++++++++++++++++++++++++ |
475 | A function to sort the contents of a file of variable length objects (each |
476 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
477 | |
478 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
479 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
480 | The individual sort steps and the merge step both use a "Heap sort" |
481 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
482 | if the data is already partially sorted. |
483 | |
484 | index_t filesort_vary Returns the number of objects kept. |
485 | |
486 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
487 | |
488 | int fd_out The file descriptor of the output file (opened for writing and empty). |
489 | |
490 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
491 | each item before they have been sorted. The second parameter is the number of objects |
492 | previously read from the input file. If the function returns 1 then the object is kept |
493 | and it is sorted, otherwise it is ignored. |
494 | |
495 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
496 | to qsort if the data to be sorted is an array of things not pointers. |
497 | |
498 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
499 | each item after they have been sorted. The second parameter is the number of objects |
500 | already written to the output file. If the function returns 1 then the object is written |
501 | to the output file., otherwise it is ignored. |
502 | ++++++++++++++++++++++++++++++++++++++*/ |
503 | |
504 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
505 | int (*compare_function)(const void*,const void*), |
506 | int (*post_sort_function)(void*,index_t)) |
507 | { |
508 | int *fds=NULL,*heap=NULL; |
509 | int nfiles=0,ndata=0; |
510 | index_t count_out=0,count_in=0,total=0; |
511 | size_t datasize,item; |
512 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
513 | char *data; |
514 | void **datap; |
515 | thread_data *threads; |
516 | int i,more=1; |
517 | #if defined(USE_PTHREADS) && USE_PTHREADS |
518 | int nthreads=0; |
519 | #endif |
520 | |
521 | /* Allocate the RAM buffer and other bits */ |
522 | |
523 | datasize=(size_t)SizeFileFD(fd_in); |
524 | |
525 | if(datasize==0) |
526 | return(0); |
527 | |
528 | /* We can not know in advance how many data items there are. Each |
529 | one will require RAM for data, FILESORT_VARALIGN and sizeof(void*) |
530 | Assume that data+FILESORT_VARALIGN+sizeof(void*) is 4*data. */ |
531 | |
532 | if((datasize*4)<option_filesort_ramsize) |
533 | datasize=(datasize*4)/option_filesort_threads; |
534 | else |
535 | datasize=option_filesort_ramsize/option_filesort_threads; |
536 | |
537 | datasize=FILESORT_VARALIGN*((datasize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN); |
538 | |
539 | threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data)); |
540 | |
541 | for(i=0;i<option_filesort_threads;i++) |
542 | { |
543 | threads[i].data=malloc(datasize); |
544 | threads[i].datap=NULL; |
545 | |
546 | log_malloc(threads[i].data,datasize); |
547 | |
548 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
549 | |
550 | threads[i].compare=compare_function; |
551 | } |
552 | |
553 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
554 | |
555 | if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
556 | goto tidy_and_exit; |
557 | |
558 | do |
559 | { |
560 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
561 | int thread=0; |
562 | |
563 | #if defined(USE_PTHREADS) && USE_PTHREADS |
564 | |
565 | if(option_filesort_threads>1) |
566 | { |
567 | /* Find a spare slot (one *must* be unused at all times) */ |
568 | |
569 | pthread_mutex_lock(&running_mutex); |
570 | |
571 | for(thread=0;thread<option_filesort_threads;thread++) |
572 | if(!threads[thread].running) |
573 | break; |
574 | |
575 | pthread_mutex_unlock(&running_mutex); |
576 | } |
577 | |
578 | #endif |
579 | |
580 | threads[thread].datap=(void**)(threads[thread].data+datasize); |
581 | |
582 | threads[thread].n=0; |
583 | |
584 | /* Read in the data and create pointers */ |
585 | |
586 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=(size_t)((char*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
587 | { |
588 | FILESORT_VARINT itemsize=nextitemsize; |
589 | |
590 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
591 | |
592 | ramused+=FILESORT_VARSIZE; |
593 | |
594 | ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize); |
595 | |
596 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
597 | { |
598 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ |
599 | |
600 | if(itemsize>largestitemsize) |
601 | largestitemsize=itemsize; |
602 | |
603 | ramused+=itemsize; |
604 | |
605 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARALIGN-1)/FILESORT_VARALIGN); |
606 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; |
607 | |
608 | total++; |
609 | threads[thread].n++; |
610 | } |
611 | else |
612 | ramused-=FILESORT_VARSIZE; |
613 | |
614 | count_in++; |
615 | |
616 | if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
617 | { |
618 | more=0; |
619 | break; |
620 | } |
621 | } |
622 | |
623 | /* No new data read in this time round */ |
624 | |
625 | if(threads[thread].n==0) |
626 | break; |
627 | |
628 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
629 | |
630 | if(more==0 && nfiles==0) |
631 | threads[thread].filename[0]=0; |
632 | else |
633 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
634 | |
635 | #if defined(USE_PTHREADS) && USE_PTHREADS |
636 | |
637 | /* Shortcut if only one file, don't write to disk */ |
638 | |
639 | if(more==0 && nfiles==0) |
640 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
641 | else if(option_filesort_threads>1) |
642 | { |
643 | pthread_mutex_lock(&running_mutex); |
644 | |
645 | while(nthreads==(option_filesort_threads-1)) |
646 | { |
647 | for(i=0;i<option_filesort_threads;i++) |
648 | if(threads[i].running==2) |
649 | { |
650 | pthread_join(threads[i].thread,NULL); |
651 | threads[i].running=0; |
652 | nthreads--; |
653 | } |
654 | |
655 | if(nthreads==(option_filesort_threads-1)) |
656 | pthread_cond_wait(&running_cond,&running_mutex); |
657 | } |
658 | |
659 | threads[thread].running=1; |
660 | |
661 | pthread_mutex_unlock(&running_mutex); |
662 | |
663 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
664 | |
665 | nthreads++; |
666 | } |
667 | else |
668 | filesort_vary_heapsort_thread(&threads[thread]); |
669 | |
670 | #else |
671 | |
672 | /* Shortcut if only one file, don't write to disk */ |
673 | |
674 | if(more==0 && nfiles==0) |
675 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
676 | else |
677 | filesort_vary_heapsort_thread(&threads[thread]); |
678 | |
679 | #endif |
680 | |
681 | nfiles++; |
682 | } |
683 | while(more); |
684 | |
685 | /* Wait for all of the threads to finish */ |
686 | |
687 | #if defined(USE_PTHREADS) && USE_PTHREADS |
688 | |
689 | while(option_filesort_threads>1 && nthreads) |
690 | { |
691 | pthread_mutex_lock(&running_mutex); |
692 | |
693 | pthread_cond_wait(&running_cond,&running_mutex); |
694 | |
695 | for(i=0;i<option_filesort_threads;i++) |
696 | if(threads[i].running==2) |
697 | { |
698 | pthread_join(threads[i].thread,NULL); |
699 | threads[i].running=0; |
700 | nthreads--; |
701 | } |
702 | |
703 | pthread_mutex_unlock(&running_mutex); |
704 | } |
705 | |
706 | #endif |
707 | |
708 | /* Shortcut if there are no files */ |
709 | |
710 | if(nfiles==0) |
711 | goto tidy_and_exit; |
712 | |
713 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
714 | |
715 | if(nfiles==1) |
716 | { |
717 | for(item=0;item<threads[0].n;item++) |
718 | { |
719 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
720 | { |
721 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)threads[0].datap[item]-FILESORT_VARSIZE); |
722 | |
723 | WriteFileBuffered(fd_out,(char*)threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
724 | count_out++; |
725 | } |
726 | } |
727 | |
728 | DeleteFile(threads[0].filename); |
729 | |
730 | goto tidy_and_exit; |
731 | } |
732 | |
733 | /* Check that number of files is less than file size */ |
734 | |
735 | largestitemsize=FILESORT_VARALIGN*((largestitemsize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN); |
736 | |
737 | logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/(FILESORT_VARALIGN+largestitemsize)),"Too many temporary files (use more sorting memory?)"); |
738 | |
739 | /* Open all of the temporary files */ |
740 | |
741 | fds=(int*)malloc(nfiles*sizeof(int)); |
742 | |
743 | for(i=0;i<nfiles;i++) |
744 | { |
745 | char *filename=threads[0].filename; |
746 | |
747 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
748 | |
749 | fds[i]=ReOpenFileBuffered(filename); |
750 | |
751 | DeleteFile(filename); |
752 | } |
753 | |
754 | /* Perform an n-way merge using a binary heap */ |
755 | |
756 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
757 | |
758 | data=threads[0].data; |
759 | datap=(void**)(data+datasize-nfiles*sizeof(void*)); |
760 | |
761 | /* Fill the heap to start with */ |
762 | |
763 | for(i=0;i<nfiles;i++) |
764 | { |
765 | int index; |
766 | FILESORT_VARINT itemsize; |
767 | |
768 | datap[i]=data+FILESORT_VARALIGN+i*(largestitemsize+FILESORT_VARALIGN); |
769 | |
770 | ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE); |
771 | |
772 | *(FILESORT_VARINT*)((char*)datap[i]-FILESORT_VARSIZE)=itemsize; |
773 | |
774 | ReadFileBuffered(fds[i],datap[i],itemsize); |
775 | |
776 | index=i+1; |
777 | |
778 | heap[index]=i; |
779 | |
780 | /* Bubble up the new value */ |
781 | |
782 | while(index>1) |
783 | { |
784 | int newindex; |
785 | int temp; |
786 | |
787 | newindex=index/2; |
788 | |
789 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
790 | break; |
791 | |
792 | temp=heap[index]; |
793 | heap[index]=heap[newindex]; |
794 | heap[newindex]=temp; |
795 | |
796 | index=newindex; |
797 | } |
798 | } |
799 | |
800 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
801 | |
802 | ndata=nfiles; |
803 | |
804 | do |
805 | { |
806 | int index=1; |
807 | FILESORT_VARINT itemsize; |
808 | |
809 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
810 | { |
811 | itemsize=*(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE); |
812 | |
813 | WriteFileBuffered(fd_out,(char*)datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
814 | count_out++; |
815 | } |
816 | |
817 | if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
818 | { |
819 | heap[index]=heap[ndata]; |
820 | ndata--; |
821 | } |
822 | else |
823 | { |
824 | *(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
825 | |
826 | ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize); |
827 | } |
828 | |
829 | /* Bubble down the new value */ |
830 | |
831 | while((2*index)<ndata) |
832 | { |
833 | int newindex; |
834 | int temp; |
835 | |
836 | newindex=2*index; |
837 | |
838 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
839 | newindex=newindex+1; |
840 | |
841 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
842 | break; |
843 | |
844 | temp=heap[newindex]; |
845 | heap[newindex]=heap[index]; |
846 | heap[index]=temp; |
847 | |
848 | index=newindex; |
849 | } |
850 | |
851 | if((2*index)==ndata) |
852 | { |
853 | int newindex; |
854 | int temp; |
855 | |
856 | newindex=2*index; |
857 | |
858 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
859 | ; /* break */ |
860 | else |
861 | { |
862 | temp=heap[newindex]; |
863 | heap[newindex]=heap[index]; |
864 | heap[index]=temp; |
865 | } |
866 | } |
867 | } |
868 | while(ndata>0); |
869 | |
870 | /* Tidy up */ |
871 | |
872 | tidy_and_exit: |
873 | |
874 | if(fds) |
875 | { |
876 | for(i=0;i<nfiles;i++) |
877 | CloseFileBuffered(fds[i]); |
878 | free(fds); |
879 | } |
880 | |
881 | if(heap) |
882 | free(heap); |
883 | |
884 | for(i=0;i<option_filesort_threads;i++) |
885 | { |
886 | log_free(threads[i].data); |
887 | |
888 | free(threads[i].data); |
889 | |
890 | free(threads[i].filename); |
891 | } |
892 | |
893 | free(threads); |
894 | |
895 | return(count_out); |
896 | } |
897 | |
898 | |
899 | /*++++++++++++++++++++++++++++++++++++++ |
900 | A wrapper function that can be run in a thread for fixed data. |
901 | |
902 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). |
903 | |
904 | thread_data *thread The data to be processed in this thread. |
905 | ++++++++++++++++++++++++++++++++++++++*/ |
906 | |
907 | static void *filesort_fixed_heapsort_thread(thread_data *thread) |
908 | { |
909 | int fd; |
910 | size_t item; |
911 | |
912 | /* Sort the data pointers using a heap sort */ |
913 | |
914 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
915 | |
916 | /* Create a temporary file and write the result */ |
917 | |
918 | #if defined(USE_PTHREADS) && USE_PTHREADS |
919 | |
920 | if(option_filesort_threads>1) |
921 | pthread_mutex_lock(&files_mutex); |
922 | |
923 | #endif |
924 | |
925 | fd=OpenFileBufferedNew(thread->filename); |
926 | |
927 | for(item=0;item<thread->n;item++) |
928 | WriteFileBuffered(fd,thread->datap[item],thread->itemsize); |
929 | |
930 | CloseFileBuffered(fd); |
931 | |
932 | #if defined(USE_PTHREADS) && USE_PTHREADS |
933 | |
934 | if(option_filesort_threads>1) |
935 | { |
936 | pthread_mutex_unlock(&files_mutex); |
937 | |
938 | pthread_mutex_lock(&running_mutex); |
939 | |
940 | thread->running=2; |
941 | |
942 | pthread_cond_signal(&running_cond); |
943 | |
944 | pthread_mutex_unlock(&running_mutex); |
945 | } |
946 | |
947 | #endif |
948 | |
949 | return(NULL); |
950 | } |
951 | |
952 | |
953 | /*++++++++++++++++++++++++++++++++++++++ |
954 | A wrapper function that can be run in a thread for variable data. |
955 | |
956 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). |
957 | |
958 | thread_data *thread The data to be processed in this thread. |
959 | ++++++++++++++++++++++++++++++++++++++*/ |
960 | |
961 | static void *filesort_vary_heapsort_thread(thread_data *thread) |
962 | { |
963 | int fd; |
964 | size_t item; |
965 | |
966 | /* Sort the data pointers using a heap sort */ |
967 | |
968 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
969 | |
970 | /* Create a temporary file and write the result */ |
971 | |
972 | #if defined(USE_PTHREADS) && USE_PTHREADS |
973 | |
974 | if(option_filesort_threads>1) |
975 | pthread_mutex_lock(&files_mutex); |
976 | |
977 | #endif |
978 | |
979 | fd=OpenFileBufferedNew(thread->filename); |
980 | |
981 | for(item=0;item<thread->n;item++) |
982 | { |
983 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)thread->datap[item]-FILESORT_VARSIZE); |
984 | |
985 | WriteFileBuffered(fd,(char*)thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
986 | } |
987 | |
988 | CloseFileBuffered(fd); |
989 | |
990 | #if defined(USE_PTHREADS) && USE_PTHREADS |
991 | |
992 | if(option_filesort_threads>1) |
993 | { |
994 | pthread_mutex_unlock(&files_mutex); |
995 | |
996 | pthread_mutex_lock(&running_mutex); |
997 | |
998 | thread->running=2; |
999 | |
1000 | pthread_cond_signal(&running_cond); |
1001 | |
1002 | pthread_mutex_unlock(&running_mutex); |
1003 | } |
1004 | |
1005 | #endif |
1006 | |
1007 | return(NULL); |
1008 | } |
1009 | |
1010 | |
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.

  The pointers are rearranged into ascending order of the objects that they
  point to; the objects themselves are not moved.  The sort is not stable.
  Arrays of zero or one items are left unchanged.

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function.  This is identical
     to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item,last;

 /* Nothing to do (and avoids nitems-1 wrapping around below) */

 if(nitems<2)
    return;

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=1;item<nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>0)
      {
       size_t parent=(index-1)/2;
       void *temp;

       if(compare_function(datap[index],datap[parent])<=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[index];
       datap[index]=datap[parent];
       datap[parent]=temp;

       index=parent;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(last=nitems-1;last>0;last--)
   {
    size_t index=0;
    void *temp;

    temp=datap[index];
    datap[index]=datap[last];
    datap[last]=temp;

    /* The heap is now items 0 to last-1, the sorted part starts at last */

    /* Bubble down the new value (upside-down, put largest at top) */

    while((2*index+2)<last) /* both children are inside the heap */
      {
       size_t child=2*index+1;

       if(compare_function(datap[child],datap[child+1])<=0) /* reversed comparison to filesort_fixed() above */
          child=child+1;

       if(compare_function(datap[index],datap[child])>=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[child];
       datap[child]=datap[index];
       datap[index]=temp;

       index=child;
      }

    /* Handle the case of a final node that has only a left child */

    if((2*index+2)==last)
      {
       size_t child=2*index+1;

       if(compare_function(datap[index],datap[child])<0) /* reversed comparison to filesort_fixed() above */
         {
          temp=datap[child];
          datap[child]=datap[index];
          datap[index]=temp;
         }
      }
   }
}
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |