Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1106 - (show annotations) (download) (as text)
Sun Oct 21 15:55:48 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26210 byte(s)
Change the sorting functions to have a pre-sort and post-sort selection function
instead of just a post-selection one (this will allow deletion of some items
before sorting instead of after sorting in some cases).

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2012 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27
28 #if defined(USE_PTHREADS) && USE_PTHREADS
29 #include <pthread.h>
30 #endif
31
32 #include "types.h"
33
34 #include "files.h"
35 #include "sorting.h"
36
37
/* Global variables */

/*+ The command line '--tmpdir' option or its default value. +*/
extern char *option_tmpdirname;

/*+ The amount of RAM to use for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The number of filesorting threads allowed. +*/
extern int option_filesort_threads;


/* Thread data type definitions */

/*+ A data type for holding data for a thread (one sorting "slot"). +*/
typedef struct _thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS
 pthread_t thread;              /*+ The thread identifier (only present when pthreads are enabled,
                                    <pthread.h> is not included otherwise). +*/
#endif

 int    running;                /*+ The state of the slot: 0 = free, 1 = thread running,
                                    2 = thread finished but not yet joined. +*/

 void  *data;                   /*+ The main data array. +*/
 void **datap;                  /*+ An array of pointers to the data objects. +*/
 size_t n;                      /*+ The number of pointers. +*/

 char  *filename;               /*+ The name of the file to write the results to. +*/

 size_t itemsize;               /*+ The size of each item (fixed length sorting only). +*/
 int  (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
thread_data;

/* Thread variables */

#if defined(USE_PTHREADS) && USE_PTHREADS

/* Protects the 'running' member of every slot; signalled when a thread finishes. */
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  running_cond  = PTHREAD_COND_INITIALIZER;

#endif

/* Thread helper functions */

static void *filesort_fixed_heapsort_thread(thread_data *thread);
static void *filesort_vary_heapsort_thread(thread_data *thread);
84
85 /*++++++++++++++++++++++++++++++++++++++
86 A function to sort the contents of a file of fixed length objects using a
87 limited amount of RAM.
88
89 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91 The individual sort steps and the merge step both use a "Heap sort"
92 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93 if the data is already partially sorted.
94
95 index_t filesort_fixed Returns the number of objects kept.
96
97 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98
99 int fd_out The file descriptor of the output file (opened for writing and empty).
100
101 size_t itemsize The size of each item in the file that needs sorting.
102
103 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
104 each item before they have been sorted. The second parameter is the number of objects
105 previously read from the input file. If the function returns 1 then the object is kept
106 and it is sorted, otherwise it is ignored.
107
108 int (*compare_function)(const void*, const void*) The comparison function. This is identical
109 to qsort if the data to be sorted is an array of things not pointers.
110
111 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
112 each item after they have been sorted. The second parameter is the number of objects
113 already written to the output file. If the function returns 1 then the object is written
114 to the output file., otherwise it is ignored.
115 ++++++++++++++++++++++++++++++++++++++*/
116
117 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
118 int (*compare_function)(const void*,const void*),
119 int (*post_sort_function)(void*,index_t))
120 {
121 int *fds=NULL,*heap=NULL;
122 int nfiles=0,ndata=0;
123 index_t count_out=0,count_in=0;
124 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
125 void *data,**datap;
126 thread_data *threads;
127 int i,more=1;
128 #if defined(USE_PTHREADS) && USE_PTHREADS
129 int nthreads=0;
130 #endif
131
132 /* Allocate the RAM buffer and other bits */
133
134 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
135
136 for(i=0;i<option_filesort_threads;i++)
137 {
138 threads[i].running=0;
139
140 threads[i].data=malloc(nitems*itemsize);
141 threads[i].datap=malloc(nitems*sizeof(void*));
142
143 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
144
145 threads[i].itemsize=itemsize;
146 threads[i].compare=compare_function;
147 }
148
149 /* Loop around, fill the buffer, sort the data and write a temporary file */
150
151 do
152 {
153 int thread=0;
154
155 #if defined(USE_PTHREADS) && USE_PTHREADS
156
157 if(option_filesort_threads>1)
158 {
159 /* Find a spare slot (one *must* be unused at all times) */
160
161 pthread_mutex_lock(&running_mutex);
162
163 for(thread=0;thread<option_filesort_threads;thread++)
164 if(!threads[thread].running)
165 break;
166
167 pthread_mutex_unlock(&running_mutex);
168 }
169
170 #endif
171
172 /* Read in the data and create pointers */
173
174 for(i=0;i<nitems;)
175 {
176 threads[thread].datap[i]=threads[thread].data+i*itemsize;
177
178 if(ReadFile(fd_in,threads[thread].datap[i],itemsize))
179 {
180 more=0;
181 break;
182 }
183
184 if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in))
185 {
186 i++;
187 count_in++;
188 }
189 }
190
191 threads[thread].n=i;
192
193 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
194
195 if(more==0 && count_in==0)
196 goto tidy_and_exit;
197
198 /* No new data read in this time round */
199
200 if(threads[thread].n==0)
201 break;
202
203 /* Sort the data pointers using a heap sort (potentially in a thread) */
204
205 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
206
207 #if defined(USE_PTHREADS) && USE_PTHREADS
208
209 /* Shortcut if only one file, don't write to disk */
210
211 if(more==0 && nfiles==0)
212 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
213 else if(option_filesort_threads>1)
214 {
215 pthread_mutex_lock(&running_mutex);
216
217 while(nthreads==(option_filesort_threads-1))
218 {
219 for(i=0;i<option_filesort_threads;i++)
220 if(threads[i].running==2)
221 {
222 pthread_join(threads[i].thread,NULL);
223 threads[i].running=0;
224 nthreads--;
225 }
226
227 if(nthreads==(option_filesort_threads-1))
228 pthread_cond_wait(&running_cond,&running_mutex);
229 }
230
231 threads[thread].running=1;
232
233 pthread_mutex_unlock(&running_mutex);
234
235 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
236
237 nthreads++;
238 }
239 else
240 filesort_fixed_heapsort_thread(&threads[thread]);
241
242 #else
243
244 /* Shortcut if only one file, don't write to disk */
245
246 if(more==0 && nfiles==0)
247 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
248 else
249 filesort_fixed_heapsort_thread(&threads[thread]);
250
251 #endif
252
253 nfiles++;
254 }
255 while(more);
256
257 /* Wait for all of the threads to finish */
258
259 #if defined(USE_PTHREADS) && USE_PTHREADS
260
261 while(option_filesort_threads>1 && nthreads)
262 {
263 pthread_mutex_lock(&running_mutex);
264
265 pthread_cond_wait(&running_cond,&running_mutex);
266
267 for(i=0;i<option_filesort_threads;i++)
268 if(threads[i].running==2)
269 {
270 pthread_join(threads[i].thread,NULL);
271 threads[i].running=0;
272 nthreads--;
273 }
274
275 pthread_mutex_unlock(&running_mutex);
276 }
277
278 #endif
279
280 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
281
282 if(nfiles==1)
283 {
284 for(i=0;i<threads[0].n;i++)
285 {
286 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
287 {
288 WriteFile(fd_out,threads[0].datap[i],itemsize);
289 count_out++;
290 }
291 }
292
293 DeleteFile(threads[0].filename);
294
295 goto tidy_and_exit;
296 }
297
298 /* Check that number of files is less than file size */
299
300 assert(nfiles<nitems);
301
302 /* Open all of the temporary files */
303
304 fds=(int*)malloc(nfiles*sizeof(int));
305
306 for(i=0;i<nfiles;i++)
307 {
308 char *filename=threads[0].filename;
309
310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
311
312 fds[i]=ReOpenFile(filename);
313
314 DeleteFile(filename);
315 }
316
317 /* Perform an n-way merge using a binary heap */
318
319 heap=(int*)malloc((1+nfiles)*sizeof(int));
320
321 data =threads[0].data;
322 datap=threads[0].datap;
323
324 /* Fill the heap to start with */
325
326 for(i=0;i<nfiles;i++)
327 {
328 int index;
329
330 datap[i]=data+i*itemsize;
331
332 ReadFile(fds[i],datap[i],itemsize);
333
334 index=i+1;
335
336 heap[index]=i;
337
338 /* Bubble up the new value */
339
340 while(index>1)
341 {
342 int newindex;
343 int temp;
344
345 newindex=index/2;
346
347 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
348 break;
349
350 temp=heap[index];
351 heap[index]=heap[newindex];
352 heap[newindex]=temp;
353
354 index=newindex;
355 }
356 }
357
358 /* Repeatedly pull out the root of the heap and refill from the same file */
359
360 ndata=nfiles;
361
362 do
363 {
364 int index=1;
365
366 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
367 {
368 WriteFile(fd_out,datap[heap[index]],itemsize);
369 count_out++;
370 }
371
372 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
373 {
374 heap[index]=heap[ndata];
375 ndata--;
376 }
377
378 /* Bubble down the new value */
379
380 while((2*index)<ndata)
381 {
382 int newindex;
383 int temp;
384
385 newindex=2*index;
386
387 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
388 newindex=newindex+1;
389
390 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
391 break;
392
393 temp=heap[newindex];
394 heap[newindex]=heap[index];
395 heap[index]=temp;
396
397 index=newindex;
398 }
399
400 if((2*index)==ndata)
401 {
402 int newindex;
403 int temp;
404
405 newindex=2*index;
406
407 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
408 ; /* break */
409 else
410 {
411 temp=heap[newindex];
412 heap[newindex]=heap[index];
413 heap[index]=temp;
414 }
415 }
416 }
417 while(ndata>0);
418
419 /* Tidy up */
420
421 tidy_and_exit:
422
423 if(fds)
424 {
425 for(i=0;i<nfiles;i++)
426 CloseFile(fds[i]);
427 free(fds);
428 }
429
430 if(heap)
431 free(heap);
432
433 for(i=0;i<option_filesort_threads;i++)
434 {
435 free(threads[i].data);
436 free(threads[i].datap);
437
438 free(threads[i].filename);
439 }
440
441 return(count_out);
442 }
443
444
445 /*++++++++++++++++++++++++++++++++++++++
446 A function to sort the contents of a file of variable length objects (each
447 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
448
449 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
450 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
451 The individual sort steps and the merge step both use a "Heap sort"
452 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
453 if the data is already partially sorted.
454
455 index_t filesort_vary Returns the number of objects kept.
456
457 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
458
459 int fd_out The file descriptor of the output file (opened for writing and empty).
460
461 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
462 each item before they have been sorted. The second parameter is the number of objects
463 previously read from the input file. If the function returns 1 then the object is kept
464 and it is sorted, otherwise it is ignored.
465
466 int (*compare_function)(const void*, const void*) The comparison function. This is identical
467 to qsort if the data to be sorted is an array of things not pointers.
468
469 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
470 each item after they have been sorted. The second parameter is the number of objects
471 already written to the output file. If the function returns 1 then the object is written
472 to the output file., otherwise it is ignored.
473 ++++++++++++++++++++++++++++++++++++++*/
474
475 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
476 int (*compare_function)(const void*,const void*),
477 int (*post_sort_function)(void*,index_t))
478 {
479 int *fds=NULL,*heap=NULL;
480 int nfiles=0,ndata=0;
481 index_t count_out=0,count_in=0;
482 size_t datasize=option_filesort_ramsize/option_filesort_threads;
483 FILESORT_VARINT nextitemsize,largestitemsize=0;
484 void *data,**datap;
485 thread_data *threads;
486 int i,more=1;
487 #if defined(USE_PTHREADS) && USE_PTHREADS
488 int nthreads=0;
489 #endif
490
491 /* Allocate the RAM buffer and other bits */
492
493 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
494
495 for(i=0;i<option_filesort_threads;i++)
496 {
497 threads[i].running=0;
498
499 threads[i].data=malloc(datasize);
500 threads[i].datap=NULL;
501
502 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
503
504 threads[i].compare=compare_function;
505 }
506
507 /* Loop around, fill the buffer, sort the data and write a temporary file */
508
509 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
510 goto tidy_and_exit;
511
512 do
513 {
514 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
515 int thread=0;
516
517 #if defined(USE_PTHREADS) && USE_PTHREADS
518
519 if(option_filesort_threads>1)
520 {
521 /* Find a spare slot (one *must* be unused at all times) */
522
523 pthread_mutex_lock(&running_mutex);
524
525 for(thread=0;thread<option_filesort_threads;thread++)
526 if(!threads[thread].running)
527 break;
528
529 pthread_mutex_unlock(&running_mutex);
530 }
531
532 #endif
533
534 threads[thread].datap=threads[thread].data+datasize;
535
536 threads[thread].n=0;
537
538 /* Read in the data and create pointers */
539
540 while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
541 {
542 FILESORT_VARINT itemsize=nextitemsize;
543
544 if(itemsize>largestitemsize)
545 largestitemsize=itemsize;
546
547 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
548
549 ramused+=FILESORT_VARSIZE;
550
551 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
552
553 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
554 {
555 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
556
557 ramused+=itemsize;
558
559 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
560 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
561
562 count_in++;
563 threads[thread].n++;
564 }
565
566 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
567 {
568 more=0;
569 break;
570 }
571 }
572
573 /* No new data read in this time round */
574
575 if(threads[thread].n==0)
576 break;
577
578 /* Sort the data pointers using a heap sort (potentially in a thread) */
579
580 if(more==0 && nfiles==0)
581 threads[thread].filename[0]=0;
582 else
583 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
584
585 #if defined(USE_PTHREADS) && USE_PTHREADS
586
587 /* Shortcut if only one file, don't write to disk */
588
589 if(more==0 && nfiles==0)
590 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
591 else if(option_filesort_threads>1)
592 {
593 pthread_mutex_lock(&running_mutex);
594
595 while(nthreads==(option_filesort_threads-1))
596 {
597 for(i=0;i<option_filesort_threads;i++)
598 if(threads[i].running==2)
599 {
600 pthread_join(threads[i].thread,NULL);
601 threads[i].running=0;
602 nthreads--;
603 }
604
605 if(nthreads==(option_filesort_threads-1))
606 pthread_cond_wait(&running_cond,&running_mutex);
607 }
608
609 threads[thread].running=1;
610
611 pthread_mutex_unlock(&running_mutex);
612
613 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
614
615 nthreads++;
616 }
617 else
618 filesort_vary_heapsort_thread(&threads[thread]);
619
620 #else
621
622 /* Shortcut if only one file, don't write to disk */
623
624 if(more==0 && nfiles==0)
625 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
626 else
627 filesort_vary_heapsort_thread(&threads[thread]);
628
629 #endif
630
631 nfiles++;
632 }
633 while(more);
634
635 /* Wait for all of the threads to finish */
636
637 #if defined(USE_PTHREADS) && USE_PTHREADS
638
639 while(option_filesort_threads>1 && nthreads)
640 {
641 pthread_mutex_lock(&running_mutex);
642
643 pthread_cond_wait(&running_cond,&running_mutex);
644
645 for(i=0;i<option_filesort_threads;i++)
646 if(threads[i].running==2)
647 {
648 pthread_join(threads[i].thread,NULL);
649 threads[i].running=0;
650 nthreads--;
651 }
652
653 pthread_mutex_unlock(&running_mutex);
654 }
655
656 #endif
657
658 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
659
660 if(nfiles==1)
661 {
662 for(i=0;i<threads[0].n;i++)
663 {
664 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
665 {
666 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE);
667
668 WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
669 count_out++;
670 }
671 }
672
673 DeleteFile(threads[0].filename);
674
675 goto tidy_and_exit;
676 }
677
678 /* Check that number of files is less than file size */
679
680 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
681
682 assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize));
683
684 /* Open all of the temporary files */
685
686 fds=(int*)malloc(nfiles*sizeof(int));
687
688 for(i=0;i<nfiles;i++)
689 {
690 char *filename=threads[0].filename;
691
692 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
693
694 fds[i]=ReOpenFile(filename);
695
696 DeleteFile(filename);
697 }
698
699 /* Perform an n-way merge using a binary heap */
700
701 heap=(int*)malloc((1+nfiles)*sizeof(int));
702
703 data=threads[0].data;
704 datap=data+datasize-nfiles*sizeof(void*);
705
706 /* Fill the heap to start with */
707
708 for(i=0;i<nfiles;i++)
709 {
710 int index;
711 FILESORT_VARINT itemsize;
712
713 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
714
715 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
716
717 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
718
719 ReadFile(fds[i],datap[i],itemsize);
720
721 index=i+1;
722
723 heap[index]=i;
724
725 /* Bubble up the new value */
726
727 while(index>1)
728 {
729 int newindex;
730 int temp;
731
732 newindex=index/2;
733
734 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
735 break;
736
737 temp=heap[index];
738 heap[index]=heap[newindex];
739 heap[newindex]=temp;
740
741 index=newindex;
742 }
743 }
744
745 /* Repeatedly pull out the root of the heap and refill from the same file */
746
747 ndata=nfiles;
748
749 do
750 {
751 int index=1;
752 FILESORT_VARINT itemsize;
753
754 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
755 {
756 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
757
758 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
759 count_out++;
760 }
761
762 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
763 {
764 heap[index]=heap[ndata];
765 ndata--;
766 }
767 else
768 {
769 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
770
771 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
772 }
773
774 /* Bubble down the new value */
775
776 while((2*index)<ndata)
777 {
778 int newindex;
779 int temp;
780
781 newindex=2*index;
782
783 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
784 newindex=newindex+1;
785
786 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
787 break;
788
789 temp=heap[newindex];
790 heap[newindex]=heap[index];
791 heap[index]=temp;
792
793 index=newindex;
794 }
795
796 if((2*index)==ndata)
797 {
798 int newindex;
799 int temp;
800
801 newindex=2*index;
802
803 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
804 ; /* break */
805 else
806 {
807 temp=heap[newindex];
808 heap[newindex]=heap[index];
809 heap[index]=temp;
810 }
811 }
812 }
813 while(ndata>0);
814
815 /* Tidy up */
816
817 tidy_and_exit:
818
819 if(fds)
820 {
821 for(i=0;i<nfiles;i++)
822 CloseFile(fds[i]);
823 free(fds);
824 }
825
826 if(heap)
827 free(heap);
828
829 for(i=0;i<option_filesort_threads;i++)
830 {
831 free(threads[i].data);
832
833 free(threads[i].filename);
834 }
835
836 return(count_out);
837 }
838
839
840 /*++++++++++++++++++++++++++++++++++++++
841 A wrapper function that can be run in a thread for fixed data.
842
843 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
844
845 thread_data *thread The data to be processed in this thread.
846 ++++++++++++++++++++++++++++++++++++++*/
847
848 static void *filesort_fixed_heapsort_thread(thread_data *thread)
849 {
850 int fd,i;
851
852 /* Sort the data pointers using a heap sort */
853
854 filesort_heapsort(thread->datap,thread->n,thread->compare);
855
856 /* Create a temporary file and write the result */
857
858 fd=OpenFileNew(thread->filename);
859
860 for(i=0;i<thread->n;i++)
861 WriteFile(fd,thread->datap[i],thread->itemsize);
862
863 CloseFile(fd);
864
865 #if defined(USE_PTHREADS) && USE_PTHREADS
866
867 if(option_filesort_threads>1)
868 {
869 pthread_mutex_lock(&running_mutex);
870
871 thread->running=2;
872
873 pthread_cond_signal(&running_cond);
874
875 pthread_mutex_unlock(&running_mutex);
876 }
877
878 #endif
879
880 return(NULL);
881 }
882
883
884 /*++++++++++++++++++++++++++++++++++++++
885 A wrapper function that can be run in a thread for variable data.
886
887 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
888
889 thread_data *thread The data to be processed in this thread.
890 ++++++++++++++++++++++++++++++++++++++*/
891
892 static void *filesort_vary_heapsort_thread(thread_data *thread)
893 {
894 int fd,i;
895
896 /* Sort the data pointers using a heap sort */
897
898 filesort_heapsort(thread->datap,thread->n,thread->compare);
899
900 /* Create a temporary file and write the result */
901
902 fd=OpenFileNew(thread->filename);
903
904 for(i=0;i<thread->n;i++)
905 {
906 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE);
907
908 WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
909 }
910
911 CloseFile(fd);
912
913 #if defined(USE_PTHREADS) && USE_PTHREADS
914
915 if(option_filesort_threads>1)
916 {
917 pthread_mutex_lock(&running_mutex);
918
919 thread->running=2;
920
921 pthread_cond_signal(&running_cond);
922
923 pthread_mutex_unlock(&running_mutex);
924 }
925
926 #endif
927
928 return(NULL);
929 }
930
931
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does. The result is in ascending
  order according to compare_function; the sort is not stable.

  void **datap A pointer to the array of pointers to sort (not dereferenced when nitems is 0).

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
  to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t i;

 /* The heap uses 1-based positions (children of k are 2k and 2k+1) and position k
    lives in datap[k-1]. This avoids forming a pointer before the start of the
    array (&datap[-1] is undefined behaviour) and size_t avoids int overflow. */

 /* Fill the heap by pretending to insert the data that is already there */

 for(i=2;i<=nitems;i++)
   {
    size_t index=i;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t newindex=index/2;
       void *temp;

       if(compare_function(datap[index-1],datap[newindex-1])<=0) /* reversed comparison to filesort_fixed() */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[newindex-1];
       datap[newindex-1]=temp;

       index=newindex;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(i=nitems;i>1;i--)
   {
    size_t index=1;
    void *temp;

    temp=datap[0];
    datap[0]=datap[i-1];
    datap[i-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top); the heap now has i-1 items */

    while((2*index)<(i-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[newindex-1],datap[newindex])<=0) /* reversed comparison to filesort_fixed() */
          newindex=newindex+1;

       if(compare_function(datap[index-1],datap[newindex-1])>=0) /* reversed comparison to filesort_fixed() */
          break;

       temp=datap[newindex-1];
       datap[newindex-1]=datap[index-1];
       datap[index-1]=temp;

       index=newindex;
      }

    /* A node with only a left child */

    if((2*index)==(i-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[index-1],datap[newindex-1])<0) /* reversed comparison to filesort_fixed() */
         {
          temp=datap[newindex-1];
          datap[newindex-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}

Properties

Name Value
cvs:description Functions to perform sorting.