Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Annotation of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1166 - (hide annotations) (download) (as text)
Tue Nov 20 16:12:08 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26372 byte(s)
Replace all assert statements with a custom error message that explains the
cause and suggests a solution.

1 amb 269 /***************************************
2     Merge sort functions.
3    
4     Part of the Routino routing software.
5     ******************/ /******************
6 amb 948 This file Copyright 2009-2012 Andrew M. Bishop
7 amb 269
8     This program is free software: you can redistribute it and/or modify
9     it under the terms of the GNU Affero General Public License as published by
10     the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12    
13     This program is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16     GNU Affero General Public License for more details.
17    
18     You should have received a copy of the GNU Affero General Public License
19     along with this program. If not, see <http://www.gnu.org/licenses/>.
20     ***************************************/
21    
22    
23     #include <stdio.h>
24     #include <stdlib.h>
25     #include <string.h>
26    
27 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
28     #include <pthread.h>
29     #endif
30    
31 amb 532 #include "types.h"
32    
33 amb 1166 #include "logging.h"
34 amb 449 #include "files.h"
35 amb 532 #include "sorting.h"
36 amb 269
37    
38 amb 680 /* Global variables */
39 amb 269
40 amb 289 /*+ The command line '--tmpdir' option or its default value. +*/
41 amb 284 extern char *option_tmpdirname;
42 amb 269
43 amb 358 /*+ The amount of RAM to use for filesorting. +*/
44     extern size_t option_filesort_ramsize;
45 amb 269
46 amb 991 /*+ The number of filesorting threads allowed. +*/
47     extern int option_filesort_threads;
48 amb 358
49 amb 991
50     /* Thread data type definitions */
51    
52     /*+ A data type for holding data for a thread. +*/
53     typedef struct _thread_data
54     {
55     pthread_t thread; /*+ The thread identifier. +*/
56    
57     int running; /*+ A flag indicating the current state of the thread. +*/
58    
59     void *data; /*+ The main data array. +*/
60     void **datap; /*+ An array of pointers to the data objects. +*/
61     size_t n; /*+ The number of pointers. +*/
62    
63     char *filename; /*+ The name of the file to write the results to. +*/
64    
65     size_t itemsize; /*+ The size of each item. +*/
66     int (*compare)(const void*,const void*); /*+ The comparison function. +*/
67     }
68     thread_data;
69    
70 amb 996 /* Thread variables */
71 amb 991
72 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
73    
74     static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
75     static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
76    
77     #endif
78    
79 amb 991 /* Thread helper functions */
80    
81     static void *filesort_fixed_heapsort_thread(thread_data *thread);
82     static void *filesort_vary_heapsort_thread(thread_data *thread);
83    
84    
85 amb 269 /*++++++++++++++++++++++++++++++++++++++
86 amb 310 A function to sort the contents of a file of fixed length objects using a
87     limited amount of RAM.
88 amb 269
89     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91     The individual sort steps and the merge step both use a "Heap sort"
92     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93     if the data is already partially sorted.
94    
95 amb 948 index_t filesort_fixed Returns the number of objects kept.
96    
97 amb 269 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98    
99     int fd_out The file descriptor of the output file (opened for writing and empty).
100    
101     size_t itemsize The size of each item in the file that needs sorting.
102    
103 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
104     each item before they have been sorted. The second parameter is the number of objects
105     previously read from the input file. If the function returns 1 then the object is kept
106     and it is sorted, otherwise it is ignored.
107 amb 269
108 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
109     to qsort if the data to be sorted is an array of things not pointers.
110    
111     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
112     each item after they have been sorted. The second parameter is the number of objects
113     already written to the output file. If the function returns 1 then the object is written
114     to the output file., otherwise it is ignored.
115 amb 269 ++++++++++++++++++++++++++++++++++++++*/
116    
117 amb 1106 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
118     int (*compare_function)(const void*,const void*),
119     int (*post_sort_function)(void*,index_t))
120 amb 269 {
121     int *fds=NULL,*heap=NULL;
122     int nfiles=0,ndata=0;
123 amb 1112 index_t count_out=0,count_in=0,total=0;
124 amb 991 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
125     void *data,**datap;
126     thread_data *threads;
127 amb 269 int i,more=1;
128 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
129     int nthreads=0;
130     #endif
131 amb 269
132     /* Allocate the RAM buffer and other bits */
133    
134 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
135 amb 269
136 amb 991 for(i=0;i<option_filesort_threads;i++)
137     {
138     threads[i].running=0;
139 amb 269
140 amb 991 threads[i].data=malloc(nitems*itemsize);
141     threads[i].datap=malloc(nitems*sizeof(void*));
142    
143     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
144    
145     threads[i].itemsize=itemsize;
146 amb 1106 threads[i].compare=compare_function;
147 amb 991 }
148    
149 amb 269 /* Loop around, fill the buffer, sort the data and write a temporary file */
150    
151     do
152     {
153 amb 991 int thread=0;
154 amb 269
155 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
156    
157 amb 1020 if(option_filesort_threads>1)
158     {
159     /* Find a spare slot (one *must* be unused at all times) */
160 amb 991
161 amb 1020 pthread_mutex_lock(&running_mutex);
162 amb 996
163 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
164     if(!threads[thread].running)
165     break;
166 amb 991
167 amb 1020 pthread_mutex_unlock(&running_mutex);
168     }
169 amb 996
170 amb 991 #endif
171    
172 amb 269 /* Read in the data and create pointers */
173    
174 amb 1106 for(i=0;i<nitems;)
175 amb 269 {
176 amb 991 threads[thread].datap[i]=threads[thread].data+i*itemsize;
177 amb 269
178 amb 991 if(ReadFile(fd_in,threads[thread].datap[i],itemsize))
179 amb 269 {
180     more=0;
181     break;
182     }
183    
184 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in))
185     {
186     i++;
187 amb 1112 total++;
188 amb 1106 }
189 amb 1112
190     count_in++;
191 amb 269 }
192    
193 amb 991 threads[thread].n=i;
194 amb 269
195 amb 1020 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
196 amb 546
197 amb 1112 if(more==0 && total==0)
198 amb 546 goto tidy_and_exit;
199    
200 amb 543 /* No new data read in this time round */
201    
202 amb 991 if(threads[thread].n==0)
203 amb 269 break;
204    
205 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
206 amb 269
207 amb 991 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
208 amb 269
209 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
210 amb 269
211 amb 1021 /* Shortcut if only one file, don't write to disk */
212    
213     if(more==0 && nfiles==0)
214     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
215     else if(option_filesort_threads>1)
216 amb 269 {
217 amb 996 pthread_mutex_lock(&running_mutex);
218    
219 amb 991 while(nthreads==(option_filesort_threads-1))
220 amb 269 {
221 amb 991 for(i=0;i<option_filesort_threads;i++)
222     if(threads[i].running==2)
223     {
224     pthread_join(threads[i].thread,NULL);
225     threads[i].running=0;
226     nthreads--;
227     }
228    
229     if(nthreads==(option_filesort_threads-1))
230 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
231 amb 269 }
232    
233 amb 991 threads[thread].running=1;
234 amb 269
235 amb 996 pthread_mutex_unlock(&running_mutex);
236    
237 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
238 amb 269
239 amb 991 nthreads++;
240     }
241 amb 1020 else
242 amb 1021 filesort_fixed_heapsort_thread(&threads[thread]);
243 amb 269
244 amb 991 #else
245 amb 269
246 amb 1021 /* Shortcut if only one file, don't write to disk */
247    
248     if(more==0 && nfiles==0)
249     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
250     else
251 amb 1020 filesort_fixed_heapsort_thread(&threads[thread]);
252 amb 269
253 amb 991 #endif
254 amb 269
255     nfiles++;
256     }
257     while(more);
258    
259 amb 991 /* Wait for all of the threads to finish */
260 amb 269
261 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
262    
263 amb 1020 while(option_filesort_threads>1 && nthreads)
264 amb 991 {
265 amb 996 pthread_mutex_lock(&running_mutex);
266 amb 991
267 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
268    
269 amb 991 for(i=0;i<option_filesort_threads;i++)
270     if(threads[i].running==2)
271     {
272     pthread_join(threads[i].thread,NULL);
273     threads[i].running=0;
274     nthreads--;
275     }
276    
277 amb 996 pthread_mutex_unlock(&running_mutex);
278 amb 991 }
279    
280     #endif
281    
282     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
283    
284 amb 269 if(nfiles==1)
285     {
286 amb 991 for(i=0;i<threads[0].n;i++)
287 amb 269 {
288 amb 1106 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
289 amb 274 {
290 amb 991 WriteFile(fd_out,threads[0].datap[i],itemsize);
291 amb 1106 count_out++;
292 amb 274 }
293 amb 269 }
294    
295 amb 991 DeleteFile(threads[0].filename);
296 amb 269
297 amb 283 goto tidy_and_exit;
298 amb 269 }
299    
300     /* Check that number of files is less than file size */
301    
302 amb 1166 logassert(nfiles<nitems,"Too many temporary files (use more sorting memory?)");
303 amb 269
304     /* Open all of the temporary files */
305    
306     fds=(int*)malloc(nfiles*sizeof(int));
307    
308     for(i=0;i<nfiles;i++)
309     {
310 amb 991 char *filename=threads[0].filename;
311    
312 amb 284 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
313 amb 269
314     fds[i]=ReOpenFile(filename);
315    
316     DeleteFile(filename);
317     }
318    
319     /* Perform an n-way merge using a binary heap */
320    
321 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
322 amb 269
323 amb 991 data =threads[0].data;
324     datap=threads[0].datap;
325    
326 amb 269 /* Fill the heap to start with */
327    
328     for(i=0;i<nfiles;i++)
329     {
330     int index;
331    
332     datap[i]=data+i*itemsize;
333    
334     ReadFile(fds[i],datap[i],itemsize);
335    
336 amb 875 index=i+1;
337    
338 amb 867 heap[index]=i;
339 amb 269
340     /* Bubble up the new value */
341    
342 amb 875 while(index>1)
343 amb 269 {
344     int newindex;
345     int temp;
346    
347 amb 875 newindex=index/2;
348 amb 269
349 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
350 amb 869 break;
351    
352 amb 269 temp=heap[index];
353     heap[index]=heap[newindex];
354     heap[newindex]=temp;
355    
356     index=newindex;
357     }
358     }
359    
360     /* Repeatedly pull out the root of the heap and refill from the same file */
361    
362     ndata=nfiles;
363    
364     do
365     {
366 amb 875 int index=1;
367 amb 269
368 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
369 amb 274 {
370 amb 867 WriteFile(fd_out,datap[heap[index]],itemsize);
371 amb 1106 count_out++;
372 amb 274 }
373 amb 269
374 amb 867 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
375 amb 269 {
376 amb 875 heap[index]=heap[ndata];
377 amb 870 ndata--;
378 amb 269 }
379    
380     /* Bubble down the new value */
381    
382 amb 875 while((2*index)<ndata)
383 amb 269 {
384 amb 873 int newindex;
385 amb 269 int temp;
386    
387 amb 875 newindex=2*index;
388 amb 269
389 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
390 amb 873 newindex=newindex+1;
391 amb 869
392 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
393 amb 869 break;
394    
395 amb 269 temp=heap[newindex];
396     heap[newindex]=heap[index];
397     heap[index]=temp;
398    
399     index=newindex;
400     }
401    
402 amb 875 if((2*index)==ndata)
403 amb 269 {
404     int newindex;
405     int temp;
406    
407 amb 875 newindex=2*index;
408 amb 269
409 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
410 amb 869 ; /* break */
411     else
412     {
413     temp=heap[newindex];
414     heap[newindex]=heap[index];
415     heap[index]=temp;
416     }
417 amb 269 }
418     }
419     while(ndata>0);
420    
421     /* Tidy up */
422    
423 amb 283 tidy_and_exit:
424 amb 269
425 amb 283 if(fds)
426     {
427     for(i=0;i<nfiles;i++)
428     CloseFile(fds[i]);
429     free(fds);
430     }
431    
432     if(heap)
433     free(heap);
434    
435 amb 991 for(i=0;i<option_filesort_threads;i++)
436     {
437     free(threads[i].data);
438     free(threads[i].datap);
439 amb 948
440 amb 991 free(threads[i].filename);
441     }
442    
443 amb 1106 return(count_out);
444 amb 269 }
445    
446    
447     /*++++++++++++++++++++++++++++++++++++++
448 amb 310 A function to sort the contents of a file of variable length objects (each
449 amb 867 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
450 amb 310
451     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
452     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
453     The individual sort steps and the merge step both use a "Heap sort"
454     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
455     if the data is already partially sorted.
456    
457 amb 948 index_t filesort_vary Returns the number of objects kept.
458    
459 amb 310 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
460    
461     int fd_out The file descriptor of the output file (opened for writing and empty).
462    
463 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
464     each item before they have been sorted. The second parameter is the number of objects
465     previously read from the input file. If the function returns 1 then the object is kept
466     and it is sorted, otherwise it is ignored.
467 amb 310
468 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
469     to qsort if the data to be sorted is an array of things not pointers.
470    
471     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
472     each item after they have been sorted. The second parameter is the number of objects
473     already written to the output file. If the function returns 1 then the object is written
474     to the output file., otherwise it is ignored.
475 amb 310 ++++++++++++++++++++++++++++++++++++++*/
476    
477 amb 1106 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
478     int (*compare_function)(const void*,const void*),
479     int (*post_sort_function)(void*,index_t))
480 amb 310 {
481     int *fds=NULL,*heap=NULL;
482     int nfiles=0,ndata=0;
483 amb 1112 index_t count_out=0,count_in=0,total=0;
484 amb 991 size_t datasize=option_filesort_ramsize/option_filesort_threads;
485 amb 311 FILESORT_VARINT nextitemsize,largestitemsize=0;
486 amb 991 void *data,**datap;
487     thread_data *threads;
488 amb 310 int i,more=1;
489 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
490     int nthreads=0;
491     #endif
492 amb 310
493     /* Allocate the RAM buffer and other bits */
494    
495 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
496 amb 310
497 amb 991 for(i=0;i<option_filesort_threads;i++)
498     {
499     threads[i].running=0;
500 amb 310
501 amb 991 threads[i].data=malloc(datasize);
502     threads[i].datap=NULL;
503    
504     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
505    
506 amb 1106 threads[i].compare=compare_function;
507 amb 991 }
508    
509 amb 310 /* Loop around, fill the buffer, sort the data and write a temporary file */
510    
511 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
512 amb 310 goto tidy_and_exit;
513    
514     do
515     {
516 amb 311 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
517 amb 991 int thread=0;
518 amb 310
519 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
520 amb 310
521 amb 1020 if(option_filesort_threads>1)
522     {
523     /* Find a spare slot (one *must* be unused at all times) */
524 amb 991
525 amb 1020 pthread_mutex_lock(&running_mutex);
526 amb 996
527 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
528     if(!threads[thread].running)
529     break;
530 amb 991
531 amb 1020 pthread_mutex_unlock(&running_mutex);
532     }
533 amb 996
534 amb 991 #endif
535    
536     threads[thread].datap=threads[thread].data+datasize;
537    
538     threads[thread].n=0;
539    
540 amb 310 /* Read in the data and create pointers */
541    
542 amb 991 while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
543 amb 310 {
544 amb 311 FILESORT_VARINT itemsize=nextitemsize;
545 amb 310
546     if(itemsize>largestitemsize)
547     largestitemsize=itemsize;
548    
549 amb 991 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
550 amb 310
551 amb 311 ramused+=FILESORT_VARSIZE;
552 amb 310
553 amb 991 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
554 amb 310
555 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
556     {
557     *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
558 amb 310
559 amb 1106 ramused+=itemsize;
560 amb 311
561 amb 1106 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
562     ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
563 amb 311
564 amb 1112 total++;
565 amb 1106 threads[thread].n++;
566     }
567 amb 310
568 amb 1112 count_in++;
569    
570 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
571 amb 310 {
572     more=0;
573     break;
574     }
575     }
576    
577 amb 543 /* No new data read in this time round */
578    
579 amb 991 if(threads[thread].n==0)
580 amb 310 break;
581    
582 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
583 amb 310
584 amb 1021 if(more==0 && nfiles==0)
585     threads[thread].filename[0]=0;
586     else
587     sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
588 amb 310
589 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
590 amb 310
591 amb 1021 /* Shortcut if only one file, don't write to disk */
592    
593     if(more==0 && nfiles==0)
594     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
595     else if(option_filesort_threads>1)
596 amb 310 {
597 amb 996 pthread_mutex_lock(&running_mutex);
598    
599 amb 991 while(nthreads==(option_filesort_threads-1))
600 amb 310 {
601 amb 991 for(i=0;i<option_filesort_threads;i++)
602     if(threads[i].running==2)
603     {
604     pthread_join(threads[i].thread,NULL);
605     threads[i].running=0;
606     nthreads--;
607     }
608    
609     if(nthreads==(option_filesort_threads-1))
610 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
611 amb 310 }
612    
613 amb 991 threads[thread].running=1;
614    
615 amb 996 pthread_mutex_unlock(&running_mutex);
616    
617 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
618    
619     nthreads++;
620 amb 310 }
621 amb 1020 else
622 amb 1021 filesort_vary_heapsort_thread(&threads[thread]);
623 amb 310
624 amb 991 #else
625 amb 310
626 amb 1021 /* Shortcut if only one file, don't write to disk */
627    
628     if(more==0 && nfiles==0)
629     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
630     else
631 amb 1020 filesort_vary_heapsort_thread(&threads[thread]);
632 amb 310
633 amb 991 #endif
634 amb 310
635 amb 991 nfiles++;
636     }
637     while(more);
638    
639     /* Wait for all of the threads to finish */
640    
641     #if defined(USE_PTHREADS) && USE_PTHREADS
642    
643 amb 1020 while(option_filesort_threads>1 && nthreads)
644 amb 991 {
645 amb 996 pthread_mutex_lock(&running_mutex);
646 amb 991
647 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
648    
649 amb 991 for(i=0;i<option_filesort_threads;i++)
650     if(threads[i].running==2)
651     {
652     pthread_join(threads[i].thread,NULL);
653     threads[i].running=0;
654     nthreads--;
655     }
656    
657 amb 996 pthread_mutex_unlock(&running_mutex);
658 amb 991 }
659    
660     #endif
661    
662     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
663    
664     if(nfiles==1)
665     {
666     for(i=0;i<threads[0].n;i++)
667 amb 310 {
668 amb 1106 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
669 amb 991 {
670     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE);
671 amb 310
672 amb 991 WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
673 amb 1106 count_out++;
674 amb 991 }
675 amb 310 }
676    
677 amb 991 DeleteFile(threads[0].filename);
678 amb 310
679 amb 991 goto tidy_and_exit;
680 amb 310 }
681    
682     /* Check that number of files is less than file size */
683    
684 amb 311 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
685 amb 310
686 amb 1166 logassert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)");
687 amb 310
688     /* Open all of the temporary files */
689    
690     fds=(int*)malloc(nfiles*sizeof(int));
691    
692     for(i=0;i<nfiles;i++)
693     {
694 amb 991 char *filename=threads[0].filename;
695    
696 amb 310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
697    
698     fds[i]=ReOpenFile(filename);
699    
700     DeleteFile(filename);
701     }
702    
703     /* Perform an n-way merge using a binary heap */
704    
705 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
706 amb 310
707 amb 991 data=threads[0].data;
708     datap=data+datasize-nfiles*sizeof(void*);
709 amb 310
710     /* Fill the heap to start with */
711    
712     for(i=0;i<nfiles;i++)
713     {
714     int index;
715 amb 311 FILESORT_VARINT itemsize;
716 amb 310
717 amb 311 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
718 amb 310
719 amb 311 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
720 amb 310
721 amb 311 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
722 amb 310
723     ReadFile(fds[i],datap[i],itemsize);
724    
725 amb 875 index=i+1;
726    
727 amb 867 heap[index]=i;
728 amb 310
729     /* Bubble up the new value */
730    
731 amb 875 while(index>1)
732 amb 310 {
733     int newindex;
734     int temp;
735    
736 amb 875 newindex=index/2;
737 amb 310
738 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
739 amb 869 break;
740    
741 amb 310 temp=heap[index];
742     heap[index]=heap[newindex];
743     heap[newindex]=temp;
744    
745     index=newindex;
746     }
747     }
748    
749     /* Repeatedly pull out the root of the heap and refill from the same file */
750    
751     ndata=nfiles;
752    
753     do
754     {
755 amb 875 int index=1;
756 amb 311 FILESORT_VARINT itemsize;
757 amb 310
758 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
759 amb 310 {
760 amb 867 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
761 amb 310
762 amb 867 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
763 amb 1106 count_out++;
764 amb 310 }
765    
766 amb 867 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
767 amb 310 {
768 amb 875 heap[index]=heap[ndata];
769 amb 870 ndata--;
770 amb 310 }
771     else
772     {
773 amb 867 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
774 amb 310
775 amb 867 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
776 amb 310 }
777    
778     /* Bubble down the new value */
779    
780 amb 875 while((2*index)<ndata)
781 amb 310 {
782 amb 873 int newindex;
783 amb 310 int temp;
784    
785 amb 875 newindex=2*index;
786 amb 310
787 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
788 amb 873 newindex=newindex+1;
789 amb 869
790 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
791 amb 869 break;
792    
793 amb 310 temp=heap[newindex];
794     heap[newindex]=heap[index];
795     heap[index]=temp;
796    
797     index=newindex;
798     }
799    
800 amb 875 if((2*index)==ndata)
801 amb 310 {
802     int newindex;
803     int temp;
804    
805 amb 875 newindex=2*index;
806 amb 310
807 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
808 amb 869 ; /* break */
809     else
810     {
811     temp=heap[newindex];
812     heap[newindex]=heap[index];
813     heap[index]=temp;
814     }
815 amb 310 }
816     }
817     while(ndata>0);
818    
819     /* Tidy up */
820    
821     tidy_and_exit:
822    
823     if(fds)
824     {
825     for(i=0;i<nfiles;i++)
826     CloseFile(fds[i]);
827     free(fds);
828     }
829    
830     if(heap)
831     free(heap);
832    
833 amb 991 for(i=0;i<option_filesort_threads;i++)
834     {
835     free(threads[i].data);
836 amb 948
837 amb 991 free(threads[i].filename);
838     }
839    
840 amb 1106 return(count_out);
841 amb 310 }
842    
843    
844     /*++++++++++++++++++++++++++++++++++++++
845 amb 991 A wrapper function that can be run in a thread for fixed data.
846    
847     void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
848    
849     thread_data *thread The data to be processed in this thread.
850     ++++++++++++++++++++++++++++++++++++++*/
851    
852     static void *filesort_fixed_heapsort_thread(thread_data *thread)
853     {
854     int fd,i;
855    
856     /* Sort the data pointers using a heap sort */
857    
858     filesort_heapsort(thread->datap,thread->n,thread->compare);
859    
860     /* Create a temporary file and write the result */
861    
862     fd=OpenFileNew(thread->filename);
863    
864     for(i=0;i<thread->n;i++)
865     WriteFile(fd,thread->datap[i],thread->itemsize);
866    
867     CloseFile(fd);
868    
869 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
870    
871 amb 1020 if(option_filesort_threads>1)
872     {
873     pthread_mutex_lock(&running_mutex);
874 amb 996
875 amb 1020 thread->running=2;
876 amb 991
877 amb 1020 pthread_cond_signal(&running_cond);
878 amb 996
879 amb 1020 pthread_mutex_unlock(&running_mutex);
880     }
881 amb 996
882     #endif
883    
884 amb 991 return(NULL);
885     }
886    
887    
888     /*++++++++++++++++++++++++++++++++++++++
889     A wrapper function that can be run in a thread for variable data.
890    
891     void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
892    
893     thread_data *thread The data to be processed in this thread.
894     ++++++++++++++++++++++++++++++++++++++*/
895    
896     static void *filesort_vary_heapsort_thread(thread_data *thread)
897     {
898     int fd,i;
899    
900     /* Sort the data pointers using a heap sort */
901    
902     filesort_heapsort(thread->datap,thread->n,thread->compare);
903    
904     /* Create a temporary file and write the result */
905    
906     fd=OpenFileNew(thread->filename);
907    
908     for(i=0;i<thread->n;i++)
909     {
910     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE);
911    
912     WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
913     }
914    
915     CloseFile(fd);
916    
917 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
918    
919 amb 1020 if(option_filesort_threads>1)
920     {
921     pthread_mutex_lock(&running_mutex);
922 amb 996
923 amb 1020 thread->running=2;
924 amb 991
925 amb 1020 pthread_cond_signal(&running_cond);
926 amb 996
927 amb 1020 pthread_mutex_unlock(&running_mutex);
928     }
929 amb 996
930     #endif
931    
932 amb 991 return(NULL);
933     }
934    
935    
936     /*++++++++++++++++++++++++++++++++++++++
937 amb 269 A function to sort an array of pointers efficiently.
938    
939     The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
940 amb 873 in particular, this is good because it can operate in-place and doesn't
941 amb 269 allocate more memory like using qsort() does.
942    
943     void **datap A pointer to the array of pointers to sort.
944    
945     size_t nitems The number of items of data to sort.
946    
947 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
948     to qsort if the data to be sorted is an array of things not pointers.
949 amb 269 ++++++++++++++++++++++++++++++++++++++*/
950    
951 amb 1106 void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
952 amb 269 {
953 amb 875 void **datap1=&datap[-1];
954 amb 269 int i;
955    
956     /* Fill the heap by pretending to insert the data that is already there */
957    
958 amb 875 for(i=2;i<=nitems;i++)
959 amb 269 {
960     int index=i;
961    
962     /* Bubble up the new value (upside-down, put largest at top) */
963    
964 amb 875 while(index>1)
965 amb 269 {
966     int newindex;
967     void *temp;
968    
969 amb 875 newindex=index/2;
970 amb 269
971 amb 1106 if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */
972 amb 869 break;
973    
974 amb 875 temp=datap1[index];
975     datap1[index]=datap1[newindex];
976     datap1[newindex]=temp;
977 amb 269
978     index=newindex;
979     }
980     }
981    
982     /* Repeatedly pull out the root of the heap and swap with the bottom item */
983    
984 amb 875 for(i=nitems;i>1;i--)
985 amb 269 {
986 amb 875 int index=1;
987 amb 269 void *temp;
988    
989 amb 875 temp=datap1[index];
990     datap1[index]=datap1[i];
991     datap1[i]=temp;
992 amb 269
993     /* Bubble down the new value (upside-down, put largest at top) */
994    
995 amb 875 while((2*index)<(i-1))
996 amb 269 {
997 amb 873 int newindex;
998 amb 269 void *temp;
999    
1000 amb 875 newindex=2*index;
1001 amb 269
1002 amb 1106 if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */
1003 amb 873 newindex=newindex+1;
1004 amb 869
1005 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1006 amb 869 break;
1007    
1008 amb 875 temp=datap1[newindex];
1009     datap1[newindex]=datap1[index];
1010     datap1[index]=temp;
1011 amb 269
1012     index=newindex;
1013     }
1014    
1015 amb 875 if((2*index)==(i-1))
1016 amb 269 {
1017     int newindex;
1018     void *temp;
1019    
1020 amb 875 newindex=2*index;
1021 amb 269
1022 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1023 amb 869 ; /* break */
1024     else
1025     {
1026 amb 875 temp=datap1[newindex];
1027     datap1[newindex]=datap1[index];
1028     datap1[index]=temp;
1029 amb 869 }
1030 amb 269 }
1031     }
1032     }

Properties

Name Value
cvs:description Functions to perform sorting.