From c38b97af9e7fb1edbb4fbad118b28ff83d970224 Mon Sep 17 00:00:00 2001 From: user Date: Wed, 21 Jun 2023 22:30:02 +0300 Subject: [PATCH 1/5] HW4 with no Optional --- .../masterjava/persist/DBIProvider.java | 2 ++ .../masterjava/persist/dao/UserDao.java | 4 +++ .../masterjava/upload/UploadServlet.java | 5 ++-- .../masterjava/upload/UserProcessor.java | 20 +++++++++++++ .../main/webapp/WEB-INF/templates/upload.html | 5 +++- web/webapp/pom.xml | 3 +- .../ru/javaops/masterjava/UsersServlet.java | 30 +++++++++++++++++++ 7 files changed, 65 insertions(+), 4 deletions(-) create mode 100644 web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java b/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java index 7cfaab7..142bf2a 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java @@ -24,11 +24,13 @@ private static class DBIHolder { if (connectionFactory != null) { log.info("Init jDBI with connectionFactory"); dbi = new DBI(connectionFactory); + log.info("Init jDBI with connectionFactory {} {}",dbi,connectionFactory); } else { try { log.info("Init jDBI with JNDI"); InitialContext ctx = new InitialContext(); dbi = new DBI((DataSource) ctx.lookup("java:/comp/env/jdbc/masterjava")); + log.info("Init jDBI with JNDI {}", dbi); } catch (Exception ex) { throw new IllegalStateException("PostgreSQL initialization failed", ex); } diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index e0230cc..753ab22 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -2,6 +2,7 @@ import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; import ru.javaops.masterjava.persist.model.User; @@ -27,6 +28,9 @@ public User insert(User user) { @SqlUpdate("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS user_flag)) ") abstract void insertWitId(@BindBean User user); + @SqlBatch("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List users, @BatchChunkSize int chunkSize); + @SqlQuery("SELECT * FROM users ORDER BY full_name, email LIMIT :it") public abstract List getWithLimit(@Bind int limit); diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java index a2b583a..15a67e6 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java @@ -16,7 +16,7 @@ import static ru.javaops.masterjava.common.web.ThymeleafListener.engine; -@WebServlet(urlPatterns = "/", loadOnStartup = 1) +@WebServlet(urlPatterns = "/upload", loadOnStartup = 1) @MultipartConfig(fileSizeThreshold = 1024 * 1024 * 10) //10 MB in memory limit public class UploadServlet extends HttpServlet { @@ -34,12 +34,13 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S try { // http://docs.oracle.com/javaee/6/tutorial/doc/glraq.html + int chunkSize = Integer.parseInt(req.getParameter("chunkSize")); Part filePart = req.getPart("fileToUpload"); if (filePart.getSize() == 0) { throw new IllegalStateException("Upload file have not been selected"); } try (InputStream is = filePart.getInputStream()) { - List users = userProcessor.process(is); + List users = userProcessor.process(is, chunkSize); webContext.setVariable("users", users); engine.process("result", webContext, resp.getWriter()); } diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 5cb3deb..5aa6b06 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -1,5 +1,7 @@ package ru.javaops.masterjava.upload; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.UserDao; import ru.javaops.masterjava.persist.model.User; import ru.javaops.masterjava.persist.model.UserFlag; import ru.javaops.masterjava.xml.schema.ObjectFactory; @@ -13,9 +15,12 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class UserProcessor { private static final JaxbParser jaxbParser = new JaxbParser(ObjectFactory.class); + private static final UserDao userDao = DBIProvider.getDao(UserDao.class); public List process(final InputStream is) throws XMLStreamException, JAXBException { final StaxStreamProcessor processor = new StaxStreamProcessor(is); @@ -29,4 +34,19 @@ public List process(final InputStream is) throws XMLStreamException, JAXBE } return users; } + + public List process(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + List users = new ArrayList<>(); + + JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + users.add(user); + } + int[] ints = userDao.insertBatch(users, chunkSize); + return + IntStream.range(0, ints.length).filter(i -> ints[i] == 0).mapToObj(users::get).collect(Collectors.toList()); + } } diff --git a/web/upload/src/main/webapp/WEB-INF/templates/upload.html b/web/upload/src/main/webapp/WEB-INF/templates/upload.html index d48c312..120515c 100644 --- a/web/upload/src/main/webapp/WEB-INF/templates/upload.html +++ b/web/upload/src/main/webapp/WEB-INF/templates/upload.html @@ -6,7 +6,10 @@

Select xml file to upload

-
+
+

+

+

diff --git a/web/webapp/pom.xml b/web/webapp/pom.xml index 60bb1c8..ac21231 100644 --- a/web/webapp/pom.xml +++ b/web/webapp/pom.xml @@ -23,8 +23,9 @@ ${project.groupId} - common + persist ${project.version} + \ No newline at end of file diff --git a/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java new file mode 100644 index 0000000..5f4c95a --- /dev/null +++ b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java @@ -0,0 +1,30 @@ +package ru.javaops.masterjava; + +import org.thymeleaf.TemplateEngine; +import org.thymeleaf.context.WebContext; +import ru.javaops.masterjava.common.web.ThymeleafUtil; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.UserDao; +import ru.javaops.masterjava.persist.model.User; + +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; + +@WebServlet(urlPatterns = "/users", loadOnStartup = 1) +public class UsersServlet extends HttpServlet { + UserDao userDao = DBIProvider.getDao(UserDao.class); + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + List users = userDao.getWithLimit(20); + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + webContext.setVariable("users", users); + TemplateEngine templateEngine = ThymeleafUtil.getTemplateEngine(getServletContext()); + templateEngine.process("users", webContext, resp.getWriter()); + } +} From 9193c826cba7350fc0bdc410cf2ca2670f0f7b78 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 26 Jun 2023 19:26:24 +0300 Subject: [PATCH 2/5] HW4 Optional for each chunk Tomcat memory leak when ending (To prevent a memory leak, the JDBC Driver has been forcibly unregistered.) --- .../masterjava/persist/dao/UserDao.java | 2 +- .../masterjava/upload/UploadChunkServlet.java | 51 +++++++++ .../masterjava/upload/UploadServlet.java | 2 +- .../masterjava/upload/UserProcessor.java | 107 +++++++++++++++++- .../WEB-INF/templates/abonded-emails.html | 24 ++++ .../ru/javaops/masterjava/UsersServlet.java | 2 +- 6 files changed, 183 insertions(+), 5 deletions(-) create mode 100644 web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java create mode 100644 web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index 753ab22..14cb5c9 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -21,7 +21,7 @@ public User insert(User user) { return user; } - @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ") + @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ON CONFLICT DO NOTHING") @GetGeneratedKeys abstract int insertGeneratedId(@BindBean User user); diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java new file mode 100644 index 0000000..3869078 --- /dev/null +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java @@ -0,0 +1,51 @@ +package ru.javaops.masterjava.upload; + +import org.thymeleaf.context.WebContext; + +import javax.servlet.ServletException; +import javax.servlet.annotation.MultipartConfig; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static ru.javaops.masterjava.common.web.ThymeleafListener.engine; + +@WebServlet(urlPatterns = "/chunk", loadOnStartup = 1) +@MultipartConfig(fileSizeThreshold = 1024 * 1024 * 10) //10 MB in memory limit +public class UploadChunkServlet extends HttpServlet { + + private final UserProcessor userProcessor = new UserProcessor(); + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + engine.process("upload", webContext, resp.getWriter()); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + + try { +// http://docs.oracle.com/javaee/6/tutorial/doc/glraq.html + int chunkSize = Integer.parseInt(req.getParameter("chunkSize")); + Part filePart = req.getPart("fileToUpload"); + if (filePart.getSize() == 0) { + throw new IllegalStateException("Upload file have not been selected"); + } + try (InputStream is = filePart.getInputStream()) { + List abandonedEmails = userProcessor.processChunk(is, chunkSize); + webContext.setVariable("abandonedEmails", abandonedEmails); + engine.process("abonded-emails", webContext, resp.getWriter()); + } + } catch (Exception e) { + webContext.setVariable("exception", e); + engine.process("exception", webContext, resp.getWriter()); + } + } +} diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java index 15a67e6..8748ba1 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java @@ -16,7 +16,7 @@ import static ru.javaops.masterjava.common.web.ThymeleafListener.engine; -@WebServlet(urlPatterns = "/upload", loadOnStartup = 1) +@WebServlet(urlPatterns = "/", loadOnStartup = 1) @MultipartConfig(fileSizeThreshold = 1024 * 1024 * 10) //10 MB in memory limit public class UploadServlet extends HttpServlet { diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 5aa6b06..712f39c 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -1,5 +1,6 @@ package ru.javaops.masterjava.upload; +import org.slf4j.Logger; import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.dao.UserDao; import ru.javaops.masterjava.persist.model.User; @@ -15,10 +16,16 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.slf4j.LoggerFactory.getLogger; + public class UserProcessor { + private static final Logger logger = getLogger(UserProcessor.class); private static final JaxbParser jaxbParser = new JaxbParser(ObjectFactory.class); private static final UserDao userDao = DBIProvider.getDao(UserDao.class); @@ -46,7 +53,103 @@ public List process(final InputStream is, int chunkSize) throws XMLStreamE users.add(user); } int[] ints = userDao.insertBatch(users, chunkSize); - return - IntStream.range(0, ints.length).filter(i -> ints[i] == 0).mapToObj(users::get).collect(Collectors.toList()); + return IntStream.range(0, ints.length).filter(i -> ints[i] == 0).mapToObj(users::get).collect(Collectors.toList()); + } + + public List processChunk(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + final JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List chunk = new ArrayList<>(); + List abandonedEmails = new ArrayList<>(); + + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + chunk.add(user); + if (chunk.size() >= chunkSize) { + extractAbandonedEmails(chunk, executor, abandonedEmails); + chunk = new ArrayList<>(); + } + } + extractAbandonedEmails(chunk, executor, abandonedEmails); + return abandonedEmails; + } + + private void extractAbandonedEmails(List finalChunk, ExecutorService executor, List abandonedEmails) { + CompletableFuture future + = CompletableFuture.supplyAsync(() -> insertChunk(finalChunk), executor) + .handle((res, ex) -> { + if (null != ex) { + logger.debug("Failed to insert chunk -> CompletableFuture"); + List emailRange = new ArrayList<>(); + emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); + return new AbandonedEmails(emailRange, ex.getMessage()); + } + return res; + }); + try { + AbandonedEmails abandons = future.get(); + if (!abandons.getEmails().isEmpty()) { + if (abandons.getEmails().contains("user1@gmail.com")) { + throw new RuntimeException("CompletableFuture#get exception"); + } + abandonedEmails.add(abandons); + } + } catch (Exception ex) { + logger.debug("Failed to insert chunk -> CompletableFuture#get {}", ex.getMessage()); + abandonedEmails.clear(); + List emailRange = new ArrayList<>(); + emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); + abandonedEmails.add(new AbandonedEmails(emailRange, ex.getMessage())); + } + } + + private AbandonedEmails insertChunk(List chunk) { + for (User user : chunk) { + if (user.getFullName().equals("User3")) { + throw new RuntimeException("insertChunk exception"); + } + } + int[] ints = userDao.insertBatch(chunk, chunk.size()); + List existedEmails = IntStream.range(0, ints.length) + .filter(i -> ints[i] == 0) + .mapToObj(index -> chunk.get(index).getEmail()) + .collect(Collectors.toList()); + return new AbandonedEmails(existedEmails, "existed emails"); + } + + static class AbandonedEmails { + List emails; + String reason; + + public AbandonedEmails(List emails, String reason) { + this.emails = emails; + this.reason = reason; + } + + public List getEmails() { + return emails; + } + + public void setEmails(List emails) { + this.emails = emails; + } + + public String getReason() { + return reason; + } + + public void setReason(String reason) { + this.reason = reason; + } + + @Override + public String toString() { + return "AbandonedEmails{" + + "emails=" + emails + + ", reason='" + reason + '\'' + + '}'; + } } } diff --git a/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html b/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html new file mode 100644 index 0000000..eb150ac --- /dev/null +++ b/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html @@ -0,0 +1,24 @@ + + + + Abonded Emails + + +

Upload XML

+

Abonded Emails

+ + + + + + + + + + + + + +
EmailsReason
+ + \ No newline at end of file diff --git a/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java index 5f4c95a..0496b10 100644 --- a/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java +++ b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java @@ -15,7 +15,7 @@ import java.io.IOException; import java.util.List; -@WebServlet(urlPatterns = "/users", loadOnStartup = 1) +@WebServlet(urlPatterns = "/", loadOnStartup = 1) public class UsersServlet extends HttpServlet { UserDao userDao = DBIProvider.getDao(UserDao.class); From 8403b8c67933bd8af5cdd51dde7da83369d98aed Mon Sep 17 00:00:00 2001 From: user Date: Tue, 27 Jun 2023 16:46:58 +0300 Subject: [PATCH 3/5] HW4 Optional for each chunk # - processChunkGroupedByReason - Tomcat memory leak when ending (To prevent a memory leak, the JDBC Driver has been forcibly unregistered.) --- .../masterjava/upload/UploadChunkServlet.java | 2 +- .../masterjava/upload/UserProcessor.java | 34 ++++++++++++++++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java index 3869078..f344a03 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java @@ -39,7 +39,7 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S throw new IllegalStateException("Upload file have not been selected"); } try (InputStream is = filePart.getInputStream()) { - List abandonedEmails = userProcessor.processChunk(is, chunkSize); + List abandonedEmails = userProcessor.processChunkGroupedByReason(is, chunkSize); webContext.setVariable("abandonedEmails", abandonedEmails); engine.process("abonded-emails", webContext, resp.getWriter()); } diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 712f39c..33cf35b 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -15,6 +15,7 @@ import javax.xml.stream.events.XMLEvent; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -73,12 +74,38 @@ public List processChunk(final InputStream is, int chunkSize) t } } extractAbandonedEmails(chunk, executor, abandonedEmails); + executor.shutdown(); return abandonedEmails; } + public List processChunkGroupedByReason(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + final JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List chunk = new ArrayList<>(); + List abandonedEmails = new ArrayList<>(); + + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + chunk.add(user); + if (chunk.size() >= chunkSize) { + extractAbandonedEmails(chunk, executor, abandonedEmails); + chunk = new ArrayList<>(); + } + } + extractAbandonedEmails(chunk, executor, abandonedEmails); + return abandonedEmails.stream() + .collect(Collectors.groupingBy(AbandonedEmails::getReason)).entrySet().stream() + .map(entry -> new AbandonedEmails(entry.getValue().stream() + .map(AbandonedEmails::getEmails) + .flatMap(Collection::stream) + .collect(Collectors.toList()), entry.getKey())) + .collect(Collectors.toList()); + } + private void extractAbandonedEmails(List finalChunk, ExecutorService executor, List abandonedEmails) { - CompletableFuture future - = CompletableFuture.supplyAsync(() -> insertChunk(finalChunk), executor) + CompletableFuture future = CompletableFuture.supplyAsync(() -> insertChunk(finalChunk), executor) .handle((res, ex) -> { if (null != ex) { logger.debug("Failed to insert chunk -> CompletableFuture"); @@ -98,7 +125,6 @@ private void extractAbandonedEmails(List finalChunk, ExecutorService execu } } catch (Exception ex) { logger.debug("Failed to insert chunk -> CompletableFuture#get {}", ex.getMessage()); - abandonedEmails.clear(); List emailRange = new ArrayList<>(); emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); abandonedEmails.add(new AbandonedEmails(emailRange, ex.getMessage())); @@ -108,7 +134,7 @@ private void extractAbandonedEmails(List finalChunk, ExecutorService execu private AbandonedEmails insertChunk(List chunk) { for (User user : chunk) { if (user.getFullName().equals("User3")) { - throw new RuntimeException("insertChunk exception"); + throw new RuntimeException("insertChunk userDao.insertBatch exception"); } } int[] ints = userDao.insertBatch(chunk, chunk.size()); From 872c9c9c2e98853653acd0ff29bc6e6fb71c4a10 Mon Sep 17 00:00:00 2001 From: user Date: Tue, 27 Jun 2023 17:24:47 +0300 Subject: [PATCH 4/5] HW4 Optional for each chunk # processChunkGroupedByReason # executor shutdown --- .../main/java/ru/javaops/masterjava/upload/UserProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 33cf35b..51c59b4 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -95,6 +95,7 @@ public List processChunkGroupedByReason(final InputStream is, i } } extractAbandonedEmails(chunk, executor, abandonedEmails); + executor.shutdown(); return abandonedEmails.stream() .collect(Collectors.groupingBy(AbandonedEmails::getReason)).entrySet().stream() .map(entry -> new AbandonedEmails(entry.getValue().stream() From 0b527fc86e77a23f3408398a95ef965df2cb2870 Mon Sep 17 00:00:00 2001 From: user Date: Wed, 28 Jun 2023 15:32:06 +0300 Subject: [PATCH 5/5] HW4 - small refactor --- .../javaops/masterjava/upload/UserProcessor.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 51c59b4..b860f78 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -69,11 +69,11 @@ public List processChunk(final InputStream is, int chunkSize) t final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); chunk.add(user); if (chunk.size() >= chunkSize) { - extractAbandonedEmails(chunk, executor, abandonedEmails); + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); chunk = new ArrayList<>(); } } - extractAbandonedEmails(chunk, executor, abandonedEmails); + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); executor.shutdown(); return abandonedEmails; } @@ -90,11 +90,11 @@ public List processChunkGroupedByReason(final InputStream is, i final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); chunk.add(user); if (chunk.size() >= chunkSize) { - extractAbandonedEmails(chunk, executor, abandonedEmails); + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); chunk = new ArrayList<>(); } } - extractAbandonedEmails(chunk, executor, abandonedEmails); + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); executor.shutdown(); return abandonedEmails.stream() .collect(Collectors.groupingBy(AbandonedEmails::getReason)).entrySet().stream() @@ -105,7 +105,8 @@ public List processChunkGroupedByReason(final InputStream is, i .collect(Collectors.toList()); } - private void extractAbandonedEmails(List finalChunk, ExecutorService executor, List abandonedEmails) { + private List extractAbandonedEmails(List finalChunk, ExecutorService executor) { + List abandonedEmailsList = new ArrayList<>(); CompletableFuture future = CompletableFuture.supplyAsync(() -> insertChunk(finalChunk), executor) .handle((res, ex) -> { if (null != ex) { @@ -122,14 +123,15 @@ private void extractAbandonedEmails(List finalChunk, ExecutorService execu if (abandons.getEmails().contains("user1@gmail.com")) { throw new RuntimeException("CompletableFuture#get exception"); } - abandonedEmails.add(abandons); + abandonedEmailsList.add(abandons); } } catch (Exception ex) { logger.debug("Failed to insert chunk -> CompletableFuture#get {}", ex.getMessage()); List emailRange = new ArrayList<>(); emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); - abandonedEmails.add(new AbandonedEmails(emailRange, ex.getMessage())); + abandonedEmailsList.add(new AbandonedEmails(emailRange, ex.getMessage())); } + return abandonedEmailsList; } private AbandonedEmails insertChunk(List chunk) {