Check out the latest version of Routino: svn co http://routino.org/svn/trunk routino
Annotation of /branches/destination-access/src/sorting.c
Parent Directory
|
Revision Log
Revision 1106 -
(hide annotations)
(download)
(as text)
Sun Oct 21 15:55:48 2012 UTC (12 years, 5 months ago) by amb
Original Path: trunk/src/sorting.c
File MIME type: text/x-csrc
File size: 26210 byte(s)
Sun Oct 21 15:55:48 2012 UTC (12 years, 5 months ago) by amb
Original Path: trunk/src/sorting.c
File MIME type: text/x-csrc
File size: 26210 byte(s)
Change the sorting functions to have a pre-sort and post-sort selection function instead of just a post-selection one (this will allow deletion of some items before sorting instead of after sorting in some cases).
1 | amb | 269 | /*************************************** |
2 | Merge sort functions. | ||
3 | |||
4 | Part of the Routino routing software. | ||
5 | ******************/ /****************** | ||
6 | amb | 948 | This file Copyright 2009-2012 Andrew M. Bishop |
7 | amb | 269 | |
8 | This program is free software: you can redistribute it and/or modify | ||
9 | it under the terms of the GNU Affero General Public License as published by | ||
10 | the Free Software Foundation, either version 3 of the License, or | ||
11 | (at your option) any later version. | ||
12 | |||
13 | This program is distributed in the hope that it will be useful, | ||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
16 | GNU Affero General Public License for more details. | ||
17 | |||
18 | You should have received a copy of the GNU Affero General Public License | ||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
20 | ***************************************/ | ||
21 | |||
22 | |||
23 | #include <stdio.h> | ||
24 | #include <stdlib.h> | ||
25 | #include <string.h> | ||
26 | #include <assert.h> | ||
27 | |||
28 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
29 | #include <pthread.h> | ||
30 | #endif | ||
31 | |||
32 | amb | 532 | #include "types.h" |
33 | |||
34 | amb | 449 | #include "files.h" |
35 | amb | 532 | #include "sorting.h" |
36 | amb | 269 | |
37 | |||
38 | amb | 680 | /* Global variables */ |
39 | amb | 269 | |
40 | amb | 289 | /*+ The command line '--tmpdir' option or its default value. +*/ |
41 | amb | 284 | extern char *option_tmpdirname; |
42 | amb | 269 | |
43 | amb | 358 | /*+ The amount of RAM to use for filesorting. +*/ |
44 | extern size_t option_filesort_ramsize; | ||
45 | amb | 269 | |
46 | amb | 991 | /*+ The number of filesorting threads allowed. +*/ |
47 | extern int option_filesort_threads; | ||
48 | amb | 358 | |
49 | amb | 991 | |
50 | /* Thread data type definitions */ | ||
51 | |||
52 | /*+ A data type for holding data for a thread. +*/ | ||
53 | typedef struct _thread_data | ||
54 | { | ||
55 | pthread_t thread; /*+ The thread identifier. +*/ | ||
56 | |||
57 | int running; /*+ A flag indicating the current state of the thread. +*/ | ||
58 | |||
59 | void *data; /*+ The main data array. +*/ | ||
60 | void **datap; /*+ An array of pointers to the data objects. +*/ | ||
61 | size_t n; /*+ The number of pointers. +*/ | ||
62 | |||
63 | char *filename; /*+ The name of the file to write the results to. +*/ | ||
64 | |||
65 | size_t itemsize; /*+ The size of each item. +*/ | ||
66 | int (*compare)(const void*,const void*); /*+ The comparison function. +*/ | ||
67 | } | ||
68 | thread_data; | ||
69 | |||
70 | amb | 996 | /* Thread variables */ |
71 | amb | 991 | |
72 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
73 | |||
74 | static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; | ||
75 | static pthread_cond_t running_cond = PTHREAD_COND_INITIALIZER; | ||
76 | |||
77 | #endif | ||
78 | |||
79 | amb | 991 | /* Thread helper functions */ |
80 | |||
81 | static void *filesort_fixed_heapsort_thread(thread_data *thread); | ||
82 | static void *filesort_vary_heapsort_thread(thread_data *thread); | ||
83 | |||
84 | |||
85 | amb | 269 | /*++++++++++++++++++++++++++++++++++++++ |
86 | amb | 310 | A function to sort the contents of a file of fixed length objects using a |
87 | limited amount of RAM. | ||
88 | amb | 269 | |
89 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
90 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
91 | The individual sort steps and the merge step both use a "Heap sort" | ||
92 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
93 | if the data is already partially sorted. | ||
94 | |||
95 | amb | 948 | index_t filesort_fixed Returns the number of objects kept. |
96 | |||
97 | amb | 269 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
98 | |||
99 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
100 | |||
101 | size_t itemsize The size of each item in the file that needs sorting. | ||
102 | |||
103 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
104 | each item before they have been sorted. The second parameter is the number of objects | ||
105 | previously read from the input file. If the function returns 1 then the object is kept | ||
106 | and it is sorted, otherwise it is ignored. | ||
107 | amb | 269 | |
108 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
109 | to qsort if the data to be sorted is an array of things not pointers. | ||
110 | |||
111 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
112 | each item after they have been sorted. The second parameter is the number of objects | ||
113 | already written to the output file. If the function returns 1 then the object is written | ||
114 | to the output file., otherwise it is ignored. | ||
115 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
116 | |||
117 | amb | 1106 | index_t filesort_fixed(int fd_in,int fd_out,size_t itemsize,int (*pre_sort_function)(void*,index_t), |
118 | int (*compare_function)(const void*,const void*), | ||
119 | int (*post_sort_function)(void*,index_t)) | ||
120 | amb | 269 | { |
121 | int *fds=NULL,*heap=NULL; | ||
122 | int nfiles=0,ndata=0; | ||
123 | amb | 1106 | index_t count_out=0,count_in=0; |
124 | amb | 991 | size_t nitems=option_filesort_ramsize/(option_filesort_threads*(itemsize+sizeof(void*))); |
125 | void *data,**datap; | ||
126 | thread_data *threads; | ||
127 | amb | 269 | int i,more=1; |
128 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
129 | int nthreads=0; | ||
130 | #endif | ||
131 | amb | 269 | |
132 | /* Allocate the RAM buffer and other bits */ | ||
133 | |||
134 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
135 | amb | 269 | |
136 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
137 | { | ||
138 | threads[i].running=0; | ||
139 | amb | 269 | |
140 | amb | 991 | threads[i].data=malloc(nitems*itemsize); |
141 | threads[i].datap=malloc(nitems*sizeof(void*)); | ||
142 | |||
143 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
144 | |||
145 | threads[i].itemsize=itemsize; | ||
146 | amb | 1106 | threads[i].compare=compare_function; |
147 | amb | 991 | } |
148 | |||
149 | amb | 269 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
150 | |||
151 | do | ||
152 | { | ||
153 | amb | 991 | int thread=0; |
154 | amb | 269 | |
155 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
156 | |||
157 | amb | 1020 | if(option_filesort_threads>1) |
158 | { | ||
159 | /* Find a spare slot (one *must* be unused at all times) */ | ||
160 | amb | 991 | |
161 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
162 | amb | 996 | |
163 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
164 | if(!threads[thread].running) | ||
165 | break; | ||
166 | amb | 991 | |
167 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
168 | } | ||
169 | amb | 996 | |
170 | amb | 991 | #endif |
171 | |||
172 | amb | 269 | /* Read in the data and create pointers */ |
173 | |||
174 | amb | 1106 | for(i=0;i<nitems;) |
175 | amb | 269 | { |
176 | amb | 991 | threads[thread].datap[i]=threads[thread].data+i*itemsize; |
177 | amb | 269 | |
178 | amb | 991 | if(ReadFile(fd_in,threads[thread].datap[i],itemsize)) |
179 | amb | 269 | { |
180 | more=0; | ||
181 | break; | ||
182 | } | ||
183 | |||
184 | amb | 1106 | if(!pre_sort_function || pre_sort_function(threads[thread].datap[i],count_in)) |
185 | { | ||
186 | i++; | ||
187 | count_in++; | ||
188 | } | ||
189 | amb | 269 | } |
190 | |||
191 | amb | 991 | threads[thread].n=i; |
192 | amb | 269 | |
193 | amb | 1020 | /* Shortcut if there is no previous data and no more data (i.e. no data at all) */ |
194 | amb | 546 | |
195 | amb | 1106 | if(more==0 && count_in==0) |
196 | amb | 546 | goto tidy_and_exit; |
197 | |||
198 | amb | 543 | /* No new data read in this time round */ |
199 | |||
200 | amb | 991 | if(threads[thread].n==0) |
201 | amb | 269 | break; |
202 | |||
203 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
204 | amb | 269 | |
205 | amb | 991 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); |
206 | amb | 269 | |
207 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
208 | amb | 269 | |
209 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
210 | |||
211 | if(more==0 && nfiles==0) | ||
212 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
213 | else if(option_filesort_threads>1) | ||
214 | amb | 269 | { |
215 | amb | 996 | pthread_mutex_lock(&running_mutex); |
216 | |||
217 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
218 | amb | 269 | { |
219 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
220 | if(threads[i].running==2) | ||
221 | { | ||
222 | pthread_join(threads[i].thread,NULL); | ||
223 | threads[i].running=0; | ||
224 | nthreads--; | ||
225 | } | ||
226 | |||
227 | if(nthreads==(option_filesort_threads-1)) | ||
228 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
229 | amb | 269 | } |
230 | |||
231 | amb | 991 | threads[thread].running=1; |
232 | amb | 269 | |
233 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
234 | |||
235 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_fixed_heapsort_thread,&threads[thread]); |
236 | amb | 269 | |
237 | amb | 991 | nthreads++; |
238 | } | ||
239 | amb | 1020 | else |
240 | amb | 1021 | filesort_fixed_heapsort_thread(&threads[thread]); |
241 | amb | 269 | |
242 | amb | 991 | #else |
243 | amb | 269 | |
244 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
245 | |||
246 | if(more==0 && nfiles==0) | ||
247 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
248 | else | ||
249 | amb | 1020 | filesort_fixed_heapsort_thread(&threads[thread]); |
250 | amb | 269 | |
251 | amb | 991 | #endif |
252 | amb | 269 | |
253 | nfiles++; | ||
254 | } | ||
255 | while(more); | ||
256 | |||
257 | amb | 991 | /* Wait for all of the threads to finish */ |
258 | amb | 269 | |
259 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
260 | |||
261 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
262 | amb | 991 | { |
263 | amb | 996 | pthread_mutex_lock(&running_mutex); |
264 | amb | 991 | |
265 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
266 | |||
267 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
268 | if(threads[i].running==2) | ||
269 | { | ||
270 | pthread_join(threads[i].thread,NULL); | ||
271 | threads[i].running=0; | ||
272 | nthreads--; | ||
273 | } | ||
274 | |||
275 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
276 | amb | 991 | } |
277 | |||
278 | #endif | ||
279 | |||
280 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
281 | |||
282 | amb | 269 | if(nfiles==1) |
283 | { | ||
284 | amb | 991 | for(i=0;i<threads[0].n;i++) |
285 | amb | 269 | { |
286 | amb | 1106 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
287 | amb | 274 | { |
288 | amb | 991 | WriteFile(fd_out,threads[0].datap[i],itemsize); |
289 | amb | 1106 | count_out++; |
290 | amb | 274 | } |
291 | amb | 269 | } |
292 | |||
293 | amb | 991 | DeleteFile(threads[0].filename); |
294 | amb | 269 | |
295 | amb | 283 | goto tidy_and_exit; |
296 | amb | 269 | } |
297 | |||
298 | /* Check that number of files is less than file size */ | ||
299 | |||
300 | assert(nfiles<nitems); | ||
301 | |||
302 | /* Open all of the temporary files */ | ||
303 | |||
304 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
305 | |||
306 | for(i=0;i<nfiles;i++) | ||
307 | { | ||
308 | amb | 991 | char *filename=threads[0].filename; |
309 | |||
310 | amb | 284 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
311 | amb | 269 | |
312 | fds[i]=ReOpenFile(filename); | ||
313 | |||
314 | DeleteFile(filename); | ||
315 | } | ||
316 | |||
317 | /* Perform an n-way merge using a binary heap */ | ||
318 | |||
319 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
320 | amb | 269 | |
321 | amb | 991 | data =threads[0].data; |
322 | datap=threads[0].datap; | ||
323 | |||
324 | amb | 269 | /* Fill the heap to start with */ |
325 | |||
326 | for(i=0;i<nfiles;i++) | ||
327 | { | ||
328 | int index; | ||
329 | |||
330 | datap[i]=data+i*itemsize; | ||
331 | |||
332 | ReadFile(fds[i],datap[i],itemsize); | ||
333 | |||
334 | amb | 875 | index=i+1; |
335 | |||
336 | amb | 867 | heap[index]=i; |
337 | amb | 269 | |
338 | /* Bubble up the new value */ | ||
339 | |||
340 | amb | 875 | while(index>1) |
341 | amb | 269 | { |
342 | int newindex; | ||
343 | int temp; | ||
344 | |||
345 | amb | 875 | newindex=index/2; |
346 | amb | 269 | |
347 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
348 | amb | 869 | break; |
349 | |||
350 | amb | 269 | temp=heap[index]; |
351 | heap[index]=heap[newindex]; | ||
352 | heap[newindex]=temp; | ||
353 | |||
354 | index=newindex; | ||
355 | } | ||
356 | } | ||
357 | |||
358 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
359 | |||
360 | ndata=nfiles; | ||
361 | |||
362 | do | ||
363 | { | ||
364 | amb | 875 | int index=1; |
365 | amb | 269 | |
366 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
367 | amb | 274 | { |
368 | amb | 867 | WriteFile(fd_out,datap[heap[index]],itemsize); |
369 | amb | 1106 | count_out++; |
370 | amb | 274 | } |
371 | amb | 269 | |
372 | amb | 867 | if(ReadFile(fds[heap[index]],datap[heap[index]],itemsize)) |
373 | amb | 269 | { |
374 | amb | 875 | heap[index]=heap[ndata]; |
375 | amb | 870 | ndata--; |
376 | amb | 269 | } |
377 | |||
378 | /* Bubble down the new value */ | ||
379 | |||
380 | amb | 875 | while((2*index)<ndata) |
381 | amb | 269 | { |
382 | amb | 873 | int newindex; |
383 | amb | 269 | int temp; |
384 | |||
385 | amb | 875 | newindex=2*index; |
386 | amb | 269 | |
387 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
388 | amb | 873 | newindex=newindex+1; |
389 | amb | 869 | |
390 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
391 | amb | 869 | break; |
392 | |||
393 | amb | 269 | temp=heap[newindex]; |
394 | heap[newindex]=heap[index]; | ||
395 | heap[index]=temp; | ||
396 | |||
397 | index=newindex; | ||
398 | } | ||
399 | |||
400 | amb | 875 | if((2*index)==ndata) |
401 | amb | 269 | { |
402 | int newindex; | ||
403 | int temp; | ||
404 | |||
405 | amb | 875 | newindex=2*index; |
406 | amb | 269 | |
407 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
408 | amb | 869 | ; /* break */ |
409 | else | ||
410 | { | ||
411 | temp=heap[newindex]; | ||
412 | heap[newindex]=heap[index]; | ||
413 | heap[index]=temp; | ||
414 | } | ||
415 | amb | 269 | } |
416 | } | ||
417 | while(ndata>0); | ||
418 | |||
419 | /* Tidy up */ | ||
420 | |||
421 | amb | 283 | tidy_and_exit: |
422 | amb | 269 | |
423 | amb | 283 | if(fds) |
424 | { | ||
425 | for(i=0;i<nfiles;i++) | ||
426 | CloseFile(fds[i]); | ||
427 | free(fds); | ||
428 | } | ||
429 | |||
430 | if(heap) | ||
431 | free(heap); | ||
432 | |||
433 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
434 | { | ||
435 | free(threads[i].data); | ||
436 | free(threads[i].datap); | ||
437 | amb | 948 | |
438 | amb | 991 | free(threads[i].filename); |
439 | } | ||
440 | |||
441 | amb | 1106 | return(count_out); |
442 | amb | 269 | } |
443 | |||
444 | |||
445 | /*++++++++++++++++++++++++++++++++++++++ | ||
446 | amb | 310 | A function to sort the contents of a file of variable length objects (each |
447 | amb | 867 | preceded by its length in FILESORT_VARSIZE bytes) using a limited amount of RAM. |
448 | amb | 310 | |
449 | The data is sorted using a "Merge sort" http://en.wikipedia.org/wiki/Merge_sort | ||
450 | and in particular an "external sort" http://en.wikipedia.org/wiki/External_sorting. | ||
451 | The individual sort steps and the merge step both use a "Heap sort" | ||
452 | http://en.wikipedia.org/wiki/Heapsort. The combination of the two should work well | ||
453 | if the data is already partially sorted. | ||
454 | |||
455 | amb | 948 | index_t filesort_vary Returns the number of objects kept. |
456 | |||
457 | amb | 310 | int fd_in The file descriptor of the input file (opened for reading and at the beginning). |
458 | |||
459 | int fd_out The file descriptor of the output file (opened for writing and empty). | ||
460 | |||
461 | amb | 1106 | int (*pre_sort_function)(void *,index_t) If non-NULL then this function is called for |
462 | each item before they have been sorted. The second parameter is the number of objects | ||
463 | previously read from the input file. If the function returns 1 then the object is kept | ||
464 | and it is sorted, otherwise it is ignored. | ||
465 | amb | 310 | |
466 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
467 | to qsort if the data to be sorted is an array of things not pointers. | ||
468 | |||
469 | int (*post_sort_function)(void *,index_t) If non-NULL then this function is called for | ||
470 | each item after they have been sorted. The second parameter is the number of objects | ||
471 | already written to the output file. If the function returns 1 then the object is written | ||
472 | to the output file., otherwise it is ignored. | ||
473 | amb | 310 | ++++++++++++++++++++++++++++++++++++++*/ |
474 | |||
475 | amb | 1106 | index_t filesort_vary(int fd_in,int fd_out,int (*pre_sort_function)(void*,index_t), |
476 | int (*compare_function)(const void*,const void*), | ||
477 | int (*post_sort_function)(void*,index_t)) | ||
478 | amb | 310 | { |
479 | int *fds=NULL,*heap=NULL; | ||
480 | int nfiles=0,ndata=0; | ||
481 | amb | 1106 | index_t count_out=0,count_in=0; |
482 | amb | 991 | size_t datasize=option_filesort_ramsize/option_filesort_threads; |
483 | amb | 311 | FILESORT_VARINT nextitemsize,largestitemsize=0; |
484 | amb | 991 | void *data,**datap; |
485 | thread_data *threads; | ||
486 | amb | 310 | int i,more=1; |
487 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
488 | int nthreads=0; | ||
489 | #endif | ||
490 | amb | 310 | |
491 | /* Allocate the RAM buffer and other bits */ | ||
492 | |||
493 | amb | 991 | threads=(thread_data*)malloc(option_filesort_threads*sizeof(thread_data)); |
494 | amb | 310 | |
495 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
496 | { | ||
497 | threads[i].running=0; | ||
498 | amb | 310 | |
499 | amb | 991 | threads[i].data=malloc(datasize); |
500 | threads[i].datap=NULL; | ||
501 | |||
502 | threads[i].filename=(char*)malloc(strlen(option_tmpdirname)+24); | ||
503 | |||
504 | amb | 1106 | threads[i].compare=compare_function; |
505 | amb | 991 | } |
506 | |||
507 | amb | 310 | /* Loop around, fill the buffer, sort the data and write a temporary file */ |
508 | |||
509 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) /* Always have the next item size known in advance */ |
510 | amb | 310 | goto tidy_and_exit; |
511 | |||
512 | do | ||
513 | { | ||
514 | amb | 311 | size_t ramused=FILESORT_VARALIGN-FILESORT_VARSIZE; |
515 | amb | 991 | int thread=0; |
516 | amb | 310 | |
517 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
518 | amb | 310 | |
519 | amb | 1020 | if(option_filesort_threads>1) |
520 | { | ||
521 | /* Find a spare slot (one *must* be unused at all times) */ | ||
522 | amb | 991 | |
523 | amb | 1020 | pthread_mutex_lock(&running_mutex); |
524 | amb | 996 | |
525 | amb | 1020 | for(thread=0;thread<option_filesort_threads;thread++) |
526 | if(!threads[thread].running) | ||
527 | break; | ||
528 | amb | 991 | |
529 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
530 | } | ||
531 | amb | 996 | |
532 | amb | 991 | #endif |
533 | |||
534 | threads[thread].datap=threads[thread].data+datasize; | ||
535 | |||
536 | threads[thread].n=0; | ||
537 | |||
538 | amb | 310 | /* Read in the data and create pointers */ |
539 | |||
540 | amb | 991 | while((ramused+FILESORT_VARSIZE+nextitemsize)<=((void*)threads[thread].datap-sizeof(void*)-threads[thread].data)) |
541 | amb | 310 | { |
542 | amb | 311 | FILESORT_VARINT itemsize=nextitemsize; |
543 | amb | 310 | |
544 | if(itemsize>largestitemsize) | ||
545 | largestitemsize=itemsize; | ||
546 | |||
547 | amb | 991 | *(FILESORT_VARINT*)(threads[thread].data+ramused)=itemsize; |
548 | amb | 310 | |
549 | amb | 311 | ramused+=FILESORT_VARSIZE; |
550 | amb | 310 | |
551 | amb | 991 | ReadFile(fd_in,threads[thread].data+ramused,itemsize); |
552 | amb | 310 | |
553 | amb | 1106 | if(!pre_sort_function || pre_sort_function(threads[thread].data+ramused,count_in)) |
554 | { | ||
555 | *--threads[thread].datap=threads[thread].data+ramused; /* points to real data */ | ||
556 | amb | 310 | |
557 | amb | 1106 | ramused+=itemsize; |
558 | amb | 311 | |
559 | amb | 1106 | ramused =FILESORT_VARALIGN*((ramused+FILESORT_VARSIZE-1)/FILESORT_VARALIGN); |
560 | ramused+=FILESORT_VARALIGN-FILESORT_VARSIZE; | ||
561 | amb | 311 | |
562 | amb | 1106 | count_in++; |
563 | threads[thread].n++; | ||
564 | } | ||
565 | amb | 310 | |
566 | amb | 311 | if(ReadFile(fd_in,&nextitemsize,FILESORT_VARSIZE)) |
567 | amb | 310 | { |
568 | more=0; | ||
569 | break; | ||
570 | } | ||
571 | } | ||
572 | |||
573 | amb | 543 | /* No new data read in this time round */ |
574 | |||
575 | amb | 991 | if(threads[thread].n==0) |
576 | amb | 310 | break; |
577 | |||
578 | amb | 991 | /* Sort the data pointers using a heap sort (potentially in a thread) */ |
579 | amb | 310 | |
580 | amb | 1021 | if(more==0 && nfiles==0) |
581 | threads[thread].filename[0]=0; | ||
582 | else | ||
583 | sprintf(threads[thread].filename,"%s/filesort.%d.tmp",option_tmpdirname,nfiles); | ||
584 | amb | 310 | |
585 | amb | 991 | #if defined(USE_PTHREADS) && USE_PTHREADS |
586 | amb | 310 | |
587 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
588 | |||
589 | if(more==0 && nfiles==0) | ||
590 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
591 | else if(option_filesort_threads>1) | ||
592 | amb | 310 | { |
593 | amb | 996 | pthread_mutex_lock(&running_mutex); |
594 | |||
595 | amb | 991 | while(nthreads==(option_filesort_threads-1)) |
596 | amb | 310 | { |
597 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
598 | if(threads[i].running==2) | ||
599 | { | ||
600 | pthread_join(threads[i].thread,NULL); | ||
601 | threads[i].running=0; | ||
602 | nthreads--; | ||
603 | } | ||
604 | |||
605 | if(nthreads==(option_filesort_threads-1)) | ||
606 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
607 | amb | 310 | } |
608 | |||
609 | amb | 991 | threads[thread].running=1; |
610 | |||
611 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
612 | |||
613 | amb | 991 | pthread_create(&threads[thread].thread,NULL,(void* (*)(void*))filesort_vary_heapsort_thread,&threads[thread]); |
614 | |||
615 | nthreads++; | ||
616 | amb | 310 | } |
617 | amb | 1020 | else |
618 | amb | 1021 | filesort_vary_heapsort_thread(&threads[thread]); |
619 | amb | 310 | |
620 | amb | 991 | #else |
621 | amb | 310 | |
622 | amb | 1021 | /* Shortcut if only one file, don't write to disk */ |
623 | |||
624 | if(more==0 && nfiles==0) | ||
625 | filesort_heapsort(threads[thread].datap,threads[thread].n,threads[thread].compare); | ||
626 | else | ||
627 | amb | 1020 | filesort_vary_heapsort_thread(&threads[thread]); |
628 | amb | 310 | |
629 | amb | 991 | #endif |
630 | amb | 310 | |
631 | amb | 991 | nfiles++; |
632 | } | ||
633 | while(more); | ||
634 | |||
635 | /* Wait for all of the threads to finish */ | ||
636 | |||
637 | #if defined(USE_PTHREADS) && USE_PTHREADS | ||
638 | |||
639 | amb | 1020 | while(option_filesort_threads>1 && nthreads) |
640 | amb | 991 | { |
641 | amb | 996 | pthread_mutex_lock(&running_mutex); |
642 | amb | 991 | |
643 | amb | 996 | pthread_cond_wait(&running_cond,&running_mutex); |
644 | |||
645 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
646 | if(threads[i].running==2) | ||
647 | { | ||
648 | pthread_join(threads[i].thread,NULL); | ||
649 | threads[i].running=0; | ||
650 | nthreads--; | ||
651 | } | ||
652 | |||
653 | amb | 996 | pthread_mutex_unlock(&running_mutex); |
654 | amb | 991 | } |
655 | |||
656 | #endif | ||
657 | |||
658 | /* Shortcut if only one file, lucky for us we still have the data in RAM) */ | ||
659 | |||
660 | if(nfiles==1) | ||
661 | { | ||
662 | for(i=0;i<threads[0].n;i++) | ||
663 | amb | 310 | { |
664 | amb | 1106 | if(!post_sort_function || post_sort_function(threads[0].datap[i],count_out)) |
665 | amb | 991 | { |
666 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(threads[0].datap[i]-FILESORT_VARSIZE); | ||
667 | amb | 310 | |
668 | amb | 991 | WriteFile(fd_out,threads[0].datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
669 | amb | 1106 | count_out++; |
670 | amb | 991 | } |
671 | amb | 310 | } |
672 | |||
673 | amb | 991 | DeleteFile(threads[0].filename); |
674 | amb | 310 | |
675 | amb | 991 | goto tidy_and_exit; |
676 | amb | 310 | } |
677 | |||
678 | /* Check that number of files is less than file size */ | ||
679 | |||
680 | amb | 311 | largestitemsize=FILESORT_VARALIGN*(1+(largestitemsize+FILESORT_VARALIGN-FILESORT_VARSIZE)/FILESORT_VARALIGN); |
681 | amb | 310 | |
682 | amb | 991 | assert(nfiles<((datasize-nfiles*sizeof(void*))/largestitemsize)); |
683 | amb | 310 | |
684 | /* Open all of the temporary files */ | ||
685 | |||
686 | fds=(int*)malloc(nfiles*sizeof(int)); | ||
687 | |||
688 | for(i=0;i<nfiles;i++) | ||
689 | { | ||
690 | amb | 991 | char *filename=threads[0].filename; |
691 | |||
692 | amb | 310 | sprintf(filename,"%s/filesort.%d.tmp",option_tmpdirname,i); |
693 | |||
694 | fds[i]=ReOpenFile(filename); | ||
695 | |||
696 | DeleteFile(filename); | ||
697 | } | ||
698 | |||
699 | /* Perform an n-way merge using a binary heap */ | ||
700 | |||
701 | amb | 875 | heap=(int*)malloc((1+nfiles)*sizeof(int)); |
702 | amb | 310 | |
703 | amb | 991 | data=threads[0].data; |
704 | datap=data+datasize-nfiles*sizeof(void*); | ||
705 | amb | 310 | |
706 | /* Fill the heap to start with */ | ||
707 | |||
708 | for(i=0;i<nfiles;i++) | ||
709 | { | ||
710 | int index; | ||
711 | amb | 311 | FILESORT_VARINT itemsize; |
712 | amb | 310 | |
713 | amb | 311 | datap[i]=data+FILESORT_VARALIGN-FILESORT_VARSIZE+i*largestitemsize; |
714 | amb | 310 | |
715 | amb | 311 | ReadFile(fds[i],&itemsize,FILESORT_VARSIZE); |
716 | amb | 310 | |
717 | amb | 311 | *(FILESORT_VARINT*)(datap[i]-FILESORT_VARSIZE)=itemsize; |
718 | amb | 310 | |
719 | ReadFile(fds[i],datap[i],itemsize); | ||
720 | |||
721 | amb | 875 | index=i+1; |
722 | |||
723 | amb | 867 | heap[index]=i; |
724 | amb | 310 | |
725 | /* Bubble up the new value */ | ||
726 | |||
727 | amb | 875 | while(index>1) |
728 | amb | 310 | { |
729 | int newindex; | ||
730 | int temp; | ||
731 | |||
732 | amb | 875 | newindex=index/2; |
733 | amb | 310 | |
734 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])>=0) |
735 | amb | 869 | break; |
736 | |||
737 | amb | 310 | temp=heap[index]; |
738 | heap[index]=heap[newindex]; | ||
739 | heap[newindex]=temp; | ||
740 | |||
741 | index=newindex; | ||
742 | } | ||
743 | } | ||
744 | |||
745 | /* Repeatedly pull out the root of the heap and refill from the same file */ | ||
746 | |||
747 | ndata=nfiles; | ||
748 | |||
749 | do | ||
750 | { | ||
751 | amb | 875 | int index=1; |
752 | amb | 311 | FILESORT_VARINT itemsize; |
753 | amb | 310 | |
754 | amb | 1106 | if(!post_sort_function || post_sort_function(datap[heap[index]],count_out)) |
755 | amb | 310 | { |
756 | amb | 867 | itemsize=*(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE); |
757 | amb | 310 | |
758 | amb | 867 | WriteFile(fd_out,datap[heap[index]]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); |
759 | amb | 1106 | count_out++; |
760 | amb | 310 | } |
761 | |||
762 | amb | 867 | if(ReadFile(fds[heap[index]],&itemsize,FILESORT_VARSIZE)) |
763 | amb | 310 | { |
764 | amb | 875 | heap[index]=heap[ndata]; |
765 | amb | 870 | ndata--; |
766 | amb | 310 | } |
767 | else | ||
768 | { | ||
769 | amb | 867 | *(FILESORT_VARINT*)(datap[heap[index]]-FILESORT_VARSIZE)=itemsize; |
770 | amb | 310 | |
771 | amb | 867 | ReadFile(fds[heap[index]],datap[heap[index]],itemsize); |
772 | amb | 310 | } |
773 | |||
774 | /* Bubble down the new value */ | ||
775 | |||
776 | amb | 875 | while((2*index)<ndata) |
777 | amb | 310 | { |
778 | amb | 873 | int newindex; |
779 | amb | 310 | int temp; |
780 | |||
781 | amb | 875 | newindex=2*index; |
782 | amb | 310 | |
783 | amb | 1106 | if(compare_function(datap[heap[newindex]],datap[heap[newindex+1]])>=0) |
784 | amb | 873 | newindex=newindex+1; |
785 | amb | 869 | |
786 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
787 | amb | 869 | break; |
788 | |||
789 | amb | 310 | temp=heap[newindex]; |
790 | heap[newindex]=heap[index]; | ||
791 | heap[index]=temp; | ||
792 | |||
793 | index=newindex; | ||
794 | } | ||
795 | |||
796 | amb | 875 | if((2*index)==ndata) |
797 | amb | 310 | { |
798 | int newindex; | ||
799 | int temp; | ||
800 | |||
801 | amb | 875 | newindex=2*index; |
802 | amb | 310 | |
803 | amb | 1106 | if(compare_function(datap[heap[index]],datap[heap[newindex]])<=0) |
804 | amb | 869 | ; /* break */ |
805 | else | ||
806 | { | ||
807 | temp=heap[newindex]; | ||
808 | heap[newindex]=heap[index]; | ||
809 | heap[index]=temp; | ||
810 | } | ||
811 | amb | 310 | } |
812 | } | ||
813 | while(ndata>0); | ||
814 | |||
815 | /* Tidy up */ | ||
816 | |||
817 | tidy_and_exit: | ||
818 | |||
819 | if(fds) | ||
820 | { | ||
821 | for(i=0;i<nfiles;i++) | ||
822 | CloseFile(fds[i]); | ||
823 | free(fds); | ||
824 | } | ||
825 | |||
826 | if(heap) | ||
827 | free(heap); | ||
828 | |||
829 | amb | 991 | for(i=0;i<option_filesort_threads;i++) |
830 | { | ||
831 | free(threads[i].data); | ||
832 | amb | 948 | |
833 | amb | 991 | free(threads[i].filename); |
834 | } | ||
835 | |||
836 | amb | 1106 | return(count_out); |
837 | amb | 310 | } |
838 | |||
839 | |||
840 | /*++++++++++++++++++++++++++++++++++++++ | ||
841 | amb | 991 | A wrapper function that can be run in a thread for fixed data. |
842 | |||
843 | void *filesort_fixed_heapsort_thread Returns NULL (required to return void*). | ||
844 | |||
845 | thread_data *thread The data to be processed in this thread. | ||
846 | ++++++++++++++++++++++++++++++++++++++*/ | ||
847 | |||
848 | static void *filesort_fixed_heapsort_thread(thread_data *thread) | ||
849 | { | ||
850 | int fd,i; | ||
851 | |||
852 | /* Sort the data pointers using a heap sort */ | ||
853 | |||
854 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
855 | |||
856 | /* Create a temporary file and write the result */ | ||
857 | |||
858 | fd=OpenFileNew(thread->filename); | ||
859 | |||
860 | for(i=0;i<thread->n;i++) | ||
861 | WriteFile(fd,thread->datap[i],thread->itemsize); | ||
862 | |||
863 | CloseFile(fd); | ||
864 | |||
865 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
866 | |||
867 | amb | 1020 | if(option_filesort_threads>1) |
868 | { | ||
869 | pthread_mutex_lock(&running_mutex); | ||
870 | amb | 996 | |
871 | amb | 1020 | thread->running=2; |
872 | amb | 991 | |
873 | amb | 1020 | pthread_cond_signal(&running_cond); |
874 | amb | 996 | |
875 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
876 | } | ||
877 | amb | 996 | |
878 | #endif | ||
879 | |||
880 | amb | 991 | return(NULL); |
881 | } | ||
882 | |||
883 | |||
884 | /*++++++++++++++++++++++++++++++++++++++ | ||
885 | A wrapper function that can be run in a thread for variable data. | ||
886 | |||
887 | void *filesort_vary_heapsort_thread Returns NULL (required to return void*). | ||
888 | |||
889 | thread_data *thread The data to be processed in this thread. | ||
890 | ++++++++++++++++++++++++++++++++++++++*/ | ||
891 | |||
892 | static void *filesort_vary_heapsort_thread(thread_data *thread) | ||
893 | { | ||
894 | int fd,i; | ||
895 | |||
896 | /* Sort the data pointers using a heap sort */ | ||
897 | |||
898 | filesort_heapsort(thread->datap,thread->n,thread->compare); | ||
899 | |||
900 | /* Create a temporary file and write the result */ | ||
901 | |||
902 | fd=OpenFileNew(thread->filename); | ||
903 | |||
904 | for(i=0;i<thread->n;i++) | ||
905 | { | ||
906 | FILESORT_VARINT itemsize=*(FILESORT_VARINT*)(thread->datap[i]-FILESORT_VARSIZE); | ||
907 | |||
908 | WriteFile(fd,thread->datap[i]-FILESORT_VARSIZE,itemsize+FILESORT_VARSIZE); | ||
909 | } | ||
910 | |||
911 | CloseFile(fd); | ||
912 | |||
913 | amb | 996 | #if defined(USE_PTHREADS) && USE_PTHREADS |
914 | |||
915 | amb | 1020 | if(option_filesort_threads>1) |
916 | { | ||
917 | pthread_mutex_lock(&running_mutex); | ||
918 | amb | 996 | |
919 | amb | 1020 | thread->running=2; |
920 | amb | 991 | |
921 | amb | 1020 | pthread_cond_signal(&running_cond); |
922 | amb | 996 | |
923 | amb | 1020 | pthread_mutex_unlock(&running_mutex); |
924 | } | ||
925 | amb | 996 | |
926 | #endif | ||
927 | |||
928 | amb | 991 | return(NULL); |
929 | } | ||
930 | |||
931 | |||
932 | /*++++++++++++++++++++++++++++++++++++++ | ||
933 | amb | 269 | A function to sort an array of pointers efficiently. |
934 | |||
935 | The data is sorted using a "Heap sort" http://en.wikipedia.org/wiki/Heapsort, | ||
936 | amb | 873 | in particular, this is good because it can operate in-place and doesn't |
937 | amb | 269 | allocate more memory like using qsort() does. |
938 | |||
939 | void **datap A pointer to the array of pointers to sort. | ||
940 | |||
941 | size_t nitems The number of items of data to sort. | ||
942 | |||
943 | amb | 1106 | int (*compare_function)(const void*, const void*) The comparison function. This is identical |
944 | to qsort if the data to be sorted is an array of things not pointers. | ||
945 | amb | 269 | ++++++++++++++++++++++++++++++++++++++*/ |
946 | |||
947 | amb | 1106 | void filesort_heapsort(void **datap,size_t nitems,int(*compare_function)(const void*, const void*)) |
948 | amb | 269 | { |
949 | amb | 875 | void **datap1=&datap[-1]; |
950 | amb | 269 | int i; |
951 | |||
952 | /* Fill the heap by pretending to insert the data that is already there */ | ||
953 | |||
954 | amb | 875 | for(i=2;i<=nitems;i++) |
955 | amb | 269 | { |
956 | int index=i; | ||
957 | |||
958 | /* Bubble up the new value (upside-down, put largest at top) */ | ||
959 | |||
960 | amb | 875 | while(index>1) |
961 | amb | 269 | { |
962 | int newindex; | ||
963 | void *temp; | ||
964 | |||
965 | amb | 875 | newindex=index/2; |
966 | amb | 269 | |
967 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])<=0) /* reversed comparison to filesort_fixed() above */ |
968 | amb | 869 | break; |
969 | |||
970 | amb | 875 | temp=datap1[index]; |
971 | datap1[index]=datap1[newindex]; | ||
972 | datap1[newindex]=temp; | ||
973 | amb | 269 | |
974 | index=newindex; | ||
975 | } | ||
976 | } | ||
977 | |||
978 | /* Repeatedly pull out the root of the heap and swap with the bottom item */ | ||
979 | |||
980 | amb | 875 | for(i=nitems;i>1;i--) |
981 | amb | 269 | { |
982 | amb | 875 | int index=1; |
983 | amb | 269 | void *temp; |
984 | |||
985 | amb | 875 | temp=datap1[index]; |
986 | datap1[index]=datap1[i]; | ||
987 | datap1[i]=temp; | ||
988 | amb | 269 | |
989 | /* Bubble down the new value (upside-down, put largest at top) */ | ||
990 | |||
991 | amb | 875 | while((2*index)<(i-1)) |
992 | amb | 269 | { |
993 | amb | 873 | int newindex; |
994 | amb | 269 | void *temp; |
995 | |||
996 | amb | 875 | newindex=2*index; |
997 | amb | 269 | |
998 | amb | 1106 | if(compare_function(datap1[newindex],datap1[newindex+1])<=0) /* reversed comparison to filesort_fixed() above */ |
999 | amb | 873 | newindex=newindex+1; |
1000 | amb | 869 | |
1001 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1002 | amb | 869 | break; |
1003 | |||
1004 | amb | 875 | temp=datap1[newindex]; |
1005 | datap1[newindex]=datap1[index]; | ||
1006 | datap1[index]=temp; | ||
1007 | amb | 269 | |
1008 | index=newindex; | ||
1009 | } | ||
1010 | |||
1011 | amb | 875 | if((2*index)==(i-1)) |
1012 | amb | 269 | { |
1013 | int newindex; | ||
1014 | void *temp; | ||
1015 | |||
1016 | amb | 875 | newindex=2*index; |
1017 | amb | 269 | |
1018 | amb | 1106 | if(compare_function(datap1[index],datap1[newindex])>=0) /* reversed comparison to filesort_fixed() above */ |
1019 | amb | 869 | ; /* break */ |
1020 | else | ||
1021 | { | ||
1022 | amb | 875 | temp=datap1[newindex]; |
1023 | datap1[newindex]=datap1[index]; | ||
1024 | datap1[index]=temp; | ||
1025 | amb | 869 | } |
1026 | amb | 269 | } |
1027 | } | ||
1028 | } |
Properties
Name | Value |
---|---|
cvs:description | Functions to perform sorting. |