雙核CPU上的快速排序效率
為了試驗一下多核CPU上排序算法的效率,得比較單任務情況下和多任務并行排序算法的差距,因此選用快速排序算法來進行比較。
測試環境:雙核CPU 2.66GHZ
單核CPU 2.4GHZ
以下是一個快速排序算法的源代碼:
- UINTSplit(void **ppData, UINTuStart, UINTuEnd,
- COMPAREFUNCCompareFunc)
- {
- void *pSelData;
- UINTuLow;
- UINTuHigh;
- uLow = uStart;
- uHigh = uEnd;
- pSelData = ppData[uLow];
- while ( uLow < uHigh )
- {
- while ( (*CompareFunc)(ppData[uHigh], pSelData) > 0
- && uLow != uHigh )
- {
- --uHigh;
- }
- if ( uHigh != uLow )
- {
- ppData[uLow] = ppData[uHigh];
- ++uLow;
- }
- while ( (*CompareFunc)( ppData[uLow], pSelData ) < 0
- && uLow != uHigh )
- {
- ++uLow;
- }
- if ( uLow != uHigh )
- {
- ppData[uHigh] = ppData[uLow];
- --uHigh;
- }
- }
- ppData[uLow] = pSelData;
- returnuLow;
- }
- voidQuickSort(void **ppData, UINTuStart, UINTuEnd,
- COMPAREFUNCCompareFunc)
- {
- UINTuMid = Split(ppData, uStart, uEnd, CompareFunc );
- if ( uMid > uStart )
- {
- QuickSort(ppData, uStart, uMid - 1, CompareFunc);
- }
- if ( uEnd > uMid )
- {
- QuickSort(ppData, uMid + 1, uEnd, CompareFunc);
- }
- }
先測試一下這個快速排序算法排一百萬個隨機整數所花的時間:
- voidTest_QuickSort(void)
- {
- UINTi;
- UINTuCount = 1000000; //1000000個
- srand(time(NULL));
- void **pp = (void **)malloc(uCount * sizeof(void *));
- for ( i = 0; i < uCount; i++ )
- {
- pp[i] = (void *)(rand() % uCount);
- }
- clockclock_tt1 = clock();
- QuickSort(pp, 0, uCount-1, UIntCompare);
- clockclock_tt2 = clock();
- printf("QuickSort 1000000 Time %ld/n", t2-t1);
- free(pp);
- }
在雙核CPU2.66GHZ機器上運行測試程序,打印出花費的時間約為406 ms
在單核CPU2.4GHZ機器上運行測試程序,打印出花費時間約為484ms
可見在雙核CPU上運行單任務程序和單核CPU基本是一樣的,效率沒有任何提高。
下面再來把上面的快速排序程序變成并行的,一個簡單的方法就是將要排序的區間分成相同的幾個段,然后對每個段進行快速排序,排序完后再使用歸并算法將排好的幾個區間歸并成一個排好序的表,我們先四個線程來進行排序,代碼如下:
- void ** Merge(void **ppData, UINTuStart, UINTuEnd,
- void **ppData2, UINTuStart2, UINTuEnd2, COMPAREFUNCcfunc)
- {
- UINTi, j, k;
- UINTu1, u2, v1,v2;
- void **pp1;
- void **pp2;
- void **pp = (void **)malloc( (uEnd-uStart+1+uEnd2-uStart2+1) * sizeof(void *));
- if ( pp == NULL )
- {
- returnNULL;
- }
- if ( (*cfunc)(ppData2[uStart2], ppData[uStart]) > 0 )
- {
- u1 = uStart;
- u2 = uEnd;
- v1 = uStart2;
- v2 = uEnd2;
- pp1 = ppData;
- pp2 = ppData2;
- }
- else
- {
- u1 = uStart2;
- u2 = uEnd2;
- v1 = uStart;
- v2 = uEnd;
- pp1 = ppData2;
- pp2 = ppData;
- }
- k = 0;
- pp[k] = pp1[u1];
- j = v1;
- for (i = u1+1; i <= u2; i++ )
- {
- while ( j <= v2 )
- {
- if ( (*cfunc)(pp2[j], pp1[i]) < 0 )
- {
- ++k;
- pp[k] = pp2[j];
- j++;
- }
- else
- {
- break;
- }
- }
- ++k;
- pp[k] = pp1[i];
- }
- if ( j < v2 )
- {
- for ( i = j; i <= v2; i++)
- {
- ++k;
- pp[k] = pp2[i];
- }
- }
- returnpp;
- }
- typedefstructSORTNODE_st {
- void ** ppData;
- UINT uStart;
- UINT uEnd;
- COMPAREFUNCfunc;
- } SORTNODE;
- DWORDWINAPIQuickSort_Thread(void *arg)
- {
- SORTNODE *pNode = (SORTNODE *)arg;
- QuickSort(pNode->ppData, pNode->uStart, pNode->uEnd, pNode->func);
- return 1;
- }
- #define THREAD_COUNT 4
- INTMQuickSort(void **ppData, UINTuStart, UINTuEnd,
- COMPAREFUNCCompareFunc)
- {
- void **pp1;
- void **pp2;
- void **pp3;
- INT i;
- SORTNODE Node[THREAD_COUNT];
- HANDLE hThread[THREAD_COUNT];
- INT nRet = CAPI_FAILED;
- for ( i = 0; i < THREAD_COUNT; i++)
- {
- Node[i].ppData = ppData;
- if ( i == 0 )
- {
- Node[i].uStart = uStart;
- }
- else
- {
- Node[i].uStart = uEnd * i /THREAD_COUNT + 1;
- }
- Node[i].uEnd = uEnd *(i+1) / THREAD_COUNT;
- Node[i].func = CompareFunc;
- hThread[i] = CreateThread(NULL, 0, QuickSort_Thread, &(Node[i]), 0, NULL);
- }
- for ( i = 0; i < THREAD_COUNT; i++ )
- {
- WaitForSingleObject(hThread[i], INFINITE);
- }
- pp1 = Merge(ppData, uStart, uEnd/4, ppData, uEnd/4+1, uEnd/2, CompareFunc);
- pp2 = Merge(ppData, uEnd/2+1, uEnd*3/4, ppData, uEnd*3/4+1, uEnd, CompareFunc);
- if ( pp1 != NULL && pp2 != NULL )
- {
- pp3 = Merge(pp1, 0, uEnd/2-uStart, pp2, 0, uEnd - uEnd/2 - 1, CompareFunc);
- if ( pp3 != NULL )
- {
- UINTi;
- for ( i = uStart; i <= uEnd; i++)
- {
- ppData[i] = pp3[i-uStart];
- }
- free(pp3);
- nRet = CAPI_SUCCESS;
- }
- }
- if( pp1 != NULL)
- {
- free( pp1 );
- }
- if ( pp2 != NULL )
- {
- free( pp2 );
- }
- returnnRet;
- }
#p#
用下面程序來測試一下排1百萬個隨機整數的花費時間:
- voidTest_MQuickSort (void)
- {
- UINTi;
- UINTuCount = 1000000; //1000個
- srand(time(NULL));
- void **pp = (void **)malloc(uCount * sizeof(void *));
- for ( i = 0; i < uCount; i++ )
- {
- pp[i] = (void *)(rand() % uCount);
- }
- clockclock_tt1 = clock();
- INTnRet = MQuickSort(pp, 0, uCount-1, UIntCompare);
- clockclock_tt2 = clock();
- printf("MQuickSort 1000000 Time %ld/n", t2-t1);
- free(pp);
- }
在雙核CPU上運行后,打印出花費的時間為 234 ms , 單任務版的快速排序函數約需406ms左右,并行運行效率為:406/(2×234) = 86.7% 左右。運行速度快了172ms。
可見雙核CPU中,多任務程序速度還是有很大提高的。
當然上面的多任務版的快速排序程序還有很大的改進余地,當對4個區間排好序后,后面的歸并操作都是在一個任務里運行的,對整體效率會產生影響。估計將程序繼續優化后,速度還能再快一些。
原文鏈接:http://blog.csdn.net/drzhouweiming/article/details/1109499