Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Contents of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 996 -
(show annotations)
(download)
(as text)
Thu May 10 18:18:14 2012 UTC (12 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 23926 byte(s)
Thu May 10 18:18:14 2012 UTC (12 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 23926 byte(s)
Added some mutexes and condition variables to communicate between threads.
1 | /*************************************** |
2 | Merge sort functions. |
3 | |
4 | Part of the Routino routing software. |
5 | ******************/ /****************** |
6 | This file Copyright 2009-2012 Andrew M. Bishop |
7 | |
8 | This program is free software: you can redistribute it and/or modify |
9 | it under the terms of the GNU Affero General Public License as published by |
10 | the Free Software Foundation, either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | This program is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU Affero General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU Affero General Public License |
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | ***************************************/ |
21 | |
22 | |
23 | #include <stdio.h> |
24 | #include <stdlib.h> |
25 | #include <string.h> |
26 | #include <assert.h> |
27 | |
28 | #if defined(USE_PTHREADS) && USE_PTHREADS |
29 | #include <pthread.h> |
30 | #endif |
31 | |
32 | #include "types.h" |
33 | |
34 | #include "files.h" |
35 | #include "sorting.h" |
36 | |
37 | |
/* Global variables (defined elsewhere, set from the command line options) */

/*+ The number of threads that may be used at once when filesorting. +*/
extern int option_filesort_threads;

/*+ The total amount of RAM that may be used for filesorting. +*/
extern size_t option_filesort_ramsize;

/*+ The directory for temporary files (the '--tmpdir' option or its default value). +*/
extern char *option_tmpdirname;
48 | |
49 | |
/* Thread data type definitions */

/*+ A data type for holding the data for one sorting job; one of these exists
    for each allowed thread and is used even when built without pthreads. +*/
typedef struct _thread_data
{
#if defined(USE_PTHREADS) && USE_PTHREADS

 pthread_t thread;              /*+ The thread identifier (only exists when built with pthreads since
                                    <pthread.h> is only included in that case). +*/

#endif

 int       running;             /*+ A flag indicating the current state of the thread:
                                    0 = slot is unused, 1 = thread is running,
                                    2 = thread has finished and is waiting to be joined. +*/

 void     *data;                /*+ The main data array. +*/
 void    **datap;               /*+ An array of pointers to the data objects. +*/
 size_t    n;                   /*+ The number of pointers. +*/

 char     *filename;            /*+ The name of the file to write the results to. +*/

 size_t    itemsize;            /*+ The size of each item (only used for fixed size data). +*/
 int     (*compare)(const void*,const void*); /*+ The comparison function. +*/
}
 thread_data;
69 | |
70 | /* Thread variables */ |
71 | |
72 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; |
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; |
76 | |
77 | #endif |
78 | |
79 | /* Thread helper functions */ |
80 | |
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); |
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); |
83 | |
84 | |
85 | /*++++++++++++++++++++++++++++++++++++++ |
86 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. |
88 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
91 | The individual sort steps and the merge step both use a "Heap sort" |
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
93 | if the data is already partially sorted. |
94 | |
95 | index_t filesort_fixed Returns the number of objects kept. |
96 | |
97 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |
99 | int fd_out The file descriptor of the output file (opened for writing and empty). |
100 | |
101 | size_t itemsize The size of each item in the file that needs sorting. |
102 | |
103 | int (*compare)(const void*, const void*) The comparison function (identical to qsort if the |
104 | data to be sorted is an array of things not pointers). |
105 | |
106 | int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it |
107 | returns 1 then the object is kept and written to the output file. |
108 | ++++++++++++++++++++++++++++++++++++++*/ |
109 | |
110 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*compare)(const void*,const void*),int (*keep)(void*,index_t)) |
111 | { |
112 | int *fds=NULL,*heap=NULL; |
113 | int nfiles=0,ndata=0; |
114 | index_t count=0,total=0; |
115 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
116 | void *data,**datap; |
117 | thread_data *threads; |
118 | int i,more=1; |
119 | #if defined(USE_PTHREADS) && USE_PTHREADS |
120 | int nthreads=0; |
121 | #endif |
122 | |
123 | /* Allocate the RAM buffer and other bits */ |
124 | |
125 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
126 | |
127 | for(i=0;i<option_filesort_threads;i++) |
128 | { |
129 | threads[i].running=0; |
130 | |
131 | threads[i].data=malloc(nitems*itemsize); |
132 | threads[i].datap=malloc(nitems*sizeof(void*)); |
133 | |
134 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
135 | |
136 | threads[i].itemsize=itemsize; |
137 | threads[i].compare=compare; |
138 | } |
139 | |
140 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
141 | |
142 | do |
143 | { |
144 | int thread=0; |
145 | |
146 | #if defined(USE_PTHREADS) && USE_PTHREADS |
147 | |
148 | /* Find a spare slot (one *must* be unused at all times) */ |
149 | |
150 | pthread_mutex_lock(&running_mutex); |
151 | |
152 | for(thread=0;thread<option_filesort_threads;thread++) |
153 | if(!threads[thread].running) |
154 | break; |
155 | |
156 | pthread_mutex_unlock(&running_mutex); |
157 | |
158 | #endif |
159 | |
160 | /* Read in the data and create pointers */ |
161 | |
162 | for(i=0;i<nitems;i++) |
163 | { |
164 | threads[thread].datap[i]=threads[thread].data+i*itemsize; |
165 | |
166 | if(ReadFile(fd_in,threads[thread].datap[i],itemsize)) |
167 | { |
168 | more=0; |
169 | break; |
170 | } |
171 | |
172 | total++; |
173 | } |
174 | |
175 | threads[thread].n=i; |
176 | |
177 | /* Shortcut if there is no data and no previous files (i.e. no data at all) */ |
178 | |
179 | if(nfiles==0 && threads[thread].n==0) |
180 | goto tidy_and_exit; |
181 | |
182 | /* No new data read in this time round */ |
183 | |
184 | if(threads[thread].n==0) |
185 | break; |
186 | |
187 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
188 | |
189 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
190 | |
191 | #if defined(USE_PTHREADS) && USE_PTHREADS |
192 | |
193 | if(option_filesort_threads==1) |
194 | { |
195 | pthread_mutex_lock(&running_mutex); |
196 | |
197 | threads[thread].running=1; |
198 | |
199 | pthread_mutex_unlock(&running_mutex); |
200 | |
201 | filesort_fixed_heapsort_thread(&threads[thread]); |
202 | |
203 | pthread_mutex_lock(&running_mutex); |
204 | |
205 | threads[thread].running=0; |
206 | |
207 | pthread_mutex_unlock(&running_mutex); |
208 | } |
209 | else |
210 | { |
211 | pthread_mutex_lock(&running_mutex); |
212 | |
213 | while(nthreads==(option_filesort_threads-1)) |
214 | { |
215 | for(i=0;i<option_filesort_threads;i++) |
216 | if(threads[i].running==2) |
217 | { |
218 | pthread_join(threads[i].thread,NULL); |
219 | threads[i].running=0; |
220 | nthreads--; |
221 | } |
222 | |
223 | if(nthreads==(option_filesort_threads-1)) |
224 | pthread_cond_wait(&running_cond,&running_mutex); |
225 | } |
226 | |
227 | threads[thread].running=1; |
228 | |
229 | pthread_mutex_unlock(&running_mutex); |
230 | |
231 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
232 | |
233 | nthreads++; |
234 | } |
235 | |
236 | #else |
237 | |
238 | filesort_fixed_heapsort_thread(&threads[thread]); |
239 | |
240 | #endif |
241 | |
242 | nfiles++; |
243 | } |
244 | while(more); |
245 | |
246 | /* Wait for all of the threads to finish */ |
247 | |
248 | #if defined(USE_PTHREADS) && USE_PTHREADS |
249 | |
250 | while(nthreads && option_filesort_threads>1) |
251 | { |
252 | pthread_mutex_lock(&running_mutex); |
253 | |
254 | pthread_cond_wait(&running_cond,&running_mutex); |
255 | |
256 | for(i=0;i<option_filesort_threads;i++) |
257 | if(threads[i].running==2) |
258 | { |
259 | pthread_join(threads[i].thread,NULL); |
260 | threads[i].running=0; |
261 | nthreads--; |
262 | } |
263 | |
264 | pthread_mutex_unlock(&running_mutex); |
265 | } |
266 | |
267 | #endif |
268 | |
269 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
270 | |
271 | if(nfiles==1) |
272 | { |
273 | for(i=0;i<threads[0].n;i++) |
274 | { |
275 | if(!keep || keep(threads[0].datap[i],count)) |
276 | { |
277 | WriteFile(fd_out,threads[0].datap[i],itemsize); |
278 | count++; |
279 | } |
280 | } |
281 | |
282 | DeleteFile(threads[0].filename); |
283 | |
284 | goto tidy_and_exit; |
285 | } |
286 | |
287 | /* Check that number of files is less than file size */ |
288 | |
289 | assert(nfiles<nitems); |
290 | |
291 | /* Open all of the temporary files */ |
292 | |
293 | fds=(int*)malloc(nfiles*sizeof(int)); |
294 | |
295 | for(i=0;i<nfiles;i++) |
296 | { |
297 | char *filename=threads[0].filename; |
298 | |
299 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
300 | |
301 | fds[i]=ReOpenFile(filename); |
302 | |
303 | DeleteFile(filename); |
304 | } |
305 | |
306 | /* Perform an n-way merge using a binary heap */ |
307 | |
308 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
309 | |
310 | data =threads[0].data; |
311 | datap=threads[0].datap; |
312 | |
313 | /* Fill the heap to start with */ |
314 | |
315 | for(i=0;i<nfiles;i++) |
316 | { |
317 | int index; |
318 | |
319 | datap[i]=data+i*itemsize; |
320 | |
321 | ReadFile(fds[i],datap[i],itemsize); |
322 | |
323 | index=i+1; |
324 | |
325 | heap[index]=i; |
326 | |
327 | /* Bubble up the new value */ |
328 | |
329 | while(index>1) |
330 | { |
331 | int newindex; |
332 | int temp; |
333 | |
334 | newindex=index/2; |
335 | |
336 | if(compare(datap[heap[index]],datap[heap[newindex]])>=0) |
337 | break; |
338 | |
339 | temp=heap[index]; |
340 | heap[index]=heap[newindex]; |
341 | heap[newindex]=temp; |
342 | |
343 | index=newindex; |
344 | } |
345 | } |
346 | |
347 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
348 | |
349 | ndata=nfiles; |
350 | |
351 | do |
352 | { |
353 | int index=1; |
354 | |
355 | if(!keep || keep(datap[heap[index]],count)) |
356 | { |
357 | WriteFile(fd_out,datap[heap[index]],itemsize); |
358 | count++; |
359 | } |
360 | |
361 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
362 | { |
363 | heap[index]=heap[ndata]; |
364 | ndata--; |
365 | } |
366 | |
367 | /* Bubble down the new value */ |
368 | |
369 | while((2*index)<ndata) |
370 | { |
371 | int newindex; |
372 | int temp; |
373 | |
374 | newindex=2*index; |
375 | |
376 | if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
377 | newindex=newindex+1; |
378 | |
379 | if(compare(datap[heap[index]],datap[heap[newindex]])<=0) |
380 | break; |
381 | |
382 | temp=heap[newindex]; |
383 | heap[newindex]=heap[index]; |
384 | heap[index]=temp; |
385 | |
386 | index=newindex; |
387 | } |
388 | |
389 | if((2*index)==ndata) |
390 | { |
391 | int newindex; |
392 | int temp; |
393 | |
394 | newindex=2*index; |
395 | |
396 | if(compare(datap[heap[index]],datap[heap[newindex]])<=0) |
397 | ; /* break */ |
398 | else |
399 | { |
400 | temp=heap[newindex]; |
401 | heap[newindex]=heap[index]; |
402 | heap[index]=temp; |
403 | } |
404 | } |
405 | } |
406 | while(ndata>0); |
407 | |
408 | /* Tidy up */ |
409 | |
410 | tidy_and_exit: |
411 | |
412 | if(fds) |
413 | { |
414 | for(i=0;i<nfiles;i++) |
415 | CloseFile(fds[i]); |
416 | free(fds); |
417 | } |
418 | |
419 | if(heap) |
420 | free(heap); |
421 | |
422 | for(i=0;i<option_filesort_threads;i++) |
423 | { |
424 | free(threads[i].data); |
425 | free(threads[i].datap); |
426 | |
427 | free(threads[i].filename); |
428 | } |
429 | |
430 | return(count); |
431 | } |
432 | |
433 | |
434 | /*++++++++++++++++++++++++++++++++++++++ |
435 | A function to sort the contents of a file of variable length objects (each |
436 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
437 | |
438 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort |
439 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. |
440 | The individual sort steps and the merge step both use a "Heap sort" |
441 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well |
442 | if the data is already partially sorted. |
443 | |
444 | index_t filesort_vary Returns the number of objects kept. |
445 | |
446 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
447 | |
448 | int fd_out The file descriptor of the output file (opened for writing and empty). |
449 | |
450 | int (*compare)(const void*, const void*) The comparison function (identical to qsort if the |
451 | data to be sorted is an array of things not pointers). |
452 | |
453 | int (*keep)(void *,index_t) If non-NULL then this function is called for each item, if it |
454 | returns 1 then the object is kept and written to the output file. |
455 | ++++++++++++++++++++++++++++++++++++++*/ |
456 | |
457 | index_t filesort_vary(int fd_in,int fd_out,int (*compare)(const void*,const void*),int (*keep)(void*,index_t)) |
458 | { |
459 | int *fds=NULL,*heap=NULL; |
460 | int nfiles=0,ndata=0; |
461 | index_t count=0,total=0; |
462 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
463 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
464 | void *data,**datap; |
465 | thread_data *threads; |
466 | int i,more=1; |
467 | #if defined(USE_PTHREADS) && USE_PTHREADS |
468 | int nthreads=0; |
469 | #endif |
470 | |
471 | /* Allocate the RAM buffer and other bits */ |
472 | |
473 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
474 | |
475 | for(i=0;i<option_filesort_threads;i++) |
476 | { |
477 | threads[i].running=0; |
478 | |
479 | threads[i].data=malloc(datasize); |
480 | threads[i].datap=NULL; |
481 | |
482 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); |
483 | |
484 | threads[i].compare=compare; |
485 | } |
486 | |
487 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
488 | |
489 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
490 | goto tidy_and_exit; |
491 | |
492 | do |
493 | { |
494 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
495 | int thread=0; |
496 | |
497 | #if defined(USE_PTHREADS) && USE_PTHREADS |
498 | |
499 | /* Find a spare slot (one *must* be unused at all times) */ |
500 | |
501 | pthread_mutex_lock(&running_mutex); |
502 | |
503 | for(thread=0;thread<option_filesort_threads;thread++) |
504 | if(!threads[thread].running) |
505 | break; |
506 | |
507 | pthread_mutex_unlock(&running_mutex); |
508 | |
509 | #endif |
510 | |
511 | threads[thread].datap=threads[thread].data+datasize; |
512 | |
513 | threads[thread].n=0; |
514 | |
515 | /* Read in the data and create pointers */ |
516 | |
517 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
518 | { |
519 | FILESORT_VARINT itemsize=nextitemsize; |
520 | |
521 | if(itemsize>largestitemsize) |
522 | largestitemsize=itemsize; |
523 | |
524 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
525 | |
526 | ramused+=FILESORT_VARSIZE; |
527 | |
528 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
529 | |
530 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ |
531 | |
532 | ramused+=itemsize; |
533 | |
534 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
535 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; |
536 | |
537 | total++; |
538 | threads[thread].n++; |
539 | |
540 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
541 | { |
542 | more=0; |
543 | break; |
544 | } |
545 | } |
546 | |
547 | /* No new data read in this time round */ |
548 | |
549 | if(threads[thread].n==0) |
550 | break; |
551 | |
552 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
553 | |
554 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
555 | |
556 | #if defined(USE_PTHREADS) && USE_PTHREADS |
557 | |
558 | if(option_filesort_threads==1) |
559 | { |
560 | pthread_mutex_lock(&running_mutex); |
561 | |
562 | threads[thread].running=1; |
563 | |
564 | pthread_mutex_unlock(&running_mutex); |
565 | |
566 | filesort_vary_heapsort_thread(&threads[thread]); |
567 | |
568 | pthread_mutex_lock(&running_mutex); |
569 | |
570 | threads[thread].running=0; |
571 | |
572 | pthread_mutex_unlock(&running_mutex); |
573 | } |
574 | else |
575 | { |
576 | pthread_mutex_lock(&running_mutex); |
577 | |
578 | while(nthreads==(option_filesort_threads-1)) |
579 | { |
580 | for(i=0;i<option_filesort_threads;i++) |
581 | if(threads[i].running==2) |
582 | { |
583 | pthread_join(threads[i].thread,NULL); |
584 | threads[i].running=0; |
585 | nthreads--; |
586 | } |
587 | |
588 | if(nthreads==(option_filesort_threads-1)) |
589 | pthread_cond_wait(&running_cond,&running_mutex); |
590 | } |
591 | |
592 | threads[thread].running=1; |
593 | |
594 | pthread_mutex_unlock(&running_mutex); |
595 | |
596 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
597 | |
598 | nthreads++; |
599 | } |
600 | |
601 | #else |
602 | |
603 | filesort_vary_heapsort_thread(&threads[thread]); |
604 | |
605 | #endif |
606 | |
607 | nfiles++; |
608 | } |
609 | while(more); |
610 | |
611 | /* Wait for all of the threads to finish */ |
612 | |
613 | #if defined(USE_PTHREADS) && USE_PTHREADS |
614 | |
615 | while(nthreads && option_filesort_threads>1) |
616 | { |
617 | pthread_mutex_lock(&running_mutex); |
618 | |
619 | pthread_cond_wait(&running_cond,&running_mutex); |
620 | |
621 | for(i=0;i<option_filesort_threads;i++) |
622 | if(threads[i].running==2) |
623 | { |
624 | pthread_join(threads[i].thread,NULL); |
625 | threads[i].running=0; |
626 | nthreads--; |
627 | } |
628 | |
629 | pthread_mutex_unlock(&running_mutex); |
630 | } |
631 | |
632 | #endif |
633 | |
634 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ |
635 | |
636 | if(nfiles==1) |
637 | { |
638 | for(i=0;i<threads[0].n;i++) |
639 | { |
640 | if(!keep || keep(threads[0].datap[i],count)) |
641 | { |
642 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE); |
643 | |
644 | WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
645 | count++; |
646 | } |
647 | } |
648 | |
649 | DeleteFile(threads[0].filename); |
650 | |
651 | goto tidy_and_exit; |
652 | } |
653 | |
654 | /* Check that number of files is less than file size */ |
655 | |
656 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
657 | |
658 | assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize)); |
659 | |
660 | /* Open all of the temporary files */ |
661 | |
662 | fds=(int*)malloc(nfiles*sizeof(int)); |
663 | |
664 | for(i=0;i<nfiles;i++) |
665 | { |
666 | char *filename=threads[0].filename; |
667 | |
668 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
669 | |
670 | fds[i]=ReOpenFile(filename); |
671 | |
672 | DeleteFile(filename); |
673 | } |
674 | |
675 | /* Perform an n-way merge using a binary heap */ |
676 | |
677 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
678 | |
679 | data=threads[0].data; |
680 | datap=data+datasize-nfiles*sizeof(void*); |
681 | |
682 | /* Fill the heap to start with */ |
683 | |
684 | for(i=0;i<nfiles;i++) |
685 | { |
686 | int index; |
687 | FILESORT_VARINT itemsize; |
688 | |
689 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
690 | |
691 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
692 | |
693 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
694 | |
695 | ReadFile(fds[i],datap[i],itemsize); |
696 | |
697 | index=i+1; |
698 | |
699 | heap[index]=i; |
700 | |
701 | /* Bubble up the new value */ |
702 | |
703 | while(index>1) |
704 | { |
705 | int newindex; |
706 | int temp; |
707 | |
708 | newindex=index/2; |
709 | |
710 | if(compare(datap[heap[index]],datap[heap[newindex]])>=0) |
711 | break; |
712 | |
713 | temp=heap[index]; |
714 | heap[index]=heap[newindex]; |
715 | heap[newindex]=temp; |
716 | |
717 | index=newindex; |
718 | } |
719 | } |
720 | |
721 | /* Repeatedly pull out the root of the heap and refill from the same file */ |
722 | |
723 | ndata=nfiles; |
724 | |
725 | do |
726 | { |
727 | int index=1; |
728 | FILESORT_VARINT itemsize; |
729 | |
730 | if(!keep || keep(datap[heap[index]],count)) |
731 | { |
732 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
733 | |
734 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
735 | count++; |
736 | } |
737 | |
738 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
739 | { |
740 | heap[index]=heap[ndata]; |
741 | ndata--; |
742 | } |
743 | else |
744 | { |
745 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
746 | |
747 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
748 | } |
749 | |
750 | /* Bubble down the new value */ |
751 | |
752 | while((2*index)<ndata) |
753 | { |
754 | int newindex; |
755 | int temp; |
756 | |
757 | newindex=2*index; |
758 | |
759 | if(compare(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
760 | newindex=newindex+1; |
761 | |
762 | if(compare(datap[heap[index]],datap[heap[newindex]])<=0) |
763 | break; |
764 | |
765 | temp=heap[newindex]; |
766 | heap[newindex]=heap[index]; |
767 | heap[index]=temp; |
768 | |
769 | index=newindex; |
770 | } |
771 | |
772 | if((2*index)==ndata) |
773 | { |
774 | int newindex; |
775 | int temp; |
776 | |
777 | newindex=2*index; |
778 | |
779 | if(compare(datap[heap[index]],datap[heap[newindex]])<=0) |
780 | ; /* break */ |
781 | else |
782 | { |
783 | temp=heap[newindex]; |
784 | heap[newindex]=heap[index]; |
785 | heap[index]=temp; |
786 | } |
787 | } |
788 | } |
789 | while(ndata>0); |
790 | |
791 | /* Tidy up */ |
792 | |
793 | tidy_and_exit: |
794 | |
795 | if(fds) |
796 | { |
797 | for(i=0;i<nfiles;i++) |
798 | CloseFile(fds[i]); |
799 | free(fds); |
800 | } |
801 | |
802 | if(heap) |
803 | free(heap); |
804 | |
805 | for(i=0;i<option_filesort_threads;i++) |
806 | { |
807 | free(threads[i].data); |
808 | |
809 | free(threads[i].filename); |
810 | } |
811 | |
812 | return(count); |
813 | } |
814 | |
815 | |
816 | /*++++++++++++++++++++++++++++++++++++++ |
817 | A wrapper function that can be run in a thread for fixed data. |
818 | |
819 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). |
820 | |
821 | thread_data *thread The data to be processed in this thread. |
822 | ++++++++++++++++++++++++++++++++++++++*/ |
823 | |
824 | static void *filesort_fixed_heapsort_thread(thread_data *thread) |
825 | { |
826 | int fd,i; |
827 | |
828 | /* Sort the data pointers using a heap sort */ |
829 | |
830 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
831 | |
832 | /* Create a temporary file and write the result */ |
833 | |
834 | fd=OpenFileNew(thread->filename); |
835 | |
836 | for(i=0;i<thread->n;i++) |
837 | WriteFile(fd,thread->datap[i],thread->itemsize); |
838 | |
839 | CloseFile(fd); |
840 | |
841 | #if defined(USE_PTHREADS) && USE_PTHREADS |
842 | |
843 | pthread_mutex_lock(&running_mutex); |
844 | |
845 | thread->running=2; |
846 | |
847 | pthread_cond_signal(&running_cond); |
848 | |
849 | pthread_mutex_unlock(&running_mutex); |
850 | |
851 | #endif |
852 | |
853 | return(NULL); |
854 | } |
855 | |
856 | |
857 | /*++++++++++++++++++++++++++++++++++++++ |
858 | A wrapper function that can be run in a thread for variable data. |
859 | |
860 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). |
861 | |
862 | thread_data *thread The data to be processed in this thread. |
863 | ++++++++++++++++++++++++++++++++++++++*/ |
864 | |
865 | static void *filesort_vary_heapsort_thread(thread_data *thread) |
866 | { |
867 | int fd,i; |
868 | |
869 | /* Sort the data pointers using a heap sort */ |
870 | |
871 | filesort_heapsort(thread->datap,thread->n,thread->compare); |
872 | |
873 | /* Create a temporary file and write the result */ |
874 | |
875 | fd=OpenFileNew(thread->filename); |
876 | |
877 | for(i=0;i<thread->n;i++) |
878 | { |
879 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE); |
880 | |
881 | WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
882 | } |
883 | |
884 | CloseFile(fd); |
885 | |
886 | #if defined(USE_PTHREADS) && USE_PTHREADS |
887 | |
888 | pthread_mutex_lock(&running_mutex); |
889 | |
890 | thread->running=2; |
891 | |
892 | pthread_cond_signal(&running_cond); |
893 | |
894 | pthread_mutex_unlock(&running_mutex); |
895 | |
896 | #endif |
897 | |
898 | return(NULL); |
899 | } |
900 | |
901 | |
/*++++++++++++++++++++++++++++++++++++++
  A function to sort an array of pointers efficiently.

  The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort,
  in particular, this is good because it can operate in-place and doesn't
  allocate any memory.  The sort is not stable (equal items may be reordered).

  The heap is handled with the usual 1-based numbering (the children of entry k
  are entries 2k and 2k+1) and heap entry k is stored in datap[k-1].

  void **datap A pointer to the array of pointers to sort (sorted in place, smallest first).

  size_t nitems The number of items of data to sort (zero or one item is left untouched).

  int(*compare)(const void *, const void *) The comparison function (identical to qsort if the
                                            data to be sorted was an array of things not pointers),
                                            it is passed two of the pointers from the array.
  ++++++++++++++++++++++++++++++++++++++*/

void filesort_heapsort(void **datap,size_t nitems,int(*compare)(const void*, const void*))
{
 size_t i;

 /* Fill the heap by pretending to insert the data that is already there */

 for(i=2;i<=nitems;i++)
   {
    size_t index=i;

    /* Bubble up the new value (upside-down, put largest at top) */

    while(index>1)
      {
       size_t parent=index/2;
       void *temp;

       if(compare(datap[index-1],datap[parent-1])<=0)
          break;

       temp=datap[index-1];
       datap[index-1]=datap[parent-1];
       datap[parent-1]=temp;

       index=parent;
      }
   }

 /* Repeatedly pull out the root of the heap and swap with the bottom item */

 for(i=nitems;i>1;i--)
   {
    size_t heapsize=i-1;        /* the number of entries left in the heap after removing the root */
    size_t index=1;
    void *temp;

    temp=datap[0];
    datap[0]=datap[i-1];
    datap[i-1]=temp;

    /* Bubble down the new value (upside-down, put largest at top) */

    while((2*index)<heapsize)   /* both children exist */
      {
       size_t child=2*index;

       if(compare(datap[child-1],datap[child])<=0)
          child=child+1;

       if(compare(datap[index-1],datap[child-1])>=0)
          break;

       temp=datap[child-1];
       datap[child-1]=datap[index-1];
       datap[index-1]=temp;

       index=child;
      }

    if((2*index)==heapsize)     /* only one child exists */
      {
       size_t child=2*index;

       if(compare(datap[index-1],datap[child-1])<0)
         {
          temp=datap[child-1];
          datap[child-1]=datap[index-1];
          datap[index-1]=temp;
         }
      }
   }
}
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |