Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Annotation of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1021 - (hide annotations) (download) (as text)
Mon Jul 16 17:52:03 2012 UTC (12 years, 8 months ago) by amb
File MIME type: text/x-csrc
File size: 24449 byte(s)
Restore the shortcut that doesn't write the data to a temporary file if it all
can be sorted in one go.  This removes the slowdown with the multi-threaded code
even when running with no threads.

1 amb 269 /***************************************
2     Merge sort functions.
3    
4     Part of the Routino routing software.
5     ******************/ /******************
6 amb 948 This file Copyright 2009-2012 Andrew M. Bishop
7 amb 269
8     This program is free software: you can redistribute it and/or modify
9     it under the terms of the GNU Affero General Public License as published by
10     the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12    
13     This program is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16     GNU Affero General Public License for more details.
17    
18     You should have received a copy of the GNU Affero General Public License
19     along with this program. If not, see <http://www.gnu.org/licenses/>.
20     ***************************************/
21    
22    
23     #include <stdio.h>
24     #include <stdlib.h>
25     #include <string.h>
26     #include <assert.h>
27    
28 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
29     #include <pthread.h>
30     #endif
31    
32 amb 532 #include "types.h"
33    
34 amb 449 #include "files.h"
35 amb 532 #include "sorting.h"
36 amb 269
37    
38 amb 680 /* Global variables */
39 amb 269
40 amb 289 /*+ The command line '--tmpdir' option or its default value. +*/
41 amb 284 extern char *option_tmpdirname;
42 amb 269
43 amb 358 /*+ The amount of RAM to use for filesorting. +*/
44     extern size_t option_filesort_ramsize;
45 amb 269
46 amb 991 /*+ The number of filesorting threads allowed. +*/
47     extern int option_filesort_threads;
48 amb 358
49 amb 991
50     /* Thread data type definitions */
51    
52     /*+ A data type for holding data for a thread. +*/
53     typedef struct _thread_data
54     {
55     pthread_t thread; /*+ The thread identifier. +*/
56    
57     int running; /*+ A flag indicating the current state of the thread. +*/
58    
59     void *data; /*+ The main data array. +*/
60     void **datap; /*+ An array of pointers to the data objects. +*/
61     size_t n; /*+ The number of pointers. +*/
62    
63     char *filename; /*+ The name of the file to write the results to. +*/
64    
65     size_t itemsize; /*+ The size of each item. +*/
66     int (*compare)(const void*,const void*); /*+ The comparison function. +*/
67     }
68     thread_data;
69    
70 amb 996 /* Thread variables */
71 amb 991
72 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
73    
74     static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
75     static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
76    
77     #endif
78    
79 amb 991 /* Thread helper functions */
80    
81     static void *filesort_fixed_heapsort_thread(thread_data *thread);
82     static void *filesort_vary_heapsort_thread(thread_data *thread);
83    
84    
85 amb 269 /*++++++++++++++++++++++++++++++++++++++
86 amb 310 A function to sort the contents of a file of fixed length objects using a
87     limited amount of RAM.
88 amb 269
89     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91     The individual sort steps and the merge step both use a "Heap sort"
92     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93     if the data is already partially sorted.
94    
95 amb 948 index_t filesort_fixed Returns the number of objects kept.
96    
97 amb 269 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98    
99     int fd_out The file descriptor of the output file (opened for writing and empty).
100    
101     size_t itemsize The size of each item in the file that needs sorting.
102    
103     int (*compare)(const void*, const void*) The comparison function (identical to qsort if the
104     data to be sorted is an array of things not pointers).
105    
106 amb 948 int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it
107     returns 1 then the object is kept and written to the output file.
108 amb 269 ++++++++++++++++++++++++++++++++++++++*/
109    
110 amb 948 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*compare)(const void*,const void*),int (*keep)(void*,index_t))
111 amb 269 {
112     int *fds=NULL,*heap=NULL;
113     int nfiles=0,ndata=0;
114     index_t count=0,total=0;
115 amb 991 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
116     void *data,**datap;
117     thread_data *threads;
118 amb 269 int i,more=1;
119 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
120     int nthreads=0;
121     #endif
122 amb 269
123     /* Allocate the RAM buffer and other bits */
124    
125 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
126 amb 269
127 amb 991 for(i=0;i<option_filesort_threads;i++)
128     {
129     threads[i].running=0;
130 amb 269
131 amb 991 threads[i].data=malloc(nitems*itemsize);
132     threads[i].datap=malloc(nitems*sizeof(void*));
133    
134     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
135    
136     threads[i].itemsize=itemsize;
137     threads[i].compare=compare;
138     }
139    
140 amb 269 /* Loop around, fill the buffer, sort the data and write a temporary file */
141    
142     do
143     {
144 amb 991 int thread=0;
145 amb 269
146 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
147    
148 amb 1020 if(option_filesort_threads>1)
149     {
150     /* Find a spare slot (one *must* be unused at all times) */
151 amb 991
152 amb 1020 pthread_mutex_lock(&running_mutex);
153 amb 996
154 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
155     if(!threads[thread].running)
156     break;
157 amb 991
158 amb 1020 pthread_mutex_unlock(&running_mutex);
159     }
160 amb 996
161 amb 991 #endif
162    
163 amb 269 /* Read in the data and create pointers */
164    
165     for(i=0;i<nitems;i++)
166     {
167 amb 991 threads[thread].datap[i]=threads[thread].data+i*itemsize;
168 amb 269
169 amb 991 if(ReadFile(fd_in,threads[thread].datap[i],itemsize))
170 amb 269 {
171     more=0;
172     break;
173     }
174    
175     total++;
176     }
177    
178 amb 991 threads[thread].n=i;
179 amb 269
180 amb 1020 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
181 amb 546
182 amb 1020 if(more==0 && total==0)
183 amb 546 goto tidy_and_exit;
184    
185 amb 543 /* No new data read in this time round */
186    
187 amb 991 if(threads[thread].n==0)
188 amb 269 break;
189    
190 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
191 amb 269
192 amb 991 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
193 amb 269
194 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
195 amb 269
196 amb 1021 /* Shortcut if only one file, don't write to disk */
197    
198     if(more==0 && nfiles==0)
199     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
200     else if(option_filesort_threads>1)
201 amb 269 {
202 amb 996 pthread_mutex_lock(&running_mutex);
203    
204 amb 991 while(nthreads==(option_filesort_threads-1))
205 amb 269 {
206 amb 991 for(i=0;i<option_filesort_threads;i++)
207     if(threads[i].running==2)
208     {
209     pthread_join(threads[i].thread,NULL);
210     threads[i].running=0;
211     nthreads--;
212     }
213    
214     if(nthreads==(option_filesort_threads-1))
215 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
216 amb 269 }
217    
218 amb 991 threads[thread].running=1;
219 amb 269
220 amb 996 pthread_mutex_unlock(&running_mutex);
221    
222 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
223 amb 269
224 amb 991 nthreads++;
225     }
226 amb 1020 else
227 amb 1021 filesort_fixed_heapsort_thread(&threads[thread]);
228 amb 269
229 amb 991 #else
230 amb 269
231 amb 1021 /* Shortcut if only one file, don't write to disk */
232    
233     if(more==0 && nfiles==0)
234     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
235     else
236 amb 1020 filesort_fixed_heapsort_thread(&threads[thread]);
237 amb 269
238 amb 991 #endif
239 amb 269
240     nfiles++;
241     }
242     while(more);
243    
244 amb 991 /* Wait for all of the threads to finish */
245 amb 269
246 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
247    
248 amb 1020 while(option_filesort_threads>1 && nthreads)
249 amb 991 {
250 amb 996 pthread_mutex_lock(&running_mutex);
251 amb 991
252 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
253    
254 amb 991 for(i=0;i<option_filesort_threads;i++)
255     if(threads[i].running==2)
256     {
257     pthread_join(threads[i].thread,NULL);
258     threads[i].running=0;
259     nthreads--;
260     }
261    
262 amb 996 pthread_mutex_unlock(&running_mutex);
263 amb 991 }
264    
265     #endif
266    
267     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
268    
269 amb 269 if(nfiles==1)
270     {
271 amb 991 for(i=0;i<threads[0].n;i++)
272 amb 269 {
273 amb 991 if(!keep || keep(threads[0].datap[i],count))
274 amb 274 {
275 amb 991 WriteFile(fd_out,threads[0].datap[i],itemsize);
276 amb 274 count++;
277     }
278 amb 269 }
279    
280 amb 991 DeleteFile(threads[0].filename);
281 amb 269
282 amb 283 goto tidy_and_exit;
283 amb 269 }
284    
285     /* Check that number of files is less than file size */
286    
287     assert(nfiles<nitems);
288    
289     /* Open all of the temporary files */
290    
291     fds=(int*)malloc(nfiles*sizeof(int));
292    
293     for(i=0;i<nfiles;i++)
294     {
295 amb 991 char *filename=threads[0].filename;
296    
297 amb 284 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
298 amb 269
299     fds[i]=ReOpenFile(filename);
300    
301     DeleteFile(filename);
302     }
303    
304     /* Perform an n-way merge using a binary heap */
305    
306 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
307 amb 269
308 amb 991 data =threads[0].data;
309     datap=threads[0].datap;
310    
311 amb 269 /* Fill the heap to start with */
312    
313     for(i=0;i<nfiles;i++)
314     {
315     int index;
316    
317     datap[i]=data+i*itemsize;
318    
319     ReadFile(fds[i],datap[i],itemsize);
320    
321 amb 875 index=i+1;
322    
323 amb 867 heap[index]=i;
324 amb 269
325     /* Bubble up the new value */
326    
327 amb 875 while(index>1)
328 amb 269 {
329     int newindex;
330     int temp;
331    
332 amb 875 newindex=index/2;
333 amb 269
334 amb 869 if(compare(datap[heap[index]],datap[heap[newindex]])>=0)
335     break;
336    
337 amb 269 temp=heap[index];
338     heap[index]=heap[newindex];
339     heap[newindex]=temp;
340    
341     index=newindex;
342     }
343     }
344    
345     /* Repeatedly pull out the root of the heap and refill from the same file */
346    
347     ndata=nfiles;
348    
349     do
350     {
351 amb 875 int index=1;
352 amb 269
353 amb 948 if(!keep || keep(datap[heap[index]],count))
354 amb 274 {
355 amb 867 WriteFile(fd_out,datap[heap[index]],itemsize);
356 amb 274 count++;
357     }
358 amb 269
359 amb 867 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
360 amb 269 {
361 amb 875 heap[index]=heap[ndata];
362 amb 870 ndata--;
363 amb 269 }
364    
365     /* Bubble down the new value */
366    
367 amb 875 while((2*index)<ndata)
368 amb 269 {
369 amb 873 int newindex;
370 amb 269 int temp;
371    
372 amb 875 newindex=2*index;
373 amb 269
374 amb 875 if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
375 amb 873 newindex=newindex+1;
376 amb 869
377     if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
378     break;
379    
380 amb 269 temp=heap[newindex];
381     heap[newindex]=heap[index];
382     heap[index]=temp;
383    
384     index=newindex;
385     }
386    
387 amb 875 if((2*index)==ndata)
388 amb 269 {
389     int newindex;
390     int temp;
391    
392 amb 875 newindex=2*index;
393 amb 269
394 amb 869 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
395     ; /* break */
396     else
397     {
398     temp=heap[newindex];
399     heap[newindex]=heap[index];
400     heap[index]=temp;
401     }
402 amb 269 }
403     }
404     while(ndata>0);
405    
406     /* Tidy up */
407    
408 amb 283 tidy_and_exit:
409 amb 269
410 amb 283 if(fds)
411     {
412     for(i=0;i<nfiles;i++)
413     CloseFile(fds[i]);
414     free(fds);
415     }
416    
417     if(heap)
418     free(heap);
419    
420 amb 991 for(i=0;i<option_filesort_threads;i++)
421     {
422     free(threads[i].data);
423     free(threads[i].datap);
424 amb 948
425 amb 991 free(threads[i].filename);
426     }
427    
428 amb 948 return(count);
429 amb 269 }
430    
431    
432     /*++++++++++++++++++++++++++++++++++++++
433 amb 310 A function to sort the contents of a file of variable length objects (each
434 amb 867 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
435 amb 310
436     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
437     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
438     The individual sort steps and the merge step both use a "Heap sort"
439     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
440     if the data is already partially sorted.
441    
442 amb 948 index_t filesort_vary Returns the number of objects kept.
443    
444 amb 310 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
445    
446     int fd_out The file descriptor of the output file (opened for writing and empty).
447    
448     int (*compare)(const void*, const void*) The comparison function (identical to qsort if the
449     data to be sorted is an array of things not pointers).
450    
451 amb 948 int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it
452     returns 1 then the object is kept and written to the output file.
453 amb 310 ++++++++++++++++++++++++++++++++++++++*/
454    
455 amb 948 index_t filesort_vary(int fd_in,int fd_out,int (*compare)(const void*,const void*),int (*keep)(void*,index_t))
456 amb 310 {
457     int *fds=NULL,*heap=NULL;
458     int nfiles=0,ndata=0;
459     index_t count=0,total=0;
460 amb 991 size_t datasize=option_filesort_ramsize/option_filesort_threads;
461 amb 311 FILESORT_VARINT nextitemsize,largestitemsize=0;
462 amb 991 void *data,**datap;
463     thread_data *threads;
464 amb 310 int i,more=1;
465 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
466     int nthreads=0;
467     #endif
468 amb 310
469     /* Allocate the RAM buffer and other bits */
470    
471 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
472 amb 310
473 amb 991 for(i=0;i<option_filesort_threads;i++)
474     {
475     threads[i].running=0;
476 amb 310
477 amb 991 threads[i].data=malloc(datasize);
478     threads[i].datap=NULL;
479    
480     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
481    
482     threads[i].compare=compare;
483     }
484    
485 amb 310 /* Loop around, fill the buffer, sort the data and write a temporary file */
486    
487 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
488 amb 310 goto tidy_and_exit;
489    
490     do
491     {
492 amb 311 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
493 amb 991 int thread=0;
494 amb 310
495 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
496 amb 310
497 amb 1020 if(option_filesort_threads>1)
498     {
499     /* Find a spare slot (one *must* be unused at all times) */
500 amb 991
501 amb 1020 pthread_mutex_lock(&running_mutex);
502 amb 996
503 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
504     if(!threads[thread].running)
505     break;
506 amb 991
507 amb 1020 pthread_mutex_unlock(&running_mutex);
508     }
509 amb 996
510 amb 991 #endif
511    
512     threads[thread].datap=threads[thread].data+datasize;
513    
514     threads[thread].n=0;
515    
516 amb 310 /* Read in the data and create pointers */
517    
518 amb 991 while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
519 amb 310 {
520 amb 311 FILESORT_VARINT itemsize=nextitemsize;
521 amb 310
522     if(itemsize>largestitemsize)
523     largestitemsize=itemsize;
524    
525 amb 991 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
526 amb 310
527 amb 311 ramused+=FILESORT_VARSIZE;
528 amb 310
529 amb 991 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
530 amb 310
531 amb 991 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
532 amb 310
533 amb 311 ramused+=itemsize;
534    
535     ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
536     ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
537    
538 amb 310 total++;
539 amb 991 threads[thread].n++;
540 amb 310
541 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
542 amb 310 {
543     more=0;
544     break;
545     }
546     }
547    
548 amb 543 /* No new data read in this time round */
549    
550 amb 991 if(threads[thread].n==0)
551 amb 310 break;
552    
553 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
554 amb 310
555 amb 1021 if(more==0 && nfiles==0)
556     threads[thread].filename[0]=0;
557     else
558     sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
559 amb 310
560 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
561 amb 310
562 amb 1021 /* Shortcut if only one file, don't write to disk */
563    
564     if(more==0 && nfiles==0)
565     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
566     else if(option_filesort_threads>1)
567 amb 310 {
568 amb 996 pthread_mutex_lock(&running_mutex);
569    
570 amb 991 while(nthreads==(option_filesort_threads-1))
571 amb 310 {
572 amb 991 for(i=0;i<option_filesort_threads;i++)
573     if(threads[i].running==2)
574     {
575     pthread_join(threads[i].thread,NULL);
576     threads[i].running=0;
577     nthreads--;
578     }
579    
580     if(nthreads==(option_filesort_threads-1))
581 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
582 amb 310 }
583    
584 amb 991 threads[thread].running=1;
585    
586 amb 996 pthread_mutex_unlock(&running_mutex);
587    
588 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
589    
590     nthreads++;
591 amb 310 }
592 amb 1020 else
593 amb 1021 filesort_vary_heapsort_thread(&threads[thread]);
594 amb 310
595 amb 991 #else
596 amb 310
597 amb 1021 /* Shortcut if only one file, don't write to disk */
598    
599     if(more==0 && nfiles==0)
600     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
601     else
602 amb 1020 filesort_vary_heapsort_thread(&threads[thread]);
603 amb 310
604 amb 991 #endif
605 amb 310
606 amb 991 nfiles++;
607     }
608     while(more);
609    
610     /* Wait for all of the threads to finish */
611    
612     #if defined(USE_PTHREADS) && USE_PTHREADS
613    
614 amb 1020 while(option_filesort_threads>1 && nthreads)
615 amb 991 {
616 amb 996 pthread_mutex_lock(&running_mutex);
617 amb 991
618 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
619    
620 amb 991 for(i=0;i<option_filesort_threads;i++)
621     if(threads[i].running==2)
622     {
623     pthread_join(threads[i].thread,NULL);
624     threads[i].running=0;
625     nthreads--;
626     }
627    
628 amb 996 pthread_mutex_unlock(&running_mutex);
629 amb 991 }
630    
631     #endif
632    
633     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
634    
635     if(nfiles==1)
636     {
637     for(i=0;i<threads[0].n;i++)
638 amb 310 {
639 amb 991 if(!keep || keep(threads[0].datap[i],count))
640     {
641     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE);
642 amb 310
643 amb 991 WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
644     count++;
645     }
646 amb 310 }
647    
648 amb 991 DeleteFile(threads[0].filename);
649 amb 310
650 amb 991 goto tidy_and_exit;
651 amb 310 }
652    
653     /* Check that number of files is less than file size */
654    
655 amb 311 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
656 amb 310
657 amb 991 assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize));
658 amb 310
659     /* Open all of the temporary files */
660    
661     fds=(int*)malloc(nfiles*sizeof(int));
662    
663     for(i=0;i<nfiles;i++)
664     {
665 amb 991 char *filename=threads[0].filename;
666    
667 amb 310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
668    
669     fds[i]=ReOpenFile(filename);
670    
671     DeleteFile(filename);
672     }
673    
674     /* Perform an n-way merge using a binary heap */
675    
676 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
677 amb 310
678 amb 991 data=threads[0].data;
679     datap=data+datasize-nfiles*sizeof(void*);
680 amb 310
681     /* Fill the heap to start with */
682    
683     for(i=0;i<nfiles;i++)
684     {
685     int index;
686 amb 311 FILESORT_VARINT itemsize;
687 amb 310
688 amb 311 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
689 amb 310
690 amb 311 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
691 amb 310
692 amb 311 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
693 amb 310
694     ReadFile(fds[i],datap[i],itemsize);
695    
696 amb 875 index=i+1;
697    
698 amb 867 heap[index]=i;
699 amb 310
700     /* Bubble up the new value */
701    
702 amb 875 while(index>1)
703 amb 310 {
704     int newindex;
705     int temp;
706    
707 amb 875 newindex=index/2;
708 amb 310
709 amb 869 if(compare(datap[heap[index]],datap[heap[newindex]])>=0)
710     break;
711    
712 amb 310 temp=heap[index];
713     heap[index]=heap[newindex];
714     heap[newindex]=temp;
715    
716     index=newindex;
717     }
718     }
719    
720     /* Repeatedly pull out the root of the heap and refill from the same file */
721    
722     ndata=nfiles;
723    
724     do
725     {
726 amb 875 int index=1;
727 amb 311 FILESORT_VARINT itemsize;
728 amb 310
729 amb 948 if(!keep || keep(datap[heap[index]],count))
730 amb 310 {
731 amb 867 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
732 amb 310
733 amb 867 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
734 amb 310 count++;
735     }
736    
737 amb 867 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
738 amb 310 {
739 amb 875 heap[index]=heap[ndata];
740 amb 870 ndata--;
741 amb 310 }
742     else
743     {
744 amb 867 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
745 amb 310
746 amb 867 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
747 amb 310 }
748    
749     /* Bubble down the new value */
750    
751 amb 875 while((2*index)<ndata)
752 amb 310 {
753 amb 873 int newindex;
754 amb 310 int temp;
755    
756 amb 875 newindex=2*index;
757 amb 310
758 amb 875 if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
759 amb 873 newindex=newindex+1;
760 amb 869
761     if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
762     break;
763    
764 amb 310 temp=heap[newindex];
765     heap[newindex]=heap[index];
766     heap[index]=temp;
767    
768     index=newindex;
769     }
770    
771 amb 875 if((2*index)==ndata)
772 amb 310 {
773     int newindex;
774     int temp;
775    
776 amb 875 newindex=2*index;
777 amb 310
778 amb 869 if(compare(datap[heap[index]],datap[heap[newindex]])<=0)
779     ; /* break */
780     else
781     {
782     temp=heap[newindex];
783     heap[newindex]=heap[index];
784     heap[index]=temp;
785     }
786 amb 310 }
787     }
788     while(ndata>0);
789    
790     /* Tidy up */
791    
792     tidy_and_exit:
793    
794     if(fds)
795     {
796     for(i=0;i<nfiles;i++)
797     CloseFile(fds[i]);
798     free(fds);
799     }
800    
801     if(heap)
802     free(heap);
803    
804 amb 991 for(i=0;i<option_filesort_threads;i++)
805     {
806     free(threads[i].data);
807 amb 948
808 amb 991 free(threads[i].filename);
809     }
810    
811 amb 948 return(count);
812 amb 310 }
813    
814    
815     /*++++++++++++++++++++++++++++++++++++++
816 amb 991 A wrapper function that can be run in a thread for fixed data.
817    
818     void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
819    
820     thread_data *thread The data to be processed in this thread.
821     ++++++++++++++++++++++++++++++++++++++*/
822    
823     static void *filesort_fixed_heapsort_thread(thread_data *thread)
824     {
825     int fd,i;
826    
827     /* Sort the data pointers using a heap sort */
828    
829     filesort_heapsort(thread->datap,thread->n,thread->compare);
830    
831     /* Create a temporary file and write the result */
832    
833     fd=OpenFileNew(thread->filename);
834    
835     for(i=0;i<thread->n;i++)
836     WriteFile(fd,thread->datap[i],thread->itemsize);
837    
838     CloseFile(fd);
839    
840 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
841    
842 amb 1020 if(option_filesort_threads>1)
843     {
844     pthread_mutex_lock(&running_mutex);
845 amb 996
846 amb 1020 thread->running=2;
847 amb 991
848 amb 1020 pthread_cond_signal(&running_cond);
849 amb 996
850 amb 1020 pthread_mutex_unlock(&running_mutex);
851     }
852 amb 996
853     #endif
854    
855 amb 991 return(NULL);
856     }
857    
858    
859     /*++++++++++++++++++++++++++++++++++++++
860     A wrapper function that can be run in a thread for variable data.
861    
862     void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
863    
864     thread_data *thread The data to be processed in this thread.
865     ++++++++++++++++++++++++++++++++++++++*/
866    
867     static void *filesort_vary_heapsort_thread(thread_data *thread)
868     {
869     int fd,i;
870    
871     /* Sort the data pointers using a heap sort */
872    
873     filesort_heapsort(thread->datap,thread->n,thread->compare);
874    
875     /* Create a temporary file and write the result */
876    
877     fd=OpenFileNew(thread->filename);
878    
879     for(i=0;i<thread->n;i++)
880     {
881     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE);
882    
883     WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
884     }
885    
886     CloseFile(fd);
887    
888 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
889    
890 amb 1020 if(option_filesort_threads>1)
891     {
892     pthread_mutex_lock(&running_mutex);
893 amb 996
894 amb 1020 thread->running=2;
895 amb 991
896 amb 1020 pthread_cond_signal(&running_cond);
897 amb 996
898 amb 1020 pthread_mutex_unlock(&running_mutex);
899     }
900 amb 996
901     #endif
902    
903 amb 991 return(NULL);
904     }
905    
906    
907     /*++++++++++++++++++++++++++++++++++++++
908 amb 269 A function to sort an array of pointers efficiently.
909    
910     The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
911 amb 873 in particular, this is good because it can operate in-place and doesn't
912 amb 269 allocate more memory like using qsort() does.
913    
914     void **datap A pointer to the array of pointers to sort.
915    
916     size_t nitems The number of items of data to sort.
917    
918     int(*compare)(const void *, const void *) The comparison function (identical to qsort if the
919     data to be sorted was an array of things not pointers).
920     ++++++++++++++++++++++++++++++++++++++*/
921    
922 amb 503 void filesort_heapsort(void **datap,size_t nitems,int(*compare)(const void*, const void*))
923 amb 269 {
924 amb 875 void **datap1=&datap[-1];
925 amb 269 int i;
926    
927     /* Fill the heap by pretending to insert the data that is already there */
928    
929 amb 875 for(i=2;i<=nitems;i++)
930 amb 269 {
931     int index=i;
932    
933     /* Bubble up the new value (upside-down, put largest at top) */
934    
935 amb 875 while(index>1)
936 amb 269 {
937     int newindex;
938     void *temp;
939    
940 amb 875 newindex=index/2;
941 amb 269
942 amb 875 if(compare(datap1[index],datap1[newindex])<=0) /* reversed compared to filesort_fixed() above */
943 amb 869 break;
944    
945 amb 875 temp=datap1[index];
946     datap1[index]=datap1[newindex];
947     datap1[newindex]=temp;
948 amb 269
949     index=newindex;
950     }
951     }
952    
953     /* Repeatedly pull out the root of the heap and swap with the bottom item */
954    
955 amb 875 for(i=nitems;i>1;i--)
956 amb 269 {
957 amb 875 int index=1;
958 amb 269 void *temp;
959    
960 amb 875 temp=datap1[index];
961     datap1[index]=datap1[i];
962     datap1[i]=temp;
963 amb 269
964     /* Bubble down the new value (upside-down, put largest at top) */
965    
966 amb 875 while((2*index)<(i-1))
967 amb 269 {
968 amb 873 int newindex;
969 amb 269 void *temp;
970    
971 amb 875 newindex=2*index;
972 amb 269
973 amb 875 if(compare(datap1[newindex],datap1[newindex+1])<=0) /* reversed compared to filesort_fixed() above */
974 amb 873 newindex=newindex+1;
975 amb 869
976 amb 875 if(compare(datap1[index],datap1[newindex])>=0) /* reversed compared to filesort_fixed() above */
977 amb 869 break;
978    
979 amb 875 temp=datap1[newindex];
980     datap1[newindex]=datap1[index];
981     datap1[index]=temp;
982 amb 269
983     index=newindex;
984     }
985    
986 amb 875 if((2*index)==(i-1))
987 amb 269 {
988     int newindex;
989     void *temp;
990    
991 amb 875 newindex=2*index;
992 amb 269
993 amb 875 if(compare(datap1[index],datap1[newindex])>=0) /* reversed compared to filesort_fixed() above */
994 amb 869 ; /* break */
995     else
996     {
997 amb 875 temp=datap1[newindex];
998     datap1[newindex]=datap1[index];
999     datap1[index]=temp;
1000 amb 869 }
1001 amb 269 }
1002     }
1003     }

Properties

Name Value
cvs:description Functions to perform sorting.