Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory | Revision Log


Revision 1649 - (show annotations) (download) (as text)
Wed May 13 16:45:26 2015 UTC (9 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 27528 byte(s)
Remove some pthread related code that was being used even if compiled
without pthreads.

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2015 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #if defined(USE_PTHREADS) && USE_PTHREADS
28 #include <pthread.h>
29 #endif
30
31 #include "types.h"
32
33 #include "logging.h"
34 #include "files.h"
35 #include "sorting.h"
36
37
/* Global variables (configuration values defined elsewhere in the program) */

/*+ The number of filesorting threads allowed. +*/
extern int option_filesort_threads;

/*+ The amount of RAM to use for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The command line '--tmpdir' option or its default value. +*/
extern char *option_tmpdirname;
48
49
/* Thread data type definitions */

/*+ A data type for holding the work (and its state) for one sorting buffer / thread.
    The struct tag deliberately does not start with an underscore: such file-scope tags are
    reserved for the implementation (C11 7.1.3). +*/
typedef struct filesort_thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS

 pthread_t thread;      /*+ The thread identifier. +*/

 int running;           /*+ The state of this slot: 0 = unused, 1 = thread started,
                            2 = thread finished but not yet joined. +*/

#endif

 void *data;            /*+ The main data array. +*/
 void **datap;          /*+ An array of pointers to the data objects. +*/
 size_t n;              /*+ The number of pointers. +*/

 char *filename;        /*+ The name of the file to write the results to. +*/

 size_t itemsize;       /*+ The size of each item (only set when sorting fixed length items). +*/
 int (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
 thread_data;
73
74 /* Thread variables */
75
76 #if defined(USE_PTHREADS) && USE_PTHREADS
77
78 static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
79 static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
80
81 #endif
82
83 /* Thread helper functions */
84
85 static void *filesort_fixed_heapsort_thread(thread_data *thread);
86 static void *filesort_vary_heapsort_thread(thread_data *thread);
87
88
89 /*++++++++++++++++++++++++++++++++++++++
90 A function to sort the contents of a file of fixed length objects using a
91 limited amount of RAM.
92
93 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
94 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
95 The individual sort steps and the merge step both use a "Heap sort"
96 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
97 if the data is already partially sorted.
98
99 index_t filesort_fixed Returns the number of objects kept.
100
101 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
102
103 int fd_out The file descriptor of the output file (opened for writing and empty).
104
105 size_t itemsize The size of each item in the file that needs sorting.
106
107 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
108 each item before they have been sorted. The second parameter is the number of objects
109 previously read from the input file. If the function returns 1 then the object is kept
110 and it is sorted, otherwise it is ignored.
111
112 int (*compare_function)(const void*, const void*) The comparison function. This is identical
113 to qsort if the data to be sorted is an array of things not pointers.
114
115 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
116 each item after they have been sorted. The second parameter is the number of objects
117 already written to the output file. If the function returns 1 then the object is written
118 to the output file., otherwise it is ignored.
119 ++++++++++++++++++++++++++++++++++++++*/
120
121 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
122 int (*compare_function)(const void*,const void*),
123 int (*post_sort_function)(void*,index_t))
124 {
125 int *fds=NULL,*heap=NULL;
126 int nfiles=0,ndata=0;
127 index_t count_out=0,count_in=0,total=0;
128 size_t nitems;
129 void *data,**datap;
130 thread_data *threads;
131 size_t item;
132 int i,more=1;
133 #if defined(USE_PTHREADS) && USE_PTHREADS
134 int nthreads=0;
135 #endif
136
137 /* Allocate the RAM buffer and other bits */
138
139 nitems=SizeFileFD(fd_in)/itemsize;
140
141 if(nitems==0)
142 return(0);
143
144 if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize)
145 nitems=1+nitems/option_filesort_threads;
146 else
147 nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
148
149 threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data));
150
151 for(i=0;i<option_filesort_threads;i++)
152 {
153 threads[i].data=malloc(nitems*itemsize);
154 threads[i].datap=malloc(nitems*sizeof(void*));
155
156 log_malloc(threads[i].data,nitems*itemsize);
157 log_malloc(threads[i].datap,nitems*sizeof(void*));
158
159 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
160
161 threads[i].itemsize=itemsize;
162 threads[i].compare=compare_function;
163 }
164
165 /* Loop around, fill the buffer, sort the data and write a temporary file */
166
167 do
168 {
169 int thread=0;
170
171 #if defined(USE_PTHREADS) && USE_PTHREADS
172
173 if(option_filesort_threads>1)
174 {
175 /* Find a spare slot (one *must* be unused at all times) */
176
177 pthread_mutex_lock(&running_mutex);
178
179 for(thread=0;thread<option_filesort_threads;thread++)
180 if(!threads[thread].running)
181 break;
182
183 pthread_mutex_unlock(&running_mutex);
184 }
185
186 #endif
187
188 /* Read in the data and create pointers */
189
190 for(item=0;item<nitems;)
191 {
192 threads[thread].datap[item]=threads[thread].data+item*itemsize;
193
194 if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize))
195 {
196 more=0;
197 break;
198 }
199
200 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
201 {
202 item++;
203 total++;
204 }
205
206 count_in++;
207 }
208
209 threads[thread].n=item;
210
211 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
212
213 if(more==0 && total==0)
214 goto tidy_and_exit;
215
216 /* No new data read in this time round */
217
218 if(threads[thread].n==0)
219 break;
220
221 /* Sort the data pointers using a heap sort (potentially in a thread) */
222
223 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
224
225 #if defined(USE_PTHREADS) && USE_PTHREADS
226
227 /* Shortcut if only one file, don't write to disk */
228
229 if(more==0 && nfiles==0)
230 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
231 else if(option_filesort_threads>1)
232 {
233 pthread_mutex_lock(&running_mutex);
234
235 while(nthreads==(option_filesort_threads-1))
236 {
237 for(i=0;i<option_filesort_threads;i++)
238 if(threads[i].running==2)
239 {
240 pthread_join(threads[i].thread,NULL);
241 threads[i].running=0;
242 nthreads--;
243 }
244
245 if(nthreads==(option_filesort_threads-1))
246 pthread_cond_wait(&running_cond,&running_mutex);
247 }
248
249 threads[thread].running=1;
250
251 pthread_mutex_unlock(&running_mutex);
252
253 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
254
255 nthreads++;
256 }
257 else
258 filesort_fixed_heapsort_thread(&threads[thread]);
259
260 #else
261
262 /* Shortcut if only one file, don't write to disk */
263
264 if(more==0 && nfiles==0)
265 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
266 else
267 filesort_fixed_heapsort_thread(&threads[thread]);
268
269 #endif
270
271 nfiles++;
272 }
273 while(more);
274
275 /* Wait for all of the threads to finish */
276
277 #if defined(USE_PTHREADS) && USE_PTHREADS
278
279 while(option_filesort_threads>1 && nthreads)
280 {
281 pthread_mutex_lock(&running_mutex);
282
283 pthread_cond_wait(&running_cond,&running_mutex);
284
285 for(i=0;i<option_filesort_threads;i++)
286 if(threads[i].running==2)
287 {
288 pthread_join(threads[i].thread,NULL);
289 threads[i].running=0;
290 nthreads--;
291 }
292
293 pthread_mutex_unlock(&running_mutex);
294 }
295
296 #endif
297
298 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
299
300 if(nfiles==1)
301 {
302 for(item=0;item<threads[0].n;item++)
303 {
304 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
305 {
306 WriteFileBuffered(fd_out,threads[0].datap[item],itemsize);
307 count_out++;
308 }
309 }
310
311 DeleteFile(threads[0].filename);
312
313 goto tidy_and_exit;
314 }
315
316 /* Check that number of files is less than file size */
317
318 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
319
320 /* Open all of the temporary files */
321
322 fds=(int*)malloc(nfiles*sizeof(int));
323
324 for(i=0;i<nfiles;i++)
325 {
326 char *filename=threads[0].filename;
327
328 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
329
330 fds[i]=ReOpenFileBuffered(filename);
331
332 DeleteFile(filename);
333 }
334
335 /* Perform an n-way merge using a binary heap */
336
337 heap=(int*)malloc((1+nfiles)*sizeof(int));
338
339 data =threads[0].data;
340 datap=threads[0].datap;
341
342 /* Fill the heap to start with */
343
344 for(i=0;i<nfiles;i++)
345 {
346 int index;
347
348 datap[i]=data+i*itemsize;
349
350 ReadFileBuffered(fds[i],datap[i],itemsize);
351
352 index=i+1;
353
354 heap[index]=i;
355
356 /* Bubble up the new value */
357
358 while(index>1)
359 {
360 int newindex;
361 int temp;
362
363 newindex=index/2;
364
365 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
366 break;
367
368 temp=heap[index];
369 heap[index]=heap[newindex];
370 heap[newindex]=temp;
371
372 index=newindex;
373 }
374 }
375
376 /* Repeatedly pull out the root of the heap and refill from the same file */
377
378 ndata=nfiles;
379
380 do
381 {
382 int index=1;
383
384 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
385 {
386 WriteFileBuffered(fd_out,datap[heap[index]],itemsize);
387 count_out++;
388 }
389
390 if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize))
391 {
392 heap[index]=heap[ndata];
393 ndata--;
394 }
395
396 /* Bubble down the new value */
397
398 while((2*index)<ndata)
399 {
400 int newindex;
401 int temp;
402
403 newindex=2*index;
404
405 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
406 newindex=newindex+1;
407
408 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
409 break;
410
411 temp=heap[newindex];
412 heap[newindex]=heap[index];
413 heap[index]=temp;
414
415 index=newindex;
416 }
417
418 if((2*index)==ndata)
419 {
420 int newindex;
421 int temp;
422
423 newindex=2*index;
424
425 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
426 ; /* break */
427 else
428 {
429 temp=heap[newindex];
430 heap[newindex]=heap[index];
431 heap[index]=temp;
432 }
433 }
434 }
435 while(ndata>0);
436
437 /* Tidy up */
438
439 tidy_and_exit:
440
441 if(fds)
442 {
443 for(i=0;i<nfiles;i++)
444 CloseFileBuffered(fds[i]);
445 free(fds);
446 }
447
448 if(heap)
449 free(heap);
450
451 for(i=0;i<option_filesort_threads;i++)
452 {
453 log_free(threads[i].data);
454 log_free(threads[i].datap);
455
456 free(threads[i].data);
457 free(threads[i].datap);
458
459 free(threads[i].filename);
460 }
461
462 free(threads);
463
464 return(count_out);
465 }
466
467
468 /*++++++++++++++++++++++++++++++++++++++
469 A function to sort the contents of a file of variable length objects (each
470 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
471
472 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
473 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
474 The individual sort steps and the merge step both use a "Heap sort"
475 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
476 if the data is already partially sorted.
477
478 index_t filesort_vary Returns the number of objects kept.
479
480 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
481
482 int fd_out The file descriptor of the output file (opened for writing and empty).
483
484 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
485 each item before they have been sorted. The second parameter is the number of objects
486 previously read from the input file. If the function returns 1 then the object is kept
487 and it is sorted, otherwise it is ignored.
488
489 int (*compare_function)(const void*, const void*) The comparison function. This is identical
490 to qsort if the data to be sorted is an array of things not pointers.
491
492 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
493 each item after they have been sorted. The second parameter is the number of objects
494 already written to the output file. If the function returns 1 then the object is written
495 to the output file., otherwise it is ignored.
496 ++++++++++++++++++++++++++++++++++++++*/
497
498 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
499 int (*compare_function)(const void*,const void*),
500 int (*post_sort_function)(void*,index_t))
501 {
502 int *fds=NULL,*heap=NULL;
503 int nfiles=0,ndata=0;
504 index_t count_out=0,count_in=0,total=0;
505 size_t datasize;
506 FILESORT_VARINT nextitemsize,largestitemsize=0;
507 void *data,**datap;
508 thread_data *threads;
509 size_t item;
510 int i,more=1;
511 #if defined(USE_PTHREADS) && USE_PTHREADS
512 int nthreads=0;
513 #endif
514
515 /* Allocate the RAM buffer and other bits */
516
517 datasize=SizeFileFD(fd_in);
518
519 if(datasize==0)
520 return(0);
521
522 if((datasize*2)<option_filesort_ramsize) /* estimate that data and pointer are same size */
523 datasize=(datasize*2)/option_filesort_threads;
524 else
525 datasize=option_filesort_ramsize/option_filesort_threads;
526
527 threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data));
528
529 for(i=0;i<option_filesort_threads;i++)
530 {
531 threads[i].data=malloc(datasize);
532 threads[i].datap=NULL;
533
534 log_malloc(threads[i].data,datasize);
535
536 threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
537
538 threads[i].compare=compare_function;
539 }
540
541 /* Loop around, fill the buffer, sort the data and write a temporary file */
542
543 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
544 goto tidy_and_exit;
545
546 do
547 {
548 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
549 int thread=0;
550
551 #if defined(USE_PTHREADS) && USE_PTHREADS
552
553 if(option_filesort_threads>1)
554 {
555 /* Find a spare slot (one *must* be unused at all times) */
556
557 pthread_mutex_lock(&running_mutex);
558
559 for(thread=0;thread<option_filesort_threads;thread++)
560 if(!threads[thread].running)
561 break;
562
563 pthread_mutex_unlock(&running_mutex);
564 }
565
566 #endif
567
568 threads[thread].datap=threads[thread].data+datasize;
569
570 threads[thread].n=0;
571
572 /* Read in the data and create pointers */
573
574 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
575 {
576 FILESORT_VARINT itemsize=nextitemsize;
577
578 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
579
580 ramused+=FILESORT_VARSIZE;
581
582 ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize);
583
584 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
585 {
586 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
587
588 if(itemsize>largestitemsize)
589 largestitemsize=itemsize;
590
591 ramused+=itemsize;
592
593 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
594 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
595
596 total++;
597 threads[thread].n++;
598 }
599 else
600 ramused-=FILESORT_VARSIZE;
601
602 count_in++;
603
604 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE))
605 {
606 more=0;
607 break;
608 }
609 }
610
611 /* No new data read in this time round */
612
613 if(threads[thread].n==0)
614 break;
615
616 /* Sort the data pointers using a heap sort (potentially in a thread) */
617
618 if(more==0 && nfiles==0)
619 threads[thread].filename[0]=0;
620 else
621 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
622
623 #if defined(USE_PTHREADS) && USE_PTHREADS
624
625 /* Shortcut if only one file, don't write to disk */
626
627 if(more==0 && nfiles==0)
628 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
629 else if(option_filesort_threads>1)
630 {
631 pthread_mutex_lock(&running_mutex);
632
633 while(nthreads==(option_filesort_threads-1))
634 {
635 for(i=0;i<option_filesort_threads;i++)
636 if(threads[i].running==2)
637 {
638 pthread_join(threads[i].thread,NULL);
639 threads[i].running=0;
640 nthreads--;
641 }
642
643 if(nthreads==(option_filesort_threads-1))
644 pthread_cond_wait(&running_cond,&running_mutex);
645 }
646
647 threads[thread].running=1;
648
649 pthread_mutex_unlock(&running_mutex);
650
651 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
652
653 nthreads++;
654 }
655 else
656 filesort_vary_heapsort_thread(&threads[thread]);
657
658 #else
659
660 /* Shortcut if only one file, don't write to disk */
661
662 if(more==0 && nfiles==0)
663 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
664 else
665 filesort_vary_heapsort_thread(&threads[thread]);
666
667 #endif
668
669 nfiles++;
670 }
671 while(more);
672
673 /* Wait for all of the threads to finish */
674
675 #if defined(USE_PTHREADS) && USE_PTHREADS
676
677 while(option_filesort_threads>1 && nthreads)
678 {
679 pthread_mutex_lock(&running_mutex);
680
681 pthread_cond_wait(&running_cond,&running_mutex);
682
683 for(i=0;i<option_filesort_threads;i++)
684 if(threads[i].running==2)
685 {
686 pthread_join(threads[i].thread,NULL);
687 threads[i].running=0;
688 nthreads--;
689 }
690
691 pthread_mutex_unlock(&running_mutex);
692 }
693
694 #endif
695
696 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
697
698 if(nfiles==1)
699 {
700 for(item=0;item<threads[0].n;item++)
701 {
702 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
703 {
704 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE);
705
706 WriteFileBuffered(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
707 count_out++;
708 }
709 }
710
711 DeleteFile(threads[0].filename);
712
713 goto tidy_and_exit;
714 }
715
716 /* Check that number of files is less than file size */
717
718 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
719
720 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)");
721
722 /* Open all of the temporary files */
723
724 fds=(int*)malloc(nfiles*sizeof(int));
725
726 for(i=0;i<nfiles;i++)
727 {
728 char *filename=threads[0].filename;
729
730 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
731
732 fds[i]=ReOpenFileBuffered(filename);
733
734 DeleteFile(filename);
735 }
736
737 /* Perform an n-way merge using a binary heap */
738
739 heap=(int*)malloc((1+nfiles)*sizeof(int));
740
741 data=threads[0].data;
742 datap=data+datasize-nfiles*sizeof(void*);
743
744 /* Fill the heap to start with */
745
746 for(i=0;i<nfiles;i++)
747 {
748 int index;
749 FILESORT_VARINT itemsize;
750
751 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
752
753 ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE);
754
755 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
756
757 ReadFileBuffered(fds[i],datap[i],itemsize);
758
759 index=i+1;
760
761 heap[index]=i;
762
763 /* Bubble up the new value */
764
765 while(index>1)
766 {
767 int newindex;
768 int temp;
769
770 newindex=index/2;
771
772 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
773 break;
774
775 temp=heap[index];
776 heap[index]=heap[newindex];
777 heap[newindex]=temp;
778
779 index=newindex;
780 }
781 }
782
783 /* Repeatedly pull out the root of the heap and refill from the same file */
784
785 ndata=nfiles;
786
787 do
788 {
789 int index=1;
790 FILESORT_VARINT itemsize;
791
792 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
793 {
794 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
795
796 WriteFileBuffered(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
797 count_out++;
798 }
799
800 if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
801 {
802 heap[index]=heap[ndata];
803 ndata--;
804 }
805 else
806 {
807 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
808
809 ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize);
810 }
811
812 /* Bubble down the new value */
813
814 while((2*index)<ndata)
815 {
816 int newindex;
817 int temp;
818
819 newindex=2*index;
820
821 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
822 newindex=newindex+1;
823
824 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
825 break;
826
827 temp=heap[newindex];
828 heap[newindex]=heap[index];
829 heap[index]=temp;
830
831 index=newindex;
832 }
833
834 if((2*index)==ndata)
835 {
836 int newindex;
837 int temp;
838
839 newindex=2*index;
840
841 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
842 ; /* break */
843 else
844 {
845 temp=heap[newindex];
846 heap[newindex]=heap[index];
847 heap[index]=temp;
848 }
849 }
850 }
851 while(ndata>0);
852
853 /* Tidy up */
854
855 tidy_and_exit:
856
857 if(fds)
858 {
859 for(i=0;i<nfiles;i++)
860 CloseFileBuffered(fds[i]);
861 free(fds);
862 }
863
864 if(heap)
865 free(heap);
866
867 for(i=0;i<option_filesort_threads;i++)
868 {
869 log_free(threads[i].data);
870
871 free(threads[i].data);
872
873 free(threads[i].filename);
874 }
875
876 free(threads);
877
878 return(count_out);
879 }
880
881
882 /*++++++++++++++++++++++++++++++++++++++
883 A wrapper function that can be run in a thread for fixed data.
884
885 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
886
887 thread_data *thread The data to be processed in this thread.
888 ++++++++++++++++++++++++++++++++++++++*/
889
890 static void *filesort_fixed_heapsort_thread(thread_data *thread)
891 {
892 int fd;
893 size_t item;
894
895 /* Sort the data pointers using a heap sort */
896
897 filesort_heapsort(thread->datap,thread->n,thread->compare);
898
899 /* Create a temporary file and write the result */
900
901 fd=OpenFileBufferedNew(thread->filename);
902
903 for(item=0;item<thread->n;item++)
904 WriteFileBuffered(fd,thread->datap[item],thread->itemsize);
905
906 CloseFileBuffered(fd);
907
908 #if defined(USE_PTHREADS) && USE_PTHREADS
909
910 if(option_filesort_threads>1)
911 {
912 pthread_mutex_lock(&running_mutex);
913
914 thread->running=2;
915
916 pthread_cond_signal(&running_cond);
917
918 pthread_mutex_unlock(&running_mutex);
919 }
920
921 #endif
922
923 return(NULL);
924 }
925
926
927 /*++++++++++++++++++++++++++++++++++++++
928 A wrapper function that can be run in a thread for variable data.
929
930 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
931
932 thread_data *thread The data to be processed in this thread.
933 ++++++++++++++++++++++++++++++++++++++*/
934
935 static void *filesort_vary_heapsort_thread(thread_data *thread)
936 {
937 int fd;
938 size_t item;
939
940 /* Sort the data pointers using a heap sort */
941
942 filesort_heapsort(thread->datap,thread->n,thread->compare);
943
944 /* Create a temporary file and write the result */
945
946 fd=OpenFileBufferedNew(thread->filename);
947
948 for(item=0;item<thread->n;item++)
949 {
950 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE);
951
952 WriteFileBuffered(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
953 }
954
955 CloseFileBuffered(fd);
956
957 #if defined(USE_PTHREADS) && USE_PTHREADS
958
959 if(option_filesort_threads>1)
960 {
961 pthread_mutex_lock(&running_mutex);
962
963 thread->running=2;
964
965 pthread_cond_signal(&running_cond);
966
967 pthread_mutex_unlock(&running_mutex);
968 }
969
970 #endif
971
972 return(NULL);
973 }
974
975
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.  The result is in ascending
  order according to compare_function (the sort is not stable).

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
  to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;

 /* Heap positions are 1-based (the children of position p are 2p and 2p+1) and position p
    is stored in datap[p-1].  Indexing this way avoids forming the out-of-bounds pointer
    &datap[-1], and all positions are size_t so that very large arrays do not overflow. */

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=2;item<=nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t parent=index/2;
       void *temp;

       if(compare_function(datap[index-1],datap[parent-1])<=0) /* reversed comparison to filesort_fixed() */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[parent-1];
       datap[parent-1]=temp;

       index=parent;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(item=nitems;item>1;item--)
   {
    size_t index=1;
    void *temp;

    temp=datap[0];
    datap[0]=datap[item-1];
    datap[item-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top) over the first item-1 positions */

    while((2*index)<(item-1))
      {
       size_t child=2*index;

       if(compare_function(datap[child-1],datap[child])<=0) /* pick the larger child (the right one on a tie) */
          child=child+1;

       if(compare_function(datap[index-1],datap[child-1])>=0) /* reversed comparison to filesort_fixed() */
          break;

       temp=datap[child-1];
       datap[child-1]=datap[index-1];
       datap[index-1]=temp;

       index=child;
      }

    /* The last parent in the heap may have only a single child */

    if((2*index)==(item-1))
      {
       size_t child=2*index;

       if(compare_function(datap[index-1],datap[child-1])<0) /* reversed comparison to filesort_fixed() */
         {
          temp=datap[child-1];
          datap[child-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}

Properties

Name Value
cvs:description Functions to perform sorting.