Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Annotation of /trunk/src/sorting.c
Parent Directory
|
Revision Log
Revision 1166 -
(hide annotations)
(download)
(as text)
Tue Nov 20 16:12:08 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26372 byte(s)
Tue Nov 20 16:12:08 2012 UTC (12 years, 4 months ago) by amb
File MIME type: text/x-csrc
File size: 26372 byte(s)
Replace all assert statements with a custom error message that explains the cause and suggests a solution.
1 | amb | 269 | /*************************************** |
2 | Merge sort functions. | ||
3 | |||
4 | Part of the Routino routing software. | ||
5 | ******************/ /****************** | ||
6 | amb | 948 | This file Copyright 2009-2012 Andrew M. Bishop |
7 | amb | 269 | |
8 | This program is free software: you can redistribute it and/or modify | ||
9 | it under the terms of the GNU Affero General Public License as published by | ||
10 | the Free Software Foundation, either version 3 of the License, or | ||
11 | (at your option) any later version. | ||
12 | |||
13 | This program is distributed in the hope that it will be useful, | ||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
16 | GNU Affero General Public License for more details. | ||
17 | |||
18 | You should have received a copy of the GNU Affero General Public License | ||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
20 | ***************************************/ | ||
21 | |||
22 | |||
23 | #include <stdio.h> | ||
24 | #include <stdlib.h> | ||
25 | #include <string.h> | ||
26 | |||
27 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
28 | #include <pthread.h> | ||
29 | #endif | ||
30 | |||
31 | amb | 532 | #include "types.h" |
32 | |||
33 | amb | 1166 | #include "logging.h" |
34 | amb | 449 | #include "files.h" |
35 | amb | 532 | #include "sorting.h" |
36 | amb | 269 | |
37 | |||
38 | amb | 680 | /* Global variables */ |
39 | amb | 269 | |
40 | amb | 289 | /*+ The command line '--tmpdir' option or its default value. +*/ |
41 | amb | 284 | extern char *option_tmpdirname; |
42 | amb | 269 | |
43 | amb | 358 | /*+ The amount of RAM to use for filesorting. +*/ |
44 | extern size_t option_filesort_ramsize; | ||
45 | amb | 269 | |
46 | amb | 991 | /*+ The number of filesorting threads allowed. +*/ |
47 | extern int option_filesort_threads; | ||
48 | amb | 358 | |
49 | amb | 991 | |
50 | /* Thread data type definitions */ | ||
51 | |||
52 | /*+ A data type for holding data for a thread. +*/ | ||
53 | typedef struct _thread_data | ||
54 | { | ||
55 | pthread_t thread; /*+ The thread identifier. +*/ | ||
56 | |||
57 | int running; /*+ A flag indicating the current state of the thread. +*/ | ||
58 | |||
59 | void *data; /*+ The main data array. +*/ | ||
60 | void **datap; /*+ An array of pointers to the data objects. +*/ | ||
61 | size_t n; /*+ The number of pointers. +*/ | ||
62 | |||
63 | char *filename; /*+ The name of the file to write the results to. +*/ | ||
64 | |||
65 | size_t itemsize; /*+ The size of each item. +*/ | ||
66 | int (*compare)(const void*,const void*); /*+ The comparison function. +*/ | ||
67 | } | ||
68 | thread_data; | ||
69 | |||
70 | amb | 996 | /* Thread variables */ |
71 | amb | 991 | |
72 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |||
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; | ||
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; | ||
76 | |||
77 | #endif | ||
78 | |||
79 | amb | 991 | /* Thread helper functions */ |
80 | |||
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); | ||
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); | ||
83 | |||
84 | |||
85 | amb | 269 | /*++++++++++++++++++++++++++++++++++++++ |
86 | amb | 310 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. | ||
88 | amb | 269 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
91 | The individual sort steps and the merge step both use a "Heap sort" | ||
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
93 | if the data is already partially sorted. | ||
94 | |||
95 | amb | 948 | index_t filesort_fixed Returns the number of objects kept. |
96 | |||
97 | amb | 269 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |||
99 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
100 | |||
101 | size_t itemsize The size of each item in the file that needs sorting. | ||
102 | |||
103 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
104 | each item before they have been sorted. The second parameter is the number of objects | ||
105 | previously read from the input file. If the function returns 1 then the object is kept | ||
106 | and it is sorted, otherwise it is ignored. | ||
107 | amb | 269 | |
108 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
109 | to qsort if the data to be sorted is an array of things not pointers. | ||
110 | |||
111 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
112 | each item after they have been sorted. The second parameter is the number of objects | ||
113 | already written to the output file. If the function returns 1 then the object is written | ||
114 | to the output file., otherwise it is ignored. | ||
115 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
116 | |||
117 | amb | 1106 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
118 | int (*compare_function)(const void*,const void*), | ||
119 | int (*post_sort_function)(void*,index_t)) | ||
120 | amb | 269 | { |
121 | int *fds=NULL,*heap=NULL; | ||
122 | int nfiles=0,ndata=0; | ||
123 | amb | 1112 | index_t count_out=0,count_in=0,total=0; |
124 | amb | 991 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
125 | void *data,**datap; | ||
126 | thread_data *threads; | ||
127 | amb | 269 | int i,more=1; |
128 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
129 | int nthreads=0; | ||
130 | #endif | ||
131 | amb | 269 | |
132 | /* Allocate the RAM buffer and other bits */ | ||
133 | |||
134 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
135 | amb | 269 | |
136 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
137 | { | ||
138 | threads[i].running=0; | ||
139 | amb | 269 | |
140 | amb | 991 | threads[i].data=malloc(nitems*itemsize); |
141 | threads[i].datap=malloc(nitems*sizeof(void*)); | ||
142 | |||
143 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
144 | |||
145 | threads[i].itemsize=itemsize; | ||
146 | amb | 1106 | threads[i].compare=compare_function; |
147 | amb | 991 | } |
148 | |||
149 | amb | 269 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
150 | |||
151 | do | ||
152 | { | ||
153 | amb | 991 | int thread=0; |
154 | amb | 269 | |
155 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
156 | |||
157 | amb | 1020 | if(option_filesort_threads>1) |
158 | { | ||
159 | /* Find a spare slot (one *must* be unused at all times) */ | ||
160 | amb | 991 | |
161 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
162 | amb | 996 | |
163 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
164 | if(!threads[thread].running) | ||
165 | break; | ||
166 | amb | 991 | |
167 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
168 | } | ||
169 | amb | 996 | |
170 | amb | 991 | #endif |
171 | |||
172 | amb | 269 | /* Read in the data and create pointers */ |
173 | |||
174 | amb | 1106 | for(i=0;i<nitems;) |
175 | amb | 269 | { |
176 | amb | 991 | threads[thread].datap[i]=threads[thread].data+i*itemsize; |
177 | amb | 269 | |
178 | amb | 991 | if(ReadFile(fd_in,threads[thread].datap[i],itemsize)) |
179 | amb | 269 | { |
180 | more=0; | ||
181 | break; | ||
182 | } | ||
183 | |||
184 | amb | 1106 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in)) |
185 | { | ||
186 | i++; | ||
187 | amb | 1112 | total++; |
188 | amb | 1106 | } |
189 | amb | 1112 | |
190 | count_in++; | ||
191 | amb | 269 | } |
192 | |||
193 | amb | 991 | threads[thread].n=i; |
194 | amb | 269 | |
195 | amb | 1020 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
196 | amb | 546 | |
197 | amb | 1112 | if(more==0 && total==0) |
198 | amb | 546 | goto tidy_and_exit; |
199 | |||
200 | amb | 543 | /* No new data read in this time round */ |
201 | |||
202 | amb | 991 | if(threads[thread].n==0) |
203 | amb | 269 | break; |
204 | |||
205 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
206 | amb | 269 | |
207 | amb | 991 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
208 | amb | 269 | |
209 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
210 | amb | 269 | |
211 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
212 | |||
213 | if(more==0 && nfiles==0) | ||
214 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
215 | else if(option_filesort_threads>1) | ||
216 | amb | 269 | { |
217 | amb | 996 | pthread_mutex_lock(&running_mutex); |
218 | |||
219 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
220 | amb | 269 | { |
221 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
222 | if(threads[i].running==2) | ||
223 | { | ||
224 | pthread_join(threads[i].thread,NULL); | ||
225 | threads[i].running=0; | ||
226 | nthreads--; | ||
227 | } | ||
228 | |||
229 | if(nthreads==(option_filesort_threads-1)) | ||
230 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
231 | amb | 269 | } |
232 | |||
233 | amb | 991 | threads[thread].running=1; |
234 | amb | 269 | |
235 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
236 | |||
237 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
238 | amb | 269 | |
239 | amb | 991 | nthreads++; |
240 | } | ||
241 | amb | 1020 | else |
242 | amb | 1021 | filesort_fixed_heapsort_thread(&threads[thread]); |
243 | amb | 269 | |
244 | amb | 991 | #else |
245 | amb | 269 | |
246 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
247 | |||
248 | if(more==0 && nfiles==0) | ||
249 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
250 | else | ||
251 | amb | 1020 | filesort_fixed_heapsort_thread(&threads[thread]); |
252 | amb | 269 | |
253 | amb | 991 | #endif |
254 | amb | 269 | |
255 | nfiles++; | ||
256 | } | ||
257 | while(more); | ||
258 | |||
259 | amb | 991 | /* Wait for all of the threads to finish */ |
260 | amb | 269 | |
261 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
262 | |||
263 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
264 | amb | 991 | { |
265 | amb | 996 | pthread_mutex_lock(&running_mutex); |
266 | amb | 991 | |
267 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
268 | |||
269 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
270 | if(threads[i].running==2) | ||
271 | { | ||
272 | pthread_join(threads[i].thread,NULL); | ||
273 | threads[i].running=0; | ||
274 | nthreads--; | ||
275 | } | ||
276 | |||
277 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
278 | amb | 991 | } |
279 | |||
280 | #endif | ||
281 | |||
282 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
283 | |||
284 | amb | 269 | if(nfiles==1) |
285 | { | ||
286 | amb | 991 | for(i=0;i<threads[0].n;i++) |
287 | amb | 269 | { |
288 | amb | 1106 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
289 | amb | 274 | { |
290 | amb | 991 | WriteFile(fd_out,threads[0].datap[i],itemsize); |
291 | amb | 1106 | count_out++; |
292 | amb | 274 | } |
293 | amb | 269 | } |
294 | |||
295 | amb | 991 | DeleteFile(threads[0].filename); |
296 | amb | 269 | |
297 | amb | 283 | goto tidy_and_exit; |
298 | amb | 269 | } |
299 | |||
300 | /* Check that number of files is less than file size */ | ||
301 | |||
302 | amb | 1166 | logassert(nfiles<nitems,"Too many temporary files (use more sorting memory?)"); |
303 | amb | 269 | |
304 | /* Open all of the temporary files */ | ||
305 | |||
306 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
307 | |||
308 | for(i=0;i<nfiles;i++) | ||
309 | { | ||
310 | amb | 991 | char *filename=threads[0].filename; |
311 | |||
312 | amb | 284 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
313 | amb | 269 | |
314 | fds[i]=ReOpenFile(filename); | ||
315 | |||
316 | DeleteFile(filename); | ||
317 | } | ||
318 | |||
319 | /* Perform an n-way merge using a binary heap */ | ||
320 | |||
321 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
322 | amb | 269 | |
323 | amb | 991 | data =threads[0].data; |
324 | datap=threads[0].datap; | ||
325 | |||
326 | amb | 269 | /* Fill the heap to start with */ |
327 | |||
328 | for(i=0;i<nfiles;i++) | ||
329 | { | ||
330 | int index; | ||
331 | |||
332 | datap[i]=data+i*itemsize; | ||
333 | |||
334 | ReadFile(fds[i],datap[i],itemsize); | ||
335 | |||
336 | amb | 875 | index=i+1; |
337 | |||
338 | amb | 867 | heap[index]=i; |
339 | amb | 269 | |
340 | /* Bubble up the new value */ | ||
341 | |||
342 | amb | 875 | while(index>1) |
343 | amb | 269 | { |
344 | int newindex; | ||
345 | int temp; | ||
346 | |||
347 | amb | 875 | newindex=index/2; |
348 | amb | 269 | |
349 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
350 | amb | 869 | break; |
351 | |||
352 | amb | 269 | temp=heap[index]; |
353 | heap[index]=heap[newindex]; | ||
354 | heap[newindex]=temp; | ||
355 | |||
356 | index=newindex; | ||
357 | } | ||
358 | } | ||
359 | |||
360 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
361 | |||
362 | ndata=nfiles; | ||
363 | |||
364 | do | ||
365 | { | ||
366 | amb | 875 | int index=1; |
367 | amb | 269 | |
368 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
369 | amb | 274 | { |
370 | amb | 867 | WriteFile(fd_out,datap[heap[index]],itemsize); |
371 | amb | 1106 | count_out++; |
372 | amb | 274 | } |
373 | amb | 269 | |
374 | amb | 867 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
375 | amb | 269 | { |
376 | amb | 875 | heap[index]=heap[ndata]; |
377 | amb | 870 | ndata--; |
378 | amb | 269 | } |
379 | |||
380 | /* Bubble down the new value */ | ||
381 | |||
382 | amb | 875 | while((2*index)<ndata) |
383 | amb | 269 | { |
384 | amb | 873 | int newindex; |
385 | amb | 269 | int temp; |
386 | |||
387 | amb | 875 | newindex=2*index; |
388 | amb | 269 | |
389 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
390 | amb | 873 | newindex=newindex+1; |
391 | amb | 869 | |
392 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
393 | amb | 869 | break; |
394 | |||
395 | amb | 269 | temp=heap[newindex]; |
396 | heap[newindex]=heap[index]; | ||
397 | heap[index]=temp; | ||
398 | |||
399 | index=newindex; | ||
400 | } | ||
401 | |||
402 | amb | 875 | if((2*index)==ndata) |
403 | amb | 269 | { |
404 | int newindex; | ||
405 | int temp; | ||
406 | |||
407 | amb | 875 | newindex=2*index; |
408 | amb | 269 | |
409 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
410 | amb | 869 | ; /* break */ |
411 | else | ||
412 | { | ||
413 | temp=heap[newindex]; | ||
414 | heap[newindex]=heap[index]; | ||
415 | heap[index]=temp; | ||
416 | } | ||
417 | amb | 269 | } |
418 | } | ||
419 | while(ndata>0); | ||
420 | |||
421 | /* Tidy up */ | ||
422 | |||
423 | amb | 283 | tidy_and_exit: |
424 | amb | 269 | |
425 | amb | 283 | if(fds) |
426 | { | ||
427 | for(i=0;i<nfiles;i++) | ||
428 | CloseFile(fds[i]); | ||
429 | free(fds); | ||
430 | } | ||
431 | |||
432 | if(heap) | ||
433 | free(heap); | ||
434 | |||
435 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
436 | { | ||
437 | free(threads[i].data); | ||
438 | free(threads[i].datap); | ||
439 | amb | 948 | |
440 | amb | 991 | free(threads[i].filename); |
441 | } | ||
442 | |||
443 | amb | 1106 | return(count_out); |
444 | amb | 269 | } |
445 | |||
446 | |||
447 | /*++++++++++++++++++++++++++++++++++++++ | ||
448 | amb | 310 | A function to sort the contents of a file of variable length objects (each |
449 | amb | 867 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
450 | amb | 310 | |
451 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
452 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
453 | The individual sort steps and the merge step both use a "Heap sort" | ||
454 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
455 | if the data is already partially sorted. | ||
456 | |||
457 | amb | 948 | index_t filesort_vary Returns the number of objects kept. |
458 | |||
459 | amb | 310 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
460 | |||
461 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
462 | |||
463 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
464 | each item before they have been sorted. The second parameter is the number of objects | ||
465 | previously read from the input file. If the function returns 1 then the object is kept | ||
466 | and it is sorted, otherwise it is ignored. | ||
467 | amb | 310 | |
468 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
469 | to qsort if the data to be sorted is an array of things not pointers. | ||
470 | |||
471 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
472 | each item after they have been sorted. The second parameter is the number of objects | ||
473 | already written to the output file. If the function returns 1 then the object is written | ||
474 | to the output file., otherwise it is ignored. | ||
475 | amb | 310 | ++++++++++++++++++++++++++++++++++++++*/ |
476 | |||
477 | amb | 1106 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
478 | int (*compare_function)(const void*,const void*), | ||
479 | int (*post_sort_function)(void*,index_t)) | ||
480 | amb | 310 | { |
481 | int *fds=NULL,*heap=NULL; | ||
482 | int nfiles=0,ndata=0; | ||
483 | amb | 1112 | index_t count_out=0,count_in=0,total=0; |
484 | amb | 991 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
485 | amb | 311 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
486 | amb | 991 | void *data,**datap; |
487 | thread_data *threads; | ||
488 | amb | 310 | int i,more=1; |
489 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
490 | int nthreads=0; | ||
491 | #endif | ||
492 | amb | 310 | |
493 | /* Allocate the RAM buffer and other bits */ | ||
494 | |||
495 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
496 | amb | 310 | |
497 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
498 | { | ||
499 | threads[i].running=0; | ||
500 | amb | 310 | |
501 | amb | 991 | threads[i].data=malloc(datasize); |
502 | threads[i].datap=NULL; | ||
503 | |||
504 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
505 | |||
506 | amb | 1106 | threads[i].compare=compare_function; |
507 | amb | 991 | } |
508 | |||
509 | amb | 310 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
510 | |||
511 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
512 | amb | 310 | goto tidy_and_exit; |
513 | |||
514 | do | ||
515 | { | ||
516 | amb | 311 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
517 | amb | 991 | int thread=0; |
518 | amb | 310 | |
519 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
520 | amb | 310 | |
521 | amb | 1020 | if(option_filesort_threads>1) |
522 | { | ||
523 | /* Find a spare slot (one *must* be unused at all times) */ | ||
524 | amb | 991 | |
525 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
526 | amb | 996 | |
527 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
528 | if(!threads[thread].running) | ||
529 | break; | ||
530 | amb | 991 | |
531 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
532 | } | ||
533 | amb | 996 | |
534 | amb | 991 | #endif |
535 | |||
536 | threads[thread].datap=threads[thread].data+datasize; | ||
537 | |||
538 | threads[thread].n=0; | ||
539 | |||
540 | amb | 310 | /* Read in the data and create pointers */ |
541 | |||
542 | amb | 991 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
543 | amb | 310 | { |
544 | amb | 311 | FILESORT_VARINT itemsize=nextitemsize; |
545 | amb | 310 | |
546 | if(itemsize>largestitemsize) | ||
547 | largestitemsize=itemsize; | ||
548 | |||
549 | amb | 991 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
550 | amb | 310 | |
551 | amb | 311 | ramused+=FILESORT_VARSIZE; |
552 | amb | 310 | |
553 | amb | 991 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
554 | amb | 310 | |
555 | amb | 1106 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
556 | { | ||
557 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ | ||
558 | amb | 310 | |
559 | amb | 1106 | ramused+=itemsize; |
560 | amb | 311 | |
561 | amb | 1106 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
562 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; | ||
563 | amb | 311 | |
564 | amb | 1112 | total++; |
565 | amb | 1106 | threads[thread].n++; |
566 | } | ||
567 | amb | 310 | |
568 | amb | 1112 | count_in++; |
569 | |||
570 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
571 | amb | 310 | { |
572 | more=0; | ||
573 | break; | ||
574 | } | ||
575 | } | ||
576 | |||
577 | amb | 543 | /* No new data read in this time round */ |
578 | |||
579 | amb | 991 | if(threads[thread].n==0) |
580 | amb | 310 | break; |
581 | |||
582 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
583 | amb | 310 | |
584 | amb | 1021 | if(more==0 && nfiles==0) |
585 | threads[thread].filename[0]=0; | ||
586 | else | ||
587 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); | ||
588 | amb | 310 | |
589 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
590 | amb | 310 | |
591 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
592 | |||
593 | if(more==0 && nfiles==0) | ||
594 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
595 | else if(option_filesort_threads>1) | ||
596 | amb | 310 | { |
597 | amb | 996 | pthread_mutex_lock(&running_mutex); |
598 | |||
599 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
600 | amb | 310 | { |
601 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
602 | if(threads[i].running==2) | ||
603 | { | ||
604 | pthread_join(threads[i].thread,NULL); | ||
605 | threads[i].running=0; | ||
606 | nthreads--; | ||
607 | } | ||
608 | |||
609 | if(nthreads==(option_filesort_threads-1)) | ||
610 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
611 | amb | 310 | } |
612 | |||
613 | amb | 991 | threads[thread].running=1; |
614 | |||
615 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
616 | |||
617 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
618 | |||
619 | nthreads++; | ||
620 | amb | 310 | } |
621 | amb | 1020 | else |
622 | amb | 1021 | filesort_vary_heapsort_thread(&threads[thread]); |
623 | amb | 310 | |
624 | amb | 991 | #else |
625 | amb | 310 | |
626 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
627 | |||
628 | if(more==0 && nfiles==0) | ||
629 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
630 | else | ||
631 | amb | 1020 | filesort_vary_heapsort_thread(&threads[thread]); |
632 | amb | 310 | |
633 | amb | 991 | #endif |
634 | amb | 310 | |
635 | amb | 991 | nfiles++; |
636 | } | ||
637 | while(more); | ||
638 | |||
639 | /* Wait for all of the threads to finish */ | ||
640 | |||
641 | #if defined(USE_PTHREADS) && USE_PTHREADS | ||
642 | |||
643 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
644 | amb | 991 | { |
645 | amb | 996 | pthread_mutex_lock(&running_mutex); |
646 | amb | 991 | |
647 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
648 | |||
649 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
650 | if(threads[i].running==2) | ||
651 | { | ||
652 | pthread_join(threads[i].thread,NULL); | ||
653 | threads[i].running=0; | ||
654 | nthreads--; | ||
655 | } | ||
656 | |||
657 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
658 | amb | 991 | } |
659 | |||
660 | #endif | ||
661 | |||
662 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
663 | |||
664 | if(nfiles==1) | ||
665 | { | ||
666 | for(i=0;i<threads[0].n;i++) | ||
667 | amb | 310 | { |
668 | amb | 1106 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
669 | amb | 991 | { |
670 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE); | ||
671 | amb | 310 | |
672 | amb | 991 | WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
673 | amb | 1106 | count_out++; |
674 | amb | 991 | } |
675 | amb | 310 | } |
676 | |||
677 | amb | 991 | DeleteFile(threads[0].filename); |
678 | amb | 310 | |
679 | amb | 991 | goto tidy_and_exit; |
680 | amb | 310 | } |
681 | |||
682 | /* Check that number of files is less than file size */ | ||
683 | |||
684 | amb | 311 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
685 | amb | 310 | |
686 | amb | 1166 | logassert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize),"Too many temporary files (use more sorting memory?)"); |
687 | amb | 310 | |
688 | /* Open all of the temporary files */ | ||
689 | |||
690 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
691 | |||
692 | for(i=0;i<nfiles;i++) | ||
693 | { | ||
694 | amb | 991 | char *filename=threads[0].filename; |
695 | |||
696 | amb | 310 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
697 | |||
698 | fds[i]=ReOpenFile(filename); | ||
699 | |||
700 | DeleteFile(filename); | ||
701 | } | ||
702 | |||
703 | /* Perform an n-way merge using a binary heap */ | ||
704 | |||
705 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
706 | amb | 310 | |
707 | amb | 991 | data=threads[0].data; |
708 | datap=data+datasize-nfiles*sizeof(void*); | ||
709 | amb | 310 | |
710 | /* Fill the heap to start with */ | ||
711 | |||
712 | for(i=0;i<nfiles;i++) | ||
713 | { | ||
714 | int index; | ||
715 | amb | 311 | FILESORT_VARINT itemsize; |
716 | amb | 310 | |
717 | amb | 311 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
718 | amb | 310 | |
719 | amb | 311 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
720 | amb | 310 | |
721 | amb | 311 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
722 | amb | 310 | |
723 | ReadFile(fds[i],datap[i],itemsize); | ||
724 | |||
725 | amb | 875 | index=i+1; |
726 | |||
727 | amb | 867 | heap[index]=i; |
728 | amb | 310 | |
729 | /* Bubble up the new value */ | ||
730 | |||
731 | amb | 875 | while(index>1) |
732 | amb | 310 | { |
733 | int newindex; | ||
734 | int temp; | ||
735 | |||
736 | amb | 875 | newindex=index/2; |
737 | amb | 310 | |
738 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
739 | amb | 869 | break; |
740 | |||
741 | amb | 310 | temp=heap[index]; |
742 | heap[index]=heap[newindex]; | ||
743 | heap[newindex]=temp; | ||
744 | |||
745 | index=newindex; | ||
746 | } | ||
747 | } | ||
748 | |||
749 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
750 | |||
751 | ndata=nfiles; | ||
752 | |||
753 | do | ||
754 | { | ||
755 | amb | 875 | int index=1; |
756 | amb | 311 | FILESORT_VARINT itemsize; |
757 | amb | 310 | |
758 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
759 | amb | 310 | { |
760 | amb | 867 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
761 | amb | 310 | |
762 | amb | 867 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
763 | amb | 1106 | count_out++; |
764 | amb | 310 | } |
765 | |||
766 | amb | 867 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
767 | amb | 310 | { |
768 | amb | 875 | heap[index]=heap[ndata]; |
769 | amb | 870 | ndata--; |
770 | amb | 310 | } |
771 | else | ||
772 | { | ||
773 | amb | 867 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
774 | amb | 310 | |
775 | amb | 867 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
776 | amb | 310 | } |
777 | |||
778 | /* Bubble down the new value */ | ||
779 | |||
780 | amb | 875 | while((2*index)<ndata) |
781 | amb | 310 | { |
782 | amb | 873 | int newindex; |
783 | amb | 310 | int temp; |
784 | |||
785 | amb | 875 | newindex=2*index; |
786 | amb | 310 | |
787 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
788 | amb | 873 | newindex=newindex+1; |
789 | amb | 869 | |
790 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
791 | amb | 869 | break; |
792 | |||
793 | amb | 310 | temp=heap[newindex]; |
794 | heap[newindex]=heap[index]; | ||
795 | heap[index]=temp; | ||
796 | |||
797 | index=newindex; | ||
798 | } | ||
799 | |||
800 | amb | 875 | if((2*index)==ndata) |
801 | amb | 310 | { |
802 | int newindex; | ||
803 | int temp; | ||
804 | |||
805 | amb | 875 | newindex=2*index; |
806 | amb | 310 | |
807 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
808 | amb | 869 | ; /* break */ |
809 | else | ||
810 | { | ||
811 | temp=heap[newindex]; | ||
812 | heap[newindex]=heap[index]; | ||
813 | heap[index]=temp; | ||
814 | } | ||
815 | amb | 310 | } |
816 | } | ||
817 | while(ndata>0); | ||
818 | |||
819 | /* Tidy up */ | ||
820 | |||
821 | tidy_and_exit: | ||
822 | |||
823 | if(fds) | ||
824 | { | ||
825 | for(i=0;i<nfiles;i++) | ||
826 | CloseFile(fds[i]); | ||
827 | free(fds); | ||
828 | } | ||
829 | |||
830 | if(heap) | ||
831 | free(heap); | ||
832 | |||
833 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
834 | { | ||
835 | free(threads[i].data); | ||
836 | amb | 948 | |
837 | amb | 991 | free(threads[i].filename); |
838 | } | ||
839 | |||
840 | amb | 1106 | return(count_out); |
841 | amb | 310 | } |
842 | |||
843 | |||
844 | /*++++++++++++++++++++++++++++++++++++++ | ||
845 | amb | 991 | A wrapper function that can be run in a thread for fixed data. |
846 | |||
847 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). | ||
848 | |||
849 | thread_data *thread The data to be processed in this thread. | ||
850 | ++++++++++++++++++++++++++++++++++++++*/ | ||
851 | |||
852 | static void *filesort_fixed_heapsort_thread(thread_data *thread) | ||
853 | { | ||
854 | int fd,i; | ||
855 | |||
856 | /* Sort the data pointers using a heap sort */ | ||
857 | |||
858 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
859 | |||
860 | /* Create a temporary file and write the result */ | ||
861 | |||
862 | fd=OpenFileNew(thread->filename); | ||
863 | |||
864 | for(i=0;i<thread->n;i++) | ||
865 | WriteFile(fd,thread->datap[i],thread->itemsize); | ||
866 | |||
867 | CloseFile(fd); | ||
868 | |||
869 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
870 | |||
871 | amb | 1020 | if(option_filesort_threads>1) |
872 | { | ||
873 | pthread_mutex_lock(&running_mutex); | ||
874 | amb | 996 | |
875 | amb | 1020 | thread->running=2; |
876 | amb | 991 | |
877 | amb | 1020 | pthread_cond_signal(&running_cond); |
878 | amb | 996 | |
879 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
880 | } | ||
881 | amb | 996 | |
882 | #endif | ||
883 | |||
884 | amb | 991 | return(NULL); |
885 | } | ||
886 | |||
887 | |||
888 | /*++++++++++++++++++++++++++++++++++++++ | ||
889 | A wrapper function that can be run in a thread for variable data. | ||
890 | |||
891 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). | ||
892 | |||
893 | thread_data *thread The data to be processed in this thread. | ||
894 | ++++++++++++++++++++++++++++++++++++++*/ | ||
895 | |||
896 | static void *filesort_vary_heapsort_thread(thread_data *thread) | ||
897 | { | ||
898 | int fd,i; | ||
899 | |||
900 | /* Sort the data pointers using a heap sort */ | ||
901 | |||
902 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
903 | |||
904 | /* Create a temporary file and write the result */ | ||
905 | |||
906 | fd=OpenFileNew(thread->filename); | ||
907 | |||
908 | for(i=0;i<thread->n;i++) | ||
909 | { | ||
910 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE); | ||
911 | |||
912 | WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); | ||
913 | } | ||
914 | |||
915 | CloseFile(fd); | ||
916 | |||
917 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
918 | |||
919 | amb | 1020 | if(option_filesort_threads>1) |
920 | { | ||
921 | pthread_mutex_lock(&running_mutex); | ||
922 | amb | 996 | |
923 | amb | 1020 | thread->running=2; |
924 | amb | 991 | |
925 | amb | 1020 | pthread_cond_signal(&running_cond); |
926 | amb | 996 | |
927 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
928 | } | ||
929 | amb | 996 | |
930 | #endif | ||
931 | |||
932 | amb | 991 | return(NULL); |
933 | } | ||
934 | |||
935 | |||
936 | /*++++++++++++++++++++++++++++++++++++++ | ||
937 | amb | 269 | A function to sort an array of pointers efficiently. |
938 | |||
939 | The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort, | ||
940 | amb | 873 | in particular, this is good because it can operate in-place and doesn't |
941 | amb | 269 | allocate more memory like using qsort() does. |
942 | |||
943 | void **datap A pointer to the array of pointers to sort. | ||
944 | |||
945 | size_t nitems The number of items of data to sort. | ||
946 | |||
947 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
948 | to qsort if the data to be sorted is an array of things not pointers. | ||
949 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
950 | |||
951 | amb | 1106 | void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*)) |
952 | amb | 269 | { |
953 | amb | 875 | void **datap1=&datap[-1]; |
954 | amb | 269 | int i; |
955 | |||
956 | /* Fill the heap by pretending to insert the data that is already there */ | ||
957 | |||
958 | amb | 875 | for(i=2;i<=nitems;i++) |
959 | amb | 269 | { |
960 | int index=i; | ||
961 | |||
962 | /* Bubble up the new value (upside-down, put largest at top) */ | ||
963 | |||
964 | amb | 875 | while(index>1) |
965 | amb | 269 | { |
966 | int newindex; | ||
967 | void *temp; | ||
968 | |||
969 | amb | 875 | newindex=index/2; |
970 | amb | 269 | |
971 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */ |
972 | amb | 869 | break; |
973 | |||
974 | amb | 875 | temp=datap1[index]; |
975 | datap1[index]=datap1[newindex]; | ||
976 | datap1[newindex]=temp; | ||
977 | amb | 269 | |
978 | index=newindex; | ||
979 | } | ||
980 | } | ||
981 | |||
982 | /* Repeatedly pull out the root of the heap and swap with the bottom item */ | ||
983 | |||
984 | amb | 875 | for(i=nitems;i>1;i--) |
985 | amb | 269 | { |
986 | amb | 875 | int index=1; |
987 | amb | 269 | void *temp; |
988 | |||
989 | amb | 875 | temp=datap1[index]; |
990 | datap1[index]=datap1[i]; | ||
991 | datap1[i]=temp; | ||
992 | amb | 269 | |
993 | /* Bubble down the new value (upside-down, put largest at top) */ | ||
994 | |||
995 | amb | 875 | while((2*index)<(i-1)) |
996 | amb | 269 | { |
997 | amb | 873 | int newindex; |
998 | amb | 269 | void *temp; |
999 | |||
1000 | amb | 875 | newindex=2*index; |
1001 | amb | 269 | |
1002 | amb | 1106 | if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */ |
1003 | amb | 873 | newindex=newindex+1; |
1004 | amb | 869 | |
1005 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1006 | amb | 869 | break; |
1007 | |||
1008 | amb | 875 | temp=datap1[newindex]; |
1009 | datap1[newindex]=datap1[index]; | ||
1010 | datap1[index]=temp; | ||
1011 | amb | 269 | |
1012 | index=newindex; | ||
1013 | } | ||
1014 | |||
1015 | amb | 875 | if((2*index)==(i-1)) |
1016 | amb | 269 | { |
1017 | int newindex; | ||
1018 | void *temp; | ||
1019 | |||
1020 | amb | 875 | newindex=2*index; |
1021 | amb | 269 | |
1022 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1023 | amb | 869 | ; /* break */ |
1024 | else | ||
1025 | { | ||
1026 | amb | 875 | temp=datap1[newindex]; |
1027 | datap1[newindex]=datap1[index]; | ||
1028 | datap1[index]=temp; | ||
1029 | amb | 869 | } |
1030 | amb | 269 | } |
1031 | } | ||
1032 | } |
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |