Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Annotation of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1310 -
(hide annotations)
(download)
(as text)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Fri May 10 19:02:28 2013 UTC (11 years, 10 months ago) by amb
File MIME type: text/x-csrc
File size: 26583 byte(s)
Change data type from signed to unsigned (pedantic compiler warning).
1 | amb | 269 | /*************************************** |
2 | Merge sort functions. | ||
3 | |||
4 | Part of the Routino routing software. | ||
5 | ******************/ /****************** | ||
6 | amb | 1310 | This file Copyright 2009-2013 Andrew M. Bishop |
7 | amb | 269 | |
8 | This program is free software: you can redistribute it and/or modify | ||
9 | it under the terms of the GNU Affero General Public License as published by | ||
10 | the Free Software Foundation, either version 3 of the License, or | ||
11 | (at your option) any later version. | ||
12 | |||
13 | This program is distributed in the hope that it will be useful, | ||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
16 | GNU Affero General Public License for more details. | ||
17 | |||
18 | You should have received a copy of the GNU Affero General Public License | ||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
20 | ***************************************/ | ||
21 | |||
22 | |||
23 | #include <stdio.h> | ||
24 | #include <stdlib.h> | ||
25 | #include <string.h> | ||
26 | |||
27 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
28 | #include <pthread.h> | ||
29 | #endif | ||
30 | |||
31 | amb | 532 | #include "types.h" |
32 | |||
33 | amb | 1166 | #include "logging.h" |
34 | amb | 449 | #include "files.h" |
35 | amb | 532 | #include "sorting.h" |
36 | amb | 269 | |
37 | |||
38 | amb | 680 | /* Global variables */ |
39 | amb | 269 | |
40 | amb | 289 | /*+ The command line '--tmpdir' option or its default value. +*/ |
41 | amb | 284 | extern char *option_tmpdirname; |
42 | amb | 269 | |
43 | amb | 358 | /*+ The amount of RAM to use for filesorting. +*/ |
44 | extern size_t option_filesort_ramsize; | ||
45 | amb | 269 | |
46 | amb | 991 | /*+ The number of filesorting threads allowed. +*/ |
47 | extern int option_filesort_threads; | ||
48 | amb | 358 | |
49 | amb | 991 | |
50 | /* Thread data type definitions */ | ||
51 | |||
52 | /*+ A data type for holding data for a thread. +*/ | ||
53 | typedef struct _thread_data | ||
54 | { | ||
55 | pthread_t thread; /*+ The thread identifier. +*/ | ||
56 | |||
57 | int running; /*+ A flag indicating the current state of the thread. +*/ | ||
58 | |||
59 | void *data; /*+ The main data array. +*/ | ||
60 | void **datap; /*+ An array of pointers to the data objects. +*/ | ||
61 | size_t n; /*+ The number of pointers. +*/ | ||
62 | |||
63 | char *filename; /*+ The name of the file to write the results to. +*/ | ||
64 | |||
65 | size_t itemsize; /*+ The size of each item. +*/ | ||
66 | int (*compare)(const void*,const void*); /*+ The comparison function. +*/ | ||
67 | } | ||
68 | thread_data; | ||
69 | |||
70 | amb | 996 | /* Thread variables */ |
71 | amb | 991 | |
72 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |||
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; | ||
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; | ||
76 | |||
77 | #endif | ||
78 | |||
79 | amb | 991 | /* Thread helper functions */ |
80 | |||
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); | ||
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); | ||
83 | |||
84 | |||
85 | amb | 269 | /*++++++++++++++++++++++++++++++++++++++ |
86 | amb | 310 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. | ||
88 | amb | 269 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
91 | The individual sort steps and the merge step both use a "Heap sort" | ||
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
93 | if the data is already partially sorted. | ||
94 | |||
95 | amb | 948 | index_t filesort_fixed Returns the number of objects kept. |
96 | |||
97 | amb | 269 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |||
99 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
100 | |||
101 | size_t itemsize The size of each item in the file that needs sorting. | ||
102 | |||
103 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
104 | each item before they have been sorted. The second parameter is the number of objects | ||
105 | previously read from the input file. If the function returns 1 then the object is kept | ||
106 | and it is sorted, otherwise it is ignored. | ||
107 | amb | 269 | |
108 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
109 | to qsort if the data to be sorted is an array of things not pointers. | ||
110 | |||
111 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
112 | each item after they have been sorted. The second parameter is the number of objects | ||
113 | already written to the output file. If the function returns 1 then the object is written | ||
114 | to the output file., otherwise it is ignored. | ||
115 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
116 | |||
117 | amb | 1106 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
118 | int (*compare_function)(const void*,const void*), | ||
119 | int (*post_sort_function)(void*,index_t)) | ||
120 | amb | 269 | { |
121 | int *fds=NULL,*heap=NULL; | ||
122 | int nfiles=0,ndata=0; | ||
123 | amb | 1112 | index_t count_out=0,count_in=0,total=0; |
124 | amb | 991 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
125 | void *data,**datap; | ||
126 | thread_data *threads; | ||
127 | amb | 1310 | size_t item; |
128 | amb | 269 | int i,more=1; |
129 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
130 | int nthreads=0; | ||
131 | #endif | ||
132 | amb | 269 | |
133 | /* Allocate the RAM buffer and other bits */ | ||
134 | |||
135 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
136 | amb | 269 | |
137 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
138 | { | ||
139 | threads[i].running=0; | ||
140 | amb | 269 | |
141 | amb | 991 | threads[i].data=malloc(nitems*itemsize); |
142 | threads[i].datap=malloc(nitems*sizeof(void*)); | ||
143 | |||
144 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
145 | |||
146 | threads[i].itemsize=itemsize; | ||
147 | amb | 1106 | threads[i].compare=compare_function; |
148 | amb | 991 | } |
149 | |||
150 | amb | 269 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
151 | |||
152 | do | ||
153 | { | ||
154 | amb | 991 | int thread=0; |
155 | amb | 269 | |
156 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
157 | |||
158 | amb | 1020 | if(option_filesort_threads>1) |
159 | { | ||
160 | /* Find a spare slot (one *must* be unused at all times) */ | ||
161 | amb | 991 | |
162 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
163 | amb | 996 | |
164 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
165 | if(!threads[thread].running) | ||
166 | break; | ||
167 | amb | 991 | |
168 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
169 | } | ||
170 | amb | 996 | |
171 | amb | 991 | #endif |
172 | |||
173 | amb | 269 | /* Read in the data and create pointers */ |
174 | |||
175 | amb | 1310 | for(item=0;item<nitems;) |
176 | amb | 269 | { |
177 | amb | 1310 | threads[thread].datap[item]=threads[thread].data+item*itemsize; |
178 | amb | 269 | |
179 | amb | 1310 | if(ReadFile(fd_in,threads[thread].datap[item],itemsize)) |
180 | amb | 269 | { |
181 | more=0; | ||
182 | break; | ||
183 | } | ||
184 | |||
185 | amb | 1310 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[item],count_in)) |
186 | amb | 1106 | { |
187 | amb | 1310 | item++; |
188 | amb | 1112 | total++; |
189 | amb | 1106 | } |
190 | amb | 1112 | |
191 | count_in++; | ||
192 | amb | 269 | } |
193 | |||
194 | amb | 1310 | threads[thread].n=item; |
195 | amb | 269 | |
196 | amb | 1020 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
197 | amb | 546 | |
198 | amb | 1112 | if(more==0 && total==0) |
199 | amb | 546 | goto tidy_and_exit; |
200 | |||
201 | amb | 543 | /* No new data read in this time round */ |
202 | |||
203 | amb | 991 | if(threads[thread].n==0) |
204 | amb | 269 | break; |
205 | |||
206 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
207 | amb | 269 | |
208 | amb | 991 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
209 | amb | 269 | |
210 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
211 | amb | 269 | |
212 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
213 | |||
214 | if(more==0 && nfiles==0) | ||
215 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
216 | else if(option_filesort_threads>1) | ||
217 | amb | 269 | { |
218 | amb | 996 | pthread_mutex_lock(&running_mutex); |
219 | |||
220 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
221 | amb | 269 | { |
222 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
223 | if(threads[i].running==2) | ||
224 | { | ||
225 | pthread_join(threads[i].thread,NULL); | ||
226 | threads[i].running=0; | ||
227 | nthreads--; | ||
228 | } | ||
229 | |||
230 | if(nthreads==(option_filesort_threads-1)) | ||
231 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
232 | amb | 269 | } |
233 | |||
234 | amb | 991 | threads[thread].running=1; |
235 | amb | 269 | |
236 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
237 | |||
238 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
239 | amb | 269 | |
240 | amb | 991 | nthreads++; |
241 | } | ||
242 | amb | 1020 | else |
243 | amb | 1021 | filesort_fixed_heapsort_thread(&threads[thread]); |
244 | amb | 269 | |
245 | amb | 991 | #else |
246 | amb | 269 | |
247 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
248 | |||
249 | if(more==0 && nfiles==0) | ||
250 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
251 | else | ||
252 | amb | 1020 | filesort_fixed_heapsort_thread(&threads[thread]); |
253 | amb | 269 | |
254 | amb | 991 | #endif |
255 | amb | 269 | |
256 | nfiles++; | ||
257 | } | ||
258 | while(more); | ||
259 | |||
260 | amb | 991 | /* Wait for all of the threads to finish */ |
261 | amb | 269 | |
262 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
263 | |||
264 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
265 | amb | 991 | { |
266 | amb | 996 | pthread_mutex_lock(&running_mutex); |
267 | amb | 991 | |
268 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
269 | |||
270 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
271 | if(threads[i].running==2) | ||
272 | { | ||
273 | pthread_join(threads[i].thread,NULL); | ||
274 | threads[i].running=0; | ||
275 | nthreads--; | ||
276 | } | ||
277 | |||
278 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
279 | amb | 991 | } |
280 | |||
281 | #endif | ||
282 | |||
283 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
284 | |||
285 | amb | 269 | if(nfiles==1) |
286 | { | ||
287 | amb | 1310 | for(item=0;item<threads[0].n;item++) |
288 | amb | 269 | { |
289 | amb | 1310 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
290 | amb | 274 | { |
291 | amb | 1310 | WriteFile(fd_out,threads[0].datap[item],itemsize); |
292 | amb | 1106 | count_out++; |
293 | amb | 274 | } |
294 | amb | 269 | } |
295 | |||
296 | amb | 991 | DeleteFile(threads[0].filename); |
297 | amb | 269 | |
298 | amb | 283 | goto tidy_and_exit; |
299 | amb | 269 | } |
300 | |||
301 | /* Check that number of files is less than file size */ | ||
302 | |||
303 | amb | 1310 | logassert((unsigned)nfiles<nitems,"Too many temporary files (use more sorting memory?)"); |
304 | amb | 269 | |
305 | /* Open all of the temporary files */ | ||
306 | |||
307 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
308 | |||
309 | for(i=0;i<nfiles;i++) | ||
310 | { | ||
311 | amb | 991 | char *filename=threads[0].filename; |
312 | |||
313 | amb | 284 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
314 | amb | 269 | |
315 | fds[i]=ReOpenFile(filename); | ||
316 | |||
317 | DeleteFile(filename); | ||
318 | } | ||
319 | |||
320 | /* Perform an n-way merge using a binary heap */ | ||
321 | |||
322 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
323 | amb | 269 | |
324 | amb | 991 | data =threads[0].data; |
325 | datap=threads[0].datap; | ||
326 | |||
327 | amb | 269 | /* Fill the heap to start with */ |
328 | |||
329 | for(i=0;i<nfiles;i++) | ||
330 | { | ||
331 | int index; | ||
332 | |||
333 | datap[i]=data+i*itemsize; | ||
334 | |||
335 | ReadFile(fds[i],datap[i],itemsize); | ||
336 | |||
337 | amb | 875 | index=i+1; |
338 | |||
339 | amb | 867 | heap[index]=i; |
340 | amb | 269 | |
341 | /* Bubble up the new value */ | ||
342 | |||
343 | amb | 875 | while(index>1) |
344 | amb | 269 | { |
345 | int newindex; | ||
346 | int temp; | ||
347 | |||
348 | amb | 875 | newindex=index/2; |
349 | amb | 269 | |
350 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
351 | amb | 869 | break; |
352 | |||
353 | amb | 269 | temp=heap[index]; |
354 | heap[index]=heap[newindex]; | ||
355 | heap[newindex]=temp; | ||
356 | |||
357 | index=newindex; | ||
358 | } | ||
359 | } | ||
360 | |||
361 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
362 | |||
363 | ndata=nfiles; | ||
364 | |||
365 | do | ||
366 | { | ||
367 | amb | 875 | int index=1; |
368 | amb | 269 | |
369 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
370 | amb | 274 | { |
371 | amb | 867 | WriteFile(fd_out,datap[heap[index]],itemsize); |
372 | amb | 1106 | count_out++; |
373 | amb | 274 | } |
374 | amb | 269 | |
375 | amb | 867 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
376 | amb | 269 | { |
377 | amb | 875 | heap[index]=heap[ndata]; |
378 | amb | 870 | ndata--; |
379 | amb | 269 | } |
380 | |||
381 | /* Bubble down the new value */ | ||
382 | |||
383 | amb | 875 | while((2*index)<ndata) |
384 | amb | 269 | { |
385 | amb | 873 | int newindex; |
386 | amb | 269 | int temp; |
387 | |||
388 | amb | 875 | newindex=2*index; |
389 | amb | 269 | |
390 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
391 | amb | 873 | newindex=newindex+1; |
392 | amb | 869 | |
393 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
394 | amb | 869 | break; |
395 | |||
396 | amb | 269 | temp=heap[newindex]; |
397 | heap[newindex]=heap[index]; | ||
398 | heap[index]=temp; | ||
399 | |||
400 | index=newindex; | ||
401 | } | ||
402 | |||
403 | amb | 875 | if((2*index)==ndata) |
404 | amb | 269 | { |
405 | int newindex; | ||
406 | int temp; | ||
407 | |||
408 | amb | 875 | newindex=2*index; |
409 | amb | 269 | |
410 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
411 | amb | 869 | ; /* break */ |
412 | else | ||
413 | { | ||
414 | temp=heap[newindex]; | ||
415 | heap[newindex]=heap[index]; | ||
416 | heap[index]=temp; | ||
417 | } | ||
418 | amb | 269 | } |
419 | } | ||
420 | while(ndata>0); | ||
421 | |||
422 | /* Tidy up */ | ||
423 | |||
424 | amb | 283 | tidy_and_exit: |
425 | amb | 269 | |
426 | amb | 283 | if(fds) |
427 | { | ||
428 | for(i=0;i<nfiles;i++) | ||
429 | CloseFile(fds[i]); | ||
430 | free(fds); | ||
431 | } | ||
432 | |||
433 | if(heap) | ||
434 | free(heap); | ||
435 | |||
436 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
437 | { | ||
438 | free(threads[i].data); | ||
439 | free(threads[i].datap); | ||
440 | amb | 948 | |
441 | amb | 991 | free(threads[i].filename); |
442 | } | ||
443 | |||
444 | amb | 1106 | return(count_out); |
445 | amb | 269 | } |
446 | |||
447 | |||
448 | /*++++++++++++++++++++++++++++++++++++++ | ||
449 | amb | 310 | A function to sort the contents of a file of variable length objects (each |
450 | amb | 867 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
451 | amb | 310 | |
452 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
453 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
454 | The individual sort steps and the merge step both use a "Heap sort" | ||
455 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
456 | if the data is already partially sorted. | ||
457 | |||
458 | amb | 948 | index_t filesort_vary Returns the number of objects kept. |
459 | |||
460 | amb | 310 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
461 | |||
462 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
463 | |||
464 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
465 | each item before they have been sorted. The second parameter is the number of objects | ||
466 | previously read from the input file. If the function returns 1 then the object is kept | ||
467 | and it is sorted, otherwise it is ignored. | ||
468 | amb | 310 | |
469 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
470 | to qsort if the data to be sorted is an array of things not pointers. | ||
471 | |||
472 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
473 | each item after they have been sorted. The second parameter is the number of objects | ||
474 | already written to the output file. If the function returns 1 then the object is written | ||
475 | to the output file., otherwise it is ignored. | ||
476 | amb | 310 | ++++++++++++++++++++++++++++++++++++++*/ |
477 | |||
478 | amb | 1106 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
479 | int (*compare_function)(const void*,const void*), | ||
480 | int (*post_sort_function)(void*,index_t)) | ||
481 | amb | 310 | { |
482 | int *fds=NULL,*heap=NULL; | ||
483 | int nfiles=0,ndata=0; | ||
484 | amb | 1112 | index_t count_out=0,count_in=0,total=0; |
485 | amb | 991 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
486 | amb | 311 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
487 | amb | 991 | void *data,**datap; |
488 | thread_data *threads; | ||
489 | amb | 1310 | size_t item; |
490 | amb | 310 | int i,more=1; |
491 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
492 | int nthreads=0; | ||
493 | #endif | ||
494 | amb | 310 | |
495 | /* Allocate the RAM buffer and other bits */ | ||
496 | |||
497 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
498 | amb | 310 | |
499 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
500 | { | ||
501 | threads[i].running=0; | ||
502 | amb | 310 | |
503 | amb | 991 | threads[i].data=malloc(datasize); |
504 | threads[i].datap=NULL; | ||
505 | |||
506 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
507 | |||
508 | amb | 1106 | threads[i].compare=compare_function; |
509 | amb | 991 | } |
510 | |||
511 | amb | 310 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
512 | |||
513 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
514 | amb | 310 | goto tidy_and_exit; |
515 | |||
516 | do | ||
517 | { | ||
518 | amb | 311 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
519 | amb | 991 | int thread=0; |
520 | amb | 310 | |
521 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
522 | amb | 310 | |
523 | amb | 1020 | if(option_filesort_threads>1) |
524 | { | ||
525 | /* Find a spare slot (one *must* be unused at all times) */ | ||
526 | amb | 991 | |
527 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
528 | amb | 996 | |
529 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
530 | if(!threads[thread].running) | ||
531 | break; | ||
532 | amb | 991 | |
533 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
534 | } | ||
535 | amb | 996 | |
536 | amb | 991 | #endif |
537 | |||
538 | threads[thread].datap=threads[thread].data+datasize; | ||
539 | |||
540 | threads[thread].n=0; | ||
541 | |||
542 | amb | 310 | /* Read in the data and create pointers */ |
543 | |||
544 | amb | 1310 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=(unsigned)((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
545 | amb | 310 | { |
546 | amb | 311 | FILESORT_VARINT itemsize=nextitemsize; |
547 | amb | 310 | |
548 | if(itemsize>largestitemsize) | ||
549 | largestitemsize=itemsize; | ||
550 | |||
551 | amb | 991 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
552 | amb | 310 | |
553 | amb | 311 | ramused+=FILESORT_VARSIZE; |
554 | amb | 310 | |
555 | amb | 991 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
556 | amb | 310 | |
557 | amb | 1106 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
558 | { | ||
559 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ | ||
560 | amb | 310 | |
561 | amb | 1106 | ramused+=itemsize; |
562 | amb | 311 | |
563 | amb | 1106 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
564 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; | ||
565 | amb | 311 | |
566 | amb | 1112 | total++; |
567 | amb | 1106 | threads[thread].n++; |
568 | } | ||
569 | amb | 310 | |
570 | amb | 1112 | count_in++; |
571 | |||
572 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
573 | amb | 310 | { |
574 | more=0; | ||
575 | break; | ||
576 | } | ||
577 | } | ||
578 | |||
579 | amb | 543 | /* No new data read in this time round */ |
580 | |||
581 | amb | 991 | if(threads[thread].n==0) |
582 | amb | 310 | break; |
583 | |||
584 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
585 | amb | 310 | |
586 | amb | 1021 | if(more==0 && nfiles==0) |
587 | threads[thread].filename[0]=0; | ||
588 | else | ||
589 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); | ||
590 | amb | 310 | |
591 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
592 | amb | 310 | |
593 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
594 | |||
595 | if(more==0 && nfiles==0) | ||
596 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
597 | else if(option_filesort_threads>1) | ||
598 | amb | 310 | { |
599 | amb | 996 | pthread_mutex_lock(&running_mutex); |
600 | |||
601 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
602 | amb | 310 | { |
603 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
604 | if(threads[i].running==2) | ||
605 | { | ||
606 | pthread_join(threads[i].thread,NULL); | ||
607 | threads[i].running=0; | ||
608 | nthreads--; | ||
609 | } | ||
610 | |||
611 | if(nthreads==(option_filesort_threads-1)) | ||
612 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
613 | amb | 310 | } |
614 | |||
615 | amb | 991 | threads[thread].running=1; |
616 | |||
617 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
618 | |||
619 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
620 | |||
621 | nthreads++; | ||
622 | amb | 310 | } |
623 | amb | 1020 | else |
624 | amb | 1021 | filesort_vary_heapsort_thread(&threads[thread]); |
625 | amb | 310 | |
626 | amb | 991 | #else |
627 | amb | 310 | |
628 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
629 | |||
630 | if(more==0 && nfiles==0) | ||
631 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
632 | else | ||
633 | amb | 1020 | filesort_vary_heapsort_thread(&threads[thread]); |
634 | amb | 310 | |
635 | amb | 991 | #endif |
636 | amb | 310 | |
637 | amb | 991 | nfiles++; |
638 | } | ||
639 | while(more); | ||
640 | |||
641 | /* Wait for all of the threads to finish */ | ||
642 | |||
643 | #if defined(USE_PTHREADS) && USE_PTHREADS | ||
644 | |||
645 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
646 | amb | 991 | { |
647 | amb | 996 | pthread_mutex_lock(&running_mutex); |
648 | amb | 991 | |
649 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
650 | |||
651 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
652 | if(threads[i].running==2) | ||
653 | { | ||
654 | pthread_join(threads[i].thread,NULL); | ||
655 | threads[i].running=0; | ||
656 | nthreads--; | ||
657 | } | ||
658 | |||
659 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
660 | amb | 991 | } |
661 | |||
662 | #endif | ||
663 | |||
664 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
665 | |||
666 | if(nfiles==1) | ||
667 | { | ||
668 | amb | 1310 | for(item=0;item<threads[0].n;item++) |
669 | amb | 310 | { |
670 | amb | 1310 | if(!post_sort_function || post_sort_function(threads[0].datap[item],count_out)) |
671 | amb | 991 | { |
672 | amb | 1310 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[item]-FILESORT_VARSIZE); |
673 | amb | 310 | |
674 | amb | 1310 | WriteFile(fd_out,threads[0].datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
675 | amb | 1106 | count_out++; |
676 | amb | 991 | } |
677 | amb | 310 | } |
678 | |||
679 | amb | 991 | DeleteFile(threads[0].filename); |
680 | amb | 310 | |
681 | amb | 991 | goto tidy_and_exit; |
682 | amb | 310 | } |
683 | |||
684 | /* Check that number of files is less than file size */ | ||
685 | |||
686 | amb | 311 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
687 | amb | 310 | |
688 | amb | 1310 | logassert((unsigned)nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)"); |
689 | amb | 310 | |
690 | /* Open all of the temporary files */ | ||
691 | |||
692 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
693 | |||
694 | for(i=0;i<nfiles;i++) | ||
695 | { | ||
696 | amb | 991 | char *filename=threads[0].filename; |
697 | |||
698 | amb | 310 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
699 | |||
700 | fds[i]=ReOpenFile(filename); | ||
701 | |||
702 | DeleteFile(filename); | ||
703 | } | ||
704 | |||
705 | /* Perform an n-way merge using a binary heap */ | ||
706 | |||
707 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
708 | amb | 310 | |
709 | amb | 991 | data=threads[0].data; |
710 | datap=data+datasize-nfiles*sizeof(void*); | ||
711 | amb | 310 | |
712 | /* Fill the heap to start with */ | ||
713 | |||
714 | for(i=0;i<nfiles;i++) | ||
715 | { | ||
716 | int index; | ||
717 | amb | 311 | FILESORT_VARINT itemsize; |
718 | amb | 310 | |
719 | amb | 311 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
720 | amb | 310 | |
721 | amb | 311 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
722 | amb | 310 | |
723 | amb | 311 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
724 | amb | 310 | |
725 | ReadFile(fds[i],datap[i],itemsize); | ||
726 | |||
727 | amb | 875 | index=i+1; |
728 | |||
729 | amb | 867 | heap[index]=i; |
730 | amb | 310 | |
731 | /* Bubble up the new value */ | ||
732 | |||
733 | amb | 875 | while(index>1) |
734 | amb | 310 | { |
735 | int newindex; | ||
736 | int temp; | ||
737 | |||
738 | amb | 875 | newindex=index/2; |
739 | amb | 310 | |
740 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
741 | amb | 869 | break; |
742 | |||
743 | amb | 310 | temp=heap[index]; |
744 | heap[index]=heap[newindex]; | ||
745 | heap[newindex]=temp; | ||
746 | |||
747 | index=newindex; | ||
748 | } | ||
749 | } | ||
750 | |||
751 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
752 | |||
753 | ndata=nfiles; | ||
754 | |||
755 | do | ||
756 | { | ||
757 | amb | 875 | int index=1; |
758 | amb | 311 | FILESORT_VARINT itemsize; |
759 | amb | 310 | |
760 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
761 | amb | 310 | { |
762 | amb | 867 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
763 | amb | 310 | |
764 | amb | 867 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
765 | amb | 1106 | count_out++; |
766 | amb | 310 | } |
767 | |||
768 | amb | 867 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
769 | amb | 310 | { |
770 | amb | 875 | heap[index]=heap[ndata]; |
771 | amb | 870 | ndata--; |
772 | amb | 310 | } |
773 | else | ||
774 | { | ||
775 | amb | 867 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
776 | amb | 310 | |
777 | amb | 867 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
778 | amb | 310 | } |
779 | |||
780 | /* Bubble down the new value */ | ||
781 | |||
782 | amb | 875 | while((2*index)<ndata) |
783 | amb | 310 | { |
784 | amb | 873 | int newindex; |
785 | amb | 310 | int temp; |
786 | |||
787 | amb | 875 | newindex=2*index; |
788 | amb | 310 | |
789 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
790 | amb | 873 | newindex=newindex+1; |
791 | amb | 869 | |
792 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
793 | amb | 869 | break; |
794 | |||
795 | amb | 310 | temp=heap[newindex]; |
796 | heap[newindex]=heap[index]; | ||
797 | heap[index]=temp; | ||
798 | |||
799 | index=newindex; | ||
800 | } | ||
801 | |||
802 | amb | 875 | if((2*index)==ndata) |
803 | amb | 310 | { |
804 | int newindex; | ||
805 | int temp; | ||
806 | |||
807 | amb | 875 | newindex=2*index; |
808 | amb | 310 | |
809 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
810 | amb | 869 | ; /* break */ |
811 | else | ||
812 | { | ||
813 | temp=heap[newindex]; | ||
814 | heap[newindex]=heap[index]; | ||
815 | heap[index]=temp; | ||
816 | } | ||
817 | amb | 310 | } |
818 | } | ||
819 | while(ndata>0); | ||
820 | |||
821 | /* Tidy up */ | ||
822 | |||
823 | tidy_and_exit: | ||
824 | |||
825 | if(fds) | ||
826 | { | ||
827 | for(i=0;i<nfiles;i++) | ||
828 | CloseFile(fds[i]); | ||
829 | free(fds); | ||
830 | } | ||
831 | |||
832 | if(heap) | ||
833 | free(heap); | ||
834 | |||
835 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
836 | { | ||
837 | free(threads[i].data); | ||
838 | amb | 948 | |
839 | amb | 991 | free(threads[i].filename); |
840 | } | ||
841 | |||
842 | amb | 1106 | return(count_out); |
843 | amb | 310 | } |
844 | |||
845 | |||
846 | /*++++++++++++++++++++++++++++++++++++++ | ||
847 | amb | 991 | A wrapper function that can be run in a thread for fixed data. |
848 | |||
849 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). | ||
850 | |||
851 | thread_data *thread The data to be processed in this thread. | ||
852 | ++++++++++++++++++++++++++++++++++++++*/ | ||
853 | |||
854 | static void *filesort_fixed_heapsort_thread(thread_data *thread) | ||
855 | { | ||
856 | amb | 1310 | int fd; |
857 | size_t item; | ||
858 | amb | 991 | |
859 | /* Sort the data pointers using a heap sort */ | ||
860 | |||
861 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
862 | |||
863 | /* Create a temporary file and write the result */ | ||
864 | |||
865 | fd=OpenFileNew(thread->filename); | ||
866 | |||
867 | amb | 1310 | for(item=0;item<thread->n;item++) |
868 | WriteFile(fd,thread->datap[item],thread->itemsize); | ||
869 | amb | 991 | |
870 | CloseFile(fd); | ||
871 | |||
872 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
873 | |||
874 | amb | 1020 | if(option_filesort_threads>1) |
875 | { | ||
876 | pthread_mutex_lock(&running_mutex); | ||
877 | amb | 996 | |
878 | amb | 1020 | thread->running=2; |
879 | amb | 991 | |
880 | amb | 1020 | pthread_cond_signal(&running_cond); |
881 | amb | 996 | |
882 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
883 | } | ||
884 | amb | 996 | |
885 | #endif | ||
886 | |||
887 | amb | 991 | return(NULL); |
888 | } | ||
889 | |||
890 | |||
891 | /*++++++++++++++++++++++++++++++++++++++ | ||
892 | A wrapper function that can be run in a thread for variable data. | ||
893 | |||
894 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). | ||
895 | |||
896 | thread_data *thread The data to be processed in this thread. | ||
897 | ++++++++++++++++++++++++++++++++++++++*/ | ||
898 | |||
899 | static void *filesort_vary_heapsort_thread(thread_data *thread) | ||
900 | { | ||
901 | amb | 1310 | int fd; |
902 | size_t item; | ||
903 | amb | 991 | |
904 | /* Sort the data pointers using a heap sort */ | ||
905 | |||
906 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
907 | |||
908 | /* Create a temporary file and write the result */ | ||
909 | |||
910 | fd=OpenFileNew(thread->filename); | ||
911 | |||
912 | amb | 1310 | for(item=0;item<thread->n;item++) |
913 | amb | 991 | { |
914 | amb | 1310 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[item]-FILESORT_VARSIZE); |
915 | amb | 991 | |
916 | amb | 1310 | WriteFile(fd,thread->datap[item]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
917 | amb | 991 | } |
918 | |||
919 | CloseFile(fd); | ||
920 | |||
921 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
922 | |||
923 | amb | 1020 | if(option_filesort_threads>1) |
924 | { | ||
925 | pthread_mutex_lock(&running_mutex); | ||
926 | amb | 996 | |
927 | amb | 1020 | thread->running=2; |
928 | amb | 991 | |
929 | amb | 1020 | pthread_cond_signal(&running_cond); |
930 | amb | 996 | |
931 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
932 | } | ||
933 | amb | 996 | |
934 | #endif | ||
935 | |||
936 | amb | 991 | return(NULL); |
937 | } | ||
938 | |||
939 | |||
940 | /*++++++++++++++++++++++++++++++++++++++ | ||
941 | amb | 269 | A function to sort an array of pointers efficiently. |
942 | |||
943 | The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort, | ||
944 | amb | 873 | in particular, this is good because it can operate in-place and doesn't |
945 | amb | 269 | allocate more memory like using qsort() does. |
946 | |||
947 | void **datap A pointer to the array of pointers to sort. | ||
948 | |||
949 | size_t nitems The number of items of data to sort. | ||
950 | |||
951 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
952 | to qsort if the data to be sorted is an array of things not pointers. | ||
953 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
954 | |||
955 | amb | 1106 | void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*)) |
956 | amb | 269 | { |
957 | amb | 875 | void **datap1=&datap[-1]; |
958 | amb | 1310 | size_t item; |
959 | amb | 269 | |
960 | /* Fill the heap by pretending to insert the data that is already there */ | ||
961 | |||
962 | amb | 1310 | for(item=2;item<=nitems;item++) |
963 | amb | 269 | { |
964 | amb | 1310 | size_t index=item; |
965 | amb | 269 | |
966 | /* Bubble up the new value (upside-down, put largest at top) */ | ||
967 | |||
968 | amb | 875 | while(index>1) |
969 | amb | 269 | { |
970 | int newindex; | ||
971 | void *temp; | ||
972 | |||
973 | amb | 875 | newindex=index/2; |
974 | amb | 269 | |
975 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */ |
976 | amb | 869 | break; |
977 | |||
978 | amb | 875 | temp=datap1[index]; |
979 | datap1[index]=datap1[newindex]; | ||
980 | datap1[newindex]=temp; | ||
981 | amb | 269 | |
982 | index=newindex; | ||
983 | } | ||
984 | } | ||
985 | |||
986 | /* Repeatedly pull out the root of the heap and swap with the bottom item */ | ||
987 | |||
988 | amb | 1310 | for(item=nitems;item>1;item--) |
989 | amb | 269 | { |
990 | amb | 1310 | size_t index=1; |
991 | amb | 269 | void *temp; |
992 | |||
993 | amb | 875 | temp=datap1[index]; |
994 | amb | 1310 | datap1[index]=datap1[item]; |
995 | datap1[item]=temp; | ||
996 | amb | 269 | |
997 | /* Bubble down the new value (upside-down, put largest at top) */ | ||
998 | |||
999 | amb | 1310 | while((2*index)<(item-1)) |
1000 | amb | 269 | { |
1001 | amb | 873 | int newindex; |
1002 | amb | 269 | void *temp; |
1003 | |||
1004 | amb | 875 | newindex=2*index; |
1005 | amb | 269 | |
1006 | amb | 1106 | if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */ |
1007 | amb | 873 | newindex=newindex+1; |
1008 | amb | 869 | |
1009 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1010 | amb | 869 | break; |
1011 | |||
1012 | amb | 875 | temp=datap1[newindex]; |
1013 | datap1[newindex]=datap1[index]; | ||
1014 | datap1[index]=temp; | ||
1015 | amb | 269 | |
1016 | index=newindex; | ||
1017 | } | ||
1018 | |||
1019 | amb | 1310 | if((2*index)==(item-1)) |
1020 | amb | 269 | { |
1021 | int newindex; | ||
1022 | void *temp; | ||
1023 | |||
1024 | amb | 875 | newindex=2*index; |
1025 | amb | 269 | |
1026 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1027 | amb | 869 | ; /* break */ |
1028 | else | ||
1029 | { | ||
1030 | amb | 875 | temp=datap1[newindex]; |
1031 | datap1[newindex]=datap1[index]; | ||
1032 | datap1[index]=temp; | ||
1033 | amb | 869 | } |
1034 | amb | 269 | } |
1035 | } | ||
1036 | } |
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |