Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory | Revision Log


Revision 1988 - (show annotations) (download) (as text)
Sat Apr 13 18:16:13 2019 UTC (5 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 28422 byte(s)
Ensure that data pointers are correctly aligned - found by gcc's
runtime sanitizer (make SANITIZE=1 test).

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2015, 2017, 2019 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #if defined(USE_PTHREADS) && USE_PTHREADS
28 #include <pthread.h>
29 #endif
30
31 #include "types.h"
32
33 #include "logging.h"
34 #include "files.h"
35 #include "sorting.h"
36
37
/* Global variables (defined elsewhere, set from the command line) */

/*+ The command line '--tmpdir' option or its default value; the temporary
    files used while sorting are created in this directory. +*/
extern char *option_tmpdirname;

/*+ The amount of RAM to use for filesorting, in bytes; this allowance is
    divided between all of the sorting threads. +*/
extern size_t option_filesort_ramsize;

/*+ The number of filesorting threads allowed.  It is used as a divisor when
    sharing out the RAM allowance (NOTE(review): presumably validated to be
    at least 1 by the option parser - confirm). +*/
extern int option_filesort_threads;
49
/* Thread data type definitions */

/*+ A data type for holding data for a thread (one slot per allowed sorting thread). +*/
typedef struct thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS

 pthread_t thread;              /*+ The thread identifier. +*/

 int running;                   /*+ The state of the slot, only accessed with running_mutex held:
                                    0 = free, 1 = thread running, 2 = thread finished but not yet joined. +*/

#endif

 char  *data;                   /*+ The main data array. +*/
 void **datap;                  /*+ An array of pointers to the data objects (this is what gets sorted). +*/
 size_t n;                      /*+ The number of pointers. +*/

 char  *filename;               /*+ The name of the file to write the results to. +*/

 size_t itemsize;               /*+ The size of each item (only used for fixed length data). +*/
 int  (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
thread_data;

/* Thread variables */

#if defined(USE_PTHREADS) && USE_PTHREADS

/*+ Protects the 'running' state of every thread slot. +*/
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

/*+ Serialises the creation and writing of temporary files between threads. +*/
static pthread_mutex_t files_mutex = PTHREAD_MUTEX_INITIALIZER;

/*+ Signalled (with running_mutex held) by a sorting thread when it has finished. +*/
static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;

#endif

/* Thread helper functions */

/* NOTE(review): these are started by pthread_create() through a cast to
   void*(*)(void*); calling a function through an incompatible function pointer
   type is undefined behaviour in ISO C, so a void* parameter would be the
   strictly portable signature. */

static void *filesort_fixed_heapsort_thread(thread_data *thread);
static void *filesort_vary_heapsort_thread(thread_data *thread);
88
89
90 /*++++++++++++++++++++++++++++++++++++++
91 A function to sort the contents of a file of fixed length objects using a
92 limited amount of RAM.
93
94 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
95 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
96 The individual sort steps and the merge step both use a "Heap sort"
97 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
98 if the data is already partially sorted.
99
100 index_t filesort_fixed Returns the number of objects kept.
101
102 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
103
104 int fd_out The file descriptor of the output file (opened for writing and empty).
105
106 size_t itemsize The size of each item in the file that needs sorting.
107
108 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
109 each item before they have been sorted. The second parameter is the number of objects
110 previously read from the input file. If the function returns 1 then the object is kept
111 and it is sorted, otherwise it is ignored.
112
113 int (*compare_function)(const void*, const void*) The comparison function. This is identical
114 to qsort if the data to be sorted is an array of things not pointers.
115
116 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
117 each item after they have been sorted. The second parameter is the number of objects
118 already written to the output file. If the function returns 1 then the object is written
119 to the output file., otherwise it is ignored.
120 ++++++++++++++++++++++++++++++++++++++*/
121
122 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
123 int (*compare_function)(const void*,const void*),
124 int (*post_sort_function)(void*,index_t))
125 {
126 int *fds=NULL,*heap=NULL;
127 int nfiles=0,ndata=0;
128 index_t count_out=0,count_in=0,total=0;
129 size_t nitems,item;
130 char *data;
131 void **datap;
132 thread_data *threads;
133 int i,more=1;
134 #if defined(USE_PTHREADS) && USE_PTHREADS
135 int nthreads=0;
136 #endif
137
138 /* Allocate the RAM buffer and other bits */
139
140 nitems=(size_t)SizeFileFD(fd_in)/itemsize;
141
142 if(nitems==0)
143 return(0);
144
145 if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize)
146 nitems=1+nitems/option_filesort_threads;
147 else
148 nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
149
150 threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data));
151
152 for(i=0;i<option_filesort_threads;i++)
153 {
154 threads[i].data=malloc(nitems*itemsize);
155 threads[i].datap=malloc(nitems*sizeof(void*));
156
157 log_malloc(threads[i].data ,nitems*itemsize);
158 log_malloc(threads[i].datap,nitems*sizeof(void*));
159
160 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
161
162 threads[i].itemsize=itemsize;
163 threads[i].compare=compare_function;
164 }
165
166 /* Loop around, fill the buffer, sort the data and write a temporary file */
167
168 do
169 {
170 int thread=0;
171
172 #if defined(USE_PTHREADS) && USE_PTHREADS
173
174 if(option_filesort_threads>1)
175 {
176 /* Find a spare slot (one *must* be unused at all times) */
177
178 pthread_mutex_lock(&running_mutex);
179
180 for(thread=0;thread<option_filesort_threads;thread++)
181 if(!threads[thread].running)
182 break;
183
184 pthread_mutex_unlock(&running_mutex);
185 }
186
187 #endif
188
189 /* Read in the data and create pointers */
190
191 for(item=0;item<nitems;)
192 {
193 threads[thread].datap[item]=threads[thread].data+item*itemsize;
194
195 if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize))
196 {
197 more=0;
198 break;
199 }
200
201 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
202 {
203 item++;
204 total++;
205 }
206
207 count_in++;
208 }
209
210 threads[thread].n=item;
211
212 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
213
214 if(more==0 && total==0)
215 goto tidy_and_exit;
216
217 /* No new data read in this time round */
218
219 if(threads[thread].n==0)
220 break;
221
222 /* Sort the data pointers using a heap sort (potentially in a thread) */
223
224 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
225
226 #if defined(USE_PTHREADS) && USE_PTHREADS
227
228 /* Shortcut if only one file, don't write to disk */
229
230 if(more==0 && nfiles==0)
231 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
232 else if(option_filesort_threads>1)
233 {
234 pthread_mutex_lock(&running_mutex);
235
236 while(nthreads==(option_filesort_threads-1))
237 {
238 for(i=0;i<option_filesort_threads;i++)
239 if(threads[i].running==2)
240 {
241 pthread_join(threads[i].thread,NULL);
242 threads[i].running=0;
243 nthreads--;
244 }
245
246 if(nthreads==(option_filesort_threads-1))
247 pthread_cond_wait(&running_cond,&running_mutex);
248 }
249
250 threads[thread].running=1;
251
252 pthread_mutex_unlock(&running_mutex);
253
254 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
255
256 nthreads++;
257 }
258 else
259 filesort_fixed_heapsort_thread(&threads[thread]);
260
261 #else
262
263 /* Shortcut if only one file, don't write to disk */
264
265 if(more==0 && nfiles==0)
266 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
267 else
268 filesort_fixed_heapsort_thread(&threads[thread]);
269
270 #endif
271
272 nfiles++;
273 }
274 while(more);
275
276 /* Wait for all of the threads to finish */
277
278 #if defined(USE_PTHREADS) && USE_PTHREADS
279
280 while(option_filesort_threads>1 && nthreads)
281 {
282 pthread_mutex_lock(&running_mutex);
283
284 pthread_cond_wait(&running_cond,&running_mutex);
285
286 for(i=0;i<option_filesort_threads;i++)
287 if(threads[i].running==2)
288 {
289 pthread_join(threads[i].thread,NULL);
290 threads[i].running=0;
291 nthreads--;
292 }
293
294 pthread_mutex_unlock(&running_mutex);
295 }
296
297 #endif
298
299 /* Shortcut if there are no files */
300
301 if(nfiles==0)
302 goto tidy_and_exit;
303
304 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
305
306 if(nfiles==1)
307 {
308 for(item=0;item<threads[0].n;item++)
309 {
310 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
311 {
312 WriteFileBuffered(fd_out,threads[0].datap[item],itemsize);
313 count_out++;
314 }
315 }
316
317 DeleteFile(threads[0].filename);
318
319 goto tidy_and_exit;
320 }
321
322 /* Check that number of files is less than file size */
323
324 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
325
326 /* Open all of the temporary files */
327
328 fds=(int*)malloc(nfiles*sizeof(int));
329
330 for(i=0;i<nfiles;i++)
331 {
332 char *filename=threads[0].filename;
333
334 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
335
336 fds[i]=ReOpenFileBuffered(filename);
337
338 DeleteFile(filename);
339 }
340
341 /* Perform an n-way merge using a binary heap */
342
343 heap=(int*)malloc((1+nfiles)*sizeof(int));
344
345 data =threads[0].data;
346 datap=threads[0].datap;
347
348 /* Fill the heap to start with */
349
350 for(i=0;i<nfiles;i++)
351 {
352 int index;
353
354 datap[i]=data+i*itemsize;
355
356 ReadFileBuffered(fds[i],datap[i],itemsize);
357
358 index=i+1;
359
360 heap[index]=i;
361
362 /* Bubble up the new value */
363
364 while(index>1)
365 {
366 int newindex;
367 int temp;
368
369 newindex=index/2;
370
371 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
372 break;
373
374 temp=heap[index];
375 heap[index]=heap[newindex];
376 heap[newindex]=temp;
377
378 index=newindex;
379 }
380 }
381
382 /* Repeatedly pull out the root of the heap and refill from the same file */
383
384 ndata=nfiles;
385
386 do
387 {
388 int index=1;
389
390 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
391 {
392 WriteFileBuffered(fd_out,datap[heap[index]],itemsize);
393 count_out++;
394 }
395
396 if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize))
397 {
398 heap[index]=heap[ndata];
399 ndata--;
400 }
401
402 /* Bubble down the new value */
403
404 while((2*index)<ndata)
405 {
406 int newindex;
407 int temp;
408
409 newindex=2*index;
410
411 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
412 newindex=newindex+1;
413
414 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
415 break;
416
417 temp=heap[newindex];
418 heap[newindex]=heap[index];
419 heap[index]=temp;
420
421 index=newindex;
422 }
423
424 if((2*index)==ndata)
425 {
426 int newindex;
427 int temp;
428
429 newindex=2*index;
430
431 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
432 ; /* break */
433 else
434 {
435 temp=heap[newindex];
436 heap[newindex]=heap[index];
437 heap[index]=temp;
438 }
439 }
440 }
441 while(ndata>0);
442
443 /* Tidy up */
444
445 tidy_and_exit:
446
447 if(fds)
448 {
449 for(i=0;i<nfiles;i++)
450 CloseFileBuffered(fds[i]);
451 free(fds);
452 }
453
454 if(heap)
455 free(heap);
456
457 for(i=0;i<option_filesort_threads;i++)
458 {
459 log_free(threads[i].data);
460 log_free(threads[i].datap);
461
462 free(threads[i].data);
463 free(threads[i].datap);
464
465 free(threads[i].filename);
466 }
467
468 free(threads);
469
470 return(count_out);
471 }
472
473
474 /*++++++++++++++++++++++++++++++++++++++
475 A function to sort the contents of a file of variable length objects (each
476 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
477
478 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
479 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
480 The individual sort steps and the merge step both use a "Heap sort"
481 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
482 if the data is already partially sorted.
483
484 index_t filesort_vary Returns the number of objects kept.
485
486 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
487
488 int fd_out The file descriptor of the output file (opened for writing and empty).
489
490 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
491 each item before they have been sorted. The second parameter is the number of objects
492 previously read from the input file. If the function returns 1 then the object is kept
493 and it is sorted, otherwise it is ignored.
494
495 int (*compare_function)(const void*, const void*) The comparison function. This is identical
496 to qsort if the data to be sorted is an array of things not pointers.
497
498 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
499 each item after they have been sorted. The second parameter is the number of objects
500 already written to the output file. If the function returns 1 then the object is written
501 to the output file., otherwise it is ignored.
502 ++++++++++++++++++++++++++++++++++++++*/
503
504 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
505 int (*compare_function)(const void*,const void*),
506 int (*post_sort_function)(void*,index_t))
507 {
508 int *fds=NULL,*heap=NULL;
509 int nfiles=0,ndata=0;
510 index_t count_out=0,count_in=0,total=0;
511 size_t datasize,item;
512 FILESORT_VARINT nextitemsize,largestitemsize=0;
513 char *data;
514 void **datap;
515 thread_data *threads;
516 int i,more=1;
517 #if defined(USE_PTHREADS) && USE_PTHREADS
518 int nthreads=0;
519 #endif
520
521 /* Allocate the RAM buffer and other bits */
522
523 datasize=(size_t)SizeFileFD(fd_in);
524
525 if(datasize==0)
526 return(0);
527
528 /* We can not know in advance how many data items there are. Each
529 one will require RAM for data, FILESORT_VARALIGN and sizeof(void*)
530 Assume that data+FILESORT_VARALIGN+sizeof(void*) is 4*data. */
531
532 if((datasize*4)<option_filesort_ramsize)
533 datasize=(datasize*4)/option_filesort_threads;
534 else
535 datasize=option_filesort_ramsize/option_filesort_threads;
536
537 datasize=FILESORT_VARALIGN*((datasize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
538
539 threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data));
540
541 for(i=0;i<option_filesort_threads;i++)
542 {
543 threads[i].data=malloc(datasize);
544 threads[i].datap=NULL;
545
546 log_malloc(threads[i].data,datasize);
547
548 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
549
550 threads[i].compare=compare_function;
551 }
552
553 /* Loop around, fill the buffer, sort the data and write a temporary file */
554
555 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
556 goto tidy_and_exit;
557
558 do
559 {
560 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
561 int thread=0;
562
563 #if defined(USE_PTHREADS) && USE_PTHREADS
564
565 if(option_filesort_threads>1)
566 {
567 /* Find a spare slot (one *must* be unused at all times) */
568
569 pthread_mutex_lock(&running_mutex);
570
571 for(thread=0;thread<option_filesort_threads;thread++)
572 if(!threads[thread].running)
573 break;
574
575 pthread_mutex_unlock(&running_mutex);
576 }
577
578 #endif
579
580 threads[thread].datap=(void**)(threads[thread].data+datasize);
581
582 threads[thread].n=0;
583
584 /* Read in the data and create pointers */
585
586 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(size_t)((char*)threads[thread].datap-sizeof(void*)-threads[thread].data))
587 {
588 FILESORT_VARINT itemsize=nextitemsize;
589
590 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
591
592 ramused+=FILESORT_VARSIZE;
593
594 ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize);
595
596 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
597 {
598 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
599
600 if(itemsize>largestitemsize)
601 largestitemsize=itemsize;
602
603 ramused+=itemsize;
604
605 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
606 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
607
608 total++;
609 threads[thread].n++;
610 }
611 else
612 ramused-=FILESORT_VARSIZE;
613
614 count_in++;
615
616 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE))
617 {
618 more=0;
619 break;
620 }
621 }
622
623 /* No new data read in this time round */
624
625 if(threads[thread].n==0)
626 break;
627
628 /* Sort the data pointers using a heap sort (potentially in a thread) */
629
630 if(more==0 && nfiles==0)
631 threads[thread].filename[0]=0;
632 else
633 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
634
635 #if defined(USE_PTHREADS) && USE_PTHREADS
636
637 /* Shortcut if only one file, don't write to disk */
638
639 if(more==0 && nfiles==0)
640 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
641 else if(option_filesort_threads>1)
642 {
643 pthread_mutex_lock(&running_mutex);
644
645 while(nthreads==(option_filesort_threads-1))
646 {
647 for(i=0;i<option_filesort_threads;i++)
648 if(threads[i].running==2)
649 {
650 pthread_join(threads[i].thread,NULL);
651 threads[i].running=0;
652 nthreads--;
653 }
654
655 if(nthreads==(option_filesort_threads-1))
656 pthread_cond_wait(&running_cond,&running_mutex);
657 }
658
659 threads[thread].running=1;
660
661 pthread_mutex_unlock(&running_mutex);
662
663 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
664
665 nthreads++;
666 }
667 else
668 filesort_vary_heapsort_thread(&threads[thread]);
669
670 #else
671
672 /* Shortcut if only one file, don't write to disk */
673
674 if(more==0 && nfiles==0)
675 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
676 else
677 filesort_vary_heapsort_thread(&threads[thread]);
678
679 #endif
680
681 nfiles++;
682 }
683 while(more);
684
685 /* Wait for all of the threads to finish */
686
687 #if defined(USE_PTHREADS) && USE_PTHREADS
688
689 while(option_filesort_threads>1 && nthreads)
690 {
691 pthread_mutex_lock(&running_mutex);
692
693 pthread_cond_wait(&running_cond,&running_mutex);
694
695 for(i=0;i<option_filesort_threads;i++)
696 if(threads[i].running==2)
697 {
698 pthread_join(threads[i].thread,NULL);
699 threads[i].running=0;
700 nthreads--;
701 }
702
703 pthread_mutex_unlock(&running_mutex);
704 }
705
706 #endif
707
708 /* Shortcut if there are no files */
709
710 if(nfiles==0)
711 goto tidy_and_exit;
712
713 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
714
715 if(nfiles==1)
716 {
717 for(item=0;item<threads[0].n;item++)
718 {
719 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
720 {
721 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)threads[0].datap[item]-FILESORT_VARSIZE);
722
723 WriteFileBuffered(fd_out,(char*)threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
724 count_out++;
725 }
726 }
727
728 DeleteFile(threads[0].filename);
729
730 goto tidy_and_exit;
731 }
732
733 /* Check that number of files is less than file size */
734
735 largestitemsize=FILESORT_VARALIGN*((largestitemsize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
736
737 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/(FILESORT_VARALIGN+largestitemsize)),"Too many temporary files (use more sorting memory?)");
738
739 /* Open all of the temporary files */
740
741 fds=(int*)malloc(nfiles*sizeof(int));
742
743 for(i=0;i<nfiles;i++)
744 {
745 char *filename=threads[0].filename;
746
747 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
748
749 fds[i]=ReOpenFileBuffered(filename);
750
751 DeleteFile(filename);
752 }
753
754 /* Perform an n-way merge using a binary heap */
755
756 heap=(int*)malloc((1+nfiles)*sizeof(int));
757
758 data=threads[0].data;
759 datap=(void**)(data+datasize-nfiles*sizeof(void*));
760
761 /* Fill the heap to start with */
762
763 for(i=0;i<nfiles;i++)
764 {
765 int index;
766 FILESORT_VARINT itemsize;
767
768 datap[i]=data+FILESORT_VARALIGN+i*(largestitemsize+FILESORT_VARALIGN);
769
770 ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE);
771
772 *(FILESORT_VARINT*)((char*)datap[i]-FILESORT_VARSIZE)=itemsize;
773
774 ReadFileBuffered(fds[i],datap[i],itemsize);
775
776 index=i+1;
777
778 heap[index]=i;
779
780 /* Bubble up the new value */
781
782 while(index>1)
783 {
784 int newindex;
785 int temp;
786
787 newindex=index/2;
788
789 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
790 break;
791
792 temp=heap[index];
793 heap[index]=heap[newindex];
794 heap[newindex]=temp;
795
796 index=newindex;
797 }
798 }
799
800 /* Repeatedly pull out the root of the heap and refill from the same file */
801
802 ndata=nfiles;
803
804 do
805 {
806 int index=1;
807 FILESORT_VARINT itemsize;
808
809 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
810 {
811 itemsize=*(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE);
812
813 WriteFileBuffered(fd_out,(char*)datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
814 count_out++;
815 }
816
817 if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
818 {
819 heap[index]=heap[ndata];
820 ndata--;
821 }
822 else
823 {
824 *(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
825
826 ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize);
827 }
828
829 /* Bubble down the new value */
830
831 while((2*index)<ndata)
832 {
833 int newindex;
834 int temp;
835
836 newindex=2*index;
837
838 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
839 newindex=newindex+1;
840
841 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
842 break;
843
844 temp=heap[newindex];
845 heap[newindex]=heap[index];
846 heap[index]=temp;
847
848 index=newindex;
849 }
850
851 if((2*index)==ndata)
852 {
853 int newindex;
854 int temp;
855
856 newindex=2*index;
857
858 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
859 ; /* break */
860 else
861 {
862 temp=heap[newindex];
863 heap[newindex]=heap[index];
864 heap[index]=temp;
865 }
866 }
867 }
868 while(ndata>0);
869
870 /* Tidy up */
871
872 tidy_and_exit:
873
874 if(fds)
875 {
876 for(i=0;i<nfiles;i++)
877 CloseFileBuffered(fds[i]);
878 free(fds);
879 }
880
881 if(heap)
882 free(heap);
883
884 for(i=0;i<option_filesort_threads;i++)
885 {
886 log_free(threads[i].data);
887
888 free(threads[i].data);
889
890 free(threads[i].filename);
891 }
892
893 free(threads);
894
895 return(count_out);
896 }
897
898
899 /*++++++++++++++++++++++++++++++++++++++
900 A wrapper function that can be run in a thread for fixed data.
901
902 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
903
904 thread_data *thread The data to be processed in this thread.
905 ++++++++++++++++++++++++++++++++++++++*/
906
907 static void *filesort_fixed_heapsort_thread(thread_data *thread)
908 {
909 int fd;
910 size_t item;
911
912 /* Sort the data pointers using a heap sort */
913
914 filesort_heapsort(thread->datap,thread->n,thread->compare);
915
916 /* Create a temporary file and write the result */
917
918 #if defined(USE_PTHREADS) && USE_PTHREADS
919
920 if(option_filesort_threads>1)
921 pthread_mutex_lock(&files_mutex);
922
923 #endif
924
925 fd=OpenFileBufferedNew(thread->filename);
926
927 for(item=0;item<thread->n;item++)
928 WriteFileBuffered(fd,thread->datap[item],thread->itemsize);
929
930 CloseFileBuffered(fd);
931
932 #if defined(USE_PTHREADS) && USE_PTHREADS
933
934 if(option_filesort_threads>1)
935 {
936 pthread_mutex_unlock(&files_mutex);
937
938 pthread_mutex_lock(&running_mutex);
939
940 thread->running=2;
941
942 pthread_cond_signal(&running_cond);
943
944 pthread_mutex_unlock(&running_mutex);
945 }
946
947 #endif
948
949 return(NULL);
950 }
951
952
953 /*++++++++++++++++++++++++++++++++++++++
954 A wrapper function that can be run in a thread for variable data.
955
956 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
957
958 thread_data *thread The data to be processed in this thread.
959 ++++++++++++++++++++++++++++++++++++++*/
960
961 static void *filesort_vary_heapsort_thread(thread_data *thread)
962 {
963 int fd;
964 size_t item;
965
966 /* Sort the data pointers using a heap sort */
967
968 filesort_heapsort(thread->datap,thread->n,thread->compare);
969
970 /* Create a temporary file and write the result */
971
972 #if defined(USE_PTHREADS) && USE_PTHREADS
973
974 if(option_filesort_threads>1)
975 pthread_mutex_lock(&files_mutex);
976
977 #endif
978
979 fd=OpenFileBufferedNew(thread->filename);
980
981 for(item=0;item<thread->n;item++)
982 {
983 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)thread->datap[item]-FILESORT_VARSIZE);
984
985 WriteFileBuffered(fd,(char*)thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
986 }
987
988 CloseFileBuffered(fd);
989
990 #if defined(USE_PTHREADS) && USE_PTHREADS
991
992 if(option_filesort_threads>1)
993 {
994 pthread_mutex_unlock(&files_mutex);
995
996 pthread_mutex_lock(&running_mutex);
997
998 thread->running=2;
999
1000 pthread_cond_signal(&running_cond);
1001
1002 pthread_mutex_unlock(&running_mutex);
1003 }
1004
1005 #endif
1006
1007 return(NULL);
1008 }
1009
1010
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.

  The heap is described with 1-based positions (the children of position p are 2p
  and 2p+1) and position p is stored in datap[p-1].  This avoids forming the pointer
  &datap[-1], which is outside the array and therefore undefined behaviour in C.

  void **datap A pointer to the array of pointers to sort (may be NULL if nitems is 0).

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
    to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;
 void *temp;

 /* Zero or one item is already sorted */

 if(nitems<2)
    return;

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=2;item<=nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t parent=index/2;

       if(compare_function(datap[index-1],datap[parent-1])<=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[parent-1];
       datap[parent-1]=temp;

       index=parent;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(item=nitems;item>1;item--)
   {
    size_t index=1;
    size_t heapsize=item-1;    /* the heap loses its last position each time round */

    temp=datap[0];
    datap[0]=datap[item-1];
    datap[item-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top) */

    while((2*index)<heapsize)
      {
       size_t child=2*index;

       /* Choose the larger child (position child+1 is stored in datap[child]) */

       if(compare_function(datap[child-1],datap[child])<=0) /* reversed comparison to filesort_fixed() above */
          child=child+1;

       if(compare_function(datap[index-1],datap[child-1])>=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[child-1];
       datap[child-1]=datap[index-1];
       datap[index-1]=temp;

       index=child;
      }

    /* A lone left child at the end of the heap */

    if((2*index)==heapsize)
      {
       size_t child=2*index;

       if(compare_function(datap[index-1],datap[child-1])<0) /* reversed comparison to filesort_fixed() above */
         {
          temp=datap[child-1];
          datap[child-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}

Properties

Name Value
cvs:description Functions to perform sorting.