Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Annotation of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1999 - (hide annotations) (download) (as text)
Sat Jul 27 10:33:04 2019 UTC (5 years, 8 months ago) by amb
File MIME type: text/x-csrc
File size: 28532 byte(s)
Add more checking of memory allocation success/failure by combining
the allocation and the assert into one function.

1 amb 269 /***************************************
2     Merge sort functions.
3    
4     Part of the Routino routing software.
5     ******************/ /******************
6 amb 1983 This file Copyright 2009-2015, 2017, 2019 Andrew M. Bishop
7 amb 269
8     This program is free software: you can redistribute it and/or modify
9     it under the terms of the GNU Affero General Public License as published by
10     the Free Software Foundation, either version 3 of the License, or
11     (at your option) any later version.
12    
13     This program is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16     GNU Affero General Public License for more details.
17    
18     You should have received a copy of the GNU Affero General Public License
19     along with this program. If not, see <http://www.gnu.org/licenses/>.
20     ***************************************/
21    
22    
23     #include <stdio.h>
24     #include <stdlib.h>
25     #include <string.h>
26    
27 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
28     #include <pthread.h>
29     #endif
30    
31 amb 532 #include "types.h"
32    
33 amb 1166 #include "logging.h"
34 amb 449 #include "files.h"
35 amb 532 #include "sorting.h"
36 amb 269
37    
38 amb 680 /* Global variables */
39 amb 269
40 amb 289 /*+ The command line '--tmpdir' option or its default value. +*/
41 amb 284 extern char *option_tmpdirname;
42 amb 269
43 amb 358 /*+ The amount of RAM to use for filesorting. +*/
44     extern size_t option_filesort_ramsize;
45 amb 269
46 amb 991 /*+ The number of filesorting threads allowed. +*/
47     extern int option_filesort_threads;
48 amb 358
49 amb 991
50     /* Thread data type definitions */
51    
52     /*+ A data type for holding data for a thread. +*/
53     typedef struct _thread_data
54     {
55 amb 1649 #if defined(USE_PTHREADS) && USE_PTHREADS
56    
57 amb 991 pthread_t thread; /*+ The thread identifier. +*/
58    
59     int running; /*+ A flag indicating the current state of the thread. +*/
60    
61 amb 1649 #endif
62    
63 amb 1652 char *data; /*+ The main data array. +*/
64 amb 991 void **datap; /*+ An array of pointers to the data objects. +*/
65     size_t n; /*+ The number of pointers. +*/
66    
67     char *filename; /*+ The name of the file to write the results to. +*/
68    
69     size_t itemsize; /*+ The size of each item. +*/
70     int (*compare)(const void*,const void*); /*+ The comparison function. +*/
71     }
72     thread_data;
73    
74 amb 996 /* Thread variables */
75 amb 991
76 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
77    
78     static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
79 amb 1943 static pthread_mutex_t files_mutex = PTHREAD_MUTEX_INITIALIZER;
80     static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER;
81 amb 996
82     #endif
83    
84 amb 991 /* Thread helper functions */
85    
86     static void *filesort_fixed_heapsort_thread(thread_data *thread);
87     static void *filesort_vary_heapsort_thread(thread_data *thread);
88    
89    
90 amb 269 /*++++++++++++++++++++++++++++++++++++++
91 amb 310 A function to sort the contents of a file of fixed length objects using a
92     limited amount of RAM.
93 amb 269
94     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
95     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
96     The individual sort steps and the merge step both use a "Heap sort"
97     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
98     if the data is already partially sorted.
99    
100 amb 948 index_t filesort_fixed Returns the number of objects kept.
101    
102 amb 269 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
103    
104     int fd_out The file descriptor of the output file (opened for writing and empty).
105    
106     size_t itemsize The size of each item in the file that needs sorting.
107    
108 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
109     each item before they have been sorted. The second parameter is the number of objects
110     previously read from the input file. If the function returns 1 then the object is kept
111     and it is sorted, otherwise it is ignored.
112 amb 269
113 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
114     to qsort if the data to be sorted is an array of things not pointers.
115    
116     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
117     each item after they have been sorted. The second parameter is the number of objects
118     already written to the output file. If the function returns 1 then the object is written
119     to the output file., otherwise it is ignored.
120 amb 269 ++++++++++++++++++++++++++++++++++++++*/
121    
122 amb 1106 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
123     int (*compare_function)(const void*,const void*),
124     int (*post_sort_function)(void*,index_t))
125 amb 269 {
126     int *fds=NULL,*heap=NULL;
127     int nfiles=0,ndata=0;
128 amb 1112 index_t count_out=0,count_in=0,total=0;
129 amb 1985 size_t nitems,item;
130 amb 1652 char *data;
131     void **datap;
132 amb 991 thread_data *threads;
133 amb 269 int i,more=1;
134 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
135     int nthreads=0;
136     #endif
137 amb 269
138     /* Allocate the RAM buffer and other bits */
139    
140 amb 1985 nitems=(size_t)SizeFileFD(fd_in)/itemsize;
141 amb 1591
142     if(nitems==0)
143     return(0);
144    
145     if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize)
146     nitems=1+nitems/option_filesort_threads;
147     else
148     nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
149    
150 amb 1999 threads=(thread_data*)calloc_logassert(option_filesort_threads,sizeof(thread_data));
151 amb 269
152 amb 991 for(i=0;i<option_filesort_threads;i++)
153     {
154 amb 1999 threads[i].data=malloc_logassert(nitems*itemsize);
155     threads[i].datap=malloc_logassert(nitems*sizeof(void*));
156 amb 991
157 amb 1935 log_malloc(threads[i].data ,nitems*itemsize);
158 amb 1598 log_malloc(threads[i].datap,nitems*sizeof(void*));
159    
160 amb 1999 threads[i].filename=(char*)malloc_logassert(strlen(option_tmpdirname)+24);
161 amb 991
162     threads[i].itemsize=itemsize;
163 amb 1106 threads[i].compare=compare_function;
164 amb 991 }
165    
166 amb 269 /* Loop around, fill the buffer, sort the data and write a temporary file */
167    
168     do
169     {
170 amb 991 int thread=0;
171 amb 269
172 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
173    
174 amb 1020 if(option_filesort_threads>1)
175     {
176     /* Find a spare slot (one *must* be unused at all times) */
177 amb 991
178 amb 1020 pthread_mutex_lock(&running_mutex);
179 amb 996
180 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
181     if(!threads[thread].running)
182     break;
183 amb 991
184 amb 1020 pthread_mutex_unlock(&running_mutex);
185     }
186 amb 996
187 amb 991 #endif
188    
189 amb 269 /* Read in the data and create pointers */
190    
191 amb 1310 for(item=0;item<nitems;)
192 amb 269 {
193 amb 1310 threads[thread].datap[item]=threads[thread].data+item*itemsize;
194 amb 269
195 amb 1409 if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize))
196 amb 269 {
197     more=0;
198     break;
199     }
200    
201 amb 1310 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
202 amb 1106 {
203 amb 1310 item++;
204 amb 1112 total++;
205 amb 1106 }
206 amb 1112
207     count_in++;
208 amb 269 }
209    
210 amb 1310 threads[thread].n=item;
211 amb 269
212 amb 1020 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
213 amb 546
214 amb 1112 if(more==0 && total==0)
215 amb 546 goto tidy_and_exit;
216    
217 amb 543 /* No new data read in this time round */
218    
219 amb 991 if(threads[thread].n==0)
220 amb 269 break;
221    
222 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
223 amb 269
224 amb 991 sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
225 amb 269
226 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
227 amb 269
228 amb 1021 /* Shortcut if only one file, don't write to disk */
229    
230     if(more==0 && nfiles==0)
231     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
232     else if(option_filesort_threads>1)
233 amb 269 {
234 amb 996 pthread_mutex_lock(&running_mutex);
235    
236 amb 991 while(nthreads==(option_filesort_threads-1))
237 amb 269 {
238 amb 991 for(i=0;i<option_filesort_threads;i++)
239     if(threads[i].running==2)
240     {
241     pthread_join(threads[i].thread,NULL);
242     threads[i].running=0;
243     nthreads--;
244     }
245    
246     if(nthreads==(option_filesort_threads-1))
247 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
248 amb 269 }
249    
250 amb 991 threads[thread].running=1;
251 amb 269
252 amb 996 pthread_mutex_unlock(&running_mutex);
253    
254 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
255 amb 269
256 amb 991 nthreads++;
257     }
258 amb 1020 else
259 amb 1021 filesort_fixed_heapsort_thread(&threads[thread]);
260 amb 269
261 amb 991 #else
262 amb 269
263 amb 1021 /* Shortcut if only one file, don't write to disk */
264    
265     if(more==0 && nfiles==0)
266     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
267     else
268 amb 1020 filesort_fixed_heapsort_thread(&threads[thread]);
269 amb 269
270 amb 991 #endif
271 amb 269
272     nfiles++;
273     }
274     while(more);
275    
276 amb 991 /* Wait for all of the threads to finish */
277 amb 269
278 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
279    
280 amb 1020 while(option_filesort_threads>1 && nthreads)
281 amb 991 {
282 amb 996 pthread_mutex_lock(&running_mutex);
283 amb 991
284 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
285    
286 amb 991 for(i=0;i<option_filesort_threads;i++)
287     if(threads[i].running==2)
288     {
289     pthread_join(threads[i].thread,NULL);
290     threads[i].running=0;
291     nthreads--;
292     }
293    
294 amb 996 pthread_mutex_unlock(&running_mutex);
295 amb 991 }
296    
297     #endif
298    
299 amb 1935 /* Shortcut if there are no files */
300    
301     if(nfiles==0)
302     goto tidy_and_exit;
303    
304 amb 991 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
305    
306 amb 269 if(nfiles==1)
307     {
308 amb 1310 for(item=0;item<threads[0].n;item++)
309 amb 269 {
310 amb 1310 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
311 amb 274 {
312 amb 1409 WriteFileBuffered(fd_out,threads[0].datap[item],itemsize);
313 amb 1106 count_out++;
314 amb 274 }
315 amb 269 }
316    
317 amb 991 DeleteFile(threads[0].filename);
318 amb 269
319 amb 283 goto tidy_and_exit;
320 amb 269 }
321    
322     /* Check that number of files is less than file size */
323    
324 amb 1310 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
325 amb 269
326     /* Open all of the temporary files */
327    
328 amb 1999 fds=(int*)malloc_logassert(nfiles*sizeof(int));
329 amb 269
330     for(i=0;i<nfiles;i++)
331     {
332 amb 991 char *filename=threads[0].filename;
333    
334 amb 284 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
335 amb 269
336 amb 1409 fds[i]=ReOpenFileBuffered(filename);
337 amb 269
338     DeleteFile(filename);
339     }
340    
341     /* Perform an n-way merge using a binary heap */
342    
343 amb 1999 heap=(int*)malloc_logassert((1+nfiles)*sizeof(int));
344 amb 269
345 amb 991 data =threads[0].data;
346     datap=threads[0].datap;
347    
348 amb 269 /* Fill the heap to start with */
349    
350     for(i=0;i<nfiles;i++)
351     {
352     int index;
353    
354     datap[i]=data+i*itemsize;
355    
356 amb 1409 ReadFileBuffered(fds[i],datap[i],itemsize);
357 amb 269
358 amb 875 index=i+1;
359    
360 amb 867 heap[index]=i;
361 amb 269
362     /* Bubble up the new value */
363    
364 amb 875 while(index>1)
365 amb 269 {
366     int newindex;
367     int temp;
368    
369 amb 875 newindex=index/2;
370 amb 269
371 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
372 amb 869 break;
373    
374 amb 269 temp=heap[index];
375     heap[index]=heap[newindex];
376     heap[newindex]=temp;
377    
378     index=newindex;
379     }
380     }
381    
382     /* Repeatedly pull out the root of the heap and refill from the same file */
383    
384     ndata=nfiles;
385    
386     do
387     {
388 amb 875 int index=1;
389 amb 269
390 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
391 amb 274 {
392 amb 1409 WriteFileBuffered(fd_out,datap[heap[index]],itemsize);
393 amb 1106 count_out++;
394 amb 274 }
395 amb 269
396 amb 1409 if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize))
397 amb 269 {
398 amb 875 heap[index]=heap[ndata];
399 amb 870 ndata--;
400 amb 269 }
401    
402     /* Bubble down the new value */
403    
404 amb 875 while((2*index)<ndata)
405 amb 269 {
406 amb 873 int newindex;
407 amb 269 int temp;
408    
409 amb 875 newindex=2*index;
410 amb 269
411 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
412 amb 873 newindex=newindex+1;
413 amb 869
414 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
415 amb 869 break;
416    
417 amb 269 temp=heap[newindex];
418     heap[newindex]=heap[index];
419     heap[index]=temp;
420    
421     index=newindex;
422     }
423    
424 amb 875 if((2*index)==ndata)
425 amb 269 {
426     int newindex;
427     int temp;
428    
429 amb 875 newindex=2*index;
430 amb 269
431 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
432 amb 869 ; /* break */
433     else
434     {
435     temp=heap[newindex];
436     heap[newindex]=heap[index];
437     heap[index]=temp;
438     }
439 amb 269 }
440     }
441     while(ndata>0);
442    
443     /* Tidy up */
444    
445 amb 283 tidy_and_exit:
446 amb 269
447 amb 283 if(fds)
448     {
449     for(i=0;i<nfiles;i++)
450 amb 1409 CloseFileBuffered(fds[i]);
451 amb 283 free(fds);
452     }
453    
454     if(heap)
455     free(heap);
456    
457 amb 991 for(i=0;i<option_filesort_threads;i++)
458     {
459 amb 1598 log_free(threads[i].data);
460     log_free(threads[i].datap);
461    
462 amb 991 free(threads[i].data);
463     free(threads[i].datap);
464 amb 948
465 amb 991 free(threads[i].filename);
466     }
467    
468 amb 1441 free(threads);
469    
470 amb 1106 return(count_out);
471 amb 269 }
472    
473    
474     /*++++++++++++++++++++++++++++++++++++++
475 amb 310 A function to sort the contents of a file of variable length objects (each
476 amb 867 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
477 amb 310
478     The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
479     and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
480     The individual sort steps and the merge step both use a "Heap sort"
481     http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
482     if the data is already partially sorted.
483    
484 amb 948 index_t filesort_vary Returns the number of objects kept.
485    
486 amb 310 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
487    
488     int fd_out The file descriptor of the output file (opened for writing and empty).
489    
490 amb 1106 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
491     each item before they have been sorted. The second parameter is the number of objects
492     previously read from the input file. If the function returns 1 then the object is kept
493     and it is sorted, otherwise it is ignored.
494 amb 310
495 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
496     to qsort if the data to be sorted is an array of things not pointers.
497    
498     int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
499     each item after they have been sorted. The second parameter is the number of objects
500     already written to the output file. If the function returns 1 then the object is written
501     to the output file., otherwise it is ignored.
502 amb 310 ++++++++++++++++++++++++++++++++++++++*/
503    
504 amb 1106 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
505     int (*compare_function)(const void*,const void*),
506     int (*post_sort_function)(void*,index_t))
507 amb 310 {
508     int *fds=NULL,*heap=NULL;
509     int nfiles=0,ndata=0;
510 amb 1112 index_t count_out=0,count_in=0,total=0;
511 amb 1985 size_t datasize,item;
512 amb 311 FILESORT_VARINT nextitemsize,largestitemsize=0;
513 amb 1652 char *data;
514     void **datap;
515 amb 991 thread_data *threads;
516 amb 310 int i,more=1;
517 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
518     int nthreads=0;
519     #endif
520 amb 310
521     /* Allocate the RAM buffer and other bits */
522    
523 amb 1985 datasize=(size_t)SizeFileFD(fd_in);
524 amb 1591
525     if(datasize==0)
526     return(0);
527    
528 amb 1678 /* We can not know in advance how many data items there are. Each
529     one will require RAM for data, FILESORT_VARALIGN and sizeof(void*)
530     Assume that data+FILESORT_VARALIGN+sizeof(void*) is 4*data. */
531    
532     if((datasize*4)<option_filesort_ramsize)
533     datasize=(datasize*4)/option_filesort_threads;
534 amb 1591 else
535     datasize=option_filesort_ramsize/option_filesort_threads;
536    
537 amb 1983 datasize=FILESORT_VARALIGN*((datasize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
538    
539 amb 1999 threads=(thread_data*)calloc_logassert(option_filesort_threads,sizeof(thread_data));
540 amb 310
541 amb 991 for(i=0;i<option_filesort_threads;i++)
542     {
543 amb 1999 threads[i].data=malloc_logassert(datasize);
544 amb 991 threads[i].datap=NULL;
545    
546 amb 1598 log_malloc(threads[i].data,datasize);
547    
548 amb 1999 threads[i].filename=(char*)malloc_logassert(strlen(option_tmpdirname)+24);
549 amb 991
550 amb 1106 threads[i].compare=compare_function;
551 amb 991 }
552    
553 amb 310 /* Loop around, fill the buffer, sort the data and write a temporary file */
554    
555 amb 1409 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
556 amb 310 goto tidy_and_exit;
557    
558     do
559     {
560 amb 311 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
561 amb 991 int thread=0;
562 amb 310
563 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
564 amb 310
565 amb 1020 if(option_filesort_threads>1)
566     {
567     /* Find a spare slot (one *must* be unused at all times) */
568 amb 991
569 amb 1020 pthread_mutex_lock(&running_mutex);
570 amb 996
571 amb 1020 for(thread=0;thread<option_filesort_threads;thread++)
572     if(!threads[thread].running)
573     break;
574 amb 991
575 amb 1020 pthread_mutex_unlock(&running_mutex);
576     }
577 amb 996
578 amb 991 #endif
579    
580 amb 1652 threads[thread].datap=(void**)(threads[thread].data+datasize);
581 amb 991
582     threads[thread].n=0;
583    
584 amb 310 /* Read in the data and create pointers */
585    
586 amb 1652 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(size_t)((char*)threads[thread].datap-sizeof(void*)-threads[thread].data))
587 amb 310 {
588 amb 311 FILESORT_VARINT itemsize=nextitemsize;
589 amb 310
590 amb 991 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
591 amb 310
592 amb 311 ramused+=FILESORT_VARSIZE;
593 amb 310
594 amb 1409 ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize);
595 amb 310
596 amb 1106 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
597     {
598     *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
599 amb 310
600 amb 1337 if(itemsize>largestitemsize)
601     largestitemsize=itemsize;
602    
603 amb 1106 ramused+=itemsize;
604 amb 311
605 amb 1983 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
606 amb 1106 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
607 amb 311
608 amb 1112 total++;
609 amb 1106 threads[thread].n++;
610     }
611 amb 1337 else
612     ramused-=FILESORT_VARSIZE;
613 amb 310
614 amb 1112 count_in++;
615    
616 amb 1409 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE))
617 amb 310 {
618     more=0;
619     break;
620     }
621     }
622    
623 amb 543 /* No new data read in this time round */
624    
625 amb 991 if(threads[thread].n==0)
626 amb 310 break;
627    
628 amb 991 /* Sort the data pointers using a heap sort (potentially in a thread) */
629 amb 310
630 amb 1021 if(more==0 && nfiles==0)
631     threads[thread].filename[0]=0;
632     else
633     sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
634 amb 310
635 amb 991 #if defined(USE_PTHREADS) && USE_PTHREADS
636 amb 310
637 amb 1021 /* Shortcut if only one file, don't write to disk */
638    
639     if(more==0 && nfiles==0)
640     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
641     else if(option_filesort_threads>1)
642 amb 310 {
643 amb 996 pthread_mutex_lock(&running_mutex);
644    
645 amb 991 while(nthreads==(option_filesort_threads-1))
646 amb 310 {
647 amb 991 for(i=0;i<option_filesort_threads;i++)
648     if(threads[i].running==2)
649     {
650     pthread_join(threads[i].thread,NULL);
651     threads[i].running=0;
652     nthreads--;
653     }
654    
655     if(nthreads==(option_filesort_threads-1))
656 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
657 amb 310 }
658    
659 amb 991 threads[thread].running=1;
660    
661 amb 996 pthread_mutex_unlock(&running_mutex);
662    
663 amb 991 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
664    
665     nthreads++;
666 amb 310 }
667 amb 1020 else
668 amb 1021 filesort_vary_heapsort_thread(&threads[thread]);
669 amb 310
670 amb 991 #else
671 amb 310
672 amb 1021 /* Shortcut if only one file, don't write to disk */
673    
674     if(more==0 && nfiles==0)
675     filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
676     else
677 amb 1020 filesort_vary_heapsort_thread(&threads[thread]);
678 amb 310
679 amb 991 #endif
680 amb 310
681 amb 991 nfiles++;
682     }
683     while(more);
684    
685     /* Wait for all of the threads to finish */
686    
687     #if defined(USE_PTHREADS) && USE_PTHREADS
688    
689 amb 1020 while(option_filesort_threads>1 && nthreads)
690 amb 991 {
691 amb 996 pthread_mutex_lock(&running_mutex);
692 amb 991
693 amb 996 pthread_cond_wait(&running_cond,&running_mutex);
694    
695 amb 991 for(i=0;i<option_filesort_threads;i++)
696     if(threads[i].running==2)
697     {
698     pthread_join(threads[i].thread,NULL);
699     threads[i].running=0;
700     nthreads--;
701     }
702    
703 amb 996 pthread_mutex_unlock(&running_mutex);
704 amb 991 }
705    
706     #endif
707    
708 amb 1935 /* Shortcut if there are no files */
709    
710     if(nfiles==0)
711     goto tidy_and_exit;
712    
713 amb 991 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
714    
715     if(nfiles==1)
716     {
717 amb 1310 for(item=0;item<threads[0].n;item++)
718 amb 310 {
719 amb 1310 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
720 amb 991 {
721 amb 1652 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)threads[0].datap[item]-FILESORT_VARSIZE);
722 amb 310
723 amb 1652 WriteFileBuffered(fd_out,(char*)threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
724 amb 1106 count_out++;
725 amb 991 }
726 amb 310 }
727    
728 amb 991 DeleteFile(threads[0].filename);
729 amb 310
730 amb 991 goto tidy_and_exit;
731 amb 310 }
732    
733     /* Check that number of files is less than file size */
734    
735 amb 1988 largestitemsize=FILESORT_VARALIGN*((largestitemsize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
736 amb 310
737 amb 1988 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/(FILESORT_VARALIGN+largestitemsize)),"Too many temporary files (use more sorting memory?)");
738 amb 310
739     /* Open all of the temporary files */
740    
741 amb 1999 fds=(int*)malloc_logassert(nfiles*sizeof(int));
742 amb 310
743     for(i=0;i<nfiles;i++)
744     {
745 amb 991 char *filename=threads[0].filename;
746    
747 amb 310 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
748    
749 amb 1409 fds[i]=ReOpenFileBuffered(filename);
750 amb 310
751     DeleteFile(filename);
752     }
753    
754     /* Perform an n-way merge using a binary heap */
755    
756 amb 1999 heap=(int*)malloc_logassert((1+nfiles)*sizeof(int));
757 amb 310
758 amb 991 data=threads[0].data;
759 amb 1652 datap=(void**)(data+datasize-nfiles*sizeof(void*));
760 amb 310
761     /* Fill the heap to start with */
762    
763     for(i=0;i<nfiles;i++)
764     {
765     int index;
766 amb 311 FILESORT_VARINT itemsize;
767 amb 310
768 amb 1988 datap[i]=data+FILESORT_VARALIGN+i*(largestitemsize+FILESORT_VARALIGN);
769 amb 310
770 amb 1409 ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE);
771 amb 310
772 amb 1652 *(FILESORT_VARINT*)((char*)datap[i]-FILESORT_VARSIZE)=itemsize;
773 amb 310
774 amb 1409 ReadFileBuffered(fds[i],datap[i],itemsize);
775 amb 310
776 amb 875 index=i+1;
777    
778 amb 867 heap[index]=i;
779 amb 310
780     /* Bubble up the new value */
781    
782 amb 875 while(index>1)
783 amb 310 {
784     int newindex;
785     int temp;
786    
787 amb 875 newindex=index/2;
788 amb 310
789 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
790 amb 869 break;
791    
792 amb 310 temp=heap[index];
793     heap[index]=heap[newindex];
794     heap[newindex]=temp;
795    
796     index=newindex;
797     }
798     }
799    
800     /* Repeatedly pull out the root of the heap and refill from the same file */
801    
802     ndata=nfiles;
803    
804     do
805     {
806 amb 875 int index=1;
807 amb 311 FILESORT_VARINT itemsize;
808 amb 310
809 amb 1106 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
810 amb 310 {
811 amb 1652 itemsize=*(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE);
812 amb 310
813 amb 1652 WriteFileBuffered(fd_out,(char*)datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
814 amb 1106 count_out++;
815 amb 310 }
816    
817 amb 1409 if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
818 amb 310 {
819 amb 875 heap[index]=heap[ndata];
820 amb 870 ndata--;
821 amb 310 }
822     else
823     {
824 amb 1652 *(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
825 amb 310
826 amb 1409 ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize);
827 amb 310 }
828    
829     /* Bubble down the new value */
830    
831 amb 875 while((2*index)<ndata)
832 amb 310 {
833 amb 873 int newindex;
834 amb 310 int temp;
835    
836 amb 875 newindex=2*index;
837 amb 310
838 amb 1106 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
839 amb 873 newindex=newindex+1;
840 amb 869
841 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
842 amb 869 break;
843    
844 amb 310 temp=heap[newindex];
845     heap[newindex]=heap[index];
846     heap[index]=temp;
847    
848     index=newindex;
849     }
850    
851 amb 875 if((2*index)==ndata)
852 amb 310 {
853     int newindex;
854     int temp;
855    
856 amb 875 newindex=2*index;
857 amb 310
858 amb 1106 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
859 amb 869 ; /* break */
860     else
861     {
862     temp=heap[newindex];
863     heap[newindex]=heap[index];
864     heap[index]=temp;
865     }
866 amb 310 }
867     }
868     while(ndata>0);
869    
870     /* Tidy up */
871    
872     tidy_and_exit:
873    
874     if(fds)
875     {
876     for(i=0;i<nfiles;i++)
877 amb 1409 CloseFileBuffered(fds[i]);
878 amb 310 free(fds);
879     }
880    
881     if(heap)
882     free(heap);
883    
884 amb 991 for(i=0;i<option_filesort_threads;i++)
885     {
886 amb 1598 log_free(threads[i].data);
887    
888 amb 991 free(threads[i].data);
889 amb 948
890 amb 991 free(threads[i].filename);
891     }
892    
893 amb 1441 free(threads);
894    
895 amb 1106 return(count_out);
896 amb 310 }
897    
898    
899     /*++++++++++++++++++++++++++++++++++++++
900 amb 991 A wrapper function that can be run in a thread for fixed data.
901    
902     void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
903    
904     thread_data *thread The data to be processed in this thread.
905     ++++++++++++++++++++++++++++++++++++++*/
906    
907     static void *filesort_fixed_heapsort_thread(thread_data *thread)
908     {
909 amb 1310 int fd;
910     size_t item;
911 amb 991
912     /* Sort the data pointers using a heap sort */
913    
914     filesort_heapsort(thread->datap,thread->n,thread->compare);
915    
916     /* Create a temporary file and write the result */
917    
918 amb 1943 #if defined(USE_PTHREADS) && USE_PTHREADS
919    
920     if(option_filesort_threads>1)
921     pthread_mutex_lock(&files_mutex);
922    
923     #endif
924    
925 amb 1409 fd=OpenFileBufferedNew(thread->filename);
926 amb 991
927 amb 1310 for(item=0;item<thread->n;item++)
928 amb 1409 WriteFileBuffered(fd,thread->datap[item],thread->itemsize);
929 amb 991
930 amb 1409 CloseFileBuffered(fd);
931 amb 991
932 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
933    
934 amb 1020 if(option_filesort_threads>1)
935     {
936 amb 1943 pthread_mutex_unlock(&files_mutex);
937    
938 amb 1020 pthread_mutex_lock(&running_mutex);
939 amb 996
940 amb 1020 thread->running=2;
941 amb 991
942 amb 1020 pthread_cond_signal(&running_cond);
943 amb 996
944 amb 1020 pthread_mutex_unlock(&running_mutex);
945     }
946 amb 996
947     #endif
948    
949 amb 991 return(NULL);
950     }
951    
952    
953     /*++++++++++++++++++++++++++++++++++++++
954     A wrapper function that can be run in a thread for variable data.
955    
956     void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
957    
958     thread_data *thread The data to be processed in this thread.
959     ++++++++++++++++++++++++++++++++++++++*/
960    
961     static void *filesort_vary_heapsort_thread(thread_data *thread)
962     {
963 amb 1310 int fd;
964     size_t item;
965 amb 991
966     /* Sort the data pointers using a heap sort */
967    
968     filesort_heapsort(thread->datap,thread->n,thread->compare);
969    
970     /* Create a temporary file and write the result */
971    
972 amb 1943 #if defined(USE_PTHREADS) && USE_PTHREADS
973    
974     if(option_filesort_threads>1)
975     pthread_mutex_lock(&files_mutex);
976    
977     #endif
978    
979 amb 1409 fd=OpenFileBufferedNew(thread->filename);
980 amb 991
981 amb 1310 for(item=0;item<thread->n;item++)
982 amb 991 {
983 amb 1652 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)thread->datap[item]-FILESORT_VARSIZE);
984 amb 991
985 amb 1652 WriteFileBuffered(fd,(char*)thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
986 amb 991 }
987    
988 amb 1409 CloseFileBuffered(fd);
989 amb 991
990 amb 996 #if defined(USE_PTHREADS) && USE_PTHREADS
991    
992 amb 1020 if(option_filesort_threads>1)
993     {
994 amb 1943 pthread_mutex_unlock(&files_mutex);
995    
996 amb 1020 pthread_mutex_lock(&running_mutex);
997 amb 996
998 amb 1020 thread->running=2;
999 amb 991
1000 amb 1020 pthread_cond_signal(&running_cond);
1001 amb 996
1002 amb 1020 pthread_mutex_unlock(&running_mutex);
1003     }
1004 amb 996
1005     #endif
1006    
1007 amb 991 return(NULL);
1008     }
1009    
1010    
1011     /*++++++++++++++++++++++++++++++++++++++
1012 amb 269 A function to sort an array of pointers efficiently.
1013    
1014     The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
1015 amb 873 in particular, this is good because it can operate in-place and doesn't
1016 amb 269 allocate more memory like using qsort() does.
1017    
1018     void **datap A pointer to the array of pointers to sort.
1019    
1020     size_t nitems The number of items of data to sort.
1021    
1022 amb 1106 int (*compare_function)(const void*, const void*) The comparison function. This is identical
1023     to qsort if the data to be sorted is an array of things not pointers.
1024 amb 269 ++++++++++++++++++++++++++++++++++++++*/
1025    
1026 amb 1106 void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
1027 amb 269 {
1028 amb 875 void **datap1=&datap[-1];
1029 amb 1310 size_t item;
1030 amb 269
1031     /* Fill the heap by pretending to insert the data that is already there */
1032    
1033 amb 1310 for(item=2;item<=nitems;item++)
1034 amb 269 {
1035 amb 1310 size_t index=item;
1036 amb 269
1037     /* Bubble up the new value (upside-down, put largest at top) */
1038    
1039 amb 875 while(index>1)
1040 amb 269 {
1041     int newindex;
1042     void *temp;
1043    
1044 amb 875 newindex=index/2;
1045 amb 269
1046 amb 1106 if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */
1047 amb 869 break;
1048    
1049 amb 875 temp=datap1[index];
1050     datap1[index]=datap1[newindex];
1051     datap1[newindex]=temp;
1052 amb 269
1053     index=newindex;
1054     }
1055     }
1056    
1057     /* Repeatedly pull out the root of the heap and swap with the bottom item */
1058    
1059 amb 1310 for(item=nitems;item>1;item--)
1060 amb 269 {
1061 amb 1310 size_t index=1;
1062 amb 1986 void *temp;
1063 amb 269
1064 amb 875 temp=datap1[index];
1065 amb 1310 datap1[index]=datap1[item];
1066     datap1[item]=temp;
1067 amb 269
1068     /* Bubble down the new value (upside-down, put largest at top) */
1069    
1070 amb 1310 while((2*index)<(item-1))
1071 amb 269 {
1072 amb 873 int newindex;
1073 amb 1983 void **temp;
1074 amb 269
1075 amb 875 newindex=2*index;
1076 amb 269
1077 amb 1106 if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */
1078 amb 873 newindex=newindex+1;
1079 amb 869
1080 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1081 amb 869 break;
1082    
1083 amb 875 temp=datap1[newindex];
1084     datap1[newindex]=datap1[index];
1085     datap1[index]=temp;
1086 amb 269
1087     index=newindex;
1088     }
1089    
1090 amb 1310 if((2*index)==(item-1))
1091 amb 269 {
1092     int newindex;
1093 amb 1986 void *temp;
1094 amb 269
1095 amb 875 newindex=2*index;
1096 amb 269
1097 amb 1106 if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */
1098 amb 869 ; /* break */
1099     else
1100     {
1101 amb 875 temp=datap1[newindex];
1102     datap1[newindex]=datap1[index];
1103     datap1[index]=temp;
1104 amb 869 }
1105 amb 269 }
1106     }
1107     }

Properties

Name Value
cvs:description Functions to perform sorting.