Routino SVN Repository Browser

Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino

ViewVC logotype

Contents of /trunk/src/sorting.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2136 - (show annotations) (download) (as text)
Mon May 29 14:07:33 2023 UTC (22 months ago) by amb
File MIME type: text/x-csrc
File size: 28332 byte(s)
Improve the shortcut if all the data fitted into RAM (1/Nth of the RAM
if multi-threaded) so that it is not written to disk at all.

1 /***************************************
2 Merge sort functions.
3
4 Part of the Routino routing software.
5 ******************/ /******************
6 This file Copyright 2009-2015, 2017, 2019, 2023 Andrew M. Bishop
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Affero General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ***************************************/
21
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #if defined(USE_PTHREADS) && USE_PTHREADS
28 #include <pthread.h>
29 #endif
30
31 #include "types.h"
32
33 #include "logging.h"
34 #include "files.h"
35 #include "sorting.h"
36
37
38 /* Global variables */
39
40 /*+ The command line '--tmpdir' option or its default value. +*/
41 extern char *option_tmpdirname;
42
43 /*+ The amount of RAM to use for filesorting. +*/
44 extern size_t option_filesort_ramsize;
45
46 /*+ The number of filesorting threads allowed. +*/
47 extern int option_filesort_threads;
48
49
50 /* Thread data type definitions */
51
52 /*+ A data type for holding data for a thread; one of these exists per sorting slot and
     it is also used (without a real thread) when only one sorting thread is allowed. +*/
53 typedef struct _thread_data
54 {
55 #if defined(USE_PTHREADS) && USE_PTHREADS
56
57 pthread_t thread; /*+ The thread identifier (as filled in by pthread_create). +*/
58
59 int running; /*+ A flag indicating the current state of the thread: 0 = slot is free, 1 = thread has been started, 2 = thread has finished and is waiting to be joined. +*/
60
61 #endif
62
63 char *data; /*+ The main data array (the RAM buffer that the items are read into). +*/
64 void **datap; /*+ An array of pointers to the data objects (this is what actually gets sorted). +*/
65 size_t n; /*+ The number of pointers (items currently held in this slot). +*/
66
67 int fd; /*+ The file descriptor of the file to write the results to (-1 until a temporary file is opened). +*/
68
69 size_t itemsize; /*+ The size of each item (only set and used when sorting fixed length data). +*/
70 int (*compare)(const void*,const void*); /*+ The comparison function (called with pointers to two data objects). +*/
71 }
72 thread_data;
73
74 /* Thread variables */
75
76 #if defined(USE_PTHREADS) && USE_PTHREADS
77
78 static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; /*+ Held whenever the 'running' flag of a thread_data is read or changed. +*/
79 static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; /*+ Signalled by a sorting thread after it has set its 'running' flag to 2. +*/
80
81 #endif
82
83 /* Thread helper functions */
84
85 static void *filesort_fixed_heapsort_thread(thread_data *thread);
86 static void *filesort_vary_heapsort_thread(thread_data *thread);
87
88
89 /*++++++++++++++++++++++++++++++++++++++
90 A function to sort the contents of a file of fixed length objects using a
91 limited amount of RAM.
92
93 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
94 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
95 The individual sort steps and the merge step both use a "Heap sort"
96 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
97 if the data is already partially sorted.
98
99 index_t filesort_fixed Returns the number of objects kept.
100
101 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
102
103 int fd_out The file descriptor of the output file (opened for writing and empty).
104
105 size_t itemsize The size of each item in the file that needs sorting.
106
107 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
108 each item before they have been sorted. The second parameter is the number of objects
109 previously read from the input file. If the function returns 1 then the object is kept
110 and it is sorted, otherwise it is ignored.
111
112 int (*compare_function)(const void*, const void*) The comparison function. This is identical
113 to qsort if the data to be sorted is an array of things not pointers.
114
115 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
116 each item after they have been sorted. The second parameter is the number of objects
117 already written to the output file. If the function returns 1 then the object is written
118 to the output file., otherwise it is ignored.
119 ++++++++++++++++++++++++++++++++++++++*/
120
121 index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t),
122 int (*compare_function)(const void*,const void*),
123 int (*post_sort_function)(void*,index_t))
124 {
125 int *fds=NULL,*heap=NULL;
126 int nfiles=0,ndata=0;
127 index_t count_out=0,count_in=0,total=0;
128 size_t nitems,item;
129 char *data;
130 void **datap;
131 thread_data *threads;
132 int i,more=1;
133 char *filename=(char*)malloc_logassert(strlen(option_tmpdirname)+24);
134 #if defined(USE_PTHREADS) && USE_PTHREADS
135 int nthreads=0;
136 #endif
137
138 /* Allocate the RAM buffer and other bits */
139
140 nitems=(size_t)SizeFileFD(fd_in)/itemsize;
141
142 if(nitems==0)
143 return(0);
144
145 if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize)
146 nitems=1+nitems/option_filesort_threads;
147 else
148 nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*)));
149
150 threads=(thread_data*)calloc_logassert(option_filesort_threads,sizeof(thread_data));
151
152 for(i=0;i<option_filesort_threads;i++)
153 {
154 threads[i].fd=-1;
155
156 threads[i].data=malloc_logassert(nitems*itemsize);
157 threads[i].datap=malloc_logassert(nitems*sizeof(void*));
158
159 log_malloc(threads[i].data ,nitems*itemsize);
160 log_malloc(threads[i].datap,nitems*sizeof(void*));
161
162 threads[i].itemsize=itemsize;
163 threads[i].compare=compare_function;
164 }
165
166 /* Loop around, fill the buffer, sort the data and write a temporary file */
167
168 do
169 {
170 int thread=0;
171
172 #if defined(USE_PTHREADS) && USE_PTHREADS
173
174 if(option_filesort_threads>1)
175 {
176 /* If all threads are in use wait for an existing thread to finish */
177
178 if(nthreads==option_filesort_threads)
179 {
180 pthread_mutex_lock(&running_mutex);
181
182 while(nthreads==option_filesort_threads)
183 {
184 for(i=0;i<option_filesort_threads;i++)
185 if(threads[i].running==2)
186 {
187 pthread_join(threads[i].thread,NULL);
188 threads[i].running=0;
189 CloseFileBuffered(threads[i].fd);
190 nthreads--;
191 }
192
193 if(nthreads==option_filesort_threads)
194 pthread_cond_wait(&running_cond,&running_mutex);
195 }
196
197 pthread_mutex_unlock(&running_mutex);
198 }
199
200 /* Find a spare slot */
201
202 pthread_mutex_lock(&running_mutex);
203
204 for(thread=0;thread<option_filesort_threads;thread++)
205 if(!threads[thread].running)
206 break;
207
208 pthread_mutex_unlock(&running_mutex);
209 }
210
211 #endif
212
213 /* Read in the data and create pointers */
214
215 for(item=0;item<nitems;)
216 {
217 threads[thread].datap[item]=threads[thread].data+item*itemsize;
218
219 if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize))
220 {
221 more=0;
222 break;
223 }
224
225 if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in))
226 {
227 item++;
228 total++;
229 }
230
231 count_in++;
232 }
233
234 threads[thread].n=item;
235
236 /* Shortcut if there is no previous data and no more data (i.e. no data at all) */
237
238 if(more==0 && total==0)
239 goto tidy_and_exit;
240
241 /* No new data read in this time round */
242
243 if(threads[thread].n==0)
244 break;
245
246 /* Shortcut if only one file, don't write to disk */
247
248 if(more==0 && nfiles==0)
249 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
250
251 else
252 {
253 /* Create the file descriptor (not thread-safe) */
254
255 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
256
257 threads[thread].fd=OpenFileBufferedNew(filename);
258
259 if(option_filesort_threads==1)
260 {
261 filesort_fixed_heapsort_thread(&threads[thread]);
262
263 CloseFileBuffered(threads[thread].fd);
264 }
265
266 #if defined(USE_PTHREADS) && USE_PTHREADS
267
268 else
269 {
270 pthread_mutex_lock(&running_mutex);
271
272 threads[thread].running=1;
273
274 pthread_mutex_unlock(&running_mutex);
275
276 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]);
277
278 nthreads++;
279 }
280
281 #endif
282
283 }
284
285 nfiles++;
286 }
287 while(more);
288
289 /* Wait for all of the threads to finish */
290
291 #if defined(USE_PTHREADS) && USE_PTHREADS
292
293 while(option_filesort_threads>1 && nthreads)
294 {
295 pthread_mutex_lock(&running_mutex);
296
297 pthread_cond_wait(&running_cond,&running_mutex);
298
299 for(i=0;i<option_filesort_threads;i++)
300 if(threads[i].running==2)
301 {
302 pthread_join(threads[i].thread,NULL);
303 threads[i].running=0;
304 CloseFileBuffered(threads[i].fd);
305 nthreads--;
306 }
307
308 pthread_mutex_unlock(&running_mutex);
309 }
310
311 #endif
312
313 /* Shortcut if there are no files */
314
315 if(nfiles==0)
316 goto tidy_and_exit;
317
318 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
319
320 if(nfiles==1)
321 {
322 for(item=0;item<threads[0].n;item++)
323 {
324 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
325 {
326 WriteFileBuffered(fd_out,threads[0].datap[item],itemsize);
327 count_out++;
328 }
329 }
330
331 goto tidy_and_exit;
332 }
333
334 /* Check that number of files is less than file size */
335
336 logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)");
337
338 /* Open all of the temporary files */
339
340 fds=(int*)malloc_logassert(nfiles*sizeof(int));
341
342 for(i=0;i<nfiles;i++)
343 {
344 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
345
346 fds[i]=ReOpenFileBuffered(filename);
347
348 DeleteFile(filename);
349 }
350
351 /* Perform an n-way merge using a binary heap */
352
353 heap=(int*)malloc_logassert((1+nfiles)*sizeof(int));
354
355 data =threads[0].data;
356 datap=threads[0].datap;
357
358 /* Fill the heap to start with */
359
360 for(i=0;i<nfiles;i++)
361 {
362 int index;
363
364 datap[i]=data+i*itemsize;
365
366 ReadFileBuffered(fds[i],datap[i],itemsize);
367
368 index=i+1;
369
370 heap[index]=i;
371
372 /* Bubble up the new value */
373
374 while(index>1)
375 {
376 int newindex;
377 int temp;
378
379 newindex=index/2;
380
381 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
382 break;
383
384 temp=heap[index];
385 heap[index]=heap[newindex];
386 heap[newindex]=temp;
387
388 index=newindex;
389 }
390 }
391
392 /* Repeatedly pull out the root of the heap and refill from the same file */
393
394 ndata=nfiles;
395
396 do
397 {
398 int index=1;
399
400 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
401 {
402 WriteFileBuffered(fd_out,datap[heap[index]],itemsize);
403 count_out++;
404 }
405
406 if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize))
407 {
408 heap[index]=heap[ndata];
409 ndata--;
410 }
411
412 /* Bubble down the new value */
413
414 while((2*index)<ndata)
415 {
416 int newindex;
417 int temp;
418
419 newindex=2*index;
420
421 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
422 newindex=newindex+1;
423
424 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
425 break;
426
427 temp=heap[newindex];
428 heap[newindex]=heap[index];
429 heap[index]=temp;
430
431 index=newindex;
432 }
433
434 if((2*index)==ndata)
435 {
436 int newindex;
437 int temp;
438
439 newindex=2*index;
440
441 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
442 ; /* break */
443 else
444 {
445 temp=heap[newindex];
446 heap[newindex]=heap[index];
447 heap[index]=temp;
448 }
449 }
450 }
451 while(ndata>0);
452
453 /* Tidy up */
454
455 tidy_and_exit:
456
457 if(fds)
458 {
459 for(i=0;i<nfiles;i++)
460 CloseFileBuffered(fds[i]);
461 free(fds);
462 }
463
464 if(heap)
465 free(heap);
466
467 for(i=0;i<option_filesort_threads;i++)
468 {
469 log_free(threads[i].data);
470 log_free(threads[i].datap);
471
472 free(threads[i].data);
473 free(threads[i].datap);
474 }
475
476 free(threads);
477
478 free(filename);
479
480 return(count_out);
481 }
482
483
484 /*++++++++++++++++++++++++++++++++++++++
485 A function to sort the contents of a file of variable length objects (each
486 preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM.
487
488 The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort
489 and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting.
490 The individual sort steps and the merge step both use a "Heap sort"
491 http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well
492 if the data is already partially sorted.
493
494 index_t filesort_vary Returns the number of objects kept.
495
496 int fd_in The file descriptor of the input file (opened for reading and at the beginning).
497
498 int fd_out The file descriptor of the output file (opened for writing and empty).
499
500 int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for
501 each item before they have been sorted. The second parameter is the number of objects
502 previously read from the input file. If the function returns 1 then the object is kept
503 and it is sorted, otherwise it is ignored.
504
505 int (*compare_function)(const void*, const void*) The comparison function. This is identical
506 to qsort if the data to be sorted is an array of things not pointers.
507
508 int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for
509 each item after they have been sorted. The second parameter is the number of objects
510 already written to the output file. If the function returns 1 then the object is written
511 to the output file., otherwise it is ignored.
512 ++++++++++++++++++++++++++++++++++++++*/
513
514 index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t),
515 int (*compare_function)(const void*,const void*),
516 int (*post_sort_function)(void*,index_t))
517 {
518 int *fds=NULL,*heap=NULL;
519 int nfiles=0,ndata=0;
520 index_t count_out=0,count_in=0,total=0;
521 size_t datasize,item;
522 FILESORT_VARINT nextitemsize,largestitemsize=0;
523 char *data;
524 void **datap;
525 thread_data *threads;
526 int i,more=1;
527 char *filename=(char*)malloc_logassert(strlen(option_tmpdirname)+24);
528 #if defined(USE_PTHREADS) && USE_PTHREADS
529 int nthreads=0;
530 #endif
531
532 /* Allocate the RAM buffer and other bits */
533
534 datasize=(size_t)SizeFileFD(fd_in);
535
536 if(datasize==0)
537 return(0);
538
539 /* We can not know in advance how many data items there are. Each
540 one will require RAM for data, FILESORT_VARALIGN and sizeof(void*)
541 Assume that data+FILESORT_VARALIGN+sizeof(void*) is 4*data. */
542
543 if((datasize*4)<option_filesort_ramsize)
544 datasize=(datasize*4)/option_filesort_threads;
545 else
546 datasize=option_filesort_ramsize/option_filesort_threads;
547
548 datasize=FILESORT_VARALIGN*((datasize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
549
550 threads=(thread_data*)calloc_logassert(option_filesort_threads,sizeof(thread_data));
551
552 for(i=0;i<option_filesort_threads;i++)
553 {
554 threads[i].fd=-1;
555
556 threads[i].data=malloc_logassert(datasize);
557 threads[i].datap=NULL;
558
559 log_malloc(threads[i].data,datasize);
560
561 threads[i].compare=compare_function;
562 }
563
564 /* Loop around, fill the buffer, sort the data and write a temporary file */
565
566 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */
567 goto tidy_and_exit;
568
569 do
570 {
571 size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE;
572 int thread=0;
573
574 #if defined(USE_PTHREADS) && USE_PTHREADS
575
576 if(option_filesort_threads>1)
577 {
578 /* If all threads are in use wait for an existing thread to finish */
579
580 if(nthreads==option_filesort_threads)
581 {
582 pthread_mutex_lock(&running_mutex);
583
584 while(nthreads==option_filesort_threads)
585 {
586 for(i=0;i<option_filesort_threads;i++)
587 if(threads[i].running==2)
588 {
589 pthread_join(threads[i].thread,NULL);
590 threads[i].running=0;
591 CloseFileBuffered(threads[i].fd);
592 nthreads--;
593 }
594
595 if(nthreads==option_filesort_threads)
596 pthread_cond_wait(&running_cond,&running_mutex);
597 }
598
599 pthread_mutex_unlock(&running_mutex);
600 }
601
602 /* Find a spare slot */
603
604 pthread_mutex_lock(&running_mutex);
605
606 for(thread=0;thread<option_filesort_threads;thread++)
607 if(!threads[thread].running)
608 break;
609
610 pthread_mutex_unlock(&running_mutex);
611 }
612
613 #endif
614
615 threads[thread].datap=(void**)(threads[thread].data+datasize);
616
617 threads[thread].n=0;
618
619 /* Read in the data and create pointers */
620
621 while((ramused+FILESORT_VARSIZE+nextitemsize)<=(size_t)((char*)threads[thread].datap-sizeof(void*)-threads[thread].data))
622 {
623 FILESORT_VARINT itemsize=nextitemsize;
624
625 *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize;
626
627 ramused+=FILESORT_VARSIZE;
628
629 ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize);
630
631 if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in))
632 {
633 *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */
634
635 if(itemsize>largestitemsize)
636 largestitemsize=itemsize;
637
638 ramused+=itemsize;
639
640 ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
641 ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE;
642
643 total++;
644 threads[thread].n++;
645 }
646 else
647 ramused-=FILESORT_VARSIZE;
648
649 count_in++;
650
651 if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE))
652 {
653 more=0;
654 break;
655 }
656 }
657
658 /* No new data read in this time round */
659
660 if(threads[thread].n==0)
661 break;
662
663 /* Shortcut if only one file, don't write to disk */
664
665 if(more==0 && nfiles==0)
666 filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare);
667
668 else
669 {
670 /* Create the file descriptor (not thread-safe) */
671
672 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles);
673
674 threads[thread].fd=OpenFileBufferedNew(filename);
675
676 if(option_filesort_threads==1)
677 {
678 filesort_vary_heapsort_thread(&threads[thread]);
679
680 CloseFileBuffered(threads[thread].fd);
681 }
682
683 #if defined(USE_PTHREADS) && USE_PTHREADS
684
685 else
686 {
687 pthread_mutex_lock(&running_mutex);
688
689 threads[thread].running=1;
690
691 pthread_mutex_unlock(&running_mutex);
692
693 pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]);
694
695 nthreads++;
696 }
697
698 #endif
699
700 }
701
702 nfiles++;
703 }
704 while(more);
705
706 /* Wait for all of the threads to finish */
707
708 #if defined(USE_PTHREADS) && USE_PTHREADS
709
710 while(option_filesort_threads>1 && nthreads)
711 {
712 pthread_mutex_lock(&running_mutex);
713
714 pthread_cond_wait(&running_cond,&running_mutex);
715
716 for(i=0;i<option_filesort_threads;i++)
717 if(threads[i].running==2)
718 {
719 pthread_join(threads[i].thread,NULL);
720 threads[i].running=0;
721 CloseFileBuffered(threads[i].fd);
722 nthreads--;
723 }
724
725 pthread_mutex_unlock(&running_mutex);
726 }
727
728 #endif
729
730 /* Shortcut if there are no files */
731
732 if(nfiles==0)
733 goto tidy_and_exit;
734
735 /* Shortcut if only one file, lucky for us we still have the data in RAM) */
736
737 if(nfiles==1)
738 {
739 for(item=0;item<threads[0].n;item++)
740 {
741 if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out))
742 {
743 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)threads[0].datap[item]-FILESORT_VARSIZE);
744
745 WriteFileBuffered(fd_out,(char*)threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
746 count_out++;
747 }
748 }
749
750 DeleteFile(filename);
751
752 goto tidy_and_exit;
753 }
754
755 /* Check that number of files is less than file size */
756
757 largestitemsize=FILESORT_VARALIGN*((largestitemsize+FILESORT_VARALIGN-1)/FILESORT_VARALIGN);
758
759 logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/(FILESORT_VARALIGN+largestitemsize)),"Too many temporary files (use more sorting memory?)");
760
761 /* Open all of the temporary files */
762
763 fds=(int*)malloc_logassert(nfiles*sizeof(int));
764
765 for(i=0;i<nfiles;i++)
766 {
767 sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i);
768
769 fds[i]=ReOpenFileBuffered(filename);
770
771 DeleteFile(filename);
772 }
773
774 /* Perform an n-way merge using a binary heap */
775
776 heap=(int*)malloc_logassert((1+nfiles)*sizeof(int));
777
778 data=threads[0].data;
779 datap=(void**)(data+datasize-nfiles*sizeof(void*));
780
781 /* Fill the heap to start with */
782
783 for(i=0;i<nfiles;i++)
784 {
785 int index;
786 FILESORT_VARINT itemsize;
787
788 datap[i]=data+FILESORT_VARALIGN+i*(largestitemsize+FILESORT_VARALIGN);
789
790 ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE);
791
792 *(FILESORT_VARINT*)((char*)datap[i]-FILESORT_VARSIZE)=itemsize;
793
794 ReadFileBuffered(fds[i],datap[i],itemsize);
795
796 index=i+1;
797
798 heap[index]=i;
799
800 /* Bubble up the new value */
801
802 while(index>1)
803 {
804 int newindex;
805 int temp;
806
807 newindex=index/2;
808
809 if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0)
810 break;
811
812 temp=heap[index];
813 heap[index]=heap[newindex];
814 heap[newindex]=temp;
815
816 index=newindex;
817 }
818 }
819
820 /* Repeatedly pull out the root of the heap and refill from the same file */
821
822 ndata=nfiles;
823
824 do
825 {
826 int index=1;
827 FILESORT_VARINT itemsize;
828
829 if(!post_sort_function || post_sort_function(datap[heap[index]],count_out))
830 {
831 itemsize=*(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE);
832
833 WriteFileBuffered(fd_out,(char*)datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
834 count_out++;
835 }
836
837 if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE))
838 {
839 heap[index]=heap[ndata];
840 ndata--;
841 }
842 else
843 {
844 *(FILESORT_VARINT*)((char*)datap[heap[index]]-FILESORT_VARSIZE)=itemsize;
845
846 ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize);
847 }
848
849 /* Bubble down the new value */
850
851 while((2*index)<ndata)
852 {
853 int newindex;
854 int temp;
855
856 newindex=2*index;
857
858 if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0)
859 newindex=newindex+1;
860
861 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
862 break;
863
864 temp=heap[newindex];
865 heap[newindex]=heap[index];
866 heap[index]=temp;
867
868 index=newindex;
869 }
870
871 if((2*index)==ndata)
872 {
873 int newindex;
874 int temp;
875
876 newindex=2*index;
877
878 if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0)
879 ; /* break */
880 else
881 {
882 temp=heap[newindex];
883 heap[newindex]=heap[index];
884 heap[index]=temp;
885 }
886 }
887 }
888 while(ndata>0);
889
890 /* Tidy up */
891
892 tidy_and_exit:
893
894 if(fds)
895 {
896 for(i=0;i<nfiles;i++)
897 CloseFileBuffered(fds[i]);
898 free(fds);
899 }
900
901 if(heap)
902 free(heap);
903
904 for(i=0;i<option_filesort_threads;i++)
905 {
906 log_free(threads[i].data);
907
908 free(threads[i].data);
909 }
910
911 free(threads);
912
913 free(filename);
914
915 return(count_out);
916 }
917
918
919 /*++++++++++++++++++++++++++++++++++++++
920 A wrapper function that can be run in a thread for fixed data.
921
922 void *filesort_fixed_heapsort_thread Returns NULL (required to return void*).
923
924 thread_data *thread The data to be processed in this thread.
925 ++++++++++++++++++++++++++++++++++++++*/
926
927 static void *filesort_fixed_heapsort_thread(thread_data *thread)
928 {
929 size_t item;
930
931 /* Sort the data pointers using a heap sort */
932
933 filesort_heapsort(thread->datap,thread->n,thread->compare);
934
935 /* Write the result to the given temporary file */
936
937 if(thread->fd > 0)
938 for(item=0;item<thread->n;item++)
939 WriteFileBuffered(thread->fd,thread->datap[item],thread->itemsize);
940
941 #if defined(USE_PTHREADS) && USE_PTHREADS
942
943 if(option_filesort_threads>1)
944 {
945 pthread_mutex_lock(&running_mutex);
946
947 thread->running=2;
948
949 pthread_cond_signal(&running_cond);
950
951 pthread_mutex_unlock(&running_mutex);
952 }
953
954 #endif
955
956 return(NULL);
957 }
958
959
960 /*++++++++++++++++++++++++++++++++++++++
961 A wrapper function that can be run in a thread for variable data.
962
963 void *filesort_vary_heapsort_thread Returns NULL (required to return void*).
964
965 thread_data *thread The data to be processed in this thread.
966 ++++++++++++++++++++++++++++++++++++++*/
967
968 static void *filesort_vary_heapsort_thread(thread_data *thread)
969 {
970 size_t item;
971
972 /* Sort the data pointers using a heap sort */
973
974 filesort_heapsort(thread->datap,thread->n,thread->compare);
975
976 /* Write the result to the given temporary file */
977
978 if(thread->fd > 0)
979 for(item=0;item<thread->n;item++)
980 {
981 FILESORT_VARINT itemsize=*(FILESORT_VARINT*)((char*)thread->datap[item]-FILESORT_VARSIZE);
982
983 WriteFileBuffered(thread->fd,(char*)thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE);
984 }
985
986 #if defined(USE_PTHREADS) && USE_PTHREADS
987
988 if(option_filesort_threads>1)
989 {
990 pthread_mutex_lock(&running_mutex);
991
992 thread->running=2;
993
994 pthread_cond_signal(&running_cond);
995
996 pthread_mutex_unlock(&running_mutex);
997 }
998
999 #endif
1000
1001 return(NULL);
1002 }
1003
1004
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.

  The heap positions used below are 1-based (the parent of position i is i/2 and
  its children are 2*i and 2*i+1); position i lives in datap[i-1].  The sort is
  not stable.  Arrays of zero or one item are left untouched.

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function.  This is identical
     to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=2;item<=nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t newindex=index/2;
       void *temp;

       if(compare_function(datap[index-1],datap[newindex-1])<=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[newindex-1];
       datap[newindex-1]=temp;

       index=newindex;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(item=nitems;item>1;item--)
   {
    size_t index=1;
    void *temp;

    temp=datap[index-1];
    datap[index-1]=datap[item-1];
    datap[item-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top);
       the heap now occupies positions 1 to item-1 */

    while((2*index)<(item-1))
      {
       size_t newindex=2*index;

       /* Choose the larger of the two children */

       if(compare_function(datap[newindex-1],datap[newindex])<=0) /* reversed comparison to filesort_fixed() above */
          newindex=newindex+1;

       if(compare_function(datap[index-1],datap[newindex-1])>=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[newindex-1];
       datap[newindex-1]=datap[index-1];
       datap[index-1]=temp;

       index=newindex;
      }

    /* Special case of a node with only one child (the last position in the heap) */

    if((2*index)==(item-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[index-1],datap[newindex-1])<0) /* reversed comparison to filesort_fixed() above */
         {
          temp=datap[newindex-1];
          datap[newindex-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}

Properties

Name Value
cvs:description Functions to perform sorting.