Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 996 - (show annotations) (download) (as text)
Thu May 10 18:18:14 2012 UTC (12 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 23926 byte(s)
Added some mutexes and condition variables to communicate between threads.

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2012 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27
28 #if defined(USE_PTHREADS) && USE_PTHREADS
29 #include <pthread.h>
30 #endif
31
32 #include "types.h"
33
34 #include "files.h"
35 #include "sorting.h"
36
37
38 /* Global variables */
39
40 /*+ The command line '--tmpdir' option or its default value. +*/
41 extern char *option_tmpdirname;
42
43 /*+ The amount of RAM to use for filesorting. +*/
44 extern size_t option_filesort_ramsize;
45
46 /*+ The number of filesorting threads allowed. +*/
47 extern int option_filesort_threads;
48
49
50 /* Thread data type definitions */
51
52 /*+ A data type for holding data for a thread. +*/
53 typedef struct _thread_data
54 {
55 pthread_t thread; /*+ The thread identifier. +*/
56
57 int running; /*+ A flag indicating the current state of the thread. +*/
58
59 void *data; /*+ The main data array. +*/
60 void **datap; /*+ An array of pointers to the data objects. +*/
61 size_t n; /*+ The number of pointers. +*/
62
63 char *filename; /*+ The name of the file to write the results to. +*/
64
65 size_t itemsize; /*+ The size of each item. +*/
66 int (*compare)(const void*,const void*); /*+ The comparison function. +*/
67 }
68 thread_data;
69
70 /* Thread variables */
71
72 #if defined(USE_PTHREADS) && USE_PTHREADS
73
74 static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
75 static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
76
77 #endif
78
79 /* Thread helper functions */
80
81 static void *filesort_fixed_heapsort_thread(thread_data *thread);
82 static void *filesort_vary_heapsort_thread(thread_data *thread);
83
84
85 /*++++++++++++++++++++++++++++++++++++++
86 A function to sort the contents of a file of fixed length objects using a
87 limited amount of RAM.
88
89 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91 The individual sort steps and the merge step both use a "Heap sort"
92 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93 if the data is already partially sorted.
94
95 index_t filesort_fixed Returns the number of objects kept.
96
97 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98
99 int fd_out The file descriptor of the output file (opened for writing and empty).
100
101 size_t itemsize The size of each item in the file that needs sorting.
102
103 int (*compare)(const void*, const void*) The comparison function (identical to qsort if the
104 data to be sorted is an array of things not pointers).
105
106 int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it
107 returns 1 then the object is kept and written to the output file.
108 ++++++++++++++++++++++++++++++++++++++*/
109
110 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*compare)(const void*,const void*),int (*keep)(void*,index_t))
111 {
112 int *fds=NULL,*heap=NULL;
113 int nfiles=0,ndata=0;
114 index_t count=0,total=0;
115 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
116 void *data,**datap;
117 thread_data *threads;
118 int i,more=1;
119 #if defined(USE_PTHREADS) && USE_PTHREADS
120 int nthreads=0;
121 #endif
122
123 /* Allocate the RAM buffer and other bits */
124
125 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
126
127 for(i=0;i<option_filesort_threads;i++)
128 {
129 threads[i].running=0;
130
131 threads[i].data=malloc(nitems*itemsize);
132 threads[i].datap=malloc(nitems*sizeof(void*));
133
134 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
135
136 threads[i].itemsize=itemsize;
137 threads[i].compare=compare;
138 }
139
140 /* Loop around, fill the buffer, sort the data and write a temporary file */
141
142 do
143 {
144 int thread=0;
145
146 #if defined(USE_PTHREADS) && USE_PTHREADS
147
148 /* Find a spare slot (one *must* be unused at all times) */
149
150 pthread_mutex_lock(&running_mutex);
151
152 for(thread=0;thread<option_filesort_threads;thread++)
153 if(!threads[thread].running)
154 break;
155
156 pthread_mutex_unlock(&running_mutex);
157
158 #endif
159
160 /* Read in the data and create pointers */
161
162 for(i=0;i<nitems;i++)
163 {
164 threads[thread].datap[i]=threads[thread].data+i*itemsize;
165
166 if(ReadFile(fd_in,threads[thread].datap[i],itemsize))
167 {
168 more=0;
169 break;
170 }
171
172 total++;
173 }
174
175 threads[thread].n=i;
176
177 /* Shortcut if there is no data and no previous files (i.e. no data at all) */
178
179 if(nfiles==0 && threads[thread].n==0)
180 goto tidy_and_exit;
181
182 /* No new data read in this time round */
183
184 if(threads[thread].n==0)
185 break;
186
187 /* Sort the data pointers using a heap sort (potentially in a thread) */
188
189 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
190
191 #if defined(USE_PTHREADS) && USE_PTHREADS
192
193 if(option_filesort_threads==1)
194 {
195 pthread_mutex_lock(&running_mutex);
196
197 threads[thread].running=1;
198
199 pthread_mutex_unlock(&running_mutex);
200
201 filesort_fixed_heapsort_thread(&threads[thread]);
202
203 pthread_mutex_lock(&running_mutex);
204
205 threads[thread].running=0;
206
207 pthread_mutex_unlock(&running_mutex);
208 }
209 else
210 {
211 pthread_mutex_lock(&running_mutex);
212
213 while(nthreads==(option_filesort_threads-1))
214 {
215 for(i=0;i<option_filesort_threads;i++)
216 if(threads[i].running==2)
217 {
218 pthread_join(threads[i].thread,NULL);
219 threads[i].running=0;
220 nthreads--;
221 }
222
223 if(nthreads==(option_filesort_threads-1))
224 pthread_cond_wait(&running_cond,&running_mutex);
225 }
226
227 threads[thread].running=1;
228
229 pthread_mutex_unlock(&running_mutex);
230
231 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
232
233 nthreads++;
234 }
235
236 #else
237
238 filesort_fixed_heapsort_thread(&threads[thread]);
239
240 #endif
241
242 nfiles++;
243 }
244 while(more);
245
246 /* Wait for all of the threads to finish */
247
248 #if defined(USE_PTHREADS) && USE_PTHREADS
249
250 while(nthreads && option_filesort_threads>1)
251 {
252 pthread_mutex_lock(&running_mutex);
253
254 pthread_cond_wait(&running_cond,&running_mutex);
255
256 for(i=0;i<option_filesort_threads;i++)
257 if(threads[i].running==2)
258 {
259 pthread_join(threads[i].thread,NULL);
260 threads[i].running=0;
261 nthreads--;
262 }
263
264 pthread_mutex_unlock(&running_mutex);
265 }
266
267 #endif
268
269 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
270
271 if(nfiles==1)
272 {
273 for(i=0;i<threads[0].n;i++)
274 {
275 if(!keep || keep(threads[0].datap[i],count))
276 {
277 WriteFile(fd_out,threads[0].datap[i],itemsize);
278 count++;
279 }
280 }
281
282 DeleteFile(threads[0].filename);
283
284 goto tidy_and_exit;
285 }
286
287 /* Check that number of files is less than file size */
288
289 assert(nfiles<nitems);
290
291 /* Open all of the temporary files */
292
293 fds=(int*)malloc(nfiles*sizeof(int));
294
295 for(i=0;i<nfiles;i++)
296 {
297 char *filename=threads[0].filename;
298
299 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
300
301 fds[i]=ReOpenFile(filename);
302
303 DeleteFile(filename);
304 }
305
306 /* Perform an n-way merge using a binary heap */
307
308 heap=(int*)malloc((1+nfiles)*sizeof(int));
309
310 data =threads[0].data;
311 datap=threads[0].datap;
312
313 /* Fill the heap to start with */
314
315 for(i=0;i<nfiles;i++)
316 {
317 int index;
318
319 datap[i]=data+i*itemsize;
320
321 ReadFile(fds[i],datap[i],itemsize);
322
323 index=i+1;
324
325 heap[index]=i;
326
327 /* Bubble up the new value */
328
329 while(index>1)
330 {
331 int newindex;
332 int temp;
333
334 newindex=index/2;
335
336 if(compare(datap[heap[index]],datap[heap[newindex]])>=0)
337 break;
338
339 temp=heap[index];
340 heap[index]=heap[newindex];
341 heap[newindex]=temp;
342
343 index=newindex;
344 }
345 }
346
347 /* Repeatedly pull out the root of the heap and refill from the same file */
348
349 ndata=nfiles;
350
351 do
352 {
353 int index=1;
354
355 if(!keep || keep(datap[heap[index]],count))
356 {
357 WriteFile(fd_out,datap[heap[index]],itemsize);
358 count++;
359 }
360
361 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
362 {
363 heap[index]=heap[ndata];
364 ndata--;
365 }
366
367 /* Bubble down the new value */
368
369 while((2*index)<ndata)
370 {
371 int newindex;
372 int temp;
373
374 newindex=2*index;
375
376 if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
377 newindex=newindex+1;
378
379 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
380 break;
381
382 temp=heap[newindex];
383 heap[newindex]=heap[index];
384 heap[index]=temp;
385
386 index=newindex;
387 }
388
389 if((2*index)==ndata)
390 {
391 int newindex;
392 int temp;
393
394 newindex=2*index;
395
396 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
397 ; /* break */
398 else
399 {
400 temp=heap[newindex];
401 heap[newindex]=heap[index];
402 heap[index]=temp;
403 }
404 }
405 }
406 while(ndata>0);
407
408 /* Tidy up */
409
410 tidy_and_exit:
411
412 if(fds)
413 {
414 for(i=0;i<nfiles;i++)
415 CloseFile(fds[i]);
416 free(fds);
417 }
418
419 if(heap)
420 free(heap);
421
422 for(i=0;i<option_filesort_threads;i++)
423 {
424 free(threads[i].data);
425 free(threads[i].datap);
426
427 free(threads[i].filename);
428 }
429
430 return(count);
431 }
432
433
434 /*++++++++++++++++++++++++++++++++++++++
435 A function to sort the contents of a file of variable length objects (each
436 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
437
438 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
439 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
440 The individual sort steps and the merge step both use a "Heap sort"
441 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
442 if the data is already partially sorted.
443
444 index_t filesort_vary Returns the number of objects kept.
445
446 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
447
448 int fd_out The file descriptor of the output file (opened for writing and empty).
449
450 int (*compare)(const void*, const void*) The comparison function (identical to qsort if the
451 data to be sorted is an array of things not pointers).
452
453 int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it
454 returns 1 then the object is kept and written to the output file.
455 ++++++++++++++++++++++++++++++++++++++*/
456
457 index_t filesort_vary(int fd_in,int fd_out,int (*compare)(const void*,const void*),int (*keep)(void*,index_t))
458 {
459 int *fds=NULL,*heap=NULL;
460 int nfiles=0,ndata=0;
461 index_t count=0,total=0;
462 size_t datasize=option_filesort_ramsize/option_filesort_threads;
463 FILESORT_VARINT nextitemsize,largestitemsize=0;
464 void *data,**datap;
465 thread_data *threads;
466 int i,more=1;
467 #if defined(USE_PTHREADS) && USE_PTHREADS
468 int nthreads=0;
469 #endif
470
471 /* Allocate the RAM buffer and other bits */
472
473 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
474
475 for(i=0;i<option_filesort_threads;i++)
476 {
477 threads[i].running=0;
478
479 threads[i].data=malloc(datasize);
480 threads[i].datap=NULL;
481
482 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
483
484 threads[i].compare=compare;
485 }
486
487 /* Loop around, fill the buffer, sort the data and write a temporary file */
488
489 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
490 goto tidy_and_exit;
491
492 do
493 {
494 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
495 int thread=0;
496
497 #if defined(USE_PTHREADS) && USE_PTHREADS
498
499 /* Find a spare slot (one *must* be unused at all times) */
500
501 pthread_mutex_lock(&running_mutex);
502
503 for(thread=0;thread<option_filesort_threads;thread++)
504 if(!threads[thread].running)
505 break;
506
507 pthread_mutex_unlock(&running_mutex);
508
509 #endif
510
511 threads[thread].datap=threads[thread].data+datasize;
512
513 threads[thread].n=0;
514
515 /* Read in the data and create pointers */
516
517 while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
518 {
519 FILESORT_VARINT itemsize=nextitemsize;
520
521 if(itemsize>largestitemsize)
522 largestitemsize=itemsize;
523
524 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
525
526 ramused+=FILESORT_VARSIZE;
527
528 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
529
530 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
531
532 ramused+=itemsize;
533
534 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
535 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
536
537 total++;
538 threads[thread].n++;
539
540 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
541 {
542 more=0;
543 break;
544 }
545 }
546
547 /* No new data read in this time round */
548
549 if(threads[thread].n==0)
550 break;
551
552 /* Sort the data pointers using a heap sort (potentially in a thread) */
553
554 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
555
556 #if defined(USE_PTHREADS) && USE_PTHREADS
557
558 if(option_filesort_threads==1)
559 {
560 pthread_mutex_lock(&running_mutex);
561
562 threads[thread].running=1;
563
564 pthread_mutex_unlock(&running_mutex);
565
566 filesort_vary_heapsort_thread(&threads[thread]);
567
568 pthread_mutex_lock(&running_mutex);
569
570 threads[thread].running=0;
571
572 pthread_mutex_unlock(&running_mutex);
573 }
574 else
575 {
576 pthread_mutex_lock(&running_mutex);
577
578 while(nthreads==(option_filesort_threads-1))
579 {
580 for(i=0;i<option_filesort_threads;i++)
581 if(threads[i].running==2)
582 {
583 pthread_join(threads[i].thread,NULL);
584 threads[i].running=0;
585 nthreads--;
586 }
587
588 if(nthreads==(option_filesort_threads-1))
589 pthread_cond_wait(&running_cond,&running_mutex);
590 }
591
592 threads[thread].running=1;
593
594 pthread_mutex_unlock(&running_mutex);
595
596 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
597
598 nthreads++;
599 }
600
601 #else
602
603 filesort_vary_heapsort_thread(&threads[thread]);
604
605 #endif
606
607 nfiles++;
608 }
609 while(more);
610
611 /* Wait for all of the threads to finish */
612
613 #if defined(USE_PTHREADS) && USE_PTHREADS
614
615 while(nthreads && option_filesort_threads>1)
616 {
617 pthread_mutex_lock(&running_mutex);
618
619 pthread_cond_wait(&running_cond,&running_mutex);
620
621 for(i=0;i<option_filesort_threads;i++)
622 if(threads[i].running==2)
623 {
624 pthread_join(threads[i].thread,NULL);
625 threads[i].running=0;
626 nthreads--;
627 }
628
629 pthread_mutex_unlock(&running_mutex);
630 }
631
632 #endif
633
634 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
635
636 if(nfiles==1)
637 {
638 for(i=0;i<threads[0].n;i++)
639 {
640 if(!keep || keep(threads[0].datap[i],count))
641 {
642 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE);
643
644 WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
645 count++;
646 }
647 }
648
649 DeleteFile(threads[0].filename);
650
651 goto tidy_and_exit;
652 }
653
654 /* Check that number of files is less than file size */
655
656 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
657
658 assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize));
659
660 /* Open all of the temporary files */
661
662 fds=(int*)malloc(nfiles*sizeof(int));
663
664 for(i=0;i<nfiles;i++)
665 {
666 char *filename=threads[0].filename;
667
668 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
669
670 fds[i]=ReOpenFile(filename);
671
672 DeleteFile(filename);
673 }
674
675 /* Perform an n-way merge using a binary heap */
676
677 heap=(int*)malloc((1+nfiles)*sizeof(int));
678
679 data=threads[0].data;
680 datap=data+datasize-nfiles*sizeof(void*);
681
682 /* Fill the heap to start with */
683
684 for(i=0;i<nfiles;i++)
685 {
686 int index;
687 FILESORT_VARINT itemsize;
688
689 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
690
691 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
692
693 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
694
695 ReadFile(fds[i],datap[i],itemsize);
696
697 index=i+1;
698
699 heap[index]=i;
700
701 /* Bubble up the new value */
702
703 while(index>1)
704 {
705 int newindex;
706 int temp;
707
708 newindex=index/2;
709
710 if(compare(datap[heap[index]],datap[heap[newindex]])>=0)
711 break;
712
713 temp=heap[index];
714 heap[index]=heap[newindex];
715 heap[newindex]=temp;
716
717 index=newindex;
718 }
719 }
720
721 /* Repeatedly pull out the root of the heap and refill from the same file */
722
723 ndata=nfiles;
724
725 do
726 {
727 int index=1;
728 FILESORT_VARINT itemsize;
729
730 if(!keep || keep(datap[heap[index]],count))
731 {
732 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
733
734 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
735 count++;
736 }
737
738 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
739 {
740 heap[index]=heap[ndata];
741 ndata--;
742 }
743 else
744 {
745 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
746
747 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
748 }
749
750 /* Bubble down the new value */
751
752 while((2*index)<ndata)
753 {
754 int newindex;
755 int temp;
756
757 newindex=2*index;
758
759 if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
760 newindex=newindex+1;
761
762 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
763 break;
764
765 temp=heap[newindex];
766 heap[newindex]=heap[index];
767 heap[index]=temp;
768
769 index=newindex;
770 }
771
772 if((2*index)==ndata)
773 {
774 int newindex;
775 int temp;
776
777 newindex=2*index;
778
779 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
780 ; /* break */
781 else
782 {
783 temp=heap[newindex];
784 heap[newindex]=heap[index];
785 heap[index]=temp;
786 }
787 }
788 }
789 while(ndata>0);
790
791 /* Tidy up */
792
793 tidy_and_exit:
794
795 if(fds)
796 {
797 for(i=0;i<nfiles;i++)
798 CloseFile(fds[i]);
799 free(fds);
800 }
801
802 if(heap)
803 free(heap);
804
805 for(i=0;i<option_filesort_threads;i++)
806 {
807 free(threads[i].data);
808
809 free(threads[i].filename);
810 }
811
812 return(count);
813 }
814
815
816 /*++++++++++++++++++++++++++++++++++++++
817 A wrapper function that can be run in a thread for fixed data.
818
819 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
820
821 thread_data *thread The data to be processed in this thread.
822 ++++++++++++++++++++++++++++++++++++++*/
823
824 static void *filesort_fixed_heapsort_thread(thread_data *thread)
825 {
826 int fd,i;
827
828 /* Sort the data pointers using a heap sort */
829
830 filesort_heapsort(thread->datap,thread->n,thread->compare);
831
832 /* Create a temporary file and write the result */
833
834 fd=OpenFileNew(thread->filename);
835
836 for(i=0;i<thread->n;i++)
837 WriteFile(fd,thread->datap[i],thread->itemsize);
838
839 CloseFile(fd);
840
841 #if defined(USE_PTHREADS) && USE_PTHREADS
842
843 pthread_mutex_lock(&running_mutex);
844
845 thread->running=2;
846
847 pthread_cond_signal(&running_cond);
848
849 pthread_mutex_unlock(&running_mutex);
850
851 #endif
852
853 return(NULL);
854 }
855
856
857 /*++++++++++++++++++++++++++++++++++++++
858 A wrapper function that can be run in a thread for variable data.
859
860 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
861
862 thread_data *thread The data to be processed in this thread.
863 ++++++++++++++++++++++++++++++++++++++*/
864
865 static void *filesort_vary_heapsort_thread(thread_data *thread)
866 {
867 int fd,i;
868
869 /* Sort the data pointers using a heap sort */
870
871 filesort_heapsort(thread->datap,thread->n,thread->compare);
872
873 /* Create a temporary file and write the result */
874
875 fd=OpenFileNew(thread->filename);
876
877 for(i=0;i<thread->n;i++)
878 {
879 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE);
880
881 WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
882 }
883
884 CloseFile(fd);
885
886 #if defined(USE_PTHREADS) && USE_PTHREADS
887
888 pthread_mutex_lock(&running_mutex);
889
890 thread->running=2;
891
892 pthread_cond_signal(&running_cond);
893
894 pthread_mutex_unlock(&running_mutex);
895
896 #endif
897
898 return(NULL);
899 }
900
901
902 /*++++++++++++++++++++++++++++++++++++++
903 A function to sort an array of pointers efficiently.
904
905 The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
906 in particular, this is good because it can operate in-place and doesn't
907 allocate more memory like using qsort() does.
908
909 void **datap A pointer to the array of pointers to sort.
910
911 size_t nitems The number of items of data to sort.
912
913 int(*compare)(const void *, const void *) The comparison function (identical to qsort if the
914 data to be sorted was an array of things not pointers).
915 ++++++++++++++++++++++++++++++++++++++*/
916
917 void filesort_heapsort(void **datap,size_t nitems,int(*compare)(const void*, const void*))
918 {
919 void **datap1=&datap[-1];
920 int i;
921
922 /* Fill the heap by pretending to insert the data that is already there */
923
924 for(i=2;i<=nitems;i++)
925 {
926 int index=i;
927
928 /* Bubble up the new value (upside-down, put largest at top) */
929
930 while(index>1)
931 {
932 int newindex;
933 void *temp;
934
935 newindex=index/2;
936
937 if(compare(datap1[index],datap1[newindex])<=0) /* reversed compared to filesort_fixed() above */
938 break;
939
940 temp=datap1[index];
941 datap1[index]=datap1[newindex];
942 datap1[newindex]=temp;
943
944 index=newindex;
945 }
946 }
947
948 /* Repeatedly pull out the root of the heap and swap with the bottom item */
949
950 for(i=nitems;i>1;i--)
951 {
952 int index=1;
953 void *temp;
954
955 temp=datap1[index];
956 datap1[index]=datap1[i];
957 datap1[i]=temp;
958
959 /* Bubble down the new value (upside-down, put largest at top) */
960
961 while((2*index)<(i-1))
962 {
963 int newindex;
964 void *temp;
965
966 newindex=2*index;
967
968 if(compare(datap1[newindex],datap1[newindex+1])<=0) /* reversed compared to filesort_fixed() above */
969 newindex=newindex+1;
970
971 if(compare(datap1[index],datap1[newindex])>=0) /* reversed compared to filesort_fixed() above */
972 break;
973
974 temp=datap1[newindex];
975 datap1[newindex]=datap1[index];
976 datap1[index]=temp;
977
978 index=newindex;
979 }
980
981 if((2*index)==(i-1))
982 {
983 int newindex;
984 void *temp;
985
986 newindex=2*index;
987
988 if(compare(datap1[index],datap1[newindex])>=0) /* reversed compared to filesort_fixed() above */
989 ; /* break */
990 else
991 {
992 temp=datap1[newindex];
993 datap1[newindex]=datap1[index];
994 datap1[index]=temp;
995 }
996 }
997 }
998 }

Properties

Name Value
cvs:description Functions to perform sorting.