diff --git a/pom.xml b/pom.xml index 39e811e..cda8208 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,16 @@ + + org.openjdk.jmh + jmh-core + 1.35 + + + org.openjdk.jmh + jmh-generator-annprocess + 1.35 + diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java index ec1c8a6..291c8c5 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java @@ -1,52 +1,106 @@ package ru.javaops.masterjava.matrix; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.stream.IntStream; /** * gkislin * 03.07.2016 */ public class MainMatrix { - private static final int MATRIX_SIZE = 1000; - private static final int THREAD_NUMBER = 10; + public static final int MATRIX_SIZE = 1000; + static final int TASK_SIZE = 100; + public static final int THREAD_NUMBER = 10; + public static final int THRESH = 64; + static int[][] matrixA; + static int[][] matrixB; - private final static ExecutorService executor = Executors.newFixedThreadPool(MainMatrix.THREAD_NUMBER); - - public static void main(String[] args) throws ExecutionException, InterruptedException { - final int[][] matrixA = MatrixUtil.create(MATRIX_SIZE); - final int[][] matrixB = MatrixUtil.create(MATRIX_SIZE); + static { + matrixA = MatrixUtil.create(MATRIX_SIZE); + matrixB = MatrixUtil.create(MATRIX_SIZE); + } + public static void main(String[] args) { double singleThreadSum = 0.; double concurrentThreadSum = 0.; + double concurrentThreadSumCB = 0.; + double concurrentThreadSumFJP = 0.; + double concurrentSumParallelStream = 0.; int count = 1; while (count < 6) { System.out.println("Pass " + count); - long start = System.currentTimeMillis(); + + long start = System.nanoTime(); final int[][] matrixC = MatrixUtil.singleThreadMultiply(matrixA, matrixB); - double duration = (System.currentTimeMillis() - start) / 1000.; - out("Single thread time, sec: %.3f", duration); + double duration = (System.nanoTime() - start) / 1000000.; + out("Single thread time, msec: %,.3f", duration); singleThreadSum += duration; - start = System.currentTimeMillis(); + ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER); +// ExecutorService executor = Executors.newWorkStealingPool(THREAD_NUMBER); +// ExecutorService executor = Executors.newCachedThreadPool(); + start = System.nanoTime(); final int[][] concurrentMatrixC = MatrixUtil.concurrentMultiply(matrixA, matrixB, executor); - duration = (System.currentTimeMillis() - start) / 1000.; - out("Concurrent thread time, sec: %.3f", duration); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent thread time, msec: %,.3f", duration); concurrentThreadSum += duration; + executor.shutdown(); + while (!executor.isTerminated()) ; + + if (MatrixUtil.compare(matrixC, concurrentMatrixC)) { + System.err.println("Comparison (executor) failed"); + break; + } + + CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER); + start = System.nanoTime(); + final int[][] concurrentMatrixCCB = MatrixUtil.concurrentMultiplyCB(matrixA, matrixB, barrier); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent CB thread time, msec: %,.3f", duration); + concurrentThreadSumCB += duration; + barrier.reset(); + + if (MatrixUtil.compare(matrixC, concurrentMatrixCCB)) { + System.err.println("Comparison (CyclicBarrier) failed"); + break; + } + + ForkJoinPool fjp = new ForkJoinPool(THREAD_NUMBER); + start = System.nanoTime(); + final int[][] concurrentMatrixFJP = MatrixUtil.concurrentMultiplyFJP(fjp); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent ForkJoinPool thread time, msec: %,.3f", duration); + concurrentThreadSumFJP += duration; + fjp.shutdown(); + while (!fjp.isTerminated()) ; + + if (MatrixUtil.compare(matrixC, concurrentMatrixFJP)) { + System.err.println("Comparison (ForkJoinPool) failed"); + break; + } - if (!MatrixUtil.compare(matrixC, concurrentMatrixC)) { - System.err.println("Comparison failed"); + IntStream stream = IntStream.rangeClosed(0, MATRIX_SIZE - 1).parallel().filter(n -> n % TASK_SIZE == 0); + start = System.nanoTime(); + final int[][] concurrentMatrixParallelStream = MatrixUtil.concurrentMultiplyParallelStream(matrixA, matrixB, stream); + duration = (System.nanoTime() - start) / 1000000.; + out("Concurrent concurrentMatrixParralelStream time, msec: %,.3f", duration); + concurrentSumParallelStream += duration; + + if (MatrixUtil.compare(matrixC, concurrentMatrixParallelStream)) { + System.err.println("Comparison (ParallelStream) failed"); break; } + count++; } - executor.shutdown(); - out("\nAverage single thread time, sec: %.3f", singleThreadSum / 5.); - out("Average concurrent thread time, sec: %.3f", concurrentThreadSum / 5.); + out("\nAverage single thread time, msec: %,9.3f", singleThreadSum / 5.); + out("Average concurrent thread (executor) time, msec: %,9.3f", concurrentThreadSum / 5.); + out("Average concurrent thread (CB) time, msec: %,9.3f", concurrentThreadSumCB / 5.); + out("Average concurrent thread (ForkJoinPool) time, msec: %,9.3f", concurrentThreadSumFJP / 5.); + out("Average concurrent thread (ParallelStream) time, msec: %,9.3f", concurrentSumParallelStream / 5.); } - private static void out(String format, double ms) { - System.out.println(String.format(format, ms)); + public static void out(String format, double ms) { + System.out.printf((format) + "%n", ms); } } diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java index 80a344a..dcd0171 100644 --- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java +++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java @@ -1,8 +1,14 @@ package ru.javaops.masterjava.matrix; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.*; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static ru.javaops.masterjava.matrix.MainMatrix.*; /** * gkislin @@ -10,27 +16,65 @@ */ public class MatrixUtil { - // TODO implement parallel multiplication matrixA*matrixB - public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) throws InterruptedException, ExecutionException { - final int matrixSize = matrixA.length; - final int[][] matrixC = new int[matrixSize][matrixSize]; + // T_O_D_O implement parallel multiplication matrixA*matrixB + public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) { + int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + for (int rowA = 0; rowA < MATRIX_SIZE; rowA += TASK_SIZE) { + executor.submit(new Task(matrixA, matrixB, matrixC, rowA)); + } + return matrixC; + } + public static int[][] concurrentMultiplyCB(int[][] matrixA, int[][] matrixB, CyclicBarrier barrier) { + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + List threads = new ArrayList<>(THREAD_NUMBER); + for (int rowA = 0; rowA < MATRIX_SIZE; rowA++) { + threads.add(new TaskCB(matrixA, matrixB, matrixC, rowA, barrier)); + if (threads.size() == THREAD_NUMBER) { + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + threads.clear(); + } + } + return matrixC; + } + + public static int[][] concurrentMultiplyFJP(ForkJoinPool fjp) { + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + TaskFJP task = new TaskFJP(matrixC, 0, MATRIX_SIZE); + fjp.invoke(task); return matrixC; } - // TODO optimize by https://habrahabr.ru/post/114797/ + // T_O_D_O optimize by https://habrahabr.ru/post/114797/ public static int[][] singleThreadMultiply(int[][] matrixA, int[][] matrixB) { final int matrixSize = matrixA.length; final int[][] matrixC = new int[matrixSize][matrixSize]; - for (int i = 0; i < matrixSize; i++) { - for (int j = 0; j < matrixSize; j++) { - int sum = 0; - for (int k = 0; k < matrixSize; k++) { - sum += matrixA[i][k] * matrixB[k][j]; + int[] columnMatrixB = new int[MainMatrix.MATRIX_SIZE]; + try { + for (int i = 0; ; i++) { + for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][i]; + } + for (int j = 0; j < MainMatrix.MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][i] = sum; } - matrixC[i][j] = sum; } + } catch (IndexOutOfBoundsException ignored) { } return matrixC; } @@ -41,7 +85,7 @@ public static int[][] create(int size) { for (int i = 0; i < size; i++) { for (int j = 0; j < size; j++) { - matrix[i][j] = rn.nextInt(10); + matrix[i][j] = rn.nextInt(6); } } return matrix; @@ -52,10 +96,163 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) { for (int i = 0; i < matrixSize; i++) { for (int j = 0; j < matrixSize; j++) { if (matrixA[i][j] != matrixB[i][j]) { - return false; + return true; + } + } + } + return false; + } + + public static int[][] transpone(int[][] matrix) { + int[][] matrixTransponed = new int[MATRIX_SIZE][MATRIX_SIZE]; + for (int i = 0; i < MATRIX_SIZE; i++) { + for (int j = 0; j < MATRIX_SIZE; j++) { + matrixTransponed[j][i] = matrix[i][j]; + } + } + return matrixTransponed; + } + + private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + for (int i = rowA; i < rowA + TASK_SIZE; ++i) { + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, i); + } + } + + private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowFrom, int rowTo) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + for (int row = rowFrom; row < rowTo; row++) { + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, row); + } + } + + static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, int[] columnMatrixB, int column) { + for (int k = 0; k < MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][column]; + } + for (int j = 0; j < MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][column] = sum; + } + } + + private static void calcIJ1(int[][] matrixA, int[][] matrixB, int[][] matrixC, int column) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + for (int k = 0; k < MATRIX_SIZE; k++) { + columnMatrixB[k] = matrixB[k][column]; + } + for (int j = 0; j < MATRIX_SIZE; j++) { + int[] rowMatrixA = matrixA[j]; + int sum = 0; + for (int k = 0; k < MATRIX_SIZE; k++) { + sum += rowMatrixA[k] * columnMatrixB[k]; + } + matrixC[j][column] = sum; + } + } + + private static void calculateCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + int[] columnMatrixB = new int[MATRIX_SIZE]; + calcIJ(matrixA, matrixB, matrixC, columnMatrixB, rowA); + } + + public static int[][] concurrentMultiplyParallelStream(final int[][] matrixA, final int[][] matrixB, IntStream stream) { + final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE]; + stream.forEach(row -> { + calculate(matrixA, matrixB, matrixC, row); + }); + return matrixC; + } + + //concurrentMatrixParralelStream time, msec: 17 943,034 + public static int[][] concurrentMultiplyParallelStream1(final int[][] matrixA, final int[][] matrixB) { + return Arrays.stream(matrixA) + .parallel() + .map(AMatrixRow -> IntStream.range(0, matrixB[0].length) + .map(i -> IntStream.range(0, matrixB.length) + .map(j -> AMatrixRow[j] * matrixB[j][i]) + .sum() + ) + .toArray()) + .toArray(int[][]::new); + } + + static class Task implements Runnable { + private final int[][] matrixA; + private final int[][] matrixB; + private final int[][] matrixC; + + private final int rowA; + + public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) { + this.matrixA = matrixA; + this.matrixB = matrixB; + this.matrixC = matrixC; + this.rowA = rowA; + } + + @Override + public void run() { + calculate(matrixA, matrixB, matrixC, rowA); + } + + } + + static class TaskCB extends Thread { + private final int[][] matrixA; + private final int[][] matrixB; + private final int[][] matrixC; + private final int rowA; + + private final CyclicBarrier barrier; + + public TaskCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA, CyclicBarrier barrier) { + this.matrixA = matrixA; + this.matrixB = matrixB; + this.matrixC = matrixC; + this.rowA = rowA; + this.barrier = barrier; + } + + @Override + public void run() { + try { + barrier.await(); + calculateCB(matrixA, matrixB, matrixC, rowA); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + + static class TaskFJP extends RecursiveAction { + private final int[][] matrixA = MainMatrix.matrixA; + private final int[][] matrixB = MainMatrix.matrixB; + private final int[][] matrixC; + + final int lo, hi; + + TaskFJP(int[][] matrixC, int lo, int hi) { + this.matrixC = matrixC; + this.lo = lo; + this.hi = hi; + } + + protected void compute() { + if (hi - lo <= TASK_SIZE / THRESH) { + for (int i = lo; i < hi; i++) { + calculate(matrixA, matrixB, matrixC, i, hi); } + } else { + int mid = (lo + hi) >>> 1; + invokeAll(new TaskFJP(matrixC, lo, mid), + new TaskFJP(matrixC, mid, hi)); } } - return true; } }