Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory | Revision Log


Revision 1310 - (show annotations) (download) (as text)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Change data type from signed to unsigned (pedantic compiler warning).

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2013 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #if defined(USE_PTHREADS) && USE_PTHREADS
28 #include <pthread.h>
29 #endif
30
31 #include "types.h"
32
33 #include "logging.h"
34 #include "files.h"
35 #include "sorting.h"
36
37
/* Global variables */

/*+ The command line '--tmpdir' option or its default value. +*/
extern char *option_tmpdirname;

/*+ The amount of RAM to use for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The number of filesorting threads allowed. +*/
extern int option_filesort_threads;


/* Thread data type definitions */

/*+ A data type for holding data for a thread. +*/
typedef struct _thread_data
 {
#if defined(USE_PTHREADS) && USE_PTHREADS

  /* pthread_t only exists when <pthread.h> has been included (see above) and
     this field is only used in the pthread-specific code paths. */

  pthread_t thread;             /*+ The thread identifier. +*/

#endif

  int       running;            /*+ The slot state: 0 = free, 1 = thread running,
                                    2 = thread finished but not yet joined. +*/

  void     *data;               /*+ The main data array. +*/
  void    **datap;              /*+ An array of pointers to the data objects. +*/
  size_t    n;                  /*+ The number of pointers. +*/

  char     *filename;           /*+ The name of the file to write the results to. +*/

  size_t    itemsize;           /*+ The size of each item (only set for fixed length sorting). +*/
  int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
 }
 thread_data;

/* Thread variables */

#if defined(USE_PTHREADS) && USE_PTHREADS

/* Protects the 'running' field of every thread_data slot. */
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Signalled by a sorting thread when it sets its 'running' field to 2. */
static pthread_cond_t  running_cond  = PTHREAD_COND_INITIALIZER;

#endif

/* Thread helper functions */

static void *filesort_fixed_heapsort_thread(thread_data *thread);
static void *filesort_vary_heapsort_thread(thread_data *thread);
83
84
85 /*++++++++++++++++++++++++++++++++++++++
86 A function to sort the contents of a file of fixed length objects using a
87 limited amount of RAM.
88
89 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91 The individual sort steps and the merge step both use a "Heap sort"
92 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93 if the data is already partially sorted.
94
95 index_t filesort_fixed Returns the number of objects kept.
96
97 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98
99 int fd_out The file descriptor of the output file (opened for writing and empty).
100
101 size_t itemsize The size of each item in the file that needs sorting.
102
103 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
104 each item before they have been sorted. The second parameter is the number of objects
105 previously read from the input file. If the function returns 1 then the object is kept
106 and it is sorted, otherwise it is ignored.
107
108 int (*compare_function)(const void*, const void*) The comparison function. This is identical
109 to qsort if the data to be sorted is an array of things not pointers.
110
111 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
112 each item after they have been sorted. The second parameter is the number of objects
113 already written to the output file. If the function returns 1 then the object is written
114 to the output file., otherwise it is ignored.
115 ++++++++++++++++++++++++++++++++++++++*/
116
117 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
118 int (*compare_function)(const void*,const void*),
119 int (*post_sort_function)(void*,index_t))
120 {
121 int *fds=NULL,*heap=NULL;
122 int nfiles=0,ndata=0;
123 index_t count_out=0,count_in=0,total=0;
124 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
125 void *data,**datap;
126 thread_data *threads;
127 size_t item;
128 int i,more=1;
129 #if defined(USE_PTHREADS) && USE_PTHREADS
130 int nthreads=0;
131 #endif
132
133 /* Allocate the RAM buffer and other bits */
134
135 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
136
137 for(i=0;i<option_filesort_threads;i++)
138 {
139 threads[i].running=0;
140
141 threads[i].data=malloc(nitems*itemsize);
142 threads[i].datap=malloc(nitems*sizeof(void*));
143
144 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
145
146 threads[i].itemsize=itemsize;
147 threads[i].compare=compare_function;
148 }
149
150 /* Loop around, fill the buffer, sort the data and write a temporary file */
151
152 do
153 {
154 int thread=0;
155
156 #if defined(USE_PTHREADS) && USE_PTHREADS
157
158 if(option_filesort_threads>1)
159 {
160 /* Find a spare slot (one *must* be unused at all times) */
161
162 pthread_mutex_lock(&running_mutex);
163
164 for(thread=0;thread<option_filesort_threads;thread++)
165 if(!threads[thread].running)
166 break;
167
168 pthread_mutex_unlock(&running_mutex);
169 }
170
171 #endif
172
173 /* Read in the data and create pointers */
174
175 for(item=0;item<nitems;)
176 {
177 threads[thread].datap[item]=threads[thread].data+item*itemsize;
178
179 if(ReadFile(fd_in,threads[thread].datap[item],itemsize))
180 {
181 more=0;
182 break;
183 }
184
185 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
186 {
187 item++;
188 total++;
189 }
190
191 count_in++;
192 }
193
194 threads[thread].n=item;
195
196 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
197
198 if(more==0 && total==0)
199 goto tidy_and_exit;
200
201 /* No new data read in this time round */
202
203 if(threads[thread].n==0)
204 break;
205
206 /* Sort the data pointers using a heap sort (potentially in a thread) */
207
208 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
209
210 #if defined(USE_PTHREADS) && USE_PTHREADS
211
212 /* Shortcut if only one file, don't write to disk */
213
214 if(more==0 && nfiles==0)
215 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
216 else if(option_filesort_threads>1)
217 {
218 pthread_mutex_lock(&running_mutex);
219
220 while(nthreads==(option_filesort_threads-1))
221 {
222 for(i=0;i<option_filesort_threads;i++)
223 if(threads[i].running==2)
224 {
225 pthread_join(threads[i].thread,NULL);
226 threads[i].running=0;
227 nthreads--;
228 }
229
230 if(nthreads==(option_filesort_threads-1))
231 pthread_cond_wait(&running_cond,&running_mutex);
232 }
233
234 threads[thread].running=1;
235
236 pthread_mutex_unlock(&running_mutex);
237
238 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
239
240 nthreads++;
241 }
242 else
243 filesort_fixed_heapsort_thread(&threads[thread]);
244
245 #else
246
247 /* Shortcut if only one file, don't write to disk */
248
249 if(more==0 && nfiles==0)
250 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
251 else
252 filesort_fixed_heapsort_thread(&threads[thread]);
253
254 #endif
255
256 nfiles++;
257 }
258 while(more);
259
260 /* Wait for all of the threads to finish */
261
262 #if defined(USE_PTHREADS) && USE_PTHREADS
263
264 while(option_filesort_threads>1 && nthreads)
265 {
266 pthread_mutex_lock(&running_mutex);
267
268 pthread_cond_wait(&running_cond,&running_mutex);
269
270 for(i=0;i<option_filesort_threads;i++)
271 if(threads[i].running==2)
272 {
273 pthread_join(threads[i].thread,NULL);
274 threads[i].running=0;
275 nthreads--;
276 }
277
278 pthread_mutex_unlock(&running_mutex);
279 }
280
281 #endif
282
283 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
284
285 if(nfiles==1)
286 {
287 for(item=0;item<threads[0].n;item++)
288 {
289 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
290 {
291 WriteFile(fd_out,threads[0].datap[item],itemsize);
292 count_out++;
293 }
294 }
295
296 DeleteFile(threads[0].filename);
297
298 goto tidy_and_exit;
299 }
300
301 /* Check that number of files is less than file size */
302
303 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
304
305 /* Open all of the temporary files */
306
307 fds=(int*)malloc(nfiles*sizeof(int));
308
309 for(i=0;i<nfiles;i++)
310 {
311 char *filename=threads[0].filename;
312
313 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
314
315 fds[i]=ReOpenFile(filename);
316
317 DeleteFile(filename);
318 }
319
320 /* Perform an n-way merge using a binary heap */
321
322 heap=(int*)malloc((1+nfiles)*sizeof(int));
323
324 data =threads[0].data;
325 datap=threads[0].datap;
326
327 /* Fill the heap to start with */
328
329 for(i=0;i<nfiles;i++)
330 {
331 int index;
332
333 datap[i]=data+i*itemsize;
334
335 ReadFile(fds[i],datap[i],itemsize);
336
337 index=i+1;
338
339 heap[index]=i;
340
341 /* Bubble up the new value */
342
343 while(index>1)
344 {
345 int newindex;
346 int temp;
347
348 newindex=index/2;
349
350 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
351 break;
352
353 temp=heap[index];
354 heap[index]=heap[newindex];
355 heap[newindex]=temp;
356
357 index=newindex;
358 }
359 }
360
361 /* Repeatedly pull out the root of the heap and refill from the same file */
362
363 ndata=nfiles;
364
365 do
366 {
367 int index=1;
368
369 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
370 {
371 WriteFile(fd_out,datap[heap[index]],itemsize);
372 count_out++;
373 }
374
375 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
376 {
377 heap[index]=heap[ndata];
378 ndata--;
379 }
380
381 /* Bubble down the new value */
382
383 while((2*index)<ndata)
384 {
385 int newindex;
386 int temp;
387
388 newindex=2*index;
389
390 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
391 newindex=newindex+1;
392
393 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
394 break;
395
396 temp=heap[newindex];
397 heap[newindex]=heap[index];
398 heap[index]=temp;
399
400 index=newindex;
401 }
402
403 if((2*index)==ndata)
404 {
405 int newindex;
406 int temp;
407
408 newindex=2*index;
409
410 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
411 ; /* break */
412 else
413 {
414 temp=heap[newindex];
415 heap[newindex]=heap[index];
416 heap[index]=temp;
417 }
418 }
419 }
420 while(ndata>0);
421
422 /* Tidy up */
423
424 tidy_and_exit:
425
426 if(fds)
427 {
428 for(i=0;i<nfiles;i++)
429 CloseFile(fds[i]);
430 free(fds);
431 }
432
433 if(heap)
434 free(heap);
435
436 for(i=0;i<option_filesort_threads;i++)
437 {
438 free(threads[i].data);
439 free(threads[i].datap);
440
441 free(threads[i].filename);
442 }
443
444 return(count_out);
445 }
446
447
448 /*++++++++++++++++++++++++++++++++++++++
449 A function to sort the contents of a file of variable length objects (each
450 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
451
452 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
453 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
454 The individual sort steps and the merge step both use a "Heap sort"
455 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
456 if the data is already partially sorted.
457
458 index_t filesort_vary Returns the number of objects kept.
459
460 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
461
462 int fd_out The file descriptor of the output file (opened for writing and empty).
463
464 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
465 each item before they have been sorted. The second parameter is the number of objects
466 previously read from the input file. If the function returns 1 then the object is kept
467 and it is sorted, otherwise it is ignored.
468
469 int (*compare_function)(const void*, const void*) The comparison function. This is identical
470 to qsort if the data to be sorted is an array of things not pointers.
471
472 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
473 each item after they have been sorted. The second parameter is the number of objects
474 already written to the output file. If the function returns 1 then the object is written
475 to the output file., otherwise it is ignored.
476 ++++++++++++++++++++++++++++++++++++++*/
477
478 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
479 int (*compare_function)(const void*,const void*),
480 int (*post_sort_function)(void*,index_t))
481 {
482 int *fds=NULL,*heap=NULL;
483 int nfiles=0,ndata=0;
484 index_t count_out=0,count_in=0,total=0;
485 size_t datasize=option_filesort_ramsize/option_filesort_threads;
486 FILESORT_VARINT nextitemsize,largestitemsize=0;
487 void *data,**datap;
488 thread_data *threads;
489 size_t item;
490 int i,more=1;
491 #if defined(USE_PTHREADS) && USE_PTHREADS
492 int nthreads=0;
493 #endif
494
495 /* Allocate the RAM buffer and other bits */
496
497 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
498
499 for(i=0;i<option_filesort_threads;i++)
500 {
501 threads[i].running=0;
502
503 threads[i].data=malloc(datasize);
504 threads[i].datap=NULL;
505
506 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
507
508 threads[i].compare=compare_function;
509 }
510
511 /* Loop around, fill the buffer, sort the data and write a temporary file */
512
513 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
514 goto tidy_and_exit;
515
516 do
517 {
518 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
519 int thread=0;
520
521 #if defined(USE_PTHREADS) && USE_PTHREADS
522
523 if(option_filesort_threads>1)
524 {
525 /* Find a spare slot (one *must* be unused at all times) */
526
527 pthread_mutex_lock(&running_mutex);
528
529 for(thread=0;thread<option_filesort_threads;thread++)
530 if(!threads[thread].running)
531 break;
532
533 pthread_mutex_unlock(&running_mutex);
534 }
535
536 #endif
537
538 threads[thread].datap=threads[thread].data+datasize;
539
540 threads[thread].n=0;
541
542 /* Read in the data and create pointers */
543
544 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
545 {
546 FILESORT_VARINT itemsize=nextitemsize;
547
548 if(itemsize>largestitemsize)
549 largestitemsize=itemsize;
550
551 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
552
553 ramused+=FILESORT_VARSIZE;
554
555 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
556
557 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
558 {
559 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
560
561 ramused+=itemsize;
562
563 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
564 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
565
566 total++;
567 threads[thread].n++;
568 }
569
570 count_in++;
571
572 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
573 {
574 more=0;
575 break;
576 }
577 }
578
579 /* No new data read in this time round */
580
581 if(threads[thread].n==0)
582 break;
583
584 /* Sort the data pointers using a heap sort (potentially in a thread) */
585
586 if(more==0 && nfiles==0)
587 threads[thread].filename[0]=0;
588 else
589 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
590
591 #if defined(USE_PTHREADS) && USE_PTHREADS
592
593 /* Shortcut if only one file, don't write to disk */
594
595 if(more==0 && nfiles==0)
596 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
597 else if(option_filesort_threads>1)
598 {
599 pthread_mutex_lock(&running_mutex);
600
601 while(nthreads==(option_filesort_threads-1))
602 {
603 for(i=0;i<option_filesort_threads;i++)
604 if(threads[i].running==2)
605 {
606 pthread_join(threads[i].thread,NULL);
607 threads[i].running=0;
608 nthreads--;
609 }
610
611 if(nthreads==(option_filesort_threads-1))
612 pthread_cond_wait(&running_cond,&running_mutex);
613 }
614
615 threads[thread].running=1;
616
617 pthread_mutex_unlock(&running_mutex);
618
619 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
620
621 nthreads++;
622 }
623 else
624 filesort_vary_heapsort_thread(&threads[thread]);
625
626 #else
627
628 /* Shortcut if only one file, don't write to disk */
629
630 if(more==0 && nfiles==0)
631 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
632 else
633 filesort_vary_heapsort_thread(&threads[thread]);
634
635 #endif
636
637 nfiles++;
638 }
639 while(more);
640
641 /* Wait for all of the threads to finish */
642
643 #if defined(USE_PTHREADS) && USE_PTHREADS
644
645 while(option_filesort_threads>1 && nthreads)
646 {
647 pthread_mutex_lock(&running_mutex);
648
649 pthread_cond_wait(&running_cond,&running_mutex);
650
651 for(i=0;i<option_filesort_threads;i++)
652 if(threads[i].running==2)
653 {
654 pthread_join(threads[i].thread,NULL);
655 threads[i].running=0;
656 nthreads--;
657 }
658
659 pthread_mutex_unlock(&running_mutex);
660 }
661
662 #endif
663
664 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
665
666 if(nfiles==1)
667 {
668 for(item=0;item<threads[0].n;item++)
669 {
670 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
671 {
672 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE);
673
674 WriteFile(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
675 count_out++;
676 }
677 }
678
679 DeleteFile(threads[0].filename);
680
681 goto tidy_and_exit;
682 }
683
684 /* Check that number of files is less than file size */
685
686 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
687
688 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)");
689
690 /* Open all of the temporary files */
691
692 fds=(int*)malloc(nfiles*sizeof(int));
693
694 for(i=0;i<nfiles;i++)
695 {
696 char *filename=threads[0].filename;
697
698 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
699
700 fds[i]=ReOpenFile(filename);
701
702 DeleteFile(filename);
703 }
704
705 /* Perform an n-way merge using a binary heap */
706
707 heap=(int*)malloc((1+nfiles)*sizeof(int));
708
709 data=threads[0].data;
710 datap=data+datasize-nfiles*sizeof(void*);
711
712 /* Fill the heap to start with */
713
714 for(i=0;i<nfiles;i++)
715 {
716 int index;
717 FILESORT_VARINT itemsize;
718
719 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
720
721 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
722
723 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
724
725 ReadFile(fds[i],datap[i],itemsize);
726
727 index=i+1;
728
729 heap[index]=i;
730
731 /* Bubble up the new value */
732
733 while(index>1)
734 {
735 int newindex;
736 int temp;
737
738 newindex=index/2;
739
740 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
741 break;
742
743 temp=heap[index];
744 heap[index]=heap[newindex];
745 heap[newindex]=temp;
746
747 index=newindex;
748 }
749 }
750
751 /* Repeatedly pull out the root of the heap and refill from the same file */
752
753 ndata=nfiles;
754
755 do
756 {
757 int index=1;
758 FILESORT_VARINT itemsize;
759
760 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
761 {
762 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
763
764 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
765 count_out++;
766 }
767
768 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
769 {
770 heap[index]=heap[ndata];
771 ndata--;
772 }
773 else
774 {
775 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
776
777 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
778 }
779
780 /* Bubble down the new value */
781
782 while((2*index)<ndata)
783 {
784 int newindex;
785 int temp;
786
787 newindex=2*index;
788
789 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
790 newindex=newindex+1;
791
792 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
793 break;
794
795 temp=heap[newindex];
796 heap[newindex]=heap[index];
797 heap[index]=temp;
798
799 index=newindex;
800 }
801
802 if((2*index)==ndata)
803 {
804 int newindex;
805 int temp;
806
807 newindex=2*index;
808
809 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
810 ; /* break */
811 else
812 {
813 temp=heap[newindex];
814 heap[newindex]=heap[index];
815 heap[index]=temp;
816 }
817 }
818 }
819 while(ndata>0);
820
821 /* Tidy up */
822
823 tidy_and_exit:
824
825 if(fds)
826 {
827 for(i=0;i<nfiles;i++)
828 CloseFile(fds[i]);
829 free(fds);
830 }
831
832 if(heap)
833 free(heap);
834
835 for(i=0;i<option_filesort_threads;i++)
836 {
837 free(threads[i].data);
838
839 free(threads[i].filename);
840 }
841
842 return(count_out);
843 }
844
845
846 /*++++++++++++++++++++++++++++++++++++++
847 A wrapper function that can be run in a thread for fixed data.
848
849 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
850
851 thread_data *thread The data to be processed in this thread.
852 ++++++++++++++++++++++++++++++++++++++*/
853
854 static void *filesort_fixed_heapsort_thread(thread_data *thread)
855 {
856 int fd;
857 size_t item;
858
859 /* Sort the data pointers using a heap sort */
860
861 filesort_heapsort(thread->datap,thread->n,thread->compare);
862
863 /* Create a temporary file and write the result */
864
865 fd=OpenFileNew(thread->filename);
866
867 for(item=0;item<thread->n;item++)
868 WriteFile(fd,thread->datap[item],thread->itemsize);
869
870 CloseFile(fd);
871
872 #if defined(USE_PTHREADS) && USE_PTHREADS
873
874 if(option_filesort_threads>1)
875 {
876 pthread_mutex_lock(&running_mutex);
877
878 thread->running=2;
879
880 pthread_cond_signal(&running_cond);
881
882 pthread_mutex_unlock(&running_mutex);
883 }
884
885 #endif
886
887 return(NULL);
888 }
889
890
891 /*++++++++++++++++++++++++++++++++++++++
892 A wrapper function that can be run in a thread for variable data.
893
894 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
895
896 thread_data *thread The data to be processed in this thread.
897 ++++++++++++++++++++++++++++++++++++++*/
898
899 static void *filesort_vary_heapsort_thread(thread_data *thread)
900 {
901 int fd;
902 size_t item;
903
904 /* Sort the data pointers using a heap sort */
905
906 filesort_heapsort(thread->datap,thread->n,thread->compare);
907
908 /* Create a temporary file and write the result */
909
910 fd=OpenFileNew(thread->filename);
911
912 for(item=0;item<thread->n;item++)
913 {
914 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE);
915
916 WriteFile(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
917 }
918
919 CloseFile(fd);
920
921 #if defined(USE_PTHREADS) && USE_PTHREADS
922
923 if(option_filesort_threads>1)
924 {
925 pthread_mutex_lock(&running_mutex);
926
927 thread->running=2;
928
929 pthread_cond_signal(&running_cond);
930
931 pthread_mutex_unlock(&running_mutex);
932 }
933
934 #endif
935
936 return(NULL);
937 }
938
939
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.

  The heap is stored 0-based in datap itself (the children of element i are
  elements 2*i+1 and 2*i+2) so no pointer outside of the array is ever formed.
  A max-heap is built and the largest item repeatedly moved to the end, giving
  ascending order according to compare_function. The sort is not stable.

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
  to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=1;item<nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (largest item at the top) */

    while(index>0)
      {
       size_t parent=(index-1)/2;
       void *temp;

       if(compare_function(datap[index],datap[parent])<=0)
          break;

       temp=datap[index];
       datap[index]=datap[parent];
       datap[parent]=temp;

       index=parent;
      }
   }

 /* Repeatedly swap the root of the heap with the last item in the heap */

 for(item=nitems;item>1;item--)
   {
    size_t heapsize=item-1;   /* Number of items left in the heap after this swap */
    size_t index=0;
    void *temp;

    temp=datap[0];
    datap[0]=datap[heapsize];
    datap[heapsize]=temp;

    /* Bubble down the new value while it has two children in the heap */

    while((2*index+2)<heapsize)
      {
       size_t child=2*index+1;

       if(compare_function(datap[child],datap[child+1])<=0)
          child=child+1;

       if(compare_function(datap[index],datap[child])>=0)
          break;

       temp=datap[child];
       datap[child]=datap[index];
       datap[index]=temp;

       index=child;
      }

    /* A final node with exactly one child */

    if((2*index+2)==heapsize)
      {
       size_t child=2*index+1;

       if(compare_function(datap[index],datap[child])<0)
         {
          temp=datap[child];
          datap[child]=datap[index];
          datap[index]=temp;
         }
      }
   }
}

Properties

Name Value
cvs:description Functions to perform sorting.