From ebbfbe87e127dc30dacebe87f5881fe70ad76188 Mon Sep 17 00:00:00 2001 From: user Date: Wed, 22 Feb 2023 12:28:47 +0300 Subject: [PATCH 1/4] HW1: implemented parallel multiplication matrixA*matrixB --- .../javaops/masterjava/matrix/MainMatrix.java | 9 ++-- .../javaops/masterjava/matrix/MatrixUtil.java | 41 ++++++++++++++++++- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java index ec1c8a6..6bee4e6 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java @@ -4,15 +4,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static ru.javaops.masterjava.matrix.MatrixUtil.MATRIX_SIZE; +import static ru.javaops.masterjava.matrix.MatrixUtil.THREAD_NUMBER; + /** * gkislin * 03.07.2016 */ public class MainMatrix { - private static final int MATRIX_SIZE = 1000; - private static final int THREAD_NUMBER = 10; - private final static ExecutorService executor = Executors.newFixedThreadPool(MainMatrix.THREAD_NUMBER); + private final static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER); public static void main(String[] args) throws ExecutionException, InterruptedException { final int[][] matrixA = MatrixUtil.create(MATRIX_SIZE); @@ -47,6 +48,6 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc } private static void out(String format, double ms) { - System.out.println(String.format(format, ms)); + System.out.printf((format) + "%n", ms); } } diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java index 80a344a..f32f53e 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java @@ -1,20 +1,32 @@ package ru.javaops.masterjava.matrix; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * gkislin * 03.07.2016 */ public class MatrixUtil { + static final int MATRIX_SIZE = 1000; + static final int THREAD_NUMBER = 10; - // TODO implement parallel multiplication matrixA*matrixB + // T_O_D_O implement parallel multiplication matrixA*matrixB public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) throws InterruptedException, ExecutionException { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; - +// List tasks = new ArrayList<>(); +// executor.invokeAll(tasks); + for (int i = 0; i < MATRIX_SIZE; i++) { + for (int j = 0; j < MATRIX_SIZE; j++) { + executor.submit(new Task(matrixA, matrixB, matrixC, i, j)); + } + } + while (((ThreadPoolExecutor) executor).getQueue().size() > 0) ; return matrixC; } @@ -58,4 +70,29 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { } return true; } + + static class Task implements Runnable { + private final int[][] matrixA; + private final int[][] matrixB; + private final int[][] matrixC; + private final int i; + private final int j; + + public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int i, int j) { + this.matrixA = matrixA; + this.matrixB = matrixB; + this.matrixC = matrixC; + this.i = i; + this.j = j; + } + + @Override + public void run() { + int sum = 0; + for (int k = 0; k < matrixA.length; k++) { + sum += matrixA[i][k] * matrixB[k][j]; + } + matrixC[i][j] = sum; + } + } } From 294c9f1bbdf0826e494752a8d242efa7cbc3868f Mon Sep 17 00:00:00 2001 From: user Date: Fri, 24 Feb 2023 09:04:43 +0300 Subject: [PATCH 2/4] HW1: implemented parallel multiplication matrixA*matrixB --- .../javaops/masterjava/matrix/MainMatrix.java | 17 ++++-- .../javaops/masterjava/matrix/MatrixUtil.java | 53 ++++++++++++++----- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java index 6bee4e6..6dfafb3 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java @@ -4,20 +4,25 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static ru.javaops.masterjava.matrix.MatrixUtil.MATRIX_SIZE; -import static ru.javaops.masterjava.matrix.MatrixUtil.THREAD_NUMBER; - /** * gkislin * 03.07.2016 */ public class MainMatrix { + static final int MATRIX_SIZE = 1000; + static final int THREAD_NUMBER = 10; + static final int NUMBER_OF_TASKS = 1; + static int[][] matrixA; + static int[][] matrixB; private final static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER); + static{ + matrixA = MatrixUtil.create(MATRIX_SIZE); + matrixB = MatrixUtil.create(MATRIX_SIZE); + } + public static void main(String[] args) throws ExecutionException, InterruptedException { - final int[][] matrixA = MatrixUtil.create(MATRIX_SIZE); - final int[][] matrixB = MatrixUtil.create(MATRIX_SIZE); double singleThreadSum = 0.; double concurrentThreadSum = 0.; @@ -36,6 +41,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc out("Concurrent thread time, sec: %.3f", duration); concurrentThreadSum += duration; + Thread.sleep(0, 1); + if (!MatrixUtil.compare(matrixC, concurrentMatrixC)) { System.err.println("Comparison failed"); break; diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java index f32f53e..a7641d9 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java @@ -7,24 +7,31 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import static ru.javaops.masterjava.matrix.MainMatrix.MATRIX_SIZE; +import static ru.javaops.masterjava.matrix.MainMatrix.NUMBER_OF_TASKS; + /** * gkislin * 03.07.2016 */ public class MatrixUtil { - static final int MATRIX_SIZE = 1000; - static final int THREAD_NUMBER = 10; // T_O_D_O implement parallel multiplication matrixA*matrixB public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) throws InterruptedException, ExecutionException { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; -// List tasks = new ArrayList<>(); -// executor.invokeAll(tasks); - for (int i = 0; i < MATRIX_SIZE; i++) { +/* + List tasks = new ArrayList<>(); + tasks.add(new Task(matrixA, matrixB, matrixC, i + n, j)); + executor.invokeAll(tasks); +*/ + for (int i = 0; i < MATRIX_SIZE; i += NUMBER_OF_TASKS) { for (int j = 0; j < MATRIX_SIZE; j++) { - executor.submit(new Task(matrixA, matrixB, matrixC, i, j)); + for (int n = 0; i + n < MATRIX_SIZE && n < NUMBER_OF_TASKS; n++) { + executor.submit(new Task(matrixA, matrixB, matrixC, i + n, j)); + } } + } while (((ThreadPoolExecutor) executor).getQueue().size() > 0) ; return matrixC; @@ -34,15 +41,24 @@ public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, Execu public static int[][] singleThreadMultiply(int[][] matrixA, int[][] matrixB) { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; + final int[][] matrixBTransponed = transpone(matrixB); - for (int i = 0; i < matrixSize; i++) { - for (int j = 0; j < matrixSize; j++) { - int sum = 0; - for (int k = 0; k < matrixSize; k++) { - sum += matrixA[i][k] * matrixB[k][j]; + int[] columnMatrixB = new int[MATRIX_SIZE]; + try { + for (int i = 0; ; i++) { + for (int k = 0; k < MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][i]; + } + for (int j = 0; j < MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][i] = sum; } - matrixC[i][j] = sum; } + } catch (IndexOutOfBoundsException ignored) { } return matrixC; } @@ -64,6 +80,7 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { for (int i = 0; i < matrixSize; i++) { for (int j = 0; j < matrixSize; j++) { if (matrixA[i][j] != matrixB[i][j]) { + System.out.printf("matrixA[%d][%d] = %d != matrixB[%d][%d] = %d", i, j, matrixA[i][j], i, j, matrixB[i][j]); return false; } } @@ -71,6 +88,16 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { return true; } + public static int[][] transpone(int[][] matrix) { + int[][] matrixTransponed = new int[MATRIX_SIZE][MATRIX_SIZE]; + for (int i = 0; i < MATRIX_SIZE; i++) { + for (int j = 0; j < MATRIX_SIZE; j++) { + matrixTransponed[j][i] = matrix[i][j]; + } + } + return matrixTransponed; + } + static class Task implements Runnable { private final int[][] matrixA; private final int[][] matrixB; @@ -88,11 +115,13 @@ public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int i, int j) { @Override public void run() { +// System.out.printf("[%d][%d] Thread %20s started : %d%n", i, j, Thread.currentThread().getName(), System.nanoTime()); int sum = 0; for (int k = 0; k < matrixA.length; k++) { sum += matrixA[i][k] * matrixB[k][j]; } matrixC[i][j] = sum; +// System.out.printf("[%d][%d] Thread %20s finished: %d%n", i, j, Thread.currentThread().getName(), System.nanoTime()); } } } From 67fa19e9ef78b12318b6c54e05d0c22fc4ce7224 Mon Sep 17 00:00:00 2001 From: user Date: Fri, 3 Mar 2023 18:44:45 +0300 Subject: [PATCH 3/4] HW1: implemented parallel multiplication matrixA*matrixB ThreadPoolExecutor + CyclicBarrier + ForkJoinPool --- .../javaops/masterjava/matrix/MainMatrix.java | 72 +++++--- .../javaops/masterjava/matrix/MatrixUtil.java | 169 ++++++++++++++---- 2 files changed, 183 insertions(+), 58 deletions(-) diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java index 6dfafb3..59757e6 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java @@ -1,8 +1,6 @@ package ru.javaops.masterjava.matrix; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; /** * gkislin @@ -10,48 +8,80 @@ */ public class MainMatrix { static final int MATRIX_SIZE = 1000; + static final int TASK_SIZE = 100; static final int THREAD_NUMBER = 10; - static final int NUMBER_OF_TASKS = 1; static int[][] matrixA; static int[][] matrixB; - private final static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER); - - static{ + static { matrixA = MatrixUtil.create(MATRIX_SIZE); matrixB = MatrixUtil.create(MATRIX_SIZE); } - public static void main(String[] args) throws ExecutionException, InterruptedException { - + public static void main(String[] args) { double singleThreadSum = 0.; double concurrentThreadSum = 0.; + double concurrentThreadSumCB = 0.; + double concurrentThreadSumFJP = 0.; int count = 1; while (count < 6) { System.out.println("Pass " + count); - long start = System.currentTimeMillis(); + + long start = System.nanoTime(); final int[][] matrixC = MatrixUtil.singleThreadMultiply(matrixA, matrixB); - double duration = (System.currentTimeMillis() - start) / 1000.; - out("Single thread time, sec: %.3f", duration); + double duration = (System.nanoTime() - start) / 1000000.; + out("Single thread time, msec: %,.3f", duration); singleThreadSum += duration; - start = System.currentTimeMillis(); + ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER); +// ExecutorService executor = Executors.newWorkStealingPool(THREAD_NUMBER); +// ExecutorService executor = Executors.newCachedThreadPool(); + start = System.nanoTime(); final int[][] concurrentMatrixC = MatrixUtil.concurrentMultiply(matrixA, matrixB, executor); - duration = (System.currentTimeMillis() - start) / 1000.; - out("Concurrent thread time, sec: %.3f", duration); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent thread time, msec: %,.3f", duration); concurrentThreadSum += duration; + executor.shutdown(); + while (!executor.isTerminated()) ; + + if (MatrixUtil.compare(matrixC, concurrentMatrixC)) { + System.err.println("Comparison (executor) failed"); + break; + } - Thread.sleep(0, 1); + CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER); + start = System.nanoTime(); + final int[][] concurrentMatrixCCB = MatrixUtil.concurrentMultiplyCB(matrixA, matrixB, barrier); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent CB thread time, msec: %,.3f", duration); + concurrentThreadSumCB += duration; + barrier.reset(); - if (!MatrixUtil.compare(matrixC, concurrentMatrixC)) { - System.err.println("Comparison failed"); + if (MatrixUtil.compare(matrixC, concurrentMatrixCCB)) { + System.err.println("Comparison (CyclicBarrier) failed"); break; } + + ForkJoinPool fjp = new ForkJoinPool(THREAD_NUMBER); + start = System.nanoTime(); + final int[][] concurrentMatrixFJP = MatrixUtil.concurrentMultiplyFJP(matrixA, matrixB, fjp); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent ForkJoinPool thread time, msec: %,.3f", duration); + concurrentThreadSumFJP += duration; + fjp.shutdown(); + while(!fjp.isTerminated()); + + if (MatrixUtil.compare(matrixC, concurrentMatrixFJP)) { + System.err.println("Comparison (ForkJoinPool) failed"); + break; + } + count++; } - executor.shutdown(); - out("\nAverage single thread time, sec: %.3f", singleThreadSum / 5.); - out("Average concurrent thread time, sec: %.3f", concurrentThreadSum / 5.); + out("\nAverage single thread time, msec: %,9.3f", singleThreadSum / 5.); + out("Average concurrent thread (executor) time, msec: %,9.3f", concurrentThreadSum / 5.); + out("Average concurrent thread (CB) time, msec: %,9.3f", concurrentThreadSumCB / 5.); + out("Average concurrent thread (ForkJoinPool) time, msec: %,9.3f", concurrentThreadSumFJP / 5.); } private static void out(String format, double ms) { diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java index a7641d9..11bf561 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java @@ -3,12 +3,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; -import static ru.javaops.masterjava.matrix.MainMatrix.MATRIX_SIZE; -import static ru.javaops.masterjava.matrix.MainMatrix.NUMBER_OF_TASKS; +import static ru.javaops.masterjava.matrix.MainMatrix.*; /** * gkislin @@ -17,31 +14,49 @@ public class MatrixUtil { // T_O_D_O implement parallel multiplication matrixA*matrixB - public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) throws InterruptedException, ExecutionException { - final int matrixSize = matrixA.length; - final int[][] matrixC = new int[matrixSize][matrixSize]; -/* - List tasks = new ArrayList<>(); - tasks.add(new Task(matrixA, matrixB, matrixC, i + n, j)); - executor.invokeAll(tasks); -*/ - for (int i = 0; i < MATRIX_SIZE; i += NUMBER_OF_TASKS) { - for (int j = 0; j < MATRIX_SIZE; j++) { - for (int n = 0; i + n < MATRIX_SIZE && n < NUMBER_OF_TASKS; n++) { - executor.submit(new Task(matrixA, matrixB, matrixC, i + n, j)); + public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) { + int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + for (int rowA = 0; rowA < MATRIX_SIZE; rowA += TASK_SIZE) { + executor.submit(new Task(matrixA, matrixB, matrixC, rowA)); + } + return matrixC; + } + + public static int[][] concurrentMultiplyCB(int[][] matrixA, int[][] matrixB, CyclicBarrier barrier) { + int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + List threads = new ArrayList<>(THREAD_NUMBER); + for (int rowA = 0; rowA < MATRIX_SIZE; rowA++) { + threads.add(new TaskCB(matrixA, matrixB, matrixC, rowA, barrier)); + if (threads.size() == THREAD_NUMBER) { + for (Thread thread : threads) { + thread.start(); } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + threads.clear(); } - } - while (((ThreadPoolExecutor) executor).getQueue().size() > 0) ; return matrixC; } - // TODO optimize by https://habrahabr.ru/post/114797/ + private static int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + + public static int[][] concurrentMultiplyFJP(int[][] matrixA, int[][] matrixB, ForkJoinPool fjp) { + int[][] matrixC = MatrixUtil.matrixC; + TaskFJP task = new TaskFJP(matrixC, 0, MATRIX_SIZE); + fjp.invoke(task); + return matrixC; + } + // T_O_D_O optimize by https://habrahabr.ru/post/114797/ + public static int[][] singleThreadMultiply(int[][] matrixA, int[][] matrixB) { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; - final int[][] matrixBTransponed = transpone(matrixB); int[] columnMatrixB = new int[MATRIX_SIZE]; try { @@ -69,7 +84,7 @@ public static int[][] create(int size) { for (int i = 0; i < size; i++) { for (int j = 0; j < size; j++) { - matrix[i][j] = rn.nextInt(10); + matrix[i][j] = rn.nextInt(6); } } return matrix; @@ -80,12 +95,12 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { for (int i = 0; i < matrixSize; i++) { for (int j = 0; j < matrixSize; j++) { if (matrixA[i][j] != matrixB[i][j]) { - System.out.printf("matrixA[%d][%d] = %d != matrixB[%d][%d] = %d", i, j, matrixA[i][j], i, j, matrixB[i][j]); - return false; + System.out.printf("matrix[%d][%d] = %d != concurrentMatrix[%d][%d] = %d", i, j, matrixA[i][j], i, j, matrixB[i][j]); + return true; } } } - return true; + return false; } public static int[][] transpone(int[][] matrix) { @@ -98,30 +113,110 @@ public static int[][] transpone(int[][] matrix) { return matrixTransponed; } - static class Task implements Runnable { + private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + for (int i = rowA; i < rowA + TASK_SIZE; ++i) { + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, i); + } + } + + private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA, int r) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + for (int i = rowA; i < r; i++) { + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, i); + } + } + + private static void calculateCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, rowA); + } + + private static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, int[] columnMatrixB, int column) { + for (int k = 0; k < MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][column]; + } + for (int j = 0; j < MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][column] = sum; + } + } + + private static class Task implements Runnable { private final int[][] matrixA; private final int[][] matrixB; private final int[][] matrixC; - private final int i; - private final int j; - public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int i, int j) { + private final int rowA; + + public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { this.matrixA = matrixA; this.matrixB = matrixB; this.matrixC = matrixC; - this.i = i; - this.j = j; + this.rowA = rowA; } @Override public void run() { -// System.out.printf("[%d][%d] Thread %20s started : %d%n", i, j, Thread.currentThread().getName(), System.nanoTime()); - int sum = 0; - for (int k = 0; k < matrixA.length; k++) { - sum += matrixA[i][k] * matrixB[k][j]; + calculate(matrixA, matrixB, matrixC, rowA); + } + + } + + private static class TaskCB extends Thread { + private final int[][] matrixA; + private final int[][] matrixB; + private final int[][] matrixC; + private final int rowA; + + private final CyclicBarrier barrier; + + public TaskCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA, CyclicBarrier barrier) { + this.matrixA = matrixA; + this.matrixB = matrixB; + this.matrixC = matrixC; + this.rowA = rowA; + this.barrier = barrier; + } + + @Override + public void run() { + try { + barrier.await(); + calculateCB(matrixA, matrixB, matrixC, rowA); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + + private static class TaskFJP extends RecursiveAction { + private final int[][] matrixA = MainMatrix.matrixA; + private final int[][] matrixB = MainMatrix.matrixB; + private final int[][] matrixC; + + final int lo, hi; + + TaskFJP(int[][] matrixC, int lo, int hi) { + this.matrixC = matrixC; + this.lo = lo; + this.hi = hi; + } + + protected void compute() { + if (hi - lo <= TASK_SIZE / 64) { + for (int i = lo; i < hi; i++) { + calculate(matrixA, matrixB, matrixC, i, hi); + } + } else { + int mid = (lo + hi) >>> 1; + invokeAll(new TaskFJP(matrixC, lo, mid), + new TaskFJP(matrixC, mid, hi)); } - matrixC[i][j] = sum; -// System.out.printf("[%d][%d] Thread %20s finished: %d%n", i, j, Thread.currentThread().getName(), System.nanoTime()); } } } From 0e49493d6465118ee1785a6f5993c91cd0ab65d0 Mon Sep 17 00:00:00 2001 From: user Date: Sun, 2 Apr 2023 21:57:39 +0300 Subject: [PATCH 4/4] HW1: implemented parallel multiplication matrixA*matrixB ThreadPoolExecutor + CyclicBarrier + ForkJoinPool+parallelStream --- pom.xml | 10 +++ .../javaops/masterjava/matrix/MainMatrix.java | 26 ++++-- .../javaops/masterjava/matrix/MatrixUtil.java | 80 ++++++++++++++----- 3 files changed, 89 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index 39e811e..cda8208 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,16 @@ + + org.openjdk.jmh + jmh-core + 1.35 + + + org.openjdk.jmh + jmh-generator-annprocess + 1.35 + diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java index 59757e6..291c8c5 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java @@ -1,15 +1,17 @@ package ru.javaops.masterjava.matrix; import java.util.concurrent.*; +import java.util.stream.IntStream; /** * gkislin * 03.07.2016 */ public class MainMatrix { - static final int MATRIX_SIZE = 1000; + public static final int MATRIX_SIZE = 1000; static final int TASK_SIZE = 100; - static final int THREAD_NUMBER = 10; + public static final int THREAD_NUMBER = 10; + public static final int THRESH = 64; static int[][] matrixA; static int[][] matrixB; @@ -23,6 +25,7 @@ public static void main(String[] args) { double concurrentThreadSum = 0.; double concurrentThreadSumCB = 0.; double concurrentThreadSumFJP = 0.; + double concurrentSumParallelStream = 0.; int count = 1; while (count < 6) { System.out.println("Pass " + count); @@ -64,27 +67,40 @@ public static void main(String[] args) { ForkJoinPool fjp = new ForkJoinPool(THREAD_NUMBER); start = System.nanoTime(); - final int[][] concurrentMatrixFJP = MatrixUtil.concurrentMultiplyFJP(matrixA, matrixB, fjp); + final int[][] concurrentMatrixFJP = MatrixUtil.concurrentMultiplyFJP(fjp); duration = (System.nanoTime() - start) / 1000000.; out("Concurrent ForkJoinPool thread time, msec: %,.3f", duration); concurrentThreadSumFJP += duration; fjp.shutdown(); - while(!fjp.isTerminated()); + while (!fjp.isTerminated()) ; if (MatrixUtil.compare(matrixC, concurrentMatrixFJP)) { System.err.println("Comparison (ForkJoinPool) failed"); break; } + IntStream stream = IntStream.rangeClosed(0, MATRIX_SIZE - 1).parallel().filter(n -> n % TASK_SIZE == 0); + start = System.nanoTime(); + final int[][] concurrentMatrixParallelStream = MatrixUtil.concurrentMultiplyParallelStream(matrixA, matrixB, stream); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent concurrentMatrixParralelStream time, msec: %,.3f", duration); + concurrentSumParallelStream += duration; + + if (MatrixUtil.compare(matrixC, concurrentMatrixParallelStream)) { + System.err.println("Comparison (ParallelStream) failed"); + break; + } + count++; } out("\nAverage single thread time, msec: %,9.3f", singleThreadSum / 5.); out("Average concurrent thread (executor) time, msec: %,9.3f", concurrentThreadSum / 5.); out("Average concurrent thread (CB) time, msec: %,9.3f", concurrentThreadSumCB / 5.); out("Average concurrent thread (ForkJoinPool) time, msec: %,9.3f", concurrentThreadSumFJP / 5.); + out("Average concurrent thread (ParallelStream) time, msec: %,9.3f", concurrentSumParallelStream / 5.); } - private static void out(String format, double ms) { + public static void out(String format, double ms) { System.out.printf((format) + "%n", ms); } } diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java index 11bf561..dcd0171 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java @@ -1,9 +1,12 @@ package ru.javaops.masterjava.matrix; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.*; +import java.util.stream.IntStream; +import java.util.stream.LongStream; import static ru.javaops.masterjava.matrix.MainMatrix.*; @@ -23,7 +26,7 @@ public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, Execu } public static int[][] concurrentMultiplyCB(int[][] matrixA, int[][] matrixB, CyclicBarrier barrier) { - int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; List threads = new ArrayList<>(THREAD_NUMBER); for (int rowA = 0; rowA < MATRIX_SIZE; rowA++) { threads.add(new TaskCB(matrixA, matrixB, matrixC, rowA, barrier)); @@ -44,30 +47,28 @@ public static int[][] concurrentMultiplyCB(int[][] matrixA, int[][] matrixB, Cyc return matrixC; } - private static int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; - - public static int[][] concurrentMultiplyFJP(int[][] matrixA, int[][] matrixB, ForkJoinPool fjp) { - int[][] matrixC = MatrixUtil.matrixC; + public static int[][] concurrentMultiplyFJP(ForkJoinPool fjp) { + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; TaskFJP task = new TaskFJP(matrixC, 0, MATRIX_SIZE); fjp.invoke(task); return matrixC; } - // T_O_D_O optimize by https://habrahabr.ru/post/114797/ + // T_O_D_O optimize by https://habrahabr.ru/post/114797/ public static int[][] singleThreadMultiply(int[][] matrixA, int[][] matrixB) { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; - int[] columnMatrixB = new int[MATRIX_SIZE]; + int[] columnMatrixB = new int[MainMatrix.MATRIX_SIZE]; try { for (int i = 0; ; i++) { - for (int k = 0; k < MATRIX_SIZE; k++) { + for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) { columnMatrixB[k] = matrixB[k][i]; } - for (int j = 0; j < MATRIX_SIZE; j++) { + for (int j = 0; j < MainMatrix.MATRIX_SIZE; j++) { int[] rowMatrixA = matrixA[j]; int sum = 0; - for (int k = 0; k < MATRIX_SIZE; k++) { + for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) { sum += rowMatrixA[k] * columnMatrixB[k]; } matrixC[j][i] = sum; @@ -95,7 +96,6 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { for (int i = 0; i < matrixSize; i++) { for (int j = 0; j < matrixSize; j++) { if (matrixA[i][j] != matrixB[i][j]) { - System.out.printf("matrix[%d][%d] = %d != concurrentMatrix[%d][%d] = %d", i, j, matrixA[i][j], i, j, matrixB[i][j]); return true; } } @@ -120,19 +120,29 @@ private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, } } - private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA, int r) { + private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowFrom, int rowTo) { int[] columnMatrixB = new int[MATRIX_SIZE]; - for (int i = rowA; i < r; i++) { - calcIJ(matrixA, matrixB, matrixC, columnMatrixB, i); + for (int row = rowFrom; row < rowTo; row++) { + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, row); } } - private static void calculateCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { - int[] columnMatrixB = new int[MATRIX_SIZE]; - calcIJ(matrixA, matrixB, matrixC, columnMatrixB, rowA); + static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, int[] columnMatrixB, int column) { + for (int k = 0; k < MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][column]; + } + for (int j = 0; j < MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][column] = sum; + } } - private static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, int[] columnMatrixB, int column) { + private static void calcIJ1(int[][] matrixA, int[][] matrixB, int[][] matrixC, int column) { + int[] columnMatrixB = new int[MATRIX_SIZE]; for (int k = 0; k < MATRIX_SIZE; k++) { columnMatrixB[k] = matrixB[k][column]; } @@ -146,7 +156,33 @@ private static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, in } } - private static class Task implements Runnable { + private static void calculateCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, rowA); + } + + public static int[][] concurrentMultiplyParallelStream(final int[][] matrixA, final int[][] matrixB, IntStream stream) { + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + stream.forEach(row -> { + calculate(matrixA, matrixB, matrixC, row); + }); + return matrixC; + } + + //concurrentMatrixParralelStream time, msec: 17 943,034 + public static int[][] concurrentMultiplyParallelStream1(final int[][] matrixA, final int[][] matrixB) { + return Arrays.stream(matrixA) + .parallel() + .map(AMatrixRow -> IntStream.range(0, matrixB[0].length) + .map(i -> IntStream.range(0, matrixB.length) + .map(j -> AMatrixRow[j] * matrixB[j][i]) + .sum() + ) + .toArray()) + .toArray(int[][]::new); + } + + static class Task implements Runnable { private final int[][] matrixA; private final int[][] matrixB; private final int[][] matrixC; @@ -167,7 +203,7 @@ public void run() { } - private static class TaskCB extends Thread { + static class TaskCB extends Thread { private final int[][] matrixA; private final int[][] matrixB; private final int[][] matrixC; @@ -194,7 +230,7 @@ public void run() { } } - private static class TaskFJP extends RecursiveAction { + static class TaskFJP extends RecursiveAction { private final int[][] matrixA = MainMatrix.matrixA; private final int[][] matrixB = MainMatrix.matrixB; private final int[][] matrixC; @@ -208,7 +244,7 @@ private static class TaskFJP extends RecursiveAction { } protected void compute() { - if (hi - lo <= TASK_SIZE / 64) { + if (hi - lo <= TASK_SIZE / THRESH) { for (int i = lo; i < hi; i++) { calculate(matrixA, matrixB, matrixC, i, hi); }