diff --git a/src/main/java/ru/javaops/masterjava/service/MailService.java b/src/main/java/ru/javaops/masterjava/service/MailService.java index 2b6aeb294..cef46e55e 100644 --- a/src/main/java/ru/javaops/masterjava/service/MailService.java +++ b/src/main/java/ru/javaops/masterjava/service/MailService.java @@ -1,8 +1,10 @@ package ru.javaops.masterjava.service; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.*; +import java.util.stream.Collectors; public class MailService { private static final String OK = "OK"; @@ -11,10 +13,74 @@ public class MailService { private static final String INTERRUPTED_BY_TIMEOUT = "+++ Interrupted by timeout"; private static final String INTERRUPTED_EXCEPTION = "+++ InterruptedException"; + private final ExecutorService mailExecutor = Executors.newFixedThreadPool(8); + public GroupResult sendToList(final String template, final Set emails) throws Exception { - return new GroupResult(0, Collections.emptyList(), null); - } + final CompletionService completionService = new ExecutorCompletionService<>(mailExecutor); + + List> futures = emails.stream() + .map(email -> completionService.submit(() -> sendToUser(template, email))) + .collect(Collectors.toList()); + return new Callable() { + private int success = 0; + private List failed = new ArrayList<>(); + + @Override + public GroupResult call() { + while (!futures.isEmpty()) { + try { + Future future = completionService.poll(10, TimeUnit.SECONDS); + if (future == null) { + return cancelWithFail(INTERRUPTED_BY_TIMEOUT); + } + futures.remove(future); + MailResult mailResult = future.get(); + if (mailResult.isOk()) { + success++; + } else { + failed.add(mailResult); + if (failed.size() >= 5) { + return cancelWithFail(INTERRUPTED_BY_FAULTS_NUMBER); + } + } + } catch (ExecutionException e) { + return cancelWithFail(e.getCause().toString()); + } catch (InterruptedException e) { + return cancelWithFail(INTERRUPTED_EXCEPTION); + } + } +/* + for (Future future : futures) { + MailResult mailResult; + try { + mailResult = future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return cancelWithFail(INTERRUPTED_EXCEPTION); + } catch (ExecutionException e) { + return cancelWithFail(e.getCause().toString()); + } catch (TimeoutException e) { + return cancelWithFail(INTERRUPTED_BY_TIMEOUT); + } + if (mailResult.isOk()) { + success++; + } else { + failed.add(mailResult); + if (failed.size() >= 5) { + return cancelWithFail(INTERRUPTED_BY_FAULTS_NUMBER); + } + } + } +*/ + return new GroupResult(success, failed, null); + } + + private GroupResult cancelWithFail(String cause) { + futures.forEach(f -> f.cancel(true)); + return new GroupResult(success, failed, cause); + } + }.call(); + } // dummy realization public MailResult sendToUser(String template, String email) throws Exception {