diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java b/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java index 7cfaab7..142bf2a 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/DBIProvider.java @@ -24,11 +24,13 @@ private static class DBIHolder { if (connectionFactory != null) { log.info("Init jDBI with connectionFactory"); dbi = new DBI(connectionFactory); + log.info("Init jDBI with connectionFactory {} {}",dbi,connectionFactory); } else { try { log.info("Init jDBI with JNDI"); InitialContext ctx = new InitialContext(); dbi = new DBI((DataSource) ctx.lookup("java:/comp/env/jdbc/masterjava")); + log.info("Init jDBI with JNDI {}", dbi); } catch (Exception ex) { throw new IllegalStateException("PostgreSQL initialization failed", ex); } diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index e0230cc..14cb5c9 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -2,6 +2,7 @@ import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; import ru.javaops.masterjava.persist.model.User; @@ -20,13 +21,16 @@ public User insert(User user) { return user; } - @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ") + @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ON CONFLICT DO NOTHING") @GetGeneratedKeys abstract int insertGeneratedId(@BindBean User user); @SqlUpdate("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS user_flag)) ") abstract void insertWitId(@BindBean User user); + @SqlBatch("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS user_flag)) ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List users, @BatchChunkSize int chunkSize); + @SqlQuery("SELECT * FROM users ORDER BY full_name, email LIMIT :it") public abstract List getWithLimit(@Bind int limit); diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java new file mode 100644 index 0000000..f344a03 --- /dev/null +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadChunkServlet.java @@ -0,0 +1,51 @@ +package ru.javaops.masterjava.upload; + +import org.thymeleaf.context.WebContext; + +import javax.servlet.ServletException; +import javax.servlet.annotation.MultipartConfig; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static ru.javaops.masterjava.common.web.ThymeleafListener.engine; + +@WebServlet(urlPatterns = "/chunk", loadOnStartup = 1) +@MultipartConfig(fileSizeThreshold = 1024 * 1024 * 10) //10 MB in memory limit +public class UploadChunkServlet extends HttpServlet { + + private final UserProcessor userProcessor = new UserProcessor(); + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + engine.process("upload", webContext, resp.getWriter()); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + + try { +// http://docs.oracle.com/javaee/6/tutorial/doc/glraq.html + int chunkSize = Integer.parseInt(req.getParameter("chunkSize")); + Part filePart = req.getPart("fileToUpload"); + if (filePart.getSize() == 0) { + throw new IllegalStateException("Upload file have not been selected"); + } + try (InputStream is = filePart.getInputStream()) { + List abandonedEmails = userProcessor.processChunkGroupedByReason(is, chunkSize); + webContext.setVariable("abandonedEmails", abandonedEmails); + engine.process("abonded-emails", webContext, resp.getWriter()); + } + } catch (Exception e) { + webContext.setVariable("exception", e); + engine.process("exception", webContext, resp.getWriter()); + } + } +} diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java index a2b583a..8748ba1 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UploadServlet.java @@ -34,12 +34,13 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S try { // http://docs.oracle.com/javaee/6/tutorial/doc/glraq.html + int chunkSize = Integer.parseInt(req.getParameter("chunkSize")); Part filePart = req.getPart("fileToUpload"); if (filePart.getSize() == 0) { throw new IllegalStateException("Upload file have not been selected"); } try (InputStream is = filePart.getInputStream()) { - List users = userProcessor.process(is); + List users = userProcessor.process(is, chunkSize); webContext.setVariable("users", users); engine.process("result", webContext, resp.getWriter()); } diff --git a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java index 5cb3deb..b860f78 100644 --- a/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java +++ b/web/upload/src/main/java/ru/javaops/masterjava/upload/UserProcessor.java @@ -1,5 +1,8 @@ package ru.javaops.masterjava.upload; +import org.slf4j.Logger; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.UserDao; import ru.javaops.masterjava.persist.model.User; import ru.javaops.masterjava.persist.model.UserFlag; import ru.javaops.masterjava.xml.schema.ObjectFactory; @@ -12,10 +15,20 @@ import javax.xml.stream.events.XMLEvent; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.slf4j.LoggerFactory.getLogger; public class UserProcessor { + private static final Logger logger = getLogger(UserProcessor.class); private static final JaxbParser jaxbParser = new JaxbParser(ObjectFactory.class); + private static final UserDao userDao = DBIProvider.getDao(UserDao.class); public List process(final InputStream is) throws XMLStreamException, JAXBException { final StaxStreamProcessor processor = new StaxStreamProcessor(is); @@ -29,4 +42,143 @@ public List process(final InputStream is) throws XMLStreamException, JAXBE } return users; } + + public List process(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + List users = new ArrayList<>(); + + JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + users.add(user); + } + int[] ints = userDao.insertBatch(users, chunkSize); + return IntStream.range(0, ints.length).filter(i -> ints[i] == 0).mapToObj(users::get).collect(Collectors.toList()); + } + + public List processChunk(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + final JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List chunk = new ArrayList<>(); + List abandonedEmails = new ArrayList<>(); + + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + chunk.add(user); + if (chunk.size() >= chunkSize) { + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); + chunk = new ArrayList<>(); + } + } + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); + executor.shutdown(); + return abandonedEmails; + } + + public List processChunkGroupedByReason(final InputStream is, int chunkSize) throws XMLStreamException, JAXBException { + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + final JaxbUnmarshaller unmarshaller = jaxbParser.createUnmarshaller(); + final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List chunk = new ArrayList<>(); + List abandonedEmails = new ArrayList<>(); + + while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { + ru.javaops.masterjava.xml.schema.User xmlUser = unmarshaller.unmarshal(processor.getReader(), ru.javaops.masterjava.xml.schema.User.class); + final User user = new User(xmlUser.getValue(), xmlUser.getEmail(), UserFlag.valueOf(xmlUser.getFlag().value())); + chunk.add(user); + if (chunk.size() >= chunkSize) { + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); + chunk = new ArrayList<>(); + } + } + abandonedEmails.addAll(extractAbandonedEmails(chunk, executor)); + executor.shutdown(); + return abandonedEmails.stream() + .collect(Collectors.groupingBy(AbandonedEmails::getReason)).entrySet().stream() + .map(entry -> new AbandonedEmails(entry.getValue().stream() + .map(AbandonedEmails::getEmails) + .flatMap(Collection::stream) + .collect(Collectors.toList()), entry.getKey())) + .collect(Collectors.toList()); + } + + private List extractAbandonedEmails(List finalChunk, ExecutorService executor) { + List abandonedEmailsList = new ArrayList<>(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> insertChunk(finalChunk), executor) + .handle((res, ex) -> { + if (null != ex) { + logger.debug("Failed to insert chunk -> CompletableFuture"); + List emailRange = new ArrayList<>(); + emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); + return new AbandonedEmails(emailRange, ex.getMessage()); + } + return res; + }); + try { + AbandonedEmails abandons = future.get(); + if (!abandons.getEmails().isEmpty()) { + if (abandons.getEmails().contains("user1@gmail.com")) { + throw new RuntimeException("CompletableFuture#get exception"); + } + abandonedEmailsList.add(abandons); + } + } catch (Exception ex) { + logger.debug("Failed to insert chunk -> CompletableFuture#get {}", ex.getMessage()); + List emailRange = new ArrayList<>(); + emailRange.add(String.format("%s -range- %s", finalChunk.get(0).getEmail(), finalChunk.get(finalChunk.size() - 1).getEmail())); + abandonedEmailsList.add(new AbandonedEmails(emailRange, ex.getMessage())); + } + return abandonedEmailsList; + } + + private AbandonedEmails insertChunk(List chunk) { + for (User user : chunk) { + if (user.getFullName().equals("User3")) { + throw new RuntimeException("insertChunk userDao.insertBatch exception"); + } + } + int[] ints = userDao.insertBatch(chunk, chunk.size()); + List existedEmails = IntStream.range(0, ints.length) + .filter(i -> ints[i] == 0) + .mapToObj(index -> chunk.get(index).getEmail()) + .collect(Collectors.toList()); + return new AbandonedEmails(existedEmails, "existed emails"); + } + + static class AbandonedEmails { + List emails; + String reason; + + public AbandonedEmails(List emails, String reason) { + this.emails = emails; + this.reason = reason; + } + + public List getEmails() { + return emails; + } + + public void setEmails(List emails) { + this.emails = emails; + } + + public String getReason() { + return reason; + } + + public void setReason(String reason) { + this.reason = reason; + } + + @Override + public String toString() { + return "AbandonedEmails{" + + "emails=" + emails + + ", reason='" + reason + '\'' + + '}'; + } + } } diff --git a/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html b/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html new file mode 100644 index 0000000..eb150ac --- /dev/null +++ b/web/upload/src/main/webapp/WEB-INF/templates/abonded-emails.html @@ -0,0 +1,24 @@ + + + + Abonded Emails + + +

Upload XML

+

Abonded Emails

+ + + + + + + + + + + + + +
EmailsReason
+ + \ No newline at end of file diff --git a/web/upload/src/main/webapp/WEB-INF/templates/upload.html b/web/upload/src/main/webapp/WEB-INF/templates/upload.html index d48c312..120515c 100644 --- a/web/upload/src/main/webapp/WEB-INF/templates/upload.html +++ b/web/upload/src/main/webapp/WEB-INF/templates/upload.html @@ -6,7 +6,10 @@

Select xml file to upload

-
+
+

+

+

diff --git a/web/webapp/pom.xml b/web/webapp/pom.xml index 60bb1c8..ac21231 100644 --- a/web/webapp/pom.xml +++ b/web/webapp/pom.xml @@ -23,8 +23,9 @@ ${project.groupId} - common + persist ${project.version} + \ No newline at end of file diff --git a/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java new file mode 100644 index 0000000..0496b10 --- /dev/null +++ b/web/webapp/src/main/java/ru/javaops/masterjava/UsersServlet.java @@ -0,0 +1,30 @@ +package ru.javaops.masterjava; + +import org.thymeleaf.TemplateEngine; +import org.thymeleaf.context.WebContext; +import ru.javaops.masterjava.common.web.ThymeleafUtil; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.UserDao; +import ru.javaops.masterjava.persist.model.User; + +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; + +@WebServlet(urlPatterns = "/", loadOnStartup = 1) +public class UsersServlet extends HttpServlet { + UserDao userDao = DBIProvider.getDao(UserDao.class); + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + List users = userDao.getWithLimit(20); + final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale()); + webContext.setVariable("users", users); + TemplateEngine templateEngine = ThymeleafUtil.getTemplateEngine(getServletContext()); + templateEngine.process("users", webContext, resp.getWriter()); + } +}