Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Contents of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1649 -
(show annotations)
(download)
(as text)
Wed May 13 16:45:26 2015 UTC (9 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 27528 byte(s)
Wed May 13 16:45:26 2015 UTC (9 years, 11 months ago) by amb
File MIME type: text/x-csrc
File size: 27528 byte(s)
Remove some pthread related code that was being used even if compiled without pthreads.
1 | /*************************************** |
2 | Merge sort functions. |
3 | |
4 | Part of the Routino routing software. |
5 | ******************/ /****************** |
6 | This file Copyright 2009-2015 Andrew M. Bishop |
7 | |
8 | This program is free software: you can redistribute it and/or modify |
9 | it under the terms of the GNU Affero General Public License as published by |
10 | the Free Software Foundation, either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | This program is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU Affero General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU Affero General Public License |
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | ***************************************/ |
21 | |
22 | |
23 | #include <stdio.h> |
24 | #include <stdlib.h> |
25 | #include <string.h> |
26 | |
27 | #if defined(USE_PTHREADS) && USE_PTHREADS |
28 | #include <pthread.h> |
29 | #endif |
30 | |
31 | #include "types.h" |
32 | |
33 | #include "logging.h" |
34 | #include "files.h" |
35 | #include "sorting.h" |
36 | |
37 | |
/* Global variables */

/*+ The command line '--tmpdir' option or its default value. +*/
extern char *option_tmpdirname;

/*+ The amount of RAM to use for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The number of filesorting threads allowed. +*/
extern int option_filesort_threads;


/* Thread data type definitions */

/*+ A data type for holding data for a thread (one "slot" per allowed sorting thread).
    The struct tag avoids a leading underscore, which is reserved at file scope. +*/
typedef struct thread_data_s
{
#if defined(USE_PTHREADS) && USE_PTHREADS

 pthread_t thread;              /*+ The thread identifier. +*/

 int       running;             /*+ The state of the slot: 0 = free, 1 = a worker thread has been
                                    started, 2 = the worker has finished but not yet been joined.
                                    Only read or written while holding running_mutex. +*/

#endif

 void     *data;                /*+ The main data array. +*/
 void    **datap;               /*+ An array of pointers to the data objects. +*/
 size_t    n;                   /*+ The number of pointers. +*/

 char     *filename;            /*+ The name of the file to write the results to. +*/

 size_t    itemsize;            /*+ The size of each item (only used for fixed length data). +*/
 int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
thread_data;

/* Thread variables */

#if defined(USE_PTHREADS) && USE_PTHREADS

/*+ Protects the 'running' flags of all of the thread slots. +*/
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

/*+ Signalled by a worker thread after it has set its 'running' flag to 2. +*/
static pthread_cond_t  running_cond  = PTHREAD_COND_INITIALIZER;

#endif

/* Thread helper functions */

static void *filesort_fixed_heapsort_thread(thread_data *thread);
static void *filesort_vary_heapsort_thread(thread_data *thread);
87 | |
88 | |
89 | /*++++++++++++++++++++++++++++++++++++++ |
90 | A function to sort the contents of a file of fixed length objects using a |
91 | limited amount of RAM. |
92 | |
93 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
94 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
95 | The individual sort steps and the merge step both use a "Heap sort" |
96 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
97 | if the data is already partially sorted. |
98 | |
99 | index_t filesort_fixed Returns the number of objects kept. |
100 | |
101 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
102 | |
103 | int fd_out The file descriptor of the output file (opened for writing and empty). |
104 | |
105 | size_t itemsize The size of each item in the file that needs sorting. |
106 | |
107 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
108 | each item before they have been sorted. The second parameter is the number of objects |
109 | previously read from the input file. If the function returns 1 then the object is kept |
110 | and it is sorted, otherwise it is ignored. |
111 | |
112 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
113 | to qsort if the data to be sorted is an array of things not pointers. |
114 | |
115 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
116 | each item after they have been sorted. The second parameter is the number of objects |
117 | already written to the output file. If the function returns 1 then the object is written |
118 | to the output file., otherwise it is ignored. |
119 | ++++++++++++++++++++++++++++++++++++++*/ |
120 | |
121 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
122 | int (*compare_function)(const void*,const void*), |
123 | int (*post_sort_function)(void*,index_t)) |
124 | { |
125 | int *fds=NULL,*heap=NULL; |
126 | int nfiles=0,ndata=0; |
127 | index_t count_out=0,count_in=0,total=0; |
128 | size_t nitems; |
129 | void *data,**datap; |
130 | thread_data *threads; |
131 | size_t item; |
132 | int i,more=1; |
133 | #if defined(USE_PTHREADS) && USE_PTHREADS |
134 | int nthreads=0; |
135 | #endif |
136 | |
137 | /* Allocate the RAM buffer and other bits */ |
138 | |
139 | nitems=SizeFileFD(fd_in)/itemsize; |
140 | |
141 | if(nitems==0) |
142 | return(0); |
143 | |
144 | if((nitems*(itemsize+sizeof(void*)))<option_filesort_ramsize) |
145 | nitems=1+nitems/option_filesort_threads; |
146 | else |
147 | nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
148 | |
149 | threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data)); |
150 | |
151 | for(i=0;i<option_filesort_threads;i++) |
152 | { |
153 | threads[i].data=malloc(nitems*itemsize); |
154 | threads[i].datap=malloc(nitems*sizeof(void*)); |
155 | |
156 | log_malloc(threads[i].data,nitems*itemsize); |
157 | log_malloc(threads[i].datap,nitems*sizeof(void*)); |
158 | |
159 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
160 | |
161 | threads[i].itemsize=itemsize; |
162 | threads[i].compare=compare_function; |
163 | } |
164 | |
165 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
166 | |
167 | do |
168 | { |
169 | int thread=0; |
170 | |
171 | #if defined(USE_PTHREADS) && USE_PTHREADS |
172 | |
173 | if(option_filesort_threads>1) |
174 | { |
175 | /* Find a spare slot (one *must* be unused at all times) */ |
176 | |
177 | pthread_mutex_lock(&running_mutex); |
178 | |
179 | for(thread=0;thread<option_filesort_threads;thread++) |
180 | if(!threads[thread].running) |
181 | break; |
182 | |
183 | pthread_mutex_unlock(&running_mutex); |
184 | } |
185 | |
186 | #endif |
187 | |
188 | /* Read in the data and create pointers */ |
189 | |
190 | for(item=0;item<nitems;) |
191 | { |
192 | threads[thread].datap[item]=threads[thread].data+item*itemsize; |
193 | |
194 | if(ReadFileBuffered(fd_in,threads[thread].datap[item],itemsize)) |
195 | { |
196 | more=0; |
197 | break; |
198 | } |
199 | |
200 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in)) |
201 | { |
202 | item++; |
203 | total++; |
204 | } |
205 | |
206 | count_in++; |
207 | } |
208 | |
209 | threads[thread].n=item; |
210 | |
211 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
212 | |
213 | if(more==0 && total==0) |
214 | goto tidy_and_exit; |
215 | |
216 | /* No new data read in this time round */ |
217 | |
218 | if(threads[thread].n==0) |
219 | break; |
220 | |
221 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
222 | |
223 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
224 | |
225 | #if defined(USE_PTHREADS) && USE_PTHREADS |
226 | |
227 | /* Shortcut if only one file, don't write to disk */ |
228 | |
229 | if(more==0 && nfiles==0) |
230 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
231 | else if(option_filesort_threads>1) |
232 | { |
233 | pthread_mutex_lock(&running_mutex); |
234 | |
235 | while(nthreads==(option_filesort_threads-1)) |
236 | { |
237 | for(i=0;i<option_filesort_threads;i++) |
238 | if(threads[i].running==2) |
239 | { |
240 | pthread_join(threads[i].thread,NULL); |
241 | threads[i].running=0; |
242 | nthreads--; |
243 | } |
244 | |
245 | if(nthreads==(option_filesort_threads-1)) |
246 | pthread_cond_wait(&running_cond,&running_mutex); |
247 | } |
248 | |
249 | threads[thread].running=1; |
250 | |
251 | pthread_mutex_unlock(&running_mutex); |
252 | |
253 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
254 | |
255 | nthreads++; |
256 | } |
257 | else |
258 | filesort_fixed_heapsort_thread(&threads[thread]); |
259 | |
260 | #else |
261 | |
262 | /* Shortcut if only one file, don't write to disk */ |
263 | |
264 | if(more==0 && nfiles==0) |
265 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
266 | else |
267 | filesort_fixed_heapsort_thread(&threads[thread]); |
268 | |
269 | #endif |
270 | |
271 | nfiles++; |
272 | } |
273 | while(more); |
274 | |
275 | /* Wait for all of the threads to finish */ |
276 | |
277 | #if defined(USE_PTHREADS) && USE_PTHREADS |
278 | |
279 | while(option_filesort_threads>1 && nthreads) |
280 | { |
281 | pthread_mutex_lock(&running_mutex); |
282 | |
283 | pthread_cond_wait(&running_cond,&running_mutex); |
284 | |
285 | for(i=0;i<option_filesort_threads;i++) |
286 | if(threads[i].running==2) |
287 | { |
288 | pthread_join(threads[i].thread,NULL); |
289 | threads[i].running=0; |
290 | nthreads--; |
291 | } |
292 | |
293 | pthread_mutex_unlock(&running_mutex); |
294 | } |
295 | |
296 | #endif |
297 | |
298 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
299 | |
300 | if(nfiles==1) |
301 | { |
302 | for(item=0;item<threads[0].n;item++) |
303 | { |
304 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
305 | { |
306 | WriteFileBuffered(fd_out,threads[0].datap[item],itemsize); |
307 | count_out++; |
308 | } |
309 | } |
310 | |
311 | DeleteFile(threads[0].filename); |
312 | |
313 | goto tidy_and_exit; |
314 | } |
315 | |
316 | /* Check that number of files is less than file size */ |
317 | |
318 | logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)"); |
319 | |
320 | /* Open all of the temporary files */ |
321 | |
322 | fds=(int*)malloc(nfiles*sizeof(int)); |
323 | |
324 | for(i=0;i<nfiles;i++) |
325 | { |
326 | char *filename=threads[0].filename; |
327 | |
328 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
329 | |
330 | fds[i]=ReOpenFileBuffered(filename); |
331 | |
332 | DeleteFile(filename); |
333 | } |
334 | |
335 | /* Perform an n-way merge using a binary heap */ |
336 | |
337 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
338 | |
339 | data =threads[0].data; |
340 | datap=threads[0].datap; |
341 | |
342 | /* Fill the heap to start with */ |
343 | |
344 | for(i=0;i<nfiles;i++) |
345 | { |
346 | int index; |
347 | |
348 | datap[i]=data+i*itemsize; |
349 | |
350 | ReadFileBuffered(fds[i],datap[i],itemsize); |
351 | |
352 | index=i+1; |
353 | |
354 | heap[index]=i; |
355 | |
356 | /* Bubble up the new value */ |
357 | |
358 | while(index>1) |
359 | { |
360 | int newindex; |
361 | int temp; |
362 | |
363 | newindex=index/2; |
364 | |
365 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
366 | break; |
367 | |
368 | temp=heap[index]; |
369 | heap[index]=heap[newindex]; |
370 | heap[newindex]=temp; |
371 | |
372 | index=newindex; |
373 | } |
374 | } |
375 | |
376 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
377 | |
378 | ndata=nfiles; |
379 | |
380 | do |
381 | { |
382 | int index=1; |
383 | |
384 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
385 | { |
386 | WriteFileBuffered(fd_out,datap[heap[index]],itemsize); |
387 | count_out++; |
388 | } |
389 | |
390 | if(ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize)) |
391 | { |
392 | heap[index]=heap[ndata]; |
393 | ndata--; |
394 | } |
395 | |
396 | /* Bubble down the new value */ |
397 | |
398 | while((2*index)<ndata) |
399 | { |
400 | int newindex; |
401 | int temp; |
402 | |
403 | newindex=2*index; |
404 | |
405 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
406 | newindex=newindex+1; |
407 | |
408 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
409 | break; |
410 | |
411 | temp=heap[newindex]; |
412 | heap[newindex]=heap[index]; |
413 | heap[index]=temp; |
414 | |
415 | index=newindex; |
416 | } |
417 | |
418 | if((2*index)==ndata) |
419 | { |
420 | int newindex; |
421 | int temp; |
422 | |
423 | newindex=2*index; |
424 | |
425 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
426 | ; /* break */ |
427 | else |
428 | { |
429 | temp=heap[newindex]; |
430 | heap[newindex]=heap[index]; |
431 | heap[index]=temp; |
432 | } |
433 | } |
434 | } |
435 | while(ndata>0); |
436 | |
437 | /* Tidy up */ |
438 | |
439 | tidy_and_exit: |
440 | |
441 | if(fds) |
442 | { |
443 | for(i=0;i<nfiles;i++) |
444 | CloseFileBuffered(fds[i]); |
445 | free(fds); |
446 | } |
447 | |
448 | if(heap) |
449 | free(heap); |
450 | |
451 | for(i=0;i<option_filesort_threads;i++) |
452 | { |
453 | log_free(threads[i].data); |
454 | log_free(threads[i].datap); |
455 | |
456 | free(threads[i].data); |
457 | free(threads[i].datap); |
458 | |
459 | free(threads[i].filename); |
460 | } |
461 | |
462 | free(threads); |
463 | |
464 | return(count_out); |
465 | } |
466 | |
467 | |
468 | /*++++++++++++++++++++++++++++++++++++++ |
469 | A function to sort the contents of a file of variable length objects (each |
470 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
471 | |
472 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
473 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
474 | The individual sort steps and the merge step both use a "Heap sort" |
475 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
476 | if the data is already partially sorted. |
477 | |
478 | index_t filesort_vary Returns the number of objects kept. |
479 | |
480 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
481 | |
482 | int fd_out The file descriptor of the output file (opened for writing and empty). |
483 | |
484 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
485 | each item before they have been sorted. The second parameter is the number of objects |
486 | previously read from the input file. If the function returns 1 then the object is kept |
487 | and it is sorted, otherwise it is ignored. |
488 | |
489 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
490 | to qsort if the data to be sorted is an array of things not pointers. |
491 | |
492 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
493 | each item after they have been sorted. The second parameter is the number of objects |
494 | already written to the output file. If the function returns 1 then the object is written |
495 | to the output file., otherwise it is ignored. |
496 | ++++++++++++++++++++++++++++++++++++++*/ |
497 | |
498 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
499 | int (*compare_function)(const void*,const void*), |
500 | int (*post_sort_function)(void*,index_t)) |
501 | { |
502 | int *fds=NULL,*heap=NULL; |
503 | int nfiles=0,ndata=0; |
504 | index_t count_out=0,count_in=0,total=0; |
505 | size_t datasize; |
506 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
507 | void *data,**datap; |
508 | thread_data *threads; |
509 | size_t item; |
510 | int i,more=1; |
511 | #if defined(USE_PTHREADS) && USE_PTHREADS |
512 | int nthreads=0; |
513 | #endif |
514 | |
515 | /* Allocate the RAM buffer and other bits */ |
516 | |
517 | datasize=SizeFileFD(fd_in); |
518 | |
519 | if(datasize==0) |
520 | return(0); |
521 | |
522 | if((datasize*2)<option_filesort_ramsize) /* estimate that data and pointer are same size */ |
523 | datasize=(datasize*2)/option_filesort_threads; |
524 | else |
525 | datasize=option_filesort_ramsize/option_filesort_threads; |
526 | |
527 | threads=(thread_data*)calloc(option_filesort_threads,sizeof(thread_data)); |
528 | |
529 | for(i=0;i<option_filesort_threads;i++) |
530 | { |
531 | threads[i].data=malloc(datasize); |
532 | threads[i].datap=NULL; |
533 | |
534 | log_malloc(threads[i].data,datasize); |
535 | |
536 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
537 | |
538 | threads[i].compare=compare_function; |
539 | } |
540 | |
541 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
542 | |
543 | if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
544 | goto tidy_and_exit; |
545 | |
546 | do |
547 | { |
548 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
549 | int thread=0; |
550 | |
551 | #if defined(USE_PTHREADS) && USE_PTHREADS |
552 | |
553 | if(option_filesort_threads>1) |
554 | { |
555 | /* Find a spare slot (one *must* be unused at all times) */ |
556 | |
557 | pthread_mutex_lock(&running_mutex); |
558 | |
559 | for(thread=0;thread<option_filesort_threads;thread++) |
560 | if(!threads[thread].running) |
561 | break; |
562 | |
563 | pthread_mutex_unlock(&running_mutex); |
564 | } |
565 | |
566 | #endif |
567 | |
568 | threads[thread].datap=threads[thread].data+datasize; |
569 | |
570 | threads[thread].n=0; |
571 | |
572 | /* Read in the data and create pointers */ |
573 | |
574 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
575 | { |
576 | FILESORT_VARINT itemsize=nextitemsize; |
577 | |
578 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
579 | |
580 | ramused+=FILESORT_VARSIZE; |
581 | |
582 | ReadFileBuffered(fd_in,threads[thread].data+ramused,itemsize); |
583 | |
584 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
585 | { |
586 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ |
587 | |
588 | if(itemsize>largestitemsize) |
589 | largestitemsize=itemsize; |
590 | |
591 | ramused+=itemsize; |
592 | |
593 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
594 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; |
595 | |
596 | total++; |
597 | threads[thread].n++; |
598 | } |
599 | else |
600 | ramused-=FILESORT_VARSIZE; |
601 | |
602 | count_in++; |
603 | |
604 | if(ReadFileBuffered(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
605 | { |
606 | more=0; |
607 | break; |
608 | } |
609 | } |
610 | |
611 | /* No new data read in this time round */ |
612 | |
613 | if(threads[thread].n==0) |
614 | break; |
615 | |
616 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
617 | |
618 | if(more==0 && nfiles==0) |
619 | threads[thread].filename[0]=0; |
620 | else |
621 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
622 | |
623 | #if defined(USE_PTHREADS) && USE_PTHREADS |
624 | |
625 | /* Shortcut if only one file, don't write to disk */ |
626 | |
627 | if(more==0 && nfiles==0) |
628 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
629 | else if(option_filesort_threads>1) |
630 | { |
631 | pthread_mutex_lock(&running_mutex); |
632 | |
633 | while(nthreads==(option_filesort_threads-1)) |
634 | { |
635 | for(i=0;i<option_filesort_threads;i++) |
636 | if(threads[i].running==2) |
637 | { |
638 | pthread_join(threads[i].thread,NULL); |
639 | threads[i].running=0; |
640 | nthreads--; |
641 | } |
642 | |
643 | if(nthreads==(option_filesort_threads-1)) |
644 | pthread_cond_wait(&running_cond,&running_mutex); |
645 | } |
646 | |
647 | threads[thread].running=1; |
648 | |
649 | pthread_mutex_unlock(&running_mutex); |
650 | |
651 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
652 | |
653 | nthreads++; |
654 | } |
655 | else |
656 | filesort_vary_heapsort_thread(&threads[thread]); |
657 | |
658 | #else |
659 | |
660 | /* Shortcut if only one file, don't write to disk */ |
661 | |
662 | if(more==0 && nfiles==0) |
663 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
664 | else |
665 | filesort_vary_heapsort_thread(&threads[thread]); |
666 | |
667 | #endif |
668 | |
669 | nfiles++; |
670 | } |
671 | while(more); |
672 | |
673 | /* Wait for all of the threads to finish */ |
674 | |
675 | #if defined(USE_PTHREADS) && USE_PTHREADS |
676 | |
677 | while(option_filesort_threads>1 && nthreads) |
678 | { |
679 | pthread_mutex_lock(&running_mutex); |
680 | |
681 | pthread_cond_wait(&running_cond,&running_mutex); |
682 | |
683 | for(i=0;i<option_filesort_threads;i++) |
684 | if(threads[i].running==2) |
685 | { |
686 | pthread_join(threads[i].thread,NULL); |
687 | threads[i].running=0; |
688 | nthreads--; |
689 | } |
690 | |
691 | pthread_mutex_unlock(&running_mutex); |
692 | } |
693 | |
694 | #endif |
695 | |
696 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
697 | |
698 | if(nfiles==1) |
699 | { |
700 | for(item=0;item<threads[0].n;item++) |
701 | { |
702 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
703 | { |
704 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE); |
705 | |
706 | WriteFileBuffered(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
707 | count_out++; |
708 | } |
709 | } |
710 | |
711 | DeleteFile(threads[0].filename); |
712 | |
713 | goto tidy_and_exit; |
714 | } |
715 | |
716 | /* Check that number of files is less than file size */ |
717 | |
718 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
719 | |
720 | logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)"); |
721 | |
722 | /* Open all of the temporary files */ |
723 | |
724 | fds=(int*)malloc(nfiles*sizeof(int)); |
725 | |
726 | for(i=0;i<nfiles;i++) |
727 | { |
728 | char *filename=threads[0].filename; |
729 | |
730 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
731 | |
732 | fds[i]=ReOpenFileBuffered(filename); |
733 | |
734 | DeleteFile(filename); |
735 | } |
736 | |
737 | /* Perform an n-way merge using a binary heap */ |
738 | |
739 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
740 | |
741 | data=threads[0].data; |
742 | datap=data+datasize-nfiles*sizeof(void*); |
743 | |
744 | /* Fill the heap to start with */ |
745 | |
746 | for(i=0;i<nfiles;i++) |
747 | { |
748 | int index; |
749 | FILESORT_VARINT itemsize; |
750 | |
751 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
752 | |
753 | ReadFileBuffered(fds[i],&itemsize,FILESORT_VARSIZE); |
754 | |
755 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
756 | |
757 | ReadFileBuffered(fds[i],datap[i],itemsize); |
758 | |
759 | index=i+1; |
760 | |
761 | heap[index]=i; |
762 | |
763 | /* Bubble up the new value */ |
764 | |
765 | while(index>1) |
766 | { |
767 | int newindex; |
768 | int temp; |
769 | |
770 | newindex=index/2; |
771 | |
772 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
773 | break; |
774 | |
775 | temp=heap[index]; |
776 | heap[index]=heap[newindex]; |
777 | heap[newindex]=temp; |
778 | |
779 | index=newindex; |
780 | } |
781 | } |
782 | |
783 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
784 | |
785 | ndata=nfiles; |
786 | |
787 | do |
788 | { |
789 | int index=1; |
790 | FILESORT_VARINT itemsize; |
791 | |
792 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
793 | { |
794 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
795 | |
796 | WriteFileBuffered(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
797 | count_out++; |
798 | } |
799 | |
800 | if(ReadFileBuffered(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
801 | { |
802 | heap[index]=heap[ndata]; |
803 | ndata--; |
804 | } |
805 | else |
806 | { |
807 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
808 | |
809 | ReadFileBuffered(fds[heap[index]],datap[heap[index]],itemsize); |
810 | } |
811 | |
812 | /* Bubble down the new value */ |
813 | |
814 | while((2*index)<ndata) |
815 | { |
816 | int newindex; |
817 | int temp; |
818 | |
819 | newindex=2*index; |
820 | |
821 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
822 | newindex=newindex+1; |
823 | |
824 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
825 | break; |
826 | |
827 | temp=heap[newindex]; |
828 | heap[newindex]=heap[index]; |
829 | heap[index]=temp; |
830 | |
831 | index=newindex; |
832 | } |
833 | |
834 | if((2*index)==ndata) |
835 | { |
836 | int newindex; |
837 | int temp; |
838 | |
839 | newindex=2*index; |
840 | |
841 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
842 | ; /* break */ |
843 | else |
844 | { |
845 | temp=heap[newindex]; |
846 | heap[newindex]=heap[index]; |
847 | heap[index]=temp; |
848 | } |
849 | } |
850 | } |
851 | while(ndata>0); |
852 | |
853 | /* Tidy up */ |
854 | |
855 | tidy_and_exit: |
856 | |
857 | if(fds) |
858 | { |
859 | for(i=0;i<nfiles;i++) |
860 | CloseFileBuffered(fds[i]); |
861 | free(fds); |
862 | } |
863 | |
864 | if(heap) |
865 | free(heap); |
866 | |
867 | for(i=0;i<option_filesort_threads;i++) |
868 | { |
869 | log_free(threads[i].data); |
870 | |
871 | free(threads[i].data); |
872 | |
873 | free(threads[i].filename); |
874 | } |
875 | |
876 | free(threads); |
877 | |
878 | return(count_out); |
879 | } |
880 | |
881 | |
882 | /*++++++++++++++++++++++++++++++++++++++ |
883 | A wrapper function that can be run in a thread for fixed data. |
884 | |
885 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). |
886 | |
887 | thread_data *thread The data to be processed in this thread. |
888 | ++++++++++++++++++++++++++++++++++++++*/ |
889 | |
890 | static void *filesort_fixed_heapsort_thread(thread_data *thread) |
891 | { |
892 | int fd; |
893 | size_t item; |
894 | |
895 | /* Sort the data pointers using a heap sort */ |
896 | |
897 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
898 | |
899 | /* Create a temporary file and write the result */ |
900 | |
901 | fd=OpenFileBufferedNew(thread->filename); |
902 | |
903 | for(item=0;item<thread->n;item++) |
904 | WriteFileBuffered(fd,thread->datap[item],thread->itemsize); |
905 | |
906 | CloseFileBuffered(fd); |
907 | |
908 | #if defined(USE_PTHREADS) && USE_PTHREADS |
909 | |
910 | if(option_filesort_threads>1) |
911 | { |
912 | pthread_mutex_lock(&running_mutex); |
913 | |
914 | thread->running=2; |
915 | |
916 | pthread_cond_signal(&running_cond); |
917 | |
918 | pthread_mutex_unlock(&running_mutex); |
919 | } |
920 | |
921 | #endif |
922 | |
923 | return(NULL); |
924 | } |
925 | |
926 | |
927 | /*++++++++++++++++++++++++++++++++++++++ |
928 | A wrapper function that can be run in a thread for variable data. |
929 | |
930 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). |
931 | |
932 | thread_data *thread The data to be processed in this thread. |
933 | ++++++++++++++++++++++++++++++++++++++*/ |
934 | |
935 | static void *filesort_vary_heapsort_thread(thread_data *thread) |
936 | { |
937 | int fd; |
938 | size_t item; |
939 | |
940 | /* Sort the data pointers using a heap sort */ |
941 | |
942 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
943 | |
944 | /* Create a temporary file and write the result */ |
945 | |
946 | fd=OpenFileBufferedNew(thread->filename); |
947 | |
948 | for(item=0;item<thread->n;item++) |
949 | { |
950 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE); |
951 | |
952 | WriteFileBuffered(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
953 | } |
954 | |
955 | CloseFileBuffered(fd); |
956 | |
957 | #if defined(USE_PTHREADS) && USE_PTHREADS |
958 | |
959 | if(option_filesort_threads>1) |
960 | { |
961 | pthread_mutex_lock(&running_mutex); |
962 | |
963 | thread->running=2; |
964 | |
965 | pthread_cond_signal(&running_cond); |
966 | |
967 | pthread_mutex_unlock(&running_mutex); |
968 | } |
969 | |
970 | #endif |
971 | |
972 | return(NULL); |
973 | } |
974 | |
975 | |
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it operates in-place and needs no extra
  memory (unlike a qsort() implementation that may allocate). The sort is not
  stable: items that compare equal may end up in any relative order.

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort.

  int (*compare_function)(const void*, const void*) The comparison function. This is identical
    to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t item;
 void *temp;

 /* The heap uses 1-based node numbers (the children of node i are 2i and 2i+1) and node i
    is stored in datap[i-1]. Indexing that way avoids forming the pointer &datap[-1], which
    is undefined behaviour, and all indices are size_t so huge arrays do not overflow an int. */

 /* Fill the heap by pretending to insert the data that is already there */

 for(item=2;item<=nitems;item++)
   {
    size_t index=item;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t newindex=index/2;

       if(compare_function(datap[index-1],datap[newindex-1])<=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[newindex-1];
       datap[newindex-1]=temp;

       index=newindex;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(item=nitems;item>1;item--)
   {
    size_t index=1;

    temp=datap[0];
    datap[0]=datap[item-1];
    datap[item-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top); the heap now holds item-1 nodes */

    while((2*index)<(item-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[newindex-1],datap[newindex])<=0) /* reversed comparison to filesort_fixed() above */
          newindex=newindex+1;

       if(compare_function(datap[index-1],datap[newindex-1])>=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[newindex-1];
       datap[newindex-1]=datap[index-1];
       datap[index-1]=temp;

       index=newindex;
      }

    /* A node with only a left child (the last node of the heap) */

    if((2*index)==(item-1))
      {
       size_t newindex=2*index;

       if(compare_function(datap[index-1],datap[newindex-1])<0) /* reversed comparison to filesort_fixed() above */
         {
          temp=datap[newindex-1];
          datap[newindex-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |