Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Contents of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1106 -
(show annotations)
(download)
(as text)
Sun Oct 21 15:55:48 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26210 byte(s)
Sun Oct 21 15:55:48 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26210 byte(s)
Change the sorting functions to have a pre-sort and post-sort selection function instead of just a post-selection one (this will allow deletion of some items before sorting instead of after sorting in some cases).
1 | /*************************************** |
2 | Merge sort functions. |
3 | |
4 | Part of the Routino routing software. |
5 | ******************/ /****************** |
6 | This file Copyright 2009-2012 Andrew M. Bishop |
7 | |
8 | This program is free software: you can redistribute it and/or modify |
9 | it under the terms of the GNU Affero General Public License as published by |
10 | the Free Software Foundation, either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | This program is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU Affero General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU Affero General Public License |
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | ***************************************/ |
21 | |
22 | |
23 | #include <stdio.h> |
24 | #include <stdlib.h> |
25 | #include <string.h> |
26 | #include <assert.h> |
27 | |
28 | #if defined(USE_PTHREADS) && USE_PTHREADS |
29 | #include <pthread.h> |
30 | #endif |
31 | |
32 | #include "types.h" |
33 | |
34 | #include "files.h" |
35 | #include "sorting.h" |
36 | |
37 | |
38 | /* Global variables */ |
39 | |
40 | /*+ The command line '--tmpdir' option or its default value. +*/ |
41 | extern char *option_tmpdirname; |
42 | |
43 | /*+ The amount of RAM to use for filesorting. +*/ |
44 | extern size_t option_filesort_ramsize; |
45 | |
46 | /*+ The number of filesorting threads allowed. +*/ |
47 | extern int option_filesort_threads; |
48 | |
49 | |
50 | /* Thread data type definitions */ |
51 | |
/*+ A data type for holding data for a thread.  One of these exists per sorting
    slot; the slot is used directly when sorting in the calling thread and is
    handed to pthread_create() when sorting in a worker thread. +*/
typedef struct _thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS
 pthread_t thread;              /*+ The thread identifier (only exists when pthreads are compiled in). +*/
#endif

 int       running;             /*+ A flag indicating the current state of the thread:
                                    0 = slot is free, 1 = worker has been started,
                                    2 = worker has finished and is waiting to be joined. +*/

 void     *data;                /*+ The main data array. +*/
 void    **datap;               /*+ An array of pointers to the data objects. +*/
 size_t    n;                   /*+ The number of pointers. +*/

 char     *filename;            /*+ The name of the file to write the results to. +*/

 size_t    itemsize;            /*+ The size of each item (fixed size sorting only). +*/
 int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
 thread_data;
69 | |
70 | /* Thread variables: running_mutex protects the 'running' flags shared with worker threads, running_cond is signalled by a worker when it sets its flag to 2 (finished) */ |
71 | |
72 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; |
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; |
76 | |
77 | #endif |
78 | |
79 | /* Thread helper functions: each sorts one buffer and writes it to a temporary file, called directly or started with pthread_create() */ |
80 | |
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); |
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); |
83 | |
84 | |
85 | /*++++++++++++++++++++++++++++++++++++++ |
86 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. |
88 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
91 | The individual sort steps and the merge step both use a "Heap sort" |
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
93 | if the data is already partially sorted. |
94 | |
95 | index_t filesort_fixed Returns the number of objects kept. |
96 | |
97 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |
99 | int fd_out The file descriptor of the output file (opened for writing and empty). |
100 | |
101 | size_t itemsize The size of each item in the file that needs sorting. |
102 | |
103 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
104 | each item before they have been sorted. The second parameter is the number of objects |
105 | previously read from the input file. If the function returns 1 then the object is kept |
106 | and it is sorted, otherwise it is ignored. |
107 | |
108 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
109 | to qsort if the data to be sorted is an array of things not pointers. |
110 | |
111 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
112 | each item after they have been sorted. The second parameter is the number of objects |
113 | already written to the output file. If the function returns 1 then the object is written |
114 | to the output file., otherwise it is ignored. |
115 | ++++++++++++++++++++++++++++++++++++++*/ |
116 | |
117 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
118 | int (*compare_function)(const void*,const void*), |
119 | int (*post_sort_function)(void*,index_t)) |
120 | { |
121 | int *fds=NULL,*heap=NULL; |
122 | int nfiles=0,ndata=0; |
123 | index_t count_out=0,count_in=0; |
124 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
125 | void *data,**datap; |
126 | thread_data *threads; |
127 | int i,more=1; |
128 | #if defined(USE_PTHREADS) && USE_PTHREADS |
129 | int nthreads=0; |
130 | #endif |
131 | |
132 | /* Allocate the RAM buffer and other bits */ |
133 | |
134 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
135 | |
136 | for(i=0;i<option_filesort_threads;i++) |
137 | { |
138 | threads[i].running=0; |
139 | |
140 | threads[i].data=malloc(nitems*itemsize); |
141 | threads[i].datap=malloc(nitems*sizeof(void*)); |
142 | |
143 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
144 | |
145 | threads[i].itemsize=itemsize; |
146 | threads[i].compare=compare_function; |
147 | } |
148 | |
149 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
150 | |
151 | do |
152 | { |
153 | int thread=0; |
154 | |
155 | #if defined(USE_PTHREADS) && USE_PTHREADS |
156 | |
157 | if(option_filesort_threads>1) |
158 | { |
159 | /* Find a spare slot (one *must* be unused at all times) */ |
160 | |
161 | pthread_mutex_lock(&running_mutex); |
162 | |
163 | for(thread=0;thread<option_filesort_threads;thread++) |
164 | if(!threads[thread].running) |
165 | break; |
166 | |
167 | pthread_mutex_unlock(&running_mutex); |
168 | } |
169 | |
170 | #endif |
171 | |
172 | /* Read in the data and create pointers */ |
173 | |
174 | for(i=0;i<nitems;) |
175 | { |
176 | threads[thread].datap[i]=threads[thread].data+i*itemsize; |
177 | |
178 | if(ReadFile(fd_in,threads[thread].datap[i],itemsize)) |
179 | { |
180 | more=0; |
181 | break; |
182 | } |
183 | |
184 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in)) |
185 | { |
186 | i++; |
187 | count_in++; |
188 | } |
189 | } |
190 | |
191 | threads[thread].n=i; |
192 | |
193 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
194 | |
195 | if(more==0 && count_in==0) |
196 | goto tidy_and_exit; |
197 | |
198 | /* No new data read in this time round */ |
199 | |
200 | if(threads[thread].n==0) |
201 | break; |
202 | |
203 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
204 | |
205 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
206 | |
207 | #if defined(USE_PTHREADS) && USE_PTHREADS |
208 | |
209 | /* Shortcut if only one file, don't write to disk */ |
210 | |
211 | if(more==0 && nfiles==0) |
212 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
213 | else if(option_filesort_threads>1) |
214 | { |
215 | pthread_mutex_lock(&running_mutex); |
216 | |
217 | while(nthreads==(option_filesort_threads-1)) |
218 | { |
219 | for(i=0;i<option_filesort_threads;i++) |
220 | if(threads[i].running==2) |
221 | { |
222 | pthread_join(threads[i].thread,NULL); |
223 | threads[i].running=0; |
224 | nthreads--; |
225 | } |
226 | |
227 | if(nthreads==(option_filesort_threads-1)) |
228 | pthread_cond_wait(&running_cond,&running_mutex); |
229 | } |
230 | |
231 | threads[thread].running=1; |
232 | |
233 | pthread_mutex_unlock(&running_mutex); |
234 | |
235 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
236 | |
237 | nthreads++; |
238 | } |
239 | else |
240 | filesort_fixed_heapsort_thread(&threads[thread]); |
241 | |
242 | #else |
243 | |
244 | /* Shortcut if only one file, don't write to disk */ |
245 | |
246 | if(more==0 && nfiles==0) |
247 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
248 | else |
249 | filesort_fixed_heapsort_thread(&threads[thread]); |
250 | |
251 | #endif |
252 | |
253 | nfiles++; |
254 | } |
255 | while(more); |
256 | |
257 | /* Wait for all of the threads to finish */ |
258 | |
259 | #if defined(USE_PTHREADS) && USE_PTHREADS |
260 | |
261 | while(option_filesort_threads>1 && nthreads) |
262 | { |
263 | pthread_mutex_lock(&running_mutex); |
264 | |
265 | pthread_cond_wait(&running_cond,&running_mutex); |
266 | |
267 | for(i=0;i<option_filesort_threads;i++) |
268 | if(threads[i].running==2) |
269 | { |
270 | pthread_join(threads[i].thread,NULL); |
271 | threads[i].running=0; |
272 | nthreads--; |
273 | } |
274 | |
275 | pthread_mutex_unlock(&running_mutex); |
276 | } |
277 | |
278 | #endif |
279 | |
280 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
281 | |
282 | if(nfiles==1) |
283 | { |
284 | for(i=0;i<threads[0].n;i++) |
285 | { |
286 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
287 | { |
288 | WriteFile(fd_out,threads[0].datap[i],itemsize); |
289 | count_out++; |
290 | } |
291 | } |
292 | |
293 | DeleteFile(threads[0].filename); |
294 | |
295 | goto tidy_and_exit; |
296 | } |
297 | |
298 | /* Check that number of files is less than file size */ |
299 | |
300 | assert(nfiles<nitems); |
301 | |
302 | /* Open all of the temporary files */ |
303 | |
304 | fds=(int*)malloc(nfiles*sizeof(int)); |
305 | |
306 | for(i=0;i<nfiles;i++) |
307 | { |
308 | char *filename=threads[0].filename; |
309 | |
310 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
311 | |
312 | fds[i]=ReOpenFile(filename); |
313 | |
314 | DeleteFile(filename); |
315 | } |
316 | |
317 | /* Perform an n-way merge using a binary heap */ |
318 | |
319 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
320 | |
321 | data =threads[0].data; |
322 | datap=threads[0].datap; |
323 | |
324 | /* Fill the heap to start with */ |
325 | |
326 | for(i=0;i<nfiles;i++) |
327 | { |
328 | int index; |
329 | |
330 | datap[i]=data+i*itemsize; |
331 | |
332 | ReadFile(fds[i],datap[i],itemsize); |
333 | |
334 | index=i+1; |
335 | |
336 | heap[index]=i; |
337 | |
338 | /* Bubble up the new value */ |
339 | |
340 | while(index>1) |
341 | { |
342 | int newindex; |
343 | int temp; |
344 | |
345 | newindex=index/2; |
346 | |
347 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
348 | break; |
349 | |
350 | temp=heap[index]; |
351 | heap[index]=heap[newindex]; |
352 | heap[newindex]=temp; |
353 | |
354 | index=newindex; |
355 | } |
356 | } |
357 | |
358 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
359 | |
360 | ndata=nfiles; |
361 | |
362 | do |
363 | { |
364 | int index=1; |
365 | |
366 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
367 | { |
368 | WriteFile(fd_out,datap[heap[index]],itemsize); |
369 | count_out++; |
370 | } |
371 | |
372 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
373 | { |
374 | heap[index]=heap[ndata]; |
375 | ndata--; |
376 | } |
377 | |
378 | /* Bubble down the new value */ |
379 | |
380 | while((2*index)<ndata) |
381 | { |
382 | int newindex; |
383 | int temp; |
384 | |
385 | newindex=2*index; |
386 | |
387 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
388 | newindex=newindex+1; |
389 | |
390 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
391 | break; |
392 | |
393 | temp=heap[newindex]; |
394 | heap[newindex]=heap[index]; |
395 | heap[index]=temp; |
396 | |
397 | index=newindex; |
398 | } |
399 | |
400 | if((2*index)==ndata) |
401 | { |
402 | int newindex; |
403 | int temp; |
404 | |
405 | newindex=2*index; |
406 | |
407 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
408 | ; /* break */ |
409 | else |
410 | { |
411 | temp=heap[newindex]; |
412 | heap[newindex]=heap[index]; |
413 | heap[index]=temp; |
414 | } |
415 | } |
416 | } |
417 | while(ndata>0); |
418 | |
419 | /* Tidy up */ |
420 | |
421 | tidy_and_exit: |
422 | |
423 | if(fds) |
424 | { |
425 | for(i=0;i<nfiles;i++) |
426 | CloseFile(fds[i]); |
427 | free(fds); |
428 | } |
429 | |
430 | if(heap) |
431 | free(heap); |
432 | |
433 | for(i=0;i<option_filesort_threads;i++) |
434 | { |
435 | free(threads[i].data); |
436 | free(threads[i].datap); |
437 | |
438 | free(threads[i].filename); |
439 | } |
440 | |
441 | return(count_out); |
442 | } |
443 | |
444 | |
445 | /*++++++++++++++++++++++++++++++++++++++ |
446 | A function to sort the contents of a file of variable length objects (each |
447 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
448 | |
449 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
450 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
451 | The individual sort steps and the merge step both use a "Heap sort" |
452 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
453 | if the data is already partially sorted. |
454 | |
455 | index_t filesort_vary Returns the number of objects kept. |
456 | |
457 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
458 | |
459 | int fd_out The file descriptor of the output file (opened for writing and empty). |
460 | |
461 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
462 | each item before they have been sorted. The second parameter is the number of objects |
463 | previously read from the input file. If the function returns 1 then the object is kept |
464 | and it is sorted, otherwise it is ignored. |
465 | |
466 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
467 | to qsort if the data to be sorted is an array of things not pointers. |
468 | |
469 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for |
470 | each item after they have been sorted. The second parameter is the number of objects |
471 | already written to the output file. If the function returns 1 then the object is written |
472 | to the output file., otherwise it is ignored. |
473 | ++++++++++++++++++++++++++++++++++++++*/ |
474 | |
475 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
476 | int (*compare_function)(const void*,const void*), |
477 | int (*post_sort_function)(void*,index_t)) |
478 | { |
479 | int *fds=NULL,*heap=NULL; |
480 | int nfiles=0,ndata=0; |
481 | index_t count_out=0,count_in=0; |
482 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
483 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
484 | void *data,**datap; |
485 | thread_data *threads; |
486 | int i,more=1; |
487 | #if defined(USE_PTHREADS) && USE_PTHREADS |
488 | int nthreads=0; |
489 | #endif |
490 | |
491 | /* Allocate the RAM buffer and other bits */ |
492 | |
493 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
494 | |
495 | for(i=0;i<option_filesort_threads;i++) |
496 | { |
497 | threads[i].running=0; |
498 | |
499 | threads[i].data=malloc(datasize); |
500 | threads[i].datap=NULL; |
501 | |
502 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
503 | |
504 | threads[i].compare=compare_function; |
505 | } |
506 | |
507 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
508 | |
509 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
510 | goto tidy_and_exit; |
511 | |
512 | do |
513 | { |
514 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
515 | int thread=0; |
516 | |
517 | #if defined(USE_PTHREADS) && USE_PTHREADS |
518 | |
519 | if(option_filesort_threads>1) |
520 | { |
521 | /* Find a spare slot (one *must* be unused at all times) */ |
522 | |
523 | pthread_mutex_lock(&running_mutex); |
524 | |
525 | for(thread=0;thread<option_filesort_threads;thread++) |
526 | if(!threads[thread].running) |
527 | break; |
528 | |
529 | pthread_mutex_unlock(&running_mutex); |
530 | } |
531 | |
532 | #endif |
533 | |
534 | threads[thread].datap=threads[thread].data+datasize; |
535 | |
536 | threads[thread].n=0; |
537 | |
538 | /* Read in the data and create pointers */ |
539 | |
540 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
541 | { |
542 | FILESORT_VARINT itemsize=nextitemsize; |
543 | |
544 | if(itemsize>largestitemsize) |
545 | largestitemsize=itemsize; |
546 | |
547 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
548 | |
549 | ramused+=FILESORT_VARSIZE; |
550 | |
551 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
552 | |
553 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
554 | { |
555 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ |
556 | |
557 | ramused+=itemsize; |
558 | |
559 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
560 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; |
561 | |
562 | count_in++; |
563 | threads[thread].n++; |
564 | } |
565 | |
566 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
567 | { |
568 | more=0; |
569 | break; |
570 | } |
571 | } |
572 | |
573 | /* No new data read in this time round */ |
574 | |
575 | if(threads[thread].n==0) |
576 | break; |
577 | |
578 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
579 | |
580 | if(more==0 && nfiles==0) |
581 | threads[thread].filename[0]=0; |
582 | else |
583 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
584 | |
585 | #if defined(USE_PTHREADS) && USE_PTHREADS |
586 | |
587 | /* Shortcut if only one file, don't write to disk */ |
588 | |
589 | if(more==0 && nfiles==0) |
590 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
591 | else if(option_filesort_threads>1) |
592 | { |
593 | pthread_mutex_lock(&running_mutex); |
594 | |
595 | while(nthreads==(option_filesort_threads-1)) |
596 | { |
597 | for(i=0;i<option_filesort_threads;i++) |
598 | if(threads[i].running==2) |
599 | { |
600 | pthread_join(threads[i].thread,NULL); |
601 | threads[i].running=0; |
602 | nthreads--; |
603 | } |
604 | |
605 | if(nthreads==(option_filesort_threads-1)) |
606 | pthread_cond_wait(&running_cond,&running_mutex); |
607 | } |
608 | |
609 | threads[thread].running=1; |
610 | |
611 | pthread_mutex_unlock(&running_mutex); |
612 | |
613 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
614 | |
615 | nthreads++; |
616 | } |
617 | else |
618 | filesort_vary_heapsort_thread(&threads[thread]); |
619 | |
620 | #else |
621 | |
622 | /* Shortcut if only one file, don't write to disk */ |
623 | |
624 | if(more==0 && nfiles==0) |
625 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); |
626 | else |
627 | filesort_vary_heapsort_thread(&threads[thread]); |
628 | |
629 | #endif |
630 | |
631 | nfiles++; |
632 | } |
633 | while(more); |
634 | |
635 | /* Wait for all of the threads to finish */ |
636 | |
637 | #if defined(USE_PTHREADS) && USE_PTHREADS |
638 | |
639 | while(option_filesort_threads>1 && nthreads) |
640 | { |
641 | pthread_mutex_lock(&running_mutex); |
642 | |
643 | pthread_cond_wait(&running_cond,&running_mutex); |
644 | |
645 | for(i=0;i<option_filesort_threads;i++) |
646 | if(threads[i].running==2) |
647 | { |
648 | pthread_join(threads[i].thread,NULL); |
649 | threads[i].running=0; |
650 | nthreads--; |
651 | } |
652 | |
653 | pthread_mutex_unlock(&running_mutex); |
654 | } |
655 | |
656 | #endif |
657 | |
658 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
659 | |
660 | if(nfiles==1) |
661 | { |
662 | for(i=0;i<threads[0].n;i++) |
663 | { |
664 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
665 | { |
666 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE); |
667 | |
668 | WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
669 | count_out++; |
670 | } |
671 | } |
672 | |
673 | DeleteFile(threads[0].filename); |
674 | |
675 | goto tidy_and_exit; |
676 | } |
677 | |
678 | /* Check that number of files is less than file size */ |
679 | |
680 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
681 | |
682 | assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize)); |
683 | |
684 | /* Open all of the temporary files */ |
685 | |
686 | fds=(int*)malloc(nfiles*sizeof(int)); |
687 | |
688 | for(i=0;i<nfiles;i++) |
689 | { |
690 | char *filename=threads[0].filename; |
691 | |
692 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
693 | |
694 | fds[i]=ReOpenFile(filename); |
695 | |
696 | DeleteFile(filename); |
697 | } |
698 | |
699 | /* Perform an n-way merge using a binary heap */ |
700 | |
701 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
702 | |
703 | data=threads[0].data; |
704 | datap=data+datasize-nfiles*sizeof(void*); |
705 | |
706 | /* Fill the heap to start with */ |
707 | |
708 | for(i=0;i<nfiles;i++) |
709 | { |
710 | int index; |
711 | FILESORT_VARINT itemsize; |
712 | |
713 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
714 | |
715 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
716 | |
717 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
718 | |
719 | ReadFile(fds[i],datap[i],itemsize); |
720 | |
721 | index=i+1; |
722 | |
723 | heap[index]=i; |
724 | |
725 | /* Bubble up the new value */ |
726 | |
727 | while(index>1) |
728 | { |
729 | int newindex; |
730 | int temp; |
731 | |
732 | newindex=index/2; |
733 | |
734 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
735 | break; |
736 | |
737 | temp=heap[index]; |
738 | heap[index]=heap[newindex]; |
739 | heap[newindex]=temp; |
740 | |
741 | index=newindex; |
742 | } |
743 | } |
744 | |
745 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
746 | |
747 | ndata=nfiles; |
748 | |
749 | do |
750 | { |
751 | int index=1; |
752 | FILESORT_VARINT itemsize; |
753 | |
754 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
755 | { |
756 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
757 | |
758 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
759 | count_out++; |
760 | } |
761 | |
762 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
763 | { |
764 | heap[index]=heap[ndata]; |
765 | ndata--; |
766 | } |
767 | else |
768 | { |
769 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
770 | |
771 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
772 | } |
773 | |
774 | /* Bubble down the new value */ |
775 | |
776 | while((2*index)<ndata) |
777 | { |
778 | int newindex; |
779 | int temp; |
780 | |
781 | newindex=2*index; |
782 | |
783 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
784 | newindex=newindex+1; |
785 | |
786 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
787 | break; |
788 | |
789 | temp=heap[newindex]; |
790 | heap[newindex]=heap[index]; |
791 | heap[index]=temp; |
792 | |
793 | index=newindex; |
794 | } |
795 | |
796 | if((2*index)==ndata) |
797 | { |
798 | int newindex; |
799 | int temp; |
800 | |
801 | newindex=2*index; |
802 | |
803 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
804 | ; /* break */ |
805 | else |
806 | { |
807 | temp=heap[newindex]; |
808 | heap[newindex]=heap[index]; |
809 | heap[index]=temp; |
810 | } |
811 | } |
812 | } |
813 | while(ndata>0); |
814 | |
815 | /* Tidy up */ |
816 | |
817 | tidy_and_exit: |
818 | |
819 | if(fds) |
820 | { |
821 | for(i=0;i<nfiles;i++) |
822 | CloseFile(fds[i]); |
823 | free(fds); |
824 | } |
825 | |
826 | if(heap) |
827 | free(heap); |
828 | |
829 | for(i=0;i<option_filesort_threads;i++) |
830 | { |
831 | free(threads[i].data); |
832 | |
833 | free(threads[i].filename); |
834 | } |
835 | |
836 | return(count_out); |
837 | } |
838 | |
839 | |
840 | /*++++++++++++++++++++++++++++++++++++++ |
841 | A wrapper function that can be run in a thread for fixed data. |
842 | |
843 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). |
844 | |
845 | thread_data *thread The data to be processed in this thread. |
846 | ++++++++++++++++++++++++++++++++++++++*/ |
847 | |
848 | static void *filesort_fixed_heapsort_thread(thread_data *thread) |
849 | { |
850 | int fd,i; |
851 | |
852 | /* Sort the data pointers using a heap sort */ |
853 | |
854 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
855 | |
856 | /* Create a temporary file and write the result */ |
857 | |
858 | fd=OpenFileNew(thread->filename); |
859 | |
860 | for(i=0;i<thread->n;i++) |
861 | WriteFile(fd,thread->datap[i],thread->itemsize); |
862 | |
863 | CloseFile(fd); |
864 | |
865 | #if defined(USE_PTHREADS) && USE_PTHREADS |
866 | |
867 | if(option_filesort_threads>1) |
868 | { |
869 | pthread_mutex_lock(&running_mutex); |
870 | |
871 | thread->running=2; |
872 | |
873 | pthread_cond_signal(&running_cond); |
874 | |
875 | pthread_mutex_unlock(&running_mutex); |
876 | } |
877 | |
878 | #endif |
879 | |
880 | return(NULL); |
881 | } |
882 | |
883 | |
884 | /*++++++++++++++++++++++++++++++++++++++ |
885 | A wrapper function that can be run in a thread for variable data. |
886 | |
887 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). |
888 | |
889 | thread_data *thread The data to be processed in this thread. |
890 | ++++++++++++++++++++++++++++++++++++++*/ |
891 | |
892 | static void *filesort_vary_heapsort_thread(thread_data *thread) |
893 | { |
894 | int fd,i; |
895 | |
896 | /* Sort the data pointers using a heap sort */ |
897 | |
898 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
899 | |
900 | /* Create a temporary file and write the result */ |
901 | |
902 | fd=OpenFileNew(thread->filename); |
903 | |
904 | for(i=0;i<thread->n;i++) |
905 | { |
906 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE); |
907 | |
908 | WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
909 | } |
910 | |
911 | CloseFile(fd); |
912 | |
913 | #if defined(USE_PTHREADS) && USE_PTHREADS |
914 | |
915 | if(option_filesort_threads>1) |
916 | { |
917 | pthread_mutex_lock(&running_mutex); |
918 | |
919 | thread->running=2; |
920 | |
921 | pthread_cond_signal(&running_cond); |
922 | |
923 | pthread_mutex_unlock(&running_mutex); |
924 | } |
925 | |
926 | #endif |
927 | |
928 | return(NULL); |
929 | } |
930 | |
931 | |
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate more memory like using qsort() does.

  The heap is described with 1-based positions (the children of position k are
  2k and 2k+1); position k is stored in datap[k-1].  After the call the pointers
  are in ascending order according to the comparison function.

  void **datap A pointer to the array of pointers to sort.

  size_t nitems The number of items of data to sort (0 or 1 leaves the array untouched).

  int (*compare_function)(const void*, const void*) The comparison function.  This is identical
     to qsort if the data to be sorted is an array of things not pointers.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*))
{
 size_t i;

 /* Fill the heap by pretending to insert the data that is already there */

 for(i=2;i<=nitems;i++)
   {
    size_t index=i;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t parent=index/2;
       void *temp;

       if(compare_function(datap[index-1],datap[parent-1])<=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[index-1];
       datap[index-1]=datap[parent-1];
       datap[parent-1]=temp;

       index=parent;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(i=nitems;i>1;i--)
   {
    size_t index=1;
    size_t heapsize=i-1;        /* the heap shrinks by one, position i now holds a sorted item */
    void *temp;

    temp=datap[0];
    datap[0]=datap[i-1];
    datap[i-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top), nodes with two children */

    while((2*index)<heapsize)
      {
       size_t child=2*index;

       if(compare_function(datap[child-1],datap[child])<=0) /* reversed comparison to filesort_fixed() above */
          child=child+1;

       if(compare_function(datap[index-1],datap[child-1])>=0) /* reversed comparison to filesort_fixed() above */
          break;

       temp=datap[child-1];
       datap[child-1]=datap[index-1];
       datap[index-1]=temp;

       index=child;
      }

    /* Bubble down the new value, final node with only one child */

    if((2*index)==heapsize)
      {
       size_t child=2*index;

       if(compare_function(datap[index-1],datap[child-1])<0) /* reversed comparison to filesort_fixed() above */
         {
          temp=datap[child-1];
          datap[child-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |