Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Annotation of /branches/destination-access/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1106 - (hide annotations) (download) (as text)
Sun Oct 21 15:55:48 2012 UTC (12 years, 5 months ago) by amb
Original Path: trunk/src/sorting.c
File MIME type: text/x-csrc
File size: 26210 byte(s)
Change the sorting functions to have a pre-sort and post-sort selection function
instead of just a post-selection one (this will allow deletion of some items
before sorting instead of after sorting in some cases).

1 amb 269 /***************************************
2     Merge sort functions.
3    
4     Part of the Routino routing software.
5     ******************/ /******************
6 amb 948 This file Copyright 2009-2012 Andrew M. Bishop
7 amb 269
8     This program is free software: you can redistribute it and/or modify
9     it under the terms of the GNU Affero General Public License as published by
10     the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12    
13     This program is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16     GNU Affero General Public License for more details.
17    
18     You should have received a copy of the GNU Affero General Public License
19     along with this program. If not, see <http://www.gnu.org/licenses/>.
20     ***************************************/
21    
22    
23     #include <stdio.h>
24     #include <stdlib.h>
25     #include <string.h>
26     #include <assert.h>
27    
28 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
29     #include <pthread.h>
30     #endif
31    
32 amb 532 #include "types.h"
33    
34 amb 449 #include "files.h"
35 amb 532 #include "sorting.h"
36 amb 269
37    
38 amb 680 /* Global variables */
39 amb 269
40 amb 289 /*+ The command line '--tmpdir' option or its default value. +*/
41 amb 284 extern char *option_tmpdirname;
42 amb 269
43 amb 358 /*+ The amount of RAM to use for filesorting. +*/
44     extern size_t option_filesort_ramsize;
45 amb 269
46 amb 991 /*+ The number of filesorting threads allowed. +*/
47     extern int option_filesort_threads;
48 amb 358
49 amb 991
50     /* Thread data type definitions */
51    
52     /*+ A data type for holding data for a thread. +*/
53     typedef struct _thread_data
54     {
55     pthread_t thread; /*+ The thread identifier. +*/
56    
57     int running; /*+ A flag indicating the current state of the thread. +*/
58    
59     void *data; /*+ The main data array. +*/
60     void **datap; /*+ An array of pointers to the data objects. +*/
61     size_t n; /*+ The number of pointers. +*/
62    
63     char *filename; /*+ The name of the file to write the results to. +*/
64    
65     size_t itemsize; /*+ The size of each item. +*/
66     int (*compare)(const void*,const void*); /*+ The comparison function. +*/
67     }
68     thread_data;
69    
70 amb 996 /* Thread variables */
71 amb 991
72 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
73    
74     static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
75     static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
76    
77     #endif
78    
79 amb 991 /* Thread helper functions */
80    
81     static void *filesort_fixed_heapsort_thread(thread_data *thread);
82     static void *filesort_vary_heapsort_thread(thread_data *thread);
83    
84    
85 amb 269 /*++++++++++++++++++++++++++++++++++++++
86 amb 310 A function to sort the contents of a file of fixed length objects using a
87     limited amount of RAM.
88 amb 269
89     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91     The individual sort steps and the merge step both use a "Heap sort"
92     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93     if the data is already partially sorted.
94    
95 amb 948 index_t filesort_fixed Returns the number of objects kept.
96    
97 amb 269 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98    
99     int fd_out The file descriptor of the output file (opened for writing and empty).
100    
101     size_t itemsize The size of each item in the file that needs sorting.
102    
103 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
104     each item before they have been sorted. The second parameter is the number of objects
105     previously read from the input file. If the function returns 1 then the object is kept
106     and it is sorted, otherwise it is ignored.
107 amb 269
108 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
109     to qsort if the data to be sorted is an array of things not pointers.
110    
111     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
112     each item after they have been sorted. The second parameter is the number of objects
113     already written to the output file. If the function returns 1 then the object is written
114     to the output file., otherwise it is ignored.
115 amb 269 ++++++++++++++++++++++++++++++++++++++*/
116    
117 amb 1106 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
118     int (*compare_function)(const void*,const void*),
119     int (*post_sort_function)(void*,index_t))
120 amb 269 {
121     int *fds=NULL,*heap=NULL;
122     int nfiles=0,ndata=0;
123 amb 1106 index_t count_out=0,count_in=0;
124 amb 991 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
125     void *data,**datap;
126     thread_data *threads;
127 amb 269 int i,more=1;
128 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
129     int nthreads=0;
130     #endif
131 amb 269
132     /* Allocate the RAM buffer and other bits */
133    
134 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
135 amb 269
136 amb 991 for(i=0;i<option_filesort_threads;i++)
137     {
138     threads[i].running=0;
139 amb 269
140 amb 991 threads[i].data=malloc(nitems*itemsize);
141     threads[i].datap=malloc(nitems*sizeof(void*));
142    
143     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
144    
145     threads[i].itemsize=itemsize;
146 amb 1106 threads[i].compare=compare_function;
147 amb 991 }
148    
149 amb 269 /* Loop around, fill the buffer, sort the data and write a temporary file */
150    
151     do
152     {
153 amb 991 int thread=0;
154 amb 269
155 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
156    
157 amb 1020 if(option_filesort_threads>1)
158     {
159     /* Find a spare slot (one *must* be unused at all times) */
160 amb 991
161 amb 1020 pthread_mutex_lock(&running_mutex);
162 amb 996
163 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
164     if(!threads[thread].running)
165     break;
166 amb 991
167 amb 1020 pthread_mutex_unlock(&running_mutex);
168     }
169 amb 996
170 amb 991 #endif
171    
172 amb 269 /* Read in the data and create pointers */
173    
174 amb 1106 for(i=0;i<nitems;)
175 amb 269 {
176 amb 991 threads[thread].datap[i]=threads[thread].data+i*itemsize;
177 amb 269
178 amb 991 if(ReadFile(fd_in,threads[thread].datap[i],itemsize))
179 amb 269 {
180     more=0;
181     break;
182     }
183    
184 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in))
185     {
186     i++;
187     count_in++;
188     }
189 amb 269 }
190    
191 amb 991 threads[thread].n=i;
192 amb 269
193 amb 1020 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
194 amb 546
195 amb 1106 if(more==0 && count_in==0)
196 amb 546 goto tidy_and_exit;
197    
198 amb 543 /* No new data read in this time round */
199    
200 amb 991 if(threads[thread].n==0)
201 amb 269 break;
202    
203 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
204 amb 269
205 amb 991 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
206 amb 269
207 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
208 amb 269
209 amb 1021 /* Shortcut if only one file, don't write to disk */
210    
211     if(more==0 && nfiles==0)
212     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
213     else if(option_filesort_threads>1)
214 amb 269 {
215 amb 996 pthread_mutex_lock(&running_mutex);
216    
217 amb 991 while(nthreads==(option_filesort_threads-1))
218 amb 269 {
219 amb 991 for(i=0;i<option_filesort_threads;i++)
220     if(threads[i].running==2)
221     {
222     pthread_join(threads[i].thread,NULL);
223     threads[i].running=0;
224     nthreads--;
225     }
226    
227     if(nthreads==(option_filesort_threads-1))
228 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
229 amb 269 }
230    
231 amb 991 threads[thread].running=1;
232 amb 269
233 amb 996 pthread_mutex_unlock(&running_mutex);
234    
235 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
236 amb 269
237 amb 991 nthreads++;
238     }
239 amb 1020 else
240 amb 1021 filesort_fixed_heapsort_thread(&threads[thread]);
241 amb 269
242 amb 991 #else
243 amb 269
244 amb 1021 /* Shortcut if only one file, don't write to disk */
245    
246     if(more==0 && nfiles==0)
247     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
248     else
249 amb 1020 filesort_fixed_heapsort_thread(&threads[thread]);
250 amb 269
251 amb 991 #endif
252 amb 269
253     nfiles++;
254     }
255     while(more);
256    
257 amb 991 /* Wait for all of the threads to finish */
258 amb 269
259 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
260    
261 amb 1020 while(option_filesort_threads>1 && nthreads)
262 amb 991 {
263 amb 996 pthread_mutex_lock(&running_mutex);
264 amb 991
265 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
266    
267 amb 991 for(i=0;i<option_filesort_threads;i++)
268     if(threads[i].running==2)
269     {
270     pthread_join(threads[i].thread,NULL);
271     threads[i].running=0;
272     nthreads--;
273     }
274    
275 amb 996 pthread_mutex_unlock(&running_mutex);
276 amb 991 }
277    
278     #endif
279    
280     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
281    
282 amb 269 if(nfiles==1)
283     {
284 amb 991 for(i=0;i<threads[0].n;i++)
285 amb 269 {
286 amb 1106 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
287 amb 274 {
288 amb 991 WriteFile(fd_out,threads[0].datap[i],itemsize);
289 amb 1106 count_out++;
290 amb 274 }
291 amb 269 }
292    
293 amb 991 DeleteFile(threads[0].filename);
294 amb 269
295 amb 283 goto tidy_and_exit;
296 amb 269 }
297    
298     /* Check that number of files is less than file size */
299    
300     assert(nfiles<nitems);
301    
302     /* Open all of the temporary files */
303    
304     fds=(int*)malloc(nfiles*sizeof(int));
305    
306     for(i=0;i<nfiles;i++)
307     {
308 amb 991 char *filename=threads[0].filename;
309    
310 amb 284 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
311 amb 269
312     fds[i]=ReOpenFile(filename);
313    
314     DeleteFile(filename);
315     }
316    
317     /* Perform an n-way merge using a binary heap */
318    
319 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
320 amb 269
321 amb 991 data =threads[0].data;
322     datap=threads[0].datap;
323    
324 amb 269 /* Fill the heap to start with */
325    
326     for(i=0;i<nfiles;i++)
327     {
328     int index;
329    
330     datap[i]=data+i*itemsize;
331    
332     ReadFile(fds[i],datap[i],itemsize);
333    
334 amb 875 index=i+1;
335    
336 amb 867 heap[index]=i;
337 amb 269
338     /* Bubble up the new value */
339    
340 amb 875 while(index>1)
341 amb 269 {
342     int newindex;
343     int temp;
344    
345 amb 875 newindex=index/2;
346 amb 269
347 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
348 amb 869 break;
349    
350 amb 269 temp=heap[index];
351     heap[index]=heap[newindex];
352     heap[newindex]=temp;
353    
354     index=newindex;
355     }
356     }
357    
358     /* Repeatedly pull out the root of the heap and refill from the same file */
359    
360     ndata=nfiles;
361    
362     do
363     {
364 amb 875 int index=1;
365 amb 269
366 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
367 amb 274 {
368 amb 867 WriteFile(fd_out,datap[heap[index]],itemsize);
369 amb 1106 count_out++;
370 amb 274 }
371 amb 269
372 amb 867 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
373 amb 269 {
374 amb 875 heap[index]=heap[ndata];
375 amb 870 ndata--;
376 amb 269 }
377    
378     /* Bubble down the new value */
379    
380 amb 875 while((2*index)<ndata)
381 amb 269 {
382 amb 873 int newindex;
383 amb 269 int temp;
384    
385 amb 875 newindex=2*index;
386 amb 269
387 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
388 amb 873 newindex=newindex+1;
389 amb 869
390 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
391 amb 869 break;
392    
393 amb 269 temp=heap[newindex];
394     heap[newindex]=heap[index];
395     heap[index]=temp;
396    
397     index=newindex;
398     }
399    
400 amb 875 if((2*index)==ndata)
401 amb 269 {
402     int newindex;
403     int temp;
404    
405 amb 875 newindex=2*index;
406 amb 269
407 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
408 amb 869 ; /* break */
409     else
410     {
411     temp=heap[newindex];
412     heap[newindex]=heap[index];
413     heap[index]=temp;
414     }
415 amb 269 }
416     }
417     while(ndata>0);
418    
419     /* Tidy up */
420    
421 amb 283 tidy_and_exit:
422 amb 269
423 amb 283 if(fds)
424     {
425     for(i=0;i<nfiles;i++)
426     CloseFile(fds[i]);
427     free(fds);
428     }
429    
430     if(heap)
431     free(heap);
432    
433 amb 991 for(i=0;i<option_filesort_threads;i++)
434     {
435     free(threads[i].data);
436     free(threads[i].datap);
437 amb 948
438 amb 991 free(threads[i].filename);
439     }
440    
441 amb 1106 return(count_out);
442 amb 269 }
443    
444    
445     /*++++++++++++++++++++++++++++++++++++++
446 amb 310 A function to sort the contents of a file of variable length objects (each
447 amb 867 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
448 amb 310
449     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
450     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
451     The individual sort steps and the merge step both use a "Heap sort"
452     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
453     if the data is already partially sorted.
454    
455 amb 948 index_t filesort_vary Returns the number of objects kept.
456    
457 amb 310 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
458    
459     int fd_out The file descriptor of the output file (opened for writing and empty).
460    
461 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
462     each item before they have been sorted. The second parameter is the number of objects
463     previously read from the input file. If the function returns 1 then the object is kept
464     and it is sorted, otherwise it is ignored.
465 amb 310
466 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
467     to qsort if the data to be sorted is an array of things not pointers.
468    
469     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
470     each item after they have been sorted. The second parameter is the number of objects
471     already written to the output file. If the function returns 1 then the object is written
472     to the output file., otherwise it is ignored.
473 amb 310 ++++++++++++++++++++++++++++++++++++++*/
474    
475 amb 1106 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
476     int (*compare_function)(const void*,const void*),
477     int (*post_sort_function)(void*,index_t))
478 amb 310 {
479     int *fds=NULL,*heap=NULL;
480     int nfiles=0,ndata=0;
481 amb 1106 index_t count_out=0,count_in=0;
482 amb 991 size_t datasize=option_filesort_ramsize/option_filesort_threads;
483 amb 311 FILESORT_VARINT nextitemsize,largestitemsize=0;
484 amb 991 void *data,**datap;
485     thread_data *threads;
486 amb 310 int i,more=1;
487 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
488     int nthreads=0;
489     #endif
490 amb 310
491     /* Allocate the RAM buffer and other bits */
492    
493 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
494 amb 310
495 amb 991 for(i=0;i<option_filesort_threads;i++)
496     {
497     threads[i].running=0;
498 amb 310
499 amb 991 threads[i].data=malloc(datasize);
500     threads[i].datap=NULL;
501    
502     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
503    
504 amb 1106 threads[i].compare=compare_function;
505 amb 991 }
506    
507 amb 310 /* Loop around, fill the buffer, sort the data and write a temporary file */
508    
509 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
510 amb 310 goto tidy_and_exit;
511    
512     do
513     {
514 amb 311 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
515 amb 991 int thread=0;
516 amb 310
517 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
518 amb 310
519 amb 1020 if(option_filesort_threads>1)
520     {
521     /* Find a spare slot (one *must* be unused at all times) */
522 amb 991
523 amb 1020 pthread_mutex_lock(&running_mutex);
524 amb 996
525 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
526     if(!threads[thread].running)
527     break;
528 amb 991
529 amb 1020 pthread_mutex_unlock(&running_mutex);
530     }
531 amb 996
532 amb 991 #endif
533    
534     threads[thread].datap=threads[thread].data+datasize;
535    
536     threads[thread].n=0;
537    
538 amb 310 /* Read in the data and create pointers */
539    
540 amb 991 while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
541 amb 310 {
542 amb 311 FILESORT_VARINT itemsize=nextitemsize;
543 amb 310
544     if(itemsize>largestitemsize)
545     largestitemsize=itemsize;
546    
547 amb 991 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
548 amb 310
549 amb 311 ramused+=FILESORT_VARSIZE;
550 amb 310
551 amb 991 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
552 amb 310
553 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
554     {
555     *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
556 amb 310
557 amb 1106 ramused+=itemsize;
558 amb 311
559 amb 1106 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
560     ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
561 amb 311
562 amb 1106 count_in++;
563     threads[thread].n++;
564     }
565 amb 310
566 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
567 amb 310 {
568     more=0;
569     break;
570     }
571     }
572    
573 amb 543 /* No new data read in this time round */
574    
575 amb 991 if(threads[thread].n==0)
576 amb 310 break;
577    
578 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
579 amb 310
580 amb 1021 if(more==0 && nfiles==0)
581     threads[thread].filename[0]=0;
582     else
583     sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
584 amb 310
585 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
586 amb 310
587 amb 1021 /* Shortcut if only one file, don't write to disk */
588    
589     if(more==0 && nfiles==0)
590     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
591     else if(option_filesort_threads>1)
592 amb 310 {
593 amb 996 pthread_mutex_lock(&running_mutex);
594    
595 amb 991 while(nthreads==(option_filesort_threads-1))
596 amb 310 {
597 amb 991 for(i=0;i<option_filesort_threads;i++)
598     if(threads[i].running==2)
599     {
600     pthread_join(threads[i].thread,NULL);
601     threads[i].running=0;
602     nthreads--;
603     }
604    
605     if(nthreads==(option_filesort_threads-1))
606 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
607 amb 310 }
608    
609 amb 991 threads[thread].running=1;
610    
611 amb 996 pthread_mutex_unlock(&running_mutex);
612    
613 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
614    
615     nthreads++;
616 amb 310 }
617 amb 1020 else
618 amb 1021 filesort_vary_heapsort_thread(&threads[thread]);
619 amb 310
620 amb 991 #else
621 amb 310
622 amb 1021 /* Shortcut if only one file, don't write to disk */
623    
624     if(more==0 && nfiles==0)
625     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
626     else
627 amb 1020 filesort_vary_heapsort_thread(&threads[thread]);
628 amb 310
629 amb 991 #endif
630 amb 310
631 amb 991 nfiles++;
632     }
633     while(more);
634    
635     /* Wait for all of the threads to finish */
636    
637     #if defined(USE_PTHREADS) && USE_PTHREADS
638    
639 amb 1020 while(option_filesort_threads>1 && nthreads)
640 amb 991 {
641 amb 996 pthread_mutex_lock(&running_mutex);
642 amb 991
643 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
644    
645 amb 991 for(i=0;i<option_filesort_threads;i++)
646     if(threads[i].running==2)
647     {
648     pthread_join(threads[i].thread,NULL);
649     threads[i].running=0;
650     nthreads--;
651     }
652    
653 amb 996 pthread_mutex_unlock(&running_mutex);
654 amb 991 }
655    
656     #endif
657    
658     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
659    
660     if(nfiles==1)
661     {
662     for(i=0;i<threads[0].n;i++)
663 amb 310 {
664 amb 1106 if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out))
665 amb 991 {
666     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE);
667 amb 310
668 amb 991 WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
669 amb 1106 count_out++;
670 amb 991 }
671 amb 310 }
672    
673 amb 991 DeleteFile(threads[0].filename);
674 amb 310
675 amb 991 goto tidy_and_exit;
676 amb 310 }
677    
678     /* Check that number of files is less than file size */
679    
680 amb 311 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
681 amb 310
682 amb 991 assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize));
683 amb 310
684     /* Open all of the temporary files */
685    
686     fds=(int*)malloc(nfiles*sizeof(int));
687    
688     for(i=0;i<nfiles;i++)
689     {
690 amb 991 char *filename=threads[0].filename;
691    
692 amb 310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
693    
694     fds[i]=ReOpenFile(filename);
695    
696     DeleteFile(filename);
697     }
698    
699     /* Perform an n-way merge using a binary heap */
700    
701 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
702 amb 310
703 amb 991 data=threads[0].data;
704     datap=data+datasize-nfiles*sizeof(void*);
705 amb 310
706     /* Fill the heap to start with */
707    
708     for(i=0;i<nfiles;i++)
709     {
710     int index;
711 amb 311 FILESORT_VARINT itemsize;
712 amb 310
713 amb 311 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
714 amb 310
715 amb 311 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
716 amb 310
717 amb 311 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
718 amb 310
719     ReadFile(fds[i],datap[i],itemsize);
720    
721 amb 875 index=i+1;
722    
723 amb 867 heap[index]=i;
724 amb 310
725     /* Bubble up the new value */
726    
727 amb 875 while(index>1)
728 amb 310 {
729     int newindex;
730     int temp;
731    
732 amb 875 newindex=index/2;
733 amb 310
734 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
735 amb 869 break;
736    
737 amb 310 temp=heap[index];
738     heap[index]=heap[newindex];
739     heap[newindex]=temp;
740    
741     index=newindex;
742     }
743     }
744    
745     /* Repeatedly pull out the root of the heap and refill from the same file */
746    
747     ndata=nfiles;
748    
749     do
750     {
751 amb 875 int index=1;
752 amb 311 FILESORT_VARINT itemsize;
753 amb 310
754 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
755 amb 310 {
756 amb 867 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
757 amb 310
758 amb 867 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
759 amb 1106 count_out++;
760 amb 310 }
761    
762 amb 867 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
763 amb 310 {
764 amb 875 heap[index]=heap[ndata];
765 amb 870 ndata--;
766 amb 310 }
767     else
768     {
769 amb 867 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
770 amb 310
771 amb 867 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
772 amb 310 }
773    
774     /* Bubble down the new value */
775    
776 amb 875 while((2*index)<ndata)
777 amb 310 {
778 amb 873 int newindex;
779 amb 310 int temp;
780    
781 amb 875 newindex=2*index;
782 amb 310
783 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
784 amb 873 newindex=newindex+1;
785 amb 869
786 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
787 amb 869 break;
788    
789 amb 310 temp=heap[newindex];
790     heap[newindex]=heap[index];
791     heap[index]=temp;
792    
793     index=newindex;
794     }
795    
796 amb 875 if((2*index)==ndata)
797 amb 310 {
798     int newindex;
799     int temp;
800    
801 amb 875 newindex=2*index;
802 amb 310
803 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
804 amb 869 ; /* break */
805     else
806     {
807     temp=heap[newindex];
808     heap[newindex]=heap[index];
809     heap[index]=temp;
810     }
811 amb 310 }
812     }
813     while(ndata>0);
814    
815     /* Tidy up */
816    
817     tidy_and_exit:
818    
819     if(fds)
820     {
821     for(i=0;i<nfiles;i++)
822     CloseFile(fds[i]);
823     free(fds);
824     }
825    
826     if(heap)
827     free(heap);
828    
829 amb 991 for(i=0;i<option_filesort_threads;i++)
830     {
831     free(threads[i].data);
832 amb 948
833 amb 991 free(threads[i].filename);
834     }
835    
836 amb 1106 return(count_out);
837 amb 310 }
838    
839    
840     /*++++++++++++++++++++++++++++++++++++++
841 amb 991 A wrapper function that can be run in a thread for fixed data.
842    
843     void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
844    
845     thread_data *thread The data to be processed in this thread.
846     ++++++++++++++++++++++++++++++++++++++*/
847    
848     static void *filesort_fixed_heapsort_thread(thread_data *thread)
849     {
850     int fd,i;
851    
852     /* Sort the data pointers using a heap sort */
853    
854     filesort_heapsort(thread->datap,thread->n,thread->compare);
855    
856     /* Create a temporary file and write the result */
857    
858     fd=OpenFileNew(thread->filename);
859    
860     for(i=0;i<thread->n;i++)
861     WriteFile(fd,thread->datap[i],thread->itemsize);
862    
863     CloseFile(fd);
864    
865 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
866    
867 amb 1020 if(option_filesort_threads>1)
868     {
869     pthread_mutex_lock(&running_mutex);
870 amb 996
871 amb 1020 thread->running=2;
872 amb 991
873 amb 1020 pthread_cond_signal(&running_cond);
874 amb 996
875 amb 1020 pthread_mutex_unlock(&running_mutex);
876     }
877 amb 996
878     #endif
879    
880 amb 991 return(NULL);
881     }
882    
883    
884     /*++++++++++++++++++++++++++++++++++++++
885     A wrapper function that can be run in a thread for variable data.
886    
887     void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
888    
889     thread_data *thread The data to be processed in this thread.
890     ++++++++++++++++++++++++++++++++++++++*/
891    
892     static void *filesort_vary_heapsort_thread(thread_data *thread)
893     {
894     int fd,i;
895    
896     /* Sort the data pointers using a heap sort */
897    
898     filesort_heapsort(thread->datap,thread->n,thread->compare);
899    
900     /* Create a temporary file and write the result */
901    
902     fd=OpenFileNew(thread->filename);
903    
904     for(i=0;i<thread->n;i++)
905     {
906     FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE);
907    
908     WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
909     }
910    
911     CloseFile(fd);
912    
913 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
914    
915 amb 1020 if(option_filesort_threads>1)
916     {
917     pthread_mutex_lock(&running_mutex);
918 amb 996
919 amb 1020 thread->running=2;
920 amb 991
921 amb 1020 pthread_cond_signal(&running_cond);
922 amb 996
923 amb 1020 pthread_mutex_unlock(&running_mutex);
924     }
925 amb 996
926     #endif
927    
928 amb 991 return(NULL);
929     }
930    
931    
932     /*++++++++++++++++++++++++++++++++++++++
933 amb 269 A function to sort an array of pointers efficiently.
934    
935     The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
936 amb 873 in particular, this is good because it can operate in-place and doesn't
937 amb 269 allocate more memory like using qsort() does.
938    
939     void **datap A pointer to the array of pointers to sort.
940    
941     size_t nitems The number of items of data to sort.
942    
943 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
944     to qsort if the data to be sorted is an array of things not pointers.
945 amb 269 ++++++++++++++++++++++++++++++++++++++*/
946    
947 amb 1106 void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
948 amb 269 {
949 amb 875 void **datap1=&datap[-1];
950 amb 269 int i;
951    
952     /* Fill the heap by pretending to insert the data that is already there */
953    
954 amb 875 for(i=2;i<=nitems;i++)
955 amb 269 {
956     int index=i;
957    
958     /* Bubble up the new value (upside-down, put largest at top) */
959    
960 amb 875 while(index>1)
961 amb 269 {
962     int newindex;
963     void *temp;
964    
965 amb 875 newindex=index/2;
966 amb 269
967 amb 1106 if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */
968 amb 869 break;
969    
970 amb 875 temp=datap1[index];
971     datap1[index]=datap1[newindex];
972     datap1[newindex]=temp;
973 amb 269
974     index=newindex;
975     }
976     }
977    
978     /* Repeatedly pull out the root of the heap and swap with the bottom item */
979    
980 amb 875 for(i=nitems;i>1;i--)
981 amb 269 {
982 amb 875 int index=1;
983 amb 269 void *temp;
984    
985 amb 875 temp=datap1[index];
986     datap1[index]=datap1[i];
987     datap1[i]=temp;
988 amb 269
989     /* Bubble down the new value (upside-down, put largest at top) */
990    
991 amb 875 while((2*index)<(i-1))
992 amb 269 {
993 amb 873 int newindex;
994 amb 269 void *temp;
995    
996 amb 875 newindex=2*index;
997 amb 269
998 amb 1106 if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */
999 amb 873 newindex=newindex+1;
1000 amb 869
1001 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1002 amb 869 break;
1003    
1004 amb 875 temp=datap1[newindex];
1005     datap1[newindex]=datap1[index];
1006     datap1[index]=temp;
1007 amb 269
1008     index=newindex;
1009     }
1010    
1011 amb 875 if((2*index)==(i-1))
1012 amb 269 {
1013     int newindex;
1014     void *temp;
1015    
1016 amb 875 newindex=2*index;
1017 amb 269
1018 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1019 amb 869 ; /* break */
1020     else
1021     {
1022 amb 875 temp=datap1[newindex];
1023     datap1[newindex]=datap1[index];
1024     datap1[index]=temp;
1025 amb 869 }
1026 amb 269 }
1027     }
1028     }

Properties

Name Value
cvs:description Functions to perform sorting.