Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Contents of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1310 -
(show annotations)
(download)
(as text)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Change data type from signed to unsigned (pedantic compiler warning).
1 | /*************************************** |
2 | Merge sort functions. |
3 | |
4 | Part of the Routino routing software. |
5 | ******************/ /****************** |
6 | This file Copyright 2009-2013 Andrew M. Bishop |
7 | |
8 | This program is free software: you can redistribute it and/or modify |
9 | it under the terms of the GNU Affero General Public License as published by |
10 | the Free Software Foundation, either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | This program is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU Affero General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU Affero General Public License |
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | ***************************************/ |
21 | |
22 | |
23 | #include <stdio.h> |
24 | #include <stdlib.h> |
25 | #include <string.h> |
26 | |
27 | #if defined(USE_PTHREADS) && USE_PTHREADS |
28 | #include <pthread.h> |
29 | #endif |
30 | |
31 | #include "types.h" |
32 | |
33 | #include "logging.h" |
34 | #include "files.h" |
35 | #include "sorting.h" |
36 | |
37 | |
/* Global variables (declared here, defined in another translation unit) */

/*+ The number of filesorting threads allowed. +*/
extern int option_filesort_threads;

/*+ The amount of RAM to use for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The command line '--tmpdir' option or its default value. +*/
extern char *option_tmpdirname;
48 | |
49 | |
/* Thread data type definitions */

/*+ A data type for holding data for a thread (one sorting buffer "slot"). +*/
typedef struct _thread_data
 {
#if defined(USE_PTHREADS) && USE_PTHREADS

  /* Only present when built with pthreads: <pthread.h> (and therefore
     pthread_t) is only included in that case, and this member is only
     accessed from USE_PTHREADS sections of the code. */
  pthread_t thread;             /*+ The thread identifier. +*/

#endif

  int       running;            /*+ The state of the slot: 0 = free, 1 = a thread is running on it,
                                    2 = the thread has finished but has not yet been joined. +*/

  void     *data;               /*+ The main data array. +*/
  void    **datap;              /*+ An array of pointers to the data objects. +*/
  size_t    n;                  /*+ The number of pointers. +*/

  char     *filename;           /*+ The name of the file to write the results to. +*/

  size_t    itemsize;           /*+ The size of each item (only set for fixed length data). +*/
  int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
 }
 thread_data;
69 | |
70 | /* Thread variables */ |
71 | |
72 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; |
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; |
76 | |
77 | #endif |
78 | |
79 | /* Thread helper functions */ |
80 | |
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); |
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); |
83 | |
84 | |
85 | /*++++++++++++++++++++++++++++++++++++++ |
86 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. |
88 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
91 | The individual sort steps and the merge step both use a "Heap sort" |
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
93 | if the data is already partially sorted. |
94 | |
95 | index_t filesort_fixed Returns the number of objects kept. |
96 | |
97 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |
99 | int fd_out The file descriptor of the output file (opened for writing and empty). |
100 | |
101 | size_t itemsize The size of each item in the file that needs sorting. |
102 | |
103 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
104 | each item before they have been sorted. The second parameter is the number of objects |
105 | previously read from the input file. If the function returns 1 then the object is kept |
106 | and it is sorted, otherwise it is ignored. |
107 | |
108 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
109 | to qsort if the data to be sorted is an array of things not pointers. |
110 | |
111 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
112 | each item after they have been sorted. The second parameter is the number of objects |
113 | already written to the output file. If the function returns 1 then the object is written |
114 | to the output file., otherwise it is ignored. |
115 | ++++++++++++++++++++++++++++++++++++++*/ |
116 | |
117 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
118 | int (*compare_function)(const void*,const void*), |
119 | int (*post_sort_function)(void*,index_t)) |
120 | { |
121 | int *fds=NULL,*heap=NULL; |
122 | int nfiles=0,ndata=0; |
123 | index_t count_out=0,count_in=0,total=0; |
124 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
125 | void *data,**datap; |
126 | thread_data *threads; |
127 | size_t item; |
128 | int i,more=1; |
129 | #if defined(USE_PTHREADS) && USE_PTHREADS |
130 | int nthreads=0; |
131 | #endif |
132 | |
133 | /* Allocate the RAM buffer and other bits */ |
134 | |
135 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
136 | |
137 | for(i=0;i<option_filesort_threads;i++) |
138 | { |
139 | threads[i].running=0; |
140 | |
141 | threads[i].data=malloc(nitems*itemsize); |
142 | threads[i].datap=malloc(nitems*sizeof(void*)); |
143 | |
144 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
145 | |
146 | threads[i].itemsize=itemsize; |
147 | threads[i].compare=compare_function; |
148 | } |
149 | |
150 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
151 | |
152 | do |
153 | { |
154 | int thread=0; |
155 | |
156 | #if defined(USE_PTHREADS) && USE_PTHREADS |
157 | |
158 | if(option_filesort_threads>1) |
159 | { |
160 | /* Find a spare slot (one *must* be unused at all times) */ |
161 | |
162 | pthread_mutex_lock(&running_mutex); |
163 | |
164 | for(thread=0;thread<option_filesort_threads;thread++) |
165 | if(!threads[thread].running) |
166 | break; |
167 | |
168 | pthread_mutex_unlock(&running_mutex); |
169 | } |
170 | |
171 | #endif |
172 | |
173 | /* Read in the data and create pointers */ |
174 | |
175 | for(item=0;item<nitems;) |
176 | { |
177 | threads[thread].datap[item]=threads[thread].data+item*itemsize; |
178 | |
179 | if(ReadFile(fd_in,threads[thread].datap[item],itemsize)) |
180 | { |
181 | more=0; |
182 | break; |
183 | } |
184 | |
185 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in)) |
186 | { |
187 | item++; |
188 | total++; |
189 | } |
190 | |
191 | count_in++; |
192 | } |
193 | |
194 | threads[thread].n=item; |
195 | |
196 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
197 | |
198 | if(more==0 && total==0) |
199 | goto tidy_and_exit; |
200 | |
201 | /* No new data read in this time round */ |
202 | |
203 | if(threads[thread].n==0) |
204 | break; |
205 | |
206 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
207 | |
208 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
209 | |
210 | #if defined(USE_PTHREADS) && USE_PTHREADS |
211 | |
212 | /* Shortcut if only one file, don't write to disk */ |
213 | |
214 | if(more==0 && nfiles==0) |
215 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
216 | else if(option_filesort_threads>1) |
217 | { |
218 | pthread_mutex_lock(&running_mutex); |
219 | |
220 | while(nthreads==(option_filesort_threads-1)) |
221 | { |
222 | for(i=0;i<option_filesort_threads;i++) |
223 | if(threads[i].running==2) |
224 | { |
225 | pthread_join(threads[i].thread,NULL); |
226 | threads[i].running=0; |
227 | nthreads--; |
228 | } |
229 | |
230 | if(nthreads==(option_filesort_threads-1)) |
231 | pthread_cond_wait(&running_cond,&running_mutex); |
232 | } |
233 | |
234 | threads[thread].running=1; |
235 | |
236 | pthread_mutex_unlock(&running_mutex); |
237 | |
238 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
239 | |
240 | nthreads++; |
241 | } |
242 | else |
243 | filesort_fixed_heapsort_thread(&threads[thread]); |
244 | |
245 | #else |
246 | |
247 | /* Shortcut if only one file, don't write to disk */ |
248 | |
249 | if(more==0 && nfiles==0) |
250 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
251 | else |
252 | filesort_fixed_heapsort_thread(&threads[thread]); |
253 | |
254 | #endif |
255 | |
256 | nfiles++; |
257 | } |
258 | while(more); |
259 | |
260 | /* Wait for all of the threads to finish */ |
261 | |
262 | #if defined(USE_PTHREADS) && USE_PTHREADS |
263 | |
264 | while(option_filesort_threads>1 && nthreads) |
265 | { |
266 | pthread_mutex_lock(&running_mutex); |
267 | |
268 | pthread_cond_wait(&running_cond,&running_mutex); |
269 | |
270 | for(i=0;i<option_filesort_threads;i++) |
271 | if(threads[i].running==2) |
272 | { |
273 | pthread_join(threads[i].thread,NULL); |
274 | threads[i].running=0; |
275 | nthreads--; |
276 | } |
277 | |
278 | pthread_mutex_unlock(&running_mutex); |
279 | } |
280 | |
281 | #endif |
282 | |
283 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
284 | |
285 | if(nfiles==1) |
286 | { |
287 | for(item=0;item<threads[0].n;item++) |
288 | { |
289 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
290 | { |
291 | WriteFile(fd_out,threads[0].datap[item],itemsize); |
292 | count_out++; |
293 | } |
294 | } |
295 | |
296 | DeleteFile(threads[0].filename); |
297 | |
298 | goto tidy_and_exit; |
299 | } |
300 | |
301 | /* Check that number of files is less than file size */ |
302 | |
303 | logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)"); |
304 | |
305 | /* Open all of the temporary files */ |
306 | |
307 | fds=(int*)malloc(nfiles*sizeof(int)); |
308 | |
309 | for(i=0;i<nfiles;i++) |
310 | { |
311 | char *filename=threads[0].filename; |
312 | |
313 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
314 | |
315 | fds[i]=ReOpenFile(filename); |
316 | |
317 | DeleteFile(filename); |
318 | } |
319 | |
320 | /* Perform an n-way merge using a binary heap */ |
321 | |
322 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
323 | |
324 | data =threads[0].data; |
325 | datap=threads[0].datap; |
326 | |
327 | /* Fill the heap to start with */ |
328 | |
329 | for(i=0;i<nfiles;i++) |
330 | { |
331 | int index; |
332 | |
333 | datap[i]=data+i*itemsize; |
334 | |
335 | ReadFile(fds[i],datap[i],itemsize); |
336 | |
337 | index=i+1; |
338 | |
339 | heap[index]=i; |
340 | |
341 | /* Bubble up the new value */ |
342 | |
343 | while(index>1) |
344 | { |
345 | int newindex; |
346 | int temp; |
347 | |
348 | newindex=index/2; |
349 | |
350 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
351 | break; |
352 | |
353 | temp=heap[index]; |
354 | heap[index]=heap[newindex]; |
355 | heap[newindex]=temp; |
356 | |
357 | index=newindex; |
358 | } |
359 | } |
360 | |
361 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
362 | |
363 | ndata=nfiles; |
364 | |
365 | do |
366 | { |
367 | int index=1; |
368 | |
369 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
370 | { |
371 | WriteFile(fd_out,datap[heap[index]],itemsize); |
372 | count_out++; |
373 | } |
374 | |
375 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
376 | { |
377 | heap[index]=heap[ndata]; |
378 | ndata--; |
379 | } |
380 | |
381 | /* Bubble down the new value */ |
382 | |
383 | while((2*index)<ndata) |
384 | { |
385 | int newindex; |
386 | int temp; |
387 | |
388 | newindex=2*index; |
389 | |
390 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
391 | newindex=newindex+1; |
392 | |
393 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
394 | break; |
395 | |
396 | temp=heap[newindex]; |
397 | heap[newindex]=heap[index]; |
398 | heap[index]=temp; |
399 | |
400 | index=newindex; |
401 | } |
402 | |
403 | if((2*index)==ndata) |
404 | { |
405 | int newindex; |
406 | int temp; |
407 | |
408 | newindex=2*index; |
409 | |
410 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
411 | ; /* break */ |
412 | else |
413 | { |
414 | temp=heap[newindex]; |
415 | heap[newindex]=heap[index]; |
416 | heap[index]=temp; |
417 | } |
418 | } |
419 | } |
420 | while(ndata>0); |
421 | |
422 | /* Tidy up */ |
423 | |
424 | tidy_and_exit: |
425 | |
426 | if(fds) |
427 | { |
428 | for(i=0;i<nfiles;i++) |
429 | CloseFile(fds[i]); |
430 | free(fds); |
431 | } |
432 | |
433 | if(heap) |
434 | free(heap); |
435 | |
436 | for(i=0;i<option_filesort_threads;i++) |
437 | { |
438 | free(threads[i].data); |
439 | free(threads[i].datap); |
440 | |
441 | free(threads[i].filename); |
442 | } |
443 | |
444 | return(count_out); |
445 | } |
446 | |
447 | |
448 | /*++++++++++++++++++++++++++++++++++++++ |
449 | A function to sort the contents of a file of variable length objects (each |
450 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
451 | |
452 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
453 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
454 | The individual sort steps and the merge step both use a "Heap sort" |
455 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
456 | if the data is already partially sorted. |
457 | |
458 | index_t filesort_vary Returns the number of objects kept. |
459 | |
460 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
461 | |
462 | int fd_out The file descriptor of the output file (opened for writing and empty). |
463 | |
464 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
465 | each item before they have been sorted. The second parameter is the number of objects |
466 | previously read from the input file. If the function returns 1 then the object is kept |
467 | and it is sorted, otherwise it is ignored. |
468 | |
469 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
470 | to qsort if the data to be sorted is an array of things not pointers. |
471 | |
472 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
473 | each item after they have been sorted. The second parameter is the number of objects |
474 | already written to the output file. If the function returns 1 then the object is written |
475 | to the output file., otherwise it is ignored. |
476 | ++++++++++++++++++++++++++++++++++++++*/ |
477 | |
478 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
479 | int (*compare_function)(const void*,const void*), |
480 | int (*post_sort_function)(void*,index_t)) |
481 | { |
482 | int *fds=NULL,*heap=NULL; |
483 | int nfiles=0,ndata=0; |
484 | index_t count_out=0,count_in=0,total=0; |
485 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
486 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
487 | void *data,**datap; |
488 | thread_data *threads; |
489 | size_t item; |
490 | int i,more=1; |
491 | #if defined(USE_PTHREADS) && USE_PTHREADS |
492 | int nthreads=0; |
493 | #endif |
494 | |
495 | /* Allocate the RAM buffer and other bits */ |
496 | |
497 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
498 | |
499 | for(i=0;i<option_filesort_threads;i++) |
500 | { |
501 | threads[i].running=0; |
502 | |
503 | threads[i].data=malloc(datasize); |
504 | threads[i].datap=NULL; |
505 | |
506 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
507 | |
508 | threads[i].compare=compare_function; |
509 | } |
510 | |
511 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
512 | |
513 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
514 | goto tidy_and_exit; |
515 | |
516 | do |
517 | { |
518 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
519 | int thread=0; |
520 | |
521 | #if defined(USE_PTHREADS) && USE_PTHREADS |
522 | |
523 | if(option_filesort_threads>1) |
524 | { |
525 | /* Find a spare slot (one *must* be unused at all times) */ |
526 | |
527 | pthread_mutex_lock(&running_mutex); |
528 | |
529 | for(thread=0;thread<option_filesort_threads;thread++) |
530 | if(!threads[thread].running) |
531 | break; |
532 | |
533 | pthread_mutex_unlock(&running_mutex); |
534 | } |
535 | |
536 | #endif |
537 | |
538 | threads[thread].datap=threads[thread].data+datasize; |
539 | |
540 | threads[thread].n=0; |
541 | |
542 | /* Read in the data and create pointers */ |
543 | |
544 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
545 | { |
546 | FILESORT_VARINT itemsize=nextitemsize; |
547 | |
548 | if(itemsize>largestitemsize) |
549 | largestitemsize=itemsize; |
550 | |
551 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
552 | |
553 | ramused+=FILESORT_VARSIZE; |
554 | |
555 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
556 | |
557 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
558 | { |
559 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ |
560 | |
561 | ramused+=itemsize; |
562 | |
563 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
564 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; |
565 | |
566 | total++; |
567 | threads[thread].n++; |
568 | } |
569 | |
570 | count_in++; |
571 | |
572 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
573 | { |
574 | more=0; |
575 | break; |
576 | } |
577 | } |
578 | |
579 | /* No new data read in this time round */ |
580 | |
581 | if(threads[thread].n==0) |
582 | break; |
583 | |
584 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
585 | |
586 | if(more==0 && nfiles==0) |
587 | threads[thread].filename[0]=0; |
588 | else |
589 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
590 | |
591 | #if defined(USE_PTHREADS) && USE_PTHREADS |
592 | |
593 | /* Shortcut if only one file, don't write to disk */ |
594 | |
595 | if(more==0 && nfiles==0) |
596 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
597 | else if(option_filesort_threads>1) |
598 | { |
599 | pthread_mutex_lock(&running_mutex); |
600 | |
601 | while(nthreads==(option_filesort_threads-1)) |
602 | { |
603 | for(i=0;i<option_filesort_threads;i++) |
604 | if(threads[i].running==2) |
605 | { |
606 | pthread_join(threads[i].thread,NULL); |
607 | threads[i].running=0; |
608 | nthreads--; |
609 | } |
610 | |
611 | if(nthreads==(option_filesort_threads-1)) |
612 | pthread_cond_wait(&running_cond,&running_mutex); |
613 | } |
614 | |
615 | threads[thread].running=1; |
616 | |
617 | pthread_mutex_unlock(&running_mutex); |
618 | |
619 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
620 | |
621 | nthreads++; |
622 | } |
623 | else |
624 | filesort_vary_heapsort_thread(&threads[thread]); |
625 | |
626 | #else |
627 | |
628 | /* Shortcut if only one file, don't write to disk */ |
629 | |
630 | if(more==0 && nfiles==0) |
631 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
632 | else |
633 | filesort_vary_heapsort_thread(&threads[thread]); |
634 | |
635 | #endif |
636 | |
637 | nfiles++; |
638 | } |
639 | while(more); |
640 | |
641 | /* Wait for all of the threads to finish */ |
642 | |
643 | #if defined(USE_PTHREADS) && USE_PTHREADS |
644 | |
645 | while(option_filesort_threads>1 && nthreads) |
646 | { |
647 | pthread_mutex_lock(&running_mutex); |
648 | |
649 | pthread_cond_wait(&running_cond,&running_mutex); |
650 | |
651 | for(i=0;i<option_filesort_threads;i++) |
652 | if(threads[i].running==2) |
653 | { |
654 | pthread_join(threads[i].thread,NULL); |
655 | threads[i].running=0; |
656 | nthreads--; |
657 | } |
658 | |
659 | pthread_mutex_unlock(&running_mutex); |
660 | } |
661 | |
662 | #endif |
663 | |
664 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
665 | |
666 | if(nfiles==1) |
667 | { |
668 | for(item=0;item<threads[0].n;item++) |
669 | { |
670 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
671 | { |
672 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE); |
673 | |
674 | WriteFile(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
675 | count_out++; |
676 | } |
677 | } |
678 | |
679 | DeleteFile(threads[0].filename); |
680 | |
681 | goto tidy_and_exit; |
682 | } |
683 | |
684 | /* Check that number of files is less than file size */ |
685 | |
686 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
687 | |
688 | logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)"); |
689 | |
690 | /* Open all of the temporary files */ |
691 | |
692 | fds=(int*)malloc(nfiles*sizeof(int)); |
693 | |
694 | for(i=0;i<nfiles;i++) |
695 | { |
696 | char *filename=threads[0].filename; |
697 | |
698 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
699 | |
700 | fds[i]=ReOpenFile(filename); |
701 | |
702 | DeleteFile(filename); |
703 | } |
704 | |
705 | /* Perform an n-way merge using a binary heap */ |
706 | |
707 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
708 | |
709 | data=threads[0].data; |
710 | datap=data+datasize-nfiles*sizeof(void*); |
711 | |
712 | /* Fill the heap to start with */ |
713 | |
714 | for(i=0;i<nfiles;i++) |
715 | { |
716 | int index; |
717 | FILESORT_VARINT itemsize; |
718 | |
719 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
720 | |
721 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
722 | |
723 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
724 | |
725 | ReadFile(fds[i],datap[i],itemsize); |
726 | |
727 | index=i+1; |
728 | |
729 | heap[index]=i; |
730 | |
731 | /* Bubble up the new value */ |
732 | |
733 | while(index>1) |
734 | { |
735 | int newindex; |
736 | int temp; |
737 | |
738 | newindex=index/2; |
739 | |
740 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
741 | break; |
742 | |
743 | temp=heap[index]; |
744 | heap[index]=heap[newindex]; |
745 | heap[newindex]=temp; |
746 | |
747 | index=newindex; |
748 | } |
749 | } |
750 | |
751 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
752 | |
753 | ndata=nfiles; |
754 | |
755 | do |
756 | { |
757 | int index=1; |
758 | FILESORT_VARINT itemsize; |
759 | |
760 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
761 | { |
762 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
763 | |
764 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
765 | count_out++; |
766 | } |
767 | |
768 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
769 | { |
770 | heap[index]=heap[ndata]; |
771 | ndata--; |
772 | } |
773 | else |
774 | { |
775 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
776 | |
777 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
778 | } |
779 | |
780 | /* Bubble down the new value */ |
781 | |
782 | while((2*index)<ndata) |
783 | { |
784 | int newindex; |
785 | int temp; |
786 | |
787 | newindex=2*index; |
788 | |
789 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
790 | newindex=newindex+1; |
791 | |
792 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
793 | break; |
794 | |
795 | temp=heap[newindex]; |
796 | heap[newindex]=heap[index]; |
797 | heap[index]=temp; |
798 | |
799 | index=newindex; |
800 | } |
801 | |
802 | if((2*index)==ndata) |
803 | { |
804 | int newindex; |
805 | int temp; |
806 | |
807 | newindex=2*index; |
808 | |
809 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
810 | ; /* break */ |
811 | else |
812 | { |
813 | temp=heap[newindex]; |
814 | heap[newindex]=heap[index]; |
815 | heap[index]=temp; |
816 | } |
817 | } |
818 | } |
819 | while(ndata>0); |
820 | |
821 | /* Tidy up */ |
822 | |
823 | tidy_and_exit: |
824 | |
825 | if(fds) |
826 | { |
827 | for(i=0;i<nfiles;i++) |
828 | CloseFile(fds[i]); |
829 | free(fds); |
830 | } |
831 | |
832 | if(heap) |
833 | free(heap); |
834 | |
835 | for(i=0;i<option_filesort_threads;i++) |
836 | { |
837 | free(threads[i].data); |
838 | |
839 | free(threads[i].filename); |
840 | } |
841 | |
842 | return(count_out); |
843 | } |
844 | |
845 | |
846 | /*++++++++++++++++++++++++++++++++++++++ |
847 | A wrapper function that can be run in a thread for fixed data. |
848 | |
849 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). |
850 | |
851 | thread_data *thread The data to be processed in this thread. |
852 | ++++++++++++++++++++++++++++++++++++++*/ |
853 | |
854 | static void *filesort_fixed_heapsort_thread(thread_data *thread) |
855 | { |
856 | int fd; |
857 | size_t item; |
858 | |
859 | /* Sort the data pointers using a heap sort */ |
860 | |
861 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
862 | |
863 | /* Create a temporary file and write the result */ |
864 | |
865 | fd=OpenFileNew(thread->filename); |
866 | |
867 | for(item=0;item<thread->n;item++) |
868 | WriteFile(fd,thread->datap[item],thread->itemsize); |
869 | |
870 | CloseFile(fd); |
871 | |
872 | #if defined(USE_PTHREADS) && USE_PTHREADS |
873 | |
874 | if(option_filesort_threads>1) |
875 | { |
876 | pthread_mutex_lock(&running_mutex); |
877 | |
878 | thread->running=2; |
879 | |
880 | pthread_cond_signal(&running_cond); |
881 | |
882 | pthread_mutex_unlock(&running_mutex); |
883 | } |
884 | |
885 | #endif |
886 | |
887 | return(NULL); |
888 | } |
889 | |
890 | |
891 | /*++++++++++++++++++++++++++++++++++++++ |
892 | A wrapper function that can be run in a thread for variable data. |
893 | |
894 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). |
895 | |
896 | thread_data *thread The data to be processed in this thread. |
897 | ++++++++++++++++++++++++++++++++++++++*/ |
898 | |
899 | static void *filesort_vary_heapsort_thread(thread_data *thread) |
900 | { |
901 | int fd; |
902 | size_t item; |
903 | |
904 | /* Sort the data pointers using a heap sort */ |
905 | |
906 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
907 | |
908 | /* Create a temporary file and write the result */ |
909 | |
910 | fd=OpenFileNew(thread->filename); |
911 | |
912 | for(item=0;item<thread->n;item++) |
913 | { |
914 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE); |
915 | |
916 | WriteFile(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
917 | } |
918 | |
919 | CloseFile(fd); |
920 | |
921 | #if defined(USE_PTHREADS) && USE_PTHREADS |
922 | |
923 | if(option_filesort_threads>1) |
924 | { |
925 | pthread_mutex_lock(&running_mutex); |
926 | |
927 | thread->running=2; |
928 | |
929 | pthread_cond_signal(&running_cond); |
930 | |
931 | pthread_mutex_unlock(&running_mutex); |
932 | } |
933 | |
934 | #endif |
935 | |
936 | return(NULL); |
937 | } |
938 | |
939 | |
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.  The result is in ascending
  order according to compare_function.

  The heap uses 1-based positions; position k is stored in datap[k-1].  (Indexing
  this way avoids forming the pointer &datap[-1], which is undefined behaviour.)

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
  to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;
 void *temp;

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=2;item<=nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t newindex=index/2;

       if(compare_function(datap[index-1],datap[newindex-1])<=0) /* reversed comparison to the merge in filesort_fixed() */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[newindex-1];
       datap[newindex-1]=temp;

       index=newindex;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(item=nitems;item>1;item--)
   {
    size_t index=1;

    temp=datap[0];
    datap[0]=datap[item-1];
    datap[item-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top); the heap
       now occupies positions 1..item-1 */

    while((2*index)<(item-1))
      {
       size_t newindex=2*index;

       /* Pick the larger of the two children (position newindex+1 is datap[newindex]) */

       if(compare_function(datap[newindex-1],datap[newindex])<=0)
          newindex=newindex+1;

       if(compare_function(datap[index-1],datap[newindex-1])>=0)
          break;

       temp=datap[newindex-1];
       datap[newindex-1]=datap[index-1];
       datap[index-1]=temp;

       index=newindex;
      }

    /* A node with only a left child */

    if((2*index)==(item-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[index-1],datap[newindex-1])<0)
         {
          temp=datap[newindex-1];
          datap[newindex-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |