FQuickSort.hpp 18.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
#ifndef FQUICKSORT_HPP
#define FQUICKSORT_HPP

#include <omp.h>
#include <cmath>
#include <ctime>
#include <cstdlib>
#include <cstring>

#include <mpi.h>

12
13
14
#include "FGlobal.hpp"
#include "FMemUtils.hpp"
#include "FTrace.hpp"
15
#include "FMpi.hpp"
16
17

#include "FOmpBarrier.hpp"
18

19
20
21
22
23
24
25
26
27
28
/** This class is parallel quick sort
  * It hold a mpi version
  * + 2 openmp versions (one on tasks and the other like mpi)
  * + a sequential version
  *
  * The task based algorithm is easy to undestand,
  * for the mpi/openmp2nd please see
  * Introduction to parallel computing (Grama Gupta Karypis Kumar)
  */

29
template <class SortType, class CompareType, class IndexType>
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class FQuickSort {
    ////////////////////////////////////////////////////////////
    // Miscialenous functions
    ////////////////////////////////////////////////////////////

    /** swap to value */
    template <class NumType>
    static inline void Swap(NumType& value, NumType& other){
        NumType temp = value;
        value = other;
        other = temp;
    }

    ////////////////////////////////////////////////////////////
    // Quick sort
    ////////////////////////////////////////////////////////////

47
    /* Use in the sequential qs */
48
49
    static IndexType QsPartition(SortType array[], IndexType left, IndexType right){
        const IndexType part = right;
berenger-bramas's avatar
berenger-bramas committed
50
        Swap(array[part],array[((right - left ) / 2) + left]);
51
52
53
        --right;

        while(true){
54
            while(CompareType(array[left]) < CompareType(array[part])){
55
56
                ++left;
            }
57
            while(right >= left && CompareType(array[part]) <= CompareType(array[right])){
58
59
60
61
62
63
64
65
66
67
68
69
70
71
                --right;
            }
            if(right < left) break;

            Swap(array[left],array[right]);
            ++left;
            --right;
        }

        Swap(array[part],array[left]);

        return left;
    }

72
    /* A local iteration of qs */
73
74
75
    static void QsLocal(SortType array[], const CompareType& pivot,
               IndexType myLeft, IndexType myRight,
               IndexType& prefix, IndexType& sufix){
76

77
78
        IndexType leftIter = myLeft;
        IndexType rightIter = myRight;
79
80

        while(true){
81
            while(CompareType(array[leftIter]) <= pivot && leftIter < rightIter){
82
83
                ++leftIter;
            }
84
            while(leftIter <= rightIter && pivot < CompareType(array[rightIter])){
85
86
87
88
89
90
91
92
93
94
95
96
97
                --rightIter;
            }
            if(rightIter < leftIter) break;

            Swap(array[leftIter],array[rightIter]);
            ++leftIter;
            --rightIter;
        }

        prefix = leftIter - myLeft;
        sufix = myRight - myLeft - prefix + 1;
    }

98
    /* The sequential qs */
99
    static void QsSequentialStep(SortType array[], const IndexType left, const IndexType right){
100
        if(left < right){
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
            const IndexType part = QsPartition(array, left, right);
            QsSequentialStep(array,part + 1,right);
            QsSequentialStep(array,left,part - 1);
        }
    }

    /** A task dispatcher */
    static void QsOmpTask(SortType array[], const IndexType left, const IndexType right, const int deep){
        if(left < right){
            if( deep ){
                const IndexType part = QsPartition(array, left, right);
                #pragma omp task
                QsOmpTask(array,part + 1,right, deep - 1);
                #pragma omp task
                QsOmpTask(array,left,part - 1, deep - 1);
            }
            else {
                const IndexType part = QsPartition(array, left, right);
                QsSequentialStep(array,part + 1,right);
                QsSequentialStep(array,left,part - 1);
            }
122
123
124
        }
    }

125
    /* The openmp qs */
126
    static void QsOmpNoTask(SortType array[], const IndexType size){
127
        FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Quicksort" , __FILE__ , __LINE__) );
128
        struct Fix{
129
130
            IndexType pre;
            IndexType suf;
131
132
133
134
135
136
137
138
139
140
141
142
        };

        const int NbOfThreads = omp_get_max_threads();

        Fix fixes[NbOfThreads + 1];
        Fix allFixesSum[NbOfThreads + 1][NbOfThreads];

        memset(fixes,0,sizeof(Fix) * NbOfThreads);
        memset(allFixesSum,0,sizeof(Fix) * (NbOfThreads + 1) * NbOfThreads);

        SortType*const temporaryArray = reinterpret_cast<SortType*>(new char[sizeof(SortType) * size]);

143
        FOmpBarrier barriers[NbOfThreads];
144
145
146
147
148

        #pragma omp parallel
        {
            const int myThreadId = omp_get_thread_num();

149
150
            IndexType myLeft = FMpi::GetLeft(size, myThreadId, omp_get_num_threads());
            IndexType myRight = FMpi::GetRight(size, myThreadId, omp_get_num_threads()) - 1;
151

152
153
            IndexType startIndex = 0;
            IndexType endIndex = size - 1;
154
155
156
157
158
159

            int firstProc = 0;
            int lastProc = omp_get_num_threads() - 1;

            while( firstProc != lastProc && (endIndex - startIndex + 1) != 0){
                Fix* const fixesSum = &allFixesSum[0][firstProc];
160
                const IndexType nbElements = endIndex - startIndex + 1;
161

162
163
164
165
                if(myThreadId == firstProc){
                    barriers[firstProc].setNbThreads( lastProc - firstProc + 1);
                }

166
                // sort QsLocal part of the array
167
                const CompareType pivot = (CompareType(array[startIndex]) + CompareType(array[endIndex]) )/2;
168
                barriers[firstProc].wait();
169
170
171
172
173

                QsLocal(array, pivot, myLeft, myRight, fixes[myThreadId].pre, fixes[myThreadId].suf);

                // wait others that work on this part
                #pragma omp flush(array)
174
                barriers[firstProc].wait();
175
176
177
178
179
180
181
182
183
184
185
186
187

                // merge result
                if(myThreadId == firstProc){
                    fixesSum[firstProc].pre = 0;
                    fixesSum[firstProc].suf = 0;
                    for(int idxProc = firstProc ; idxProc <= lastProc ; ++idxProc){
                        fixesSum[idxProc + 1].pre = fixesSum[idxProc].pre + fixes[idxProc].pre;
                        fixesSum[idxProc + 1].suf = fixesSum[idxProc].suf + fixes[idxProc].suf;
                    }
                    #pragma omp flush(fixesSum)
                }
                // prepare copy
                if(myThreadId == firstProc + 1){
188
                    FMemUtils::memcpy(&temporaryArray[startIndex], &array[startIndex], sizeof(SortType) * nbElements );
189
190
191
                    #pragma omp flush(temporaryArray)
                }

192
                barriers[firstProc].wait();
193
194

                // copy my result where it belong (< pivot)
195
                FMemUtils::memcpy(&array[startIndex + fixesSum[myThreadId].pre], &temporaryArray[myLeft], sizeof(SortType) * fixes[myThreadId].pre);
196
197

                // copy my result where it belong (> pivot)
198
                const IndexType sufoffset = fixesSum[lastProc + 1].pre + startIndex;
199
                FMemUtils::memcpy(&array[sufoffset + fixesSum[myThreadId].suf], &temporaryArray[myLeft + fixes[myThreadId].pre ], sizeof(SortType) * fixes[myThreadId].suf);
200

201
                barriers[firstProc].wait();
202
203

                // find my next QsLocal part
204
                int splitProc = FMpi::GetProc(sufoffset - startIndex, nbElements, lastProc - firstProc + 1) + firstProc;
205
206
207
208
209
210
211
212
213
214
215
216
217
                if(splitProc == lastProc){
                    --splitProc;
                }

                if( myThreadId <= splitProc ){
                    endIndex = sufoffset - 1;
                    lastProc = splitProc;
                }
                else{
                    startIndex = sufoffset;
                    firstProc = splitProc + 1;
                }

218
219
                myLeft = FMpi::GetLeft(endIndex - startIndex + 1, myThreadId - firstProc, lastProc - firstProc + 1) + startIndex;
                myRight = FMpi::GetRight(endIndex - startIndex + 1, myThreadId - firstProc, lastProc - firstProc + 1) + startIndex - 1;
220
221
            }

222
            QsSequentialStep(array,myLeft,myRight);
223
224
225
226
227
        }

        delete[] reinterpret_cast<char*>(temporaryArray);
    }

228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
public:
    /* a sequential qs */
    static void QsSequential(SortType array[], const IndexType size){
        QsSequentialStep(array,0,size-1);
    }

    /** The openmp quick sort */
    static void QsOmp(SortType array[], const IndexType size){
        #if _OPENMP >= 200805
        #pragma omp parallel
        {
            #pragma omp single nowait
            {
                QsOmpTask(array, 0, size - 1 , 15);
            }
        }
        #else
        QsOmpNoTask(array, size);
        #endif
    }

249
    /* the mpi qs */
250
    static void QsMpi(const SortType originalArray[], IndexType size, SortType* & outputArray, IndexType& outputSize, const FMpi::FComm& originalComm){
251
        FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Quicksort" , __FILE__ , __LINE__) );
252
253
        // We need a structure see the algorithm detail to know more
        struct Fix{
254
255
            IndexType pre;
            IndexType suf;
256
257
258
259
        };

        // first we copy data into our working buffer : outputArray
        outputArray = new SortType[size];
260
        FMemUtils::memcpy(outputArray, originalArray, sizeof(SortType) * size);
261
262
263
        outputSize = size;

        // alloc outputArray to store pre/sufixe, maximum needed is nb procs[comm world] + 1
264
265
266
267
        Fix fixes[originalComm.processCount() + 1];
        Fix fixesSum[originalComm.processCount() + 1];
        memset(fixes,0,sizeof(Fix) * originalComm.processCount());
        memset(fixesSum,0,sizeof(Fix) * (originalComm.processCount() + 1) );
268
269

        // receiving buffer
270
        IndexType bufferSize = 0;
271
272
273
        SortType* buffer = 0;

        // Create the first com
274
        FMpi::FComm currentComm(originalComm.getComm());
275
276

        // While I am not working alone on my own data
277
278
279
        while( currentComm.processCount() != 1 ){
            const int currentRank = currentComm.processId();
            const int currentNbProcs = currentComm.processCount();
280
281
282
283
284
285
286
287
288

            MPI_Request requests[currentNbProcs * 2];
            int iterRequest = 0;

            /////////////////////////////////////////////////
            // Local sort
            /////////////////////////////////////////////////

            // sort QsLocal part of the outputArray
289
            const CompareType pivot = currentComm.reduceAverageAll( CompareType(outputArray[size/2]) );
290
291
292
293
            Fix myFix;
            QsLocal(outputArray, pivot, 0, size - 1, myFix.pre, myFix.suf);

            // exchange fixes
294
            FMpi::Assert( MPI_Allgather( &myFix, sizeof(Fix), MPI_BYTE, fixes, sizeof(Fix), MPI_BYTE, currentComm.getComm()),  __LINE__ );
295
296
297
298
299
300
301
302
303
304

            // each procs compute the summation
            fixesSum[0].pre = 0;
            fixesSum[0].suf = 0;
            for(int idxProc = 0 ; idxProc < currentNbProcs ; ++idxProc){
                fixesSum[idxProc + 1].pre = fixesSum[idxProc].pre + fixes[idxProc].pre;
                fixesSum[idxProc + 1].suf = fixesSum[idxProc].suf + fixes[idxProc].suf;
            }

            // then I need to know which procs will be in the middle
305
            int splitProc = FMpi::GetProc(fixesSum[currentNbProcs].pre - 1, fixesSum[currentNbProcs].pre + fixesSum[currentNbProcs].suf, currentNbProcs);
306
307
308
309
310
311
312
313
314
315
316
317
            if(splitProc == currentNbProcs - 1){
                --splitProc;
            }

            /////////////////////////////////////////////////
            // Send my data
            /////////////////////////////////////////////////

            // above pivot (right part)
            if( fixes[currentRank].suf ){
                const int procsInSuf = currentNbProcs - 1 - splitProc;
                const int firstProcInSuf = splitProc + 1;
318
                const IndexType elementsInSuf = fixesSum[currentNbProcs].suf;
319

320
321
                const int firstProcToSend = FMpi::GetProc(fixesSum[currentRank].suf, elementsInSuf, procsInSuf) + firstProcInSuf;
                const int lastProcToSend = FMpi::GetProc(fixesSum[currentRank + 1].suf - 1, elementsInSuf, procsInSuf) + firstProcInSuf;
322

323
                IndexType sent = 0;
324
                for(int idxProc = firstProcToSend ; idxProc <= lastProcToSend ; ++idxProc){
325
                    const IndexType thisProcRight = FMpi::GetRight(elementsInSuf, idxProc - firstProcInSuf, procsInSuf);
326
                    IndexType sendToProc = thisProcRight - fixesSum[currentRank].suf - sent;
327
328
329
330

                    if(sendToProc + sent > fixes[currentRank].suf){
                        sendToProc = fixes[currentRank].suf - sent;
                    }
331
                    if( sendToProc ){
332
                        FMpi::Assert( MPI_Isend(&outputArray[sent + fixes[currentRank].pre], int(sendToProc * sizeof(SortType)), MPI_BYTE , idxProc, FMpi::TagQuickSort, currentComm.getComm(), &requests[iterRequest++]),  __LINE__ );
333
                    }
334
335
336
337
338
339
340
                    sent += sendToProc;
                }
            }

            // under pivot (left part)
            if( fixes[currentRank].pre ){
                const int procsInPre = splitProc + 1;
341
                const IndexType elementsInPre = fixesSum[currentNbProcs].pre;
342

343
344
                const int firstProcToSend = FMpi::GetProc(fixesSum[currentRank].pre, elementsInPre, procsInPre);
                const int lastProcToSend = FMpi::GetProc(fixesSum[currentRank + 1].pre - 1, elementsInPre, procsInPre);
345

346
                IndexType sent = 0;
347
                for(int idxProc = firstProcToSend ; idxProc <= lastProcToSend ; ++idxProc){
348
                    const IndexType thisProcRight = FMpi::GetRight(elementsInPre, idxProc, procsInPre);
349
                    IndexType sendToProc = thisProcRight - fixesSum[currentRank].pre - sent;
350
351
352
353

                    if(sendToProc + sent > fixes[currentRank].pre){
                        sendToProc = fixes[currentRank].pre - sent;
                    }
354
                    if(sendToProc){
355
                        FMpi::Assert( MPI_Isend(&outputArray[sent], int(sendToProc * sizeof(SortType)), MPI_BYTE , idxProc, FMpi::TagQuickSort, currentComm.getComm(), &requests[iterRequest++]),  __LINE__ );
356
                    }
357
358
359
360
361
362
363
364
365
366
367
                    sent += sendToProc;
                }
            }

            /////////////////////////////////////////////////
            // Receive data that belong to me
            /////////////////////////////////////////////////

            if( currentRank <= splitProc ){
                // I am in S-Part (smaller than pivot)
                const int procsInPre = splitProc + 1;
368
                const IndexType elementsInPre = fixesSum[currentNbProcs].pre;
369

370
371
                IndexType myLeft = FMpi::GetLeft(elementsInPre, currentRank, procsInPre);
                IndexType myRightLimit = FMpi::GetRight(elementsInPre, currentRank, procsInPre);
372
373
374
375
376
377
378
379
380
381
382
383
384

                size = myRightLimit - myLeft;
                if(bufferSize < size){
                    bufferSize = size;
                    delete[] buffer;
                    buffer = new SortType[bufferSize];
                }

                int idxProc = 0;
                while( idxProc < currentNbProcs && fixesSum[idxProc + 1].pre <= myLeft ){
                    ++idxProc;
                }

385
                IndexType indexArray = 0;
386
387

                while( idxProc < currentNbProcs && indexArray < myRightLimit - myLeft){
388
389
                    const IndexType firstIndex = FMath::Max(myLeft , fixesSum[idxProc].pre );
                    const IndexType endIndex = FMath::Min(fixesSum[idxProc + 1].pre,  myRightLimit);
390
                    if( (endIndex - firstIndex) ){
391
                        FMpi::Assert( MPI_Irecv(&buffer[indexArray], int((endIndex - firstIndex) * sizeof(SortType)), MPI_BYTE, idxProc, FMpi::TagQuickSort, currentComm.getComm(), &requests[iterRequest++]),  __LINE__ );
392
                    }
393
394
395
                    indexArray += endIndex - firstIndex;
                    ++idxProc;
                }
396
                // Proceed all send/receive
397
                FMpi::Assert( MPI_Waitall(iterRequest, requests, MPI_STATUSES_IGNORE),  __LINE__ );
398

399
                currentComm.groupReduce( 0, splitProc);
400
401
402
403
            }
            else{
                // I am in L-Part (larger than pivot)
                const int procsInSuf = currentNbProcs - 1 - splitProc;
404
                const IndexType elementsInSuf = fixesSum[currentNbProcs].suf;
405
406

                const int rankInL = currentRank - splitProc - 1;
407
408
                IndexType myLeft = FMpi::GetLeft(elementsInSuf, rankInL, procsInSuf);
                IndexType myRightLimit = FMpi::GetRight(elementsInSuf, rankInL, procsInSuf);
409
410
411
412
413
414
415
416
417
418
419
420
421

                size = myRightLimit - myLeft;
                if(bufferSize < size){
                    bufferSize = size;
                    delete[] buffer;
                    buffer = new SortType[bufferSize];
                }

                int idxProc = 0;
                while( idxProc < currentNbProcs && fixesSum[idxProc + 1].suf <= myLeft ){
                    ++idxProc;
                }

422
                IndexType indexArray = 0;
423
424

                while( idxProc < currentNbProcs && indexArray < myRightLimit - myLeft){
425
426
                    const IndexType firstIndex = FMath::Max(myLeft , fixesSum[idxProc].suf );
                    const IndexType endIndex = FMath::Min(fixesSum[idxProc + 1].suf,  myRightLimit);
427
                    if( (endIndex - firstIndex) ){
428
                        FMpi::Assert( MPI_Irecv(&buffer[indexArray], int((endIndex - firstIndex) * sizeof(SortType)), MPI_BYTE, idxProc, FMpi::TagQuickSort, currentComm.getComm(), &requests[iterRequest++]),  __LINE__ );
429
                    }
430
431
432
                    indexArray += endIndex - firstIndex;
                    ++idxProc;
                }
433
                // Proceed all send/receive
434
                FMpi::Assert( MPI_Waitall(iterRequest, requests, MPI_STATUSES_IGNORE),  __LINE__ );
435

436
                currentComm.groupReduce( splitProc + 1, currentNbProcs - 1);
437
438
439
440
441
442
443
444
445
446
447
            }



            // Copy res into outputArray
            if(outputSize < size){
                delete[] outputArray;
                outputArray = new SortType[size];
                outputSize = size;
            }

448
            FMemUtils::memcpy(outputArray, buffer, sizeof(SortType) * size);
449
450
451
452
453
454
455
456
457
458
        }

        /////////////////////////////////////////////////
        // End QsMpi sort
        /////////////////////////////////////////////////

        // Clean
        delete[] buffer;

        // Normal Quick sort
459
        QsOmp(outputArray, size);
460
461
462
463
464
465
        outputSize = size;
    }

};

#endif // FQUICKSORT_HPP