diff --git a/pom.xml b/pom.xml
index 39e811e..cda8208 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,6 +34,16 @@
+
+ org.openjdk.jmh
+ jmh-core
+ 1.35
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ 1.35
+
diff --git a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java
index ec1c8a6..291c8c5 100644
--- a/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java
+++ b/src/main/java/ru/javaops/masterjava/matrix/MainMatrix.java
@@ -1,52 +1,106 @@
package ru.javaops.masterjava.matrix;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
+import java.util.stream.IntStream;
/**
* gkislin
* 03.07.2016
*/
public class MainMatrix {
- private static final int MATRIX_SIZE = 1000;
- private static final int THREAD_NUMBER = 10;
+ public static final int MATRIX_SIZE = 1000;
+ static final int TASK_SIZE = 100;
+ public static final int THREAD_NUMBER = 10;
+ public static final int THRESH = 64;
+ static int[][] matrixA;
+ static int[][] matrixB;
- private final static ExecutorService executor = Executors.newFixedThreadPool(MainMatrix.THREAD_NUMBER);
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- final int[][] matrixA = MatrixUtil.create(MATRIX_SIZE);
- final int[][] matrixB = MatrixUtil.create(MATRIX_SIZE);
+ static {
+ matrixA = MatrixUtil.create(MATRIX_SIZE);
+ matrixB = MatrixUtil.create(MATRIX_SIZE);
+ }
+ public static void main(String[] args) {
double singleThreadSum = 0.;
double concurrentThreadSum = 0.;
+ double concurrentThreadSumCB = 0.;
+ double concurrentThreadSumFJP = 0.;
+ double concurrentSumParallelStream = 0.;
int count = 1;
while (count < 6) {
System.out.println("Pass " + count);
- long start = System.currentTimeMillis();
+
+ long start = System.nanoTime();
final int[][] matrixC = MatrixUtil.singleThreadMultiply(matrixA, matrixB);
- double duration = (System.currentTimeMillis() - start) / 1000.;
- out("Single thread time, sec: %.3f", duration);
+ double duration = (System.nanoTime() - start) / 1000000.;
+ out("Single thread time, msec: %,.3f", duration);
singleThreadSum += duration;
- start = System.currentTimeMillis();
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBER);
+// ExecutorService executor = Executors.newWorkStealingPool(THREAD_NUMBER);
+// ExecutorService executor = Executors.newCachedThreadPool();
+ start = System.nanoTime();
final int[][] concurrentMatrixC = MatrixUtil.concurrentMultiply(matrixA, matrixB, executor);
- duration = (System.currentTimeMillis() - start) / 1000.;
- out("Concurrent thread time, sec: %.3f", duration);
+ duration = (System.nanoTime() - start) / 1000000.;
+ out("Concurrent thread time, msec: %,.3f", duration);
concurrentThreadSum += duration;
+ executor.shutdown();
+ while (!executor.isTerminated()) ;
+
+ if (MatrixUtil.compare(matrixC, concurrentMatrixC)) {
+ System.err.println("Comparison (executor) failed");
+ break;
+ }
+
+ CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER);
+ start = System.nanoTime();
+ final int[][] concurrentMatrixCCB = MatrixUtil.concurrentMultiplyCB(matrixA, matrixB, barrier);
+ duration = (System.nanoTime() - start) / 1000000.;
+ out("Concurrent CB thread time, msec: %,.3f", duration);
+ concurrentThreadSumCB += duration;
+ barrier.reset();
+
+ if (MatrixUtil.compare(matrixC, concurrentMatrixCCB)) {
+ System.err.println("Comparison (CyclicBarrier) failed");
+ break;
+ }
+
+ ForkJoinPool fjp = new ForkJoinPool(THREAD_NUMBER);
+ start = System.nanoTime();
+ final int[][] concurrentMatrixFJP = MatrixUtil.concurrentMultiplyFJP(fjp);
+ duration = (System.nanoTime() - start) / 1000000.;
+ out("Concurrent ForkJoinPool thread time, msec: %,.3f", duration);
+ concurrentThreadSumFJP += duration;
+ fjp.shutdown();
+ while (!fjp.isTerminated()) ;
+
+ if (MatrixUtil.compare(matrixC, concurrentMatrixFJP)) {
+ System.err.println("Comparison (ForkJoinPool) failed");
+ break;
+ }
- if (!MatrixUtil.compare(matrixC, concurrentMatrixC)) {
- System.err.println("Comparison failed");
+ IntStream stream = IntStream.rangeClosed(0, MATRIX_SIZE - 1).parallel().filter(n -> n % TASK_SIZE == 0);
+ start = System.nanoTime();
+ final int[][] concurrentMatrixParallelStream = MatrixUtil.concurrentMultiplyParallelStream(matrixA, matrixB, stream);
+ duration = (System.nanoTime() - start) / 1000000.;
+ out("Concurrent concurrentMatrixParralelStream time, msec: %,.3f", duration);
+ concurrentSumParallelStream += duration;
+
+ if (MatrixUtil.compare(matrixC, concurrentMatrixParallelStream)) {
+ System.err.println("Comparison (ParallelStream) failed");
break;
}
+
count++;
}
- executor.shutdown();
- out("\nAverage single thread time, sec: %.3f", singleThreadSum / 5.);
- out("Average concurrent thread time, sec: %.3f", concurrentThreadSum / 5.);
+ out("\nAverage single thread time, msec: %,9.3f", singleThreadSum / 5.);
+ out("Average concurrent thread (executor) time, msec: %,9.3f", concurrentThreadSum / 5.);
+ out("Average concurrent thread (CB) time, msec: %,9.3f", concurrentThreadSumCB / 5.);
+ out("Average concurrent thread (ForkJoinPool) time, msec: %,9.3f", concurrentThreadSumFJP / 5.);
+ out("Average concurrent thread (ParallelStream) time, msec: %,9.3f", concurrentSumParallelStream / 5.);
}
- private static void out(String format, double ms) {
- System.out.println(String.format(format, ms));
+ public static void out(String format, double ms) {
+ System.out.printf((format) + "%n", ms);
}
}
diff --git a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java
index 80a344a..dcd0171 100644
--- a/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java
+++ b/src/main/java/ru/javaops/masterjava/matrix/MatrixUtil.java
@@ -1,8 +1,14 @@
package ru.javaops.masterjava.matrix;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static ru.javaops.masterjava.matrix.MainMatrix.*;
/**
* gkislin
@@ -10,27 +16,65 @@
*/
public class MatrixUtil {
- // TODO implement parallel multiplication matrixA*matrixB
- public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) throws InterruptedException, ExecutionException {
- final int matrixSize = matrixA.length;
- final int[][] matrixC = new int[matrixSize][matrixSize];
+ // T_O_D_O implement parallel multiplication matrixA*matrixB
+ public static int[][] concurrentMultiply(int[][] matrixA, int[][] matrixB, ExecutorService executor) {
+ int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE];
+ for (int rowA = 0; rowA < MATRIX_SIZE; rowA += TASK_SIZE) {
+ executor.submit(new Task(matrixA, matrixB, matrixC, rowA));
+ }
+ return matrixC;
+ }
+ public static int[][] concurrentMultiplyCB(int[][] matrixA, int[][] matrixB, CyclicBarrier barrier) {
+ final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE];
+ List threads = new ArrayList<>(THREAD_NUMBER);
+ for (int rowA = 0; rowA < MATRIX_SIZE; rowA++) {
+ threads.add(new TaskCB(matrixA, matrixB, matrixC, rowA, barrier));
+ if (threads.size() == THREAD_NUMBER) {
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ threads.clear();
+ }
+ }
+ return matrixC;
+ }
+
+ public static int[][] concurrentMultiplyFJP(ForkJoinPool fjp) {
+ final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE];
+ TaskFJP task = new TaskFJP(matrixC, 0, MATRIX_SIZE);
+ fjp.invoke(task);
return matrixC;
}
- // TODO optimize by https://habrahabr.ru/post/114797/
+ // T_O_D_O optimize by https://habrahabr.ru/post/114797/
public static int[][] singleThreadMultiply(int[][] matrixA, int[][] matrixB) {
final int matrixSize = matrixA.length;
final int[][] matrixC = new int[matrixSize][matrixSize];
- for (int i = 0; i < matrixSize; i++) {
- for (int j = 0; j < matrixSize; j++) {
- int sum = 0;
- for (int k = 0; k < matrixSize; k++) {
- sum += matrixA[i][k] * matrixB[k][j];
+ int[] columnMatrixB = new int[MainMatrix.MATRIX_SIZE];
+ try {
+ for (int i = 0; ; i++) {
+ for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) {
+ columnMatrixB[k] = matrixB[k][i];
+ }
+ for (int j = 0; j < MainMatrix.MATRIX_SIZE; j++) {
+ int[] rowMatrixA = matrixA[j];
+ int sum = 0;
+ for (int k = 0; k < MainMatrix.MATRIX_SIZE; k++) {
+ sum += rowMatrixA[k] * columnMatrixB[k];
+ }
+ matrixC[j][i] = sum;
}
- matrixC[i][j] = sum;
}
+ } catch (IndexOutOfBoundsException ignored) {
}
return matrixC;
}
@@ -41,7 +85,7 @@ public static int[][] create(int size) {
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
- matrix[i][j] = rn.nextInt(10);
+ matrix[i][j] = rn.nextInt(6);
}
}
return matrix;
@@ -52,10 +96,163 @@ public static boolean compare(int[][] matrixA, int[][] matrixB) {
for (int i = 0; i < matrixSize; i++) {
for (int j = 0; j < matrixSize; j++) {
if (matrixA[i][j] != matrixB[i][j]) {
- return false;
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static int[][] transpone(int[][] matrix) {
+ int[][] matrixTransponed = new int[MATRIX_SIZE][MATRIX_SIZE];
+ for (int i = 0; i < MATRIX_SIZE; i++) {
+ for (int j = 0; j < MATRIX_SIZE; j++) {
+ matrixTransponed[j][i] = matrix[i][j];
+ }
+ }
+ return matrixTransponed;
+ }
+
+ private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) {
+ int[] columnMatrixB = new int[MATRIX_SIZE];
+ for (int i = rowA; i < rowA + TASK_SIZE; ++i) {
+ calcIJ(matrixA, matrixB, matrixC, columnMatrixB, i);
+ }
+ }
+
+ private static void calculate(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowFrom, int rowTo) {
+ int[] columnMatrixB = new int[MATRIX_SIZE];
+ for (int row = rowFrom; row < rowTo; row++) {
+ calcIJ(matrixA, matrixB, matrixC, columnMatrixB, row);
+ }
+ }
+
+ static void calcIJ(int[][] matrixA, int[][] matrixB, int[][] matrixC, int[] columnMatrixB, int column) {
+ for (int k = 0; k < MATRIX_SIZE; k++) {
+ columnMatrixB[k] = matrixB[k][column];
+ }
+ for (int j = 0; j < MATRIX_SIZE; j++) {
+ int[] rowMatrixA = matrixA[j];
+ int sum = 0;
+ for (int k = 0; k < MATRIX_SIZE; k++) {
+ sum += rowMatrixA[k] * columnMatrixB[k];
+ }
+ matrixC[j][column] = sum;
+ }
+ }
+
+ private static void calcIJ1(int[][] matrixA, int[][] matrixB, int[][] matrixC, int column) {
+ int[] columnMatrixB = new int[MATRIX_SIZE];
+ for (int k = 0; k < MATRIX_SIZE; k++) {
+ columnMatrixB[k] = matrixB[k][column];
+ }
+ for (int j = 0; j < MATRIX_SIZE; j++) {
+ int[] rowMatrixA = matrixA[j];
+ int sum = 0;
+ for (int k = 0; k < MATRIX_SIZE; k++) {
+ sum += rowMatrixA[k] * columnMatrixB[k];
+ }
+ matrixC[j][column] = sum;
+ }
+ }
+
+ private static void calculateCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) {
+ int[] columnMatrixB = new int[MATRIX_SIZE];
+ calcIJ(matrixA, matrixB, matrixC, columnMatrixB, rowA);
+ }
+
+ public static int[][] concurrentMultiplyParallelStream(final int[][] matrixA, final int[][] matrixB, IntStream stream) {
+ final int[][] matrixC = new int[MATRIX_SIZE][MATRIX_SIZE];
+ stream.forEach(row -> {
+ calculate(matrixA, matrixB, matrixC, row);
+ });
+ return matrixC;
+ }
+
+ //concurrentMatrixParralelStream time, msec: 17 943,034
+ public static int[][] concurrentMultiplyParallelStream1(final int[][] matrixA, final int[][] matrixB) {
+ return Arrays.stream(matrixA)
+ .parallel()
+ .map(AMatrixRow -> IntStream.range(0, matrixB[0].length)
+ .map(i -> IntStream.range(0, matrixB.length)
+ .map(j -> AMatrixRow[j] * matrixB[j][i])
+ .sum()
+ )
+ .toArray())
+ .toArray(int[][]::new);
+ }
+
+ static class Task implements Runnable {
+ private final int[][] matrixA;
+ private final int[][] matrixB;
+ private final int[][] matrixC;
+
+ private final int rowA;
+
+ public Task(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA) {
+ this.matrixA = matrixA;
+ this.matrixB = matrixB;
+ this.matrixC = matrixC;
+ this.rowA = rowA;
+ }
+
+ @Override
+ public void run() {
+ calculate(matrixA, matrixB, matrixC, rowA);
+ }
+
+ }
+
+ static class TaskCB extends Thread {
+ private final int[][] matrixA;
+ private final int[][] matrixB;
+ private final int[][] matrixC;
+ private final int rowA;
+
+ private final CyclicBarrier barrier;
+
+ public TaskCB(int[][] matrixA, int[][] matrixB, int[][] matrixC, int rowA, CyclicBarrier barrier) {
+ this.matrixA = matrixA;
+ this.matrixB = matrixB;
+ this.matrixC = matrixC;
+ this.rowA = rowA;
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ calculateCB(matrixA, matrixB, matrixC, rowA);
+ } catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ static class TaskFJP extends RecursiveAction {
+ private final int[][] matrixA = MainMatrix.matrixA;
+ private final int[][] matrixB = MainMatrix.matrixB;
+ private final int[][] matrixC;
+
+ final int lo, hi;
+
+ TaskFJP(int[][] matrixC, int lo, int hi) {
+ this.matrixC = matrixC;
+ this.lo = lo;
+ this.hi = hi;
+ }
+
+ protected void compute() {
+ if (hi - lo <= TASK_SIZE / THRESH) {
+ for (int i = lo; i < hi; i++) {
+ calculate(matrixA, matrixB, matrixC, i, hi);
}
+ } else {
+ int mid = (lo + hi) >>> 1;
+ invokeAll(new TaskFJP(matrixC, lo, mid),
+ new TaskFJP(matrixC, mid, hi));
}
}
- return true;
}
}