logo

Samanaikainen yhdistäminen Lajittele jaettuun muistiin

Kun annetaan numero 'n' ja a n, lajittele numerot käyttämällä Samanaikainen Yhdistä lajittelu. (Vihje: Yritä käyttää shmget shmat järjestelmäkutsuja).
Osa 1: Algoritmi (MITEN?)  
Tee rekursiivisesti kahdesta aliprosessoinnista yksi vasemmalle puolikkaalle toinen oikeasta puoliskosta. Jos prosessin taulukon elementtien määrä on alle 5, suorita a Lisäys Lajittele . Kahden lapsen vanhempi yhdistää tuloksen ja palaa takaisin vanhemmalle ja niin edelleen. Mutta miten saat sen samanaikaiseksi?
Osa 2: Looginen (MIKSI?)  
Tärkeä osa tämän ongelman ratkaisua ei ole algoritminen, vaan käyttöjärjestelmän ja ytimen käsitteiden selittäminen. 
Samanaikaisen lajittelun saavuttamiseksi tarvitsemme tavan saada kaksi prosessia toimimaan samassa taulukossa samanaikaisesti. Asioiden helpottamiseksi Linux tarjoaa paljon järjestelmäkutsuja yksinkertaisten API-päätepisteiden kautta. Kaksi heistä on shmget() (jaetun muistin varaamiseen) ja shmat() (jaetun muistin toimintoihin). Luomme jaetun muistitilan haaroittamamme lapsiprosessin välille. Jokainen segmentti on jaettu vasempaan ja oikeaan lapsiin, jotka lajitellaan siten, että mielenkiintoinen osa on työskennellyt samanaikaisesti! shmget() pyytää ydintä allokoimaan a jaettu sivu molemmille prosesseille.
Miksi perinteinen fork() ei toimi?  
Vastaus piilee siinä, mitä fork() todella tekee. Dokumentaatiosta 'fork() luo uuden prosessin monistamalla kutsuprosessin'. Lapsiprosessi ja emoprosessi toimivat erillisissä muistitiloissa. Fork():n aikana molemmilla muistitiloilla on sama sisältö. Muisti kirjoittaa tiedosto-descriptor(fd) -muutokset jne., jotka yhden prosessista suorittaa, eivät vaikuta toiseen. Siksi tarvitsemme jaetun muistisegmentin.
 

CPP
#include    #include  #include  #include  #include  #include  #include  #include  void insertionSort(int arr[] int n); void merge(int a[] int l1 int h1 int h2); void mergeSort(int a[] int l int h) {  int i len = (h - l + 1);  // Using insertion sort for small sized array  if (len <= 5)  {  insertionSort(a + l len);  return;  }  pid_t lpid rpid;  lpid = fork();  if (lpid < 0)  {  // Lchild proc not created  perror('Left Child Proc. not createdn');  _exit(-1);  }  else if (lpid == 0)  {  mergeSort(a l l + len / 2 - 1);  _exit(0);  }  else  {  rpid = fork();  if (rpid < 0)  {  // Rchild proc not created  perror('Right Child Proc. not createdn');  _exit(-1);  }  else if (rpid == 0)  {  mergeSort(a l + len / 2 h);  _exit(0);  }  }  int status;  // Wait for child processes to finish  waitpid(lpid &status 0);  waitpid(rpid &status 0);  // Merge the sorted subarrays  merge(a l l + len / 2 - 1 h); } /* Function to sort an array using insertion sort*/ void insertionSort(int arr[] int n) {  int i key j;  for (i = 1; i < n; i++)  {  key = arr[i];  j = i - 1;  /* Move elements of arr[0..i-1] that are  greater than key to one position ahead  of their current position */  while (j >= 0 && arr[j] > key)  {  arr[j + 1] = arr[j];  j = j - 1;  }  arr[j + 1] = key;  } } // Method to merge sorted subarrays void merge(int a[] int l1 int h1 int h2) {  // We can directly copy the sorted elements  // in the final array no need for a temporary  // sorted array.  int count = h2 - l1 + 1;  int sorted[count];  int i = l1 k = h1 + 1 m = 0;  while (i <= h1 && k <= h2)  {  if (a[i] < a[k])  sorted[m++] = a[i++];  else if (a[k] < a[i])  sorted[m++] = a[k++];  else if (a[i] == a[k])  {  sorted[m++] = a[i++];  sorted[m++] = a[k++];  }  }  while (i <= h1)  sorted[m++] = a[i++];  while (k <= h2)  sorted[m++] = a[k++];  int arr_count = l1;  for (i = 0; i < count; i++ l1++)  a[l1] = sorted[i]; } // To check if array is actually sorted or not void isSorted(int arr[] int len) {  if (len == 1)  {  std::cout << 'Sorting Done Successfully' << std::endl;  return;  }  int i;  for (i = 1; i < len; i++)  {  if (arr[i] < arr[i - 1])  {  std::cout << 'Sorting Not Done' << std::endl;  return;  }  }  std::cout << 'Sorting Done Successfully' << std::endl;  return; } // To fill random values in array for testing // purpose void fillData(int a[] int len) {  // Create random arrays  int i;  for (i = 0; i < len; i++)  a[i] = rand();  return; } // Driver code int main() {  int shmid;  key_t key = IPC_PRIVATE;  int *shm_array;  int length = 128;  // Calculate segment length  size_t SHM_SIZE = sizeof(int) * length;  // Create the segment.  if ((shmid = shmget(key SHM_SIZE IPC_CREAT | 0666)) < 0)  {  perror('shmget');  _exit(1);  }  // Now we attach the segment to our data space.  if ((shm_array = (int *)shmat(shmid NULL 0)) == (int *)-1)  {  perror('shmat');  _exit(1);  }  // Create a random array of given length  srand(time(NULL));  fillData(shm_array length);  // Sort the created array  mergeSort(shm_array 0 length - 1);  // Check if array is sorted or not  isSorted(shm_array length);  /* Detach from the shared memory now that we are  done using it. */  if (shmdt(shm_array) == -1)  {  perror('shmdt');  _exit(1);  }  /* Delete the shared memory segment. */  if (shmctl(shmid IPC_RMID NULL) == -1)  {  perror('shmctl');  _exit(1);  }  return 0; } 
Java
import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class ConcurrentMergeSort {  // Method to merge sorted subarrays  private static void merge(int[] a int low int mid int high) {  int[] temp = new int[high - low + 1];  int i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  System.arraycopy(temp 0 a low temp.length);  }  // RecursiveAction for fork/join framework  static class SortTask extends RecursiveAction {  private final int[] a;  private final int low high;  SortTask(int[] a int low int high) {  this.a = a;  this.low = low;  this.high = high;  }  @Override  protected void compute() {  if (high - low <= 5) {  Arrays.sort(a low high + 1);  } else {  int mid = low + (high - low) / 2;  invokeAll(new SortTask(a low mid) new SortTask(a mid + 1 high));  merge(a low mid high);  }  }  }  // Method to check if array is sorted  private static boolean isSorted(int[] a) {  for (int i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true;  }  // Method to fill array with random numbers  private static void fillData(int[] a) {  Random rand = new Random();  for (int i = 0; i < a.length; i++) {  a[i] = rand.nextInt();  }  }  public static void main(String[] args) {  int length = 128;  int[] a = new int[length];  fillData(a);  ForkJoinPool pool = new ForkJoinPool();  pool.invoke(new SortTask(a 0 a.length - 1));  if (isSorted(a)) {  System.out.println('Sorting Done Successfully');  } else {  System.out.println('Sorting Not Done');  }  } } 
Python3
import numpy as np import multiprocessing as mp import time def insertion_sort(arr): n = len(arr) for i in range(1 n): key = arr[i] j = i - 1 while j >= 0 and arr[j] > key: arr[j + 1] = arr[j] j -= 1 arr[j + 1] = key def merge(arr l mid r): n1 = mid - l + 1 n2 = r - mid L = arr[l:l + n1].copy() R = arr[mid + 1:mid + 1 + n2].copy() i = j = 0 k = l while i < n1 and j < n2: if L[i] <= R[j]: arr[k] = L[i] i += 1 else: arr[k] = R[j] j += 1 k += 1 while i < n1: arr[k] = L[i] i += 1 k += 1 while j < n2: arr[k] = R[j] j += 1 k += 1 def merge_sort(arr l r): if l < r: if r - l + 1 <= 5: insertion_sort(arr) else: mid = (l + r) // 2 p1 = mp.Process(target=merge_sort args=(arr l mid)) p2 = mp.Process(target=merge_sort args=(arr mid + 1 r)) p1.start() p2.start() p1.join() p2.join() merge(arr l mid r) def is_sorted(arr): for i in range(1 len(arr)): if arr[i] < arr[i - 1]: return False return True def fill_data(arr): np.random.seed(0) arr[:] = np.random.randint(0 1000 size=len(arr)) if __name__ == '__main__': length = 128 shm_array = mp.Array('i' length) fill_data(shm_array) start_time = time.time() merge_sort(shm_array 0 length - 1) end_time = time.time() if is_sorted(shm_array): print('Sorting Done Successfully') else: print('Sorting Not Done') print('Time taken:' end_time - start_time) 
JavaScript
// Importing required modules const { Worker isMainThread parentPort workerData } = require('worker_threads'); // Function to merge sorted subarrays function merge(a low mid high) {  let temp = new Array(high - low + 1);  let i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  for (let p = 0; p < temp.length; p++) {  a[low + p] = temp[p];  } } // Function to check if array is sorted function isSorted(a) {  for (let i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true; } // Function to fill array with random numbers function fillData(a) {  for (let i = 0; i < a.length; i++) {  a[i] = Math.floor(Math.random() * 1000);  } } // Function to sort the array using merge sort function sortArray(a low high) {  if (high - low <= 5) {  a.sort((a b) => a - b);  } else {  let mid = low + Math.floor((high - low) / 2);  sortArray(a low mid);  sortArray(a mid + 1 high);  merge(a low mid high);  } } // Main function function main() {  let length = 128;  let a = new Array(length);  fillData(a);  sortArray(a 0 a.length - 1);  if (isSorted(a)) {  console.log('Sorting Done Successfully');  } else {  console.log('Sorting Not Done');  } } main(); 

Lähtö: 
 



Sorting Done Successfully  

Aika monimutkaisuus :O(N log N )

Aputila: O(N)


Suorituskyvyn parannuksia?  
Yritä ajastaa koodia ja vertailla sen suorituskykyä perinteiseen peräkkäiseen koodiin. Yllättyisit, jos tietäisit, että peräkkäinen lajittelu toimii paremmin! 
Kun sanotaan, että vasen lapsi käyttää vasenta taulukkoa, taulukko ladataan prosessorin välimuistiin. Nyt kun oikeaa taulukkoa käytetään (samaaikaisten käyttöjen takia), välimuisti on täytetty, koska välimuisti on täytetty vasemmalla segmentillä ja sitten oikea segmentti kopioidaan välimuistiin. Tämä edestakaisin prosessi jatkuu ja se heikentää suorituskykyä sellaiselle tasolle, että se toimii huonommin kuin peräkkäinen koodi.
On olemassa tapoja vähentää välimuistin puuttumista ohjaamalla koodin työnkulkua. Mutta niitä ei voida täysin välttää!