Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Annotation of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1310 - (hide annotations) (download) (as text)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Change data type from signed to unsigned (pedantic compiler warning).

1 amb 269 /***************************************
2     Merge sort functions.
3    
4     Part of the Routino routing software.
5     ******************/ /******************
6 amb 1310 This file Copyright 2009-2013 Andrew M. Bishop
7 amb 269
8     This program is free software: you can redistribute it and/or modify
9     it under the terms of the GNU Affero General Public License as published by
10     the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12    
13     This program is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16     GNU Affero General Public License for more details.
17    
18     You should have received a copy of the GNU Affero General Public License
19     along with this program. If not, see <http://www.gnu.org/licenses/>.
20     ***************************************/
21    
22    
23     #include <stdio.h>
24     #include <stdlib.h>
25     #include <string.h>
26    
27 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
28     #include <pthread.h>
29     #endif
30    
31 amb 532 #include "types.h"
32    
33 amb 1166 #include "logging.h"
34 amb 449 #include "files.h"
35 amb 532 #include "sorting.h"
36 amb 269
37    
38 amb 680 /* Global variables */
39 amb 269
40 amb 289 /*+ The command line '--tmpdir' option or its default value. +*/
41 amb 284 extern char *option_tmpdirname;
42 amb 269
43 amb 358 /*+ The amount of RAM to use for filesorting. +*/
44     extern size_t option_filesort_ramsize;
45 amb 269
46 amb 991 /*+ The number of filesorting threads allowed. +*/
47     extern int option_filesort_threads;
48 amb 358
49 amb 991
50     /* Thread data type definitions */
51    
52     /*+ A data type for holding data for a thread. +*/
53     typedef struct _thread_data
54     {
55     pthread_t thread; /*+ The thread identifier. +*/
56    
57     int running; /*+ A flag indicating the current state of the thread. +*/
58    
59     void *data; /*+ The main data array. +*/
60     void **datap; /*+ An array of pointers to the data objects. +*/
61     size_t n; /*+ The number of pointers. +*/
62    
63     char *filename; /*+ The name of the file to write the results to. +*/
64    
65     size_t itemsize; /*+ The size of each item. +*/
66     int (*compare)(const void*,const void*); /*+ The comparison function. +*/
67     }
68     thread_data;
69    
70 amb 996 /* Thread variables */
71 amb 991
72 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
73    
74     static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
75     static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
76    
77     #endif
78    
79 amb 991 /* Thread helper functions */
80    
81     static void *filesort_fixed_heapsort_thread(thread_data *thread);
82     static void *filesort_vary_heapsort_thread(thread_data *thread);
83    
84    
85 amb 269 /*++++++++++++++++++++++++++++++++++++++
86 amb 310 A function to sort the contents of a file of fixed length objects using a
87     limited amount of RAM.
88 amb 269
89     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
90     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
91     The individual sort steps and the merge step both use a "Heap sort"
92     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
93     if the data is already partially sorted.
94    
95 amb 948 index_t filesort_fixed Returns the number of objects kept.
96    
97 amb 269 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
98    
99     int fd_out The file descriptor of the output file (opened for writing and empty).
100    
101     size_t itemsize The size of each item in the file that needs sorting.
102    
103 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
104     each item before they have been sorted. The second parameter is the number of objects
105     previously read from the input file. If the function returns 1 then the object is kept
106     and it is sorted, otherwise it is ignored.
107 amb 269
108 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
109     to qsort if the data to be sorted is an array of things not pointers.
110    
111     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
112     each item after they have been sorted. The second parameter is the number of objects
113     already written to the output file. If the function returns 1 then the object is written
114     to the output file., otherwise it is ignored.
115 amb 269 ++++++++++++++++++++++++++++++++++++++*/
116    
117 amb 1106 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
118     int (*compare_function)(const void*,const void*),
119     int (*post_sort_function)(void*,index_t))
120 amb 269 {
121     int *fds=NULL,*heap=NULL;
122     int nfiles=0,ndata=0;
123 amb 1112 index_t count_out=0,count_in=0,total=0;
124 amb 991 size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
125     void *data,**datap;
126     thread_data *threads;
127 amb 1310 size_t item;
128 amb 269 int i,more=1;
129 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
130     int nthreads=0;
131     #endif
132 amb 269
133     /* Allocate the RAM buffer and other bits */
134    
135 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
136 amb 269
137 amb 991 for(i=0;i<option_filesort_threads;i++)
138     {
139     threads[i].running=0;
140 amb 269
141 amb 991 threads[i].data=malloc(nitems*itemsize);
142     threads[i].datap=malloc(nitems*sizeof(void*));
143    
144     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
145    
146     threads[i].itemsize=itemsize;
147 amb 1106 threads[i].compare=compare_function;
148 amb 991 }
149    
150 amb 269 /* Loop around, fill the buffer, sort the data and write a temporary file */
151    
152     do
153     {
154 amb 991 int thread=0;
155 amb 269
156 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
157    
158 amb 1020 if(option_filesort_threads>1)
159     {
160     /* Find a spare slot (one *must* be unused at all times) */
161 amb 991
162 amb 1020 pthread_mutex_lock(&running_mutex);
163 amb 996
164 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
165     if(!threads[thread].running)
166     break;
167 amb 991
168 amb 1020 pthread_mutex_unlock(&running_mutex);
169     }
170 amb 996
171 amb 991 #endif
172    
173 amb 269 /* Read in the data and create pointers */
174    
175 amb 1310 for(item=0;item<nitems;)
176 amb 269 {
177 amb 1310 threads[thread].datap[item]=threads[thread].data+item*itemsize;
178 amb 269
179 amb 1310 if(ReadFile(fd_in,threads[thread].datap[item],itemsize))
180 amb 269 {
181     more=0;
182     break;
183     }
184    
185 amb 1310 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
186 amb 1106 {
187 amb 1310 item++;
188 amb 1112 total++;
189 amb 1106 }
190 amb 1112
191     count_in++;
192 amb 269 }
193    
194 amb 1310 threads[thread].n=item;
195 amb 269
196 amb 1020 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
197 amb 546
198 amb 1112 if(more==0 && total==0)
199 amb 546 goto tidy_and_exit;
200    
201 amb 543 /* No new data read in this time round */
202    
203 amb 991 if(threads[thread].n==0)
204 amb 269 break;
205    
206 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
207 amb 269
208 amb 991 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
209 amb 269
210 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
211 amb 269
212 amb 1021 /* Shortcut if only one file, don't write to disk */
213    
214     if(more==0 && nfiles==0)
215     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
216     else if(option_filesort_threads>1)
217 amb 269 {
218 amb 996 pthread_mutex_lock(&running_mutex);
219    
220 amb 991 while(nthreads==(option_filesort_threads-1))
221 amb 269 {
222 amb 991 for(i=0;i<option_filesort_threads;i++)
223     if(threads[i].running==2)
224     {
225     pthread_join(threads[i].thread,NULL);
226     threads[i].running=0;
227     nthreads--;
228     }
229    
230     if(nthreads==(option_filesort_threads-1))
231 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
232 amb 269 }
233    
234 amb 991 threads[thread].running=1;
235 amb 269
236 amb 996 pthread_mutex_unlock(&running_mutex);
237    
238 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
239 amb 269
240 amb 991 nthreads++;
241     }
242 amb 1020 else
243 amb 1021 filesort_fixed_heapsort_thread(&threads[thread]);
244 amb 269
245 amb 991 #else
246 amb 269
247 amb 1021 /* Shortcut if only one file, don't write to disk */
248    
249     if(more==0 && nfiles==0)
250     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
251     else
252 amb 1020 filesort_fixed_heapsort_thread(&threads[thread]);
253 amb 269
254 amb 991 #endif
255 amb 269
256     nfiles++;
257     }
258     while(more);
259    
260 amb 991 /* Wait for all of the threads to finish */
261 amb 269
262 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
263    
264 amb 1020 while(option_filesort_threads>1 && nthreads)
265 amb 991 {
266 amb 996 pthread_mutex_lock(&running_mutex);
267 amb 991
268 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
269    
270 amb 991 for(i=0;i<option_filesort_threads;i++)
271     if(threads[i].running==2)
272     {
273     pthread_join(threads[i].thread,NULL);
274     threads[i].running=0;
275     nthreads--;
276     }
277    
278 amb 996 pthread_mutex_unlock(&running_mutex);
279 amb 991 }
280    
281     #endif
282    
283     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
284    
285 amb 269 if(nfiles==1)
286     {
287 amb 1310 for(item=0;item<threads[0].n;item++)
288 amb 269 {
289 amb 1310 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
290 amb 274 {
291 amb 1310 WriteFile(fd_out,threads[0].datap[item],itemsize);
292 amb 1106 count_out++;
293 amb 274 }
294 amb 269 }
295    
296 amb 991 DeleteFile(threads[0].filename);
297 amb 269
298 amb 283 goto tidy_and_exit;
299 amb 269 }
300    
301     /* Check that number of files is less than file size */
302    
303 amb 1310 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
304 amb 269
305     /* Open all of the temporary files */
306    
307     fds=(int*)malloc(nfiles*sizeof(int));
308    
309     for(i=0;i<nfiles;i++)
310     {
311 amb 991 char *filename=threads[0].filename;
312    
313 amb 284 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
314 amb 269
315     fds[i]=ReOpenFile(filename);
316    
317     DeleteFile(filename);
318     }
319    
320     /* Perform an n-way merge using a binary heap */
321    
322 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
323 amb 269
324 amb 991 data =threads[0].data;
325     datap=threads[0].datap;
326    
327 amb 269 /* Fill the heap to start with */
328    
329     for(i=0;i<nfiles;i++)
330     {
331     int index;
332    
333     datap[i]=data+i*itemsize;
334    
335     ReadFile(fds[i],datap[i],itemsize);
336    
337 amb 875 index=i+1;
338    
339 amb 867 heap[index]=i;
340 amb 269
341     /* Bubble up the new value */
342    
343 amb 875 while(index>1)
344 amb 269 {
345     int newindex;
346     int temp;
347    
348 amb 875 newindex=index/2;
349 amb 269
350 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
351 amb 869 break;
352    
353 amb 269 temp=heap[index];
354     heap[index]=heap[newindex];
355     heap[newindex]=temp;
356    
357     index=newindex;
358     }
359     }
360    
361     /* Repeatedly pull out the root of the heap and refill from the same file */
362    
363     ndata=nfiles;
364    
365     do
366     {
367 amb 875 int index=1;
368 amb 269
369 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
370 amb 274 {
371 amb 867 WriteFile(fd_out,datap[heap[index]],itemsize);
372 amb 1106 count_out++;
373 amb 274 }
374 amb 269
375 amb 867 if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize))
376 amb 269 {
377 amb 875 heap[index]=heap[ndata];
378 amb 870 ndata--;
379 amb 269 }
380    
381     /* Bubble down the new value */
382    
383 amb 875 while((2*index)<ndata)
384 amb 269 {
385 amb 873 int newindex;
386 amb 269 int temp;
387    
388 amb 875 newindex=2*index;
389 amb 269
390 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
391 amb 873 newindex=newindex+1;
392 amb 869
393 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
394 amb 869 break;
395    
396 amb 269 temp=heap[newindex];
397     heap[newindex]=heap[index];
398     heap[index]=temp;
399    
400     index=newindex;
401     }
402    
403 amb 875 if((2*index)==ndata)
404 amb 269 {
405     int newindex;
406     int temp;
407    
408 amb 875 newindex=2*index;
409 amb 269
410 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
411 amb 869 ; /* break */
412     else
413     {
414     temp=heap[newindex];
415     heap[newindex]=heap[index];
416     heap[index]=temp;
417     }
418 amb 269 }
419     }
420     while(ndata>0);
421    
422     /* Tidy up */
423    
424 amb 283 tidy_and_exit:
425 amb 269
426 amb 283 if(fds)
427     {
428     for(i=0;i<nfiles;i++)
429     CloseFile(fds[i]);
430     free(fds);
431     }
432    
433     if(heap)
434     free(heap);
435    
436 amb 991 for(i=0;i<option_filesort_threads;i++)
437     {
438     free(threads[i].data);
439     free(threads[i].datap);
440 amb 948
441 amb 991 free(threads[i].filename);
442     }
443    
444 amb 1106 return(count_out);
445 amb 269 }
446    
447    
448     /*++++++++++++++++++++++++++++++++++++++
449 amb 310 A function to sort the contents of a file of variable length objects (each
450 amb 867 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
451 amb 310
452     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
453     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
454     The individual sort steps and the merge step both use a "Heap sort"
455     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
456     if the data is already partially sorted.
457    
458 amb 948 index_t filesort_vary Returns the number of objects kept.
459    
460 amb 310 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
461    
462     int fd_out The file descriptor of the output file (opened for writing and empty).
463    
464 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
465     each item before they have been sorted. The second parameter is the number of objects
466     previously read from the input file. If the function returns 1 then the object is kept
467     and it is sorted, otherwise it is ignored.
468 amb 310
469 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
470     to qsort if the data to be sorted is an array of things not pointers.
471    
472     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
473     each item after they have been sorted. The second parameter is the number of objects
474     already written to the output file. If the function returns 1 then the object is written
475     to the output file., otherwise it is ignored.
476 amb 310 ++++++++++++++++++++++++++++++++++++++*/
477    
478 amb 1106 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
479     int (*compare_function)(const void*,const void*),
480     int (*post_sort_function)(void*,index_t))
481 amb 310 {
482     int *fds=NULL,*heap=NULL;
483     int nfiles=0,ndata=0;
484 amb 1112 index_t count_out=0,count_in=0,total=0;
485 amb 991 size_t datasize=option_filesort_ramsize/option_filesort_threads;
486 amb 311 FILESORT_VARINT nextitemsize,largestitemsize=0;
487 amb 991 void *data,**datap;
488     thread_data *threads;
489 amb 1310 size_t item;
490 amb 310 int i,more=1;
491 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
492     int nthreads=0;
493     #endif
494 amb 310
495     /* Allocate the RAM buffer and other bits */
496    
497 amb 991 threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data));
498 amb 310
499 amb 991 for(i=0;i<option_filesort_threads;i++)
500     {
501     threads[i].running=0;
502 amb 310
503 amb 991 threads[i].data=malloc(datasize);
504     threads[i].datap=NULL;
505    
506     threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24);
507    
508 amb 1106 threads[i].compare=compare_function;
509 amb 991 }
510    
511 amb 310 /* Loop around, fill the buffer, sort the data and write a temporary file */
512    
513 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
514 amb 310 goto tidy_and_exit;
515    
516     do
517     {
518 amb 311 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
519 amb 991 int thread=0;
520 amb 310
521 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
522 amb 310
523 amb 1020 if(option_filesort_threads>1)
524     {
525     /* Find a spare slot (one *must* be unused at all times) */
526 amb 991
527 amb 1020 pthread_mutex_lock(&running_mutex);
528 amb 996
529 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
530     if(!threads[thread].running)
531     break;
532 amb 991
533 amb 1020 pthread_mutex_unlock(&running_mutex);
534     }
535 amb 996
536 amb 991 #endif
537    
538     threads[thread].datap=threads[thread].data+datasize;
539    
540     threads[thread].n=0;
541    
542 amb 310 /* Read in the data and create pointers */
543    
544 amb 1310 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data))
545 amb 310 {
546 amb 311 FILESORT_VARINT itemsize=nextitemsize;
547 amb 310
548     if(itemsize>largestitemsize)
549     largestitemsize=itemsize;
550    
551 amb 991 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
552 amb 310
553 amb 311 ramused+=FILESORT_VARSIZE;
554 amb 310
555 amb 991 ReadFile(fd_in,threads[thread].data+ramused,itemsize);
556 amb 310
557 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
558     {
559     *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
560 amb 310
561 amb 1106 ramused+=itemsize;
562 amb 311
563 amb 1106 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN);
564     ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
565 amb 311
566 amb 1112 total++;
567 amb 1106 threads[thread].n++;
568     }
569 amb 310
570 amb 1112 count_in++;
571    
572 amb 311 if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE))
573 amb 310 {
574     more=0;
575     break;
576     }
577     }
578    
579 amb 543 /* No new data read in this time round */
580    
581 amb 991 if(threads[thread].n==0)
582 amb 310 break;
583    
584 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
585 amb 310
586 amb 1021 if(more==0 && nfiles==0)
587     threads[thread].filename[0]=0;
588     else
589     sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
590 amb 310
591 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
592 amb 310
593 amb 1021 /* Shortcut if only one file, don't write to disk */
594    
595     if(more==0 && nfiles==0)
596     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
597     else if(option_filesort_threads>1)
598 amb 310 {
599 amb 996 pthread_mutex_lock(&running_mutex);
600    
601 amb 991 while(nthreads==(option_filesort_threads-1))
602 amb 310 {
603 amb 991 for(i=0;i<option_filesort_threads;i++)
604     if(threads[i].running==2)
605     {
606     pthread_join(threads[i].thread,NULL);
607     threads[i].running=0;
608     nthreads--;
609     }
610    
611     if(nthreads==(option_filesort_threads-1))
612 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
613 amb 310 }
614    
615 amb 991 threads[thread].running=1;
616    
617 amb 996 pthread_mutex_unlock(&running_mutex);
618    
619 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
620    
621     nthreads++;
622 amb 310 }
623 amb 1020 else
624 amb 1021 filesort_vary_heapsort_thread(&threads[thread]);
625 amb 310
626 amb 991 #else
627 amb 310
628 amb 1021 /* Shortcut if only one file, don't write to disk */
629    
630     if(more==0 && nfiles==0)
631     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
632     else
633 amb 1020 filesort_vary_heapsort_thread(&threads[thread]);
634 amb 310
635 amb 991 #endif
636 amb 310
637 amb 991 nfiles++;
638     }
639     while(more);
640    
641     /* Wait for all of the threads to finish */
642    
643     #if defined(USE_PTHREADS) && USE_PTHREADS
644    
645 amb 1020 while(option_filesort_threads>1 && nthreads)
646 amb 991 {
647 amb 996 pthread_mutex_lock(&running_mutex);
648 amb 991
649 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
650    
651 amb 991 for(i=0;i<option_filesort_threads;i++)
652     if(threads[i].running==2)
653     {
654     pthread_join(threads[i].thread,NULL);
655     threads[i].running=0;
656     nthreads--;
657     }
658    
659 amb 996 pthread_mutex_unlock(&running_mutex);
660 amb 991 }
661    
662     #endif
663    
664     /* Shortcut if only one file, lucky for us we still have the data in RAM) */
665    
666     if(nfiles==1)
667     {
668 amb 1310 for(item=0;item<threads[0].n;item++)
669 amb 310 {
670 amb 1310 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
671 amb 991 {
672 amb 1310 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE);
673 amb 310
674 amb 1310 WriteFile(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
675 amb 1106 count_out++;
676 amb 991 }
677 amb 310 }
678    
679 amb 991 DeleteFile(threads[0].filename);
680 amb 310
681 amb 991 goto tidy_and_exit;
682 amb 310 }
683    
684     /* Check that number of files is less than file size */
685    
686 amb 311 largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN);
687 amb 310
688 amb 1310 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)");
689 amb 310
690     /* Open all of the temporary files */
691    
692     fds=(int*)malloc(nfiles*sizeof(int));
693    
694     for(i=0;i<nfiles;i++)
695     {
696 amb 991 char *filename=threads[0].filename;
697    
698 amb 310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
699    
700     fds[i]=ReOpenFile(filename);
701    
702     DeleteFile(filename);
703     }
704    
705     /* Perform an n-way merge using a binary heap */
706    
707 amb 875 heap=(int*)malloc((1+nfiles)*sizeof(int));
708 amb 310
709 amb 991 data=threads[0].data;
710     datap=data+datasize-nfiles*sizeof(void*);
711 amb 310
712     /* Fill the heap to start with */
713    
714     for(i=0;i<nfiles;i++)
715     {
716     int index;
717 amb 311 FILESORT_VARINT itemsize;
718 amb 310
719 amb 311 datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize;
720 amb 310
721 amb 311 ReadFile(fds[i],&itemsize,FILESORT_VARSIZE);
722 amb 310
723 amb 311 *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize;
724 amb 310
725     ReadFile(fds[i],datap[i],itemsize);
726    
727 amb 875 index=i+1;
728    
729 amb 867 heap[index]=i;
730 amb 310
731     /* Bubble up the new value */
732    
733 amb 875 while(index>1)
734 amb 310 {
735     int newindex;
736     int temp;
737    
738 amb 875 newindex=index/2;
739 amb 310
740 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
741 amb 869 break;
742    
743 amb 310 temp=heap[index];
744     heap[index]=heap[newindex];
745     heap[newindex]=temp;
746    
747     index=newindex;
748     }
749     }
750    
751     /* Repeatedly pull out the root of the heap and refill from the same file */
752    
753     ndata=nfiles;
754    
755     do
756     {
757 amb 875 int index=1;
758 amb 311 FILESORT_VARINT itemsize;
759 amb 310
760 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
761 amb 310 {
762 amb 867 itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE);
763 amb 310
764 amb 867 WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
765 amb 1106 count_out++;
766 amb 310 }
767    
768 amb 867 if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
769 amb 310 {
770 amb 875 heap[index]=heap[ndata];
771 amb 870 ndata--;
772 amb 310 }
773     else
774     {
775 amb 867 *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
776 amb 310
777 amb 867 ReadFile(fds[heap[index]],datap[heap[index]],itemsize);
778 amb 310 }
779    
780     /* Bubble down the new value */
781    
782 amb 875 while((2*index)<ndata)
783 amb 310 {
784 amb 873 int newindex;
785 amb 310 int temp;
786    
787 amb 875 newindex=2*index;
788 amb 310
789 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
790 amb 873 newindex=newindex+1;
791 amb 869
792 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
793 amb 869 break;
794    
795 amb 310 temp=heap[newindex];
796     heap[newindex]=heap[index];
797     heap[index]=temp;
798    
799     index=newindex;
800     }
801    
802 amb 875 if((2*index)==ndata)
803 amb 310 {
804     int newindex;
805     int temp;
806    
807 amb 875 newindex=2*index;
808 amb 310
809 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
810 amb 869 ; /* break */
811     else
812     {
813     temp=heap[newindex];
814     heap[newindex]=heap[index];
815     heap[index]=temp;
816     }
817 amb 310 }
818     }
819     while(ndata>0);
820    
821     /* Tidy up */
822    
823     tidy_and_exit:
824    
825     if(fds)
826     {
827     for(i=0;i<nfiles;i++)
828     CloseFile(fds[i]);
829     free(fds);
830     }
831    
832     if(heap)
833     free(heap);
834    
835 amb 991 for(i=0;i<option_filesort_threads;i++)
836     {
837     free(threads[i].data);
838 amb 948
839 amb 991 free(threads[i].filename);
840     }
841    
842 amb 1106 return(count_out);
843 amb 310 }
844    
845    
846     /*++++++++++++++++++++++++++++++++++++++
847 amb 991 A wrapper function that can be run in a thread for fixed data.
848    
849     void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
850    
851     thread_data *thread The data to be processed in this thread.
852     ++++++++++++++++++++++++++++++++++++++*/
853    
854     static void *filesort_fixed_heapsort_thread(thread_data *thread)
855     {
856 amb 1310 int fd;
857     size_t item;
858 amb 991
859     /* Sort the data pointers using a heap sort */
860    
861     filesort_heapsort(thread->datap,thread->n,thread->compare);
862    
863     /* Create a temporary file and write the result */
864    
865     fd=OpenFileNew(thread->filename);
866    
867 amb 1310 for(item=0;item<thread->n;item++)
868     WriteFile(fd,thread->datap[item],thread->itemsize);
869 amb 991
870     CloseFile(fd);
871    
872 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
873    
874 amb 1020 if(option_filesort_threads>1)
875     {
876     pthread_mutex_lock(&running_mutex);
877 amb 996
878 amb 1020 thread->running=2;
879 amb 991
880 amb 1020 pthread_cond_signal(&running_cond);
881 amb 996
882 amb 1020 pthread_mutex_unlock(&running_mutex);
883     }
884 amb 996
885     #endif
886    
887 amb 991 return(NULL);
888     }
889    
890    
891     /*++++++++++++++++++++++++++++++++++++++
892     A wrapper function that can be run in a thread for variable data.
893    
894     void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
895    
896     thread_data *thread The data to be processed in this thread.
897     ++++++++++++++++++++++++++++++++++++++*/
898    
899     static void *filesort_vary_heapsort_thread(thread_data *thread)
900     {
901 amb 1310 int fd;
902     size_t item;
903 amb 991
904     /* Sort the data pointers using a heap sort */
905    
906     filesort_heapsort(thread->datap,thread->n,thread->compare);
907    
908     /* Create a temporary file and write the result */
909    
910     fd=OpenFileNew(thread->filename);
911    
912 amb 1310 for(item=0;item<thread->n;item++)
913 amb 991 {
914 amb 1310 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE);
915 amb 991
916 amb 1310 WriteFile(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
917 amb 991 }
918    
919     CloseFile(fd);
920    
921 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
922    
923 amb 1020 if(option_filesort_threads>1)
924     {
925     pthread_mutex_lock(&running_mutex);
926 amb 996
927 amb 1020 thread->running=2;
928 amb 991
929 amb 1020 pthread_cond_signal(&running_cond);
930 amb 996
931 amb 1020 pthread_mutex_unlock(&running_mutex);
932     }
933 amb 996
934     #endif
935    
936 amb 991 return(NULL);
937     }
938    
939    
940     /*++++++++++++++++++++++++++++++++++++++
941 amb 269 A function to sort an array of pointers efficiently.
942    
943     The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
944 amb 873 in particular, this is good because it can operate in-place and doesn't
945 amb 269 allocate more memory like using qsort() does.
946    
947     void **datap A pointer to the array of pointers to sort.
948    
949     size_t nitems The number of items of data to sort.
950    
951 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
952     to qsort if the data to be sorted is an array of things not pointers.
953 amb 269 ++++++++++++++++++++++++++++++++++++++*/
954    
955 amb 1106 void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
956 amb 269 {
957 amb 875 void **datap1=&datap[-1];
958 amb 1310 size_t item;
959 amb 269
960     /* Fill the heap by pretending to insert the data that is already there */
961    
962 amb 1310 for(item=2;item<=nitems;item++)
963 amb 269 {
964 amb 1310 size_t index=item;
965 amb 269
966     /* Bubble up the new value (upside-down, put largest at top) */
967    
968 amb 875 while(index>1)
969 amb 269 {
970     int newindex;
971     void *temp;
972    
973 amb 875 newindex=index/2;
974 amb 269
975 amb 1106 if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */
976 amb 869 break;
977    
978 amb 875 temp=datap1[index];
979     datap1[index]=datap1[newindex];
980     datap1[newindex]=temp;
981 amb 269
982     index=newindex;
983     }
984     }
985    
986     /* Repeatedly pull out the root of the heap and swap with the bottom item */
987    
988 amb 1310 for(item=nitems;item>1;item--)
989 amb 269 {
990 amb 1310 size_t index=1;
991 amb 269 void *temp;
992    
993 amb 875 temp=datap1[index];
994 amb 1310 datap1[index]=datap1[item];
995     datap1[item]=temp;
996 amb 269
997     /* Bubble down the new value (upside-down, put largest at top) */
998    
999 amb 1310 while((2*index)<(item-1))
1000 amb 269 {
1001 amb 873 int newindex;
1002 amb 269 void *temp;
1003    
1004 amb 875 newindex=2*index;
1005 amb 269
1006 amb 1106 if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */
1007 amb 873 newindex=newindex+1;
1008 amb 869
1009 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1010 amb 869 break;
1011    
1012 amb 875 temp=datap1[newindex];
1013     datap1[newindex]=datap1[index];
1014     datap1[index]=temp;
1015 amb 269
1016     index=newindex;
1017     }
1018    
1019 amb 1310 if((2*index)==(item-1))
1020 amb 269 {
1021     int newindex;
1022     void *temp;
1023    
1024 amb 875 newindex=2*index;
1025 amb 269
1026 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1027 amb 869 ; /* break */
1028     else
1029     {
1030 amb 875 temp=datap1[newindex];
1031     datap1[newindex]=datap1[index];
1032     datap1[index]=temp;
1033 amb 869 }
1034 amb 269 }
1035     }
1036     }

Properties

Name Value
cvs:description Functions to perform sorting.